guavaEventBus学习二源码
上篇介绍了guava包中的event bus的使用, 本篇深入源码了解实现细节
EventBus
了解
首先当然要看EventBus类, 类上的注释说了很多, 总结如下:
1 EventBus保证在多线程环境下, 不会同时访问订阅者, 除非订阅者标注了AllowConcurrentEvents注解. 如果没有使用这个注解, 订阅者不需要担心消息重入的问题.
2 消息接收方法只能接收一个参数, 而且会根据消息类型路由消息到指定的订阅者. 如果消息类型存在父子关系, 那么发送子类消息, 父类订阅者也会收到.
3 当执行call方法时, 所有该事件的订阅者会顺序执行, 如果一个订阅者处理耗时(比如数据库操作), 那么整个线程会阻塞, 为了解决这个问题, 我们一来可以尽量减少订阅者的耗时操作, 比如可以异步. 另一种就是可以使用AsyncEventBus, AsyncEventBus采用多线程执行订阅者方法.
4 发出消息后, 如果没有任何消费者消费, 那么认为这个消息是"dead". 我们定义了DeadEvent类来专门处理这些消息, 我们可以监听这类的消息. 如果订阅者监听的消息是Object, 那么会收到所有的消息, 就不存在"dead" 消息了.
EventBus 属性如下
// event bus 标识, 就是个名字private final String identifier;
// jdk executor, 用来执行线程
private final Executor executor;
// 异常处理器
private final SubscriberExceptionHandler exceptionHandler;
// 用于注册订阅者
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
// 消息分发器
private final Dispatcher dispatcher;
大概了解后, 我们从简单例子出发
@Testpublic void 科一() {
AwesomeEventBusDriver.register(new AwesomeStudent());
AwesomeEventBusDriver.publishAnything("通过~");
}
这里省略了一步EventBus创建过程
EventBus eventBus = new EventBus();
源码如下, 很简单, 设置EventBus的名字, 创建默认的一些类. 这些类我们在使用到的地方再看
/** Creates a new EventBus named "default". */public EventBus() {
this("default");
}
public EventBus(String identifier) {
this(
identifier,
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
注册
创建EventBus后开始注册事件
public void register(Object object) { subscribers.register(object);
}
注册由SubscriberRegistry处理, 原理就是扫描listener, 然后把符合要求的(有订阅注解)方法存储到集合中, 集合的key是listener方法中的入参类型, 也就是后面消息路由的依据. 每一个key(消息类型)对应一个list(多个订阅者).
那我们就先来看下SubscriberRegistry类. 注释就一句话, 该类是用来注册订阅者到一个事件总线上的. 事件总线在实例化SubscriberRegistry类的时候指定
SubscriberRegistry(EventBus bus) { this.bus = checkNotNull(bus);
}
SubscriberRegistry类属性除了EventBus还有一个集合类, 用于存储注册的订阅者
/*** All registered subscribers, indexed by event type.
*
* <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to an event without any locking.
*/
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
为了线程安全, 集合使用ConcurrentMap, 内部使用CopyOnWriteArraySet存储相同路由的订阅者, 注释中解释, 使用CopyOnWriteArraySet可以在不是用锁的情况下, 很简单而且相对轻量级的获取订阅一个事件(路由)的全部订阅者的不可变快照. COW(Copy On Write)读取不用加锁是因为在修改数据的时候会复制一份, 读取一份, 修改一份, 修改之后再替换之前的内容, 所以读取的内容不会受修改数据影响.
在注册阶段, SubscriberRegistry会将订阅者封装为Subscriber对象, 那我们就来看下订阅者的抽象Subscriber类.
按照惯例先看注释, 一共就两句话
1 这是一个订阅者类(...)
2 如果两个订阅者都是同一个对象的同一个方法, 那么这两个订阅者是相等的. 也就是说每个订阅者的坐标是@Subscribe修饰方法以及方法所在的对象, 每个坐标唯一确定一个订阅者. 这样可以保证, 订阅者重复注册也只会收到一份消息.
好, 接下来是属性, 很简单.
/** 订阅者注册的事件总线 */@Weak private EventBus bus;
/** 订阅者方法所在对象 */
@VisibleForTesting final Object target;
/** 订阅者方法. */
private final Method method;
/** Executor to use for dispatching events to this subscriber. */
private final Executor executor;
比较有意思的是订阅者的创建, 代码如下(我的代码怎么都是public class呢...看着就low)
class Subscriber { /** Creates a {@code Subscriber} for {@code method} on {@code listener}. */
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
...
private Subscriber(EventBus bus, Object target, Method method) {
this.bus = bus;
this.target = checkNotNull(target);
this.method = method;
method.setAccessible(true);
this.executor = bus.executor();
}
...
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
...
}
}
订阅者类提供了静态的创建方法, 方法中会根据条件创建两种对象, 通过名字我们可以看出来, 一个是普通的, 一个是同步的. 不知道大家是否还记得EventBus类开头注释中有一段话.
EventBus保证在多线程环境下, 不会同时访问订阅者, 除非订阅者标注了AllowConcurrentEvents注解. 如果没有使用这个注解, 订阅者不需要担心消息重入的问题.
其中的原理就是这里, 首先根据@AllowConcurrentEvents注解生成不同的两个对象, 注解修饰的是普通订阅者, 没有修饰的是同步订阅者. 也就是说默认情况下, 订阅者是线程安全的.
private static boolean isDeclaredThreadSafe(Method method) { return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
同步订阅者由一个内部类实现, 唯一特殊的就是调用订阅者方法时使用了同步关键词
@Overridevoid invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) { // 里面内容还是调用的父类方法
super.invokeSubscriberMethod(event);
}
}
测试代码如下
@Testpublic void 多线程开车() throws InterruptedException {
MultiThreadSub threadSub = new MultiThreadSub();
AwesomeEventBusDriver.register(threadSub);
int count = 10000;
Thread t1 = new Thread(() -> {
int c1 = count;
while (c1 > 0) {
c1--;
AwesomeEventBusDriver.publishAnything(1);
}
});
Thread t2 = new Thread(() -> {
int c1 = count;
while (c1 > 0) {
c1--;
AwesomeEventBusDriver.publishAnything(1);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
threadSub.print();
}
public class MultiThreadSub {
private int allowConcurrentSum = 0;
private int noConcurrentSum = 0;
@Subscribe
@AllowConcurrentEvents
public void addAllow(Integer i) {
allowConcurrentSum += i;
}
@Subscribe
public void addNo(Integer i) {
noConcurrentSum += i;
}
public void print() {
System.out.println("allowConcurrentSum: " + allowConcurrentSum);
System.out.println("noConcurrentSum: " + noConcurrentSum);
}
}
结果
然后就是扫描注册类了, 通过下面的方法获取到注册类所有的订阅方法
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) { return subscriberMethodsCache.getUnchecked(clazz);
}
getUnchecked方法调用层级比较深, 细节先不用关心, 最终会调用下面这个方法
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
// 就是这里在检查参数的个数
checkArgument(
parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method,
parameterTypes.length);
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
// 方法入口
/**
* A thread-safe cache that contains the mapping from each class to all methods in that class and
* all super-classes, that are annotated with {@code @Subscribe}. The cache is shared across all
* instances of this class; this greatly improves performance if multiple EventBus instances are
* created and objects of the same class are registered on all of them.
*/
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder()
.weakKeys()
.build(
new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
该方法会扫描注册类, 以及注册类的所有父类, 将其中有@Subscribe的方法缓存下来, key是方法的参数类型. 这样的好处就是同一个类不会扫描多次.
方法中有一句注释TODO(cgdecker): Should check for a generic parameter type and error out, 大概的意思是说后面版本应该检查下泛型参数, 并且抛出异常. 那就来看一下对于泛型参数是如何处理的.
public class GenericObject<T> { @Subscribe
public void TestGeneric(T obj) {
System.out.println("generic " + obj);
}
}
@Test
public void 泛型() {
AwesomeEventBusDriver.register(new GenericObject<String>());
AwesomeEventBusDriver.publishAnything("string");
AwesomeEventBusDriver.publishAnything(90);
AwesomeEventBusDriver.publishAnything(100L);
}
// 执行结果
generic string
generic 90
generic 100
从结果可以看出来, 泛型没有起作用, 其实也好理解, 泛型在编译后会被擦除, 在运行时泛型类型会被认为Object. 测试了泛型马上就想到了基础类型, debug到上述代码, 可以看到(方法参数类型为int)注册的方法参数为int.class . 但是如果我们发送一个数字, 比如5, 会自动装箱为Integer类型, 所以发送的时候就找不到对应的订阅者. 但是如果把int类型换成为Integer就可以接收到消息了.
注册阶段到这里就差不多, 原理很简单, 细节大家可以看看代码.
发送
代码如下
/*** Posts an event to all registered subscribers. This method will return successfully after the
* event has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers.
*
* <p>If no subscribers have been subscribed for {@code event}"s class, and {@code event} is not
* already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted.
*
* @param event event to post.
*/
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
先看注释, 两句话
1 发送方法会将事件发送给所有的订阅者, 只要订阅者收到了事件, 那么发送方法就会放回成功, 不论在收到事件后执行是否报错. 也就是说订阅者执行报错, 那么本次的事件消息也就丢失了, 大家在使用的时候要考虑好场景.
2 如果该事件没有订阅者, 而且不是DeadEvent类, 则会把该事件包装为DeadEvent类重新发送.
如果注册阶段看的比较明白, 那发送的逻辑就非常简单了.
首先根据发送的消息类型拿到对应的订阅者, 然后调用Dispatcher类的dispatch方法. Dispatcher实例是在实例化EventBus时候创建的, 同步使用的是PerThreadQueuedDispatcher, 异步使用的是LegacyAsyncDispatcher. 先来看同步情况.
/** Per-thread queue of events to dispatch. */private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 当前线程保存的一个队列, 该队列就是后面的发送队列
Queue<Event> queueForThread = queue.get();
// 将要分发的事件加入队列
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) { // 如果没有正在发送的话就发送
dispatching.set(true); // 设置正在发送(? 多线程临界值)
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
// 拿到订阅者类Subscriber, 并执行发送方法
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
通过类名字PerThreadQueuedDispatcher, 大概知道这是"一个线程一个队列"发送器(...). 看到该类的两个ThreadLocal属性, 大概也明白PreThread的意思了. 发送时先从ThreadLocal中拿到队列, 然后将要发送的内容插入到队列中, 接着根据另一个ThreadLocal对象dispatching判断当前是否正在发送, 如果正在发送, 因为消息已经进入队列了, 所以就不用管了. 如果没有发送, 则执行发送逻辑. 从队列中取出数据, 然后执行数据订阅者的dispatchEvent方法. 订阅者的同步发送方法前面讲过, 最后就是一个反射.
再来看下异步发送, 异步EventBus实例化的时候, 唯一的不同就是入参给了一个线程池, 并指定Dispatcher为LegacyAsyncDispatcher, 在发送逻辑上也很简单, 与同步不同就是队列使用了线程安全的类来保存. 其他没有什么大的区别.
如果你是一个敏感的男银, 可能已经发现还有一个Dispatcher类ImmediateDispatcher, 但是作者没有找到它在包中使用过. 该类更简单了, 从名字就知道, 它不用队列存储消息, 而是直接将消息发送给订阅者.
总结
至此EventBus的学习就结束了, 本文从源码入手从头过了一遍事件总线的执行过程,作者没有一行一行解释源码, 而只是讲解大体流程, 然后拿出比较有意思的部分, 讲了下自己的见解. 后面如果碰到有意思的点再继续补充.
以上是 guavaEventBus学习二源码 的全部内容, 来源链接: utcz.com/z/516780.html