Haskell Language
Pfeifen
Suche…
Bemerkungen
Wie die Hacker-Seite beschreibt:
Pipes ist eine saubere und leistungsstarke Streamverarbeitungsbibliothek, mit der Sie wiederverwendbare Streaming-Komponenten erstellen und verbinden können
Programme, die durch Streaming implementiert werden, sind oft kurz und komponierbar, mit einfachen, kurzen Funktionen, die es Ihnen ermöglichen, Funktionen einfach mit der Unterstützung des Haskell-Systems "ein- oder auszusteigen".
await :: Monad m => Consumer' ama
Holt einen Wert aus dem Upstream, wobei a
unser Eingabetyp ist.
yield :: Monad m => a -> Producer' am ()
Produzieren Sie einen Wert, wobei a
der Ausgabetyp ist.
Es wird dringend empfohlen, das Embedded Pipes.Tutorial
Paket Pipes.Tutorial
, das einen hervorragenden Überblick über die Kernkonzepte von Pipes und die Interaktion zwischen Producer
, Consumer
und Effect
bietet.
Erzeuger
Ein Producer
ist einige monadischen Aktion, kann yield
Werte für Downstream Verbrauch:
type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()
Zum Beispiel:
naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes
Natürlich können wir auch Producer
haben, die Funktionen anderer Werte sind:
naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]
Verbraucher
Ein Consumer
kann nur await
Werte aus dem Upstream await
.
type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a
Zum Beispiel:
fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
numStr <- await
liftIO $ putStrLn ("I received: " ++ numStr)
Pfeifen
Pipes können sowohl await
als auch yield
.
type Pipe a b = Proxy () a () b
Diese Pipe erwartet ein Int
und wandelt es in einen String
:
intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)
Pipes mit runEffect ausführen
Wir verwenden runEffect
, um unsere Pipe
auszuführen:
main :: IO ()
main = do
runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint
Beachten Sie, dass runEffect
einen Effect
benötigt, bei dem es sich um einen eigenständigen Proxy
ohne Ein- oder Ausgänge handelt:
runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X
(wobei X
der leere Typ ist, auch bekannt als Void
).
Verbindungsrohre
Verwenden Sie >->
, um Producer
, Consumer
und Pipe
miteinander zu verbinden, um größere Pipe
Funktionen zu Pipe
.
printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint
Producer
, Consumer
, Pipe
und Effect
Typen werden alle im Hinblick auf den allgemeinen Proxy
Typ definiert. Daher kann >->
für verschiedene Zwecke verwendet werden. Typen, die durch das linke Argument definiert werden, müssen mit dem Typ übereinstimmen, der vom rechten Argument verwendet wird:
(>->) :: Monad m => Producer b m r -> Consumer b m r -> Effect m r
(>->) :: Monad m => Producer b m r -> Pipe b c m r -> Producer c m r
(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
Der Proxy-Monadentransformator
Der Kerndatentyp von pipes
ist der Proxy
Monad-Transformator. Pipe
, Producer
, Consumer
usw. werden in Form von Proxy
.
Da Proxy
ein Monadentransformator ist, haben die Definitionen von Pipe
s die Form monadischer Skripte, die Werte await
und yield
, und zusätzlich Effekte von der Basismonad m
ausführen.
Pipes und Netzwerkkommunikation kombinieren
Pipes unterstützt die einfache binäre Kommunikation zwischen einem Client und einem Server
In diesem Beispiel:
- Ein Client stellt eine Verbindung her und sendet eine
FirstMessage
- Der Server empfängt und beantwortet
DoSomething 0
- Der Kunde empfängt und beantwortet
DoNothing
- Schritt 2 und 3 werden unbegrenzt wiederholt
Der Befehlsdatentyp, der über das Netzwerk ausgetauscht wird:
-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)
data Command = FirstMessage
| DoNothing
| DoSomething Int
deriving (Show,Generic)
instance Binary Command
Hier wartet der Server auf die Verbindung eines Clients:
module Server where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answers the same command that we have receveid
-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "received message = " ++ (show c)
return $ C.DoSomething 0
-- whatever incoming command `c` from the client, answer DoSomething 0
main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
-- if we want to use the pureHandler
--commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
return ()
Der Client verbindet sich also:
module Client where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answer the same command received from the server
-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "Received: " ++ (show c)
return C.DoNothing -- whatever is received from server, answer DoNothing
main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
sendFirstMessage connectionSocket
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
return ()
sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
_ <- runEffect $ do
let encodedProducer = PipesBinary.encode C.FirstMessage
encodedProducer >-> PNT.toSocket s
return ()