第二十八章:軟件事務(wù)內(nèi)存 (STM)

2018-02-24 15:49 更新

第二十八章:軟件事務(wù)內(nèi)存 (STM)

在并發(fā)編程的傳統(tǒng)線程模型中,線程之間的數(shù)據(jù)共享需要通過鎖來保持一致性(consistentBalance),當(dāng)數(shù)據(jù)產(chǎn)生變化時(shí),還需要使用條件變量(condition variable)對各個(gè)線程進(jìn)行通知。

某種程度上,Haskell 的 MVar 機(jī)制對上面提到的工具進(jìn)行了改進(jìn),但是,它仍然帶有和這些工具一樣的缺陷:

  • 因?yàn)橥浭褂面i而導(dǎo)致條件競爭(race condition)
  • 因?yàn)椴徽_的加鎖順序而導(dǎo)致死鎖(deadblock)
  • 因?yàn)槲幢徊蹲降漠惓6斐沙绦虮罎?corruption)
  • 因?yàn)殄e(cuò)誤地忽略了通知,造成線程無法正常喚醒(lost wakeup)

這些問題即使在很小的并發(fā)程序里也會經(jīng)常發(fā)生,而在更加龐大的代碼庫或是高負(fù)載的情況下,這些問題會引發(fā)更加糟糕的難題。

比如說,對一個(gè)只有幾個(gè)大范圍鎖的程序進(jìn)行編程并不難,只是一旦這個(gè)程序在高負(fù)載的環(huán)境下運(yùn)行,鎖之間的相互競爭就會變得非常嚴(yán)重。另一方面,如果采用細(xì)粒度(fineo-grained)的鎖機(jī)制,保持軟件正常工作將會變得非常困難。除此之外,就算在負(fù)載不高的情況下, 加鎖帶來的額外的簿記工作(book-keeping)也會對性能產(chǎn)生影響。

基礎(chǔ)知識

軟件事務(wù)內(nèi)存(Software transactional memory)提供了一些簡單但強(qiáng)大的工具。通過這些工具我們可以解決前面提到的大多數(shù)問題。通過 atomically 組合器(combinator), 我們可以在一個(gè)事務(wù)內(nèi)執(zhí)行一批操作。當(dāng)這一組操作開始執(zhí)行的時(shí)候,其他線程是覺察不到這些操作所產(chǎn)生的任何修改,直到所有操作完成。同樣的,當(dāng)前線程也無法察覺其他線程的所產(chǎn)生的修改。這些性質(zhì)表明的操作的隔離性(isolated)。

當(dāng)從一個(gè)事務(wù)退出的時(shí)候,只會發(fā)生以下情況中的一種:

  • 如果沒有其他線程修改了同樣的數(shù)據(jù),當(dāng)前線程產(chǎn)生的修改將會對所有其他線程可見。
  • 否則,當(dāng)前線程的所產(chǎn)生的改動會被丟棄,然后這組操作會被重新執(zhí)行。

atomically 這種全有或全無(all-or-nothing)的天性被稱之為原子性(atomic), atomically 也因?yàn)榈妹?。如果你使用過支持事務(wù)的數(shù)據(jù)庫,你會覺得STM使用起來非常熟悉。

一些簡單的例子

在多玩家角色扮演的游戲里, 一個(gè)玩家的角色會有許多屬性,比如健康,財(cái)產(chǎn)以及金錢。讓我們從基于游戲人物屬性的一些簡單的函數(shù)和類型開始去了解STM的精彩內(nèi)容。隨著學(xué)習(xí)的深入,我們也會不斷地改進(jìn)我們的代碼。

STM的API位于 stm 包,模塊 Control.Concurrent.STM 。

-- file: ch28/GameInventory.hs
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Control.Concurrent.STM
import Control.Monad

data Item = Scroll
          | Wand
          | Banjo
            deriving (Eq, Ord, Show)

newtype Gold = Gold Int
    deriving (Eq, Ord, Show, Num)

newtype HitPoint = HitPoint Int
    deriving (Eq, Ord, Show, Num)

type Inventory = TVar [Item]
type Health = TVar HitPoint
type Balance = TVar Gold

data Player = Player {
      balance :: Balance,
      health :: Health,
      inventory :: Inventory
}

參數(shù)化類型 TVar 是一個(gè)可變量,可以在 atomically 塊中讀取或者修改。為了簡單起見,我們把玩家的背包(Inventory)定義為物品的列表。同時(shí)注意到,我們用到了 newtype ,這樣不會混淆財(cái)富和健康屬性。

當(dāng)需要在兩個(gè)賬戶(Balance)之間轉(zhuǎn)賬,我們所要的做的就只是調(diào)整下各自的 Tvar 。

-- file: ch28/GameInventory.hs
basicTransfer qty fromBal toBal = do
  fromQty <- readTVar fromBal
  toQty   <- readTVar toBal
  writeTVar fromBal (fromQty - qty)
  writeTVar toBal   (toQty + qty)

讓我們寫個(gè)簡單的測試函數(shù)

-- file: ch28/GameInventory.hs
transferTest = do
  alice <- newTVar (12 :: Gold)
  bob   <- newTVar 4
  basicTransfer 3 alice bob
  liftM2 (,) (readTVar alice) (readTVar bob)

如果我們在ghci里執(zhí)行下這個(gè)函數(shù),應(yīng)該有如下的結(jié)果

ghci> :load GameInventory
[1 of 1] Compiling Main             ( GameInventory.hs, interpreted )
Ok, modules loaded: Main.
ghci> atomically transferTest
Loading package array-0.4.0.0 ... linking ... done.
Loading package stm-2.3 ... linking ... done.
(Gold 9,Gold 7)

原子性和隔離性保證了當(dāng)其他線程同時(shí)看到 bob 的賬戶和 alice 的賬戶被修改了。

即使在并發(fā)程序里,我們也努力保持代碼盡量的純函數(shù)化。這使得我們的代碼更加容易推導(dǎo)和測試。由于數(shù)據(jù)并沒有事務(wù)性,這也讓底層的STM做更少的事。以下的純函數(shù)實(shí)現(xiàn)了從我們來表示玩家背包的數(shù)列里移除一個(gè)物品。

-- file: ch28/GameInventory.hs
removeInv :: Eq a => a -> [a] -> Maybe [a]
removeInv x xs =
    case takeWhile (/= x) xs of
      (_:ys) -> Just ys
      []     -> Nothing

這里返回值用了 Maybe 類型,它可以用來表示物品是否在玩家的背包里。

下面這個(gè)事務(wù)性的函數(shù)實(shí)現(xiàn)了把一個(gè)物品給另外一個(gè)玩家。這個(gè)函數(shù)有一點(diǎn)點(diǎn)復(fù)雜因?yàn)樾枰袛嘟o予者是否有這個(gè)物品。

-- file: ch28/GameInventory.hs
maybeGiveItem item fromInv toInv = do
  fromList <- readTVar fromInv
  case removeInv item fromList of
    Nothing      -> return False
    Just newList -> do
      writeTVar fromInv newList
      destItems <- readTVar toInv
      writeTVar toInv (item : destItems)
      return True

STM的安全性

既然我們提供了有原子性和隔離型的事務(wù),那么保證我們不能有意或是無意的從 atomically 執(zhí)行塊從脫離顯得格外重要。借由 STM monad,Haskell的類型系統(tǒng)保證了我們這種行為。

ghci> :type atomically
atomically :: STM a -> IO a

atomically 接受一個(gè) STM monad的動作, 然后執(zhí)行并讓我們可以從 IO monad里拿到這個(gè)結(jié)果。 STM monad是所有事務(wù)相關(guān)代碼執(zhí)行的地方。比如這些操作 TVar 值的函數(shù)都在 STM monad里被執(zhí)行。

ghci> :type newTVar
newTVar :: a -> STM (TVar a)
ghci> :type readTVar
readTVar :: TVar a -> STM a
ghci> :type writeTVar
writeTVar :: TVar a -> a -> STM ()

我們之前定義的事務(wù)性函數(shù)也有這個(gè)特性

-- file: ch28/GameInventory.hs
basicTransfer :: Gold -> Balance -> Balance -> STM ()
maybeGiveItem :: Item -> Inventory -> Inventory -> STM Bool

在 STM monad里是不允許執(zhí)行I/O操作或者是修改非事務(wù)性的可變狀態(tài),比如 MVar 的值。這就使得我們可以避免那些違背事務(wù)完整的操作。

重試一個(gè)事務(wù)

maybeGiveItem 這個(gè)函數(shù)看上去稍微有點(diǎn)怪異。只有當(dāng)角色有這個(gè)物品時(shí)才會將它給另外一個(gè)角色,這看上去還算合理,然后返回一個(gè) Bool 值使調(diào)用這個(gè)函數(shù)的代碼變得復(fù)雜。下面這個(gè)函數(shù)調(diào)用了 maybeGiveItem, 它必須根據(jù) maybeGiveItem 的返回結(jié)果來決定如何繼續(xù)執(zhí)行。

maybeSellItem :: Item -> Gold -> Player -> Player -> STM Bool
maybeSellItem item price buyer seller = do
  given <- maybeGiveItem item (inventory seller) (inventory buyer)
  if given
    then do
      basicTransfer price (balance buyer) (balance seller)
      return True
    else return False

我們不僅要檢查物品是否給到了另一個(gè)玩家,而且還得把是否成功這個(gè)信號傳遞給調(diào)用者。這就意味了復(fù)雜性被延續(xù)到了更外層。

下面我們來看看如何用更加優(yōu)雅的方式處理事務(wù)無法成功進(jìn)行的情況。 STM API 提供了一個(gè) retry 函數(shù),它可以立即中斷一個(gè) 無法成功進(jìn)行的 atomically 執(zhí)行塊。正如這個(gè)函數(shù)名本身所指明的意思,當(dāng)它發(fā)生時(shí),執(zhí)行塊會被重新執(zhí)行,所有在這之前的操作都不會被記錄。我們使用 retry 重新實(shí)現(xiàn)了 maybeGiveItem 。

-- file: ch28/GameInventory.hs
giveItem :: Item -> Inventory -> Inventory -> STM ()

giveItem item fromInv toInv = do
    fromList <- readTVar fromInv
    case removeInv item fromList of
        Nothing -> retry
        Just newList -> do
            writeTVar fromInv newList
            readTVar toInv >>= writeTVar toInv . (item :)

我們之前實(shí)現(xiàn)的 basicTransfer 有一個(gè)缺陷:沒有檢查發(fā)送者的賬戶是否有足夠的資金。我們可以使用 retry 來糾正這個(gè)問題并保持方法簽名不變。

-- file: ch28/GameInventory.hs
transfer :: Gold -> Balance -> Balance -> STM ()

transfer qty fromBal toBal = do
  fromQty <- readTVar fromBal
  when (qty > fromQty) $
    retry
  writeTVar fromBal (fromQty - qty)
  readTVar toBal >>= writeTVar toBal . (qty +)

使用 retry 后,銷售物品的函數(shù)就顯得簡單很多。

sellItem :: Item -> Gold -> Player -> Player -> STM ()
sellItem item price buyer seller = do
  giveItem item (inventory seller) (inventory buyer)
  transfer price (balance buyer) (balance seller)

這個(gè)實(shí)現(xiàn)和之前的稍微有點(diǎn)不同。如果有必要會會阻塞以至賣家有東西可賣并且買家有足夠的余額支付,而不是在發(fā)現(xiàn)賣家沒這個(gè)物品可銷售時(shí)馬上返回 False 。

retry 時(shí)到底發(fā)生了什么?

retry 不僅僅使得代碼更加簡潔:它似乎有魔力般的內(nèi)部實(shí)現(xiàn)。當(dāng)我們調(diào)用 retry 的時(shí)候,它并不是馬上重啟事務(wù),而是會先阻塞線程,一直到那些在 retry 之前被訪問過的變量被其他線程修改。

比如,如果我們調(diào)用 transfer 而發(fā)現(xiàn)余額不足, retry 會自發(fā)的等待,直到賬戶余額的變動,然后會重新啟動事務(wù)。 同樣的,對于函數(shù) giveItem , 如果賣家沒有那個(gè)物品,線程就會阻塞直到他有了那個(gè)物品。

選擇替代方案

有時(shí)候我們并不總是希望重啟 atomically 操作即使調(diào)用了 retry 或者由于其他線程的同步修改而導(dǎo)致的失敗。比如函數(shù) sellItem 會不斷地重試,只要沒有滿足其條件:要有物品并且余額足夠。然而我們可能更希望只重試一次。

orElse 組合器允許我們在主操作失敗的情況下,執(zhí)行一個(gè)”備用”操作。

ghci> :type orElse
orElse :: STM a -> STM a -> STM a

我們對 sellItem 做了一點(diǎn)修改:如果 sellItem 失敗, 則 orElse 執(zhí)行 returnFalse 的動作從而使這個(gè)sale函數(shù)立即返回。

trySellItem :: Item -> Gold -> Player -> Player -> STM Bool
trySellItem item price buyer seller =
   sellItem item price buyer seller >> return True
  `orElse`
   return False

在事務(wù)中使用高階代碼

假設(shè)我們想做稍微有挑戰(zhàn)的事情,從一系列的物品中,選取第一個(gè)賣家擁有的并且買家能承擔(dān)費(fèi)用的物品進(jìn)行購買,如果沒有這樣的物品則什么都不做。顯然我們可以很直觀的給出實(shí)現(xiàn)。

-- file: ch28/GameInventory.hs
crummyList :: [(Item, Gold)] -> Player -> Player
             -> STM (Maybe (Item, Gold))
crummyList list buyer seller = go list
    where go []                         = return Nothing
          go (this@(item,price) : rest) = do
              sellItem item price buyer seller
              return (Just this)
           `orElse`
              go rest

在這個(gè)實(shí)現(xiàn)里,我們有碰到了一個(gè)熟悉的問題:把我們的需求和如果實(shí)現(xiàn)混淆在一個(gè)。再深入一點(diǎn)觀察,則會發(fā)現(xiàn)兩個(gè)可重復(fù)使用的模式。

第一個(gè)就是讓事務(wù)失敗而不是重試。

-- file: ch28/GameInventory.hs
maybeSTM :: STM a -> STM (Maybe a)
maybeSTM m = (Just `liftM` m) `orElse` return Nothing

第二個(gè),我們要對一系列的對象執(zhí)行否一個(gè)操作,直到有一個(gè)成功為止。如果全部都失敗,則執(zhí)行 retry 操作。由于 STM 是 MonadPlus 類型類的一個(gè)實(shí)例,所以顯得很方便。

-- file: ch28/STMPlus.hs
instance MonadPlus STM where
  mzero = retry
  mplus = orElse

Control.Monad 模塊定義了一個(gè) msum 函數(shù),而它就是我們所需要的。

-- file: ch28/STMPlus.hs
msum :: MonadPlus m => [m a] -> m a
msum =  foldr mplus mzero

有了這些重要的工具,我們就可以寫出更加簡潔的實(shí)現(xiàn)了。

-- file: ch28/GameInventory.hs
shoppingList :: [(Item, Gold)] -> Player -> Player
             -> STM (Maybe (Item, Gold))
shoppingList list buyer seller = maybeSTM . msum $ map sellOne list
    where sellOne this@(item,price) = do
            sellItem item price buyer seller
            return this

既然 STM 是 MonadPlus 類型類的實(shí)例,我們可以改進(jìn) maybeSTM ,這樣就可以適用于任何 MonadPlus 的實(shí)例。

-- file: ch28/GameInventory.hs
maybeM :: MonadPlus m => m a -> m (Maybe a)
maybeM m = (Just `liftM` m) `mplus` return Nothing

這個(gè)函數(shù)會在很多不同情況下顯得非常有用。

I/O 和 STM

STM monad 禁止任意的I/O操作,因?yàn)镮/O操作會破壞原子性和隔離性。當(dāng)然I/O的操作還是需要的,只是我們需要非常的謹(jǐn)慎。

大多數(shù)時(shí)候,我們會執(zhí)行I/O操作是由于我們在 atomically 塊中產(chǎn)生的一個(gè)結(jié)果。在這些情況下,正確的做法通常是 atomically 返回一些數(shù)據(jù),在I/O monad里的調(diào)用者則根據(jù)這些數(shù)據(jù)知道如何繼續(xù)下一步動作。我們甚至可以返回需要被操作的動作 (action), 因?yàn)樗麄兪堑谝活愔?First Class vaules)。

-- file: ch28/STMIO.hs
someAction :: IO a

stmTransaction :: STM (IO a)
stmTransaction = return someAction

doSomething :: IO a
doSomething = join (atomically stmTransaction)

我們偶爾也需要在 STM 里進(jìn)行I/O操作。比如從一個(gè)肯定存在的文件里讀取一些非可變數(shù)據(jù),這樣的操作并不會違背 STM 保證原子性和隔離性的原則。在這些情況,我們可以使用 unsafeIOToSTM 來執(zhí)行一個(gè) IO 操作。這個(gè)函數(shù)位于偏底層的一個(gè)模塊 GHC.Conc ,所以要謹(jǐn)慎使用。

ghci> :m +GHC.Conc
ghci> :type unsafeIOToSTM
unsafeIOToSTM :: IO a -> STM a

我們所執(zhí)行的這個(gè) IO 動作絕對不能打開另外一個(gè) atomically 事務(wù)。如果一個(gè)線程嘗試嵌套的事務(wù),系統(tǒng)就會拋出異常。

由于類型系統(tǒng)無法幫助我們確保 IO 代碼沒有執(zhí)行一些敏感動作,最安全的做法就是我們盡量的限制使用 unsafeIOToSTM 。下面的例子展示了在 atomically 中執(zhí)行 IO 的典型錯(cuò)誤。

-- file: ch28/STMIO.hs
launchTorpedoes :: IO ()

notActuallyAtomic = do
  doStuff
  unsafeIOToSTM launchTorpedoes
  mightRetry

如果 mightRetry 會引發(fā)事務(wù)的重啟,那么 launchTorpedoes 會被調(diào)用多次。事實(shí)上,我們無法預(yù)見它會被調(diào)用多少次, 因?yàn)橹卦囀怯蛇\(yùn)行時(shí)系統(tǒng)所處理的。解決方案就是在事務(wù)中不要有這種類型的non-idempotent I/O操作。

線程之間的通訊

正如基礎(chǔ)類型 TVar 那樣, stm 包也提供了兩個(gè)更有用的類型用于線程之間的通訊, TMVar 和 TChan 。 TMVar 是STM世界的 MVar , 它可以保存一個(gè) Maybe 類型的值, 即 Just 值或者 Nothing 。 TChan 則是 STM 世界里的 Chan ,它實(shí)現(xiàn)了一個(gè)有類型的先進(jìn)先出(FIFO)通道。

[譯者注:為何說 TMVar 是STM世界的 MVar 而不是 TVar ?是因?yàn)閺膶?shí)踐意義上理解的。 MVar 的特性是要么有值要么為空的一個(gè)容器,所以當(dāng)線程去讀這個(gè)容器時(shí),要么讀到值繼續(xù)執(zhí)行,要么讀不到值就等待。 而 TVar 并沒有這樣的特性,所以引入了 TMVar 。 它的實(shí)現(xiàn)是這樣的, newtypeTMVara=TMVar(TVar(Maybea)) , 正是由于它包含了一個(gè) Maybe 類型的值,這樣就有了”要么有值要么為空”這樣的特性,也就是 MVar 所擁有的特性。]

并發(fā)網(wǎng)絡(luò)鏈接檢查器

作為一個(gè)使用 STM 的實(shí)際例子, 我們將開發(fā)一個(gè)檢查HTML文件里不正確鏈接的程序,這里不正確的鏈接是指那些鏈接指向了一個(gè)錯(cuò)誤的網(wǎng)頁或是無法訪問到其指向的服務(wù)器。用并發(fā)的方式解決這個(gè)問題非常得合適:如果我們嘗試和已經(jīng)下線的服務(wù)器(dead server)通訊,需要有兩分鐘的超時(shí)時(shí)間。如果使用多線程,即使有一兩個(gè)線程由于和響應(yīng)很慢或者下線的服務(wù)器通訊而停住(stuck),我們還是可以繼續(xù)進(jìn)行一些有用的事情。

我們不能簡單直觀的給每一個(gè)URL新建一個(gè)線程,因?yàn)橛捎冢ㄒ彩俏覀冾A(yù)想的)大多數(shù)鏈接是正確的,那么這樣做就會導(dǎo)致CPU或是網(wǎng)絡(luò)連接超負(fù)荷。因此,我們只會創(chuàng)建固定數(shù)量的線程,這些線程會從一個(gè)隊(duì)列里拿URL做檢查。

-- file: ch28/Check.hs
{-# LANGUAGE FlexibleContexts, GeneralizedNewtypeDeriving,
             PatternGuards #-}

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (catch, finally)
import Control.Monad.Error
import Control.Monad.State
import Data.Char (isControl)
import Data.List (nub)
import Network.URI
import Prelude hiding (catch)
import System.Console.GetOpt
import System.Environment (getArgs)
import System.Exit (ExitCode(..), exitWith)
import System.IO (hFlush, hPutStrLn, stderr, stdout)
import Text.Printf (printf)
import qualified Data.ByteString.Lazy.Char8 as B
import qualified Data.Set as S

-- 這里需要HTTP包, 它并不是GHC自帶的.
import Network.HTTP

type URL = B.ByteString

data Task = Check URL | Done

main 函數(shù)顯示了這個(gè)程序的主體腳手架(scaffolding)。

-- file: ch28/Check.hs
main :: IO ()
main = do
    (files,k) <- parseArgs
    let n = length files

    -- count of broken links
    badCount <- newTVarIO (0 :: Int)

    -- for reporting broken links
    badLinks <- newTChanIO

    -- for sending jobs to workers
    jobs <- newTChanIO

    -- the number of workers currently running
    workers <- newTVarIO k

    -- one thread reports bad links to stdout
    forkIO $ writeBadLinks badLinks

    -- start worker threads
    forkTimes k workers (worker badLinks jobs badCount)

    -- read links from files, and enqueue them as jobs
    stats <- execJob (mapM_ checkURLs files)
                     (JobState S.empty 0 jobs)

    -- enqueue "please finish" messages
    atomically $ replicateM_ k (writeTChan jobs Done)

    waitFor workers

    broken <- atomically $ readTVar badCount

    printf fmt broken
               (linksFound stats)
               (S.size (linksSeen stats))
               n
  where
    fmt   = "Found %d broken links. " ++
            "Checked %d links (%d unique) in %d files.\n"

當(dāng)我們處于 IO monad時(shí),可以使用 newTVarIO 函數(shù)新建一個(gè) TVar 值。同樣的,也有類似的函數(shù)可以新建 TMVar 和 TChan 值。

在程序用了 printf 函數(shù)打印出最后的結(jié)果。和C語言里類似函數(shù) printf 不同的是Haskell這個(gè)版本會在運(yùn)行時(shí)檢查參數(shù)的個(gè)數(shù)以及其類型。

ghci> :m +Text.Printf
ghci> printf "%d and %d\n" (3::Int)
3 and *** Exception: Printf.printf: argument list ended prematurely
ghci> printf "%s and %d\n" "foo" (3::Int)
foo and 3

ghci 里試試 printf"%d"True ,看看會得到什么結(jié)果。

支持 main 函數(shù)的是幾個(gè)短小的函數(shù)。

-- file: ch28/Check.hs
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ tv f = readTVar tv >>= writeTVar tv . f

forkTimes :: Int -> TVar Int -> IO () -> IO ()
forkTimes k alive act =
  replicateM_ k . forkIO $
    act
    `finally`
    (atomically $ modifyTVar_ alive (subtract 1))

forkTimes 函數(shù)新建特定數(shù)量的相同的工作線程,每當(dāng)一個(gè)線程推出時(shí),則”活動”線程的計(jì)數(shù)器相應(yīng)的減一。我們使用 finally 組合器確保無論線程是如何終止的,都會減少”活動”線程的數(shù)量。

下一步, writeBadLinks 會把每個(gè)失效或者死亡(dead)的鏈接打印到 stdout 。

-- file: ch28/Check.hs
writeBadLinks :: TChan String -> IO ()
writeBadLinks c =
  forever $
    atomically (readTChan c) >>= putStrLn >> hFlush stdout

上面我們使用了 forever 組合器使一個(gè)操作永遠(yuǎn)的執(zhí)行。

ghci> :m +Control.Monad
ghci> :type forever
forever :: (Monad m) => m a -> m ()

waitFor 函數(shù)使用了 check , 當(dāng)它的參數(shù)是 False 時(shí)會調(diào)用 retry 。

-- file: ch28/Check.hs
waitFor :: TVar Int -> IO ()
waitFor alive = atomically $ do
  count <- readTVar alive
  check (count == 0)

檢查一個(gè)鏈接

這個(gè)原生的函數(shù)實(shí)現(xiàn)了如何檢查一個(gè)鏈接的狀態(tài)。 代碼和 [第二十二章 Chapter 22, Extended Example: Web Client Programming] 里的 podcatcher 相似但有一點(diǎn)不同。

-- file: ch28/Check.hs
getStatus :: URI -> IO (Either String Int)
getStatus = chase (5 :: Int)
  where
    chase 0 _ = bail "too many redirects"
    chase n u = do
      resp <- getHead u
      case resp of
        Left err -> bail (show err)
        Right r ->
          case rspCode r of
            (3,_,_) ->
               case findHeader HdrLocation r of
                 Nothing -> bail (show r)
                 Just u' ->
                   case parseURI u' of
                     Nothing -> bail "bad URL"
                     Just url -> chase (n-1) url
            (a,b,c) -> return . Right $ a * 100 + b * 10 + c

    bail = return . Left

getHead :: URI -> IO (Result Response)
getHead uri = simpleHTTP Request { rqURI = uri,
                                   rqMethod = HEAD,
                                   rqHeaders = [],
                                   rqBody = "" }

為了避免無盡的重定向相應(yīng),我們只允許固定次數(shù)的重定向請求。我們通過查看HTTP標(biāo)準(zhǔn)HEAD信息來確認(rèn)鏈接的有效性, 比起一個(gè)完整的GET請求,這樣做可以減少網(wǎng)絡(luò)流量。

這個(gè)代碼是典型的”marching off the left of the screen”風(fēng)格。正如之前我們提到的,需要謹(jǐn)慎使用這樣的風(fēng)格。下面我們用 ErrorT monad transformer 和幾個(gè)通用一點(diǎn)的方法進(jìn)行了重新實(shí)現(xiàn),它看上去簡潔了很多。

-- file: ch28/Check.hs
getStatusE = runErrorT . chase (5 :: Int)
  where
    chase :: Int -> URI -> ErrorT String IO Int
    chase 0 _ = throwError "too many redirects"
    chase n u = do
      r <- embedEither show =<< liftIO (getHead u)
      case rspCode r of
        (3,_,_) -> do
            u'  <- embedMaybe (show r)  $ findHeader HdrLocation r
            url <- embedMaybe "bad URL" $ parseURI u'
            chase (n-1) url
        (a,b,c) -> return $ a*100 + b*10 + c

-- Some handy embedding functions.
embedEither :: (MonadError e m) => (s -> e) -> Either s a -> m a
embedEither f = either (throwError . f) return

embedMaybe :: (MonadError e m) => e -> Maybe a -> m a
embedMaybe err = maybe (throwError err) return

工作者線程

每個(gè)工作者線程(Worker Thread)從一個(gè)共享隊(duì)列里拿一個(gè)任務(wù),這個(gè)任務(wù)要么檢查鏈接有效性,要么讓線程推出。

-- file: ch28/Check.hs
worker :: TChan String -> TChan Task -> TVar Int -> IO ()
worker badLinks jobQueue badCount = loop
  where
    -- Consume jobs until we are told to exit.
    loop = do
        job <- atomically $ readTChan jobQueue
        case job of
            Done  -> return ()
            Check x -> checkOne (B.unpack x) >> loop

    -- Check a single link.
    checkOne url = case parseURI url of
        Just uri -> do
            code <- getStatus uri `catch` (return . Left . show)
            case code of
                Right 200 -> return ()
                Right n   -> report (show n)
                Left err  -> report err
        _ -> report "invalid URL"

        where report s = atomically $ do
                           modifyTVar_ badCount (+1)
                           writeTChan badLinks (url ++ " " ++ s)

查找鏈接

我們構(gòu)造了基于 IO monad 的 狀態(tài) monad transformer棧用于查找鏈接。這個(gè)狀態(tài)會記錄我們已經(jīng)找到過的鏈接(避免重復(fù))、鏈接的數(shù)量以及一個(gè)隊(duì)列,我們會把需要做檢查的鏈接放到這個(gè)隊(duì)列里。

-- file: ch28/Check.hs
data JobState = JobState { linksSeen :: S.Set URL,
                           linksFound :: Int,
                           linkQueue :: TChan Task }

newtype Job a = Job { runJob :: StateT JobState IO a }
    deriving (Monad, MonadState JobState, MonadIO)

execJob :: Job a -> JobState -> IO JobState
execJob = execStateT . runJob

嚴(yán)格來說,對于對立運(yùn)行的小型程序,我們并不需要用到 newtype ,然后我們還是將它作為一個(gè)好的編碼實(shí)踐的例子放在這里。(畢竟也只多了幾行代碼)

main 函數(shù)實(shí)現(xiàn)了對每個(gè)輸入文件調(diào)用一次 checkURLs 方法,所以 checkURLs 的參數(shù)就是單個(gè)文件。

-- file: ch28/Check.hs
checkURLs :: FilePath -> Job ()
checkURLs f = do
    src <- liftIO $ B.readFile f
    let urls = extractLinks src
    filterM seenURI urls >>= sendJobs
    updateStats (length urls)

updateStats :: Int -> Job ()
updateStats a = modify $ \s ->
    s { linksFound = linksFound s + a }

-- | Add a link to the set we have seen.
insertURI :: URL -> Job ()
insertURI c = modify $ \s ->
    s { linksSeen = S.insert c (linksSeen s) }

-- | If we have seen a link, return False.  Otherwise, record that we
-- have seen it, and return True.
seenURI :: URL -> Job Bool
seenURI url = do
    seen <- (not . S.member url) `liftM` gets linksSeen
    insertURI url
    return seen

sendJobs :: [URL] -> Job ()
sendJobs js = do
    c <- gets linkQueue
    liftIO . atomically $ mapM_ (writeTChan c . Check) js

extractLinks 函數(shù)并沒有嘗試去準(zhǔn)確的去解析一個(gè)HTMP或是文本文件,而只是匹配那些看上去像URL的字符串。我們認(rèn)為這樣做就夠了。

-- file: ch28/Check.hs
extractLinks :: B.ByteString -> [URL]
extractLinks = concatMap uris . B.lines
  where uris s      = filter looksOkay (B.splitWith isDelim s)
        isDelim c   = isControl c || c `elem` " <>\"{}|\\^[]`"
        looksOkay s = http `B.isPrefixOf` s
        http        = B.pack "http:"

命令行的實(shí)現(xiàn)

我們使用了 System.Console.GetOpt 模塊來解析命令行參數(shù)。這個(gè)模塊提供了很多解析命令行參數(shù)的很有用的方法,不過使用起來稍微有點(diǎn)繁瑣。

-- file: ch28/Check.hs
data Flag = Help | N Int
            deriving Eq

parseArgs :: IO ([String], Int)
parseArgs = do
    argv <- getArgs
    case parse argv of
        ([], files, [])                     -> return (nub files, 16)
        (opts, files, [])
            | Help `elem` opts              -> help
            | [N n] <- filter (/=Help) opts -> return (nub files, n)
        (_,_,errs)                          -> die errs
  where
    parse argv = getOpt Permute options argv
    header     = "Usage: urlcheck [-h] [-n n] [file ...]"
    info       = usageInfo header options
    dump       = hPutStrLn stderr
    die errs   = dump (concat errs ++ info) >> exitWith (ExitFailure 1)
    help       = dump info                  >> exitWith ExitSuccess

getOpt 函數(shù)接受三個(gè)參數(shù)

  • 參數(shù)順序的定義。 它定義了選項(xiàng)(Option)是否可以和其他參數(shù)混淆使用(就是我們上面用到的 Permute )或者是選項(xiàng)必須出現(xiàn)在參數(shù)之前。
  • 選項(xiàng)的定義。 每個(gè)選項(xiàng)有這四個(gè)部分組成: 簡稱,全稱,選項(xiàng)的描述(比如是否接受參數(shù)) 以及用戶說明。
  • 參數(shù)和選項(xiàng)數(shù)組,類似于 getArgs 的返回值。

這個(gè)函數(shù)返回一個(gè)三元組,包括用戶輸入的選項(xiàng),參數(shù)以及錯(cuò)誤信息(如果有的話)。

我們使用 Flag 代數(shù)類型(Algebraic Data Type)表示程序所能接收的選項(xiàng)。

-- file: ch28/Check.hs
options :: [OptDescr Flag]
options = [ Option ['h'] ["help"] (NoArg Help)
                   "Show this help message",
            Option ['n'] []       (ReqArg (\s -> N (read s)) "N")
                   "Number of concurrent connections (default 16)" ]

options 列表保存了每個(gè)程序能接收選項(xiàng)的描述。每個(gè)描述必須要生成一個(gè) Flag 值。參考上面例子中是如何使用 NoArg 和 ReqArg 。 GetOpt 模塊的 ArgDescr 類型有很多構(gòu)造函數(shù)(Constructors)。

-- file: ch28/GetOpt.hs
data ArgDescr a = NoArg a
                | ReqArg (String -> a) String
                | OptArg (Maybe String -> a) String
  • NoArg 接受一個(gè)參數(shù)用來表示這個(gè)選項(xiàng)。在我們這個(gè)例子中,如果用戶在調(diào)用程序時(shí)輸入 -h 或者 --help , 我們就用 Help 值表示。
  • ReqArg 的第一個(gè)函數(shù)作為參數(shù),這個(gè)函數(shù)把用戶輸入的參數(shù)轉(zhuǎn)化成相應(yīng)的值;第二個(gè)參數(shù)是用來說明的。 這里我們是將字符串轉(zhuǎn)換為數(shù)值(integer),然后再給類型 Flag 的構(gòu)造函數(shù) N 。
  • OptArg 和 ReqArg 很相似,但它允許選項(xiàng)沒有對應(yīng)的參數(shù)。

模式守衛(wèi) (Pattern guards)

函數(shù) parseArgs 的定義里其實(shí)潛在了一個(gè)語言擴(kuò)展(Language Extension), Pattern guards。用它可以寫出更加簡要的guard expressions. 它通過語言擴(kuò)展 PatternGuards 來使用。

一個(gè)Pattern Guard有三個(gè)組成部分: 一個(gè)模式(Pattern), 一個(gè) <- 符號以及一個(gè)表達(dá)式。表達(dá)式會被解釋然后和模式相匹配。 如果成功,在模式中定義的變量會被賦值。我們可以在一個(gè)guard里同時(shí)使用pattern guards和普通的 Bool guard expressions。

-- file: ch28/PatternGuard.hs
{-# LANGUAGE PatternGuards #-}

testme x xs | Just y <- lookup x xs, y > 3 = y
            | otherwise                    = 0

在上面的例子中,當(dāng)關(guān)鍵字 x 存在于alist xs 并且大于等于3,則返回它所對應(yīng)的值。下面的定義實(shí)現(xiàn)了同樣的功能。

-- file: ch28/PatternGuard.hs
testme_noguards x xs = case lookup x xs of
                         Just y | y > 3 -> y
                         _              -> 0

Pattern guards 使得我們可以把一系列的guards和 case 表達(dá)式組合到單個(gè)guard,從而寫出更加簡潔并容易理解的guards。

STM的實(shí)踐意義

至此我們還并未提及STM所提供的特別優(yōu)越的地方。比如它在做組合(composes)方面就表現(xiàn)的很好:當(dāng)需要向一個(gè)事務(wù)中增加邏輯時(shí),只需要用到常見的函數(shù) (>>=) 和 (>>) 。

組合的概念在構(gòu)建模塊化軟件是顯得格外重要。如果我們把倆段都沒有問題的代碼組合在一起,也應(yīng)該是能很好工作的。常規(guī)的線程編程技術(shù)無法實(shí)現(xiàn)組合,然而由于STM提供了一些很關(guān)鍵的前提,從而使在線程編程時(shí)使用組合變得可能。

STM monad防止了我們意外的非事務(wù)性的I/O。我們不再需要關(guān)心鎖的順序,因?yàn)榇a里根本沒有鎖機(jī)制。我們可以忘記丟失喚醒,因?yàn)椴辉儆袟l件變量了。如果有異常發(fā)生,我們則可以用函數(shù) catchSTM 捕捉到,或者是往上級傳遞。 最后,我們可以用 retry 和 orElse 以更加漂亮的方式組織代碼。

