Skip to content

Commit fb0a072

Browse files
committed
Adds a predicate to DataLoaderRegistry - added tests
1 parent 1151ccb commit fb0a072

File tree

4 files changed

+233
-20
lines changed

4 files changed

+233
-20
lines changed

src/main/java/org/dataloader/DataLoaderRegistry.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import java.util.function.Function;
1616

1717
/**
18-
* This allows data loaders to be registered together into a single place so
18+
* This allows data loaders to be registered together into a single place, so
1919
* they can be dispatched as one. It also allows you to retrieve data loaders by
2020
* name from a central place
2121
*/
@@ -32,6 +32,7 @@ public DataLoaderRegistry() {
3232

3333
protected DataLoaderRegistry(Builder<?> builder) {
3434
this.dataLoaders.putAll(builder.dataLoaders);
35+
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
3536
this.dispatchPredicate = builder.dispatchPredicate;
3637
}
3738

@@ -116,6 +117,20 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
116117
return new LinkedHashMap<>(dataLoaders);
117118
}
118119

120+
/**
121+
* @return the current dispatch predicate
122+
*/
123+
public DispatchPredicate getDispatchPredicate() {
124+
return dispatchPredicate;
125+
}
126+
127+
/**
128+
* @return a map of data loaders to specific dispatch predicates
129+
*/
130+
public Map<DataLoader<?, ?>, DispatchPredicate> getDataLoaderPredicates() {
131+
return new LinkedHashMap<>(dataLoaderPredicates);
132+
}
133+
119134
/**
120135
* This will unregister a new dataloader
121136
*
@@ -153,7 +168,7 @@ public Set<String> getKeys() {
153168
}
154169

155170
/**
156-
* This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered
171+
* This will be called {@link org.dataloader.DataLoader#dispatch()} on each of the registered
157172
* {@link org.dataloader.DataLoader}s
158173
*/
159174
public void dispatchAll() {
@@ -183,38 +198,27 @@ public int dispatchAllWithCount() {
183198
* {@link org.dataloader.DataLoader}s
184199
*/
185200
public int dispatchDepth() {
186-
int totalDispatchDepth = 0;
187-
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
188-
DataLoader<?, ?> dataLoader = entry.getValue();
189-
String key = entry.getKey();
190-
if (shouldDispatch(key, dataLoader)) {
191-
totalDispatchDepth += dataLoader.dispatchDepth();
192-
}
193-
}
194-
return totalDispatchDepth;
201+
return dataLoaders.values().stream().mapToInt(DataLoader::dispatchDepth).sum();
195202
}
196203

197204
/**
198205
* This will immediately dispatch the {@link DataLoader}s in the registry
199-
* without testing the predicate
206+
* without testing the predicates
200207
*/
201208
public void dispatchAllImmediately() {
202209
dispatchAllWithCountImmediately();
203210
}
204211

205212
/**
206213
* This will immediately dispatch the {@link DataLoader}s in the registry
207-
* without testing the predicate
214+
* without testing the predicates
208215
*
209216
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
210217
*/
211218
public int dispatchAllWithCountImmediately() {
212-
int sum = 0;
213-
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
214-
DataLoader<?, ?> dataLoader = entry.getValue();
215-
sum += dataLoader.dispatchWithCounts().getKeysCount();
216-
}
217-
return sum;
219+
return dataLoaders.values().stream()
220+
.mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount())
221+
.sum();
218222
}
219223

220224

src/main/java/org/dataloader/registries/DispatchPredicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public interface DispatchPredicate {
1818
/**
1919
* A predicate that always returns false
2020
*/
21-
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> true;
21+
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false;
2222

2323
/**
2424
* This predicate tests whether the data loader should be dispatched or not.
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package org.dataloader;
2+
3+
import org.dataloader.registries.DispatchPredicate;
4+
import org.junit.Test;
5+
6+
import java.util.concurrent.CompletableFuture;
7+
8+
import static java.util.Arrays.asList;
9+
import static org.dataloader.DataLoaderFactory.newDataLoader;
10+
import static org.dataloader.fixtures.TestKit.asSet;
11+
import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER;
12+
import static org.hamcrest.Matchers.equalTo;
13+
import static org.junit.Assert.assertThat;
14+
15+
public class DataLoaderRegistryPredicateTest {
16+
final BatchLoader<Object, Object> identityBatchLoader = CompletableFuture::completedFuture;
17+
18+
static class CountingDispatchPredicate implements DispatchPredicate {
19+
int count = 0;
20+
int max = 0;
21+
22+
public CountingDispatchPredicate(int max) {
23+
this.max = max;
24+
}
25+
26+
@Override
27+
public boolean test(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
28+
boolean shouldFire = count >= max;
29+
count++;
30+
return shouldFire;
31+
}
32+
}
33+
34+
@Test
35+
public void predicate_registration_works() {
36+
DataLoader<Object, Object> dlA = newDataLoader(identityBatchLoader);
37+
DataLoader<Object, Object> dlB = newDataLoader(identityBatchLoader);
38+
DataLoader<Object, Object> dlC = newDataLoader(identityBatchLoader);
39+
40+
DispatchPredicate predicateA = new CountingDispatchPredicate(1);
41+
DispatchPredicate predicateB = new CountingDispatchPredicate(2);
42+
DispatchPredicate predicateC = new CountingDispatchPredicate(3);
43+
44+
DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10);
45+
46+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
47+
.register("a", dlA, predicateA)
48+
.register("b", dlB, predicateB)
49+
.register("c", dlC, predicateC)
50+
.dispatchPredicate(predicateOverAll)
51+
.build();
52+
53+
assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC)));
54+
assertThat(registry.getDataLoadersMap().keySet(), equalTo(asSet("a", "b", "c")));
55+
assertThat(asSet(registry.getDataLoadersMap().values()), equalTo(asSet(dlA, dlB, dlC)));
56+
assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll));
57+
assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC)));
58+
59+
// and unregister (fluently)
60+
DataLoaderRegistry dlR = registry.unregister("c");
61+
assertThat(dlR, equalTo(registry));
62+
63+
assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB)));
64+
assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll));
65+
assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB)));
66+
67+
// direct on the registry works
68+
registry.register("c", dlC, predicateC);
69+
assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC)));
70+
assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll));
71+
assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC)));
72+
73+
}
74+
75+
@Test
76+
public void predicate_firing_works() {
77+
DataLoader<Object, Object> dlA = newDataLoader(identityBatchLoader);
78+
DataLoader<Object, Object> dlB = newDataLoader(identityBatchLoader);
79+
DataLoader<Object, Object> dlC = newDataLoader(identityBatchLoader);
80+
81+
DispatchPredicate predicateA = new CountingDispatchPredicate(1);
82+
DispatchPredicate predicateB = new CountingDispatchPredicate(2);
83+
DispatchPredicate predicateC = new CountingDispatchPredicate(3);
84+
85+
DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10);
86+
87+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
88+
.register("a", dlA, predicateA)
89+
.register("b", dlB, predicateB)
90+
.register("c", dlC, predicateC)
91+
.dispatchPredicate(predicateOverAll)
92+
.build();
93+
94+
95+
CompletableFuture<Object> cfA = dlA.load("A");
96+
CompletableFuture<Object> cfB = dlB.load("B");
97+
CompletableFuture<Object> cfC = dlC.load("C");
98+
99+
int count = registry.dispatchAllWithCount(); // first firing
100+
// none should fire
101+
assertThat(count, equalTo(0));
102+
assertThat(cfA.isDone(), equalTo(false));
103+
assertThat(cfB.isDone(), equalTo(false));
104+
assertThat(cfC.isDone(), equalTo(false));
105+
106+
count = registry.dispatchAllWithCount(); // second firing
107+
// one should fire
108+
assertThat(count, equalTo(1));
109+
assertThat(cfA.isDone(), equalTo(true));
110+
assertThat(cfA.join(), equalTo("A"));
111+
112+
assertThat(cfB.isDone(), equalTo(false));
113+
assertThat(cfC.isDone(), equalTo(false));
114+
115+
count = registry.dispatchAllWithCount(); // third firing
116+
assertThat(count, equalTo(1));
117+
assertThat(cfA.isDone(), equalTo(true));
118+
assertThat(cfB.isDone(), equalTo(true));
119+
assertThat(cfB.join(), equalTo("B"));
120+
assertThat(cfC.isDone(), equalTo(false));
121+
122+
count = registry.dispatchAllWithCount(); // fourth firing
123+
assertThat(count, equalTo(1));
124+
assertThat(cfA.isDone(), equalTo(true));
125+
assertThat(cfB.isDone(), equalTo(true));
126+
assertThat(cfC.isDone(), equalTo(true));
127+
assertThat(cfC.join(), equalTo("C"));
128+
}
129+
130+
@Test
131+
public void test_the_registry_overall_predicate_firing_works() {
132+
DataLoader<Object, Object> dlA = newDataLoader(identityBatchLoader);
133+
DataLoader<Object, Object> dlB = newDataLoader(identityBatchLoader);
134+
DataLoader<Object, Object> dlC = newDataLoader(identityBatchLoader);
135+
136+
DispatchPredicate predicateOverAllOnThree = new CountingDispatchPredicate(3);
137+
138+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
139+
.register("a", dlA, DISPATCH_NEVER)
140+
.register("b", dlB, DISPATCH_NEVER)
141+
.register("c", dlC, DISPATCH_NEVER)
142+
.dispatchPredicate(predicateOverAllOnThree)
143+
.build();
144+
145+
146+
CompletableFuture<Object> cfA = dlA.load("A");
147+
CompletableFuture<Object> cfB = dlB.load("B");
148+
CompletableFuture<Object> cfC = dlC.load("C");
149+
150+
int count = registry.dispatchAllWithCount(); // first firing
151+
// none should fire
152+
assertThat(count, equalTo(0));
153+
assertThat(cfA.isDone(), equalTo(false));
154+
assertThat(cfB.isDone(), equalTo(false));
155+
assertThat(cfC.isDone(), equalTo(false));
156+
157+
count = registry.dispatchAllWithCount(); // second firing but the overall been asked 3 times already
158+
assertThat(count, equalTo(3));
159+
assertThat(cfA.isDone(), equalTo(true));
160+
assertThat(cfB.isDone(), equalTo(true));
161+
assertThat(cfC.isDone(), equalTo(true));
162+
}
163+
164+
@Test
165+
public void dispatch_immediate_firing_works() {
166+
DataLoader<Object, Object> dlA = newDataLoader(identityBatchLoader);
167+
DataLoader<Object, Object> dlB = newDataLoader(identityBatchLoader);
168+
DataLoader<Object, Object> dlC = newDataLoader(identityBatchLoader);
169+
170+
DispatchPredicate predicateA = new CountingDispatchPredicate(1);
171+
DispatchPredicate predicateB = new CountingDispatchPredicate(2);
172+
DispatchPredicate predicateC = new CountingDispatchPredicate(3);
173+
174+
DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10);
175+
176+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
177+
.register("a", dlA, predicateA)
178+
.register("b", dlB, predicateB)
179+
.register("c", dlC, predicateC)
180+
.dispatchPredicate(predicateOverAll)
181+
.build();
182+
183+
184+
CompletableFuture<Object> cfA = dlA.load("A");
185+
CompletableFuture<Object> cfB = dlB.load("B");
186+
CompletableFuture<Object> cfC = dlC.load("C");
187+
188+
int count = registry.dispatchAllWithCountImmediately(); // all should fire
189+
assertThat(count, equalTo(3));
190+
assertThat(cfA.isDone(), equalTo(true));
191+
assertThat(cfA.join(), equalTo("A"));
192+
assertThat(cfB.isDone(), equalTo(true));
193+
assertThat(cfB.join(), equalTo("B"));
194+
assertThat(cfC.isDone(), equalTo(true));
195+
assertThat(cfC.join(), equalTo("C"));
196+
}
197+
198+
}

src/test/java/org/dataloader/fixtures/TestKit.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
import org.dataloader.DataLoaderOptions;
77

88
import java.util.ArrayList;
9+
import java.util.Arrays;
910
import java.util.Collection;
11+
import java.util.LinkedHashSet;
1012
import java.util.List;
13+
import java.util.Set;
1114
import java.util.concurrent.CompletableFuture;
1215

1316
import static java.util.stream.Collectors.toList;
@@ -67,4 +70,12 @@ public static void snooze(int millis) {
6770
public static <T> List<T> sort(Collection<? extends T> collection) {
6871
return collection.stream().sorted().collect(toList());
6972
}
73+
74+
public static <T> Set<T> asSet(T... elements) {
75+
return new LinkedHashSet<>(Arrays.asList(elements));
76+
}
77+
78+
public static <T> Set<T> asSet(Collection<T> elements) {
79+
return new LinkedHashSet<>(elements);
80+
}
7081
}

0 commit comments

Comments
 (0)