在并發(fā)編程的傳統(tǒng)線程模型中,線程之間的數(shù)據(jù)共享需要通過鎖來保持一致性(consistentBalance),當(dāng)數(shù)據(jù)產(chǎn)生變化時,還需要使用條件變量(condition variable)對各個線程進(jìn)行通知。
某種程度上,Haskell 的 MVar 機(jī)制對上面提到的工具進(jìn)行了改進(jìn),但是,它仍然帶有和這些工具一樣的缺陷:
這些問題即使在很小的并發(fā)程序里也會經(jīng)常發(fā)生,而在更加龐大的代碼庫或是高負(fù)載的情況下,這些問題會引發(fā)更加糟糕的難題。
比如說,對一個只有幾個大范圍鎖的程序進(jìn)行編程并不難,只是一旦這個程序在高負(fù)載的環(huán)境下運(yùn)行,鎖之間的相互競爭就會變得非常嚴(yán)重。另一方面,如果采用細(xì)粒度(fineo-grained)的鎖機(jī)制,保持軟件正常工作將會變得非常困難。除此之外,就算在負(fù)載不高的情況下, 加鎖帶來的額外的簿記工作(book-keeping)也會對性能產(chǎn)生影響。
軟件事務(wù)內(nèi)存(Software transactional memory)提供了一些簡單但強(qiáng)大的工具。通過這些工具我們可以解決前面提到的大多數(shù)問題。通過 atomically 組合器(combinator), 我們可以在一個事務(wù)內(nèi)執(zhí)行一批操作。當(dāng)這一組操作開始執(zhí)行的時候,其他線程是覺察不到這些操作所產(chǎn)生的任何修改,直到所有操作完成。同樣的,當(dāng)前線程也無法察覺其他線程的所產(chǎn)生的修改。這些性質(zhì)表明的操作的隔離性(isolated)。
當(dāng)從一個事務(wù)退出的時候,只會發(fā)生以下情況中的一種:
atomically 這種全有或全無(all-or-nothing)的天性被稱之為原子性(atomic), atomically 也因為得名。如果你使用過支持事務(wù)的數(shù)據(jù)庫,你會覺得STM使用起來非常熟悉。
在多玩家角色扮演的游戲里, 一個玩家的角色會有許多屬性,比如健康,財產(chǎn)以及金錢。讓我們從基于游戲人物屬性的一些簡單的函數(shù)和類型開始去了解STM的精彩內(nèi)容。隨著學(xué)習(xí)的深入,我們也會不斷地改進(jìn)我們的代碼。
STM的API位于 stm 包,模塊 Control.Concurrent.STM 。
-- file: ch28/GameInventory.hs
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Control.Concurrent.STM
import Control.Monad
data Item = Scroll
| Wand
| Banjo
deriving (Eq, Ord, Show)
newtype Gold = Gold Int
deriving (Eq, Ord, Show, Num)
newtype HitPoint = HitPoint Int
deriving (Eq, Ord, Show, Num)
type Inventory = TVar [Item]
type Health = TVar HitPoint
type Balance = TVar Gold
data Player = Player {
balance :: Balance,
health :: Health,
inventory :: Inventory
}
參數(shù)化類型 TVar 是一個可變量,可以在 atomically 塊中讀取或者修改。為了簡單起見,我們把玩家的背包(Inventory)定義為物品的列表。同時注意到,我們用到了 newtype ,這樣不會混淆財富和健康屬性。
當(dāng)需要在兩個賬戶(Balance)之間轉(zhuǎn)賬,我們所要的做的就只是調(diào)整下各自的 Tvar 。
-- file: ch28/GameInventory.hs
basicTransfer qty fromBal toBal = do
fromQty <- readTVar fromBal
toQty <- readTVar toBal
writeTVar fromBal (fromQty - qty)
writeTVar toBal (toQty + qty)
讓我們寫個簡單的測試函數(shù)
-- file: ch28/GameInventory.hs
transferTest = do
alice <- newTVar (12 :: Gold)
bob <- newTVar 4
basicTransfer 3 alice bob
liftM2 (,) (readTVar alice) (readTVar bob)
如果我們在ghci里執(zhí)行下這個函數(shù),應(yīng)該有如下的結(jié)果
ghci> :load GameInventory
[1 of 1] Compiling Main ( GameInventory.hs, interpreted )
Ok, modules loaded: Main.
ghci> atomically transferTest
Loading package array-0.4.0.0 ... linking ... done.
Loading package stm-2.3 ... linking ... done.
(Gold 9,Gold 7)
原子性和隔離性保證了當(dāng)其他線程同時看到 bob 的賬戶和 alice 的賬戶被修改了。
即使在并發(fā)程序里,我們也努力保持代碼盡量的純函數(shù)化。這使得我們的代碼更加容易推導(dǎo)和測試。由于數(shù)據(jù)并沒有事務(wù)性,這也讓底層的STM做更少的事。以下的純函數(shù)實(shí)現(xiàn)了從我們來表示玩家背包的數(shù)列里移除一個物品。
-- file: ch28/GameInventory.hs
removeInv :: Eq a => a -> [a] -> Maybe [a]
removeInv x xs =
case takeWhile (/= x) xs of
(_:ys) -> Just ys
[] -> Nothing
這里返回值用了 Maybe 類型,它可以用來表示物品是否在玩家的背包里。
下面這個事務(wù)性的函數(shù)實(shí)現(xiàn)了把一個物品給另外一個玩家。這個函數(shù)有一點(diǎn)點(diǎn)復(fù)雜因為需要判斷給予者是否有這個物品。
-- file: ch28/GameInventory.hs
maybeGiveItem item fromInv toInv = do
fromList <- readTVar fromInv
case removeInv item fromList of
Nothing -> return False
Just newList -> do
writeTVar fromInv newList
destItems <- readTVar toInv
writeTVar toInv (item : destItems)
return True
既然我們提供了有原子性和隔離型的事務(wù),那么保證我們不能有意或是無意的從 atomically 執(zhí)行塊從脫離顯得格外重要。借由 STM monad,Haskell的類型系統(tǒng)保證了我們這種行為。
ghci> :type atomically
atomically :: STM a -> IO a
atomically 接受一個 STM monad的動作, 然后執(zhí)行并讓我們可以從 IO monad里拿到這個結(jié)果。 STM monad是所有事務(wù)相關(guān)代碼執(zhí)行的地方。比如這些操作 TVar 值的函數(shù)都在 STM monad里被執(zhí)行。
ghci> :type newTVar
newTVar :: a -> STM (TVar a)
ghci> :type readTVar
readTVar :: TVar a -> STM a
ghci> :type writeTVar
writeTVar :: TVar a -> a -> STM ()
我們之前定義的事務(wù)性函數(shù)也有這個特性
-- file: ch28/GameInventory.hs
basicTransfer :: Gold -> Balance -> Balance -> STM ()
maybeGiveItem :: Item -> Inventory -> Inventory -> STM Bool
在 STM monad里是不允許執(zhí)行I/O操作或者是修改非事務(wù)性的可變狀態(tài),比如 MVar 的值。這就使得我們可以避免那些違背事務(wù)完整的操作。
maybeGiveItem 這個函數(shù)看上去稍微有點(diǎn)怪異。只有當(dāng)角色有這個物品時才會將它給另外一個角色,這看上去還算合理,然后返回一個 Bool 值使調(diào)用這個函數(shù)的代碼變得復(fù)雜。下面這個函數(shù)調(diào)用了 maybeGiveItem, 它必須根據(jù) maybeGiveItem 的返回結(jié)果來決定如何繼續(xù)執(zhí)行。
maybeSellItem :: Item -> Gold -> Player -> Player -> STM Bool
maybeSellItem item price buyer seller = do
given <- maybeGiveItem item (inventory seller) (inventory buyer)
if given
then do
basicTransfer price (balance buyer) (balance seller)
return True
else return False
我們不僅要檢查物品是否給到了另一個玩家,而且還得把是否成功這個信號傳遞給調(diào)用者。這就意味了復(fù)雜性被延續(xù)到了更外層。
下面我們來看看如何用更加優(yōu)雅的方式處理事務(wù)無法成功進(jìn)行的情況。 STM API 提供了一個 retry 函數(shù),它可以立即中斷一個 無法成功進(jìn)行的 atomically 執(zhí)行塊。正如這個函數(shù)名本身所指明的意思,當(dāng)它發(fā)生時,執(zhí)行塊會被重新執(zhí)行,所有在這之前的操作都不會被記錄。我們使用 retry 重新實(shí)現(xiàn)了 maybeGiveItem 。
-- file: ch28/GameInventory.hs
giveItem :: Item -> Inventory -> Inventory -> STM ()
giveItem item fromInv toInv = do
fromList <- readTVar fromInv
case removeInv item fromList of
Nothing -> retry
Just newList -> do
writeTVar fromInv newList
readTVar toInv >>= writeTVar toInv . (item :)
我們之前實(shí)現(xiàn)的 basicTransfer 有一個缺陷:沒有檢查發(fā)送者的賬戶是否有足夠的資金。我們可以使用 retry 來糾正這個問題并保持方法簽名不變。
-- file: ch28/GameInventory.hs
transfer :: Gold -> Balance -> Balance -> STM ()
transfer qty fromBal toBal = do
fromQty <- readTVar fromBal
when (qty > fromQty) $
retry
writeTVar fromBal (fromQty - qty)
readTVar toBal >>= writeTVar toBal . (qty +)
使用 retry 后,銷售物品的函數(shù)就顯得簡單很多。
sellItem :: Item -> Gold -> Player -> Player -> STM ()
sellItem item price buyer seller = do
giveItem item (inventory seller) (inventory buyer)
transfer price (balance buyer) (balance seller)
這個實(shí)現(xiàn)和之前的稍微有點(diǎn)不同。如果有必要會會阻塞以至賣家有東西可賣并且買家有足夠的余額支付,而不是在發(fā)現(xiàn)賣家沒這個物品可銷售時馬上返回 False 。
retry 不僅僅使得代碼更加簡潔:它似乎有魔力般的內(nèi)部實(shí)現(xiàn)。當(dāng)我們調(diào)用 retry 的時候,它并不是馬上重啟事務(wù),而是會先阻塞線程,一直到那些在 retry 之前被訪問過的變量被其他線程修改。
比如,如果我們調(diào)用 transfer 而發(fā)現(xiàn)余額不足, retry 會自發(fā)的等待,直到賬戶余額的變動,然后會重新啟動事務(wù)。 同樣的,對于函數(shù) giveItem , 如果賣家沒有那個物品,線程就會阻塞直到他有了那個物品。
有時候我們并不總是希望重啟 atomically 操作即使調(diào)用了 retry 或者由于其他線程的同步修改而導(dǎo)致的失敗。比如函數(shù) sellItem 會不斷地重試,只要沒有滿足其條件:要有物品并且余額足夠。然而我們可能更希望只重試一次。
orElse 組合器允許我們在主操作失敗的情況下,執(zhí)行一個”備用”操作。
ghci> :type orElse
orElse :: STM a -> STM a -> STM a
我們對 sellItem 做了一點(diǎn)修改:如果 sellItem 失敗, 則 orElse 執(zhí)行 returnFalse 的動作從而使這個sale函數(shù)立即返回。
trySellItem :: Item -> Gold -> Player -> Player -> STM Bool
trySellItem item price buyer seller =
sellItem item price buyer seller >> return True
`orElse`
return False
假設(shè)我們想做稍微有挑戰(zhàn)的事情,從一系列的物品中,選取第一個賣家擁有的并且買家能承擔(dān)費(fèi)用的物品進(jìn)行購買,如果沒有這樣的物品則什么都不做。顯然我們可以很直觀的給出實(shí)現(xiàn)。
-- file: ch28/GameInventory.hs
crummyList :: [(Item, Gold)] -> Player -> Player
-> STM (Maybe (Item, Gold))
crummyList list buyer seller = go list
where go [] = return Nothing
go (this@(item,price) : rest) = do
sellItem item price buyer seller
return (Just this)
`orElse`
go rest
在這個實(shí)現(xiàn)里,我們有碰到了一個熟悉的問題:把我們的需求和如果實(shí)現(xiàn)混淆在一個。再深入一點(diǎn)觀察,則會發(fā)現(xiàn)兩個可重復(fù)使用的模式。
第一個就是讓事務(wù)失敗而不是重試。
-- file: ch28/GameInventory.hs
maybeSTM :: STM a -> STM (Maybe a)
maybeSTM m = (Just `liftM` m) `orElse` return Nothing
第二個,我們要對一系列的對象執(zhí)行否一個操作,直到有一個成功為止。如果全部都失敗,則執(zhí)行 retry 操作。由于 STM 是 MonadPlus 類型類的一個實(shí)例,所以顯得很方便。
-- file: ch28/STMPlus.hs
instance MonadPlus STM where
mzero = retry
mplus = orElse
Control.Monad 模塊定義了一個 msum 函數(shù),而它就是我們所需要的。
-- file: ch28/STMPlus.hs
msum :: MonadPlus m => [m a] -> m a
msum = foldr mplus mzero
有了這些重要的工具,我們就可以寫出更加簡潔的實(shí)現(xiàn)了。
-- file: ch28/GameInventory.hs
shoppingList :: [(Item, Gold)] -> Player -> Player
-> STM (Maybe (Item, Gold))
shoppingList list buyer seller = maybeSTM . msum $ map sellOne list
where sellOne this@(item,price) = do
sellItem item price buyer seller
return this
既然 STM 是 MonadPlus 類型類的實(shí)例,我們可以改進(jìn) maybeSTM ,這樣就可以適用于任何 MonadPlus 的實(shí)例。
-- file: ch28/GameInventory.hs
maybeM :: MonadPlus m => m a -> m (Maybe a)
maybeM m = (Just `liftM` m) `mplus` return Nothing
這個函數(shù)會在很多不同情況下顯得非常有用。
STM monad 禁止任意的I/O操作,因為I/O操作會破壞原子性和隔離性。當(dāng)然I/O的操作還是需要的,只是我們需要非常的謹(jǐn)慎。
大多數(shù)時候,我們會執(zhí)行I/O操作是由于我們在 atomically 塊中產(chǎn)生的一個結(jié)果。在這些情況下,正確的做法通常是 atomically 返回一些數(shù)據(jù),在I/O monad里的調(diào)用者則根據(jù)這些數(shù)據(jù)知道如何繼續(xù)下一步動作。我們甚至可以返回需要被操作的動作 (action), 因為他們是第一類值(First Class vaules)。
-- file: ch28/STMIO.hs
someAction :: IO a
stmTransaction :: STM (IO a)
stmTransaction = return someAction
doSomething :: IO a
doSomething = join (atomically stmTransaction)
我們偶爾也需要在 STM 里進(jìn)行I/O操作。比如從一個肯定存在的文件里讀取一些非可變數(shù)據(jù),這樣的操作并不會違背 STM 保證原子性和隔離性的原則。在這些情況,我們可以使用 unsafeIOToSTM 來執(zhí)行一個 IO 操作。這個函數(shù)位于偏底層的一個模塊 GHC.Conc ,所以要謹(jǐn)慎使用。
ghci> :m +GHC.Conc
ghci> :type unsafeIOToSTM
unsafeIOToSTM :: IO a -> STM a
我們所執(zhí)行的這個 IO 動作絕對不能打開另外一個 atomically 事務(wù)。如果一個線程嘗試嵌套的事務(wù),系統(tǒng)就會拋出異常。
由于類型系統(tǒng)無法幫助我們確保 IO 代碼沒有執(zhí)行一些敏感動作,最安全的做法就是我們盡量的限制使用 unsafeIOToSTM 。下面的例子展示了在 atomically 中執(zhí)行 IO 的典型錯誤。
-- file: ch28/STMIO.hs
launchTorpedoes :: IO ()
notActuallyAtomic = do
doStuff
unsafeIOToSTM launchTorpedoes
mightRetry
如果 mightRetry 會引發(fā)事務(wù)的重啟,那么 launchTorpedoes 會被調(diào)用多次。事實(shí)上,我們無法預(yù)見它會被調(diào)用多少次, 因為重試是由運(yùn)行時系統(tǒng)所處理的。解決方案就是在事務(wù)中不要有這種類型的non-idempotent I/O操作。
正如基礎(chǔ)類型 TVar 那樣, stm 包也提供了兩個更有用的類型用于線程之間的通訊, TMVar 和 TChan 。 TMVar 是STM世界的 MVar , 它可以保存一個 Maybe 類型的值, 即 Just 值或者 Nothing 。 TChan 則是 STM 世界里的 Chan ,它實(shí)現(xiàn)了一個有類型的先進(jìn)先出(FIFO)通道。
[譯者注:為何說 TMVar 是STM世界的 MVar 而不是 TVar ?是因為從實(shí)踐意義上理解的。 MVar 的特性是要么有值要么為空的一個容器,所以當(dāng)線程去讀這個容器時,要么讀到值繼續(xù)執(zhí)行,要么讀不到值就等待。 而 TVar 并沒有這樣的特性,所以引入了 TMVar 。 它的實(shí)現(xiàn)是這樣的, newtypeTMVara=TMVar(TVar(Maybea)) , 正是由于它包含了一個 Maybe 類型的值,這樣就有了”要么有值要么為空”這樣的特性,也就是 MVar 所擁有的特性。]
作為一個使用 STM 的實(shí)際例子, 我們將開發(fā)一個檢查HTML文件里不正確鏈接的程序,這里不正確的鏈接是指那些鏈接指向了一個錯誤的網(wǎng)頁或是無法訪問到其指向的服務(wù)器。用并發(fā)的方式解決這個問題非常得合適:如果我們嘗試和已經(jīng)下線的服務(wù)器(dead server)通訊,需要有兩分鐘的超時時間。如果使用多線程,即使有一兩個線程由于和響應(yīng)很慢或者下線的服務(wù)器通訊而停住(stuck),我們還是可以繼續(xù)進(jìn)行一些有用的事情。
我們不能簡單直觀的給每一個URL新建一個線程,因為由于(也是我們預(yù)想的)大多數(shù)鏈接是正確的,那么這樣做就會導(dǎo)致CPU或是網(wǎng)絡(luò)連接超負(fù)荷。因此,我們只會創(chuàng)建固定數(shù)量的線程,這些線程會從一個隊列里拿URL做檢查。
-- file: ch28/Check.hs
{-# LANGUAGE FlexibleContexts, GeneralizedNewtypeDeriving,
PatternGuards #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (catch, finally)
import Control.Monad.Error
import Control.Monad.State
import Data.Char (isControl)
import Data.List (nub)
import Network.URI
import Prelude hiding (catch)
import System.Console.GetOpt
import System.Environment (getArgs)
import System.Exit (ExitCode(..), exitWith)
import System.IO (hFlush, hPutStrLn, stderr, stdout)
import Text.Printf (printf)
import qualified Data.ByteString.Lazy.Char8 as B
import qualified Data.Set as S
-- 這里需要HTTP包, 它并不是GHC自帶的.
import Network.HTTP
type URL = B.ByteString
data Task = Check URL | Done
main 函數(shù)顯示了這個程序的主體腳手架(scaffolding)。
-- file: ch28/Check.hs
main :: IO ()
main = do
(files,k) <- parseArgs
let n = length files
-- count of broken links
badCount <- newTVarIO (0 :: Int)
-- for reporting broken links
badLinks <- newTChanIO
-- for sending jobs to workers
jobs <- newTChanIO
-- the number of workers currently running
workers <- newTVarIO k
-- one thread reports bad links to stdout
forkIO $ writeBadLinks badLinks
-- start worker threads
forkTimes k workers (worker badLinks jobs badCount)
-- read links from files, and enqueue them as jobs
stats <- execJob (mapM_ checkURLs files)
(JobState S.empty 0 jobs)
-- enqueue "please finish" messages
atomically $ replicateM_ k (writeTChan jobs Done)
waitFor workers
broken <- atomically $ readTVar badCount
printf fmt broken
(linksFound stats)
(S.size (linksSeen stats))
n
where
fmt = "Found %d broken links. " ++
"Checked %d links (%d unique) in %d files.\n"
當(dāng)我們處于 IO monad時,可以使用 newTVarIO 函數(shù)新建一個 TVar 值。同樣的,也有類似的函數(shù)可以新建 TMVar 和 TChan 值。
在程序用了 printf 函數(shù)打印出最后的結(jié)果。和C語言里類似函數(shù) printf 不同的是Haskell這個版本會在運(yùn)行時檢查參數(shù)的個數(shù)以及其類型。
ghci> :m +Text.Printf
ghci> printf "%d and %d\n" (3::Int)
3 and *** Exception: Printf.printf: argument list ended prematurely
ghci> printf "%s and %d\n" "foo" (3::Int)
foo and 3
在 ghci 里試試 printf"%d"True ,看看會得到什么結(jié)果。
支持 main 函數(shù)的是幾個短小的函數(shù)。
-- file: ch28/Check.hs
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ tv f = readTVar tv >>= writeTVar tv . f
forkTimes :: Int -> TVar Int -> IO () -> IO ()
forkTimes k alive act =
replicateM_ k . forkIO $
act
`finally`
(atomically $ modifyTVar_ alive (subtract 1))
forkTimes 函數(shù)新建特定數(shù)量的相同的工作線程,每當(dāng)一個線程推出時,則”活動”線程的計數(shù)器相應(yīng)的減一。我們使用 finally 組合器確保無論線程是如何終止的,都會減少”活動”線程的數(shù)量。
下一步, writeBadLinks 會把每個失效或者死亡(dead)的鏈接打印到 stdout 。
-- file: ch28/Check.hs
writeBadLinks :: TChan String -> IO ()
writeBadLinks c =
forever $
atomically (readTChan c) >>= putStrLn >> hFlush stdout
上面我們使用了 forever 組合器使一個操作永遠(yuǎn)的執(zhí)行。
ghci> :m +Control.Monad
ghci> :type forever
forever :: (Monad m) => m a -> m ()
waitFor 函數(shù)使用了 check , 當(dāng)它的參數(shù)是 False 時會調(diào)用 retry 。
-- file: ch28/Check.hs
waitFor :: TVar Int -> IO ()
waitFor alive = atomically $ do
count <- readTVar alive
check (count == 0)
這個原生的函數(shù)實(shí)現(xiàn)了如何檢查一個鏈接的狀態(tài)。 代碼和 [第二十二章 Chapter 22, Extended Example: Web Client Programming] 里的 podcatcher 相似但有一點(diǎn)不同。
-- file: ch28/Check.hs
getStatus :: URI -> IO (Either String Int)
getStatus = chase (5 :: Int)
where
chase 0 _ = bail "too many redirects"
chase n u = do
resp <- getHead u
case resp of
Left err -> bail (show err)
Right r ->
case rspCode r of
(3,_,_) ->
case findHeader HdrLocation r of
Nothing -> bail (show r)
Just u' ->
case parseURI u' of
Nothing -> bail "bad URL"
Just url -> chase (n-1) url
(a,b,c) -> return . Right $ a * 100 + b * 10 + c
bail = return . Left
getHead :: URI -> IO (Result Response)
getHead uri = simpleHTTP Request { rqURI = uri,
rqMethod = HEAD,
rqHeaders = [],
rqBody = "" }
為了避免無盡的重定向相應(yīng),我們只允許固定次數(shù)的重定向請求。我們通過查看HTTP標(biāo)準(zhǔn)HEAD信息來確認(rèn)鏈接的有效性, 比起一個完整的GET請求,這樣做可以減少網(wǎng)絡(luò)流量。
這個代碼是典型的”marching off the left of the screen”風(fēng)格。正如之前我們提到的,需要謹(jǐn)慎使用這樣的風(fēng)格。下面我們用 ErrorT monad transformer 和幾個通用一點(diǎn)的方法進(jìn)行了重新實(shí)現(xiàn),它看上去簡潔了很多。
-- file: ch28/Check.hs
getStatusE = runErrorT . chase (5 :: Int)
where
chase :: Int -> URI -> ErrorT String IO Int
chase 0 _ = throwError "too many redirects"
chase n u = do
r <- embedEither show =<< liftIO (getHead u)
case rspCode r of
(3,_,_) -> do
u' <- embedMaybe (show r) $ findHeader HdrLocation r
url <- embedMaybe "bad URL" $ parseURI u'
chase (n-1) url
(a,b,c) -> return $ a*100 + b*10 + c
-- Some handy embedding functions.
embedEither :: (MonadError e m) => (s -> e) -> Either s a -> m a
embedEither f = either (throwError . f) return
embedMaybe :: (MonadError e m) => e -> Maybe a -> m a
embedMaybe err = maybe (throwError err) return
每個工作者線程(Worker Thread)從一個共享隊列里拿一個任務(wù),這個任務(wù)要么檢查鏈接有效性,要么讓線程推出。
-- file: ch28/Check.hs
worker :: TChan String -> TChan Task -> TVar Int -> IO ()
worker badLinks jobQueue badCount = loop
where
-- Consume jobs until we are told to exit.
loop = do
job <- atomically $ readTChan jobQueue
case job of
Done -> return ()
Check x -> checkOne (B.unpack x) >> loop
-- Check a single link.
checkOne url = case parseURI url of
Just uri -> do
code <- getStatus uri `catch` (return . Left . show)
case code of
Right 200 -> return ()
Right n -> report (show n)
Left err -> report err
_ -> report "invalid URL"
where report s = atomically $ do
modifyTVar_ badCount (+1)
writeTChan badLinks (url ++ " " ++ s)
我們構(gòu)造了基于 IO monad 的 狀態(tài) monad transformer棧用于查找鏈接。這個狀態(tài)會記錄我們已經(jīng)找到過的鏈接(避免重復(fù))、鏈接的數(shù)量以及一個隊列,我們會把需要做檢查的鏈接放到這個隊列里。
-- file: ch28/Check.hs
data JobState = JobState { linksSeen :: S.Set URL,
linksFound :: Int,
linkQueue :: TChan Task }
newtype Job a = Job { runJob :: StateT JobState IO a }
deriving (Monad, MonadState JobState, MonadIO)
execJob :: Job a -> JobState -> IO JobState
execJob = execStateT . runJob
嚴(yán)格來說,對于對立運(yùn)行的小型程序,我們并不需要用到 newtype ,然后我們還是將它作為一個好的編碼實(shí)踐的例子放在這里。(畢竟也只多了幾行代碼)
main 函數(shù)實(shí)現(xiàn)了對每個輸入文件調(diào)用一次 checkURLs 方法,所以 checkURLs 的參數(shù)就是單個文件。
-- file: ch28/Check.hs
checkURLs :: FilePath -> Job ()
checkURLs f = do
src <- liftIO $ B.readFile f
let urls = extractLinks src
filterM seenURI urls >>= sendJobs
updateStats (length urls)
updateStats :: Int -> Job ()
updateStats a = modify $ \s ->
s { linksFound = linksFound s + a }
-- | Add a link to the set we have seen.
insertURI :: URL -> Job ()
insertURI c = modify $ \s ->
s { linksSeen = S.insert c (linksSeen s) }
-- | If we have seen a link, return False. Otherwise, record that we
-- have seen it, and return True.
seenURI :: URL -> Job Bool
seenURI url = do
seen <- (not . S.member url) `liftM` gets linksSeen
insertURI url
return seen
sendJobs :: [URL] -> Job ()
sendJobs js = do
c <- gets linkQueue
liftIO . atomically $ mapM_ (writeTChan c . Check) js
extractLinks 函數(shù)并沒有嘗試去準(zhǔn)確的去解析一個HTMP或是文本文件,而只是匹配那些看上去像URL的字符串。我們認(rèn)為這樣做就夠了。
-- file: ch28/Check.hs
extractLinks :: B.ByteString -> [URL]
extractLinks = concatMap uris . B.lines
where uris s = filter looksOkay (B.splitWith isDelim s)
isDelim c = isControl c || c `elem` " <>\"{}|\\^[]`"
looksOkay s = http `B.isPrefixOf` s
http = B.pack "http:"
我們使用了 System.Console.GetOpt 模塊來解析命令行參數(shù)。這個模塊提供了很多解析命令行參數(shù)的很有用的方法,不過使用起來稍微有點(diǎn)繁瑣。
-- file: ch28/Check.hs
data Flag = Help | N Int
deriving Eq
parseArgs :: IO ([String], Int)
parseArgs = do
argv <- getArgs
case parse argv of
([], files, []) -> return (nub files, 16)
(opts, files, [])
| Help `elem` opts -> help
| [N n] <- filter (/=Help) opts -> return (nub files, n)
(_,_,errs) -> die errs
where
parse argv = getOpt Permute options argv
header = "Usage: urlcheck [-h] [-n n] [file ...]"
info = usageInfo header options
dump = hPutStrLn stderr
die errs = dump (concat errs ++ info) >> exitWith (ExitFailure 1)
help = dump info >> exitWith ExitSuccess
getOpt 函數(shù)接受三個參數(shù)
- 參數(shù)順序的定義。 它定義了選項(Option)是否可以和其他參數(shù)混淆使用(就是我們上面用到的 Permute )或者是選項必須出現(xiàn)在參數(shù)之前。
- 選項的定義。 每個選項有這四個部分組成: 簡稱,全稱,選項的描述(比如是否接受參數(shù)) 以及用戶說明。
- 參數(shù)和選項數(shù)組,類似于 getArgs 的返回值。
這個函數(shù)返回一個三元組,包括用戶輸入的選項,參數(shù)以及錯誤信息(如果有的話)。
我們使用 Flag 代數(shù)類型(Algebraic Data Type)表示程序所能接收的選項。
-- file: ch28/Check.hs
options :: [OptDescr Flag]
options = [ Option ['h'] ["help"] (NoArg Help)
"Show this help message",
Option ['n'] [] (ReqArg (\s -> N (read s)) "N")
"Number of concurrent connections (default 16)" ]
options 列表保存了每個程序能接收選項的描述。每個描述必須要生成一個 Flag 值。參考上面例子中是如何使用 NoArg 和 ReqArg 。 GetOpt 模塊的 ArgDescr 類型有很多構(gòu)造函數(shù)(Constructors)。
-- file: ch28/GetOpt.hs
data ArgDescr a = NoArg a
| ReqArg (String -> a) String
| OptArg (Maybe String -> a) String
函數(shù) parseArgs 的定義里其實(shí)潛在了一個語言擴(kuò)展(Language Extension), Pattern guards。用它可以寫出更加簡要的guard expressions. 它通過語言擴(kuò)展 PatternGuards 來使用。
一個Pattern Guard有三個組成部分: 一個模式(Pattern), 一個 <- 符號以及一個表達(dá)式。表達(dá)式會被解釋然后和模式相匹配。 如果成功,在模式中定義的變量會被賦值。我們可以在一個guard里同時使用pattern guards和普通的 Bool guard expressions。
-- file: ch28/PatternGuard.hs
{-# LANGUAGE PatternGuards #-}
testme x xs | Just y <- lookup x xs, y > 3 = y
| otherwise = 0
在上面的例子中,當(dāng)關(guān)鍵字 x 存在于alist xs 并且大于等于3,則返回它所對應(yīng)的值。下面的定義實(shí)現(xiàn)了同樣的功能。
-- file: ch28/PatternGuard.hs
testme_noguards x xs = case lookup x xs of
Just y | y > 3 -> y
_ -> 0
Pattern guards 使得我們可以把一系列的guards和 case 表達(dá)式組合到單個guard,從而寫出更加簡潔并容易理解的guards。
至此我們還并未提及STM所提供的特別優(yōu)越的地方。比如它在做組合(composes)方面就表現(xiàn)的很好:當(dāng)需要向一個事務(wù)中增加邏輯時,只需要用到常見的函數(shù) (>>=) 和 (>>) 。
組合的概念在構(gòu)建模塊化軟件是顯得格外重要。如果我們把倆段都沒有問題的代碼組合在一起,也應(yīng)該是能很好工作的。常規(guī)的線程編程技術(shù)無法實(shí)現(xiàn)組合,然而由于STM提供了一些很關(guān)鍵的前提,從而使在線程編程時使用組合變得可能。
STM monad防止了我們意外的非事務(wù)性的I/O。我們不再需要關(guān)心鎖的順序,因為代碼里根本沒有鎖機(jī)制。我們可以忘記丟失喚醒,因為不再有條件變量了。如果有異常發(fā)生,我們則可以用函數(shù) catchSTM 捕捉到,或者是往上級傳遞。 最后,我們可以用 retry 和 orElse 以更加漂亮的方式組織代碼。
采用STM機(jī)制的代碼不會死鎖,但是導(dǎo)致饑餓還是有可能的。一個長事務(wù)導(dǎo)致另外一個事務(wù)不停的 retry 。為了解決這樣的問題,需要盡量的短事務(wù)并保持?jǐn)?shù)據(jù)一致性。
無論是同步管理還是內(nèi)存管理,經(jīng)常會遇到保留控制權(quán)的情況:一些軟件需要對延時或是內(nèi)存使用記錄有很強(qiáng)的保證,因此就必須花很多時間和精力去管理和調(diào)試顯式的代碼。然后對于軟件的大多數(shù)實(shí)際情況,垃圾回收(Garbage Collection)和STM已經(jīng)做的足夠好了。
STM并不是一顆完美的靈丹妙藥。當(dāng)我們選擇垃圾回收而不是顯式的內(nèi)存管理, 我們是放棄了控制權(quán)從而獲得更加安全的代碼。 同樣的,當(dāng)使用STM時,我們放棄了底層的細(xì)節(jié),從而希望代碼可讀性更好,更加容易理解。
STM并不能消除某些類型的bug。比如,我們在一個 atomically 事務(wù)中從某個賬號中取錢,然后返回到 IO monad,然后在另一個 atomically 事務(wù)中把錢存到另一個賬號,那么代碼就會產(chǎn)生不一致性,因為會在某個特定時刻,這部分錢不會出現(xiàn)的任意一個賬號里。
-- file: ch28/GameInventory.hs
bogusTransfer qty fromBal toBal = do
fromQty <- atomically $ readTVar fromBal
-- window of inconsistency
toQty <- atomically $ readTVar toBal
atomically $ writeTVar fromBal (fromQty - qty)
-- window of inconsistency
atomically $ writeTVar toBal (toQty + qty)
bogusSale :: Item -> Gold -> Player -> Player -> IO ()
bogusSale item price buyer seller = do
atomically $ giveItem item (inventory seller) (inventory buyer)
bogusTransfer price (balance buyer) (balance seller)
在同步程序中,這類問題顯然很難而且不容易重現(xiàn)。比如上述例子中的不一致性問題通常只存在一段很短的時間內(nèi)。在開發(fā)階段通常不會出現(xiàn)這類問題,而往往只有在負(fù)載很高的產(chǎn)品環(huán)境才有可能發(fā)生。
我們可以用函數(shù) alwaysSucceeds 定義一個不變量,它是永遠(yuǎn)為真的一個數(shù)據(jù)屬性。
ghci> :type alwaysSucceeds
alwaysSucceeds :: STM a -> STM ()
當(dāng)創(chuàng)建一個不變量時,它馬上會被檢查。如果要失敗,那么這個不變量會拋出異常。更有意思的是,不變量會在經(jīng)后每個事務(wù)完成時自動被檢查。如果在任何一個點(diǎn)上失敗,事務(wù)就會推出,不變量拋出的異常也會被傳遞下去。這就意味著當(dāng)不變量的條件被違反時,我們就可以馬上得到反饋。
比如,下面兩個函數(shù)給本章開始時定義的游戲世界增加玩家
-- file: ch28/GameInventory.hs
newPlayer :: Gold -> HitPoint -> [Item] -> STM Player
newPlayer balance health inventory =
Player `liftM` newTVar balance
`ap` newTVar health
`ap` newTVar inventory
populateWorld :: STM [Player]
populateWorld = sequence [ newPlayer 20 20 [Wand, Banjo],
newPlayer 10 12 [Scroll] ]
下面的函數(shù)則返回了一個不變量,通過它我們可以保證整個游戲世界資金總是平衡的:即任何時候的資金總量和游戲建立時的總量是一樣的。
-- file: ch28/GameInventory.hs
consistentBalance :: [Player] -> STM (STM ())
consistentBalance players = do
initialTotal <- totalBalance
return $ do
curTotal <- totalBalance
when (curTotal /= initialTotal) $
error "inconsistent global balance"
where totalBalance = foldM addBalance 0 players
addBalance a b = (a+) `liftM` readTVar (balance b)
下面我們寫個函數(shù)來試驗下。
-- file: ch28/GameInventory.hs
tryBogusSale = do
players@(alice:bob:_) <- atomically populateWorld
atomically $ alwaysSucceeds =<< consistentBalance players
bogusSale Wand 5 alice bob
由于在函數(shù) bogusTransfer 中不正確地使用了 atomically 而會導(dǎo)致不一致性, 當(dāng)我們在 ghci 里運(yùn)行這個方法時則會檢測到這個不一致性。
ghci> tryBogusSale
*** Exception: inconsistent global balance
更多建議: