Python Language
マルチスレッド
サーチ…
前書き
スレッドは、一連のコマンドを個別に実行するのではなく、Pythonプログラムが複数の関数を同時に処理できるようにします。このトピックでは、スレッディングの原則について説明し、スレッドの使用方法を示します。
マルチスレッドの基礎
threading
モジュールを使用して、新しいスレッドthreadを作成し、実行する関数を割り当てることによって、新しいthreading.Thread
を開始することができます。
import threading
def foo():
print "Hello threading!"
my_thread = threading.Thread(target=foo)
target
パラメータは、実行される関数(または呼び出し可能オブジェクト)を参照します。スレッドは、 Thread
オブジェクトに対してstart
が呼び出されるまで、実行をstart
ません。
スレッドの開始
my_thread.start() # prints 'Hello threading!'
my_thread
が実行され終了したので、 start
再度呼び出すとRuntimeError
が生成されます。スレッドをデーモンとして実行したい場合は、 daemon=True
kwargを渡すか、 start()
呼び出す前にmy_thread.daemon
をTrue
に設定すると、 Thread
がバックグラウンドでサイレントmy_thread.daemon
デーモンとして実行されます。
スレッドに参加する
ある大きなジョブをいくつかの小さなジョブに分割し、それらを同時に実行したいが、それらのすべてが終了するのを待つ必要がある場合は、 Thread.join()
があなたが探しているメソッドです。
たとえば、ウェブサイトの複数のページをダウンロードして1つのページにまとめたいとします。あなたはこれをするでしょう:
import requests
from threading import Thread
from queue import Queue
q = Queue(maxsize=20)
def put_page_to_q(page_num):
q.put(requests.get('http://some-website.com/page_%s.html' % page_num)
def compile(q):
# magic function that needs all pages before being able to be executed
if not q.full():
raise ValueError
else:
print("Done compiling!")
threads = []
for page_num in range(20):
t = Thread(target=requests.get, args=(page_num,))
t.start()
threads.append(t)
# Next, join all threads to make sure all threads are done running before
# we continue. join() is a blocking call (unless specified otherwise using
# the kwarg blocking=False when calling join)
for t in threads:
t.join()
# Call compile() now, since all threads have completed
compile(q)
join()
動作を詳しく見ることができます 。
カスタムスレッドクラスを作成する
threading.Thread
クラスを使用すると、新しいカスタムThreadクラスをサブクラスthreading.Thread
できます。サブクラスでrun
メソッドをオーバーライドrun
必要があります。
from threading import Thread
import time
class Sleepy(Thread):
def run(self):
time.sleep(5)
print("Hello form Thread")
if __name__ == "__main__":
t = Sleepy()
t.start() # start method automatic call Thread class run method.
# print 'The main program continues to run in foreground.'
t.join()
print("The main program continues to run in the foreground.")
スレッド間の通信
コードには複数のスレッドがあり、それらの間で安全に通信する必要があります。
queue
ライブラリからQueue
を使用できます。
from queue import Queue
from threading import Thread
# create a data producer
def producer(output_queue):
while True:
data = data_computation()
output_queue.put(data)
# create a consumer
def consumer(input_queue):
while True:
# retrieve data (blocking)
data = input_queue.get()
# do something with the data
# indicate data has been consumed
input_queue.task_done()
共有キューを持つプロデューサスレッドとコンシューマスレッドの作成
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
ワーカー・プールの作成
threading
とqueue
使用:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue
def echo_server(addr, nworkers):
print('Echo server running at', addr)
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)
concurrent.futures.Threadpoolexecutor
使用:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor
def echo_server(addr):
print('Echo server running at', addr)
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
echo_server(('',15000))
David BeazleyとBrian K. Jones(O'Reilly)によるPython Cookbook、3rd edition。 Copyright 2013 David Beazley and Brian Jones、978-1-449-34037-7。
高度なマルチスレッドの使用
このセクションには、マルチスレッドを使用して実現された最も高度な例がいくつか含まれています。
高度なプリンタ(ロガー)
すべてを出力するスレッドが受信され、端末の幅に応じて出力が変更されます。いい部分は、端末の幅が変わったときに "既に書き込まれた"出力が変更されていることです。
#!/usr/bin/env python2
import threading
import Queue
import time
import sys
import subprocess
from backports.shutil_get_terminal_size import get_terminal_size
printq = Queue.Queue()
interrupt = False
lines = []
def main():
ptt = threading.Thread(target=printer) # Turn the printer on
ptt.daemon = True
ptt.start()
# Stupid example of stuff to print
for i in xrange(1,100):
printq.put(' '.join([str(x) for x in range(1,i)])) # The actual way to send stuff to the printer
time.sleep(.5)
def split_line(line, cols):
if len(line) > cols:
new_line = ''
ww = line.split()
i = 0
while len(new_line) <= (cols - len(ww[i]) - 1):
new_line += ww[i] + ' '
i += 1
print len(new_line)
if new_line == '':
return (line, '')
return (new_line, ' '.join(ww[i:]))
else:
return (line, '')
def printer():
while True:
cols, rows = get_terminal_size() # Get the terminal dimensions
msg = '#' + '-' * (cols - 2) + '#\n' # Create the
try:
new_line = str(printq.get_nowait())
if new_line != '!@#EXIT#@!': # A nice way to turn the printer
# thread out gracefully
lines.append(new_line)
printq.task_done()
else:
printq.task_done()
sys.exit()
except Queue.Empty:
pass
# Build the new message to show and split too long lines
for line in lines:
res = line # The following is to split lines which are
# longer than cols.
while len(res) !=0:
toprint, res = split_line(res, cols)
msg += '\n' + toprint
# Clear the shell and print the new output
subprocess.check_call('clear') # Keep the shell clean
sys.stdout.write(msg)
sys.stdout.flush()
time.sleep(.5)
whileループを持つ停止可能なスレッド
import threading
import time
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def join(self, *args, **kwargs):
self.stop()
super(StoppableThread,self).join(*args, **kwargs)
def run()
while not self._stop_event.is_set():
print("Still running!")
time.sleep(2)
print("stopped!"
この質問に基づいて。