類型 Future

2018-02-24 16:00 更新

作為一個對 Scala 充滿熱情的開發(fā)者,你應(yīng)該已經(jīng)聽說過 Scala 處理并發(fā)的能力,或許你就是被這個吸引來的。相較于大多數(shù)編程語言低級的并發(fā) API,Scala 提供的方法可以讓人們更好的理解并發(fā)以及編寫良構(gòu)的并發(fā)程序。

本章的主題- Future 就是這種方法的兩大基石之一。(另一個是 actor)我會解釋 Future 的優(yōu)點,以及它的函數(shù)式特征。

如果你想動手試試接下來的例子,請確保 Scala 版本不低于 2.9.3,F(xiàn)uture 在 2.10.0 版本中引入,并向后兼容到 2.9.3,最初,它是 Akka 庫的一部分(API略有不同)。

順序代碼為什么會變壞

假設(shè)你想準(zhǔn)備一杯卡布奇諾,你可以一個接一個的執(zhí)行以下步驟:

  1. 研磨所需的咖啡豆
  2. 加熱一些水
  3. 用研磨好的咖啡豆和熱水制做一杯咖啡
  4. 打奶泡
  5. 結(jié)合咖啡和奶泡做成卡布奇諾

轉(zhuǎn)換成 Scala 代碼,可能會是這樣:

import scala.util.Try
// Some type aliases, just for getting more meaningful method signatures:
type CoffeeBeans = String
type GroundCoffee = String
case class Water(temperature: Int)
type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String

// dummy implementations of the individual steps:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water ` water.copy(temperature `  85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"

// some exceptions for things that might go wrong in the individual steps
// (we'll need some of them later, use the others when experimenting with the code):
case class GrindingException(msg: String) extends Exception(msg)
case class FrothingException(msg: String) extends Exception(msg)
case class WaterBoilingException(msg: String) extends Exception(msg)
case class BrewingException(msg: String) extends Exception(msg)

// going through these steps sequentially:
def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

這樣做有幾個優(yōu)點:可以很輕易的弄清楚事情的步驟,一目了然,而且不會混淆。(畢竟沒有上下文切換)不好的一面是,大部分時間,你的大腦和身體都處于等待的狀態(tài):在等待研磨咖啡豆時,你完全不能做任何事情,只有當(dāng)這一步完成后,你才能開始燒水。這顯然是在浪費時間,所以你可能想一次開始多個步驟,讓它們同時執(zhí)行,一旦水燒開,咖啡豆也磨好了,你可以制做咖啡了,這期間,打奶泡也可以開始了。

這和編寫軟件沒什么不同。一個 Web 服務(wù)器可以用來處理和響應(yīng)請求的線程只有那么多,不能因為要等待數(shù)據(jù)庫查詢或其他 HTTP 服務(wù)調(diào)用的結(jié)果而阻塞了這些可貴的線程。相反,一個異步編程模型和非阻塞 IO 會更合適,這樣的話,當(dāng)一個請求處理在等待數(shù)據(jù)庫查詢結(jié)果時,處理這個請求的線程也能夠為其他請求服務(wù)。

"I heard you like callbacks, so I put a callback in your callback!"

在并發(fā)家族里,你應(yīng)該已經(jīng)知道 nodejs 這個很酷的家伙,nodejs 完全通過回調(diào)來通信,不幸的是,這很容易導(dǎo)致回調(diào)中包含回調(diào)的回調(diào),這簡直是一團(tuán)糟,代碼難以閱讀和調(diào)試。

Scala 的 Future 也允許回調(diào),但它提供了更好的選擇,所以你不怎么需要它。

"I know Futures, and they are completely useless!"

也許你知道些其他的 Future 實現(xiàn),最引人注目的是 Java 提供的那個。但是對于 Java 的 Future,你只能去查看它是否已經(jīng)完成,或者阻塞線程直到其結(jié)束。簡而言之,Java 的 Future 幾乎沒有用,而且用起來絕對不會讓人開心。

如果你認(rèn)為 Scala 的 Future 也是這樣,那大錯特錯了!

Future 語義

scala.concurrent 包里的 Future[T] 是一個容器類型,代表一種返回值類型為 T 的計算。計算可能會出錯,也可能會超時;從而,當(dāng)一個 future 完成時,它可能會包含異常,而不是你期望的那個值。

Future 只能寫一次: 當(dāng)一個 future 完成后,它就不能再被改變了。同時,F(xiàn)uture 只提供了讀取計算值的接口,寫入計算值的任務(wù)交給了 Promise,這樣,API 層面上會有一個清晰的界限。這篇文章里,我們主要關(guān)注前者,下一章會介紹 Promise 的使用。

使用 Future

Future 有多種使用方式,我將通過重寫 “卡布奇諾” 這個例子來說明。

首先,所有可以并行執(zhí)行的函數(shù),應(yīng)該返回一個 Future:

import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
  println("start grinding...")
  Thread.sleep(Random.nextInt(2000))
  if (beans == "baked beans") throw GrindingException("are you joking?")
  println("finished grinding...")
  s"ground coffee of $beans"
}

def heatWater(water: Water): Future[Water] = Future {
  println("heating the water now")
  Thread.sleep(Random.nextInt(2000))
  println("hot, it's hot!")
  water.copy(temperature = 85)
}

def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
  println("milk frothing system engaged!")
  Thread.sleep(Random.nextInt(2000))
  println("shutting down milk frothing system")
  s"frothed $milk"
}

def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
  println("happy brewing :)")
  Thread.sleep(Random.nextInt(2000))
  println("it's brewed!")
  "espresso"
}

上面的代碼有幾處需要解釋。

首先是 Future 伴生對象里的 apply 方法需要兩個參數(shù):

object Future {
  def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}

要異步執(zhí)行的計算通過傳名參數(shù) body 傳入。第二個參數(shù)是一個隱式參數(shù),隱式參數(shù)是說,函數(shù)調(diào)用時,如果作用域中存在一個匹配的隱式值,就無需顯示指定這個參數(shù)。ExecutionContext 可以執(zhí)行一個 Future,可以把它看作是一個線程池,是絕大部分 Future API 的隱式參數(shù)。

import scala.concurrent.ExecutionContext.Implicits.global 語句引入了一個全局的執(zhí)行上下文,確保了隱式值的存在。這時候,只需要一個單元素列表,可以用大括號來代替小括號。調(diào)用 future 方法時,經(jīng)常使用這種形式,使得它看起來像是一種語言特性,而不是一個普通方法的調(diào)用。

這個例子沒有大量計算,所以用隨機休眠來模擬以說明問題,而且,為了更清晰的說明并發(fā)代碼的執(zhí)行順序,還在“計算”之前和之后打印了些東西。

計算會在 Future 創(chuàng)建后的某個不確定時間點上由 ExecutionContext 給其分配的某個線程中執(zhí)行。

回調(diào)

對于一些簡單的問題,使用回調(diào)就能很好解決。Future 的回調(diào)是偏函數(shù),你可以把回調(diào)傳遞給 Future 的 onSuccess 方法,如果這個 Future 成功完成,這個回調(diào)就會執(zhí)行,并把 Future 的返回值作為參數(shù)輸入:

 grind("arabica beans").onSuccess { case ground =>
   println("okay, got my ground coffee")
 }

類似的,也可以在 onFailure 上注冊回調(diào),只不過它是在 Future 失敗時調(diào)用,其輸入是一個 Throwable

通常的做法是將兩個回調(diào)結(jié)合在一起以更好的處理 Future:在 onComplete 方法上注冊回調(diào),回調(diào)的輸入是一個 Try。

 import scala.util.{Success, Failure}
 grind("baked beans").onComplete {
   case Success(ground) => println(s"got my $ground")
   case Failure(ex) => println("This grinder needs a replacement, seriously!")
 }

傳遞給 grind 的是 “baked beans”,因此 grind 方法會產(chǎn)生異常,進(jìn)而導(dǎo)致 Future 中的計算失敗。

Future 組合

當(dāng)嵌套使用 Future 時,回調(diào)就變得比較煩人。不過,你也沒必要這么做,因為 Future 是可組合的,這是它真正發(fā)揮威力的時候!

你一定已經(jīng)注意到,之前討論過的所有容器類型都可以進(jìn)行 map 、 flatMap 操作,也可以用在 for 語句中。作為一種容器類型,F(xiàn)uture 支持這些操作也不足為奇!

真正的問題是,在還沒有完成的計算上執(zhí)行這些操作意味這什么,如何去理解它們?

Map 操作

Scala 讓 “時間旅行” 成為可能!假設(shè)想在水加熱后就去檢查它的溫度,可以通過將 Future[Water] 映射到 Future[Boolean] 來完成這件事情:

 val tempreatureOkay: Future[Boolean] = heatWater(Water(25)) map { water =>
   println("we're in the future!")
   (80 to 85) contains (water.temperature)
 }

tempreatureOkay 最終會包含水溫的結(jié)果。你可以去改變 heatWater 的實現(xiàn)來讓它拋出異常(比如說,加熱器爆炸了),然后等待 “we're in the future!” 出現(xiàn)在顯示屏上,不過你永遠(yuǎn)等不到。

寫傳遞給 map 的函數(shù)時,你就處在未來(或者說可能的未來)。一旦 Future[Water] 實例成功完成,這個函數(shù)就會執(zhí)行,只不過,該函數(shù)所在的時間線可能不是你現(xiàn)在所處的這個。如果 Future[Water 失敗,傳遞給 map 的函數(shù)中的事情永遠(yuǎn)不會發(fā)生,調(diào)用 map 的結(jié)果將是一個失敗的 Future[Boolean]。

FlatMap 操作

如果一個 Future 的計算依賴于另一個 Future 的結(jié)果,那需要求救于 flatMap 以避免 Future 的嵌套。

假設(shè),測量水溫的線程需要一些時間,那你可能想異步的去檢查水溫是否 OK。比如,有一個函數(shù),接受一個 Water ,并返回 Future[Boolean]

def temperatureOkay(water: Water): Future[Boolean] = future {
  (80 to 85) contains (water.temperature)
}

使用 flatMap(而不是 map)得到一個 Future[Boolean],而不是 Future[Future[Boolean]]

val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)) map {
  water => temperatureOkay(water)
}

val flatFuture: Future[Boolean] = heatWater(Water(25)) flatMap {
  water => temperatureOkay(water)
}

同樣,映射只會發(fā)生在 Future[Water] 成功完成情況下。

for 語句

除了調(diào)用 flatMap ,也可以寫成 for 語句。上面的例子可以重寫成:

val acceptable: Future[Boolean] = for {
  heatedWater <- heatWater(Water(25))
  okay <- temperatureOkay(heatedWater)
} yield okay

如果有多個可以并行執(zhí)行的計算,則需要特別注意,要先在 for 語句外面創(chuàng)建好對應(yīng)的 Futures。

def prepareCappuccinoSequentially(): Future[Cappuccino] =
  for {
    ground <- grind("arabica beans")
    water <- heatWater(Water(25))
    foam <- frothMilk("milk")
    espresso <- brew(ground, water)
  } yield combine(espresso, foam)

這看起來很漂亮,但要知道,for 語句只不過是 flatMap 嵌套調(diào)用的語法糖。這意味著,只有當(dāng) Future[GroundCoffee] 成功完成后, heatWater 才會創(chuàng)建 Future[Water]。你可以查看函數(shù)運行時打印出來的東西來驗證這個說法。

因此,要確保在 for 語句之前實例化所有相互獨立的 Futures:

def prepareCappuccino(): Future[Cappuccino] = {
  val groundCoffee = grind("arabica beans")
  val heatedWater = heatWater(Water(20))
  val frothedMilk = frothMilk("milk")
  for {
    ground <- groundCoffee
    water <- heatedWater
    foam <- frothedMilk
    espresso <- brew(ground, water)
  } yield combine(espresso, foam)
}

在 for 語句之前,三個 Future 在創(chuàng)建之后就開始各自獨立的運行,顯示屏的輸出是不確定的。唯一能確定的是 “happy brewing” 總是出現(xiàn)在后面,因為該輸出所在的函數(shù) brew 是在其他兩個函數(shù)執(zhí)行完畢后才開始執(zhí)行的。也因為此,可以在 for 語句里面直接調(diào)用它,當(dāng)然,前提是前面的 Future 都成功完成。

失敗偏向的 Future

你可能會發(fā)現(xiàn) Future[T] 是成功偏向的,允許你使用 map、flatMap、filter 等。

但是,有時候可能處理事情出錯的情況。調(diào)用 Future[T] 上的 failed 方法,會得到一個失敗偏向的 Future,類型是 Future[Throwable]。之后就可以映射這個 Future[Throwable],在失敗的情況下執(zhí)行 mapping 函數(shù)。

總結(jié)

你已經(jīng)見過 Future 了,而且它的前途看起來很光明!因為它是一個可組合、可函數(shù)式使用的容器類型,這讓我們的工作變得異常舒服。

調(diào)用 future 方法可以輕易將阻塞執(zhí)行的代碼變成并發(fā)執(zhí)行,但是,代碼最好原本就是非阻塞的。為了實現(xiàn)它,我們還需要 Promise 來完成 Future,這就是下一章的主題。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號