Skip to content
Merged
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,65 @@ When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as f
* If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`

## Instrumenting the data loader code

A `DataLoader` can have a `DataLoaderInstrumentation` associated with it. This callback interface is intended to provide
insight into working of the `DataLoader` such as how long it takes to run or to allow for logging of key events.

You set the `DataLoaderInstrumentation` into the `DataLoaderOptions` at build time.

```java


DataLoaderInstrumentation timingInstrumentation = new DataLoaderInstrumentation() {
@Override
public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
long then = System.currentTimeMillis();
return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
long ms = System.currentTimeMillis() - then;
System.out.println(format("dispatch time: %d ms", ms));
});
}

@Override
public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
long then = System.currentTimeMillis();
return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
long ms = System.currentTimeMillis() - then;
System.out.println(format("batch loader time: %d ms", ms));
});
}
};
DataLoaderOptions options = DataLoaderOptions.newOptions().setInstrumentation(timingInstrumentation);
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);

```

The example shows how long the overall `DataLoader` dispatch takes or how long the batch loader takes to run.

### Instrumenting the DataLoaderRegistry

You can also associate a `DataLoaderInstrumentation` with a `DataLoaderRegistry`. Every `DataLoader` registered will be changed so that the registry
`DataLoaderInstrumentation` is associated with it. This allows you to set just the one `DataLoaderInstrumentation` in place and it applies to all
data loaders.

```java
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
DataLoader<String, User> teamsDataLoader = DataLoaderFactory.newDataLoader(teamsBatchLoader);

DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
.instrumentation(timingInstrumentation)
.register("users", userDataLoader)
.register("teams", teamsDataLoader)
.build();

