Twitter標(biāo)準(zhǔn)庫

2018-02-24 15:48 更新

Twitter最重要的標(biāo)準(zhǔn)庫是?Util?和?Finagle。Util 可以理解為Scala和Java的標(biāo)準(zhǔn)庫擴(kuò)展,提供了標(biāo)準(zhǔn)庫中沒有的功能或已有功能的更合適的實現(xiàn)。Finagle 是我們的RPC系統(tǒng),核心分布式系統(tǒng)組件。

Future

Futures已經(jīng)在并發(fā)一節(jié)中簡單討論過。它是調(diào)異步處理的中心機(jī)制,滲透在我們代碼庫中,也是Finagle的核心。Futures允許組合并發(fā)事件,簡化了高并發(fā)操作。也是JVM上異步并發(fā)的一種高效的實現(xiàn)。

Twitter的future是異步的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的線程的執(zhí)行;網(wǎng)絡(luò)IO和磁盤IO是就是例子——必須由系統(tǒng)處理,它為結(jié)果提供future。Finagle為網(wǎng)絡(luò)IO提供了這樣一種系統(tǒng)。

Futures清晰簡單:它們持有一個尚未完成運算結(jié)果的 promise 。它們是一個簡單的容器——一個占位符。一次計算當(dāng)然可能會失敗,這種狀況必須被編碼:一個Future可以是三種狀態(tài)之一: pending, failed, completed。

閑話:?組合(composition)

讓我們重新審視我們所說的組合:將簡單的組件合成一個更復(fù)雜的。函數(shù)組合的一個權(quán)威的例子:給定函數(shù) f 和 g,組合函數(shù) (g°f)(x) = g(f(x)) ——結(jié)果先對 x使用f函數(shù),然后在使用g函數(shù)——用Scala來寫:

val f = (i: Int) => i.toString
val g = (s: String) => s+s+s
val h = g compose f  // : Int => String

scala> h(123)
res0: java.lang.String = 123123123

復(fù)合函數(shù)h,是個新的函數(shù),由之前定義的f和g函數(shù)合成。

Futures是一種集合類型——它是個包含0或1個元素的容器——你可以發(fā)現(xiàn)他們有標(biāo)準(zhǔn)的集合方法(eg:map, filter, foreach)。因為Future的值是延遲的,結(jié)果應(yīng)用這些方法中的任何一種必然也延遲;在

 val result: Future[Int]
 val resultStr: Future[String] = result map { i => i.toString }

函數(shù) { i => i.toString } 不會被調(diào)用,直到int值可用;轉(zhuǎn)換集合的resultStr在可用之前也一直是待定狀態(tài)。

List可以被扁平化(flattened):

 val listOfList: List[List[Int]] = ..
 val list: List[Int] = listOfList.flatten

這對future也是有意義的:

 val futureOfFuture: Future[Future[Int]] = ..
 val future: Future[Int] = futureOfFuture.flatten

因為future是延遲的,flatten的實現(xiàn)——立即返回——不得不返回一個等待外部future (**Future[**Future[Int]**]**) 完成的future (Future[**Future[Int]**]).如果外部future失敗,內(nèi)部flattened future也將失敗。

Future (類似List) 也定義了flatMap;Future[A] 定義方法flatMap的簽名

 flatMap[B](f: A => Future[B]): Future[B]

如同組合 map 和 flatten,我們可以這樣實現(xiàn):

 def flatMap[B](f: A => Future[B]): Future[B] = {
   val mapped: Future[Future[B]] = this map f
   val flattened: Future[B] = mapped.flatten
   flattened
 }

這是一種有威力的組合!使用flatMap我們可以定義一個 Future 作為兩個Future序列的結(jié)果。第二個future 的計算基于第一個的結(jié)果。想象我們需要2次RPC調(diào)用來驗證一個用戶身份,我們可以用下面的方式組合操作:

 def getUser(id: Int): Future[User]
 def authenticate(user: User): Future[Boolean]

 def isIdAuthed(id: Int): Future[Boolean] =
   getUser(id) flatMap { user => authenticate(user) }

