Skip to content

Commit 8349cbd

Browse files
committed
Split dataloader into a helper implementation for most of the GUTS of the code
1 parent ea8a429 commit 8349cbd

File tree

2 files changed

+268
-215
lines changed

2 files changed

+268
-215
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 6 additions & 215 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,10 @@
2222

2323
import java.util.AbstractMap.SimpleImmutableEntry;
2424
import java.util.ArrayList;
25-
import java.util.Collection;
2625
import java.util.List;
27-
import java.util.Map;
2826
import java.util.concurrent.CompletableFuture;
29-
import java.util.concurrent.CompletionStage;
3027
import java.util.stream.Collectors;
3128

32-
import static java.util.Collections.emptyList;
33-
import static java.util.Collections.singletonList;
34-
import static org.dataloader.impl.Assertions.assertState;
3529
import static org.dataloader.impl.Assertions.nonNull;
3630

3731
/**
@@ -63,7 +57,7 @@
6357
*/
6458
public class DataLoader<K, V> {
6559

66-
private final Object batchLoadFunction;
60+
private final DataLoaderHelper<K,V> helper;
6761
private final DataLoaderOptions loaderOptions;
6862
private final CacheMap<Object, CompletableFuture<V>> futureCache;
6963
private final List<SimpleImmutableEntry<K, CompletableFuture<V>>> loaderQueue;
@@ -352,12 +346,13 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
352346
}
353347

354348
private DataLoader(Object batchLoadFunction, DataLoaderOptions options) {
355-
this.batchLoadFunction = nonNull(batchLoadFunction);
356349
this.loaderOptions = options == null ? new DataLoaderOptions() : options;
357350
this.futureCache = determineCacheMap(loaderOptions);
358351
// order of keys matter in data loader
359352
this.loaderQueue = new ArrayList<>();
360353
this.stats = nonNull(this.loaderOptions.getStatisticsCollector());
354+
355+
this.helper = new DataLoaderHelper<>(this, batchLoadFunction,this.loaderOptions, futureCache, loaderQueue, stats);
361356
}
362357

363358
@SuppressWarnings("unchecked")
@@ -377,33 +372,7 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
377372
* @return the future of the value
378373
*/
379374
public CompletableFuture<V> load(K key) {
380-
synchronized (this) {
381-
Object cacheKey = getCacheKey(nonNull(key));
382-
stats.incrementLoadCount();
383-
384-
boolean batchingEnabled = loaderOptions.batchingEnabled();
385-
boolean cachingEnabled = loaderOptions.cachingEnabled();
386-
387-
if (cachingEnabled) {
388-
if (futureCache.containsKey(cacheKey)) {
389-
stats.incrementCacheHitCount();
390-
return futureCache.get(cacheKey);
391-
}
392-
}
393-
394-
CompletableFuture<V> future = new CompletableFuture<>();
395-
if (batchingEnabled) {
396-
loaderQueue.add(new SimpleImmutableEntry<>(key, future));
397-
} else {
398-
stats.incrementBatchLoadCountBy(1);
399-
// immediate execution of batch function
400-
future = invokeLoaderImmediately(key);
401-
}
402-
if (cachingEnabled) {
403-
futureCache.set(cacheKey, future);
404-
}
405-
return future;
406-
}
375+
return helper.load(key);
407376
}
408377

