Python Language
multithreading
Sök…
Introduktion
Trådar tillåter Python-program att hantera flera funktioner samtidigt i motsats till att köra en sekvens med kommandon individuellt. Detta ämne förklarar principerna bakom gängning och demonstrerar dess användning.
Grunderna i multithreading
Med hjälp av threading
kan en ny tråd för körning startas genom att skapa en ny threading.Thread
och tilldela den en funktion att utföra:
import threading
def foo():
print "Hello threading!"
my_thread = threading.Thread(target=foo)
De target
parameter referenser den funktions (eller inlösbara objekt) som ska köras. Tråden börjar inte köras förrän start
kallas på Thread
.
Starta en tråd
my_thread.start() # prints 'Hello threading!'
Nu när my_thread
har kört och avslutats, kallar start
återigen kommer att producera en RuntimeError
. Om du vill köra din tråd som en demon, vidarebefordra daemon=True
kwarg eller ställa my_thread.daemon
till True
innan du ringer start()
, får din Thread
att köra tyst i bakgrunden som en daemon.
Gå med i en tråd
I de fall du delar upp ett stort jobb i flera små och vill köra dem samtidigt, men behöver vänta på att alla avslutas innan du fortsätter, är Thread.join()
den metod du letar efter.
Låt oss till exempel säga att du vill ladda ner flera sidor på en webbplats och sammanställa dem till en enda sida. Du skulle göra detta:
import requests
from threading import Thread
from queue import Queue
q = Queue(maxsize=20)
def put_page_to_q(page_num):
q.put(requests.get('http://some-website.com/page_%s.html' % page_num)
def compile(q):
# magic function that needs all pages before being able to be executed
if not q.full():
raise ValueError
else:
print("Done compiling!")
threads = []
for page_num in range(20):
t = Thread(target=requests.get, args=(page_num,))
t.start()
threads.append(t)
# Next, join all threads to make sure all threads are done running before
# we continue. join() is a blocking call (unless specified otherwise using
# the kwarg blocking=False when calling join)
for t in threads:
t.join()
# Call compile() now, since all threads have completed
compile(q)
En närmare titt på hur join()
fungerar kan hittas här .
Skapa en anpassad trådklass
Med hjälp av threading.Thread
Trådklass kan vi underklassa ny anpassad trådklass. Vi måste åsidosätta run
metoden i en underklass.
from threading import Thread
import time
class Sleepy(Thread):
def run(self):
time.sleep(5)
print("Hello form Thread")
if __name__ == "__main__":
t = Sleepy()
t.start() # start method automatic call Thread class run method.
# print 'The main program continues to run in foreground.'
t.join()
print("The main program continues to run in the foreground.")
Kommunikera mellan trådar
Det finns flera trådar i din kod och du måste kommunicera säkert mellan dem.
Du kan använda en Queue
från queue
biblioteket.
from queue import Queue
from threading import Thread
# create a data producer
def producer(output_queue):
while True:
data = data_computation()
output_queue.put(data)
# create a consumer
def consumer(input_queue):
while True:
# retrieve data (blocking)
data = input_queue.get()
# do something with the data
# indicate data has been consumed
input_queue.task_done()
Skapa producent- och konsumenttrådar med en delad kö
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Skapa en arbetarpool
Använda threading
och queue
:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue
def echo_server(addr, nworkers):
print('Echo server running at', addr)
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)
Använda concurrent.futures.Threadpoolexecutor
:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor
def echo_server(addr):
print('Echo server running at', addr)
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
echo_server(('',15000))
Python Cookbook, 3: e upplagan, av David Beazley och Brian K. Jones (O'Reilly). Copyright 2013 David Beazley och Brian Jones, 978-1-449-34037-7.
Avancerad användning av multithreads
Det här avsnittet kommer att innehålla några av de mest avancerade exemplen som realiserats med Multithreading.
Avancerad skrivare (logger)
En tråd som skriver ut allt tas emot och ändrar utgången enligt terminalbredden. Den trevliga delen är att även den "redan skrivna" utgången ändras när terminalens bredd förändras.
#!/usr/bin/env python2
import threading
import Queue
import time
import sys
import subprocess
from backports.shutil_get_terminal_size import get_terminal_size
printq = Queue.Queue()
interrupt = False
lines = []
def main():
ptt = threading.Thread(target=printer) # Turn the printer on
ptt.daemon = True
ptt.start()
# Stupid example of stuff to print
for i in xrange(1,100):
printq.put(' '.join([str(x) for x in range(1,i)])) # The actual way to send stuff to the printer
time.sleep(.5)
def split_line(line, cols):
if len(line) > cols:
new_line = ''
ww = line.split()
i = 0
while len(new_line) <= (cols - len(ww[i]) - 1):
new_line += ww[i] + ' '
i += 1
print len(new_line)
if new_line == '':
return (line, '')
return (new_line, ' '.join(ww[i:]))
else:
return (line, '')
def printer():
while True:
cols, rows = get_terminal_size() # Get the terminal dimensions
msg = '#' + '-' * (cols - 2) + '#\n' # Create the
try:
new_line = str(printq.get_nowait())
if new_line != '!@#EXIT#@!': # A nice way to turn the printer
# thread out gracefully
lines.append(new_line)
printq.task_done()
else:
printq.task_done()
sys.exit()
except Queue.Empty:
pass
# Build the new message to show and split too long lines
for line in lines:
res = line # The following is to split lines which are
# longer than cols.
while len(res) !=0:
toprint, res = split_line(res, cols)
msg += '\n' + toprint
# Clear the shell and print the new output
subprocess.check_call('clear') # Keep the shell clean
sys.stdout.write(msg)
sys.stdout.flush()
time.sleep(.5)
Stoppbar tråd med en stund Loop
import threading
import time
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def join(self, *args, **kwargs):
self.stop()
super(StoppableThread,self).join(*args, **kwargs)
def run()
while not self._stop_event.is_set():
print("Still running!")
time.sleep(2)
print("stopped!"
Baserat på denna fråga .