這種組合類型的一個額外的好處是錯誤處理是內(nèi)置的:如果getUser(..)或authenticate(..)失敗,future 從 isAuthred(..)返回時將會失敗。這里我們沒有額外的錯誤處理的代碼。

風(fēng)格

Future回調(diào)方法(respond, onSuccess, onFailure, ensure) 返回一個新的Future,并鏈接到調(diào)用者。這個Future被保證只有在它調(diào)用者完成后才完成,使用模式如下:

 acquireResource()
 future onSuccess { value =>
   computeSomething(value)
 } ensure {
   freeResource()
 }

freeResource() 被保證只有在 computeSomething之后才執(zhí)行,這樣就模擬了try-finally 模式。

使用 onSuccess替代 foreach —— 它與 onFailure 方法對稱,命名的意圖更明確,并且也允許 chaining。

永遠(yuǎn)避免直接創(chuàng)建Promise實例: 幾乎每一個任務(wù)都可以通過使用預(yù)定義的組合子完成。這些組合子確保錯誤和取消是可傳播的, 通常鼓勵的數(shù)據(jù)流風(fēng)格的編程,不再需要同步和volatility聲明。

用尾遞歸風(fēng)格編寫的代碼不再導(dǎo)致堆棧空間泄漏,并使得以數(shù)據(jù)流風(fēng)格高效的實現(xiàn)循環(huán)成為可能:

 case class Node(parent: Option[Node], ...)
 def getNode(id: Int): Future[Node] = ...

 def getHierarchy(id: Int, nodes: List[Node] = Nil): Future[Node] =
   getNode(id) flatMap {
     case n@Node(Some(parent), ..) => getHierarchy(parent, n :: nodes)
     case n => Future.value((n :: nodes).reverse)
   }

Future定義很多有用的方法: 使用 Future.value() 和 Future.exception() 來創(chuàng)建未滿意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了組合子將多個future合成一個(例如:scatter-gather操作的gather部分)。

Cancellation

Future實現(xiàn)了一種弱形式的取消。調(diào)用Future#cancel 不會直接終止運算,而是發(fā)送某個級別的可被任何處理查詢的觸發(fā)信號,最終滿足這個future。Cancellation信號流向相反的方向:一個由消費者設(shè)置的cancellation信號,會傳播到它的生產(chǎn)者。生產(chǎn)者使用 Promise的onCancellation來監(jiān)聽信號并執(zhí)行相應(yīng)的動作。

這意味這cancellation語意上依賴生產(chǎn)者,沒有默認(rèn)的實現(xiàn)。cancellation只是一個提示。

Local

Util的Local提供了一個位于特定的future派發(fā)樹(dispatch tree)的引用單元(cell)。設(shè)定一個local的值,使這個值可以用于被同一個線程的Future 延遲的任何計算。有一些類似于thread locals(注:Java中的線程機(jī)制),不同的是它們的范圍不是一個Java線程,而是一個 future 線程樹。在

 trait User {
   def name: String
   def incrCost(points: Int)
 }
 val user = new Local[User]

 ...

 user() = currentUser
 rpc() ensure {
   user().incrCost(10)
 }

在 ensure塊中的 user() 將在回調(diào)被添加的時候引用 user local的值。

就thread locals來說,我們的Locals非常的方便,但要盡量避免使用:除非確信通過顯式傳遞數(shù)據(jù)時問題不能被充分的解決,哪怕解決起來有些繁重。

Locals有效的被核心庫使用在非常常見的問題上——線程通過RPC跟蹤,傳播監(jiān)視器,為future的回調(diào)創(chuàng)建stack traces——任何其他解決方法都使得用戶負(fù)擔(dān)過度。Locals在幾乎任何其他情況下都不適合。

Offer/Broker

并發(fā)系統(tǒng)由于需要協(xié)調(diào)訪問數(shù)據(jù)和資源而變得復(fù)雜。Actor提出一種簡化的策略:每一個actor是一個順序的進(jìn)程(process),保持自己的狀態(tài)和資源,數(shù)據(jù)通過消息的方式與其它actor共享。 共享數(shù)據(jù)需要actor之間通信。

Offer/Broker 建立于Actor之上,以這三種重要的方式表現(xiàn):1,通信通道(Brokers)是first class——即發(fā)送消息需要通過Brokers,而非直接到actor。2, Offer/Broker 是一種同步機(jī)制:通信會話是同步的。 這意味我們可以用 Broker做為協(xié)調(diào)機(jī)制:當(dāng)進(jìn)程a發(fā)送一條信息給進(jìn)程b;a和b都要對系統(tǒng)狀態(tài)達(dá)成一致。3, 最后,通信可以選擇性地執(zhí)行:一個進(jìn)程可以提出幾個不同的通信,其中的一個將被獲取。

為了以一種通用的方式支持選擇性通信(以及其他組合),我們需要將通信的描述和執(zhí)行解耦。這正是Offer做的——它是一個持久數(shù)據(jù)用于描述一次通信;為了執(zhí)行這個通信(offer執(zhí)行),我們通過它的sync()方法同步

 trait Offer[T] {
   def sync(): Future[T]
 }

返回 Future[T] 當(dāng)通信被獲取的時候生成交換值。

Broker通過offer協(xié)調(diào)值的交換——它是通信的通道:

 trait Broker[T] {
   def send(msg: T): Offer[Unit]
   val recv: Offer[T]
 }

所以,當(dāng)創(chuàng)建兩個offer

 val b: Broker[Int]
 val sendOf = b.send(1)
 val recvOf = b.recv

sendOf和recvOf都同步

 // In process 1:
 sendOf.sync()

 // In process 2:
 recvOf.sync()

兩個offer都獲取并且值1被交換。

通過將多個offer和Offer.choose綁定來執(zhí)行可選擇通信。

 def choose[T](ofs: Offer[T]*): Offer[T]

上面的代碼生成一個新的offer,當(dāng)同步時獲取一個特定的ofs——第一個可用的。當(dāng)多個都立即可用時,隨機(jī)獲取一個。

Offer對象有些一次性的Offers用于與來自Broker的Offer構(gòu)建。

 Offer.timeout(duration): Offer[Unit]

offer在給定時間后激活。Offer.never將用于不會有效,Offer.const(value)在給定值后立即有效。這些操作由選擇性通信來組合是非常有用的。例如,在一個send操作中使用超時:

 Offer.choose(
   Offer.timeout(10.seconds),
   broker.send("my value")
 ).sync()

人們可能會比較 Offer/Broker 與SynchronousQueue,他們有細(xì)微但非常重要的區(qū)別。Offer可以被組合,而queue不能。例如,考慮一組queues,描述為 Brokers:

 val q0 = new Broker[Int]
 val q1 = new Broker[Int]
 val q2 = new Broker[Int]

現(xiàn)在讓我們?yōu)樽x取創(chuàng)建一個合并的queue

 val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)

anyq是一個將從第一個可用的queue中讀取的offer。注意 anyq 仍是同步的——我們?nèi)匀粨碛械讓雨犃械恼Z義。這類組合是不可能用queue實現(xiàn)的。

例子:一個簡單的連接池

連接池在網(wǎng)絡(luò)應(yīng)用中很常見,并且它們的實現(xiàn)常常需要技巧——例如,在從池中獲取一個連接的時候,通常需要超時機(jī)制,因為不同的客戶端有不同的延遲需求。池的簡單原則:維護(hù)一個連接隊列,滿足那些進(jìn)入的等待者。使用傳統(tǒng)的同步原語,這通常需要兩個隊列(queues):一個用于等待者(當(dāng)沒有連接可用時),一個用于連接(當(dāng)沒有等待者時)。

