Python Language
Multithreading
Recherche…
Introduction
Les threads permettent aux programmes Python de gérer plusieurs fonctions à la fois, plutôt que d'exécuter une séquence de commandes individuellement. Cette rubrique explique les principes qui sous-tendent les threads et illustre son utilisation.
Bases du multithreading
En utilisant le module de threading
, un nouveau thread d'exécution peut être démarré en créant un nouveau threading.Thread
et en lui assignant une fonction à exécuter:
import threading
def foo():
print "Hello threading!"
my_thread = threading.Thread(target=foo)
Le paramètre target
référence à la fonction (ou à l'objet appelable) à exécuter. Le thread ne commencera pas l'exécution tant que l'objet Thread
n'a pas start
.
Commencer un fil
my_thread.start() # prints 'Hello threading!'
Maintenant que my_thread
a été exécuté et terminé, l'appel start
nouveau produira une RuntimeError
. Si vous souhaitez exécuter votre thread en tant que démon, en passant le daemon=True
kwarg ou en définissant my_thread.daemon
sur True
avant d'appeler start()
, votre Thread
s'exécute en arrière-plan en tant que démon.
Rejoindre un fil
Dans les cas où vous divisez un gros job en plusieurs petits et que vous souhaitez les exécuter simultanément, mais que vous devez attendre qu'ils soient tous terminés avant de continuer, Thread.join()
est la méthode que vous recherchez.
Par exemple, supposons que vous souhaitiez télécharger plusieurs pages d'un site Web et les compiler en une seule page. Vous feriez ceci:
import requests
from threading import Thread
from queue import Queue
q = Queue(maxsize=20)
def put_page_to_q(page_num):
q.put(requests.get('http://some-website.com/page_%s.html' % page_num)
def compile(q):
# magic function that needs all pages before being able to be executed
if not q.full():
raise ValueError
else:
print("Done compiling!")
threads = []
for page_num in range(20):
t = Thread(target=requests.get, args=(page_num,))
t.start()
threads.append(t)
# Next, join all threads to make sure all threads are done running before
# we continue. join() is a blocking call (unless specified otherwise using
# the kwarg blocking=False when calling join)
for t in threads:
t.join()
# Call compile() now, since all threads have completed
compile(q)
Un aperçu plus précis de la manière dont fonctionne join()
peut être trouvé ici .
Créer une classe de threads personnalisée
En utilisant la classe threading.Thread
, on peut sous-classer la nouvelle classe de threads personnalisée. nous devons remplacer la méthode d' run
dans une sous-classe.
from threading import Thread
import time
class Sleepy(Thread):
def run(self):
time.sleep(5)
print("Hello form Thread")
if __name__ == "__main__":
t = Sleepy()
t.start() # start method automatic call Thread class run method.
# print 'The main program continues to run in foreground.'
t.join()
print("The main program continues to run in the foreground.")
Communiquer entre les threads
Il y a plusieurs threads dans votre code et vous devez communiquer entre eux en toute sécurité.
Vous pouvez utiliser une Queue
d' Queue
de la bibliothèque de queue
.
from queue import Queue
from threading import Thread
# create a data producer
def producer(output_queue):
while True:
data = data_computation()
output_queue.put(data)
# create a consumer
def consumer(input_queue):
while True:
# retrieve data (blocking)
data = input_queue.get()
# do something with the data
# indicate data has been consumed
input_queue.task_done()
Création de threads producteur et consommateur avec une file d'attente partagée
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Création d'un pool de travailleurs
Utilisation de threading
& queue
:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue
def echo_server(addr, nworkers):
print('Echo server running at', addr)
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)
En utilisant concurrent.futures.Threadpoolexecutor
:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor
def echo_server(addr):
print('Echo server running at', addr)
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
echo_server(('',15000))
Python Cookbook, 3ème édition, par David Beazley et Brian K. Jones (O'Reilly). Copyright 2013 David Beazley et Brian Jones, 978-1-449-34037-7.
Utilisation avancée de multithreads
Cette section contient certains des exemples les plus avancés réalisés avec Multithreading.
Imprimante avancée (enregistreur)
Un thread qui imprime tout est reçu et modifie la sortie en fonction de la largeur du terminal. Le point intéressant est que la sortie "déjà écrite" est modifiée lorsque la largeur du terminal change.
#!/usr/bin/env python2
import threading
import Queue
import time
import sys
import subprocess
from backports.shutil_get_terminal_size import get_terminal_size
printq = Queue.Queue()
interrupt = False
lines = []
def main():
ptt = threading.Thread(target=printer) # Turn the printer on
ptt.daemon = True
ptt.start()
# Stupid example of stuff to print
for i in xrange(1,100):
printq.put(' '.join([str(x) for x in range(1,i)])) # The actual way to send stuff to the printer
time.sleep(.5)
def split_line(line, cols):
if len(line) > cols:
new_line = ''
ww = line.split()
i = 0
while len(new_line) <= (cols - len(ww[i]) - 1):
new_line += ww[i] + ' '
i += 1
print len(new_line)
if new_line == '':
return (line, '')
return (new_line, ' '.join(ww[i:]))
else:
return (line, '')
def printer():
while True:
cols, rows = get_terminal_size() # Get the terminal dimensions
msg = '#' + '-' * (cols - 2) + '#\n' # Create the
try:
new_line = str(printq.get_nowait())
if new_line != '!@#EXIT#@!': # A nice way to turn the printer
# thread out gracefully
lines.append(new_line)
printq.task_done()
else:
printq.task_done()
sys.exit()
except Queue.Empty:
pass
# Build the new message to show and split too long lines
for line in lines:
res = line # The following is to split lines which are
# longer than cols.
while len(res) !=0:
toprint, res = split_line(res, cols)
msg += '\n' + toprint
# Clear the shell and print the new output
subprocess.check_call('clear') # Keep the shell clean
sys.stdout.write(msg)
sys.stdout.flush()
time.sleep(.5)
Thread bloquable avec une boucle while
import threading
import time
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def join(self, *args, **kwargs):
self.stop()
super(StoppableThread,self).join(*args, **kwargs)
def run()
while not self._stop_event.is_set():
print("Still running!")
time.sleep(2)
print("stopped!"
Basé sur cette question .