提交 4db870c8 authored 作者: xueli.xue's avatar xueli.xue

执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能;

上级 22762856
...@@ -2063,10 +2063,9 @@ data: post-data ...@@ -2063,10 +2063,9 @@ data: post-data
- 17、执行器注册线程优化,修复极端情况下初始化失败时导致NPE问题; - 17、执行器注册线程优化,修复极端情况下初始化失败时导致NPE问题;
- 18、执行器Commandhandler示例任务优化,修复极端情况下脚本进程挂起问题; - 18、执行器Commandhandler示例任务优化,修复极端情况下脚本进程挂起问题;
- 19、调度中心页面交互优化:用户管理模块密码列取消;多处表达autocomplete取消;执行器管理模块XSS拦截校验等; - 19、调度中心页面交互优化:用户管理模块密码列取消;多处表达autocomplete取消;执行器管理模块XSS拦截校验等;
- 19、[ING]任务触发参数优化:支持选择 "Cron触发"、"固定间隔时间触发"、"指定时间点触发"、"不选择" 等; - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能;
- 20、[ING]任务 misfire 策略:忽略、补偿一次、补偿最近10次……等; - 21、[ING]任务触发参数优化:支持选择 "Cron触发"、"固定间隔时间触发"、"指定时间点触发"、"不选择" 等;
- 21、[规划中]执行器注册,异步写入; - 22、[ING]任务 misfire 策略:忽略、补偿一次等;
- 22、[规划中]默认开启访问令牌鉴权;
### 7.32 版本 v2.3.0 Release Notes[规划中] ### 7.32 版本 v2.3.0 Release Notes[规划中]
......
...@@ -28,7 +28,7 @@ public class XxlJobScheduler { ...@@ -28,7 +28,7 @@ public class XxlJobScheduler {
JobTriggerPoolHelper.toStart(); JobTriggerPoolHelper.toStart();
// admin registry monitor run // admin registry monitor run
JobRegistryMonitorHelper.getInstance().start(); JobRegistryHelper.getInstance().start();
// admin fail-monitor run // admin fail-monitor run
JobFailMonitorHelper.getInstance().start(); JobFailMonitorHelper.getInstance().start();
...@@ -61,7 +61,7 @@ public class XxlJobScheduler { ...@@ -61,7 +61,7 @@ public class XxlJobScheduler {
JobFailMonitorHelper.getInstance().toStop(); JobFailMonitorHelper.getInstance().toStop();
// admin registry stop // admin registry stop
JobRegistryMonitorHelper.getInstance().toStop(); JobRegistryHelper.getInstance().toStop();
// admin trigger pool stop // admin trigger pool stop
JobTriggerPoolHelper.toStop(); JobTriggerPoolHelper.toStop();
......
...@@ -3,29 +3,57 @@ package com.xxl.job.admin.core.thread; ...@@ -3,29 +3,57 @@ package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.enums.RegistryConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.*;
/** /**
* job registry instance * job registry instance
* @author xuxueli 2016-10-02 19:10:24 * @author xuxueli 2016-10-02 19:10:24
*/ */
public class JobRegistryMonitorHelper { public class JobRegistryHelper {
private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class); private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper(); private static JobRegistryHelper instance = new JobRegistryHelper();
public static JobRegistryMonitorHelper getInstance(){ public static JobRegistryHelper getInstance(){
return instance; return instance;
} }
private Thread registryThread; private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean toStop = false; private volatile boolean toStop = false;
public void start(){ public void start(){
registryThread = new Thread(new Runnable() {
// for registry or remove
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
while (!toStop) { while (!toStop) {
...@@ -95,20 +123,82 @@ public class JobRegistryMonitorHelper { ...@@ -95,20 +123,82 @@ public class JobRegistryMonitorHelper {
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
} }
}); });
registryThread.setDaemon(true); registryMonitorThread.setDaemon(true);
registryThread.setName("xxl-job, admin JobRegistryMonitorHelper"); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryThread.start(); registryMonitorThread.start();
} }
public void toStop(){ public void toStop(){
toStop = true; toStop = true;
// interrupt and wait
registryThread.interrupt(); // stop registryOrRemoveThreadPool
registryOrRemoveThreadPool.shutdownNow();
// stop monitir (interrupt and wait)
registryMonitorThread.interrupt();
try { try {
registryThread.join(); registryMonitorThread.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
} }
// ---------------------- helper ----------------------
public ReturnT<String> registry(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
public ReturnT<String> registryRemove(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret > 0) {
// fresh
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
private void freshGroupRegistryInfo(RegistryParam registryParam){
// Under consideration, prevent affecting core tables
}
} }
...@@ -2,6 +2,7 @@ package com.xxl.job.admin.service.impl; ...@@ -2,6 +2,7 @@ package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.core.util.I18nUtil;
...@@ -17,7 +18,6 @@ import com.xxl.job.core.handler.IJobHandler; ...@@ -17,7 +18,6 @@ import com.xxl.job.core.handler.IJobHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.MessageFormat; import java.text.MessageFormat;
...@@ -131,45 +131,12 @@ public class AdminBizImpl implements AdminBiz { ...@@ -131,45 +131,12 @@ public class AdminBizImpl implements AdminBiz {
@Override @Override
public ReturnT<String> registry(RegistryParam registryParam) { public ReturnT<String> registry(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registry(registryParam);
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh
freshGroupRegistryInfo(registryParam);
}
return ReturnT.SUCCESS;
} }
@Override @Override
public ReturnT<String> registryRemove(RegistryParam registryParam) { public ReturnT<String> registryRemove(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registryRemove(registryParam);
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret > 0) {
// fresh
freshGroupRegistryInfo(registryParam);
}
return ReturnT.SUCCESS;
}
private void freshGroupRegistryInfo(RegistryParam registryParam){
// Under consideration, prevent affecting core tables
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论