Skip to content
项目
群组
代码片段
帮助
正在加载...
登录
切换导航
X
XXL-JOB
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
靳帅
XXL-JOB
Commits
443c946e
提交
443c946e
authored
10月 27, 2018
作者:
xuxueli
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
- 1、调度中心迁移到 springboot;
- 2、底层通讯组件迁移至 xxl-rpc;
上级
02dc2a4c
隐藏空白字符变更
内嵌
并排
正在显示
28 个修改的文件
包含
322 行增加
和
1218 行删除
+322
-1218
XXL-JOB官方文档.md
doc/XXL-JOB官方文档.md
+13
-11
pom.xml
xxl-job-admin/pom.xml
+28
-0
JobApiController.java
...n/java/com/xxl/job/admin/controller/JobApiController.java
+9
-44
PermissionInterceptor.java
...b/admin/controller/interceptor/PermissionInterceptor.java
+3
-3
XxlJobDynamicScheduler.java
...m/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java
+56
-27
XxlJobTrigger.java
...in/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
+1
-1
application.properties
xxl-job-admin/src/main/resources/application.properties
+1
-1
AdminBizTest.java
...dmin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java
+10
-3
pom.xml
xxl-job-core/pom.xml
+4
-25
XxlJobExecutor.java
...c/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
+94
-27
RpcRequest.java
.../src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java
+0
-92
RpcResponse.java
...src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java
+0
-41
NetComClientProxy.java
...n/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java
+0
-81
NetComServerFactory.java
...java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java
+0
-84
JettyClient.java
...com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java
+0
-48
JettyServer.java
...com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java
+0
-92
JettyServerHandler.java
.../job/core/rpc/netcom/jetty/server/JettyServerHandler.java
+0
-68
HessianSerializer.java
...ava/com/xxl/job/core/rpc/serialize/HessianSerializer.java
+0
-62
ExecutorRegistryThread.java
.../java/com/xxl/job/core/thread/ExecutorRegistryThread.java
+3
-12
HttpClientUtil.java
...e/src/main/java/com/xxl/job/core/util/HttpClientUtil.java
+0
-110
IpUtil.java
xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java
+0
-135
NetUtil.java
...job-core/src/main/java/com/xxl/job/core/util/NetUtil.java
+0
-70
HttpJobHandler.java
...eli/executor/sample/jfinal/jobhandler/HttpJobHandler.java
+22
-47
HttpJobHandler.java
...xueli/executor/sample/nutz/jobhandler/HttpJobHandler.java
+22
-44
HttpJobHandler.java
...m/xxl/job/executor/service/jobhandler/HttpJobHandler.java
+22
-44
DemoJobHandlerTest.java
...c/test/java/com/xxl/executor/test/DemoJobHandlerTest.java
+7
-2
pom.xml
...ecutor-samples/xxl-job-executor-sample-springboot/pom.xml
+5
-0
HttpJobHandler.java
...m/xxl/job/executor/service/jobhandler/HttpJobHandler.java
+22
-44
没有找到文件。
doc/XXL-JOB官方文档.md
浏览文件 @
443c946e
...
...
@@ -1324,17 +1324,19 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
-
41、docker支持:调度中心提供 Dockerfile 方便快速构建docker镜像;
### 6.23 版本 V2.0.0 特性[迭代中]
-
1、
[
迭代中
]
调度中心迁移到springboot;
-
2、
[
迭代中
]
SimpleTrigger 支持;
-
3、
[
迭代中
]
任务状态与quartz解耦,降低quartz调度压力,仅NORMAL状态任务绑定quartz;
-
4、
[
迭代中
]
新增任务默认运行状态,任务更新时运行状态保持不变;
-
5、
[
迭代中
]
任务权限管理:执行器为粒度分配权限,核心操作校验权限;
-
6、
[
迭代中
]
Release发布时,一同发布调度中心安装包,真正实现开箱即用;
-
7、
[
迭代中
]
docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
-
8、
[
迭代中
]
cron在线生成工具,如 "cronboot/cron.qqe2";
-
9、
[
迭代中
]
原生提供通用命令行任务Handler(Bean任务,"CommandJobHandler");业务方只需要提供命令行即可,可执行任意命令;
-
10、IP获取逻辑优化,优先遍历网卡来获取可用IP;
-
11、任务新增的API服务接口返回任务ID,方便调用方实用;
-
1、调度中心迁移到 springboot;
-
2、底层通讯组件迁移至 xxl-rpc;
-
3、IP获取逻辑优化,优先遍历网卡来获取可用IP;
-
4、任务新增的API服务接口返回任务ID,方便调用方实用;
-
5、
[
迭代中
]
SimpleTrigger 支持;
-
6、
[
迭代中
]
任务状态与quartz解耦,降低quartz调度压力,仅NORMAL状态任务绑定quartz;
-
7、
[
迭代中
]
新增任务默认运行状态,任务更新时运行状态保持不变;
-
8、
[
迭代中
]
任务权限管理:执行器为粒度分配权限,核心操作校验权限;
-
9、
[
迭代中
]
Release发布时,一同发布调度中心安装包,真正实现开箱即用;
-
10、
[
迭代中
]
docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
-
11、
[
迭代中
]
cron在线生成工具,如 "cronboot/cron.qqe2";
-
12、
[
迭代中
]
原生提供通用命令行任务Handler(Bean任务,"CommandJobHandler");业务方只需要提供命令行即可,可执行任意命令;
### TODO LIST
-
1、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定;
...
...
xxl-job-admin/pom.xml
浏览文件 @
443c946e
...
...
@@ -18,6 +18,34 @@
<type>
pom
</type>
<scope>
import
</scope>
</dependency>
<!-- jetty -->
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-server
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-util
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-http
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-io
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-client
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
</dependencies>
</dependencyManagement>
...
...
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java
浏览文件 @
443c946e
package
com
.
xxl
.
job
.
admin
.
controller
;
import
com.xxl.job.admin.controller.annotation.PermessionLimit
;
import
com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler
;
import
com.xxl.job.core.biz.AdminBiz
;
import
com.xxl.job.core.rpc.codec.RpcRequest
;
import
com.xxl.job.core.rpc.codec.RpcResponse
;
import
com.xxl.job.core.rpc.netcom.NetComServerFactory
;
import
com.xxl.job.core.rpc.serialize.HessianSerializer
;
import
com.xxl.job.core.util.HttpClientUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.InitializingBean
;
import
org.springframework.stereotype.Controller
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
javax.servlet.ServletException
;
import
javax.servlet.http.HttpServletRequest
;
import
javax.servlet.http.HttpServletResponse
;
import
java.io.IOException
;
import
java.io.OutputStream
;
/**
* Created by xuxueli on 17/5/10.
*/
@Controller
public
class
JobApiController
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
JobApiController
.
class
);
public
class
JobApiController
implements
InitializingBean
{
private
RpcResponse
doInvoke
(
HttpServletRequest
request
)
{
try
{
// deserialize request
byte
[]
requestBytes
=
HttpClientUtil
.
readBytes
(
request
);
if
(
requestBytes
==
null
||
requestBytes
.
length
==
0
)
{
RpcResponse
rpcResponse
=
new
RpcResponse
();
rpcResponse
.
setError
(
"RpcRequest byte[] is null"
);
return
rpcResponse
;
}
RpcRequest
rpcRequest
=
(
RpcRequest
)
HessianSerializer
.
deserialize
(
requestBytes
,
RpcRequest
.
class
);
// invoke
RpcResponse
rpcResponse
=
NetComServerFactory
.
invokeService
(
rpcRequest
,
null
);
return
rpcResponse
;
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
@Override
public
void
afterPropertiesSet
()
throws
Exception
{
RpcResponse
rpcResponse
=
new
RpcResponse
();
rpcResponse
.
setError
(
"Server-error:"
+
e
.
getMessage
());
return
rpcResponse
;
}
}
@RequestMapping
(
AdminBiz
.
MAPPING
)
@PermessionLimit
(
limit
=
false
)
public
void
api
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
IOException
{
// invoke
RpcResponse
rpcResponse
=
doInvoke
(
request
);
// serialize response
byte
[]
responseBytes
=
HessianSerializer
.
serialize
(
rpcResponse
);
response
.
setContentType
(
"text/html;charset=utf-8"
);
response
.
setStatus
(
HttpServletResponse
.
SC_OK
);
//baseRequest.setHandled(true);
OutputStream
out
=
response
.
getOutputStream
();
out
.
write
(
responseBytes
);
out
.
flush
();
public
void
api
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
IOException
,
ServletException
{
XxlJobDynamicScheduler
.
invokeAdminService
(
request
,
response
);
}
}
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/interceptor/PermissionInterceptor.java
浏览文件 @
443c946e
...
...
@@ -3,8 +3,8 @@ package com.xxl.job.admin.controller.interceptor;
import
com.xxl.job.admin.controller.annotation.PermessionLimit
;
import
com.xxl.job.admin.core.conf.XxlJobAdminConfig
;
import
com.xxl.job.admin.core.util.CookieUtil
;
import
org.apache.commons.codec.digest.DigestUtils
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.DigestUtils
;
import
org.springframework.web.method.HandlerMethod
;
import
org.springframework.web.servlet.handler.HandlerInterceptorAdapter
;
...
...
@@ -29,7 +29,7 @@ public class PermissionInterceptor extends HandlerInterceptorAdapter {
String
password
=
XxlJobAdminConfig
.
getAdminConfig
().
getLoginPassword
();
// login token
String
tokenTmp
=
DigestUtils
.
md5
Hex
(
username
+
"_"
+
password
);
String
tokenTmp
=
DigestUtils
.
md5
DigestAsHex
(
String
.
valueOf
(
username
+
"_"
+
password
).
getBytes
());
//.getBytes("UTF-8")
tokenTmp
=
new
BigInteger
(
1
,
tokenTmp
.
getBytes
()).
toString
(
16
);
LOGIN_IDENTITY_TOKEN
=
tokenTmp
;
...
...
@@ -40,7 +40,7 @@ public class PermissionInterceptor extends HandlerInterceptorAdapter {
public
static
boolean
login
(
HttpServletResponse
response
,
String
username
,
String
password
,
boolean
ifRemember
){
// login token
String
tokenTmp
=
DigestUtils
.
md5
Hex
(
username
+
"_"
+
password
);
String
tokenTmp
=
DigestUtils
.
md5
DigestAsHex
(
String
.
valueOf
(
username
+
"_"
+
password
).
getBytes
()
);
tokenTmp
=
new
BigInteger
(
1
,
tokenTmp
.
getBytes
()).
toString
(
16
);
if
(!
getLoginIdentityToken
().
equals
(
tokenTmp
)){
...
...
xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java
浏览文件 @
443c946e
...
...
@@ -7,27 +7,28 @@ import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import
com.xxl.job.admin.core.thread.JobRegistryMonitorHelper
;
import
com.xxl.job.admin.core.thread.JobTriggerPoolHelper
;
import
com.xxl.job.admin.core.util.I18nUtil
;
import
com.xxl.job.admin.dao.XxlJobGroupDao
;
import
com.xxl.job.admin.dao.XxlJobInfoDao
;
import
com.xxl.job.admin.dao.XxlJobLogDao
;
import
com.xxl.job.admin.dao.XxlJobRegistryDao
;
import
com.xxl.job.core.biz.AdminBiz
;
import
com.xxl.job.core.biz.ExecutorBiz
;
import
com.xxl.job.core.enums.ExecutorBlockStrategyEnum
;
import
com.xxl.job.core.rpc.netcom.NetComClientProxy
;
import
com.xxl.job.core.rpc.netcom.NetComServerFactory
;
import
com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory
;
import
com.xxl.rpc.remoting.invoker.call.CallType
;
import
com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean
;
import
com.xxl.rpc.remoting.net.NetEnum
;
import
com.xxl.rpc.remoting.net.impl.jetty.server.JettyServerHandler
;
import
com.xxl.rpc.remoting.provider.XxlRpcProviderFactory
;
import
com.xxl.rpc.serialize.Serializer
;
import
org.eclipse.jetty.server.Request
;
import
org.quartz.*
;
import
org.quartz.Trigger.TriggerState
;
import
org.quartz.impl.triggers.CronTriggerImpl
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.BeansException
;
import
org.springframework.beans.factory.DisposableBean
;
import
org.springframework.beans.factory.InitializingBean
;
import
org.springframework.context.ApplicationContext
;
import
org.springframework.context.ApplicationContextAware
;
import
org.springframework.util.Assert
;
import
javax.servlet.ServletException
;
import
javax.servlet.http.HttpServletRequest
;
import
javax.servlet.http.HttpServletResponse
;
import
java.io.IOException
;
import
java.util.Date
;
import
java.util.HashSet
;
import
java.util.concurrent.ConcurrentHashMap
;
...
...
@@ -50,32 +51,26 @@ public final class XxlJobDynamicScheduler {
// ---------------------- init + destroy ----------------------
public
void
start
()
throws
Exception
{
// valid
Assert
.
notNull
(
scheduler
,
"quartz scheduler is null"
);
// init i18n
initI18n
();
// admin registry monitor run
JobRegistryMonitorHelper
.
getInstance
().
start
();
// admin monitor run
JobFailMonitorHelper
.
getInstance
().
start
();
// admin-server(spring-mvc)
NetComServerFactory
.
putService
(
AdminBiz
.
class
,
XxlJobAdminConfig
.
getAdminConfig
().
getAdminBiz
());
NetComServerFactory
.
setAccessToken
(
XxlJobAdminConfig
.
getAdminConfig
().
getAccessToken
());
// init i18n
initI18n
();
// admin-server
initRpcProvider
();
// valid
Assert
.
notNull
(
scheduler
,
"quartz scheduler is null"
);
logger
.
info
(
">>>>>>>>> init xxl-job admin success."
);
}
private
void
initI18n
(){
for
(
ExecutorBlockStrategyEnum
item:
ExecutorBlockStrategyEnum
.
values
())
{
item
.
setTitle
(
I18nUtil
.
getString
(
"jobconf_block_"
.
concat
(
item
.
name
())));
}
}
public
void
destroy
(){
public
void
destroy
()
throws
Exception
{
// admin trigger pool stop
JobTriggerPoolHelper
.
toStop
();
...
...
@@ -84,6 +79,38 @@ public final class XxlJobDynamicScheduler {
// admin monitor stop
JobFailMonitorHelper
.
getInstance
().
toStop
();
// admin-server
stopRpcProvider
();
}
// ---------------------- I18n ----------------------
private
void
initI18n
(){
for
(
ExecutorBlockStrategyEnum
item:
ExecutorBlockStrategyEnum
.
values
())
{
item
.
setTitle
(
I18nUtil
.
getString
(
"jobconf_block_"
.
concat
(
item
.
name
())));
}
}
// ---------------------- admin rpc provider (no server version) ----------------------
private
static
JettyServerHandler
jettyServerHandler
;
private
void
initRpcProvider
(){
// init
XxlRpcProviderFactory
xxlRpcProviderFactory
=
new
XxlRpcProviderFactory
();
xxlRpcProviderFactory
.
initConfig
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
null
,
0
,
XxlJobAdminConfig
.
getAdminConfig
().
getAccessToken
(),
null
,
null
);
// add services
xxlRpcProviderFactory
.
addService
(
AdminBiz
.
class
.
getName
(),
null
,
XxlJobAdminConfig
.
getAdminConfig
().
getAdminBiz
());
// jetty handler
jettyServerHandler
=
new
JettyServerHandler
(
xxlRpcProviderFactory
);
}
private
void
stopRpcProvider
()
throws
Exception
{
new
XxlRpcInvokerFactory
().
stop
();
}
public
static
void
invokeAdminService
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
IOException
,
ServletException
{
jettyServerHandler
.
handle
(
null
,
new
Request
(
null
,
null
),
request
,
response
);
}
...
...
@@ -103,7 +130,9 @@ public final class XxlJobDynamicScheduler {
}
// set-cache
executorBiz
=
(
ExecutorBiz
)
new
NetComClientProxy
(
ExecutorBiz
.
class
,
address
,
XxlJobAdminConfig
.
getAdminConfig
().
getAccessToken
()).
getObject
();
executorBiz
=
(
ExecutorBiz
)
new
XxlRpcReferenceBean
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
CallType
.
SYNC
,
ExecutorBiz
.
class
,
null
,
10000
,
address
,
XxlJobAdminConfig
.
getAdminConfig
().
getAccessToken
(),
null
).
getObject
();
executorBizRepository
.
put
(
address
,
executorBiz
);
return
executorBiz
;
}
...
...
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
浏览文件 @
443c946e
...
...
@@ -12,7 +12,7 @@ import com.xxl.job.core.biz.ExecutorBiz;
import
com.xxl.job.core.biz.model.ReturnT
;
import
com.xxl.job.core.biz.model.TriggerParam
;
import
com.xxl.job.core.enums.ExecutorBlockStrategyEnum
;
import
com.xxl.
job.core
.util.IpUtil
;
import
com.xxl.
rpc
.util.IpUtil
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
...
...
xxl-job-admin/src/main/resources/application.properties
浏览文件 @
443c946e
...
...
@@ -16,7 +16,7 @@ spring.freemarker.request-context-attribute=request
mybatis.mapper-locations
=
classpath:/mybatis-mapper/*Mapper.xml
### xxl-job, datasource
spring.datasource.url
=
jdbc:mysql://1
27.0.0.1
:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.url
=
jdbc:mysql://1
92.168.99.100
:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username
=
root
spring.datasource.password
=
root_pwd
spring.datasource.driver-class-name
=
com.mysql.jdbc.Driver
...
...
xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java
浏览文件 @
443c946e
...
...
@@ -4,7 +4,10 @@ import com.xxl.job.core.biz.AdminBiz;
import
com.xxl.job.core.biz.model.RegistryParam
;
import
com.xxl.job.core.biz.model.ReturnT
;
import
com.xxl.job.core.enums.RegistryConfig
;
import
com.xxl.job.core.rpc.netcom.NetComClientProxy
;
import
com.xxl.rpc.remoting.invoker.call.CallType
;
import
com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean
;
import
com.xxl.rpc.remoting.net.NetEnum
;
import
com.xxl.rpc.serialize.Serializer
;
import
org.junit.Assert
;
import
org.junit.Test
;
...
...
@@ -26,7 +29,9 @@ public class AdminBizTest {
*/
@Test
public
void
registryTest
()
throws
Exception
{
AdminBiz
adminBiz
=
(
AdminBiz
)
new
NetComClientProxy
(
AdminBiz
.
class
,
addressUrl
,
accessToken
).
getObject
();
addressUrl
=
addressUrl
.
replace
(
"http://"
,
""
);
AdminBiz
adminBiz
=
(
AdminBiz
)
new
XxlRpcReferenceBean
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
CallType
.
SYNC
,
AdminBiz
.
class
,
null
,
10000
,
addressUrl
,
accessToken
,
null
).
getObject
();
// test executor registry
RegistryParam
registryParam
=
new
RegistryParam
(
RegistryConfig
.
RegistType
.
EXECUTOR
.
name
(),
"xxl-job-executor-example"
,
"127.0.0.1:9999"
);
...
...
@@ -41,7 +46,9 @@ public class AdminBizTest {
*/
@Test
public
void
registryRemove
()
throws
Exception
{
AdminBiz
adminBiz
=
(
AdminBiz
)
new
NetComClientProxy
(
AdminBiz
.
class
,
addressUrl
,
accessToken
).
getObject
();
addressUrl
=
addressUrl
.
replace
(
"http://"
,
""
);
AdminBiz
adminBiz
=
(
AdminBiz
)
new
XxlRpcReferenceBean
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
CallType
.
SYNC
,
AdminBiz
.
class
,
null
,
10000
,
addressUrl
,
accessToken
,
null
).
getObject
();
// test executor registry remove
RegistryParam
registryParam
=
new
RegistryParam
(
RegistryConfig
.
RegistType
.
EXECUTOR
.
name
(),
"xxl-job-executor-example"
,
"127.0.0.1:9999"
);
...
...
xxl-job-core/pom.xml
浏览文件 @
443c946e
...
...
@@ -15,32 +15,11 @@
<dependencies>
<!--
slf4j
-->
<!--
xxl-rpc-core
-->
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
<version>
${slf4j-api.version}
</version>
</dependency>
<!-- jetty -->
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-server
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
<!-- httpclient -->
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<!-- hessian -->
<dependency>
<groupId>
com.caucho
</groupId>
<artifactId>
hessian
</artifactId>
<version>
${hessian.version}
</version>
<groupId>
com.xuxueli
</groupId>
<artifactId>
xxl-rpc-core
</artifactId>
<version>
1.2.0
</version>
</dependency>
<!-- groovy-all -->
...
...
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
浏览文件 @
443c946e
...
...
@@ -6,11 +6,19 @@ import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import
com.xxl.job.core.handler.IJobHandler
;
import
com.xxl.job.core.handler.annotation.JobHandler
;
import
com.xxl.job.core.log.XxlJobFileAppender
;
import
com.xxl.job.core.rpc.netcom.NetComClientProxy
;
import
com.xxl.job.core.rpc.netcom.NetComServerFactory
;
import
com.xxl.job.core.thread.ExecutorRegistryThread
;
import
com.xxl.job.core.thread.JobLogFileCleanThread
;
import
com.xxl.job.core.thread.JobThread
;
import
com.xxl.job.core.util.NetUtil
;
import
com.xxl.job.core.thread.TriggerCallbackThread
;
import
com.xxl.rpc.registry.impl.LocalServiceRegistry
;
import
com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory
;
import
com.xxl.rpc.remoting.invoker.call.CallType
;
import
com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean
;
import
com.xxl.rpc.remoting.net.NetEnum
;
import
com.xxl.rpc.remoting.provider.XxlRpcProviderFactory
;
import
com.xxl.rpc.serialize.Serializer
;
import
com.xxl.rpc.util.IpUtil
;
import
com.xxl.rpc.util.NetUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.BeansException
;
...
...
@@ -30,7 +38,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
// ---------------------- param ----------------------
private
String
adminAddresses
;
private
String
appName
;
private
static
String
appName
;
private
String
ip
;
private
int
port
;
private
String
accessToken
;
...
...
@@ -72,20 +80,26 @@ public class XxlJobExecutor implements ApplicationContextAware {
// ---------------------- start + stop ----------------------
public
void
start
()
throws
Exception
{
// init admin-client
initAdminBizList
(
adminAddresses
,
accessToken
);
// init executor-jobHandlerRepository
initJobHandlerRepository
(
applicationContext
);
// init logpath
XxlJobFileAppender
.
initLogPath
(
logPath
);
// init executor-server
initExecutorServer
(
port
,
ip
,
appName
,
accessToken
);
// init JobHandler Repository
initJobHandlerRepository
(
applicationContext
);
// init admin-client
initAdminBizList
(
adminAddresses
,
accessToken
);
// init JobLogFileCleanThread
JobLogFileCleanThread
.
getInstance
().
start
(
logRetentionDays
);
// init TriggerCallbackThread
TriggerCallbackThread
.
getInstance
().
start
();
// init executor-server
port
=
port
>
0
?
port:
NetUtil
.
findAvailablePort
(
9999
);
ip
=
(
ip
!=
null
&&
ip
.
trim
().
length
()>
0
)?
ip:
IpUtil
.
getIp
();
initRpcProvider
(
ip
,
port
,
appName
,
accessToken
);
}
public
void
destroy
(){
// destory jobThreadRepository
...
...
@@ -96,22 +110,35 @@ public class XxlJobExecutor implements ApplicationContextAware {
jobThreadRepository
.
clear
();
}
// destory executor-server
stopExecutorServer
();
// destory JobLogFileCleanThread
JobLogFileCleanThread
.
getInstance
().
toStop
();
// destory TriggerCallbackThread
TriggerCallbackThread
.
getInstance
().
toStop
();
// destory executor-server
stopRpcProvider
();
}
// ---------------------- admin-client ----------------------
// ---------------------- admin-client
(rpc invoker)
----------------------
private
static
List
<
AdminBiz
>
adminBizList
;
private
static
void
initAdminBizList
(
String
adminAddresses
,
String
accessToken
)
throws
Exception
{
if
(
adminAddresses
!=
null
&&
adminAddresses
.
trim
().
length
()>
0
)
{
for
(
String
address:
adminAddresses
.
trim
().
split
(
","
))
{
if
(
address
!=
null
&&
address
.
trim
().
length
()>
0
)
{
String
addressUrl
=
address
.
concat
(
AdminBiz
.
MAPPING
);
AdminBiz
adminBiz
=
(
AdminBiz
)
new
NetComClientProxy
(
AdminBiz
.
class
,
addressUrl
,
accessToken
).
getObject
();
if
(
addressUrl
.
startsWith
(
"http://"
))
{
addressUrl
=
addressUrl
.
replace
(
"http://"
,
""
);
}
if
(
addressUrl
.
startsWith
(
"https://"
))
{
addressUrl
=
addressUrl
.
replace
(
"https://"
,
""
);
}
AdminBiz
adminBiz
=
(
AdminBiz
)
new
XxlRpcReferenceBean
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
CallType
.
SYNC
,
AdminBiz
.
class
,
null
,
10000
,
addressUrl
,
accessToken
,
null
).
getObject
();
if
(
adminBizList
==
null
)
{
adminBizList
=
new
ArrayList
<
AdminBiz
>();
}
...
...
@@ -125,19 +152,59 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
// ---------------------- executor-server(jetty) ----------------------
private
NetComServerFactory
serverFactory
=
new
NetComServerFactory
();
private
void
initExecutorServer
(
int
port
,
String
ip
,
String
appName
,
String
accessToken
)
throws
Exception
{
// valid param
port
=
port
>
0
?
port:
NetUtil
.
findAvailablePort
(
9999
);
// ---------------------- executor-server (rpc provider) ----------------------
private
XxlRpcInvokerFactory
xxlRpcInvokerFactory
=
null
;
private
XxlRpcProviderFactory
xxlRpcProviderFactory
=
null
;
private
void
initRpcProvider
(
String
ip
,
int
port
,
String
appName
,
String
accessToken
)
throws
Exception
{
// init invoker factory
xxlRpcInvokerFactory
=
new
XxlRpcInvokerFactory
();
// init, provider factory
xxlRpcProviderFactory
=
new
XxlRpcProviderFactory
();
xxlRpcProviderFactory
.
initConfig
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
ip
,
port
,
accessToken
,
ExecutorServiceRegistry
.
class
,
null
);
// add services
xxlRpcProviderFactory
.
addService
(
ExecutorBiz
.
class
.
getName
(),
null
,
new
ExecutorBizImpl
());
// start
xxlRpcProviderFactory
.
start
();
// start server
NetComServerFactory
.
putService
(
ExecutorBiz
.
class
,
new
ExecutorBizImpl
());
// rpc-service, base on jetty
NetComServerFactory
.
setAccessToken
(
accessToken
);
serverFactory
.
start
(
port
,
ip
,
appName
);
// jetty + registry
}
private
void
stopExecutorServer
()
{
serverFactory
.
destroy
();
// jetty + registry + callback
public
static
class
ExecutorServiceRegistry
extends
LocalServiceRegistry
{
@Override
public
boolean
registry
(
String
key
,
String
value
)
{
// start registry
if
(
ExecutorBiz
.
class
.
getName
().
equalsIgnoreCase
(
key
))
{
ExecutorRegistryThread
.
getInstance
().
start
(
appName
,
value
);
}
return
super
.
registry
(
key
,
value
);
}
@Override
public
void
stop
()
{
// stop registry
ExecutorRegistryThread
.
getInstance
().
toStop
();
super
.
stop
();
}
}
private
void
stopRpcProvider
()
{
// stop invoker factory
try
{
xxlRpcInvokerFactory
.
stop
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
// stop provider factory
try
{
xxlRpcProviderFactory
.
stop
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
...
...
xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
codec
;
import
java.io.Serializable
;
import
java.util.Arrays
;
/**
* request
* @author xuxueli 2015-10-29 19:39:12
*/
public
class
RpcRequest
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
serverAddress
;
private
long
createMillisTime
;
private
String
accessToken
;
private
String
className
;
private
String
methodName
;
private
Class
<?>[]
parameterTypes
;
private
Object
[]
parameters
;
public
String
getServerAddress
()
{
return
serverAddress
;
}
public
void
setServerAddress
(
String
serverAddress
)
{
this
.
serverAddress
=
serverAddress
;
}
public
long
getCreateMillisTime
()
{
return
createMillisTime
;
}
public
void
setCreateMillisTime
(
long
createMillisTime
)
{
this
.
createMillisTime
=
createMillisTime
;
}
public
String
getAccessToken
()
{
return
accessToken
;
}
public
void
setAccessToken
(
String
accessToken
)
{
this
.
accessToken
=
accessToken
;
}
public
String
getClassName
()
{
return
className
;
}
public
void
setClassName
(
String
className
)
{
this
.
className
=
className
;
}
public
String
getMethodName
()
{
return
methodName
;
}
public
void
setMethodName
(
String
methodName
)
{
this
.
methodName
=
methodName
;
}
public
Class
<?>[]
getParameterTypes
()
{
return
parameterTypes
;
}
public
void
setParameterTypes
(
Class
<?>[]
parameterTypes
)
{
this
.
parameterTypes
=
parameterTypes
;
}
public
Object
[]
getParameters
()
{
return
parameters
;
}
public
void
setParameters
(
Object
[]
parameters
)
{
this
.
parameters
=
parameters
;
}
@Override
public
String
toString
()
{
return
"RpcRequest{"
+
"serverAddress='"
+
serverAddress
+
'\''
+
", createMillisTime="
+
createMillisTime
+
", accessToken='"
+
accessToken
+
'\''
+
", className='"
+
className
+
'\''
+
", methodName='"
+
methodName
+
'\''
+
", parameterTypes="
+
Arrays
.
toString
(
parameterTypes
)
+
", parameters="
+
Arrays
.
toString
(
parameters
)
+
'}'
;
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
codec
;
import
java.io.Serializable
;
/**
* response
* @author xuxueli 2015-10-29 19:39:54
*/
public
class
RpcResponse
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
error
;
private
Object
result
;
public
boolean
isError
()
{
return
error
!=
null
;
}
public
String
getError
()
{
return
error
;
}
public
void
setError
(
String
error
)
{
this
.
error
=
error
;
}
public
Object
getResult
()
{
return
result
;
}
public
void
setResult
(
Object
result
)
{
this
.
result
=
result
;
}
@Override
public
String
toString
()
{
return
"NettyResponse [error="
+
error
+
", result="
+
result
+
"]"
;
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
netcom
;
import
com.xxl.job.core.rpc.codec.RpcRequest
;
import
com.xxl.job.core.rpc.codec.RpcResponse
;
import
com.xxl.job.core.rpc.netcom.jetty.client.JettyClient
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.FactoryBean
;
import
java.lang.reflect.InvocationHandler
;
import
java.lang.reflect.Method
;
import
java.lang.reflect.Proxy
;
/**
* rpc proxy
* @author xuxueli 2015-10-29 20:18:32
*/
public
class
NetComClientProxy
implements
FactoryBean
<
Object
>
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NetComClientProxy
.
class
);
// ---------------------- config ----------------------
private
Class
<?>
iface
;
private
String
serverAddress
;
private
String
accessToken
;
private
JettyClient
client
=
new
JettyClient
();
public
NetComClientProxy
(
Class
<?>
iface
,
String
serverAddress
,
String
accessToken
)
{
this
.
iface
=
iface
;
this
.
serverAddress
=
serverAddress
;
this
.
accessToken
=
accessToken
;
}
@Override
public
Object
getObject
()
throws
Exception
{
return
Proxy
.
newProxyInstance
(
Thread
.
currentThread
()
.
getContextClassLoader
(),
new
Class
[]
{
iface
},
new
InvocationHandler
()
{
@Override
public
Object
invoke
(
Object
proxy
,
Method
method
,
Object
[]
args
)
throws
Throwable
{
// filter method like "Object.toString()"
if
(
Object
.
class
.
getName
().
equals
(
method
.
getDeclaringClass
().
getName
()))
{
logger
.
error
(
">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]"
,
method
.
getDeclaringClass
().
getName
(),
method
.
getName
());
throw
new
RuntimeException
(
"xxl-rpc proxy class-method not support"
);
}
// request
RpcRequest
request
=
new
RpcRequest
();
request
.
setServerAddress
(
serverAddress
);
request
.
setCreateMillisTime
(
System
.
currentTimeMillis
());
request
.
setAccessToken
(
accessToken
);
request
.
setClassName
(
method
.
getDeclaringClass
().
getName
());
request
.
setMethodName
(
method
.
getName
());
request
.
setParameterTypes
(
method
.
getParameterTypes
());
request
.
setParameters
(
args
);
// send
RpcResponse
response
=
client
.
send
(
request
);
// valid response
if
(
response
==
null
)
{
throw
new
Exception
(
"Network request fail, response not found."
);
}
if
(
response
.
isError
())
{
throw
new
RuntimeException
(
response
.
getError
());
}
else
{
return
response
.
getResult
();
}
}
});
}
@Override
public
Class
<?>
getObjectType
()
{
return
iface
;
}
@Override
public
boolean
isSingleton
()
{
return
false
;
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
netcom
;
import
com.xxl.job.core.biz.model.ReturnT
;
import
com.xxl.job.core.rpc.codec.RpcRequest
;
import
com.xxl.job.core.rpc.codec.RpcResponse
;
import
com.xxl.job.core.rpc.netcom.jetty.server.JettyServer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.cglib.reflect.FastClass
;
import
org.springframework.cglib.reflect.FastMethod
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* netcom init
* @author xuxueli 2015-10-31 22:54:27
*/
public
class
NetComServerFactory
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NetComServerFactory
.
class
);
// ---------------------- server start ----------------------
JettyServer
server
=
new
JettyServer
();
public
void
start
(
int
port
,
String
ip
,
String
appName
)
throws
Exception
{
server
.
start
(
port
,
ip
,
appName
);
}
// ---------------------- server destroy ----------------------
public
void
destroy
(){
server
.
destroy
();
}
// ---------------------- server instance ----------------------
/**
* init local rpc service map
*/
private
static
Map
<
String
,
Object
>
serviceMap
=
new
HashMap
<
String
,
Object
>();
private
static
String
accessToken
;
public
static
void
putService
(
Class
<?>
iface
,
Object
serviceBean
){
serviceMap
.
put
(
iface
.
getName
(),
serviceBean
);
}
public
static
void
setAccessToken
(
String
accessToken
)
{
NetComServerFactory
.
accessToken
=
accessToken
;
}
public
static
RpcResponse
invokeService
(
RpcRequest
request
,
Object
serviceBean
)
{
if
(
serviceBean
==
null
)
{
serviceBean
=
serviceMap
.
get
(
request
.
getClassName
());
}
if
(
serviceBean
==
null
)
{
// TODO
}
RpcResponse
response
=
new
RpcResponse
();
if
(
System
.
currentTimeMillis
()
-
request
.
getCreateMillisTime
()
>
180000
)
{
response
.
setResult
(
new
ReturnT
<
String
>(
ReturnT
.
FAIL_CODE
,
"The timestamp difference between admin and executor exceeds the limit."
));
return
response
;
}
if
(
accessToken
!=
null
&&
accessToken
.
trim
().
length
()>
0
&&
!
accessToken
.
trim
().
equals
(
request
.
getAccessToken
()))
{
response
.
setResult
(
new
ReturnT
<
String
>(
ReturnT
.
FAIL_CODE
,
"The access token["
+
request
.
getAccessToken
()
+
"] is wrong."
));
return
response
;
}
try
{
Class
<?>
serviceClass
=
serviceBean
.
getClass
();
String
methodName
=
request
.
getMethodName
();
Class
<?>[]
parameterTypes
=
request
.
getParameterTypes
();
Object
[]
parameters
=
request
.
getParameters
();
FastClass
serviceFastClass
=
FastClass
.
create
(
serviceClass
);
FastMethod
serviceFastMethod
=
serviceFastClass
.
getMethod
(
methodName
,
parameterTypes
);
Object
result
=
serviceFastMethod
.
invoke
(
serviceBean
,
parameters
);
response
.
setResult
(
result
);
}
catch
(
Throwable
t
)
{
t
.
printStackTrace
();
response
.
setError
(
t
.
getMessage
());
}
return
response
;
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
netcom
.
jetty
.
client
;
import
com.xxl.job.core.rpc.codec.RpcRequest
;
import
com.xxl.job.core.rpc.codec.RpcResponse
;
import
com.xxl.job.core.rpc.serialize.HessianSerializer
;
import
com.xxl.job.core.util.HttpClientUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* jetty client
* @author xuxueli 2015-11-24 22:25:15
*/
public
class
JettyClient
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
JettyClient
.
class
);
public
RpcResponse
send
(
RpcRequest
request
)
throws
Exception
{
try
{
// serialize request
byte
[]
requestBytes
=
HessianSerializer
.
serialize
(
request
);
// reqURL
String
reqURL
=
request
.
getServerAddress
();
if
(
reqURL
!=
null
&&
reqURL
.
toLowerCase
().
indexOf
(
"http"
)==-
1
)
{
reqURL
=
"http://"
+
request
.
getServerAddress
()
+
"/"
;
// IP:PORT, need parse to url
}
// remote invoke
byte
[]
responseBytes
=
HttpClientUtil
.
postRequest
(
reqURL
,
requestBytes
);
if
(
responseBytes
==
null
||
responseBytes
.
length
==
0
)
{
RpcResponse
rpcResponse
=
new
RpcResponse
();
rpcResponse
.
setError
(
"Network request fail, RpcResponse byte[] is null"
);
return
rpcResponse
;
}
// deserialize response
RpcResponse
rpcResponse
=
(
RpcResponse
)
HessianSerializer
.
deserialize
(
responseBytes
,
RpcResponse
.
class
);
return
rpcResponse
;
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
RpcResponse
rpcResponse
=
new
RpcResponse
();
rpcResponse
.
setError
(
"Network request error: "
+
e
.
getMessage
());
return
rpcResponse
;
}
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
netcom
.
jetty
.
server
;
import
com.xxl.job.core.thread.ExecutorRegistryThread
;
import
com.xxl.job.core.thread.TriggerCallbackThread
;
import
org.eclipse.jetty.server.Connector
;
import
org.eclipse.jetty.server.Handler
;
import
org.eclipse.jetty.server.Server
;
import
org.eclipse.jetty.server.ServerConnector
;
import
org.eclipse.jetty.server.handler.HandlerCollection
;
import
org.eclipse.jetty.util.thread.ExecutorThreadPool
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* rpc jetty server
* @author xuxueli 2015-11-19 22:29:03
*/
public
class
JettyServer
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
JettyServer
.
class
);
private
Server
server
;
private
Thread
thread
;
public
void
start
(
final
int
port
,
final
String
ip
,
final
String
appName
)
throws
Exception
{
thread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
// The Server
server
=
new
Server
(
new
ExecutorThreadPool
(
1000
));
// HTTP connector
ServerConnector
connector
=
new
ServerConnector
(
server
);
if
(
ip
!=
null
&&
ip
.
trim
().
length
()>
0
)
{
//connector.setHost(ip); // The network interface this connector binds to as an IP address or a hostname. If null or 0.0.0.0, then bind to all interfaces.
}
connector
.
setPort
(
port
);
server
.
setConnectors
(
new
Connector
[]{
connector
});
// Set a handler
HandlerCollection
handlerc
=
new
HandlerCollection
();
handlerc
.
setHandlers
(
new
Handler
[]{
new
JettyServerHandler
()});
server
.
setHandler
(
handlerc
);
try
{
// Start server
server
.
start
();
logger
.
info
(
">>>>>>>>>>> xxl-job jetty server start success at port:{}."
,
port
);
// Start Registry-Server
ExecutorRegistryThread
.
getInstance
().
start
(
port
,
ip
,
appName
);
// Start Callback-Server
TriggerCallbackThread
.
getInstance
().
start
();
server
.
join
();
// block until thread stopped
logger
.
info
(
">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}"
,
JettyServer
.
class
.
getName
(),
port
);
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
finally
{
//destroy();
}
}
});
thread
.
setDaemon
(
true
);
// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread
.
start
();
}
public
void
destroy
()
{
// destroy Registry-Server
ExecutorRegistryThread
.
getInstance
().
toStop
();
// destroy Callback-Server
TriggerCallbackThread
.
getInstance
().
toStop
();
// destroy server
if
(
server
!=
null
)
{
try
{
server
.
stop
();
server
.
destroy
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
if
(
thread
.
isAlive
())
{
thread
.
interrupt
();
}
logger
.
info
(
">>>>>>>>>>> xxl-rpc server destroy success, netcon={}"
,
JettyServer
.
class
.
getName
());
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServerHandler.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
netcom
.
jetty
.
server
;
import
com.xxl.job.core.rpc.codec.RpcRequest
;
import
com.xxl.job.core.rpc.codec.RpcResponse
;
import
com.xxl.job.core.rpc.netcom.NetComServerFactory
;
import
com.xxl.job.core.rpc.serialize.HessianSerializer
;
import
com.xxl.job.core.util.HttpClientUtil
;
import
org.eclipse.jetty.server.Request
;
import
org.eclipse.jetty.server.handler.AbstractHandler
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.servlet.ServletException
;
import
javax.servlet.http.HttpServletRequest
;
import
javax.servlet.http.HttpServletResponse
;
import
java.io.IOException
;
import
java.io.OutputStream
;
/**
* jetty handler
* @author xuxueli 2015-11-19 22:32:36
*/
public
class
JettyServerHandler
extends
AbstractHandler
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
JettyServerHandler
.
class
);
@Override
public
void
handle
(
String
target
,
Request
baseRequest
,
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
IOException
,
ServletException
{
// invoke
RpcResponse
rpcResponse
=
doInvoke
(
request
);
// serialize response
byte
[]
responseBytes
=
HessianSerializer
.
serialize
(
rpcResponse
);
response
.
setContentType
(
"text/html;charset=utf-8"
);
response
.
setStatus
(
HttpServletResponse
.
SC_OK
);
baseRequest
.
setHandled
(
true
);
OutputStream
out
=
response
.
getOutputStream
();
out
.
write
(
responseBytes
);
out
.
flush
();
}
private
RpcResponse
doInvoke
(
HttpServletRequest
request
)
{
try
{
// deserialize request
byte
[]
requestBytes
=
HttpClientUtil
.
readBytes
(
request
);
if
(
requestBytes
==
null
||
requestBytes
.
length
==
0
)
{
RpcResponse
rpcResponse
=
new
RpcResponse
();
rpcResponse
.
setError
(
"RpcRequest byte[] is null"
);
return
rpcResponse
;
}
RpcRequest
rpcRequest
=
(
RpcRequest
)
HessianSerializer
.
deserialize
(
requestBytes
,
RpcRequest
.
class
);
// invoke
RpcResponse
rpcResponse
=
NetComServerFactory
.
invokeService
(
rpcRequest
,
null
);
return
rpcResponse
;
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
RpcResponse
rpcResponse
=
new
RpcResponse
();
rpcResponse
.
setError
(
"Server-error:"
+
e
.
getMessage
());
return
rpcResponse
;
}
}
}
xxl-job-core/src/main/java/com/xxl/job/core/rpc/serialize/HessianSerializer.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
rpc
.
serialize
;
import
com.caucho.hessian.io.Hessian2Input
;
import
com.caucho.hessian.io.Hessian2Output
;
import
java.io.ByteArrayInputStream
;
import
java.io.ByteArrayOutputStream
;
import
java.io.IOException
;
/**
* hessian serialize
* @author xuxueli 2015-9-26 02:53:29
*/
public
class
HessianSerializer
{
public
static
<
T
>
byte
[]
serialize
(
T
obj
){
ByteArrayOutputStream
os
=
new
ByteArrayOutputStream
();
Hessian2Output
ho
=
new
Hessian2Output
(
os
);
try
{
ho
.
writeObject
(
obj
);
ho
.
flush
();
byte
[]
result
=
os
.
toByteArray
();
return
result
;
}
catch
(
IOException
e
)
{
throw
new
IllegalStateException
(
e
.
getMessage
(),
e
);
}
finally
{
try
{
ho
.
close
();
}
catch
(
IOException
e
)
{
throw
new
IllegalStateException
(
e
.
getMessage
(),
e
);
}
try
{
os
.
close
();
}
catch
(
IOException
e
)
{
throw
new
IllegalStateException
(
e
.
getMessage
(),
e
);
}
}
}
public
static
<
T
>
Object
deserialize
(
byte
[]
bytes
,
Class
<
T
>
clazz
)
{
ByteArrayInputStream
is
=
new
ByteArrayInputStream
(
bytes
);
Hessian2Input
hi
=
new
Hessian2Input
(
is
);
try
{
Object
result
=
hi
.
readObject
();
return
result
;
}
catch
(
IOException
e
)
{
throw
new
IllegalStateException
(
e
.
getMessage
(),
e
);
}
finally
{
try
{
hi
.
close
();
}
catch
(
Exception
e
)
{
throw
new
IllegalStateException
(
e
.
getMessage
(),
e
);
}
try
{
is
.
close
();
}
catch
(
IOException
e
)
{
throw
new
IllegalStateException
(
e
.
getMessage
(),
e
);
}
}
}
}
xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java
浏览文件 @
443c946e
...
...
@@ -5,7 +5,6 @@ import com.xxl.job.core.biz.model.RegistryParam;
import
com.xxl.job.core.biz.model.ReturnT
;
import
com.xxl.job.core.enums.RegistryConfig
;
import
com.xxl.job.core.executor.XxlJobExecutor
;
import
com.xxl.job.core.util.IpUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -24,7 +23,7 @@ public class ExecutorRegistryThread extends Thread {
private
Thread
registryThread
;
private
volatile
boolean
toStop
=
false
;
public
void
start
(
final
int
port
,
final
String
ip
,
final
String
appName
){
public
void
start
(
final
String
appName
,
final
String
address
){
// valid
if
(
appName
==
null
||
appName
.
trim
().
length
()==
0
)
{
...
...
@@ -36,14 +35,6 @@ public class ExecutorRegistryThread extends Thread {
return
;
}
// executor address (generate addredd = ip:port)
final
String
executorAddress
;
if
(
ip
!=
null
&&
ip
.
trim
().
length
()>
0
)
{
executorAddress
=
ip
.
trim
().
concat
(
":"
).
concat
(
String
.
valueOf
(
port
));
}
else
{
executorAddress
=
IpUtil
.
getIpPort
(
port
);
}
registryThread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
...
...
@@ -51,7 +42,7 @@ public class ExecutorRegistryThread extends Thread {
// registry
while
(!
toStop
)
{
try
{
RegistryParam
registryParam
=
new
RegistryParam
(
RegistryConfig
.
RegistType
.
EXECUTOR
.
name
(),
appName
,
executorA
ddress
);
RegistryParam
registryParam
=
new
RegistryParam
(
RegistryConfig
.
RegistType
.
EXECUTOR
.
name
(),
appName
,
a
ddress
);
for
(
AdminBiz
adminBiz:
XxlJobExecutor
.
getAdminBizList
())
{
try
{
ReturnT
<
String
>
registryResult
=
adminBiz
.
registry
(
registryParam
);
...
...
@@ -80,7 +71,7 @@ public class ExecutorRegistryThread extends Thread {
// registry remove
try
{
RegistryParam
registryParam
=
new
RegistryParam
(
RegistryConfig
.
RegistType
.
EXECUTOR
.
name
(),
appName
,
executorA
ddress
);
RegistryParam
registryParam
=
new
RegistryParam
(
RegistryConfig
.
RegistType
.
EXECUTOR
.
name
(),
appName
,
a
ddress
);
for
(
AdminBiz
adminBiz:
XxlJobExecutor
.
getAdminBizList
())
{
try
{
ReturnT
<
String
>
registryResult
=
adminBiz
.
registryRemove
(
registryParam
);
...
...
xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
util
;
import
org.apache.http.HttpEntity
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.client.config.RequestConfig
;
import
org.apache.http.client.methods.HttpPost
;
import
org.apache.http.entity.ByteArrayEntity
;
import
org.apache.http.entity.ContentType
;
import
org.apache.http.impl.client.CloseableHttpClient
;
import
org.apache.http.impl.client.HttpClients
;
import
org.apache.http.util.EntityUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.servlet.http.HttpServletRequest
;
import
java.io.IOException
;
import
java.io.InputStream
;
/**
* httpclient util
* @author xuxueli 2015-10-31 19:50:41
*/
public
class
HttpClientUtil
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
HttpClientUtil
.
class
);
/**
* post request
*/
public
static
byte
[]
postRequest
(
String
reqURL
,
byte
[]
data
)
throws
Exception
{
byte
[]
responseBytes
=
null
;
HttpPost
httpPost
=
new
HttpPost
(
reqURL
);
//CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpClient
httpClient
=
HttpClients
.
custom
().
disableAutomaticRetries
().
build
();
// disable retry
try
{
// init post
/*if (params != null && !params.isEmpty()) {
List<NameValuePair> formParams = new ArrayList<NameValuePair>();
for (Map.Entry<String, String> entry : params.entrySet()) {
formParams.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8"));
}*/
// timeout
RequestConfig
requestConfig
=
RequestConfig
.
custom
()
.
setConnectionRequestTimeout
(
10000
)
.
setSocketTimeout
(
10000
)
.
setConnectTimeout
(
10000
)
.
build
();
httpPost
.
setConfig
(
requestConfig
);
// data
if
(
data
!=
null
)
{
httpPost
.
setEntity
(
new
ByteArrayEntity
(
data
,
ContentType
.
DEFAULT_BINARY
));
}
// do post
HttpResponse
response
=
httpClient
.
execute
(
httpPost
);
HttpEntity
entity
=
response
.
getEntity
();
if
(
null
!=
entity
)
{
responseBytes
=
EntityUtils
.
toByteArray
(
entity
);
EntityUtils
.
consume
(
entity
);
}
}
catch
(
Exception
e
)
{
throw
e
;
}
finally
{
httpPost
.
releaseConnection
();
try
{
httpClient
.
close
();
}
catch
(
IOException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
return
responseBytes
;
}
/**
* read bytes from http request
* @param request
* @return
* @throws IOException
*/
public
static
final
byte
[]
readBytes
(
HttpServletRequest
request
)
throws
IOException
{
request
.
setCharacterEncoding
(
"UTF-8"
);
int
contentLen
=
request
.
getContentLength
();
InputStream
is
=
request
.
getInputStream
();
if
(
contentLen
>
0
)
{
int
readLen
=
0
;
int
readLengthThisTime
=
0
;
byte
[]
message
=
new
byte
[
contentLen
];
try
{
while
(
readLen
!=
contentLen
)
{
readLengthThisTime
=
is
.
read
(
message
,
readLen
,
contentLen
-
readLen
);
if
(
readLengthThisTime
==
-
1
)
{
break
;
}
readLen
+=
readLengthThisTime
;
}
return
message
;
}
catch
(
IOException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
throw
e
;
}
}
return
new
byte
[]
{};
}
}
xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
util
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.net.InetAddress
;
import
java.net.NetworkInterface
;
import
java.net.UnknownHostException
;
import
java.util.Enumeration
;
import
java.util.regex.Pattern
;
/**
* get ip
*
* @author xuxueli 2016-5-22 11:38:05
*/
public
class
IpUtil
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
IpUtil
.
class
);
private
static
final
String
ANYHOST
=
"0.0.0.0"
;
private
static
final
String
LOCALHOST
=
"127.0.0.1"
;
public
static
final
Pattern
IP_PATTERN
=
Pattern
.
compile
(
"\\d{1,3}(\\.\\d{1,3}){3,5}$"
);
private
static
volatile
String
LOCAL_ADDRESS
=
null
;
/**
* valid address
* @param address
* @return boolean
*/
private
static
boolean
isValidAddress
(
InetAddress
address
)
{
if
(
address
==
null
||
address
.
isLoopbackAddress
()
||
address
.
isLinkLocalAddress
())
{
return
false
;
}
String
name
=
address
.
getHostAddress
();
return
(
name
!=
null
&&
!
ANYHOST
.
equals
(
name
)
&&
!
LOCALHOST
.
equals
(
name
)
&&
IP_PATTERN
.
matcher
(
name
).
matches
());
}
/**
* get first valid addredd
*
* @return InetAddress
*/
private
static
InetAddress
getFirstValidAddress
()
{
// NetworkInterface address
try
{
Enumeration
<
NetworkInterface
>
interfaces
=
NetworkInterface
.
getNetworkInterfaces
();
if
(
interfaces
!=
null
)
{
while
(
interfaces
.
hasMoreElements
())
{
try
{
NetworkInterface
network
=
interfaces
.
nextElement
();
Enumeration
<
InetAddress
>
addresses
=
network
.
getInetAddresses
();
if
(
addresses
!=
null
)
{
while
(
addresses
.
hasMoreElements
())
{
try
{
InetAddress
address
=
addresses
.
nextElement
();
if
(
isValidAddress
(
address
))
{
return
address
;
}
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Failed to retriving ip address, "
+
e
.
getMessage
(),
e
);
}
}
}
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Failed to retriving ip address, "
+
e
.
getMessage
(),
e
);
}
}
}
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Failed to retriving ip address, "
+
e
.
getMessage
(),
e
);
}
// getLocalHost address
try
{
InetAddress
localAddress
=
InetAddress
.
getLocalHost
();
if
(
isValidAddress
(
localAddress
))
{
return
localAddress
;
}
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Failed to retriving ip address, "
+
e
.
getMessage
(),
e
);
}
logger
.
error
(
"Could not get local host ip address, will use 127.0.0.1 instead."
);
return
null
;
}
/**
* get address
*
* @return String
*/
private
static
String
getAddress
()
{
if
(
LOCAL_ADDRESS
!=
null
)
{
return
LOCAL_ADDRESS
;
}
InetAddress
localAddress
=
getFirstValidAddress
();
LOCAL_ADDRESS
=
localAddress
.
getHostAddress
();
return
LOCAL_ADDRESS
;
}
/**
* get ip
*
* @return String
*/
public
static
String
getIp
(){
return
getAddress
();
}
/**
* get ip:port
*
* @param port
* @return String
*/
public
static
String
getIpPort
(
int
port
){
String
ip
=
getIp
();
if
(
ip
==
null
)
{
return
null
;
}
return
ip
.
concat
(
":"
).
concat
(
String
.
valueOf
(
port
));
}
public
static
void
main
(
String
[]
args
)
throws
UnknownHostException
{
System
.
out
.
println
(
getIp
());
System
.
out
.
println
(
getIpPort
(
8080
));
}
}
xxl-job-core/src/main/java/com/xxl/job/core/util/NetUtil.java
deleted
100644 → 0
浏览文件 @
02dc2a4c
package
com
.
xxl
.
job
.
core
.
util
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
import
java.net.ServerSocket
;
/**
* net util
*
* @author xuxueli 2017-11-29 17:00:25
*/
public
class
NetUtil
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
NetUtil
.
class
);
/**
* find avaliable port
*
* @param defaultPort
* @return
*/
public
static
int
findAvailablePort
(
int
defaultPort
)
{
int
portTmp
=
defaultPort
;
while
(
portTmp
<
65535
)
{
if
(!
isPortUsed
(
portTmp
))
{
return
portTmp
;
}
else
{
portTmp
++;
}
}
portTmp
=
defaultPort
--;
while
(
portTmp
>
0
)
{
if
(!
isPortUsed
(
portTmp
))
{
return
portTmp
;
}
else
{
portTmp
--;
}
}
throw
new
IllegalStateException
(
"no available port."
);
}
/**
* check port used
*
* @param port
* @return
*/
public
static
boolean
isPortUsed
(
int
port
)
{
boolean
used
=
false
;
ServerSocket
serverSocket
=
null
;
try
{
serverSocket
=
new
ServerSocket
(
port
);
used
=
false
;
}
catch
(
IOException
e
)
{
logger
.
debug
(
">>>>>>>>>>> xxl-job, port[{}] is in use."
,
port
);
used
=
true
;
}
finally
{
if
(
serverSocket
!=
null
)
{
try
{
serverSocket
.
close
();
}
catch
(
IOException
e
)
{
logger
.
info
(
""
);
}
}
}
return
used
;
}
}
xxl-job-executor-samples/xxl-job-executor-sample-jfinal/src/main/java/com/xuxueli/executor/sample/jfinal/jobhandler/HttpJobHandler.java
浏览文件 @
443c946e
...
...
@@ -3,18 +3,13 @@ package com.xuxueli.executor.sample.jfinal.jobhandler;
import
com.xxl.job.core.biz.model.ReturnT
;
import
com.xxl.job.core.handler.IJobHandler
;
import
com.xxl.job.core.log.XxlJobLogger
;
import
com.xxl.job.core.util.ShardingUtil
;
import
org.apache.http.HttpEntity
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.client.config.RequestConfig
;
import
org.apache.http.client.methods.HttpGet
;
import
org.apache.http.impl.client.CloseableHttpClient
;
import
org.apache.http.impl.client.HttpClients
;
import
org.apache.http.util.EntityUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.eclipse.jetty.client.HttpClient
;
import
org.eclipse.jetty.client.api.ContentResponse
;
import
org.eclipse.jetty.client.api.Request
;
import
org.eclipse.jetty.http.HttpMethod
;
import
org.eclipse.jetty.http.HttpStatus
;
import
java.
io.IOException
;
import
java.
util.concurrent.TimeUnit
;
/**
* 跨平台Http任务
...
...
@@ -32,46 +27,26 @@ public class HttpJobHandler extends IJobHandler {
return
FAIL
;
}
// http
Get config
Http
Get
httpGet
=
new
HttpGet
(
param
);
RequestConfig
requestConfig
=
RequestConfig
.
custom
().
setSocketTimeout
(
5000
).
setConnectTimeout
(
5000
).
build
();
http
Get
.
setConfig
(
requestConfig
);
// http
client
Http
Client
httpClient
=
new
HttpClient
(
);
httpClient
.
setFollowRedirects
(
false
);
// Configure HttpClient, for example:
http
Client
.
start
();
// Start HttpClient
CloseableHttpClient
httpClient
=
null
;
try
{
httpClient
=
HttpClients
.
custom
().
disableAutomaticRetries
().
build
();
// request
Request
request
=
httpClient
.
newRequest
(
param
);
request
.
method
(
HttpMethod
.
GET
);
request
.
timeout
(
5000
,
TimeUnit
.
MILLISECONDS
);
// parse response
HttpResponse
response
=
httpClient
.
execute
(
httpGet
);
HttpEntity
entity
=
response
.
getEntity
();
if
(
response
.
getStatusLine
().
getStatusCode
()
!=
200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatusLine
().
getStatusCode
());
return
FAIL
;
}
if
(
null
==
entity
)
{
XxlJobLogger
.
log
(
"Http Entity Empty."
);
return
FAIL
;
}
String
responseMsg
=
EntityUtils
.
toString
(
entity
,
"UTF-8"
);
XxlJobLogger
.
log
(
responseMsg
);
EntityUtils
.
consume
(
entity
);
return
SUCCESS
;
}
catch
(
Exception
e
)
{
XxlJobLogger
.
log
(
e
);
// invoke
ContentResponse
response
=
request
.
send
();
if
(
response
.
getStatus
()
!=
HttpStatus
.
OK_200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatus
());
return
FAIL
;
}
finally
{
if
(
httpGet
!=
null
)
{
httpGet
.
releaseConnection
();
}
if
(
httpClient
!=
null
)
{
try
{
httpClient
.
close
();
}
catch
(
IOException
e
)
{
XxlJobLogger
.
log
(
e
);
}
}
}
String
responseMsg
=
response
.
getContentAsString
();
XxlJobLogger
.
log
(
responseMsg
);
return
SUCCESS
;
}
}
xxl-job-executor-samples/xxl-job-executor-sample-nutz/src/main/java/com/xuxueli/executor/sample/nutz/jobhandler/HttpJobHandler.java
浏览文件 @
443c946e
...
...
@@ -4,16 +4,14 @@ import com.xxl.job.core.biz.model.ReturnT;
import
com.xxl.job.core.handler.IJobHandler
;
import
com.xxl.job.core.handler.annotation.JobHandler
;
import
com.xxl.job.core.log.XxlJobLogger
;
import
org.apache.http.HttpEntity
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.client.config.RequestConfig
;
import
org.apache.http.client.methods.HttpGet
;
import
org.apache.http.impl.client.CloseableHttpClient
;
import
org.apache.http.impl.client.HttpClients
;
import
org.apache.http.util.EntityUtils
;
import
org.eclipse.jetty.client.HttpClient
;
import
org.eclipse.jetty.client.api.ContentResponse
;
import
org.eclipse.jetty.client.api.Request
;
import
org.eclipse.jetty.http.HttpMethod
;
import
org.eclipse.jetty.http.HttpStatus
;
import
org.nutz.ioc.loader.annotation.IocBean
;
import
java.
io.IOException
;
import
java.
util.concurrent.TimeUnit
;
/**
* 跨平台Http任务
...
...
@@ -33,46 +31,26 @@ public class HttpJobHandler extends IJobHandler {
return
FAIL
;
}
// http
Get config
Http
Get
httpGet
=
new
HttpGet
(
param
);
RequestConfig
requestConfig
=
RequestConfig
.
custom
().
setSocketTimeout
(
5000
).
setConnectTimeout
(
5000
).
build
();
http
Get
.
setConfig
(
requestConfig
);
// http
client
Http
Client
httpClient
=
new
HttpClient
(
);
httpClient
.
setFollowRedirects
(
false
);
// Configure HttpClient, for example:
http
Client
.
start
();
// Start HttpClient
CloseableHttpClient
httpClient
=
null
;
try
{
httpClient
=
HttpClients
.
custom
().
disableAutomaticRetries
().
build
();
// request
Request
request
=
httpClient
.
newRequest
(
param
);
request
.
method
(
HttpMethod
.
GET
);
request
.
timeout
(
5000
,
TimeUnit
.
MILLISECONDS
);
// parse response
HttpResponse
response
=
httpClient
.
execute
(
httpGet
);
HttpEntity
entity
=
response
.
getEntity
();
if
(
response
.
getStatusLine
().
getStatusCode
()
!=
200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatusLine
().
getStatusCode
());
return
FAIL
;
}
if
(
null
==
entity
)
{
XxlJobLogger
.
log
(
"Http Entity Empty."
);
return
FAIL
;
}
String
responseMsg
=
EntityUtils
.
toString
(
entity
,
"UTF-8"
);
XxlJobLogger
.
log
(
responseMsg
);
EntityUtils
.
consume
(
entity
);
return
SUCCESS
;
}
catch
(
Exception
e
)
{
XxlJobLogger
.
log
(
e
);
// invoke
ContentResponse
response
=
request
.
send
();
if
(
response
.
getStatus
()
!=
HttpStatus
.
OK_200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatus
());
return
FAIL
;
}
finally
{
if
(
httpGet
!=
null
)
{
httpGet
.
releaseConnection
();
}
if
(
httpClient
!=
null
)
{
try
{
httpClient
.
close
();
}
catch
(
IOException
e
)
{
XxlJobLogger
.
log
(
e
);
}
}
}
String
responseMsg
=
response
.
getContentAsString
();
XxlJobLogger
.
log
(
responseMsg
);
return
SUCCESS
;
}
}
xxl-job-executor-samples/xxl-job-executor-sample-spring/src/main/java/com/xxl/job/executor/service/jobhandler/HttpJobHandler.java
浏览文件 @
443c946e
...
...
@@ -4,16 +4,14 @@ import com.xxl.job.core.biz.model.ReturnT;
import
com.xxl.job.core.handler.IJobHandler
;
import
com.xxl.job.core.handler.annotation.JobHandler
;
import
com.xxl.job.core.log.XxlJobLogger
;
import
org.apache.http.HttpEntity
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.client.config.RequestConfig
;
import
org.apache.http.client.methods.HttpGet
;
import
org.apache.http.impl.client.CloseableHttpClient
;
import
org.apache.http.impl.client.HttpClients
;
import
org.apache.http.util.EntityUtils
;
import
org.eclipse.jetty.client.HttpClient
;
import
org.eclipse.jetty.client.api.ContentResponse
;
import
org.eclipse.jetty.client.api.Request
;
import
org.eclipse.jetty.http.HttpMethod
;
import
org.eclipse.jetty.http.HttpStatus
;
import
org.springframework.stereotype.Component
;
import
java.
io.IOException
;
import
java.
util.concurrent.TimeUnit
;
/**
* 跨平台Http任务
...
...
@@ -33,46 +31,26 @@ public class HttpJobHandler extends IJobHandler {
return
FAIL
;
}
// http
Get config
Http
Get
httpGet
=
new
HttpGet
(
param
);
RequestConfig
requestConfig
=
RequestConfig
.
custom
().
setSocketTimeout
(
5000
).
setConnectTimeout
(
5000
).
build
();
http
Get
.
setConfig
(
requestConfig
);
// http
client
Http
Client
httpClient
=
new
HttpClient
(
);
httpClient
.
setFollowRedirects
(
false
);
// Configure HttpClient, for example:
http
Client
.
start
();
// Start HttpClient
CloseableHttpClient
httpClient
=
null
;
try
{
httpClient
=
HttpClients
.
custom
().
disableAutomaticRetries
().
build
();
// request
Request
request
=
httpClient
.
newRequest
(
param
);
request
.
method
(
HttpMethod
.
GET
);
request
.
timeout
(
5000
,
TimeUnit
.
MILLISECONDS
);
// parse response
HttpResponse
response
=
httpClient
.
execute
(
httpGet
);
HttpEntity
entity
=
response
.
getEntity
();
if
(
response
.
getStatusLine
().
getStatusCode
()
!=
200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatusLine
().
getStatusCode
());
return
FAIL
;
}
if
(
null
==
entity
)
{
XxlJobLogger
.
log
(
"Http Entity Empty."
);
return
FAIL
;
}
String
responseMsg
=
EntityUtils
.
toString
(
entity
,
"UTF-8"
);
XxlJobLogger
.
log
(
responseMsg
);
EntityUtils
.
consume
(
entity
);
return
SUCCESS
;
}
catch
(
Exception
e
)
{
XxlJobLogger
.
log
(
e
);
// invoke
ContentResponse
response
=
request
.
send
();
if
(
response
.
getStatus
()
!=
HttpStatus
.
OK_200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatus
());
return
FAIL
;
}
finally
{
if
(
httpGet
!=
null
)
{
httpGet
.
releaseConnection
();
}
if
(
httpClient
!=
null
)
{
try
{
httpClient
.
close
();
}
catch
(
IOException
e
)
{
XxlJobLogger
.
log
(
e
);
}
}
}
String
responseMsg
=
response
.
getContentAsString
();
XxlJobLogger
.
log
(
responseMsg
);
return
SUCCESS
;
}
}
xxl-job-executor-samples/xxl-job-executor-sample-spring/src/test/java/com/xxl/executor/test/DemoJobHandlerTest.java
浏览文件 @
443c946e
...
...
@@ -5,7 +5,10 @@ import com.xxl.job.core.biz.model.ReturnT;
import
com.xxl.job.core.biz.model.TriggerParam
;
import
com.xxl.job.core.enums.ExecutorBlockStrategyEnum
;
import
com.xxl.job.core.glue.GlueTypeEnum
;
import
com.xxl.job.core.rpc.netcom.NetComClientProxy
;
import
com.xxl.rpc.remoting.invoker.call.CallType
;
import
com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean
;
import
com.xxl.rpc.remoting.net.NetEnum
;
import
com.xxl.rpc.serialize.Serializer
;
/**
* executor-api client, test
...
...
@@ -34,7 +37,9 @@ public class DemoJobHandlerTest {
// do remote trigger
String
accessToken
=
null
;
ExecutorBiz
executorBiz
=
(
ExecutorBiz
)
new
NetComClientProxy
(
ExecutorBiz
.
class
,
"127.0.0.1:9999"
,
null
).
getObject
();
ExecutorBiz
executorBiz
=
(
ExecutorBiz
)
new
XxlRpcReferenceBean
(
NetEnum
.
JETTY
,
Serializer
.
SerializeEnum
.
HESSIAN
.
getSerializer
(),
CallType
.
SYNC
,
ExecutorBiz
.
class
,
null
,
10000
,
"127.0.0.1:9999"
,
null
,
null
).
getObject
();
ReturnT
<
String
>
runResult
=
executorBiz
.
run
(
triggerParam
);
}
...
...
xxl-job-executor-samples/xxl-job-executor-sample-springboot/pom.xml
浏览文件 @
443c946e
...
...
@@ -53,6 +53,11 @@
<artifactId>
jetty-io
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
<dependency>
<groupId>
org.eclipse.jetty
</groupId>
<artifactId>
jetty-client
</artifactId>
<version>
${jetty-server.version}
</version>
</dependency>
</dependencies>
</dependencyManagement>
...
...
xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/HttpJobHandler.java
浏览文件 @
443c946e
...
...
@@ -4,16 +4,14 @@ import com.xxl.job.core.biz.model.ReturnT;
import
com.xxl.job.core.handler.IJobHandler
;
import
com.xxl.job.core.handler.annotation.JobHandler
;
import
com.xxl.job.core.log.XxlJobLogger
;
import
org.apache.http.HttpEntity
;
import
org.apache.http.HttpResponse
;
import
org.apache.http.client.config.RequestConfig
;
import
org.apache.http.client.methods.HttpGet
;
import
org.apache.http.impl.client.CloseableHttpClient
;
import
org.apache.http.impl.client.HttpClients
;
import
org.apache.http.util.EntityUtils
;
import
org.eclipse.jetty.client.HttpClient
;
import
org.eclipse.jetty.client.api.ContentResponse
;
import
org.eclipse.jetty.client.api.Request
;
import
org.eclipse.jetty.http.HttpMethod
;
import
org.eclipse.jetty.http.HttpStatus
;
import
org.springframework.stereotype.Component
;
import
java.
io.IOException
;
import
java.
util.concurrent.TimeUnit
;
/**
* 跨平台Http任务
...
...
@@ -33,46 +31,26 @@ public class HttpJobHandler extends IJobHandler {
return
FAIL
;
}
// http
Get config
Http
Get
httpGet
=
new
HttpGet
(
param
);
RequestConfig
requestConfig
=
RequestConfig
.
custom
().
setSocketTimeout
(
5000
).
setConnectTimeout
(
5000
).
build
();
http
Get
.
setConfig
(
requestConfig
);
// http
client
Http
Client
httpClient
=
new
HttpClient
(
);
httpClient
.
setFollowRedirects
(
false
);
// Configure HttpClient, for example:
http
Client
.
start
();
// Start HttpClient
CloseableHttpClient
httpClient
=
null
;
try
{
httpClient
=
HttpClients
.
custom
().
disableAutomaticRetries
().
build
();
// request
Request
request
=
httpClient
.
newRequest
(
param
);
request
.
method
(
HttpMethod
.
GET
);
request
.
timeout
(
5000
,
TimeUnit
.
MILLISECONDS
);
// parse response
HttpResponse
response
=
httpClient
.
execute
(
httpGet
);
HttpEntity
entity
=
response
.
getEntity
();
if
(
response
.
getStatusLine
().
getStatusCode
()
!=
200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatusLine
().
getStatusCode
());
return
FAIL
;
}
if
(
null
==
entity
)
{
XxlJobLogger
.
log
(
"Http Entity Empty."
);
return
FAIL
;
}
String
responseMsg
=
EntityUtils
.
toString
(
entity
,
"UTF-8"
);
XxlJobLogger
.
log
(
responseMsg
);
EntityUtils
.
consume
(
entity
);
return
SUCCESS
;
}
catch
(
Exception
e
)
{
XxlJobLogger
.
log
(
e
);
// invoke
ContentResponse
response
=
request
.
send
();
if
(
response
.
getStatus
()
!=
HttpStatus
.
OK_200
)
{
XxlJobLogger
.
log
(
"Http StatusCode({}) Invalid."
,
response
.
getStatus
());
return
FAIL
;
}
finally
{
if
(
httpGet
!=
null
)
{
httpGet
.
releaseConnection
();
}
if
(
httpClient
!=
null
)
{
try
{
httpClient
.
close
();
}
catch
(
IOException
e
)
{
XxlJobLogger
.
log
(
e
);
}
}
}
String
responseMsg
=
response
.
getContentAsString
();
XxlJobLogger
.
log
(
responseMsg
);
return
SUCCESS
;
}
}
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论