409378
/**
@@ -436,183 +405,7 @@ public CompletableFuture<List<V>> loadMany(List<K> keys) {
436405
* @return the promise of the queued load requests
437406
*/
438407
public CompletableFuture<List<V>> dispatch() {
439-
boolean batchingEnabled = loaderOptions.batchingEnabled();
440-
//
441-
// we copy the pre-loaded set of futures ready for dispatch
442-
final List<K> keys = new ArrayList<>();
443-
final List<CompletableFuture<V>> queuedFutures = new ArrayList<>();
444-
synchronized (this) {
445-
loaderQueue.forEach(entry -> {
446-
keys.add(entry.getKey());
447-
queuedFutures.add(entry.getValue());
448-
});
449-
loaderQueue.clear();
450-
}
451-
if (!batchingEnabled || keys.size() == 0) {
452-
return CompletableFuture.completedFuture(emptyList());
453-
}
454-
//
455-
// order of keys -> values matter in data loader hence the use of linked hash map
456-
//
457-
// See https://github.com/facebook/dataloader/blob/master/README.md for more details
458-
//
459-
460-
//
461-
// when the promised list of values completes, we transfer the values into
462-
// the previously cached future objects that the client already has been given
463-
// via calls to load("foo") and loadMany(["foo","bar"])
464-
//
465-
int maxBatchSize = loaderOptions.maxBatchSize();
466-
if (maxBatchSize > 0 && maxBatchSize < keys.size()) {
467-
return sliceIntoBatchesOfBatches(keys, queuedFutures, maxBatchSize);
468-
} else {
469-
return dispatchQueueBatch(keys, queuedFutures);
470-
}
471-
}
472-
473-
private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, int maxBatchSize) {
474-
// the number of keys is > than what the batch loader function can accept
475-
// so make multiple calls to the loader
476-
List<CompletableFuture<List<V>>> allBatches = new ArrayList<>();
477-
int len = keys.size();
478-
int batchCount = (int) Math.ceil(len / (double) maxBatchSize);
479-
for (int i = 0; i < batchCount; i++) {
480-
481-
int fromIndex = i * maxBatchSize;
482-
int toIndex = Math.min((i + 1) * maxBatchSize, len);
483-
484-
List<K> subKeys = keys.subList(fromIndex, toIndex);
485-
List<CompletableFuture<V>> subFutures = queuedFutures.subList(fromIndex, toIndex);
486-
487-
allBatches.add(dispatchQueueBatch(subKeys, subFutures));
488-
}
489-
//
490-
// now reassemble all the futures into one that is the complete set of results
491-
return CompletableFuture.allOf(allBatches.toArray(new CompletableFuture[allBatches.size()]))
492-
.thenApply(v -> allBatches.stream()
493-
.map(CompletableFuture::join)
494-
.flatMap(Collection::stream)
495-
.collect(Collectors.toList()));
496-
}
497-
498-
@SuppressWarnings("unchecked")
499-
private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<CompletableFuture<V>> queuedFutures) {
500-
stats.incrementBatchLoadCountBy(keys.size());
501-
CompletionStage<List<V>> batchLoad = invokeLoader(keys);
502-
return batchLoad
503-
.toCompletableFuture()
504-
.thenApply(values -> {
505-
assertResultSize(keys, values);
506-
507-
for (int idx = 0; idx < queuedFutures.size(); idx++) {
508-
Object value = values.get(idx);
509-
CompletableFuture<V> future = queuedFutures.get(idx);
510-
if (value instanceof Throwable) {
511-
stats.incrementLoadErrorCount();
512-
future.completeExceptionally((Throwable) value);
513-
// we don't clear the cached view of this entry to avoid
514-
// frequently loading the same error
515-
} else if (value instanceof Try) {
516-
// we allow the batch loader to return a Try so we can better represent a computation
517-
// that might have worked or not.
518-
Try<V> tryValue = (Try<V>) value;
519-
if (tryValue.isSuccess()) {
520-
future.complete(tryValue.get());
521-
} else {
522-
stats.incrementLoadErrorCount();
523-
future.completeExceptionally(tryValue.getThrowable());
524-
}
525-
} else {
526-
V val = (V) value;
527-
future.complete(val);
528-
}
529-
}
530-
return values;
531-
}).exceptionally(ex -> {
532-
stats.incrementBatchLoadExceptionCount();
533-
for (int idx = 0; idx < queuedFutures.size(); idx++) {
534-
K key = keys.get(idx);
535-
CompletableFuture<V> future = queuedFutures.get(idx);
536-
future.completeExceptionally(ex);
537-
// clear any cached view of this key because they all failed
538-
clear(key);
539-
}
540-
return emptyList();
541-
});
542-
}
543-
544-
private CompletableFuture<V> invokeLoaderImmediately(K key) {
545-
List<K> keys = singletonList(key);
546-
CompletionStage<V> singleLoadCall;
547-
try {
548-
BatchLoaderEnvironment environment = loaderOptions.getBatchLoaderEnvironmentProvider().get();
549-
if (isMapLoader()) {
550-
singleLoadCall = invokeMapBatchLoader(keys, environment).thenApply(list -> list.get(0));
551-
} else {
552-
singleLoadCall = invokeListBatchLoader(keys, environment).thenApply(list -> list.get(0));
553-
}
554-
return singleLoadCall.toCompletableFuture();
555-
} catch (Exception e) {
556-
return CompletableFutureKit.failedFuture(e);
557-
}
558-
}
559-
560-
private CompletionStage<List<V>> invokeLoader(List<K> keys) {
561-
CompletionStage<List<V>> batchLoad;
562-
try {
563-
BatchLoaderEnvironment environment = loaderOptions.getBatchLoaderEnvironmentProvider().get();
564-
if (isMapLoader()) {
565-
batchLoad = invokeMapBatchLoader(keys, environment);
566-
} else {
567-
batchLoad = invokeListBatchLoader(keys, environment);
568-
}
569-
} catch (Exception e) {
570-
batchLoad = CompletableFutureKit.failedFuture(e);
571-
}
572-
return batchLoad;
573-
}
574-
575-
@SuppressWarnings("unchecked")
576-
private CompletionStage<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
577-
CompletionStage<List<V>> loadResult;
578-
if (batchLoadFunction instanceof BatchLoaderWithContext) {
579-
loadResult = ((BatchLoaderWithContext<K, V>) batchLoadFunction).load(keys, environment);
580-
} else {
581-
loadResult = ((BatchLoader<K, V>) batchLoadFunction).load(keys);
582-
}
583-
return nonNull(loadResult, "Your batch loader function MUST return a non null CompletionStage promise");
584-
}
585-
586-
/*
587-
* Turns a map of results that MAY be smaller than the key list back into a list by mapping null
588-
* to missing elements.
589-
*/
590-
591-
@SuppressWarnings("unchecked")
592-
private CompletionStage<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
593-
CompletionStage<Map<K, V>> loadResult;
594-
if (batchLoadFunction instanceof MappedBatchLoaderWithContext) {
595-
loadResult = ((MappedBatchLoaderWithContext<K, V>) batchLoadFunction).load(keys, environment);
596-
} else {
597-
loadResult = ((MappedBatchLoader<K, V>) batchLoadFunction).load(keys);
598-
}
599-
CompletionStage<Map<K, V>> mapBatchLoad = nonNull(loadResult, "Your batch loader function MUST return a non null CompletionStage promise");
600-
return mapBatchLoad.thenApply(map -> {
601-
List<V> values = new ArrayList<>();
602-
for (K key : keys) {
603-
V value = map.get(key);
604-
values.add(value);
605-
}
606-
return values;
607-
});
608-
}
609-
610-
private boolean isMapLoader() {
611-
return batchLoadFunction instanceof MappedBatchLoader || batchLoadFunction instanceof MappedBatchLoaderWithContext;
612-
}
613-
614-
private void assertResultSize(List<K> keys, List<V> values) {
615-
assertState(keys.size() == values.size(), "The size of the promised values MUST be the same size as the key list");
408+
return helper.dispatch();
616409
}
617410

618411
/**
@@ -718,10 +511,8 @@ public DataLoader<K, V> prime(K key, Exception error) {
718511
*
719512
* @return the cache key after the input is transformed with the cache key function
720513
*/
721-
@SuppressWarnings("unchecked")
722514
public Object getCacheKey(K key) {
723-
return loaderOptions.cacheKeyFunction().isPresent() ?
724-
loaderOptions.cacheKeyFunction().get().getKey(key) : key;
515+
return helper.getCacheKey(key);
725516
}
726517

727518
/**

0 commit comments

Comments
 (0)