提交 34299011 authored 作者: xuxueli's avatar xuxueli

任务触发逻辑重构

上级 5c0b206f
...@@ -32,7 +32,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter { ...@@ -32,7 +32,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
} }
idleBeatResultSB.append("<br>----------------------<br>") idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
.append("空闲检测:") .append("空闲检测:")
.append("<br>address:").append(address) .append("<br>address:").append(address)
.append("<br>code:").append(idleBeatResult.getCode()) .append("<br>code:").append(idleBeatResult.getCode())
...@@ -43,7 +43,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter { ...@@ -43,7 +43,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter {
jobLog.setExecutorAddress(address); jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
idleBeatResultSB.append("<br>----------------------<br>").append(runResult.getMsg()); idleBeatResultSB.append("<br><br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), idleBeatResultSB.toString()); return new ReturnT<String>(runResult.getCode(), idleBeatResultSB.toString());
} }
......
...@@ -85,8 +85,6 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter { ...@@ -85,8 +85,6 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
} }
...@@ -32,7 +32,7 @@ public class ExecutorRouteFailover extends ExecutorRouter { ...@@ -32,7 +32,7 @@ public class ExecutorRouteFailover extends ExecutorRouter {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
} }
beatResultSB.append("<br>----------------------<br>") beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
.append("心跳检测:") .append("心跳检测:")
.append("<br>address:").append(address) .append("<br>address:").append(address)
.append("<br>code:").append(beatResult.getCode()) .append("<br>code:").append(beatResult.getCode())
...@@ -43,7 +43,7 @@ public class ExecutorRouteFailover extends ExecutorRouter { ...@@ -43,7 +43,7 @@ public class ExecutorRouteFailover extends ExecutorRouter {
jobLog.setExecutorAddress(address); jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
beatResultSB.append("<br>----------------------<br>").append(runResult.getMsg()); beatResultSB.append("<br><br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), beatResultSB.toString()); return new ReturnT<String>(runResult.getCode(), beatResultSB.toString());
} }
......
...@@ -25,8 +25,6 @@ public class ExecutorRouteFirst extends ExecutorRouter { ...@@ -25,8 +25,6 @@ public class ExecutorRouteFirst extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
} }
...@@ -65,8 +65,6 @@ public class ExecutorRouteLFU extends ExecutorRouter { ...@@ -65,8 +65,6 @@ public class ExecutorRouteLFU extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
......
...@@ -64,8 +64,6 @@ public class ExecutorRouteLRU extends ExecutorRouter { ...@@ -64,8 +64,6 @@ public class ExecutorRouteLRU extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
......
...@@ -24,8 +24,6 @@ public class ExecutorRouteLast extends ExecutorRouter { ...@@ -24,8 +24,6 @@ public class ExecutorRouteLast extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
} }
...@@ -28,8 +28,6 @@ public class ExecutorRouteRandom extends ExecutorRouter { ...@@ -28,8 +28,6 @@ public class ExecutorRouteRandom extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
......
...@@ -44,10 +44,6 @@ public class ExecutorRouteRound extends ExecutorRouter { ...@@ -44,10 +44,6 @@ public class ExecutorRouteRound extends ExecutorRouter {
// run executor // run executor
ReturnT<String> runResult = runExecutor(triggerParam, address); ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult; return runResult;
} }
} }
...@@ -31,24 +31,49 @@ public class XxlJobTrigger { ...@@ -31,24 +31,49 @@ public class XxlJobTrigger {
*/ */
public static void trigger(int jobId) { public static void trigger(int jobId) {
// load job // load data
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
// log part-1 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM); // fail strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog(); XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId()); jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// log part-2 param // 2、prepare trigger-info
//jobLog.setExecutorAddress(executorAddress); //jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType()); jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date()); jobLog.setTriggerTime(new Date());
// trigger request ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append("注册方式:").append( (group.getAddressType() == 0)?"自动注册":"手动录入" );
triggerMsgSb.append("<br>阻塞处理策略:").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>失败处理策略:").append(failStrategy.getTitle());
triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle());
// 3、trigger-valid
if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && executorRouteStrategyEnum == null) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器路由策略为空");
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam(); TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId()); triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
...@@ -60,58 +85,25 @@ public class XxlJobTrigger { ...@@ -60,58 +85,25 @@ public class XxlJobTrigger {
triggerParam.setLogId(jobLog.getId()); triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
// do trigger // 4.2、trigger-run (route run / trigger remote executor)
ReturnT<String> triggerResult = doTrigger(triggerParam, jobInfo, jobLog); triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList, jobLog);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// fail retry // 4.3、trigger (fail retry)
if (triggerResult.getCode()==ReturnT.FAIL_CODE && if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), null) == ExecutorFailStrategyEnum.FAIL_RETRY) { triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList, jobLog);
ReturnT<String> retryTriggerResult = doTrigger(triggerParam, jobInfo, jobLog); triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
triggerResult.setCode(retryTriggerResult.getCode());
triggerResult.setMsg(triggerResult.getMsg() + "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br><br>" +retryTriggerResult.getMsg());
} }
// log part-2 // 5、save trigger-info
jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerResult.getMsg()); jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// monitor triger // 6、monitor triger
JobFailMonitorHelper.monitor(jobLog.getId()); JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
} }
private static ReturnT<String> doTrigger(TriggerParam triggerParam, XxlJobInfo jobInfo, XxlJobLog jobLog){
StringBuffer triggerSb = new StringBuffer();
// exerutor address list
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
triggerSb.append( (group.getAddressType() == 0)?"注册方式:自动注册":"注册方式:手动录入" );
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
triggerSb.append("<br>阻塞处理策略:").append(ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION).getTitle());
triggerSb.append("<br>失败处理策略:").append(ExecutorFailStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM).getTitle());
triggerSb.append("<br>地址列表:").append(addressList!=null?addressList.toString():"");
if (CollectionUtils.isEmpty(addressList)) {
triggerSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
}
// executor route strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
if (executorRouteStrategyEnum == null) {
triggerSb.append("<br>----------------------<br>").append("调度失败:").append("执行器路由策略为空");
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
}
triggerSb.append("<br>路由策略:").append(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle());
// route run / trigger remote executor
ReturnT<String> routeRunResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList, jobLog);
triggerSb.append("<br>----------------------<br>").append(routeRunResult.getMsg());
return new ReturnT<String>(routeRunResult.getCode(), triggerSb.toString());
}
} }
...@@ -11,7 +11,7 @@ xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ...@@ -11,7 +11,7 @@ xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job executor address ### xxl-job executor address
xxl.job.executor.appname=xxl-job-executor-example xxl.job.executor.appname=xxl-job-executor-example
xxl.job.executor.ip= xxl.job.executor.ip=
xxl.job.executor.port=9999 xxl.job.executor.port=9998
### xxl-job log path ### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/ xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论