Поиск…


замечания

Как описано в хакерской странице :

pipe - это чистая и мощная библиотека обработки потоков, которая позволяет создавать и подключать многоразовые потоковые компоненты

Программы, реализованные посредством потоковой передачи, часто могут быть краткими и сложными, с простыми и короткими функциями, позволяющими легко «входить или выходить» с поддержкой системы типа Haskell.

await :: Monad m => Consumer' ama

Вытягивает значение из восходящего потока, где a - это наш тип ввода.

yield :: Monad m => a -> Producer' am ()

Произведите значение, где a - тип вывода.

Настоятельно рекомендуется прочитать встроенный пакет Pipes.Tutorial , который дает отличный обзор основных концепций Pipes и того, как взаимодействуют Producer , Consumer и Effect .

Производители

Producer - это одно монадическое действие, которое может yield значения для потребления в нисходящем потоке:

type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()

Например:

naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes

Разумеется, у нас есть Producer s, которые также являются функциями других значений:

naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]

Потребители

Consumer может только await значений от восходящего потока.

type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a

Например:

fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
  numStr <- await
  liftIO $ putStrLn ("I received: " ++ numStr)

трубы

Трубы могут await и yield .

type Pipe a b = Proxy () a () b

Эта труба ждет Int и преобразует ее в String :

intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)

Запуск труб с runEffect

Мы используем runEffect для запуска нашей Pipe :

main :: IO ()
main = do
  runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint

Обратите внимание, что runEffect требует Effect , который является автономным Proxy без каких-либо входов или выходов:

runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X

(где X - пустой тип, также известный как Void ).

Соединительные трубы

Используйте >-> для подключения Producer , Consumer и Pipe для создания более крупных функций Pipe .

printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint

Типы Producer , Consumer , Pipe и Effect определяются в терминах общего типа Proxy . Поэтому >-> может использоваться для различных целей. Типы, определенные левым аргументом, должны соответствовать типу, потребляемому правильным аргументом:

(>->) :: Monad m => Producer b m r -> Consumer b   m r -> Effect       m r
(>->) :: Monad m => Producer b m r -> Pipe     b c m r -> Producer   c m r
(>->) :: Monad m => Pipe   a b m r -> Consumer b   m r -> Consumer a   m r
(>->) :: Monad m => Pipe   a b m r -> Pipe     b c m r -> Pipe     a c m r

Трансформатор монумента Proxy

Основным типом данных pipes является преобразователь monad Proxy . Pipe , Producer , Consumer и т. Д. Определяются в терминах Proxy .

Поскольку Proxy является монадным трансформатором, определения Pipe s принимают форму монадических сценариев, которые await и yield значения, дополнительно выполняя эффекты от базовой монады m .

Объединение труб и сетевых коммуникаций

Трубы поддерживают простую двоичную связь между клиентом и сервером

В этом примере:

  1. клиент подключается и отправляет FirstMessage
  2. сервер получает и отвечает на DoSomething 0
  3. клиент получает и отвечает DoNothing
  4. шаги 2 и 3 повторяются бесконечно

Тип данных команды обменивается по сети:

-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)

data Command = FirstMessage
           | DoNothing
           | DoSomething Int
           deriving (Show,Generic)

instance Binary Command

Здесь сервер ожидает подключения клиента:

module Server where

import Pipes 
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude

pageSize :: Int
pageSize = 4096

-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command 
pureHandler c = c  -- answers the same command that we have receveid

-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "received message = " ++ (show c)
  return $ C.DoSomething 0    
  -- whatever incoming command `c` from the client, answer DoSomething 0

main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
                 putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
                 _ <- runEffect $ do
                   let bytesReceiver = PNT.fromSocket connectionSocket pageSize
                   let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
                   commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
                   -- if we want to use the pureHandler
                   --commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
                 return ()

Клиент подключается таким образом:

module Client where

import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C

pageSize :: Int
pageSize = 4096

-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command 
pureHandler c = c  -- answer the same command received from the server

-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "Received: " ++ (show c)
  return C.DoNothing  -- whatever is received from server, answer DoNothing

main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
    putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
    sendFirstMessage connectionSocket
    _ <- runEffect $ do
      let bytesReceiver = PNT.fromSocket connectionSocket pageSize
      let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
      commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
    return ()

sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
  _ <- runEffect $ do
    let encodedProducer = PipesBinary.encode C.FirstMessage 
    encodedProducer >-> PNT.toSocket s  
  return ()


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow