Java 实现协程的方法
作者:W3CSCHOOL 发布时间:2022-02-18 22:55:05
协程(Coroutine)这个词其实有很多叫法,比如有的人喜欢称为纤程(Fiber),或者绿色线程(GreenThread)。其实究其本质,对于协程最直观的解释是线程的线程。虽然读上去有点拗口,但本质上就是这样。
协程的核心在于调度那块由他来负责解决,遇到阻塞操作,立刻放弃掉,并且记录当前栈上的数据,阻塞完后立刻再找一个线程恢复栈并把阻塞的结果放到这个线程上去跑,这样看上去好像跟写同步代码没有任何差别,这整个流程可以称为coroutine,而跑在由coroutine负责调度的线程称为Fiber。
java协程的实现
早期,在JVM上实现协程一般会使用kilim,不过这个工具已经很久不更新了,现在常用的工具是Quasar,而本文章会全部基于Quasar来介绍。
下面尝试通过Quasar来实现类似于go语言的coroutine以及channel。
为了能有明确的对比,这里先用go语言实现一个对于10以内自然数分别求平方的例子。
func counter(out chan<- int) {
for x := 0; x < 10; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
//定义两个int类型的channel
naturals := make(chan int)
squares := make(chan int)
//产生两个Fiber,用go关键字
go counter(naturals)
go squarer(squares, naturals)
//获取计算结果
printer(squares)
}
上面这个例子,通过channel两解耦两边的数据共享。对于这个channel,大家可以理解为Java里的SynchronousQueue。下面我直接上Quasar版JAVA代码的,几乎可以原封不动的复制go语言的代码。
public class Example {
private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
Integer v;
while ((v = in.receive()) != null) {
System.out.println(v);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
//定义两个Channel
Channel<Integer> naturals = Channels.newChannel(-1);
Channel<Integer> squares = Channels.newChannel(-1);
//运行两个Fiber实现.
new Fiber(() -> {
for (int i = 0; i < 10; i++)
naturals.send(i);
naturals.close();
}).start();
new Fiber(() -> {
Integer v;
while ((v = naturals.receive()) != null)
squares.send(v * v);
squares.close();
}).start();
printer(squares);
}
}
两者对比,看上去Java似好像更复杂些,没办法这就是Java的风格,而且这还是通过第三方的库来实现的。
说到这里各位肯定对Fiber很好奇了。也许你会表示怀疑Fiber是不是如上面所描述的那样,下面我们尝试用Quasar建立一百万个Fiber,看看内存占用多少,我先尝试了创建百万个Thread。
for (int i = 0; i < 1_000_000; i++) {
new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
很不幸,直接报Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,这是情理之中的。下面是通过Quasar建立百万个Fiber。
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
int FiberNumber = 1_000_000;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < FiberNumber; i++) {
new Fiber(() -> {
counter.incrementAndGet();
if (counter.get() == FiberNumber) {
System.out.println("done");
}
Strand.sleep(1000000);
}).start();
}
latch.await();
}
我这里加了latch,阻止程序跑完就关闭,Strand.sleep其实跟Thread.sleep一样,只是这里针对的是Fiber。
最终控制台是可以输出done的,说明程序已经创建了百万个Fiber,设置Sleep是为了让Fiber一直运行,从而方便计算内存占用。官方宣称一个空闲的Fiber大约占用400Byte,那这里应该是占用400MB堆内存,但是这里通过jmap -heap pid显示大约占用了1000MB,也就是说一个Fiber占用1KB。
Quasar是怎么实现Fiber的
其实Quasar实现的coroutine的方式与Go语言很像,只不过前者是使用框架来实现,而go语言则是语言内置的功能。
不过如果你熟悉了Go语言的调度机制的话,那么对于Quasar的调度机制就会好理解很多了,因为两者有很多相似之处。
Quasar里的Fiber其实是一个continuation,他可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。
Quasar其实是通过修改bytecode来达到这个目的,所以运行Quasar程序的时候,你需要先通过java-agent在运行时修改你的代码,当然也可以在编译期间这么干。go语言的内置了自己的调度器,而Quasar则是默认使用ForkJoinPool这个具有work-stealing功能的线程池来当调度器。work-stealing非常重要,因为你不清楚哪个Fiber会先执行完,而work-stealing可以动态的从其他的等等队列偷一个context过来,这样可以最大化使用CPU资源。
那这里你会问了,Quasar怎么知道修改哪些字节码呢,其实也很简单,Quasar会通过java-agent在运行时扫描哪些方法是可以中断的,同时会在方法被调用前和调度后的方法内插入一些continuation逻辑,如果你在方法上定义了@Suspendable注解,那Quasar会对调用该注解的方法做类似下面的事情。
这里假设你在方法f上定义了@Suspendable,同时去调用了有同样注解的方法g,那么所有调用f的方法会插入一些字节码,这些字节码的逻辑就是记录当前Fiber栈上的状态,以便在未来可以动态的恢复。(Fiber类似线程也有自己的栈)。在suspendable方法链内Fiber的父类会调用Fiber.park,这样会抛出SuspendExecution异常,从而来停止线程的运行,好让Quasar的调度器执行调度。这里的SuspendExecution会被Fiber自己捕获,业务层面上不应该捕获到。如果Fiber被唤醒了(调度器层面会去调用Fiber.unpark),那么f会在被中断的地方重新被调用(这里Fiber会知道自己在哪里被中断),同时会把g的调用结果(g会return结果)插入到f的恢复点,这样看上去就好像g的return是f的local variables了,从而避免了callback嵌套。
上面说了一大堆,其实简单点来讲就是,想办法让运行中的线程栈停下来,然后让Quasar的调度器介入。
JVM线程中断的条件有两个:
1、抛异常
2、return。
而在Quasar中,一般就是通过抛异常的方式来达到的,所以你会看到上面的代码会抛出SuspendExecution。但是如果你真捕获到这个异常,那就说明有问题了,所以一般会这么写。
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//这里不应该捕获到异常.
throw new AssertionError(s);
}
}
来源:https://www.w3cschool.cn/java/java-x3pi2oso.html


