Autopopulate 2.0 #1244
Status: Open. ttngu207 wants to merge 44 commits into datajoint:feat/autopopulate2 from ttngu207:autopopulate-2.0.
Commits (44 total; the diff below shows changes from 29 commits):
4f01ea3 added JobsConfig
de4437c successful prototype for key_source
8edf179 added `refresh_jobs` and `purge_invalid_jobs`
fd849bc Merge branch 'populate_success_count' into autopopulate-2.0
8692eb4 rename function: `schedule_jobs`
f38ce9f implement `schedule_jobs` as part of `populate()`
ef3adc2 Merge branch 'master' into autopopulate-2.1
8b9ac0f remove JobConfigTable and register_key_source
71b0696 bugfix, add tests
c13b3e1 bugfix - remove `jobconfig`
1f3fac1 Merge branch 'master' into autopulate-2.0 (ttngu207)
32fbc6d Merge branch 'master' into autopulate-2.0 (ttngu207)
0d9ec01 chore: minor bugfix (ttngu207)
4cc170d chore: code cleanup (ttngu207)
b7e4d9b fix: `key` attribute of type `JSON` (ttngu207)
57c7247 feat: prevent excessive scheduling with `min_scheduling_interval` (ttngu207)
3f5247b chore: minor cleanup (ttngu207)
45b5658 feat: improve logic to prevent excessive scheduling (ttngu207)
9903e02 chore: minor bugfix (ttngu207)
872c5dc chore: tiny bugfix (ttngu207)
69d8831 fix: fix scheduling logic (ttngu207)
cc0f398 chore: minor logging tweak (ttngu207)
2ac9fa2 fix: log run_duration in error jobs (ttngu207)
db93e0a fix: improve logic in `purge_invalid_jobs` (ttngu207)
c9a5750 docs: new `jobs_orchestration.md` docs (ttngu207)
28df6c2 Update jobs_orchestration.md (ttngu207)
b0308e2 Update jobs_orchestration.md (ttngu207)
e9f5377 feat: add `run_metadata` column to `jobs` table (ttngu207)
eb90d3d Merge branch 'datajoint:master' into autopopulate-2.0 (ttngu207)
e7c8943 fix: improve error handling when `make_fetch` referential integrity f… (ttngu207)
e55bbcb style: black format (ttngu207)
964743e style: format (ttngu207)
53e38f7 Merge pull request #1245 from ttngu207/bugfix-three-part-make (dimitri-yatsenko)
0cf1ea0 Merge remote-tracking branch 'upstream/master' into autopopulate-2.0 (ttngu207)
15f791c feat: add `_job` hidden column for `Imported` `Computed` tables (ttngu207)
18727e9 chore: rename `purge_valid_jobs` -> `purge_jobs` (ttngu207)
efbb920 feat: insert `_job` metadata upon `make` completion (ttngu207)
918cc9d chore: minor code optimization in `purge_jobs` (ttngu207)
dcfeaf5 feat: remove logging of `success` jobs in Jobs table (ttngu207)
1f773fa docs: minor updates (ttngu207)
7184ce5 format: black (ttngu207)
9d3a9e4 chore: remove the optional `purge_jobs` in `schedule_jobs` (ttngu207)
269c4af chore: code cleanup (ttngu207)
3d7c4ea fix: update job metadata in the make's transaction (ttngu207)
Changes to `datajoint/autopopulate.py` (the module defining `dj.AutoPopulate`):
```diff
@@ -1,5 +1,4 @@
 """This module defines class dj.AutoPopulate"""
-
 import contextlib
 import datetime
 import inspect
```

```diff
@@ -15,6 +14,8 @@
 from .errors import DataJointError, LostConnectionError
 from .expression import AndList, QueryExpression
 from .hash import key_hash
+from .settings import config
+from .utils import user_choice, to_camel_case

 # noinspection PyExceptionInherit,PyCallingNonCallable
```
```diff
@@ -24,14 +25,14 @@
 # --- helper functions for multiprocessing --


-def _initialize_populate(table, jobs, populate_kwargs):
+def _initialize_populate(table, reserve_jobs, populate_kwargs):
     """
     Initialize the process for multiprocessing.
     Saves the unpickled copy of the table to the current process and reconnects.
     """
     process = mp.current_process()
     process.table = table
-    process.jobs = jobs
+    process.reserve_jobs = reserve_jobs
     process.populate_kwargs = populate_kwargs
     table.connection.connect()  # reconnect
```

```diff
@@ -43,7 +44,9 @@ def _call_populate1(key):
     :return: key, error if error, otherwise None
     """
     process = mp.current_process()
-    return process.table._populate1(key, process.jobs, **process.populate_kwargs)
+    return process.table._populate1(
+        key, process.reserve_jobs, **process.populate_kwargs
+    )


 class AutoPopulate:
```
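For readers unfamiliar with this pattern: `mp.Pool` workers receive shared state once at startup through the initializer, rather than per task; the diff only changes what gets stashed (`reserve_jobs` instead of a `jobs` handle). A minimal standalone sketch of the same idiom, with illustrative names that are not part of this PR:

```python
import multiprocessing as mp


def _initialize(state):
    # Stash per-process state on the worker's Process object,
    # as _initialize_populate does with `table` and `reserve_jobs`.
    mp.current_process().state = state


def _work(key):
    # Each task reads the stashed state instead of receiving it per call.
    return mp.current_process().state, key


if __name__ == "__main__":
    with mp.Pool(2, initializer=_initialize, initargs=("shared",)) as pool:
        print(pool.map(_work, [1, 2, 3]))
```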
```diff
@@ -91,6 +94,7 @@ def _rename_attributes(table, props):
             self._key_source = _rename_attributes(*parents[0])
             for q in parents[1:]:
                 self._key_source *= _rename_attributes(*q)
+
         return self._key_source

     def make(self, key):
```
```diff
@@ -228,6 +232,7 @@ def populate(
         display_progress=False,
         processes=1,
         make_kwargs=None,
+        schedule_jobs=True,
     ):
         """
         ``table.populate()`` calls ``table.make(key)`` for every primary key in
```

```diff
@@ -249,6 +254,8 @@ def populate(
             to be passed down to each ``make()`` call. Computation arguments should be
             specified within the pipeline e.g. using a `dj.Lookup` table.
         :type make_kwargs: dict, optional
+        :param schedule_jobs: if True, run schedule_jobs before populating (default: True);
+            only applicable if reserve_jobs is True
         :return: a dict with two keys
             "success_count": the count of successful ``make()`` calls in this ``populate()`` call
             "error_list": the error list that is filled if `suppress_errors` is True
```
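With the new flag, a caller can opt out of the implicit scheduling step. A hypothetical usage sketch (`MyTable` stands in for any `dj.Computed` table; only `schedule_jobs` is new in this PR):

```python
# Default: schedule first, then work through `scheduled` jobs.
MyTable.populate(reserve_jobs=True, suppress_errors=True)

# Opt out when scheduling is handled by a separate process:
MyTable.populate(reserve_jobs=True, schedule_jobs=False)
```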
```diff
@@ -261,32 +268,40 @@ def populate(
             raise DataJointError(
                 "The order argument must be one of %s" % str(valid_order)
             )
-        jobs = (
-            self.connection.schemas[self.target.database].jobs if reserve_jobs else None
-        )
+
+        if schedule_jobs:
+            self.schedule_jobs(*restrictions, purge_invalid_jobs=False)

         # define and set up signal handler for SIGTERM:
         if reserve_jobs:

             def handler(signum, frame):
                 logger.info("Populate terminated by SIGTERM")
                 raise SystemExit("SIGTERM received")

             old_handler = signal.signal(signal.SIGTERM, handler)

         # retrieve `keys` if not provided
         if keys is None:
-            keys = (self._jobs_to_do(restrictions) - self.target).fetch(
-                "KEY", limit=limit
-            )
-
-        # exclude "error", "ignore" or "reserved" jobs
-        if reserve_jobs:
-            exclude_key_hashes = (
-                jobs
-                & {"table_name": self.target.table_name}
-                & 'status in ("error", "ignore", "reserved")'
-            ).fetch("key_hash")
-            keys = [key for key in keys if key_hash(key) not in exclude_key_hashes]
+            if reserve_jobs:
+                keys = (
+                    self.jobs
+                    & {'status': 'scheduled'}
+                ).fetch("key", order_by="timestamp", limit=limit)
+                if restrictions:
+                    # hitting the `key_source` again to apply the restrictions
+                    # this is expensive/suboptimal
+                    keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY")
+            else:
+                keys = (self._jobs_to_do(restrictions) - self.target).fetch(
+                    "KEY", limit=limit
+                )
+        else:
+            # exclude "error", "ignore" or "reserved" jobs
+            if reserve_jobs:
+                exclude_key_hashes = (
+                    self.jobs
+                    & 'status in ("error", "ignore", "reserved")'
+                ).fetch("key_hash")
+                keys = [key for key in keys if key_hash(key) not in exclude_key_hashes]

         if order == "reverse":
             keys.reverse()
```
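The `scheduled` status is the key change here: with `reserve_jobs=True`, work now comes from the jobs queue rather than from `key_source` minus the target. A hedged sketch of inspecting that queue through the `jobs` property added later in this diff (`MyTable` is again a stand-in):

```python
# Count jobs per status for this table (status names taken from the diff above).
for status in ("scheduled", "reserved", "error", "ignore"):
    n = len(MyTable().jobs & {"status": status})
    print(f"{status}: {n}")
```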
```diff
@@ -316,7 +331,7 @@ def handler(signum, frame):
                 if display_progress
                 else keys
             ):
-                status = self._populate1(key, jobs, **populate_kwargs)
+                status = self._populate1(key, reserve_jobs, **populate_kwargs)
                 if status is True:
                     success_list.append(1)
                 elif isinstance(status, tuple):
```
```diff
@@ -358,11 +373,16 @@ def handler(signum, frame):
         }

     def _populate1(
-        self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None
+        self,
+        key,
+        reserve_jobs,
+        suppress_errors,
+        return_exception_objects,
+        make_kwargs=None,
     ):
         """
         populates table for one source key, calling self.make inside a transaction.
-        :param jobs: the jobs table or None if not reserve_jobs
+        :param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion
         :param key: dict specifying job to populate
         :param suppress_errors: bool if errors should be suppressed and returned
         :param return_exception_objects: if True, errors must be returned as objects
```
```diff
@@ -372,7 +392,7 @@ def _populate1(
         # use the legacy `_make_tuples` callback.
         make = self._make_tuples if hasattr(self, "_make_tuples") else self.make

-        if jobs is not None and not jobs.reserve(
+        if reserve_jobs and not self._Jobs.reserve(
             self.target.table_name, self._job_key(key)
         ):
             return False
```
```diff
@@ -385,12 +405,12 @@ def _populate1(
             if key in self.target:  # already populated
                 if not is_generator:
                     self.connection.cancel_transaction()
-                if jobs is not None:
-                    jobs.complete(self.target.table_name, self._job_key(key))
+                self._Jobs.complete(self.target.table_name, self._job_key(key))
                 return False

             logger.debug(f"Making {key} -> {self.target.full_table_name}")
             self.__class__._allow_insert = True
+            make_start = datetime.datetime.utcnow()

             try:
                 if not is_generator:
```
```diff
@@ -431,13 +451,16 @@ def _populate1(
                 logger.debug(
                     f"Error making {key} -> {self.target.full_table_name} - {error_message}"
                 )
-                if jobs is not None:
+                if reserve_jobs:
                     # show error name and error message (if any)
-                    jobs.error(
+                    self._Jobs.error(
                         self.target.table_name,
                         self._job_key(key),
                         error_message=error_message,
                         error_stack=traceback.format_exc(),
+                        run_duration=(
+                            datetime.datetime.utcnow() - make_start
+                        ).total_seconds(),
                     )
                 if not suppress_errors or isinstance(error, SystemExit):
                     raise
```
```diff
@@ -446,9 +469,14 @@ def _populate1(
                 return key, error if return_exception_objects else error_message
             else:
                 self.connection.commit_transaction()
+                self._Jobs.complete(
+                    self.target.table_name,
+                    self._job_key(key),
+                    run_duration=(
+                        datetime.datetime.utcnow() - make_start
+                    ).total_seconds(),
+                )
                 logger.debug(f"Success making {key} -> {self.target.full_table_name}")
-                if jobs is not None:
-                    jobs.complete(self.target.table_name, self._job_key(key))
                 return True
         finally:
             self.__class__._allow_insert = False
```
```diff
@@ -475,3 +503,104 @@ def progress(self, *restrictions, display=False):
             ),
         )
         return remaining, total
+
+    @property
+    def _Jobs(self):
+        return self.connection.schemas[self.target.database].jobs
+
+    @property
+    def jobs(self):
+        return self._Jobs & {"table_name": self.target.table_name}
+
+    def schedule_jobs(
+        self, *restrictions, purge_invalid_jobs=True, min_scheduling_interval=None
+    ):
+        """
+        Schedule new jobs for this autopopulate table by finding keys that need computation.
+
+        This method implements an optimization strategy to avoid excessive scheduling:
+        1. First checks if jobs were scheduled recently (within min_scheduling_interval)
+        2. If a recent scheduling event exists, skips scheduling to prevent database load
+        3. Otherwise, finds keys that need computation and schedules them
+
+        The method also optionally purges invalid jobs (jobs that no longer exist in key_source)
+        to maintain database cleanliness.
+
+        Args:
+            restrictions: a list of restrictions, each restricting (table.key_source - target.proj())
+            purge_invalid_jobs: if True, remove invalid entries from the jobs table
+                (potentially expensive operation)
+            min_scheduling_interval: minimum time in seconds that must have passed since the
+                last job scheduling. If None, uses dj.config["min_scheduling_interval"]
+                (default: None)
+
+        Returns:
+            None
+        """
+        __scheduled_event = {
+            "table_name": self.target.table_name,
+            "__type__": "jobs scheduling event",
+        }
+
+        if min_scheduling_interval is None:
+            min_scheduling_interval = config["min_scheduling_interval"]
+
+        if min_scheduling_interval > 0:
+            recent_scheduling_event = (
+                self._Jobs.proj(
+                    last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())"
+                )
+                & {"table_name": f"__{self.target.table_name}__"}
+                & {"key_hash": key_hash(__scheduled_event)}
+                & f"last_scheduled <= {min_scheduling_interval}"
+            )
+            if recent_scheduling_event:
+                logger.info(
+                    f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` "
+                    f"(last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)"
+                )
+                return
+
+        try:
+            with self.connection.transaction:
+                schedule_count = 0
+                for key in (self._jobs_to_do(restrictions) - self.target).fetch("KEY"):
+                    schedule_count += self._Jobs.schedule(self.target.table_name, key)
+        except Exception as e:
+            logger.exception(str(e))
+        else:
+            self._Jobs.ignore(
+                f"__{self.target.table_name}__",
+                __scheduled_event,
+                message=f"Jobs scheduling event: {__scheduled_event['table_name']}",
+            )
+            logger.info(
+                f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`"
+            )
+        finally:
+            if purge_invalid_jobs:
+                self.purge_invalid_jobs()
```
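Taken together, an explicit scheduling workflow might look like the following sketch. It assumes `min_scheduling_interval` is a registered config key, as the docstring above indicates, and again uses a hypothetical `MyTable`:

```python
import datajoint as dj

dj.config["min_scheduling_interval"] = 60  # throttle: schedule at most once per minute

# Schedule explicitly, skipping the potentially expensive purge...
MyTable().schedule_jobs(purge_invalid_jobs=False)

# ...then let workers consume the queue without re-scheduling.
MyTable.populate(reserve_jobs=True, schedule_jobs=False, processes=4)
```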
```diff
+    def purge_invalid_jobs(self):
+        """
+        Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table.
+
+        This method handles two types of invalid jobs:
+        1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted)
+        2. Jobs with "success" status that are no longer in the target table
+           (e.g. entries in the target table got deleted)
+
+        The method is potentially time-consuming as it needs to:
+        - Compare all jobs against the current key_source
+        - For success jobs, verify their existence in the target table
+        - Delete any jobs that fail these checks
+
+        This cleanup should not need to run very often, but helps maintain database consistency.
+        """
+        invalid_removed = 0
+
+        invalid_success = len(self.jobs & "status = 'success'") - len(self.target)
+        if invalid_success > 0:
+            for key, job_key in zip(
+                *(self.jobs & "status = 'success'").fetch("KEY", "key")
+            ):
+                if not (self.target & job_key):
+                    (self.jobs & key).delete()
+                    invalid_removed += 1
+
+        keys2do = self._jobs_to_do({}).fetch("KEY")
+        invalid_incomplete = len(self.jobs & "status != 'success'") - len(keys2do)
+        if invalid_incomplete > 0:
+            for key, job_key in zip(
+                *(self.jobs & "status != 'success'").fetch("KEY", "key")
+            ):
+                if job_key not in keys2do:
+                    (self.jobs & key).delete()
+                    invalid_removed += 1
+
+        logger.info(
+            f"{invalid_removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`"
+        )
```
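Since the purge walks the full jobs table against `key_source` and the target, the docstring suggests running it only occasionally. A one-line usage sketch (hypothetical `MyTable` again):

```python
# Occasional cleanup, e.g. from a nightly maintenance script.
MyTable().purge_invalid_jobs()
```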
Review comment: Rather than baking this operation into populate, which makes the logic more convoluted, consider making `schedule_jobs` a separate, explicit process.
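A sketch of what the reviewer's suggestion could look like: scheduling runs in its own long-lived process and `populate` workers never schedule. This is hypothetical orchestration code, not part of this PR:

```python
import time


def scheduler_loop(tables, interval=300):
    """Periodically schedule jobs for each table; workers only consume."""
    while True:
        for table in tables:
            table().schedule_jobs(purge_invalid_jobs=False)
        time.sleep(interval)

# Worker processes would then run, elsewhere:
#     MyTable.populate(reserve_jobs=True, schedule_jobs=False)
```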