Haskell Language
трубы
Поиск…
замечания
Как описано в хакерской странице :
pipe - это чистая и мощная библиотека обработки потоков, которая позволяет создавать и подключать многоразовые потоковые компоненты
Программы, реализованные посредством потоковой передачи, часто могут быть краткими и сложными, с простыми и короткими функциями, позволяющими легко «входить или выходить» с поддержкой системы типа Haskell.
await :: Monad m => Consumer' ama
Вытягивает значение из восходящего потока, где a
- это наш тип ввода.
yield :: Monad m => a -> Producer' am ()
Произведите значение, где a
- тип вывода.
Настоятельно рекомендуется прочитать встроенный пакет Pipes.Tutorial
, который дает отличный обзор основных концепций Pipes и того, как взаимодействуют Producer
, Consumer
и Effect
.
Производители
Producer
- это одно монадическое действие, которое может yield
значения для потребления в нисходящем потоке:
type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()
Например:
naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes
Разумеется, у нас есть Producer
s, которые также являются функциями других значений:
naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]
Потребители
Consumer
может только await
значений от восходящего потока.
type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a
Например:
fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
numStr <- await
liftIO $ putStrLn ("I received: " ++ numStr)
трубы
Трубы могут await
и yield
.
type Pipe a b = Proxy () a () b
Эта труба ждет Int
и преобразует ее в String
:
intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)
Запуск труб с runEffect
Мы используем runEffect
для запуска нашей Pipe
:
main :: IO ()
main = do
runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint
Обратите внимание, что runEffect
требует Effect
, который является автономным Proxy
без каких-либо входов или выходов:
runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X
(где X
- пустой тип, также известный как Void
).
Соединительные трубы
Используйте >->
для подключения Producer
, Consumer
и Pipe
для создания более крупных функций Pipe
.
printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint
Типы Producer
, Consumer
, Pipe
и Effect
определяются в терминах общего типа Proxy
. Поэтому >->
может использоваться для различных целей. Типы, определенные левым аргументом, должны соответствовать типу, потребляемому правильным аргументом:
(>->) :: Monad m => Producer b m r -> Consumer b m r -> Effect m r
(>->) :: Monad m => Producer b m r -> Pipe b c m r -> Producer c m r
(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
Трансформатор монумента Proxy
Основным типом данных pipes
является преобразователь monad Proxy
. Pipe
, Producer
, Consumer
и т. Д. Определяются в терминах Proxy
.
Поскольку Proxy
является монадным трансформатором, определения Pipe
s принимают форму монадических сценариев, которые await
и yield
значения, дополнительно выполняя эффекты от базовой монады m
.
Объединение труб и сетевых коммуникаций
Трубы поддерживают простую двоичную связь между клиентом и сервером
В этом примере:
- клиент подключается и отправляет
FirstMessage
- сервер получает и отвечает на
DoSomething 0
- клиент получает и отвечает
DoNothing
- шаги 2 и 3 повторяются бесконечно
Тип данных команды обменивается по сети:
-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)
data Command = FirstMessage
| DoNothing
| DoSomething Int
deriving (Show,Generic)
instance Binary Command
Здесь сервер ожидает подключения клиента:
module Server where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answers the same command that we have receveid
-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "received message = " ++ (show c)
return $ C.DoSomething 0
-- whatever incoming command `c` from the client, answer DoSomething 0
main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
-- if we want to use the pureHandler
--commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
return ()
Клиент подключается таким образом:
module Client where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answer the same command received from the server
-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "Received: " ++ (show c)
return C.DoNothing -- whatever is received from server, answer DoNothing
main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
sendFirstMessage connectionSocket
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
return ()
sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
_ <- runEffect $ do
let encodedProducer = PipesBinary.encode C.FirstMessage
encodedProducer >-> PNT.toSocket s
return ()