Suche…
Grundlegende Kanaloperationen: Erstellen, Setzen, Nehmen, Schließen und Puffern.
core.async
geht es darum, Prozesse zu core.async
, die Werte aus Kanälen übernehmen und in Kanäle core.async
.
(require [clojure.core.async :as a])
Channels mit chan
Sie erstellen einen Kanal mit der chan
Funktion:
(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3.
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3
Werte in Kanäle einfügen mit >!!
und >!
Sie setzen Werte in einen Kanal mit >!!
:
(a/>!! my-channel :an-item)
Sie können einen beliebigen Wert (Strings, Zahlen, Karten, Sammlungen, Objekte, sogar andere Kanäle usw.) in einen Kanal mit Ausnahme von nil
:
;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel
Je nach Puffer des Kanals >!!
kann den aktuellen Thread blockieren.
(let [ch (a/chan)] ;; unbuffered channel
(a/>!! ch :item)
;; the above call blocks, until another process
;; takes the item from the channel.
)
(let [ch (a/chan 3)] ;; channel with 3-size buffer
(a/>!! ch :item-1) ;; => true
(a/>!! ch :item-2) ;; => true
(a/>!! ch :item-3) ;; => true
(a/>!! ch :item-4)
;; now the buffer is full; blocks until :item-1 is taken from ch.
)
Innerhalb eines (go ...)
Blocks können und sollten Sie a/>!
statt a/>!!
:
(a/go (a/>! ch :item))
Das logische Verhalten ist dasselbe wie a/>!!
, aber nur der logische Prozess der Goroutine blockiert anstelle des tatsächlichen OS-Threads.
Verwenden Sie a/>!!
Innerhalb eines (go ...)
Blocks befindet sich ein Anti-Pattern:
;; NEVER DO THIS
(a/go
(a/>!! ch :item))
Werte aus Kanälen mit <!!
Sie nehmen einen Wert aus einem Kanal mit <!!
:
;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
Wenn im Kanal kein Element verfügbar ist, wird a/<!!
blockiert den aktuellen Thread, bis ein Wert in den Kanal eingefügt wird (oder der Kanal geschlossen wird, siehe später):
(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it
Innerhalb eines (go ...)
Blocks können und sollten Sie a/<!
statt a/<!!
:
(a/go (let [x (a/<! ch)] ...))
Das logische Verhalten ist dasselbe wie a/<!!
, aber nur der logische Prozess der Goroutine blockiert anstelle des tatsächlichen OS-Threads.
Verwenden Sie a/<!!
Innerhalb eines (go ...)
Blocks befindet sich ein Anti-Pattern:
;; NEVER DO THIS
(a/go
(a/<!! ch))
Kanäle schließen
Sie schließen einen Kanal mit a/close!
:
(a/close! ch)
Sobald ein Kanal geschlossen ist und alle Daten im Kanal erschöpft sind, werden die Takes immer gleich nil
:
(def ch (a/chan 5))
;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)
;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil
Asynchrones put!
mit put!
Als Alternative zu a/>!!
(das kann sperren), kannst du a/put!
aufrufen a/put!
mit einem optionalen Rückruf einen Wert in einen Kanal in einem anderen Thread einfügen.
(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive
In ClojureScript ist das Blockieren des aktuellen Threads nicht möglich. a/>!!
wird nicht unterstützt und put!
ist die einzige Möglichkeit, Daten außerhalb eines (go)
-Blocks in einen Kanal einzufügen.
Asynchrone Takes mit take!
Als Alternative zu a/<!!
(das kann den aktuellen Thread blockieren), können Sie a/take!
einen Wert asynchron von einem Kanal übernehmen und an einen Rückruf übergeben.
(a/take! ch (fn [x] (do something with x)))
Verwenden von Puffer- und Schiebepuffern
Mit fallenden und gleitenden Puffern wird nie blockiert, wenn jedoch der Puffer voll ist, verlieren Sie Daten. Der Löschpuffer verliert die zuletzt hinzugefügten Daten, während Schiebepuffer die ersten hinzugefügten Daten verlieren.
Löschpuffer Beispiel:
(def ch (a/chan (a/dropping-buffer 2)))
;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed
;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost
Beispiel für Schiebepuffer:
(def ch (a/chan (a/sliding-buffer 2)))
;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true
;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost