
Create a fallback task in order to monitor the session's health #13627

@Gpetrak

Description


The harvesting workflow’s architecture is based on the execution of a group of tasks (each task representing a harvestable resource). Once all tasks in the group have completed, a task called the finalizer wraps up the process and resets the status of both the session and the harvester.

However, if any task in the group expires or reaches its timeout, the finalizer is never executed, and the session remains stuck in its running state. This behavior is a consequence of how Celery internally handles task groups and their finalizer callbacks: the finalizer only runs once every task in the group has completed successfully.

To address this issue, we can schedule an external task that monitors the harvesting session’s health, which we can call harvesting_session_monitor.

This monitoring task will be scheduled to run at a future point in time, when it checks the harvesting session's status and the processing time of the whole session. If the monitor detects that the session is still running, or is in an inconsistent state, after a specific amount of time, it restores the session state so that future harvesting runs can proceed. Ideally, the finalizer should be able to unschedule the monitoring task: if the finalizer runs, the session status has been managed successfully and no external intervention is needed.
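The schedule/unschedule relationship between the monitor and the finalizer can be sketched with the standard library alone. In this sketch `threading.Timer` is only a local stand-in for a delayed Celery task (in Celery this would typically be `apply_async(countdown=...)` to schedule it and `app.control.revoke(task_id)` to unschedule it); `SessionMonitorHandle` and the function bodies are hypothetical.

```python
import threading


class SessionMonitorHandle:
    """Hypothetical wrapper around a delayed monitoring job."""

    def __init__(self, delay_seconds: float, monitor_fn):
        # threading.Timer is a local stand-in for a delayed Celery task.
        self._timer = threading.Timer(delay_seconds, monitor_fn)

    def schedule(self) -> None:
        self._timer.start()

    def unschedule(self) -> None:
        # Has no effect if the timer has already fired.
        self._timer.cancel()


monitor_ran = threading.Event()


def harvesting_session_monitor() -> None:
    # In the real workflow this would inspect the session and restore it if stuck.
    monitor_ran.set()


def finalizer(handle: SessionMonitorHandle) -> None:
    # Resetting the session/harvester status is omitted; then unschedule the monitor.
    handle.unschedule()


# Happy path: the finalizer runs before the monitor fires.
handle = SessionMonitorHandle(delay_seconds=0.5, monitor_fn=harvesting_session_monitor)
handle.schedule()
finalizer(handle)
print(monitor_ran.wait(1))  # False: the monitor was cancelled

# Stuck path: no finalizer ever runs, so the monitor fires.
stuck_ran = threading.Event()
stuck_handle = SessionMonitorHandle(delay_seconds=0.1, monitor_fn=stuck_ran.set)
stuck_handle.schedule()
print(stuck_ran.wait(1))  # True: the monitor can now recover the session
```

The monitor is the fallback path: it only does real work when the finalizer never got the chance to cancel it.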

Proposed implementation
First, we can define a dynamic workflow time (similar to the dynamic expiration time but with a larger buffer):

workflow_time = num_resources * estimated_duration_per_resource + buffer_time
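As a worked example, the formula can be expressed as a small helper. The per-resource estimate and buffer values used below are illustrative assumptions, not values taken from the project's settings.

```python
def compute_workflow_time(
    num_resources: int,
    estimated_duration_per_resource: float,
    buffer_time: float,
) -> float:
    """Return the expected duration (in seconds) of the whole harvesting workflow.

    workflow_time = num_resources * estimated_duration_per_resource + buffer_time
    """
    if num_resources < 0:
        raise ValueError("num_resources must be non-negative")
    return num_resources * estimated_duration_per_resource + buffer_time


# Illustrative values (assumptions): 200 resources, ~30 s each, 10 minutes of buffer.
workflow_time = compute_workflow_time(200, 30, 600)
print(workflow_time)  # 6600
```

Because the buffer is added once per session rather than per resource, it mainly protects small harvests, where the per-resource estimate alone would leave almost no slack.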

Next, we create the harvesting_session_monitor task, which will be scheduled from within the harvest_resources task immediately after retrieving the session.

At this point, we have all the necessary information:

  • workflow_time
  • the AsynchronousHarvestingSession object (which includes the session’s status and its start time)

The monitoring task should then verify whether the workflow time has passed and check the session’s status by following these steps:

  • Retrieve the current AsynchronousHarvestingSession using its ID
  • If session.status not in [session.STATUS_ON_GOING, session.STATUS_ABORTING], return without doing anything
  • Compute workflow_time based on the number of resources
  • Compute the expected finish time of the session: expected_finish = session.started + timedelta(seconds=workflow_time)
  • If now_ > expected_finish, the session is stuck: call the finalizer
  • The finalizer then unschedules the monitoring task
  • Otherwise, reschedule the monitoring task to run again after a specific number of seconds
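The steps above can be sketched as a plain function. This is a minimal sketch under stated assumptions: `FakeSession` is a hypothetical stand-in for AsynchronousHarvestingSession (its status string values and the `num_resources` field are assumed), the lookup by ID is replaced by passing the session in directly, and `finalize`/`reschedule` are injected callbacks in place of the real finalizer and a delayed Celery call.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


@dataclass
class FakeSession:
    """Hypothetical stand-in for AsynchronousHarvestingSession."""

    STATUS_ON_GOING = "on-going"  # assumed values; the real constants may differ
    STATUS_ABORTING = "aborting"
    STATUS_FINISHED = "finished"

    status: str
    started: datetime
    num_resources: int  # hypothetical field name


def harvesting_session_monitor(
    session: FakeSession,
    estimated_duration_per_resource: float,
    buffer_time: float,
    finalize: Callable[[FakeSession], None],
    reschedule: Callable[[float], None],
    check_interval: float = 300,
    now: Optional[datetime] = None,
) -> str:
    # Nothing to do if the session is no longer running.
    if session.status not in (session.STATUS_ON_GOING, session.STATUS_ABORTING):
        return "noop"
    # workflow_time based on the number of resources.
    workflow_time = session.num_resources * estimated_duration_per_resource + buffer_time
    expected_finish = session.started + timedelta(seconds=workflow_time)
    now_ = now if now is not None else datetime.now(timezone.utc)
    if now_ > expected_finish:
        # The session is stuck: run the finalizer to restore its state.
        finalize(session)
        return "finalized"
    # Still within the expected time: check again later.
    reschedule(check_interval)
    return "rescheduled"


def fake_finalizer(s: FakeSession) -> None:
    # The real finalizer would also reset the harvester and unschedule the monitor.
    s.status = FakeSession.STATUS_FINISHED


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
session = FakeSession(FakeSession.STATUS_ON_GOING, start, num_resources=10)
rescheduled_delays = []

# 10 * 30 s + 60 s = 360 s, so the session is expected to finish at 12:06.
early = harvesting_session_monitor(session, 30, 60, fake_finalizer, rescheduled_delays.append,
                                   now=start + timedelta(minutes=5))
late = harvesting_session_monitor(session, 30, 60, fake_finalizer, rescheduled_delays.append,
                                  now=start + timedelta(minutes=7))
after = harvesting_session_monitor(session, 30, 60, fake_finalizer, rescheduled_delays.append,
                                   now=start + timedelta(minutes=8))
print(early, late, after)  # rescheduled finalized noop
```

Injecting `now` keeps the time-dependent branch testable without waiting in real time.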
