Получите бесплатно 4 курса для лёгкого старта работы в IT
Получить курсы бесплатно
ГлавнаяБлогPython в три ручья (часть 2). Блокировки
12 телеграм-каналов
29 253
Время чтения: 14 минут

Python в три ручья (часть 2). Блокировки

29 253
Время чтения: 14 минут
Сохранить статью:
Сохранить статью:

В прошлой статье мы познакомились с многопоточностью и глобальной блокировкой GIL. В этот раз поговорим о блокировках, которые вы можете устанавливать сами. Они защитят код от проблем при работе с общими ресурсами.

В статье рассказывается:

  1. Простая блокировка в Python
  2. С блокировками и без. Пример–сравнение
  3. Как избежать взаимных блокировок?
  4. Другие инструменты синхронизации в Python
  5. Компактные блокировки с with
  6. Пройди тест и узнай, какая сфера тебе подходит:
    айти, дизайн или маркетинг.
    Бесплатно от Geekbrains

Потоки стремятся к ресурсам, с которыми должны работать. И когда к одному и тому же ресурсу обращается несколько потоков, возникает конфликт. Как его предотвратить?

Потоки нельзя в любой момент напрямую остановить или завершить: на то они и потоки. Но можно на их пути поставить дамбу — блокировку. Она пропустит только один поток, а остальные временно удержит. Так вы исключите конфликт.

Узнай, какие ИТ - профессии
входят в ТОП-30 с доходом
от 210 000 ₽/мес
Павел Симонов - исполнительный директор Geekbrains
Павел Симонов
Исполнительный директор Geekbrains
Команда GeekBrains совместно с международными специалистами по развитию карьеры подготовили материалы, которые помогут вам начать путь к профессии мечты.
Подборка содержит только самые востребованные и высокооплачиваемые специальности и направления в IT-сфере. 86% наших учеников с помощью данных материалов определились с карьерной целью на ближайшее будущее!

Скачивайте и используйте уже сегодня:

Павел Симонов - исполнительный директор Geekbrains
Павел Симонов
Исполнительный директор Geekbrains
pdf иконка

Топ-30 самых востребованных и высокооплачиваемых профессий 2023

Поможет разобраться в актуальной ситуации на рынке труда

doc иконка

Подборка 50+ бесплатных нейросетей для упрощения работы и увеличения заработка

Только проверенные нейросети с доступом из России и свободным использованием

pdf иконка

ТОП-100 площадок для поиска работы от GeekBrains

Список проверенных ресурсов реальных вакансий с доходом от 210 000 ₽

pdf 3,7mb
doc 1,7mb
Уже скачали 27880 pdf иконка

Когда поток A выполняет операцию с общими ресурсами, а поток Б не может вмешаться в нее до завершения — говорят, что такая операция атомарна. Залогом потокобезопасности как раз выступает атомарность — непрерывность, неделимость операции.

Простая блокировка в Python

Взаимоисключение (mutual exception, кратко — mutex) — простейшая блокировка, которая на время работы потока с ресурсом закрывает последний от других обращений. Реализуют это с помощью класса Lock.

import threading
mutex = threading.Lock()

Мы создали блокировку с именем mutex, но могли бы назвать её lock или иначе. Теперь её можно ставить и снимать методами .acquire() и .release():

resource = 0
def thread_safe_function():
global resource
    for i in range(1000000):
        mutex.acquire()
        # Делаем что-то с переменной resource
        mutex.release()

Обратите внимание: обойти простую блокировку не может даже поток, который её активировал. Он будет заблокирован, если попытается повторно захватить ресурс, который удерживает.

С блокировками и без. Пример–сравнение

Что происходит, когда два потока бьются за ресурсы, и как при этом сохранить целостность данных? Разберёмся на практике.

Возьмём простейшие операции инкремента и декремента (увеличения и уменьшения числа). В роли общих ресурсов выступят глобальные числовые переменные: назовём их protected_resource и unprotected_resource. К каждой обратятся по два потока: один будет в цикле увеличивать значение с 0 до 50 000, другой — уменьшать до 0. Первую переменную обработаем с блокировками, а вторую — без.

import threading
protected_resource = 0
unprotected_resource = 0
NUM = 50000
mutex = threading.Lock()
# Потокобезопасный инкремент
def safe_plus():
    global protected_resource
    for i in range(NUM):
        # Ставим блокировку
        mutex.acquire()
        protected_resource += 1
        mutex.release()
# Потокобезопасный декремент
def safe_minus():
    global protected_resource
    for i in range(NUM):
        mutex.acquire()
        protected_resource -= 1
        mutex.release()
# То же, но без блокировки
def risky_plus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource += 1
def risky_minus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource -= 1

В названия потокобезопасных функций мы поставили префикс safe_, а небезопасных — risky_.

Создадим 4 потока, которые будут выполнять функции с блокировками и без:

thread1 = threading.Thread(target = safe_plus)
thread2 = threading.Thread(target = safe_minus)
thread3 = threading.Thread(target = risky_plus)
thread4 = threading.Thread(target = risky_minus)
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread1.join()
thread2.join()
thread3.join()
thread4.join()
print ("Результат при работе с блокировкой %s" % protected_resource)
print ("Результат без блокировки %s" % unprotected_resource)

Запускаем код несколько раз подряд и видим, что полученное без блокировки значение меняется случайным образом. При использовании блокировки всё работает последовательно: сначала значение растёт, затем — уменьшается, и в итоге получаем 0. А потоки thread3 и thread4 работают без блокировки и наперебой обращаются к глобальной переменной. Каждый выполняет столько операций своего цикла, сколько успевает за время активности. Поэтому при каждом запуске получаем случайные числа.

Как избежать взаимных блокировок?

Следите, чтобы у нескольких блокировок не было шанса сработать одновременно. Иначе одна заглушка перекроет один поток, другая — другой, и может случиться взаимная блокировка — тупик (deadlock). Это ситуация, когда ни один поток не имеет права действовать и программа зависает или рушится.

Дарим скидку от 60%
на курсы от GeekBrains до 05 мая
Уже через 9 месяцев сможете устроиться на работу с доходом от 150 000 рублей
Забронировать скидку

Если есть «захват» мьютекса, ничто не должно помешать последующему «высвобождению». Это значит, что release() должен срабатывать, как только блокировка становится не нужна.

Пишите код так, чтобы блокировки снимались, даже если функция выбрасывает исключение и завершает работу нештатно. Подстраховаться можно с помощью конструкции try-except-finally:

try:
    mutex.acquire()
    # Ваш код...
except SomethingGoesWrong:
    # Обрабатываем исключения
finally:
    # Ещё код
    mutex.release()

Другие инструменты синхронизации в Python

До сих пор мы работали только с простой блокировкой Lock, но распределять доступ к общим ресурсам можно разными средствами.

Семафоры (Semaphore)

Семафор — это связка из блокировки и счётчика потоков. Если заданное число потоков уже работает с ресурсам, лишние будут блокироваться. Это удобно, чтобы ограничить число подключений к сети или одновременно авторизованных пользователей программы.

Значение счётчика уменьшается с каждым новым вызовом acquire(), то есть с подключением к ресурсу новых потоков. Когда ресурс высвобождается, значение возрастает. При нулевом значении счётчика работа потока останавливается, пока другой поток не вызовет метод release(). По умолчанию значение счётчика равно 1.

s = Semaphore(5)
# В скобках при необходимости указывают стартовое значение счётчика

Можно создать «ограниченный семафор» конструктором BoundedSemaphore().

События (Event)

Событие — сигнал от одного потока другим. Если событие возникло — ставят флаг методом .set(), а после обработки события — снимают с помощью .clear(). Пока флага нет, ресурс заблокирован. Ждать события могут один или несколько потоков. Важную роль играет wait(): если флаг установлен, этот метод спокойно отдаёт управление ресурсом; если нет — блокирует его на заданное время или до установки флага одним из потоков.

e = threading.Event()
def event_manager():
    # Ждём, когда кто-нибудь захватит флаг
    e.wait()
    ...
    # Ставим флаг
    e.set()
    # Работаем с ресурсом
        ...
   # Снимаем флаг и ждём нового
   e.clear()

Если нужно задать время ожидания, его пишут в секундах, в виде числа с плавающей запятой. Например: e.wait(3,0).

Только до 29.04
Скачай подборку материалов, чтобы гарантированно найти работу в IT за 14 дней
Список документов:
ТОП-100 площадок для поиска работы от GeekBrains
20 профессий 2023 года, с доходом от 150 000 рублей
Чек-лист «Как успешно пройти собеседование»
Чтобы получить файл, укажите e-mail:
Введите e-mail, чтобы получить доступ к документам
Подтвердите, что вы не робот,
указав номер телефона:
Введите телефон, чтобы получить доступ к документам
Уже скачали 52300

Метод is_set() проверяет, активно ли событие. Важно следить, чтобы события попадали в поле зрения потоков-потребителей сразу после появления. Иначе работа зависящих от события потоков нарушится.

Рекурсивная блокировка (RLock)

Такая блокировка позволяет одному потоку захватывать ресурс несколько раз, но блокирует все остальные потоки. Это полезно, когда вы используете вложенные функции, каждая из которых тоже применяет блокировку. Число вложенных .acquire() и .release() не даст интерпретатору запутаться, сколько раз поток имеет право захватывать ресурс, а когда блокировку надо снять полностью. Механизм основан на классе RLock:

import threading, random
counter = 0
re_mutex = threading.RLock()
def step_one():
    global counter
    re_mutex.acquire()
    counter = random.randint(1,100)
    print("Random number %s" % counter)
    re_mutex.release()
def step_two():
    global counter
    re_mutex.acquire()
    counter *= 2
    print("Doubled = %s" % counter)
    re_mutex.release()
 def walkthrough():
    re_mutex.acquire()
    try:
        step_one()
        step_two()
    finally:
        re_mutex.release()
t = threading.Thread(target = walkthrough)
t2 = threading.Thread(target = walkthrough)
t.start()
t2.start()
t.join()
t2.join()

Запустите это и проверьте результат: арифметика должна быть верна.

Теперь попробуйте убрать блокировку внутри walkthrough:

def walkthrough():
        step_one()
        step_two()

Ещё раз запустите код — порядок действий нарушится. Программа умножит на 2 только второе случайное число, а затем удвоит полученное произведение.

Переменные состояния (Condition)

Переменная состояния — усложнённый вариант события (Event). Через Condition на ресурс ставят блокировку нужного типа, и она работает, пока не произойдёт ожидаемое потоками изменение. Как только это случается, один или несколько потоков разблокируются. Оповестить потоки о событии можно методами:

  • notify() — для одного потока;
  • notifyAll() — для всех ожидающих потоков.

Это выглядит так:

# Создаём рекурсивную блокировку
mutex = threading.RLock()
# Создаём переменную состояния и связываем с блокировкой
cond = threading.Condition(mutex)
# Поток-потребитель ждёт свободного ресурса и захватывает его
def consumer():
    while True:
            cond.acquire()
            while not resourse_free():
            cond.wait()
            get_free_resource()
            cond.release()
# Поток-производитель разблокирует ресурс и уведомляет об этом потребителя
def producer():
    while True:
            cond.acquire()
            unblock_resource()
            # Сигналим потоку: "Налетай на новые данные!"
            cond.notify()
            cond.release()

Чтобы выставить больше одного условия разблокировки, можно увязать доступ к ресурсу с несколькими переменными состояния.

cond = threading.Condition(mutex)
another_cond = threading.Condition(mutex)

Компактные блокировки с with

При множестве участков с блокировками каждый раз прописывать «захват» и «высвобождение» утомительно. Сократить код поможет конструкция с оператором with. Она использует менеджер контекста, который позволяет сначала подготовить приложение к выполнению фрагмента кода, а затем гарантированно освободить задействованные ресурсы.

Чтобы понять дальнейший материал, кратко разберем работу with, хотя это и не про блокировки. У класса, который мы собираемся использовать с with, должно быть два метода:

  • «Предисловие» — метод __enter__(). Здесь можно ставить блокировку и прописывать другие настройки;
  • «Послесловие» — метод __exit__(). Он срабатывает, когда все инструкции выполнены или работа блока прервана. Здесь можно снять блокировку и/или предусмотреть реакцию на исключения, которые могут быть выброшены.

Удача! У нашего целевого класса Lock эти два метода уже прописаны. Поэтому любой экземпляр объекта Lock можно использовать с with без дополнительных настроек.

Отредактируем функцию из примера с инкрементом. Поставим блокировку, которая сама снимется, как только управляющий поток выйдет за пределы with-блока:

def safe_plus():
    global protected_resource
    for i in range(NUM):
        with mutex:
 protected_resource += 1
# И никаких acquire-release!

Оцените статью
Рейтинг:
( голосов )
Поделиться статьей
Добавить комментарий

Сортировать:
По дате публикации
По рейтингу
До конца акции осталось
0 дней 00:00:00
Дарим скидку 64% на обучение «Разработчик»
  • Получите новую профессию с гарантией трудоустройства
  • Начните учиться бесплатно, 3 месяца обучения в подарок
Забронировать скидку на обучение
Забрать подарок

Получите подробную стратегию для новичков на 2023 год, как с нуля выйти на доход 200 000 ₽ за 7 месяцев

Подарки от Geekbrains из закрытой базы:
Осталось 17 мест

Поздравляем!
Вы выиграли 4 курса по IT-профессиям.
Дождитесь звонка нашего менеджера для уточнения деталей

Иван Степанин
Иван Степанин печатает ...