本文是翻译大神的文章《A Web Crawler With asyncio Coroutines》,本人在学习和测试代码的过程中顺便翻译,水平有限。补充一点,在原文的每段代码的后面,我都会附上我的测试结果和简单评论。
原文地址:http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html


两位作者:

A. Jesse Jiryu Davis,是MongoDB的核心工程师,在纽约。他写了Motor,一个MongDB的Python异步驱动,他也是MongoDB C语言驱动的开发组长,也是PyMongo组的成员。他也是asyncio和Tornado的contributor。

Guido van Rossum是Python的作者,Python社区提到他会用BDFL(Benevolent Dictator For Life),他的介绍:http://www.python.org/~guido/

介绍

传统的计算机科学强调,用高效的算法尽可能快的完成计算。但是,很多网络程序把时间不是花费在计算,而是在处理大量慢速的连接上。这些程序体现出一个非常不一样的挑战:尽可能高效地去等待这些大量的网络事件。现阶段的方案是异步IO,即async。

这篇文章介绍一个基本的Web爬虫。爬虫程序天生就应该是异步的,因为它在等待大量的网络相应,而仅有少量的计算。它一次能获取越多的页面,它越能尽早结束。如果爬虫程序为每一次请求分配一个线程,随着并发的请求数增长,在它消耗完sockets(操作系统的限制)前,它会耗尽系统内存或者线程资源。我们应该通过异步IO来避免过多的线程。

我们通过三步来介绍示例。首先,我们展示一个async event loop,通过带有callback的event loop描绘一个爬虫程序:它非常高效,但是难于扩展,会变得复杂且难于管理。其次,我们展示协程(corroutine)是既高效又易于扩展的,我们通过Python实现一个基于generator function的corroutine。第三,我们通过asyncio库中的full-featured coroutines来实现爬虫,且用了异步队列。

任务

一个爬虫抓取并下载一个网站的所有页面,从一个根Url开始,抓取所有新url加入到队列中,当没有新链接抓取并且队列为空时,爬虫结束。

我们可以通过并发下载多个页面来加速这一过程。当爬虫找到新的链接,在新的socket上,爬虫对新页面采取相似的爬取操作。当网络响应就绪时,将新的链接加入到队列。可能会有这样的点,当过量的并发请求会导致得到请求的速度变慢,所以我们要控制并发请求的数量,在发出去的请求得到响应前,将新的请求放在队列里等待。

传统的解决

如何提高并发呢?传统地,我们会创建一个线程池,每一个线程通过socket来下载一个页面。例子:

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0rnHost: xkcd.comrnrn'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

默认地,socket操作是阻塞的(blocking):当一个线程调用connect或recv时,它会阻塞直到操作完成,所以,想要一次下载多个页面时,我们需要多个线程。有经验的程序会分摊线程创建的开销,维护一个空闲的线程池,从线程池中分配并调用线程,执行任务,就像sockets的链接池。

但是,线程是昂贵的,操作系统会对线程的数量有限制(进程、用户、操作系统)。Jesse的系统上,一个Python线程消耗50k内存,创建10000个线程会报错。当我们扩展到几万类似操作时,在耗尽socket前已经耗尽线程了。每个线程的消耗或者系统的限制成为瓶颈。

在1999的著名文章“C10K”中,Dan Kegel强调了IO并发中的多线程中的限制,还有:

It’s time for web servers to handle ten thousand clients simultaneously, don’t you think? After all, the web is a big place now.

Kegel创建了C10K这个术语,10K连接现在看起来很容易,但是这个问题只是在上限上变了而已,本质没变。回到从前,一个线程处理10K连接是不切实际的,现在这个限制的量级已经很高了。确实,我们的demo爬虫能够通过多线程运行的很好,当几十万连接时,限制仍然存在:当达到限制时,大多数系统仍然能够创建socket,但是已经耗尽了线程,我们如何克服呢?

Async

异步IO框架在单一线程中通过非阻塞sockets来处理并发操作。在我们的异步爬虫中,我们在开始连接服务器之前就设置socket为非阻塞:

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