DataLoader<String, User> changedUsersDataLoader = registry.getDataLoader("users");
```

The `timingInstrumentation` here will be associated with the `DataLoader` under the key `users` and the key `teams`. Note that since
DataLoader is immutable, a new changed object is created so you must use the registry to get the `DataLoader`.


## Other information sources

- [Facebook DataLoader Github repo](https://github.com/facebook/dataloader)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public Builder<K, V> options(DataLoaderOptions options) {
return this;
}

DataLoader<K, V> build() {
public DataLoader<K, V> build() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed in previous PR

return mkDataLoader(batchLoadFunction, options);
}
}
Expand Down
38 changes: 32 additions & 6 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.dataloader.annotations.GuardedBy;
import org.dataloader.annotations.Internal;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationContext;
import org.dataloader.reactive.ReactiveSupport;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -34,6 +36,7 @@
import static java.util.stream.Collectors.toList;
import static org.dataloader.impl.Assertions.assertState;
import static org.dataloader.impl.Assertions.nonNull;
import static org.dataloader.instrumentation.DataLoaderInstrumentationHelper.ctxOrNoopCtx;

/**
* This helps break up the large DataLoader class functionality, and it contains the logic to dispatch the
Expand Down Expand Up @@ -167,6 +170,8 @@ Object getCacheKeyWithContext(K key, Object context) {
}

DispatchResult<V> dispatch() {
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));

boolean batchingEnabled = loaderOptions.batchingEnabled();
final List<K> keys;
final List<Object> callContexts;
Expand All @@ -175,7 +180,8 @@ DispatchResult<V> dispatch() {
int queueSize = loaderQueue.size();
if (queueSize == 0) {
lastDispatchTime.set(now());
return emptyDispatchResult();
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, emptyDispatchResult());
}

// we copy the pre-loaded set of futures ready for dispatch
Expand All @@ -192,7 +198,8 @@ DispatchResult<V> dispatch() {
lastDispatchTime.set(now());
}
if (!batchingEnabled) {
return emptyDispatchResult();
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, emptyDispatchResult());
}
final int totalEntriesHandled = keys.size();
//
Expand All @@ -213,7 +220,15 @@ DispatchResult<V> dispatch() {
} else {
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
}
return new DispatchResult<>(futureList, totalEntriesHandled);
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, new DispatchResult<>(futureList, totalEntriesHandled));
}

private DispatchResult<V> endDispatchCtx(DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx, DispatchResult<V> dispatchResult) {
// once the CF completes, we can tell the instrumentation
dispatchResult.getPromisedResults()
.whenComplete((result, throwable) -> instrCtx.onCompleted(dispatchResult, throwable));
return dispatchResult;
}

private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
Expand Down Expand Up @@ -427,11 +442,14 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
}

CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures) {
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
.context(context).keyContexts(keys, keyContexts).build();

DataLoaderInstrumentationContext<List<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginBatchLoader(dataLoader, keys, environment));

CompletableFuture<List<V>> batchLoad;
try {
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
.context(context).keyContexts(keys, keyContexts).build();
if (isMapLoader()) {
batchLoad = invokeMapBatchLoader(keys, environment);
} else if (isPublisher()) {
Expand All @@ -441,12 +459,16 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
} else {
batchLoad = invokeListBatchLoader(keys, environment);
}
instrCtx.onDispatched();
} catch (Exception e) {
instrCtx.onDispatched();
batchLoad = CompletableFutureKit.failedFuture(e);
}
batchLoad.whenComplete(instrCtx::onCompleted);
return batchLoad;
}


@SuppressWarnings("unchecked")
private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
CompletionStage<List<V>> loadResult;
Expand Down Expand Up @@ -575,6 +597,10 @@ private boolean isMappedPublisher() {
return batchLoadFunction instanceof MappedBatchPublisher;
}

private DataLoaderInstrumentation instrumentation() {
return loaderOptions.getInstrumentation();
}

int dispatchDepth() {
synchronized (dataLoader) {
return loaderQueue.size();
Expand Down
54 changes: 42 additions & 12 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.dataloader;

import org.dataloader.annotations.PublicApi;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.NoOpStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -52,6 +54,7 @@ public class DataLoaderOptions {
private final BatchLoaderContextProvider environmentProvider;
private final ValueCacheOptions valueCacheOptions;
private final BatchLoaderScheduler batchLoaderScheduler;
private final DataLoaderInstrumentation instrumentation;

/**
* Creates a new data loader options with default settings.
Expand All @@ -68,6 +71,7 @@ public DataLoaderOptions() {
environmentProvider = NULL_PROVIDER;
valueCacheOptions = DEFAULT_VALUE_CACHE_OPTIONS;
batchLoaderScheduler = null;
instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION;
}

private DataLoaderOptions(Builder builder) {
Expand All @@ -82,6 +86,7 @@ private DataLoaderOptions(Builder builder) {
this.environmentProvider = builder.environmentProvider;
this.valueCacheOptions = builder.valueCacheOptions;
this.batchLoaderScheduler = builder.batchLoaderScheduler;
this.instrumentation = builder.instrumentation;
}

/**
Expand All @@ -101,7 +106,8 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
batchLoaderScheduler = other.batchLoaderScheduler;
this.batchLoaderScheduler = other.batchLoaderScheduler;
this.instrumentation = other.instrumentation;
}

/**
Expand Down Expand Up @@ -169,7 +175,7 @@ public boolean batchingEnabled() {
* Sets the option that determines whether batch loading is enabled.
*
* @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setBatchingEnabled(boolean batchingEnabled) {
return builder().setBatchingEnabled(batchingEnabled).build();
Expand All @@ -188,7 +194,7 @@ public boolean cachingEnabled() {
* Sets the option that determines whether caching is enabled.
*
* @param cachingEnabled {@code true} to enable caching, {@code false} otherwise
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {
return builder().setCachingEnabled(cachingEnabled).build();
Expand All @@ -212,7 +218,7 @@ public boolean cachingExceptionsEnabled() {
* Sets the option that determines whether exceptional values are cache enabled.
*
* @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) {
return builder().setCachingExceptionsEnabled(cachingExceptionsEnabled).build();
Expand All @@ -233,7 +239,7 @@ public Optional<CacheKey> cacheKeyFunction() {
* Sets the function to use for creating the cache key, if caching is enabled.
*
* @param cacheKeyFunction the cache key function to use
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
return builder().setCacheKeyFunction(cacheKeyFunction).build();
Expand All @@ -254,7 +260,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
* Sets the cache map implementation to use for caching, if caching is enabled.
*
* @param cacheMap the cache map instance
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
return builder().setCacheMap(cacheMap).build();
Expand All @@ -275,7 +281,7 @@ public int maxBatchSize() {
* before they are split into multiple class
*
* @param maxBatchSize the maximum batch size
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
return builder().setMaxBatchSize(maxBatchSize).build();
Expand All @@ -294,7 +300,7 @@ public StatisticsCollector getStatisticsCollector() {
* a common value
*
* @param statisticsCollector the statistics collector to use
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
return builder().setStatisticsCollector(nonNull(statisticsCollector)).build();
Expand All @@ -311,7 +317,7 @@ public BatchLoaderContextProvider getBatchLoaderContextProvider() {
* Sets the batch loader environment provider that will be used to give context to batch load functions
*
* @param contextProvider the batch loader context provider
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvider contextProvider) {
return builder().setBatchLoaderContextProvider(nonNull(contextProvider)).build();
Expand All @@ -332,7 +338,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
* Sets the value cache implementation to use for caching values, if caching is enabled.
*
* @param valueCache the value cache instance
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
return builder().setValueCache(valueCache).build();
Expand All @@ -349,7 +355,7 @@ public ValueCacheOptions getValueCacheOptions() {
* Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
*
* @param valueCacheOptions the value cache options
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
return builder().setValueCacheOptions(nonNull(valueCacheOptions)).build();
Expand All @@ -367,12 +373,29 @@ public BatchLoaderScheduler getBatchLoaderScheduler() {
* to some future time.
*
* @param batchLoaderScheduler the scheduler
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
return builder().setBatchLoaderScheduler(batchLoaderScheduler).build();
}

/**
* @return the {@link DataLoaderInstrumentation} to use
*/
public DataLoaderInstrumentation getInstrumentation() {
return instrumentation;
}

/**
* Sets in a new {@link DataLoaderInstrumentation}
*
* @param instrumentation the new {@link DataLoaderInstrumentation}
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setInstrumentation(DataLoaderInstrumentation instrumentation) {
return builder().setInstrumentation(instrumentation).build();
}

private Builder builder() {
return new Builder(this);
}
Expand All @@ -389,6 +412,7 @@ public static class Builder {
private BatchLoaderContextProvider environmentProvider;
private ValueCacheOptions valueCacheOptions;
private BatchLoaderScheduler batchLoaderScheduler;
private DataLoaderInstrumentation instrumentation;

public Builder() {
this(new DataLoaderOptions()); // use the defaults of the DataLoaderOptions for this builder
Expand All @@ -406,6 +430,7 @@ public Builder() {
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
this.batchLoaderScheduler = other.batchLoaderScheduler;
this.instrumentation = other.instrumentation;
}

public Builder setBatchingEnabled(boolean batchingEnabled) {
Expand Down Expand Up @@ -463,6 +488,11 @@ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler
return this;
}

public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) {
this.instrumentation = nonNull(instrumentation);
return this;
}

public DataLoaderOptions build() {
return new DataLoaderOptions(this);
}
Expand Down
Loading