Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
df74670
Make ``handleEntity`` throw exceptions instead of returning empty opt…
staudtMarius Apr 24, 2023
1a207d7
Fix failing test.
staudtMarius Apr 24, 2023
546edb4
Fix quality gate issues.
staudtMarius Apr 26, 2023
f392485
Fix quality gate issues.
staudtMarius Apr 26, 2023
ce12aa0
Fixing failing test.
staudtMarius Apr 26, 2023
0cb9fb6
Fixing failing quality gate.
staudtMarius Apr 26, 2023
1d3afb8
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius Apr 27, 2023
d80b542
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 4, 2023
a4561ad
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 9, 2023
60e2afa
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 15, 2023
2f40a80
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 16, 2023
e02890a
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 22, 2023
98b6280
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 25, 2023
05d8a6b
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius May 30, 2023
9bb4e5a
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius Jun 7, 2023
e7ae53e
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
sebastian-peter Jun 21, 2023
5a3f95d
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius Jul 19, 2023
bec0f02
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
sebastian-peter Jul 25, 2023
87660a0
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius Jul 26, 2023
2187cfc
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius Jul 28, 2023
098ab79
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
sebastian-peter Jul 31, 2023
bb64244
Implementing requested changes.
staudtMarius Aug 1, 2023
ed9231b
Merge branch 'dev' into ms/#682-method-handleEntity-throws-Exception
staudtMarius Aug 1, 2023
2aebb8c
Applied type parameters to method call
sebastian-peter Aug 1, 2023
f83abfb
Avoided usage of RuntimeException
sebastian-peter Aug 1, 2023
3fcdca3
Avoided usage of RuntimeException in time series processor map creation
sebastian-peter Aug 1, 2023
0871294
Fixed code smell
sebastian-peter Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected EntityProcessor(Class<? extends T> registeredClass) {
* @return a Map with fieldName to fieldValue
* @throws EntityProcessorException if an error occurs during processing
*/
public Optional<LinkedHashMap<String, String>> handleEntity(T entity) {
public LinkedHashMap<String, String> handleEntity(T entity) {
if (!registeredClass.equals(entity.getClass()))
throw new EntityProcessorException(
"Cannot process "
Expand All @@ -63,10 +63,10 @@ public Optional<LinkedHashMap<String, String>> handleEntity(T entity) {
+ ".class!");

try {
return Optional.of(processObject(entity, fieldNameToMethod));
return processObject(entity, fieldNameToMethod);
} catch (EntityProcessorException e) {
logger.error("Cannot process the entity{}.", entity, e);
return Optional.empty();
throw new EntityProcessorException("Entity " + entity + " cannot be processed.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,14 @@ public ProcessorProvider(
this.timeSeriesProcessors = timeSeriesProcessors;
}

public <T extends UniqueEntity> Optional<LinkedHashMap<String, String>> handleEntity(T entity) {
public <T extends UniqueEntity> LinkedHashMap<String, String> handleEntity(T entity)
throws ProcessorProviderException {
try {
EntityProcessor<? extends UniqueEntity> processor = getEntityProcessor(entity.getClass());
return castProcessor(processor).handleEntity(entity);
} catch (ProcessorProviderException e) {
log.error("Exception occurred during entity handling.", e);
return Optional.empty();
throw e;
}
}

Expand Down Expand Up @@ -112,17 +113,19 @@ private EntityProcessor<? extends UniqueEntity> getEntityProcessor(
* @return A set of mappings from field name to value
*/
public <T extends TimeSeries<E, V>, E extends TimeSeriesEntry<V>, V extends Value>
Optional<Set<LinkedHashMap<String, String>>> handleTimeSeries(T timeSeries) {
Set<LinkedHashMap<String, String>> handleTimeSeries(T timeSeries)
throws ProcessorProviderException {
TimeSeriesProcessorKey key = new TimeSeriesProcessorKey(timeSeries);
try {
TimeSeriesProcessor<T, E, V> processor = getTimeSeriesProcessor(key);
return Optional.of(processor.handleTimeSeries(timeSeries));
return processor.handleTimeSeries(timeSeries);
} catch (ProcessorProviderException e) {
log.error("Cannot handle the time series '{}'.", timeSeries, e);
return Optional.empty();
throw new ProcessorProviderException(
"Cannot handle the time series {" + timeSeries + "}.", e);
} catch (EntityProcessorException e) {
log.error("Error during processing of time series.", e);
return Optional.empty();
throw new EntityProcessorException("Error during processing of time series.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private SortedMap<String, FieldSourceToMethod> buildFieldToSource(
}

@Override
public Optional<LinkedHashMap<String, String>> handleEntity(TimeSeries entity) {
public LinkedHashMap<String, String> handleEntity(TimeSeries entity) {
throw new UnsupportedOperationException(
"Don't invoke this simple method, but TimeSeriesProcessor#handleTimeSeries(TimeSeries).");
}
Expand Down
36 changes: 5 additions & 31 deletions src/main/java/edu/ie3/datamodel/io/sink/CsvFileSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,23 +264,10 @@ public <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
}

private <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
TimeSeries<E, V> timeSeries, BufferedCsvWriter writer) {
TimeSeriesProcessorKey key = new TimeSeriesProcessorKey(timeSeries);

TimeSeries<E, V> timeSeries, BufferedCsvWriter writer) throws ProcessorProviderException {
try {
Set<LinkedHashMap<String, String>> entityFieldData =
processorProvider
.handleTimeSeries(timeSeries)
.orElseThrow(
() ->
new SinkException(
"Cannot persist time series of combination '"
+ key
+ "'. This sink can only process the following combinations: ["
+ processorProvider.getRegisteredTimeSeriesCombinations().stream()
.map(TimeSeriesProcessorKey::toString)
.collect(Collectors.joining(","))
+ "]"));
processorProvider.handleTimeSeries(timeSeries);
entityFieldData.forEach(
data -> {
try {
Expand All @@ -291,8 +278,9 @@ private <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
log.error("Exception occurred during processing the provided data fields: ", e);
}
});
} catch (SinkException e) {
} catch (ProcessorProviderException e) {
log.error("Exception occurred during processor request: ", e);
throw new ProcessorProviderException("Exception occurred during processor request: ", e);
}
}

Expand All @@ -306,21 +294,7 @@ private <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
private <C extends UniqueEntity> void write(C entity) {
LinkedHashMap<String, String> entityFieldData;
try {
entityFieldData =
processorProvider
.handleEntity(entity)
.map(this::csvEntityFieldData)
.orElseThrow(
() ->
new SinkException(
"Cannot persist entity of type '"
+ entity.getClass().getSimpleName()
+ "'. This sink can only process the following entities: ["
+ processorProvider.getRegisteredClasses().stream()
.map(Class::getSimpleName)
.collect(Collectors.joining(","))
+ "]"));

entityFieldData = csvEntityFieldData(processorProvider.handleEntity(entity));
String[] headerElements = processorProvider.getHeaderElements(entity.getClass());
BufferedCsvWriter writer =
connector.getOrInitWriter(entity.getClass(), headerElements, csvSep);
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/edu/ie3/datamodel/io/sink/DataSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package edu.ie3.datamodel.io.sink;

import edu.ie3.datamodel.exceptions.ProcessorProviderException;
import edu.ie3.datamodel.io.connectors.DataConnector;
import edu.ie3.datamodel.io.processor.EntityProcessor;
import edu.ie3.datamodel.models.UniqueEntity;
Expand Down Expand Up @@ -37,7 +38,7 @@ public interface DataSink {
* @param <C> bounded to be all unique entities. Handling of specific entities is normally then
* executed by a specific {@link EntityProcessor}
*/
<C extends UniqueEntity> void persist(C entity);
<C extends UniqueEntity> void persist(C entity) throws ProcessorProviderException;

/**
* Should implement the entry point of a data sink to persist multiple entities in a collection.
Expand All @@ -51,7 +52,8 @@ public interface DataSink {
* @param <C> bounded to be all unique entities. Handling of specific entities is normally then
* executed by a specific {@link EntityProcessor}
*/
<C extends UniqueEntity> void persistAll(Collection<C> entities);
<C extends UniqueEntity> void persistAll(Collection<C> entities)
throws ProcessorProviderException;

/**
* Should implement the handling of a whole time series. Therefore the single entries have to be
Expand All @@ -62,5 +64,5 @@ public interface DataSink {
* @param <V> Type of actual value, that is inside the entry
*/
<E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
TimeSeries<E, V> timeSeries);
TimeSeries<E, V> timeSeries) throws ProcessorProviderException;
}
118 changes: 49 additions & 69 deletions src/main/java/edu/ie3/datamodel/io/sink/InfluxDbSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
*/
package edu.ie3.datamodel.io.sink;

import edu.ie3.datamodel.exceptions.SinkException;
import edu.ie3.datamodel.exceptions.ProcessorProviderException;
import edu.ie3.datamodel.io.connectors.InfluxDbConnector;
import edu.ie3.datamodel.io.naming.EntityPersistenceNamingStrategy;
import edu.ie3.datamodel.io.processor.ProcessorProvider;
import edu.ie3.datamodel.io.processor.timeseries.TimeSeriesProcessorKey;
import edu.ie3.datamodel.models.UniqueEntity;
import edu.ie3.datamodel.models.result.ResultEntity;
import edu.ie3.datamodel.models.timeseries.TimeSeries;
Expand All @@ -18,7 +17,6 @@
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
Expand All @@ -32,6 +30,8 @@ public class InfluxDbSink implements OutputDataSink {
/** Field name for input model uuid field in result entities */
private static final String FIELD_NAME_INPUT = "inputModel";

private static final String ERROR_MESSAGE = "Cannot persist provided entity '{}'. Exception: {}";

private final InfluxDbConnector connector;
private final EntityPersistenceNamingStrategy entityPersistenceNamingStrategy;
private final ProcessorProvider processorProvider;
Expand Down Expand Up @@ -68,15 +68,16 @@ public void shutdown() {
}

@Override
public <C extends UniqueEntity> void persist(C entity) {
public <C extends UniqueEntity> void persist(C entity) throws ProcessorProviderException {
Set<Point> points = extractPoints(entity);
// writes only the exact one point instead of unnecessarily wrapping it in BatchPoints
if (points.size() == 1) write(points.iterator().next());
else writeAll(points);
}

@Override
public <C extends UniqueEntity> void persistAll(Collection<C> entities) {
public <C extends UniqueEntity> void persistAll(Collection<C> entities)
throws ProcessorProviderException {
Set<Point> points = new HashSet<>();
for (C entity : entities) {
points.addAll(extractPoints(entity));
Expand All @@ -86,7 +87,7 @@ public <C extends UniqueEntity> void persistAll(Collection<C> entities) {

@Override
public <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
TimeSeries<E, V> timeSeries) {
TimeSeries<E, V> timeSeries) throws ProcessorProviderException {
Set<Point> points = transformToPoints(timeSeries);
writeAll(points);
}
Expand All @@ -107,7 +108,7 @@ public void flush() {
*
* @param entity the entity to transform
*/
private Optional<Point> transformToPoint(ResultEntity entity) {
private Point transformToPoint(ResultEntity entity) throws ProcessorProviderException {
Optional<String> measurementName =
entityPersistenceNamingStrategy.getResultEntityName(entity.getClass());
if (measurementName.isEmpty())
Expand All @@ -124,37 +125,23 @@ private Optional<Point> transformToPoint(ResultEntity entity) {
* @param entity the entity to transform
* @param measurementName equivalent to the name of a relational table
*/
private Optional<Point> transformToPoint(ResultEntity entity, String measurementName) {
private Point transformToPoint(ResultEntity entity, String measurementName)
throws ProcessorProviderException {
LinkedHashMap<String, String> entityFieldData;
try {
entityFieldData =
processorProvider
.handleEntity(entity)
.orElseThrow(
() ->
new SinkException(
"Cannot persist entity of type '"
+ entity.getClass().getSimpleName()
+ "'. This sink can only process the following entities: ["
+ processorProvider.getRegisteredClasses().stream()
.map(Class::getSimpleName)
.collect(Collectors.joining(","))
+ "]"));
entityFieldData = processorProvider.handleEntity(entity);
entityFieldData.remove(FIELD_NAME_TIME);
return Optional.of(
Point.measurement(transformToMeasurementName(measurementName))
.time(entity.getTime().toInstant().toEpochMilli(), TimeUnit.MILLISECONDS)
.tag("input_model", entityFieldData.remove(FIELD_NAME_INPUT))
.tag("scenario", connector.getScenarioName())
.fields(Collections.unmodifiableMap(entityFieldData))
.build());
} catch (SinkException e) {
log.error(
"Cannot persist provided entity '{}'. Exception: {}",
entity.getClass().getSimpleName(),
e);
return Point.measurement(transformToMeasurementName(measurementName))
.time(entity.getTime().toInstant().toEpochMilli(), TimeUnit.MILLISECONDS)
.tag("input_model", entityFieldData.remove(FIELD_NAME_INPUT))
.tag("scenario", connector.getScenarioName())
.fields(Collections.unmodifiableMap(entityFieldData))
.build();
} catch (ProcessorProviderException e) {
log.error(ERROR_MESSAGE, entity.getClass().getSimpleName(), e);

throw new ProcessorProviderException(e);
}
return Optional.empty();
}

/**
Expand All @@ -166,18 +153,25 @@ private Optional<Point> transformToPoint(ResultEntity entity, String measurement
* @param timeSeries the time series to transform
*/
private <E extends TimeSeriesEntry<V>, V extends Value> Set<Point> transformToPoints(
TimeSeries<E, V> timeSeries) {
TimeSeries<E, V> timeSeries) throws ProcessorProviderException {
if (timeSeries.getEntries().isEmpty()) return Collections.emptySet();
Optional<String> measurementName = entityPersistenceNamingStrategy.getEntityName(timeSeries);
if (measurementName.isEmpty()) {
String valueClassName =
timeSeries.getEntries().iterator().next().getValue().getClass().getSimpleName();
log.warn(
"I could not get a measurement name for TimeSeries value class {}. I am using its value's simple name instead.",
valueClassName);
return transformToPoints(timeSeries, valueClassName);

try {
Optional<String> measurementName = entityPersistenceNamingStrategy.getEntityName(timeSeries);
if (measurementName.isEmpty()) {
String valueClassName =
timeSeries.getEntries().iterator().next().getValue().getClass().getSimpleName();
log.warn(
"I could not get a measurement name for TimeSeries value class {}. I am using its value's simple name instead.",
valueClassName);
return transformToPoints(timeSeries, valueClassName);
}
return transformToPoints(timeSeries, measurementName.get());
} catch (ProcessorProviderException e) {
log.error(ERROR_MESSAGE, timeSeries.getClass().getSimpleName(), e);
throw new ProcessorProviderException(
"Cannot persist provided time series {" + timeSeries.getClass().getSimpleName() + "}", e);
}
return transformToPoints(timeSeries, measurementName.get());
}

/**
Expand All @@ -188,23 +182,11 @@ private <E extends TimeSeriesEntry<V>, V extends Value> Set<Point> transformToPo
* @param measurementName equivalent to the name of a relational table
*/
private <E extends TimeSeriesEntry<V>, V extends Value> Set<Point> transformToPoints(
TimeSeries<E, V> timeSeries, String measurementName) {
TimeSeriesProcessorKey key = new TimeSeriesProcessorKey(timeSeries);
TimeSeries<E, V> timeSeries, String measurementName) throws ProcessorProviderException {
Set<Point> points = new HashSet<>();
try {
Set<LinkedHashMap<String, String>> entityFieldData =
processorProvider
.handleTimeSeries(timeSeries)
.orElseThrow(
() ->
new SinkException(
"Cannot persist time series of combination '"
+ key
+ "'. This sink can only process the following combinations: ["
+ processorProvider.getRegisteredTimeSeriesCombinations().stream()
.map(TimeSeriesProcessorKey::toString)
.collect(Collectors.joining(","))
+ "]"));
processorProvider.handleTimeSeries(timeSeries);

for (LinkedHashMap<String, String> dataMapping : entityFieldData) {
String timeString = dataMapping.remove(FIELD_NAME_TIME);
Expand All @@ -217,8 +199,10 @@ private <E extends TimeSeriesEntry<V>, V extends Value> Set<Point> transformToPo
.build();
points.add(point);
}
} catch (SinkException e) {
log.error("Cannot persist provided time series '{}'. Exception: {}", key, e);
} catch (ProcessorProviderException e) {
log.error(ERROR_MESSAGE, timeSeries.getClass().getSimpleName(), e);
throw new ProcessorProviderException(
"Cannot persist provided time series {" + timeSeries.getClass().getSimpleName() + "}", e);
}
return points;
}
Expand All @@ -233,19 +217,15 @@ private <E extends TimeSeriesEntry<V>, V extends Value> Set<Point> transformToPo
* @param <C> bounded to be all unique entities, but logs an error and returns an empty Set if it
* does not extend {@link ResultEntity} or {@link TimeSeries}
*/
private <C extends UniqueEntity> Set<Point> extractPoints(C entity) {
private <C extends UniqueEntity> Set<Point> extractPoints(C entity)
throws ProcessorProviderException {
Set<Point> points = new HashSet<>();
/* Distinguish between result models and time series */
if (entity instanceof ResultEntity resultEntity) {
try {
points.add(
transformToPoint(resultEntity)
.orElseThrow(() -> new SinkException("Could not transform entity")));
} catch (SinkException e) {
log.error(
"Cannot persist provided entity '{}'. Exception: {}",
entity.getClass().getSimpleName(),
e);
points.add(transformToPoint(resultEntity));
} catch (ProcessorProviderException e) {
log.error(ERROR_MESSAGE, entity.getClass().getSimpleName(), e);
}
} else if (entity instanceof TimeSeries<?, ?> timeSeries) {
points.addAll(transformToPoints(timeSeries));
Expand Down
Loading