Блог

Python в три ручья: работаем с потоками (часть 1)

В каких случаях вам нужна многопоточность, как реализовать её на Python и что нужно знать о глобальной блокировке GIL.
04 мая 2018Cbc0f0509128ea8a7487ec71c1240fb531db396bМария Лисянская5657114

Из этой статьи вы узнаете, как с Python выполнять несколько операций одновременно и распределять нагрузку между ядрами процессора, какие особенности языка  учитывать. Но главное — поймете, когда многопоточность в Python нужна, а когда только мешает.

Небольшое предупреждение для тех, кто впервые слышит о параллельных вычислениях. Что такое поток и чем он отличается от процесса, мы выяснили в статье «Внутри процесса: многопоточность и пинг-понг mutex'ом». Тогда мы приводили примеры на Java, но теоретические основы многопоточности верны и для Python. Совпадают, в том числе, механизмы синхронизации потоков: семафоры, взаимные исключения (mutex), условия, события. Поэтому сегодня сделаем акцент на особенностях Python, его механизмах и инструментах, связанных с многопоточностью.

Организовать параллельные вычисления в Python без внешних библиотек можно с помощью модулей:

  • threading — для управления потоками.
  • queue —  для организации очередей.
  • multiprocessing  — для управления процессами.

Пока нас интересует только первый пункт списка.

Как создавать потоки в Python

Метод 1  — «функциональный»

Для работы с потоками из модуля threading импортируем класс Thread. В начале кода пишем:

from threading import Thread

После этого нам будет доступна функция Thread()  — с ней легко создавать потоки. Синтаксис такой:

variable = Thread(target=function_name, args=(arg1, arg2,))

Первый параметр target — это «целевая» функция, которая определяет поведение потока и создаётся заранее. Следом идёт список аргументов. Если судьбу аргументов (например, кто будет делимым, а кто делителем в уравнении) определяет их позиция, их записывают как args=(x,y). Если же вам нужны аргументы в виде пар «ключ-значение», используйте запись вида kwargs={‘prop’:120}.

Ради удобства отладки можно также дать новому потоку имя. Для этого среди параметров функции прописывают name=«Имя потока». По умолчанию name хранит значение null. А ещё потоки можно группировать с помощью параметра group, который по умолчанию — None.

За дело! Пусть два потока параллельно выводят каждый в свой файл заданное число строк. Для начала нам понадобится функция, которая выполнит задуманный нами сценарий. Аргументами целевой функции будут число строк и имя текстового файла для записи.

Давайте попробуем:

#coding: UTF-8
from threading import Thread

def prescript(thefile, num):
    with open(thefile, 'w') as f:
        for i in range(num):
            if num > 500:
                f.write('МногоБукв\n')
            else:
                f.write('МалоБукв\n')

thread1 = Thread(target=prescript, args=('f1.txt', 200,))
thread2 = Thread(target=prescript, args=('f2.txt', 1000,))

thread1.start()
thread2.start()
thread1.join()
thread2.join()

Что start() запускает ранее созданный поток, вы уже догадались. Метод join() останавливает поток, когда тот выполнит свои задачи. Ведь нужно закрыть открытые файлы и освободить занятые ресурсы. Это называется «Уходя, гасите свет». Завершать потоки в предсказуемый момент и явно  — надёжнее, чем снаружи и неизвестно когда. Меньше риск, что вмешаются случайные факторы. В качестве параметра в скобках можно указать, на сколько секунд блокировать поток перед продолжением его работы.

Метод 2  — «классовый»

Для потока со сложным поведением обычно пишут отдельный класс, который наследуют от Thread из модуля threading. В этом случае программу действий потока прописывают в методе run() созданного класса. Ту же петрушку мы видели и в Java.

#coding: UTF-8

import threading
class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__(self, name="threddy" + num)
        self.num = num
    def run(self):
        print ("Thread ", self.num),
