Skip to content

Commit 564ccf8

Browse files
committed
Multy-repository transactions support
1 parent 5572e5e commit 564ccf8

File tree

10 files changed

+379
-0
lines changed

10 files changed

+379
-0
lines changed

api/src/main/kotlin/org/taymyr/play/repository/domain/Repository.kt

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ interface Repository<Aggregate, Identity> {
5454
*/
5555
fun remove(aggregate: Aggregate): CompletionStage<Done>
5656

57+
/**
58+
* Removing aggregate from the repository within multy-repository transaction.
59+
* @param aggregate Aggregate.
60+
* @param transaction Transaction
61+
* @return [Done] if removing successfully. Otherwise will throw an exception.
62+
* @throws Exception Any exceptions while execute a query on the database will wrapped.
63+
*/
64+
fun remove(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>
65+
5766
/**
5867
* Removing aggregates from the repository.
5968
* @param aggregates List of aggregates.
@@ -62,6 +71,15 @@ interface Repository<Aggregate, Identity> {
6271
*/
6372
fun removeAll(aggregates: Collection<Aggregate>): CompletionStage<Done>
6473

74+
/**
75+
* Removing aggregates from the repository within multy-repository transaction.
76+
* @param aggregates List of aggregates.
77+
* @param transaction Transaction
78+
* @return [Done] if removing successfully. Otherwise will throw an exception.
79+
* @throws Exception Any exceptions while execute a query on the database will wrapped.
80+
*/
81+
fun removeAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>
82+
6583
/**
6684
* Create aggregate on the repository.
6785
* @param aggregate Aggregate.
@@ -70,6 +88,15 @@ interface Repository<Aggregate, Identity> {
7088
*/
7189
fun create(aggregate: Aggregate): CompletionStage<Done>
7290

91+
/**
92+
* Create aggregate on the repository within multy-repository transaction.
93+
* @param aggregate Aggregate.
94+
* @param transaction Transaction
95+
* @return [Done] if creation successfully. Otherwise will throw an exception.
96+
* @throws Exception Any exceptions while execute a query on the database will wrapped.
97+
*/
98+
fun create(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>
99+
73100
/**
74101
* Create aggregates on the repository.
75102
* @param aggregates Aggregates.
@@ -78,6 +105,15 @@ interface Repository<Aggregate, Identity> {
78105
*/
79106
fun createAll(aggregates: Collection<Aggregate>): CompletionStage<Done>
80107

108+
/**
109+
* Create aggregates on the repository within multy-repository transaction.
110+
* @param aggregates Aggregates.
111+
* @param transaction Transaction
112+
* @return [Done] if creation successfully. Otherwise will throw an exception.
113+
* @throws Exception Any exceptions while execute a query on the database will wrapped.
114+
*/
115+
fun createAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>
116+
81117
/**
82118
* Saving aggregate on the repository.
83119
* @param aggregate Aggregate.
@@ -86,11 +122,34 @@ interface Repository<Aggregate, Identity> {
86122
*/
87123
fun save(aggregate: Aggregate): CompletionStage<Done>
88124

125+
/**
126+
* Saving aggregate on the repository within multy-repository transaction.
127+
* @param aggregate Aggregate.
128+
* @param transaction Transaction
129+
* @return [Done] if saving successfully. Otherwise will throw an exception.
130+
* @throws Exception Any exceptions while execute a query on the database will wrapped.
131+
*/
132+
fun save(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>
133+
89134
/**
90135
* Saving aggregates on the repository.
91136
* @param aggregates Aggregates.
92137
* @return [Done] if saving successfully. Otherwise will throw an exception.
93138
* @throws Exception Any exceptions while execute a query on the database will wrapped.
94139
*/
95140
fun saveAll(aggregates: Collection<Aggregate>): CompletionStage<Done>
141+
142+
/**
143+
* Saving aggregates on the repository within multy-repository transaction.
144+
* @param aggregates Aggregates.
145+
* @param transaction Transaction
146+
* @return [Done] if saving successfully. Otherwise will throw an exception.
147+
* @throws Exception Any exceptions while execute a query on the database will wrapped.
148+
*/
149+
fun saveAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>
150+
151+
/**
152+
* Create new multy-repository transaction.
153+
*/
154+
fun createTransaction(): Transaction
96155
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.taymyr.play.repository.domain
2+
3+
import akka.Done
4+
import java.util.concurrent.CompletionStage
5+
6+
/**
7+
* DDD repository transaction
8+
*/
9+
interface Transaction {
10+
11+
/**
12+
* Commits transaction.
13+
*/
14+
fun commit(): CompletionStage<Done>
15+
}

jpa/src/main/kotlin/org/taymyr/play/repository/infrastructure/persistence/JPARepository.kt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package org.taymyr.play.repository.infrastructure.persistence
33
import akka.Done
44
import org.hibernate.Session
55
import org.taymyr.play.repository.domain.Repository
6+
import org.taymyr.play.repository.domain.Transaction
67
import play.db.jpa.JPAApi
78
import java.io.Serializable
9+
import java.lang.IllegalArgumentException
810
import java.util.Optional
911
import java.util.Optional.ofNullable
1012
import java.util.concurrent.CompletableFuture.supplyAsync
@@ -59,6 +61,11 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
5961
Done.getInstance()
6062
}
6163

64+
override fun remove(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
65+
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
66+
return transaction.remove(this, aggregate)
67+
}
68+
6269
override fun removeAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
6370
aggregates.forEach {
6471
if (em.contains(it)) em.remove(it)
@@ -67,23 +74,50 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
6774
Done.getInstance()
6875
}
6976

77+
override fun removeAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
78+
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
79+
return transaction.remove(this, aggregates)
80+
}
81+
7082
override fun create(aggregate: Aggregate): CompletionStage<Done> = execute { em ->
7183
em.persist(aggregate)
7284
Done.getInstance()
7385
}
7486

87+
override fun create(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
88+
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
89+
return transaction.create(this, aggregate)
90+
}
91+
7592
override fun createAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
7693
aggregates.forEach { em.persist(it) }
7794
Done.getInstance()
7895
}
7996

97+
override fun createAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
98+
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
99+
return transaction.create(this, aggregates)
100+
}
101+
80102
override fun save(aggregate: Aggregate): CompletionStage<Done> = execute { em ->
81103
em.merge(aggregate)
82104
Done.getInstance()
83105
}
84106

