實戰(zhàn)中的 Promise 和 Future

2018-02-24 16:00 更新

上一章介紹了 Future 類型,以及如何用它來編寫高可讀性、高組合性的異步執(zhí)行代碼。

Future 只是整個謎團的一部分:它是一個只讀類型,允許你使用它計算得到的值,或者處理計算中出現(xiàn)的錯誤。但是在這之前,必須得有一種方法把這個值放進去。這一章里,你將會看到如何通過 Promise 類型來達到這個目的。

類型 Promise

之前,我們把一段順序執(zhí)行的代碼塊傳遞給了 scala.concurrent 里的 future 方法,并且在作用域中給出了一個 ExecutionContext,它神奇地異步調(diào)用代碼塊,返回一個 Future 類型的結(jié)果。

雖然這種獲得 Future 的方式很簡單,但還有其他的方法來創(chuàng)建 Future 實例,并填充它,這就是 Promise。Promise 允許你在 Future 里放入一個值,不過只能做一次,F(xiàn)uture 一旦完成,就不能更改了。

一個 Future 實例總是和一個(也只能是一個)Promise 實例關(guān)聯(lián)在一起。如果你在 REPL 里調(diào)用 future 方法,你會發(fā)現(xiàn)返回的也是一個 Promise:

import concurrent.Future
import concurrent.Future

scala> import concurrent.future
import concurrent.future

scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global

scala> val f: Future[String] = future { "Hello World!" }
f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249

你得到的對象是一個 DefaultPromise ,它實現(xiàn)了 FuturePromise 接口,不過這就是具體的實現(xiàn)細節(jié)了(譯注,有興趣的讀者可翻閱其實現(xiàn)的源碼),使用者只需要知道代碼實現(xiàn)把 Future 和對應(yīng)的 Promise 之間的聯(lián)系分的很清晰。

這個小例子說明了:除了通過 Promise,沒有其他方法可以完成一個 Future,future 方法也只是一個輔助函數(shù),隱藏了具體的實現(xiàn)機制。

現(xiàn)在,讓我們動動手,看看怎樣直接使用 Promise 類型。

給出承諾

當我們談?wù)撈鸪兄Z能否被兌現(xiàn)時,一個很熟知的例子是那些政客的競選諾言。

假設(shè)被推選的政客給他的投票者一個減稅的承諾。這可以用 Promise[TaxCut] 表示:

import concurrent.Promise
case class TaxCut(reduction: Int)
// either give the type as a type parameter to the factory method:
val taxcut = Promise[TaxCut]()
// or give the compiler a hint by specifying the type of your val:
val taxcut2: Promise[TaxCut] = Promise()
// taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
// taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6

一旦創(chuàng)建了這個 Promise,就可以在它上面調(diào)用 future 方法來獲取承諾的未來:

 val taxCutF: Future[TaxCut] = taxcut.future
 // `> scala.concurrent.Future[TaxCut] `  scala.concurrent.impl.Promise$DefaultPromise@66ae2a84

返回的 Future 可能并不和 Promise 一樣,但在同一個 Promise 上調(diào)用 future 方法總是返回同一個對象,以確保 Promise 和 Future 之間一對一的關(guān)系。

結(jié)束承諾

一旦給出了承諾,并告訴全世界會在不遠的將來兌現(xiàn)它,那最好盡力去實現(xiàn)。在 Scala 中,可以結(jié)束一個 Promise,無論成功還是失敗。

兌現(xiàn)承諾

為了成功結(jié)束一個 Promise,你可以調(diào)用它的 success 方法,并傳遞一個大家期許的結(jié)果:

  taxcut.success(TaxCut(20))

這樣做之后,Promise 就無法再寫入其他值了,如果偏要再寫,會產(chǎn)生異常。

此時,和 Promise 關(guān)聯(lián)的 Future 也成功完成,注冊的回調(diào)會開始執(zhí)行,或者說對這個 Future 進行了映射,那這個時候,映射函數(shù)也該執(zhí)行了。

一般來說,Promise 的完成和對返回的 Future 的處理發(fā)生在不同的線程。很可能你創(chuàng)建了 Promise,并立即返回和它關(guān)聯(lián)的 Future 給調(diào)用者,而實際上,另外一個線程還在計算它。

為了說明這一點,我們拿減稅來舉個例子:

object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
    val p = Promise[TaxCut]()
    Future {
      println("Starting the new legislative period.")
      Thread.sleep(2000)
      p.success(TaxCut(20))
      println("We reduced the taxes! You must reelect us!!!!1111")
    }
    p.future
  }
}

這個例子中使用了 Future 伴生對象,不過不要被它搞混淆了,這個例子的重點是:Promise 并不是在調(diào)用者的線程里完成的。

現(xiàn)在我們來兌現(xiàn)當初的競選宣言,在 Future 上添加一個 onComplete 回調(diào):

import scala.util.{Success, Failure}
val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge()
println("Now that they're elected, let's see if they remember their promises...")
taxCutF.onComplete {
  case Success(TaxCut(reduction)) =>
    println(s"A miracle! They really cut our taxes by $reduction percentage points!")
  case Failure(ex) =>
    println(s"They broke their promises! Again! Because of a ${ex.getMessage}")
}

多次運行這個例子,會發(fā)現(xiàn)顯示屏輸出的結(jié)果順序是不確定的,而且,最終回調(diào)函數(shù)會執(zhí)行,進入成功的那個 case 。

違背諾言

政客習(xí)慣違背諾言,Scala 程序員有時候也只能這樣做。調(diào)用 failure 方法,傳遞一個異常,結(jié)束 Promise:

case class LameExcuse(msg: String) extends Exception(msg)
object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
     val p = Promise[TaxCut]()
     Future {
       println("Starting the new legislative period.")
       Thread.sleep(2000)
       p.failure(LameExcuse("global economy crisis"))
       println("We didn't fulfill our promises, but surely they'll understand.")
     }
     p.future
   }
}

這個 redeemCampaignPledge 實現(xiàn)最終會違背承諾。一旦用 failure 結(jié)束這個 Promise,也無法再次寫入了,正如 success 方法一樣。相關(guān)聯(lián)的 Future 也會以 Failure 收場。

如果已經(jīng)有了一個 Try,那可以直接把它傳遞給 Promise 的 complete 方法,以此來結(jié)束這個它。如果這個 Try 是一個 Success,關(guān)聯(lián)的 Future 會成功完成,否則,就失敗。

基于 Future 的編程實踐

如果想使用基于 Future 的編程范式以增加應(yīng)用的擴展性,那應(yīng)用從下到上都必須被設(shè)計成非阻塞模式。這意味著,基本上應(yīng)用層所有的函數(shù)都應(yīng)該是異步的,并且返回 Future。

當下,一個可能的使用場景是開發(fā) Web 應(yīng)用。流行的 Scala Web 框架,允許你將響應(yīng)作為 Future[Response] 返回,而不是等到你完成響應(yīng)再返回。這個非常重要,因為它允許 Web 服務(wù)器用少量的線程處理更多的連接。通過賦予服務(wù)器 Future[Response] 的能力,你可以最大化服務(wù)器線程池的利用率。

而且,應(yīng)用的服務(wù)可能需要多次調(diào)用數(shù)據(jù)庫層以及(或者)某些外部服務(wù),這時候可以獲取多個 Future,用 for 語句將它們組合成新的 Future,簡單可讀!最終,Web 層再將這樣的一個 Future 變成 Future[Response]。

但是該怎樣在實踐中實現(xiàn)這些呢?需要考慮三種不同的場景:

非阻塞IO

應(yīng)用很可能涉及到大量的 IO 操作。比如,可能需要和數(shù)據(jù)庫交互,還可能作為客戶端去調(diào)用其他的 Web 服務(wù)。

如果是這樣,可以使用一些基于 Java 非阻塞 IO 實現(xiàn)的庫,也可以直接或通過 Netty 這樣的庫來使用 Java 的 NIO API。這樣的庫可以用定量的線程池處理大量的連接。

但如果是想開發(fā)這樣的一個庫,直接和 Promise 打交道更為合適。

阻塞 IO

有時候,并沒有基于 NIO 的庫可用。比如,Java 世界里大多數(shù)的數(shù)據(jù)庫驅(qū)動都是使用阻塞 IO。在 Web 應(yīng)用中,如果用這樣的驅(qū)動發(fā)起大量訪問數(shù)據(jù)庫的調(diào)用,要記得這些調(diào)用是發(fā)生在服務(wù)器線程里的。為了避免這個問題,可以將所有需要和數(shù)據(jù)庫交互的代碼都放入 future 代碼塊里,就像這樣:

// get back a Future[ResultSet] or something similar:
Future {
  queryDB(query)
}

到現(xiàn)在為止,我們都是使用隱式可用的全局 ExecutionContext 來執(zhí)行這些代碼塊。通常,更好的方式是創(chuàng)建一個專用的 ExecutionContext 放在數(shù)據(jù)庫層里??梢詮?Java的 ExecutorService 來它,這也意味著,可以異步的調(diào)整線程池來執(zhí)行數(shù)據(jù)庫調(diào)用,應(yīng)用的其他部分不受影響。

import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

長時間運行的計算

取決于應(yīng)用的本質(zhì)特點,一個應(yīng)用偶爾還會調(diào)用一些長時間運行的任務(wù),它們完全不涉及 IO(CPU 密集的任務(wù))。這些任務(wù)也不應(yīng)該在服務(wù)器線程中執(zhí)行,因此需要將它們變成 Future:

Future {
  longRunningComputation(data, moreData)
}

同樣,最好有一些專屬的 ExecutionContext 來處理這些 CPU 密集的計算。怎樣調(diào)整這些線程池大小取決于應(yīng)用的特征,這些已經(jīng)超過了本文的范圍。

總結(jié)

這一章里,我們學(xué)習(xí)了 Promise - 基于 Future 的并發(fā)范式的可寫組件,以及怎樣用它來完成一個 Future;同時,還給出了一些在實踐中使用它們的建議。

下一章會討論 Scala 函數(shù)式編程是如何增加代碼可用性(一個長久以來和面向?qū)ο缶幊滔嚓P(guān)聯(lián)的概念)的。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號