From b7ec80284906c67c7b698715749bc7a39a370d32 Mon Sep 17 00:00:00 2001 From: legalles Date: Mon, 10 Feb 2025 11:52:07 +0100 Subject: [PATCH 1/3] feat: implements IJobWorker.IAsyncDisposable --- Client/Api/Worker/IJobWorker.cs | 2 +- Client/Impl/Worker/JobWorker.cs | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Client/Api/Worker/IJobWorker.cs b/Client/Api/Worker/IJobWorker.cs index 7a9563bb..904ec1e1 100644 --- a/Client/Api/Worker/IJobWorker.cs +++ b/Client/Api/Worker/IJobWorker.cs @@ -22,7 +22,7 @@ namespace Zeebe.Client.Api.Worker; /// open, the worker continuously receives jobs from the broker and hands them to a registered /// . /// -public interface IJobWorker : IDisposable +public interface IJobWorker : IDisposable, IAsyncDisposable { /// true if this registration is currently active and work items are being received for it. bool IsOpen(); diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index 3d7a3ad3..d2b70d6f 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -61,16 +61,20 @@ internal JobWorker(JobWorkerBuilder builder) } /// + [Obsolete("Use DisposeAsync instead.", false)] public void Dispose() + { + _ = DisposeAsync(); + } + + /// + public async ValueTask DisposeAsync() { source.Cancel(); // delay disposing, since poll and handler take some time to close - _ = Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2)) - .ContinueWith(t => - { - logger?.LogError("Dispose source"); - source.Dispose(); - }); + await Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2)); + logger?.LogError("Dispose source"); + source.Dispose(); isRunning = false; } From cc37a0963df2bb41f0b61487741683820afcdc26 Mon Sep 17 00:00:00 2001 From: legalles Date: Tue, 11 Feb 2025 12:27:04 +0100 Subject: [PATCH 2/3] feat: PR comments. Improve disposal + UT --- Client.UnitTests/JobWorkerTest.cs | 117 ++++++++++++++++++++---------- Client/Impl/Worker/JobWorker.cs | 56 +++++++++----- 2 files changed, 114 insertions(+), 59 deletions(-) diff --git a/Client.UnitTests/JobWorkerTest.cs b/Client.UnitTests/JobWorkerTest.cs index 4c34dc51..9af3f684 100644 --- a/Client.UnitTests/JobWorkerTest.cs +++ b/Client.UnitTests/JobWorkerTest.cs @@ -17,6 +17,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using GatewayProtocol; using NLog; using NUnit.Framework; @@ -42,19 +43,19 @@ public void ShouldSendRequestReceiveResponseAsExpected() RequestTimeout = 5_000L }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); var receivedJobs = new List(); using (var jobWorker = ZeebeClient.NewWorker() .JobType("foo") - .Handler((jobClient, job) => + .Handler((_, job) => { receivedJobs.Add(job); if (receivedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(3) @@ -65,7 +66,7 @@ public void ShouldSendRequestReceiveResponseAsExpected() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -118,7 +119,7 @@ public void ShouldSendRequestWithTenantIdsListReceiveResponseAsExpected() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -139,13 +140,13 @@ public void ShouldFailWithZeroThreadCount() var aggregateException = Assert.Throws( () => { - _ = ZeebeClient.NewWorker() + ZeebeClient.NewWorker() .JobType("foo") - .Handler((jobClient, job) => { }) + .Handler((_, _) => { }) .HandlerThreads(0); }); StringAssert.Contains("Expected an handler thread count larger then zero, but got 0.", - aggregateException.Message); + aggregateException?.Message); } [Test] @@ -161,7 +162,7 @@ public void ShouldSendAsyncCompleteInHandler() RequestTimeout = 5_000L }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); @@ -170,11 +171,11 @@ public void ShouldSendAsyncCompleteInHandler() .JobType("foo") .Handler(async (jobClient, job) => { - _ = await jobClient.NewCompleteJobCommand(job).Send(); + await jobClient.NewCompleteJobCommand(job).Send(); completedJobs.Add(job); if (completedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(3) @@ -186,7 +187,7 @@ public void ShouldSendAsyncCompleteInHandler() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -214,7 +215,7 @@ public void ShouldUseMultipleHandlerThreads() RequestTimeout = 5_000L }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); @@ -223,11 +224,11 @@ public void ShouldUseMultipleHandlerThreads() .JobType("foo") .Handler(async (jobClient, job) => { - _ = await jobClient.NewCompleteJobCommand(job).Send(); - _ = completedJobs.TryAdd(job.Key, job); + await jobClient.NewCompleteJobCommand(job).Send(); + completedJobs.TryAdd(job.Key, job); if (completedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(3) @@ -239,7 +240,7 @@ public void ShouldUseMultipleHandlerThreads() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -265,7 +266,7 @@ public void ShouldSendCompleteInHandler() RequestTimeout = 5_000L }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); @@ -274,11 +275,11 @@ public void ShouldSendCompleteInHandler() .JobType("foo") .Handler((jobClient, job) => { - _ = jobClient.NewCompleteJobCommand(job).Send(); + jobClient.NewCompleteJobCommand(job).Send(); completedJobs.Add(job); if (completedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(3) @@ -289,7 +290,7 @@ public void ShouldSendCompleteInHandler() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -332,13 +333,13 @@ public void ShouldSendRequestsWithDifferentAmounts() RequestTimeout = 5_000L }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var receivedJobs = new List(); using (var jobWorker = ZeebeClient.NewWorker() .JobType("foo") - .Handler((jobClient, job) => + .Handler((_, job) => { // block job handling receivedJobs.Add(job); @@ -346,7 +347,7 @@ public void ShouldSendRequestsWithDifferentAmounts() { using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset)) { - _ = signal.WaitOne(); + signal.WaitOne(); } } }) @@ -384,7 +385,7 @@ public void ShouldSendRequestWithTimeSpanTimeoutAsMilliseconds() RequestTimeout = 5_000L }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); @@ -396,7 +397,7 @@ public void ShouldSendRequestWithTimeSpanTimeoutAsMilliseconds() receivedJobs.Add(job); if (receivedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(1) @@ -407,7 +408,7 @@ public void ShouldSendRequestWithTimeSpanTimeoutAsMilliseconds() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -434,7 +435,7 @@ public void ShouldSendRequestWithFetchVariables() FetchVariable = { "foo", "bar", "test" } }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); @@ -446,7 +447,7 @@ public void ShouldSendRequestWithFetchVariables() receivedJobs.Add(job); if (receivedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(1) @@ -457,7 +458,7 @@ public void ShouldSendRequestWithFetchVariables() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -484,7 +485,7 @@ public void ShouldSendRequestWithFetchVariablesList() FetchVariable = { "foo", "bar", "test" } }; IList variableNames = new List { "foo", "bar", "test" }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when var signal = new EventWaitHandle(false, EventResetMode.AutoReset); @@ -496,7 +497,7 @@ public void ShouldSendRequestWithFetchVariablesList() receivedJobs.Add(job); if (receivedJobs.Count == 3) { - _ = signal.Set(); + signal.Set(); } }) .MaxJobsActive(1) @@ -507,7 +508,7 @@ public void ShouldSendRequestWithFetchVariablesList() .Open()) { Assert.True(jobWorker.IsOpen()); - _ = signal.WaitOne(); + signal.WaitOne(); } // then @@ -540,12 +541,12 @@ public void ShouldSendFailCommandOnExceptionInJobHandler() Retries = 2 }; - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when using (var jobWorker = ZeebeClient.NewWorker() .JobType("foo") - .Handler((jobClient, job) => + .Handler((_, job) => { if (job.Key == 1) { @@ -577,7 +578,7 @@ public void ShouldSendFailCommandOnExceptionInJobHandler() public void ShouldCompleteAfterSendFailCommandOnExceptionInJobHandler() { // given - TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); // when using (var jobWorker = ZeebeClient.NewWorker() @@ -589,7 +590,7 @@ public void ShouldCompleteAfterSendFailCommandOnExceptionInJobHandler() throw new Exception("Fail"); } - _ = await jobClient.NewCompleteJobCommand(job).Send(); + await jobClient.NewCompleteJobCommand(job).Send(); }) .MaxJobsActive(3) .Name("jobWorker") @@ -626,12 +627,12 @@ public void ShouldUseAutoCompleteWithWorker() }; TestService.AddRequestHandler( typeof(ActivateJobsRequest), - request => CreateExpectedResponse()); + _ => CreateExpectedResponse()); // when using (var jobWorker = ZeebeClient.NewWorker() .JobType("foo") - .Handler((jobClient, job) => { Logger.Info("Handler has seen job '{0}'", job); }) + .Handler((_, job) => { Logger.Info("Handler has seen job '{Job}'", job); }) .AutoCompletion() .MaxJobsActive(3) .Name("jobWorker") @@ -659,6 +660,44 @@ public void ShouldUseAutoCompleteWithWorker() Assert.Contains(3, completeJobRequests); } + [Test] + public async Task CanDisposeRunningWorker() + { + // given + TestService.AddRequestHandler( + typeof(ActivateJobsRequest), + _ => CreateExpectedResponse()); + var signal = new EventWaitHandle(false, EventResetMode.AutoReset); + + // when + var jobWorker = ZeebeClient.NewWorker() + .JobType("foo") + .Handler(async (jobClient, job) => + { + // trigger worker disposal with second job + signal.Set(); + + // this should not be completed + await jobClient.NewCompleteJobCommand(job).Send(); + }) + .AutoCompletion() + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(1)) + .PollInterval(TimeSpan.FromSeconds(3)) // long + .PollingTimeout(TimeSpan.FromMilliseconds(100)) + .Open(); + Assert.True(jobWorker.IsOpen()); + + signal.WaitOne(); + // disposal must be quick even though the polling interval is long + await jobWorker.DisposeAsync().ConfigureAwait(false); + + // activated job was not awaited + Assert.AreEqual(0, TestService.Requests[typeof(CompleteJobRequest)].Count); + Assert.True(jobWorker.IsClosed()); + } + public static ActivateJobsResponse CreateExpectedResponse() { return new ActivateJobsResponse diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index d2b70d6f..e50a369b 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -13,6 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#nullable enable using System; using System.Threading; using System.Threading.Tasks; @@ -36,11 +37,12 @@ public sealed class JobWorker : IJobWorker private readonly JobActivator jobActivator; private readonly AsyncJobHandler jobHandler; private readonly JobWorkerBuilder jobWorkerBuilder; - private readonly ILogger logger; + private readonly ILogger? logger; private readonly int maxJobsActive; private readonly TimeSpan pollInterval; + private Task? pollingTask; - private readonly CancellationTokenSource source; + private readonly CancellationTokenSource source = new (); private readonly double thresholdJobsActivation; private int currentJobsActive; @@ -49,7 +51,6 @@ public sealed class JobWorker : IJobWorker internal JobWorker(JobWorkerBuilder builder) { jobWorkerBuilder = builder; - source = new CancellationTokenSource(); logger = builder.LoggerFactory?.CreateLogger(); jobHandler = jobWorkerBuilder.Handler(); autoCompletion = builder.AutoCompletionEnabled(); @@ -70,10 +71,13 @@ public void Dispose() /// public async ValueTask DisposeAsync() { - source.Cancel(); - // delay disposing, since poll and handler take some time to close - await Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2)); - logger?.LogError("Dispose source"); + if (pollingTask != null) + { + source.Cancel(); + await pollingTask; + } + + logger?.LogInformation("JobWorker is now disposed"); source.Dispose(); isRunning = false; } @@ -108,14 +112,26 @@ internal void Open() var output = new ActionBlock(activatedJob => { _ = Interlocked.Decrement(ref currentJobsActive); }, executionOptions); - _ = input.LinkTo(transformer); - _ = transformer.LinkTo(output); + var linkInputTransformer = input.LinkTo(transformer); + var linkTransformerOutput = transformer.LinkTo(output); // Start polling - _ = Task.Run(async () => await PollJobs(input, cancellationToken), - cancellationToken).ContinueWith( - t => logger?.LogError(t.Exception, "Job polling failed."), - TaskContinuationOptions.OnlyOnFaulted); + pollingTask = Task.Run(async () => await PollJobs(input, cancellationToken), + cancellationToken).ContinueWith( + t => + { + if (t.IsFaulted) + { + logger?.LogError(t.Exception, "Job polling failed"); + } + else if (t.IsCanceled) + { + logger?.LogInformation("Job polling Cancelled"); + } + + linkInputTransformer.Dispose(); + linkTransformerOutput.Dispose(); + }, CancellationToken.None); logger?.LogDebug( "Job worker ({worker}) for job type {type} has been opened.", @@ -155,7 +171,7 @@ private async Task PollJobs(ITargetBlock input, CancellationToken cancella try { await jobActivator.SendActivateRequest(activateJobsRequest, - async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount), + async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount, cancellationToken), null, cancellationToken); } @@ -172,7 +188,8 @@ await jobActivator.SendActivateRequest(activateJobsRequest, } } - private async Task HandleActivationResponse(ITargetBlock input, IActivateJobsResponse response, int jobCount) + private async Task HandleActivationResponse(ITargetBlock input, IActivateJobsResponse response, int jobCount, + CancellationToken cancellationToken) { logger?.LogDebug( "Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.", @@ -182,8 +199,8 @@ private async Task HandleActivationResponse(ITargetBlock input, IActivateJ foreach (var job in response.Jobs) { - _ = await input.SendAsync(job); - _ = Interlocked.Increment(ref currentJobsActive); + await input.SendAsync(job, cancellationToken); + Interlocked.Increment(ref currentJobsActive); } } @@ -228,8 +245,7 @@ private async Task TryToAutoCompleteJob(JobClientWrapper jobClient, IJob activat "Job worker ({worker}) will auto complete job with key '{key}'", activateJobsRequest.Worker, activatedJob.Key); - _ = await jobClient.NewCompleteJobCommand(activatedJob) - .Send(cancellationToken); + await jobClient.NewCompleteJobCommand(activatedJob).Send(cancellationToken); } } @@ -254,6 +270,6 @@ private Task FailActivatedJob(JobClientWrapper jobClient, IJob activatedJob, Can { logger?.LogWarning(task.Exception, "Problem on failing job occured."); } - }, cancellationToken); + }, CancellationToken.None); } } \ No newline at end of file From f2b6276129933e4ef415db09db7a12bfbbeca5f8 Mon Sep 17 00:00:00 2001 From: legalles Date: Wed, 19 Feb 2025 11:05:18 +0100 Subject: [PATCH 3/3] fix PR comments --- Client.UnitTests/JobWorkerTest.cs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/Client.UnitTests/JobWorkerTest.cs b/Client.UnitTests/JobWorkerTest.cs index 9af3f684..82a56944 100644 --- a/Client.UnitTests/JobWorkerTest.cs +++ b/Client.UnitTests/JobWorkerTest.cs @@ -672,13 +672,10 @@ public async Task CanDisposeRunningWorker() // when var jobWorker = ZeebeClient.NewWorker() .JobType("foo") - .Handler(async (jobClient, job) => + .Handler((_, _) => { // trigger worker disposal with second job signal.Set(); - - // this should not be completed - await jobClient.NewCompleteJobCommand(job).Send(); }) .AutoCompletion() .MaxJobsActive(3) @@ -692,9 +689,6 @@ public async Task CanDisposeRunningWorker() signal.WaitOne(); // disposal must be quick even though the polling interval is long await jobWorker.DisposeAsync().ConfigureAwait(false); - - // activated job was not awaited - Assert.AreEqual(0, TestService.Requests[typeof(CompleteJobRequest)].Count); Assert.True(jobWorker.IsClosed()); }