Реализовать потокобезопасный неблокирующий стек на Java

faangmaster - Aug 21 '23 - - Dev Community

Задача.

Необходимо реализовать потокобезопасный (Thread Safe) неблокирующий стек на Java. Т.е. нельзя использовать локи или synchonized, при этом он должен корректно работать в многопоточной среде.

Решение.

Для решения будем использовать так называемые неблокирующие алгоритмы основанные на атомиках.
В основе неблокирующих алгоритмов лежит использование атомарной CAS (Compare-and-Swap или Compare-and-Set) операции. В Java для этого можно использовать атомики, которые поддерживают эти атомарные операции.

Для реализации стека нам нужно реализовать две операции: push (поместить элемент в вершину стека) и pop (извлечь элеменет из вершины стека).

Стек реализуем на основе односвязного списка. Каждый элемент этого списка будет хранить объект и ссылку на следующий элемент в списке. Также нам нужно хранить ссылку на текущую вершину стека.

Класс элемента односвязного списка:

private final class Node<E> {
    public final E item;
    public Node<E> next;

    public Node(E item) {
        this.item = item;
    }
}
Enter fullscreen mode Exit fullscreen mode

Т.к. обе операции со стеком это, по-сути, манипуляции с вершиной стека, то в обоих операциях мы будем производить измение вершины.

Давайте сначала напишем не Thread Safe версию операций push и pop:

push (not Thread Safe):

Node<E> top;

public void push(E item) {
    //Создаем новую вершину списка
    Node<E> newHead = new Node<>(item);
    //Следующий элемент - это текущая вершина
    newHead.next = top;
    //Заменяем текущую вершину новым элементом
    top = newHead;
}
Enter fullscreen mode Exit fullscreen mode

pop(not Thread Safe):

public E pop() {
    //Если стек пустой, то возвращаем null
    if (top == null) {
        return null;
    }
    Node<E> oldHead = top;
    //Если стек не пустой, то новой вершиной стека будет следующий 
    //элемент стека
    top = top.next;
    //В качестве результата возвращаем старую вершину
    return oldHead.item;
}
Enter fullscreen mode Exit fullscreen mode

Как можно заметить, в каждом из методов мы производим несколько неатомарных операций с вершиной стека. В процессе этих операций состояние стека и соответственно вершины может измениться, если с ним работают из нескольких потоков.
Чтобы операции сделать потокобезопасными и не использовать synchronized и локи, сделаем ссылку на вершину атомиком - AtomicReference и применим неблокирующий алгоритм, который устроен следующим образом:

do {
    doing some changes...
} while (!compareAndSet(our assumption))
Enter fullscreen mode Exit fullscreen mode

Будем делать изменения в цикле и каждый раз проверять, что наши предположения все еще валидны и никто не изменил состояние объектов, к которым есть доступ из нескольких потоков.

В нашем случае будем менять вершину стека и проверять, что она не была изменена кем-то еще из другого потока. Если была - попробуем повторить операцию еще раз в новой итерации цикла. Если никто не менял ее в другом потоке - то изменим нашу вершину новым значение в атомарной операции compareAndSet.

В коде это выглядит так:

push (Thread Safe):

AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

public void push(E item) {
    Node<E> newHead = new Node<>(item);
    Node<E> oldHead;
    do {
        oldHead = top.get();
        //Новая вершина должна ссылаться на старую вершину стека
        newHead.next = oldHead;
    //Если вершина не была изменена кем-то еще (top == oldHead), 
    //то подменяем и выходим из цикла (top = newHead)
    } while (!top.compareAndSet(oldHead, newHead));
}
Enter fullscreen mode Exit fullscreen mode

pop(Thread Safe):

public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
        oldHead = top.get();
        //Если стек пустой, то возвращаем null
        if (oldHead == null) {
            return null;
        }
        //Если стек не пустой, то новой вершиной стека будет 
        //следующий элемент стека
        newHead = oldHead.next;
    //Если вершина не была изменена кем-то еще (top == oldHead), 
    //то подменяем и выходим из цикла (top = newHead)
    } while (!top.compareAndSet(oldHead, newHead));
    return oldHead.item;
}
Enter fullscreen mode Exit fullscreen mode

Все вместе:

import java.util.concurrent.atomic.AtomicReference;

public class NonBlockingStack<E> {

    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            //Новая вершина должна ссылаться на старую вершину стека
            newHead.next = oldHead;
            //Если вершина не была изменена кем-то еще (top == oldHead),
            //то подменяем и выходим из цикла (top = newHead)
        } while (!top.compareAndSet(oldHead, newHead ));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            //Если стек пустой, то возвращаем null
            if (oldHead == null) {
                return null;
            }
            //Если стек не пустой, то новой вершиной стека будет
            //следующий элемент стека
            newHead = oldHead.next;
            //Если вершина не была изменена кем-то еще (top == oldHead),
            //то подменяем и выходим из цикла (top = newHead)
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private final class Node<E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Terabox Video Player