From 28b68f1897280f6ba8ee2bc0d2cc1b3e0e1d8eef Mon Sep 17 00:00:00 2001 From: lusong <404828407@qq.com> Date: Tue, 5 Sep 2017 14:00:47 +0800 Subject: [PATCH] =?UTF-8?q?retryhandler=E5=A4=84=E7=90=86repeat=20job?= =?UTF-8?q?=E7=9A=84bug=EF=BC=8C=20=E5=81=87=E5=A6=82job=20repeatCount?= =?UTF-8?q?=E4=B8=BA5=EF=BC=8C=E9=87=8D=E8=AF=95=E6=AC=A1=E6=95=B0?= =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E4=B8=BA5=E6=AC=A1=EF=BC=8C=E9=82=A3?= =?UTF-8?q?=E4=B9=88=E5=89=8D=E9=9D=A24=E6=AC=A1repeat=E9=83=BD=E4=BC=9A?= =?UTF-8?q?=E6=AF=94=E8=BE=83retry=E7=9A=84=E6=97=B6=E9=97=B4=E5=92=8C?= =?UTF-8?q?=E4=B8=8B=E6=AC=A1repeat=20job=E7=9A=84=E6=97=B6=E9=97=B4.=20?= =?UTF-8?q?=E8=80=8C=E6=9C=80=E5=90=8E=E4=B8=80=E6=AC=A1repeat=20job?= =?UTF-8?q?=E4=BC=9A=E6=8A=8A=E9=87=8D=E8=AF=95=E6=AC=A1=E6=95=B05?= =?UTF-8?q?=E6=AC=A1=E7=94=A8=E5=AE=8C=E3=80=82=E5=85=B7=E4=BD=93=E7=9C=8B?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 原来的逻辑会导致job永远不会结束执行,因为repeatedCount一直没有变,永远不能时repeatCount变为5,循环执行下去! --- .../jobtracker/complete/JobRetryHandler.java | 246 ++++++++++-------- 1 file changed, 131 insertions(+), 115 deletions(-) diff --git a/lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/complete/JobRetryHandler.java b/lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/complete/JobRetryHandler.java index ac832bf3b..db4ea9d5e 100644 --- a/lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/complete/JobRetryHandler.java +++ b/lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/complete/JobRetryHandler.java @@ -1,115 +1,131 @@ -package com.github.ltsopensource.jobtracker.complete; - -import com.github.ltsopensource.core.commons.utils.CollectionUtils; -import com.github.ltsopensource.core.constant.Constants; -import com.github.ltsopensource.core.constant.ExtConfig; -import com.github.ltsopensource.core.domain.Job; -import com.github.ltsopensource.core.domain.JobMeta; -import com.github.ltsopensource.core.domain.JobRunResult; -import com.github.ltsopensource.core.json.JSON; -import com.github.ltsopensource.core.logger.Logger; -import com.github.ltsopensource.core.logger.LoggerFactory; -import com.github.ltsopensource.core.spi.ServiceLoader; -import com.github.ltsopensource.core.support.CronExpressionUtils; -import com.github.ltsopensource.core.support.JobUtils; -import com.github.ltsopensource.core.support.SystemClock; -import com.github.ltsopensource.jobtracker.complete.retry.DefaultJobRetryTimeGenerator; -import com.github.ltsopensource.jobtracker.complete.retry.JobRetryTimeGenerator; -import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext; -import com.github.ltsopensource.queue.domain.JobPo; -import com.github.ltsopensource.store.jdbc.exception.DupEntryException; - -import java.util.Date; -import java.util.List; - -/** - * @author Robert HG (254963746@qq.com) on 11/11/15. - */ -public class JobRetryHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(JobRetryHandler.class); - - private JobTrackerAppContext appContext; - private int retryInterval = 30 * 1000; // 默认30s - private JobRetryTimeGenerator jobRetryTimeGenerator; - - public JobRetryHandler(JobTrackerAppContext appContext) { - this.appContext = appContext; - this.retryInterval = appContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_JOB_RETRY_INTERVAL_MILLIS, 30 * 1000); - this.jobRetryTimeGenerator = ServiceLoader.load(JobRetryTimeGenerator.class, appContext.getConfig()); - } - - public void onComplete(List results) { - - if (CollectionUtils.isEmpty(results)) { - return; - } - for (JobRunResult result : results) { - - JobMeta jobMeta = result.getJobMeta(); - // 1. 加入到重试队列 - JobPo jobPo = appContext.getExecutingJobQueue().getJob(jobMeta.getJobId()); - if (jobPo == null) { // 表示已经被删除了 - continue; - } - - Job job = jobMeta.getJob(); - if (!(jobRetryTimeGenerator instanceof DefaultJobRetryTimeGenerator)) { - job = JobUtils.copy(jobMeta.getJob()); - job.setTaskId(jobMeta.getRealTaskId()); // 这个对于用户需要转换为用户提交的taskId - } - // 得到下次重试时间 - Long nextRetryTriggerTime = jobRetryTimeGenerator.getNextRetryTriggerTime(job, jobPo.getRetryTimes(), retryInterval); - // 重试次数+1 - jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1); - - if (jobPo.isCron()) { - // 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较 - JobPo cronJobPo = appContext.getCronJobQueue().getJob(jobMeta.getJobId()); - if (cronJobPo != null) { - Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); - if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) { - // 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间 - nextRetryTriggerTime = nextTriggerTime.getTime(); - jobPo = cronJobPo; - } else { - jobPo.setInternalExtParam(Constants.IS_RETRY_JOB, Boolean.TRUE.toString()); - } - } - } else if (jobPo.isRepeatable()) { - JobPo repeatJobPo = appContext.getRepeatJobQueue().getJob(jobMeta.getJobId()); - if (repeatJobPo != null) { - // 比较下一次重复时间和重试时间 - if (repeatJobPo.getRepeatCount() == -1 || (repeatJobPo.getRepeatedCount() < repeatJobPo.getRepeatCount())) { - long nexTriggerTime = JobUtils.getRepeatNextTriggerTime(jobPo); - if (nexTriggerTime < nextRetryTriggerTime) { - // 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间 - nextRetryTriggerTime = nexTriggerTime; - jobPo = repeatJobPo; - } else { - if(jobPo.getRetryTimes() < repeatJobPo.getMaxRetryTimes()) { //最后一次重试时,这个参数不能设置,为了在finishHandler时能执行到incRepeatedCount - jobPo.setInternalExtParam(Constants.IS_RETRY_JOB, Boolean.TRUE.toString()); - } - } - } - } - } else { - jobPo.setInternalExtParam(Constants.IS_RETRY_JOB, Boolean.TRUE.toString()); - } - - // 加入到队列, 重试 - jobPo.setTaskTrackerIdentity(null); - jobPo.setIsRunning(false); - jobPo.setGmtModified(SystemClock.now()); - // 延迟重试时间就等于重试次数(分钟) - jobPo.setTriggerTime(nextRetryTriggerTime); - try { - appContext.getExecutableJobQueue().add(jobPo); - } catch (DupEntryException e) { - LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo)); - } - // 从正在执行的队列中移除 - appContext.getExecutingJobQueue().remove(jobPo.getJobId()); - } - } -} +package com.github.ltsopensource.jobtracker.complete; + +import com.github.ltsopensource.core.commons.utils.CollectionUtils; +import com.github.ltsopensource.core.constant.Constants; +import com.github.ltsopensource.core.constant.ExtConfig; +import com.github.ltsopensource.core.domain.Job; +import com.github.ltsopensource.core.domain.JobMeta; +import com.github.ltsopensource.core.domain.JobRunResult; +import com.github.ltsopensource.core.json.JSON; +import com.github.ltsopensource.core.logger.Logger; +import com.github.ltsopensource.core.logger.LoggerFactory; +import com.github.ltsopensource.core.spi.ServiceLoader; +import com.github.ltsopensource.core.support.CronExpressionUtils; +import com.github.ltsopensource.core.support.JobUtils; +import com.github.ltsopensource.core.support.SystemClock; +import com.github.ltsopensource.jobtracker.complete.retry.DefaultJobRetryTimeGenerator; +import com.github.ltsopensource.jobtracker.complete.retry.JobRetryTimeGenerator; +import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext; +import com.github.ltsopensource.queue.domain.JobPo; +import com.github.ltsopensource.store.jdbc.exception.DupEntryException; + +import java.util.Date; +import java.util.List; + +/** + * @author Robert HG (254963746@qq.com) on 11/11/15. + */ +public class JobRetryHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobRetryHandler.class); + + private JobTrackerAppContext appContext; + private int retryInterval = 30 * 1000; // 默认30s + private JobRetryTimeGenerator jobRetryTimeGenerator; + + public JobRetryHandler(JobTrackerAppContext appContext) { + this.appContext = appContext; + this.retryInterval = appContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_JOB_RETRY_INTERVAL_MILLIS, 30 * 1000); + this.jobRetryTimeGenerator = ServiceLoader.load(JobRetryTimeGenerator.class, appContext.getConfig()); + } + + public void onComplete(List results) { + + if (CollectionUtils.isEmpty(results)) { + return; + } + for (JobRunResult result : results) { + + JobMeta jobMeta = result.getJobMeta(); + // 1. 加入到重试队列 + JobPo jobPo = appContext.getExecutingJobQueue().getJob(jobMeta.getJobId()); + if (jobPo == null) { // 表示已经被删除了 + continue; + } + + Job job = jobMeta.getJob(); + if (!(jobRetryTimeGenerator instanceof DefaultJobRetryTimeGenerator)) { + job = JobUtils.copy(jobMeta.getJob()); + job.setTaskId(jobMeta.getRealTaskId()); // 这个对于用户需要转换为用户提交的taskId + } + // 得到下次重试时间 + Long nextRetryTriggerTime = jobRetryTimeGenerator.getNextRetryTriggerTime(job, jobPo.getRetryTimes(), retryInterval); + // 重试次数+1 + jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1); + + if (jobPo.isCron()) { + // 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较 + JobPo cronJobPo = appContext.getCronJobQueue().getJob(jobMeta.getJobId()); + if (cronJobPo != null) { + Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); + if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) { + // 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间 + nextRetryTriggerTime = nextTriggerTime.getTime(); + jobPo = cronJobPo; + } else { + jobPo.setInternalExtParam(Constants.IS_RETRY_JOB, Boolean.TRUE.toString()); + } + } + } else if (jobPo.isRepeatable()) { + JobPo repeatJobPo = appContext.getRepeatJobQueue().getJob(jobMeta.getJobId()); + if (repeatJobPo != null) { + // 比较下一次重复时间和重试时间 + if (repeatJobPo.getRepeatCount() == -1 + || (repeatJobPo.getRepeatedCount() < repeatJobPo.getRepeatCount())) { + long nexTriggerTime = JobUtils.getRepeatNextTriggerTime(jobPo); + if (nexTriggerTime < nextRetryTriggerTime + && (repeatJobPo.getRepeatedCount() + 1 < repeatJobPo.getRepeatCount())) { + // 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间 + // 最后一次repeat job的重试不会走到这里,会继续生成retry的job,cool + nextRetryTriggerTime = nexTriggerTime; + jobPo = repeatJobPo; + + // 虽然不再执行这次job的retry,但是也要更新repeatQueue的repeatedCount + final int updatedRepeatedCount = appContext.getRepeatJobQueue().incRepeatedCount( + jobMeta.getJobId()); + if (updatedRepeatedCount >= jobPo.getRepeatCount()) { + // 如果更新后的repeatedCount已经达到预定次数,就不再生产可执行job + return; + } + + if (jobPo.getRelyOnPrevCycle()) { + // 依赖上一周期的repeat job,下次job的repeatedCount要加1 + jobPo.setRepeatedCount(updatedRepeatedCount + 1); + } + } else { + if (jobPo.getRetryTimes() < repeatJobPo.getMaxRetryTimes()) { // 最后一次重试时,这个参数不能设置,为了在finish时能执行到incRepeatedCount + jobPo.setInternalExtParam(Constants.IS_RETRY_JOB, Boolean.TRUE.toString()); + } + } + } + } + } else { + jobPo.setInternalExtParam(Constants.IS_RETRY_JOB, Boolean.TRUE.toString()); + } + + // 加入到队列, 重试 + jobPo.setTaskTrackerIdentity(null); + jobPo.setIsRunning(false); + jobPo.setGmtModified(SystemClock.now()); + // 延迟重试时间就等于重试次数(分钟) + jobPo.setTriggerTime(nextRetryTriggerTime); + try { + appContext.getExecutableJobQueue().add(jobPo); + } catch (DupEntryException e) { + LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo)); + } + // 从正在执行的队列中移除 + appContext.getExecutingJobQueue().remove(jobPo.getJobId()); + } + } +}