凯发app官方网站-凯发k8官网下载客户端中心 | | 凯发app官方网站-凯发k8官网下载客户端中心
  • 博客访问: 85415
  • 博文数量: 165
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1655
  • 用 户 组: 普通用户
  • 注册时间: 2022-09-26 14:37
文章分类

全部博文(165)

文章存档

2024年(2)

2023年(95)

2022年(68)

我的朋友
相关博文
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·

分类: 架构设计与优化

2023-02-15 16:03:37

作者:京东科技 刘子洋

一文读懂guava eventbus(订阅\\发布事件)-凯发app官方网站

{banned}最佳近项目出现同一消息发送多次的现象,对下游业务方造成困扰,经过排查发现使用 eventbus 方式不正确。也借此机会学习了下 eventbus 并进行分享。以下为分享内容,本文主要分为五个部分,篇幅较长,望大家耐心阅读。

  • 1、简述:简单介绍 eventbus 及其组成部分。

  • 2、原理解析:主要对 listener 注册流程及 event 发布流程进行解析。

  • 3、使用指导:eventbus 简单的使用指导。

  • 4、注意事项:在使用 eventbus 中需要注意的一些隐藏逻辑。

  • 5、分享时提问的问题

  • 6、项目中遇到的问题:上述问题进行详细描述并复现场景。

1.1、概念

下文摘自 eventbus 源码注释,从注释中可以直观了解到他的功能、特性、注意事项

【源码注释】

dispatches events to listeners, and provides ways for listeners to register themselves.

the eventbus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). it is designed exclusively to replace traditional java in-process event distribution using explicit registration. it is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.

receiving events

to receive events, an object should:

  • expose a public method, known as the event subscriber, which accepts a single argument of the type of event desired;

  • mark it with a subscribe annotation;

  • pass itself to an eventbus instance's register(object) method.

posting events

to post an event, simply provide the event object to the post(object) method. the eventbus instance will determine the type of event and route it to all registered listeners.

events are routed based on their type — an event will be delivered to any subscriber for any type to which the event is assignable. this includes implemented interfaces, all superclasses, and all interfaces implemented by superclasses.

when post is called, all registered subscribers for an event are run in sequence, so subscribers should be reasonably quick. if an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (for a convenient way to do this, use an asynceventbus.)

subscriber methods

event subscriber methods must accept only one argument: the event.

subscribers should not, in general, throw. if they do, the eventbus will catch and log the exception. this is rarely the right solution for error handling and should not be relied upon; it is intended solely to help find problems during development.

the eventbus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the allowconcurrentevents annotation. if this annotation is not present, subscriber methods need not worry about being reentrant, unless also called from outside the eventbus.

dead events

if an event is posted, but no registered subscribers can accept it, it is considered "dead." to give the system a second chance to handle dead events, they are wrapped in an instance of deadevent and reposted.

if a subscriber for a supertype of all events (such as object) is registered, no event will ever be considered dead, and no deadevents will be generated. accordingly, while deadevent extends object, a subscriber registered to receive any object will never receive a deadevent.

this class is safe for concurrent use.

see the guava user guide article on eventbus.
since:
10.0
author:
cliff biffle

1.2、系统流程

1.3、组成部分

1.3.1、调度器

eventbus、asynceventbus 都是一个调度的角色,区别是一个同步一个异步。

  • eventbus
源码注释: > dispatches events to listeners, and provides ways for listeners to register themselves. 意思是说eventbus分发事件(event)给listeners处理,并且提供listeners注册自己的方法。从这里我们可以看出eventbus主要是一个调度的角色。 **eventbus总结** - 1.同步执行,事件发送方在发出事件之后,会等待所有的事件消费方执行完毕后,才会回来继续执行自己后面的代码。 - 2.事件发送方和事件消费方会在同一个线程中执行,消费方的执行线程取决于发送方。 - 3.同一个事件的多个订阅者,在接收到事件的顺序上面有不同。谁先注册到eventbus的,谁先执行,如果是在同一个类中的两个订阅者一起被注册到eventbus的情况,收到事件的顺序跟方法名有关。 
  • asynceventbus
源码注释: > an {@link eventbus} that takes the executor of your choice and uses it to dispatch events, allowing dispatch to occur asynchronously. 意思是说asynceventbus就是eventbus,只不过asynceventbus使用你指定的线程池(不指定使用默认线程池)去分发事件(event),并且是异步进行的。 **asynceventbus总结** - 1.异步执行,事件发送方异步发出事件,不会等待事件消费方是否收到,直接执行自己后面的代码。 - 2.在定义asynceventbus时,构造函数中会传入一个线程池。事件消费方收到异步事件时,消费方会从线程池中获取一个新的线程来执行自己的任务。 - 3.同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,**异步并发**的执行自己的任务。 

1.3.2、事件承载器

  • event
事件主体,用于承载消息。 
  • deadevent
 源码注释:
>wraps an event that was posted, but which had no subscribers and thus could not be delivered, registering a deadevent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system's event distribution.
意思是说deadevent就是一个被包装的event,只不过是一个没有订阅者无法被分发的event。我们可以在开发时注册一个deadevent,因为它可以检测系统事件分布中的错误配置。 

1.3.3、事件注册中心

subscriberregistry

 源码注释: > registry of subscribers to a single event bus. 意思是说subscriberregistry是单个事件总线(eventbus)的订阅者注册表。 

1.3.4、事件分发器

dispatcher

源码注释:
>handler for dispatching events to subscribers, providing different event ordering guarantees that make sense for different situations.
>note: the dispatcher is orthogonal to the subscriber's executor. the dispatcher controls the order in which events are dispatched, while the executor controls how (i.e. on which thread) the subscriber is actually called when an event is dispatched to it.
意思是说dispatcher主要任务是将事件分发到订阅者,并且可以不同的情况,按不同的顺序分发。 

dispatcher 有三个子类,用以满足不同的分发情况

1.perthreadqueueddispatcher

源码注释:
> returns a dispatcher that queues events that are posted reentrantly on a thread that is already dispatching an event, guaranteeing that all events posted on a single thread are dispatched to all subscribers in the order they are posted.
> when all subscribers are dispatched to using a direct executor (which dispatches on the same thread that posts the event), this yields a breadth-first dispatch order on each thread. that is, all subscribers to a single event a will be called before any subscribers to any events b and c that are posted to the event bus by the subscribers to a.
意思是说一个线程在处理事件过程中又发布了一个事件,perthreadqueueddispatcher会将后面这个事件放到{banned}最佳后,从而保证在单个线程上发布的所有事件都按其发布顺序分发给订阅者。**注意,每个线程都要自己存储事件的队列。**
第二段是说perthreadqueueddispatcher按**广度优先**分发事件。并给了一个例子:
代码中发布了事件a,订阅者收到后,在执行过程中又发布了事件b和事件c,perthreadqueueddispatcher会确保事件a分发给所有订阅者后,再分发b、c事件。 

2.legacyasyncdispatcher

源码注释:
> returns a dispatcher that queues events that are posted in a single global queue. this behavior matches the original behavior of asynceventbus exactly, but is otherwise not especially useful. for async dispatch, an immediate dispatcher should generally be preferable.
意思是说legacyasyncdispatcher有一个全局队列用于存放所有事件,legacyasyncdispatcher特性与asynceventbus特性完全相符,除此之外没有其他什么特性。如果异步分发的话,{banned}最佳好用immediate dispatcher。 

3.immediatedispatcher

源码注释:
> returns a dispatcher that dispatches events to subscribers immediately as they're posted without using an intermediate queue to change the dispatch order. this is effectively a depth-first dispatch order, vs. breadth-first when using a queue.
意思是说immediatedispatcher在发布事件时立即将事件分发给订阅者,而不使用中间队列更改分发顺序。这实际上是**深度优先**的调度顺序,而不是使用队列时的**广度优先**。 

1.3.4、订阅者

  • subscriber
源码注释:
> a subscriber method on a specific object, plus the executor that should be used for dispatching events to it. two subscribers are equivalent when they refer to the same method on the same object (not class). this property is used to ensure that no subscriber method is registered more than once.
{banned}中国第一段意思是说,subscriber是特定对象(event)的订阅方法,用于执行被分发事件。第二段说当两个订阅者在同一对象 **(不是类)** 上引用相同的方法时,它们是等效的,此属性用于确保不会多次注册任何订阅者方法,主要说明会对订阅者进行判重,如果是同一个对象的同一个方法,则认为是同一个订阅者,不会进行重复注册。 
  • synchronizedsubscriber
源码注释: > subscriber that synchronizes invocations of a method to ensure that only one thread may enter the method at a time. 意思是说同步方法调用以确保一次只有一个线程可以执行订阅者方法(线程安全)。 

2.1、主体流程

  1. listener 通过 eventbus 进行注册。

  2. subscriberregister 会根据 listener、listener 中含有【@subscribe】注解的方法及各方法参数创建 subscriber 对象,并将其维护在 subscribers(concurrentmap 类型,key 为 event 类对象,value 为 subscriber 集合)中。

  3. publisher 发布事件 event。

  4. 发布 event 后,eventbus 会从 subscriberregister 中查找出所有订阅此事件的 subscriber,然后让 dispatcher 分发 event 到每一个 subscriber。

流程如下:

2.2、listener 注册原理

2.2.1、listener 注册流程

  1. 缓存所有含有 @subscribe 注解方法到 subscribermethodscache(loadingcache, immutablelist>, key 为 listener,value 为 method 集合)。

  2. listener 注册。

2.2.2、原理分析

  • 获取含有 @subscribe 注释的方法进行缓存
    找到所有被【@subscribe】修饰的方法,并进行缓存
    注意!!!这两个方法被 static 修饰,类加载的时候就进行寻找

订阅者唯一标识是【方法名 入参】

  • 注册订阅者
    1. 注册方法

    创建 subscriber 时,如果 method 含有【@allowconcurrentevents】注释,则创建 synchronizedsubscriber,否则创建 subscriber

    2、获取所有订阅者

    3、从缓存中获取所有订阅方法

2.3、event 发布原理

2.3.1、发布主体流程

  • publisher 发布事件 event。

  • eventbus 根据 event 类对象从 subscriberregistry 中获取所有订阅者。

  • 将 event 和 eventsubscribers 交由 dispatcher 去分发。

  • dispatcher 将 event 分发给每个 subscribers。

  • subscriber 利用反射执行订阅者方法。

图中画出了三个 dispatcher 的分发原理。

2.3.2、原理分析

  • 创建缓存
    缓存 eventmsg 所有超类
    注意!!!此处是静态方法,因此在代码加载的时候就会缓存 event 所有超类。

  • 发布 event 事件
    此方法是发布事件时调用的方法。

  • 获取所有订阅者
    1、从缓存中获取所有订阅者

    2、获取 event 超类
    image.png

  • 事件分发
    1、分发入口

    2、分发器分发
    2.1、immediatedispatcher
    来了一个事件则通知对这个事件感兴趣的订阅者。

    2.2、perthreadqueueddispatcher(eventbus 默认选项)
    在同一个线程 post 的 event 执行顺序是有序的。用 threadlocal queue 来实现每个线程的 event 有序性,在把事件添加到 queue 后会有一个 threadlocal dispatching 来判断当前线程是否正在分发,如果正在分发,则这次添加的 event 不会马上进行分发而是等到 dispatching 的值为 false(分发完成)才进行。
    源码如下:


    2.3、legacyasyncdispatcher(asynceventbus 默认选项)
    会有一个全局的队列 concurrentlinkedqueue queue 保存 eventwithsubscriber (事件和 subscriber), 如果被不同的线程 poll,不能保证在 queue 队列中的 event 是有序发布的。源码如下:

  • 执行订阅者方法
    方法入口是 dispatchevent,源码如下:

    由于 subscriber 有两种,因此执行方法也有两种:
    1.subscriber(非线程安全)

    2.synchronizedsubscriber(线程安全)
    注意!!!执行方法会加同步锁

3.1、主要流程

3.2、流程详解

  • 1、创建 eventbus、asynceventbus bean
    在项目中统一配置全局单例 bean(如特殊需求,可配置多例)

  • 2、定义 eventmsg
    设置消息载体。

  • 3、注册 listener
    注册 listener,处理事件

    注意!在使用 postconstruct 注释进行注册时,需要注意子类会执行父类含有 postconstruct 注释的方法。

  • 3、事件发布
    封装统一发布事件的 bean,然后通过 bean 注入到需要发布的 bean 里面进行事件发布。

此处对 eventbus 进行了统一封装收口操作,主要考虑的是如果做一些操作,直接改这一处就可以。如果不需要封装,可以在使用的地方直接注入 eventbus 即可。

4.1、循环分发事件

如果业务流程较长,切记梳理好业务流程,不要让事件循环分发。
目前 eventbus 没有对循环事件进行处理。

4.2、使用 @postconstrucrt 注册 listener

子类在执行实例化时,会执行父类 @postconstrucrt 注释。 如果 listenerson 继承 listenerfather,当两者都使用 @postconstrucrt 注册订阅方法时,子类也会调用父类的注册方法进行注册订阅方法。由于 eventbus 机制,子类注册订阅方法时,也会注册父类的监听方法

subscriber 唯一标志是(listener method),因此在对同一方法注册时,由于不是同一个 listener,所以对于 eventbus 是两个订阅方法。


因此,如果存在 listenerson、listenerfather 两个 listener,且 listenerson 继承 listenerfather。当都使用 @postconstrucrt 注册时,会导致 listenerfather 里面的订阅方法注册两次。

4.3、含有继承关系的 listener

当注册 listener 含有继承关系时,listener 处理 event 消息时,listener 的父类也会处理该消息。

4.3.1、继承关系的订阅者

4.3.2、原理

子类 listener 注册,父类 listener 也会注册

4.4、含有继承关系的 event

如果作为参数的 event 有继承关系,使用 eventbus 发布 event 时,event 父类的监听者也会对 event 进行处理。

4.4.1、执行结果


4.4.2、原理

在分发消息的时候,会获取所有订阅者数据(event 订阅者和 event 超类的订阅者),然后进行分发数据。
获取订阅者数据如下图:

缓存 event 及其超类的类对象,key 为 event 类对象。

问题 1:perthreadqueueddispatcherd 里面的队列,是否是有界队列?

有界队列,{banned}最佳大值为 int 的{banned}最佳大值 (2147483647),源码如下图:

问题 2:dispatcher 分发给订阅者是否有序?

eventbus: 同步事件总线
同一个事件的多个订阅者,在接收到事件的顺序上面有不同。谁先注册到 eventbus 的,谁先执行(由于 base 使用的是 postconstruct 进行注册,因此跟不同 bean 之间的初始化顺序有关系)。如果是在同一个类中的两个订阅者一起被注册到 eventbus 的情况,收到事件的顺序跟方法名有关。

asynceventbus: 异步事件总线:同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。

问题 3:eventbus 与 springevent 的对比?

  • 使用方式比较
项目 事件 发布者 发布方法 是否异步 监听者 注册方式
eventbus 任意对象 eventbus eventbus#post 支持同步异步 注解 subscribe 方法 手动注册 eventbus#register
springevent 任意对象 applicationeventpublisher applicationeventpublisher#publishevent 支持同步异步 注解 eventlistener 方法 系统注册
  • 使用场景比较
项目 事件区分 是否支持事件簇 是否支持自定义 event 是否支持过滤 是否支持事件隔离 是否支持事务 是否支持设置订阅者消费顺序 复杂程度
eventbus class 简单
spring event class 复杂

参考链接 

问题 4:eventbus 的使用场景,结合现有使用场景考虑是否合适?

eventbus 暂时不适用,主要有一下几个点:

  • eventbus 不支持事务,项目在更新、创建商品时,{banned}最佳好等事务提交成功后,再发送 mq 消息(主要问题点)

  • eventbus 不支持设置同一消息的订阅者消费顺序。

  • eventbus 不支持消息过滤。springevent 支持消息过滤

6.1、问题描述

商品上架时会触发渠道分发功能,会有两步操作

  • 1、创建一条分发记录,并对外发送一条未分发状态的商品变更消息(通过 eventbus 事件发送消息)。

  • 2、将分发记录改为审核中(需要审核)或审核通过(不需要审核),并对外发送一条已分发状态的商品变更消息(通过 eventbus 事件发送消息)。

所以分发会触发两条分发状态不同的商品变更消息,一条是未分发,另一条是已分发。实际发送了两条分发状态相同的商品变更消息,状态都是已分发

6.2、原因

我们先来回顾下 eventbus 监听者处理事件时有三种策略,这是根本原因:

  • immediatedispatcher:来一个事件马上进行处理。

  • perthreadqueueddispatcher(eventbus 默认选项,项目中使用此策略):在同一个线程 post 的 event, 执行的顺序是有序的。用 threadlocal queue 来实现每个线程 post 的 event 是有序的,在把事件添加到 queue 后会有一个 threadlocal dispatching 来判断当前线程是否正在分发,如果正在分发,则这次添加的 event 不会马上进行分发而是等到 dispatching 的值为 false 才进行。

  • legacyasyncdispatcher(asynceventbus 默认选项):会有一个全局的队列 concurrentlinkedqueue queue 保存 eventwithsubscriber (事件和 subscriber), 如果被不同的线程 poll 不能保证在 queue 队列中的 event 是有序发布的。

详情可见上文中的【2.3.4、事件分发】

再看下项目中的逻辑:

商品自动分发在商品变更的listener里操作。
由于当前分发操作处于商品上架事件处理过程中,因此对于添加分发记录事件不会立马处理,而是将其放入队列。
上架操作完成,分发状态变为已分发。
等上架操作完成后,商品变更listener处理分发事件(此时有两条eventmsg,一个是添加分发记录另一个是修改分发状态),分发状态实时查询,对于{banned}中国第一个分发事件,查询到的分发记录是已分发状态。
{banned}最佳终导致两条消息都是已分发状态。 

6.3、场景复现

在 handler 中对静态变量进行两次 1 操作,每操作一步发送一条事件,此处假设静态变量为分发状态。

6.4、解决办法

目前 dispatcher 包用 default 修饰,使用者无法指定 dispatcher 策略。并且 immediatedispatcher 使用 private 修饰。


因此目前暂无解决非同步问题,只能在业务逻辑上进行规避。

其实可以修改源码并发布一个包自己使用,但是公司安全规定不允许这样做,只能通过业务逻辑上进行规避,下图是 github 上对此问题的讨论。

如果项目中需要使用异步解耦处理一些事项,使用 eventbus 还是比较方便的。

阅读(321) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
")); function link(t){ var href= $(t).attr('href'); href ="?url=" encodeuricomponent(location.href); $(t).attr('href',href); //setcookie("returnouturl", location.href, 60, "/"); }
网站地图