提交 554429ea authored 作者: xueli.xue's avatar xueli.xue

重要版本升级:

1、新增“任务组”;
2、整合“远程调度”和“本地调度”;
...@@ -33,7 +33,9 @@ public class JobLogController { ...@@ -33,7 +33,9 @@ public class JobLogController {
public IXxlJobLogDao xxlJobLogDao; public IXxlJobLogDao xxlJobLogDao;
@RequestMapping @RequestMapping
public String index(Model model) { public String index(Model model, String jobGroup, String jobName) {
model.addAttribute("jobGroup", jobGroup);
model.addAttribute("jobName", jobName);
model.addAttribute("JobGroupList", JobGroupEnum.values()); model.addAttribute("JobGroupList", JobGroupEnum.values());
return "joblog/index"; return "joblog/index";
} }
...@@ -71,8 +73,8 @@ public class JobLogController { ...@@ -71,8 +73,8 @@ public class JobLogController {
@RequestMapping("/save") @RequestMapping("/save")
@ResponseBody @ResponseBody
public ReturnT<String> triggerLog(int triggerLogId, String status, String msg) { public ReturnT<String> triggerLog(int trigger_log_id, String status, String msg) {
XxlJobLog log = xxlJobLogDao.load(triggerLogId); XxlJobLog log = xxlJobLogDao.load(trigger_log_id);
if (log!=null) { if (log!=null) {
log.setHandleTime(new Date()); log.setHandleTime(new Date());
log.setHandleStatus(status); log.setHandleStatus(status);
......
...@@ -10,21 +10,22 @@ public class XxlJobInfo { ...@@ -10,21 +10,22 @@ public class XxlJobInfo {
private int id; private int id;
private String jobGroup; // base on quartz 任务组 private String jobGroup; // 任务组
private String jobName; // base on quartz 任务名 private String jobName; // 任务名
private String jobCron; // base on quartz 任务执行CRON表达式 private String jobCron; // 任务执行CRON表达式 【base on quartz】
private String jobClass; // base on quartz 任务执行JobBean private String jobDesc;
private String jobData; // base on db, Map-JSON-String 任务执行数据 private String jobClass; // 任务执行JobBean 【base on quartz】
private String jobData; // 任务执行数据 Map-JSON-String
private Date addTime; private Date addTime;
private Date updateTime; private Date updateTime;
private String author; // 作者 private String author; // 负责人
private String alarmEmail; // 报警邮件 private String alarmEmail; // 报警邮件
private int alarmThreshold; // 报警阀值 private int alarmThreshold; // 报警阀值
// copy from quartz // copy from quartz
private String jobStatus; // 任务状态 private String jobStatus; // 任务状态 【base on quartz】
public int getId() { public int getId() {
return id; return id;
...@@ -58,6 +59,14 @@ public class XxlJobInfo { ...@@ -58,6 +59,14 @@ public class XxlJobInfo {
this.jobCron = jobCron; this.jobCron = jobCron;
} }
public String getJobDesc() {
return jobDesc;
}
public void setJobDesc(String jobDesc) {
this.jobDesc = jobDesc;
}
public String getJobClass() { public String getJobClass() {
return jobClass; return jobClass;
} }
...@@ -125,9 +134,9 @@ public class XxlJobInfo { ...@@ -125,9 +134,9 @@ public class XxlJobInfo {
@Override @Override
public String toString() { public String toString() {
return "XxlJobInfo [id=" + id + ", jobGroup=" + jobGroup + ", jobName=" + jobName + ", jobCron=" + jobCron return "XxlJobInfo [id=" + id + ", jobGroup=" + jobGroup + ", jobName=" + jobName + ", jobCron=" + jobCron
+ ", jobClass=" + jobClass + ", jobData=" + jobData + ", addTime=" + addTime + ", updateTime=" + ", jobDesc=" + jobDesc + ", jobClass=" + jobClass + ", jobData=" + jobData + ", addTime=" + addTime
+ updateTime + ", author=" + author + ", alarmEmail=" + alarmEmail + ", alarmThreshold=" + ", updateTime=" + updateTime + ", author=" + author + ", alarmEmail=" + alarmEmail
+ alarmThreshold + ", jobStatus=" + jobStatus + "]"; + ", alarmThreshold=" + alarmThreshold + ", jobStatus=" + jobStatus + "]";
} }
} }
...@@ -14,6 +14,7 @@ public class XxlJobLog { ...@@ -14,6 +14,7 @@ public class XxlJobLog {
private String jobGroup; private String jobGroup;
private String jobName; private String jobName;
private String jobCron; private String jobCron;
private String jobDesc;
private String jobClass; private String jobClass;
private String jobData; private String jobData;
...@@ -26,7 +27,6 @@ public class XxlJobLog { ...@@ -26,7 +27,6 @@ public class XxlJobLog {
private Date handleTime; private Date handleTime;
private String handleStatus; private String handleStatus;
private String handleMsg; private String handleMsg;
public int getId() { public int getId() {
return id; return id;
} }
...@@ -51,6 +51,12 @@ public class XxlJobLog { ...@@ -51,6 +51,12 @@ public class XxlJobLog {
public void setJobCron(String jobCron) { public void setJobCron(String jobCron) {
this.jobCron = jobCron; this.jobCron = jobCron;
} }
public String getJobDesc() {
return jobDesc;
}
public void setJobDesc(String jobDesc) {
this.jobDesc = jobDesc;
}
public String getJobClass() { public String getJobClass() {
return jobClass; return jobClass;
} }
...@@ -103,9 +109,9 @@ public class XxlJobLog { ...@@ -103,9 +109,9 @@ public class XxlJobLog {
@Override @Override
public String toString() { public String toString() {
return "XxlJobLog [id=" + id + ", jobGroup=" + jobGroup + ", jobName=" + jobName + ", jobCron=" + jobCron return "XxlJobLog [id=" + id + ", jobGroup=" + jobGroup + ", jobName=" + jobName + ", jobCron=" + jobCron
+ ", jobClass=" + jobClass + ", jobData=" + jobData + ", triggerTime=" + triggerTime + ", jobDesc=" + jobDesc + ", jobClass=" + jobClass + ", jobData=" + jobData + ", triggerTime="
+ ", triggerStatus=" + triggerStatus + ", triggerMsg=" + triggerMsg + ", handleTime=" + handleTime + triggerTime + ", triggerStatus=" + triggerStatus + ", triggerMsg=" + triggerMsg + ", handleTime="
+ ", handleStatus=" + handleStatus + ", handleMsg=" + handleMsg + "]"; + handleTime + ", handleStatus=" + handleStatus + ", handleMsg=" + handleMsg + "]";
} }
} }
...@@ -3,6 +3,7 @@ package com.xxl.job.core.util; ...@@ -3,6 +3,7 @@ package com.xxl.job.core.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory; ...@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import com.xxl.job.client.util.JacksonUtil;
import com.xxl.job.core.model.XxlJobInfo; import com.xxl.job.core.model.XxlJobInfo;
import com.xxl.job.dao.IXxlJobInfoDao; import com.xxl.job.dao.IXxlJobInfoDao;
import com.xxl.job.dao.IXxlJobLogDao; import com.xxl.job.dao.IXxlJobLogDao;
...@@ -124,29 +126,42 @@ public final class DynamicSchedulerUtil implements InitializingBean { ...@@ -124,29 +126,42 @@ public final class DynamicSchedulerUtil implements InitializingBean {
e.printStackTrace(); e.printStackTrace();
} }
} }
// check if exists
public static boolean checkExists(String jobName, String jobGroup) throws SchedulerException{
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
return scheduler.checkExists(triggerKey);
}
// addJob 新增 // addJob 新增
public static boolean addJob(String triggerKeyName, String cronExpression, Class<? extends Job> jobClass, Map<String, Object> jobData) throws SchedulerException { @SuppressWarnings("unchecked")
public static boolean addJob(XxlJobInfo jobInfo) throws SchedulerException {
// TriggerKey : name + group // TriggerKey : name + group
String group = Scheduler.DEFAULT_GROUP; TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), jobInfo.getJobGroup());
TriggerKey triggerKey = TriggerKey.triggerKey(triggerKeyName, group); JobKey jobKey = new JobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
// TriggerKey valid if_exists // TriggerKey valid if_exists
if (scheduler.checkExists(triggerKey)) { if (checkExists(jobInfo.getJobName(), jobInfo.getJobGroup())) {
Trigger trigger = scheduler.getTrigger(triggerKey); logger.info(">>>>>>>>> addJob fail, job already exist, jobInfo:{}", jobInfo);
logger.info(">>>>>>>>> Already exist trigger [" + trigger + "] by key [" + triggerKey + "] in Scheduler");
return false; return false;
} }
// CronTrigger : TriggerKey + cronExpression // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度 // CronTrigger : TriggerKey + cronExpression // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(jobInfo.getJobCron()).withMisfireHandlingInstructionDoNothing();
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
// JobDetail : jobClass // JobDetail : jobClass
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(triggerKeyName, group).build(); Class<? extends Job> jobClass_ = null;
if (jobData!=null && jobData.size() > 0) { try {
jobClass_ = (Class<? extends Job>)Class.forName(jobInfo.getJobClass());
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
JobDetail jobDetail = JobBuilder.newJob(jobClass_).withIdentity(jobKey).build();
if (jobInfo.getJobData()!=null) {
JobDataMap jobDataMap = jobDetail.getJobDataMap(); JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.putAll(jobData); // JobExecutionContext context.getMergedJobDataMap().get("mailGuid"); jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class)); // JobExecutionContext context.getMergedJobDataMap().get("mailGuid");
} }
// schedule : jobDetail + cronTrigger // schedule : jobDetail + cronTrigger
...@@ -156,49 +171,60 @@ public final class DynamicSchedulerUtil implements InitializingBean { ...@@ -156,49 +171,60 @@ public final class DynamicSchedulerUtil implements InitializingBean {
return true; return true;
} }
// reschedule 重置cron // reschedule
public static boolean rescheduleJob(String triggerKeyName, String cronExpression) throws SchedulerException { @SuppressWarnings("unchecked")
public static boolean rescheduleJob(XxlJobInfo jobInfo) throws SchedulerException {
// TriggerKey valid if_exists
if (!checkExists(jobInfo.getJobName(), jobInfo.getJobGroup())) {
logger.info(">>>>>>>>>>> rescheduleJob fail, job not exists, jobInfo:{}", jobInfo);
return false;
}
// TriggerKey : name + group // TriggerKey : name + group
String group = Scheduler.DEFAULT_GROUP; TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), jobInfo.getJobGroup());
TriggerKey triggerKey = TriggerKey.triggerKey(triggerKeyName, group); JobKey jobKey = new JobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
boolean result = false; // CronTrigger : TriggerKey + cronExpression
if (scheduler.checkExists(triggerKey)) { CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(jobInfo.getJobCron()).withMisfireHandlingInstructionDoNothing();
// CronTrigger : TriggerKey + cronExpression CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); //scheduler.rescheduleJob(triggerKey, cronTrigger);
Date date = scheduler.rescheduleJob(triggerKey, cronTrigger); // JobDetail-JobDataMap fresh
result = true; JobDetail jobDetail = scheduler.getJobDetail(jobKey);
logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}, cronExpression:{}, date:{}", triggerKey, cronExpression, date); JobDataMap jobDataMap = jobDetail.getJobDataMap();
} else { jobDataMap.clear();
logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}, cronExpression:{}", triggerKey, cronExpression); jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class));
}
return result; // Trigger fresh
HashSet<Trigger> triggerSet = new HashSet<Trigger>();
triggerSet.add(cronTrigger);
scheduler.scheduleJob(jobDetail, triggerSet, true);
logger.info(">>>>>>>>>>> resumeJob success, jobInfo:{}", jobInfo);
return true;
} }
// unscheduleJob 删除 // unscheduleJob
public static boolean removeJob(String triggerKeyName) throws SchedulerException { public static boolean removeJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group // TriggerKey : name + group
String group = Scheduler.DEFAULT_GROUP; TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
TriggerKey triggerKey = TriggerKey.triggerKey(triggerKeyName, group);
boolean result = false; boolean result = false;
if (scheduler.checkExists(triggerKey)) { if (checkExists(jobName, jobGroup)) {
result = scheduler.unscheduleJob(triggerKey); result = scheduler.unscheduleJob(triggerKey);
logger.info(">>>>>>>>>>> removeJob, triggerKey:{}, result [{}]", triggerKey, result);
} }
logger.info(">>>>>>>>>>> removeJob, triggerKey:{}, result [{}]", triggerKey, result); return true;
return result;
} }
// Pause 暂停 // Pause
public static boolean pauseJob(String triggerKeyName) throws SchedulerException { public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group // TriggerKey : name + group
String group = Scheduler.DEFAULT_GROUP; TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
TriggerKey triggerKey = TriggerKey.triggerKey(triggerKeyName, group);
boolean result = false; boolean result = false;
if (scheduler.checkExists(triggerKey)) { if (checkExists(jobName, jobGroup)) {
scheduler.pauseTrigger(triggerKey); scheduler.pauseTrigger(triggerKey);
result = true; result = true;
logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey); logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey);
...@@ -208,14 +234,13 @@ public final class DynamicSchedulerUtil implements InitializingBean { ...@@ -208,14 +234,13 @@ public final class DynamicSchedulerUtil implements InitializingBean {
return result; return result;
} }
// resume 重启 // resume
public static boolean resumeJob(String triggerKeyName) throws SchedulerException { public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group // TriggerKey : name + group
String group = Scheduler.DEFAULT_GROUP; TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
TriggerKey triggerKey = TriggerKey.triggerKey(triggerKeyName, group);
boolean result = false; boolean result = false;
if (scheduler.checkExists(triggerKey)) { if (checkExists(jobName, jobGroup)) {
scheduler.resumeTrigger(triggerKey); scheduler.resumeTrigger(triggerKey);
result = true; result = true;
logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey); logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey);
...@@ -225,14 +250,13 @@ public final class DynamicSchedulerUtil implements InitializingBean { ...@@ -225,14 +250,13 @@ public final class DynamicSchedulerUtil implements InitializingBean {
return result; return result;
} }
// run 执行一次 // run
public static boolean triggerJob(String triggerKeyName) throws SchedulerException { public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group // TriggerKey : name + group
String group = Scheduler.DEFAULT_GROUP; JobKey jobKey = new JobKey(jobName, jobGroup);
JobKey jobKey = JobKey.jobKey(triggerKeyName, group);
boolean result = false; boolean result = false;
if (scheduler.checkExists(jobKey)) { if (checkExists(jobName, jobGroup)) {
scheduler.triggerJob(jobKey); scheduler.triggerJob(jobKey);
result = true; result = true;
logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey); logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey);
......
package com.xxl.job.dao; package com.xxl.job.dao;
import java.util.Date;
import java.util.List; import java.util.List;
import com.xxl.job.core.model.XxlJobInfo; import com.xxl.job.core.model.XxlJobInfo;
...@@ -11,15 +10,15 @@ import com.xxl.job.core.model.XxlJobInfo; ...@@ -11,15 +10,15 @@ import com.xxl.job.core.model.XxlJobInfo;
*/ */
public interface IXxlJobInfoDao { public interface IXxlJobInfoDao {
public List<XxlJobInfo> pageList(int offset, int pagesize, String jobName, Date addTimeStart, Date addTimeEnd); public List<XxlJobInfo> pageList(int offset, int pagesize, String jobGroup, String jobName);
public int pageListCount(int offset, int pagesize, String jobName, Date addTimeStart, Date addTimeEnd); public int pageListCount(int offset, int pagesize, String jobGroup, String jobName);
public int save(XxlJobInfo info); public int save(XxlJobInfo info);
public XxlJobInfo load(String jobName); public XxlJobInfo load(String jobGroup, String jobName);
public int update(XxlJobInfo item); public int update(XxlJobInfo item);
public int delete(String jobName); public int delete(String jobGroup, String jobName);
} }
package com.xxl.job.dao.impl; package com.xxl.job.dao.impl;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -23,25 +22,23 @@ public class XxlJobInfoDaoImpl implements IXxlJobInfoDao { ...@@ -23,25 +22,23 @@ public class XxlJobInfoDaoImpl implements IXxlJobInfoDao {
public SqlSessionTemplate sqlSessionTemplate; public SqlSessionTemplate sqlSessionTemplate;
@Override @Override
public List<XxlJobInfo> pageList(int offset, int pagesize, String jobName, Date addTimeStart, Date addTimeEnd) { public List<XxlJobInfo> pageList(int offset, int pagesize, String jobGroup, String jobName) {
HashMap<String, Object> params = new HashMap<String, Object>(); HashMap<String, Object> params = new HashMap<String, Object>();
params.put("offset", offset); params.put("offset", offset);
params.put("pagesize", pagesize); params.put("pagesize", pagesize);
params.put("jobGroup", jobGroup);
params.put("jobName", jobName); params.put("jobName", jobName);
params.put("addTimeStart", addTimeStart);
params.put("addTimeEnd", addTimeEnd);
return sqlSessionTemplate.selectList("XxlJobInfoMapper.pageList", params); return sqlSessionTemplate.selectList("XxlJobInfoMapper.pageList", params);
} }
@Override @Override
public int pageListCount(int offset, int pagesize, String jobName, Date addTimeStart, Date addTimeEnd) { public int pageListCount(int offset, int pagesize, String jobGroup, String jobName) {
HashMap<String, Object> params = new HashMap<String, Object>(); HashMap<String, Object> params = new HashMap<String, Object>();
params.put("offset", offset); params.put("offset", offset);
params.put("pagesize", pagesize); params.put("pagesize", pagesize);
params.put("jobGroup", jobGroup);
params.put("jobName", jobName); params.put("jobName", jobName);
params.put("addTimeStart", addTimeStart);
params.put("addTimeEnd", addTimeEnd);
return sqlSessionTemplate.selectOne("XxlJobInfoMapper.pageListCount", params); return sqlSessionTemplate.selectOne("XxlJobInfoMapper.pageListCount", params);
} }
...@@ -52,8 +49,12 @@ public class XxlJobInfoDaoImpl implements IXxlJobInfoDao { ...@@ -52,8 +49,12 @@ public class XxlJobInfoDaoImpl implements IXxlJobInfoDao {
} }
@Override @Override
public XxlJobInfo load(String jobName) { public XxlJobInfo load(String jobGroup, String jobName) {
return sqlSessionTemplate.selectOne("XxlJobInfoMapper.load", jobName); HashMap<String, Object> params = new HashMap<String, Object>();
params.put("jobGroup", jobGroup);
params.put("jobName", jobName);
return sqlSessionTemplate.selectOne("XxlJobInfoMapper.load", params);
} }
@Override @Override
...@@ -62,8 +63,12 @@ public class XxlJobInfoDaoImpl implements IXxlJobInfoDao { ...@@ -62,8 +63,12 @@ public class XxlJobInfoDaoImpl implements IXxlJobInfoDao {
} }
@Override @Override
public int delete(String jobName) { public int delete(String jobGroup, String jobName) {
return sqlSessionTemplate.update("XxlJobInfoMapper.delete", jobName); HashMap<String, Object> params = new HashMap<String, Object>();
params.put("jobGroup", jobGroup);
params.put("jobName", jobName);
return sqlSessionTemplate.update("XxlJobInfoMapper.delete", params);
} }
} }
...@@ -5,9 +5,10 @@ import java.util.HashMap; ...@@ -5,9 +5,10 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.quartz.impl.triggers.CronTriggerImpl; import org.quartz.JobKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.scheduling.quartz.QuartzJobBean;
...@@ -22,44 +23,43 @@ import com.xxl.job.core.util.PropertiesUtil; ...@@ -22,44 +23,43 @@ import com.xxl.job.core.util.PropertiesUtil;
/** /**
* http job bean * http job bean
* “@DisallowConcurrentExecution” diable concurrent, thread size can not be only one, better given more
* @author xuxueli 2015-12-17 18:20:34 * @author xuxueli 2015-12-17 18:20:34
*/ */
@DisallowConcurrentExecution
public class HttpJobBean extends QuartzJobBean { public class HttpJobBean extends QuartzJobBean {
private static Logger logger = LoggerFactory.getLogger(HttpJobBean.class); private static Logger logger = LoggerFactory.getLogger(HttpJobBean.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected void executeInternal(JobExecutionContext context) protected void executeInternal(JobExecutionContext context)
throws JobExecutionException { throws JobExecutionException {
String triggerKey = context.getTrigger().getJobKey().getName(); JobKey jobKey = context.getTrigger().getJobKey();
// jobDataMap 2 params
Map<String, String> params = new HashMap<String, String>();
XxlJobInfo jobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(triggerKey);
if (jobInfo!=null && jobInfo.getJobData()!=null) {
params = JacksonUtil.readValue(jobInfo.getJobData(), Map.class);
}
// corn
String cornExp = null;
if (context.getTrigger() instanceof CronTriggerImpl) {
CronTriggerImpl trigger = (CronTriggerImpl) context.getTrigger();
cornExp = trigger.getCronExpression();
}
XxlJobInfo jobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(jobKey.getGroup(), jobKey.getName());
HashMap<String, String> jobDataMap = (HashMap<String, String>) JacksonUtil.readValueRefer(jobInfo.getJobData(), Map.class);
// save log // save log
XxlJobLog jobLog = new XxlJobLog(); XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobName(triggerKey); jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobCron(cornExp); jobLog.setJobName(jobInfo.getJobName());
jobLog.setJobCron(jobInfo.getJobCron());
jobLog.setJobDesc(jobInfo.getJobDesc());
jobLog.setJobClass(jobInfo.getJobClass());
jobLog.setJobData(jobInfo.getJobData());
jobLog.setJobClass(HttpJobBean.class.getName()); jobLog.setJobClass(HttpJobBean.class.getName());
jobLog.setJobData(jobInfo.getJobData()); jobLog.setJobData(jobInfo.getJobData());
DynamicSchedulerUtil.xxlJobLogDao.save(jobLog); DynamicSchedulerUtil.xxlJobLogDao.save(jobLog);
logger.info(">>>>>>>>>>> xxl-job trigger start, jobLog:{}", jobLog); logger.info(">>>>>>>>>>> xxl-job trigger start, jobLog:{}", jobLog);
// trigger request // trigger request
params.put(HandlerRepository.triggerLogId, String.valueOf(jobLog.getId())); HashMap<String, String> params = new HashMap<String, String>();
params.put(HandlerRepository.triggerLogUrl, PropertiesUtil.getString(HandlerRepository.triggerLogUrl)); params.put(HandlerRepository.TRIGGER_LOG_URL, PropertiesUtil.getString(HandlerRepository.TRIGGER_LOG_URL));
String[] postResp = HttpUtil.post(params.get(HandlerRepository.job_url), params); params.put(HandlerRepository.TRIGGER_LOG_ID, String.valueOf(jobLog.getId()));
params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME));
params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS));
String[] postResp = HttpUtil.post(jobDataMap.get(HandlerRepository.HANDLER_ADDRESS), params);
logger.info(">>>>>>>>>>> xxl-job trigger http response, jobLog.id:{}, jobLog:{}", jobLog.getId(), jobLog); logger.info(">>>>>>>>>>> xxl-job trigger http response, jobLog.id:{}, jobLog:{}", jobLog.getId(), jobLog);
// parse trigger response // parse trigger response
......
...@@ -6,37 +6,45 @@ ...@@ -6,37 +6,45 @@
<resultMap id="XxlJobInfo" type="com.xxl.job.core.model.XxlJobInfo" > <resultMap id="XxlJobInfo" type="com.xxl.job.core.model.XxlJobInfo" >
<result column="id" property="id" /> <result column="id" property="id" />
<result column="job_group" property="jobGroup" />
<result column="job_name" property="jobName" /> <result column="job_name" property="jobName" />
<result column="job_cron" property="jobCron" /> <result column="job_cron" property="jobCron" />
<result column="job_desc" property="jobDesc" />
<result column="job_class" property="jobClass" /> <result column="job_class" property="jobClass" />
<result column="job_data" property="jobData" /> <result column="job_data" property="jobData" />
<result column="add_time" property="addTime" /> <result column="add_time" property="addTime" />
<result column="update_time" property="updateTime" /> <result column="update_time" property="updateTime" />
<result column="author" property="author" />
<result column="alarm_email" property="alarmEmail" />
<result column="alarm_threshold" property="alarmThreshold" />
</resultMap> </resultMap>
<sql id="Base_Column_List"> <sql id="Base_Column_List">
t.id, t.id,
t.job_group,
t.job_name, t.job_name,
t.job_cron, t.job_cron,
t.job_desc,
t.job_class, t.job_class,
t.job_data, t.job_data,
t.add_time, t.add_time,
t.update_time t.update_time,
t.author,
t.alarm_email,
t.alarm_threshold
</sql> </sql>
<select id="pageList" parameterType="java.util.HashMap" resultMap="XxlJobInfo"> <select id="pageList" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" /> SELECT <include refid="Base_Column_List" />
FROM xxl_job_qrtz_trigger_info AS t FROM xxl_job_qrtz_trigger_info AS t
<trim prefix="WHERE" prefixOverrides="AND | OR" > <trim prefix="WHERE" prefixOverrides="AND | OR" >
<if test="jobName != null and jobName!=''"> <if test="jobGroup != null and jobGroup != ''">
AND t.job_name = #{jobName} AND t.job_group = #{jobGroup}
</if>
<if test="addTimeStart != null">
AND t.add_time <![CDATA[ > ]]> #{addTimeStart}
</if> </if>
<if test="addTimeEnd != null"> <if test="jobName != null and jobName != ''">
AND t.add_time <![CDATA[ < ]]> #{addTimeEnd} AND t.job_name like CONCAT(CONCAT('%', #{jobName}), '%')
</if> </if>
</trim> </trim>
ORDER BY id DESC ORDER BY id DESC
...@@ -47,56 +55,74 @@ ...@@ -47,56 +55,74 @@
SELECT count(1) SELECT count(1)
FROM xxl_job_qrtz_trigger_info AS t FROM xxl_job_qrtz_trigger_info AS t
<trim prefix="WHERE" prefixOverrides="AND | OR" > <trim prefix="WHERE" prefixOverrides="AND | OR" >
<if test="jobName != null and jobName!=''"> <if test="jobGroup != null and jobGroup != ''">
AND t.job_name = #{jobName} AND t.job_group = #{jobGroup}
</if> </if>
<if test="addTimeStart != null"> <if test="jobName != null and jobName != ''">
AND t.add_time <![CDATA[ > ]]> #{addTimeStart} AND t.job_name like CONCAT(CONCAT('%', #{jobName}), '%')
</if>
<if test="addTimeEnd != null">
AND t.add_time <![CDATA[ < ]]> #{addTimeEnd}
</if> </if>
</trim> </trim>
</select> </select>
<insert id="save" parameterType="com.xxl.job.core.model.XxlJobInfo" useGeneratedKeys="true" keyProperty="id" > <insert id="save" parameterType="com.xxl.job.core.model.XxlJobInfo" useGeneratedKeys="true" keyProperty="id" >
INSERT INTO `xxl_job_qrtz_trigger_info` ( INSERT INTO `xxl_job_qrtz_trigger_info` (
`job_name`, job_group,
`job_cron`, job_name,
`job_class`, job_cron,
`job_data`, job_desc,
`add_time`, job_class,
`update_time` job_data,
add_time,
update_time,
author,
alarm_email,
alarm_threshold
) VALUES ( ) VALUES (
#{jobGroup},
#{jobName}, #{jobName},
#{jobCron}, #{jobCron},
#{jobDesc},
#{jobClass}, #{jobClass},
#{jobData}, #{jobData},
NOW(), NOW(),
NOW() NOW(),
#{author},
#{alarmEmail},
#{alarmThreshold}
); );
<selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id"> <selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id">
SELECT LAST_INSERT_ID() SELECT LAST_INSERT_ID()
</selectKey> </selectKey>
</insert> </insert>
<select id="load" parameterType="java.lang.String" resultMap="XxlJobInfo"> <select id="load" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" /> SELECT <include refid="Base_Column_List" />
FROM xxl_job_qrtz_trigger_info AS t FROM xxl_job_qrtz_trigger_info AS t
WHERE t.job_name = #{jobName} WHERE t.job_group = #{jobGroup}
AND t.job_name = #{jobName}
</select> </select>
<update id="update"> <update id="update" parameterType="com.xxl.job.core.model.XxlJobInfo" >
UPDATE `xxl_job_qrtz_trigger_info` UPDATE `xxl_job_qrtz_trigger_info`
SET `job_cron`= #{jobCron}, SET
`job_data`= #{jobData}, job_cron = #{jobCron},
`update_time`= NOW() job_desc = #{jobDesc},
WHERE `id`= #{id} job_data = #{jobData},
update_time = NOW(),
author = #{author},
alarm_email = #{alarmEmail},
alarm_threshold = #{alarmThreshold}
WHERE job_group = #{jobGroup}
AND job_name = #{jobName}
</update> </update>
<delete id="delete" parameterType="java.lang.String"> <delete id="delete" parameterType="java.lang.String">
delete from xxl_job_qrtz_trigger_info DELETE
where job_name = #{jobName} FROM
xxl_job_qrtz_trigger_info
WHERE
job_group = #{jobGroup}
AND job_name = #{jobName}
</delete> </delete>
</mapper> </mapper>
\ No newline at end of file
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
<result column="job_group" property="jobGroup" /> <result column="job_group" property="jobGroup" />
<result column="job_name" property="jobName" /> <result column="job_name" property="jobName" />
<result column="job_cron" property="jobCron" /> <result column="job_cron" property="jobCron" />
<result column="job_desc" property="jobDesc" />
<result column="job_class" property="jobClass" /> <result column="job_class" property="jobClass" />
<result column="job_data" property="jobData" /> <result column="job_data" property="jobData" />
...@@ -26,8 +27,9 @@ ...@@ -26,8 +27,9 @@
t.job_group, t.job_group,
t.job_name, t.job_name,
t.job_cron, t.job_cron,
t.job_desc,
t.job_class, t.job_class,
t.job_data, t.job_desc,
t.trigger_time, t.trigger_time,
t.trigger_status, t.trigger_status,
t.trigger_msg, t.trigger_msg,
...@@ -94,12 +96,14 @@ ...@@ -94,12 +96,14 @@
`job_group`, `job_group`,
`job_name`, `job_name`,
`job_cron`, `job_cron`,
`job_desc`,
`job_class`, `job_class`,
`job_data` `job_data`
) VALUES ( ) VALUES (
#{jobGroup}, #{jobGroup},
#{jobName}, #{jobName},
#{jobCron}, #{jobCron},
#{jobDesc},
#{jobClass}, #{jobClass},
#{jobData} #{jobData}
); );
......
...@@ -42,9 +42,7 @@ ...@@ -42,9 +42,7 @@
</div> </div>
<div class="col-xs-4"> <div class="col-xs-4">
<div class="input-group"> <div class="input-group">
<span class="input-group-addon"> <span class="input-group-addon">任务名</span>
jobName
</span>
<input type="text" class="form-control" id="jobName" value="${jobName}" autocomplete="on" > <input type="text" class="form-control" id="jobName" value="${jobName}" autocomplete="on" >
</div> </div>
</div> </div>
...@@ -66,63 +64,23 @@ ...@@ -66,63 +64,23 @@
<table id="job_list" class="table table-bordered table-striped"> <table id="job_list" class="table table-bordered table-striped">
<thead> <thead>
<tr> <tr>
<th>id</th> <th name="id" >id</th>
<th>任务Key</th> <th name="jobGroup" >任务组</th>
<th>任务Cron</th> <th name="jobName" >任务名</th>
<th>任务Class</th> <th name="jobCron" >Cron</th>
<th>状态Status</th> <th name="jobDesc" >描述</th>
<th>参数</th> <th name="jobClass" >JobBean</th>
<th>addTime</th> <th name="jobData" >任务数据</th>
<th>updateTime</th> <th name="addTime" >新增时间</th>
<th name="updateTime" >更新时间</th>
<th name="author" >负责人</th>
<th name="alarmEmail" >报警邮件</th>
<th name="alarmThreshold" >报警阀值</th>
<th name="jobStatus" >状态</th>
<th>操作</th> <th>操作</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody></tbody>
<#--
<#if jobList?exists && jobList?size gt 0>
<#list jobList as item>
<tr>
<td>${item['TriggerKey'].name}</td>
<td>${item['Trigger'].cronExpression}</td>
<td>${item['JobDetail'].jobClass}</td>
<td>
<#assign jobDataMap = item['JobDetail'].jobDataMap />
<#if jobDataMap?exists && jobDataMap?keys?size gt 0>
<#list jobDataMap?keys as key>
${key} = ${jobDataMap[key]} <br>
</#list>
</#if>
</td>
<td state="${item['TriggerState']}" >
<#if item['TriggerState'] == 'NORMAL'>
<button class="btn btn-block btn-success" type="button">运行ing</button>
<#elseif item['TriggerState'] == 'PAUSED'>
<button class="btn btn-block btn-warning" type="button">暂停ing</button>
<#else>
<button class="btn btn-block" type="button">${item['TriggerState']}</button>
</#if>
</td>
<td>
<p name="${item['TriggerKey'].name}" group="${item['TriggerKey'].group}"
cronExpression="${item['Trigger'].cronExpression}" jobClassName="${item['JobDetail'].jobClass}" jobDesc="${job_desc?if_exists}" >
<#if item['TriggerState'] == 'NORMAL'>
<button class="btn btn-info btn-xs job_operate" type="job_pause" type="button">暂停</button>
<#elseif item['TriggerState'] == 'PAUSED'>
<button class="btn btn-info btn-xs job_operate" type="job_resume" type="button">恢复</button>
</#if>
<button class="btn btn-info btn-xs job_operate" type="job_trigger" type="button">执行一次</button>
<button class="btn btn-info btn-xs update" type="button">更新corn</button>
<button class="btn btn-danger btn-xs job_operate" type="job_del" type="button">删除</button>
<button class="btn btn-warning btn-xs" type="job_del" type="button"
onclick="javascript:window.open('${request.contextPath}/joblog?jobName=${item['TriggerKey'].name}')" >查看日志</button>
</p>
</td>
</tr>
</#list>
</#if>
-->
</tbody>
<tfoot></tfoot> <tfoot></tfoot>
</table> </table>
</div> </div>
...@@ -155,29 +113,51 @@ ...@@ -155,29 +113,51 @@
</select> </select>
</div> </div>
<label for="firstname" class="col-sm-2 control-label">任务名</label> <label for="firstname" class="col-sm-2 control-label">任务名</label>
<div class="col-sm-4"><input type="text" class="form-control" name="triggerKeyName" placeholder="请输入任务Key" minlength="4" maxlength="100" ></div> <div class="col-sm-4"><input type="text" class="form-control" name="jobName" placeholder="请输入“任务名”" minlength="4" maxlength="100" ></div>
</div> </div>
<div class="form-group"> <div class="form-group">
<label for="lastname" class="col-sm-3 control-label">任务Corn</label> <label for="lastname" class="col-sm-2 control-label">Corn</label>
<div class="col-sm-9"><input type="text" class="form-control" name="cronExpression" placeholder="请输入任务Corn" maxlength="100" ></div> <div class="col-sm-4"><input type="text" class="form-control" name="jobCron" placeholder="请输入“Corn”" maxlength="100" ></div>
<label for="lastname" class="col-sm-2 control-label">描述</label>
<div class="col-sm-4"><input type="text" class="form-control" name="jobDesc" placeholder="请输入“描述”" maxlength="200" ></div>
</div> </div>
<div class="form-group"> <div class="form-group">
<label for="lastname" class="col-sm-3 control-label">任务描述</label> <label for="firstname" class="col-sm-2 control-label">JobBean</label>
<div class="col-sm-9"><input type="text" class="form-control" name="job_desc" placeholder="请输入任务描述" maxlength="200" ></div> <div class="col-sm-4">
<select class="form-control" name="jobClass" >
<#if remoteJobBean?exists >
<option value="${remoteJobBean.name}" jobClassType="remote" >【远程任务】</option>
</#if>
<#if localJobBeanList?exists && localJobBeanList?size gt 0 >
<#list localJobBeanList as localJobBean>
<option value="${localJobBean.name}" jobClassType="local" >${localJobBean.name}</option>
</#list>
</#if>
</select>
</div>
<label for="firstname" class="col-sm-2 control-label">执行参数</label>
<div class="col-sm-4"><input type="text" class="form-control" name="handler_params" placeholder="请输入“执行参数”" maxlength="100" ></div>
</div>
<div class="form-group remote_panel">
<label for="lastname" class="col-sm-2 control-label">远程-机器地址</label>
<div class="col-sm-4"><input type="text" class="form-control" name="handler_address" placeholder="请输入“远程-机器地址”" maxlength="200" ></div>
<label for="lastname" class="col-sm-2 control-label">远程-执行器</label>
<div class="col-sm-4"><input type="text" class="form-control" name="handler_name" placeholder="请输入“远程-执行器”" maxlength="200" ></div>
</div> </div>
<div class="form-group"> <div class="form-group">
<label for="lastname" class="col-sm-3 control-label">任务URL</label> <label for="lastname" class="col-sm-2 control-label">负责人</label>
<div class="col-sm-9"><input type="text" class="form-control" name="job_url" placeholder="请输入任务URL" maxlength="200" ></div> <div class="col-sm-4"><input type="text" class="form-control" name="author" placeholder="请输入“负责人”" maxlength="200" ></div>
<label for="lastname" class="col-sm-2 control-label">报警邮件</label>
<div class="col-sm-4"><input type="text" class="form-control" name="alarm_email" placeholder="请输入“报警邮件”,多个邮件地址逗号分隔" maxlength="200" ></div>
</div> </div>
<div class="form-group"> <div class="form-group">
<label for="lastname" class="col-sm-3 control-label">任务handler</label> <label for="lastname" class="col-sm-2 control-label">报警阈值</label>
<div class="col-sm-9"><input type="text" class="form-control" name="handleName" placeholder="请输入任务handler" maxlength="200" ></div> <div class="col-sm-4"><input type="text" class="form-control" name="alarm_threshold" placeholder="请输入“报警阈值”" maxlength="200" ></div>
</div> </div>
<div class="form-group"> <div class="form-group">
<div class="col-sm-offset-3 col-sm-9"> <div class="col-sm-offset-3 col-sm-9">
<button type="submit" class="btn btn-primary" >保存</button> <button type="submit" class="btn btn-primary" >保存</button>
<button type="button" class="btn btn-default" data-dismiss="modal">取消</button> <button type="button" class="btn btn-default" data-dismiss="modal">取消</button>
<button type="button" class="btn btn-info pull-right addParam">+ arg</button>
</div> </div>
</div> </div>
</form> </form>
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
<span class="input-group-addon">任务组</span> <span class="input-group-addon">任务组</span>
<select class="form-control" id="jobGroup" > <select class="form-control" id="jobGroup" >
<#list JobGroupList as group> <#list JobGroupList as group>
<option value="${group}" <#if jobInfo?exists && group == jobInfo.jobGroup>selected</#if> >${group.desc}</option> <option value="${group}" <#if jobGroup == group>selected</#if> >${group.desc}</option>
</#list> </#list>
</select> </select>
</div> </div>
...@@ -74,6 +74,7 @@ ...@@ -74,6 +74,7 @@
<th name="jobGroup" >任务组</th> <th name="jobGroup" >任务组</th>
<th name="jobName" >任务名</th> <th name="jobName" >任务名</th>
<th name="jobCron" >Cron</th> <th name="jobCron" >Cron</th>
<th name="jobDesc" >描述</th>
<th name="jobClass" >JobBean</th> <th name="jobClass" >JobBean</th>
<th name="jobData" >任务数据</th> <th name="jobData" >任务数据</th>
<th name="triggerTime" >调度时间</th> <th name="triggerTime" >调度时间</th>
......
...@@ -33,6 +33,7 @@ $(function() { ...@@ -33,6 +33,7 @@ $(function() {
}, },
{ "data": 'jobName'}, { "data": 'jobName'},
{ "data": 'jobCron', "visible" : false}, { "data": 'jobCron', "visible" : false},
{ "data": 'jobDesc', "visible" : false},
{ "data": 'jobClass', "visible" : false}, { "data": 'jobClass', "visible" : false},
{ {
"data": 'jobData', "data": 'jobData',
......
...@@ -21,8 +21,8 @@ public class XxlJobInfoTest { ...@@ -21,8 +21,8 @@ public class XxlJobInfoTest {
@Test @Test
public void pageList(){ public void pageList(){
List<XxlJobInfo> list = xxlJobInfoDao.pageList(0, 20, null, null, null); List<XxlJobInfo> list = xxlJobInfoDao.pageList(0, 20, null, null);
int list_count = xxlJobInfoDao.pageListCount(0, 20, null, null, null); int list_count = xxlJobInfoDao.pageListCount(0, 20, null, null);
System.out.println(list); System.out.println(list);
System.out.println(list_count); System.out.println(list_count);
...@@ -39,13 +39,13 @@ public class XxlJobInfoTest { ...@@ -39,13 +39,13 @@ public class XxlJobInfoTest {
System.out.println(count); System.out.println(count);
System.out.println(info.getId()); System.out.println(info.getId());
XxlJobInfo item = xxlJobInfoDao.load("job_name"); XxlJobInfo item = xxlJobInfoDao.load(null ,"job_name");
System.out.println(item); System.out.println(item);
} }
@Test @Test
public void update(){ public void update(){
XxlJobInfo item = xxlJobInfoDao.load("job_name"); XxlJobInfo item = xxlJobInfoDao.load(null ,"job_name");
item.setJobCron("jobCron2"); item.setJobCron("jobCron2");
item.setJobData("jobData2"); item.setJobData("jobData2");
......
...@@ -57,8 +57,8 @@ public class XxlJobLogTest { ...@@ -57,8 +57,8 @@ public class XxlJobLogTest {
@Test @Test
public void pageList(){ public void pageList(){
List<XxlJobLog> list = xxlJobLogDao.pageList(0, 20, null, null, null); List<XxlJobLog> list = xxlJobLogDao.pageList(0, 20, null, null, null, null);
int list_count = xxlJobLogDao.pageListCount(0, 20, null, null, null); int list_count = xxlJobLogDao.pageListCount(0, 20, null, null, null, null);
System.out.println(list); System.out.println(list);
System.out.println(list_count); System.out.println(list_count);
......
package com.xxl.job.service.handler; package com.xxl.job.service.handler;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -20,12 +19,12 @@ public class DemoJobHandler extends IJobHandler { ...@@ -20,12 +19,12 @@ public class DemoJobHandler extends IJobHandler {
private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class); private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class);
public DemoJobHandler() { public DemoJobHandler() {
HandlerRepository.regist(DemoJobHandler.class.getName(), this); HandlerRepository.regist("demoJobHandler", this);
} }
@Override @Override
public JobHandleStatus handle(Map<String, String> param) throws Exception { public JobHandleStatus handle(String... params) throws Exception {
logger.info(" ... param:{}", param); logger.info(" ... params:" + params);
TimeUnit.SECONDS.sleep(new Random().nextInt(5)); TimeUnit.SECONDS.sleep(new Random().nextInt(5));
return JobHandleStatus.SUCCESS; return JobHandleStatus.SUCCESS;
} }
......
package com.xxl.job.client.handler; package com.xxl.job.client.handler;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.xxl.job.client.handler.IJobHandler.JobHandleStatus;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil;
import com.xxl.job.client.util.JacksonUtil; import com.xxl.job.client.util.JacksonUtil;
/** /**
* handler repository * handler repository
* @author xuxueli 2015-12-19 19:28:44 * @author xuxueli 2015-12-19 19:28:44
...@@ -23,142 +17,47 @@ import com.xxl.job.client.util.JacksonUtil; ...@@ -23,142 +17,47 @@ import com.xxl.job.client.util.JacksonUtil;
public class HandlerRepository { public class HandlerRepository {
private static Logger logger = LoggerFactory.getLogger(HandlerRepository.class); private static Logger logger = LoggerFactory.getLogger(HandlerRepository.class);
public static final String job_desc = "job_desc"; public static final String HANDLER_ADDRESS = "handler_address";
public static final String job_url = "job_url"; public static final String HANDLER_NAME = "handler_name";
public static final String handleName = "handleName"; public static final String HANDLER_PARAMS = "handler_params";
public static final String triggerLogId = "triggerLogId";
public static final String triggerLogUrl = "triggerLogUrl"; public static final String TRIGGER_LOG_ID = "trigger_log_id";
public static final String TRIGGER_LOG_URL = "trigger_log_url";
// handler class map
private static ConcurrentHashMap<String, IJobHandler> handlerClassMap = new ConcurrentHashMap<String, IJobHandler>(); public static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>();
// handler thread map
private static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>();
// handler date queue map
private static ConcurrentHashMap<String, LinkedBlockingQueue<Map<String, String>>> handlerDataQueueMap = new ConcurrentHashMap<String, LinkedBlockingQueue<Map<String, String>>>();
// regist handler // regist handler
public static void regist(String handleName, IJobHandler handler){ public static void regist(String handleName, IJobHandler handler){
handlerClassMap.put(handleName, handler); HandlerThread handlerThread = new HandlerThread(handler);
LinkedBlockingQueue<Map<String, String>> handlerDateQueue = new LinkedBlockingQueue<Map<String, String>>();
handlerDataQueueMap.put(handleName, handlerDateQueue);
HandlerThread handlerThread = new HandlerThread(handleName);
handlerThread.start(); handlerThread.start();
handlerTreadMap.put(handleName, handlerThread); handlerTreadMap.put(handleName, handlerThread); // putIfAbsent
logger.info(">>>>>>>>>>> xxl-job regist handler success, handleName:{}, handler:{}, handlerDateQueue:{}, handlerThread:{}", logger.info(">>>>>>>>>>> xxl-job regist handler success, handleName:{}, handler:{}", new Object[]{handleName, handler});
new Object[]{handleName, handler, handlerDateQueue, handlerThread});
}
// create handler thread
static class HandlerThread extends Thread{
private String _handleName;
public HandlerThread(String _handleName) {
this._handleName = _handleName;
}
public boolean isValid = true;
public void stopThread(){
isValid = false;
}
@Override
public void run() {
while (isValid) {
LinkedBlockingQueue<Map<String, String>> handlerDateQueue = handlerDataQueueMap.get(_handleName);
Map<String, String> handlerData = handlerDateQueue.poll();
if (handlerData!=null) {
// handle job
JobHandleStatus _status = JobHandleStatus.FAIL;
String _msg = null;
try {
IJobHandler handler = handlerClassMap.get(_handleName);
_status = handler.handle(handlerData);
} catch (Exception e) {
e.printStackTrace();
_status = JobHandleStatus.FAIL;
StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
_msg = out.toString();
}
// callback handler info
String callback_response[] = null;
try {
String _triggerLogUrl = handlerData.get(HandlerRepository.triggerLogUrl);
HashMap<String, String> params = new HashMap<String, String>();
params.put(HandlerRepository.triggerLogId, handlerData.get(HandlerRepository.triggerLogId));
params.put(HttpUtil.status, _status.name());
params.put(HttpUtil.msg, _msg);
callback_response = HttpUtil.post(_triggerLogUrl, params);
} catch (Exception e) {
e.printStackTrace();
}
logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback_response:{}, thread:{}",
new Object[]{handlerData, _status, _msg, callback_response, this});
} else {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
} }
// handler push to queue // handler push to queue
public static String pushHandleQueue(Map<String, String> _param) { public static String pushHandleQueue(Map<String, String> _param) {
logger.info(">>>>>>>>>>> xxl-job pushHandleQueue start, _param:{}", new Object[]{_param});
// resuolt // result
String _status = HttpUtil.FAIL; String _status = HttpUtil.FAIL;
String _msg = ""; String _msg = "";
// push data to queue // push data to queue
String _handleName = _param.get(HandlerRepository.handleName); HandlerThread handlerThread = handlerTreadMap.get(_param.get(HandlerRepository.HANDLER_NAME));
int _triggerLogId = Integer.valueOf(_param.get(HandlerRepository.triggerLogId)); // 次数应校验,停止队列功能为开发 if (handlerThread != null) {
try { handlerThread.pushData(_param);
if (_handleName!=null && _handleName.trim().length()>0) { _status = HttpUtil.SUCCESS;
IJobHandler handler = handlerClassMap.get(_handleName); } else {
if (handler != null) { _msg = "handler not found.";
// push data to handler queue
LinkedBlockingQueue<Map<String, String>> handlerDateQueue = handlerDataQueueMap.get(_handleName);
if (handlerDateQueue == null) {
handlerDateQueue = new LinkedBlockingQueue<Map<String, String>>();
handlerDataQueueMap.put(_handleName, handlerDateQueue);
logger.info(">>>>>>>>>>> xxl-job handler lazy fresh handlerDateQueue, _handleName:{}, handler:{}, handlerDateQueue:{}",
new Object[]{_handleName, handler, handlerDateQueue});
}
// check handler thread
HandlerThread handlerThreadOld = handlerTreadMap.get(_handleName);
if (!handlerThreadOld.isAlive()) {
handlerThreadOld.stopThread();
HandlerThread handlerThread = new HandlerThread(_handleName);
handlerThread.start();
handlerTreadMap.put(_handleName, handlerThread);
logger.info(">>>>>>>>>>> xxl-job handler lazy fresh thread, _handleName:{}, handler:{}, handlerThread:{}",
new Object[]{_handleName, handler, handlerThread});
}
// push to queue
handlerDateQueue.offer(_param);
_status = HttpUtil.SUCCESS;
}
}
} catch (Exception e) {
e.printStackTrace();
StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
_status = HttpUtil.FAIL;
_msg = out.toString();
} }
logger.info(">>>>>>>>>>> xxl-job pushHandleQueue, _handleName:{}, _triggerLogId:{}, _param:{}, _status:{}, _msg:{}",
new Object[]{_handleName, _triggerLogId, _param, _status, _msg});
HashMap<String, String> triggerData = new HashMap<String, String>(); HashMap<String, String> triggerData = new HashMap<String, String>();
triggerData.put(HandlerRepository.TRIGGER_LOG_ID, _param.get(HandlerRepository.TRIGGER_LOG_ID));
triggerData.put(HttpUtil.status, _status); triggerData.put(HttpUtil.status, _status);
triggerData.put(HttpUtil.msg, _msg); triggerData.put(HttpUtil.msg, _msg);
return JacksonUtil.writeValueAsString(triggerData);
/** logger.info(">>>>>>>>>>> xxl-job pushHandleQueue end, triggerData:{}", new Object[]{triggerData});
* trigger-log : return JacksonUtil.writeValueAsString(triggerData);
* trigger side : store trigger-info >> trigger request >> update trigger-response-status
* job side : handler trigger >> update trigger-result
*/
} }
} }
package com.xxl.job.client.handler; package com.xxl.job.client.handler;
import java.util.Map;
/** /**
* remote job handler * remote job handler
* @author xuxueli 2015-12-19 19:06:38 * @author xuxueli 2015-12-19 19:06:38
...@@ -15,7 +13,7 @@ public abstract class IJobHandler extends HandlerRepository{ ...@@ -15,7 +13,7 @@ public abstract class IJobHandler extends HandlerRepository{
* @return * @return
* @throws Exception * @throws Exception
*/ */
public abstract JobHandleStatus handle(Map<String, String> param) throws Exception; public abstract JobHandleStatus handle(String... params) throws Exception;
public enum JobHandleStatus{ public enum JobHandleStatus{
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论