diff --git a/charm4py/pool.py b/charm4py/pool.py
index 91c9a1db..9e3293f9 100644
--- a/charm4py/pool.py
+++ b/charm4py/pool.py
@@ -1,8 +1,14 @@
+from dataclasses import dataclass
+from frozendict import frozendict
+from typing import Callable
 from . import charm, Chare, Group, coro_ext, threads, Future
 from .charm import Charm4PyError
 from .threads import NotThreadedError
 from collections import defaultdict
 from copy import deepcopy
+from concurrent.futures import Executor, TimeoutError
+#from concurrent.futures import wait as cf_wait
+from gevent import Timeout
 import sys
 
 
@@ -55,22 +61,28 @@ def __init__(self, id, func, tasks, result, ncores, chunksize):
             if result is None or isinstance(result, threads.Future):
                 self.results = [None] * len(tasks)
                 self.future = result
-                self.tasks = [Chunk(tasks[i:i+chunksize], i) for i in range(0, len(tasks), chunksize)]
+                self.tasks = [Chunk(tasks[i:i + chunksize], i)
+                              for i in range(0, len(tasks), chunksize)]
             else:
-                self.tasks = [Chunk(tasks[i:i+chunksize], result[i:i+chunksize]) for i in range(0, len(tasks), chunksize)]
+                self.tasks = [Chunk(tasks[i:i + chunksize], result[i:i + chunksize])
+                              for i in range(0, len(tasks), chunksize)]
         else:
             if result is None or isinstance(result, threads.Future):
                 self.results = [None] * len(tasks)
                 self.future = result
                 if func is not None:
-                    self.tasks = [Task(args, i) for i, args in enumerate(tasks)]
+                    self.tasks = [Task(args, i)
+                                  for i, args in enumerate(tasks)]
                 else:
-                    self.tasks = [Task(args, i, func) for i, (func, args) in enumerate(tasks)]
+                    self.tasks = [Task(args, i, func)
+                                  for i, (func, args) in enumerate(tasks)]
             else:
                 if func is not None:
-                    self.tasks = [Task(args, result[i]) for i, args in enumerate(tasks)]
+                    self.tasks = [Task(args, result[i])
+                                  for i, args in enumerate(tasks)]
                 else:
-                    self.tasks = [Task(args, result[i], func) for i, (func, args) in enumerate(tasks)]
+                    self.tasks = [Task(args, result[i], func)
+                                  for i, (func, args) in enumerate(tasks)]
         # print('Created job with', len(self.tasks), 'tasks')
         self.tasks_pending = len(self.tasks)
 
@@ -89,6 +101,7 @@ def taskDone(self):
 class PoolScheduler(Chare):
 
     def __init__(self):
+        super().__init__()
         self.workers = None
         self.idle_workers = set(range(1, charm.numPes()))
         self.num_workers = len(self.idle_workers)
@@ -103,9 +116,12 @@ def __start__(self, func, tasks, result):
         if self.workers is None:
             assert self.num_workers > 0, 'Run with more than 1 PE to use charm.pool'
             # first time running a job, create Group of workers
-            print('Initializing charm.pool with', self.num_workers, 'worker PEs. '
-                  'Warning: charm.pool is experimental (API and performance '
-                  'is subject to change)')
+            print(
+                'Initializing charm.pool with',
+                self.num_workers,
+                'worker PEs. '
+                'Warning: charm.pool is experimental (API and performance '
+                'is subject to change)')
             self.workers = Group(Worker, args=[self.thisProxy])
 
         if len(self.job_id_pool) == 0:
@@ -117,10 +133,12 @@ def __start__(self, func, tasks, result):
         if charm.interactive:
             try:
                 if func is not None:
-                    self.workers.check(func.__module__, func.__name__, awaitable=True).get()
+                    self.workers.check(
+                        func.__module__, func.__name__, awaitable=True).get()
                 else:
                     for func_, args in tasks:
-                        self.workers.check(func_.__module__, func_.__name__, awaitable=True).get()
+                        self.workers.check(
+                            func_.__module__, func_.__name__, awaitable=True).get()
             except Exception as e:
                 if result is None:
                     raise e
@@ -138,13 +156,16 @@ def __addJob__(self, job):
 
     def startSingleTask(self, func, future, *args):
         self.__start__(func, None, None)
-        job = Job(self.job_id_pool.pop(), func, (args,), future, self.num_workers, 1)
+        job = Job(self.job_id_pool.pop(), func,
+                  (args,), future, self.num_workers, 1)
         job.single_task = True
+        self.__addJob__(job)
         if job.threaded:
             job.remote = self.workers.runTask_star_th
         else:
             job.remote = self.workers.runTask_star
+        #future.set_running_or_notify_cancel()
         self.schedule()
 
@@ -158,7 +179,13 @@ def start(self, func, tasks, result, ncores, chunksize):
 
         self.__start__(func, tasks, result)
 
-        job = Job(self.job_id_pool.pop(), func, tasks, result, ncores, chunksize)
+        job = Job(
+            self.job_id_pool.pop(),
+            func,
+            tasks,
+            result,
+            ncores,
+            chunksize)
         self.__addJob__(job)
 
         if job.chunked:
@@ -187,31 +214,37 @@ def start(self, func, tasks, result, ncores, chunksize):
         self.schedule()
 
     def schedule(self):
+        from time import time
+        print("SCHEDULED at time", time())
         job = self.job_next
         prev = self
         while job is not None:
+            #job.future.set_running_or_notify_cancel()
             if len(self.idle_workers) == 0:
                 return
             while True:
                 if not job.failed:
-                    task = job.getTask()
-                    if task is None:
-                        break
-                    worker_id = self.idle_workers.pop()
-                    # print('Sending task to worker', worker_id)
-
-                    if job.func is not None:
-                        func = None
-                        if job.id not in self.worker_knows[worker_id]:
-                            func = job.func
-                            job.workers.append(worker_id)
-                            self.worker_knows[worker_id].add(job.id)
+                    if True:#job.future.set_running_or_notify_cancel():
+                        task = job.getTask()
+                        if task is None:
+                            break
+                        worker_id = self.idle_workers.pop()
+                        # print('Sending task to worker', worker_id)
+
+                        if job.func is not None:
+                            func = None
+                            if job.id not in self.worker_knows[worker_id]:
+                                func = job.func
+                                job.workers.append(worker_id)
+                                self.worker_knows[worker_id].add(job.id)
+                        else:
+                            func = task.func
+                        # NOTE: this is a non-standard way of using proxies, but is
+                        # faster and allows the scheduler to reuse the same proxy
+                        self.workers.elemIdx = worker_id
+                        job.remote(func, task.data, task.result_dest, job.id)
                     else:
-                        func = task.func
-                    # NOTE: this is a non-standard way of using proxies, but is
-                    # faster and allows the scheduler to reuse the same proxy
-                    self.workers.elemIdx = worker_id
-                    job.remote(func, task.data, task.result_dest, job.id)
+                        break
                 if len(job.tasks) == 0:
                     prev.job_next = job.job_next
@@ -238,7 +271,7 @@ def taskFinished(self, worker_id, job_id, result=None):
             if job.chunked:
                 i, results = result
                 n = len(results)
-                job.results[i:i+n] = results
+                job.results[i:i + n] = results
             else:
                 i, _result = result
                 job.results[i] = _result
@@ -271,7 +304,8 @@ def taskError(self, worker_id, job_id, exception):
         job.exception = exception
         self.idle_workers.add(worker_id)
         # marking as failed will allow the scheduler to delete it from the linked list
-        # NOTE that we will only delete from the 'jobs' list once all the pending tasks are done
+        # NOTE that we will only delete from the 'jobs' list once all the
+        # pending tasks are done
        job.failed = True
         if not hasattr(job, 'future'):
             if job.chunked:
@@ -300,7 +334,8 @@ class Worker(Chare):
 
     def __init__(self, scheduler):
         self.scheduler = scheduler
-        assert len(self.scheduler.elemIdx) > 0  # make sure points to the element, not collection
+        # make sure points to the element, not collection
+        assert len(self.scheduler.elemIdx) > 0
         self.__addThreadEventSubscriber__(scheduler, self.thisIndex)
         # TODO: when to purge entries from this dict?
         self.funcs = {}  # job ID -> function used by this job ID
@@ -324,14 +359,18 @@ def runTask(self, func, args, result_destination, job_id):
         try:
             result = func(args)
             if isinstance(result_destination, int):
-                self.scheduler.taskFinished(self.thisIndex, job_id, (result_destination, result))
+                self.scheduler.taskFinished(
+                    self.thisIndex, job_id, (result_destination, result))
             else:
                 # assume result_destination is a future
                 result_destination.send(result)
                 self.scheduler.taskFinished(self.thisIndex, job_id)
         except Exception as e:
             if isinstance(e, NotThreadedError):
-                e = Charm4PyError('Function ' + str(func) + ' must be decorated with @coro to be able to suspend')
+                e = Charm4PyError(
+                    'Function ' +
+                    str(func) +
+                    ' must be decorated with @coro to be able to suspend')
             charm.prepareExceptionForSend(e)
             self.scheduler.taskError(self.thisIndex, job_id, e)
             if not isinstance(result_destination, int):
@@ -345,14 +384,18 @@ def runTask_star(self, func, args, result_destination, job_id):
         try:
             result = func(*args)
             if isinstance(result_destination, int):
-                self.scheduler.taskFinished(self.thisIndex, job_id, (result_destination, result))
+                self.scheduler.taskFinished(
+                    self.thisIndex, job_id, (result_destination, result))
             else:
                 # assume result_destination is a future
                 result_destination.send(result)
                 self.scheduler.taskFinished(self.thisIndex, job_id)
         except Exception as e:
             if isinstance(e, NotThreadedError):
-                e = Charm4PyError('Function ' + str(func) + ' must be decorated with @coro to be able to suspend')
+                e = Charm4PyError(
+                    'Function ' +
+                    str(func) +
+                    ' must be decorated with @coro to be able to suspend')
             charm.prepareExceptionForSend(e)
             self.scheduler.taskError(self.thisIndex, job_id, e)
             if not isinstance(result_destination, int):
@@ -390,7 +433,8 @@ def runChunk(self, _, chunk, result_destination, job_id):
 
     def send_chunk_results(self, results, result_destination, job_id):
         if isinstance(result_destination, int):
-            self.scheduler.taskFinished(self.thisIndex, job_id, (result_destination, results))
+            self.scheduler.taskFinished(
+                self.thisIndex, job_id, (result_destination, results))
         else:
             # assume result_destination is a list of futures
             # TODO: should send all results together to PE where future was created,
@@ -401,7 +445,8 @@ def send_chunk_results(self, results, result_destination, job_id):
 
     def send_chunk_exc(self, e, result_destination, job_id):
         if isinstance(e, NotThreadedError):
-            e = Charm4PyError('Function not decorated with @coro tried to suspend')
+            e = Charm4PyError(
+                'Function not decorated with @coro tried to suspend')
         charm.prepareExceptionForSend(e)
         self.scheduler.taskError(self.thisIndex, job_id, e)
         if not isinstance(result_destination, int):
@@ -410,7 +455,8 @@ def send_chunk_exc(self, e, result_destination, job_id):
 
     def check(self, func_module, func_name):
         if charm.options.remote_exec is not True:
-            raise Charm4PyError('Remote code execution is disabled. Set charm.options.remote_exec to True')
+            raise Charm4PyError(
+                'Remote code execution is disabled. Set charm.options.remote_exec to True')
         eval(func_name, sys.modules[func_module].__dict__)
 
 
@@ -437,16 +483,25 @@ def Task(self, func, args, ret=False, awaitable=False):
         if ret or awaitable:
             f = Future()
         # unpack the arguments for sending to allow benefiting from direct copy
+        #print(f)
+        #f.set_running_or_notify_cancel()
         self.pool_scheduler.startSingleTask(func, f, *args)
         return f
 
     def map(self, func, iterable, chunksize=1, ncores=-1):
         result = Future()
-        # TODO shouldn't send task objects to a central place. what if they are large?
+        # TODO shouldn't send task objects to a central place. what if they are
+        # large?
        self.pool_scheduler.start(func, iterable, result, ncores, chunksize)
         return result.get()
 
-    def map_async(self, func, iterable, chunksize=1, ncores=-1, multi_future=False):
+    def map_async(
+            self,
+            func,
+            iterable,
+            chunksize=1,
+            ncores=-1,
+            multi_future=False):
         if self.mype == 0:
             # see deepcopy comment above (only need this for async case since
             # the sync case won't return until all the tasks have finished)
@@ -463,5 +518,93 @@ def map_async(self, func, iterable, chunksize=1, ncores=-1, multi_future=False):
     def submit(self, iterable, chunksize=1, ncores=-1):
         return self.map(None, iterable, chunksize, ncores)
 
-    def submit_async(self, iterable, chunksize=1, ncores=-1, multi_future=False):
+    def submit_async(
+            self,
+            iterable,
+            chunksize=1,
+            ncores=-1,
+            multi_future=False):
         return self.map_async(None, iterable, chunksize, ncores, multi_future)
+
+
+@dataclass
+class _WrappedFunction:
+
+    fn: Callable
+
+    def __call__(self, args_kwargs):
+        in_args = args_kwargs[0]
+        in_kwargs = args_kwargs[1]
+        return self.fn(*in_args, **in_kwargs)
+
+
+@dataclass
+class _StarmappedFunction:
+
+    fn: Callable
+
+    def __call__(self, args_iterable):
+        return self.fn(*args_iterable)
+
+
+class PoolExecutor(Executor):
+
+    def __init__(self, pool_scheduler_chare):
+        self.pool = Pool(pool_scheduler_chare)
+        self.is_shutdown = False
+
+    def submit(self, fn, /, *args, **kwargs):
+        if self.is_shutdown:
+            raise RuntimeError(
+                "charm4py.pool.PoolExecutor object has been shut down")
+
+        if kwargs is None or len(kwargs) == 0:
+            future = self.pool.Task(fn, args, ret=True)
+            #return self.pool.map_async(_StarmappedFunction(fn), (args,),
+            #                           chunksize=1, ncores=-1)
+        else:
+            # Task doesn't support kwargs so this sneaks them in with a tuple
+            iterable_arg = tuple([tuple([args, frozendict(kwargs)])])
+            future = self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True)
+
+        future.set_running_or_notify_cancel()
+        return future
+
+    def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1):
+        if self.is_shutdown:
+            raise RuntimeError(
+                "charm4py.pool.PoolExecutor object has been shut down")
+
+        with Timeout(timeout, TimeoutError) as timeout:
+            result = self.pool.map(_StarmappedFunction(func), list(zip(*iterables)),
+                                   chunksize=chunksize, ncores=ncores)
+
+        return result
+
+    def shutdown(self, wait=True, *, cancel_futures=False):
+
+        # Prevent more jobs from being submitted
+        self.is_shutdown = True
+
+        if cancel_futures:
+            for job in self.pool.pool_scheduler.jobs:
+                if isinstance(getattr(job, 'future', None), threads.Future):
+                    job.future.cancel()
+
+        # Is this necessary?
+        #self.pool.pool_scheduler.schedule()
+
+        if wait:
+            for job in self.pool.pool_scheduler.jobs:
+                if isinstance(getattr(job, 'future', None), threads.Future):
+                    job.future.get()
+        """
+        futures = [
+            job.future for job in self.pool.pool_scheduler.jobs if isinstance(
+                getattr(
+                    job,
+                    'future',
+                    None),
+                threads.Future)]
+        cf_wait(futures)
+        """
diff --git a/charm4py/threads.py b/charm4py/threads.py
index e05678cf..be098d74 100644
--- a/charm4py/threads.py
+++ b/charm4py/threads.py
@@ -1,5 +1,12 @@
 from greenlet import getcurrent
-
+from concurrent.futures import Future as ConcurrentFuture
+from concurrent.futures import CancelledError, TimeoutError, InvalidStateError
+from concurrent.futures._base import CANCELLED, FINISHED, CANCELLED_AND_NOTIFIED
+from gevent import Timeout
+from sys import stderr
+from dataclasses import dataclass
+from typing import Union
+from collections.abc import Iterable
 
 # Future IDs (fids) are sometimes carried as reference numbers inside
 # Charm++ CkCallback objects. The data type most commonly used for
@@ -23,37 +30,67 @@ def __init__(self, msg):
 # See commit 25e2935 if need to resurrect code where proxies were included when
 # futures were pickled.
-class Future(object):
+#@dataclass()
+class Future(ConcurrentFuture):
+
+    #fid: int  # unique future ID within the process that created it
+    #gr: object  # greenlet that created the future
+    # PE where the future was created (not used for collective futures)
+    #src: int
+    #nvals: int  # number of values that the future expects to receive
+    #values: list  # values of the future
+    # flag to check if creator thread is blocked on the future
+    #blocked: Union[bool, int] = False
+    #gotvalues: bool = False  # flag to check if expected number of values have been received
+    # if the future receives an Exception, it is set here
+    #error: Union[None, Exception] = None
+    #callbacks: list  # list of callback functions
+    #is_cancelled: bool = False  # flag to check if the future is cancelled
+    #is_running: bool = False  # flag to check if the future is currently running
 
     def __init__(self, fid, gr, src, num_vals):
-        self.fid = fid          # unique future ID within the process that created it
-        self.gr = gr            # greenlet that created the future
-        self.src = src          # PE where the future was created (not used for collective futures)
-        self.nvals = num_vals   # number of values that the future expects to receive
-        self.values = []        # values of the future
-        self.blocked = False    # flag to check if creator thread is blocked on the future
-        self.gotvalues = False  # flag to check if expected number of values have been received
-        self.error = None       # if the future receives an Exception, it is set here
+        super().__init__()
+        self.fid = fid
+        self.gr = gr
+        self.src = src
+        self.nvals = num_vals
+        #self.values = []
+        self._result = []
+        self.blocked = False
+        #self.callbacks = []
 
     def get(self):
         """ Blocking call on current entry method's thread to obtain the values of
             the future. If the values are already available then they are returned
             immediately.
         """
+
         if not self.gotvalues:
+            print("NOT AVAILABLE")
             self.blocked = True
             self.gr = getcurrent()
-            self.values = threadMgr.pauseThread()
+            self._result = threadMgr.pauseThread()
+        else:
+            print("AVAILABLE")
 
-        if self.error is not None:
-            raise self.error
+        if isinstance(self._exception, Exception):
+            raise self._exception
 
-        if self.nvals == 1:
-            return self.values[0]
-        return self.values
+        return self.values[0] if self.nvals == 1 else self.values
 
+    # self.ready() == self.gotvalues == (self._state == FINISHED)?
+    # Why do we need three ways to say the same thing?
+    # ready() is only true if all values have been received.
     def ready(self):
         return self.gotvalues
 
+    @property
+    def values(self):
+        return self._result
+
+    @property
+    def gotvalues(self):
+        return len(self.values) == self.nvals
+
     def waitReady(self, f):
         self.blocked = 2
 
@@ -69,17 +106,54 @@ def getTargetProxyEntryMethod(self):
 
     def deposit(self, result):
         """ Deposit a value for this future. """
-        self.values.append(result)
-        if isinstance(result, Exception):
-            self.error = result
-        if len(self.values) == self.nvals:
-            self.gotvalues = True
-            return True
-        return False
+        retval = False
+        with self._condition:
+            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
+                raise InvalidStateError('{}: {!r}'.format(self._state, self))
+
+            self.values.append(result)
+
+            # Should it be finished after receiving a single exception
+            # or should it wait for all of them?
+            if isinstance(result, Exception):
+                self._exception = result
+                #self._state = FINISHED
+                #for waiter in self._waiters:
+                #    waiter.add_exception(self)
+                #self._condition.notify_all()
+                #self._invoke_callbacks()
+
+            if self.gotvalues:
+                self._state = FINISHED
+                #if isinstance(self._exception, Exception)
+                #    for waiter in self._waiters:
+                #        waiter.add_exception(self)
+                # add_exception and add_result seem to
+                # do the same thing so we shouldn't
+                # need to call them separately
+                for waiter in self._waiters:
+                    waiter.add_result(self)
+                self._condition.notify_all()
+                self._invoke_callbacks()
+                retval = True
+
+        #retval = False
+        #self.values.append(result)
+        #if isinstance(result, Exception):
+        #    self._exception = result
+        #if len(self.values) == self.nvals:
+        #    self._state = FINISHED
+        #    self.gotvalues = True
+        #    retval = True
+        #self.is_running = False
+        #self._run_callbacks()
+
+        return retval
 
     def resume(self, threadMgr):
         if self.blocked == 2:
-            # someone is waiting for future to become ready, signal by sending myself
+            # someone is waiting for future to become ready, signal by sending
+            # myself
             self.blocked = False
             threadMgr.resumeThread(self.gr, self)
         elif self.blocked:
@@ -93,6 +167,176 @@ def __getstate__(self):
 
     def __setstate__(self, state):
         self.fid, self.src = state
+    #@property
+    #def _result():
+    #    return self.values
+
+    def cancel(self):
+        retval = super().cancel()
+        if retval:
+            threadMgr.cancelFuture(self)
+        return retval
+    '''
+        """Cancel the future if possible.
+
+        Returns True if the future was cancelled, False otherwise. A future
+        cannot be cancelled if it is running or has already completed.
+ """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + threadMgr.cancelFuture(self) + self._condition.notify_all() + + self._invoke_callbacks() + return True + ''' + + """ + def cancel(self): + #retval = super().cancel() + + if self.running() or self.done(): + return False + else: + self.is_cancelled = True + threadMgr.cancelFuture(self) + self._run_callbacks() + return True + """ + """ + def cancelled(self): + return self.is_cancelled + + def running(self): + return self.is_running + + def done(self): + print( + "Ready:", self.ready(), + "Cancelled:", self.cancelled(), + "Failed:", isinstance( + self.error, + Exception), + "Running:", self.running()) + return self.ready() or self.cancelled() or isinstance(self.error, Exception) + """ + + """ + def result(self, timeout=None): + if self.cancelled(): + raise CancelledError() + with Timeout(timeout, TimeoutError): + result = self.get() + return result + + def exception(self, timeout=None): + try: + self.result(timeout=timeout) + except Exception as e: + if isinstance( + e, TimeoutError) or isinstance( + e, CancelledError) or e != self.error: + raise e + + return self.error + """ + + """ + def _run_callback(self, callback): + try: + callback(self) + except Exception as e: + print(e, file=stderr) + + def _run_callbacks(self): + while len(self.callbacks) > 0: + try: + callback = self.callbacks.pop() + self._run_callback(callback) + except IndexError: + break + + def add_done_callback(self, callback): + self.callbacks.append(callback) + # Status may have changed while appending. Run + # any remaining callbacks + if self.done(): + self._run_callbacks() + """ + + def set_running_or_notify_cancel(self): + """ + if self.cancelled(): + retval = False + elif self.done(): + raise InvalidStateError() + else: + self.is_running = True + retval = True + """ + + # How to force this future to start running asynchronously? + # Can't really control which greenlet executes next so + # a greenlet might complete before it is marked to run. + # Need to make Concurrent.wait() work here. + #if self._state == FINISHED: + # return True + retval = super().set_running_or_notify_cancel() + if retval: + #self.waitReady(None) + # Pause this thread (and allow other threads to execute) + threadMgr.pauseThread() + #self.blocked = True + # Resume this thread + #self.resume(threadMgr) + #threadMgr.start() + #self.gr = getcurrent() + #threadMgr.resumeThread(self.gr, self) + #threadMgr.resumeThread(threadMgr.main_gr, self) + + return retval + + # The following methods aren't used here, but concurrent.futures says unit + # tests may use them + + # Add this logic (and the exception logic) to deposit + ''' + def set_result(self, result): + """Sets the return value of work associated with the future. + Should only be used by Executor implementations and unit tests. 
+ """ + with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + ''' + + # concurrent.futures assumes the future only needs to wait for a single + # result, but the charm4py future can wait on multiple deposits + def set_result(self, result): + if isinstance(result, Iterable): + assert len(result) == self.nvals + for entry in result: + self.deposit(entry) + else: + assert self.nvals == 1 + self.deposit(result) + #super().set_result(exception) + + def set_exception(self, exception): + self.deposit(exception) + #super().set_exception(exception) class CollectiveFuture(Future): @@ -147,12 +391,16 @@ def isMainThread(self): def objMigrating(self, obj): if obj._numthreads > 0: - raise Charm4PyError('Migration of chares with active threads is not currently supported') + raise Charm4PyError( + 'Migration of chares with active threads is not currently supported') def throwNotThreadedError(self): - raise NotThreadedError("Method '" + charm.last_em_exec.C.__name__ + "." + - charm.last_em_exec.name + - "' must be a couroutine to be able to suspend (decorate it with @coro)") + raise NotThreadedError( + "Method '" + + charm.last_em_exec.C.__name__ + + "." + + charm.last_em_exec.name + + "' must be a couroutine to be able to suspend (decorate it with @coro)") def pauseThread(self): """ Called by an entry method thread to wait for something. @@ -207,7 +455,8 @@ def createFuture(self, num_vals=1): # get a unique local Future ID global FIDMAXVAL futures = self.futures - assert len(futures) < FIDMAXVAL, 'Too many pending futures, cannot create more' + assert len( + futures) < FIDMAXVAL, 'Too many pending futures, cannot create more' fid = (self.lastfid % FIDMAXVAL) + 1 while fid in futures: fid = (fid % FIDMAXVAL) + 1 @@ -231,8 +480,10 @@ def depositFuture(self, fid, result): try: f = futures[fid] except KeyError: - raise Charm4PyError('No pending future with fid=' + str(fid) + '. A common reason is ' - 'sending to a future that already received its value(s)') + raise Charm4PyError( + 'No pending future with fid=' + + str(fid) + '. A common reason is ' + 'sending to a future that already received its value(s)') if f.deposit(result): del futures[fid] # resume if a thread is blocked on the future @@ -253,8 +504,8 @@ def depositCollectiveFuture(self, fid, result, obj): def cancelFuture(self, f): fid = f.fid del self.futures[fid] - f.gotvalues = True - f.values = [None] * f.nvals + #f.gotvalues = True + f._result = [None] * f.nvals f.resume(self) # TODO: method to cancel collective future. the main issue with this is diff --git a/examples/dist-task-scheduler/scheduler.py b/examples/dist-task-scheduler/scheduler.py index 227db11a..cd8d2ddc 100644 --- a/examples/dist-task-scheduler/scheduler.py +++ b/examples/dist-task-scheduler/scheduler.py @@ -70,7 +70,7 @@ def taskDone(self, worker_id, task_id, job_id, result): job.addResult(task_id, result) if job.isDone(): self.jobs.pop(job.id) - # job is done, send the result back to whoever submitted the job + # job is done, send the result back to whomever submitted the job job.callback(job.results) # callback is a callable self.schedule()