采用STM機(jī)制的代碼不會死鎖,但是導(dǎo)致饑餓還是有可能的。一個(gè)長事務(wù)導(dǎo)致另外一個(gè)事務(wù)不停的 retry 。為了解決這樣的問題,需要盡量的短事務(wù)并保持?jǐn)?shù)據(jù)一致性。

合理的放棄控制權(quán)

無論是同步管理還是內(nèi)存管理,經(jīng)常會遇到保留控制權(quán)的情況:一些軟件需要對延時(shí)或是內(nèi)存使用記錄有很強(qiáng)的保證,因此就必須花很多時(shí)間和精力去管理和調(diào)試顯式的代碼。然后對于軟件的大多數(shù)實(shí)際情況,垃圾回收(Garbage Collection)和STM已經(jīng)做的足夠好了。

STM并不是一顆完美的靈丹妙藥。當(dāng)我們選擇垃圾回收而不是顯式的內(nèi)存管理, 我們是放棄了控制權(quán)從而獲得更加安全的代碼。 同樣的,當(dāng)使用STM時(shí),我們放棄了底層的細(xì)節(jié),從而希望代碼可讀性更好,更加容易理解。

使用不變量

STM并不能消除某些類型的bug。比如,我們在一個(gè) atomically 事務(wù)中從某個(gè)賬號中取錢,然后返回到 IO monad,然后在另一個(gè) atomically 事務(wù)中把錢存到另一個(gè)賬號,那么代碼就會產(chǎn)生不一致性,因?yàn)闀谀硞€(gè)特定時(shí)刻,這部分錢不會出現(xiàn)的任意一個(gè)賬號里。

-- file: ch28/GameInventory.hs
bogusTransfer qty fromBal toBal = do
  fromQty <- atomically $ readTVar fromBal
  -- window of inconsistency
  toQty   <- atomically $ readTVar toBal
  atomically $ writeTVar fromBal (fromQty - qty)
  -- window of inconsistency
  atomically $ writeTVar toBal   (toQty + qty)

bogusSale :: Item -> Gold -> Player -> Player -> IO ()
bogusSale item price buyer seller = do
  atomically $ giveItem item (inventory seller) (inventory buyer)
  bogusTransfer price (balance buyer) (balance seller)

在同步程序中,這類問題顯然很難而且不容易重現(xiàn)。比如上述例子中的不一致性問題通常只存在一段很短的時(shí)間內(nèi)。在開發(fā)階段通常不會出現(xiàn)這類問題,而往往只有在負(fù)載很高的產(chǎn)品環(huán)境才有可能發(fā)生。

我們可以用函數(shù) alwaysSucceeds 定義一個(gè)不變量,它是永遠(yuǎn)為真的一個(gè)數(shù)據(jù)屬性。

ghci> :type alwaysSucceeds
alwaysSucceeds :: STM a -> STM ()

當(dāng)創(chuàng)建一個(gè)不變量時(shí),它馬上會被檢查。如果要失敗,那么這個(gè)不變量會拋出異常。更有意思的是,不變量會在經(jīng)后每個(gè)事務(wù)完成時(shí)自動被檢查。如果在任何一個(gè)點(diǎn)上失敗,事務(wù)就會推出,不變量拋出的異常也會被傳遞下去。這就意味著當(dāng)不變量的條件被違反時(shí),我們就可以馬上得到反饋。

