Haskell Language
Pipes
Recherche…
Remarques
Comme la page de piratage décrit:
pipes est une bibliothèque de traitement de flux propre et puissante qui vous permet de créer et de connecter des composants de streaming réutilisables
Les programmes mis en œuvre par le biais de la diffusion en continu peuvent souvent être succincts et composables, avec des fonctions simples et courtes vous permettant d'intégrer ou non des fonctions facilement grâce au système de type Haskell.
await :: Monad m => Consumer' ama
Extrait une valeur de l'amont, où a
est notre type d'entrée.
yield :: Monad m => a -> Producer' am ()
Produire une valeur, où a
est le type de sortie.
Il est fortement recommandé de lire le paquet Pipes.Tutorial
intégré qui donne un excellent aperçu des concepts de base de Pipes et de la manière dont Producer
, Consumer
et Effect
interagissent.
Producteurs
Un Producer
est une action monadique qui peut yield
valeurs pour la consommation en aval:
type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()
Par exemple:
naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes
On peut bien sûr avoir aussi des Producer
qui sont des fonctions d'autres valeurs:
naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]
Les consommateurs
Un Consumer
ne peut await
valeurs en amont.
type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a
Par exemple:
fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
numStr <- await
liftIO $ putStrLn ("I received: " ++ numStr)
Pipes
Les tuyaux peuvent à la fois await
et yield
.
type Pipe a b = Proxy () a () b
Ce Pipe attend un Int
et le convertit en String
:
intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)
Running Pipes avec runEffect
Nous utilisons runEffect
pour faire fonctionner notre Pipe
:
main :: IO ()
main = do
runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint
Notez que runEffect
nécessite un Effect
, qui est un Proxy
autonome sans entrées ni sorties:
runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X
(où X
est le type vide, également appelé Void
).
Tuyaux de raccordement
Utilisez >->
pour connecter les Producer
, les Consumer
et les Pipe
pour composer des fonctions de Pipe
plus larges.
printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint
Producer
types Producer
, Consumer
, Pipe
et Effect
sont tous définis en termes de type Proxy
général. Par conséquent, >->
peut être utilisé à diverses fins. Les types définis par l'argument de gauche doivent correspondre au type consommé par le bon argument:
(>->) :: Monad m => Producer b m r -> Consumer b m r -> Effect m r
(>->) :: Monad m => Producer b m r -> Pipe b c m r -> Producer c m r
(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
Le transformateur Proxy Monad
Le type de données de base de pipes
est le transformateur de monade de Proxy
. Pipe
, Producer
, Consumer
, etc. sont définis en termes de Proxy
.
Comme Proxy
est un transformateur monad, les définitions de Pipe
s prennent la forme de scripts monadiques qui await
et yield
valeurs, effectuant en outre des effets à partir de la monade de base m
.
Combinaison de tuyaux et de communication réseau
Pipes prend en charge la communication binaire simple entre un client et un serveur
Dans cet exemple:
- un client se connecte et envoie un
FirstMessage
- le serveur reçoit et répond à
DoSomething 0
- le client reçoit et réponses
DoNothing
- les étapes 2 et 3 sont répétées indéfiniment
Le type de données de commande échangé sur le réseau:
-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)
data Command = FirstMessage
| DoNothing
| DoSomething Int
deriving (Show,Generic)
instance Binary Command
Ici, le serveur attend qu'un client se connecte:
module Server where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answers the same command that we have receveid
-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "received message = " ++ (show c)
return $ C.DoSomething 0
-- whatever incoming command `c` from the client, answer DoSomething 0
main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
-- if we want to use the pureHandler
--commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
return ()
Le client se connecte ainsi:
module Client where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answer the same command received from the server
-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "Received: " ++ (show c)
return C.DoNothing -- whatever is received from server, answer DoNothing
main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
sendFirstMessage connectionSocket
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
return ()
sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
_ <- runEffect $ do
let encodedProducer = PipesBinary.encode C.FirstMessage
encodedProducer >-> PNT.toSocket s
return ()