但是,即使socket工作正常,异步socket会在connect时抛出异常。这是由于底层的C函数,设置errno为EINPROGRESS的结果。

现在我们的爬虫需要一个方法去知道什么时候连接建立,然后发送HTTP请求。虽然,我们可以通过一个无限循环来实现:

request = 'GET {} HTTP/1.0rnHost: xkcd.comrnrn'.format(url)
encoded = request.encode('ascii')

while True:
    try:
        sock.send(encoded)
        break  # Done.
    except OSError as e:
        pass

print('sent')

这个方法不仅浪费电力,也没法在多个socket间有效地等待事件。在很久以前,BSD Unix的解决方法是select,一个C函数,非阻塞地监听一个或多个socket的事件。现在,需要海量连接的WEB程序有另一个方案:poll,BSD的kqueue,Linux的epoll。API类似于select,但是面对海量连接表现的更好。

Python3.4的DefaultSelector会使用可用的最好的类select方法。为了注册网络IO的消息,我们创建一个非阻塞的socket,为它注册一个默认的selector。

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们忽略假的错误信息,调用selector.register,传入socket的file descripter、一个表示等待事件的常量。为了当连接建立时得到通知,我们传入EVENT_WRITE:我们需要知道什么时候socket是可写入的。我们还穿入一个Python函数connected,当事件发生时运行,没错,就是一个callback函数。

当selector接受事件时,我们处理IO通知:

def loop():
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

callback函数connected存储在event_key.data中,当非阻塞socket连接上时被执行。

这里调用select函数会pause,等待下一个IO事件,然后loop执行这些IO事件中的callback。

有哪些我们已经展示了?我们展示了,执行一个操作,当操作完成时调用callback函数。一个异步的框架就建立在我们展示的两个特性之上:非阻塞socket和事件循环-为了在一个线程上运行并发操作。

我们已经达到了“并发”(concurrency),而没有达到传统的“并行”(parallelism)。这里,我们建立了一个小的系统,执行重叠(overlapping)的IO,在其它操作还没返回时,我们开始新的操作。但实际上,我们没有利用多核去并行执行计算。话又说回来,我们这个系统是为解决IO bound问题,而非CPU bound问题。

所以,event loop处理并发还是很高效的,他并没有为每个连接都分配线程资源。但是,在我们继续之前,重要地,我们要更正一个常见的误解:异步(async)要比多线程(multithreading)快。大多数情况下,异步要慢,确实,在python中,一个event loop处理少量的非常活跃的连接时,要比多线程慢。在没有GIL时, 多线程的表现要更好。异步IO的正确情景是,大量的不活跃的、慢的、但是又有很频繁的事件时。

Programming With Callbacks

到目前为止,基于我们小巧的异步框架,我们如何创建一个WEB爬虫呢?即使是仅仅抓取网页也很痛苦。

我们设置全局URL集合:

urls_todo = set(['/'])
seen_urls = set(['/'])

seen_urls包括urls_todo加上已经完成的URL,都初始化为/。

抓取一个页面需要一系列的callbacks。当连接建立时,调用connected回调函数,发送GET请求到服务器,然后,它必须等待一个响应,需要注册另一个回调函数,然后,当这个回调函数执行时,如果它没有读完全部响应体,需要再次注册回调,直到读取完成。

让我们收集这些回调函数导Fetcher对象中,它需要URL,一个socket对象,一个地方用来存放响应字节:

class Fetcher:
    def __init__(self, url):
        self.response = b''  # Empty array of bytes.
        self.url = url
        self.sock = None

然后,执行Fetcher.fetch函数:

    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

fetch函数连接一个socket,要注意,连接创建之前函数已经返回了,因为设置了非阻塞,为了等待连接建立,它必须把控制权交给event loop。要理解为什么,想象一下,我们整个的程序结构:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

当调用select函数时,所有的事件通知被处理。强调一下,fetch必须把控制权交给event loop,只有这样,程序才知道什么时候socket连接成功,然后调用connected函数,也就是fetch后的下一callback函数。

来看一下connected函数的实现:

    # Method on Fetcher class.
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0rnHost: xkcd.comrnrn'.format(self.url)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

这个函数发送GET请求,真正的程序会去检查send的返回值,以防整个消息没有一次发送成功,但是我们的请求很小,而且程序简单,直接调用send,等待响应。当然,需要注册另一个callback函数,把控制权交还给event loop,下一个callback是read_response函数,处理服务器的响应:

    # Method on Fetcher class.
    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()

            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # 

每次selector发现socket为可读状态时,这个callbacck就被执行,意味着两个点,socket要么有数据要么就被关闭了。

callback函数(read_response)向socket请求4KB数据。如果不够4KB,chunk就得到剩余的数据。如果大于4KB数据,socket只会读取4KB数据,socket的状态仍保持为可读状态,event loop下次轮询时,会继续调用callback读取这个socket。当响应读取完成,服务器会关闭socket,chunk为空,然后event loop注销该事件。

parse_links函数,没有在这显示,返回一个URL的集合,我们对每一个新的url开始一个新的fetcher,没有并发数的限制。注意,一个异步编程的很好的特性:我们不需要一个mutex来锁住共享的数据(因为不是多线程的嘛,本质上仍然是顺序的执行。。),比如,我们向seen_urls加入URL时就不用加锁。当然,这里也没有分优先级的多任务处理,也就是我们无法任意地在程序里中断现有流程。

我们加一个stopped全局变量,用来控制event loop。

stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

一旦所有页面被下载,fetcher设置stopped为True,event loop在下次tick时会终止,程序结束。

这个示例让异步程序看起来简单:面条式的代码。我们需要一些机制来展现一系列的计算和IO操作,然后调度多个系列,并发执行。但是,没有多线程的话,一系列的操作没法在单个函数中并发:无论何时,当一个函数开始一个IO操作,一旦IO操作返回,函数一定要读取这个IO操作的结果或状态。我们要负责去思考和实现这样的代码:保存状态的代码。

通过代码,让我们解释我们刚才在表达什么,考虑传统模式下,我们在一个socket阻塞的一个线程中,抓取一个URL:

# Blocking version.
def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0rnHost: xkcd.comrnrn'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

这个函数中,在一个socket操作和下一个socket操作之间,需要记录哪些状态呢?一个socket,一个url,外加一个累积的响应数据(response)。一个线程的一个函数中,会通过栈中的本地变量来存储状态。这个函数还有一个预存区“continuation”,当IO操作完成时,下一步要执行的操作。运行时的程序会记录预存区,记录线程指令的指针,我们不需要考虑这些。这是语言的内在机制。

但是,基于callback的异步框架,这些语言特性和机制并没有什么用。等待IO操作时,函数必须明确的存储它当前的状态,因为函数在返回时已经丢失了栈信息,这个时候,IO操作还没结束。作为局部变量的代替方案,我们的基于callback的例子,存储socket和response为self的属性,Fetcher实例。作为局部指令的替代方案,通过注册callback来存储预存区,也就是存储connected和read_response函数指针。随着应用功能的增多,我们存储的多个callback间的状态也变得复杂。如此繁重的记账(such onerous bookkeeping)让程序员头痛。

更糟的情况时,在调度下次callback链之前,callback函数抛出异常会发生什么呢?假如在parse_links时出错,抛出异常:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in 
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

栈trace中仅显示了eventloop中曾运行了一个callback。我们不知道哪里出错导致了异常,这个调用链在两端都出错了:我们不记得我们要去哪,也不知什么时候来的这里。这里,这个上下文的丢失叫做“栈裂开”(stack ripping),很多情况下,这让调查者很混乱。栈裂开也让我们无法在callback链中使用异常处理,也就是“try/except”。

所以,除了关于多线程和异步谁更高效的长时间的辩论外,还有另一个辩论,关于哪一种方式更易于抛出更多的错误:当发生错误,同步处理时,多线程由于数据的竞态环境而易受影响,而callbacks因为栈裂开而难于debug。(threads are susceptible to data races if you make a mistake synchronizing them, but callbacks are stubborn to debug due to stack ripping.)

Coroutines

我们给一个承诺,写出这样的异步代码是有可能的,既有callback的高效,也有经典代码的结构。这个结合是就是“协程”(corroutines),在Python3.4中,通过aiohttp可以直接以协程的方式抓取URL:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

协程也是可以扩展的,相比于每个线程占用50k内存和操作系统的限制,一个Python协程仅占用3k内存。Python很容易几十万个协程。

协程的概念,可以追溯到早年的计算机科学,协程就是,一个可以被暂停和唤醒的子函数。然而,多线程先被操作系统用于多任务并发了。协程多任务控制:选择何时去暂停,哪个协程下次执行。

有很多版本的协程实现,在Python中,也有几个。Python3.4标准库中的是“asyncio”,建立在generators、a Future class和“yield from”语法之上。从Python3.5开始,协程成为Python自带的属性,无论如何,协程在Python3.4中实现,理解它有助于理解Python3.5中的自带coroutines。

为了解释Python3.4基于generator的协程,我们看看generator如何被用于asyncio中的协程,相信你一定会享受地去看完,就像我们很享受写出来一样。一旦我们解释了基于generator的协程,我们就会在爬虫程序中使用它。

How Python Generator Work

在掌握Python的生成器(Generator)之前,需要先理解普通的Python函数是如何工作的。正常情况下,当一个Python函数调用一个子函数,这个子函数保持控制权直到函数返回或者抛出异常,子函数返还控制权给调用者:

>>> def foo():
...     bar()
...
>>> def bar():
...     pass

标准Python解释器是用C语言写的,实现Python函数的那个C语言函数被调用,顺畅的(mellifluously),PyEval_EvalFrameEx。它接受一个Python stack frame object,处理Python的bytecode在frame的上下文中,以下是foo函数的bytecode:

>>> import dis
>>> dis.dis(foo)
  2           0 LOAD_GLOBAL              0 (bar)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 POP_TOP
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

foo函数载入bar函数到它的栈,调用bar,然后返回栈中的值,载入None到栈,返回None。

当PyEval_EvalFrameEx进入CALL_FUNCTION字节码,它递归地创建一个新的Python stack frame:它递归地调用PyEval_EvalFrameEx在新frame中,在新frame中,执行bar函数。

非常重要地一点是,Python stack frame是在堆内存中分配的,Python解释器不过是一个正常的C语言程序,所以解释器程序的栈frame是正常的栈frame。但是,解释器C程序操作的Python stack frames是在堆上操作的,很惊奇,这意味着,Python stack frame可以比调用者的寿命长(不确定??,Among other surprises, this means a Python stack frame can outlive its function call),让我们来看一下,在bar函数中存储frame状态:

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'
Paste_Image.png

现在,我们把这个场景设置在Python的生成器(Generator)中,以同样的粒度来研究,即code objects和stack frame,很惊奇的:

下面是Generator函数:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...    

当Python编译gen_fn函数到字节码时,编译器看到yield语句,知道gen_fn时一个生成器函数,不是一个普通函数,对这个函数设置一个标记:

>>> # The generator flag is bit position 5.
>>> generator_bit = 1 >> bool(gen_fn.__code__.co_flags & generator_bit)
True

当你调用一个generator函数时,Python看到这个generator flag,编译器不会实际运行这个函数,编译器会创建一个generator:

>>> gen = gen_fn()
>>> type(gen)

一个Python的generator压缩一个stack frame加上一个对一些code的引用,也就是gen_fn的函数体:

>>> gen.gi_code.co_name
'gen_fn'

所有调用gen_fn的generator都会指向这段同样的代码,但是,每次都会创建一个自己的stack frame,这个stack frame不是一些实际的stack,它在堆上,等待被调用:

Paste_Image.png

这个frame有一个“last instruction”指针,这个指令(instruction)运行得非常频繁,在开始,last instruction的指针为-1,意味着generator还没有开始:

>>> gen.gi_frame.f_lasti
-1

当调用send,generator到达第一个yield,然后暂停,send的返回值为1,因为函数里yield返回1:

>>> gen.send(None)
1

