锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

Python 异步 IO 、协程、asyncio、async/await、aiohttp

时间:2023-12-27 16:07:02 sc连接器挂掉的原因


From :廖雪峰异步IO :https://www.liaoxuefeng.com/wiki/1016959663602400/1017959540289152

Python Async/Await入门指南 :https://zhuanlan.zhihu.com/p/27258289

Python 生成器 和 yield 关键字:https://blog.csdn.net/freeking101/article/details/51126293

协程与任务官网文档https://docs.python.org/zh-cn/3/library/asyncio-task.html

Python介绍了中异步协程的使用方法https://blog.csdn.net/freeking101/article/details/88119858

python 协程详解及I/O多路复用,I/O异步:https://blog.csdn.net/u014028063/article/details/81408395

Python深入理解协程:https://www.cnblogs.com/zhaof/p/7631851.html

asyncio 进阶:Python黑魔法 --- 异步IO( asyncio) 协程:https://www.cnblogs.com/dhcn/p/9033628.html

谈谈Python协程技术的演变:https://www.freebuf.com/company-information/153421.html

最后推荐一下《流畅的Python》,这本书中 第16章 协程部分非常详细
《流畅的Python》pdf 下载地址:https://download.csdn.net/download/freeking101/10993120

gevent 是 python 微线程程并发框架 greenlet 用于核心 epoll 事件监控机制等诸多优化变得高效。

aiohttp 使用代理 ip 访问 https 网站报错的问题https://blog.csdn.net/qq_43210211/article/details/108379917

Python:使用 Future、asyncio 处理并发

:https://blog.csdn.net/sinat_38682860/article/details/105419842

异步 IO

