Онлайн-марафон "Найди себя в digital"

Блог

Python в три ручья (часть 2). Блокировки

Простыми словами о потокобезопасности.
08 июня 2018Cbc0f0509128ea8a7487ec71c1240fb531db396bМария Лисянская254804

В прошлой статье мы познакомились с многопоточностью и глобальной блокировкой GIL. В этот раз поговорим о блокировках, которые вы можете устанавливать сами. Они защитят код от проблем при работе с общими ресурсами.

Потоки стремятся к ресурсам, с которыми должны работать. И когда к одному и тому же ресурсу обращается несколько потоков, возникает конфликт. Как его предотвратить?

Потоки нельзя в любой момент напрямую остановить или завершить: на то они и потоки. Но можно на их пути поставить дамбу — блокировку. Она пропустит только один поток, а остальные временно удержит. Так вы исключите конфликт.

Когда поток A выполняет операцию с общими ресурсами, а поток Б не может вмешаться в нее до завершения — говорят, что такая операция атомарна. Залогом потокобезопасности как раз выступает атомарность — непрерывность, неделимость операции.

Простая блокировка в Python

Взаимоисключение (mutual exception, кратко — mutex) — простейшая блокировка, которая на время работы потока с ресурсом закрывает последний от других обращений. Реализуют это с помощью класса Lock.

import threading
mutex = threading.Lock()

Мы создали блокировку с именем mutex, но могли бы назвать её lock или иначе. Теперь её можно ставить и снимать методами .acquire() и .release():

resource = 0

def thread_safe_function():
global resource
    for i in range(1000000):
        mutex.acquire()
        # Делаем что-то с переменной resource
        mutex.release()

Обратите внимание: обойти простую блокировку не может даже поток, который её активировал. Он будет заблокирован, если попытается повторно захватить ресурс, который удерживает.

С блокировками и без. Пример–сравнение

Что происходит, когда два потока бьются за ресурсы, и как при этом сохранить целостность данных? Разберёмся на практике.

Возьмём простейшие операции инкремента и декремента (увеличения и уменьшения числа). В роли общих ресурсов выступят глобальные числовые переменные: назовём их protected_resource и unprotected_resource. К каждой обратятся по два потока: один будет в цикле увеличивать значение с 0 до 50 000, другой — уменьшать до 0. Первую переменную обработаем с блокировками, а вторую — без.

import threading

protected_resource = 0
unprotected_resource = 0

NUM = 50000
mutex = threading.Lock()

# Потокобезопасный инкремент
def safe_plus():
    global protected_resource
    for i in range(NUM):
        # Ставим блокировку
        mutex.acquire()
        protected_resource += 1
        mutex.release()

# Потокобезопасный декремент
def safe_minus():
    global protected_resource
    for i in range(NUM):
        mutex.acquire()
        protected_resource -= 1
        mutex.release()

# То же, но без блокировки
def risky_plus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource += 1

def risky_minus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource -= 1

В названия потокобезопасных функций мы поставили префикс safe_, а небезопасных — risky_.

Создадим 4 потока, которые будут выполнять функции с блокировками и без:

thread1 = threading.Thread(target = safe_plus)
thread2 = threading.Thread(target = safe_minus)
thread3 = threading.Thread(target = risky_plus)
thread4 = threading.Thread(target = risky_minus)
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread1.join()
thread2.join()
thread3.join()
thread4.join()
print ("Результат при работе с блокировкой %s" % protected_resource)
print ("Результат без блокировки %s" % unprotected_resource)

Запускаем код несколько раз подряд и видим, что полученное без блокировки значение меняется случайным образом. При использовании блокировки всё работает последовательно: сначала значение растёт, затем — уменьшается, и в итоге получаем 0. А потоки thread3 и thread4 работают без блокировки и наперебой обращаются к глобальной переменной. Каждый выполняет столько операций своего цикла, сколько успевает за время активности. Поэтому при каждом запуске получаем случайные числа.

Как избежать взаимных блокировок?

Следите, чтобы у нескольких блокировок не было шанса сработать одновременно. Иначе одна заглушка перекроет один поток, другая — другой, и может случиться взаимная блокировка — тупик (deadlock). Это ситуация, когда ни один поток не имеет права действовать и программа зависает или рушится.

Если есть «захват» мьютекса, ничто не должно помешать последующему «высвобождению». Это значит, что release() должен срабатывать, как только блокировка становится не нужна.

Пишите код так, чтобы блокировки снимались, даже если функция выбрасывает исключение и завершает работу нештатно. Подстраховаться можно с помощью конструкции try-except-finally:

try:
    mutex.acquire()
    # Ваш код...

except SomethingGoesWrong:
    # Обрабатываем исключения

finally:
    # Ещё код
    mutex.release()

Другие инструменты синхронизации в Python

До сих пор мы работали только с простой блокировкой Lock, но распределять доступ к общим ресурсам можно разными средствами.

Семафоры (Semaphore)

Семафор — это связка из блокировки и счётчика потоков. Если заданное число потоков уже работает с ресурсам, лишние будут блокироваться. Это удобно, чтобы ограничить число подключений к сети или одновременно авторизованных пользователей программы.

Значение счётчика уменьшается с каждым новым вызовом acquire(), то есть с подключением к ресурсу новых потоков. Когда ресурс высвобождается, значение возрастает. При нулевом значении счётчика работа потока останавливается, пока другой поток не вызовет метод release(). По умолчанию значение счётчика равно 1.

s = Semaphore(5)
# В скобках при необходимости указывают стартовое значение счётчика

Можно создать «ограниченный семафор» конструктором BoundedSemaphore().

События (Event)

Событие — сигнал от одного потока другим. Если событие возникло — ставят флаг методом .set(), а после обработки события — снимают с помощью .clear(). Пока флага нет, ресурс заблокирован. Ждать события могут один или несколько потоков. Важную роль играет wait(): если флаг установлен, этот метод спокойно отдаёт управление ресурсом; если нет — блокирует его на заданное время или до установки флага одним из потоков.

e = threading.Event()

def event_manager():
    # Ждём, когда кто-нибудь захватит флаг
    e.wait()
    ...
    # Ставим флаг
    e.set()

    # Работаем с ресурсом
        ...
   # Снимаем флаг и ждём нового
   e.clear()

Если нужно задать время ожидания, его пишут в секундах, в виде числа с плавающей запятой. Например: e.wait(3,0).

Метод is_set() проверяет, активно ли событие. Важно следить, чтобы события попадали в поле зрения потоков-потребителей сразу после появления. Иначе работа зависящих от события потоков нарушится.

Рекурсивная блокировка (RLock)

Такая блокировка позволяет одному потоку захватывать ресурс несколько раз, но блокирует все остальные потоки. Это полезно, когда вы используете вложенные функции, каждая из которых тоже применяет блокировку. Число вложенных .acquire() и .release() не даст интерпретатору запутаться, сколько раз поток имеет право захватывать ресурс, а когда блокировку надо снять полностью. Механизм основан на классе RLock:

import threading, random

counter = 0
re_mutex = threading.RLock()

def step_one():
    global counter
    re_mutex.acquire()
    counter = random.randint(1,100)
    print("Random number %s" % counter)
    re_mutex.release()

def step_two():
    global counter
    re_mutex.acquire()
    counter *= 2
    print("Doubled = %s" % counter)
    re_mutex.release()
   
def walkthrough():
    re_mutex.acquire()
    try:
        step_one()
        step_two()
    finally:
        re_mutex.release()

t = threading.Thread(target = walkthrough)
t2 = threading.Thread(target = walkthrough)

t.start()
t2.start()
t.join()
t2.join()

Запустите это и проверьте результат: арифметика должна быть верна.

Теперь попробуйте убрать блокировку внутри walkthrough:

def walkthrough():
        step_one()
        step_two()

Ещё раз запустите код — порядок действий нарушится. Программа умножит на 2 только второе случайное число, а затем удвоит полученное произведение.

Переменные состояния (Condition)

Переменная состояния — усложнённый вариант события (Event). Через Condition на ресурс ставят блокировку нужного типа, и она работает, пока не произойдёт ожидаемое потоками изменение. Как только это случается, один или несколько потоков разблокируются. Оповестить потоки о событии можно методами:

  • notify() — для одного потока;
  • notifyAll() — для всех ожидающих потоков.

Это выглядит так:

# Создаём рекурсивную блокировку
mutex = threading.RLock()

# Создаём переменную состояния и связываем с блокировкой
cond = threading.Condition(mutex)

# Поток-потребитель ждёт свободного ресурса и захватывает его
def consumer():
    while True:
            cond.acquire()
            while not resourse_free():
            cond.wait()
            get_free_resource()
            cond.release()

# Поток-производитель разблокирует ресурс и уведомляет об этом потребителя
def producer():
    while True:
            cond.acquire()
            unblock_resource()
            # Сигналим потоку: "Налетай на новые данные!"
            cond.notify()
            cond.release()

Чтобы выставить больше одного условия разблокировки, можно увязать доступ к ресурсу с несколькими переменными состояния.

cond = threading.Condition(mutex)
another_cond = threading.Condition(mutex)

Компактные блокировки с with

При множестве участков с блокировками каждый раз прописывать «захват» и «высвобождение» утомительно. Сократить код поможет конструкция с оператором with. Она использует менеджер контекста, который позволяет сначала подготовить приложение к выполнению фрагмента кода, а затем гарантированно освободить задействованные ресурсы.

Чтобы понять дальнейший материал, кратко разберем работу with, хотя это и не про блокировки. У класса, который мы собираемся использовать с with, должно быть два метода:

  • «Предисловие» — метод __enter__(). Здесь можно ставить блокировку и  прописывать другие настройки;

  • «Послесловие» — метод __exit__(). Он срабатывает, когда все инструкции выполнены или работа блока прервана. Здесь можно снять блокировку и/или предусмотреть реакцию на исключения, которые могут быть выброшены.

Удача! У нашего целевого класса Lock эти два метода уже прописаны. Поэтому любой экземпляр объекта Lock можно использовать с with без дополнительных настроек.

Отредактируем функцию из примера с инкрементом. Поставим блокировку, которая сама снимется, как только управляющий поток выйдет за пределы with-блока:

def safe_plus():
    global protected_resource
    for i in range(NUM):
        with mutex:
 protected_resource += 1

# И никаких acquire-release!
deadlockmutexmultithreading
Нашли ошибку в тексте? Напишите нам.

Новые комментарии

Спасибо,
что читаете наш блог!
Posts popup