EventBus 源码复盘

第一次看源码,先看了几篇别人写的解析文章,熟悉代码逻辑,EventBus的大概思路,现在自己复盘一遍。

EventBus 的初始化

使用 EventBus 的第一件事是注册

1
EventBus.getDefault().register(object)

但是在调用 register 之前,先调用了 getDefault 方法

1
2
3
4
5
6
7
8
9
10
11
12
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}

很简单使用双重检查的单例生成一个 EventBus 实例,我们看看 EventBus 的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
//日志工具
logger = builder.getLogger();
//以 Event 类对象为 key , 订阅者 map 为 value 的集合。
subscriptionsByEventType = new HashMap<>();
//以 订阅者为 key ,订阅方法 list 为 value 的集合
typesBySubscriber = new HashMap<>();
//黏性事件集合,以event的类对象为 key,event 本身为 value
stickyEvents = new ConcurrentHashMap<>();
//一个封装类,作用是判断是否在主线程,多出用到
mainThreadSupport = builder.getMainThreadSupport();
//与发布事件相关的接口,后面会知道返回一个 handler 子类继承与它,管理事件队列
//=============以下成员在事件发布中很重要==================
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
//继承自 Runnable 和 Poster ,管理在后台线程订阅的事件队列
backgroundPoster = new BackgroundPoster(this);
//继承自 Runnable 和 Poster ,管理在异步线程订阅的事件队列
asyncPoster = new AsyncPoster(this);
//=====================================================
//索引数量
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//一个搜索订阅者的订阅方法的封装类,参数为订阅索引,是否严格验证订阅方法,是否忽略索引。作用后面会提到。
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex);
//订阅异常是否打印日志 默认 true
logSubscriberExceptions = builder.logSubscriberExceptions;
//无订阅者异常是否打印日志 默认 true
logNoSubscriberMessages = builder.logNoSubscriberMessages;
//抛出异常是否发布给订阅者 默认 true
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
//抛出无订阅者异常是否发布给订阅者 默认 true
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
//是否抛出事件发布失败的异常 默认 false
throwSubscriberException = builder.throwSubscriberException;
//是否设置事件继承 默认 true
eventInheritance = builder.eventInheritance;
//线程池 Executors.newCachedThreadPool();
executorService = builder.executorService;
}

很显然这里对一些成员进行初始化,还使用了建造者模式对 EventBus 的一些成员属性做初始化,空参构造函数调用另一个构造函数,传入一个默认的 EventBusBuilder 作为参数,但 EventBus(EventBusBuilder builder) 是默认修饰符,这表示外部无法访问这个方法,但你可以通过 EventBus#builder() 静态方法来自定义属性初始化 EventBus

注册 register

1
2
3
4
5
6
7
8
9
10
11
12
13
//参数为订阅者,在 Android 中就是 Activity 或 Fragment 传入的 this
public void register(Object subscriber) {
//获取订阅者的类对象
Class<?> subscriberClass = subscriber.getClass();
//通过 subscriberMethodFinder.findSubscriberMethods 获取这个类对象的所有订阅方法(即 @Subcribe 注解的方法)
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//遍历得到的订阅方法,将订阅者与订阅方法绑定
for (SubscriberMethod subscriberMethod : subscriberMethods){
subscribe(subscriber, subscriberMethod);
}
}
}

findSubscriberMethods

上面的代码用到 SubscriberMethodFinder#findSubscriberMethods 方法获取订阅方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//参数是订阅者的类对象
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//METHOD_CACHE 是一个集合,以 订阅者类对象为 key , 订阅方法的list 为 value
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
//如果订阅方法 list 不为空则返回
if (subscriberMethods != null) {
return subscriberMethods;
}
//否则
//如忽略生成的索引就使用反射查找
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {//反之通过索引查找
subscriberMethods = findUsingInfo(subscriberClass);
}
//如没有订阅方法则抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {//反之返回订阅方法 list
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

findUsingReflection

先来看看通过反射查找 findUsingReflection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//参数为订阅类对象
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//FindState 是一个封装类,具体下面讲
//prepareFindState()的作用其实就是从一个默认空间为 4 的 FindState 数组中返回第一个不为 null 的实例,然后数组对应索引置空。否则就 new 一个 FindState 返回。
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
//clazz 就是 subscriberClass
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
//将 clazz 指向超类
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

findUsingReflectionInSingleClass

FindState 的具体情况我们通过后面的调用来看,先看核心代码 findUsingReflectionInSingleClass(findState);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//先尝试直接获取该类声明的或实现接口的方法,不包括继承的方法,更快
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
//获取失败则获取该所有方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
//遍历得到方法数组,如果方法是 public 且不是静态或抽象方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取方法的参数数量
Class<?>[] parameterTypes = method.getParameterTypes();
//只能有一个参数
if (parameterTypes.length == 1) {
//获取注解对象
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//获取第一个参数
Class<?> eventType = parameterTypes[0];
//检查该订阅方法是否已存在,不存在就加入
if (findState.checkAdd(method, eventType)) {
//通过注解获取线程模式
ThreadMode threadMode = subscribeAnnotation.threadMode();
//往 findState.subscriberMethods 新增 SubscriberMethod 实例
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
}
//反之,这个方法如果添加了 Subscribe 注解则抛出异常
else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
}
//反之,这个方法如果添加了 Subscribe 注解则抛出异常
else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

FindState

在看看上面多次用到的 FindState 类的代码(省略部分代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

class FindState {
//订阅方法的 list
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
//事件类为 key , Object 为 value(后面可以知道主要是判断是不是 method 以及是否存在 )
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
//方法名和事件类名为 key , 订阅者类对象 为 value
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
//订阅者类对象,但看源码这个没有用到可能是遗留代码
Class<?> subscriberClass;
//订阅者类对象
Class<?> clazz;
//是否跳过超类,如果细心的话,可以发现在 findUsingReflectionInSingleClass 中获取 clazz 非继承方法失败的话,会置为 true
boolean skipSuperClasses;
//订阅者信息
SubscriberInfo subscriberInfo;
void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = clazz = subscriberClass;
skipSuperClasses = false;
subscriberInfo = null;
}

boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
//插入事件类型和方法
Object existing = anyMethodByEventType.put(eventType, method);
//如果返回为 Null 表示是新的事件类型和方法
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {//判断 existing 为 Method
if (!checkAddWithMethodSignature((Method) existing, eventType)) {//通过方法签名检查是否存在
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
//用任意非方法对象消费掉这个事件 key 的 value
anyMethodByEventType.put(eventType, this);
}
//如果 existing 不是 Method 则通过签名检查 method
return checkAddWithMethodSignature(method, eventType);
}
}

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
//统一用 “方法名>事件类名” 作为 key
String methodKey = methodKeyBuilder.toString();
//获取声明这个方法的类对象
Class<?> methodClass = method.getDeclaringClass();
//获取 subscriberClassByMethodKey 中 methodKey 对应的类对象
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {//如果 subscriberClassByMethodKey 不存在类对象或 methodClassOld 与 methodClassOld 是否相同或是另一个类的子类或接口
// Only add if not already found in a sub class
return true;
} else {//反之,subscriberClassByMethodKey 插入 methodKey 对应 methodClassOld
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
//将 clazz 指向非 java/javax/android 包的超类
void moveToSuperclass() {
if (skipSuperClasses) {
clazz = null;
} else {
clazz = clazz.getSuperclass();
String clazzName = clazz.getName();
/** Skip system classes, this just degrades performance. */
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
clazz = null;
}
}
}
}

SubscriberMethod

顺便看看 SubscriberMethod 类的代码(省略部分代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Used internally by EventBus and generated subscriber indexes. */
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;

public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.priority = priority;
this.sticky = sticky;
}
...
}

这个就不注释了,就是将订阅方法的 方法对象,线程模式,事件类对象,优先级,是否黏性等等信息封装成一个类

findUsingInfo

接下来看看 findUsingInfo 是怎么写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//获取订阅者信息,getSubscriberInfo 具体代码下面看
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
//遍历订阅方法数组
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {//反之还是通过反射
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
//返回方法 List 并释放 findState(调用 findState#recycle)
return getMethodsAndRelease(findState);
}

getSubscriberInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

这个方法获取订阅方法都是由 EventBus 3.0 带来的新特性 Subscriber Index
Subscriber Index 使用文档

看完文档,我们知道 Subscriber Index 是 EventBus 3 上的新技术,所以这里也建议还没学习过 EventBus 的可以跳过 2.X 之前的版本直接学习最新版本。
关于 EventBus 的 Subscriber Index 技术的特点,翻译一下官方解释:

It is an optional optimization to speed up initial subscriber registration.

Subscriber Index 是一个可选的优化技术,用来加速初始化订阅者注册。

The subscriber index can be created during build time using the EventBus annotation processor. While it is not required to use an index, it is recommended on Android for best performance.

Subscriber Index 在编译时使用 EventBus 注解处理器创建,虽然没有规定必须使用它,但是官方推荐使用这种方式,因为它在 Android 上有着最佳的性能。

到此,我们通过订阅者类对象获得了所有的订阅方法,开始遍历调用订阅方法,传入订阅者和订阅方法(同步)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取订阅方法的事件类型
Class<?> eventType = subscriberMethod.eventType;
//Subscription 封装了订阅者 Object 和 订阅方法类 SubscriberMethod,还用 boolean 类型的 active 来标记状态
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//通过 eventType 获取订阅者列表
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {//subscriptions 为空则新建实例,并插入 subscriptionsByEventType
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {// 反之若订阅者列表中包含 newSubscription ,则抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

//根据新的订阅方法的优先级将订阅者插入 subscriptions
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

//从 typesBySubscriber 获取这个订阅者的订阅者方法列表
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {//为空则新建,并插入 typesBySubscriber
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 订阅方法是否设置黏性模式
if (subscriberMethod.sticky) {
// 是否设置了事件继承
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 判断当前事件类型是否为黏性事件或者其子类
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 执行设置了sticky模式的订阅方法
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

因为 checkPostStickyEventToSubscription 内部判断传入的事件为黏性事件后直接调用了发布方法,所以这个方法的源码放到之后再讲。

发布

post

终于轮到发布事件了,先上 post 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** Posts the given event to the event bus. */
public void post(Object event) {
//PostingThreadState 是一个封装了 事件队列,发布状态,线程状态,订阅者等等信息的类,currentPostingThreadState 是一个 ThreadLocal 实例。也就是说每个线程都持有自己的 PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件队列
List<Object> eventQueue = postingState.eventQueue;
//将事件加入队列
eventQueue.add(event);

//判断是否有事件在发布
if (!postingState.isPosting) {
//赋值当前线程
postingState.isMainThread = isMainThread();
//标记正在发布
postingState.isPosting = true;
//如果已被取消,则抛出异常
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循环发布队列中的事件,直到队列为空
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//改变发布状态和线程标记
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

注释中提到的 ThreadLocal 其实是就相当于一个以 线程为 key, 泛型 T 为 value 的 map ,可以保证每个线程只能得到自己的值

postSingleEvent

再来看看核心代码 postSingleEvent 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//获取事件类对象
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {//如果设置了可继承事件
//通过这个事件类对象查找它的所有超类和接口类对象
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//循环发布事件,具体逻辑下面贴出
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//不可继承则直接发布本事件
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有找到订阅了这个事件的订阅者
if (!subscriptionFound) {
//logNoSubscriberMessages为true 则抛出异常
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
//发布无订阅者事件
post(new NoSubscriberEvent(this, event));
}
}
}

postSingleEventForEventType

接下来看两个核心方法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//同步获取订阅了该事件的订阅者 list
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//循环取出订阅者,开始发布事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

postToSubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING://表示哪个线程发布的事件,就在哪个线程调用订阅方法
invokeSubscriber(subscription, event);
break;
case MAIN://无论哪个线程发布,都在主线程调用订阅方法
if (isMainThread) {//在主线程发布,则直接调用订阅方法
invokeSubscriber(subscription, event);
} else {//反之加入到主线程发布器,则加队列
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED://主线程顺序发布
if (mainThreadPoster != null) {//若主线程发布器不为空,加入都列
mainThreadPoster.enqueue(subscription, event);
} else {//反之直接调用订阅方法
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {//主线程发布,事件加入到后台事件发布器
backgroundPoster.enqueue(subscription, event);
} else {//反之直接调用
invokeSubscriber(subscription, event);
}
break;
case ASYNC://加入异步发布器
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

来看看几个发布器(方便记忆自己起的)的区别

HandlerPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class HandlerPoster extends Handler implements Poster {
//自定义封装的发布队列,PendingPost 封装了订阅者和对应的订阅事件
private final PendingPostQueue queue;
//处理消息的最大时间
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;

protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
//当有事件插入,且未处于激活状态时,更改激活标记,发送空消息
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
//记录开始处理的时刻
long started = SystemClock.uptimeMillis();
while (true) {
//循环取出发送队列的
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
//第二次同步验证队列是否没有东西
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//通过反射调用订阅方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//超过最大时间则退出循环
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}

可是看到 HandlerPoster 继承了 Handler,并辗转拿到了 MainLooper,所以可以在 UI 线程处理消息。
查了一下,设置 maxMillisInsideHandleMessage 的意义大概是防止大量事件发布阻塞了 UI 线程吧

BackgroundPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
final class BackgroundPoster implements Runnable, Poster {

private final PendingPostQueue queue;
private final EventBus eventBus;
//线程池运行标记
private volatile boolean executorRunning;

BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//插入事件
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
//开启线程池
eventBus.getExecutorService().execute(this);
}
}
}

@Override
public void run() {
try {
try {
while (true) {
//设置 1 秒延迟,队头为空则等待 1 秒,反之直接取出队头
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
//二次同步检查
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//同样通过反射调用订阅者的订阅方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}

}

AsyncPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class AsyncPoster implements Runnable, Poster {

private final PendingPostQueue queue;
private final EventBus eventBus;

AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}

}

可以看出 AsyncPoster 并没有双重验证(同步锁)循环取出队头的事件去发布,而只是取出一个。

网上找到一个关于线程模式的描述

ThreadMode mean
POSTING 在那个线程post就在线程执行method
MAIN 在主线程执行method,可能会阻塞主线程
MAIN_ORDERED 在主线程执行method,每次都将事件发送到队列,然后由handle发送执行
BACKGROUND 在后台线程中执行,一个线程执行完之后才会去执行另外一个线程
ASYNC 在后台线程中执行,每来一个线程执行一个,事件是并发的

注销(反注册)

HandlerPoster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** Unregisters the given subscriber from all event classes. */
//注销传入 subscriber 的所有事件类
public synchronized void unregister(Object subscriber) {
//获取 subscriber 的所有事件类对象
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {//若subscribedTypes列表不为空则遍历调用 unsubscribeByEventType ,最后 typesBySubscriber 中移除 subscriber
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {//反之抛出异常
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

注册过程我们就知道 typesBySubscriber 是保存订阅者的所有订阅事件类型的一个 Map,这里根据订阅者拿到订阅事件类型 List,然后逐个取消订阅,最后 typesBySubscriber 移除该订阅者。这里只需要关注它是如果取消订阅的,跟踪 unsubscribeByEventType()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
//只能更新 subscriptionsByEventType
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//通过事件类型获取所有订阅者
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
//遍历订阅者 list, 若当前 subscription 与 subscriber 相同则改变 active = false 并从 subscriptions 移除
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

注册过程我们就知道 typesBySubscriber 是保存订阅者的所有订阅事件类型的一个 Map,这里根据订阅者拿到订阅事件类型 List,然后逐个取消订阅,最后 typesBySubscriber 移除该订阅者。这里只需要关注它是如果取消订阅的,跟踪 unsubscribeByEventType()

小结

到这里,大概了解 EventBus 的原理。它将注册的 Object 实例保存起来,再通过反射获得类对象,以及通过注解扫描订阅方法和订阅类型,然后将三者通过几层关系保存起来; 在发布订阅时通过事件类遍历所有订阅方法再通过反射调用订阅方法,起到事件通知的作用。

尾巴

看源码是中级向高级进击的一个坎,我迈的很晚,这是第一次尝试。总得来说 EventBus 并不是一个很复杂的库,但设计的也足够巧妙。看源码是一个很枯燥事情,首先要梳理代码脉络,争取先顺着调用逻辑将一个行为从头走到尾,切忌陷入分支方法的嵌套或者自定义类,这样不但不利于看懂源码,也会使人越来越烦躁。

作者

ChinnSenn

發表於

2019-03-22

更新於

2023-04-20

許可協議

評論