Phaser 解决了什么问题
- 它可以取代 CyclicBarrier 和 CountDownLatch 这两个类,功能丰富;
- 它可以管理分阶段任务,可以有父子任务(大小任务,一个任务拆解成几个小任务)。
PPhaser是Java的并发辅助类,支持多阶段执行、可变参与者数和灵活的线程同步。相比其他工具如CyclicBarrier和CountDownLatch,Phaser更动态、灵活。其主要功能是管理分阶段任务,即所有线程必须在进入下一阶段前完成当前阶段。Phaser提供了register、arrive、arriveAndAwaitAdvance和arriveAndDeregister等方法,可用于注册线程、通知任务完成、等待其他线程到达和动态变化参与线程数,使其成为处理复杂并发任务的有效工具。
替换 CyclicBarrier
所有任务参与者到先等待,都到达后再执行。
@Test
public void testCyclicBarrierFunction() {
//这里分配四个任务参与者,包括主线程
Phaser phaser = new Phaser(4);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
log.debug("start");
//先等待其他参与者到达
phaser.arriveAndAwaitAdvance();
log.debug("end");
}).start();
}
log.debug("main start");
//主线程等待其他参与者到达
phaser.arriveAndAwaitAdvance();
log.debug("main end");
}
执行结果如下:
17:39:25.635 [Thread-0] DEBUG com.zempty.aqs.PhaserDemo - start
17:39:25.635 [Thread-1] DEBUG com.zempty.aqs.PhaserDemo - start
17:39:25.635 [Thread-2] DEBUG com.zempty.aqs.PhaserDemo - start
17:39:25.635 [main] DEBUG com.zempty.aqs.PhaserDemo - main start
17:39:25.638 [Thread-0] DEBUG com.zempty.aqs.PhaserDemo - end
17:39:25.638 [main] DEBUG com.zempty.aqs.PhaserDemo - main end
17:39:25.638 [Thread-2] DEBUG com.zempty.aqs.PhaserDemo - end
17:39:25.639 [Thread-1] DEBUG com.zempty.aqs.PhaserDemo - end
这里输出结果是先输出 start ,然后输出 end;
arriveAndAwaitAdvance() 这个方法会使线程阻塞,暂停运行都到达后(也就是参与者数量都到达以后)才会运行。
替换 CountDownLatch
等其它任务都执行完,我才会继续执行。
@Test
public void testCountDownLatchFunction() {
Phaser phaser = new Phaser(3);
for(int i = 0; i < 3; i++) {
new Thread(() -> {
log.debug("{}执行任务", Thread.currentThread().getName());
phaser.arrive();
}).start();
}
phaser.awaitAdvance(phaser.getPhase());
log.debug("主线程执行完毕");
}
输出结果:
17:44:05.652 [Thread-2] DEBUG com.zempty.aqs.PhaserDemo - Thread-2执行任务
17:44:05.652 [Thread-0] DEBUG com.zempty.aqs.PhaserDemo - Thread-0执行任务
17:44:05.652 [Thread-1] DEBUG com.zempty.aqs.PhaserDemo - Thread-1执行任务
17:44:05.663 [main] DEBUG com.zempty.aqs.PhaserDemo - 主线程执行完毕
这里主线程执行完毕一定是最后输出。
- arrive () 方法会标记当前任务到达,类似于 CountDownLatch 中的 countDown() 方法的功能;
- awaitAdvance() 方法会阻塞当前的线程,等待所有参与线程到达 arrive 以后,才会正常往下执行。
父子 Phaser 研究
Phaser 有一个构造方法:
public Phaser(Phaser parent,int parties)
这个构造方法不好理解,下面介绍一下具体使用,请看如下代码:
Phaser leader = new Phaser(1) ;
Phaser frontend = new Phaser(leader,3);
Phaser backend = new Phaser(leader,3);
比如开发一个项目,分为前后端开发,领导负责任务分配,前端有三个人负责开发,后端有三个人负责开发。
Phaser leader = new Phaser(1) ;
这里就是 leader 负责管理项目进度,此时任务参与者是 1 , 也就是 parties = 1;
Phaser Frontend = new Phaser(leader,3);
当初始化完 frontend 以后, 这里前端任务参与者是 3,前端 3 个人完成任务,但是此时 leader 的参与者不是 1 了,变成 2 了(parties = 2) , leader 需要来监管掌握前端的完成进度。
Phaser backend = new Phaser(leader,3);
当初始化完 backend 以后,这里的后端任务参与者是 3, 后端 3 个人完成任务,但是此时 leader 的参与者不是 2了,变成 3 了(parties = 3) , leader 需要来监管掌握后端的完成进度。
总结: 只要有通过 Phaser 初始化一个子 Phaser, 父 Phaser 的任务参与者会增加 1 个。
下面举例分析:
//父 Phaser 和子 Phaser
@Test
public void testPhaserParentAndSons() {
Phaser leader = new Phaser(1);
Phaser frontend = new Phaser(leader,3);
Phaser backend = new Phaser(leader,3);
for(int i = 0; i < 3; i++) {
new Thread(() -> {
log.debug("前端线程 {} 任务完成", Thread.currentThread().getName());
//这里会等待前端三个家伙都完成,人后才可以继续执行提交代码任务
frontend.arriveAndAwaitAdvance();
log.debug("前端线程 {} 代码提交完毕", Thread.currentThread().getName());
//这里会注销前端线程,parties 会减少一个
frontend.arriveAndDeregister();
}).start();
}
for(int i = 0; i < 3; i++) {
new Thread(() -> {
log.debug("后端线程 {} 任务完成", Thread.currentThread().getName());
//这里会等待后端三个家伙都完成,人后才可以继续执行提交代码任务
backend.arriveAndAwaitAdvance();
log.debug("后端线程 {} 代码提交完毕", Thread.currentThread().getName());
//这里会注销后端线程, parties 会减少一个
backend.arriveAndDeregister();
}).start();
}
//这里会完成两个功能:
//1. 等待所有任务到达
//2. 如果有子 Phaser,它会等待所有子 Phaser 完成任务
//leader会到达,去监督等待所有人任务完成,然后再去提交代码(完成第一阶段-任务完成)
leader.arriveAndAwaitAdvance();
//这里会等待所有人提交代码完毕(完成第二阶段-代码提交)
//当 parties 为 1 的时候,主线程会在这里放行,继续执行后续代码
leader.arriveAndAwaitAdvance();
log.debug("{},审核前后端代码完毕,任务完成!", Thread.currentThread().getName());
}
执行结果如下:
18:58:28.748 [Thread-2] DEBUG com.zempty.aqs.PhaserDemo - 前端线程 Thread-2 任务完成
18:58:28.748 [Thread-0] DEBUG com.zempty.aqs.PhaserDemo - 前端线程 Thread-0 任务完成
18:58:28.748 [Thread-3] DEBUG com.zempty.aqs.PhaserDemo - 后端线程 Thread-3 任务完成
18:58:28.748 [Thread-1] DEBUG com.zempty.aqs.PhaserDemo - 前端线程 Thread-1 任务完成
18:58:28.748 [Thread-4] DEBUG com.zempty.aqs.PhaserDemo - 后端线程 Thread-4 任务完成
18:58:28.748 [Thread-5] DEBUG com.zempty.aqs.PhaserDemo - 后端线程 Thread-5 任务完成
18:58:28.758 [Thread-1] DEBUG com.zempty.aqs.PhaserDemo - 前端线程 Thread-1 代码提交完毕
18:58:28.758 [Thread-3] DEBUG com.zempty.aqs.PhaserDemo - 后端线程 Thread-3 代码提交完毕
18:58:28.758 [Thread-2] DEBUG com.zempty.aqs.PhaserDemo - 前端线程 Thread-2 代码提交完毕
18:58:28.758 [Thread-5] DEBUG com.zempty.aqs.PhaserDemo - 后端线程 Thread-5 代码提交完毕
18:58:28.758 [Thread-4] DEBUG com.zempty.aqs.PhaserDemo - 后端线程 Thread-4 代码提交完毕
18:58:28.759 [Thread-0] DEBUG com.zempty.aqs.PhaserDemo - 前端线程 Thread-0 代码提交完毕
18:58:28.763 [main] DEBUG com.zempty.aqs.PhaserDemo - main,leader 审核前后端代码完毕,任务完成!
执行顺序是:任务完成→代码提交完毕→审核前后端代码完毕,任务完成!
结果分析:
frontend.arriveAndAwaitAdvance() 开始会阻塞等待所有前端任务完成后,才会继续往下面执行;
backend.arriveAndAwaitAdvance() 开始会阻塞等待所有后端任务完成后,才会继续往下面执行;
leader.arriveAndAwaitAdvance() 第一次调用会等待所有子任务的任务完成后才会继续往下面执行;
frontend.arriveAndDeregister() 会进行第二轮的到达,这里 frontend 一位同志代码提交完成,下线,这里 frontend 的任务由初始化的 3 ,变成 2 ,然后 1,最后 0;
backend 同理;
leader.arriveAndAwaitAdvance() 第二次调用的时候,当 frontend 和 backend 代码已经提交,纷纷下线,leader 任务参与者变成 1 ( parties = 1) , 此时主线程到达,可以继续执行,最后会运行:
main,leader 审核前后端代码完毕,任务完成!