Sök…


Anmärkningar

Som hackasidan beskriver:

rör är ett rent och kraftfullt strömbehandlingsbibliotek som låter dig bygga och ansluta återanvändbara strömningskomponenter

Program som implementeras genom streaming kan ofta vara kortfattade och komponerbara, med enkla, korta funktioner som gör att du enkelt kan "spela in eller ut" -funktioner med stöd av Haskell-systemet.

await :: Monad m => Consumer' ama

Drar ett värde från uppströms, där a är vår inmatningstyp.

yield :: Monad m => a -> Producer' am ()

Producera ett värde, där a är utgångstypen.

Det rekommenderas starkt att du läser igenom det inbäddade Pipes.Tutorial paketet som ger en utmärkt överblick över kärnbegreppen i Pipes och hur Producer , Consumer and Effect interagerar.

producenter

En Producer är en monadisk åtgärd som kan yield värden för nedströmsförbrukning:

type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()

Till exempel:

naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes

Vi kan naturligtvis ha Producer som också är funktioner för andra värden:

naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]

konsumenter

En Consumer kan bara await värden från uppströms.

type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a

Till exempel:

fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
  numStr <- await
  liftIO $ putStrLn ("I received: " ++ numStr)

rör

Rör kan både await och yield .

type Pipe a b = Proxy () a () b

Denna rör väntar på en Int och konverterar den till en String :

intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)

Körrör med runEffect

Vi använder runEffect att köra vårt Pipe :

main :: IO ()
main = do
  runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint

Observera att runEffect kräver en Effect , som är en fristående Proxy utan ingångar eller utgångar:

runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X

(där X är den tomma typen, även känd som Void ).

Anslutande rör

Använd >-> att ansluta Producer , Consumer och Pipe att komponera större Pipe .

printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint

Producer , Consumer , Pipe och Effect definieras i termer av den allmänna Proxy . Därför kan >-> användas för en mängd olika syften. Typer som definieras av det vänstra argumentet måste matcha den typ som konsumeras av det högra argumentet:

(>->) :: Monad m => Producer b m r -> Consumer b   m r -> Effect       m r
(>->) :: Monad m => Producer b m r -> Pipe     b c m r -> Producer   c m r
(>->) :: Monad m => Pipe   a b m r -> Consumer b   m r -> Consumer a   m r
(>->) :: Monad m => Pipe   a b m r -> Pipe     b c m r -> Pipe     a c m r

Proxy-monadtransformatorn

pipes kärndatatyp är Proxy monad-transformatorn. Pipe , Producer , Consumer och så vidare definieras i form av Proxy .

Eftersom Proxy är en monadtransformator, tar definitioner av Pipe s form av monadiska skript som await och yield värden, och dessutom utför effekter från basmonaden m .

Kombinera rör och nätverkskommunikation

Pipes stöder enkel binär kommunikation mellan en klient och en server

I detta exempel:

  1. en klient ansluter och skickar en FirstMessage
  2. servern tar emot och svarar DoSomething 0
  3. klienten tar emot och svarar på DoNothing
  4. steg 2 och 3 upprepas på obestämd tid

Kommandot datatyp utbyts över nätverket:

-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)

data Command = FirstMessage
           | DoNothing
           | DoSomething Int
           deriving (Show,Generic)

instance Binary Command

Här väntar servern på att en klient ska ansluta:

module Server where

import Pipes 
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude

pageSize :: Int
pageSize = 4096

-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command 
pureHandler c = c  -- answers the same command that we have receveid

-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "received message = " ++ (show c)
  return $ C.DoSomething 0    
  -- whatever incoming command `c` from the client, answer DoSomething 0

main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
                 putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
                 _ <- runEffect $ do
                   let bytesReceiver = PNT.fromSocket connectionSocket pageSize
                   let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
                   commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
                   -- if we want to use the pureHandler
                   --commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
                 return ()

Klienten ansluter sålunda:

module Client where

import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C

pageSize :: Int
pageSize = 4096

-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command 
pureHandler c = c  -- answer the same command received from the server

-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "Received: " ++ (show c)
  return C.DoNothing  -- whatever is received from server, answer DoNothing

main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
    putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
    sendFirstMessage connectionSocket
    _ <- runEffect $ do
      let bytesReceiver = PNT.fromSocket connectionSocket pageSize
      let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
      commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
    return ()

sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
  _ <- runEffect $ do
    let encodedProducer = PipesBinary.encode C.FirstMessage 
    encodedProducer >-> PNT.toSocket s  
  return ()


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow