Поиск…
основные операции с каналами: создание, размещение, принятие, закрытие и буферы.
core.async
- это создание процессов, которые принимают значения и помещают значения в каналы .
(require [clojure.core.async :as a])
Создание каналов с помощью chan
Вы создаете канал, используя функцию chan
:
(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3.
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3
Ввод значений в каналы с помощью >!!
и >!
Вы добавляете значения в канал с помощью >!!
:
(a/>!! my-channel :an-item)
Вы можете поместить любое значение (строки, числа, карты, коллекции, объекты, другие каналы и т. Д.) В канал, кроме nil
:
;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel
В зависимости от буфера канала >!!
может блокировать текущий поток.
(let [ch (a/chan)] ;; unbuffered channel
(a/>!! ch :item)
;; the above call blocks, until another process
;; takes the item from the channel.
)
(let [ch (a/chan 3)] ;; channel with 3-size buffer
(a/>!! ch :item-1) ;; => true
(a/>!! ch :item-2) ;; => true
(a/>!! ch :item-3) ;; => true
(a/>!! ch :item-4)
;; now the buffer is full; blocks until :item-1 is taken from ch.
)
Изнутри блока (go ...)
вы можете - и должны - использовать a/>!
вместо a/>!!
:
(a/go (a/>! ch :item))
Логическое поведение будет таким же, как a/>!!
, но только логический процесс goroutine будет блокироваться вместо фактического потока ОС.
Используя a/>!!
внутри блока (go ...)
является анти-шаблон:
;; NEVER DO THIS
(a/go
(a/>!! ch :item))
Принимая значения из каналов с помощью <!!
Вы берете значение из канала, используя <!!
:
;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
Если в канале нет элемента, a/<!!
будет блокировать текущий поток до тех пор, пока значение не будет помещено в канал (или канал не будет закрыт, см. ниже):
(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it
Изнутри блока (go ...)
вы можете - и должны - использовать a/<!
вместо a/<!!
:
(a/go (let [x (a/<! ch)] ...))
Логическое поведение будет таким же, как a/<!!
, но только логический процесс goroutine будет блокироваться вместо фактического потока ОС.
Используя a/<!!
внутри блока (go ...)
является анти-шаблон:
;; NEVER DO THIS
(a/go
(a/<!! ch))
Закрывающие каналы
Вы закрываете канал с a/close!
:
(a/close! ch)
Как только канал закрыт, и все данные в канале исчерпаны, take всегда будет возвращать nil
:
(def ch (a/chan 5))
;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)
;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil
Асинхронный ставит с put!
В качестве альтернативы a/>!!
(который может блокироваться), вы можете вызвать a/put!
для размещения значения в канале в другом потоке с дополнительным обратным вызовом.
(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive
В ClojureScript, поскольку блокировка текущего потока невозможна, a/>!!
не поддерживается и put!
является единственным способом поместить данные в канал извне блока (go)
.
Асинхронный take!
с take!
В качестве альтернативы a/<!!
(который может блокировать текущий поток), вы можете использовать a/take!
для получения значения из канала асинхронно, передавая его на обратный вызов.
(a/take! ch (fn [x] (do something with x)))
Использование отбрасывающих и сдвижных буферов
С сбросом и сдвигом буферов, однако, никогда не блокируется, когда буфер заполнен, вы теряете данные. Отбрасывающий буфер теряет последние данные, тогда как скользящие буферы теряют первые добавленные данные.
Пример сброса буфера:
(def ch (a/chan (a/dropping-buffer 2)))
;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed
;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost
Пример раздвижного буфера:
(def ch (a/chan (a/sliding-buffer 2)))
;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true
;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost