Класс Phaser позволяет синхронизировать потоки, представляющие отдельную фазу или стадию выполнения общего действия. Phaser определяет объект синхронизации, который ждет, пока не завершится определенная фаза. Затем Phaser переходит к следующей стадии или фазе и снова ожидает ее завершения.
Для создания объекта Phaser используется один из конструкторов:
Phaser() Phaser(int parties) Phaser(Phaser parent) Phaser(Phaser parent, int parties)
Параметр parties
указывает на количество участников (грубо говоря, потоков), которые должны выполнять все фазы действия. Первый конструктор создает объект Phaser
без каких-либо участников. Второй конструктор регистрирует передаваемое в конструктор количество участников. Третий и четвертый конструкторы также устанавливают родительский объект Phaser.
Основные методы класса Phaser:
int register(): регистрирует участника, который выполняет фазы, и возвращает номер текущей фазы - обычно фаза 0
int arrive(): сообщает, что участник завершил фазу, и возвращает номер текущей фазы
int arriveAndAwaitAdvance(): аналогичен методу arrive, только при этом заставляет phaser ожидать завершения фазы всеми остальными участниками
int arriveAndDeregister(): сообщает о завершении всех фаз участником и снимает его с регистрации. Возвращает номер текущей фазы или отрицательное число, если синхронизатор Phaser завершил свою работу
int getPhase(): возвращает номер текущей фазы
При работае с классом Phaser обычно сначала создается его объект. Далее нам надо зарегистрировать всех участников. Для регистрации
для каждоого участника вызывается метод register()
, либо можно обойтись и без этого метода, передав нужное количество участников в конструктор Phaser.
Затем каждый участник выполняет некоторый набор действий, составляющих фазу. А синхронизатор Phaser ждет, пока все участники не завершат выполнение фазы. Чтобы сообщить синхронизатору,
что фаза завершена, участник должен вызвать метод arrive()
или arriveAndAwaitAdvance()
. После этого синхронизатор переходит к следующей фазе.
Применим Phaser в приложении:
import java.util.concurrent.Phaser; public class Program { public static void main(String[] args) { Phaser phaser = new Phaser(1); new Thread(new PhaseThread(phaser, "PhaseThread 1")).start(); new Thread(new PhaseThread(phaser, "PhaseThread 2")).start(); // ждем завершения фазы 0 int phase = phaser.getPhase(); phaser.arriveAndAwaitAdvance(); System.out.println("Фаза " + phase + " завершена"); // ждем завершения фазы 1 phase = phaser.getPhase(); phaser.arriveAndAwaitAdvance(); System.out.println("Фаза " + phase + " завершена"); // ждем завершения фазы 2 phase = phaser.getPhase(); phaser.arriveAndAwaitAdvance(); System.out.println("Фаза " + phase + " завершена"); phaser.arriveAndDeregister(); } } class PhaseThread implements Runnable{ Phaser phaser; String name; PhaseThread(Phaser p, String n){ this.phaser=p; this.name=n; phaser.register(); } public void run(){ System.out.println(name + " выполняет фазу " + phaser.getPhase()); phaser.arriveAndAwaitAdvance(); // сообщаем, что первая фаза достигнута System.out.println(name + " выполняет фазу " + phaser.getPhase()); phaser.arriveAndAwaitAdvance(); // сообщаем, что вторая фаза достигнута System.out.println(name + " выполняет фазу " + phaser.getPhase()); phaser.arriveAndDeregister(); // сообщаем о завершении фаз и удаляем с регистрации объекты } }
Итак, здесь у нас фазы выполняются тремя участниками - главным потоком и двумя потоками PhaseThread. Поэтому при создании объекта Phaser ему передается
число 1 - главный поток, а в конструкторе PhaseThread вызывается метод register()
. Мы в принципе могли бы не использовать метод register,
но тогда нам надо было бы указать Phaser phaser = new Phaser(3)
, так как у нас три участника.
Фаза для каждого участника представляет минимальный примитивный набор действий: для потоков PhaseThread это вывод сообщения, а для главного потока - подсчет текущей фазы с помощью метода
getPhase()
. При этом отсчет фаз начинается с нуля. Каждый участник завершает выполнение фазы вызовом метода
phaser.arriveAndAwaitAdvance()
. При вызове этого метода пока последний участник не завершит выполнение текущей фазы, все остальные
участники блокируются.
После завершения выполнения последней фазы происходит отмена регистрации всех участников с помощью метода arriveAndDeregister()
.
В итоге работа программы даст следующий вывод:
PhaseThread 1 выполняет фазу 0 PhaseThread 2 выполняет фазу 0 PhaseThread 1 выполняет фазу 1 PhaseThread 2 выполняет фазу 1 Фаза 0 завершена Фаза 1 завершена PhaseThread 1 выполняет фазу 2 PhaseThread 2 выполняет фазу 2 Фаза 2 завершена
В данном случае получается немного путанный вывод. Так, сообщения о выполнении фазы 1 выводится после сообщения об окончании фазы 0. Что связано с многопоточностью - фазы завершились, но в одном потоке еще не выведено сообщение о завершении, тогда как другие потоки уже начали выполнение следующей фазы. В любом случае все это происходит уже после завершения фазы.
Но чтобы было более наглядно, мы можем использовать sleep в потоках:
public void run(){ System.out.println(name + " выполняет фазу " + phaser.getPhase()); phaser.arriveAndAwaitAdvance(); // сообщаем, что первая фаза достигнута try{ Thread.sleep(200); } catch(InterruptedException ex){ System.out.println(ex.getMessage()); } System.out.println(name + " выполняет фазу " + phaser.getPhase()); phaser.arriveAndAwaitAdvance(); // сообщаем, что вторая фаза достигнута try{ Thread.sleep(200); } catch(InterruptedException ex){ System.out.println(ex.getMessage()); } System.out.println(name + " выполняет фазу " + phaser.getPhase()); phaser.arriveAndDeregister(); // сообщаем о завершении фаз и удаляем с регистрации объекты }
И в этом случае вывод будет более привычным, хотя на работу фаз это никак не повлияет.
PhaseThread 1 выполняет фазу 0 PhaseThread 2 выполняет фазу 0 Фаза 0 завершена PhaseThread 2 выполняет фазу 1 PhaseThread 1 выполняет фазу 1 Фаза 1 завершена PhaseThread 2 выполняет фазу 2 PhaseThread 1 выполняет фазу 2 Фаза 2 завершена