现在,generator的指令指针从开始已经走过了3字节,从Python编译的字节码上已经走过了56字节:

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

generator可以被在任何时候被任何函数唤起,因为它的stack frame不是在真正的栈上:它在堆上。它的位置在调用结构上不是固定的,generator不用遵守先入后出的执行顺序,就像传统函数那样,generator很随意,就像云彩一样。

我们可以发送“hellp”到generator,它会成为yield的结果,然后generator继续执行,知道返回2:

>>> gen.send('hello')
result of yield: hello
2

现在,generator的stack frame包括一个局部变量result:

>>> gen.gi_frame.f_locals
{'result': 'hello'}

其它通过gen_fn创建的generator将会拥有他们自己的stack frame和局部变量。

当再一次调用send函数时,generator会继续从第二个yield开始,完成执行,抛出StopIteration异常:

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
  File "", line 1, in 
StopIteration: done

这个异常有一个返回值,也就是generator的返回值:字符串“done”。

Building Coroutines With Generators

所以,一个Generator可以暂停,可以通过一个值来唤起,也可以返回一个值。听起来像一个古老的实现异步编程的模型,没有面条一样的回调函数(callback)!我们要创建一个“协程”(coroutine):一个函数,在同一个程序中可以和其它函数协同执行(a routine that is cooperatively scheduled with other routines in the program)。我们的coroutines将会是在Python中简化了的标准库asyncio中的协程。在asyncio中,我们使用generators,futures,“yield from”语法。

首先,我们需要一个方法来代表:coroutine等待的未来的执行结果(future result),一个简化的版本:

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

一个Future的最初状态为“pengding”,当调用完set_result后为“resolved”。

让我们的fetcher来使用futures和coroutines,写一个带callback的fetch方法:

class Fetcher:
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!')
        # And so on....

fetch函数开始连接一个socket,注册一个callback函数,也就是connected,让连接建立时会调用这个函数。现在,让我们结合这两个步骤到一个coroutine中:

def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')

现在,fetch是一个generator function,而不是一个普通的函数,因为它包含一个yield语句,我们创建一个挂起的future,然后yield这个future来暂停fetch函数,直到socket就绪,里层函数on_connected处理future。

但是,当future处理完,谁来唤醒generator呢?我们需要一个coroutine驱动,让我们叫它Task:

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())

loop()

task通过发送None到fetch来开始generator,然后fetch运行直到yield一个future,task会把它当成next_future来捕获。当socket连接成功,event loop执行call back函数on_connected,处理future,这个future调用step,唤起fetch。

Factoring Coroutines With yield from

一旦socket创建成功,我们发送HTTP GET请求并且读取服务器的响应,这些步骤不必分散在各个callback函数中,我们把这些步骤汇总到一个generator函数中:

    def fetch(self):
        # ... connection logic from above, then:
        sock.send(request.encode('ascii'))

        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # Done reading.
                break

这段代码,通过socket读取整个信息,看起来很有用。我们如何把fetch包成一个子函数呢?现在,Python3的yield from来处理这用情况,它让一个generator代理另一个generator。

来看一下如何做,回到我们简单的generator例子上:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...   

为了在另一个generator中调用这个generator,通过yield from来代理它:

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()

生成器caller就好像它是生成器gen一样,caller好像是gen的代理:

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
  File "", line 1, in 
StopIteration

当caller从gen中yield from时,caller的函数栈指令并没有增长,指令指针一直是15,也就是yield from语句的位置,虽然里面的生成器gen的指令指针从一个yield增加到另一个yield的位置。站在caller的外面来看,我们不能说出一个值是从caller中yield出来的,还是从caller代理的生成器中返回的。站在里面的生成器gen来看,我们不能说出传入的参数是从caller中传入的还是从caller外面传入的。yield from语句就像一个无阻力的通道,通过这个通道,参数传入gen,值传出gen直到gen完成。

一个coroutine可以通过yield from来为子函数代理工作,也可以接收工作的结果。注意,上面,那个caller打印了“return value of yield-from: done”,当gen完成了,它的返回值成为了caller中yield from语句中的值:

