Синтаксис сопрограммы (coroutine) и делегирования (delegation)
Перед Python 3.5+ был отпущен, asyncio
модуль используется генераторы для имитации асинхронных вызовов и , таким образом , был другой синтаксис , чем текущая версия Python 3.5.
В Python 3.5 введены ключевые слова async
и await
. Обратите внимание на отсутствие скобок вокруг вызова await func().
import asyncio
async def main():
print(await func())
async def func():
# Do time intensive stuff...
return "Hello, world!"
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
До Python 3.5 для определения сопрограммы использовался декоратор @asyncio.coroutine
. Выход из выражения был использован для делегирования генератора. Обратите внимание на круглые скобки вокруг yield from func()
.
import asyncio
@asyncio.coroutine
def main():
print((yield from func()))
@asyncio.coroutine
def func():
# Do time intensive stuff..
return "Hello, world!"
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Вот пример, который показывает, как две функции могут выполняться асинхронно:
import asyncio
async def cor1():
print("cor1 start")
for i in range(10):
await asyncio.sleep(1.5)
print("cor1", i)
async def cor2():
print("cor2 start")
for i in range(15):
await asyncio.sleep(1)
print("cor2", i)
loop = asyncio.get_event_loop()
cors = asyncio.wait([cor1(), cor2()])
loop.run_until_complete(cors)
Асинхронные исполнители
Примечание. Использует асинхронный / ожидающий синтаксис Python 3.5+.
asyncio
поддерживает использование Executor
объектов , найденные в concurrent.futures
для планирования задач асинхронна. Петли событий имеют функцию run_in_executor()
, который принимает Executor
объект, Callable
и параметры отзывных в.
Планирование задачи для Executor
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# Do time intensive stuff...
return a + b
async def main(loop):
executor = ThreadPoolExecutor()
result = await loop.run_in_executor(executor, func, "Hello,", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Каждый цикл событий также имеет « по умолчанию» Executor
слот , который может быть назначен на Executor
.Чтобы назначить Executor
и планировать задачи из цикла вы используете set_default_executor()
метод.
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# Do time intensive stuff...
return a + b
async def main(loop):
# NOTE: Using `None` as the first parameter designates the `default` Executor.
result = await loop.run_in_executor(None, func, "Hello,", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor())
loop.run_until_complete(main(loop))
Есть два основных типа Executor
в concurrent.futures
, тем ThreadPoolExecutor
и ProcessPoolExecutor
.ThreadPoolExecutor
содержит пул потоков , которые могут быть либо вручную для определенного числа потоков через конструктор по умолчанию или к количеству ядер на время машины 5. ThreadPoolExecutor
использует пул потоков для выполнения задач , поставленных перед ней и обычно лучше при операциях с процессором, чем при операциях ввода-вывода.
Контраст, что к ProcessPoolExecutor
, который порождает новый процесс для каждой задачи , возложенной на него. ProcessPoolExecutor
может принимать только задачи и параметры, которые пригодны для консервирования. Наиболее распространенными задачами, которые нельзя отлавливать, являются методы объектов. Если вы должны запланировать метод объекта в качестве задачи в Executor
необходимо использовать ThreadPoolExecutor
.
Использование UVLoop
uvloop
является реализация для asyncio.AbstractEventLoop
на основе libuv (используется nodejs). Это соответствует 99% asyncio
функций и гораздо быстрее , чем традиционное asyncio.EventLoop
.uvloop
в настоящее время не доступно на Windows, установить его с pip install uvloop
.
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop(uvloop.new_event_loop())
# Do your stuff here ...
Можно также изменить завод цикла событий, установив EventLoopPolicy
на один в uvloop
.
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
Примитив синхронизации: событие
Концепция
Используйте Event
для синхронизации планирования нескольких сопрограмм.
Проще говоря, событие похоже на выстрел из пистолета в беговой гонке: оно позволяет бегунам покинуть стартовые блоки.
Пример
import asyncio
# event trigger function
def trigger(event):
print('EVENT SET')
event.set() # wake up coroutines waiting
# event consumers
async def consumer_a(event):
consumer_name = 'Consumer A'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
async def consumer_b(event):
consumer_name = 'Consumer B'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
# event
event = asyncio.Event()
# wrap coroutines in one future
main_future = asyncio.wait([consumer_a(event),
consumer_b(event)])
# event loop
event_loop = asyncio.get_event_loop()
event_loop.call_later(0.1, functools.partial(trigger, event)) # trigger event in 0.1 sec
# complete main_future
done, pending = event_loop.run_until_complete(main_future)
Выход:
Consumer B waiting
Consumer A waiting
EVENT SET
Consumer B triggered
Consumer A triggered
Простой WebSocket
Здесь мы делаем простое эхо WebSocket использования asyncio
. Мы определяем сопрограммы для подключения к серверу и отправки / получения сообщений. В communcations по WebSocket выполняется в main
сопрограммы, который выполняется с помощью цикла обработки событий. В этом примере изменяется от предыдущего поста .
import asyncio
import aiohttp
session = aiohttp.ClientSession() # handles the context manager
class EchoWebsocket:
async def connect(self):
self.websocket = await session.ws_connect("wss://echo.websocket.org")
async def send(self, message):
self.websocket.send_str(message)
async def receive(self):
result = (await self.websocket.receive())
return result.data
async def main():
echo = EchoWebsocket()
await echo.connect()
await echo.send("Hello World!")
print(await echo.receive()) # "Hello World!"
if __name__ == '__main__':
# The main loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Распространенное заблуждение об асинчо
вероятно, наиболее распространенное заблуждение о asnycio
является то , что она позволяет запускать любую задачу параллельно - обходя GIL (глобальная блокировка интерпретатора) и , следовательно , выполнять блокирующие работу параллельно (в отдельных потоках). это не так!
asyncio
(и библиотеки, которые строятся для совместной работы с asyncio
) строить сопрограмм: функции , которые (совместно) дают поток управления обратно в вызывающую функцию. обратите внимание asyncio.sleep
в приведенных выше примерах. это пример неблокируемой сопрограммы , что ждет «в фоновом режиме» и дает управляющий поток обратно в вызывающую функцию (при вызове с await
). time.sleep
является примером функции блокировки. поток выполнения программы будет просто останавливаться и возвращать только после того, как time.sleep
закончил.
в режиме реального живой пример является requests
библиотека , которая состоит (в настоящее время) на блокирование только функции. нет параллелизма , если вы звоните любой из ее функций в asyncio
.aiohttp
с другой стороны , был построен с asyncio
в виду. его сопрограммы будут работать одновременно.
- если вы затянувшийся CPU переплете задач , которые вы хотели бы работать параллельно
asyncio
не для вас. для этого вам нужноthreads
илиmultiprocessing
.