Python LanguageПараллельность Python

замечания

Разработчики Python следили за тем, чтобы API между threading и multiprocessing был схожим, так что для обоих программистов проще переключаться между двумя вариантами.

Модуль резьбонарезания

from __future__ import print_function
import threading
def counter(count):
    while count > 0:
        print("Count value", count)
        count -= 1
    return

t1 = threading.Thread(target=countdown,args=(10,))
t1.start()
t2 = threading.Thread(target=countdown,args=(20,))
t2.start()

В некоторых реализациях Python, таких как CPython, истинный параллелизм не достигается с помощью потоков из-за использования так называемого GIL или G lobal I nterpreter L ock.

Вот отличный обзор параллелизма Python:

Параллельный подход Python Дэвида Бизли (YouTube)

Многопроцессорный модуль

from __future__ import print_function
import multiprocessing


def countdown(count):
    while count > 0:
        print("Count value", count)
        count -= 1
    return

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=countdown, args=(10,))
    p1.start()

    p2 = multiprocessing.Process(target=countdown, args=(20,))
    p2.start()

    p1.join()
    p2.join()

Здесь каждая функция выполняется в новом процессе. Поскольку новый экземпляр Python VM запускает код, GIL не существует, и вы выполняете параллелизм на нескольких ядрах.

Метод Process.start запускает этот новый процесс и запускает функцию, переданную в target аргументе аргументами args . Метод Process.join ждет завершения выполнения процессов p1 и p2 .

Новые процессы запускаются по-разному в зависимости от версии python и формы табло, на которой работает код, например :

  • Для создания нового процесса Windows использует spawn .
  • В Unix-системах и версии раньше 3.3 процессы создаются с использованием fork .
    Обратите внимание, что этот метод не учитывает использование вилки POSIX и, таким образом, приводит к неожиданному поведению, особенно при взаимодействии с другими многопроцессорными библиотеками.
  • С системой unix и версией 3.4+ вы можете запускать новые процессы с помощью fork , forkserver или spawn с использованием multiprocessing.set_start_method в начале вашей программы. forkserver и spawn медленнее, чем forking, но избегают некоторых неожиданных действий.

Использование вилки POSIX :

После вилки в многопоточной программе, ребенок может безопасно вызывать только функции, защищенные от асинхронного сигнала, до тех пор, пока он не вызовет execve.
( см. )

Используя fork, новый процесс будет запущен с тем же самым состоянием для всех текущих мьютексов, но будет запущен только MainThread . Это небезопасно, так как это может привести к условиям гонки, например :

  • Если вы используете Lock в MainThread и передаете его другому потоку, который, предположительно, заблокирует его в какой-то момент. Если fork одновременно, новый процесс начинается с заблокированной блокировки, которая никогда не будет выпущена, поскольку второй поток не существует в этом новом процессе.

Собственно, такое поведение не должно происходить в чистом питоне, так как multiprocessing обрабатывает его правильно, но если вы взаимодействуете с другой библиотекой, такое поведение может происходить, что приводит к сбою вашей системы (например, с помощью numpy / speed на macOS).

Передача данных между процессами многопроцессорности

Поскольку данные чувствительны при взаимодействии между двумя потоками (думаю, одновременное чтение и одновременная запись могут конфликтовать друг с другом, вызывая условия гонки), был создан набор уникальных объектов, чтобы облегчить передачу данных между потоками. Любая действительно атомная операция может использоваться между потоками, но всегда безопасно придерживаться очереди.

import multiprocessing
import queue
my_Queue=multiprocessing.Queue() 
#Creates a queue with an undefined maximum size
#this can be dangerous as the queue becomes increasingly large
#it will take a long time to copy data to/from each read/write thread

Большинство людей полагают, что при использовании очереди всегда ставить данные очереди в try: except: block вместо использования пустого. Тем не менее, для приложений, где не имеет значения, если вы пропустите цикл сканирования (данные могут быть помещены в очередь, в то время как они перебрасывают состояния из queue.Empty==True в queue.Empty==False ), как правило, лучше размещать чтение и записывать доступ в том, что я называю блоком Iftry, потому что оператор «if» технически более эффективен, чем перехват исключения.

import multiprocessing
import queue
'''Import necessary Python standard libraries, multiprocessing for classes and queue for the queue exceptions it provides'''
def Queue_Iftry_Get(get_queue, default=None, use_default=False, func=None, use_func=False):
    '''This global method for the Iftry block is provided for it's reuse and 
standard functionality, the if also saves on performance as opposed to catching
 the exception, which is expencive.
        It also allows the user to specify a function for the outgoing data to use,
 and a default value to return if the function cannot return the value from the queue'''
        if get_queue.empty():
            if use_default:
                return default
        else:
            try:
                value = get_queue.get_nowait()
            except queue.Empty:
                if use_default:
                    return default
            else:
                if use_func:
                    return func(value)
                else:
                    return value
    def Queue_Iftry_Put(put_queue, value):
        '''This global method for the Iftry block is provided because of its reuse 
and 
standard functionality, the If also saves on performance as opposed to catching
 the exception, which is expensive.
        Return True if placing value in the queue was successful. Otherwise, false'''
        if put_queue.full():
            return False
        else:
            try:
                put_queue.put_nowait(value)
            except queue.Full:
                return False
            else:
                return True