rv = yield from gen

更早的,当我们批评基于callback的异步编程时,我们最抱怨的地方是“栈裂开”:当一个callback抛出一个异常,栈追踪基本就没有用了。栈只显示了event loop在执行callback,而丢失了细节,我们来看看coroutine怎么养?

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
  File "", line 1, in 
  File "", line 3, in caller_fn
  File "", line 2, in gen_fn
Exception: my error

这个信息就很有用了!栈追踪信息显示了,当出异常时,caller_fn曾代理gen_fn。更舒服的是,我们可以把调用generator包进一个异常处理里面,就是普通的子函数那样:

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh

所以,我们代理子生成器的逻辑就像普通函数那样,让我们从fetcher中代理一些有用的子协程,我们写一个read协程来接收一个块:

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

我们基于read函数创建read_all协程来接收整个消息:

def read_all(sock):
    response = []
    # Read whole response.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)

如果你眯着眼看,yield from语句消失了,这就像一个传统常见的做阻塞IO的函数。但是,事实上,read和read_all都是协程。yield from read时,暂停read_all执导IO完成,而read_all暂停时,asyncio的event loop做其他的工作,等待其他IO事件,在下次轮询时,一旦这个事件就绪,read_all就会被唤醒,传入read的结果。

在栈的根上,fetch调用read_all:

class Fetcher:
    def fetch(self):
         # ... connection logic from above, then:
        sock.send(request.encode('ascii'))
        self.response = yield from read_all(sock)

惊奇的是,Task类不需要修改,它驱动外面的fetch协程就像以前那样:

Task(fetcher.fetch())
loop()

当read协程yield一个future,task通过yield from渠道收到它,就好像future从fetch协程直接返回一样。当loop解决一个future,task发送返回值给fetch,然后被read协程接收,就好像直接驱动read一样:

Paste_Image.png

为了让我们的协程实现更完美,有一点需要改善:当等待一个future时,使用yield,当代理一个子协程时,使用yield from。这样可以更好,只要协程暂停,我们就使用yield from。然后协程就不用考虑它在等待什么了。

我们利用Python中generator和iterator间深度通信的优点。优化一个generator,对caller来讲,就像一个iterator,我们使我们的Future类可遍历的,通过一个特殊的方法:

    # Method on Future class.
    def __iter__(self):
        # Tell Task to resume me here.
        yield self
        return self.result

Future类的iter方法是一个协程,可以yield future本身,现在我们把代码改成这样:

# f is a Future.
yield f

换成:

# f is a Future.
yield from f

结果是一样的!作为驱动的Task从调用send时接收到future,当future解决时,发送结果回到协程内。

哪里都用yield from的优点是什么呢?为什么这样做要好于“用yield等待future用yield from代理子协程”呢?这是因为,一个方法可以自由的修改其实现,不用影响调用者:它可能就是一个普通的方法,返回future然后解决它返回一个值,也可以是一个包含yield from的协程,返回一个值。

读者们,我们已经到达了asyncio coroutines的探索末期,我们深入了解了generator的极致,实现了一个建议的future和tasks,我们描述了asyncio如何兼顾两者:高效的并发IO,比callback易读的代码。当然,真正的asyncio要远比我们的实现要复杂,真正的框架要达到:zero-copy IO,公平的调度,异常处理,还有其他特性。

对于asyncio的用户,基于coroutine的代码要比我们刚才看到的简单,在上面的代码中,我们基于第一原则实现了coroutine,所以我们看到callback,tasks,futures。我们甚至看到了非阻塞sockets,还有调用了select。但是,当我们基于asyncio创建应用时,上面的这些都不会出现在代码中。像我们承诺的,我们可以简单的抓取一个url:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

这趟探索已经满足了我们,我们会到最开始的作业:基于asyncio实现一个异步的WEB爬虫。

Coordinating Coroutines

我们开始时,描述了我们希望爬虫如何工作,现在是时候基于asyncio coroutines来实现它了。