比如,下面兩個(gè)函數(shù)給本章開始時(shí)定義的游戲世界增加玩家

-- file: ch28/GameInventory.hs
newPlayer :: Gold -> HitPoint -> [Item] -> STM Player
newPlayer balance health inventory =
    Player `liftM` newTVar balance
              `ap` newTVar health
              `ap` newTVar inventory

populateWorld :: STM [Player]
populateWorld = sequence [ newPlayer 20 20 [Wand, Banjo],
                           newPlayer 10 12 [Scroll] ]

下面的函數(shù)則返回了一個(gè)不變量,通過它我們可以保證整個(gè)游戲世界資金總是平衡的:即任何時(shí)候的資金總量和游戲建立時(shí)的總量是一樣的。

-- file: ch28/GameInventory.hs
consistentBalance :: [Player] -> STM (STM ())
consistentBalance players = do
    initialTotal <- totalBalance
    return $ do
      curTotal <- totalBalance
      when (curTotal /= initialTotal) $
        error "inconsistent global balance"
  where totalBalance   = foldM addBalance 0 players
        addBalance a b = (a+) `liftM` readTVar (balance b)

下面我們寫個(gè)函數(shù)來試驗(yàn)下。

-- file: ch28/GameInventory.hs
tryBogusSale = do
  players@(alice:bob:_) <- atomically populateWorld
  atomically $ alwaysSucceeds =<< consistentBalance players
  bogusSale Wand 5 alice bob

由于在函數(shù) bogusTransfer 中不正確地使用了 atomically 而會導(dǎo)致不一致性, 當(dāng)我們在 ghci 里運(yùn)行這個(gè)方法時(shí)則會檢測到這個(gè)不一致性。

ghci> tryBogusSale
*** Exception: inconsistent global balance
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號