作為一個對 Scala 充滿熱情的開發(fā)者,你應(yīng)該已經(jīng)聽說過 Scala 處理并發(fā)的能力,或許你就是被這個吸引來的。相較于大多數(shù)編程語言低級的并發(fā) API,Scala 提供的方法可以讓人們更好的理解并發(fā)以及編寫良構(gòu)的并發(fā)程序。
本章的主題- Future 就是這種方法的兩大基石之一。(另一個是 actor)我會解釋 Future 的優(yōu)點,以及它的函數(shù)式特征。
如果你想動手試試接下來的例子,請確保 Scala 版本不低于 2.9.3,F(xiàn)uture 在 2.10.0 版本中引入,并向后兼容到 2.9.3,最初,它是 Akka 庫的一部分(API略有不同)。
假設(shè)你想準(zhǔn)備一杯卡布奇諾,你可以一個接一個的執(zhí)行以下步驟:
轉(zhuǎn)換成 Scala 代碼,可能會是這樣:
import scala.util.Try
// Some type aliases, just for getting more meaningful method signatures:
type CoffeeBeans = String
type GroundCoffee = String
case class Water(temperature: Int)
type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String
// dummy implementations of the individual steps:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water ` water.copy(temperature ` 85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"
// some exceptions for things that might go wrong in the individual steps
// (we'll need some of them later, use the others when experimenting with the code):
case class GrindingException(msg: String) extends Exception(msg)
case class FrothingException(msg: String) extends Exception(msg)
case class WaterBoilingException(msg: String) extends Exception(msg)
case class BrewingException(msg: String) extends Exception(msg)
// going through these steps sequentially:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
這樣做有幾個優(yōu)點:可以很輕易的弄清楚事情的步驟,一目了然,而且不會混淆。(畢竟沒有上下文切換)不好的一面是,大部分時間,你的大腦和身體都處于等待的狀態(tài):在等待研磨咖啡豆時,你完全不能做任何事情,只有當(dāng)這一步完成后,你才能開始燒水。這顯然是在浪費時間,所以你可能想一次開始多個步驟,讓它們同時執(zhí)行,一旦水燒開,咖啡豆也磨好了,你可以制做咖啡了,這期間,打奶泡也可以開始了。
這和編寫軟件沒什么不同。一個 Web 服務(wù)器可以用來處理和響應(yīng)請求的線程只有那么多,不能因為要等待數(shù)據(jù)庫查詢或其他 HTTP 服務(wù)調(diào)用的結(jié)果而阻塞了這些可貴的線程。相反,一個異步編程模型和非阻塞 IO 會更合適,這樣的話,當(dāng)一個請求處理在等待數(shù)據(jù)庫查詢結(jié)果時,處理這個請求的線程也能夠為其他請求服務(wù)。
"I heard you like callbacks, so I put a callback in your callback!"
在并發(fā)家族里,你應(yīng)該已經(jīng)知道 nodejs 這個很酷的家伙,nodejs 完全通過回調(diào)來通信,不幸的是,這很容易導(dǎo)致回調(diào)中包含回調(diào)的回調(diào),這簡直是一團(tuán)糟,代碼難以閱讀和調(diào)試。
Scala 的 Future 也允許回調(diào),但它提供了更好的選擇,所以你不怎么需要它。
"I know Futures, and they are completely useless!"
也許你知道些其他的 Future 實現(xiàn),最引人注目的是 Java 提供的那個。但是對于 Java 的 Future,你只能去查看它是否已經(jīng)完成,或者阻塞線程直到其結(jié)束。簡而言之,Java 的 Future 幾乎沒有用,而且用起來絕對不會讓人開心。
如果你認(rèn)為 Scala 的 Future 也是這樣,那大錯特錯了!
scala.concurrent 包里的 Future[T]
是一個容器類型,代表一種返回值類型為 T
的計算。計算可能會出錯,也可能會超時;從而,當(dāng)一個 future 完成時,它可能會包含異常,而不是你期望的那個值。
Future 只能寫一次: 當(dāng)一個 future 完成后,它就不能再被改變了。同時,F(xiàn)uture 只提供了讀取計算值的接口,寫入計算值的任務(wù)交給了 Promise,這樣,API 層面上會有一個清晰的界限。這篇文章里,我們主要關(guān)注前者,下一章會介紹 Promise 的使用。
Future 有多種使用方式,我將通過重寫 “卡布奇諾” 這個例子來說明。
首先,所有可以并行執(zhí)行的函數(shù),應(yīng)該返回一個 Future:
import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
println("start grinding...")
Thread.sleep(Random.nextInt(2000))
if (beans == "baked beans") throw GrindingException("are you joking?")
println("finished grinding...")
s"ground coffee of $beans"
}
def heatWater(water: Water): Future[Water] = Future {
println("heating the water now")
Thread.sleep(Random.nextInt(2000))
println("hot, it's hot!")
water.copy(temperature = 85)
}
def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
println("milk frothing system engaged!")
Thread.sleep(Random.nextInt(2000))
println("shutting down milk frothing system")
s"frothed $milk"
}
def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
println("happy brewing :)")
Thread.sleep(Random.nextInt(2000))
println("it's brewed!")
"espresso"
}
上面的代碼有幾處需要解釋。
首先是 Future 伴生對象里的 apply
方法需要兩個參數(shù):
object Future {
def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}
要異步執(zhí)行的計算通過傳名參數(shù) body
傳入。第二個參數(shù)是一個隱式參數(shù),隱式參數(shù)是說,函數(shù)調(diào)用時,如果作用域中存在一個匹配的隱式值,就無需顯示指定這個參數(shù)。ExecutionContext
可以執(zhí)行一個 Future,可以把它看作是一個線程池,是絕大部分 Future API 的隱式參數(shù)。
import scala.concurrent.ExecutionContext.Implicits.global
語句引入了一個全局的執(zhí)行上下文,確保了隱式值的存在。這時候,只需要一個單元素列表,可以用大括號來代替小括號。調(diào)用 future
方法時,經(jīng)常使用這種形式,使得它看起來像是一種語言特性,而不是一個普通方法的調(diào)用。
這個例子沒有大量計算,所以用隨機休眠來模擬以說明問題,而且,為了更清晰的說明并發(fā)代碼的執(zhí)行順序,還在“計算”之前和之后打印了些東西。
計算會在 Future 創(chuàng)建后的某個不確定時間點上由 ExecutionContext
給其分配的某個線程中執(zhí)行。
對于一些簡單的問題,使用回調(diào)就能很好解決。Future 的回調(diào)是偏函數(shù),你可以把回調(diào)傳遞給 Future 的 onSuccess
方法,如果這個 Future 成功完成,這個回調(diào)就會執(zhí)行,并把 Future 的返回值作為參數(shù)輸入:
grind("arabica beans").onSuccess { case ground =>
println("okay, got my ground coffee")
}
類似的,也可以在 onFailure
上注冊回調(diào),只不過它是在 Future 失敗時調(diào)用,其輸入是一個 Throwable
。
通常的做法是將兩個回調(diào)結(jié)合在一起以更好的處理 Future:在 onComplete
方法上注冊回調(diào),回調(diào)的輸入是一個 Try。
import scala.util.{Success, Failure}
grind("baked beans").onComplete {
case Success(ground) => println(s"got my $ground")
case Failure(ex) => println("This grinder needs a replacement, seriously!")
}
傳遞給 grind
的是 “baked beans”,因此 grind
方法會產(chǎn)生異常,進(jìn)而導(dǎo)致 Future 中的計算失敗。
當(dāng)嵌套使用 Future 時,回調(diào)就變得比較煩人。不過,你也沒必要這么做,因為 Future 是可組合的,這是它真正發(fā)揮威力的時候!
你一定已經(jīng)注意到,之前討論過的所有容器類型都可以進(jìn)行 map
、 flatMap
操作,也可以用在 for 語句中。作為一種容器類型,F(xiàn)uture 支持這些操作也不足為奇!
真正的問題是,在還沒有完成的計算上執(zhí)行這些操作意味這什么,如何去理解它們?
Scala 讓 “時間旅行” 成為可能!假設(shè)想在水加熱后就去檢查它的溫度,可以通過將 Future[Water]
映射到 Future[Boolean]
來完成這件事情:
val tempreatureOkay: Future[Boolean] = heatWater(Water(25)) map { water =>
println("we're in the future!")
(80 to 85) contains (water.temperature)
}
tempreatureOkay
最終會包含水溫的結(jié)果。你可以去改變 heatWater
的實現(xiàn)來讓它拋出異常(比如說,加熱器爆炸了),然后等待 “we're in the future!” 出現(xiàn)在顯示屏上,不過你永遠(yuǎn)等不到。
寫傳遞給 map
的函數(shù)時,你就處在未來(或者說可能的未來)。一旦 Future[Water]
實例成功完成,這個函數(shù)就會執(zhí)行,只不過,該函數(shù)所在的時間線可能不是你現(xiàn)在所處的這個。如果 Future[Water
失敗,傳遞給 map
的函數(shù)中的事情永遠(yuǎn)不會發(fā)生,調(diào)用 map
的結(jié)果將是一個失敗的 Future[Boolean]
。
如果一個 Future 的計算依賴于另一個 Future 的結(jié)果,那需要求救于 flatMap
以避免 Future 的嵌套。
假設(shè),測量水溫的線程需要一些時間,那你可能想異步的去檢查水溫是否 OK。比如,有一個函數(shù),接受一個 Water
,并返回 Future[Boolean]
:
def temperatureOkay(water: Water): Future[Boolean] = future {
(80 to 85) contains (water.temperature)
}
使用 flatMap
(而不是 map
)得到一個 Future[Boolean]
,而不是 Future[Future[Boolean]]
:
val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)) map {
water => temperatureOkay(water)
}
val flatFuture: Future[Boolean] = heatWater(Water(25)) flatMap {
water => temperatureOkay(water)
}
同樣,映射只會發(fā)生在 Future[Water]
成功完成情況下。
除了調(diào)用 flatMap
,也可以寫成 for 語句。上面的例子可以重寫成:
val acceptable: Future[Boolean] = for {
heatedWater <- heatWater(Water(25))
okay <- temperatureOkay(heatedWater)
} yield okay
如果有多個可以并行執(zhí)行的計算,則需要特別注意,要先在 for 語句外面創(chuàng)建好對應(yīng)的 Futures。
def prepareCappuccinoSequentially(): Future[Cappuccino] =
for {
ground <- grind("arabica beans")
water <- heatWater(Water(25))
foam <- frothMilk("milk")
espresso <- brew(ground, water)
} yield combine(espresso, foam)
這看起來很漂亮,但要知道,for 語句只不過是 flatMap
嵌套調(diào)用的語法糖。這意味著,只有當(dāng) Future[GroundCoffee]
成功完成后, heatWater
才會創(chuàng)建 Future[Water]
。你可以查看函數(shù)運行時打印出來的東西來驗證這個說法。
因此,要確保在 for 語句之前實例化所有相互獨立的 Futures:
def prepareCappuccino(): Future[Cappuccino] = {
val groundCoffee = grind("arabica beans")
val heatedWater = heatWater(Water(20))
val frothedMilk = frothMilk("milk")
for {
ground <- groundCoffee
water <- heatedWater
foam <- frothedMilk
espresso <- brew(ground, water)
} yield combine(espresso, foam)
}
在 for 語句之前,三個 Future 在創(chuàng)建之后就開始各自獨立的運行,顯示屏的輸出是不確定的。唯一能確定的是 “happy brewing” 總是出現(xiàn)在后面,因為該輸出所在的函數(shù) brew
是在其他兩個函數(shù)執(zhí)行完畢后才開始執(zhí)行的。也因為此,可以在 for 語句里面直接調(diào)用它,當(dāng)然,前提是前面的 Future 都成功完成。
你可能會發(fā)現(xiàn) Future[T]
是成功偏向的,允許你使用 map
、flatMap
、filter
等。
但是,有時候可能處理事情出錯的情況。調(diào)用 Future[T]
上的 failed
方法,會得到一個失敗偏向的 Future,類型是 Future[Throwable]
。之后就可以映射這個 Future[Throwable]
,在失敗的情況下執(zhí)行 mapping 函數(shù)。
你已經(jīng)見過 Future 了,而且它的前途看起來很光明!因為它是一個可組合、可函數(shù)式使用的容器類型,這讓我們的工作變得異常舒服。
調(diào)用 future
方法可以輕易將阻塞執(zhí)行的代碼變成并發(fā)執(zhí)行,但是,代碼最好原本就是非阻塞的。為了實現(xiàn)它,我們還需要 Promise
來完成 Future
,這就是下一章的主題。
更多建議: