From ea7c6bc477fbe11358e4a5f3d2100e7f6d4eaf36 Mon Sep 17 00:00:00 2001 From: lusong <404828407@qq.com> Date: Tue, 5 Sep 2017 14:10:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8D=E4=BE=9D=E8=B5=96=E4=B8=8A=E4=B8=80?= =?UTF-8?q?=E5=91=A8=E6=9C=9Frepeat=20job=E7=9A=84=E7=94=9F=E6=88=90job?= =?UTF-8?q?=E7=9A=84=E7=AD=96=E7=95=A5bug=EF=BC=8C=E6=94=B9=E7=9A=84?= =?UTF-8?q?=E8=BF=99=E4=B8=AAaddRepeatJobForInterval=EF=BC=88=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要两点: nextTriggerTime 要大于getLastGenerateTriggerTime(),每一轮都要大于上一轮的getLastGenerateTriggerTime。 防止上次生成的job正在执行时,这轮又可以生成重复的可执行job,因为这里是通过数据库taskid和tasktracker 唯一索引去重的。原来的逻辑会在上一轮的job正在执行时,生成重复的job。 另一点: 第一次生成job时,lastGenerateTriggerTime应该为0,而不是now。因为如果是now的话,nextTriggerTime 有可能选择时小于now,导致少生成一次job。 逻辑已经测过,不会多生成job,也不会少生成job。 --- .../queue/support/NonRelyJobUtils.java | 255 +++++++++--------- 1 file changed, 133 insertions(+), 122 deletions(-) diff --git a/lts-core/src/main/java/com/github/ltsopensource/queue/support/NonRelyJobUtils.java b/lts-core/src/main/java/com/github/ltsopensource/queue/support/NonRelyJobUtils.java index e6eb0d5d8..cc58c7f78 100644 --- a/lts-core/src/main/java/com/github/ltsopensource/queue/support/NonRelyJobUtils.java +++ b/lts-core/src/main/java/com/github/ltsopensource/queue/support/NonRelyJobUtils.java @@ -1,122 +1,133 @@ -package com.github.ltsopensource.queue.support; - -import com.github.ltsopensource.core.commons.utils.DateUtils; -import com.github.ltsopensource.core.constant.Constants; -import com.github.ltsopensource.core.logger.Logger; -import com.github.ltsopensource.core.logger.LoggerFactory; -import com.github.ltsopensource.core.support.CronExpressionUtils; -import com.github.ltsopensource.core.support.JobUtils; -import com.github.ltsopensource.queue.CronJobQueue; -import com.github.ltsopensource.queue.ExecutableJobQueue; -import com.github.ltsopensource.queue.RepeatJobQueue; -import com.github.ltsopensource.queue.domain.JobPo; -import com.github.ltsopensource.store.jdbc.exception.DupEntryException; - -import java.util.Date; - -/** - * @author Robert HG (254963746@qq.com) on 4/6/16. - */ -public class NonRelyJobUtils { - - private static final Logger LOGGER = LoggerFactory.getLogger(NonRelyJobUtils.class); - - /** - * 生成一个小时的任务 - */ - public static void addCronJobForInterval(ExecutableJobQueue executableJobQueue, - CronJobQueue cronJobQueue, - int scheduleIntervalMinute, - final JobPo finalJobPo, - Date lastGenerateTime) { - JobPo jobPo = JobUtils.copy(finalJobPo); - - String cronExpression = jobPo.getCronExpression(); - long endTime = DateUtils.addMinute(lastGenerateTime, scheduleIntervalMinute).getTime(); - Date timeAfter = lastGenerateTime; - boolean stop = false; - while (!stop) { - Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronExpression, timeAfter); - if (nextTriggerTime == null) { - stop = true; - } else { - if (nextTriggerTime.getTime() <= endTime) { - // 添加任务 - jobPo.setTriggerTime(nextTriggerTime.getTime()); - jobPo.setJobId(JobUtils.generateJobId()); - jobPo.setTaskId(finalJobPo.getTaskId() + "_" + DateUtils.format(nextTriggerTime, "MMdd-HHmmss")); - jobPo.setInternalExtParam(Constants.ONCE, Boolean.TRUE.toString()); - try { - jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo)); - executableJobQueue.add(jobPo); - } catch (DupEntryException e) { - LOGGER.warn("Cron Job[taskId={}, taskTrackerNodeGroup={}] Already Exist in ExecutableJobQueue", - jobPo.getTaskId(), jobPo.getTaskTrackerNodeGroup()); - } - } else { - stop = true; - } - } - timeAfter = nextTriggerTime; - } - cronJobQueue.updateLastGenerateTriggerTime(finalJobPo.getJobId(), endTime); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Add CronJob {} to {}", jobPo, DateUtils.formatYMD_HMS(new Date(endTime))); - } - } - - public static void addRepeatJobForInterval( - ExecutableJobQueue executableJobQueue, - RepeatJobQueue repeatJobQueue, - int scheduleIntervalMinute, final JobPo finalJobPo, Date lastGenerateTime) { - JobPo jobPo = JobUtils.copy(finalJobPo); - long firstTriggerTime = Long.valueOf(jobPo.getInternalExtParam(Constants.FIRST_FIRE_TIME)); - - Long repeatInterval = jobPo.getRepeatInterval(); - Integer repeatCount = jobPo.getRepeatCount(); - - long endTime = DateUtils.addMinute(lastGenerateTime, scheduleIntervalMinute).getTime(); - if (endTime <= firstTriggerTime) { - return; - } - // 计算出应该重复的次数 - int repeatedCount = Long.valueOf((lastGenerateTime.getTime() - firstTriggerTime) / jobPo.getRepeatInterval()).intValue(); - if (repeatedCount <= 0) { - repeatedCount = 1; //repeatedCount从1开始 - } - - if (repeatedCount < 0) { - repeatedCount = 0; - } - - boolean stop = false; - while (!stop) { - Long nextTriggerTime = firstTriggerTime + (repeatedCount - 1) * repeatInterval; //第一次执行时间点应该是firstTriggerTime - - if (nextTriggerTime <= endTime && - (repeatCount == -1 || repeatedCount <= repeatCount)) { - // 添加任务 - jobPo.setTriggerTime(nextTriggerTime); - jobPo.setJobId(JobUtils.generateJobId()); - jobPo.setTaskId(finalJobPo.getTaskId() + "_" + DateUtils.format(new Date(nextTriggerTime), "MMdd-HHmmss")); - jobPo.setRepeatedCount(repeatedCount); - jobPo.setInternalExtParam(Constants.ONCE, Boolean.TRUE.toString()); - try { - jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo)); - executableJobQueue.add(jobPo); - } catch (DupEntryException e) { - LOGGER.warn("Repeat Job[taskId={}, taskTrackerNodeGroup={}] Already Exist in ExecutableJobQueue", - jobPo.getTaskId(), jobPo.getTaskTrackerNodeGroup()); - } - repeatedCount++; - } else { - stop = true; - } - } - // 更新时间 - repeatJobQueue.updateLastGenerateTriggerTime(finalJobPo.getJobId(), endTime); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Add RepeatJob {} to {}", jobPo, DateUtils.formatYMD_HMS(new Date(endTime))); - } - } -} +package com.github.ltsopensource.queue.support; + +import com.github.ltsopensource.core.commons.utils.DateUtils; +import com.github.ltsopensource.core.constant.Constants; +import com.github.ltsopensource.core.logger.Logger; +import com.github.ltsopensource.core.logger.LoggerFactory; +import com.github.ltsopensource.core.support.CronExpressionUtils; +import com.github.ltsopensource.core.support.JobUtils; +import com.github.ltsopensource.queue.CronJobQueue; +import com.github.ltsopensource.queue.ExecutableJobQueue; +import com.github.ltsopensource.queue.RepeatJobQueue; +import com.github.ltsopensource.queue.domain.JobPo; +import com.github.ltsopensource.store.jdbc.exception.DupEntryException; + +import java.util.Date; + +/** + * @author Robert HG (254963746@qq.com) on 4/6/16. + */ +public class NonRelyJobUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(NonRelyJobUtils.class); + + /** + * 生成一个小时的任务 + */ + public static void addCronJobForInterval(ExecutableJobQueue executableJobQueue, + CronJobQueue cronJobQueue, + int scheduleIntervalMinute, + final JobPo finalJobPo, + Date lastGenerateTime) { + JobPo jobPo = JobUtils.copy(finalJobPo); + + String cronExpression = jobPo.getCronExpression(); + long endTime = DateUtils.addMinute(lastGenerateTime, scheduleIntervalMinute).getTime(); + Date timeAfter = lastGenerateTime; + boolean stop = false; + while (!stop) { + Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronExpression, timeAfter); + if (nextTriggerTime == null) { + stop = true; + } else { + if (nextTriggerTime.getTime() <= endTime) { + // 添加任务 + jobPo.setTriggerTime(nextTriggerTime.getTime()); + jobPo.setJobId(JobUtils.generateJobId()); + jobPo.setTaskId(finalJobPo.getTaskId() + "_" + DateUtils.format(nextTriggerTime, "MMdd-HHmmss")); + jobPo.setInternalExtParam(Constants.ONCE, Boolean.TRUE.toString()); + try { + jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo)); + executableJobQueue.add(jobPo); + } catch (DupEntryException e) { + LOGGER.warn("Cron Job[taskId={}, taskTrackerNodeGroup={}] Already Exist in ExecutableJobQueue", + jobPo.getTaskId(), jobPo.getTaskTrackerNodeGroup()); + } + } else { + stop = true; + } + } + timeAfter = nextTriggerTime; + } + cronJobQueue.updateLastGenerateTriggerTime(finalJobPo.getJobId(), endTime); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Add CronJob {} to {}", jobPo, DateUtils.formatYMD_HMS(new Date(endTime))); + } + } + + public static void addRepeatJobForInterval( + ExecutableJobQueue executableJobQueue, + RepeatJobQueue repeatJobQueue, + int scheduleIntervalMinute, final JobPo finalJobPo, Date lastGenerateTime) { + JobPo jobPo = JobUtils.copy(finalJobPo); + long firstTriggerTime = Long.valueOf(jobPo.getInternalExtParam(Constants.FIRST_FIRE_TIME)); + + Long repeatInterval = jobPo.getRepeatInterval(); + Integer repeatCount = jobPo.getRepeatCount(); + + long endTime = DateUtils.addMinute(lastGenerateTime, scheduleIntervalMinute).getTime(); + if (endTime <= firstTriggerTime) { + return; + } + + // 计算出已经生成的可执行job 起始repeatedCount + long initTime = lastGenerateTime.getTime(); + int repeatedCount = Long.valueOf((initTime - firstTriggerTime) / jobPo.getRepeatInterval()).intValue(); + if (repeatedCount <= 0) { + repeatedCount = 1; + } + + final JobPo repeatJobPjo = repeatJobQueue.getJob(jobPo.getTaskTrackerNodeGroup(), jobPo.getTaskId()); + + boolean stop = false; + while (!stop) { + final Long nextTriggerTime = firstTriggerTime + (repeatedCount - 1) * repeatInterval; + + if (repeatJobPjo.getLastGenerateTriggerTime() == null || repeatJobPjo.getLastGenerateTriggerTime() == 0) { + // 说明是第一次生成executable job,默认第一次为0,如果第一次设置为当前时间,那么有可能第一次triggertime小于当前时间,导致少生成一次job + initTime = 0; + } + + //这里也要大于上次的generateTiggerTime,防止上次生成的job正在执行时,这回又生成成功可执行job,因为这里是通过数据库唯一索引去重的 + if ((nextTriggerTime > initTime) && (nextTriggerTime <= endTime) + && (repeatCount == -1 || repeatedCount <= repeatCount)) { + // 添加任务 + jobPo.setTriggerTime(nextTriggerTime); + jobPo.setJobId(JobUtils.generateJobId()); + jobPo.setTaskId(finalJobPo.getTaskId() + "_" + + DateUtils.format(new Date(nextTriggerTime), "MMdd-HHmmss")); + jobPo.setRepeatedCount(repeatedCount); + jobPo.setInternalExtParam(Constants.ONCE, Boolean.TRUE.toString()); + try { + jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo)); + executableJobQueue.add(jobPo); + } catch (DupEntryException e) { + LOGGER.warn("Repeat Job[taskId={}, taskTrackerNodeGroup={}] Already Exist in ExecutableJobQueue", + jobPo.getTaskId(), jobPo.getTaskTrackerNodeGroup()); + } + repeatedCount++; + } else if (repeatedCount < repeatCount) { + //前面的job已经生成过了,继续增加repeatedCount,生成下面的job + repeatedCount++; + } else { + stop = true; + } + } + + // 更新时间 + repeatJobQueue.updateLastGenerateTriggerTime(finalJobPo.getJobId(), endTime); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Add RepeatJob {} to {}", jobPo, DateUtils.formatYMD_HMS(new Date(endTime))); + } + } +}