提交 86dea7ff authored 作者: xueli.xue's avatar xueli.xue

重要重构:底层通讯模块升级优化;

上级 484b80dc
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId> <artifactId>xxl-job</artifactId>
<version>1.6.0</version> <version>1.6.0-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>xxl-job</name> <name>xxl-job</name>
......
...@@ -4,13 +4,13 @@ ...@@ -4,13 +4,13 @@
<parent> <parent>
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId> <artifactId>xxl-job</artifactId>
<version>1.6.0</version> <version>1.6.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>xxl-job-admin</artifactId> <artifactId>xxl-job-admin</artifactId>
<packaging>war</packaging> <packaging>war</packaging>
<properties> <properties>
<xxl-job.version>1.6.0</xxl-job.version> <xxl-job.version>1.6.0-SNAPSHOT</xxl-job.version>
<spring.version>3.2.17.RELEASE</spring.version> <spring.version>3.2.17.RELEASE</spring.version>
</properties> </properties>
......
package com.xxl.job.admin.controller; package com.xxl.job.admin.controller;
import javax.servlet.http.HttpServletRequest; import com.xxl.job.admin.controller.annotation.PermessionLimit;
import javax.servlet.http.HttpServletResponse; import com.xxl.job.admin.controller.interceptor.PermissionInterceptor;
import com.xxl.job.admin.core.util.PropertiesUtil;
import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.ui.Model; import org.springframework.ui.Model;
...@@ -10,10 +11,8 @@ import org.springframework.web.bind.annotation.RequestMapping; ...@@ -10,10 +11,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import com.xxl.job.admin.controller.annotation.PermessionLimit; import javax.servlet.http.HttpServletRequest;
import com.xxl.job.admin.controller.interceptor.PermissionInterceptor; import javax.servlet.http.HttpServletResponse;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.util.PropertiesUtil;
/** /**
* index controller * index controller
......
package com.xxl.job.admin.controller; package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLogGlue; import com.xxl.job.admin.core.model.XxlJobLogGlue;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogGlueDao; import com.xxl.job.admin.dao.IXxlJobLogGlueDao;
import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.ui.Model; import org.springframework.ui.Model;
......
package com.xxl.job.admin.controller; package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.registry.RegistHelper;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
......
package com.xxl.job.admin.controller; package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.service.IXxlJobService; import com.xxl.job.admin.service.IXxlJobService;
import com.xxl.job.core.biz.model.ReturnT;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.ui.Model; import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
...@@ -49,8 +49,8 @@ public class JobInfoController { ...@@ -49,8 +49,8 @@ public class JobInfoController {
@RequestMapping("/add") @RequestMapping("/add")
@ResponseBody @ResponseBody
public ReturnT<String> add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail, public ReturnT<String> add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail,
String executorAppname, String executorAddress, String executorHandler, String executorParam, String executorAppname, String executorAddress, String executorHandler, String executorParam,
int glueSwitch, String glueSource, String glueRemark, String childJobKey) { int glueSwitch, String glueSource, String glueRemark, String childJobKey) {
return xxlJobService.add(jobGroup, jobCron, jobDesc, author, alarmEmail, return xxlJobService.add(jobGroup, jobCron, jobDesc, author, alarmEmail,
executorAddress, executorHandler, executorParam, executorAddress, executorHandler, executorParam,
......
package com.xxl.job.admin.controller; package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.core.router.HandlerRouter.ActionRepository; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.router.model.RequestModel; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.router.model.ResponseModel; import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.DateUtils;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
...@@ -99,22 +97,24 @@ public class JobLogController { ...@@ -99,22 +97,24 @@ public class JobLogController {
if (log == null) { if (log == null) {
return new ReturnT<String>(500, "查看执行日志失败: 参数异常"); return new ReturnT<String>(500, "查看执行日志失败: 参数异常");
} }
if (!(ResponseModel.SUCCESS.equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) { if (!((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) {
return new ReturnT<String>(500, "查看执行日志失败: 任务发起调度失败,无法查看执行日志"); return new ReturnT<String>(500, "查看执行日志失败: 任务发起调度失败,无法查看执行日志");
} }
// trigger id, trigger time // trigger id, trigger time
RequestModel requestModel = new RequestModel(); ExecutorBiz executorBiz = null;
requestModel.setTimestamp(System.currentTimeMillis()); try {
requestModel.setAction(ActionRepository.LOG.name()); executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject();
requestModel.setLogId(id); } catch (Exception e) {
requestModel.setLogDateTim(log.getTriggerTime().getTime()); e.printStackTrace();
return new ReturnT<String>(500, e.getMessage());
}
ReturnT<String> logResult = executorBiz.log(log.getTriggerTime().getTime(), id);
ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel); if (ReturnT.SUCCESS_CODE == logResult.getCode()) {
if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { return new ReturnT<String>(logResult.getMsg());
return new ReturnT<String>(responseModel.getMsg());
} else { } else {
return new ReturnT<String>(500, "查看执行日志失败: " + responseModel.getMsg()); return new ReturnT<String>(500, "查看执行日志失败: " + logResult.getMsg());
} }
} }
...@@ -134,26 +134,28 @@ public class JobLogController { ...@@ -134,26 +134,28 @@ public class JobLogController {
if (log == null || jobInfo==null) { if (log == null || jobInfo==null) {
return new ReturnT<String>(500, "参数异常"); return new ReturnT<String>(500, "参数异常");
} }
if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) { if (!(ReturnT.SUCCESS_CODE +"").equals(log.getTriggerStatus())) {
return new ReturnT<String>(500, "调度失败,无法终止日志"); return new ReturnT<String>(500, "调度失败,无法终止日志");
} }
// request of kill // request of kill
RequestModel requestModel = new RequestModel(); ExecutorBiz executorBiz = null;
requestModel.setTimestamp(System.currentTimeMillis()); try {
requestModel.setAction(ActionRepository.KILL.name()); executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject();
requestModel.setJobGroup(String.valueOf(log.getJobGroup())); } catch (Exception e) {
requestModel.setJobName(log.getJobName()); e.printStackTrace();
return new ReturnT<String>(500, e.getMessage());
}
ReturnT<String> runResult = executorBiz.kill(String.valueOf(log.getJobGroup()), log.getJobName());
ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel); if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { log.setHandleStatus(ReturnT.SUCCESS_CODE+"");
log.setHandleStatus(ResponseModel.FAIL);
log.setHandleMsg("人为操作主动终止"); log.setHandleMsg("人为操作主动终止");
log.setHandleTime(new Date()); log.setHandleTime(new Date());
xxlJobLogDao.updateHandleInfo(log); xxlJobLogDao.updateHandleInfo(log);
return new ReturnT<String>(responseModel.getMsg()); return new ReturnT<String>(runResult.getMsg());
} else { } else {
return new ReturnT<String>(500, responseModel.getMsg()); return new ReturnT<String>(500, runResult.getMsg());
} }
} }
} }
package com.xxl.job.admin.controller.resolver; package com.xxl.job.admin.controller.resolver;
import com.xxl.job.admin.core.model.ReturnT; import com.xxl.job.admin.core.util.JacksonUtil;
import com.xxl.job.core.util.JacksonUtil; import com.xxl.job.core.biz.model.ReturnT;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
......
package com.xxl.job.admin.core.biz;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import org.apache.commons.lang.StringUtils;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.Date;
/**
* Created by xuxueli on 17/3/1.
*/
public class AdminBizImpl implements AdminBiz {
private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class);
@Override
public ReturnT<String> callback(TriggerParam triggerParam) {
// valid log item
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(triggerParam.getLogId());
if (log == null) {
return new ReturnT(ReturnT.FAIL_CODE, "log item not found.");
}
// trigger success, to trigger child job, and avoid repeat trigger child job
String childTriggerMsg = null;
if ((ReturnT.SUCCESS_CODE+"").equals(triggerParam.getStatus()) && !(ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) {
XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
childTriggerMsg = "<hr>";
String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
for (int i = 0; i < childJobKeys.length; i++) {
String[] jobKeyArr = childJobKeys[i].split("_");
if (jobKeyArr!=null && jobKeyArr.length==2) {
XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(Integer.valueOf(jobKeyArr[0]), jobKeyArr[1]);
if (childJobInfo!=null) {
try {
boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), String.valueOf(childJobInfo.getJobGroup()));
// add msg
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
(i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
} catch (SchedulerException e) {
logger.error("", e);
}
} else {
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]);
}
} else {
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]);
}
}
}
}
// handle msg
StringBuffer handleMsg = new StringBuffer();
if (triggerParam.getMsg() != null) {
handleMsg.append("执行备注:").append(triggerParam.getMsg());
}
if (childTriggerMsg !=null) {
handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);
}
// success, save log
log.setHandleTime(new Date());
log.setHandleStatus(triggerParam.getStatus());
log.setHandleMsg(handleMsg.toString());
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
return new ReturnT(ReturnT.SUCCESS_CODE, null);
}
}
package com.xxl.job.admin.core.callback;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by xuxueli on 2016-5-22 11:15:42
*/
public class XxlJobLogCallbackServer {
private static final Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServer.class);
private Server server = null;
public void start(int callBackPort) throws Exception {
final int port = Integer.valueOf(callBackPort);
new Thread(new Runnable() {
@Override
public void run() {
server = new Server();
server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞
// connector
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(port);
connector.setMaxIdleTime(30000);
server.setConnectors(new Connector[] { connector });
// handler
HandlerCollection handlerc =new HandlerCollection();
handlerc.setHandlers(new Handler[]{new XxlJobLogCallbackServerHandler()});
server.setHandler(handlerc);
try {
server.start();
logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer start success at port:{}.", port);
server.join(); // block until server ready
logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer join success at port:{}.", port);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public void destroy() {
if (server!=null) {
try {
server.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package com.xxl.job.admin.core.callback;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
/**
* Created by xuxueli on 2016-5-22 11:15:42
*/
public class XxlJobLogCallbackServerHandler extends AbstractHandler {
private static Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServerHandler.class);
@Override
public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
httpServletRequest.setCharacterEncoding("UTF-8");
httpServletResponse.setCharacterEncoding("UTF-8");
// parse hex-json to request model
String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX);
// do biz
ResponseModel responseModel = dobiz(requestHex);
// format response model to hex-json
String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel);
// response
httpServletResponse.setContentType("text/html;charset=utf-8");
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
httpServletResponse.getWriter().println(responseHex);
}
private ResponseModel dobiz(String requestHex){
// valid hex
if (requestHex==null || requestHex.trim().length()==0) {
return new ResponseModel(ResponseModel.FAIL, "request hex is null.");
}
// valid request model
RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class);
if (requestModel==null) {
return new ResponseModel(ResponseModel.FAIL, "request hex parse fail.");
}
// valid log item
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId());
if (log == null) {
return new ResponseModel(ResponseModel.FAIL, "log item not found.");
}
// trigger success, to trigger child job, and avoid repeat trigger child job
String childTriggerMsg = null;
if (ResponseModel.SUCCESS.equals(requestModel.getStatus()) && !ResponseModel.SUCCESS.equals(log.getHandleStatus())) {
XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
childTriggerMsg = "<hr>";
String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
for (int i = 0; i < childJobKeys.length; i++) {
String[] jobKeyArr = childJobKeys[i].split("_");
if (jobKeyArr!=null && jobKeyArr.length==2) {
XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(Integer.valueOf(jobKeyArr[0]), jobKeyArr[1]);
if (childJobInfo!=null) {
try {
boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), String.valueOf(childJobInfo.getJobGroup()));
// add msg
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
(i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
} catch (SchedulerException e) {
logger.error("", e);
}
} else {
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]);
}
} else {
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]);
}
}
}
}
// handle msg
StringBuffer handleMsg = new StringBuffer();
if (requestModel.getMsg() != null) {
handleMsg.append("执行备注:").append(requestModel.getMsg());
}
if (childTriggerMsg !=null) {
handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);
}
// success, save log
log.setHandleTime(new Date());
log.setHandleStatus(requestModel.getStatus());
log.setHandleMsg(handleMsg.toString());
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
return new ResponseModel(ResponseModel.SUCCESS, null);
}
}
...@@ -5,12 +5,12 @@ import com.xxl.job.admin.core.model.XxlJobInfo; ...@@ -5,12 +5,12 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.thread.JobMonitorHelper; import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.router.HandlerRouter.ActionRepository; import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
...@@ -56,17 +56,15 @@ public class RemoteHttpJobBean extends QuartzJobBean { ...@@ -56,17 +56,15 @@ public class RemoteHttpJobBean extends QuartzJobBean {
jobLog.setTriggerTime(new Date()); jobLog.setTriggerTime(new Date());
// trigger request // trigger request
RequestModel requestModel = new RequestModel(); TriggerParam triggerParam = new TriggerParam();
requestModel.setTimestamp(System.currentTimeMillis()); triggerParam.setJobGroup(String.valueOf(jobInfo.getJobGroup()));
requestModel.setAction(ActionRepository.RUN.name()); triggerParam.setJobName(jobInfo.getJobName());
requestModel.setJobGroup(String.valueOf(jobInfo.getJobGroup())); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
requestModel.setJobName(jobInfo.getJobName()); triggerParam.setExecutorParams(jobInfo.getExecutorParam());
requestModel.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
requestModel.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setLogAddress(adminAddressSet);
requestModel.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true); triggerParam.setLogId(jobLog.getId());
requestModel.setLogAddress(adminAddressSet); triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
requestModel.setLogId(jobLog.getId());
requestModel.setLogDateTim(jobLog.getTriggerTime().getTime());
// parse address // parse address
List<String> addressList = new ArrayList<String>(); List<String> addressList = new ArrayList<String>();
...@@ -76,13 +74,13 @@ public class RemoteHttpJobBean extends QuartzJobBean { ...@@ -76,13 +74,13 @@ public class RemoteHttpJobBean extends QuartzJobBean {
} }
// failover trigger // failover trigger
ResponseModel responseModel = failoverTrigger(addressList, requestModel, jobLog); ReturnT<String> responseModel = failoverTrigger(addressList, triggerParam, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString()); logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString());
// update trigger info 2/2 // update trigger info 2/2
jobLog.setTriggerStatus(responseModel.getStatus()); jobLog.setTriggerStatus(responseModel.getCode()+"");
jobLog.setTriggerMsg(responseModel.getMsg()); jobLog.setTriggerMsg(responseModel.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog); DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog);
...@@ -97,21 +95,28 @@ public class RemoteHttpJobBean extends QuartzJobBean { ...@@ -97,21 +95,28 @@ public class RemoteHttpJobBean extends QuartzJobBean {
* failover for trigger remote address * failover for trigger remote address
* @return * @return
*/ */
public ResponseModel failoverTrigger(List<String> addressList, RequestModel requestModel, XxlJobLog jobLog){ public ReturnT<String> failoverTrigger(List<String> addressList, TriggerParam triggerParam, XxlJobLog jobLog){
if (addressList==null || addressList.size() < 1) { if (addressList==null || addressList.size() < 1) {
ResponseModel result = new ResponseModel(); return new ReturnT<String>(ReturnT.FAIL_CODE, "Trigger error, <br>>>>[address] is null <br><hr>");
result.setStatus(ResponseModel.FAIL);
result.setMsg( "Trigger error, <br>>>>[address] is null <br><hr>" );
return result;
} else if (addressList.size() == 1) { } else if (addressList.size() == 1) {
String address = addressList.get(0); String address = addressList.get(0);
// store real address // store real address
jobLog.setExecutorAddress(address); jobLog.setExecutorAddress(address);
ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); // real trigger
String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, triggerCallback.getStatus(), triggerCallback.getMsg()); ExecutorBiz executorBiz = null;
triggerCallback.setMsg(failoverMessage); try {
return triggerCallback; executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
ReturnT<String> runResult = executorBiz.run(triggerParam);
String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[code] : {1}, <br>>>>[msg] : {2} <br><hr>",
address, runResult.getCode(), runResult.getMsg());
runResult.setMsg(runResult.getMsg() + failoverMessage);
return runResult;
} else { } else {
// for ha // for ha
...@@ -122,32 +127,38 @@ public class RemoteHttpJobBean extends QuartzJobBean { ...@@ -122,32 +127,38 @@ public class RemoteHttpJobBean extends QuartzJobBean {
for (String address : addressList) { for (String address : addressList) {
if (StringUtils.isNotBlank(address)) { if (StringUtils.isNotBlank(address)) {
// beat check
RequestModel beatRequest = new RequestModel(); ExecutorBiz executorBiz = null;
beatRequest.setTimestamp(System.currentTimeMillis()); try {
beatRequest.setAction(ActionRepository.BEAT.name()); executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
ResponseModel beatResult = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), beatRequest); } catch (Exception e) {
failoverMessage += MessageFormat.format("BEAT running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, beatResult.getStatus(), beatResult.getMsg()); e.printStackTrace();
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
// beat check
ReturnT<String> beatResult = executorBiz.beat();
failoverMessage += MessageFormat.format("BEAT running, <br>>>>[address] : {0}, <br>>>>[code] : {1}, <br>>>>[msg] : {2} <br><hr>",
address, beatResult.getCode(), beatResult.getMsg());
// beat success, trigger do // beat success, trigger do
if (beatResult.SUCCESS.equals(beatResult.getStatus())) { if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
// store real address // store real address
jobLog.setExecutorAddress(address); jobLog.setExecutorAddress(address);
// real trigger // real trigger
ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); ReturnT<String> runResult = executorBiz.run(triggerParam);
failoverMessage += MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, triggerCallback.getStatus(), triggerCallback.getMsg());
triggerCallback.setMsg(failoverMessage); failoverMessage += MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>",
return triggerCallback; address, runResult.getCode(), runResult.getMsg());
runResult.setMsg( runResult.getMsg() + failoverMessage);
return runResult;
} }
} }
} }
ResponseModel result = new ResponseModel(); return new ReturnT<String>(ReturnT.FAIL_CODE, failoverMessage);
result.setStatus(ResponseModel.FAIL);
result.setMsg(failoverMessage);
return result;
} }
} }
......
package com.xxl.job.admin.core.util; package com.xxl.job.admin.core.schedule;
import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer; import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobRegistryDao;
import com.xxl.job.admin.dao.IXxlJobRegistryDao; import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.util.IpUtil; import com.xxl.job.admin.core.biz.AdminBizImpl;
import org.quartz.*; import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import org.quartz.Trigger.TriggerState; import com.xxl.job.core.util.IpUtil;
import org.quartz.impl.matchers.GroupMatcher; import org.quartz.*;
import org.quartz.impl.triggers.CronTriggerImpl; import org.quartz.Trigger.TriggerState;
import org.slf4j.Logger; import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.LoggerFactory; import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.beans.BeansException; import org.slf4j.Logger;
import org.springframework.beans.factory.InitializingBean; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContextAware; import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.*; import org.springframework.util.Assert;
/** import java.util.*;
* base quartz scheduler util
* @author xuxueli 2015-12-19 16:13:53 /**
*/ * base quartz scheduler util
public final class DynamicSchedulerUtil implements ApplicationContextAware, InitializingBean { * @author xuxueli 2015-12-19 16:13:53
private static final Logger logger = LoggerFactory.getLogger(DynamicSchedulerUtil.class); */
public final class DynamicSchedulerUtil implements ApplicationContextAware, InitializingBean {
// Scheduler private static final Logger logger = LoggerFactory.getLogger(DynamicSchedulerUtil.class);
private static Scheduler scheduler;
public static void setScheduler(Scheduler scheduler) { // Scheduler
DynamicSchedulerUtil.scheduler = scheduler; private static Scheduler scheduler;
} public static void setScheduler(Scheduler scheduler) {
DynamicSchedulerUtil.scheduler = scheduler;
// trigger callback address }
private String callBackIp;
private int callBackPort = 8888; // trigger callback address
private static String callbackAddress; private String callBackIp;
private int callBackPort = 8888;
public void setCallBackIp(String callBackIp) { private static String callbackAddress;
this.callBackIp = callBackIp;
} public void setCallBackIp(String callBackIp) {
public void setCallBackPort(int callBackPort) { this.callBackIp = callBackIp;
this.callBackPort = callBackPort; }
} public void setCallBackPort(int callBackPort) {
public static String getCallbackAddress(){ this.callBackPort = callBackPort;
return callbackAddress; }
} public static String getCallbackAddress(){
return callbackAddress;
// init }
XxlJobLogCallbackServer xxlJobLogCallbackServer = null;
public void init(){ // init
try { private NetComServerFactory serverFactory = new NetComServerFactory();
// start callback server public void init() throws Exception {
xxlJobLogCallbackServer = new XxlJobLogCallbackServer(); // server
xxlJobLogCallbackServer.start(callBackPort); NetComServerFactory.putService(AdminBiz.class, new AdminBizImpl());
} catch (Exception e) { serverFactory.start(callBackPort, callBackIp, null, null);
e.printStackTrace();
} // init callbackAddress
if (callBackIp!=null && callBackIp.trim().length()>0) {
// init callbackAddress callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort));
if (callBackIp!=null && callBackIp.trim().length()>0) { } else {
callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort)); callbackAddress = IpUtil.getIpPort(callBackPort);;
} else { }
callbackAddress = IpUtil.getIpPort(callBackPort);;
} // init JobRegistryHelper
JobRegistryHelper.discover("g", "k");
// init JobRegistryHelper }
JobRegistryHelper.discover("g", "k");
} // destroy
public void destroy(){
// destroy serverFactory.destroy();
public void destroy(){ }
if (xxlJobLogCallbackServer!=null) {
xxlJobLogCallbackServer.destroy(); // xxlJobLogDao、xxlJobInfoDao
} public static IXxlJobLogDao xxlJobLogDao;
} public static IXxlJobInfoDao xxlJobInfoDao;
public static IXxlJobRegistryDao xxlJobRegistryDao;
// xxlJobLogDao、xxlJobInfoDao public static IXxlJobGroupDao xxlJobGroupDao;
public static IXxlJobLogDao xxlJobLogDao;
public static IXxlJobInfoDao xxlJobInfoDao; @Override
public static IXxlJobRegistryDao xxlJobRegistryDao; public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
public static IXxlJobGroupDao xxlJobGroupDao; DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class);
DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class);
@Override DynamicSchedulerUtil.xxlJobRegistryDao = applicationContext.getBean(IXxlJobRegistryDao.class);
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { DynamicSchedulerUtil.xxlJobGroupDao = applicationContext.getBean(IXxlJobGroupDao.class);
DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class); }
DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class);
DynamicSchedulerUtil.xxlJobRegistryDao = applicationContext.getBean(IXxlJobRegistryDao.class); @Override
DynamicSchedulerUtil.xxlJobGroupDao = applicationContext.getBean(IXxlJobGroupDao.class); public void afterPropertiesSet() throws Exception {
} Assert.notNull(scheduler, "quartz scheduler is null");
logger.info(">>>>>>>>> init quartz scheduler success.[{}]", scheduler);
@Override
public void afterPropertiesSet() throws Exception { }
Assert.notNull(scheduler, "quartz scheduler is null");
logger.info(">>>>>>>>> init quartz scheduler success.[{}]", scheduler); // getJobKeys
@Deprecated
} public static List<Map<String, Object>> getJobList(){
List<Map<String, Object>> jobList = new ArrayList<Map<String,Object>>();
// getJobKeys
@Deprecated try {
public static List<Map<String, Object>> getJobList(){ if (scheduler.getJobGroupNames()==null || scheduler.getJobGroupNames().size()==0) {
List<Map<String, Object>> jobList = new ArrayList<Map<String,Object>>(); return null;
}
try { String groupName = scheduler.getJobGroupNames().get(0);
if (scheduler.getJobGroupNames()==null || scheduler.getJobGroupNames().size()==0) { Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));
return null; if (jobKeys!=null && jobKeys.size()>0) {
} for (JobKey jobKey : jobKeys) {
String groupName = scheduler.getJobGroupNames().get(0); TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), Scheduler.DEFAULT_GROUP);
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)); Trigger trigger = scheduler.getTrigger(triggerKey);
if (jobKeys!=null && jobKeys.size()>0) { JobDetail jobDetail = scheduler.getJobDetail(jobKey);
for (JobKey jobKey : jobKeys) { TriggerState triggerState = scheduler.getTriggerState(triggerKey);
TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), Scheduler.DEFAULT_GROUP); Map<String, Object> jobMap = new HashMap<String, Object>();
Trigger trigger = scheduler.getTrigger(triggerKey); jobMap.put("TriggerKey", triggerKey);
JobDetail jobDetail = scheduler.getJobDetail(jobKey); jobMap.put("Trigger", trigger);
TriggerState triggerState = scheduler.getTriggerState(triggerKey); jobMap.put("JobDetail", jobDetail);
Map<String, Object> jobMap = new HashMap<String, Object>(); jobMap.put("TriggerState", triggerState);
jobMap.put("TriggerKey", triggerKey); jobList.add(jobMap);
jobMap.put("Trigger", trigger); }
jobMap.put("JobDetail", jobDetail); }
jobMap.put("TriggerState", triggerState);
jobList.add(jobMap); } catch (SchedulerException e) {
} e.printStackTrace();
} return null;
}
} catch (SchedulerException e) { return jobList;
e.printStackTrace(); }
return null;
} // fill job info
return jobList; public static void fillJobInfo(XxlJobInfo jobInfo) {
} // TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup()));
// fill job info
public static void fillJobInfo(XxlJobInfo jobInfo) { try {
// TriggerKey : name + group Trigger trigger = scheduler.getTrigger(triggerKey);
TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup()));
TriggerState triggerState = scheduler.getTriggerState(triggerKey);
try {
Trigger trigger = scheduler.getTrigger(triggerKey); // parse params
if (trigger!=null && trigger instanceof CronTriggerImpl) {
TriggerState triggerState = scheduler.getTriggerState(triggerKey); String cronExpression = ((CronTriggerImpl) trigger).getCronExpression();
jobInfo.setJobCron(cronExpression);
// parse params }
if (trigger!=null && trigger instanceof CronTriggerImpl) {
String cronExpression = ((CronTriggerImpl) trigger).getCronExpression(); //JobKey jobKey = new JobKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup()));
jobInfo.setJobCron(cronExpression); //JobDetail jobDetail = scheduler.getJobDetail(jobKey);
} //String jobClass = jobDetail.getJobClass().getName();
//JobKey jobKey = new JobKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup())); if (triggerState!=null) {
//JobDetail jobDetail = scheduler.getJobDetail(jobKey); jobInfo.setJobStatus(triggerState.name());
//String jobClass = jobDetail.getJobClass().getName(); }
if (triggerState!=null) { } catch (SchedulerException e) {
jobInfo.setJobStatus(triggerState.name()); e.printStackTrace();
} }
}
} catch (SchedulerException e) {
e.printStackTrace(); // check if exists
} public static boolean checkExists(String jobName, String jobGroup) throws SchedulerException{
} TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
return scheduler.checkExists(triggerKey);
// check if exists }
public static boolean checkExists(String jobName, String jobGroup) throws SchedulerException{
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); // addJob 新增
return scheduler.checkExists(triggerKey); @SuppressWarnings("unchecked")
} public static boolean addJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException {
// TriggerKey : name + group
// addJob 新增 TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
@SuppressWarnings("unchecked") JobKey jobKey = new JobKey(jobName, jobGroup);
public static boolean addJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException {
// TriggerKey : name + group // TriggerKey valid if_exists
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); if (checkExists(jobName, jobGroup)) {
JobKey jobKey = new JobKey(jobName, jobGroup); logger.info(">>>>>>>>> addJob fail, job already exist, jobGroup:{}, jobName:{}", jobGroup, jobName);
return false;
// TriggerKey valid if_exists }
if (checkExists(jobName, jobGroup)) {
logger.info(">>>>>>>>> addJob fail, job already exist, jobGroup:{}, jobName:{}", jobGroup, jobName); // CronTrigger : TriggerKey + cronExpression // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度
return false; CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
} CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
// CronTrigger : TriggerKey + cronExpression // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度 // JobDetail : jobClass
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); Class<? extends Job> jobClass_ = RemoteHttpJobBean.class; // Class.forName(jobInfo.getJobClass());
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
JobDetail jobDetail = JobBuilder.newJob(jobClass_).withIdentity(jobKey).build();
// JobDetail : jobClass /*if (jobInfo.getJobData()!=null) {
Class<? extends Job> jobClass_ = RemoteHttpJobBean.class; // Class.forName(jobInfo.getJobClass()); JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class));
JobDetail jobDetail = JobBuilder.newJob(jobClass_).withIdentity(jobKey).build(); // JobExecutionContext context.getMergedJobDataMap().get("mailGuid");
/*if (jobInfo.getJobData()!=null) { }*/
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class)); // schedule : jobDetail + cronTrigger
// JobExecutionContext context.getMergedJobDataMap().get("mailGuid"); Date date = scheduler.scheduleJob(jobDetail, cronTrigger);
}*/
logger.info(">>>>>>>>>>> addJob success, jobDetail:{}, cronTrigger:{}, date:{}", jobDetail, cronTrigger, date);
// schedule : jobDetail + cronTrigger return true;
Date date = scheduler.scheduleJob(jobDetail, cronTrigger); }
logger.info(">>>>>>>>>>> addJob success, jobDetail:{}, cronTrigger:{}, date:{}", jobDetail, cronTrigger, date); // reschedule
return true; public static boolean rescheduleJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException {
}
// TriggerKey valid if_exists
// reschedule if (!checkExists(jobName, jobGroup)) {
public static boolean rescheduleJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException { logger.info(">>>>>>>>>>> rescheduleJob fail, job not exists, JobGroup:{}, JobName:{}", jobGroup, jobName);
return false;
// TriggerKey valid if_exists }
if (!checkExists(jobName, jobGroup)) {
logger.info(">>>>>>>>>>> rescheduleJob fail, job not exists, JobGroup:{}, JobName:{}", jobGroup, jobName); // TriggerKey : name + group
return false; TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
} JobKey jobKey = new JobKey(jobName, jobGroup);
// TriggerKey : name + group // CronTrigger : TriggerKey + cronExpression
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
JobKey jobKey = new JobKey(jobName, jobGroup); CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
// CronTrigger : TriggerKey + cronExpression //scheduler.rescheduleJob(triggerKey, cronTrigger);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); // JobDetail-JobDataMap fresh
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
//scheduler.rescheduleJob(triggerKey, cronTrigger); /*JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.clear();
// JobDetail-JobDataMap fresh jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class));*/
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
/*JobDataMap jobDataMap = jobDetail.getJobDataMap(); // Trigger fresh
jobDataMap.clear(); HashSet<Trigger> triggerSet = new HashSet<Trigger>();
jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class));*/ triggerSet.add(cronTrigger);
// Trigger fresh scheduler.scheduleJob(jobDetail, triggerSet, true);
HashSet<Trigger> triggerSet = new HashSet<Trigger>(); logger.info(">>>>>>>>>>> resumeJob success, JobGroup:{}, JobName:{}", jobGroup, jobName);
triggerSet.add(cronTrigger); return true;
}
scheduler.scheduleJob(jobDetail, triggerSet, true);
logger.info(">>>>>>>>>>> resumeJob success, JobGroup:{}, JobName:{}", jobGroup, jobName); // unscheduleJob
return true; public static boolean removeJob(String jobName, String jobGroup) throws SchedulerException {
} // TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
// unscheduleJob boolean result = false;
public static boolean removeJob(String jobName, String jobGroup) throws SchedulerException { if (checkExists(jobName, jobGroup)) {
// TriggerKey : name + group result = scheduler.unscheduleJob(triggerKey);
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); logger.info(">>>>>>>>>>> removeJob, triggerKey:{}, result [{}]", triggerKey, result);
boolean result = false; }
if (checkExists(jobName, jobGroup)) { return true;
result = scheduler.unscheduleJob(triggerKey); }
logger.info(">>>>>>>>>>> removeJob, triggerKey:{}, result [{}]", triggerKey, result);
} // Pause
return true; public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException {
} // TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
// Pause
public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException { boolean result = false;
// TriggerKey : name + group if (checkExists(jobName, jobGroup)) {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); scheduler.pauseTrigger(triggerKey);
result = true;
boolean result = false; logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey);
if (checkExists(jobName, jobGroup)) { } else {
scheduler.pauseTrigger(triggerKey); logger.info(">>>>>>>>>>> pauseJob fail, triggerKey:{}", triggerKey);
result = true; }
logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey); return result;
} else { }
logger.info(">>>>>>>>>>> pauseJob fail, triggerKey:{}", triggerKey);
} // resume
return result; public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException {
} // TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
// resume
public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException { boolean result = false;
// TriggerKey : name + group if (checkExists(jobName, jobGroup)) {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); scheduler.resumeTrigger(triggerKey);
result = true;
boolean result = false; logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey);
if (checkExists(jobName, jobGroup)) { } else {
scheduler.resumeTrigger(triggerKey); logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}", triggerKey);
result = true; }
logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey); return result;
} else { }
logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}", triggerKey);
} // run
return result; public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException {
} // TriggerKey : name + group
JobKey jobKey = new JobKey(jobName, jobGroup);
// run
public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException { boolean result = false;
// TriggerKey : name + group if (checkExists(jobName, jobGroup)) {
JobKey jobKey = new JobKey(jobName, jobGroup); scheduler.triggerJob(jobKey);
result = true;
boolean result = false; logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey);
if (checkExists(jobName, jobGroup)) { } else {
scheduler.triggerJob(jobKey); logger.info(">>>>>>>>>>> runJob fail, jobKey:{}", jobKey);
result = true; }
logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey); return result;
} else { }
logger.info(">>>>>>>>>>> runJob fail, jobKey:{}", jobKey);
}
return result;
}
} }
\ No newline at end of file
...@@ -3,9 +3,9 @@ package com.xxl.job.admin.core.thread; ...@@ -3,9 +3,9 @@ package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.admin.core.util.MailUtil; import com.xxl.job.admin.core.util.MailUtil;
import com.xxl.job.core.router.model.ResponseModel; import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -41,7 +41,7 @@ public class JobMonitorHelper { ...@@ -41,7 +41,7 @@ public class JobMonitorHelper {
logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId); logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId);
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId); XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId);
if (log!=null) { if (log!=null) {
if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) { if ((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) {
try { try {
TimeUnit.SECONDS.sleep(10); TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -49,10 +49,10 @@ public class JobMonitorHelper { ...@@ -49,10 +49,10 @@ public class JobMonitorHelper {
} }
JobMonitorHelper.monitor(jobLogId); JobMonitorHelper.monitor(jobLogId);
} }
if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && ResponseModel.SUCCESS.equals(log.getHandleStatus())) { if ((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) && (ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) {
// pass // pass
} }
if (ResponseModel.FAIL.equals(log.getTriggerStatus()) || ResponseModel.FAIL.equals(log.getHandleStatus())) { if ((ReturnT.FAIL+"").equals(log.getTriggerStatus()) || (ReturnT.FAIL+"").equals(log.getHandleStatus())) {
XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
......
package com.xxl.job.admin.core.thread; package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.registry.RegistHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
...@@ -38,12 +38,10 @@ public class CookieUtil { ...@@ -38,12 +38,10 @@ public class CookieUtil {
/** /**
* 保存 * 保存
* @param request
* @param response * @param response
* @param key * @param key
* @param value * @param value
* @param maxAge * @param maxAge
* @param domain
*/ */
private static void set(HttpServletResponse response, private static void set(HttpServletResponse response,
String key, String value, int maxAge, String path) { String key, String value, int maxAge, String path) {
......
package com.xxl.job.core.util; package com.xxl.job.admin.core.util;
import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference; import org.codehaus.jackson.type.TypeReference;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* Jackson util * Jackson util
* *
* 1、obj need private and set/get; * 1、obj need private and set/get;
* 2、do not support inner class; * 2、do not support inner class;
* *
* @author xuxueli 2015-9-25 18:02:56 * @author xuxueli 2015-9-25 18:02:56
*/ */
public class JacksonUtil { public class JacksonUtil {
private final static ObjectMapper objectMapper = new ObjectMapper(); private final static ObjectMapper objectMapper = new ObjectMapper();
public static ObjectMapper getInstance() { public static ObjectMapper getInstance() {
return objectMapper; return objectMapper;
} }
/** /**
* bean、array、List、Map --> json * bean、array、List、Map --> json
* *
* @param obj * @param obj
* @return json string * @return json string
* @throws Exception * @throws Exception
*/ */
public static String writeValueAsString(Object obj) { public static String writeValueAsString(Object obj) {
try { try {
return getInstance().writeValueAsString(obj); return getInstance().writeValueAsString(obj);
} catch (JsonGenerationException e) { } catch (JsonGenerationException e) {
e.printStackTrace(); e.printStackTrace();
} catch (JsonMappingException e) { } catch (JsonMappingException e) {
e.printStackTrace(); e.printStackTrace();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
return null; return null;
} }
/** /**
* string --> bean、Map、List(array) * string --> bean、Map、List(array)
* *
* @param jsonStr * @param jsonStr
* @param clazz * @param clazz
* @return obj * @return obj
* @throws Exception * @throws Exception
*/ */
public static <T> T readValue(String jsonStr, Class<T> clazz) { public static <T> T readValue(String jsonStr, Class<T> clazz) {
try { try {
return getInstance().readValue(jsonStr, clazz); return getInstance().readValue(jsonStr, clazz);
} catch (JsonParseException e) { } catch (JsonParseException e) {
e.printStackTrace(); e.printStackTrace();
} catch (JsonMappingException e) { } catch (JsonMappingException e) {
e.printStackTrace(); e.printStackTrace();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
return null; return null;
} }
public static <T> T readValueRefer(String jsonStr, Class<T> clazz) { public static <T> T readValueRefer(String jsonStr, Class<T> clazz) {
try { try {
return getInstance().readValue(jsonStr, new TypeReference<T>() { }); return getInstance().readValue(jsonStr, new TypeReference<T>() { });
} catch (JsonParseException e) { } catch (JsonParseException e) {
e.printStackTrace(); e.printStackTrace();
} catch (JsonMappingException e) { } catch (JsonMappingException e) {
e.printStackTrace(); e.printStackTrace();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
return null; return null;
} }
public static void main(String[] args) { public static void main(String[] args) {
try { try {
Map<String, String> map = new HashMap<String, String>(); Map<String, String> map = new HashMap<String, String>();
map.put("aaa", "111"); map.put("aaa", "111");
map.put("bbb", "222"); map.put("bbb", "222");
String json = writeValueAsString(map); String json = writeValueAsString(map);
System.out.println(json); System.out.println(json);
System.out.println(readValue(json, Map.class)); System.out.println(readValue(json, Map.class));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
package com.xxl.job.admin.service; package com.xxl.job.admin.service;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.core.biz.model.ReturnT;
import java.util.Map; import java.util.Map;
...@@ -13,9 +14,9 @@ public interface IXxlJobService { ...@@ -13,9 +14,9 @@ public interface IXxlJobService {
public Map<String, Object> pageList(int start, int length, int jobGroup, String executorHandler, String filterTime); public Map<String, Object> pageList(int start, int length, int jobGroup, String executorHandler, String filterTime);
public ReturnT<String> add(int jobGroup, String jobCron, String jobDesc,String author, String alarmEmail, public ReturnT<String> add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail,
String executorAddress, String executorHandler, String executorParam, String executorAddress, String executorHandler, String executorParam,
int glueSwitch, String glueSource, String glueRemark, String childJobKey); int glueSwitch, String glueSource, String glueRemark, String childJobKey);
public ReturnT<String> reschedule(int jobGroup, String jobName, String jobCron, String jobDesc, String author, String alarmEmail, public ReturnT<String> reschedule(int jobGroup, String jobName, String jobCron, String jobDesc, String author, String alarmEmail,
String executorAddress, String executorHandler, String executorParam, int glueSwitch, String childJobKey); String executorAddress, String executorHandler, String executorParam, int glueSwitch, String childJobKey);
......
package com.xxl.job.admin.service.impl; package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.admin.dao.IXxlJobLogGlueDao; import com.xxl.job.admin.dao.IXxlJobLogGlueDao;
import com.xxl.job.admin.service.IXxlJobService; import com.xxl.job.admin.service.IXxlJobService;
import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat; import org.apache.commons.lang.time.FastDateFormat;
import org.quartz.CronExpression; import org.quartz.CronExpression;
...@@ -64,9 +64,9 @@ public class XxlJobServiceImpl implements IXxlJobService { ...@@ -64,9 +64,9 @@ public class XxlJobServiceImpl implements IXxlJobService {
} }
@Override @Override
public ReturnT<String> add(int jobGroup, String jobCron, String jobDesc,String author, String alarmEmail, public ReturnT<String> add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail,
String executorAddress, String executorHandler, String executorParam, String executorAddress, String executorHandler, String executorParam,
int glueSwitch, String glueSource, String glueRemark, String childJobKey) { int glueSwitch, String glueSource, String glueRemark, String childJobKey) {
// valid // valid
XxlJobGroup group = xxlJobGroupDao.load(jobGroup); XxlJobGroup group = xxlJobGroupDao.load(jobGroup);
if (group == null) { if (group == null) {
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
</bean> </bean>
<!-- 协同-调度器 --> <!-- 协同-调度器 -->
<bean id="dynamicSchedulerUtil" class="com.xxl.job.admin.core.util.DynamicSchedulerUtil" init-method="init" destroy-method="destroy" > <bean id="dynamicSchedulerUtil" class="com.xxl.job.admin.core.schedule.DynamicSchedulerUtil" init-method="init" destroy-method="destroy" >
<!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) --> <!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
<property name="scheduler" ref="quartzScheduler"/> <property name="scheduler" ref="quartzScheduler"/>
<!-- 调度中心回调IP[选填],为空则自动获取 --> <!-- 调度中心回调IP[选填],为空则自动获取 -->
......
package com.xxl.job.dao.impl;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.core.router.model.ResponseModel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:applicationcontext-*.xml")
public class XxlJobLogTest {
@Resource
private IXxlJobLogDao xxlJobLogDao;
@Test
public void save_load(){
XxlJobLog xxlJobLog = new XxlJobLog();
xxlJobLog.setJobName("job_name");
int count = xxlJobLogDao.save(xxlJobLog);
System.out.println(count);
System.out.println(xxlJobLog.getId());
XxlJobLog item = xxlJobLogDao.load(xxlJobLog.getId());
System.out.println(item);
}
@Test
public void updateTriggerInfo(){
XxlJobLog xxlJobLog = xxlJobLogDao.load(29);
xxlJobLog.setTriggerTime(new Date());
xxlJobLog.setTriggerStatus(ResponseModel.SUCCESS);
xxlJobLog.setTriggerMsg("trigger msg");
xxlJobLogDao.updateTriggerInfo(xxlJobLog);
}
@Test
public void updateHandleInfo(){
XxlJobLog xxlJobLog = xxlJobLogDao.load(29);
xxlJobLog.setHandleTime(new Date());
xxlJobLog.setHandleStatus(ResponseModel.SUCCESS);
xxlJobLog.setHandleMsg("handle msg");
xxlJobLogDao.updateHandleInfo(xxlJobLog);
}
@Test
public void pageList(){
List<XxlJobLog> list = xxlJobLogDao.pageList(0, 20, 0, null, null, null);
int list_count = xxlJobLogDao.pageListCount(0, 20, 0, null, null, null);
System.out.println(list);
System.out.println(list_count);
}
}
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
<parent> <parent>
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId> <artifactId>xxl-job</artifactId>
<version>1.6.0</version> <version>1.6.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>xxl-job-core</artifactId> <artifactId>xxl-job-core</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
...@@ -40,6 +40,13 @@ ...@@ -40,6 +40,13 @@
<version>1.7.5</version> <version>1.7.5</version>
</dependency> </dependency>
<!-- hessian -->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.38</version>
</dependency>
<!-- jackson --> <!-- jackson -->
<dependency> <dependency>
<groupId>org.codehaus.jackson</groupId> <groupId>org.codehaus.jackson</groupId>
......
package com.xxl.job.core.biz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
/**
* Created by xuxueli on 17/3/1.
*/
public interface AdminBiz {
public ReturnT<String> callback(TriggerParam triggerParam);
}
package com.xxl.job.core.biz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
/**
* Created by xuxueli on 17/3/1.
*/
public interface ExecutorBiz {
/**
* beat
* @return
*/
public ReturnT<String> beat();
/**
* kill
* @param jobGroup
* @param jobName
* @return
*/
public ReturnT<String> kill(String jobGroup, String jobName);
/**
* log
* @param logDateTim
* @param logId
* @return
*/
public ReturnT<String> log(long logDateTim, int logId);
/**
* run
* @param triggerParam
* @return
*/
public ReturnT<String> run(TriggerParam triggerParam);
}
package com.xxl.job.core.router.action; package com.xxl.job.core.biz.impl;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.glue.GlueFactory; import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.impl.GlueJobHandler; import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.router.HandlerRouter; import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.router.IAction; import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel; import java.util.Date;
import com.xxl.job.core.router.thread.JobThread;
/** /**
* Created by xuxueli on 16/7/22. * Created by xuxueli on 17/3/1.
*/ */
public class RunAction extends IAction { public class ExecutorBizImpl implements ExecutorBiz {
@Override
public ReturnT<String> beat() {
return ReturnT.SUCCESS;
}
@Override
public ReturnT<String> kill(String jobGroup, String jobName) {
// generate jobKey
String jobKey = jobGroup.concat("_").concat(jobName);
// kill handlerThread, and create new one
JobThread jobThread = XxlJobExecutor.loadJobThread(jobKey);
if (jobThread != null) {
IJobHandler handler = jobThread.getHandler();
jobThread.toStop("人工手动终止");
jobThread.interrupt();
//XxlJobExecutor.registJobThread(jobKey, handler);
return ReturnT.SUCCESS;
}
return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread not found.");
}
@Override @Override
public ResponseModel execute(RequestModel requestModel) { public ReturnT<String> log(long logDateTim, int logId) {
// log filename: yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId);
String logConteng = XxlJobFileAppender.readLog(logFileName);
return new ReturnT<String>(ReturnT.SUCCESS_CODE, logConteng);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// generate jobKey // generate jobKey
String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName()); String jobKey = triggerParam.getJobGroup().concat("_").concat(triggerParam.getJobName());
// load old thread // load old thread
JobThread jobThread = HandlerRouter.loadJobThread(jobKey); JobThread jobThread = XxlJobExecutor.loadJobThread(jobKey);
if (!requestModel.isGlueSwitch()) { if (!triggerParam.isGlueSwitch()) {
// bean model // bean model
// valid handler instance // valid handler instance
IJobHandler jobHandler = HandlerRouter.loadJobHandler(requestModel.getExecutorHandler()); IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobHandler==null) { if (jobHandler==null) {
return new ResponseModel(ResponseModel.FAIL, "job handler for jobKey=[" + jobKey + "] not found."); return new ReturnT(ReturnT.FAIL_CODE, "job handler for jobKey=[" + jobKey + "] not found.");
} }
if (jobThread == null) { if (jobThread == null) {
jobThread = HandlerRouter.registJobThread(jobKey, jobHandler); jobThread = XxlJobExecutor.registJobThread(jobKey, jobHandler);
} else { } else {
// job handler update, kill old job thread // job handler update, kill old job thread
if (jobThread.getHandler() != jobHandler) { if (jobThread.getHandler() != jobHandler) {
...@@ -42,7 +78,7 @@ public class RunAction extends IAction { ...@@ -42,7 +78,7 @@ public class RunAction extends IAction {
jobThread.interrupt(); jobThread.interrupt();
// new thread, with new job handler // new thread, with new job handler
jobThread = HandlerRouter.registJobThread(jobKey, jobHandler); jobThread = XxlJobExecutor.registJobThread(jobKey, jobHandler);
} }
} }
} else { } else {
...@@ -50,11 +86,11 @@ public class RunAction extends IAction { ...@@ -50,11 +86,11 @@ public class RunAction extends IAction {
// valid glueloader // valid glueloader
if (!GlueFactory.isActive()) { if (!GlueFactory.isActive()) {
return new ResponseModel(ResponseModel.FAIL, "glueLoader for jobKey=[" + jobKey + "] not found."); return new ReturnT(ReturnT.FAIL_CODE, "glueLoader for jobKey=[" + jobKey + "] not found.");
} }
if (jobThread == null) { if (jobThread == null) {
jobThread = HandlerRouter.registJobThread(jobKey, new GlueJobHandler(requestModel.getJobGroup(), requestModel.getJobName())); jobThread = XxlJobExecutor.registJobThread(jobKey, new GlueJobHandler(triggerParam.getJobGroup(), triggerParam.getJobName()));
} else { } else {
// job handler update, kill old job thread // job handler update, kill old job thread
if (!(jobThread.getHandler() instanceof GlueJobHandler)) { if (!(jobThread.getHandler() instanceof GlueJobHandler)) {
...@@ -63,14 +99,14 @@ public class RunAction extends IAction { ...@@ -63,14 +99,14 @@ public class RunAction extends IAction {
jobThread.interrupt(); jobThread.interrupt();
// new thread, with new job handler // new thread, with new job handler
jobThread = HandlerRouter.registJobThread(jobKey, new GlueJobHandler(requestModel.getJobGroup(), requestModel.getJobName())); jobThread = XxlJobExecutor.registJobThread(jobKey, new GlueJobHandler(triggerParam.getJobGroup(), triggerParam.getJobName()));
} }
} }
} }
// push data to queue // push data to queue
jobThread.pushTriggerQueue(requestModel); jobThread.pushTriggerQueue(triggerParam);
return new ResponseModel(ResponseModel.SUCCESS, null); return ReturnT.SUCCESS;
} }
} }
package com.xxl.job.admin.core.model; package com.xxl.job.core.biz.model;
/** import java.io.Serializable;
* common return
* @author xuxueli 2015-12-4 16:32:31 /**
* @param <T> * common return
*/ * @author xuxueli 2015-12-4 16:32:31
public class ReturnT<T> { * @param <T>
public static final ReturnT<String> SUCCESS = new ReturnT<String>(null); */
public static final ReturnT<String> FAIL = new ReturnT<String>(500, null); public class ReturnT<T> implements Serializable {
public static final long serialVersionUID = 42L;
private int code;
private String msg; public static final int SUCCESS_CODE = 200;
private T content; public static final int FAIL_CODE = 500;
public static final ReturnT<String> SUCCESS = new ReturnT<String>(null);
public ReturnT(int code, String msg) { public static final ReturnT<String> FAIL = new ReturnT<String>(FAIL_CODE, null);
this.code = code;
this.msg = msg; private int code;
} private String msg;
public ReturnT(T content) { private T content;
this.code = 200;
this.content = content; public ReturnT(int code, String msg) {
} this.code = code;
this.msg = msg;
public int getCode() { }
return code; public ReturnT(T content) {
} this.code = SUCCESS_CODE;
public void setCode(int code) { this.content = content;
this.code = code; }
}
public String getMsg() { public int getCode() {
return msg; return code;
} }
public void setMsg(String msg) { public void setCode(int code) {
this.msg = msg; this.code = code;
} }
public T getContent() { public String getMsg() {
return content; return msg;
} }
public void setContent(T content) { public void setMsg(String msg) {
this.content = content; this.msg = msg;
} }
public T getContent() {
@Override return content;
public String toString() { }
return "ReturnT [code=" + code + ", msg=" + msg + ", content=" public void setContent(T content) {
+ content + "]"; this.content = content;
} }
} @Override
public String toString() {
return "ReturnT [code=" + code + ", msg=" + msg + ", content=" + content + "]";
}
}
package com.xxl.job.core.router.model; package com.xxl.job.core.biz.model;
import java.io.Serializable;
import java.util.Set; import java.util.Set;
/** /**
* Created by xuxueli on 16/7/22. * Created by xuxueli on 16/7/22.
*/ */
public class RequestModel { public class TriggerParam implements Serializable{
private static final long serialVersionUID = 42L;
private long timestamp;
private String action; private String action;
private String jobGroup; private String jobGroup;
...@@ -25,15 +26,6 @@ public class RequestModel { ...@@ -25,15 +26,6 @@ public class RequestModel {
private String status; private String status;
private String msg; private String msg;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getAction() { public String getAction() {
return action; return action;
} }
...@@ -125,7 +117,6 @@ public class RequestModel { ...@@ -125,7 +117,6 @@ public class RequestModel {
@Override @Override
public String toString() { public String toString() {
return "RequestModel{" + return "RequestModel{" +
"timestamp=" + timestamp +
", action='" + action + '\'' + ", action='" + action + '\'' +
", jobGroup='" + jobGroup + '\'' + ", jobGroup='" + jobGroup + '\'' +
", jobName='" + jobName + '\'' + ", jobName='" + jobName + '\'' +
......
package com.xxl.job.core.executor.jetty; package com.xxl.job.core.executor;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander; import com.xxl.job.core.handler.annotation.JobHander;
import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.router.HandlerRouter; import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.util.IpUtil; import com.xxl.job.core.thread.JobThread;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
...@@ -21,7 +17,7 @@ import org.springframework.context.ApplicationListener; ...@@ -21,7 +17,7 @@ import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextClosedEvent;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.ConcurrentHashMap;
/** /**
* Created by xuxueli on 2016/3/2 21:14. * Created by xuxueli on 2016/3/2 21:14.
...@@ -48,100 +44,28 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe ...@@ -48,100 +44,28 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
} }
// ---------------------------------- job server ------------------------------------ // ---------------------------------- job server ------------------------------------
private Server server = null; private NetComServerFactory serverFactory = new NetComServerFactory();
public void start() throws Exception { public void start() throws Exception {
NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());
Thread executorTnread = new Thread(new Runnable() { serverFactory.start(port, ip, appName, registHelper);
@Override
public void run() {
server = new Server();
server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞
// connector
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(port);
connector.setMaxIdleTime(30000);
server.setConnectors(new Connector[] { connector });
// handler
HandlerCollection handlerc =new HandlerCollection();
handlerc.setHandlers(new Handler[]{new XxlJobExecutorHandler()});
server.setHandler(handlerc);
try {
server.start();
logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);
registryBeat();
server.join(); // block until thread stopped
logger.info(">>>>>>>>>>>> xxl-job jetty server join success at port:{}.", port);
} catch (Exception e) {
e.printStackTrace();
}
}
});
executorTnread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
executorTnread.start();
} }
public void destroy(){ public void destroy(){
if (server!=null) { serverFactory.destroy();
try {
server.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void registryBeat(){
if (registHelper==null && appName==null || appName.trim().length()==0) {
return;
}
Thread registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
// generate addredd = ip:port
String address = null;
if (ip != null && ip.trim().length()>0) {
address = ip.trim().concat(":").concat(String.valueOf(port));
} else {
address = IpUtil.getIpPort(port);
}
registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
registryThread.setDaemon(true);
registryThread.start();
} }
// ---------------------------------- init job handler ------------------------------------ // ---------------------------------- init job handler ------------------------------------
public static ApplicationContext applicationContext;
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
XxlJobExecutor.applicationContext = applicationContext;
initJobHandler(); // init job handler action
} Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHander.class);
/**
* init job handler action
*/
public void initJobHandler(){
Map<String, Object> serviceBeanMap = XxlJobExecutor.applicationContext.getBeansWithAnnotation(JobHander.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) { if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) { for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){ if (serviceBean instanceof IJobHandler){
String name = serviceBean.getClass().getAnnotation(JobHander.class).value(); String name = serviceBean.getClass().getAnnotation(JobHander.class).value();
IJobHandler handler = (IJobHandler) serviceBean; IJobHandler handler = (IJobHandler) serviceBean;
HandlerRouter.registJobHandler(name, handler); registJobHandler(name, handler);
} }
} }
} }
...@@ -155,4 +79,27 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe ...@@ -155,4 +79,27 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
} }
} }
// ---------------------------------- job handler repository
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
// ---------------------------------- job thread repository
private static ConcurrentHashMap<String, JobThread> JobThreadRepository = new ConcurrentHashMap<String, JobThread>();
public static JobThread registJobThread(String jobkey, IJobHandler handler){
JobThread jobThread = new JobThread(handler);
jobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobkey:{}, handler:{}", new Object[]{jobkey, handler});
JobThreadRepository.put(jobkey, jobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
return jobThread;
}
public static JobThread loadJobThread(String jobKey){
return JobThreadRepository.get(jobKey);
}
} }
package com.xxl.job.core.executor.jetty;
import com.xxl.job.core.router.HandlerRouter;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
* Created by xuxueli on 2016/3/2 21:23.
*/
public class XxlJobExecutorHandler extends AbstractHandler {
private static Logger logger = LoggerFactory.getLogger(XxlJobExecutorHandler.class);
@Override
public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
httpServletRequest.setCharacterEncoding("UTF-8");
httpServletResponse.setCharacterEncoding("UTF-8");
// parse hex-json to request model
String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX);
ResponseModel responseModel = null;
if (requestHex!=null && requestHex.trim().length()>0) {
try {
// route trigger
RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class);
responseModel = HandlerRouter.route(requestModel);
} catch (Exception e) {
logger.error("", e);
responseModel = new ResponseModel(ResponseModel.SUCCESS, e.getMessage());
}
}
if (responseModel == null) {
responseModel = new ResponseModel(ResponseModel.SUCCESS, "系统异常");
}
// format response model to hex-json
String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel);
// return
httpServletResponse.setContentType("text/plain;charset=utf-8");
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
httpServletResponse.getWriter().println(responseHex);
}
}
//package com.xxl.job.client.netcom.servlet;
//
//
//import java.io.IOException;
//import java.util.HashMap;
//import java.util.Map;
//
//import javax.servlet.ServletException;
//import javax.servlet.http.HttpServlet;
//import javax.servlet.http.HttpServletRequest;
//import javax.servlet.http.HttpServletResponse;
//
//import com.xxl.job.client.handler.HandlerRouter;
//
//
///**
// * remote job client on http
// * @author xuxueli 2015-12-19 18:36:47
// */
//@Deprecated
//public class XxlJobServlet extends HttpServlet {
// private static final long serialVersionUID = 1L;
//
// /**
// * Default constructor.
// */
// public XxlJobServlet() {
// // TODO Auto-generated constructor stub
// }
//
// /**
// * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
// */
// protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// request.setCharacterEncoding("UTF-8");
// response.setCharacterEncoding("UTF-8");
//
// Map<String, String> _param = new HashMap<String, String>();
// if (request.getParameterMap()!=null && request.getParameterMap().size()>0) {
// for (Object paramKey : request.getParameterMap().keySet()) {
// if (paramKey!=null) {
// String paramKeyStr = paramKey.toString();
// _param.put(paramKeyStr, request.getParameter(paramKeyStr));
// }
// }
// }
//
// String resp = HandlerRouter.action(_param);
// response.getWriter().append(resp);
// return;
// }
//
// /**
// * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse response)
// */
// protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// // TODO Auto-generated method stub
// doGet(request, response);
// }
//
//}
package com.xxl.job.core.handler; package com.xxl.job.core.handler;
import com.xxl.job.core.router.HandlerRouter;
/** /**
* remote job handler * remote job handler
* @author xuxueli 2015-12-19 19:06:38 * @author xuxueli 2015-12-19 19:06:38
*/ */
public abstract class IJobHandler extends HandlerRouter { public abstract class IJobHandler {
/** /**
* job handler <br><br> * job handler <br><br>
......
package com.xxl.job.core.router;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.router.action.BeatAction;
import com.xxl.job.core.router.action.KillAction;
import com.xxl.job.core.router.action.LogAction;
import com.xxl.job.core.router.action.RunAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.router.thread.JobThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
* handler repository
* @author xuxueli 2015-12-19 19:28:44
*/
public class HandlerRouter {
private static Logger logger = LoggerFactory.getLogger(HandlerRouter.class);
/**
* job handler repository
*/
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return HandlerRouter.jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return HandlerRouter.jobHandlerRepository.get(name);
}
/**
* job thread repository
*/
private static ConcurrentHashMap<String, JobThread> JobThreadRepository = new ConcurrentHashMap<String, JobThread>();
public static JobThread registJobThread(String jobkey, IJobHandler handler){
JobThread jobThread = new JobThread(handler);
jobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobkey:{}, handler:{}", new Object[]{jobkey, handler});
HandlerRouter.JobThreadRepository.put(jobkey, jobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
return jobThread;
}
public static JobThread loadJobThread(String jobKey){
return HandlerRouter.JobThreadRepository.get(jobKey);
}
/**
* route action repository
*/
public enum ActionRepository {
RUN(new RunAction()),
KILL(new KillAction()),
LOG(new LogAction()),
BEAT(new BeatAction());
private IAction action;
private ActionRepository(IAction action){
this.action = action;
}
/**
* match Action by enum name
* @param name
* @return action
*/
public static IAction matchAction(String name){
if (name!=null && name.trim().length()>0) {
for (ActionRepository item : ActionRepository.values()) {
if (item.name().equals(name)) {
return item.action;
}
}
}
return null;
}
}
// handler push to queue
public static ResponseModel route(RequestModel requestModel) {
logger.debug(">>>>>>>>>>> xxl-job route, RequestModel:{}", new Object[]{requestModel.toString()});
// timestamp check
if (System.currentTimeMillis() - requestModel.getTimestamp() > 60000) {
return new ResponseModel(ResponseModel.FAIL, "Timestamp Timeout.");
}
// match action
IAction action = ActionRepository.matchAction(requestModel.getAction());
if (action == null) {
return new ResponseModel(ResponseModel.FAIL, "Action match fail.");
}
return action.execute(requestModel);
}
}
package com.xxl.job.core.router;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
/**
* Created by xuxueli on 16/7/22.
*/
public abstract class IAction {
public abstract ResponseModel execute(RequestModel requestModel);
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
/**
* Created by xuxueli on 16/7/22.
*/
public class BeatAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
return new ResponseModel(ResponseModel.SUCCESS, "i am alive.");
}
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.router.HandlerRouter;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.router.thread.JobThread;
/**
* Created by xuxueli on 16/7/22.
*/
public class KillAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
// generate jobKey
String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName());
// kill handlerThread, and create new one
JobThread jobThread = HandlerRouter.loadJobThread(jobKey);
if (jobThread != null) {
IJobHandler handler = jobThread.getHandler();
jobThread.toStop("人工手动终止");
jobThread.interrupt();
HandlerRouter.registJobThread(jobKey, handler);
return new ResponseModel(ResponseModel.SUCCESS, "job thread kill success.");
}
return new ResponseModel(ResponseModel.FAIL, "job thread not found.");
}
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import java.util.Date;
/**
* Created by xuxueli on 16/7/22.
*/
public class LogAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
// log filename: yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(requestModel.getLogDateTim()), requestModel.getLogId());
String logConteng = XxlJobFileAppender.readLog(logFileName);
return new ResponseModel(ResponseModel.SUCCESS, logConteng);
}
}
package com.xxl.job.core.router.model;
/**
* Created by xuxueli on 16/7/22.
*/
public class ResponseModel {
public static final String SUCCESS = "SUCCESS";
public static final String FAIL = "FAIL";
private String status;
private String msg;
public ResponseModel() {
}
public ResponseModel(String status, String msg) {
this.status = status;
this.msg = msg;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "ResponseModel{" +
"status='" + status + '\'' +
", msg='" + msg + '\'' +
'}';
}
}
package com.xxl.job.core.rpc.codec;
import java.io.Serializable;
import java.util.Arrays;
/**
* request
* @author xuxueli 2015-10-29 19:39:12
*/
public class RpcRequest implements Serializable{
private static final long serialVersionUID = 1L;
private String serverAddress;
private long createMillisTime;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
public String getServerAddress() {
return serverAddress;
}
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}
public long getCreateMillisTime() {
return createMillisTime;
}
public void setCreateMillisTime(long createMillisTime) {
this.createMillisTime = createMillisTime;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
@Override
public String toString() {
return "NettyRequest [serverAddress=" + serverAddress + ", createMillisTime="
+ createMillisTime + ", className=" + className
+ ", methodName=" + methodName + ", parameterTypes="
+ Arrays.toString(parameterTypes) + ", parameters="
+ Arrays.toString(parameters) + "]";
}
}
package com.xxl.job.core.rpc.codec;
import java.io.Serializable;
/**
* response
* @author xuxueli 2015-10-29 19:39:54
*/
public class RpcResponse implements Serializable{
private static final long serialVersionUID = 1L;
private String error;
private Object result;
public boolean isError() {
return error != null;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
@Override
public String toString() {
return "NettyResponse [error=" + error
+ ", result=" + result + "]";
}
}
package com.xxl.job.core.rpc.netcom;
import com.xxl.job.core.rpc.codec.RpcRequest;
import com.xxl.job.core.rpc.codec.RpcResponse;
import com.xxl.job.core.rpc.netcom.jetty.client.JettyClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* rpc proxy
* @author xuxueli 2015-10-29 20:18:32
*/
public class NetComClientProxy implements FactoryBean<Object> {
private static final Logger logger = LoggerFactory.getLogger(NetComClientProxy.class);
// ---------------------- config ----------------------
private Class<?> iface;
String serverAddress;
JettyClient client = new JettyClient();
public NetComClientProxy(Class<?> iface, String serverAddress) {
this.iface = iface;
this.serverAddress = serverAddress;
}
@Override
public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// request
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// send
RpcResponse response = client.send(request);
// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}
}
});
}
@Override
public Class<?> getObjectType() {
return iface;
}
@Override
public boolean isSingleton() {
return false;
}
}
package com.xxl.job.core.rpc.netcom;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.rpc.codec.RpcRequest;
import com.xxl.job.core.rpc.codec.RpcResponse;
import com.xxl.job.core.rpc.netcom.jetty.server.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import java.util.HashMap;
import java.util.Map;
/**
* netcom init
* @author xuxueli 2015-10-31 22:54:27
*/
public class NetComServerFactory {
private static final Logger logger = LoggerFactory.getLogger(NetComServerFactory.class);
// ---------------------- server start ----------------------
JettyServer server = new JettyServer();
public void start(int port, String ip, String appName, RegistHelper registHelper) throws Exception {
server.start(port, ip, appName, registHelper);
}
// ---------------------- server destroy ----------------------
public void destroy(){
server.destroy();
}
// ---------------------- server init ----------------------
/**
* init local rpc service map
*/
private static Map<String, Object> serviceMap = new HashMap<String, Object>();
public static void putService(Class<?> iface, Object serviceBean){
serviceMap.put(iface.getName(), serviceBean);
}
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
if (serviceBean==null) {
serviceBean = serviceMap.get(request.getClassName());
}
if (serviceBean == null) {
// TODO
}
RpcResponse response = new RpcResponse();
if (System.currentTimeMillis() - request.getCreateMillisTime() > 60000) {
response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "Timestamp Timeout."));
return response;
}
try {
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
}
package com.xxl.job.core.rpc.netcom.jetty.client;
import com.xxl.job.core.rpc.codec.RpcRequest;
import com.xxl.job.core.rpc.codec.RpcResponse;
import com.xxl.job.core.rpc.serialize.HessianSerializer;
import com.xxl.job.core.util.HttpClientUtil;
/**
* jetty client
* @author xuxueli 2015-11-24 22:25:15
*/
public class JettyClient {
public RpcResponse send(RpcRequest request) throws Exception {
byte[] requestBytes = HessianSerializer.serialize(request);
byte[] responseBytes = HttpClientUtil.postRequest("http://" + request.getServerAddress() + "/", requestBytes);
return (RpcResponse) HessianSerializer.deserialize(responseBytes, RpcResponse.class);
}
}
package com.xxl.job.core.rpc.netcom.jetty.server;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.util.IpUtil;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* rpc jetty server
* @author xuxueli 2015-11-19 22:29:03
*/
public class JettyServer {
private static final Logger logger = LoggerFactory.getLogger(JettyServer.class);
private Server server;
public void start(final int port, final String ip, final String appName, final RegistHelper registHelper) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
server = new Server();
server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞
// connector
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(port);
connector.setMaxIdleTime(30000);
server.setConnectors(new Connector[] { connector });
// handler
HandlerCollection handlerc =new HandlerCollection();
handlerc.setHandlers(new Handler[]{new JettyServerHandler()});
server.setHandler(handlerc);
try {
server.start();
logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);
executorRegistryBeat(port, ip, appName, registHelper);
server.join(); // block until thread stopped
logger.info(">>>>>>>>>>> xxl-rpc server start success, netcon={}, port={}", JettyServer.class.getName(), port);
} catch (Exception e) {
logger.error("", e);
} finally {
server.destroy();
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
public void destroy() {
if (server != null) {
try {
server.destroy();
} catch (Exception e) {
logger.error("", e);
}
}
logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName());
}
/**
* registry beat
* @param port
* @param ip
* @param appName
* @param registHelper
*/
private void executorRegistryBeat(final int port, final String ip, final String appName, final RegistHelper registHelper){
if (registHelper==null && appName==null || appName.trim().length()==0) {
return;
}
Thread registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
// generate addredd = ip:port
String address = null;
if (ip != null && ip.trim().length()>0) {
address = ip.trim().concat(":").concat(String.valueOf(port));
} else {
address = IpUtil.getIpPort(port);
}
registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
registryThread.setDaemon(true);
registryThread.start();
}
}
package com.xxl.job.core.rpc.netcom.jetty.server;
import com.xxl.job.core.rpc.codec.RpcRequest;
import com.xxl.job.core.rpc.codec.RpcResponse;
import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.rpc.serialize.HessianSerializer;
import com.xxl.job.core.util.HttpClientUtil;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
/**
* jetty handler
* @author xuxueli 2015-11-19 22:32:36
*/
public class JettyServerHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// deserialize request
byte[] requestBytes = HttpClientUtil.readBytes(request);
RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);
// invoke
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
// serialize response
byte[] responseBytes = HessianSerializer.serialize(rpcResponse);
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
OutputStream out = response.getOutputStream();
out.write(responseBytes);
out.flush();
}
}
package com.xxl.job.core.rpc.serialize;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* hessian serialize
* @author xuxueli 2015-9-26 02:53:29
*/
public class HessianSerializer {
public static <T> byte[] serialize(T obj){
ByteArrayOutputStream os = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(os);
try {
ho.writeObject(obj);
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
return os.toByteArray();
}
public static <T> Object deserialize(byte[] bytes, Class<T> clazz) {
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(is);
try {
return hi.readObject();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
package com.xxl.job.core.router.thread; package com.xxl.job.core.thread;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import org.eclipse.jetty.util.ConcurrentHashSet; import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -23,7 +23,7 @@ public class JobThread extends Thread{ ...@@ -23,7 +23,7 @@ public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class); private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private IJobHandler handler; private IJobHandler handler;
private LinkedBlockingQueue<RequestModel> triggerQueue; private LinkedBlockingQueue<TriggerParam> triggerQueue;
private ConcurrentHashSet<Integer> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID private ConcurrentHashSet<Integer> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private boolean toStop = false; private boolean toStop = false;
...@@ -31,21 +31,21 @@ public class JobThread extends Thread{ ...@@ -31,21 +31,21 @@ public class JobThread extends Thread{
public JobThread(IJobHandler handler) { public JobThread(IJobHandler handler) {
this.handler = handler; this.handler = handler;
triggerQueue = new LinkedBlockingQueue<RequestModel>(); triggerQueue = new LinkedBlockingQueue<TriggerParam>();
triggerLogIdSet = new ConcurrentHashSet<Integer>(); triggerLogIdSet = new ConcurrentHashSet<Integer>();
} }
public IJobHandler getHandler() { public IJobHandler getHandler() {
return handler; return handler;
} }
public void pushTriggerQueue(RequestModel requestModel) { public void pushTriggerQueue(TriggerParam triggerParam) {
if (triggerLogIdSet.contains(requestModel.getLogId())) { if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.debug("repeate trigger job, logId:{}", requestModel.getLogId()); logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId());
return; return;
} }
triggerLogIdSet.add(requestModel.getLogId()); triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(requestModel); triggerQueue.add(triggerParam);
} }
public void toStop(String stopReason) { public void toStop(String stopReason) {
...@@ -64,46 +64,46 @@ public class JobThread extends Thread{ ...@@ -64,46 +64,46 @@ public class JobThread extends Thread{
while(!toStop){ while(!toStop){
try { try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
RequestModel triggerDate = triggerQueue.poll(3L, TimeUnit.SECONDS); TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerDate!=null) { if (triggerParam!=null) {
triggerLogIdSet.remove(triggerDate.getLogId()); triggerLogIdSet.remove(triggerParam.getLogId());
// parse param // parse param
String[] handlerParams = (triggerDate.getExecutorParams()!=null && triggerDate.getExecutorParams().trim().length()>0) String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0)
? (String[])(Arrays.asList(triggerDate.getExecutorParams().split(",")).toArray()) : null; ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null;
// handle job // handle job
String _status = ResponseModel.SUCCESS; int _code = ReturnT.SUCCESS_CODE;
String _msg = null; String _msg = null;
try { try {
// log filename: yyyy-MM-dd/9999.log // log filename: yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerDate.getLogDateTim()), triggerDate.getLogId()); String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName); XxlJobFileAppender.contextHolder.set(logFileName);
logger.info("----------- xxl-job job execute start -----------"); logger.info("----------- xxl-job job execute start -----------");
handler.execute(handlerParams); handler.execute(handlerParams);
} catch (Exception e) { } catch (Exception e) {
logger.error("JobThread Exception:", e); logger.error("JobThread Exception:", e);
_status = ResponseModel.FAIL; _code = ReturnT.FAIL_CODE;
StringWriter out = new StringWriter(); StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out)); e.printStackTrace(new PrintWriter(out));
_msg = out.toString(); _msg = out.toString();
} }
logger.info("----------- xxl-job job execute end ----------- <br> Look : ExecutorParams:{}, Status:{}, Msg:{}", logger.info("----------- xxl-job job execute end ----------- <br> Look : ExecutorParams:{}, Code:{}, Msg:{}",
new Object[]{handlerParams, _status, _msg}); new Object[]{handlerParams, _code, _msg});
// callback handler info // callback handler info
if (!toStop) { if (!toStop) {
// commonm // commonm
triggerDate.setStatus(_status); triggerParam.setStatus(_code+"");
triggerDate.setMsg(_msg); triggerParam.setMsg(_msg);
TriggerCallbackThread.pushCallBack(triggerDate); TriggerCallbackThread.pushCallBack(triggerParam);
} else { } else {
// is killed // is killed
triggerDate.setStatus(ResponseModel.FAIL); triggerParam.setStatus(ReturnT.FAIL_CODE+"");
triggerDate.setMsg(stopReason + " [业务运行中,被强制终止]"); triggerParam.setMsg(stopReason + " [业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(triggerDate); TriggerCallbackThread.pushCallBack(triggerParam);
} }
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -113,12 +113,12 @@ public class JobThread extends Thread{ ...@@ -113,12 +113,12 @@ public class JobThread extends Thread{
// callback trigger request in queue // callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){ while(triggerQueue !=null && triggerQueue.size()>0){
RequestModel triggerDate = triggerQueue.poll(); TriggerParam triggerParam = triggerQueue.poll();
if (triggerDate!=null) { if (triggerParam!=null) {
// is killed // is killed
triggerDate.setStatus(ResponseModel.FAIL); triggerParam.setStatus(ReturnT.FAIL_CODE+"");
triggerDate.setMsg(stopReason + " [任务尚未执行,在调度队列中被终止]"); triggerParam.setMsg(stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(triggerDate); TriggerCallbackThread.pushCallBack(triggerParam);
} }
} }
......
package com.xxl.job.core.router.thread; package com.xxl.job.core.thread;
import com.xxl.job.core.router.model.RequestModel; import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.router.model.ResponseModel; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.XxlJobNetCommUtil; import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -14,20 +15,24 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -14,20 +15,24 @@ import java.util.concurrent.LinkedBlockingQueue;
public class TriggerCallbackThread { public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static LinkedBlockingQueue<RequestModel> callBackQueue = new LinkedBlockingQueue<RequestModel>(); private static LinkedBlockingQueue<TriggerParam> callBackQueue = new LinkedBlockingQueue<TriggerParam>();
static { static {
new Thread(new Runnable() { new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
while(true){ while(true){
try { try {
RequestModel callback = callBackQueue.take(); TriggerParam callback = callBackQueue.take();
if (callback != null) { if (callback != null) {
for (String address : callback.getLogAddress()) { for (String address : callback.getLogAddress()) {
try { try {
ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), callback);
logger.info(">>>>>>>>>>> xxl-job callback , RequestModel:{}, ResponseModel:{}", new Object[]{callback.toString(), responseModel.toString()}); // callback
if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, address).getObject();
ReturnT<String> callbackResult = adminBiz.callback(callback);
logger.info(">>>>>>>>>>> xxl-job callback , CallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()});
if (ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
break; break;
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -42,7 +47,7 @@ public class TriggerCallbackThread { ...@@ -42,7 +47,7 @@ public class TriggerCallbackThread {
} }
}).start(); }).start();
} }
public static void pushCallBack(RequestModel callback){ public static void pushCallBack(TriggerParam callback){
callBackQueue.add(callback); callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
} }
......
package com.xxl.job.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
/**
* hex/byte util
* @author xuxueli 2015-11-14 22:47:28
*/
public class ByteHexConverter {
private static Logger logger = LoggerFactory.getLogger(ByteHexConverter.class);
/**
* byte - to - radix, use BigInteger
*/
private static final String hex_tables = "0123456789ABCDEF";
public static String byte2hex (byte[] iBytes) {
StringBuilder hex = new StringBuilder(iBytes.length * 2);
for (int index = 0; index < iBytes.length; index++) {
hex.append(hex_tables.charAt((iBytes[index] & 0xf0) >> 4));
hex.append(hex_tables.charAt((iBytes[index] & 0x0f) >> 0));
}
return hex.toString();
}
public static byte[] hex2Byte(String hexString) {
if (hexString == null || hexString.equals("")) {
return null;
}
byte[] res = new byte[hexString.length() / 2];
char[] chs = hexString.toCharArray();
for (int i = 0, c = 0; i < chs.length; i += 2, c++) {
res[c] = (byte) (Integer.parseInt(new String(chs, i, 2), 16));
}
return res;
}
/**
* byte - to - radix, use BigInteger
*/
public static final int HEX = 16;
public static String byte2radix(byte[] iBytes, int radix){
return new BigInteger(1, iBytes).toString(radix);
}
public static byte[] radix2byte(String val, int radix){
return new BigInteger(val, radix).toByteArray();
}
/**
* get length of string
* @param str
* @return len of string byte
*/
public static int getByteLen(String str){
if (str==null || str.length()==0) {
return 0;
}
// because java base on unicode, and one china code's length is one, but it's cost 2 bytes.
//int len = str.getBytes().length * 2;
int len = 0;
try {
len = str.getBytes("UTF-8").length;
} catch (UnsupportedEncodingException e) {
logger.error("", e);
len = str.getBytes().length * 2;
}
if (len % 4 != 0) {
// Length is best in multiples of four
len = (len/4 + 1) * 4;
}
return len;
}
public static void main(String[] args) {
// hex - byte[] 方案A:位移
String temp = "1111111111113d1f3a51sd3f1a32sd1f32as1df2a13sd21f3a2s1df32a13sd2f123s2a3d13fa13sd9999999999";
System.out.println("明文:" + new String(temp.getBytes()));
System.out.println("编码:" + byte2hex(temp.getBytes()));
System.out.println("解码:" + new String(hex2Byte(byte2hex(temp.getBytes()))));
// hex - byte[] 方案B:BigInteger
System.out.println("编码:" + byte2radix(temp.getBytes(), HEX));
System.out.println("解码:" + new String(radix2byte(byte2radix(temp.getBytes(), HEX), HEX)));
}
}
\ No newline at end of file
package com.xxl.job.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
/**
* byte read util
* @author xuxueli 2015-11-15 03:50:10
*/
public class ByteReadFactory {
private static transient Logger logger = LoggerFactory.getLogger(ByteReadFactory.class);
private int m_iPos;
private int m_iReqLen;
private byte[] m_byte = null;
public ByteReadFactory(byte[] hexBytes){
m_iPos = 0;
m_byte = hexBytes;
m_iReqLen = m_byte.length;
}
public int readInt() {
if (m_iPos + 4 > m_iReqLen) {
return 0;
}
int iInt = (m_byte[m_iPos] & 0xff)
| ((m_byte[m_iPos + 1] & 0xff) << 8)
| ((m_byte[m_iPos + 2] & 0xff) << 16)
| ((m_byte[m_iPos + 3] & 0xff) << 24);
m_iPos += 4;
return iInt;
}
public long readLong() {
if (m_iPos + 8 > m_iReqLen) {
return 0;
}
long iLong = (m_byte[m_iPos] & 0xff)
| ((m_byte[m_iPos + 1] & 0xff) << 8)
| ((m_byte[m_iPos + 2] & 0xff) << 16)
| ((m_byte[m_iPos + 3] & 0xff) << 24)
| ((m_byte[m_iPos + 4] & 0xff) << 32)
| ((m_byte[m_iPos + 5] & 0xff) << 40)
| ((m_byte[m_iPos + 6] & 0xff) << 48)
| ((m_byte[m_iPos + 7] & 0xff) << 56);
m_iPos += 8;
return iLong;
}
public String readString(int length) {
if (m_iPos + length > m_iReqLen) {
logger.error("[byte stream factory read string length error.]");
return "";
}
int index = 0;
for (index = 0; index < length; index++) {
if (m_byte[m_iPos + index] == 0) {
break;
}
}
String msg = "";
try {
msg = new String(m_byte, m_iPos, index, "UTF-8");
} catch (UnsupportedEncodingException e) {
logger.error("[byte stream factory read string exception.]", e);
}
m_iPos += length;
return msg;
}
public byte[] read(int length) {
if (m_iPos + length > m_iReqLen || length<=0) {
logger.error("[byte stream factory read string length error.]");
return null;
}
for (int i = 0; i < length; i++) {
if (m_byte[m_iPos + i] == 0) {
break;
}
}
byte[] result = new byte[length];
for (int i = 0; i < length; i++) {
result[i] = m_byte[m_iPos + i];
}
m_iPos += length;
return result;
}
public byte[] readByteAll() {
return read(m_iReqLen - m_iPos);
}
}
package com.xxl.job.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
/**
* byte write util
* @author xuxueli 2015-11-15 03:49:36
*/
public class ByteWriteFactory {
private static transient Logger logger = LoggerFactory.getLogger(ByteWriteFactory.class);
private ByteBuffer m_byteBuf = null;
public ByteWriteFactory() {
m_byteBuf = ByteBuffer.allocate(1024 * 4);
}
public ByteWriteFactory(int capacity) {
m_byteBuf = ByteBuffer.allocate(capacity);
}
public void writeInt(int intValue) {
byte[] intBytes = new byte[4];
for (int index = 0; index < 4; index++) {
intBytes[index] = (byte) (intValue >>> (index * 8));
}
m_byteBuf.put(intBytes);
}
public void write(int[] intArr) {
for (int index = 0; index < intArr.length; index++) {
writeInt(intArr[index]);
}
}
public void write(byte[] byteArr) {
m_byteBuf.put(byteArr);
}
public void writeString(String value, int length) {
byte[] bytes = new byte[length];
if (value != null && value.trim().length() > 0) {
try {
byte[] infoBytes = value.getBytes("UTF-8");
int len = infoBytes.length < length ? infoBytes.length : length;
System.arraycopy(infoBytes, 0, bytes, 0, len);
} catch (UnsupportedEncodingException e) {
logger.error("[response stream factory encoding exception.]", e);
}
}
m_byteBuf.put(bytes);
}
public byte[] getBytes() {
m_byteBuf.flip();
if (m_byteBuf.limit() == 0) {
return null;
}
byte[] bytes = new byte[m_byteBuf.limit()];
m_byteBuf.get(bytes);
return bytes;
}
}
package com.xxl.job.core.util;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
/**
* httpclient util
* @author xuxueli 2015-10-31 19:50:41
*/
public class HttpClientUtil {
/**
* post request
*/
public static byte[] postRequest(String reqURL, byte[] date) {
byte[] responseBytes = null;
HttpPost httpPost = new HttpPost(reqURL);
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
// init post
/*if (params != null && !params.isEmpty()) {
List<NameValuePair> formParams = new ArrayList<NameValuePair>();
for (Map.Entry<String, String> entry : params.entrySet()) {
formParams.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8"));
}*/
if (date != null) {
httpPost.setEntity(new ByteArrayEntity(date, ContentType.DEFAULT_BINARY));
}
// do post
HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
if (null != entity) {
responseBytes = EntityUtils.toByteArray(entity);
EntityUtils.consume(entity);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
httpPost.releaseConnection();
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return responseBytes;
}
/**
* read bytes from http request
* @param request
* @return
* @throws IOException
*/
public static final byte[] readBytes(HttpServletRequest request) throws IOException {
request.setCharacterEncoding("UTF-8");
int contentLen = request.getContentLength();
InputStream is = request.getInputStream();
if (contentLen > 0) {
int readLen = 0;
int readLengthThisTime = 0;
byte[] message = new byte[contentLen];
try {
while (readLen != contentLen) {
readLengthThisTime = is.read(message, readLen, contentLen - readLen);
if (readLengthThisTime == -1) {
break;
}
readLen += readLengthThisTime;
}
return message;
} catch (IOException e) {
e.printStackTrace();
}
}
return new byte[] {};
}
}
package com.xxl.job.core.util;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* http util to send data
* @author xuxueli
* @version 2015-11-28 15:30:59
*/
public class XxlJobNetCommUtil {
private static Logger logger = LoggerFactory.getLogger(XxlJobNetCommUtil.class);
// hex param key
public static final String HEX = "hex";
/**
* format object to hex-json
* @param obj
* @return result
*/
public static String formatObj2HexJson(Object obj){
// obj to json
String json = JacksonUtil.writeValueAsString(obj);
int len = ByteHexConverter.getByteLen(json);
// json to byte[]
ByteWriteFactory byteWriteFactory = new ByteWriteFactory(4 + len);
byteWriteFactory.writeInt(len);
byteWriteFactory.writeString(json, len);
byte[] bytes = byteWriteFactory.getBytes();
// byte to hex
String hex = ByteHexConverter.byte2hex(bytes);
return hex;
}
/**
* parse hex-json to object
* @param hex
* @param clazz
* @return result
*/
public static <T> T parseHexJson2Obj(String hex, Class<T> clazz){
// hex to byte[]
byte[] bytes = ByteHexConverter.hex2Byte(hex);
// byte[] to json
ByteReadFactory byteReadFactory = new ByteReadFactory(bytes);
String json = byteReadFactory.readString(byteReadFactory.readInt());
// json to obj
T obj = JacksonUtil.readValue(json, clazz);
return obj;
}
public static void main(String[] args) {
RequestModel requestModel = new RequestModel();
requestModel.setJobGroup("group");
String hex = formatObj2HexJson(requestModel);
System.out.println(hex);
System.out.println(parseHexJson2Obj(hex, RequestModel.class));
}
/**
* http post request
* @param reqURL
*/
public static ResponseModel postHex(String reqURL, RequestModel requestModel){
// parse RequestModel to hex-json
String requestHex = XxlJobNetCommUtil.formatObj2HexJson(requestModel);
// msg
String failMsg = null;
// do post
HttpPost httpPost = null;
CloseableHttpClient httpClient = null;
try{
httpPost = new HttpPost(reqURL);
List<NameValuePair> formParams = new ArrayList<NameValuePair>();
formParams.add(new BasicNameValuePair(XxlJobNetCommUtil.HEX, requestHex));
httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8"));
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();
httpPost.setConfig(requestConfig);
//httpClient = HttpClients.createDefault(); // default retry 3 times
httpClient = HttpClients.custom().disableAutomaticRetries().build();
HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
if (response.getStatusLine().getStatusCode() == 200 && null != entity) {
String responseHex = EntityUtils.toString(entity, "UTF-8");
logger.debug("xxl-job, net comm success, requestHex:{}, responseHex:{}", requestHex, responseHex);
EntityUtils.consume(entity);
// i do not know why
//responseHex = responseHex.replace("\n", "");
//responseHex = responseHex.replace("\r", "");
if (responseHex!=null) {
responseHex = responseHex.trim();
}
// parse hex-json to ResponseModel
ResponseModel responseModel = XxlJobNetCommUtil.parseHexJson2Obj(responseHex, ResponseModel.class);
if (responseModel!=null) {
return responseModel;
}
} else {
failMsg = "http statusCode error, statusCode:" + response.getStatusLine().getStatusCode();
}
} catch (Exception e) {
logger.error("", e);
/*StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
callback.setMsg(out.toString());*/
failMsg = e.getMessage();
} finally{
if (httpPost!=null) {
httpPost.releaseConnection();
}
if (httpClient!=null) {
try {
httpClient.close();
} catch (IOException e) {
logger.error("", e);
}
}
}
// other, default fail
ResponseModel callback = new ResponseModel();
callback.setStatus(ResponseModel.FAIL);
callback.setMsg(failMsg);
return callback;
}
/**
* parse address ip:port to url http://.../
* @param address
* @return result
*/
public static String addressToUrl(String address){
return "http://" + address + "/";
}
}
...@@ -4,13 +4,13 @@ ...@@ -4,13 +4,13 @@
<parent> <parent>
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId> <artifactId>xxl-job</artifactId>
<version>1.6.0</version> <version>1.6.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>xxl-job-executor-example</artifactId> <artifactId>xxl-job-executor-example</artifactId>
<packaging>war</packaging> <packaging>war</packaging>
<properties> <properties>
<xxl-job.version>1.6.0</xxl-job.version> <xxl-job.version>1.6.0-SNAPSHOT</xxl-job.version>
<spring.version>3.2.17.RELEASE</spring.version> <spring.version>3.2.17.RELEASE</spring.version>
</properties> </properties>
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
<context:component-scan base-package="com.xxl.job.executor.service.jobhandler" /> <context:component-scan base-package="com.xxl.job.executor.service.jobhandler" />
<!-- 配置02、执行器 --> <!-- 配置02、执行器 -->
<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" > <bean id="xxlJobExecutor" class="com.xxl.job.core.executor.XxlJobExecutor" init-method="start" destroy-method="destroy" >
<!-- 执行器IP[选填],为空则自动获取 --> <!-- 执行器IP[选填],为空则自动获取 -->
<!--<property name="ip" value="" />--> <!--<property name="ip" value="" />-->
<!-- 执行器端口号 --> <!-- 执行器端口号 -->
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论