Unverified 提交 c3c7ded5 authored 作者: 许雪里's avatar 许雪里 提交者: GitHub

Merge branch 'master' into feature/timeout

......@@ -156,6 +156,36 @@ XXL-JOB是一个轻量级分布式任务调度框架,其核心设计目标是
- 64、未来无线网
- 65、厦门瓷禧网络有限公司
- 66、北京递蓝科软件股份有限公司
- 67、郑州创海软件科技公司
- 68、北京国槐信息科技有限公司
- 69、浪潮软件集团
- 70、多立恒(北京)信息技术有限公司
- 71、广州极迅客信息科技有限公司
- 72、赫基(中国)集团股份有限公司
- 73、海投汇
- 74、上海润益创业孵化器管理股份有限公司
- 75、汉纳森(厦门)数据股份有限公司
- 76、安信信托
- 77、岚儒财富
- 78、捷道软件
- 79、湖北享七网络科技有限公司
- 80、湖南创发科技责任有限公司
- 81、深圳小安时代互联网金融服务有限公司
- 82、湖北享七网络科技有限公司
- 83、钱包行云(北京)科技有限公司
- 84、360金融 (360)
- 85、易企秀
- 86、摩贝(上海)生物科技有限公司
- 87、广东芯智慧科技有限公司
- 88、联想集团 (联想)
- 89、怪兽充电
- 90、行圆汽车
- 91、深圳店店通科技邮箱公司
- 92、京东 (京东)
- 93、米庄理财
- 94、咖啡易融
- 95、梧桐诚选
- 96、恒大地产 (恒大)
- ……
> 更多接入的公司,欢迎在 [登记地址](https://github.com/xuxueli/xxl-job/issues/1 ) 登记,登记仅仅为了产品推广。
......
差异被折叠。
......@@ -181,12 +181,13 @@ CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_code` varchar(255) NOT NULL DEFAULT '0' COMMENT '调度-结果',
`trigger_code` int(11) NOT NULL COMMENT '调度-结果',
`trigger_msg` varchar(2048) DEFAULT NULL COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_code` varchar(255) NOT NULL DEFAULT '0' COMMENT '执行-状态',
`handle_code` int(11) NOT NULL COMMENT '执行-状态',
`handle_msg` varchar(2048) DEFAULT NULL COMMENT '执行-日志',
PRIMARY KEY (`id`)
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOGGLUE` (
......
......@@ -27,6 +27,9 @@ public class XxlJobAdminConfig implements InitializingBean{
@Value("${xxl.job.mail.port}")
private String mailPort;
@Value("${xxl.job.mail.ssl}")
private boolean mailSSL;
@Value("${xxl.job.mail.username}")
private String mailUsername;
......@@ -54,6 +57,10 @@ public class XxlJobAdminConfig implements InitializingBean{
return mailPort;
}
public boolean isMailSSL() {
return mailSSL;
}
public String getMailUsername() {
return mailUsername;
}
......
......@@ -4,12 +4,14 @@ import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao;
import com.xxl.job.admin.dao.XxlJobLogDao;
import com.xxl.job.admin.dao.XxlJobRegistryDao;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import org.quartz.*;
......@@ -76,11 +78,20 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware {
NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
NetComServerFactory.setAccessToken(accessToken);
// init i18n
initI18n();
// valid
Assert.notNull(scheduler, "quartz scheduler is null");
logger.info(">>>>>>>>> init xxl-job admin success.");
}
private void initI18n(){
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}
public void destroy(){
// admin registry stop
JobRegistryMonitorHelper.getInstance().toStop();
......
......@@ -56,22 +56,22 @@ public class JobFailMonitorHelper {
continue;
}
if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
// job running
JobFailMonitorHelper.monitor(jobLogId);
logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
} else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
// job success, pass
logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
} else if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
} else /*if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
|| IJobHandler.FAIL.getCode() == log.getHandleCode()
|| IJobHandler.TIMEOUT.getCode() == log.getHandleCode()
|| IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {
|| IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() )*/ {
// job fail,
failAlarm(log);
logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
} else {
}/* else {
JobFailMonitorHelper.monitor(jobLogId);
logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
}
}*/
}
}
......
......@@ -30,7 +30,7 @@ public class I18nUtil {
return prop;
}
try {
// bild i18n prop
// build i18n prop
String i18n = XxlJobAdminConfig.getAdminConfig().getI18n();
i18n = StringUtils.isNotBlank(i18n)?("_"+i18n):i18n;
String i18nFile = MessageFormat.format("i18n/message{0}.properties", i18n);
......
......@@ -3,6 +3,7 @@ package com.xxl.job.admin.core.util;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* local cache tool
......@@ -11,7 +12,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class LocalCacheUtil {
private static ConcurrentHashMap<String, LocalCacheData> cacheRepository = new ConcurrentHashMap<>();
private static ConcurrentMap<String, LocalCacheData> cacheRepository = new ConcurrentHashMap<String, LocalCacheData>(); // 类型建议用抽象父类,兼容性更好;
private static class LocalCacheData{
private String key;
private Object val;
......
......@@ -35,8 +35,14 @@ public class MailUtil {
//email.setSSL(true);
email.setHostName(XxlJobAdminConfig.getAdminConfig().getMailHost());
if (XxlJobAdminConfig.getAdminConfig().isMailSSL()) {
email.setSslSmtpPort(XxlJobAdminConfig.getAdminConfig().getMailPort());
email.setSSLOnConnect(true);
} else {
email.setSmtpPort(Integer.valueOf(XxlJobAdminConfig.getAdminConfig().getMailPort()));
//email.setSslSmtpPort(port);
}
email.setAuthenticator(new DefaultAuthenticator(XxlJobAdminConfig.getAdminConfig().getMailUsername(), XxlJobAdminConfig.getAdminConfig().getMailPassword()));
email.setCharset(Charset.defaultCharset().name());
......
......@@ -6,7 +6,6 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.core.util.LocalCacheUtil;
import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao;
import com.xxl.job.admin.dao.XxlJobLogDao;
......@@ -324,12 +323,12 @@ public class XxlJobServiceImpl implements XxlJobService {
private static final String TRIGGER_CHART_DATA_CACHE = "trigger_chart_data_cache";
@Override
public ReturnT<Map<String, Object>> chartInfo(Date startDate, Date endDate) {
// get cache
/*// get cache
String cacheKey = TRIGGER_CHART_DATA_CACHE + "_" + startDate.getTime() + "_" + endDate.getTime();
Map<String, Object> chartInfo = (Map<String, Object>) LocalCacheUtil.get(cacheKey);
if (chartInfo != null) {
return new ReturnT<Map<String, Object>>(chartInfo);
}
}*/
// process
List<String> triggerDayList = new ArrayList<String>();
......@@ -376,8 +375,8 @@ public class XxlJobServiceImpl implements XxlJobService {
result.put("triggerCountSucTotal", triggerCountSucTotal);
result.put("triggerCountFailTotal", triggerCountFailTotal);
// set cache
LocalCacheUtil.set(cacheKey, result, 60*1000); // cache 60s
/*// set cache
LocalCacheUtil.set(cacheKey, result, 60*1000); // cache 60s*/
return new ReturnT<Map<String, Object>>(result);
}
......
......@@ -193,6 +193,9 @@ jobgroup_del_limit_0=拒绝删除,该执行器使用中
jobgroup_del_limit_1=拒绝删除, 系统至少保留一个执行器
## job conf
jobconf_block_SERIAL_EXECUTION=单机串行
jobconf_block_DISCARD_LATER=丢弃后续调度
jobconf_block_COVER_EARLY=覆盖之前调度
jobconf_fail_alarm=失败告警
jobconf_fail_retry=失败重试
jobconf_route_first=第一个
......
......@@ -193,6 +193,9 @@ jobgroup_del_limit_0=Refuse to delete, the executor is being used
jobgroup_del_limit_1=Refuses to delete, the system retains at least one executor
## job conf
jobconf_block_SERIAL_EXECUTION=Serial execution
jobconf_block_DISCARD_LATER=Discard Later
jobconf_block_COVER_EARLY=Cover Early
jobconf_fail_alarm=Fail Alarm
jobconf_fail_retry=Fail Retry
jobconf_route_first=First
......
......@@ -45,7 +45,7 @@
SELECT <include refid="Base_Column_List" />
FROM XXL_JOB_QRTZ_TRIGGER_LOG AS t
<trim prefix="WHERE" prefixOverrides="AND | OR" >
<if test="jobGroup != null and jobGroup != ''">
<if test="jobGroup gt 0">
AND t.job_group = #{jobGroup}
</if>
<if test="jobId gt 0">
......@@ -62,12 +62,13 @@
</if>
<if test="logStatus == 2" >
AND (
(t.trigger_code <![CDATA[ > ]]> 0 AND t.trigger_code!=200) ||
(t.handle_code <![CDATA[ > ]]> 0 AND t.handle_code!=200)
t.trigger_code NOT IN (0, 200) OR
t.handle_code NOT IN (0, 200)
)
</if>
<if test="logStatus == 3" >
AND (t.trigger_code = 200 AND t.handle_code=0)
AND t.trigger_code = 200
AND t.handle_code = 0
</if>
</trim>
ORDER BY id DESC
......@@ -78,7 +79,7 @@
SELECT count(1)
FROM XXL_JOB_QRTZ_TRIGGER_LOG AS t
<trim prefix="WHERE" prefixOverrides="AND | OR" >
<if test="jobGroup != null and jobGroup != ''">
<if test="jobGroup gt 0">
AND t.job_group = #{jobGroup}
</if>
<if test="jobId gt 0">
......@@ -95,12 +96,13 @@
</if>
<if test="logStatus == 2" >
AND (
(t.trigger_code <![CDATA[ > ]]> 0 AND t.trigger_code!=200) ||
(t.handle_code <![CDATA[ > ]]> 0 AND t.handle_code!=200)
t.trigger_code NOT IN (0, 200) OR
t.handle_code NOT IN (0, 200)
)
</if>
<if test="logStatus == 3" >
AND (t.trigger_code = 200 AND t.handle_code=0)
AND t.trigger_code = 200
AND t.handle_code = 0
</if>
</trim>
</select>
......@@ -115,10 +117,14 @@
<insert id="save" parameterType="com.xxl.job.admin.core.model.XxlJobLog" useGeneratedKeys="true" keyProperty="id" >
INSERT INTO XXL_JOB_QRTZ_TRIGGER_LOG (
`job_group`,
`job_id`
`job_id`,
`trigger_code`,
`handle_code`
) VALUES (
#{jobGroup},
#{jobId}
#{jobId},
#{triggerCode},
#{handleCode}
);
<!--<selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id">
SELECT LAST_INSERT_ID()
......@@ -166,7 +172,7 @@
SELECT
DATE_FORMAT(trigger_time,'%Y-%m-%d') triggerDay,
COUNT(handle_code) triggerDayCount,
SUM(CASE WHEN handle_code = 0 then 1 else 0 end) as triggerDayCountRunning,
SUM(CASE WHEN (trigger_code = 200 and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,
SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as triggerDayCountSuc
FROM XXL_JOB_QRTZ_TRIGGER_LOG
WHERE trigger_time BETWEEN #{from} and #{to}
......
......@@ -7,6 +7,7 @@ xxl.job.db.password=root_pwd
### xxl-job email
xxl.job.mail.host=smtp.163.com
xxl.job.mail.port=25
xxl.job.mail.ssl=false
xxl.job.mail.username=ovono802302@163.com
xxl.job.mail.password=asdfzxcv
xxl.job.mail.sendNick=《任务调度平台XXL-JOB》
......
......@@ -135,7 +135,7 @@
</#list>
</select>
</div>
<label for="firstname" class="col-sm-2 control-label">JobHandler<font color="black">*</font></label>
<label for="firstname" class="col-sm-2 control-label">JobHandler<font color="red">*</font></label>
<div class="col-sm-4"><input type="text" class="form-control" name="executorHandler" placeholder="${I18n.system_please_input}JobHandler" maxlength="100" ></div>
</div>
<div class="form-group">
......@@ -308,7 +308,7 @@ process.exit(0)
</#list>
</select>
</div>
<label for="firstname" class="col-sm-2 control-label">JobHandler<font color="black">*</font></label>
<label for="firstname" class="col-sm-2 control-label">JobHandler<font color="red">*</font></label>
<div class="col-sm-4"><input type="text" class="form-control" name="executorHandler" placeholder="${I18n.system_please_input}JobHandler" maxlength="100" ></div>
</div>
<div class="form-group">
......
......@@ -229,7 +229,7 @@ $(function() {
end: function(layero, index){
if (needFresh) {
//window.location.reload();
jobTable.fnDraw();
jobTable.fnDraw(false);
}
}
});
......
......@@ -18,7 +18,7 @@ import java.text.MessageFormat;
public class MailUtilTest {
@Test
public void registryTest() throws Exception {
public void mailTest() throws Exception {
String mailBodyTemplate = "<h5>监控告警明细:</span>" +
"<table border=\"1\" cellpadding=\"3\" style=\"border-collapse:collapse; width:80%;\" >\n" +
......
......@@ -13,6 +13,9 @@ public interface AdminBiz {
public static final String MAPPING = "/api";
// ---------------------- callback ----------------------
/**
* callback
*
......@@ -21,6 +24,9 @@ public interface AdminBiz {
*/
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
// ---------------------- registry ----------------------
/**
* registry
*
......@@ -38,6 +44,8 @@ public interface AdminBiz {
public ReturnT<String> registryRemove(RegistryParam registryParam);
// ---------------------- job opt ----------------------
/**
* trigger job for once
*
......
......@@ -9,11 +9,14 @@ public class HandleCallbackParam implements Serializable {
private static final long serialVersionUID = 42L;
private int logId;
private long logDateTim;
private ReturnT<String> executeResult;
public HandleCallbackParam(){}
public HandleCallbackParam(int logId, ReturnT<String> executeResult) {
public HandleCallbackParam(int logId, long logDateTim, ReturnT<String> executeResult) {
this.logId = logId;
this.logDateTim = logDateTim;
this.executeResult = executeResult;
}
......@@ -25,6 +28,14 @@ public class HandleCallbackParam implements Serializable {
this.logId = logId;
}
public long getLogDateTim() {
return logDateTim;
}
public void setLogDateTim(long logDateTim) {
this.logDateTim = logDateTim;
}
public ReturnT<String> getExecuteResult() {
return executeResult;
}
......@@ -37,7 +48,9 @@ public class HandleCallbackParam implements Serializable {
public String toString() {
return "HandleCallbackParam{" +
"logId=" + logId +
", logDateTim=" + logDateTim +
", executeResult=" + executeResult +
'}';
}
}
......@@ -5,15 +5,19 @@ package com.xxl.job.core.enums;
*/
public enum ExecutorBlockStrategyEnum {
SERIAL_EXECUTION("单机串行"),
SERIAL_EXECUTION("Serial execution"),
/*CONCURRENT_EXECUTION("并行"),*/
DISCARD_LATER("丢弃后续调度"),
COVER_EARLY("覆盖之前调度");
DISCARD_LATER("Discard Later"),
COVER_EARLY("Cover Early");
private final String title;
private String title;
private ExecutorBlockStrategyEnum (String title) {
this.title = title;
}
public void setTitle(String title) {
this.title = title;
}
public String getTitle() {
return title;
}
......
......@@ -5,11 +5,11 @@ package com.xxl.job.core.glue;
*/
public enum GlueTypeEnum {
BEAN("BEAN模式", false, null, null),
GLUE_GROOVY("GLUE模式(Java)", false, null, null),
GLUE_SHELL("GLUE模式(Shell)", true, "bash", ".sh"),
GLUE_PYTHON("GLUE模式(Python)", true, "python", ".py"),
GLUE_NODEJS("GLUE模式(Nodejs)", true, "node", ".js");
BEAN("BEAN", false, null, null),
GLUE_GROOVY("GLUE(Java)", false, null, null),
GLUE_SHELL("GLUE(Shell)", true, "bash", ".sh"),
GLUE_PYTHON("GLUE(Python)", true, "python", ".py"),
GLUE_NODEJS("GLUE(Nodejs)", true, "node", ".js");
private String desc;
private boolean isScript;
......
......@@ -8,6 +8,8 @@ import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ScriptUtil;
import com.xxl.job.core.util.ShardingUtil;
import java.io.File;
/**
* Created by xuxueli on 17/4/27.
*/
......@@ -41,7 +43,7 @@ public class ScriptJobHandler extends IJobHandler {
// make script file
String scriptFileName = XxlJobFileAppender.getGlueSrcPath()
.concat("/")
.concat(File.separator)
.concat(String.valueOf(jobId))
.concat("_")
.concat(String.valueOf(glueUpdatetime))
......@@ -61,8 +63,15 @@ public class ScriptJobHandler extends IJobHandler {
// invoke
XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------");
int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName, scriptParams);
ReturnT<String> result = (exitValue==0)?IJobHandler.SUCCESS:new ReturnT<String>(IJobHandler.FAIL.getCode(), "script exit value("+exitValue+") is failed");
return result;
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else if (exitValue == 101) {
return IJobHandler.FAIL_RETRY;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "script exit value("+exitValue+") is failed");
}
}
}
......@@ -79,7 +79,7 @@ public class XxlJobFileAppender {
// filePath/yyyy-MM-dd/9999.log
String logFileName = logFilePath.getPath()
.concat("/")
.concat(File.separator)
.concat(String.valueOf(logId))
.concat(".log");
return logFileName;
......@@ -115,12 +115,13 @@ public class XxlJobFileAppender {
appendLog += "\r\n";
// append file content
try {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(logFile, true);
fos.write(appendLog.getBytes("utf-8"));
fos.flush();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (fos != null) {
try {
......@@ -130,9 +131,6 @@ public class XxlJobFileAppender {
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
......
......@@ -2,10 +2,11 @@ package com.xxl.job.core.log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
......@@ -49,15 +50,18 @@ public class XxlJobLogger {
/**
* append log with pattern
*
* @param appendLogPattern like "aaa {0} bbb {1} ccc"
* @param appendLogPattern like "aaa {} bbb {} ccc"
* @param appendLogArguments like "111, true"
*/
public static void log(String appendLogPattern, Object ... appendLogArguments) {
String appendLog = appendLogPattern;
FormattingTuple ft = MessageFormatter.format(appendLogPattern, appendLogArguments);
String appendLog = ft.getMessage();
/*appendLog = appendLogPattern;
if (appendLogArguments!=null && appendLogArguments.length>0) {
appendLog = MessageFormat.format(appendLogPattern, appendLogArguments);
}
}*/
StackTraceElement callInfo = new Throwable().getStackTrace()[1];
logDetail(callInfo, appendLog);
......
......@@ -58,8 +58,7 @@ public class NetComClientProxy implements FactoryBean<Object> {
// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception("Network request fail, response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
......
......@@ -29,7 +29,7 @@ public class JettyClient {
byte[] responseBytes = HttpClientUtil.postRequest(reqURL, requestBytes);
if (responseBytes == null || responseBytes.length==0) {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError("RpcResponse byte[] is null");
rpcResponse.setError("Network request fail, RpcResponse byte[] is null");
return rpcResponse;
}
......@@ -40,7 +40,7 @@ public class JettyClient {
logger.error(e.getMessage(), e);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError("Client-error:" + e.getMessage());
rpcResponse.setError("Network request error: " + e.getMessage());
return rpcResponse;
}
}
......
......@@ -8,16 +8,22 @@ import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.*;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* handler thread
* @author xuxueli 2016-1-16 19:52:47
......@@ -28,9 +34,9 @@ public class JobThread extends Thread{
private int jobId;
private IJobHandler handler;
private LinkedBlockingQueue<TriggerParam> triggerQueue;
private ConcurrentHashSet<Integer> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private Set<Integer> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private boolean toStop = false;
private volatile boolean toStop = false;
private String stopReason;
private boolean running = false; // if running job
......@@ -41,7 +47,7 @@ public class JobThread extends Thread{
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
this.triggerLogIdSet = new ConcurrentHashSet<Integer>();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Integer>());
}
public IJobHandler getHandler() {
return handler;
......@@ -171,11 +177,11 @@ public class JobThread extends Thread{
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
}
}
}
......@@ -187,7 +193,7 @@ public class JobThread extends Thread{
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
}
}
......
......@@ -4,10 +4,13 @@ import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.log.XxlJobLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -108,17 +111,27 @@ public class TriggerCallbackThread {
try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
callbackLog(callbackParamList, "<br>----------- xxl-job callback success");
break;
} else {
logger.info(">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
callbackLog(callbackParamList, "<br>----------- xxl-job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job callback error, callbackParamList:{}", callbackParamList, e);
callbackLog(callbackParamList, "<br>----------- xxl-job callback error, errorMsg:" + e.getMessage());
//getInstance().callBackQueue.addAll(callbackParamList);
}
}
}
/**
* callback log
*/
private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){
for (HandleCallbackParam callbackParam: callbackParamList) {
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
XxlJobLogger.log(logContent);
}
}
}
......@@ -26,7 +26,7 @@ public class HttpClientUtil {
/**
* post request
*/
public static byte[] postRequest(String reqURL, byte[] date) throws Exception {
public static byte[] postRequest(String reqURL, byte[] data) throws Exception {
byte[] responseBytes = null;
HttpPost httpPost = new HttpPost(reqURL);
......@@ -53,8 +53,8 @@ public class HttpClientUtil {
httpPost.setConfig(requestConfig);
// data
if (date != null) {
httpPost.setEntity(new ByteArrayEntity(date, ContentType.DEFAULT_BINARY));
if (data != null) {
httpPost.setEntity(new ByteArrayEntity(data, ContentType.DEFAULT_BINARY));
}
// do post
HttpResponse response = httpClient.execute(httpPost);
......@@ -64,7 +64,6 @@ public class HttpClientUtil {
EntityUtils.consume(entity);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw e;
} finally {
httpPost.releaseConnection();
......
package com.xxl.job.core.util;
import com.xxl.job.core.log.XxlJobLogger;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler;
......@@ -59,7 +60,10 @@ public class ScriptUtil {
// 标准输出:print (null if watchdog timeout)
// 错误输出:logging + 异常 (still exists if watchdog timeout)
// 标准输入
try (FileOutputStream fileOutputStream = new FileOutputStream(logFile, true)) {
FileOutputStream fileOutputStream = null; //
try {
fileOutputStream = new FileOutputStream(logFile, true);
PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null);
// command
......@@ -75,6 +79,18 @@ public class ScriptUtil {
exec.setStreamHandler(streamHandler);
int exitValue = exec.execute(commandline); // exit code: 0=success, 1=error
return exitValue;
} catch (Exception e) {
XxlJobLogger.log(e);
return -1;
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
XxlJobLogger.log(e);
}
}
}
}
......
......@@ -84,6 +84,7 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论