在 IO 编程(廖雪峰 Python IO 编程 :https://www.liaoxuefeng.com/wiki/1016959663602400/1017606916795776)我们已经知道一节,CPU比磁盘、网络等快得多IO。在线程中,CPU执行代码的速度非常快,但一旦遇到IO如果你读写文件并发送网络数据,你需要等待IO只有完成操作,才能继续下一步操作。这种情况称为同步IO。

在IO在操作过程中,当前线程被悬挂,其他需要CPU当前线程无法执行执行代码。

因为一个 IO 操作阻塞了当前的线程,导致其他代码无法执行,因此我们必须使用多线程或多过程并发执行代码,以服务于多个用户。如果遇到它,每个用户将分配一个线程IO其他用户的线程不受影响。

虽然多线程和多过程模型解决了并发问题,但系统不能无上限地增加线程。一旦线程数量过多,因为系统切换线程的成本也很高,CPU在线程切换需要时间,实际操作代码的时间更少,导致性能严重下降。

因为我们需要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多过程只是解决这个问题的一种方法。

另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,不等待IO然后执行其他代码。一段时间后,当IO返回结果时,再通知CPU进行处理。

消息模型 事实上,早在桌面应用程序中。 GUI 程序的主线程负责不断阅读和处理信息。所有键盘、鼠标和其他信息都被发送到GUI程序的消息队列中,然后由GUI主线程处理程序。

由于GUI 键盘、鼠标等新闻的线程处理速度非常快,用户感觉不到延迟。某些时候,GUI线程在消息处理过程中遇到问题,导致消息处理时间过长。此时,用户会感觉到整个过程GUI程序停止响应,键盘和鼠标没有响应。这表明,在消息模型中,处理消息必须非常快,否则主线程将无法及时处理消息队列中的其他消息,导致程序似乎停止响应。

消息模型 是 如何解决 同步IO 必须等待IO操作这一问题的呢 ?

在消息处理过程中,遇到 IO 操作时,代码只负责发出IO请求,不等待IO结果,本轮消息处理直接结束,进入下一轮消息处理过程。IO操作完成后,将收到一条IO完成消息,处理消息可以直接获取IO操作结果。

在 “发出IO请求” 到收到 “IO完成” 同步IO在模型下,主线程只能悬挂,但异步IO在模型下,主线程不休息,而是继续处理其他消息。这样,在异步IO在模型下,一个线程可以同时处理多个线程IO请求,并且没有切换线程的操作。对于大多数IO使用异步密集型应用程序IO将大大提高系统的多任务处理能力。

协程 (Coroutines)

在学习异步IO在模型之前,让我们先了解协程,协程 又称 微线程,纤程,英文名 Coroutine

子程序(又叫函数 ) 协程

  • 子程序在 所有语言都是层次调用。比如: A 调用 B,B 在执行过程中再次调用 C,C 执行后返回,B 执行后返回,最后是 A 完成执行 子程序 即 函数 调用是通过堆栈实现的,线程是执行子程序。子程序调用总是一个入口,一次返回,调用顺序明确。
  • 协程的调用 和 子程序 不同。协程 看上去也是 子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个 中断子程序,执行其他子程序,不是函数调用,有点类似CPU中断。例如:子程序 A 和 B :

def A():     print('1')     print('2')     print('3')   def B():     print('x')     print('y')     print('z')

假设由协议执行,在执行中 A 在执行过程中,可以随时中断 B,B 也可能在执行过程中中断执行 A,结果可能是:

1 2 x y 3 z

但是在 A 中间没有调用 B 的,所以 协程的调用 比 函数调用 更难理解。

看起来 A、B 执行有点像多线程,但是 协程 其特点是线程执行。

协程 和 协程多线程比有什么优势?

  • 1. 最大的优点是协程执行效率高。 子程序 切换不是线程切换,而是由程序本身控制,因此,与多线程相比,线程数量越多,协程的性能优势就越明显。
  • 2. 第二个优点是不需要多线程的定机制,因为只有一个线程,不存在同时编写变量冲突。控制共享资源不锁定协议,只需要判断状态,因此执行效率远高于多线程。

因为协程是线程执行,如何使用多核?CPU呢?

最简单的方法是 多进程 协程,既充分利用多核,又充分发挥协程的高效率,能获得极高的性能。

Python 对 协程 的支持 是通过 generator (生成器)实现

在 generator 我们不仅可以通过 for 循环迭代也可以不断调用 next() 函数获取由 yield 句子返回的下一个值。

但是 Python 的 yield 不但可以返一个值,它还可以接收调用者发出的参数。

来看例子:

传统的 生产者-消费者 模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过 yield 跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author      : 
# @File        : text.py
# @Software    : PyCharm
# @description : XXX


def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'


def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()


c = consumer()
produce(c)

执行结果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到 consumer函数 是一个 generator,把一个 consumer 传入 produce 后:

  1. 首先调用 c.send(None) 启动生成器;
  2. 然后,一旦生产了东西,通过 c.send(n) 切换到 consumer 执行;
  3. consumer 通过 yield拿到消息,处理,又通过yield把结果传回;

  4. produce 拿到 consumer 处理的结果,继续生产下一条消息;

  5. produce 决定不生产了,通过 c.close() 关闭 consumer,整个过程结束。

整个流程无锁,由一个线程执行,produce 和 consumer 协作完成任务,所以称为 “协程”,而非线程的抢占式多任务。

最后套用 Donald Knuth 的一句话总结协程的特点:“子程序就是协程的一种特例。”

参考源码:https://github.com/michaelliao/learn-python3/blob/master/samples/async/coroutine.py

在 Python 中,异步函数  通常 被称作  协程

创建一个协程仅仅只需使用 async 关键字,或者使用 @asyncio.coroutine 装饰器。下面的任一代码,都可以作为协程工作,形式上也是等同的:

import asyncio

# 方式 1
async def ping_server(ip):
        pass


# 方式 2
@asyncio.coroutine
def load_file(path):
      pass

上面这两个 特殊的函数,在调用时会返回协程对象。熟悉 JavaScript 中 Promise 的同学,可以把这个返回对象当作跟 Promise 差不多。调用他们中的任意一个,实际上并未立即运行,而是返回一个协程对象,然后将其传递到 Eventloop 中,之后再执行。

  • 如何判断一个 函数是不是协程 ?   asyncio 提供了 asyncio.iscoroutinefunction(func) 方法。
  • 如何判断一个 函数返回的是不是协程对象 ?  可以使用 asyncio.iscoroutine(obj) 。

用 asyncio 提供的 @asyncio.coroutine 可以把一个 generator 标记为 coroutine 类型,然后在 coroutine 内部用 yield from 调用另一个 coroutine 实现异步操作。

Python 3.5 开始引入了新的语法 async await

为了简化并更好地标识异步 IO,从 Python 3.5 开始引入了新的语法 async await,可以让 coroutine 的代码更简洁易读。

 async / await 是 python3.5 的新语法,需使用 Python3.5 版本 或 以上才能正确运行。

注意:async 和 await 是针对 coroutine 的新语法,要使用新的语法,只需要做两步简单的替换:

  • 把 @asyncio.coroutine 替换为 async 
  • 把 yield from 替换为 await

 Python 3.5 以前 版本原来老的语法使用 协程

import asyncio


@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

Python 3.5 以后 用新语法重新编写如下:

import asyncio


async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

在过去几年内,异步编程由于某些好的原因得到了充分的重视。虽然它比线性编程难一点,但是效率相对来说也是更高。

比如,利用 Python 的 异步协程 (async coroutine) ,在提交 HTTP 请求后,就没必要等待请求完成再进一步操作,而是可以一边等着请求完成,一边做着其他工作。这可能在逻辑上需要多些思考来保证程序正确运行,但是好处是可以利用更少的资源做更多的事。

即便逻辑上需要多些思考,但实际上在 Python 语言中,异步编程的语法和执行并不难。跟 Javascript 不一样,现在 Python 的异步协程已经执行得相当好了。

对于服务端编程,异步性似乎是 Node.js 流行的一大原因。我们写的很多代码,特别是那些诸如网站之类的高 I/O 应用,都依赖于外部资源。这可以是任何资源,包括从远程数据库调用到 POST 一个 REST 请求。一旦你请求这些资源的任一一个,你的代码在等待资源响应时便无事可做 (译者注:如果没有异步编程的话)。

有了异步编程,在等待这些资源响应的过程中,你的代码便可以去处理其他的任务。

Python async / await 手册

Python 部落:Python async/await 手册:https://python.freelycode.com/contribution/detail/57

知乎:从 0 到 1,Python 异步编程的演进之路( 通过爬虫演示进化之路 )https://zhuanlan.zhihu.com/p/25228075

async / await 的使用

async 用来声明一个函数是协程然后使用 await 调用这个协程, await 必须在函数内部,这个函数通常也被声明为另一个协程await 的目的是等待协程控制流的返回yield 的目的 是 暂停并挂起函数的操作。

正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加 async 关键。

  • async 用来声明一个函数为异步函数异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是sleep(5))消失后,也就是5秒到了再回来执行。
  • await 可以将耗时等待的操作挂起,让出控制权await 语法来挂起自身的协程 )比如:异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await 后面只能跟 异步程序 或 有 __await__ 属性 的 对象因为异步程序与一般程序不同

假设有两个异步函数 async a,async b,a 中的某一步有 await,当程序碰到关键字 await b() 后,异步程序挂起后去执行另一个异步b程序,就是从函数内部跳出去执行其他函数,当挂起条件消失后,不管b是否执行完,要马上从b程序中跳出来,回到原程序执行原来的操作。如果 await 后面跟的 b 函数不是异步函数,那么操作就只能等 b 执行完再返回,无法在 b 执行的过程中返回。如果要在 b 执行完才返回,也就不需要用 await 关键字了,直接调用 b 函数就行。所以这就需要 await 后面跟的是 异步函数了。在一个异步函数中,可以不止一次挂起,也就是可以用多个 await 。

看下 Python 中常见的几种函数形式:

# 1. 普通函数
def function():
    return 1
    
# 2. 生成器函数
def generator():
    yield 1

# 在3.5过后,我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。

# 3. 异步函数(协程)
async def async_function():
    return 1

# 4. 异步生成器
async def async_generator():
    yield 1

通过类型判断可以验证函数的类型

import types


# 1. 普通函数
def function():
    return 1
    
# 2. 生成器函数
def generator():
    yield 1

# 在3.5过后,我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。

# 3. 异步函数(协程)
async def async_function():
    return 1

# 4. 异步生成器
async def async_generator():
    yield 1


print(type(function) is types.FunctionType)
print(type(generator()) is types.GeneratorType)
print(type(async_function()) is types.CoroutineType)
print(type(async_generator()) is types.AsyncGeneratorType)

直接调用异步函数不会返回结果,而是返回一个coroutine对象:

print(async_function())
# 

协程 需要通过其他方式来驱动,因此可以使用这个协程对象的 send 方法给协程发送一个值:

print(async_function().send(None))

不幸的是,如果通过上面的调用会抛出一个异常:StopIteration: 1

因为 生成器 / 协程 在正常返回退出时会抛出一个 StopIteration 异常,而原来的返回值会存放在 StopIteration 对象的 value 属性中,通过以下捕获可以获取协程真正的返回值: 

try:
    async_function().send(None)
except StopIteration as e:
    print(e.value)
# 1

通过上面的方式来新建一个 run 函数来驱动协程函数,在协程函数中,可以通过 await 语法来挂起自身的协程,并等待另一个 协程 完成直到返回结果:

def run(coroutine):
    try:
        coroutine.send(None)
    except StopIteration as e:
        return 'run() : return {0}'.format(e.value)

async def async_function():
    return 1


async def await_coroutine():
    result = await async_function()
    print('await_coroutine() : print {0} '.format(result))

ret_val = run(await_coroutine())
print(ret_val)

要注意的是,await 语法只能出现在通过 async 修饰的函数中,否则会报 SyntaxError 错误。

而且 await 后面的对象需要是一个 Awaitable,或者实现了相关的协议。

查看 Awaitable 抽象类的代码,表明了只要一个类实现了__await__方法,那么通过它构造出来的实例就是一个 Awaitable:

class Awaitable(metaclass=ABCMeta):
    __slots__ = ()

    @abstractmethod
    def __await__(self):
        yield

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Awaitable:
            return _check_methods(C, "__await__")
        return NotImplemented

而且可以看到,Coroutine类 也继承了 Awaitable,而且实现了 send,throw 和 close 方法。所以 await 一个调用异步函数返回的协程对象是合法的。

class Coroutine(Awaitable):
    __slots__ = ()

    @abstractmethod
    def send(self, value):
        ...

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        ...

    def close(self):
        ...
        
    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented

接下来是异步生成器,来看一个例子:

假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:

class Potato:
    @classmethod
    def make(cls, num, *args, **kws):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls, *args, **kws))
        return potatos

all_potatos = Potato.make(5)

现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:

def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            sleep(.1)
        else:
            potato = all_potatos.pop()
            yield potato
            count += 1
            if count == num:
                break

def buy_potatos():
    bucket = []
    for p in take_potatos(50):
        bucket.append(p)

对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:

import asyncio
import random


class Potato:
    @classmethod
    def make(cls, num, *args, **kws):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls, *args, **kws))
        return potatos


all_potatos = Potato.make(5)


async def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            await ask_for_potato()
        potato = all_potatos.pop()
        yield potato
        count += 1
        if count == num:
            break


async def ask_for_potato():
    await asyncio.sleep(random.random())
    all_potatos.extend(Potato.make(random.randint(1, 10)))


async def buy_potatos():
    bucket = []
    async for p in take_potatos(50):
        bucket.append(p)
        print(f'Got potato {id(p)}...')


def main():
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(buy_potatos())
    loop.close()


if __name__ == '__main__':
    main()

当货架上的土豆没有了之后,可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程。

当生产者完成和返回之后,这是便能从 await 挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程。

用 asyncio 运行这段代码,结果是这样的:

Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
...

既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:

def main():
    import asyncio
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
    loop.close()

再来运行这段代码:

Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
...

看下 AsyncGenerator 的定义,它需要实现 __aiter__ 和 __anext__ 两个核心方法,以及 asend,athrow,aclose 方法。

class AsyncGenerator(AsyncIterator):
    __slots__ = ()

    async def __anext__(self):
        ...

    @abstractmethod
    async def asend(self, value):
        ...

    @abstractmethod
    async def athrow(self, typ, val=None, tb=None):
        ...

    async def aclose(self):
        ...

    @classmethod
    def __subclasshook__(cls, C):
        if cls is AsyncGenerator:
            return _check_methods(C, '__aiter__', '__anext__',
                                  'asend', 'athrow', 'aclose')
        return NotImplemented

异步生成器是在 3.6 之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:

bucket = [p async for p in take_potatos(50)]

类似的,还有 await 表达式:

result = [await fun() for fun in funcs if await condition()]

除了函数之外,类实例的普通方法也能用 async 语法修饰:

class ThreeTwoOne:
    async def begin(self):
        print(3)
        await asyncio.sleep(1)
        print(2)
        await asyncio.sleep(1)
        print(1)        
        await asyncio.sleep(1)
        return

async def game():
    t = ThreeTwoOne()
    await t.begin()
    print('start')

实例方法的调用同样是返回一个 coroutine:

function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())

同理 还有类方法:

class ThreeTwoOne:
    @classmethod
    async def begin(cls):
        print(3)
        await asyncio.sleep(1)
        print(2)
        await asyncio.sleep(1)
        print(1)        
        await asyncio.sleep(1)
        return

async def game():
    await ThreeTwoOne.begin()
    print('start')

根据PEP 492中,async 也可以应用到 上下文管理器中,__aenter__ 和 __aexit__ 需要返回一个 Awaitable:

class GameContext:
    async def __aenter__(self):
        print('game loading...')
        await asyncio.sleep(1)

    async def __aexit__(self, exc_type, exc, tb):
        print('game exit...')
        await asyncio.sleep(1)

async def game():
    async with GameContext():
        print('game start...')
        await asyncio.sleep(2)

在3.7版本,contextlib 中会新增一个 asynccontextmanager 装饰器来包装一个实现异步协议的上下文管理器:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield
    finally:
        await release_db_connection(conn)

async 修饰符也能用在 __call__ 方法上:

class GameContext:
    async def __aenter__(self):
        self._started = time()
        print('game loading...')
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print('game exit...')
        await asyncio.sleep(1)

    async def __call__(self, *args, **kws):
        if args[0] == 'time':
            return time() - self._started

async def game():
    async with GameContext() as ctx:
        print('game start...')
        await asyncio.sleep(2)
        print('game time: ', await ctx('time'))

asyncio

asyncio 是 Python 3.4 版本引入的标准库,直接内置了对 异步 IO 的支持。

asyncio 官方只实现了比较底层的协议,比如TCP,UDP。所以诸如 HTTP 协议之类都需要借助第三方库,比如 aiohttp 。

虽然异步编程的生态不够同步编程的生态那么强大,但是如果有高并发的需求不妨试试,下面说一下比较成熟的异步库

aiohttp:异步 http client/server框架。github地址: https://github.com/aio-libs/aiohttp
sanic:速度更快的类 flask web框架。github地址:https://github.com/channelcat/sanic
uvloop 快速,内嵌于 asyncio 事件循环的库,使用 cython 基于 libuv 实现。github地址: https://github.com/MagicStack/uvloop

asyncio 的编程模型就是一个 消息循环我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了 异步IO

python 用asyncio 模块实现异步编程,该模块最大特点就是,只存在一个线程

由于只有一个线程,就不可能多个任务同时运行。asyncio 是 "多任务合作" 模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权继续往下执行

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。

什么是事件循环?

单线程就意味着所有的任务需要在单线程上排队执行,也就是前一个任务没有执行完成,后一个任务就没有办法执行。在CPU密集型的任务之中,这样其实还行,但是如果我们的任务都是IO密集型的呢?也就是我们大部分的任务都是在等待网络的数据返回,等待磁盘文件的数据,这就会造成CPU一直在等待这些任务的完成再去执行下一个任务。

有没有什么办法能够让单线程的任务执行不这么笨呢?其实我们可以将这些需要等待IO设备的任务挂在一边嘛!这时候,如果我们的任务都是需要等待的任务,那么单线程在执行时遇到一个就把它挂起来,这里可以通过一个数据结构(例如队列)将这些处于执行等待状态的任务放进去,为什么是执行等待状态呢?因为它们正在执行但是又不得不等待例如网络数据的返回等等。直到将所有的任务都放进去之后,单线程就可以开始它的接连不断的表演了:有没有任务完成的小伙伴呀!快来我这里执行!

此时如果有某个任务完成了,它会得到结果,于是发出一个信号:我完成了。那边还在循环追问的单线程终于得到了答复,就会去看看这个任务有没有绑定什么回调函数呀?如果绑定了回调函数就进去把回调函数给执行了,如果没有,就将它所在的任务恢复执行,并将结果返回。

asyncio 就是一个 协程库

  • (1)事件循环 (event loop)。事件循环需要实现两个功能,一是顺序执行协程代码;二是完成协程的调度,即一个协程“暂停”时,决定接下来执行哪个协程。
  • (2)协程上下文的切换。基本上Python 生成器的 yeild 已经能完成切换,Python3中还有特定语法支持协程切换。

Python 的异步IO:API

官方文档:https://docs.python.org/zh-cn/3/library/asyncio.html

Python 的 asyncio 是使用 async/await 语法编写并发代码的标准库。Python3.7 这个版本,asyncio又做了比较大的调整,把这个库的 API 分为了 高层级API低层级API,并引入asyncio.run() 这样的高级方法,让编写异步程序更加简洁。

这里先从全局认识 Python 这个异步IO库。

asyncio 的 高层级 API 主要提高如下几个方面:

  • 并发地运行Python协程并完全控制其执行过程;
  • 执行网络IO和IPC;
  • 控制子进程;
  • 通过队列实现分布式任务;
  • 同步并发代码。

asyncio 的 低层级API 用以支持开发异步库和框架:

  • 创建和管理事件循环(event loop),提供异步的API用于网络,运行子进程,处理操作系统信号等;
  • 通过 transports 实现高效率协议;
  • 通过 async/await  语法桥架基于回调的库和代码。

asyncio 高级 API

高层级API让我们更方便的编写基于asyncio的应用程序。这些API包括:

(1)协程和任务

协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。历史的 @asyncio.coroutine 和 yield from 已经被弃用,并计划在Python 3.10中移除。协程可以通过 asyncio.run(coro, *, debug=False) 函数运行,该函数负责管理事件循环并完结异步生成器。它应该被用作asyncio程序的主入口点,相当于main函数,应该只被调用一次。

任务被用于并发调度协程,可用于网络爬虫的并发。使用 asyncio.create_task() 就可以把一个协程打包为一个任务,该协程会自动安排为很快运行。

协程,任务和Future都是可等待对象。其中,Future是低层级的可等待对象,表示一个异步操作的最终结果。

(2)流

流是用于网络连接的高层级的使用 async/await的原语。流允许在不使用回调或低层级协议和传输的情况下发送和接收数据。异步读写TCP有客户端函数 asyncio.open_connection() 和 服务端函数 asyncio.start_server() 。它还支持 Unix Sockets: asyncio.open_unix_connection() 和 asyncio.start_unix_server()

(3)同步原语

asyncio同步原语的设计类似于threading模块的原语,有两个重要的注意事项:
asyncio原语不是线程安全的,因此它们不应该用于OS线程同步(而是用threading)
这些同步原语的方法不接受超时参数; 使用asyncio.wait_for()函数执行超时操作。
asyncio具有以下基本同步原语:

  • Lock
  • Event
  • Condition
  • Semaphore
  • BoundedSemaphore

(4)子进程

asyncio提供了通过 async/await 创建和管理子进程的API。不同于Python标准库的subprocess,asyncio的子进程函数都是异步的,并且提供了多种工具来处理这些函数,这就很容易并行执行和监视多个子进程。创建子进程的方法主要有两个:

coroutine asyncio.create_subprocess_exec()coroutine asyncio.create_subprocess_shell()

(5)队列

asyncio 队列的设计类似于标准模块queue的类。虽然asyncio队列不是线程安全的,但它们被设计为专门用于 async/await 代码。需要注意的是,asyncio队列的方法没有超时参数,使用 asyncio.wait_for()函数进行超时的队列操作。
因为和标注模块queue的类设计相似,使用起来跟queue无太多差异,只需要在对应的函数前面加 await 即可。asyncio 队列提供了三种不同的队列:

  • class asyncio.Queue 先进先出队列
  • class asyncio.PriorityQueue 优先队列
  • class asyncio.LifoQueue 后进先出队列

(6)异常

asyncio提供了几种异常,它们是:

  • TimeoutError,
  • CancelledError,
  • InvalidStateError,
  • SendfileNotAvailableError
  • IncompleteReadError
  • LimitOverrunError

asyncio低级API

低层级API为编写基于asyncio的库和框架提供支持,有意编写异步库和框架的大牛们需要熟悉这些低层级API。主要包括:

(1)事件循环

事件循环是每个asyncio应用程序的核心。 事件循环运行异步任务和回调,执行网络IO操作以及运行子进程。

应用程序开发人员通常应该使用高级asyncio函数,例如asyncio.run(),并且很少需要引用循环对象或调用其方法。

Python 3.7 新增了 asyncio.get_running_loop()函数。

(2)Futures

Future对象用于将基于低层级回调的代码与高层级的 async/await 代码进行桥接。
Future表示异步操作的最终结果。 不是线程安全的。
Future是一个可等待对象。 协程可以等待Future对象,直到它们有结果或异常集,或者直到它们被取消。
通常,Futures用于启用基于低层级回调的代码(例如,在使用asyncio传输实现的协议中)以与高层级 async/await 代码进行互操作。

(3)传输和协议(Transports和Protocols)

Transport 和 Protocol由低层级事件循环使用,比如函数loop.create_connection()。它们使用基于回调的编程风格,并支持网络或IPC协议(如HTTP)的高性能实现。

在最高级别,传输涉及字节的传输方式,而协议确定要传输哪些字节(在某种程度上何时传输)。

换种方式说就是:传输是套接字(或类似的I/O端点)的抽象,而协议是从传输的角度来看的应用程序的抽象。

另一种观点是传输和协议接口共同定义了一个使用网络I/O和进程间I/O的抽象接口。

传输和协议对象之间始终存在1:1的关系:协议调用传输方法来发送数据,而传输调用协议方法来传递已接收的数据。

大多数面向连接的事件循环方法(例如loop.create_connection())通常接受protocol_factory参数,该参数用于为接受的连接创建Protocol对象,由Transport对象表示。 这些方法通常返回(传输,协议)元组。

(4)策略(Policy)

事件循环策略是一个全局的按进程划分的对象,用于控制事件循环的管理。 每个事件循环都有一个默认策略,可以使用策略API对其进行更改和自定义

策略定义了上下文的概念,并根据上下文管理单独的事件循环。 默认策略将上下文定义为当前线程。

通过使用自定义事件循环策略,可以自定义get_event_loop()set_event_loop()new_event_loop()函数的行为。

(5)平台支持

asyncio模块设计为可移植的,但由于平台的底层架构和功能,某些平台存在细微的差异和限制。在Windows平台,有些是不支持的,比如 loop.create_unix_connection() and loop.create_unix_server()。而Linux和比较新的macOS全部支持。

总结

Python 3.7 通过对 asyncio 分组使得它的架构更加清晰,普通写异步IO的应用程序只需熟悉高层级API,需要写异步IO的库和框架时才需要理解低层级的API。

生产者 --- 消费者

Python 分布与并行 asyncio实现生产者消费者模型:https://blog.csdn.net/weixin_43594279/article/details/111243453

