Python Language
Параллельность Python
Поиск…
замечания
Разработчики Python следили за тем, чтобы API между threading
и multiprocessing
был схожим, так что для обоих программистов проще переключаться между двумя вариантами.
Модуль резьбонарезания
from __future__ import print_function
import threading
def counter(count):
while count > 0:
print("Count value", count)
count -= 1
return
t1 = threading.Thread(target=countdown,args=(10,))
t1.start()
t2 = threading.Thread(target=countdown,args=(20,))
t2.start()
В некоторых реализациях Python, таких как CPython, истинный параллелизм не достигается с помощью потоков из-за использования так называемого GIL или G lobal I nterpreter L ock.
Вот отличный обзор параллелизма Python:
Параллельный подход Python Дэвида Бизли (YouTube)
Многопроцессорный модуль
from __future__ import print_function
import multiprocessing
def countdown(count):
while count > 0:
print("Count value", count)
count -= 1
return
if __name__ == "__main__":
p1 = multiprocessing.Process(target=countdown, args=(10,))
p1.start()
p2 = multiprocessing.Process(target=countdown, args=(20,))
p2.start()
p1.join()
p2.join()
Здесь каждая функция выполняется в новом процессе. Поскольку новый экземпляр Python VM запускает код, GIL
не существует, и вы выполняете параллелизм на нескольких ядрах.
Метод Process.start
запускает этот новый процесс и запускает функцию, переданную в target
аргументе аргументами args
. Метод Process.join
ждет завершения выполнения процессов p1
и p2
.
Новые процессы запускаются по-разному в зависимости от версии python и формы табло, на которой работает код, например :
- Для создания нового процесса Windows использует
spawn
. - В Unix-системах и версии раньше 3.3 процессы создаются с использованием
fork
.
Обратите внимание, что этот метод не учитывает использование вилки POSIX и, таким образом, приводит к неожиданному поведению, особенно при взаимодействии с другими многопроцессорными библиотеками. - С системой unix и версией 3.4+ вы можете запускать новые процессы с помощью
fork
,forkserver
илиspawn
с использованиемmultiprocessing.set_start_method
в начале вашей программы.forkserver
иspawn
медленнее, чем forking, но избегают некоторых неожиданных действий.
Использование вилки POSIX :
После вилки в многопоточной программе, ребенок может безопасно вызывать только функции, защищенные от асинхронного сигнала, до тех пор, пока он не вызовет execve.
( см. )
Используя fork, новый процесс будет запущен с тем же самым состоянием для всех текущих мьютексов, но будет запущен только MainThread
. Это небезопасно, так как это может привести к условиям гонки, например :
- Если вы используете
Lock
вMainThread
и передаете его другому потоку, который, предположительно, заблокирует его в какой-то момент. Еслиfork
одновременно, новый процесс начинается с заблокированной блокировки, которая никогда не будет выпущена, поскольку второй поток не существует в этом новом процессе.
Собственно, такое поведение не должно происходить в чистом питоне, так как multiprocessing
обрабатывает его правильно, но если вы взаимодействуете с другой библиотекой, такое поведение может происходить, что приводит к сбою вашей системы (например, с помощью numpy / speed на macOS).
Передача данных между процессами многопроцессорности
Поскольку данные чувствительны при взаимодействии между двумя потоками (думаю, одновременное чтение и одновременная запись могут конфликтовать друг с другом, вызывая условия гонки), был создан набор уникальных объектов, чтобы облегчить передачу данных между потоками. Любая действительно атомная операция может использоваться между потоками, но всегда безопасно придерживаться очереди.
import multiprocessing
import queue
my_Queue=multiprocessing.Queue()
#Creates a queue with an undefined maximum size
#this can be dangerous as the queue becomes increasingly large
#it will take a long time to copy data to/from each read/write thread
Большинство людей полагают, что при использовании очереди всегда ставить данные очереди в try: except: block вместо использования пустого. Тем не менее, для приложений, где не имеет значения, если вы пропустите цикл сканирования (данные могут быть помещены в очередь, в то время как они перебрасывают состояния из queue.Empty==True
в queue.Empty==False
), как правило, лучше размещать чтение и записывать доступ в том, что я называю блоком Iftry, потому что оператор «if» технически более эффективен, чем перехват исключения.
import multiprocessing
import queue
'''Import necessary Python standard libraries, multiprocessing for classes and queue for the queue exceptions it provides'''
def Queue_Iftry_Get(get_queue, default=None, use_default=False, func=None, use_func=False):
'''This global method for the Iftry block is provided for it's reuse and
standard functionality, the if also saves on performance as opposed to catching
the exception, which is expencive.
It also allows the user to specify a function for the outgoing data to use,
and a default value to return if the function cannot return the value from the queue'''
if get_queue.empty():
if use_default:
return default
else:
try:
value = get_queue.get_nowait()
except queue.Empty:
if use_default:
return default
else:
if use_func:
return func(value)
else:
return value
def Queue_Iftry_Put(put_queue, value):
'''This global method for the Iftry block is provided because of its reuse
and
standard functionality, the If also saves on performance as opposed to catching
the exception, which is expensive.
Return True if placing value in the queue was successful. Otherwise, false'''
if put_queue.full():
return False
else:
try:
put_queue.put_nowait(value)
except queue.Full:
return False
else:
return True