凯发app官方网站-凯发k8官网下载客户端中心 | | 凯发app官方网站-凯发k8官网下载客户端中心
  • 博客访问: 85422
  • 博文数量: 165
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1655
  • 用 户 组: 普通用户
  • 注册时间: 2022-09-26 14:37
文章分类

全部博文(165)

文章存档

2024年(2)

2023年(95)

2022年(68)

我的朋友
相关博文
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·

分类: 架构设计与优化

2023-02-16 11:07:07

作者:京东零售 张宾

利用ducc配置平台实现一个动态化线程池-凯发app官方网站

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司ducc配置平台作为服务配置中心,以修改线程池核心线程数、{banned}最佳大线程数为例,实现一个简单的动态化线程池。

当前项目中使用的是spring 框架提供的线程池类threadpooltaskexecutor,而threadpooltaskexecutor底层又使用里了jdk中线程池类threadpoolexecutor,线程池类threadpoolexecutor有两个成员方法setcorepoolsize、setmaximumpoolsize可以在运行时设置核心线程数和{banned}最佳大线程数。

setcorepoolsize方法执行流程是:首先会覆盖之前构造函数设置的corepoolsize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

setmaximumpoolsize方法: 首先会覆盖之前构造函数设置的maximumpoolsize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

spring 框架提供的线程池类threadpooltaskexecutor,此类封装了对threadpoolexecutor有两个成员方法setcorepoolsize、setmaximumpoolsize的调用。

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承threadpooltaskexecutor,目的跟非动态配置的线程池类threadpooltaskexecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

/**
 * 动态线程池
 *
 */ public class dynamicthreadpooltaskexecutor extends threadpooltaskexecutor {
} 

(2)动态线程池配置定时刷新类

@slf4j
public class dynamicthreadpoolrefresh implements initializingbean { /**
     * maintain all automatically registered and manually registered dynamicthreadpooltaskexecutor.
     */ private static final concurrentmap dtp_registry = new concurrenthashmap<>(); /**
     * @param threadpoolbeanname
     * @param threadpooltaskexecutor
     */ public static void registerdynamicthreadpool(string threadpoolbeanname, dynamicthreadpooltaskexecutor threadpooltaskexecutor) {
        log.info("dynamicthreadpool register threadpooltaskexecutor, threadpoolbeanname: {}, executor: {}", threadpoolbeanname, executorconverter.convert(threadpoolbeanname, threadpooltaskexecutor.getthreadpoolexecutor()));
        dtp_registry.putifabsent(threadpoolbeanname, threadpooltaskexecutor);
    } @override public void afterpropertiesset() throws exception {
        this.refresh(); //创建定时任务线程池 scheduledexecutorservice executorservice = new scheduledthreadpoolexecutor(1, (new basicthreadfactory.builder()).namingpattern("dynamicthreadpoolrefresh-%d").daemon(true).build()); //延迟1秒执行,每个1分钟check一次 executorservice.scheduleatfixedrate(new refreshthreadpoolconfig(), 1000l, 60000l, timeunit.milliseconds);
    }
    private void refresh() {
        string dynamicthreadpool = "";
        try {
            if (dtp_registry.isempty()) {
                log.debug("dynamicthreadpool refresh dtp_registry is empty");
                return;
            }
            dynamicthreadpool = duccconfigutil.getvalue(duccconfigconstants.dynamic_thread_pool);
            if (stringutils.isblank(dynamicthreadpool)) {
                log.debug("dynamicthreadpool refresh dynamicthreadpool not config");
                return;
            }
            log.debug("dynamicthreadpool refresh dynamicthreadpool:{}", dynamicthreadpool);
            list threadpoolpropertieslist = jsonutil.json2object(dynamicthreadpool, new typereference>() {
            });
            if (collectionutils.isempty(threadpoolpropertieslist)) {
                log.error("dynamicthreadpool refresh dynamicthreadpool json2object error!{}", dynamicthreadpool);
                return;
            }
            for (threadpoolproperties properties : threadpoolpropertieslist) { dorefresh(properties);
            }
        } catch (exception e) {
            log.error("dynamicthreadpool refresh exception!dynamicthreadpool:{}", dynamicthreadpool, e);
        }
    } /**
     * @param properties
     */ private void dorefresh(threadpoolproperties properties) {
        if (stringutils.isblank(properties.getthreadpoolbeanname())
                || properties.getcorepoolsize() < 1 || properties.getmaxpoolsize() < 1 || properties.getmaxpoolsize() < properties.getcorepoolsize()) {
            log.error("dynamicthreadpool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        dynamicthreadpooltaskexecutor threadpooltaskexecutor = dtp_registry.get(properties.getthreadpoolbeanname());
        if (objects.isnull(threadpooltaskexecutor)) {
            log.warn("dynamicthreadpool refresh, dtp_registry not found {}", properties.getthreadpoolbeanname());
            return;
        }
        threadpoolproperties oldprop = executorconverter.convert(properties.getthreadpoolbeanname(), threadpooltaskexecutor.getthreadpoolexecutor());
        if (objects.equals(oldprop.getcorepoolsize(), properties.getcorepoolsize())
                && objects.equals(oldprop.getmaxpoolsize(), properties.getmaxpoolsize())) {
            log.warn("dynamicthreadpool refresh, properties of [{}] have not changed.", properties.getthreadpoolbeanname());
            return;
        }
        if (!objects.equals(oldprop.getcorepoolsize(), properties.getcorepoolsize())) {
            threadpooltaskexecutor.setcorepoolsize(properties.getcorepoolsize());
            log.info("dynamicthreadpool refresh, corepoolsize changed!{} {}", properties.getthreadpoolbeanname(), properties.getcorepoolsize());
        }
        if (!objects.equals(oldprop.getmaxpoolsize(), properties.getmaxpoolsize())) {
            threadpooltaskexecutor.setmaxpoolsize(properties.getmaxpoolsize());
            log.info("dynamicthreadpool refresh, maxpoolsize changed!{} {}", properties.getthreadpoolbeanname(), properties.getmaxpoolsize());
        }
       
        threadpoolproperties newprop = executorconverter.convert(properties.getthreadpoolbeanname(), threadpooltaskexecutor.getthreadpoolexecutor());
        log.info("dynamicthreadpool refresh result!{} oldprop:{},newprop:{}", properties.getthreadpoolbeanname(), oldprop, newprop);
    }
    private class refreshthreadpoolconfig extends timertask {
        private refreshthreadpoolconfig() {
        } @override public void run() {
            dynamicthreadpoolrefresh.this.refresh();
        }
    }
} 

线程池配置类

@data public class threadpoolproperties { /**
     * 线程池名称
     */ private string threadpoolbeanname; /**
     * 线程池核心线程数量
     */ private int corepoolsize; /**
     * 线程池{banned}最佳大线程池数量
     */ private int maxpoolsize;
} 

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

ducc配置平台使用见:?

动态线程池配置key:dynamic.thread.pool

配置value:

[  { "threadpoolbeanname": "submitorderthreadpooltaskexecutor", "corepoolsize": 32, "maxpoolsize": 128  }] 

(4) 应用启动刷新应用本地动态线程池配置

@slf4j public class dynamicthreadpoolpostprocessor implements beanpostprocessor { @override public object postprocessafterinitialization(object bean, string beanname) throws beansexception { if (bean instanceof dynamicthreadpooltaskexecutor) { dynamicthreadpoolrefresh.registerdynamicthreadpool(beanname, (dynamicthreadpooltaskexecutor) bean);
        } return bean;
    }
} 

动态线程池bean声明

  <bean id="threadpooltaskexecutor" class="com.jd.concurrent.threadpooltaskexecutorwrapper">  <property name="corepoolsize" value="128"/>  <property name="maxpoolsize" value="512"/>  <property name="queuecapacity" value="500"/>  <property name="keepaliveseconds" value="60"/>  <property name="rejectedexecutionhandler">     <bean class="java.util.concurrent.threadpoolexecutor$callerrunspolicy"/> property> bean>  <bean id="submitorderthreadpooltaskexecutor" class="com.jd.concurrent.dynamicthreadpooltaskexecutor">  <property name="corepoolsize" value="32"/>  <property name="maxpoolsize" value="128"/>  <property name="queuecapacity" value="500"/>  <property name="keepaliveseconds" value="60"/>  <property name="rejectedexecutionhandler">     <bean class="java.util.concurrent.threadpoolexecutor$callerrunspolicy"/> property> bean>  <bean class="com.jd.concurrent.dynamicthreadpoolpostprocessor"/> <bean class="com.jd.concurrent.dynamicthreadpoolrefresh"/> 

业务类注入spring bean后,直接使用即可

 @resource
 private threadpooltaskexecutor submitorderthreadpooltaskexecutor; runnable asynctask = ()->{...}; completablefuture.runasync(asynctask, this.submitorderthreadpooltaskexecutor); 

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

阅读(255) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
")); function link(t){ var href= $(t).attr('href'); href ="?url=" encodeuricomponent(location.href); $(t).attr('href',href); //setcookie("returnouturl", location.href, 60, "/"); }
网站地图