示例 1:

# coding=utf-8

import asyncio


async def consumer(n, q):
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item = await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            # None is the signal to stop.
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
    print('producer: starting')
    # Add some numbers to the queue to simulate jobs
    for i in range(num_workers * 3):
        await q.put(i)
        print('producer: added task {} to the queue'.format(i))
    # Add None entries in the queue
    # to signal the consumers to exit
    print('producer: adding stop signals to the queue')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    await q.join()
    print('producer: ending')


async def main(num_consumers=1):
    q = asyncio.Queue(maxsize=num_consumers)

    consumer_list = [
        # asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
        asyncio.ensure_future(consumer(i, q)) for i in range(num_consumers)
    ]

    # produce_list = [asyncio.create_task(producer(q, num_consumers))]
    produce_list = [asyncio.ensure_future(producer(q, num_consumers))]

    task_list = consumer_list + produce_list
    for item in task_list:
        await item


if __name__ == '__main__':
    asyncio.run(main(num_consumers=3))
    pass

示例 2:

Python 的异步IO编程例子

以 Python 3.7 上的 asyncio 为例讲解如何使用 Python 的异步 IO。

创建第一个协程

Python 3.7 推荐使用 async/await 语法来声明协程,来编写异步应用程序。我们来创建第一个协程函数:首先打印一行“你好”,等待1秒钟后再打印 "大家同好"。

import asyncio


async def say_hi():
    print('你好')
    await asyncio.sleep(1)
    print('大家同好')

asyncio.run(say_hi())

"""
你好
大家同好
"""

say_hi() 函数通过 async 声明为协程函数,较之前的修饰器声明更简洁明了。

在实践过程中,什么功能的函数要用 async 声明为协程函数呢?就是那些能发挥异步IO性能的函数,比如读写文件、读写网络、读写数据库,这些都是浪费时间的IO操作,把它们协程化、异步化从而提高程序的整体效率(速度)。

say_hi() 函数是通过 asyncio.run()来运行的,而不是直接调用这个函数(协程)。因为,直接调用并不会把它加入调度日程,而只是简单的返回一个协程对象:

print(say_hi())  # 

真正运行一个协程

那么,如何真正运行一个协程呢?

asyncio 提供了三种机制:

  • (1)asyncio.run() 函数,这是异步程序的主入口,相当于C语言中的main函数。
  • (2)用 await 等待协程,比如上例中的 await asyncio.sleep(1) 。

再看下面的例子,我们定义了协程 say_delay() ,在 main() 协程中调用两次,第一次延迟1秒后打印“你好”,第二次延迟2秒后打印 "大家同好"。这样我们通过 await 运行了两个协程。

import asyncio
import datetime


async def say_delay(msg=None, delay=None):
    await asyncio.sleep(delay)
    print(msg)


async def main():
    print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')
    await say_delay('你好', 2)
    await say_delay('大家同好', 1)
    print(f'end at {datetime.datetime.now().replace(microsecond=0)}')

asyncio.run(main())

'''
begin at 2020-12-19 00:55:01
你好
大家同好
end at 2020-12-19 00:55:04
'''

从起止时间可以看出,两个协程是顺序执行的,总共耗时1+2=3秒。

  • (3)通过 asyncio.create_task() 函数并发运行作为 asyncio 任务(Task) 的多个协程。下面,我们用 create_task() 来修改上面的 main() 协程,从而让两个 say_delay() 协程并发运行:
import asyncio
import datetime


async def say_delay(msg=None, delay=None):
    await asyncio.sleep(delay)
    print(msg)


async def main():
    task_1 = asyncio.create_task(say_delay('你好', 2))
    task_2 = asyncio.create_task(say_delay('大家同好', 1))
    print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')
    await task_1
    await task_2
    print(f'end at {datetime.datetime.now().replace(microsecond=0)}')

asyncio.run(main())

'''
begin at 2020-12-19 00:58:20
大家同好
你好
end at 2020-12-19 00:58:22
'''

从运行结果的起止时间可以看出,两个协程是并发执行的了,总耗时等于最大耗时2秒。

asyncio.create_task() 是一个很有用的函数,在爬虫中它可以帮助我们实现大量并发去下载网页。在 Python 3.6中与它对应的是 ensure_future()

生产者 --- 消费者

示例 代码:

# coding=utf-8

import asyncio


async def consumer(n, q):
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item = await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            # None is the signal to stop.
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
    print('producer: starting')
    # Add some numbers to the queue to simulate jobs
    for i in range(num_workers * 3):
        await q.put(i)
        print('producer: added task {} to the queue'.format(i))
    # Add None entries in the queue
    # to signal the consumers to exit
    print('producer: adding stop signals to the queue')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    await q.join()
    print('producer: ending')


async def main(num_consumers=1):
    q = asyncio.Queue(maxsize=num_consumers)

    consumer_list = [
        asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
    ]

    produce_list = [asyncio.create_task(producer(q, num_consumers))]

    task_list = consumer_list + produce_list
    for item in task_list:
        await item


if __name__ == '__main__':
    asyncio.run(main(num_consumers=3))
    pass

可等待对象(awaitables)

可等待对象,就是可以在 await 表达式中使用的对象,前面我们已经接触了两种可等待对象的类型:协程任务,还有一个是低层级的 Future

asyncio 模块的许多 API 都需要传入可等待对象,比如 run(), create_task() 等等。

(1)协程

协程是可等待对象,可以在其它协程中被等待。协程两个紧密相关的概念是:

  • 协程函数:通过 async def 定义的函数;
  • 协程对象:调用协程函数返回的对象。

运行上面这段程序,结果为:

co is 
now is 1548512708.2026224
now is 1548512708.202648

可以看到,直接运行协程函数 whattime()得到的co是一个协程对象,因为协程对象是可等待的,所以通过 await 得到真正的当前时间。now2是直接await 协程函数,也得到了当前时间的返回值。

(2)任务

前面我们讲到,任务是用来调度协程的,以便并发执行协程。当一个协程通过 asyncio.create_task() 被打包为一个 任务,该协程将自动加入调度队列中,但是还未执行

create_task() 的基本使用前面例子已经讲过。它返回的 task 通过 await 来等待其运行完。如果,我们不等待,会发生什么?“准备立即运行”又该如何理解呢?先看看下面这个例子:

运行这段代码的情况是这样的:首先,1秒钟后打印一行,这是第13,14行代码运行的结果:

calling:0, now is 09:15:15

接着,停顿1秒后,连续打印4行:

calling:1, now is 09:15:16
calling:2, now is 09:15:16
calling:3, now is 09:15:16
calling:4, now is 09:15:16

从这个结果看,asyncio.create_task()产生的4个任务,我们并没有 await,它们也执行了。关键在于第18行的 await,如果把这一行去掉或是 sleep 的时间小于1秒(比whattime()里面的sleep时间少即可),就会只看到第一行的输出结果而看不到后面四行的输出。这是因为,main() 不 sleep 或 sleep 少于1秒钟,main() 就在 whattime() 还未来得及打印结果(因为,它要sleep 1秒)就退出了,从而整个程序也退出了,就没有 whattime() 的输出结果。

再来理解一下 “准备立即执行” 这个说法。它的意思就是,create_task() 只是打包了协程并加入调度队列还未执行,并准备立即执行,什么时候执行呢?在 “主协程”(调用create_task()的协程)挂起的时候,这里的“挂起”有两个方式:

  • 一是,通过 await task 来执行这个任务;
  • 另一个是,主协程通过 await sleep 挂起,事件循环就去执行task了。

我们知道,asyncio 是通过事件循环实现异步的。在主协程 main()里面,没有遇到 await 时,事件就是执行 main() 函数,遇到 await 时,事件循环就去执行别的协程,即 create_task() 生成的 whattime()的4个任务,这些任务一开始就是 await sleep 1秒。这时候,主协程和4个任务协程都挂起了,CPU空闲,事件循环等待协程的消息。

如果 main() 协程只 sleep了 0.1秒,它就先醒了,给事件循环发消息,事件循环就来继续执行 main() 协程,而 main() 后面已经没有代码,就退出该协程,退出它也就意味着整个程序退出,4个任务就没机会打印结果;

如果 main()协程sleep时间多余1秒,那么4个任务先唤醒,就会得到全部的打印结果;

如果main()的18行sleep等于1秒时,和4个任务的sleep时间相同,也会得到全部打印结果。这是为什么呢?

我猜想是这样的:4个任务生成在前,第18行的sleep在后,事件循环的消息响应可能有个先进先出的顺序。后面深入asyncio的代码专门研究一下这个猜想正确与否。

示例:

# -*- coding: utf-8 -*-

"""
@File    : aio_test.py
@Author  : XXX
@Time    : 2020/12/25 23:54
"""

import asyncio
import datetime


async def hi(msg=None, sec=None):
    print(f'enter hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')
    await asyncio.sleep(sec)
    print(f'leave hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')
    return sec


async def main_1():
    print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')
    tasks = []
    for i in range(1, 5):
        tsk = asyncio.create_task(hi(i, i))
        tasks.append(tsk)
    for tsk in tasks:
        ret_val = await tsk
        print(f'ret_val:{ret_val}')
    print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')


async def main_2():
    # *****  注意:main_2 中睡眠了2秒,导致睡眠时间大于2秒的协程没有执行完成 *****
    print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')
    tasks = []
    for i in range(1, 5):
        tsk = asyncio.create_task(hi(i, i))
        tasks.append(tsk)
    await asyncio.sleep(2)
    print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')


async def main_3():
    # *****  注意:main_3方法并没有实现并发执行,只是顺序执行 *****
    print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')
    tasks = []
    for i in range(1, 5):
        tsk = asyncio.create_task(hi(i, i))
        await tsk
    print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')


print('*' * 50)
asyncio.run(main_1())
print('*' * 50)
asyncio.run(main_2())
print('*' * 50)
asyncio.run(main_3())
print('*' * 50)

(3)Future

它是一个低层级的可等待对象,表示一个异步操作的最终结果。目前,我们写应用程序还用不到它,暂不学习。

asyncio异步IO协程总结

协程就是我们异步操作的片段。通常,写程序都会把全部功能分成很多不同功能的函数,目的是为了结构清晰;进一步,把那些涉及耗费时间的IO操作(读写文件、数据库、网络)的函数通过 async def 异步化,就是异步编程。

那些异步函数(协程函数)都是通过消息机制被事件循环管理调度着,整个程序的执行是单线程的,但是某个协程A进行IO时,事件循环就去执行其它协程非IO的代码。当事件循环收到协程A结束IO的消息时,就又回来执行协程A,这样事件循环不断在协程之间转换,充分利用了IO的闲置时间,从而并发的进行多个IO操作,这就是异步IO。

写异步IO程序时记住一个准则:需要IO的地方异步。其它地方即使用了协程函数也是没用的。

不 使用 asyncio 的 消息循环 让协程运行

先看下 不使用  asyncio 的消息循环 怎么 调用 协程,让协程 运行:

async def func_1():
    print("func_1 start")
    print("func_1 end")


async def func_2():
    print("func_2 start")
    print("func_2 a")
    print("func_2 b")
    print("func_2 c")
    print("func_2 end")


f_1 = func_1()
print(f_1)

f_2 = func_2()
print(f_2)


try:
    print('f_1.send')
    f_1.send(None)
except StopIteration as e:
    # 这里也是需要去捕获StopIteration方法
    pass

try:
    print('f_2.send')
    f_2.send(None)
except StopIteration as e:
    pass

运行结果:



f_1.send
func_1 start
func_1 end
f_2.send
func_2 start
func_2 a
func_2 b
func_2 c
func_2 end

示例代码2:

async def test(x):
    return x * 2

print(test(100))

try:
    # 既然是协程,我们像之前yield协程那样
    test(100).send(None)
except BaseException as e:
    print(type(e))
    ret_val = e.value
    print(ret_val)

示例代码3:

def simple_coroutine():
    print('-> start')
    x = yield
    print('-> recived', x)


sc = simple_coroutine()

next(sc)

try:
    sc.send('zhexiao')
except BaseException as e:
    print(e)

对上述例子的分析:yield 的右边没有表达式,所以这里默认产出的值是None。刚开始先调用了next(...)是因为这个时候生成器还没有启动,没有停在yield那里,这个时候也是无法通过send发送数据。所以当我们通过 next(...)激活协程后 ,程序就会运行到x = yield,这里有个问题我们需要注意, x = yield这个表达式的计算过程是先计算等号右边的内容,然后在进行赋值,所以当激活生成器后,程序会停在yield这里,但并没有给x赋值。当我们调用 send 方法后 yield 会收到这个值并赋值给 x,而当程序运行到协程定义体的末尾时和用生成器的时候一样会抛出StopIteration异常

如果协程没有通过 next(...) 激活(同样我们可以通过send(None)的方式激活),但是我们直接send,会提示如下错误:

最先调用 next(sc) 函数这一步通常称为“预激”(prime)协程 (即,让协程执行到第一个 yield 表达式,准备好作为活跃的协程使用)。

协程在运行过程中有四个状态:

  1. GEN_CREATE: 等待开始执行

  2. GEN_RUNNING: 解释器正在执行,这个状态一般看不到

  3. GEN_SUSPENDED: 在yield表达式处暂停

  4. GEN_CLOSED: 执行结束

通过下面例子来查看协程的状态:

示例代码4:(使用协程计算移动平均值)

def averager():
    total = 0.0
    count = 0
    avg = None

    while True:
        num = yield avg
        total += num
        count += 1
        avg = total / count


# run
ag = averager()
# 预激协程
print(next(ag))  # None

print(ag.send(10))  # 10
print(ag.send(20))  # 15

这里是一个死循环,只要不停 send 值 给 协程,可以一直计算下去。

解释:

  • 1. 调用 next(ag) 函数后,协程会向前执行到 yield 表达式,产出 average 变量的初始值 None。
  • 2. 此时,协程在 yield 表达式处暂停。
  • 3. 使用 send() 激活协程,把发送的值赋给 num,并计算出 avg 的值。
  • 4. 使用 print 打印出 yield 返回的数据。

单步 调试 上面程序。

使用 asyncio 的 消息循环 让协程运行

使用 asyncio 异步 IO 调用 协程

示例代码 1:

import asyncio


async def func_1():
    print("func_1 start")
    print("func_1 end")
    # await asyncio.sleep(1)


async def func_2():
    print("func_2 start")
    print("func_2 a")
    print("func_2 b")
    print("func_2 c")
    print("func_2 end")
    # await asyncio.sleep(1)


f_1 = func_1()
print(f_1)

f_2 = func_2()
print(f_2)


# 获取 EventLoop:
loop = asyncio.get_event_loop()
tasks = [func_1(), func_2()]

# 执行 coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

示例代码 2:

import asyncio
import time

start = time.time()


def tic():
    return 'at %1.1f seconds' % (time.time() - start)


async def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('gr1 started work: {}'.format(tic()))
    # 暂停两秒,但不阻塞时间循环,下同
    await asyncio.sleep(2)
    print('gr1 ended work: {}'.format(tic()))


async def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr2 Ended work: {}'.format(tic()))


async def gr3():
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(1)
    print("Done!")

# 事件循环
ioloop = asyncio.get_event_loop()

# tasks中也可以使用 asyncio.ensure_future(gr1())..
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()


"""
结果:
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Let's do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr2 Ended work: at 2.0 seconds
gr1 ended work: at 2.0 seconds
"""

多个 coroutine 可以封装成一组 Task 然后并发执行。

  • asyncio.wait(...) 协程的参数是一个由 future 或 协程 构成的可迭代对象;wait 会分别
    把各个协程包装进一个 Task 对象。最终的结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象。

  • ioloop.run_until_complete 方法的参数是一个 future 或 协程。如果是协程,run_until_complete方法与 wait 函数一样,把协程包装进一个 Task 对象中。

  • 在 asyncio 包中,future和协程关系紧密,因为可以使用 yield from 从 asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回Future 或 Task 实例的普通函数,那么可以这样写:res = yield from foo()。这是 asyncio 包的 API 中很多地方可以互换协程与期物的原因之一。 例如上面的例子中 tasks 可以改写成协程列表:tasks = [gr1(), gr(2), gr(3)]

详细的各个类说明,类方法,传参,以及方法返回的是什么类型都可以在官方文档上仔细研读,多读几遍,方有体会。

示例代码 3:

import asyncio
import time
import aiohttp
import async_timeout

msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}

urls = [msg.format(i) for i in range(5400, 5500)]


async def fetch(sessio
锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章