我们的爬虫将抓取第一个页面,传入链接,将链接加入队列,然后抓取整个网站,并发抓取所有页面。但是,为了限制客户端和服务器端的负载,我们希望最多数量的workers运行,但不能过多。当一个worker完成抓取一个页面,它应该立即从队列中拿出下一个链接。我们会经过一个没有足够的工作要做的周期,所以一些workers必须暂停。但是,当一个worker碰到一个有很多新链接的页面时,队列突然增大,workers被唤醒工作。最后,当工作结束,程序结束。

想象一下,如果workers是线程,我们该如何实现爬虫程序的算法呢?我们可以使用Python标准库中的同步队列,每次一个新链接加入队列,队列的任务数加1,worker线程完成一个任务后调用task_done,主进程阻塞在Queue.join直到所有任务完成后,结束。

coroutines使用相同的形式,通过asyncio队列:

try:
    from asyncio import JoinableQueue as Queue
except ImportError:
    # In Python 3.5, asyncio.JoinableQueue is
    # merged into Queue.
    from asyncio import Queue

我们收集crawler类中workers共享的状态,在crawl方法中实现主要逻辑,我们在协程中开始crawl方法,在asyncio事件轮询中运行直至完成:

loop = asyncio.get_event_loop()

crawler = crawling.Crawler('http://xkcd.com',
                           max_redirect=10)

loop.run_until_complete(crawler.crawl())

crawler开始时,有一个根url和max_redirect值,也就是最大的跳转数量。它将(URL,max_redirect)对压入队列。

class Crawler:
    def __init__(self, root_url, max_redirect):
        self.max_tasks = 10
        self.max_redirect = max_redirect
        self.q = Queue()
        self.seen_urls = set()

        # aiohttp's ClientSession does connection pooling and
        # HTTP keep-alives for us.
        self.session = aiohttp.ClientSession(loop=loop)

        # Put (URL, max_redirect) in the queue.
        self.q.put((root_url, self.max_redirect))

现在,队列中未完成的任务只有一个,我们载入事件轮询和crawler方法:

loop.run_until_complete(crawler.crawl())

crawler协程唤醒了workers,就像主线程一样:它在join阻塞,直到所有任务完成,同时,所有workers在后台运行。

@asyncio.coroutine
    def crawl(self):
        """Run the crawler until all work is done."""
        workers = [asyncio.Task(self.work())
                   for _ in range(self.max_tasks)]

        # When all work is done, exit.
        yield from self.q.join()
        for w in workers:
            w.cancel()

如果workers时线程的话,我们可能不希望同时开始所有线程。除非确实必要时,我们尽量避免创建昂贵的线程,一个线程池常常会增长。但是,协程却是便宜的,所以,我们直接运行允许的最大的协程数量。

很有趣的是,我们如何结束crawler的。当join所有解决的futures时,worker任务是处于活着并且挂起的状态:它们等待更多的URLs,但是并没有新的任务。所以,主协程在结束前取消所有workers任务。否则,当Python解释器关闭时,调用所有对象的销毁方法,活着的任务会报错:

ERROR:asyncio:Task was destroyed but it is pending!

那么,cancel是如何工作的呢?generators有一个我们没有提到的特性,你可以从generator外面向generator里面抛出一个异常:

>>> gen = gen_fn()
>>> gen.send(None)  # Start the generator as usual.
1
>>> gen.throw(Exception('error'))
Traceback (most recent call last):
  File "", line 3, in 
  File "", line 2, in gen_fn
Exception: error

generator被throw唤醒,但是它现在抛出一个异常。如果generator的调用栈没有代码捕获这个异常,这个异常就会冒泡到最上面,所以,为了取消任务协程:

    # Method of Task class.
    def cancel(self):
        self.coro.throw(CancelledError)

无论何时generator被暂停,一些yield from语句唤醒活着抛出异常,我们在任务的step方法中处理协程取消:

    # Method of Task class.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

现在,任务知道它被取消了,所以,当它被销毁时,它不会抱怨。

一旦crawl取消了workers,退出程序。event loop看见协程完成,event loop也会结束:

loop.run_until_complete(crawler.crawl())

文章来源于互联网:基于Asyncio Coroutines的Web爬虫

发表评论