提交 04abd884 authored 作者: xuxueli's avatar xuxueli

调度全异步处理:任务触发之后,推送到调度队列,多线程并发处理调度请求,提高任务调度速率的同时,避免因网络问题导致quartz调度线程阻塞的问题;

上级 05b2a60c
...@@ -1236,6 +1236,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 ...@@ -1236,6 +1236,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 17、自研Log组件参数占位符改为"{}",并修复打印有参日志时参数不匹配导致报错的问题; - 17、自研Log组件参数占位符改为"{}",并修复打印有参日志时参数不匹配导致报错的问题;
- 18、核心依赖Core内部国际化处理; - 18、核心依赖Core内部国际化处理;
- 19、默认Quartz线程数调整为50; - 19、默认Quartz线程数调整为50;
- 20、调度全异步处理:任务触发之后,推送到调度队列,多线程并发处理调度请求,提高任务调度速率的同时,避免因网络问题导致quartz调度线程阻塞的问题;
### TODO LIST ### TODO LIST
......
package com.xxl.job.admin.core.jobbean; package com.xxl.job.admin.core.jobbean;
import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.quartz.JobKey; import org.quartz.JobKey;
...@@ -26,7 +26,8 @@ public class RemoteHttpJobBean extends QuartzJobBean { ...@@ -26,7 +26,8 @@ public class RemoteHttpJobBean extends QuartzJobBean {
Integer jobId = Integer.valueOf(jobKey.getName()); Integer jobId = Integer.valueOf(jobKey.getName());
// trigger // trigger
XxlJobTrigger.trigger(jobId); //XxlJobTrigger.trigger(jobId);
JobTriggerPoolHelper.trigger(jobId);
} }
} }
\ No newline at end of file
...@@ -4,6 +4,7 @@ import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; ...@@ -4,6 +4,7 @@ import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper; import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobInfoDao;
...@@ -93,6 +94,10 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware { ...@@ -93,6 +94,10 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware {
} }
public void destroy(){ public void destroy(){
// admin trigger pool stop
JobTriggerPoolHelper.toStop();
// admin registry stop // admin registry stop
JobRegistryMonitorHelper.getInstance().toStop(); JobRegistryMonitorHelper.getInstance().toStop();
......
package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* job trigger thread pool helper
*
* @author xuxueli 2018-07-03 21:08:07
*/
public class JobTriggerPoolHelper {
private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
// ---------------------- trigger pool ----------------------
private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor(
50,
500,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100000),
new ThreadPoolExecutor.CallerRunsPolicy());
public void addTrigger(final int jobId){
triggerPool.execute(new Runnable() {
@Override
public void run() {
XxlJobTrigger.trigger(jobId);
}
});
}
public void stop(){
//triggerPool.shutdown();
triggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
// ---------------------- helper ----------------------
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void trigger(int jobId) {
helper.addTrigger(jobId);
}
public static void toStop(){
helper.stop();
}
}
...@@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup; ...@@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobInfoDao;
...@@ -275,7 +276,11 @@ public class XxlJobServiceImpl implements XxlJobService { ...@@ -275,7 +276,11 @@ public class XxlJobServiceImpl implements XxlJobService {
@Override @Override
public ReturnT<String> triggerJob(int id) { public ReturnT<String> triggerJob(int id) {
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
JobTriggerPoolHelper.trigger(id);
return ReturnT.SUCCESS;
/*XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
if (xxlJobInfo == null) { if (xxlJobInfo == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_id")+I18nUtil.getString("system_unvalid")) ); return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_id")+I18nUtil.getString("system_unvalid")) );
} }
...@@ -289,7 +294,8 @@ public class XxlJobServiceImpl implements XxlJobService { ...@@ -289,7 +294,8 @@ public class XxlJobServiceImpl implements XxlJobService {
} catch (SchedulerException e) { } catch (SchedulerException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
} }*/
} }
@Override @Override
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论