diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 8c5a4ef4e94c88..289aed82b0cf97 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -956,12 +956,9 @@ public List ingestAspects( .map(mcl -> Pair.of(preprocessEvent(opContext, mcl), mcl)) .map( preprocessResult -> - MCLEmitResult.builder() - .emitted(false) - .processedMCL(preprocessResult.getFirst()) - .mclFuture(null) - .metadataChangeLog(preprocessResult.getSecond()) - .build()) + MCLEmitResult.notEmitted( + preprocessResult.getSecond(), + preprocessResult.getFirst())) .collect(Collectors.toList()); } updateAspectResults = @@ -2295,18 +2292,23 @@ public MCLEmitResult conditionallyProduceMCLAsync( } } - return MCLEmitResult.builder() - .metadataChangeLog(metadataChangeLog) - .mclFuture(emissionStatus.getFirst()) - .processedMCL(emissionStatus.getSecond()) - .emitted(emissionStatus.getFirst() != null) - .build(); + Future future = emissionStatus.getFirst(); + if (future != null) { + return MCLEmitResult.emitted( + metadataChangeLog, + future, + emissionStatus.getSecond()); + } else { + return MCLEmitResult.notEmitted( + metadataChangeLog, + emissionStatus.getSecond()); + } } else { log.info( "Skipped producing MCL for ingested aspect {}, urn {}. Aspect has not changed.", aspectSpec.getName(), entityUrn); - return MCLEmitResult.builder().metadataChangeLog(metadataChangeLog).emitted(false).build(); + return MCLEmitResult.notEmitted(metadataChangeLog, true); } } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/MCLEmitResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/MCLEmitResult.java index b4c2b6139e46ce..c96e35790b7a5b 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/MCLEmitResult.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/MCLEmitResult.java @@ -1,37 +1,190 @@ package com.linkedin.metadata.entity; import com.linkedin.mxe.MetadataChangeLog; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import lombok.Builder; -import lombok.Value; - -@Builder(toBuilder = true) -@Value -public class MCLEmitResult { - MetadataChangeLog metadataChangeLog; - - // The result when written to MCL Topic - Future mclFuture; - - // Whether the mcl was successfully written to the destination topic - boolean isProduced() { - if (mclFuture != null) { - try { - mclFuture.get(); - } catch (InterruptedException | ExecutionException e) { - return false; - } - return true; - } else { - return false; +import javax.annotation.Nonnull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** + * Result of attempting to emit a Metadata Change Log (MCL) to Kafka. + * + *

This class uses a state pattern to represent three distinct outcomes: + * + *

    + *
  1. Not Emitted: MCL was intentionally not sent (filtered, CDC mode, etc.) + *
  2. Emitted & Pending: MCL was sent to Kafka, async production in progress + *
  3. Emitted & Resolved: MCL production completed (successfully or with failure) + *
+ * + *

Use factory methods to construct instances in valid states: + * + *

+ */ +@Getter +@ToString +@EqualsAndHashCode +public final class MCLEmitResult { + + @Nonnull private final MetadataChangeLog metadataChangeLog; + + private final Future mclFuture; + + /** Whether the MCL was preprocessed (e.g., synchronous index update triggered). */ + private final boolean processedMCL; + + /** Whether this MCL was emitted to Kafka (regardless of production outcome). */ + private final boolean emitted; + + private MCLEmitResult( + @Nonnull MetadataChangeLog metadataChangeLog, + Future mclFuture, + boolean processedMCL, + boolean emitted) { + this.metadataChangeLog = metadataChangeLog; + this.mclFuture = mclFuture; + this.processedMCL = processedMCL; + this.emitted = emitted; + + // Invariant: emitted=true implies mclFuture != null + if (emitted && mclFuture == null) { + throw new IllegalArgumentException( + "Invalid state: emitted=true but mclFuture is null. " + + "Use notEmitted() factory method for non-emitted MCLs."); } + + // Invariant: emitted=false implies mclFuture == null + if (!emitted && mclFuture != null) { + throw new IllegalArgumentException( + "Invalid state: emitted=false but mclFuture is not null. " + + "Use emitted() factory method for emitted MCLs."); + } + } + + /** + * Creates a result for an MCL that was intentionally not emitted to Kafka. + * + * @param metadataChangeLog the MCL that was not emitted + * @param processedMCL whether the MCL was preprocessed (e.g., sync index update) + * @return result indicating the MCL was not emitted + */ + @Nonnull + public static MCLEmitResult notEmitted( + @Nonnull MetadataChangeLog metadataChangeLog, boolean processedMCL) { + return new MCLEmitResult(metadataChangeLog, null, processedMCL, false); + } + + /** + * Creates a result for an MCL that was emitted to Kafka. + * + * @param metadataChangeLog the MCL that was emitted + * @param mclFuture the future representing the async Kafka send operation + * @param processedMCL whether the MCL was preprocessed (e.g., sync index update) + * @return result indicating the MCL was emitted + * @throws IllegalArgumentException if mclFuture is null + */ + @Nonnull + public static MCLEmitResult emitted( + @Nonnull MetadataChangeLog metadataChangeLog, + @Nonnull Future mclFuture, + boolean processedMCL) { + return new MCLEmitResult(metadataChangeLog, mclFuture, processedMCL, true); + } + + /** + * Gets the production result, providing detailed information about success or failure. + * + *

For non-emitted MCLs, returns {@link ProductionResult#notEmitted()}. For emitted MCLs, + * blocks until the Kafka send completes and returns the result. + * + * @return the production result with success/failure details + */ + @Nonnull + public ProductionResult getProductionResult() { + if (!emitted) { + return ProductionResult.notEmitted(); + } + + try { + mclFuture.get(); + return ProductionResult.success(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return ProductionResult.failure(e); + } catch (ExecutionException e) { + // Unwrap the cause - ExecutionException is just boilerplate wrapper + Throwable cause = e.getCause(); + return ProductionResult.failure(cause != null ? cause : e); + } + } + + /** + * Attempts to determine if the MCL was successfully produced to Kafka. + * + *

This method blocks until the Kafka send operation completes or fails. For non-emitted MCLs, + * this always returns {@code false}. + * + * @return {@code true} if the MCL was successfully written to Kafka, {@code false} otherwise + */ + public boolean isProduced() { + return getProductionResult().isSuccess(); } - ; - // Whether this was preprocessed before being emitted - boolean processedMCL; + /** Represents the outcome of attempting to produce an MCL to Kafka. */ + @Getter + @ToString + @EqualsAndHashCode + public static final class ProductionResult { + private final ProductionStatus status; + private final Throwable error; + + private ProductionResult(ProductionStatus status, Throwable error) { + this.status = status; + this.error = error; + } + + @Nonnull + public static ProductionResult notEmitted() { + return new ProductionResult(ProductionStatus.NOT_EMITTED, null); + } + + @Nonnull + public static ProductionResult success() { + return new ProductionResult(ProductionStatus.SUCCESS, null); + } - // Set to true if the message was emitted, false if this was dropped due to some config. - boolean emitted; + @Nonnull + public static ProductionResult failure(@Nonnull Throwable error) { + return new ProductionResult(ProductionStatus.FAILURE, error); + } + + public boolean isSuccess() { + return status == ProductionStatus.SUCCESS; + } + + public boolean isFailure() { + return status == ProductionStatus.FAILURE; + } + + public boolean wasNotEmitted() { + return status == ProductionStatus.NOT_EMITTED; + } + + @Nonnull + public Optional getError() { + return Optional.ofNullable(error); + } + } + + public enum ProductionStatus { + NOT_EMITTED, + SUCCESS, + FAILURE + } } diff --git a/metadata-service/services/src/test/java/com/linkedin/metadata/entity/MCLEmitResultTest.java b/metadata-service/services/src/test/java/com/linkedin/metadata/entity/MCLEmitResultTest.java index 1a299073e653c6..ffc5efd5aab49f 100644 --- a/metadata-service/services/src/test/java/com/linkedin/metadata/entity/MCLEmitResultTest.java +++ b/metadata-service/services/src/test/java/com/linkedin/metadata/entity/MCLEmitResultTest.java @@ -7,6 +7,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.linkedin.mxe.MetadataChangeLog; import java.util.concurrent.CompletableFuture; @@ -17,17 +18,11 @@ public class MCLEmitResultTest { @Test - void testBuilderAndGetters() { + void testFactoryMethodsAndGetters() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); Future mclFuture = CompletableFuture.completedFuture(null); - MCLEmitResult result = - MCLEmitResult.builder() - .metadataChangeLog(mcl) - .mclFuture(mclFuture) - .processedMCL(true) - .emitted(true) - .build(); + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, true); assertEquals(result.getMetadataChangeLog(), mcl); assertEquals(result.getMclFuture(), mclFuture); @@ -40,19 +35,19 @@ void testIsProduced_SuccessfulFuture() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); Future mclFuture = CompletableFuture.completedFuture("success"); - MCLEmitResult result = - MCLEmitResult.builder().metadataChangeLog(mcl).mclFuture(mclFuture).build(); + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); assertTrue(result.isProduced()); } @Test - void testIsProduced_NullFuture() { + void testIsProduced_NotEmitted() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); - MCLEmitResult result = MCLEmitResult.builder().metadataChangeLog(mcl).mclFuture(null).build(); + MCLEmitResult result = MCLEmitResult.notEmitted(mcl, false); assertFalse(result.isProduced()); + assertFalse(result.isEmitted()); } @Test @@ -66,8 +61,7 @@ void testIsProduced_FutureWithInterruptedException() { // This should not happen in test setup } - MCLEmitResult result = - MCLEmitResult.builder().metadataChangeLog(mcl).mclFuture(mclFuture).build(); + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); assertFalse(result.isProduced()); } @@ -84,8 +78,7 @@ void testIsProduced_FutureWithExecutionException() { // This should not happen in test setup } - MCLEmitResult result = - MCLEmitResult.builder().metadataChangeLog(mcl).mclFuture(mclFuture).build(); + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); assertFalse(result.isProduced()); } @@ -96,17 +89,16 @@ void testIsProduced_FailedFuture() { CompletableFuture mclFuture = new CompletableFuture<>(); mclFuture.completeExceptionally(new RuntimeException("Test failure")); - MCLEmitResult result = - MCLEmitResult.builder().metadataChangeLog(mcl).mclFuture(mclFuture).build(); + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); assertFalse(result.isProduced()); } @Test - void testDefaultValues() { + void testNotEmittedDefaults() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); - MCLEmitResult result = MCLEmitResult.builder().metadataChangeLog(mcl).build(); + MCLEmitResult result = MCLEmitResult.notEmitted(mcl, false); assertEquals(result.getMetadataChangeLog(), mcl); assertNull(result.getMclFuture()); @@ -115,48 +107,14 @@ void testDefaultValues() { assertFalse(result.isProduced()); } - @Test - void testToBuilder() { - MetadataChangeLog mcl = mock(MetadataChangeLog.class); - Future mclFuture = CompletableFuture.completedFuture(null); - - MCLEmitResult original = - MCLEmitResult.builder() - .metadataChangeLog(mcl) - .mclFuture(mclFuture) - .processedMCL(true) - .emitted(false) - .build(); - - MCLEmitResult modified = original.toBuilder().emitted(true).build(); - - assertEquals(modified.getMetadataChangeLog(), mcl); - assertEquals(modified.getMclFuture(), mclFuture); - assertTrue(modified.isProcessedMCL()); - assertTrue(modified.isEmitted()); - assertTrue(modified.isProduced()); - } - @Test void testEqualsAndHashCode() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); Future mclFuture = CompletableFuture.completedFuture(null); - MCLEmitResult result1 = - MCLEmitResult.builder() - .metadataChangeLog(mcl) - .mclFuture(mclFuture) - .processedMCL(true) - .emitted(true) - .build(); - - MCLEmitResult result2 = - MCLEmitResult.builder() - .metadataChangeLog(mcl) - .mclFuture(mclFuture) - .processedMCL(true) - .emitted(true) - .build(); + MCLEmitResult result1 = MCLEmitResult.emitted(mcl, mclFuture, true); + + MCLEmitResult result2 = MCLEmitResult.emitted(mcl, mclFuture, true); assertEquals(result1, result2); assertEquals(result1.hashCode(), result2.hashCode()); @@ -167,13 +125,7 @@ void testToString() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); Future mclFuture = CompletableFuture.completedFuture(null); - MCLEmitResult result = - MCLEmitResult.builder() - .metadataChangeLog(mcl) - .mclFuture(mclFuture) - .processedMCL(true) - .emitted(true) - .build(); + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, true); String toString = result.toString(); assertNotNull(toString); @@ -185,21 +137,98 @@ void testToString() { @Test void testAllPossibleStates() { MetadataChangeLog mcl = mock(MetadataChangeLog.class); + Future mclFuture = CompletableFuture.completedFuture(null); // Test all combinations of processedMCL and emitted flags for (boolean processed : new boolean[] {true, false}) { - for (boolean emitted : new boolean[] {true, false}) { - MCLEmitResult result = - MCLEmitResult.builder() - .metadataChangeLog(mcl) - .processedMCL(processed) - .emitted(emitted) - .build(); - - assertEquals(result.isProcessedMCL(), processed); - assertEquals(result.isEmitted(), emitted); - assertFalse(result.isProduced()); // No future set, so should always be false - } + // Test not emitted state + MCLEmitResult notEmittedResult = MCLEmitResult.notEmitted(mcl, processed); + assertEquals(notEmittedResult.isProcessedMCL(), processed); + assertFalse(notEmittedResult.isEmitted()); + assertFalse(notEmittedResult.isProduced()); + + // Test emitted state + MCLEmitResult emittedResult = MCLEmitResult.emitted(mcl, mclFuture, processed); + assertEquals(emittedResult.isProcessedMCL(), processed); + assertTrue(emittedResult.isEmitted()); + assertTrue(emittedResult.isProduced()); // Future completed successfully + } + } + + @Test + void testInvariantValidation_EmittedWithNullFuture() { + MetadataChangeLog mcl = mock(MetadataChangeLog.class); + + try { + MCLEmitResult.emitted(mcl, null, false); + fail("Should throw IllegalArgumentException when mclFuture is null"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("mclFuture cannot be null")); + } + } + + @Test + void testProductionResult_Success() { + MetadataChangeLog mcl = mock(MetadataChangeLog.class); + Future mclFuture = CompletableFuture.completedFuture("success"); + + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); + MCLEmitResult.ProductionResult productionResult = result.getProductionResult(); + + assertTrue(productionResult.isSuccess()); + assertFalse(productionResult.isFailure()); + assertFalse(productionResult.wasNotEmitted()); + assertFalse(productionResult.getError().isPresent()); + } + + @Test + void testProductionResult_Failure() { + MetadataChangeLog mcl = mock(MetadataChangeLog.class); + CompletableFuture mclFuture = new CompletableFuture<>(); + RuntimeException testException = new RuntimeException("Test failure"); + mclFuture.completeExceptionally(testException); + + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); + MCLEmitResult.ProductionResult productionResult = result.getProductionResult(); + + assertFalse(productionResult.isSuccess()); + assertTrue(productionResult.isFailure()); + assertFalse(productionResult.wasNotEmitted()); + assertTrue(productionResult.getError().isPresent()); + assertEquals(productionResult.getError().get(), testException); + } + + @Test + void testProductionResult_NotEmitted() { + MetadataChangeLog mcl = mock(MetadataChangeLog.class); + + MCLEmitResult result = MCLEmitResult.notEmitted(mcl, false); + MCLEmitResult.ProductionResult productionResult = result.getProductionResult(); + + assertFalse(productionResult.isSuccess()); + assertFalse(productionResult.isFailure()); + assertTrue(productionResult.wasNotEmitted()); + assertFalse(productionResult.getError().isPresent()); + } + + @Test + void testProductionResult_ExecutionExceptionUnwrapping() { + MetadataChangeLog mcl = mock(MetadataChangeLog.class); + Future mclFuture = mock(Future.class); + RuntimeException rootCause = new RuntimeException("Root cause"); + + try { + when(mclFuture.get()).thenThrow(new ExecutionException("Wrapper", rootCause)); + } catch (InterruptedException | ExecutionException e) { + // This should not happen in test setup } + + MCLEmitResult result = MCLEmitResult.emitted(mcl, mclFuture, false); + MCLEmitResult.ProductionResult productionResult = result.getProductionResult(); + + assertTrue(productionResult.isFailure()); + assertTrue(productionResult.getError().isPresent()); + // Verify the root cause was unwrapped, not the ExecutionException wrapper + assertEquals(productionResult.getError().get(), rootCause); } }