提交 0ddef140 authored 作者: xuxueli's avatar xuxueli

调度中心参数配置逻辑重构

上级 4e0d1be6
......@@ -120,7 +120,7 @@ public class JobLogController {
@ResponseBody
public ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, executorAddress, XxlJobDynamicScheduler.getAccessToken()).getObject();
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(executorAddress);
ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);
// is end
......@@ -154,7 +154,7 @@ public class JobLogController {
// request of kill
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress(), XxlJobDynamicScheduler.getAccessToken()).getObject();
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(log.getExecutorAddress());
runResult = executorBiz.kill(jobInfo.getId());
} catch (Exception e) {
logger.error(e.getMessage(), e);
......
package com.xxl.job.admin.core.route;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -25,30 +22,4 @@ public abstract class ExecutorRouter {
*/
public abstract ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList);
/**
* run executor
* @param triggerParam
* @param address
* @return ReturnT.content: final address
*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobDynamicScheduler.getAccessToken()).getObject();
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(e.getMessage(), e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
StringBuffer runResultSB = new StringBuffer("触发调度:");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return runResult;
}
}
......@@ -2,10 +2,10 @@ package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import java.util.ArrayList;
......@@ -26,7 +26,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobDynamicScheduler.getAccessToken()).getObject();
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
} catch (Exception e) {
logger.error(e.getMessage(), e);
......@@ -41,7 +41,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter {
// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
idleBeatResultSB.append("<br><br>").append(runResult.getMsg());
// result
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -82,7 +83,7 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
......@@ -2,10 +2,10 @@ package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import java.util.ArrayList;
......@@ -26,7 +26,7 @@ public class ExecutorRouteFailover extends ExecutorRouter {
// beat
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobDynamicScheduler.getAccessToken()).getObject();
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
......@@ -41,7 +41,7 @@ public class ExecutorRouteFailover extends ExecutorRouter {
// beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
beatResultSB.append("<br><br>").append(runResult.getMsg());
// result
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -23,7 +23,7 @@ public class ExecutorRouteFirst extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -62,7 +63,7 @@ public class ExecutorRouteLFU extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -61,7 +62,7 @@ public class ExecutorRouteLRU extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -22,7 +22,7 @@ public class ExecutorRouteLast extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -25,7 +26,7 @@ public class ExecutorRouteRandom extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
......@@ -41,7 +42,7 @@ public class ExecutorRouteRound extends ExecutorRouter {
String address = route(triggerParam.getJobId(), addressList);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
runResult.setContent(address);
return runResult;
}
......
......@@ -5,9 +5,9 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
......@@ -90,12 +90,12 @@ public class XxlJobTrigger {
triggerParam.setBroadcastTotal(addressList.size()); // update02
// 4.2、trigger-run (route run / trigger remote executor)
triggerResult = ExecutorRouter.runExecutor(triggerParam, address); // update03
triggerResult = runExecutor(triggerParam, address); // update03
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 4.3、trigger (fail retry)
if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
triggerResult = ExecutorRouter.runExecutor(triggerParam, address); // update04
triggerResult = runExecutor(triggerParam, address); // update04
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
......@@ -179,4 +179,30 @@ public class XxlJobTrigger {
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
/**
* run executor
* @param triggerParam
* @param address
* @return ReturnT.content: final address
*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(e.getMessage(), e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
StringBuffer runResultSB = new StringBuffer("触发调度:");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return runResult;
}
}
......@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class XxlJobExecutor implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
// ---------------------------------- param ------------------------------------
// ---------------------- param ----------------------
private String ip;
private int port = 9999;
private String appName;
......@@ -54,7 +54,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------------------- applicationContext ------------------------------------
// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
......@@ -65,7 +65,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------------------- start + stop ------------------------------------
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init admin-client
initAdminBizList(adminAddresses, accessToken);
......@@ -95,7 +95,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------------------- admin-client ------------------------------------
// ---------------------- admin-client ----------------------
private static List<AdminBiz> adminBizList;
private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
......@@ -116,7 +116,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------------------- executor-server ------------------------------------
// ---------------------- executor-server(jetty) ----------------------
private NetComServerFactory serverFactory = new NetComServerFactory();
private void initExecutorServer(int port, String ip, String appName, String accessToken) throws Exception {
NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty
......@@ -128,7 +128,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------------------- job handler repository ------------------------------------
// ---------------------- job handler repository ----------------------
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
......@@ -156,7 +156,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------------------- job thread repository ------------------------------------
// ---------------------- job thread repository ----------------------
private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论