From 648b7008f34c38cf1a349f0964d77b36eb202181 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Sun, 20 Nov 2022 02:18:08 -0600 Subject: [PATCH 01/19] Add concurrent.futures.Executor implementation --- charm4py/pool.py | 22 ++++++++++++++++++++++ charm4py/threads.py | 35 +++++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 91c9a1db..fed18f14 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -3,6 +3,8 @@ from .threads import NotThreadedError from collections import defaultdict from copy import deepcopy +from concurrent.futures import Executor +from asyncio import wait_for import sys @@ -465,3 +467,23 @@ def submit(self, iterable, chunksize=1, ncores=-1): def submit_async(self, iterable, chunksize=1, ncores=-1, multi_future=False): return self.map_async(None, iterable, chunksize, ncores, multi_future) + + +class PoolExecutor(Executor): + + def __init__(self, pool_scheduler): + self.pool = Pool(pool_scheduler) + + def submit(self, fn, /, *args, **kwargs): + return self.pool.map_async(fn, *args, **kwargs) + + def map(self, func, *iterables, timeout=None, chunksize=1): + return wait_for(self.pool.map(func, zip(*iterables), chunksize=chunksize), timeout=timeout) + + def shutdown(wait=True, *, cancel_futures=False): + # Cancelling futures isn't implemented so + # cancel_futures currently does nothing + if wait: + wait_for(self.pool.pool_scheduler.schedule) + else: + self.pool.pool_scheduler.schedule() diff --git a/charm4py/threads.py b/charm4py/threads.py index e05678cf..3745a857 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -1,5 +1,7 @@ from greenlet import getcurrent - +from concurrent.futures import Future as CFuture +from concurrent.futures import CancelledError, TimeoutError +from asyncio import wait_for # Future IDs (fids) are sometimes carried as reference numbers inside # Charm++ CkCallback objects. The data type most commonly used for @@ -23,7 +25,7 @@ def __init__(self, msg): # See commit 25e2935 if need to resurrect code where proxies were included when # futures were pickled. 
-class Future(object): +class Future(CFuture): def __init__(self, fid, gr, src, num_vals): self.fid = fid # unique future ID within the process that created it @@ -93,6 +95,35 @@ def __getstate__(self): def __setstate__(self, state): self.fid, self.src = state + def cancel(self): + # Cancelling not currently implemented + return False + + def cancelled(self): + return False + + def running(self): + return not self.blocked and not self.ready() + + def done(self): + return self.ready() + + def result(timeout=None): + return wait_for(self.get(), timeout=timeout) + + def exception(timeout=None): + try: + wait_for(self.get(), timeout=timeout) + except (TimeoutError, CancelledError) as e: + raise e + except Exception as e: + if self.error is None: + raise e + + return self.error + + def add_done_callback(fn): + raise NotImplementedError class CollectiveFuture(Future): From 5e8ecb242454cd3a1be44d5a9018148a0c21e6f6 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Sun, 20 Nov 2022 14:50:29 -0600 Subject: [PATCH 02/19] Use Task instead of map_async in PoolExecutor --- charm4py/pool.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index fed18f14..d7ad5267 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -473,17 +473,22 @@ class PoolExecutor(Executor): def __init__(self, pool_scheduler): self.pool = Pool(pool_scheduler) + self.is_shutdown = False def submit(self, fn, /, *args, **kwargs): - return self.pool.map_async(fn, *args, **kwargs) + if self.is_shutdown: + raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") + return self.pool.Task(fn, *args, **kwargs, awaitable=True, ret=True) - def map(self, func, *iterables, timeout=None, chunksize=1): - return wait_for(self.pool.map(func, zip(*iterables), chunksize=chunksize), timeout=timeout) + def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): + if self.is_shutdown: + raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") + return wait_for(self.pool.map(func, zip(*iterables), chunksize=chunksize, ncores=ncores), timeout=timeout) def shutdown(wait=True, *, cancel_futures=False): # Cancelling futures isn't implemented so # cancel_futures currently does nothing if wait: - wait_for(self.pool.pool_scheduler.schedule) + wait_for(self.pool.pool_scheduler.schedule()) else: self.pool.pool_scheduler.schedule() From 6f5558bd85a686f57244e69a4ed3527b13b6af54 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Sun, 20 Nov 2022 15:04:31 -0600 Subject: [PATCH 03/19] Update shutdown status of PoolExecutor --- charm4py/pool.py | 1 + 1 file changed, 1 insertion(+) diff --git a/charm4py/pool.py b/charm4py/pool.py index d7ad5267..18109fe8 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -488,6 +488,7 @@ def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): def shutdown(wait=True, *, cancel_futures=False): # Cancelling futures isn't implemented so # cancel_futures currently does nothing + self.is_shutdown = True if wait: wait_for(self.pool.pool_scheduler.schedule()) else: From ba3ec7912290c072bcb43a776d0fcad3decaf98c Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Sun, 20 Nov 2022 19:13:08 -0600 Subject: [PATCH 04/19] Add future cancelling support --- charm4py/pool.py | 9 +++++---- charm4py/threads.py | 13 +++++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 18109fe8..4cc60983 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py 
@@ -471,8 +471,8 @@ def submit_async(self, iterable, chunksize=1, ncores=-1, multi_future=False): class PoolExecutor(Executor): - def __init__(self, pool_scheduler): - self.pool = Pool(pool_scheduler) + def __init__(self, pool_scheduler_chare): + self.pool = Pool(pool_scheduler_chare) self.is_shutdown = False def submit(self, fn, /, *args, **kwargs): @@ -486,8 +486,9 @@ def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): return wait_for(self.pool.map(func, zip(*iterables), chunksize=chunksize, ncores=ncores), timeout=timeout) def shutdown(wait=True, *, cancel_futures=False): - # Cancelling futures isn't implemented so - # cancel_futures currently does nothing + if cancel_futures == True: + raise NotImplementedError("Cancelling futures on shutdown not currently supported") + self.is_shutdown = True if wait: wait_for(self.pool.pool_scheduler.schedule()) diff --git a/charm4py/threads.py b/charm4py/threads.py index 3745a857..8d79fe8a 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -96,14 +96,19 @@ def __setstate__(self, state): self.fid, self.src = state def cancel(self): - # Cancelling not currently implemented - return False + if self.running() or self.done(): + return False + else: + threadMgr.cancelFuture(self) + return True def cancelled(self): - return False + return self.values == [None] * f.nvals: def running(self): - return not self.blocked and not self.ready() + # Not certain how to check if the future is currently running. + return not self.done() + #return not self.blocked and not self.ready() def done(self): return self.ready() From 3f6291e17f422d372b07f319e9cfe0c6f1a2c1aa Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Sun, 20 Nov 2022 19:15:10 -0600 Subject: [PATCH 05/19] Remove stray colon --- charm4py/threads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charm4py/threads.py b/charm4py/threads.py index 8d79fe8a..ba4634ee 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -103,7 +103,7 @@ def cancel(self): return True def cancelled(self): - return self.values == [None] * f.nvals: + return self.values == [None] * f.nvals def running(self): # Not certain how to check if the future is currently running. 
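At this point in the series the executor already mirrors the concurrent.futures.Executor interface (submit, map, shutdown) on top of charm.pool. A minimal usage sketch, assuming the PoolScheduler chare is created explicitly (mirroring what charm4py wires up internally for charm.pool) and assuming the behavior the later patches in this series converge on; square and main are illustrative names, not part of the patches:

    from charm4py import charm, Chare
    from charm4py.pool import PoolScheduler, PoolExecutor

    def square(x):
        return x * x

    def main(args):
        # Hypothetical wiring for illustration: create the scheduler chare
        # ourselves, the way charm4py does internally for charm.pool.
        scheduler = Chare(PoolScheduler, onPE=0)
        executor = PoolExecutor(scheduler)

        fut = executor.submit(square, 7)        # single task, returns a Future
        print(fut.result())                     # 49

        print(executor.map(square, range(4)))   # [0, 1, 4, 9]

        executor.shutdown(wait=True)
        charm.exit()

    charm.start(main)

This is a sketch of the intended API only; at the state of patch 05, map() and result() still rely on asyncio.wait_for and are reworked in the patches that follow.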
From b71a5a5b254f3f131e50ceebf0bd103dea5cd69d Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Sun, 20 Nov 2022 20:20:49 -0600 Subject: [PATCH 06/19] Raise error if kwargs are used in submit, add cancelled property to future --- charm4py/pool.py | 9 +++++++-- charm4py/threads.py | 11 +++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 4cc60983..708a6dae 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -475,10 +475,15 @@ def __init__(self, pool_scheduler_chare): self.pool = Pool(pool_scheduler_chare) self.is_shutdown = False - def submit(self, fn, /, *args, **kwargs): + # map_async can't handle **kwargs at present + def submit(self, fn, /, *args, **kwargs)#, chunksize=1, ncores=-1, multi_future=False): if self.is_shutdown: raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") - return self.pool.Task(fn, *args, **kwargs, awaitable=True, ret=True) + if kwargs is not None and len(kwargs > 0): + raise NotImplementedError("kwargs for PoolExecutor.submit are not supported currently") + + return self.pool.Task(fn, args, ret=True) + #return self.pool.map_async(fn, args, chunksize=chunksize, ncores=ncores, multi_future=multi_future) def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): if self.is_shutdown: diff --git a/charm4py/threads.py b/charm4py/threads.py index ba4634ee..34f80fa4 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -36,6 +36,7 @@ def __init__(self, fid, gr, src, num_vals): self.blocked = False # flag to check if creator thread is blocked on the future self.gotvalues = False # flag to check if expected number of values have been received self.error = None # if the future receives an Exception, it is set here + self.cancelled = False def get(self): """ Blocking call on current entry method's thread to obtain the values of the @@ -103,12 +104,13 @@ def cancel(self): return True def cancelled(self): - return self.values == [None] * f.nvals + # What if function actually returns this? + #return self.values == [None] * f.nvals + return self.cancelled def running(self): - # Not certain how to check if the future is currently running. - return not self.done() - #return not self.blocked and not self.ready() + # Not certain if this is correct + return self.blocked != False and not self.done() def done(self): return self.ready() @@ -291,6 +293,7 @@ def cancelFuture(self, f): del self.futures[fid] f.gotvalues = True f.values = [None] * f.nvals + f.cancelled = True f.resume(self) # TODO: method to cancel collective future. 
the main issue with this is From 1a5280afbcd7b1fc9f9f1076710d9b2d2e9da73a Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 01:01:06 -0600 Subject: [PATCH 07/19] Better cancelling check, remove asyncio timeout --- charm4py/pool.py | 38 +++++++++++++++++++++++++++++--------- charm4py/threads.py | 21 ++++++++++----------- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 708a6dae..7eeb2836 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -4,7 +4,6 @@ from collections import defaultdict from copy import deepcopy from concurrent.futures import Executor -from asyncio import wait_for import sys @@ -468,6 +467,21 @@ def submit(self, iterable, chunksize=1, ncores=-1): def submit_async(self, iterable, chunksize=1, ncores=-1, multi_future=False): return self.map_async(None, iterable, chunksize, ncores, multi_future) +from dataclasses import dataclass +from typing import Callable +from frozendict import frozendict + +@dataclass +class _WrappedFunction: + + fn: Callable + #def __call__(self, args_iter, kwargs_dict): + # return self.fn(*args_iter, **kwargs_dict) + + def __call__(self, args_kwargs): + in_args = args_kwargs[0] + in_kwargs = args_kwargs[1] + return self.fn(*in_args, **in_kwargs) class PoolExecutor(Executor): @@ -475,20 +489,26 @@ def __init__(self, pool_scheduler_chare): self.pool = Pool(pool_scheduler_chare) self.is_shutdown = False - # map_async can't handle **kwargs at present - def submit(self, fn, /, *args, **kwargs)#, chunksize=1, ncores=-1, multi_future=False): + def submit(self, fn, /, *args, **kwargs): if self.is_shutdown: raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") - if kwargs is not None and len(kwargs > 0): - raise NotImplementedError("kwargs for PoolExecutor.submit are not supported currently") - return self.pool.Task(fn, args, ret=True) - #return self.pool.map_async(fn, args, chunksize=chunksize, ncores=ncores, multi_future=multi_future) + if kwargs is None or len(kwargs) == 0: + return self.pool.Task(fn, args, ret=True) + #args_iter = tuple(tuple([arg for arg in args]),) + #return self.pool.map_async(fn, tuple([0])) + else: + # Task doesn't support kwargs so this sneaks them in with a tuple + iterable_arg = tuple([tuple([args, frozendict(kwargs)])]) + return self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True) + #return self.pool.map_async(_WrappedFunction(fn), ((args,), frozendict(kwargs),)) def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): + if timeout is not None: + print("Ignoring timeout. 
Timeout currently unsupported.") if self.is_shutdown: raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") - return wait_for(self.pool.map(func, zip(*iterables), chunksize=chunksize, ncores=ncores), timeout=timeout) + return self.pool.map(func, zip(*iterables), chunksize=chunksize, ncores=ncores) def shutdown(wait=True, *, cancel_futures=False): if cancel_futures == True: @@ -496,6 +516,6 @@ def shutdown(wait=True, *, cancel_futures=False): self.is_shutdown = True if wait: - wait_for(self.pool.pool_scheduler.schedule()) + self.pool.pool_scheduler.schedule() else: self.pool.pool_scheduler.schedule() diff --git a/charm4py/threads.py b/charm4py/threads.py index 34f80fa4..93e49ff7 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -1,7 +1,6 @@ from greenlet import getcurrent from concurrent.futures import Future as CFuture from concurrent.futures import CancelledError, TimeoutError -from asyncio import wait_for # Future IDs (fids) are sometimes carried as reference numbers inside # Charm++ CkCallback objects. The data type most commonly used for @@ -36,7 +35,6 @@ def __init__(self, fid, gr, src, num_vals): self.blocked = False # flag to check if creator thread is blocked on the future self.gotvalues = False # flag to check if expected number of values have been received self.error = None # if the future receives an Exception, it is set here - self.cancelled = False def get(self): """ Blocking call on current entry method's thread to obtain the values of the @@ -104,23 +102,25 @@ def cancel(self): return True def cancelled(self): - # What if function actually returns this? - #return self.values == [None] * f.nvals - return self.cancelled + return not self.fid in threadMgr.futures def running(self): # Not certain if this is correct return self.blocked != False and not self.done() def done(self): - return self.ready() + return self.ready() or self.cancelled or (self.error is not None) - def result(timeout=None): - return wait_for(self.get(), timeout=timeout) + def result(self, timeout=None): + if timeout is not None: + print("Ignoring timeout. Timeout currently unsupported.") + return self.get() - def exception(timeout=None): + def exception(self, timeout=None): + if timeout is not None: + print("Ignoring timeout. Timeout currently unsupported.") try: - wait_for(self.get(), timeout=timeout) + self.get() except (TimeoutError, CancelledError) as e: raise e except Exception as e: @@ -293,7 +293,6 @@ def cancelFuture(self, f): del self.futures[fid] f.gotvalues = True f.values = [None] * f.nvals - f.cancelled = True f.resume(self) # TODO: method to cancel collective future. the main issue with this is From 559546e6110ed72a769eb22bb9c42be1c1b8b13a Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 01:09:09 -0600 Subject: [PATCH 08/19] Run autopep8 on pool.py and thread.py --- charm4py/pool.py | 129 +++++++++++++++++++++++++++++--------------- charm4py/threads.py | 35 +++++++----- 2 files changed, 108 insertions(+), 56 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 7eeb2836..4c784555 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -1,3 +1,6 @@ +from dataclasses import dataclass +from frozendict import frozendict +from typing import Callable from . 
import charm, Chare, Group, coro_ext, threads, Future from .charm import Charm4PyError from .threads import NotThreadedError @@ -56,22 +59,28 @@ def __init__(self, id, func, tasks, result, ncores, chunksize): if result is None or isinstance(result, threads.Future): self.results = [None] * len(tasks) self.future = result - self.tasks = [Chunk(tasks[i:i+chunksize], i) for i in range(0, len(tasks), chunksize)] + self.tasks = [Chunk(tasks[i:i + chunksize], i) + for i in range(0, len(tasks), chunksize)] else: - self.tasks = [Chunk(tasks[i:i+chunksize], result[i:i+chunksize]) for i in range(0, len(tasks), chunksize)] + self.tasks = [Chunk(tasks[i:i + chunksize], result[i:i + chunksize]) + for i in range(0, len(tasks), chunksize)] else: if result is None or isinstance(result, threads.Future): self.results = [None] * len(tasks) self.future = result if func is not None: - self.tasks = [Task(args, i) for i, args in enumerate(tasks)] + self.tasks = [Task(args, i) + for i, args in enumerate(tasks)] else: - self.tasks = [Task(args, i, func) for i, (func, args) in enumerate(tasks)] + self.tasks = [Task(args, i, func) + for i, (func, args) in enumerate(tasks)] else: if func is not None: - self.tasks = [Task(args, result[i]) for i, args in enumerate(tasks)] + self.tasks = [Task(args, result[i]) + for i, args in enumerate(tasks)] else: - self.tasks = [Task(args, result[i], func) for i, (func, args) in enumerate(tasks)] + self.tasks = [Task(args, result[i], func) + for i, (func, args) in enumerate(tasks)] # print('Created job with', len(self.tasks), 'tasks') self.tasks_pending = len(self.tasks) @@ -104,9 +113,12 @@ def __start__(self, func, tasks, result): if self.workers is None: assert self.num_workers > 0, 'Run with more than 1 PE to use charm.pool' # first time running a job, create Group of workers - print('Initializing charm.pool with', self.num_workers, 'worker PEs. ' - 'Warning: charm.pool is experimental (API and performance ' - 'is subject to change)') + print( + 'Initializing charm.pool with', + self.num_workers, + 'worker PEs. 
' + 'Warning: charm.pool is experimental (API and performance ' + 'is subject to change)') self.workers = Group(Worker, args=[self.thisProxy]) if len(self.job_id_pool) == 0: @@ -118,10 +130,12 @@ def __start__(self, func, tasks, result): if charm.interactive: try: if func is not None: - self.workers.check(func.__module__, func.__name__, awaitable=True).get() + self.workers.check( + func.__module__, func.__name__, awaitable=True).get() else: for func_, args in tasks: - self.workers.check(func_.__module__, func_.__name__, awaitable=True).get() + self.workers.check( + func_.__module__, func_.__name__, awaitable=True).get() except Exception as e: if result is None: raise e @@ -139,7 +153,8 @@ def __addJob__(self, job): def startSingleTask(self, func, future, *args): self.__start__(func, None, None) - job = Job(self.job_id_pool.pop(), func, (args,), future, self.num_workers, 1) + job = Job(self.job_id_pool.pop(), func, + (args,), future, self.num_workers, 1) job.single_task = True self.__addJob__(job) if job.threaded: @@ -159,7 +174,13 @@ def start(self, func, tasks, result, ncores, chunksize): self.__start__(func, tasks, result) - job = Job(self.job_id_pool.pop(), func, tasks, result, ncores, chunksize) + job = Job( + self.job_id_pool.pop(), + func, + tasks, + result, + ncores, + chunksize) self.__addJob__(job) if job.chunked: @@ -239,7 +260,7 @@ def taskFinished(self, worker_id, job_id, result=None): if job.chunked: i, results = result n = len(results) - job.results[i:i+n] = results + job.results[i:i + n] = results else: i, _result = result job.results[i] = _result @@ -272,7 +293,8 @@ def taskError(self, worker_id, job_id, exception): job.exception = exception self.idle_workers.add(worker_id) # marking as failed will allow the scheduler to delete it from the linked list - # NOTE that we will only delete from the 'jobs' list once all the pending tasks are done + # NOTE that we will only delete from the 'jobs' list once all the + # pending tasks are done job.failed = True if not hasattr(job, 'future'): if job.chunked: @@ -301,7 +323,8 @@ class Worker(Chare): def __init__(self, scheduler): self.scheduler = scheduler - assert len(self.scheduler.elemIdx) > 0 # make sure points to the element, not collection + # make sure points to the element, not collection + assert len(self.scheduler.elemIdx) > 0 self.__addThreadEventSubscriber__(scheduler, self.thisIndex) # TODO: when to purge entries from this dict? 
self.funcs = {} # job ID -> function used by this job ID @@ -325,14 +348,18 @@ def runTask(self, func, args, result_destination, job_id): try: result = func(args) if isinstance(result_destination, int): - self.scheduler.taskFinished(self.thisIndex, job_id, (result_destination, result)) + self.scheduler.taskFinished( + self.thisIndex, job_id, (result_destination, result)) else: # assume result_destination is a future result_destination.send(result) self.scheduler.taskFinished(self.thisIndex, job_id) except Exception as e: if isinstance(e, NotThreadedError): - e = Charm4PyError('Function ' + str(func) + ' must be decorated with @coro to be able to suspend') + e = Charm4PyError( + 'Function ' + + str(func) + + ' must be decorated with @coro to be able to suspend') charm.prepareExceptionForSend(e) self.scheduler.taskError(self.thisIndex, job_id, e) if not isinstance(result_destination, int): @@ -346,14 +373,18 @@ def runTask_star(self, func, args, result_destination, job_id): try: result = func(*args) if isinstance(result_destination, int): - self.scheduler.taskFinished(self.thisIndex, job_id, (result_destination, result)) + self.scheduler.taskFinished( + self.thisIndex, job_id, (result_destination, result)) else: # assume result_destination is a future result_destination.send(result) self.scheduler.taskFinished(self.thisIndex, job_id) except Exception as e: if isinstance(e, NotThreadedError): - e = Charm4PyError('Function ' + str(func) + ' must be decorated with @coro to be able to suspend') + e = Charm4PyError( + 'Function ' + + str(func) + + ' must be decorated with @coro to be able to suspend') charm.prepareExceptionForSend(e) self.scheduler.taskError(self.thisIndex, job_id, e) if not isinstance(result_destination, int): @@ -391,7 +422,8 @@ def runChunk(self, _, chunk, result_destination, job_id): def send_chunk_results(self, results, result_destination, job_id): if isinstance(result_destination, int): - self.scheduler.taskFinished(self.thisIndex, job_id, (result_destination, results)) + self.scheduler.taskFinished( + self.thisIndex, job_id, (result_destination, results)) else: # assume result_destination is a list of futures # TODO: should send all results together to PE where future was created, @@ -402,7 +434,8 @@ def send_chunk_results(self, results, result_destination, job_id): def send_chunk_exc(self, e, result_destination, job_id): if isinstance(e, NotThreadedError): - e = Charm4PyError('Function not decorated with @coro tried to suspend') + e = Charm4PyError( + 'Function not decorated with @coro tried to suspend') charm.prepareExceptionForSend(e) self.scheduler.taskError(self.thisIndex, job_id, e) if not isinstance(result_destination, int): @@ -411,7 +444,8 @@ def send_chunk_exc(self, e, result_destination, job_id): def check(self, func_module, func_name): if charm.options.remote_exec is not True: - raise Charm4PyError('Remote code execution is disabled. Set charm.options.remote_exec to True') + raise Charm4PyError( + 'Remote code execution is disabled. Set charm.options.remote_exec to True') eval(func_name, sys.modules[func_module].__dict__) @@ -443,11 +477,18 @@ def Task(self, func, args, ret=False, awaitable=False): def map(self, func, iterable, chunksize=1, ncores=-1): result = Future() - # TODO shouldn't send task objects to a central place. what if they are large? + # TODO shouldn't send task objects to a central place. what if they are + # large? 
self.pool_scheduler.start(func, iterable, result, ncores, chunksize) return result.get() - def map_async(self, func, iterable, chunksize=1, ncores=-1, multi_future=False): + def map_async( + self, + func, + iterable, + chunksize=1, + ncores=-1, + multi_future=False): if self.mype == 0: # see deepcopy comment above (only need this for async case since # the sync case won't return until all the tasks have finished) @@ -464,25 +505,26 @@ def map_async(self, func, iterable, chunksize=1, ncores=-1, multi_future=False): def submit(self, iterable, chunksize=1, ncores=-1): return self.map(None, iterable, chunksize, ncores) - def submit_async(self, iterable, chunksize=1, ncores=-1, multi_future=False): + def submit_async( + self, + iterable, + chunksize=1, + ncores=-1, + multi_future=False): return self.map_async(None, iterable, chunksize, ncores, multi_future) -from dataclasses import dataclass -from typing import Callable -from frozendict import frozendict @dataclass class _WrappedFunction: fn: Callable - #def __call__(self, args_iter, kwargs_dict): - # return self.fn(*args_iter, **kwargs_dict) def __call__(self, args_kwargs): in_args = args_kwargs[0] in_kwargs = args_kwargs[1] return self.fn(*in_args, **in_kwargs) + class PoolExecutor(Executor): def __init__(self, pool_scheduler_chare): @@ -491,29 +533,30 @@ def __init__(self, pool_scheduler_chare): def submit(self, fn, /, *args, **kwargs): if self.is_shutdown: - raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") + raise RuntimeError( + "charm4py.pool.PoolExecutor object has been shut down") if kwargs is None or len(kwargs) == 0: - return self.pool.Task(fn, args, ret=True) - #args_iter = tuple(tuple([arg for arg in args]),) - #return self.pool.map_async(fn, tuple([0])) + return self.pool.Task(fn, args, ret=True) else: # Task doesn't support kwargs so this sneaks them in with a tuple iterable_arg = tuple([tuple([args, frozendict(kwargs)])]) - return self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True) - #return self.pool.map_async(_WrappedFunction(fn), ((args,), frozendict(kwargs),)) + return self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True) def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): if timeout is not None: print("Ignoring timeout. 
Timeout currently unsupported.") if self.is_shutdown: - raise RuntimeError("charm4py.pool.PoolExecutor object has been shut down") - return self.pool.map(func, zip(*iterables), chunksize=chunksize, ncores=ncores) + raise RuntimeError( + "charm4py.pool.PoolExecutor object has been shut down") + return self.pool.map(func, zip(*iterables), + chunksize=chunksize, ncores=ncores) + + def shutdown(self, wait=True, *, cancel_futures=False): + if cancel_futures: + raise NotImplementedError( + "Cancelling futures on shutdown not currently supported") - def shutdown(wait=True, *, cancel_futures=False): - if cancel_futures == True: - raise NotImplementedError("Cancelling futures on shutdown not currently supported") - self.is_shutdown = True if wait: self.pool.pool_scheduler.schedule() diff --git a/charm4py/threads.py b/charm4py/threads.py index 93e49ff7..c01508f6 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -29,7 +29,8 @@ class Future(CFuture): def __init__(self, fid, gr, src, num_vals): self.fid = fid # unique future ID within the process that created it self.gr = gr # greenlet that created the future - self.src = src # PE where the future was created (not used for collective futures) + # PE where the future was created (not used for collective futures) + self.src = src self.nvals = num_vals # number of values that the future expects to receive self.values = [] # values of the future self.blocked = False # flag to check if creator thread is blocked on the future @@ -80,7 +81,8 @@ def deposit(self, result): def resume(self, threadMgr): if self.blocked == 2: - # someone is waiting for future to become ready, signal by sending myself + # someone is waiting for future to become ready, signal by sending + # myself self.blocked = False threadMgr.resumeThread(self.gr, self) elif self.blocked: @@ -98,16 +100,16 @@ def cancel(self): if self.running() or self.done(): return False else: - threadMgr.cancelFuture(self) + threadMgr.cancelFuture(self) return True def cancelled(self): - return not self.fid in threadMgr.futures + return self.fid not in threadMgr.futures def running(self): # Not certain if this is correct - return self.blocked != False and not self.done() - + return self.blocked and not self.done() + def done(self): return self.ready() or self.cancelled or (self.error is not None) @@ -132,6 +134,7 @@ def exception(self, timeout=None): def add_done_callback(fn): raise NotImplementedError + class CollectiveFuture(Future): def __init__(self, fid, gr, proxy, num_vals): @@ -185,12 +188,16 @@ def isMainThread(self): def objMigrating(self, obj): if obj._numthreads > 0: - raise Charm4PyError('Migration of chares with active threads is not currently supported') + raise Charm4PyError( + 'Migration of chares with active threads is not currently supported') def throwNotThreadedError(self): - raise NotThreadedError("Method '" + charm.last_em_exec.C.__name__ + "." + - charm.last_em_exec.name + - "' must be a couroutine to be able to suspend (decorate it with @coro)") + raise NotThreadedError( + "Method '" + + charm.last_em_exec.C.__name__ + + "." + + charm.last_em_exec.name + + "' must be a couroutine to be able to suspend (decorate it with @coro)") def pauseThread(self): """ Called by an entry method thread to wait for something. 
@@ -245,7 +252,8 @@ def createFuture(self, num_vals=1): # get a unique local Future ID global FIDMAXVAL futures = self.futures - assert len(futures) < FIDMAXVAL, 'Too many pending futures, cannot create more' + assert len( + futures) < FIDMAXVAL, 'Too many pending futures, cannot create more' fid = (self.lastfid % FIDMAXVAL) + 1 while fid in futures: fid = (fid % FIDMAXVAL) + 1 @@ -269,8 +277,9 @@ def depositFuture(self, fid, result): try: f = futures[fid] except KeyError: - raise Charm4PyError('No pending future with fid=' + str(fid) + '. A common reason is ' - 'sending to a future that already received its value(s)') + raise Charm4PyError( + 'No pending future with fid=' + str(fid) + '. A common reason is ' + 'sending to a future that already received its value(s)') if f.deposit(result): del futures[fid] # resume if a thread is blocked on the future From 1b08833be5ccc06cd5e0a5f2eaa1042f37dfa5cc Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 01:32:04 -0600 Subject: [PATCH 09/19] Simplify shutdown code --- charm4py/pool.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 4c784555..9a384a41 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -558,7 +558,4 @@ def shutdown(self, wait=True, *, cancel_futures=False): "Cancelling futures on shutdown not currently supported") self.is_shutdown = True - if wait: - self.pool.pool_scheduler.schedule() - else: - self.pool.pool_scheduler.schedule() + self.pool.pool_scheduler.schedule() From 08cf569e62d65206deede09bfb31303104a35c02 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 14:16:34 -0600 Subject: [PATCH 10/19] Support wait and cancel_futures in shutdown --- charm4py/pool.py | 15 ++++++++++++--- charm4py/threads.py | 7 ++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 9a384a41..a5673e22 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -553,9 +553,18 @@ def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): chunksize=chunksize, ncores=ncores) def shutdown(self, wait=True, *, cancel_futures=False): - if cancel_futures: - raise NotImplementedError( - "Cancelling futures on shutdown not currently supported") + # Prevent more jobs from being submitted self.is_shutdown = True + + if cancel_futures: + for job in self.pool.pool_scheduler.jobs: + if isinstance(getattr(job, 'future', None), threads.Future): + job.future.cancel() + self.pool.pool_scheduler.schedule() + + if wait: + for job in self.pool.pool_scheduler.jobs: + if isinstance(getattr(job, 'future', None), threads.Future): + job.future.get() diff --git a/charm4py/threads.py b/charm4py/threads.py index c01508f6..15b5ac2f 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -116,13 +116,14 @@ def done(self): def result(self, timeout=None): if timeout is not None: print("Ignoring timeout. Timeout currently unsupported.") + if self.cancelled(): + raise CancelledError + return self.get() def exception(self, timeout=None): - if timeout is not None: - print("Ignoring timeout. 
Timeout currently unsupported.") try: - self.get() + self.result(timeout=timeout) except (TimeoutError, CancelledError) as e: raise e except Exception as e: From 7a2ce281d0b96ad72925fd57357d98f832337f50 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 14:55:47 -0600 Subject: [PATCH 11/19] Implement timeout with gevent --- charm4py/pool.py | 13 ++++++++----- charm4py/threads.py | 8 ++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index a5673e22..3161529d 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -6,7 +6,8 @@ from .threads import NotThreadedError from collections import defaultdict from copy import deepcopy -from concurrent.futures import Executor +from concurrent.futures import Executor, TimeoutError +from gevent import Timeout import sys @@ -544,13 +545,15 @@ def submit(self, fn, /, *args, **kwargs): return self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True) def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): - if timeout is not None: - print("Ignoring timeout. Timeout currently unsupported.") if self.is_shutdown: raise RuntimeError( "charm4py.pool.PoolExecutor object has been shut down") - return self.pool.map(func, zip(*iterables), - chunksize=chunksize, ncores=ncores) + + with Timeout(timeout, TimeoutError) as timeout: + result = self.pool.map(func, zip(*iterables), + chunksize=chunksize, ncores=ncores) + + return result def shutdown(self, wait=True, *, cancel_futures=False): diff --git a/charm4py/threads.py b/charm4py/threads.py index 15b5ac2f..7472be32 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -1,6 +1,7 @@ from greenlet import getcurrent from concurrent.futures import Future as CFuture from concurrent.futures import CancelledError, TimeoutError +from gevent import Timeout # Future IDs (fids) are sometimes carried as reference numbers inside # Charm++ CkCallback objects. The data type most commonly used for @@ -114,12 +115,11 @@ def done(self): return self.ready() or self.cancelled or (self.error is not None) def result(self, timeout=None): - if timeout is not None: - print("Ignoring timeout. 
Timeout currently unsupported.") if self.cancelled(): raise CancelledError - - return self.get() + with Timeout(timeout, TimeoutError): + result = self.get() + return result def exception(self, timeout=None): try: From 42d32e130e737719a745a2b48a172e2d92edc5c8 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 15:00:30 -0600 Subject: [PATCH 12/19] Add self as argument to add_done_callback --- charm4py/threads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charm4py/threads.py b/charm4py/threads.py index 7472be32..9eca78e3 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -132,7 +132,7 @@ def exception(self, timeout=None): return self.error - def add_done_callback(fn): + def add_done_callback(self, fn): raise NotImplementedError From ccf62e6563bc3246931c48698dc6eebfc6d09132 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Mon, 21 Nov 2022 15:21:02 -0600 Subject: [PATCH 13/19] Use concurrent.futures.wait in shutdown --- charm4py/pool.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 3161529d..f52c218f 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -7,6 +7,7 @@ from collections import defaultdict from copy import deepcopy from concurrent.futures import Executor, TimeoutError +from concurrent.futures import wait as cwait from gevent import Timeout import sys @@ -568,6 +569,11 @@ def shutdown(self, wait=True, *, cancel_futures=False): self.pool.pool_scheduler.schedule() if wait: - for job in self.pool.pool_scheduler.jobs: - if isinstance(getattr(job, 'future', None), threads.Future): - job.future.get() + futures = [ + job.future for job in self.pool.pool_scheduler.jobs if isinstance( + getattr( + job, + 'future', + None), + threads.Future)] + cwait(futures) From 56fe0bc61051c5013f76aa651836661a5c6f5e20 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Tue, 22 Nov 2022 02:12:04 -0600 Subject: [PATCH 14/19] Add remainder of concurrent.futures.Future API --- charm4py/threads.py | 124 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 25 deletions(-) diff --git a/charm4py/threads.py b/charm4py/threads.py index 9eca78e3..93d4c832 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -1,7 +1,10 @@ from greenlet import getcurrent from concurrent.futures import Future as CFuture -from concurrent.futures import CancelledError, TimeoutError +from concurrent.futures import CancelledError, TimeoutError, InvalidStateError from gevent import Timeout +from sys import stderr +from dataclasses import dataclass +from typing import Union # Future IDs (fids) are sometimes carried as reference numbers inside # Charm++ CkCallback objects. The data type most commonly used for @@ -25,34 +28,46 @@ def __init__(self, msg): # See commit 25e2935 if need to resurrect code where proxies were included when # futures were pickled. 
+@dataclass() class Future(CFuture): + fid: int # unique future ID within the process that created it + gr: object # greenlet that created the future + # PE where the future was created (not used for collective futures) + src: int + nvals: int # number of values that the future expects to receive + values: list # values of the future + callbacks: list # list of callback functions + # flag to check if creator thread is blocked on the future + blocked: Union[bool, int] = False + gotvalues: bool = False # flag to check if expected number of values have been received + # if the future receives an Exception, it is set here + error: Union[None, Exception] = None + is_cancelled: bool = False # flag to check if the future is cancelled + is_running: bool = False # flag to check if the future is currently running + def __init__(self, fid, gr, src, num_vals): - self.fid = fid # unique future ID within the process that created it - self.gr = gr # greenlet that created the future - # PE where the future was created (not used for collective futures) + self.fid = fid + self.gr = gr self.src = src - self.nvals = num_vals # number of values that the future expects to receive - self.values = [] # values of the future - self.blocked = False # flag to check if creator thread is blocked on the future - self.gotvalues = False # flag to check if expected number of values have been received - self.error = None # if the future receives an Exception, it is set here + self.nvals = num_vals + self.values = [] + self.callbacks = [] def get(self): """ Blocking call on current entry method's thread to obtain the values of the future. If the values are already available then they are returned immediately. """ + if not self.gotvalues: self.blocked = True self.gr = getcurrent() self.values = threadMgr.pauseThread() - if self.error is not None: + if isinstance(self.error, Exception): raise self.error - if self.nvals == 1: - return self.values[0] - return self.values + return self.values[0] if self.nvals == 1 else self.values def ready(self): return self.gotvalues @@ -72,13 +87,23 @@ def getTargetProxyEntryMethod(self): def deposit(self, result): """ Deposit a value for this future. 
""" + if self.done(): + raise InvalidStateError() + + retval = False self.values.append(result) if isinstance(result, Exception): self.error = result if len(self.values) == self.nvals: self.gotvalues = True - return True - return False + retval = True + self.is_running = False + + while len(self.callbacks) > 0: + callback = self.callback.pop() + self._run_callback(callback) + + return retval def resume(self, threadMgr): if self.blocked == 2: @@ -101,22 +126,32 @@ def cancel(self): if self.running() or self.done(): return False else: + self.is_cancelled = True threadMgr.cancelFuture(self) + while len(self.callbacks) > 0: + callback = self.callback.pop() + self._run_callback(callback) return True def cancelled(self): - return self.fid not in threadMgr.futures + return self.is_cancelled def running(self): - # Not certain if this is correct - return self.blocked and not self.done() + return self.is_running def done(self): - return self.ready() or self.cancelled or (self.error is not None) + print( + self.ready(), + self.cancelled(), + isinstance( + self.error, + Exception), + self.running()) + return self.ready() or self.cancelled() or isinstance(self.error, Exception) def result(self, timeout=None): if self.cancelled(): - raise CancelledError + raise CancelledError() with Timeout(timeout, TimeoutError): result = self.get() return result @@ -124,16 +159,55 @@ def result(self, timeout=None): def exception(self, timeout=None): try: self.result(timeout=timeout) - except (TimeoutError, CancelledError) as e: - raise e except Exception as e: - if self.error is None: + if isinstance( + e, TimeoutError) or isinstance( + e, CancelledError) or e != self.error: raise e return self.error - def add_done_callback(self, fn): - raise NotImplementedError + def _run_callback(self, callback): + try: + callback(self) + except Exception as e: + print(e, file=stderr) + + def add_done_callback(self, callback): + if self.done(): + self._run_callback(callback) + else: + self.callbacks.append(callback) + # Status changed while appending. 
Run + # any remaining callbacks + if self.done(): + while len(self.callbacks) > 0: + callback = self.callback.pop() + self._run_callback(callback) + + def set_running_or_notify_cancel(self): + if self.cancelled(): + retval = False + elif self.done(): + raise InvalidStateError() + else: + retval = True + self.is_running = True + + # self.waitReady(None) + # self.resume(threadMgr) + # threadMgr.start() + + return retval + + # The following methods aren't used here, but concurrent.futures says unit + # tests may use them + + def set_result(self, result): + self.deposit(result) + + def set_exception(self, exception): + self.deposit(exception) class CollectiveFuture(Future): From 9822860898f2afe22b0b2a3e4e2936b4dad065c3 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Tue, 22 Nov 2022 02:29:12 -0600 Subject: [PATCH 15/19] Refine add callback function --- charm4py/threads.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/charm4py/threads.py b/charm4py/threads.py index 93d4c832..41058c0d 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -141,12 +141,12 @@ def running(self): def done(self): print( - self.ready(), - self.cancelled(), - isinstance( + "Ready:", self.ready(), + "Cancelled:", self.cancelled(), + "Failed:", isinstance( self.error, Exception), - self.running()) + "Running:", self.running()) return self.ready() or self.cancelled() or isinstance(self.error, Exception) def result(self, timeout=None): @@ -174,16 +174,16 @@ def _run_callback(self, callback): print(e, file=stderr) def add_done_callback(self, callback): + self.callbacks.append(callback) + # Status may have changed while appending. Run + # any remaining callbacks if self.done(): - self._run_callback(callback) - else: - self.callbacks.append(callback) - # Status changed while appending. Run - # any remaining callbacks - if self.done(): - while len(self.callbacks) > 0: + while len(self.callbacks) > 0: + try: callback = self.callback.pop() self._run_callback(callback) + except IndexError as e: + break def set_running_or_notify_cancel(self): if self.cancelled(): From 282a11b237c293e60b42a4e2709f40f0255ed47f Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Tue, 6 Dec 2022 00:00:01 -0600 Subject: [PATCH 16/19] Simplify running callbacks, use get() to wait on futures during shutdown --- charm4py/pool.py | 12 +++++++++--- charm4py/threads.py | 37 +++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index f52c218f..c515b150 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -7,7 +7,7 @@ from collections import defaultdict from copy import deepcopy from concurrent.futures import Executor, TimeoutError -from concurrent.futures import wait as cwait +#from concurrent.futures import wait as cf_wait from gevent import Timeout import sys @@ -566,9 +566,14 @@ def shutdown(self, wait=True, *, cancel_futures=False): if isinstance(getattr(job, 'future', None), threads.Future): job.future.cancel() - self.pool.pool_scheduler.schedule() + # Is this necessary? 
+ # self.pool.pool_scheduler.schedule() if wait: + for job in self.pool.pool_scheduler.jobs: + if isinstance(getattr(job, 'future', None), threads.Future): + job.future.get() + """ futures = [ job.future for job in self.pool.pool_scheduler.jobs if isinstance( getattr( @@ -576,4 +581,5 @@ def shutdown(self, wait=True, *, cancel_futures=False): 'future', None), threads.Future)] - cwait(futures) + cf_wait(futures) + """ diff --git a/charm4py/threads.py b/charm4py/threads.py index 41058c0d..2289d62b 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -1,5 +1,5 @@ from greenlet import getcurrent -from concurrent.futures import Future as CFuture +from concurrent.futures import Future as ConcurrentFuture from concurrent.futures import CancelledError, TimeoutError, InvalidStateError from gevent import Timeout from sys import stderr @@ -29,7 +29,7 @@ def __init__(self, msg): # futures were pickled. @dataclass() -class Future(CFuture): +class Future(ConcurrentFuture): fid: int # unique future ID within the process that created it gr: object # greenlet that created the future @@ -98,10 +98,7 @@ def deposit(self, result): self.gotvalues = True retval = True self.is_running = False - - while len(self.callbacks) > 0: - callback = self.callback.pop() - self._run_callback(callback) + self._run_callbacks() return retval @@ -128,9 +125,7 @@ def cancel(self): else: self.is_cancelled = True threadMgr.cancelFuture(self) - while len(self.callbacks) > 0: - callback = self.callback.pop() - self._run_callback(callback) + self._run_callbacks() return True def cancelled(self): @@ -173,17 +168,20 @@ def _run_callback(self, callback): except Exception as e: print(e, file=stderr) + def _run_callbacks(self): + while len(self.callbacks) > 0: + try: + callback = self.callbacks.pop() + self._run_callback(callback) + except IndexError: + break + def add_done_callback(self, callback): self.callbacks.append(callback) # Status may have changed while appending. Run # any remaining callbacks if self.done(): - while len(self.callbacks) > 0: - try: - callback = self.callback.pop() - self._run_callback(callback) - except IndexError as e: - break + self._run_callbacks() def set_running_or_notify_cancel(self): if self.cancelled(): @@ -191,11 +189,13 @@ def set_running_or_notify_cancel(self): elif self.done(): raise InvalidStateError() else: - retval = True self.is_running = True + retval = True + # How to force this future to start running asynchronously? # self.waitReady(None) - # self.resume(threadMgr) + self.blocked = True + self.resume(threadMgr) # threadMgr.start() return retval @@ -353,7 +353,8 @@ def depositFuture(self, fid, result): f = futures[fid] except KeyError: raise Charm4PyError( - 'No pending future with fid=' + str(fid) + '. A common reason is ' + 'No pending future with fid=' + + str(fid) + '. 
A common reason is ' 'sending to a future that already received its value(s)') if f.deposit(result): del futures[fid] From b71e4b379ec218655c2d6a40437aeabc22305b24 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Tue, 6 Dec 2022 02:53:15 -0600 Subject: [PATCH 17/19] Fix PoolExecutor.map --- charm4py/pool.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index c515b150..656b5b65 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -527,6 +527,15 @@ def __call__(self, args_kwargs): return self.fn(*in_args, **in_kwargs) +@dataclass +class _StarmappedFunction: + + fn: Callable + + def __call__(self, args_iterable): + return self.fn(*args_iterable) + + class PoolExecutor(Executor): def __init__(self, pool_scheduler_chare): @@ -551,7 +560,7 @@ def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): "charm4py.pool.PoolExecutor object has been shut down") with Timeout(timeout, TimeoutError) as timeout: - result = self.pool.map(func, zip(*iterables), + result = self.pool.map(_StarmappedFunction(func), list(zip(*iterables)), chunksize=chunksize, ncores=ncores) return result From 081d3dca72370f52ab31658a4286414f861c9263 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Tue, 6 Dec 2022 19:26:11 -0600 Subject: [PATCH 18/19] Use more of the Futures interface --- charm4py/pool.py | 46 +++--- charm4py/threads.py | 189 +++++++++++++++++----- examples/dist-task-scheduler/scheduler.py | 2 +- 3 files changed, 182 insertions(+), 55 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index 656b5b65..ec51f237 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -101,6 +101,7 @@ def taskDone(self): class PoolScheduler(Chare): def __init__(self): + super().__init__() self.workers = None self.idle_workers = set(range(1, charm.numPes())) self.num_workers = len(self.idle_workers) @@ -158,6 +159,7 @@ def startSingleTask(self, func, future, *args): job = Job(self.job_id_pool.pop(), func, (args,), future, self.num_workers, 1) job.single_task = True + self.__addJob__(job) if job.threaded: job.remote = self.workers.runTask_star_th @@ -211,6 +213,8 @@ def start(self, func, tasks, result, ncores, chunksize): self.schedule() def schedule(self): + from time import time + print("SCHEDULED at time", time()) job = self.job_next prev = self while job is not None: @@ -218,24 +222,27 @@ def schedule(self): return while True: if not job.failed: - task = job.getTask() - if task is None: - break - worker_id = self.idle_workers.pop() - # print('Sending task to worker', worker_id) - - if job.func is not None: - func = None - if job.id not in self.worker_knows[worker_id]: - func = job.func - job.workers.append(worker_id) - self.worker_knows[worker_id].add(job.id) + if True:#job.future.set_running_or_notify_cancel(): + task = job.getTask() + if task is None: + break + worker_id = self.idle_workers.pop() + # print('Sending task to worker', worker_id) + + if job.func is not None: + func = None + if job.id not in self.worker_knows[worker_id]: + func = job.func + job.workers.append(worker_id) + self.worker_knows[worker_id].add(job.id) + else: + func = task.func + # NOTE: this is a non-standard way of using proxies, but is + # faster and allows the scheduler to reuse the same proxy + self.workers.elemIdx = worker_id + job.remote(func, task.data, task.result_dest, job.id) else: - func = task.func - # NOTE: this is a non-standard way of using proxies, but is - # faster and allows the scheduler to reuse the same proxy - 
self.workers.elemIdx = worker_id - job.remote(func, task.data, task.result_dest, job.id) + break if len(job.tasks) == 0: prev.job_next = job.job_next @@ -474,6 +481,7 @@ def Task(self, func, args, ret=False, awaitable=False): if ret or awaitable: f = Future() # unpack the arguments for sending to allow benefiting from direct copy + #print(f) self.pool_scheduler.startSingleTask(func, f, *args) return f @@ -549,6 +557,8 @@ def submit(self, fn, /, *args, **kwargs): if kwargs is None or len(kwargs) == 0: return self.pool.Task(fn, args, ret=True) + #return self.pool.map_async(_StarmappedFunction(fn), (args,), + # chunksize=1, ncores=-1) else: # Task doesn't support kwargs so this sneaks them in with a tuple iterable_arg = tuple([tuple([args, frozendict(kwargs)])]) @@ -576,7 +586,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): job.future.cancel() # Is this necessary? - # self.pool.pool_scheduler.schedule() + self.pool.pool_scheduler.schedule() if wait: for job in self.pool.pool_scheduler.jobs: diff --git a/charm4py/threads.py b/charm4py/threads.py index 2289d62b..8e96280e 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -1,10 +1,12 @@ from greenlet import getcurrent from concurrent.futures import Future as ConcurrentFuture from concurrent.futures import CancelledError, TimeoutError, InvalidStateError +from concurrent.futures._base import CANCELLED, FINISHED, CANCELLED_AND_NOTIFIED from gevent import Timeout from sys import stderr from dataclasses import dataclass from typing import Union +from collections.abc import Iterable # Future IDs (fids) are sometimes carried as reference numbers inside # Charm++ CkCallback objects. The data type most commonly used for @@ -28,31 +30,34 @@ def __init__(self, msg): # See commit 25e2935 if need to resurrect code where proxies were included when # futures were pickled. 
-@dataclass() +#@dataclass() class Future(ConcurrentFuture): - fid: int # unique future ID within the process that created it - gr: object # greenlet that created the future + #fid: int # unique future ID within the process that created it + #gr: object # greenlet that created the future # PE where the future was created (not used for collective futures) - src: int - nvals: int # number of values that the future expects to receive - values: list # values of the future - callbacks: list # list of callback functions + #src: int + #nvals: int # number of values that the future expects to receive + #values: list # values of the future # flag to check if creator thread is blocked on the future - blocked: Union[bool, int] = False - gotvalues: bool = False # flag to check if expected number of values have been received + #blocked: Union[bool, int] = False + #gotvalues: bool = False # flag to check if expected number of values have been received # if the future receives an Exception, it is set here - error: Union[None, Exception] = None - is_cancelled: bool = False # flag to check if the future is cancelled - is_running: bool = False # flag to check if the future is currently running + #error: Union[None, Exception] = None + #callbacks: list # list of callback functions + #is_cancelled: bool = False # flag to check if the future is cancelled + #is_running: bool = False # flag to check if the future is currently running def __init__(self, fid, gr, src, num_vals): + super().__init__() self.fid = fid self.gr = gr self.src = src self.nvals = num_vals - self.values = [] - self.callbacks = [] + #self.values = [] + self._result = [] + self.blocked = False + #self.callbacks = [] def get(self): """ Blocking call on current entry method's thread to obtain the values of the @@ -60,18 +65,32 @@ def get(self): """ if not self.gotvalues: + print("NOT AVAILABLE") self.blocked = True self.gr = getcurrent() - self.values = threadMgr.pauseThread() + self._result = threadMgr.pauseThread() + else: + print("AVAILABLE") - if isinstance(self.error, Exception): - raise self.error + if isinstance(self._exception, Exception): + raise self._exception return self.values[0] if self.nvals == 1 else self.values + # self.ready() == self.gotvalues == (self._state == FINISHED)? + # Why do we need three ways to say the same thing? + # ready() is only true if all values have been received. def ready(self): return self.gotvalues + @property + def values(self): + return self._result + + @property + def gotvalues(self): + return len(self.values) == self.nvals + def waitReady(self, f): self.blocked = 2 @@ -87,18 +106,48 @@ def getTargetProxyEntryMethod(self): def deposit(self, result): """ Deposit a value for this future. """ - if self.done(): - raise InvalidStateError() retval = False - self.values.append(result) - if isinstance(result, Exception): - self.error = result - if len(self.values) == self.nvals: - self.gotvalues = True - retval = True - self.is_running = False - self._run_callbacks() + with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) + + self.values.append(result) + + # Should it be finished after receiving a single exception + # or should it wait for all of them? 
+ if isinstance(result, Exception): + self._exception = exception + #self._state = FINISHED + #for waiter in self._waiters: + # waiter.add_exception(self) + #self._condition.notify_all() + #self._invoke_callbacks() + + if self.gotvalues: + self._state = FINISHED + #if isinstance(self._exception, Exception) + # for waiter in self._waiters: + # waiter.add_exception(self) + # add_exception and add_result seem to + # do the same thing so we shouldn't + # need to call them separately + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + retval = True + + #retval = False + #self.values.append(result) + #if isinstance(result, Exception): + # self._exception = result + #if len(self.values) == self.nvals: + # self._state = FINISHED + #self.gotvalues = True + #retval = True + #self.is_running = False + #self._run_callbacks() return retval @@ -119,7 +168,40 @@ def __getstate__(self): def __setstate__(self, state): self.fid, self.src = state + #@property + #def _result(): + # return self.values + def cancel(self): + retval = super().cancel() + if retval: + threadMgr.cancelFuture(self) + return retval + ''' + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + threadMgr.cancelFuture(self) + self._condition.notify_all() + + self._invoke_callbacks() + return True + ''' + + """ + def cancel(self): + #retval = super().cancel() + if self.running() or self.done(): return False else: @@ -127,7 +209,8 @@ def cancel(self): threadMgr.cancelFuture(self) self._run_callbacks() return True - + """ + """ def cancelled(self): return self.is_cancelled @@ -143,7 +226,9 @@ def done(self): Exception), "Running:", self.running()) return self.ready() or self.cancelled() or isinstance(self.error, Exception) + """ + """ def result(self, timeout=None): if self.cancelled(): raise CancelledError() @@ -161,7 +246,9 @@ def exception(self, timeout=None): raise e return self.error + """ + """ def _run_callback(self, callback): try: callback(self) @@ -182,8 +269,10 @@ def add_done_callback(self, callback): # any remaining callbacks if self.done(): self._run_callbacks() + """ def set_running_or_notify_cancel(self): + """ if self.cancelled(): retval = False elif self.done(): @@ -191,24 +280,52 @@ def set_running_or_notify_cancel(self): else: self.is_running = True retval = True + """ # How to force this future to start running asynchronously? - # self.waitReady(None) - self.blocked = True - self.resume(threadMgr) - # threadMgr.start() + retval = super().set_running_or_notify_cancel() + if retval: + #self.waitReady(None) + #self.resume(threadMgr) + threadMgr.start() return retval # The following methods aren't used here, but concurrent.futures says unit # tests may use them + # Add this logic (and the exception logic) to deposit + ''' + def set_result(self, result): + """Sets the return value of work associated with the future. + Should only be used by Executor implementations and unit tests. 
+ """ + with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + ''' + + # concurrent.futures assumes the future only needs to wait for a single + # result, but the charm4py future can wait on multiple deposits def set_result(self, result): - self.deposit(result) + if isinstance(result, Iterable): + assert len(result) == self.nvals + for entry in result: + self.deposit(entry) + else: + assert self.nvals == 1 + self.deposit(result) + #super().set_result(exception) def set_exception(self, exception): self.deposit(exception) - + #super().set_exception(exception) class CollectiveFuture(Future): @@ -376,8 +493,8 @@ def depositCollectiveFuture(self, fid, result, obj): def cancelFuture(self, f): fid = f.fid del self.futures[fid] - f.gotvalues = True - f.values = [None] * f.nvals + #f.gotvalues = True + f._result = [None] * f.nvals f.resume(self) # TODO: method to cancel collective future. the main issue with this is diff --git a/examples/dist-task-scheduler/scheduler.py b/examples/dist-task-scheduler/scheduler.py index 227db11a..cd8d2ddc 100644 --- a/examples/dist-task-scheduler/scheduler.py +++ b/examples/dist-task-scheduler/scheduler.py @@ -70,7 +70,7 @@ def taskDone(self, worker_id, task_id, job_id, result): job.addResult(task_id, result) if job.isDone(): self.jobs.pop(job.id) - # job is done, send the result back to whoever submitted the job + # job is done, send the result back to whomever submitted the job job.callback(job.results) # callback is a callable self.schedule() From 2bbb9a8af0cf76319e785304ae5b59cc2f346b61 Mon Sep 17 00:00:00 2001 From: Nicholas Christensen Date: Tue, 6 Dec 2022 21:45:16 -0600 Subject: [PATCH 19/19] Hack to make submit work --- charm4py/pool.py | 12 +++++++++--- charm4py/threads.py | 15 +++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/charm4py/pool.py b/charm4py/pool.py index ec51f237..9e3293f9 100644 --- a/charm4py/pool.py +++ b/charm4py/pool.py @@ -165,6 +165,7 @@ def startSingleTask(self, func, future, *args): job.remote = self.workers.runTask_star_th else: job.remote = self.workers.runTask_star + #future.set_running_or_notify_cancel() self.schedule() def start(self, func, tasks, result, ncores, chunksize): @@ -218,6 +219,7 @@ def schedule(self): job = self.job_next prev = self while job is not None: + #job.future.set_running_or_notify_cancel() if len(self.idle_workers) == 0: return while True: @@ -482,6 +484,7 @@ def Task(self, func, args, ret=False, awaitable=False): f = Future() # unpack the arguments for sending to allow benefiting from direct copy #print(f) + #f.set_running_or_notify_cancel() self.pool_scheduler.startSingleTask(func, f, *args) return f @@ -556,13 +559,16 @@ def submit(self, fn, /, *args, **kwargs): "charm4py.pool.PoolExecutor object has been shut down") if kwargs is None or len(kwargs) == 0: - return self.pool.Task(fn, args, ret=True) + future = self.pool.Task(fn, args, ret=True) #return self.pool.map_async(_StarmappedFunction(fn), (args,), # chunksize=1, ncores=-1) else: # Task doesn't support kwargs so this sneaks them in with a tuple iterable_arg = tuple([tuple([args, frozendict(kwargs)])]) - return self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True) + future = self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True) + + 
future.set_running_or_notify_cancel() + return future def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1): if self.is_shutdown: @@ -586,7 +592,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): job.future.cancel() # Is this necessary? - self.pool.pool_scheduler.schedule() + #self.pool.pool_scheduler.schedule() if wait: for job in self.pool.pool_scheduler.jobs: diff --git a/charm4py/threads.py b/charm4py/threads.py index 8e96280e..be098d74 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -106,7 +106,6 @@ def getTargetProxyEntryMethod(self): def deposit(self, result): """ Deposit a value for this future. """ - retval = False with self._condition: if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: @@ -283,11 +282,23 @@ def set_running_or_notify_cancel(self): """ # How to force this future to start running asynchronously? + # Can't really control which greenlet executes next so + # a greenlet might complete before it is marked to run. + # Need to make Concurrent.wait() work here. + #if self._state == FINISHED: + # return True retval = super().set_running_or_notify_cancel() if retval: #self.waitReady(None) + # Pause this thread (and allow other threads to execute) + threadMgr.pauseThread() + #self.blocked = True + # Resume this thread #self.resume(threadMgr) - threadMgr.start() + #threadMgr.start() + #self.gr = getcurrent() + #threadMgr.resumeThread(self.gr, self) + #threadMgr.resumeThread(threadMgr.main_gr, self) return retval
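
As a usage sketch of the executor these patches build up: the snippet below assumes only the names shown in the diffs (PoolExecutor, submit, shutdown, the charm4py Future) plus a standard charm4py entry point. The series does not show how a PoolExecutor is meant to be constructed, so PoolExecutor(charm.pool.pool_scheduler) and the worker function square are illustrative assumptions, and the concurrent.futures.wait() call reflects the interoperability the patches are working toward rather than behavior that is guaranteed at this point.

    from charm4py import charm
    from charm4py.pool import PoolExecutor
    from concurrent.futures import wait

    def square(x):
        return x * x

    def main(args):
        # Assumed constructor: wrap the runtime pool's scheduler
        # (the patches do not show the intended public handle).
        executor = PoolExecutor(charm.pool.pool_scheduler)

        # submit() schedules a single pool Task and returns a charm4py Future,
        # which per these patches is also a concurrent.futures.Future, so the
        # stdlib helpers should be able to block on it.
        futures = [executor.submit(square, i) for i in range(4)]
        wait(futures)
        print([f.result() for f in futures])  # expected: [0, 1, 4, 9]

        executor.shutdown(wait=True)
        charm.exit()

    charm.start(main)

Deriving Future from concurrent.futures.Future is what makes this possible: wait() and result() block on the future's inherited condition variable, which deposit() notifies once all expected values have arrived. Executor.map(), timeouts and cancel_futures= follow the same interface but are still being reworked across these patches, so the sketch sticks to submit().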