Python Language
Многопоточность
Поиск…
Вступление
Темы позволяют программам Python обрабатывать сразу несколько функций, а не выполнять последовательность команд по отдельности. В этом разделе объясняются принципы работы с потоками и демонстрируется его использование.
Основы многопоточности
Используя threading
модуль, новый поток выполнения может быть начат путем создания нового threading.Thread
и присвоения ему функции для выполнения:
import threading
def foo():
print "Hello threading!"
my_thread = threading.Thread(target=foo)
target
параметр ссылается на функцию (или вызываемый объект), которая должна быть запущена. Нить не будет start
до тех пор, пока не start
вызов объекта Thread
.
Запуск темы
my_thread.start() # prints 'Hello threading!'
Теперь, когда my_thread
запущен и завершен, вызов start
снова приведет к RuntimeError
. Если вы хотите запустить свой поток в качестве демона, передав daemon=True
kwarg или установив my_thread.daemon
в True
перед вызовом start()
, ваш Thread
будет запускаться тихо в фоновом режиме в качестве демона.
Присоединение к теме
В случаях, когда вы разделяете одну большую работу на несколько небольших и хотите запускать их одновременно, но перед тем, как продолжить, нужно дождаться, пока все они закончатся, Thread.join()
- это метод, который вы ищете.
Например, скажем, вы хотите загрузить несколько страниц веб-сайта и скомпилировать их на одну страницу. Вы сделали бы это:
import requests
from threading import Thread
from queue import Queue
q = Queue(maxsize=20)
def put_page_to_q(page_num):
q.put(requests.get('http://some-website.com/page_%s.html' % page_num)
def compile(q):
# magic function that needs all pages before being able to be executed
if not q.full():
raise ValueError
else:
print("Done compiling!")
threads = []
for page_num in range(20):
t = Thread(target=requests.get, args=(page_num,))
t.start()
threads.append(t)
# Next, join all threads to make sure all threads are done running before
# we continue. join() is a blocking call (unless specified otherwise using
# the kwarg blocking=False when calling join)
for t in threads:
t.join()
# Call compile() now, since all threads have completed
compile(q)
Более пристальный взгляд на то, как работают функции join()
можно найти здесь .
Создание пользовательского класса темы
Используя класс threading.Thread
мы можем подклассифицировать новый пользовательский класс Thread. мы должны переопределить метод run
в подклассе.
from threading import Thread
import time
class Sleepy(Thread):
def run(self):
time.sleep(5)
print("Hello form Thread")
if __name__ == "__main__":
t = Sleepy()
t.start() # start method automatic call Thread class run method.
# print 'The main program continues to run in foreground.'
t.join()
print("The main program continues to run in the foreground.")
Общение между потоками
В коде есть несколько потоков, и вам нужно безопасно общаться между ними.
Вы можете использовать Queue
из библиотеки queue
.
from queue import Queue
from threading import Thread
# create a data producer
def producer(output_queue):
while True:
data = data_computation()
output_queue.put(data)
# create a consumer
def consumer(input_queue):
while True:
# retrieve data (blocking)
data = input_queue.get()
# do something with the data
# indicate data has been consumed
input_queue.task_done()
Создание потоков производителей и потребителей с общей очередью
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Создание рабочего пула
Использование threading
и queue
:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue
def echo_server(addr, nworkers):
print('Echo server running at', addr)
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)
Использование concurrent.futures.Threadpoolexecutor
:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor
def echo_server(addr):
print('Echo server running at', addr)
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
echo_server(('',15000))
Python Cookbook, 3-е издание, Дэвид Бэйсли и Брайан К. Джонс (O'Reilly). Copyright 2013 Дэвид Бэйсли и Брайан Джонс, 978-1-449-34037-7.
Расширенное использование многопоточных
Этот раздел будет содержать некоторые из самых передовых примеров, реализованных с использованием многопоточности.
Расширенный принтер (регистратор)
Поток, который печатает все, принимается и изменяет выходные данные в соответствии с шириной терминала. Приятная часть заключается в том, что и «уже написанный» выход изменяется при изменении ширины терминала.
#!/usr/bin/env python2
import threading
import Queue
import time
import sys
import subprocess
from backports.shutil_get_terminal_size import get_terminal_size
printq = Queue.Queue()
interrupt = False
lines = []
def main():
ptt = threading.Thread(target=printer) # Turn the printer on
ptt.daemon = True
ptt.start()
# Stupid example of stuff to print
for i in xrange(1,100):
printq.put(' '.join([str(x) for x in range(1,i)])) # The actual way to send stuff to the printer
time.sleep(.5)
def split_line(line, cols):
if len(line) > cols:
new_line = ''
ww = line.split()
i = 0
while len(new_line) <= (cols - len(ww[i]) - 1):
new_line += ww[i] + ' '
i += 1
print len(new_line)
if new_line == '':
return (line, '')
return (new_line, ' '.join(ww[i:]))
else:
return (line, '')
def printer():
while True:
cols, rows = get_terminal_size() # Get the terminal dimensions
msg = '#' + '-' * (cols - 2) + '#\n' # Create the
try:
new_line = str(printq.get_nowait())
if new_line != '!@#EXIT#@!': # A nice way to turn the printer
# thread out gracefully
lines.append(new_line)
printq.task_done()
else:
printq.task_done()
sys.exit()
except Queue.Empty:
pass
# Build the new message to show and split too long lines
for line in lines:
res = line # The following is to split lines which are
# longer than cols.
while len(res) !=0:
toprint, res = split_line(res, cols)
msg += '\n' + toprint
# Clear the shell and print the new output
subprocess.check_call('clear') # Keep the shell clean
sys.stdout.write(msg)
sys.stdout.flush()
time.sleep(.5)
Стопорная нить с петлей while
import threading
import time
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def join(self, *args, **kwargs):
self.stop()
super(StoppableThread,self).join(*args, **kwargs)
def run()
while not self._stop_event.is_set():
print("Still running!")
time.sleep(2)
print("stopped!"
Исходя из этого вопроса .