使用 Offer/Brokers ,可以表達(dá)得非常自然:

 class Pool(conns: Seq[Conn]) {
   private[this] val waiters = new Broker[Conn]
   private[this] val returnConn = new Broker[Conn]

   val get: Offer[Conn] = waiters.recv
   def put(c: Conn) { returnConn ! c }

   private[this] def loop(connq: Queue[Conn]) {
     Offer.choose(
       if (connq.isEmpty) Offer.never else {
         val (head, rest) = connq.dequeue
         waiters.send(head) { _ => loop(rest) }
       },
       returnConn.recv { c => loop(connq enqueue c) }
     ).sync()
   }

   loop(Queue.empty ++ conns)
 }

loop總是提供一個歸還的連接,但只有queue非空的時候才會send。 使用持久化隊列(persistent queue)更進(jìn)一步簡化邏輯。與連接池的接口也是通過Offer實現(xiàn),所以調(diào)用者如果愿意設(shè)置timeout,他們可以通過利用組合子(combinators)來做:

  val conn: Future[Option[Conn]] = Offer.choose(
    pool.get { conn => Some(conn) },
    Offer.timeout(1.second) { _ => None }
  ).sync()

實現(xiàn)timeout不需要額外的記賬(bookkeeping);這是因為Offer的語義:如果Offer.timeout被選擇,不會再有offer從池中獲得——連接池和它的調(diào)用者在各自waiter的broker上不必同時同意接受和發(fā)送。

埃拉托色尼篩子(Sieve of Eratosthenes 譯注:一種用于篩選素數(shù)的算法)

把并發(fā)程序構(gòu)造為一組順序的同步通信進(jìn)程,通常很有用——有時程序被大大地簡化了。Offer和Broker提供了一組工具來讓它簡單并一致。確實,它們的應(yīng)用超越了我們可能認(rèn)為是經(jīng)典并發(fā)性問題——并發(fā)編程(有Offer/Broker的輔助)是一種有用的構(gòu)建工具,正如子例程(subroutines),類,和模塊都是——來自CSP(譯注:Communicating sequential processes的縮寫,即通信順序進(jìn)程)的重要思想。

這里有一個埃拉托色尼篩子可以構(gòu)造為一個針對一個整數(shù)流(stream of integers)的連續(xù)的應(yīng)用過濾器 。首先,我們需要一個整數(shù)的源(source of integers):

 def integers(from: Int): Offer[Int] = {
   val b = new Broker[Int]
   def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1)
   gen(from)
   b.recv
 }

integers(n) 方法簡單地提供了從n開始的所有連續(xù)的整數(shù)。然后我們需要一個過濾器:

 def filter(in: Offer[Int], prime: Int): Offer[Int] = {
   val b = new Broker[Int]
   def loop() {
     in.sync() onSuccess { i =>
       if (i % prime != 0)
         b.send(i).sync() ensure loop()
       else
         loop()
     }
   }
   loop()

   b.recv
 }

filter(in, p) 方法返回的offer刪除了in中的所有質(zhì)數(shù)(prime)的倍數(shù)。最終我們定義了我們的篩子(sieve):

 def sieve = {
   val b = new Broker[Int]
   def loop(of: Offer[Int]) {
     for (prime <- of.sync(); _ <- b.send(prime).sync())
       loop(filter(of, prime))
   }
   loop(integers(2))
   b.recv
 }

loop() 工作很簡單:從of中讀取下一個質(zhì)數(shù),然后對of應(yīng)用過濾器排除這個質(zhì)數(shù)。loop不斷的遞歸,持續(xù)的質(zhì)數(shù)被過濾,于是我們得到了篩選結(jié)果。我們現(xiàn)在打印前10000個質(zhì)數(shù):

 val primes = sieve
 0 until 10000 foreach { _ =>
   println(primes.sync()())
 }

除了構(gòu)造簡單,組件正交,這種做法也給你一種流式篩子(streaming sieve):你不需要事先計算出你感興趣的質(zhì)數(shù)集合,從而進(jìn)一步提高了模塊化。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號