diff --git a/docs/enUS/arguments.md b/docs/enUS/arguments.md
new file mode 100644
index 0000000..5f1dfa5
--- /dev/null
+++ b/docs/enUS/arguments.md
@@ -0,0 +1,12 @@
+## Init Policy
+
+There are four init policies for restarting RedisBeat, defined in [redisbeat/constants.py]() and selected via Celery configuration as shown below:
+
+- INIT_POLICY_DEFAULT
+  - The default policy. If a task misses N runs while RedisBeat is down, it runs N times after restart.
+- INIT_POLICY_RESET
+  - Resets every task's last run time to the restart time.
+- INIT_POLICY_FAST_FORWARD
+  - If a task misses N runs while RedisBeat is down, it runs only ONCE after restart and then continues on its normal schedule from the current time.
+- INIT_POLICY_IMMEDIATELY
+  - Runs all tasks immediately, and resets every task's last run time to the restart time.
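+
+A minimal sketch of selecting a policy, assuming a local Redis broker; the
+`CELERY_REDIS_SCHEDULER_INIT_POLICY` key is read by `RedisScheduler` at startup:
+
+```python
+from celery import Celery
+
+app = Celery('tasks', broker='redis://localhost:6379')
+app.conf.update(
+    CELERY_REDIS_SCHEDULER_URL='redis://localhost:6379',
+    # one of: "DEFAULT", "RESET", "FAST_FORWARD", "IMMEDIATELY"
+    CELERY_REDIS_SCHEDULER_INIT_POLICY='FAST_FORWARD',
+)
+```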
diff --git a/docs/zhCN/arguments.md b/docs/zhCN/arguments.md
new file mode 100644
index 0000000..f765ecd
--- /dev/null
+++ b/docs/zhCN/arguments.md
@@ -0,0 +1,12 @@
+## 初始化策略
+
+RedisBeat 在 [redisbeat/constants.py]() 中定义了 4 种不同的初始化策略,分别为:
+
+- INIT_POLICY_DEFAULT
+  - RedisBeat 的默认策略,如果在重启期间你错过了 N 次执行,那么重启之后会补足这 N 次执行;
+- INIT_POLICY_RESET
+  - 在重启时将所有任务的开始计数时间重置为当前时间;
+- INIT_POLICY_FAST_FORWARD
+  - 如果在重启期间你错过了 N 次执行,那么重启之后只会执行一次,并且后续将按照正常周期计算;
+- INIT_POLICY_IMMEDIATELY
+  - 在重启后马上执行所有的任务,并且重置计数时间为当前时间;
diff --git a/example/Dockerfile b/example/Dockerfile
index 62b573b..4e343a4 100644
--- a/example/Dockerfile
+++ b/example/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.6-alpine
+FROM python:2.7-alpine
 WORKDIR /usr/src/app
 
 RUN apk add --no-cache --virtual \
diff --git a/example/Makefile b/example/Makefile
new file mode 100644
index 0000000..5dc9604
--- /dev/null
+++ b/example/Makefile
@@ -0,0 +1,14 @@
+
+.PHONY: worker beat add_task rm_task
+
+worker:
+	celery worker -A tasks -l INFO
+
+beat:
+	celery beat -A tasks -S redisbeat.RedisScheduler -l DEBUG
+
+add_task:
+	python add_task.py
+
+rm_task:
+	python rem_task.py
\ No newline at end of file
diff --git a/example/add_task.py b/example/add_task.py
index 7bc7cd7..4faa771 100644
--- a/example/add_task.py
+++ b/example/add_task.py
@@ -21,7 +21,7 @@
 if __name__ == "__main__":
     schduler = RedisScheduler(app=app, skip_init=True)
     schduler.add(**{
-        'name': 'sub-perminute',
+        'name': 'sub-every-3-seconds',
         'task': 'tasks.sub',
         'schedule': timedelta(seconds=3),
         'args': (1, 1)
diff --git a/example/rem_task.py b/example/rem_task.py
index 0875f82..70a69ea 100644
--- a/example/rem_task.py
+++ b/example/rem_task.py
@@ -7,5 +7,5 @@
 if __name__ == "__main__":
     schduler = RedisScheduler(app=app, skip_init=True)
 
-    result = schduler.remove('sub-perminute')
+    result = schduler.remove('sub-every-3-seconds')
     print("rem result: ", result)
diff --git a/example/tasks.py b/example/tasks.py
index 82afc50..5683dfc 100644
--- a/example/tasks.py
+++ b/example/tasks.py
@@ -11,15 +11,14 @@
 if hostname != "beat" and hostname != "worker":
     redis_url = 'redis://localhost:6379'
 
-
 app = Celery('tasks', backend=redis_url, broker=redis_url)
 app.conf.update(CELERY_REDIS_SCHEDULER_URL = redis_url)
 
 if hostname == "beat":
     app.conf.update(
         CELERYBEAT_SCHEDULE={
-            'perminute': {
+            'every-3-seconds': {
                 'task': 'tasks.add',
                 'schedule': timedelta(seconds=3),
                 'args': (1, 1)
diff --git a/redisbeat/constants.py b/redisbeat/constants.py
index ab110cd..80a5ad1 100644
--- a/redisbeat/constants.py
+++ b/redisbeat/constants.py
@@ -9,14 +9,21 @@
 @created at: 2020/3/7
 """
 
+DEFAULT_INIT_POLICY = "DEFAULT"
 INIT_POLICY_RESET = "RESET"
-INIT_POLICY_DEFAULT = "DEFAULT"
 INIT_POLICY_IMMEDIATELY = "IMMEDIATELY"
 INIT_POLICY_FAST_FORWARD = "FAST_FORWARD"
 
 INIT_POLICIES = [
     INIT_POLICY_RESET,
-    INIT_POLICY_DEFAULT,
+    DEFAULT_INIT_POLICY,
     INIT_POLICY_IMMEDIATELY,
     INIT_POLICY_FAST_FORWARD,
-]
\ No newline at end of file
+]
+
+CONFIG_INIT_POLICY = "CELERY_REDIS_SCHEDULER_INIT_POLICY"
+BROKER_URL = "CELERY_REDIS_SCHEDULER_URL"
+BROKER_TRANSPORT_OPTIONS = "CELERY_BROKER_TRANSPORT_OPTIONS"
+BROKER_KEY = "CELERY_REDIS_SCHEDULER_KEY"
+MULTI_NODE_MODE = "CELERY_REDIS_MULTI_NODE_MODE"
+LOCK_TTL = "CELERY_REDIS_SCHEDULER_LOCK_TTL"
\ No newline at end of file
diff --git a/redisbeat/scheduler.py b/redisbeat/scheduler.py
index 8b961c4..efe35da 100644
--- a/redisbeat/scheduler.py
+++ b/redisbeat/scheduler.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
 # Copyright 2014 Kong Luoxing
@@ -11,9 +12,8 @@
 # use this file except in compliance with the License. You may obtain a copy
 # of the License at http://www.apache.org/licenses/LICENSE-2.0
 
-from functools import partial
-import jsonpickle
 import sys
+import jsonpickle
+from datetime import timedelta
 from time import mktime
 import traceback
@@ -28,69 +28,124 @@
 except ImportError:
     import urlparse
 
-logger = get_logger(__name__)
-debug, linfo, error, warning = (logger.debug, logger.info, logger.error,
-                                logger.warning)
 try:
     MAXINT = sys.maxint
 except AttributeError:
     # python3
     MAXINT = sys.maxsize
 
+from redisbeat.constants import (
+    INIT_POLICIES,
+    DEFAULT_INIT_POLICY,
+    INIT_POLICY_IMMEDIATELY,
+    INIT_POLICY_RESET,
+    INIT_POLICY_FAST_FORWARD,
+    CONFIG_INIT_POLICY,
+    BROKER_URL,
+    BROKER_KEY,
+    BROKER_TRANSPORT_OPTIONS,
+    MULTI_NODE_MODE,
+    LOCK_TTL,
+)
+
+
+logger = get_logger(__name__)
+debug, linfo, error, warning = (logger.debug, logger.info, logger.error,
+                                logger.warning)
+
+default_transport_options = {
+    "master_name": "mymaster",
+}
+default_broker_key = "celery:beat:order_tasks"
+
+
+class EncodeException(Exception):
+    pass
+
+class TaskNotExistsException(Exception):
+    pass
+
+class Codec(object):
+    """
+    Codec is used to encode and decode task entry
+    The default codec is jsonpickle
+    """
+    def encode(self, obj):
+        encode_obj = jsonpickle.encode(obj)
+        if encode_obj is None:
+            raise EncodeException("encode obj is None")
+        return encode_obj
+
+    def decode(self, obj):
+        return jsonpickle.decode(obj)
+
 
 class RedisScheduler(Scheduler):
     def __init__(self, *args, **kwargs):
         app = kwargs['app']
         self.skip_init = kwargs.get('skip_init', False)
-        self.key = app.conf.get("CELERY_REDIS_SCHEDULER_KEY",
-                                "celery:beat:order_tasks")
+
+        self.codec = Codec()
+
+        self.key = app.conf.get(BROKER_KEY, default_broker_key)
+        self.schedule_init_policy = app.conf.get(CONFIG_INIT_POLICY, DEFAULT_INIT_POLICY)
+        if self.schedule_init_policy not in INIT_POLICIES:
+            raise ValueError("unexpected init policy " + self.schedule_init_policy)
         self.max_interval = 2  # default max interval is 2 seconds
-        self.schedule_url = app.conf.get("CELERY_REDIS_SCHEDULER_URL",
-                                         "redis://localhost:6379")
+        self.schedule_url = app.conf.get(BROKER_URL, "redis://localhost:6379")
         # using sentinels
         # supports 'sentinel://:pass@host:port/db
         if self.schedule_url.startswith('sentinel://'):
-            self.broker_transport_options = app.conf.get(
-                "CELERY_BROKER_TRANSPORT_OPTIONS", {"master_name": "mymaster"})
+            self.broker_transport_options = app.conf.get(BROKER_TRANSPORT_OPTIONS, default_transport_options)
             self.rdb = self.sentinel_connect(
                 self.broker_transport_options['master_name'])
         else:
             self.rdb = StrictRedis.from_url(self.schedule_url)
         Scheduler.__init__(self, *args, **kwargs)
-        app.add_task = partial(self.add, self)
+        app.add_task = self.add
 
-        self.multi_node = app.conf.get(
-            "CELERY_REDIS_MULTI_NODE_MODE", False)
+        self.multi_node = app.conf.get(MULTI_NODE_MODE, False)
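+        # NOTE: in multi-node mode, each beat instance tries to grab the
+        # 'celery:beat:task_lock' Redis lock; only the holder should dispatch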
app.conf.get("CELERY_REDIS_MULTI_NODE_MODE", False) + self.multi_node = app.conf.get(MULTI_NODE_MODE, False) # how long we should hold on to the redis lock in seconds if self.multi_node: - self.lock_ttl = current_app.conf.get( - "CELERY_REDIS_SCHEDULER_LOCK_TTL", 30) + self.lock_ttl = app.conf.get(LOCK_TTL, 30) self._lock_acquired = False self._lock = self.rdb.lock( 'celery:beat:task_lock', timeout=self.lock_ttl) self._lock_acquired = self._lock.acquire(blocking=False) - + def _remove_db(self): linfo("remove db now") self.rdb.delete(self.key) - def _when(self, entry, next_time_to_run): - return mktime(entry.schedule.now().timetuple()) + (self.adjust(next_time_to_run) or 0) + def _when(self, entry, next_run_time): + return mktime(entry.schedule.now().timetuple()) + (self.adjust(next_run_time) or 0) def setup_schedule(self): debug("setup schedule, skip_init: %s", self.skip_init) if self.skip_init: return - # init entries - self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE) - tasks = [jsonpickle.decode(entry) - for entry in self.rdb.zrange(self.key, 0, -1)] - linfo('Current schedule:\n' + '\n'.join( - str('task: ' + entry.task + '; each: ' + repr(entry.schedule)) - for entry in tasks)) + + # if the tasks in config not exists in db, add it to db + self.merge_inplace(self.app.conf['CELERYBEAT_SCHEDULE']) + # init entries + # entries = [jsonpickle.decode(task) for task in self.rdb.zrange(self.key, 0, -1)] + entries = self.rdb.zrange(self.key, 0, MAXINT) + # linfo('Current schedule:\n' + '\n'.join( + # str('task: ' + entry.task + '; each: ' + repr(entry.schedule)) + # for entry in entries)) + for entry in entries: + decode_task = self.codec.decode(entry) + linfo("checking task entry(%s): %s", decode_task.name, decode_task.schedule) + next_run_interval, new_task = self._calculate_next_run_time_with_init_policy(decode_task) + next_run_time = self._when(new_task, next_run_interval) + + self.rdb.zrem(self.key, entry) + self.rdb.zadd(self.key, {self.codec.encode(new_task): next_run_time}) + def merge_inplace(self, tasks): - old_entries = self.rdb.zrangebyscore( - self.key, 0, MAXINT, withscores=True) + old_entries = self.rdb.zrangebyscore(self.key, 0, MAXINT, withscores=True) + print("old_entries dict: {}".format(old_entries)) old_entries_dict = dict({}) for task, score in old_entries: if not task: @@ -99,25 +154,39 @@ def merge_inplace(self, tasks): entry = jsonpickle.decode(task) old_entries_dict[entry.name] = (entry, score) debug("old_entries: %s", old_entries_dict) - - self.rdb.delete(self.key) - - for key in tasks: - last_run_at = 0 - e = self.Entry(**dict(tasks[key], name=key, app=self.app)) - if key in old_entries_dict: - # replace entry and remain old score - last_run_at = old_entries_dict[key][1] - del old_entries_dict[key] - self.rdb.zadd(self.key, {jsonpickle.encode(e): min( - last_run_at, self._when(e, e.is_due()[1]) or 0)}) - debug("old_entries: %s", old_entries_dict) - for key, tasks in old_entries_dict.items(): - debug("key: %s", key) - debug("tasks: %s", tasks) - debug("zadd: %s", self.rdb.zadd( - self.key, {jsonpickle.encode(tasks[0]): tasks[1]})) - debug(self.rdb.zrange(self.key, 0, -1)) + print("old_entries dict: {}".format(old_entries_dict)) + + # self.rdb.delete(self.key) + for task_name, task in tasks.items(): + e = self.Entry(**dict(task, name=task_name, app=self.app)) + if task_name not in old_entries_dict: + _, next_run_interval = e.is_due() + next_run_time = self._when(e, next_run_interval) + linfo("add task entry: %s, next_run_time:%d to db", task_name, next_run_interval) 
+ print("add task entry: {}, next_run_time:{} to db".format(task_name, next_run_interval)) + self.rdb.zadd(self.key, {self.codec.encode(e): next_run_time}) + + def _calculate_next_run_time_with_init_policy(self, entry): + if self.schedule_init_policy == INIT_POLICY_RESET: + entry.last_run_at = entry.default_now() + _, next_run_time = entry.is_due() + return next_run_time, entry + elif self.schedule_init_policy == INIT_POLICY_IMMEDIATELY: + return 0, entry + elif self.schedule_init_policy == INIT_POLICY_FAST_FORWARD: + last_run_at = entry.last_run_at + should_run_now, next_run_time = entry.is_due() + if should_run_now: + while should_run_now: #TODO: this is not a good way to do this + entry.last_run_at = last_run_at + next_run_time + should_run_now, next_run_time = entry.is_due() + entry.last_run_at -= next_run_time + return 0, entry + else: + return next_run_time, entry + else: # default policy + _, next_run_time = entry.is_due() + return next_run_time, entry def is_due(self, entry): return entry.is_due() @@ -129,12 +198,13 @@ def adjust(self, n, drift=-0.010): def add(self, **kwargs): e = self.Entry(app=current_app, **kwargs) - self.rdb.zadd(self.key, {jsonpickle.encode( - e): self._when(e, e.is_due()[1]) or 0}) + _, next_run_interval = e.is_due() + next_run_time = self._when(e, next_run_interval) + self.rdb.zadd(self.key, {self.codec.encode(e): next_run_time}) return True def remove(self, task_key): - tasks = self.rdb.zrange(self.key, 0, -1) or [] + tasks = self.rdb.zrange(self.key, 0, MAXINT) or [] for idx, task in enumerate(tasks): entry = jsonpickle.decode(task) if entry.name == task_key: @@ -144,10 +214,10 @@ def remove(self, task_key): return False def list(self): - return [jsonpickle.decode(entry) for entry in self.rdb.zrange(self.key, 0, -1)] + return [jsonpickle.decode(entry) for entry in self.rdb.zrange(self.key, 0, MAXINT)] def get(self, task_key): - tasks = self.rdb.zrange(self.key, 0, -1) or [] + tasks = self.rdb.zrange(self.key, 0, MAXINT) or [] for idx, task in enumerate(tasks): entry = jsonpickle.decode(task) if entry.name == task_key: @@ -159,15 +229,16 @@ def tick(self): tasks = self.rdb.zrangebyscore( self.key, 0, self.adjust(mktime(self.app.now().timetuple()), drift=0.010), - withscores=True) or [] + withscores=True, + ) or [] next_times = [self.max_interval, ] - for task, score in tasks: + for task, _ in tasks: entry = jsonpickle.decode(task) - is_due, next_time_to_run = self.is_due(entry) + is_due, next_run_time = self.is_due(entry) - next_times.append(next_time_to_run) + next_times.append(next_run_time) if is_due: next_entry = self.reserve(entry) try: @@ -177,13 +248,12 @@ def tick(self): error('Message Error: %s\n%s', exc, traceback.format_stack(), exc_info=True) else: - debug('%s sent. id->%s', entry.task, result.id) + debug('%s sent. 
                 self.rdb.zrem(self.key, task)
-                self.rdb.zadd(self.key, {jsonpickle.encode(
-                    next_entry): self._when(next_entry, next_time_to_run) or 0})
+                self.rdb.zadd(self.key, {self.codec.encode(next_entry): self._when(next_entry, next_run_time) or 0})
 
-        next_task = self.rdb.zrangebyscore(
-            self.key, 0, MAXINT, withscores=True, num=1, start=0)
+        next_task = self.rdb.zrangebyscore(self.key, 0, MAXINT, withscores=True, num=1, start=0)
         if not next_task:
             linfo("no next task found")
             return min(next_times)
diff --git a/t/unit/test_scheduler_init.py b/t/unit/test_scheduler_init.py
new file mode 100644
index 0000000..0bffd00
--- /dev/null
+++ b/t/unit/test_scheduler_init.py
@@ -0,0 +1,112 @@
+#!/usr/bin/env python
+# encoding: utf-8
+import time
+from datetime import timedelta
+import unittest
+
+from celery import Celery
+from redis import StrictRedis
+
+from redisbeat import RedisScheduler
+
+
+redis_key = "celery:beat:order_tasks"
+min_redis_score = 0
+max_redis_score = 10000000000
+
+class TestSchedulerInitPolicy(unittest.TestCase):
+    def setUp(self):
+        super(TestSchedulerInitPolicy, self).setUp()
+
+        self.redis_url = 'redis://localhost:6379'
+        self.redis_cli = StrictRedis.from_url(self.redis_url)
+        self.redis_cli.zpopmin(redis_key, count=1000)
+
+    def test_default_init_policy(self):
+        app = Celery('tasks', backend=self.redis_url, broker=self.redis_url)
+
+        app.conf.update(
+            CELERY_REDIS_SCHEDULER_URL=self.redis_url,
+            CELERY_REDIS_SCHEDULER_INIT_POLICY="DEFAULT",
+            CELERYBEAT_SCHEDULE={
+                'perminute': {
+                    'task': 'tasks.add',
+                    'schedule': timedelta(seconds=1),
+                    'args': (1, 1)
+                }
+            }
+        )
+
+        RedisScheduler(app=app)
+        time.sleep(3)
+        RedisScheduler(app=app)  # reinit again
+
+        results = self.redis_cli.zrange(redis_key, min_redis_score, max_redis_score, withscores=True)
+        self.assertEqual(len(results), 1)
+        self.assertLess(results[0][1], time.time())
+
+    def test_reset_init_policy(self):
+        app = Celery('tasks', backend=self.redis_url, broker=self.redis_url)
+
+        app.conf.update(
+            CELERY_REDIS_SCHEDULER_URL=self.redis_url,
+            CELERY_REDIS_SCHEDULER_INIT_POLICY="RESET",
+            CELERYBEAT_SCHEDULE={
+                'perminute': {
+                    'task': 'tasks.add',
+                    'schedule': timedelta(seconds=1),
+                    'args': (1, 1)
+                }
+            }
+        )
+
+        time.sleep(3)
+
+        RedisScheduler(app=app)
+        results = self.redis_cli.zrangebyscore(redis_key, min_redis_score, max_redis_score, withscores=True)
+        self.assertEqual(len(results), 1)
+        self.assertGreater(results[0][1], time.time())
+
+    def test_immediately_init_policy(self):
+        app = Celery('tasks', backend=self.redis_url, broker=self.redis_url)
+
+        app.conf.update(
+            CELERY_REDIS_SCHEDULER_URL=self.redis_url,
+            CELERY_REDIS_SCHEDULER_INIT_POLICY="IMMEDIATELY",
+            CELERYBEAT_SCHEDULE={
+                'perminute': {
+                    'task': 'tasks.add',
+                    'schedule': timedelta(seconds=1),
+                    'args': (1, 1)
+                }
+            }
+        )
+
+        time.sleep(3)
+
+        RedisScheduler(app=app)
+        results = self.redis_cli.zrangebyscore(redis_key, min_redis_score, max_redis_score, withscores=True)
+        self.assertEqual(len(results), 1)
+        # the task is due right away, so its score must not be in the future
+        self.assertLess(results[0][1], time.time())
+
+    def test_fast_forward_init_policy(self):
+        app = Celery('tasks', backend=self.redis_url, broker=self.redis_url)
+
+        app.conf.update(
+            CELERY_REDIS_SCHEDULER_URL=self.redis_url,
+            CELERY_REDIS_SCHEDULER_INIT_POLICY="FAST_FORWARD",
+            CELERYBEAT_SCHEDULE={
+                'perminute': {
+                    'task': 'tasks.add',
+                    'schedule': timedelta(seconds=1),
+                    'args': (1, 1)
+                }
+            }
+        )
+
+        time.sleep(3)
+
+        RedisScheduler(app=app)
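+        # a freshly added FAST_FORWARD entry is not yet due, so its stored
+        # score should still be in the future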
+        results = self.redis_cli.zrangebyscore(redis_key, min_redis_score, max_redis_score, withscores=True)
+        self.assertEqual(len(results), 1)
+        self.assertGreater(results[0][1], time.time())
diff --git a/t/unit/test_scheduler.py b/t/unit/test_scheduler_tasks_operation.py
similarity index 66%
rename from t/unit/test_scheduler.py
rename to t/unit/test_scheduler_tasks_operation.py
index 3ac3094..fbf9b14 100644
--- a/t/unit/test_scheduler.py
+++ b/t/unit/test_scheduler_tasks_operation.py
@@ -12,20 +12,21 @@
 redis_key = "celery:beat:order_tasks"
 min_redis_score = 0
-max_redis_score = 10000000000000000
+max_redis_score = 100000000000
 
-class TestStringMethods(unittest.TestCase):
-    def setUp(self) -> None:
-        super().setUp()
+class TestDynamicOperation(unittest.TestCase):
+    def setUp(self):
+        super(TestDynamicOperation, self).setUp()
+
         self.redis_url = 'redis://localhost:6379'
         self.redis_cli = StrictRedis.from_url(self.redis_url)
         self.redis_cli.zpopmin(redis_key, count=1000)
 
-    def test_redisbeat(self):
+    def test_add_tasks(self):
         app = Celery('tasks', backend=self.redis_url, broker=self.redis_url)
-        app.conf.update(CELERY_REDIS_SCHEDULER_URL = self.redis_url)
         app.conf.update(
+            CELERY_REDIS_SCHEDULER_URL=self.redis_url,
             CELERYBEAT_SCHEDULE={
                 'perminute': {
                     'task': 'tasks.add',
@@ -35,8 +36,11 @@
             }
         )
 
+        results = self.redis_cli.zrange(redis_key, min_redis_score, max_redis_score)
+        self.assertEqual(len(results), 0)
+
         RedisScheduler(app=app)
-        results = self.redis_cli.zrangebyscore(redis_key, min_redis_score, max_redis_score, withscores=True)
+        results = self.redis_cli.zrange(redis_key, min_redis_score, max_redis_score)
         for result in results:
             print(result)
         self.assertEqual(len(results), 1)