前言
EventBus使用和源码都比较简单,适合新手上手源码阅读。代码基于3.2.0版本,implementation 'org.greenrobot:eventbus:3.2.0'。
主要功能
简单使用流程:在需要订阅消息的地方(activity或者fragment),定义处理消息的方法,并用@Subscribe修饰,同时要在create注册,在destroy解注册,然后在合适的地方,调用post发送该类型的消息,就能在处理消息的地方收到发送的消息。
其中有下面几个主要功能:
1.获取EventBus对象
2.注册
3.解注册
4.发送消息
5.接收消息
val eventBus = EventBus.getDefault()
eventBus.register(this)
eventBus.unregister(this)
eventBus.post("post")
eventBus.postSticky("postSticky")
@Subscribe(threadMode = ThreadMode.MAIN)
fun back(str: String) {
}
获取EventBus对象
一般使用getDefault()方法,这是一个使用双重校验锁方式实现的单例模式,返回一个EventBus对象,源码如下:
static volatile EventBus defaultInstance;
// Convenience singleton for apps using a process-wide EventBus instance.
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
可以看到EventBus中有两个构造方法
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
......一些初始化操作
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
EventBus的对象创建使用了建造者模式(Builder模式),除了使用默认的builder构造EventBus,还可以使用自定义的builder,EventBus提供了一个静态方法生成Builder对象,而在Builer中又提供了方法生成EventBus对象,代码如下:
//静态方法生产Builder
public static EventBusBuilder builder() {
return new EventBusBuilder();
}
//builder对象生成EventBus对象
public EventBus build() {
return new EventBus(this);
}
综上,获取EventBus对象有如下两种方式:
//1进程单例方式获取,使用默认Builder
val eventBus1 = EventBus.getDefault()
//2用户在默认Builder的基础上,可以按照需求自定义Builder参数
val eventBus2 = EventBus.builder()
.ignoreGeneratedIndex(true)
......自定义Builder参数
.logger(null)
.build()
注册
总注册流程
register(this)方法源码如下:
public void register(Object subscriber) {
//1.获取传入参数的class对象
//2.通过subscriberMethodFinder找到传入对象类中所有使用@Subscribe修饰的方法
//3.把找到的方法存起来
Class<?> subscriberClass = subscriber.getClass();//1
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);//2
//3
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
找订阅方法
先看订阅方法包装类SubscriberMethod,有以下几个属性
final Method method;//具体的方法
final ThreadMode threadMode;//指定线程模式
final Class<?> eventType;//事件类型,方法的参数的class对象
final int priority;//优先级
final boolean sticky;//是否是粘性消息
/** Used for efficient comparison */
String methodString;
在用注解@Subscribe修饰方法时,有三个参数可以选,和上面的属性对应,源码如下:
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
boolean sticky() default false;
int priority() default 0;
}
threadMode属性,表示指定的线程模式,默认使用ThreadMode.POSTING,ThreadMode是一个枚举类,有以下五种类型:ThreadMode.POSTING、ThreadMode.MAIN、ThreadMode.MAIN_ORDERED、ThreadMode.BACKGROUND、ThreadMode.ASYNC。每个枚举的具体含义后面再说。
然后是findSubscriberMethods()方法,源码如下:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//查看该类是否有已经缓存的订阅方法,如果有,直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//找订阅方法的具体逻辑
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
//该类下面没有注解@Subscribe修饰的订阅方法,不可注册,抛出异常
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//把找到的订阅方法缓存到METHOD_CACHE并返回所有订阅方法,METHOD_CACHE是一个ConcurrentHashMap对象,支持并发
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
看一看找订阅方法的具体逻辑,如果ignoreGeneratedIndex为true,就调用findUsingReflection(subscriberClass),否则调用findUsingInfo(subscriberClass)。参数含义是 是否忽略生成索引,索引是EventBus在3.0新加的功能,目的是解决直接使用反射查找订阅方法带来的性能问题,我们先看不使用索引的逻辑,即ignoreGeneratedIndex为true,关于索引的内容放后面再说。
findUsingReflection源码如下:
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//1.获取FindState实例,
//使用FindState[] FIND_STATE_POOL数组存放findState实例,数组长度固定为4
//如果数组里面没有实例,新建一个findState
//如果数组里面有实例,获取对象使用,避免新建对象带来的消耗,并把池里面的该位置置空,
//和下面的getMethodsAndRelease方法相反,getMethodsAndRelease会把使用的findState清空数据,然后放入池中,以便下次使用
SubscriberMethodFinder.FindState findState = prepareFindState();
//2.根据订阅类subscriberClass初始化findState
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//3.通过反射方式,找到订阅者下面的所有订阅方法
findUsingReflectionInSingleClass(findState);
//4.再去他的父类里面找订阅方法
findState.moveToSuperclass();
}
//5.通过findState实例,获取所有找到的订阅方法,并把findState清空数据后放入池中
return getMethodsAndRelease(findState);
}
其中关键代码是3处的findUsingReflectionInSingleClass,通过反射找订阅方法,源码如下:
private void findUsingReflectionInSingleClass(SubscriberMethodFinder.FindState findState) {
//1.反射获取所有方法
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
try {
methods = findState.clazz.getMethods();
} catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
String msg = "Could not inspect methods of " + findState.clazz.getName();
if (ignoreGeneratedIndex) {
msg += ". Please consider using EventBus annotation processor to avoid reflection.";
} else {
msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
}
throw new EventBusException(msg, error);
}
findState.skipSuperClasses = true;
}
//2.遍历所有方法,找到满足条件的订阅方法
for (Method method : methods) {
int modifiers = method.getModifiers();
//方法是public,且非static,非abstract
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
//只能有一个参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
//方法被注解Subscribe修饰
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
//获取注解参数值
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
代码看起来比较多,其实逻辑很清晰,一看就明白是在做什么。
把找到的方法存起来
遍历找到的订阅方法,调用subscribe方法保存,subscribe源码如下:
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//使用订阅者类和订阅方法,生成订阅类Subscription实例
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//根据事件类型,获取所有的订阅subscriptions,如果为空,新建并放入到subscriptionsByEventType这个map里面,是事件和CopyOnWriteArrayList<Subscription>的映射
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//如果subscriptions不为空,且newSubscription实例已经包含在subscriptions中,说明同一个订阅者的同一个事件重复注册了,抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//按照订阅方法的优先级,把订阅newSubscription添加到subscriptions
//如果两个方法优先级相同按照,先订阅的在前,后订阅的在后
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//把订阅事件添加到对应订阅者的订阅事件list
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//如果订阅方法是粘性的,把之前发送过的最新粘性事件,再次发送到这个订阅方法
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
至此,注册流程完成。
解注册
unregister(this)方法源码如下:
public synchronized void unregister(Object subscriber) {
//获取该订阅者的所有订阅事件
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//根据订阅者和订阅事件,取消所有订阅
unsubscribeByEventType(subscriber, eventType);
}
//移除该订阅者
typesBySubscriber.remove(subscriber);
} else {
//如果没有找到该订阅者的事件,打印log,提示该订阅者没有注册
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//根据订阅事件,获取所有的订阅
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
//比较订阅者,把匹配的数据移除
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
逻辑比较清晰简单,只要看懂了注册流程,解注册自然而然就明白是怎么回事了。
发送消息
发送消息分为普通消息和粘性消息,先看看普通消息的发送
post()源码如下:
public void post(Object event) {
//获取当前线程下的postingState,currentPostingThreadState是一个ThreadLocal<PostingThreadState>实例,保证拿到的postingState是通过线程隔离的
PostingThreadState postingState = currentPostingThreadState.get();
//把事件添加到队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//判断是否正在发送数据,如果不是,开始发送数据的逻辑
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//事件队列里面有数据时,具体的发送逻辑
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
再看看postSingleEvent的逻辑,都会走到postSingleEventForEventType方法,然后走到postToSubscription方法,下面是postToSubscription的源码:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//按照订阅方法的指定线程发送消息
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
可以看到这段代码的目的是,根据订阅方法的指定的线程模式发送消息。
简单说一下几种线程模式。
POSTING,不进行线程切换,接收消息和发送消息在同一个线程;
MAIN,在主线程接收消息,如果发送线程是主线程,直接调用invokeSubscriber,如果是子线程,把消息放到mainThreadPoster队列;
MAIN_ORDERED,在主线程接收消息,和MAIN的不同之处在于,优先把消息放到mainThreadPoster队列,只有当mainThreadPoster为空是才会直接调用invokeSubscriber;
BACKGROUND,在子线程接收消息,如果发送线程是主线程,把消息放到子线程队列backgroundPoster,如果是子现场,直接调用invokeSubscriber;
ASYNC,在子线程接收消息,独立于发送线程和主线程,直接把消息放到asyncPoster队列。
上述流程中,最关键的两个点是invokeSubscriber和asyncPoster队列(他和mainThreadPoster以及backgroundPoster原理一样,看一个就行),下面看看这两部分的代码。
invokeSubscriber源码如下:
void invokeSubscriber(Subscription subscription, Object event) {
try {
//通过反射调用订阅方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
逻辑简单清晰,直接通过反射调用订阅方法。
asyncPoster队列原理是,把消息放在队列里面,然后通过线程池拿到子线程后,在子线程获取消息队列里的消息,然后再执行上面的invokeSubscriber。每来一条消息就开一个线程处理,消息之间是并发的。
backgroundPoster原理和asyncPoster类似,区别是增加了同步逻辑,用来保证消息顺序执行,上一条消息执行完成再执行下一条消息。
mainThreadPoster和backgroundPoster类似,都有加同步,然后通过handler发送消息到主线程,再在主线程调用invokeSubscriber。
说完了post,在来看一下postSticky,源码如下:
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
逻辑比较简单清晰,先把事件存到map里面,然后再调用post,因为是用的map存,所以同一个事件只会有最新的消息。
以上,就是发送消息以及接受消息的所有流程。
索引
使用
首先在app的build.gradle中加入配置:
android {
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
// 根据项目实际情况,指定辅助索引类的名称和包名
arguments = [ eventBusIndex : 'com.af.etest.MyEventBusIndex' ]
}
}
}
}
dependencies {
compile 'org.greenrobot:eventbus:3.2.0'
// 引入注解处理器
annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.2.0'
}
然后新建两个activity,增加订阅方法,如下:
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
eventBus.register(this);
}
@Subscribe(threadMode = ThreadMode.ASYNC,priority = 100,sticky = true)
public void callback1(String e) {
}
@Subscribe
public void callback2(Activity activity) {
}
}
public class TestActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_test);
EventBus.getDefault().register(this);
}
@Subscribe
public void testCallback(TestActivity testActivity){
}
}
编译之后,就会自动生成如下的java文件:
//This class is generated by EventBus, do not edit.
public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
putIndex(new SimpleSubscriberInfo(TestActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("testCallback", TestActivity.class),
}));
putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("callback1", String.class, ThreadMode.ASYNC, 100, true),
new SubscriberMethodInfo("callback2", android.app.Activity.class),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}
基本原理就是在编译时,先获取到所有的订阅方法信息,然后存到指定的SubscriberInfoIndex实现类,以便注册时使用。实现这一功能的代码在这儿
源码
当没有忽略索引的时候,注册流程中,找订阅方法会走到findUsingInfo方法,和上文提过的findUsingReflection方法相比,增加了注释1处的代码,源码如下:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
//1
while (findState.clazz != null) {
//通过builder设置的SubscriberInfoIndex实现类,获取到SubscriberInfo,这个实现类就是使用索引那一步,自动生成的类MyEventBusIndex
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//通过获取到的SubscriberInfo信息,生成订阅方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
}
else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
和不使用索引相比,索引是在编译的时候就获得了所有的订阅方法信息,而不用在运行时通过反射去获取,大大提升了注册的性能。
其他
另外,EventBus还有几个不常用的api,cancelEventDelivery()取消事件,removeAllStickyEvents()移除所有粘性消息,源码也比较简单清晰,如下:
public void cancelEventDelivery(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
if (!postingState.isPosting) {
throw new EventBusException(
"This method may only be called from inside event handling methods on the posting thread");
} else if (event == null) {
throw new EventBusException("Event may not be null");
} else if (postingState.event != event) {
throw new EventBusException("Only the currently handled event may be aborted");
} else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
throw new EventBusException(" event handlers may only abort the incoming event");
}
postingState.canceled = true;
}
public void removeAllStickyEvents() {
synchronized (stickyEvents) {
stickyEvents.clear();
}
}
最后
对这次的源码阅读做一个小小的总结。
读完源码后,对EventBus的使用基本上全部清楚,包括一些不常使用的功能。另外也对EventBus的整个实现流程,包括获取对象实例,注册,发送消息等都比较清楚。还对这些流程里面的细节,包括找订阅方法的两种方式,发送消息时的线程切换等比较清楚。
虽然EventBus用起来比较简单,源码不多,但是有很多地方值得学习的,比如:
使用Builder模式新建对象;
对一些需要耗时才能生成的对象进行缓存,例如订阅方法,findState实例;
使用队列处理消息;
线程池的使用和线程切换;
在编译期生成文件,避免反射导致的性能问题;