Suche…


Einführung

Mit Threads können Python-Programme mehrere Funktionen gleichzeitig ausführen, anstatt eine Befehlsfolge einzeln auszuführen. In diesem Thema werden die Prinzipien für das Threading erläutert und seine Verwendung veranschaulicht.

Grundlagen des Multithreading

Mit dem threading Modul kann ein neuer Ausführungsthread gestartet werden, indem ein neues threading.Thread und ihm eine Funktion zur Ausführung zugewiesen wird:

import threading

def foo():
  print "Hello threading!"

my_thread = threading.Thread(target=foo)

Der target verweist auf die Funktion (oder aufrufbare Objekt) ausgeführt werden. Der Thread beginnt erst mit der Ausführung, wenn start für das Thread Objekt aufgerufen wird.

Einen Thread starten

my_thread.start() # prints 'Hello threading!'

my_thread nun ausgeführt und beendet wurde, wird beim RuntimeError Aufruf von start ein RuntimeError . Wenn Sie Ihren Thread als Daemon ausführen möchten, den daemon=True kwarg übergeben oder my_thread.daemon vor dem Aufruf von start() auf True , wird Ihr Thread als Daemon im Hintergrund ausgeführt.

Einen Thread verbinden

Wenn Sie einen großen Job in mehrere kleine aufteilen und gleichzeitig ausführen möchten, aber warten müssen, bis alle abgeschlossen sind, bevor Sie fortfahren, ist Thread.join() die Methode, nach der Sie suchen.

Angenommen, Sie möchten mehrere Seiten einer Website herunterladen und diese auf einer einzigen Seite zusammenstellen. Sie würden das tun:

import requests
from threading import Thread
from queue import Queue

q = Queue(maxsize=20)
def put_page_to_q(page_num):
    q.put(requests.get('http://some-website.com/page_%s.html' % page_num)

def compile(q):
    # magic function that needs all pages before being able to be executed
    if not q.full():
        raise ValueError
    else:
        print("Done compiling!")

threads = []
for page_num in range(20):
     t = Thread(target=requests.get, args=(page_num,))
     t.start()
     threads.append(t)

# Next, join all threads to make sure all threads are done running before
# we continue. join() is a blocking call (unless specified otherwise using 
# the kwarg blocking=False when calling join)
for t in threads:
    t.join()

# Call compile() now, since all threads have completed
compile(q)

Einen genaueren Einblick in die Funktionsweise von join() finden Sie hier .

Erstellen Sie eine benutzerdefinierte Thread-Klasse

Mit der threading.Thread Klasse können wir eine neue benutzerdefinierte Thread-Klasse unterteilen. Wir müssen die run Methode in einer Unterklasse überschreiben.

from threading import Thread
import time

class Sleepy(Thread):

    def run(self):
        time.sleep(5)
        print("Hello form Thread")

if __name__ == "__main__":
    t = Sleepy()
    t.start()      # start method automatic call Thread class run method.
    # print 'The main program continues to run in foreground.'
    t.join()
    print("The main program continues to run in the foreground.")

Kommunikation zwischen Threads

Ihr Code enthält mehrere Threads, und Sie müssen sicher zwischen ihnen kommunizieren.

Sie können eine Queue aus der queue verwenden.

from queue import Queue
from threading import Thread

# create a data producer 
def producer(output_queue):
    while True:
        data = data_computation()
        
        output_queue.put(data)

# create a consumer
def consumer(input_queue):
    while True:
        # retrieve data (blocking)
        data = input_queue.get()

        # do something with the data

        # indicate data has been consumed
        input_queue.task_done()

Producer- und Consumer-Threads mit einer gemeinsam genutzten Warteschlange erstellen

q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

Anlegen eines Worker-Pools

threading und queue :

from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue
    
def echo_server(addr, nworkers):
    print('Echo server running at', addr)
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()

    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('',15000), 128)

concurrent.futures.Threadpoolexecutor :

from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_server(addr):
    print('Echo server running at', addr)
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

Python Cookbook, 3. Auflage, von David Beazley und Brian K. Jones (O'Reilly). Copyright 2013 David Beazley und Brian Jones, 978-1-449-34037-7.

Erweiterte Verwendung von Multithreads

Dieser Abschnitt enthält einige der fortschrittlichsten Beispiele, die mit Multithreading realisiert wurden.

Erweiterter Drucker (Logger)

Ein Thread, der alles druckt, wird empfangen und ändert die Ausgabe entsprechend der Klemmenbreite. Das Schöne daran ist, dass auch die "bereits geschriebene" Ausgabe geändert wird, wenn sich die Breite des Terminals ändert.

#!/usr/bin/env python2

import threading
import Queue
import time
import sys
import subprocess
from backports.shutil_get_terminal_size import get_terminal_size

printq = Queue.Queue()
interrupt = False
lines = []

def main():

    ptt = threading.Thread(target=printer) # Turn the printer on
    ptt.daemon = True
    ptt.start()

    # Stupid example of stuff to print
    for i in xrange(1,100):
        printq.put(' '.join([str(x) for x in range(1,i)]))           # The actual way to send stuff to the printer
        time.sleep(.5)

def split_line(line, cols):
    if len(line) > cols:
        new_line = ''
        ww = line.split()
        i = 0
        while len(new_line) <= (cols - len(ww[i]) - 1):
            new_line += ww[i] + ' '
            i += 1
            print len(new_line)
        if new_line == '':
            return (line, '')

        return (new_line, ' '.join(ww[i:]))
    else:
        return (line, '')


def printer():

    while True:
        cols, rows = get_terminal_size() # Get the terminal dimensions
        msg = '#' + '-' * (cols - 2) + '#\n' # Create the
        try:
            new_line = str(printq.get_nowait())
            if new_line != '!@#EXIT#@!': # A nice way to turn the printer
                                         # thread out gracefully
                lines.append(new_line)
                printq.task_done()
            else:
                printq.task_done()
                sys.exit()
        except Queue.Empty:
            pass

        # Build the new message to show and split too long lines
        for line in lines:
            res = line          # The following is to split lines which are
                                # longer than cols.
            while len(res) !=0:
                toprint, res = split_line(res, cols)
                msg += '\n' + toprint

        # Clear the shell and print the new output
        subprocess.check_call('clear') # Keep the shell clean
        sys.stdout.write(msg)
        sys.stdout.flush()
        time.sleep(.5)

Stoppbarer Thread mit einer while-Schleife

import threading
import time

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self):
        super(StoppableThread, self).__init__()
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def join(self, *args, **kwargs):
        self.stop()
        super(StoppableThread,self).join(*args, **kwargs)

    def run()
        while not self._stop_event.is_set():
            print("Still running!")
            time.sleep(2)
        print("stopped!"

Basierend auf dieser Frage .



Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow