亲宝软件园·资讯

展开

Java 实现协程的方法

W3CSCHOOL 人气:0

协程(Coroutine)这个词其实有很多叫法,比如有的人喜欢称为纤程(Fiber),或者绿色线程(GreenThread)。其实究其本质,对于协程最直观的解释是线程的线程。虽然读上去有点拗口,但本质上就是这样。

协程的核心在于调度那块由他来负责解决,遇到阻塞操作,立刻放弃掉,并且记录当前栈上的数据,阻塞完后立刻再找一个线程恢复栈并把阻塞的结果放到这个线程上去跑,这样看上去好像跟写同步代码没有任何差别,这整个流程可以称为coroutine,而跑在由coroutine负责调度的线程称为Fiber。

java协程的实现

早期,在JVM上实现协程一般会使用kilim,不过这个工具已经很久不更新了,现在常用的工具是Quasar,而本文章会全部基于Quasar来介绍。

下面尝试通过Quasar来实现类似于go语言的coroutine以及channel。

为了能有明确的对比,这里先用go语言实现一个对于10以内自然数分别求平方的例子。

func counter(out chan<- int) {
 for x := 0; x < 10; x++ {
  out <- x
 }
 close(out)
}

func squarer(out chan<- int, in <-chan int) {
 for v := range in {
  out <- v * v
 }
 close(out)
}

func printer(in <-chan int) {
 for v := range in {
  fmt.Println(v)
 }
}

func main() {
 //定义两个int类型的channel
 naturals := make(chan int)
 squares := make(chan int)

 //产生两个Fiber,用go关键字
 go counter(naturals)
 go squarer(squares, naturals)
 //获取计算结果
 printer(squares)
}

上面这个例子,通过channel两解耦两边的数据共享。对于这个channel,大家可以理解为Java里的SynchronousQueue。下面我直接上Quasar版JAVA代码的,几乎可以原封不动的复制go语言的代码。

public class Example {

 private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
  Integer v;
  while ((v = in.receive()) != null) {
   System.out.println(v);
  }
 }

 public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
  //定义两个Channel
  Channel<Integer> naturals = Channels.newChannel(-1);
  Channel<Integer> squares = Channels.newChannel(-1);

  //运行两个Fiber实现.
  new Fiber(() -> {
   for (int i = 0; i < 10; i++)
    naturals.send(i);
   naturals.close();
  }).start();

  new Fiber(() -> {
   Integer v;
   while ((v = naturals.receive()) != null)
    squares.send(v * v);
   squares.close();
  }).start();

  printer(squares);
 }
}

两者对比,看上去Java似好像更复杂些,没办法这就是Java的风格,而且这还是通过第三方的库来实现的。

说到这里各位肯定对Fiber很好奇了。也许你会表示怀疑Fiber是不是如上面所描述的那样,下面我们尝试用Quasar建立一百万个Fiber,看看内存占用多少,我先尝试了创建百万个Thread。

for (int i = 0; i < 1_000_000; i++) {
 new Thread(() -> {
  try {
   Thread.sleep(10000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }).start();
}

很不幸,直接报Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,这是情理之中的。下面是通过Quasar建立百万个Fiber。

public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
 int FiberNumber = 1_000_000;
 CountDownLatch latch = new CountDownLatch(1);
 AtomicInteger counter = new AtomicInteger(0);

 for (int i = 0; i < FiberNumber; i++) {
  new Fiber(() -> {
   counter.incrementAndGet();
   if (counter.get() == FiberNumber) {
    System.out.println("done");
   }
   Strand.sleep(1000000);
  }).start();
 }
 latch.await();
}

我这里加了latch,阻止程序跑完就关闭,Strand.sleep其实跟Thread.sleep一样,只是这里针对的是Fiber。

最终控制台是可以输出done的,说明程序已经创建了百万个Fiber,设置Sleep是为了让Fiber一直运行,从而方便计算内存占用。官方宣称一个空闲的Fiber大约占用400Byte,那这里应该是占用400MB堆内存,但是这里通过jmap -heap pid显示大约占用了1000MB,也就是说一个Fiber占用1KB。

Quasar是怎么实现Fiber的

其实Quasar实现的coroutine的方式与Go语言很像,只不过前者是使用框架来实现,而go语言则是语言内置的功能。

不过如果你熟悉了Go语言的调度机制的话,那么对于Quasar的调度机制就会好理解很多了,因为两者有很多相似之处。

Quasar里的Fiber其实是一个continuation,他可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。

Quasar其实是通过修改bytecode来达到这个目的,所以运行Quasar程序的时候,你需要先通过java-agent在运行时修改你的代码,当然也可以在编译期间这么干。go语言的内置了自己的调度器,而Quasar则是默认使用ForkJoinPool这个具有work-stealing功能的线程池来当调度器。work-stealing非常重要,因为你不清楚哪个Fiber会先执行完,而work-stealing可以动态的从其他的等等队列偷一个context过来,这样可以最大化使用CPU资源。

那这里你会问了,Quasar怎么知道修改哪些字节码呢,其实也很简单,Quasar会通过java-agent在运行时扫描哪些方法是可以中断的,同时会在方法被调用前和调度后的方法内插入一些continuation逻辑,如果你在方法上定义了@Suspendable注解,那Quasar会对调用该注解的方法做类似下面的事情。

这里假设你在方法f上定义了@Suspendable,同时去调用了有同样注解的方法g,那么所有调用f的方法会插入一些字节码,这些字节码的逻辑就是记录当前Fiber栈上的状态,以便在未来可以动态的恢复。(Fiber类似线程也有自己的栈)。在suspendable方法链内Fiber的父类会调用Fiber.park,这样会抛出SuspendExecution异常,从而来停止线程的运行,好让Quasar的调度器执行调度。这里的SuspendExecution会被Fiber自己捕获,业务层面上不应该捕获到。如果Fiber被唤醒了(调度器层面会去调用Fiber.unpark),那么f会在被中断的地方重新被调用(这里Fiber会知道自己在哪里被中断),同时会把g的调用结果(g会return结果)插入到f的恢复点,这样看上去就好像g的return是f的local variables了,从而避免了callback嵌套。

上面说了一大堆,其实简单点来讲就是,想办法让运行中的线程栈停下来,然后让Quasar的调度器介入。

JVM线程中断的条件有两个:

1、抛异常

2、return。

而在Quasar中,一般就是通过抛异常的方式来达到的,所以你会看到上面的代码会抛出SuspendExecution。但是如果你真捕获到这个异常,那就说明有问题了,所以一般会这么写。

@Suspendable
public int f() {
 try {
  // do some stuff
  return g() * 2;
 } catch(SuspendExecution s) {
  //这里不应该捕获到异常.
  throw new AssertionError(s);
 }
}

加载全部内容

相关教程
猜你喜欢
用户评论