107+
override fun save(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
108+
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
109+
return transaction.save(this, aggregate)
110+
}
111+
85112
override fun saveAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
86113
aggregates.forEach { em.merge(it) }
87114
Done.getInstance()
88115
}
116+
117+
override fun saveAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
118+
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
119+
return transaction.save(this, aggregates)
120+
}
121+
122+
override fun createTransaction(): Transaction = JPATransaction(jpaApi, executionContext, persistenceUnitName)
89123
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package org.taymyr.play.repository.infrastructure.persistence
2+
3+
import akka.Done
4+
import org.taymyr.play.repository.domain.Transaction
5+
import play.db.jpa.JPAApi
6+
import java.util.concurrent.CompletableFuture
7+
import java.util.concurrent.CompletionStage
8+
import java.util.function.Supplier
9+
import javax.persistence.EntityManager
10+
11+
class JPATransaction(
12+
protected val jpaApi: JPAApi,
13+
protected val executionContext: DatabaseExecutionContext,
14+
protected val persistenceUnitName: String = "default"
15+
) : Transaction {
16+
17+
private val oparetionsLog: MutableList<Operation<*>> = mutableListOf()
18+
19+
/**
20+
* Saves to transaction log remove operation for repository and aggregate.
21+
*
22+
* @param repository JPA repository
23+
* @param aggregate removed aggregate
24+
*/
25+
fun <AGGREGATE : Any> remove(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
26+
oparetionsLog.add(Remove(repository, listOf(aggregate)))
27+
return CompletableFuture.completedFuture(Done.getInstance())
28+
}
29+
30+
/**
31+
* Saves to transaction log remove operation for repository and aggregates.
32+
*
33+
* @param repository JPA repository
34+
* @param aggregate removed aggregates
35+
*/
36+
fun <AGGREGATE : Any> remove(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
37+
oparetionsLog.add(Remove(repository, aggregates))
38+
return CompletableFuture.completedFuture(Done.getInstance())
39+
}
40+
41+
/**
42+
* Saves to transaction log create operation for repository and aggregate.
43+
*
44+
* @param repository JPA repository
45+
* @param aggregate created aggregate
46+
*/
47+
fun <AGGREGATE : Any> create(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
48+
oparetionsLog.add(Create(repository, listOf(aggregate)))
49+
return CompletableFuture.completedFuture(Done.getInstance())
50+
}
51+
52+
/**
53+
* Saves to transaction log create operation for repository and aggregates.
54+
*
55+
* @param repository JPA repository
56+
* @param aggregate created aggregates
57+
*/
58+
fun <AGGREGATE : Any> create(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
59+
oparetionsLog.add(Create(repository, aggregates))
60+
return CompletableFuture.completedFuture(Done.getInstance())
61+
}
62+
63+
/**
64+
* Saves to transaction log save operation for repository and aggregate.
65+
*
66+
* @param repository JPA repository
67+
* @param aggregate saved aggregate
68+
*/
69+
fun <AGGREGATE : Any> save(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
70+
oparetionsLog.add(Save(repository, listOf(aggregate)))
71+
return CompletableFuture.completedFuture(Done.getInstance())
72+
}
73+
74+
/**
75+
* Saves to transaction log save operation for repository and aggregates.
76+
*
77+
* @param repository JPA repository
78+
* @param aggregate saved aggregates
79+
*/
80+
fun <AGGREGATE : Any> save(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
81+
oparetionsLog.add(Save(repository, aggregates))
82+
return CompletableFuture.completedFuture(Done.getInstance())
83+
}
84+
85+
override fun commit(): CompletionStage<Done> = execute { em ->
86+
oparetionsLog.forEach { it.process(em) }
87+
Done.getInstance()
88+
}
89+
90+
protected fun <E> transaction(function: (EntityManager) -> E): E = jpaApi.withTransaction(persistenceUnitName, function)
91+
92+
protected fun <E> execute(function: (EntityManager) -> E): CompletionStage<E> =
93+
CompletableFuture.supplyAsync(Supplier { transaction(function) }, executionContext)
94+
95+
private abstract class Operation<AGGREGATE : Any>(open val repository: JPARepository<AGGREGATE, *>, open val aggregates: Collection<AGGREGATE>) {
96+
abstract fun process(em: EntityManager)
97+
}
98+
99+
private data class Remove<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
100+
override fun process(em: EntityManager) {
101+
aggregates.forEach {
102+
if (em.contains(it)) em.remove(it)
103+
else em.remove(em.merge(it))
104+
}
105+
}
106+
}
107+
108+
private data class Create<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
109+
override fun process(em: EntityManager) {
110+
aggregates.forEach { em.persist(it) }
111+
}
112+
}
113+
114+
private data class Save<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
115+
override fun process(em: EntityManager) {
116+
aggregates.forEach { em.merge(it) }
117+
}
118+
}
119+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.taymyr.play.repository.domain
2+
3+
interface Address {
4+
val id: String
5+
val zip: String?
6+
val city: String
7+
val street: String
8+
val user: User
9+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package org.taymyr.play.repository.domain
2+
3+
interface AddressRepository : Repository<Address, String>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.taymyr.play.repository.infrastructure.persistence
2+
3+
import org.taymyr.play.repository.domain.Address
4+
import javax.persistence.Entity
5+
import javax.persistence.FetchType
6+
import javax.persistence.Id
7+
import javax.persistence.JoinColumn
8+
import javax.persistence.ManyToOne
9+
import javax.persistence.Table
10+
11+
@Entity
12+
@Table(name = "ADDRESS")
13+
data class AddressImpl(
14+
15+
@Id override val id: String,
16+
17+
override val zip: String?,
18+
19+
override val city: String,
20+
21+
override val street: String,
22+
23+
@ManyToOne(fetch = FetchType.EAGER)
24+
@JoinColumn(name = "user_id")
25+
override val user: UserImpl
26+
) : Address
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.taymyr.play.repository.infrastructure.persistence
2+
3+
import org.taymyr.play.repository.domain.Address
4+
import org.taymyr.play.repository.domain.AddressRepository
5+
import play.db.jpa.JPAApi
6+
import java.util.UUID
7+
import javax.inject.Inject
8+
9+
class AddressRepositoryImpl @Inject constructor(
10+
jpaApi: JPAApi,
11+
executionContext: DatabaseExecutionContext
12+
) : JPARepository<Address, String>(jpaApi, executionContext, AddressImpl::class.java), AddressRepository {
13+
14+
override fun nextIdentity(): String = UUID.randomUUID().toString()
15+
}

0 commit comments

Comments
 (0)