分类: 架构设计与优化
2023-02-16 11:07:07
作者:京东零售 张宾
在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。
本文以公司ducc配置平台作为服务配置中心,以修改线程池核心线程数、{banned}最佳大线程数为例,实现一个简单的动态化线程池。
当前项目中使用的是spring 框架提供的线程池类threadpooltaskexecutor,而threadpooltaskexecutor底层又使用里了jdk中线程池类threadpoolexecutor,线程池类threadpoolexecutor有两个成员方法setcorepoolsize、setmaximumpoolsize可以在运行时设置核心线程数和{banned}最佳大线程数。
setcorepoolsize方法执行流程是:首先会覆盖之前构造函数设置的corepoolsize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:
setmaximumpoolsize方法: 首先会覆盖之前构造函数设置的maximumpoolsize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。
spring 框架提供的线程池类threadpooltaskexecutor,此类封装了对threadpoolexecutor有两个成员方法setcorepoolsize、setmaximumpoolsize的调用。
基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:
(1)定义一个动态线程池类,继承threadpooltaskexecutor,目的跟非动态配置的线程池类threadpooltaskexecutor区分开;
(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;
(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;
(4)定义和实现一个应用启动后根据动态线程池bean和从ducc配置平台拉取配置刷新应用中的线程数配置;
接下来代码一一实现:
(1)动态线程池类
/**
* 动态线程池
*
*/ public class dynamicthreadpooltaskexecutor extends threadpooltaskexecutor {
}
(2)动态线程池配置定时刷新类
@slf4j
public class dynamicthreadpoolrefresh implements initializingbean { /**
* maintain all automatically registered and manually registered dynamicthreadpooltaskexecutor.
*/ private static final concurrentmap dtp_registry = new concurrenthashmap<>(); /**
* @param threadpoolbeanname
* @param threadpooltaskexecutor
*/ public static void registerdynamicthreadpool(string threadpoolbeanname, dynamicthreadpooltaskexecutor threadpooltaskexecutor) {
log.info("dynamicthreadpool register threadpooltaskexecutor, threadpoolbeanname: {}, executor: {}", threadpoolbeanname, executorconverter.convert(threadpoolbeanname, threadpooltaskexecutor.getthreadpoolexecutor()));
dtp_registry.putifabsent(threadpoolbeanname, threadpooltaskexecutor);
} @override public void afterpropertiesset() throws exception {
this.refresh(); //创建定时任务线程池 scheduledexecutorservice executorservice = new scheduledthreadpoolexecutor(1, (new basicthreadfactory.builder()).namingpattern("dynamicthreadpoolrefresh-%d").daemon(true).build()); //延迟1秒执行,每个1分钟check一次 executorservice.scheduleatfixedrate(new refreshthreadpoolconfig(), 1000l, 60000l, timeunit.milliseconds);
}
private void refresh() {
string dynamicthreadpool = "";
try {
if (dtp_registry.isempty()) {
log.debug("dynamicthreadpool refresh dtp_registry is empty");
return;
}
dynamicthreadpool = duccconfigutil.getvalue(duccconfigconstants.dynamic_thread_pool);
if (stringutils.isblank(dynamicthreadpool)) {
log.debug("dynamicthreadpool refresh dynamicthreadpool not config");
return;
}
log.debug("dynamicthreadpool refresh dynamicthreadpool:{}", dynamicthreadpool);
list threadpoolpropertieslist = jsonutil.json2object(dynamicthreadpool, new typereference>() {
});
if (collectionutils.isempty(threadpoolpropertieslist)) {
log.error("dynamicthreadpool refresh dynamicthreadpool json2object error!{}", dynamicthreadpool);
return;
}
for (threadpoolproperties properties : threadpoolpropertieslist) { dorefresh(properties);
}
} catch (exception e) {
log.error("dynamicthreadpool refresh exception!dynamicthreadpool:{}", dynamicthreadpool, e);
}
} /**
* @param properties
*/ private void dorefresh(threadpoolproperties properties) {
if (stringutils.isblank(properties.getthreadpoolbeanname())
|| properties.getcorepoolsize() < 1 || properties.getmaxpoolsize() < 1 || properties.getmaxpoolsize() < properties.getcorepoolsize()) {
log.error("dynamicthreadpool refresh, invalid parameters exist, properties: {}", properties);
return;
}
dynamicthreadpooltaskexecutor threadpooltaskexecutor = dtp_registry.get(properties.getthreadpoolbeanname());
if (objects.isnull(threadpooltaskexecutor)) {
log.warn("dynamicthreadpool refresh, dtp_registry not found {}", properties.getthreadpoolbeanname());
return;
}
threadpoolproperties oldprop = executorconverter.convert(properties.getthreadpoolbeanname(), threadpooltaskexecutor.getthreadpoolexecutor());
if (objects.equals(oldprop.getcorepoolsize(), properties.getcorepoolsize())
&& objects.equals(oldprop.getmaxpoolsize(), properties.getmaxpoolsize())) {
log.warn("dynamicthreadpool refresh, properties of [{}] have not changed.", properties.getthreadpoolbeanname());
return;
}
if (!objects.equals(oldprop.getcorepoolsize(), properties.getcorepoolsize())) {
threadpooltaskexecutor.setcorepoolsize(properties.getcorepoolsize());
log.info("dynamicthreadpool refresh, corepoolsize changed!{} {}", properties.getthreadpoolbeanname(), properties.getcorepoolsize());
}
if (!objects.equals(oldprop.getmaxpoolsize(), properties.getmaxpoolsize())) {
threadpooltaskexecutor.setmaxpoolsize(properties.getmaxpoolsize());
log.info("dynamicthreadpool refresh, maxpoolsize changed!{} {}", properties.getthreadpoolbeanname(), properties.getmaxpoolsize());
}
threadpoolproperties newprop = executorconverter.convert(properties.getthreadpoolbeanname(), threadpooltaskexecutor.getthreadpoolexecutor());
log.info("dynamicthreadpool refresh result!{} oldprop:{},newprop:{}", properties.getthreadpoolbeanname(), oldprop, newprop);
}
private class refreshthreadpoolconfig extends timertask {
private refreshthreadpoolconfig() {
} @override public void run() {
dynamicthreadpoolrefresh.this.refresh();
}
}
}
线程池配置类
@data public class threadpoolproperties { /**
* 线程池名称
*/ private string threadpoolbeanname; /**
* 线程池核心线程数量
*/ private int corepoolsize; /**
* 线程池{banned}最佳大线程池数量
*/ private int maxpoolsize;
}
(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key
ducc配置平台使用见:?
动态线程池配置key:dynamic.thread.pool
配置value:
[ { "threadpoolbeanname": "submitorderthreadpooltaskexecutor", "corepoolsize": 32, "maxpoolsize": 128 }]
(4) 应用启动刷新应用本地动态线程池配置
@slf4j public class dynamicthreadpoolpostprocessor implements beanpostprocessor { @override public object postprocessafterinitialization(object bean, string beanname) throws beansexception { if (bean instanceof dynamicthreadpooltaskexecutor) { dynamicthreadpoolrefresh.registerdynamicthreadpool(beanname, (dynamicthreadpooltaskexecutor) bean);
} return bean;
}
}
动态线程池bean声明
<bean id="threadpooltaskexecutor" class="com.jd.concurrent.threadpooltaskexecutorwrapper"> <property name="corepoolsize" value="128"/> <property name="maxpoolsize" value="512"/> <property name="queuecapacity" value="500"/> <property name="keepaliveseconds" value="60"/> <property name="rejectedexecutionhandler"> <bean class="java.util.concurrent.threadpoolexecutor$callerrunspolicy"/> property> bean> <bean id="submitorderthreadpooltaskexecutor" class="com.jd.concurrent.dynamicthreadpooltaskexecutor"> <property name="corepoolsize" value="32"/> <property name="maxpoolsize" value="128"/> <property name="queuecapacity" value="500"/> <property name="keepaliveseconds" value="60"/> <property name="rejectedexecutionhandler"> <bean class="java.util.concurrent.threadpoolexecutor$callerrunspolicy"/> property> bean> <bean class="com.jd.concurrent.dynamicthreadpoolpostprocessor"/> <bean class="com.jd.concurrent.dynamicthreadpoolrefresh"/>
业务类注入spring bean后,直接使用即可
@resource
private threadpooltaskexecutor submitorderthreadpooltaskexecutor; runnable asynctask = ()->{...}; completablefuture.runasync(asynctask, this.submitorderthreadpooltaskexecutor);
本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。