Модуль asyncio

Синтаксис сопрограммы (coroutine) и делегирования (delegation)

Перед Python 3.5+ был отпущен, asyncio модуль используется генераторы для имитации асинхронных вызовов и , таким образом , был другой синтаксис , чем текущая версия Python 3.5.

В Python 3.5 введены ключевые слова async и await. Обратите внимание на отсутствие скобок вокруг вызова await func().

import asyncio

async def main():
    print(await func())

async def func():
    # Do time intensive stuff...
    return "Hello, world!"

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

До Python 3.5 для определения сопрограммы использовался декоратор @asyncio.coroutine. Выход из выражения был использован для делегирования генератора. Обратите внимание на круглые скобки вокруг yield from func().

import asyncio

@asyncio.coroutine
def main():
    print((yield from func()))

@asyncio.coroutine
def func():
    # Do time intensive stuff..
    return "Hello, world!"

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Вот пример, который показывает, как две функции могут выполняться асинхронно:

import asyncio

async def cor1():
    print("cor1 start")
    for i in range(10):
        await asyncio.sleep(1.5)
        print("cor1", i)

async def cor2():
    print("cor2 start")
    for i in range(15):
        await asyncio.sleep(1)
        print("cor2", i)

loop = asyncio.get_event_loop()
cors = asyncio.wait([cor1(), cor2()])
loop.run_until_complete(cors)

Асинхронные исполнители

Примечание. Использует асинхронный / ожидающий синтаксис Python 3.5+.

asyncio поддерживает использование Executor объектов , найденные в concurrent.futures для планирования задач асинхронна. Петли событий имеют функцию run_in_executor() , который принимает Executor объект, Callable и параметры отзывных в.

Планирование задачи для Executor

import asyncio
from concurrent.futures import ThreadPoolExecutor

def func(a, b):
    # Do time intensive stuff...
    return a + b

async def main(loop):
    executor = ThreadPoolExecutor()
    result = await loop.run_in_executor(executor, func, "Hello,", " world!")
    print(result)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop)) 

Каждый цикл событий также имеет « по умолчанию» Executor слот , который может быть назначен на Executor.Чтобы назначить Executor и планировать задачи из цикла вы используете set_default_executor() метод.

import asyncio
from concurrent.futures import ThreadPoolExecutor

def func(a, b):
    # Do time intensive stuff...
    return a + b

async def main(loop):
    # NOTE: Using `None` as the first parameter designates the `default` Executor.
    result = await loop.run_in_executor(None, func, "Hello,", " world!")
    print(result)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_default_executor(ThreadPoolExecutor())
    loop.run_until_complete(main(loop))

Есть два основных типа Executor в concurrent.futures , тем ThreadPoolExecutor и ProcessPoolExecutor.ThreadPoolExecutor содержит пул потоков , которые могут быть либо вручную для определенного числа потоков через конструктор по умолчанию или к количеству ядер на время машины 5. ThreadPoolExecutor использует пул потоков для выполнения задач , поставленных перед ней и обычно лучше при операциях с процессором, чем при операциях ввода-вывода.

Контраст, что к ProcessPoolExecutor , который порождает новый процесс для каждой задачи , возложенной на него. ProcessPoolExecutor может принимать только задачи и параметры, которые пригодны для консервирования. Наиболее распространенными задачами, которые нельзя отлавливать, являются методы объектов. Если вы должны запланировать метод объекта в качестве задачи в Executor необходимо использовать ThreadPoolExecutor .

Использование UVLoop

uvloop является реализация для asyncio.AbstractEventLoop на основе libuv (используется nodejs). Это соответствует 99% asyncio функций и гораздо быстрее , чем традиционное asyncio.EventLoop.uvloop в настоящее время не доступно на Windows, установить его с pip install uvloop .

import asyncio
import uvloop

if __name__ == "__main__":
    asyncio.set_event_loop(uvloop.new_event_loop())
    # Do your stuff here ... 

Можно также изменить завод цикла событий, установив EventLoopPolicy на один в uvloop .

import asyncio
import uvloop

if __name__ == "__main__":
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = asyncio.new_event_loop() 

Примитив синхронизации: событие

Концепция

Используйте Event для синхронизации планирования нескольких сопрограмм.

Проще говоря, событие похоже на выстрел из пистолета в беговой гонке: оно позволяет бегунам покинуть стартовые блоки.

Пример

import asyncio

# event trigger function
def trigger(event):
    print('EVENT SET')
    event.set()  # wake up coroutines waiting

# event consumers
async def consumer_a(event):
    consumer_name = 'Consumer A'
    print('{} waiting'.format(consumer_name))
    await event.wait()
    print('{} triggered'.format(consumer_name))

async def consumer_b(event):
    consumer_name = 'Consumer B'
    print('{} waiting'.format(consumer_name))
    await event.wait()
    print('{} triggered'.format(consumer_name))

# event
event = asyncio.Event()

# wrap coroutines in one future
main_future = asyncio.wait([consumer_a(event),
                            consumer_b(event)])

# event loop
event_loop = asyncio.get_event_loop()
event_loop.call_later(0.1, functools.partial(trigger, event))  # trigger event in 0.1 sec

# complete main_future
done, pending = event_loop.run_until_complete(main_future) 

Выход:

Consumer B waiting
Consumer A waiting
EVENT SET
Consumer B triggered
Consumer A triggered

Простой WebSocket

Здесь мы делаем простое эхо WebSocket использования asyncio. Мы определяем сопрограммы для подключения к серверу и отправки / получения сообщений. В communcations по WebSocket выполняется в main сопрограммы, который выполняется с помощью цикла обработки событий. В этом примере изменяется от предыдущего поста .

import asyncio
import aiohttp

session = aiohttp.ClientSession()                          # handles the context manager
class EchoWebsocket:

    async def connect(self):
        self.websocket = await session.ws_connect("wss://echo.websocket.org")

    async def send(self, message):
        self.websocket.send_str(message)

    async def receive(self):
        result = (await self.websocket.receive())
        return result.data

async def main():
    echo = EchoWebsocket()
    await echo.connect()
    await echo.send("Hello World!")
    print(await echo.receive())                            # "Hello World!"


if __name__ == '__main__':
    # The main loop
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main()) 

Распространенное заблуждение об асинчо

вероятно, наиболее распространенное заблуждение о asnycio является то , что она позволяет запускать любую задачу параллельно - обходя GIL (глобальная блокировка интерпретатора) и , следовательно , выполнять блокирующие работу параллельно (в отдельных потоках). это не так!

asyncio (и библиотеки, которые строятся для совместной работы с asyncio ) строить сопрограмм: функции , которые (совместно) дают поток управления обратно в вызывающую функцию. обратите внимание asyncio.sleep в приведенных выше примерах. это пример неблокируемой сопрограммы , что ждет «в фоновом режиме» и дает управляющий поток обратно в вызывающую функцию (при вызове с await ). time.sleep является примером функции блокировки. поток выполнения программы будет просто останавливаться и возвращать только после того, как time.sleep закончил.

в режиме реального живой пример является requests библиотека , которая состоит (в настоящее время) на блокирование только функции. нет параллелизма , если вы звоните любой из ее функций в asyncio.aiohttp с другой стороны , был построен с asyncio в виду. его сопрограммы будут работать одновременно.

  • если вы затянувшийся CPU переплете задач , которые вы хотели бы работать параллельно asyncio не для вас. для этого вам нужно threads или multiprocessing .