Python Language
Асинхронный модуль
Поиск…
Синтаксис Coroutine и Delegation
Перед выпуском Python 3.5+ модуль asyncio
использовал генераторы для имитации асинхронных вызовов и, следовательно, имел другой синтаксис, чем текущий выпуск Python 3.5.
Python 3.5 представил ключевые слова async
и await
. Обратите внимание на отсутствие круглых скобок вокруг вызова await func()
.
import asyncio
async def main():
print(await func())
async def func():
# Do time intensive stuff...
return "Hello, world!"
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Перед Python 3.5 для определения сопрограммы использовался @asyncio.coroutine
. Выход из выражения использовался для делегирования генератора. Обратите внимание на круглые скобки вокруг yield from func()
.
import asyncio
@asyncio.coroutine
def main():
print((yield from func()))
@asyncio.coroutine
def func():
# Do time intensive stuff..
return "Hello, world!"
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Вот пример, показывающий, как две функции могут выполняться асинхронно:
import asyncio
async def cor1():
print("cor1 start")
for i in range(10):
await asyncio.sleep(1.5)
print("cor1", i)
async def cor2():
print("cor2 start")
for i in range(15):
await asyncio.sleep(1)
print("cor2", i)
loop = asyncio.get_event_loop()
cors = asyncio.wait([cor1(), cor2()])
loop.run_until_complete(cors)
Асинхронные исполнители
Примечание. Использует синтаксис aync / wait Python 3.5+
asyncio
поддерживает использование объектов Executor
найденных в concurrent.futures
для планирования задач асинхронно. Циклы событий имеют функцию run_in_executor()
которая принимает параметры объекта Executor
, Callable
и Callable.
Планирование задания для Executor
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# Do time intensive stuff...
return a + b
async def main(loop):
executor = ThreadPoolExecutor()
result = await loop.run_in_executor(executor, func, "Hello,", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
В каждом цикле событий также есть слот Executor
умолчанию, который может быть назначен Executor
. Чтобы назначить Executor
и назначить задачи из цикла, вы используете метод set_default_executor()
.
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# Do time intensive stuff...
return a + b
async def main(loop):
# NOTE: Using `None` as the first parameter designates the `default` Executor.
result = await loop.run_in_executor(None, func, "Hello,", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor())
loop.run_until_complete(main(loop))
Существует два основных типа Executor
в concurrent.futures
, ThreadPoolExecutor
и ProcessPoolExecutor
. ThreadPoolExecutor
содержит пул потоков, который может быть либо вручную настроен на определенное количество потоков через конструктор, либо по умолчанию равен числу ядер в машинных моментах 5. ThreadPoolExecutor
использует пул потоков для выполнения назначенных ему задач и как правило, лучше для операций с привязкой к ЦП, а не для операций, связанных с I / O. Сравните это с ProcessPoolExecutor
который порождает новый процесс для каждой назначенной ему задачи. ProcessPoolExecutor
может принимать только задачи и параметры, которые могут быть выбраны. Наиболее распространенными задачами, которые не учитываются, являются методы объектов. Если вы должны запланировать метод объекта как задачу в Executor
вы должны использовать ThreadPoolExecutor
.
Использование UVLoop
uvloop
- это реализация для asyncio.AbstractEventLoop
на основе libuv (Используется nodejs). Он совместим с 99% функций asyncio
и намного быстрее, чем традиционный asyncio.EventLoop
. uvloop
в настоящее время недоступен в Windows, установите его с помощью pip install uvloop
.
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop(uvloop.new_event_loop())
# Do your stuff here ...
Можно также изменить фабрику циклов событий, установив EventLoopPolicy
в ту, которая находится в uvloop
.
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
Примитив синхронизации: событие
концепция
Используйте Event
чтобы синхронизировать планирование нескольких сопрограмм .
Проще говоря, событие похоже на выстрел из пушки в бегущей гонке: он позволяет бегунам покинуть стартовые блоки.
пример
import asyncio
# event trigger function
def trigger(event):
print('EVENT SET')
event.set() # wake up coroutines waiting
# event consumers
async def consumer_a(event):
consumer_name = 'Consumer A'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
async def consumer_b(event):
consumer_name = 'Consumer B'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
# event
event = asyncio.Event()
# wrap coroutines in one future
main_future = asyncio.wait([consumer_a(event),
consumer_b(event)])
# event loop
event_loop = asyncio.get_event_loop()
event_loop.call_later(0.1, functools.partial(trigger, event)) # trigger event in 0.1 sec
# complete main_future
done, pending = event_loop.run_until_complete(main_future)
Выход:
Ожидание потребителя B
Потребитель Ожидание
СОБЫТИЕ
Потребитель B активирован
Потребитель А
Простой веб-узел
Здесь мы делаем простой echo websocket, используя asyncio
. Мы определяем сопрограммы для подключения к серверу и отправки / получения сообщений. Передача websocket выполняется в main
сопрограмме, которая управляется контуром событий. Этот пример изменен из предыдущего сообщения .
import asyncio
import aiohttp
session = aiohttp.ClientSession() # handles the context manager
class EchoWebsocket:
async def connect(self):
self.websocket = await session.ws_connect("wss://echo.websocket.org")
async def send(self, message):
self.websocket.send_str(message)
async def receive(self):
result = (await self.websocket.receive())
return result.data
async def main():
echo = EchoWebsocket()
await echo.connect()
await echo.send("Hello World!")
print(await echo.receive()) # "Hello World!"
if __name__ == '__main__':
# The main loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Общее Заблуждение о asyncio
вероятно, наиболее распространенное заблуждение о asnycio
является то , что она позволяет запускать любую задачу параллельно - обходя GIL (глобальная блокировка интерпретатора) и , следовательно , выполнять блокирующие работу параллельно (в отдельных потоках). это не так !
asyncio
(и библиотеки, созданные для совместной работы с asyncio
), построены на сопрограмме: функции, которые (совместно) возвращают поток управления вызывающей функции. asyncio.sleep
приведенных выше примерах обратите внимание на asyncio.sleep
. это пример неблокирующей сопрограммы, которая ждет «в фоновом режиме» и возвращает управляющий поток вызывающей функции (при вызове с await
). time.sleep
является примером функции блокировки. поток выполнения программы будет просто остановлен и только вернется после окончания time.sleep
.
реальным примером является библиотека requests
которая состоит (пока) только для функций блокировки. нет параллелизма, если вы вызываете любую из своих функций в asyncio
. aiohttp
с другой стороны, был построен с asyncio
. его сопрограммы будут выполняться одновременно.
если у вас есть длительные задачи с привязкой к процессору, которые вы хотели бы запускать параллельно,
asyncio
не для вас. для этого вам нужныthreads
илиmultiprocessing
.если у вас запущены задания с привязкой к IO, вы можете запускать их одновременно с помощью
asyncio
.