thread1 = MyThread("1")
thread2 = MyThread("2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

Стандартные методы работы с потоками

Чтобы управлять потоками, нужно следить, как они себя ведут. И для этого в threading есть специальные методы:

current_thread()  — смотрим, какой поток вызвал функцию;

active_count() — считаем работающие в данный момент экземпляры класса Thread;

enumerate()   — получаем список работающих потоков.

Ещё можно управлять потоком через методы класса:

is_alive()  —  спрашиваем поток: «Жив ещё, курилка?»  — получаем true или false;

getName()  — узнаём имя потока;

setName(any_name)  — даём потоку имя;

У каждого потока, пока он работает, есть уникальный идентификационный номер, который хранится в переменной ident.

thread1.start()
print(thread1.ident)

Отсрочить операции в вызываемых потоком функциях можно с помощью таймера. В инициализаторе объектов класса Timer всего два аргумента — время ожидания в секундах и функция, которую нужно в итоге выполнить:

import threading
print ("Waiting...")
def timer_test():
    print ("The timer has done its job!")
tim = threading.Timer(5.0, timer_test)
tim.start()

Таймер можно один раз создать, а затем запускать в разных частях кода.

Потусторонние потоки

Обычно Python-приложение не завершается, пока работает хоть один его поток. Но есть особые потоки, которые не мешают закрытию программы и останавливается вместе с ней. Их называют демонами (daemons). Проверить, является ли поток демоном, можно методом isDaemon(). Если является, метод вернёт истину.

Назначить поток демоном можно  при создании — через параметр “daemon=True” или аргумент в инициализаторе класса.

thread0 = Thread(target=target_func, kwargs={‘x’:10}, daemon=True)

Не поздно демонизировать и уже существующий поток методом setDaemon(daemonic).

Всё бы ничего, но это даже не верхушка айсберга, потому что прямо сейчас нас ждут великие открытия.

Приключение начинается. У древнего шлюза

Питон слывёт дружелюбным и простым в общении, но есть у него причуды. Нельзя просто взять и воспользоваться всеми преимуществами многопоточности в Python! Дорогу вам преградит огромный шлюз… Даже так — глобальный шлюз (Global Interpreter Lock, он же GIL), который ограничивает многопоточность на уровне интерпретатора. Технически, это один на всех mutex, созданный по умолчанию. Такого нет ни в C, ни в Java.

Задача шлюза  — пропускать потоки строго по одному, чтоб не летали наперегонки, как печально известные стритрейсеры, и не создавали угрозу работе интерпретатора.

Без шлюза потоки подрезали бы друг друга, чтобы первыми добраться до памяти, но это еще не всё. Они имеют обыкновение внезапно засыпать за рулём! Операционная система не спрашивает, вовремя или невовремя  — просто усыпляет их в ей одной известный момент. Из-за этого неупорядоченные потоки могут неожиданно перехватывать друг у друга инициативу в работе с общими ресурсами.

Дезориентированный спросонок поток, который видит перед собой совсем не ту ситуацию, при которой засыпал, рискует разбиться и повалить интерпретатор, либо попасть в тупиковую ситуацию (deadlock). Например, перед сном Поток 1 начал работу со списком, а после пробуждения не нашёл в этом списке элементов, т.к. их удалил или перезаписал Поток 2.

Чтобы такого не было, GIL в предсказуемый момент (по умолчанию раз в 5 миллисекунд для Python 3.2+) командует отработавшему потоку: «СПАААТЬ!»  — тот отключается и не мешает проезжать следующему желающему. Даже если желающего нет, блокировщик всё равно подождёт, прежде чем вернуться к предыдущему активному потоку.

Благодаря шлюзу однопоточные приложения работают быстро, а потоки не конфликтуют. Но, к сожалению, многопоточные программы при таком подходе выполняются медленнее  — слишком много времени уходит на регулировку «дорожного движения». А значит обработка графики, расчет математических моделей и поиск по большим массивам данных c GIL идут неприемлемо долго.

В статье «Understanding Python GIL»технический директор компании Gaglers Inc. и разработчик со стажем Chetan Giridhar приводит такой пример:

from datetime import datetime
import threading
def factorial(number): 
    fact = 1
    for n in range(1, number+1): 
        fact *= n 
    return fact 
number = 100000 
thread = threading.Thread(target=factorial, args=(number,)) 
startTime = datetime.now() 
thread.start() 
thread.join()

endTime = datetime.now() 
print "Время выполнения: ", endTime - startTime

Код вычисляет факториал числа 100 000 и показывает, сколько времени ушло у машины на эту задачу. При тестировании на одном ядре и с одним потоком вычисления заняли 3,4 секунды. Тогда Четан создал и запустил второй поток. Расчет факториала на двух ядрах длился 6,2 секунды. А ведь по логике скорость вычислений не должна была существенно измениться! Повторите этот эксперимент на своей машине и посмотрите, насколько медленнее будет решена задача, если вы добавите thread2. Я получила замедление ровно вдвое.

Глобальный шлюз  — наследие времён, когда программисты боролись за достойную реализацию многозадачности и у них не очень получалось. Но зачем он сегодня, когда есть много- и очень многоядерные процессоры? Как объяснил Гвидо ван Россум, без GIL не будут нормально работать C-расширения для Python. Ещё упадёт производительность однопоточных приложений: Python 3 станет медленнее, чем Python 2, а это никому не нужно.

Что делать?

«Нормальные герои всегда идут в обход»

Шлюз можно временно отключить. Для этого интерпретатор Python нужно отвлечь вызовом функции из внешней библиотеки или обращением к операционной системе. Например, шлюз выключится на время сохранения или открытия файла. Помните наш пример с записью строк в файлы? Как только вызванная функция возвратит управление коду Python или интерфейсу Python C API, GIL снова включается.

Как вариант, для параллельных вычислений можно использовать процессы, которые работают изолированно и неподвластны GIL. Но это большая отдельная тема. Сейчас нам важнее найти решение для многопоточности.

Если вы собираетесь использовать Python для сложных научных расчётов, обойти скоростную проблему GIL помогут библиотеки Numba, NumPy, SciPy и др. Опишу некоторые  из них в двух словах, чтобы вы поняли, стоит ли разведывать это направление дальше.

Numba  для математики

Numba — динамически, «на лету» компилирует Python-код, превращая его в машинный код для исполнения на CPU и GPU. Такая технология компиляции называется JIT — “Just in time”. Она помогает оптимизировать производительность программ за счет ускорения работы циклов и компиляции функций при первом запуске.

Суть в том, что вы ставите аннотации (декораторы) в узких местах кода, где вам нужно ускорить работу функций.

Для математических расчётов библиотеку удобно использовать в связке c NumPy.  Допустим, нужно сложить одномерные массивы — элемент за элементом.

def arr_sum (x , y): 
    result_arr = nupmy.empty_like ( x)
    for i in range (len (x)) : 
    result_arr [i ] = x[i ] + y[i ] 
    return result_arr

Метод nupmy.empty_like() принимает массив и возвращает (но не инициализирует!) другой  — соответствующий исходному по форме и типу. Чтобы ускорить выполнение кода, импортируем класс jit из модуля numba и добавляем в начало кода аннотацию @jit:

from numba import jit
@jit
def arr_sum(x,y):

Это скромное дополнение способно ускорить выполнение операции более чем в 100 раз! Если интересно, посмотрите замеры скорости математических расчётов при использовании разных библиотек для Python.

PyCUDA и Numba для графики

В графических вычислениях Numba тоже кое-что может. Она умеет работать с программной моделью CUDA, чтобы визуализировать научные данные и работу алгоритмов, выдавать информацию о GPU и др. Подробнее о том, как работают графический процессор и CUDA  — здесь. И снова мы встретимся с многопоточностью.

При работе с многомерными массивами в CUDA, чтобы понять, какой поток сейчас работает с элементами массива, нужно отследить, кто и когда вызывает функцию ядра. Например, поток может определять свою позицию в сетке блоков и рассчитать соответствующий элемент массива:

from numba import cuda
@cuda.jit
def call_for_kernel(io_arr):
    # Идентификатор потока в одномерном блоке
    thread_x = cuda.threadIdx.x
    # Идентификатор блока в одномерной сетке
    thread_y = cuda.blockIdx.x
    # Число потоков на блок (т.е. ширина блока)
    block_width = cuda.blockDim.x
    # Находим положение в массиве
    t_position = thread_x + thread_y * block_width
    if  t_position < io_arr.size:  # Убеждаемся, что не вышли за границы массива
        io_arr[ t_position] *= 2 # Считаем

Главный плюс этого кода даже не в скорости исполнения, а в прозрачности и простоте. Снова сошлюсь на Хабр, где есть сравнение скорости GPU-расчетов при использовании Numba, PyCUDA и эталонного С CUDA. Небольшой спойлер: PyCUDA позволяет достичь скорости вычислений, сопоставимой с Cи, а Numba подходит для небольших задач.

Когда многопоточность в Python оправдана

Стоит ли преодолевать связанные c GIL сложности и тратить время на реализацию многопоточности? Вот примеры ситуаций, когда многопоточность несёт с собой больше плюсов, чем минусов.

  • Для длительных и несвязанных друг с другом операций ввода-вывода. Например, нужно обрабатывать ворох разрозненных запросов с большой задержкой на ожидание. В режиме «живой очереди» это долго  — лучше распараллелить задачу.
  • Вычисления занимают более миллисекунды и вы хотите сэкономить время за счёт их параллельного выполнения. Если операции укладываются в 1 мс, многопоточность не оправдает себя из-за высоких накладных расходов.
  • Число потоков не превышает количество ядер. В противном случае параллельной работы всех потоков не получается и мы больше теряем, чем выигрываем.

Когда лучше с одним потоком

  • При взаимозависимых вычислениях. Считать что-то в одном потоке и передавать для дальнейшей обработки второму — плохая идея. Возникает лишняя зависимость, которая приводит к снижению производительности, а в случае ошибки  — к ступору и краху программы.
  • При работе через GIL. Это мы уже выяснили выше.
  • Когда важна хорошая переносимость на разных устройствах. Правильно подобрать число потоков для машины пользователя  — задача не из легких. Если вы пишете под известное вам «железо», всё можно решить тестированием. Если же нет  — понадобится дополнительно создавать гибкую систему подстройки под аппаратную часть, что потребует времени и умения.

Анонс — взаимные блокировки в Python

Самое смешное, что по умолчанию GIL защищает только интерпретатор и не предохраняет наш код от взаимных блокировок (deadlock) и других логических ошибок синхронизации. Поэтому разводить потоки по углам, как и в Java, нужно  принудительно — с помощью блокирующих механизмов. Об этом и о не упомянутых в статье компонентах модуля threading мы поговорим в следующий раз.

gilthreadingpython
Нашли ошибку в тексте? Напишите нам.

Новые комментарии

Спасибо,
что читаете наш блог!
Posts popup