前言

EventBus使用和源码都比较简单,适合新手上手源码阅读。代码基于3.2.0版本,implementation 'org.greenrobot:eventbus:3.2.0'。

主要功能

简单使用流程:在需要订阅消息的地方(activity或者fragment),定义处理消息的方法,并用@Subscribe修饰,同时要在create注册,在destroy解注册,然后在合适的地方,调用post发送该类型的消息,就能在处理消息的地方收到发送的消息。

其中有下面几个主要功能:
1.获取EventBus对象
2.注册
3.解注册
4.发送消息
5.接收消息

        val eventBus = EventBus.getDefault()
        eventBus.register(this)
        eventBus.unregister(this)
        eventBus.post("post")
        eventBus.postSticky("postSticky")
        
    @Subscribe(threadMode = ThreadMode.MAIN)
    fun back(str: String) {
    }

获取EventBus对象

一般使用getDefault()方法,这是一个使用双重校验锁方式实现的单例模式,返回一个EventBus对象,源码如下:

static volatile EventBus defaultInstance;
// Convenience singleton for apps using a process-wide EventBus instance. 
public static EventBus getDefault() {
       EventBus instance = defaultInstance;
       if (instance == null) {
           synchronized (EventBus.class) {
               instance = EventBus.defaultInstance;
               if (instance == null) {
                   instance = EventBus.defaultInstance = new EventBus();
               }
           }
       }
       return instance;
   }

可以看到EventBus中有两个构造方法

public EventBus() {
       this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
       logger = builder.getLogger();
       subscriptionsByEventType = new HashMap<>();
       ......一些初始化操作
       eventInheritance = builder.eventInheritance;
       executorService = builder.executorService;
}

EventBus的对象创建使用了建造者模式(Builder模式),除了使用默认的builder构造EventBus,还可以使用自定义的builder,EventBus提供了一个静态方法生成Builder对象,而在Builer中又提供了方法生成EventBus对象,代码如下:

//静态方法生产Builder
public static EventBusBuilder builder() {
        return new EventBusBuilder();
    }

//builder对象生成EventBus对象
    public EventBus build() {
        return new EventBus(this);
    }

综上,获取EventBus对象有如下两种方式:

//1进程单例方式获取,使用默认Builder
        val eventBus1 = EventBus.getDefault()

//2用户在默认Builder的基础上,可以按照需求自定义Builder参数
        val eventBus2 = EventBus.builder()
            .ignoreGeneratedIndex(true)
            ......自定义Builder参数
            .logger(null)
            .build()

注册

总注册流程

register(this)方法源码如下:

public void register(Object subscriber) {
        //1.获取传入参数的class对象
        //2.通过subscriberMethodFinder找到传入对象类中所有使用@Subscribe修饰的方法
        //3.把找到的方法存起来
        
        Class<?> subscriberClass = subscriber.getClass();//1

        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);//2

        //3
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }

    }

找订阅方法

先看订阅方法包装类SubscriberMethod,有以下几个属性

    final Method method;//具体的方法
    final ThreadMode threadMode;//指定线程模式
    final Class<?> eventType;//事件类型,方法的参数的class对象
    final int priority;//优先级
    final boolean sticky;//是否是粘性消息
    /** Used for efficient comparison */
    String methodString;

在用注解@Subscribe修饰方法时,有三个参数可以选,和上面的属性对应,源码如下:

public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;
    boolean sticky() default false;
    int priority() default 0;
}

threadMode属性,表示指定的线程模式,默认使用ThreadMode.POSTING,ThreadMode是一个枚举类,有以下五种类型:ThreadMode.POSTING、ThreadMode.MAIN、ThreadMode.MAIN_ORDERED、ThreadMode.BACKGROUND、ThreadMode.ASYNC。每个枚举的具体含义后面再说。

然后是findSubscriberMethods()方法,源码如下:

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //查看该类是否有已经缓存的订阅方法,如果有,直接返回
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        //找订阅方法的具体逻辑
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }

        if (subscriberMethods.isEmpty()) {
            //该类下面没有注解@Subscribe修饰的订阅方法,不可注册,抛出异常
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //把找到的订阅方法缓存到METHOD_CACHE并返回所有订阅方法,METHOD_CACHE是一个ConcurrentHashMap对象,支持并发
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

看一看找订阅方法的具体逻辑,如果ignoreGeneratedIndex为true,就调用findUsingReflection(subscriberClass),否则调用findUsingInfo(subscriberClass)。参数含义是 是否忽略生成索引,索引是EventBus在3.0新加的功能,目的是解决直接使用反射查找订阅方法带来的性能问题,我们先看不使用索引的逻辑,即ignoreGeneratedIndex为true,关于索引的内容放后面再说。
findUsingReflection源码如下:

    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        //1.获取FindState实例,
        //使用FindState[] FIND_STATE_POOL数组存放findState实例,数组长度固定为4
        //如果数组里面没有实例,新建一个findState
        //如果数组里面有实例,获取对象使用,避免新建对象带来的消耗,并把池里面的该位置置空,
        //和下面的getMethodsAndRelease方法相反,getMethodsAndRelease会把使用的findState清空数据,然后放入池中,以便下次使用
        SubscriberMethodFinder.FindState findState = prepareFindState();

        //2.根据订阅类subscriberClass初始化findState
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            //3.通过反射方式,找到订阅者下面的所有订阅方法
            findUsingReflectionInSingleClass(findState);

            //4.再去他的父类里面找订阅方法
            findState.moveToSuperclass();
        }

        //5.通过findState实例,获取所有找到的订阅方法,并把findState清空数据后放入池中
        return getMethodsAndRelease(findState);
    }

其中关键代码是3处的findUsingReflectionInSingleClass,通过反射找订阅方法,源码如下:

private void findUsingReflectionInSingleClass(SubscriberMethodFinder.FindState findState) {
        
        //1.反射获取所有方法
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            try {
                methods = findState.clazz.getMethods();
            } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
                String msg = "Could not inspect methods of " + findState.clazz.getName();
                if (ignoreGeneratedIndex) {
                    msg += ". Please consider using EventBus annotation processor to avoid reflection.";
                } else {
                    msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
                }
                throw new EventBusException(msg, error);
            }
            findState.skipSuperClasses = true;
        }
        
        //2.遍历所有方法,找到满足条件的订阅方法
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            //方法是public,且非static,非abstract
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                //只能有一个参数
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    //方法被注解Subscribe修饰
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            //获取注解参数值
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

代码看起来比较多,其实逻辑很清晰,一看就明白是在做什么。

把找到的方法存起来

遍历找到的订阅方法,调用subscribe方法保存,subscribe源码如下:

    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        //使用订阅者类和订阅方法,生成订阅类Subscription实例
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //根据事件类型,获取所有的订阅subscriptions,如果为空,新建并放入到subscriptionsByEventType这个map里面,是事件和CopyOnWriteArrayList<Subscription>的映射
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            //如果subscriptions不为空,且newSubscription实例已经包含在subscriptions中,说明同一个订阅者的同一个事件重复注册了,抛出异常
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        //按照订阅方法的优先级,把订阅newSubscription添加到subscriptions
        //如果两个方法优先级相同按照,先订阅的在前,后订阅的在后
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        //把订阅事件添加到对应订阅者的订阅事件list
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        //如果订阅方法是粘性的,把之前发送过的最新粘性事件,再次发送到这个订阅方法
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

至此,注册流程完成。

解注册

unregister(this)方法源码如下:

    public synchronized void unregister(Object subscriber) {
        //获取该订阅者的所有订阅事件
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                //根据订阅者和订阅事件,取消所有订阅
                unsubscribeByEventType(subscriber, eventType);
            }
            //移除该订阅者
            typesBySubscriber.remove(subscriber);
        } else {
            //如果没有找到该订阅者的事件,打印log,提示该订阅者没有注册
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        //根据订阅事件,获取所有的订阅
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                //比较订阅者,把匹配的数据移除
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

逻辑比较清晰简单,只要看懂了注册流程,解注册自然而然就明白是怎么回事了。

发送消息

发送消息分为普通消息和粘性消息,先看看普通消息的发送
post()源码如下:

    public void post(Object event) {
        //获取当前线程下的postingState,currentPostingThreadState是一个ThreadLocal<PostingThreadState>实例,保证拿到的postingState是通过线程隔离的
        PostingThreadState postingState = currentPostingThreadState.get();
        //把事件添加到队列
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        //判断是否正在发送数据,如果不是,开始发送数据的逻辑
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    //事件队列里面有数据时,具体的发送逻辑
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

再看看postSingleEvent的逻辑,都会走到postSingleEventForEventType方法,然后走到postToSubscription方法,下面是postToSubscription的源码:

 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        //按照订阅方法的指定线程发送消息
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

可以看到这段代码的目的是,根据订阅方法的指定的线程模式发送消息。

简单说一下几种线程模式。
POSTING,不进行线程切换,接收消息和发送消息在同一个线程;
MAIN,在主线程接收消息,如果发送线程是主线程,直接调用invokeSubscriber,如果是子线程,把消息放到mainThreadPoster队列;
MAIN_ORDERED,在主线程接收消息,和MAIN的不同之处在于,优先把消息放到mainThreadPoster队列,只有当mainThreadPoster为空是才会直接调用invokeSubscriber;
BACKGROUND,在子线程接收消息,如果发送线程是主线程,把消息放到子线程队列backgroundPoster,如果是子现场,直接调用invokeSubscriber;
ASYNC,在子线程接收消息,独立于发送线程和主线程,直接把消息放到asyncPoster队列。

上述流程中,最关键的两个点是invokeSubscriber和asyncPoster队列(他和mainThreadPoster以及backgroundPoster原理一样,看一个就行),下面看看这两部分的代码。
invokeSubscriber源码如下:

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //通过反射调用订阅方法
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

逻辑简单清晰,直接通过反射调用订阅方法。
asyncPoster队列原理是,把消息放在队列里面,然后通过线程池拿到子线程后,在子线程获取消息队列里的消息,然后再执行上面的invokeSubscriber。每来一条消息就开一个线程处理,消息之间是并发的。
backgroundPoster原理和asyncPoster类似,区别是增加了同步逻辑,用来保证消息顺序执行,上一条消息执行完成再执行下一条消息。
mainThreadPoster和backgroundPoster类似,都有加同步,然后通过handler发送消息到主线程,再在主线程调用invokeSubscriber。

说完了post,在来看一下postSticky,源码如下:

 public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }

逻辑比较简单清晰,先把事件存到map里面,然后再调用post,因为是用的map存,所以同一个事件只会有最新的消息。
以上,就是发送消息以及接受消息的所有流程。

索引

使用

首先在app的build.gradle中加入配置:

android {
    defaultConfig {
        javaCompileOptions {
            annotationProcessorOptions {
                // 根据项目实际情况,指定辅助索引类的名称和包名
                arguments = [ eventBusIndex : 'com.af.etest.MyEventBusIndex' ]
            }
        }
    }
}
dependencies {
    compile 'org.greenrobot:eventbus:3.2.0'
    // 引入注解处理器
    annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.2.0'
}

然后新建两个activity,增加订阅方法,如下:

public class MainActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
        eventBus.register(this);
    }

    @Subscribe(threadMode = ThreadMode.ASYNC,priority = 100,sticky = true)
    public void callback1(String e) {

    }

    @Subscribe
    public void callback2(Activity activity) {

    }
}

public class TestActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_test);
        EventBus.getDefault().register(this);
    }

    @Subscribe
    public void testCallback(TestActivity testActivity){

    }
}

编译之后,就会自动生成如下的java文件:

//This class is generated by EventBus, do not edit. 
public class MyEventBusIndex implements SubscriberInfoIndex {
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

    static {
        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

        putIndex(new SimpleSubscriberInfo(TestActivity.class, true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("testCallback", TestActivity.class),
        }));

        putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("callback1", String.class, ThreadMode.ASYNC, 100, true),
            new SubscriberMethodInfo("callback2", android.app.Activity.class),
        }));

    }

    private static void putIndex(SubscriberInfo info) {
        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }

    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
        if (info != null) {
            return info;
        } else {
            return null;
        }
    }
}

基本原理就是在编译时,先获取到所有的订阅方法信息,然后存到指定的SubscriberInfoIndex实现类,以便注册时使用。实现这一功能的代码在这儿

源码

当没有忽略索引的时候,注册流程中,找订阅方法会走到findUsingInfo方法,和上文提过的findUsingReflection方法相比,增加了注释1处的代码,源码如下:

 private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);

        //1
        while (findState.clazz != null) {
            //通过builder设置的SubscriberInfoIndex实现类,获取到SubscriberInfo,这个实现类就是使用索引那一步,自动生成的类MyEventBusIndex
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                //通过获取到的SubscriberInfo信息,生成订阅方法
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            }

            else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

和不使用索引相比,索引是在编译的时候就获得了所有的订阅方法信息,而不用在运行时通过反射去获取,大大提升了注册的性能。

其他

另外,EventBus还有几个不常用的api,cancelEventDelivery()取消事件,removeAllStickyEvents()移除所有粘性消息,源码也比较简单清晰,如下:

    public void cancelEventDelivery(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        if (!postingState.isPosting) {
            throw new EventBusException(
                    "This method may only be called from inside event handling methods on the posting thread");
        } else if (event == null) {
            throw new EventBusException("Event may not be null");
        } else if (postingState.event != event) {
            throw new EventBusException("Only the currently handled event may be aborted");
        } else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
            throw new EventBusException(" event handlers may only abort the incoming event");
        }

        postingState.canceled = true;
    }

        public void removeAllStickyEvents() {
        synchronized (stickyEvents) {
            stickyEvents.clear();
        }
    }

最后

对这次的源码阅读做一个小小的总结。
读完源码后,对EventBus的使用基本上全部清楚,包括一些不常使用的功能。另外也对EventBus的整个实现流程,包括获取对象实例,注册,发送消息等都比较清楚。还对这些流程里面的细节,包括找订阅方法的两种方式,发送消息时的线程切换等比较清楚。
虽然EventBus用起来比较简单,源码不多,但是有很多地方值得学习的,比如:
使用Builder模式新建对象;
对一些需要耗时才能生成的对象进行缓存,例如订阅方法,findState实例;
使用队列处理消息;
线程池的使用和线程切换;
在编译期生成文件,避免反射导致的性能问题;