Suche…


Bemerkungen

Wie die Hacker-Seite beschreibt:

Pipes ist eine saubere und leistungsstarke Streamverarbeitungsbibliothek, mit der Sie wiederverwendbare Streaming-Komponenten erstellen und verbinden können

Programme, die durch Streaming implementiert werden, sind oft kurz und komponierbar, mit einfachen, kurzen Funktionen, die es Ihnen ermöglichen, Funktionen einfach mit der Unterstützung des Haskell-Systems "ein- oder auszusteigen".

await :: Monad m => Consumer' ama

Holt einen Wert aus dem Upstream, wobei a unser Eingabetyp ist.

yield :: Monad m => a -> Producer' am ()

Produzieren Sie einen Wert, wobei a der Ausgabetyp ist.

Es wird dringend empfohlen, das Embedded Pipes.Tutorial Paket Pipes.Tutorial , das einen hervorragenden Überblick über die Kernkonzepte von Pipes und die Interaktion zwischen Producer , Consumer und Effect bietet.

Erzeuger

Ein Producer ist einige monadischen Aktion, kann yield Werte für Downstream Verbrauch:

type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()

Zum Beispiel:

naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes

Natürlich können wir auch Producer haben, die Funktionen anderer Werte sind:

naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]

Verbraucher

Ein Consumer kann nur await Werte aus dem Upstream await .

type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a

Zum Beispiel:

fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
  numStr <- await
  liftIO $ putStrLn ("I received: " ++ numStr)

Pfeifen

Pipes können sowohl await als auch yield .

type Pipe a b = Proxy () a () b

Diese Pipe erwartet ein Int und wandelt es in einen String :

intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)

Pipes mit runEffect ausführen

Wir verwenden runEffect , um unsere Pipe auszuführen:

main :: IO ()
main = do
  runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint

Beachten Sie, dass runEffect einen Effect benötigt, bei dem es sich um einen eigenständigen Proxy ohne Ein- oder Ausgänge handelt:

runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X

(wobei X der leere Typ ist, auch bekannt als Void ).

Verbindungsrohre

Verwenden Sie >-> , um Producer , Consumer und Pipe miteinander zu verbinden, um größere Pipe Funktionen zu Pipe .

printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint

Producer , Consumer , Pipe und Effect Typen werden alle im Hinblick auf den allgemeinen Proxy Typ definiert. Daher kann >-> für verschiedene Zwecke verwendet werden. Typen, die durch das linke Argument definiert werden, müssen mit dem Typ übereinstimmen, der vom rechten Argument verwendet wird:

(>->) :: Monad m => Producer b m r -> Consumer b   m r -> Effect       m r
(>->) :: Monad m => Producer b m r -> Pipe     b c m r -> Producer   c m r
(>->) :: Monad m => Pipe   a b m r -> Consumer b   m r -> Consumer a   m r
(>->) :: Monad m => Pipe   a b m r -> Pipe     b c m r -> Pipe     a c m r

Der Proxy-Monadentransformator

Der Kerndatentyp von pipes ist der Proxy Monad-Transformator. Pipe , Producer , Consumer usw. werden in Form von Proxy .

Da Proxy ein Monadentransformator ist, haben die Definitionen von Pipe s die Form monadischer Skripte, die Werte await und yield , und zusätzlich Effekte von der Basismonad m ausführen.

Pipes und Netzwerkkommunikation kombinieren

Pipes unterstützt die einfache binäre Kommunikation zwischen einem Client und einem Server

In diesem Beispiel:

  1. Ein Client stellt eine Verbindung her und sendet eine FirstMessage
  2. Der Server empfängt und beantwortet DoSomething 0
  3. Der Kunde empfängt und beantwortet DoNothing
  4. Schritt 2 und 3 werden unbegrenzt wiederholt

Der Befehlsdatentyp, der über das Netzwerk ausgetauscht wird:

-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)

data Command = FirstMessage
           | DoNothing
           | DoSomething Int
           deriving (Show,Generic)

instance Binary Command

Hier wartet der Server auf die Verbindung eines Clients:

module Server where

import Pipes 
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude

pageSize :: Int
pageSize = 4096

-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command 
pureHandler c = c  -- answers the same command that we have receveid

-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "received message = " ++ (show c)
  return $ C.DoSomething 0    
  -- whatever incoming command `c` from the client, answer DoSomething 0

main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
                 putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
                 _ <- runEffect $ do
                   let bytesReceiver = PNT.fromSocket connectionSocket pageSize
                   let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
                   commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
                   -- if we want to use the pureHandler
                   --commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
                 return ()

Der Client verbindet sich also:

module Client where

import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C

pageSize :: Int
pageSize = 4096

-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command 
pureHandler c = c  -- answer the same command received from the server

-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "Received: " ++ (show c)
  return C.DoNothing  -- whatever is received from server, answer DoNothing

main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
    putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
    sendFirstMessage connectionSocket
    _ <- runEffect $ do
      let bytesReceiver = PNT.fromSocket connectionSocket pageSize
      let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
      commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
    return ()

sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
  _ <- runEffect $ do
    let encodedProducer = PipesBinary.encode C.FirstMessage 
    encodedProducer >-> PNT.toSocket s  
  return ()


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow