Как избежать тупиковых блокировок в Java

Ещё раз о многопоточности.
12 апреля 2018Cbc0f0509128ea8a7487ec71c1240fb531db396bМария Лисянская1329212

В прошлой статье мы обсуждали многопоточность. В этот раз поговорим о главной проблеме многопоточных приложений — тупиковых взаимных блокировках, известных как deadlocks. Такие блокировки возникают, когда минимум два потока одновременно пытаются работать с общими ресурсами и ограничить доступ к этим ресурсам друг для друга. При этом часто создаётся ситуация, когда ни один поток не может ни получить нужный ресурс, ни освободить занимаемый. Для блокировки «конкурентов» поток может использовать mutex, критическую секцию или семафор.

Как происходит блокировка

Два потока работают с общими ресурсами.

Поток 1 захватывает Ресурс 1 и начинает операции с ним.

Поток 2 последовательно захватывает Ресурс 2 и Ресурс 1.

Поток 2 не получает доступа к Ресурсу 1 и в ступоре ждёт, когда тот освободится.

Поток 1 не завершил работу с Ресурсом 1, но пытается захватить Ресурс 2 и тоже впадает в ступор.

Как это выглядит в коде:

public class DeadlockTest {  
 public static void main(String[] args) {  
   final String res1 = "my sample text";  
   final String res2 = "some other text";  

   // Пусть поток P1 навесит замок на ресурс res1, а затем на res2
   Thread P1 = new Thread() {  
     public void run() {  
         synchronized (res1) {  
          System.out.println("Поток 1 навесил замок на Ресурс 1");  
          try { Thread.sleep(100);} catch (Exception e) {}  

          synchronized (res2) {  
           System.out.println("Поток 1 навесил замок на Ресурс 2");  
          }
        }
     }
   }

   // Поток P2 последовательно пытается запереть доступ к res2 и res1
   Thread P2 = new Thread() {  
     public void run() {  
       synchronized (res2) {  
         System.out.println("Поток 2 навесил замок на Ресурс 2");  
         try { Thread.sleep(100);} catch (Exception e) {}  
         synchronized (res1) {  
           System.out.println("Поток 2 навесил замок на Ресурс 1");  
         }
       }
     }
   }

   P1.start();  
   P2.start();  

 }
}  

Видимо-невидимо

Вторая проблема  — видимость данных. Если два потока работают с одной переменной, каждый из них хранит её копию в кэше процессора, на котором запущен. Изменения в одной копии не отражаются мгновенно в основной памяти и других копиях. Это ведёт к путанице: одни потоки работают с актуальным значением, другие  — с устаревшим.

Есть несколько способов уберечь Java-приложение от «падений» и «зависаний», связанных с противоречиями в работе потоков. Это механизмы synchronized и volatile, алгоритмы, реализованные в классах Java Concurrent, но главное  — забота о структуре вашего приложения.

Ключевое слово volatile в Java

Модификатор volatile используют, когда нужно:

  • обеспечить видимость данных  — убедиться, что при обращении к переменной любой поток получит её последнее записанное значение;
  • исключить кэширование значений переменной и хранить их только в основной памяти.

Как только один поток записал что-то в volatile-переменную, значение идёт прямо в общую память и тут же доступно остальным потокам:

class CarSharingBase
{
  static volatile int your_car_ID = 3222233;
}

Но учтите, что модификатор volatile никак не ограничивает одновременный доступ к данным. А значит, в работу одного потока с полем может вмешаться другой поток. Вот что будет, если два потока одновременно получат доступ к операции увеличения на единицу (i++):

int i = 0;

Поток 1: читает переменную (0)

Поток 1: прибавляет единицу

Поток 2: читает переменную (0)

Поток 1: записывает значение (1)

Поток 2: прибавляет единицу

Поток 2: записывает значение (1)

Если бы два потока не мешали друг другу, а работали последовательно, мы получили бы на выходе значение «2», но вместо этого  видим единицу. Чтобы такого не происходило, нужно обеспечить атомарность операции. Атомарными называют операции, которые могут быть выполнены только полностью. Если они не выполняются полностью, они не выполняются вообще, но прервать их невозможно.

В примере с увеличением на единицу мы видим сразу три действия:  чтение, сложение, запись. Чтение и запись — операции атомарные, но между ними могут вклиниться действия другого потока. Поэтому составная операция инкремента (i++) полностью атомарной не является.

Обратите внимание: с volatile-переменной возможны как атомарные, так и неатомарные операции. Ключевое слово volatile позволяет сделать так, чтобы все потоки читали одно и то же из основной памяти, но не более того.

Простейший способ гарантировать атомарность  — выстроить потоки в очередь за ресурсами с помощью механизма synchronized. Представьте, что на электронный счёт одновременно переводят деньги два клиента. Уж лучше попросить одного из них немного подождать, чем допустить ошибки в денежных расчетах.

Ключевое слово synchronized

Модификатор synchronized исключает доступ второго и последующих потоков к данным, с которыми уже работает один поток. Это ключевое слово используют только для методов и произвольных блоков кода.

Используйте synchronized, чтобы:

  • обеспечить доступ только одного потока к методу или блоку единовременно;
  • обеспечить каждому работающему с ресурсами потоку видимость изменений, внесённых предыдущим потоком;
  • гарантировать, что операции внутри блока или метода будут выполнены полностью, либо не выполнены вовсе.

Метод может быть статическим или нет  — без разницы. Но синхронизация влияет на видимость данных в памяти. В прошлой статье мы говорили о взаимном исключении (mutex’e). С его помощью synchronized ограничивает доступ к данным. Образно говоря, это замок, с помощью которого поток запирается наедине с объектом, чтобы никто не мешал работать. Обратите внимание: замок запирают до начала работы. То есть проверка, не заняты ли ресурсы кем-то другим, происходит на входе в synchronized-блок или метод.

public class SynchronizeThis {
    private int syn_result;
    public synchronized int methodGet() {
        return syn_result;
    }
}

Разблокировка же ресурсов происходит на выходе. Поэтому атомарность операций гарантирована.

Данные, которые изменились внутри метода или блока sychronized, находятся в кэше поверх основной памяти и видны следующему потоку, к которому перешёл мьютекс.

Подсказки по блокирующей синхронизации

Главное при работе с synchronized  — правильно выбрать объект, по которому будет происходить проверка доступности ресурсов. Если вам нужна синхронизация на входе в метод, учитывайте, принадлежит этот метод классу или объекту. Вход в статичный метод блокируют по объекту класса, а в абстрактный метод  — по this. Если по ошибке заблокировать метод класса по this, доступ к ресурсам останется открыт для всех.

Никогда не используйте synchronized в конструкторе  — получите ошибку компиляции. А ещё остерегайтесь «матрёшек», когда синхронизированные методы одного класса вызывают внутри себя синхронизированные методы других классов.

Помните, что синхронизация требует ресурсов. При обработке большого массива данных вызов мьютексов становится особенно затратным. Чтобы гарантировать атомарность без синхронизации, используют классы Concurrent.

Атомарность с помощью Java Concurrent

Вернёмся к составной операции «чтение-изменение-запись». Когда нам нужно развести потоки по углам, но без мьютекса, можно использовать инструкцию «сравнение с обменом»  — compare and swap (CAS).

Для этого сначала заводят переменную, по значению которой можно понять, заняты ли ресурсы и, если да,  — кем. Например, пока ресурсы свободны, переменная хранит «-1», а если заняты — номер процессора, который с ними работает (0,1 и т.д.).

Поток приходит за свободными ресурсами, видит «-1»,  перезаписывает значение на номер процессора, на котором сам работает, выполняет действия. После завершения всех операций переменной возвращается значение «-1». Если  же поток на входе видит номер какого-то процессора, он получает отказ и не может выполнить намеченную операцию. Это простейший случай сравнения с обменом. Важно понимать, что поток, который получил отказ, не блокируется. Он может сообщить программе, что у него проблемы, и перейти к запасному плану действий.

Можно сказать, что это более интеллигентная форма взаимодействия между потоками. Они уже не бодаются за ресурс и не закрывают дверь перед носом оппонента, а обмениваются сообщениями в духе: «Хотелось бы поработать вот с этим»  — «Извините, оно пока занято. Не желаете ли кофе?».

На этом принципе построен целый ряд алгоритмов синхронизации, которые называют неблокирующими (non-blocking). Создание таких алгоритмов  — задача не для новичка. Но, к статью, в Java «из коробки» есть несколько готовых неблокирующих решений. Они собраны в пакете java.util.concurrent.

ConcurrentLinkedQueue

На русский название класса переводится как «параллельная очередь». Работает такая очередь по принципу First In First Out («Первым зашёл  — первым выйдешь»). Алгоритм основан на CAS, быстр и оптимизирован под работу со сборщиком мусора.

Если вы хотите создать очередь из объектов, а затем добавлять и удалять их в нужный момент, не нужно писать и синхронизировать методы вручную. Достаточно создать классы для потоков, производящих и потребляющих данные, а затем поставить эти потоки в очередь ConcurrentLinkedQueue.

В прошлой статье мы говорили, что в Java поток можно создать как экземпляр класса Thread или как отдельный класс с интерфейсом Runnable. Сейчас мы используем второй подход. Единственный метод интерфейса Runnable —  run(). Чтобы задать нужное поведение для потребителя и производителя, мы будем переопределять этот метод в каждом случае по-своему.

Поток-производитель:

public class ProducerThread implements Runnable {
    @Override
    public void run() {
        System.out.println("Генерируем сообщения в очередь");
        try {
            for (int i = 1; i <= 10; i++) {
                QueueTest.enqueueTask("Задача номер " + i);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

Поток-потребитель:

public class ConsumerThread implements Runnable {
    @Override
    public void run() {
        String task;
        System.out.println("Ждём задачи \n");
   //Пока есть задачи в очереди:
        while (QueueTest.isTaskHasBeenSet() || QueueTest.getQueue().size() > 0) {
            if ((task = QueueTest.getQueue().poll()) != null)
                System.out.println("Выполняю задачу : " + task);
            try {
                Thread.sleep(500);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
}

Обратите внимание, если очередь пуста, метод poll() вернёт значение null. Поэтому нам нужно было убедиться, что он возвращает что-то другое.

Чтобы узнавать, сколько всего элементов в очереди, у класса ConcurrentLinkedQueue есть метод size(). Он работает медленно, поэтому злоупотреблять им не стоит. При необходимости можно вывести весь список элементов очереди методом toArray(), но сейчас нам это не нужно.

Очередь, в которой будут работать потоки:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueTest {
    private static Queue<String> queue = null;
    private static boolean taskHasBeenSet = false;

    public static void main(String[] args) {
        queue = new ConcurrentLinkedQueue<String>();
// Создаём и запускаем потребителя и производителя
        Thread producer = new Thread(new ProducerThread());
        Thread consumer = new Thread(new ConsumerThread());

        producer.start();
        consumer.start();

        while (consumer.isAlive()) {
            try {
    // Оставим время на ожидание постановки задач
                Thread.sleep(1000); 
            } catch (InterruptedException e) {
                 // Выводим трейс вместе с текстом исключения
                e.printStackTrace();
            }
        }
   // Всё выполнено нормально, выходим.
        System.exit(0);
    }

    public static Queue<String> getQueue() {
        return queue;
    }

    // Добавляем задачи в очередь
    public static void enqueueTask(String task) {
        try {
            queue.add(task);
            System.out.println("Добавлена задача : " + task);
            Thread.sleep(200);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public static boolean isTaskHasBeenSet() {
        return taskHasBeenSet;
    }
    
    public static void setTaskHasBeenSet(boolean taskHasBeenSet) {
        QueueTest.taskHasBeenSet = taskHasBeenSet;
    }
}

Запустите и посмотрите, как добавляются и выполняются задачи.

Атомарные классы

Пакет java.util.concurrent.atomic включает в себя классы для работы с:

  • примитивами  — AtomicBoolean, AtomicInteger, AtomicLong;
  • ссылочными типами   — AtomicReference;
  • массивами  — AtomicBooleanArray, AtomicIntegerArray, AtomicReferenceArray и др.;
  • аккумуляторами  — DoubleAccumulator, LongAccumulator;
  • обновлениями («апдейтерами»)  — AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater.

Давайте посмотрим, как два потока могут работать с переменной AtomicInteger без синхронизации. Для этого создадим класс myThread, в котором будет атомарный счетчик:

import java.util.concurrent.atomic.AtomicInteger;

class myThread extends Thread{
    public volatile AtomicInteger counter;
/* Для наглядности тестирования мы сделали поле волатильным.
   Данные не попадут в кэш и работу двух потоков будет легче отследить */

    myThread(AtomicInteger counter){
        this.counter = counter;
    }

    @Override
    public void run() {
        for(int i = 0; i < 1000; i++){
            counter.updateAndGet(n -> n + 2);
        }
// После работы счётчика в каждом потоке будем выводить значение:
        System.out.println(counter);
    }
}

Для операций над числами мы использовали метод updateAndGet() класса AtomicInteger. Этот метод увеличивает число на основе аргумента  — лямбда-выражения

Теперь посмотрим на всё это в действии. Создадим и запустим два потока myThread:

public class MultiCounter {
    public static void main(String[] args) {
        AtomicInteger myAtomicCounter = new AtomicInteger(0);

        // Создаём потоки
        myThread t1 = new myThread(myAtomicCounter);
        myThread t2 = new myThread(myAtomicCounter);

        t1.start();
        t2.start();
    }
}

В результате работы кода получим два значения: первое из них будет случайным числом в диапазоне от 2000 до 4000, а второе  — всегда 4000. Например, при первом запуске я получила результат:

2670

4000

Запустите код ещё раз и убедитесь, что меняется только первое значение. Второе число предсказуемо именно благодаря работе атомарного счётчика. Если бы мы использовали для счётчика обычный (неатомарный) Integer, второе число было бы случайным.

Блокирующие и неблокирующие алгоритмы в Java

Блокирующие алгоритмы парализуют работу потока  — навсегда или до момента, пока другой поток не выполнит нужное условие. Представьте, что поток A заблокирован и ждёт, когда поток B завершит операцию. Но тот не завершает, потому что заблокирован кем-то ещё. Случайно создать такую западню в приложении очень легко, а вот просчитать, когда она сработает  — трудно. Если без синхронизации можно обойтись, лучше обойдитесь — ради экономии нервов и ресурсов.

Неблокирующие алгоритмы тоже могут вести к проблемам, например, к live-lock. Это ситуация, когда несколько потоков буксуют:  продолжают работать, но без реального результата. Причиной может быть загруженность потоков сообщениями друг от друга. Чем больше сообщение, тем больше оно грузит память. Другая возможная причина  — неудачная реализация алгоритма, при которой в очереди возникает конфликт.  Поэтому начинающим лучше не мастерить велосипед, а сначала разобраться с чужими наработками.

Важно понимать, что панацеи не существует. Использовать блокирующие или неблокирующие алгоритмы, либо их комбинацию — придётся решать в каждом конкретном случае. Всё будет зависеть от структуры вашего приложения и от того, какие ресурсы вы делаете общими.

concurrentjava_developermutexdeadlocksmultithreadingjava
Нашли ошибку в тексте? Напишите нам.
Спасибо,
что читаете наш блог!
Posts popup