Документация по Python

Многопоточность

В: Документация по Python

Введение

Примеры

Основы многопоточности

Используя threading модуль, новый поток выполнения может быть начат путем создания нового threading.Thread и присвоения ему функции для выполнения:

 import threading

def foo():
  print "Hello threading!"

my_thread = threading.Thread(target=foo)

 

target параметр ссылается на функцию (или вызываемый объект) , который будет работать. Нить не начнется выполнение до start не вызывается на Thread объекта.

Начало темы

 my_thread.start() # prints 'Hello threading!'

 

Теперь, когда my_thread побежал и завершается, вызов start снова будет производить RuntimeError . Если вы хотите , чтобы запустить нить , как демон, передавая daemon=True kwarg или установить my_thread.daemon в True перед вызовом start() , вызывает ваш Thread запустить в фоновом режиме , как демон.

Присоединение к теме

В тех случаях , когда вы разбили одну большую работу на несколько маленьких и хотят запустить их одновременно, но нужно ждать их все , чтобы закончить , прежде чем продолжить, Thread.join() является метод , который вы ищете.

Например, допустим, вы хотите загрузить несколько страниц веб-сайта и собрать их в одну страницу. Вы бы сделали это:

 import requests
from threading import Thread
from queue import Queue

q = Queue(maxsize=20)
def put_page_to_q(page_num):
    q.put(requests.get('http://some-website.com/page_%s.html' % page_num)

def compile(q):
    # magic function that needs all pages before being able to be executed
    if not q.full():
        raise ValueError
    else:
        print("Done compiling!")

threads = []
for page_num in range(20):
     t = Thread(target=requests.get, args=(page_num,))
     t.start()
     threads.append(t)

# Next, join all threads to make sure all threads are done running before
# we continue. join() is a blocking call (unless specified otherwise using 
# the kwarg blocking=False when calling join)
for t in threads:
    t.join()

# Call compile() now, since all threads have completed
compile(q)

 

Более пристальный взгляд на то, как join() работает , можно найти здесь .

Создать пользовательский класс потока

Использование threading.Thread класса мы можем создать подкласс нового пользовательского класса Thread. мы должны переопределить run метод в подклассе.

 from threading import Thread
import time

class Sleepy(Thread):

    def run(self):
        time.sleep(5)
        print("Hello form Thread")

if __name__ == "__main__":
    t = Sleepy()
    t.start()      # start method automatic call Thread class run method.
    # print 'The main program continues to run in foreground.'
    t.join()
    print("The main program continues to run in the foreground.")



 

Общение между потоками

В вашем коде несколько потоков, и вам нужно безопасно общаться между ними.

Вы можете использовать Queue из queue библиотеки.

 from queue import Queue
from threading import Thread

# create a data producer 
def producer(output_queue):
    while True:
        data = data_computation()

        output_queue.put(data)

# create a consumer
def consumer(input_queue):
    while True:
        # retrieve data (blocking)
        data = input_queue.get()

        # do something with the data

        # indicate data has been consumed
        input_queue.task_done()

 

Создание потоков производителей и потребителей с общей очередью

 q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start() 

Создание рабочего пула

Использование threading & queue :

 from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_server(addr, nworkers):
    print('Echo server running at', addr)
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()

    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('',15000), 128)

 

Использование concurrent.futures.Threadpoolexecutor :

 from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_server(addr):
    print('Echo server running at', addr)
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

 

Поваренная книга Питона, 3-е издание, Дэвидом Бизли и Брайаном К. Джонсом (О'Рейли). Copyright 2013 Дэвид Бизли и Брайан Джонс, 978-1-449-34037-7.

Расширенное использование многопоточности

Этот раздел будет содержать некоторые из самых сложных примеров, реализованных с использованием многопоточности.

Усовершенствованный принтер (регистратор)

Поток, который печатает все, получен и изменяет вывод в соответствии с шириной терминала. Приятной особенностью является то, что также «уже записанный» вывод изменяется при изменении ширины терминала.

#!/usr/bin/env python2

import threading
import Queue
import time
import sys
import subprocess
from backports.shutil_get_terminal_size import get_terminal_size

printq = Queue.Queue()
interrupt = False
lines = []

def main():

    ptt = threading.Thread(target=printer) # Turn the printer on
    ptt.daemon = True
    ptt.start()

    # Stupid example of stuff to print
    for i in xrange(1,100):
        printq.put(' '.join([str(x) for x in range(1,i)]))           # The actual way to send stuff to the printer
        time.sleep(.5)

def split_line(line, cols):
    if len(line) > cols:
        new_line = ''
        ww = line.split()
        i = 0
        while len(new_line) <= (cols - len(ww[i]) - 1):
            new_line += ww[i] + ' '
            i += 1
            print len(new_line)
        if new_line == '':
            return (line, '')

        return (new_line, ' '.join(ww[i:]))
    else:
        return (line, '')


def printer():

    while True:
        cols, rows = get_terminal_size() # Get the terminal dimensions
        msg = '#' + '-' * (cols - 2) + '#\n' # Create the
        try:
            new_line = str(printq.get_nowait())
            if new_line != '!@#EXIT#@!': # A nice way to turn the printer
                                         # thread out gracefully
                lines.append(new_line)
                printq.task_done()
            else:
                printq.task_done()
                sys.exit()
        except Queue.Empty:
            pass

        # Build the new message to show and split too long lines
        for line in lines:
            res = line          # The following is to split lines which are
                                # longer than cols.
            while len(res) !=0:
                toprint, res = split_line(res, cols)
                msg += '\n' + toprint

        # Clear the shell and print the new output
        subprocess.check_call('clear') # Keep the shell clean
        sys.stdout.write(msg)
        sys.stdout.flush()
        time.sleep(.5) 

Остановляемая нить с петлей

 import threading
import time

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self):
        super(StoppableThread, self).__init__()
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def join(self, *args, **kwargs):
        self.stop()
        super(StoppableThread,self).join(*args, **kwargs)

    def run()
        while not self._stop_event.is_set():
            print("Still running!")
            time.sleep(2)
        print("stopped!"

 

На основе этого вопроса .

Синтаксис

Параметры

Примечания

Еще от кодкамп
Замечательно! Вы успешно подписались.
Добро пожаловать обратно! Вы успешно вошли
Вы успешно подписались на кодкамп.
Срок действия вашей ссылки истек.
Ура! Проверьте свою электронную почту на наличие волшебной ссылки для входа.
Успех! Ваша платежная информация обновлена.
Ваша платежная информация не была обновлена.