Twitter最重要的標(biāo)準(zhǔn)庫是?Util?和?Finagle。Util 可以理解為Scala和Java的標(biāo)準(zhǔn)庫擴(kuò)展,提供了標(biāo)準(zhǔn)庫中沒有的功能或已有功能的更合適的實(shí)現(xiàn)。Finagle 是我們的RPC系統(tǒng),核心分布式系統(tǒng)組件。
Futures已經(jīng)在并發(fā)一節(jié)中簡單討論過。它是調(diào)異步處理的中心機(jī)制,滲透在我們代碼庫中,也是Finagle的核心。Futures允許組合并發(fā)事件,簡化了高并發(fā)操作。也是JVM上異步并發(fā)的一種高效的實(shí)現(xiàn)。
Twitter的future是異步的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的線程的執(zhí)行;網(wǎng)絡(luò)IO和磁盤IO是就是例子——必須由系統(tǒng)處理,它為結(jié)果提供future。Finagle為網(wǎng)絡(luò)IO提供了這樣一種系統(tǒng)。
Futures清晰簡單:它們持有一個(gè)尚未完成運(yùn)算結(jié)果的 promise 。它們是一個(gè)簡單的容器——一個(gè)占位符。一次計(jì)算當(dāng)然可能會(huì)失敗,這種狀況必須被編碼:一個(gè)Future可以是三種狀態(tài)之一: pending, failed, completed。
讓我們重新審視我們所說的組合:將簡單的組件合成一個(gè)更復(fù)雜的。函數(shù)組合的一個(gè)權(quán)威的例子:給定函數(shù) f 和 g,組合函數(shù) (g°f)(x) = g(f(x)) ——結(jié)果先對(duì) x使用f函數(shù),然后在使用g函數(shù)——用Scala來寫:
val f = (i: Int) => i.toString
val g = (s: String) => s+s+s
val h = g compose f // : Int => String
scala> h(123)
res0: java.lang.String = 123123123
復(fù)合函數(shù)h,是個(gè)新的函數(shù),由之前定義的f和g函數(shù)合成。
Futures是一種集合類型——它是個(gè)包含0或1個(gè)元素的容器——你可以發(fā)現(xiàn)他們有標(biāo)準(zhǔn)的集合方法(eg:map, filter, foreach)。因?yàn)镕uture的值是延遲的,結(jié)果應(yīng)用這些方法中的任何一種必然也延遲;在
val result: Future[Int]
val resultStr: Future[String] = result map { i => i.toString }
函數(shù) { i => i.toString } 不會(huì)被調(diào)用,直到int值可用;轉(zhuǎn)換集合的resultStr在可用之前也一直是待定狀態(tài)。
List可以被扁平化(flattened):
val listOfList: List[List[Int]] = ..
val list: List[Int] = listOfList.flatten
這對(duì)future也是有意義的:
val futureOfFuture: Future[Future[Int]] = ..
val future: Future[Int] = futureOfFuture.flatten
因?yàn)閒uture是延遲的,flatten的實(shí)現(xiàn)——立即返回——不得不返回一個(gè)等待外部future (**Future[**Future[Int]**]**
) 完成的future (Future[**Future[Int]**]
).如果外部future失敗,內(nèi)部flattened future也將失敗。
Future (類似List) 也定義了flatMap;Future[A] 定義方法flatMap的簽名
flatMap[B](f: A => Future[B]): Future[B]
如同組合 map 和 flatten,我們可以這樣實(shí)現(xiàn):
def flatMap[B](f: A => Future[B]): Future[B] = {
val mapped: Future[Future[B]] = this map f
val flattened: Future[B] = mapped.flatten
flattened
}
這是一種有威力的組合!使用flatMap我們可以定義一個(gè) Future 作為兩個(gè)Future序列的結(jié)果。第二個(gè)future 的計(jì)算基于第一個(gè)的結(jié)果。想象我們需要2次RPC調(diào)用來驗(yàn)證一個(gè)用戶身份,我們可以用下面的方式組合操作:
def getUser(id: Int): Future[User]
def authenticate(user: User): Future[Boolean]
def isIdAuthed(id: Int): Future[Boolean] =
getUser(id) flatMap { user => authenticate(user) }
這種組合類型的一個(gè)額外的好處是錯(cuò)誤處理是內(nèi)置的:如果getUser(..)或authenticate(..)失敗,future 從 isAuthred(..)返回時(shí)將會(huì)失敗。這里我們沒有額外的錯(cuò)誤處理的代碼。
Future回調(diào)方法(respond, onSuccess, onFailure, ensure) 返回一個(gè)新的Future,并鏈接到調(diào)用者。這個(gè)Future被保證只有在它調(diào)用者完成后才完成,使用模式如下:
acquireResource()
future onSuccess { value =>
computeSomething(value)
} ensure {
freeResource()
}
freeResource() 被保證只有在 computeSomething之后才執(zhí)行,這樣就模擬了try-finally 模式。
使用 onSuccess替代 foreach —— 它與 onFailure 方法對(duì)稱,命名的意圖更明確,并且也允許 chaining。
永遠(yuǎn)避免直接創(chuàng)建Promise實(shí)例: 幾乎每一個(gè)任務(wù)都可以通過使用預(yù)定義的組合子完成。這些組合子確保錯(cuò)誤和取消是可傳播的, 通常鼓勵(lì)的數(shù)據(jù)流風(fēng)格的編程,不再需要同步和volatility聲明。
用尾遞歸風(fēng)格編寫的代碼不再導(dǎo)致堆??臻g泄漏,并使得以數(shù)據(jù)流風(fēng)格高效的實(shí)現(xiàn)循環(huán)成為可能:
case class Node(parent: Option[Node], ...)
def getNode(id: Int): Future[Node] = ...
def getHierarchy(id: Int, nodes: List[Node] = Nil): Future[Node] =
getNode(id) flatMap {
case n@Node(Some(parent), ..) => getHierarchy(parent, n :: nodes)
case n => Future.value((n :: nodes).reverse)
}
Future定義很多有用的方法: 使用 Future.value() 和 Future.exception() 來創(chuàng)建未滿意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了組合子將多個(gè)future合成一個(gè)(例如:scatter-gather操作的gather部分)。
Future實(shí)現(xiàn)了一種弱形式的取消。調(diào)用Future#cancel 不會(huì)直接終止運(yùn)算,而是發(fā)送某個(gè)級(jí)別的可被任何處理查詢的觸發(fā)信號(hào),最終滿足這個(gè)future。Cancellation信號(hào)流向相反的方向:一個(gè)由消費(fèi)者設(shè)置的cancellation信號(hào),會(huì)傳播到它的生產(chǎn)者。生產(chǎn)者使用 Promise的onCancellation來監(jiān)聽信號(hào)并執(zhí)行相應(yīng)的動(dòng)作。
這意味這cancellation語意上依賴生產(chǎn)者,沒有默認(rèn)的實(shí)現(xiàn)。cancellation只是一個(gè)提示。
Util的Local提供了一個(gè)位于特定的future派發(fā)樹(dispatch tree)的引用單元(cell)。設(shè)定一個(gè)local的值,使這個(gè)值可以用于被同一個(gè)線程的Future 延遲的任何計(jì)算。有一些類似于thread locals(注:Java中的線程機(jī)制),不同的是它們的范圍不是一個(gè)Java線程,而是一個(gè) future 線程樹。在
trait User {
def name: String
def incrCost(points: Int)
}
val user = new Local[User]
...
user() = currentUser
rpc() ensure {
user().incrCost(10)
}
在 ensure塊中的 user() 將在回調(diào)被添加的時(shí)候引用 user local的值。
就thread locals來說,我們的Locals非常的方便,但要盡量避免使用:除非確信通過顯式傳遞數(shù)據(jù)時(shí)問題不能被充分的解決,哪怕解決起來有些繁重。
Locals有效的被核心庫使用在非常常見的問題上——線程通過RPC跟蹤,傳播監(jiān)視器,為future的回調(diào)創(chuàng)建stack traces——任何其他解決方法都使得用戶負(fù)擔(dān)過度。Locals在幾乎任何其他情況下都不適合。
并發(fā)系統(tǒng)由于需要協(xié)調(diào)訪問數(shù)據(jù)和資源而變得復(fù)雜。Actor提出一種簡化的策略:每一個(gè)actor是一個(gè)順序的進(jìn)程(process),保持自己的狀態(tài)和資源,數(shù)據(jù)通過消息的方式與其它actor共享。 共享數(shù)據(jù)需要actor之間通信。
Offer/Broker 建立于Actor之上,以這三種重要的方式表現(xiàn):1,通信通道(Brokers)是first class——即發(fā)送消息需要通過Brokers,而非直接到actor。2, Offer/Broker 是一種同步機(jī)制:通信會(huì)話是同步的。 這意味我們可以用 Broker做為協(xié)調(diào)機(jī)制:當(dāng)進(jìn)程a發(fā)送一條信息給進(jìn)程b;a和b都要對(duì)系統(tǒng)狀態(tài)達(dá)成一致。3, 最后,通信可以選擇性地執(zhí)行:一個(gè)進(jìn)程可以提出幾個(gè)不同的通信,其中的一個(gè)將被獲取。
為了以一種通用的方式支持選擇性通信(以及其他組合),我們需要將通信的描述和執(zhí)行解耦。這正是Offer做的——它是一個(gè)持久數(shù)據(jù)用于描述一次通信;為了執(zhí)行這個(gè)通信(offer執(zhí)行),我們通過它的sync()方法同步
trait Offer[T] {
def sync(): Future[T]
}
返回 Future[T] 當(dāng)通信被獲取的時(shí)候生成交換值。
Broker通過offer協(xié)調(diào)值的交換——它是通信的通道:
trait Broker[T] {
def send(msg: T): Offer[Unit]
val recv: Offer[T]
}
所以,當(dāng)創(chuàng)建兩個(gè)offer
val b: Broker[Int]
val sendOf = b.send(1)
val recvOf = b.recv
sendOf和recvOf都同步
// In process 1:
sendOf.sync()
// In process 2:
recvOf.sync()
兩個(gè)offer都獲取并且值1被交換。
通過將多個(gè)offer和Offer.choose綁定來執(zhí)行可選擇通信。
def choose[T](ofs: Offer[T]*): Offer[T]
上面的代碼生成一個(gè)新的offer,當(dāng)同步時(shí)獲取一個(gè)特定的ofs——第一個(gè)可用的。當(dāng)多個(gè)都立即可用時(shí),隨機(jī)獲取一個(gè)。
Offer對(duì)象有些一次性的Offers用于與來自Broker的Offer構(gòu)建。
Offer.timeout(duration): Offer[Unit]
offer在給定時(shí)間后激活。Offer.never將用于不會(huì)有效,Offer.const(value)在給定值后立即有效。這些操作由選擇性通信來組合是非常有用的。例如,在一個(gè)send操作中使用超時(shí):
Offer.choose(
Offer.timeout(10.seconds),
broker.send("my value")
).sync()
人們可能會(huì)比較 Offer/Broker 與SynchronousQueue,他們有細(xì)微但非常重要的區(qū)別。Offer可以被組合,而queue不能。例如,考慮一組queues,描述為 Brokers:
val q0 = new Broker[Int]
val q1 = new Broker[Int]
val q2 = new Broker[Int]
現(xiàn)在讓我們?yōu)樽x取創(chuàng)建一個(gè)合并的queue
val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)
anyq是一個(gè)將從第一個(gè)可用的queue中讀取的offer。注意 anyq 仍是同步的——我們?nèi)匀粨碛械讓雨?duì)列的語義。這類組合是不可能用queue實(shí)現(xiàn)的。
連接池在網(wǎng)絡(luò)應(yīng)用中很常見,并且它們的實(shí)現(xiàn)常常需要技巧——例如,在從池中獲取一個(gè)連接的時(shí)候,通常需要超時(shí)機(jī)制,因?yàn)椴煌目蛻舳擞胁煌难舆t需求。池的簡單原則:維護(hù)一個(gè)連接隊(duì)列,滿足那些進(jìn)入的等待者。使用傳統(tǒng)的同步原語,這通常需要兩個(gè)隊(duì)列(queues):一個(gè)用于等待者(當(dāng)沒有連接可用時(shí)),一個(gè)用于連接(當(dāng)沒有等待者時(shí))。
使用 Offer/Brokers ,可以表達(dá)得非常自然:
class Pool(conns: Seq[Conn]) {
private[this] val waiters = new Broker[Conn]
private[this] val returnConn = new Broker[Conn]
val get: Offer[Conn] = waiters.recv
def put(c: Conn) { returnConn ! c }
private[this] def loop(connq: Queue[Conn]) {
Offer.choose(
if (connq.isEmpty) Offer.never else {
val (head, rest) = connq.dequeue
waiters.send(head) { _ => loop(rest) }
},
returnConn.recv { c => loop(connq enqueue c) }
).sync()
}
loop(Queue.empty ++ conns)
}
loop總是提供一個(gè)歸還的連接,但只有queue非空的時(shí)候才會(huì)send。 使用持久化隊(duì)列(persistent queue)更進(jìn)一步簡化邏輯。與連接池的接口也是通過Offer實(shí)現(xiàn),所以調(diào)用者如果愿意設(shè)置timeout,他們可以通過利用組合子(combinators)來做:
val conn: Future[Option[Conn]] = Offer.choose(
pool.get { conn => Some(conn) },
Offer.timeout(1.second) { _ => None }
).sync()
實(shí)現(xiàn)timeout不需要額外的記賬(bookkeeping);這是因?yàn)镺ffer的語義:如果Offer.timeout被選擇,不會(huì)再有offer從池中獲得——連接池和它的調(diào)用者在各自waiter的broker上不必同時(shí)同意接受和發(fā)送。
把并發(fā)程序構(gòu)造為一組順序的同步通信進(jìn)程,通常很有用——有時(shí)程序被大大地簡化了。Offer和Broker提供了一組工具來讓它簡單并一致。確實(shí),它們的應(yīng)用超越了我們可能認(rèn)為是經(jīng)典并發(fā)性問題——并發(fā)編程(有Offer/Broker的輔助)是一種有用的構(gòu)建工具,正如子例程(subroutines),類,和模塊都是——來自CSP(譯注:Communicating sequential processes的縮寫,即通信順序進(jìn)程)的重要思想。
這里有一個(gè)埃拉托色尼篩子可以構(gòu)造為一個(gè)針對(duì)一個(gè)整數(shù)流(stream of integers)的連續(xù)的應(yīng)用過濾器 。首先,我們需要一個(gè)整數(shù)的源(source of integers):
def integers(from: Int): Offer[Int] = {
val b = new Broker[Int]
def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1)
gen(from)
b.recv
}
integers(n) 方法簡單地提供了從n開始的所有連續(xù)的整數(shù)。然后我們需要一個(gè)過濾器:
def filter(in: Offer[Int], prime: Int): Offer[Int] = {
val b = new Broker[Int]
def loop() {
in.sync() onSuccess { i =>
if (i % prime != 0)
b.send(i).sync() ensure loop()
else
loop()
}
}
loop()
b.recv
}
filter(in, p) 方法返回的offer刪除了in中的所有質(zhì)數(shù)(prime)的倍數(shù)。最終我們定義了我們的篩子(sieve):
def sieve = {
val b = new Broker[Int]
def loop(of: Offer[Int]) {
for (prime <- of.sync(); _ <- b.send(prime).sync())
loop(filter(of, prime))
}
loop(integers(2))
b.recv
}
loop() 工作很簡單:從of中讀取下一個(gè)質(zhì)數(shù),然后對(duì)of應(yīng)用過濾器排除這個(gè)質(zhì)數(shù)。loop不斷的遞歸,持續(xù)的質(zhì)數(shù)被過濾,于是我們得到了篩選結(jié)果。我們現(xiàn)在打印前10000個(gè)質(zhì)數(shù):
val primes = sieve
0 until 10000 foreach { _ =>
println(primes.sync()())
}
除了構(gòu)造簡單,組件正交,這種做法也給你一種流式篩子(streaming sieve):你不需要事先計(jì)算出你感興趣的質(zhì)數(shù)集合,從而進(jìn)一步提高了模塊化。
更多建議: