Поиск…


основные операции с каналами: создание, размещение, принятие, закрытие и буферы.

core.async - это создание процессов, которые принимают значения и помещают значения в каналы .

(require [clojure.core.async :as a])

Создание каналов с помощью chan

Вы создаете канал, используя функцию chan :

(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3. 
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3

Ввод значений в каналы с помощью >!! и >!

Вы добавляете значения в канал с помощью >!! :

(a/>!! my-channel :an-item)

Вы можете поместить любое значение (строки, числа, карты, коллекции, объекты, другие каналы и т. Д.) В канал, кроме nil :

;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel

В зависимости от буфера канала >!! может блокировать текущий поток.

(let [ch (a/chan)] ;; unbuffered channel
  (a/>!! ch :item) 
  ;; the above call blocks, until another process 
  ;; takes the item from the channel.
  )
(let [ch (a/chan 3)] ;; channel with 3-size buffer
  (a/>!! ch :item-1) ;; => true
  (a/>!! ch :item-2) ;; => true
  (a/>!! ch :item-3) ;; => true
  (a/>!! ch :item-4) 
  ;; now the buffer is full; blocks until :item-1 is taken from ch.
  )

Изнутри блока (go ...) вы можете - и должны - использовать a/>! вместо a/>!! :

 (a/go (a/>! ch :item))

Логическое поведение будет таким же, как a/>!! , но только логический процесс goroutine будет блокироваться вместо фактического потока ОС.

Используя a/>!! внутри блока (go ...) является анти-шаблон:

;; NEVER DO THIS
(a/go 
  (a/>!! ch :item))

Принимая значения из каналов с помощью <!!

Вы берете значение из канала, используя <!! :

;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do 
  (a/>!! ch :item-1)
  (a/>!! ch :item-2)
  (a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2

Если в канале нет элемента, a/<!! будет блокировать текущий поток до тех пор, пока значение не будет помещено в канал (или канал не будет закрыт, см. ниже):

(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it

Изнутри блока (go ...) вы можете - и должны - использовать a/<! вместо a/<!! :

 (a/go (let [x (a/<! ch)] ...))

Логическое поведение будет таким же, как a/<!! , но только логический процесс goroutine будет блокироваться вместо фактического потока ОС.

Используя a/<!! внутри блока (go ...) является анти-шаблон:

;; NEVER DO THIS
(a/go 
  (a/<!! ch))

Закрывающие каналы

Вы закрываете канал с a/close! :

(a/close! ch)

Как только канал закрыт, и все данные в канале исчерпаны, take всегда будет возвращать nil :

(def ch (a/chan 5))

;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)

;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil

;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil

Асинхронный ставит с put!

В качестве альтернативы a/>!! (который может блокироваться), вы можете вызвать a/put! для размещения значения в канале в другом потоке с дополнительным обратным вызовом.

(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive 

В ClojureScript, поскольку блокировка текущего потока невозможна, a/>!! не поддерживается и put! является единственным способом поместить данные в канал извне блока (go) .

Асинхронный take! с take!

В качестве альтернативы a/<!! (который может блокировать текущий поток), вы можете использовать a/take! для получения значения из канала асинхронно, передавая его на обратный вызов.

(a/take! ch (fn [x] (do something with x)))

Использование отбрасывающих и сдвижных буферов

С сбросом и сдвигом буферов, однако, никогда не блокируется, когда буфер заполнен, вы теряете данные. Отбрасывающий буфер теряет последние данные, тогда как скользящие буферы теряют первые добавленные данные.

Пример сброса буфера:

(def ch (a/chan (a/dropping-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed

;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost

Пример раздвижного буфера:

(def ch (a/chan (a/sliding-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true

;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow