自学内容网 自学内容网

spark 事件总线listenerBus

事件总线基本流程

图片来源:https://blog.csdn.net/sinat_26781639/article/details/105012302

LiveListenerBus创建

在sparkContext初始化中创建LiveListenerBus对象。

主要变量有两个

  • queues:事件队列,里面存放四个队列,每个队列中有对应的listener注册
  • queuedEvents:发生的事件


注册listener

内部listener注册

举例DynamicAllocation,在SpackContext初始化的时候创建了ExecutorAllocationManager对象,再调用start方法。

在ExecutorAllocationManager的start方法中,调用listenerBus相关方法完成注册

LiveListenerBus中queues分成四个队列,分别对应不同的注册方法,最终都是调用addToQueue方法

addToQueue比较简单,就是从queues中获取,有AsyncEventQueue就加入listener,没有就创建再加入。加入是调用AsyncEventQueue的addListener方法。

AsyncEventQueue的addListener方法是父类ListenerBus的addListener方法。
将listener加入到listenersPlusTimers中


内部listener是放入了对应队列的listenersPlusTimers中。

外部listener注册

在内部listener注册完成后调用setupAndStartListenerBus注册外部listener

在setupAndStartListenerBus方法中,读取配置spark.extraListeners获取要注册的外部listener集合,使用反射创建listener对象,遍历调用addToSharedQueue加入share queue。最后调用listenerBus的start方法启动listenerBus。

事件总线启动

如上图,在listener全部完成注册后,调用listenerBus的start方法启动。
将started变量修改为true,标记listenerBus启动。遍历queues,启动有注册的队列AsyncEventQueue(总共四个队列,但不一定全部都启用)。遍历queuedEvents处理在listenerBus没有启动期间产生的event。最后不再需要缓存消息了,将queuedEvents置为空。

AsyncEventQueue启动
将started变量变成true,标记AsyncEventQueue启动。启动dispatchThread线程。

dispatchThread线程是调用dispatch方法。
在dispatch方法中,可以看到是循环读取eventQueue,从其中读取event,调用postToAll发送给全部的listener。
eventQueue是一个阻塞队列LinkedBlockingQueue


到此,listenerBus启动完成,其中的列队AsyncEventQueue也启动完成。AsyncEventQueue循环从eventQueue中获取event来处理(这里是阻塞的)

发送消息

发送消息的入口是调用LiveListenerBus的post方法。
如果还没有启动,就将消息先缓存到queueEvents中。
如果启动了,就调用postToQueues将消息发送给全部队列。
在postToQueues中是遍历queue,调用post方法。

AsyncEventQueue的post方法中,就是将消息放入eventQueue即可。
但是eventQueue是有容量大小的,超过的消息就会丢弃。

至此,发送消息完成,将消息放入到AsyncEventQueue的eventQueue中。

处理消息

在启动的时候,dispatch线程已经完成了启动,从eventQueue获取event来处理。
处理消息是调用父类postToAll方法

postToAll方法中是遍历该队列全部listener,调用doPostEvent方法。

doPostEvent对应是SparkListenerBus的doPostEvent方法,根据event的类型,调用listener的不同的方法。

listener是要实现的SparkListenerInterface的方法,可以看到方法很多。。。


原文地址:https://blog.csdn.net/weixin_43839095/article/details/140501941

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!