Haskell Language
rör
Sök…
Anmärkningar
Som hackasidan beskriver:
rör är ett rent och kraftfullt strömbehandlingsbibliotek som låter dig bygga och ansluta återanvändbara strömningskomponenter
Program som implementeras genom streaming kan ofta vara kortfattade och komponerbara, med enkla, korta funktioner som gör att du enkelt kan "spela in eller ut" -funktioner med stöd av Haskell-systemet.
await :: Monad m => Consumer' ama
Drar ett värde från uppströms, där a
är vår inmatningstyp.
yield :: Monad m => a -> Producer' am ()
Producera ett värde, där a
är utgångstypen.
Det rekommenderas starkt att du läser igenom det inbäddade Pipes.Tutorial
paketet som ger en utmärkt överblick över kärnbegreppen i Pipes och hur Producer
, Consumer
and Effect
interagerar.
producenter
En Producer
är en monadisk åtgärd som kan yield
värden för nedströmsförbrukning:
type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()
Till exempel:
naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes
Vi kan naturligtvis ha Producer
som också är funktioner för andra värden:
naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]
konsumenter
En Consumer
kan bara await
värden från uppströms.
type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a
Till exempel:
fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
numStr <- await
liftIO $ putStrLn ("I received: " ++ numStr)
rör
Rör kan både await
och yield
.
type Pipe a b = Proxy () a () b
Denna rör väntar på en Int
och konverterar den till en String
:
intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)
Körrör med runEffect
Vi använder runEffect
att köra vårt Pipe
:
main :: IO ()
main = do
runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint
Observera att runEffect
kräver en Effect
, som är en fristående Proxy
utan ingångar eller utgångar:
runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X
(där X
är den tomma typen, även känd som Void
).
Anslutande rör
Använd >->
att ansluta Producer
, Consumer
och Pipe
att komponera större Pipe
.
printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint
Producer
, Consumer
, Pipe
och Effect
definieras i termer av den allmänna Proxy
. Därför kan >->
användas för en mängd olika syften. Typer som definieras av det vänstra argumentet måste matcha den typ som konsumeras av det högra argumentet:
(>->) :: Monad m => Producer b m r -> Consumer b m r -> Effect m r
(>->) :: Monad m => Producer b m r -> Pipe b c m r -> Producer c m r
(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
Proxy-monadtransformatorn
pipes
kärndatatyp är Proxy
monad-transformatorn. Pipe
, Producer
, Consumer
och så vidare definieras i form av Proxy
.
Eftersom Proxy
är en monadtransformator, tar definitioner av Pipe
s form av monadiska skript som await
och yield
värden, och dessutom utför effekter från basmonaden m
.
Kombinera rör och nätverkskommunikation
Pipes stöder enkel binär kommunikation mellan en klient och en server
I detta exempel:
- en klient ansluter och skickar en
FirstMessage
- servern tar emot och svarar
DoSomething 0
- klienten tar emot och svarar på
DoNothing
- steg 2 och 3 upprepas på obestämd tid
Kommandot datatyp utbyts över nätverket:
-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)
data Command = FirstMessage
| DoNothing
| DoSomething Int
deriving (Show,Generic)
instance Binary Command
Här väntar servern på att en klient ska ansluta:
module Server where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answers the same command that we have receveid
-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "received message = " ++ (show c)
return $ C.DoSomething 0
-- whatever incoming command `c` from the client, answer DoSomething 0
main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
-- if we want to use the pureHandler
--commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
return ()
Klienten ansluter sålunda:
module Client where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answer the same command received from the server
-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "Received: " ++ (show c)
return C.DoNothing -- whatever is received from server, answer DoNothing
main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
sendFirstMessage connectionSocket
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
return ()
sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
_ <- runEffect $ do
let encodedProducer = PipesBinary.encode C.FirstMessage
encodedProducer >-> PNT.toSocket s
return ()