guavaEventBus学习二源码

编程

上篇介绍了guava包中的event bus的使用, 本篇深入源码了解实现细节

EventBus

了解

首先当然要看EventBus类, 类上的注释说了很多, 总结如下:

1 EventBus保证在多线程环境下, 不会同时访问订阅者, 除非订阅者标注了AllowConcurrentEvents注解. 如果没有使用这个注解, 订阅者不需要担心消息重入的问题.

2 消息接收方法只能接收一个参数, 而且会根据消息类型路由消息到指定的订阅者. 如果消息类型存在父子关系, 那么发送子类消息, 父类订阅者也会收到.

3 当执行call方法时, 所有该事件的订阅者会顺序执行, 如果一个订阅者处理耗时(比如数据库操作), 那么整个线程会阻塞, 为了解决这个问题, 我们一来可以尽量减少订阅者的耗时操作, 比如可以异步. 另一种就是可以使用AsyncEventBus, AsyncEventBus采用多线程执行订阅者方法.

4 发出消息后, 如果没有任何消费者消费, 那么认为这个消息是"dead". 我们定义了DeadEvent类来专门处理这些消息, 我们可以监听这类的消息. 如果订阅者监听的消息是Object, 那么会收到所有的消息, 就不存在"dead" 消息了.

EventBus 属性如下

// event bus 标识, 就是个名字

private final String identifier;

// jdk executor, 用来执行线程

private final Executor executor;

// 异常处理器

private final SubscriberExceptionHandler exceptionHandler;

// 用于注册订阅者

private final SubscriberRegistry subscribers = new SubscriberRegistry(this);

// 消息分发器

private final Dispatcher dispatcher;

大概了解后, 我们从简单例子出发

@Test

public void 科一() {

AwesomeEventBusDriver.register(new AwesomeStudent());

AwesomeEventBusDriver.publishAnything("通过~");

}

这里省略了一步EventBus创建过程

EventBus eventBus = new EventBus();

源码如下, 很简单, 设置EventBus的名字, 创建默认的一些类. 这些类我们在使用到的地方再看

/** Creates a new EventBus named "default". */

public EventBus() {

this("default");

}

public EventBus(String identifier) {

this(

identifier,

MoreExecutors.directExecutor(),

Dispatcher.perThreadDispatchQueue(),

LoggingHandler.INSTANCE);

}

注册

创建EventBus后开始注册事件

public void register(Object object) {

subscribers.register(object);

}

注册由SubscriberRegistry处理, 原理就是扫描listener, 然后把符合要求的(有订阅注解)方法存储到集合中, 集合的key是listener方法中的入参类型, 也就是后面消息路由的依据. 每一个key(消息类型)对应一个list(多个订阅者).

那我们就先来看下SubscriberRegistry类. 注释就一句话, 该类是用来注册订阅者到一个事件总线上的. 事件总线在实例化SubscriberRegistry类的时候指定

 SubscriberRegistry(EventBus bus) {

this.bus = checkNotNull(bus);

}

SubscriberRegistry类属性除了EventBus还有一个集合类, 用于存储注册的订阅者

/**

* All registered subscribers, indexed by event type.

*

* <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an

* immutable snapshot of all current subscribers to an event without any locking.

*/

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =

Maps.newConcurrentMap();

为了线程安全, 集合使用ConcurrentMap, 内部使用CopyOnWriteArraySet存储相同路由的订阅者, 注释中解释, 使用CopyOnWriteArraySet可以在不是用锁的情况下, 很简单而且相对轻量级的获取订阅一个事件(路由)的全部订阅者的不可变快照. COW(Copy On Write)读取不用加锁是因为在修改数据的时候会复制一份, 读取一份, 修改一份, 修改之后再替换之前的内容, 所以读取的内容不会受修改数据影响.

在注册阶段, SubscriberRegistry会将订阅者封装为Subscriber对象, 那我们就来看下订阅者的抽象Subscriber类.

按照惯例先看注释, 一共就两句话

1 这是一个订阅者类(...)

2 如果两个订阅者都是同一个对象的同一个方法, 那么这两个订阅者是相等的. 也就是说每个订阅者的坐标是@Subscribe修饰方法以及方法所在的对象, 每个坐标唯一确定一个订阅者. 这样可以保证, 订阅者重复注册也只会收到一份消息.

好, 接下来是属性, 很简单.

/** 订阅者注册的事件总线 */

@Weak private EventBus bus;

/** 订阅者方法所在对象 */

@VisibleForTesting final Object target;

/** 订阅者方法. */

private final Method method;

/** Executor to use for dispatching events to this subscriber. */

private final Executor executor;

比较有意思的是订阅者的创建, 代码如下(我的代码怎么都是public class呢...看着就low)

class Subscriber {

/** Creates a {@code Subscriber} for {@code method} on {@code listener}. */

static Subscriber create(EventBus bus, Object listener, Method method) {

return isDeclaredThreadSafe(method)

? new Subscriber(bus, listener, method)

: new SynchronizedSubscriber(bus, listener, method);

}

...

private Subscriber(EventBus bus, Object target, Method method) {

this.bus = bus;

this.target = checkNotNull(target);

this.method = method;

method.setAccessible(true);

this.executor = bus.executor();

}

...

static final class SynchronizedSubscriber extends Subscriber {

private SynchronizedSubscriber(EventBus bus, Object target, Method method) {

super(bus, target, method);

}

...

}

}

订阅者类提供了静态的创建方法, 方法中会根据条件创建两种对象, 通过名字我们可以看出来, 一个是普通的, 一个是同步的. 不知道大家是否还记得EventBus类开头注释中有一段话.

EventBus保证在多线程环境下, 不会同时访问订阅者, 除非订阅者标注了AllowConcurrentEvents注解. 如果没有使用这个注解, 订阅者不需要担心消息重入的问题.

其中的原理就是这里, 首先根据@AllowConcurrentEvents注解生成不同的两个对象, 注解修饰的是普通订阅者, 没有修饰的是同步订阅者. 也就是说默认情况下, 订阅者是线程安全的.

private static boolean isDeclaredThreadSafe(Method method) {

return method.getAnnotation(AllowConcurrentEvents.class) != null;

}

同步订阅者由一个内部类实现, 唯一特殊的就是调用订阅者方法时使用了同步关键词

@Override

void invokeSubscriberMethod(Object event) throws InvocationTargetException {

synchronized (this) { // 里面内容还是调用的父类方法

super.invokeSubscriberMethod(event);

}

}

测试代码如下

@Test

public void 多线程开车() throws InterruptedException {

MultiThreadSub threadSub = new MultiThreadSub();

AwesomeEventBusDriver.register(threadSub);

int count = 10000;

Thread t1 = new Thread(() -> {

int c1 = count;

while (c1 > 0) {

c1--;

AwesomeEventBusDriver.publishAnything(1);

}

});

Thread t2 = new Thread(() -> {

int c1 = count;

while (c1 > 0) {

c1--;

AwesomeEventBusDriver.publishAnything(1);

}

});

t1.start();

t2.start();

t1.join();

t2.join();

threadSub.print();

}

public class MultiThreadSub {

private int allowConcurrentSum = 0;

private int noConcurrentSum = 0;

@Subscribe

@AllowConcurrentEvents

public void addAllow(Integer i) {

allowConcurrentSum += i;

}

@Subscribe

public void addNo(Integer i) {

noConcurrentSum += i;

}

public void print() {

System.out.println("allowConcurrentSum: " + allowConcurrentSum);

System.out.println("noConcurrentSum: " + noConcurrentSum);

}

}

结果

然后就是扫描注册类了, 通过下面的方法获取到注册类所有的订阅方法

private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {

return subscriberMethodsCache.getUnchecked(clazz);

}

getUnchecked方法调用层级比较深, 细节先不用关心, 最终会调用下面这个方法

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {

Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();

Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();

for (Class<?> supertype : supertypes) {

for (Method method : supertype.getDeclaredMethods()) {

if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {

// TODO(cgdecker): Should check for a generic parameter type and error out

Class<?>[] parameterTypes = method.getParameterTypes();

// 就是这里在检查参数的个数

checkArgument(

parameterTypes.length == 1,

"Method %s has @Subscribe annotation but has %s parameters."

+ "Subscriber methods must have exactly 1 parameter.",

method,

parameterTypes.length);

MethodIdentifier ident = new MethodIdentifier(method);

if (!identifiers.containsKey(ident)) {

identifiers.put(ident, method);

}

}

}

}

return ImmutableList.copyOf(identifiers.values());

}

// 方法入口

/**

* A thread-safe cache that contains the mapping from each class to all methods in that class and

* all super-classes, that are annotated with {@code @Subscribe}. The cache is shared across all

* instances of this class; this greatly improves performance if multiple EventBus instances are

* created and objects of the same class are registered on all of them.

*/

private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =

CacheBuilder.newBuilder()

.weakKeys()

.build(

new CacheLoader<Class<?>, ImmutableList<Method>>() {

@Override

public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {

return getAnnotatedMethodsNotCached(concreteClass);

}

});

该方法会扫描注册类, 以及注册类的所有父类, 将其中有@Subscribe的方法缓存下来, key是方法的参数类型. 这样的好处就是同一个类不会扫描多次.

方法中有一句注释TODO(cgdecker): Should check for a generic parameter type and error out, 大概的意思是说后面版本应该检查下泛型参数, 并且抛出异常. 那就来看一下对于泛型参数是如何处理的.

public class GenericObject<T> {

@Subscribe

public void TestGeneric(T obj) {

System.out.println("generic " + obj);

}

}

@Test

public void 泛型() {

AwesomeEventBusDriver.register(new GenericObject<String>());

AwesomeEventBusDriver.publishAnything("string");

AwesomeEventBusDriver.publishAnything(90);

AwesomeEventBusDriver.publishAnything(100L);

}

// 执行结果

generic string

generic 90

generic 100

从结果可以看出来, 泛型没有起作用, 其实也好理解, 泛型在编译后会被擦除, 在运行时泛型类型会被认为Object. 测试了泛型马上就想到了基础类型, debug到上述代码, 可以看到(方法参数类型为int)注册的方法参数为int.class . 但是如果我们发送一个数字, 比如5, 会自动装箱为Integer类型, 所以发送的时候就找不到对应的订阅者. 但是如果把int类型换成为Integer就可以接收到消息了.

注册阶段到这里就差不多, 原理很简单, 细节大家可以看看代码.

发送

代码如下

/**

* Posts an event to all registered subscribers. This method will return successfully after the

* event has been posted to all subscribers, and regardless of any exceptions thrown by

* subscribers.

*

* <p>If no subscribers have been subscribed for {@code event}"s class, and {@code event} is not

* already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted.

*

* @param event event to post.

*/

public void post(Object event) {

Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);

if (eventSubscribers.hasNext()) {

dispatcher.dispatch(event, eventSubscribers);

} else if (!(event instanceof DeadEvent)) {

// the event had no subscribers and was not itself a DeadEvent

post(new DeadEvent(this, event));

}

}

先看注释, 两句话

1 发送方法会将事件发送给所有的订阅者, 只要订阅者收到了事件, 那么发送方法就会放回成功, 不论在收到事件后执行是否报错. 也就是说订阅者执行报错, 那么本次的事件消息也就丢失了, 大家在使用的时候要考虑好场景.

2 如果该事件没有订阅者, 而且不是DeadEvent类, 则会把该事件包装为DeadEvent类重新发送.

如果注册阶段看的比较明白, 那发送的逻辑就非常简单了.

首先根据发送的消息类型拿到对应的订阅者, 然后调用Dispatcher类的dispatch方法. Dispatcher实例是在实例化EventBus时候创建的, 同步使用的是PerThreadQueuedDispatcher, 异步使用的是LegacyAsyncDispatcher. 先来看同步情况.

/** Per-thread queue of events to dispatch. */

private final ThreadLocal<Queue<Event>> queue =

new ThreadLocal<Queue<Event>>() {

@Override

protected Queue<Event> initialValue() {

return Queues.newArrayDeque();

}

};

/** Per-thread dispatch state, used to avoid reentrant event dispatching. */

private final ThreadLocal<Boolean> dispatching =

new ThreadLocal<Boolean>() {

@Override

protected Boolean initialValue() {

return false;

}

};

@Override

void dispatch(Object event, Iterator<Subscriber> subscribers) {

checkNotNull(event);

checkNotNull(subscribers);

// 当前线程保存的一个队列, 该队列就是后面的发送队列

Queue<Event> queueForThread = queue.get();

// 将要分发的事件加入队列

queueForThread.offer(new Event(event, subscribers));

if (!dispatching.get()) { // 如果没有正在发送的话就发送

dispatching.set(true); // 设置正在发送(? 多线程临界值)

try {

Event nextEvent;

while ((nextEvent = queueForThread.poll()) != null) {

while (nextEvent.subscribers.hasNext()) {

// 拿到订阅者类Subscriber, 并执行发送方法

nextEvent.subscribers.next().dispatchEvent(nextEvent.event);

}

}

} finally {

dispatching.remove();

queue.remove();

}

}

}

通过类名字PerThreadQueuedDispatcher, 大概知道这是"一个线程一个队列"发送器(...). 看到该类的两个ThreadLocal属性, 大概也明白PreThread的意思了. 发送时先从ThreadLocal中拿到队列, 然后将要发送的内容插入到队列中, 接着根据另一个ThreadLocal对象dispatching判断当前是否正在发送, 如果正在发送, 因为消息已经进入队列了, 所以就不用管了. 如果没有发送, 则执行发送逻辑. 从队列中取出数据, 然后执行数据订阅者的dispatchEvent方法. 订阅者的同步发送方法前面讲过, 最后就是一个反射.

再来看下异步发送, 异步EventBus实例化的时候, 唯一的不同就是入参给了一个线程池, 并指定Dispatcher为LegacyAsyncDispatcher, 在发送逻辑上也很简单, 与同步不同就是队列使用了线程安全的类来保存. 其他没有什么大的区别.

如果你是一个敏感的男银, 可能已经发现还有一个Dispatcher类ImmediateDispatcher, 但是作者没有找到它在包中使用过. 该类更简单了, 从名字就知道, 它不用队列存储消息, 而是直接将消息发送给订阅者.

总结

至此EventBus的学习就结束了, 本文从源码入手从头过了一遍事件总线的执行过程,作者没有一行一行解释源码, 而只是讲解大体流程, 然后拿出比较有意思的部分, 讲了下自己的见解. 后面如果碰到有意思的点再继续补充.

以上是 guavaEventBus学习二源码 的全部内容, 来源链接: utcz.com/z/516780.html

回到顶部