猜你喜欢
- springboot http转https一、安全证书的生成可以使用jdk自带的证书生成工具,jdk自带一个叫keytool的证书管理工具,
- 先看进度条的效果:具体实现:新建类,继承自View,在onDraw中进行绘制:import android.content.Context;
- 获取方法的相关信息的两种形式反射是一种允许用户获得类信息的C#功能,Type对象映射它代表的底层对象;在.Net 中, 一旦获得了Type对
- 先新建一个文件夹kun,kun就是类所在的package。新建一个java文件。HelloWorld.java的代码如下:package k
- 目录概述语法索引器(Indexer)的用途重载索引器(Indexer)概述索引器(Indexer) 允许一个对象可以像数组一样使用下标的方式
- 写在前面jenkins作为java的好 * ,经历过单体项目时代->集群项目时代->容器集群分布式时代,使用稳定可靠,cpu友好(
- C++实现接两个链表实例代码有以ha为头结点的链表,元素个数为m;以hb为头结点的链表,元素个数为n。现在需要你把这两个链表连接
- 在Java 8之前,对集合进行排序需要为排序中使用的比较器 Comparator 创建一个匿名内部类:new Compa
- 关于mybatis基础我们前面几篇博客已经介绍了很多了,今天我们来说一个简单的问题,那就是mybatis中的缓存问题。mybatis本身对缓
- 具体详情如下所示:int -> Stringint i=12345;String s="";第一种方法:s=i+&
- //测试StringBuilder的运行效率 publi
- 在同一个类中: 对于静态方法,其他的静态或非静态方法都可以直接调用它。而对于非静态方法,其他的非静态方法是可以直接调用它的。但是其他静态方法
- 本文实例为大家分享了ListView分页加载数据的具体代码,供大家参考,具体内容如下FenyeActivitypackage com.exa
- 为什么要写这篇文章经过了若干年的发展,Java逐步从java8升级为java11,java17。让我们对比学习一下最新一版的LTS版本和ja
- 安装完 Android Studio 后启动,却报错如下:failed to create jvm error code -4这一般应是内存
- /* * Copyright 2012-2013 The Haohui Network Cor
- 1、每帧检查定义一个时间变量 timer,每帧将此时间减去帧间隔时间 Time.deltaTime,如果小于或者等于零,说明定时器到了,执行
- 所谓文件的断点续传,就是一个线程传输文件,另一个线程控制传输标识,以达到暂停文件效果、恢复文件上传的效果。本demo使用最基本的线程之间的通
- 本文实例为大家分享了Java实现简易计算器的具体代码,供大家参考,具体内容如下程序的运行环境为Windows10 ,编译环境为IDEA。计算
- 测试1@BenchmarkMode(Mode.AverageTime)@OutputTimeUnit(TimeUnit.NANOSECOND