Java如何實現(xiàn)協(xié)程

2020-10-21 11:41 更新

協(xié)程(Coroutine)這個詞其實有很多叫法,比如有的人喜歡稱為纖程(Fiber),或者綠色線程(GreenThread)。其實究其本質(zhì),對于協(xié)程最直觀的解釋是線程的線程。雖然讀上去有點拗口,但本質(zhì)上就是這樣。

協(xié)程的核心在于調(diào)度那塊由他來負責解決,遇到阻塞操作,立刻放棄掉,并且記錄當前棧上的數(shù)據(jù),阻塞完后立刻再找一個線程恢復棧并把阻塞的結(jié)果放到這個線程上去跑,這樣看上去好像跟寫同步代碼沒有任何差別,這整個流程可以稱為coroutine,而跑在由coroutine負責調(diào)度的線程稱為Fiber。


java協(xié)程的實現(xiàn)

早期,在JVM上實現(xiàn)協(xié)程一般會使用kilim,不過這個工具已經(jīng)很久不更新了,現(xiàn)在常用的工具是Quasar,而本文章會全部基于Quasar來介紹。

下面嘗試通過Quasar來實現(xiàn)類似于go語言的coroutine以及channel。

為了能有明確的對比,這里先用go語言實現(xiàn)一個對于10以內(nèi)自然數(shù)分別求平方的例子。

func counter(out chan<- int) {
  for x := 0; x < 10; x++ {
    out <- x
  }
  close(out)
}

func squarer(out chan<- int, in <-chan int) {
  for v := range in {
    out <- v * v
  }
  close(out)
}

func printer(in <-chan int) {
  for v := range in {
    fmt.Println(v)
  }
}

func main() {
  //定義兩個int類型的channel
  naturals := make(chan int)
  squares := make(chan int)

  //產(chǎn)生兩個Fiber,用go關(guān)鍵字
  go counter(naturals)
  go squarer(squares, naturals)
  //獲取計算結(jié)果
  printer(squares)
}

上面這個例子,通過channel兩解耦兩邊的數(shù)據(jù)共享。對于這個channel,大家可以理解為Java里的SynchronousQueue。下面我直接上Quasar版JAVA代碼的,幾乎可以原封不動的復制go語言的代碼。

public class Example {

  private static void printer(Channel<Integer> in) throws SuspendExecution,  InterruptedException {
    Integer v;
    while ((v = in.receive()) != null) {
      System.out.println(v);
    }
  }

  public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
    //定義兩個Channel
    Channel<Integer> naturals = Channels.newChannel(-1);
    Channel<Integer> squares = Channels.newChannel(-1);

    //運行兩個Fiber實現(xiàn).
    new Fiber(() -> {
      for (int i = 0; i < 10; i++)
        naturals.send(i);
      naturals.close();
    }).start();

    new Fiber(() -> {
      Integer v;
      while ((v = naturals.receive()) != null)
        squares.send(v * v);
      squares.close();
    }).start();

    printer(squares);
  }
}

兩者對比,看上去Java似好像更復雜些,沒辦法這就是Java的風格,而且這還是通過第三方的庫來實現(xiàn)的。

說到這里各位肯定對Fiber很好奇了。也許你會表示懷疑Fiber是不是如上面所描述的那樣,下面我們嘗試用Quasar建立一百萬個Fiber,看看內(nèi)存占用多少,我先嘗試了創(chuàng)建百萬個Thread。

for (int i = 0; i < 1_000_000; i++) {
  new Thread(() -> {
    try {
      Thread.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }).start();
}

很不幸,直接報Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,這是情理之中的。下面是通過Quasar建立百萬個Fiber。

public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
  int FiberNumber = 1_000_000;
  CountDownLatch latch = new CountDownLatch(1);
  AtomicInteger counter = new AtomicInteger(0);

  for (int i = 0; i < FiberNumber; i++) {
    new Fiber(() -> {
      counter.incrementAndGet();
      if (counter.get() == FiberNumber) {
        System.out.println("done");
      }
      Strand.sleep(1000000);
    }).start();
  }
  latch.await();
}

我這里加了latch,阻止程序跑完就關(guān)閉,Strand.sleep其實跟Thread.sleep一樣,只是這里針對的是Fiber。

最終控制臺是可以輸出done的,說明程序已經(jīng)創(chuàng)建了百萬個Fiber,設置Sleep是為了讓Fiber一直運行,從而方便計算內(nèi)存占用。官方宣稱一個空閑的Fiber大約占用400Byte,那這里應該是占用400MB堆內(nèi)存,但是這里通過jmap -heap pid顯示大約占用了1000MB,也就是說一個Fiber占用1KB。


Quasar是怎么實現(xiàn)Fiber的

其實Quasar實現(xiàn)的coroutine的方式與Go語言很像,只不過前者是使用框架來實現(xiàn),而go語言則是語言內(nèi)置的功能。

不過如果你熟悉了Go語言的調(diào)度機制的話,那么對于Quasar的調(diào)度機制就會好理解很多了,因為兩者有很多相似之處。

Quasar里的Fiber其實是一個continuation,他可以被Quasar定義的scheduler調(diào)度,一個continuation記錄著運行實例的狀態(tài),而且會被隨時中斷,并且也會隨后在他被中斷的地方恢復。

Quasar其實是通過修改bytecode來達到這個目的,所以運行Quasar程序的時候,你需要先通過java-agent在運行時修改你的代碼,當然也可以在編譯期間這么干。go語言的內(nèi)置了自己的調(diào)度器,而Quasar則是默認使用ForkJoinPool這個具有work-stealing功能的線程池來當調(diào)度器。work-stealing非常重要,因為你不清楚哪個Fiber會先執(zhí)行完,而work-stealing可以動態(tài)的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。

那這里你會問了,Quasar怎么知道修改哪些字節(jié)碼呢,其實也很簡單,Quasar會通過java-agent在運行時掃描哪些方法是可以中斷的,同時會在方法被調(diào)用前和調(diào)度后的方法內(nèi)插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會對調(diào)用該注解的方法做類似下面的事情。

這里假設你在方法f上定義了@Suspendable,同時去調(diào)用了有同樣注解的方法g,那么所有調(diào)用f的方法會插入一些字節(jié)碼,這些字節(jié)碼的邏輯就是記錄當前Fiber棧上的狀態(tài),以便在未來可以動態(tài)的恢復。(Fiber類似線程也有自己的棧)。在suspendable方法鏈內(nèi)Fiber的父類會調(diào)用Fiber.park,這樣會拋出SuspendExecution異常,從而來停止線程的運行,好讓Quasar的調(diào)度器執(zhí)行調(diào)度。這里的SuspendExecution會被Fiber自己捕獲,業(yè)務層面上不應該捕獲到。如果Fiber被喚醒了(調(diào)度器層面會去調(diào)用Fiber.unpark),那么f會在被中斷的地方重新被調(diào)用(這里Fiber會知道自己在哪里被中斷),同時會把g的調(diào)用結(jié)果(g會return結(jié)果)插入到f的恢復點,這樣看上去就好像g的return是f的local variables了,從而避免了callback嵌套。

上面說了一大堆,其實簡單點來講就是,想辦法讓運行中的線程棧停下來,然后讓Quasar的調(diào)度器介入。

JVM線程中斷的條件有兩個:

1、拋異常

2、return。

而在Quasar中,一般就是通過拋異常的方式來達到的,所以你會看到上面的代碼會拋出SuspendExecution。但是如果你真捕獲到這個異常,那就說明有問題了,所以一般會這么寫。

@Suspendable
public int f() {
  try {
    // do some stuff
    return g() * 2;
  } catch(SuspendExecution s) {
    //這里不應該捕獲到異常.
    throw new AssertionError(s);
  }
}


Coroutine in Java - Quasar Fiber實現(xiàn) 

Quasar Fiber則是通過字節(jié)碼修改技術(shù)在編譯或載入時織入必要的上下文保存/恢復代碼,通過拋異常來暫停,恢復的時候根據(jù)保存的上下文(Continuation),恢復jvm的方法調(diào)用棧和局部變量,Quasar Fiber提供相應的Java類庫來實現(xiàn),對應用有一定的侵入性(很小)

Quasar Fiber 主要有 Instrument + Continuation + Scheduler幾個部分組成

  • Instrument 做一些代碼的植入,如park前后上下文的保存/恢復等
  • Continuation 保存方法調(diào)用的信息,如局部變量,引用等,用戶態(tài)的stack,這個也是跟akka等基于固定callback接口的異步框架最大的區(qū)別
  • Scheduler 調(diào)度器,負責將fiber分配到具體的os thread執(zhí)行



相關(guān)閱讀:

JAVA微課——像玩游戲般學習java

JAVA多線程編程


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號