926 lines
30 KiB
Clojure
926 lines
30 KiB
Clojure
(ns cljs.core.async
|
|
(:refer-clojure :exclude [reduce transduce into merge map take partition partition-by])
|
|
(:require [cljs.core.async.impl.protocols :as impl]
|
|
[cljs.core.async.impl.channels :as channels]
|
|
[cljs.core.async.impl.buffers :as buffers]
|
|
[cljs.core.async.impl.timers :as timers]
|
|
[cljs.core.async.impl.dispatch :as dispatch]
|
|
[cljs.core.async.impl.ioc-helpers :as helpers]
|
|
[goog.array :as garray])
|
|
(:require-macros [cljs.core.async.impl.ioc-macros :as ioc]
|
|
[cljs.core.async :refer [go go-loop]]))
|
|
|
|
(defn- fn-handler
  "Wraps the callback f in an impl/Handler. The handler always reports
  itself as active, reports the given blockable flag (true when
  omitted) and hands back f when committed."
  ([f] (fn-handler f true))
  ([f blockable?]
   (reify
     impl/Handler
     (active? [_this] true)
     (blockable? [_this] blockable?)
     (commit [_this] f))))
|
|
|
|
(defn buffer
  "Creates a fixed buffer with room for n items. Once the buffer is
  full, further puts block/park."
  [n]
  (buffers/fixed-buffer n))
|
|
|
|
(defn dropping-buffer
  "Creates a buffer with room for n items. Once the buffer is full,
  puts still complete but the new val is dropped (no transfer)."
  [n]
  (buffers/dropping-buffer n))
|
|
|
|
(defn sliding-buffer
  "Creates a buffer with room for n items. Once the buffer is full,
  puts still complete and are buffered, while the oldest buffered
  elements are dropped (not transferred)."
  [n]
  (buffers/sliding-buffer n))
|
|
|
|
(defn unblocking-buffer?
  "True when a channel created with buff will never block, i.e. when
  puts into this buffer can never cause it to become full. Implemented
  as a check that buff satisfies impl/UnblockingBuffer."
  [buff]
  (satisfies? impl/UnblockingBuffer buff))
|
|
|
|
(defn chan
  "Creates a channel with an optional buffer, an optional transducer
  (like (map f), (filter p) etc or a composition thereof), and an
  optional exception handler.

  When buf-or-n is a number a fixed buffer of that size is created and
  used; 0 is treated the same as no buffer. A buffer must be specified
  whenever a transducer is supplied. ex-handler must be a fn of one
  argument - if an exception occurs during transformation it will be
  called with the thrown value as an argument, and any non-nil return
  value will be placed in the channel."
  ([] (chan nil))
  ([buf-or-n] (chan buf-or-n nil nil))
  ([buf-or-n xform] (chan buf-or-n xform nil))
  ([buf-or-n xform ex-handler]
   ;; A size of 0 means "no buffer"; normalise it to nil up front.
   (let [buf-or-n (when-not (= buf-or-n 0) buf-or-n)]
     (when xform
       (assert buf-or-n "buffer must be supplied when transducer is"))
     (let [buf (if (number? buf-or-n)
                 (buffer buf-or-n)
                 buf-or-n)]
       (channels/chan buf xform ex-handler)))))
|
|
|
|
(defn promise-chan
  "Creates a promise channel with an optional transducer and an
  optional exception-handler. A promise channel can take exactly one
  value that consumers will receive. Once full, puts complete but val
  is dropped (no transfer). Consumers will block until either a value
  is placed in the channel or the channel is closed. See chan for the
  semantics of xform and ex-handler."
  ([] (promise-chan nil))
  ([xform] (promise-chan xform nil))
  ([xform ex-handler]
   (let [buf (buffers/promise-buffer)]
     (chan buf xform ex-handler))))
|
|
|
|
(defn timeout
  "Produces a channel that closes once msecs have elapsed."
  [msecs]
  (timers/timeout msecs))
|
|
|
|
(defn <!
  "takes a val from port. Must be called inside a (go ...) block. Will
  return nil if closed. Will park if nothing is available.

  This function body only runs when <! is used outside a go block, in
  which case it throws."
  [port]
  (throw (js/Error. "<! used not in (go ...) block")))
|
|
|
|
(defn take!
  "Asynchronously takes a val from port and passes it to fn1; nil is
  passed if the port is closed. When on-caller? (default true) is true
  and a value is immediately available, fn1 is invoked on the calling
  thread, otherwise it is scheduled through dispatch/run.
  Returns nil."
  ([port fn1] (take! port fn1 true))
  ([port fn1 on-caller?]
   ;; impl/take! hands back a box only when a value was available at once.
   (when-let [boxed (impl/take! port (fn-handler fn1))]
     (let [v @boxed]
       (if on-caller?
         (fn1 v)
         (dispatch/run #(fn1 v)))))
   nil))
|
|
|
|
;; Callback that ignores its argument and returns nil.
(defn- nop [_] nil)

;; Shared blockable handler around nop, used by put! when no callback is given.
(def ^:private fhnop (fn-handler nop true))
|
|
|
|
(defn >!
  "puts a val into port. nil values are not allowed. Must be called
  inside a (go ...) block. Will park if no buffer space is available.
  Returns true unless port is already closed.

  This function body only runs when >! is used outside a go block, in
  which case it throws."
  [port val]
  (throw (js/Error. ">! used not in (go ...) block")))
|
|
|
|
(defn put!
  "Asynchronously puts a val into port, calling fn1 (if supplied) with
  the result of the put when complete. nil values are not allowed. If
  on-caller? (default true) is true, and the put is immediately
  accepted, will call fn1 on calling thread, otherwise fn1 is scheduled
  through dispatch/run.

  Returns the result of the put when it completes immediately (true
  unless port is already closed), or true when the put was enqueued."
  ([port val]
   (if-let [ret (impl/put! port val fhnop)]
     @ret
     true))
  ([port val fn1] (put! port val fn1 true))
  ([port val fn1 on-caller?]
   (if-let [retb (impl/put! port val (fn-handler fn1))]
     ;; Completed immediately: fn1 is notified here and the same result
     ;; is also returned to the caller.
     (let [ret @retb]
       (if on-caller?
         (fn1 ret)
         (dispatch/run #(fn1 ret)))
       ret)
     true)))
|
|
|
|
(defn close!
  "Closes port by delegating to impl/close!."
  [port]
  (impl/close! port))
|
|
|
|
|
|
(defn- random-array
  "Builds an array holding the integers 0 to n-1, shuffles it in place
  with garray/shuffle and returns it."
  [n]
  (let [arr (make-array n)]
    (dotimes [i n]
      (aset arr i i))
    (garray/shuffle arr)
    arr))
|
|
|
|
(defn- alt-flag
  "Creates a handler that is active until its first commit. Committing
  clears the internal flag and returns true."
  []
  (let [live (atom true)]
    (reify
      impl/Handler
      (active? [_this] @live)
      (blockable? [_this] true)
      (commit [_this]
        (reset! live nil)
        true))))
|
|
|
|
(defn- alt-handler
  "Creates a handler whose active state is that of the shared flag
  handler. Committing it commits the flag and returns cb."
  [flag cb]
  (reify
    impl/Handler
    (active? [_this] (impl/active? flag))
    (blockable? [_this] true)
    (commit [_this]
      (impl/commit flag)
      cb)))
|
|
|
|
(defn do-alts
  "returns derefable [val port] if immediate, nil if enqueued

  fret is called with [val port] when an enqueued operation completes
  later. ports is a collection whose entries are either a port to take
  from or a vector [port val] describing a put. opts may contain
  :priority (try the operations in order instead of in shuffled order)
  and :default (value to return, paired with :default, when nothing
  completed immediately and the alt is still uncommitted)."
  [fret ports opts]
  (assert (pos? (count ports)) "alts must have at least one channel operation")
  (let [flag (alt-flag)
        n (count ports)
        priority (:priority opts)
        ;; The shuffled index array is only read when :priority is off,
        ;; so it is not built otherwise.
        idxs (when-not priority (random-array n))
        ret
        (loop [i 0]
          (when (< i n)
            (let [idx (if priority i (aget idxs i))
                  port (nth ports idx)
                  ;; A vector entry denotes a put: [port val].
                  wport (when (vector? port) (port 0))
                  vbox (if wport
                         (let [val (port 1)]
                           (impl/put! wport val (alt-handler flag #(fret [% wport]))))
                         (impl/take! port (alt-handler flag #(fret [% port]))))]
              (if vbox
                (channels/box [@vbox (or wport port)])
                (recur (inc i))))))]
    (or
     ret
     ;; Nothing completed immediately: fall back to :default, but only
     ;; if the shared flag can still be committed.
     (when (and (contains? opts :default)
                (impl/active? flag)
                (impl/commit flag))
       (channels/box [(:default opts) :default])))))
|
|
|
|
(defn alts!
  "Completes at most one of several channel operations. Must be called
  inside a (go ...) block.

  ports is a vector of channel endpoints, each of which is either a
  channel to take from or a vector of [channel-to-put-to val-to-put],
  in any combination. Takes are made as if by <!, puts as if by >!.
  Unless the :priority option is true, a non-deterministic choice is
  made when more than one port operation is ready. If no operation is
  ready and a :default value is supplied, [default-val :default] is
  returned; otherwise alts! parks until the first operation to become
  ready completes.

  Returns [val port] of the completed operation, where val is the
  value taken for takes, and a boolean (true unless already closed, as
  per put!) for puts.

  opts are passed as :key val ... Supported options:

  :default val - the value to use if none of the operations are immediately ready
  :priority true - (default nil) when true, the operations will be tried in order.

  Note: there is no guarantee that the port exps or val exprs will be
  used, nor in what order should they be, so they should not be
  depended upon for side effects.

  This function body only runs when alts! is used outside a go block,
  in which case it throws."
  [ports & {:as opts}]
  (throw (js/Error. "alts! used not in (go ...) block")))
|
|
|
|
(defn offer!
  "Puts a val into port only if that can be done immediately. nil
  values are not allowed. Never blocks. Returns true if offer
  succeeds."
  [port val]
  ;; A non-blockable handler is used for the put.
  (when-let [boxed (impl/put! port val (fn-handler nop false))]
    @boxed))
|
|
|
|
(defn poll!
  "Takes a val from port only if that can be done immediately. Never
  blocks. Returns value if successful, nil otherwise."
  [port]
  ;; A non-blockable handler is used for the take.
  (when-let [boxed (impl/take! port (fn-handler nop false))]
    @boxed))
|
|
|
|
;;;;;;; channel ops
|
|
|
|
(defn pipe
  "Moves elements from the from channel onto the to channel and
  returns to. By default the to channel is closed when the from
  channel closes; pass close? to control that. Consumption of the from
  channel stops once the to channel closes."
  ([from to] (pipe from to true))
  ([from to close?]
   (go
     (loop []
       (let [item (<! from)]
         (if (some? item)
           ;; A failed put means `to` is closed: stop copying.
           (when (>! to item)
             (recur))
           (when close?
             (close! to))))))
   to))
|
|
|
|
(defn- pipeline*
  "Shared implementation behind pipeline and pipeline-async.

  Starts n workers that read [value result-promise-chan] jobs from an
  internal jobs channel. For type :compute each value is run through a
  one-slot channel carrying the transducer xf (with ex-handler); for
  type :async, xf is called as (xf value result-chan). A feeder loop
  turns every item of from into a job and queues its promise channel on
  the internal results channel, and a collector loop drains those
  promise channels in order onto to. When from closes, the jobs channel
  is closed, workers close the results channel, and to is closed if
  close? is truthy."
  [n to xf from close? ex-handler type]
  (assert (pos? n))
  (let [jobs (chan n)
        results (chan n)
        ;; :compute worker body. A nil job signals that jobs was closed.
        run-compute (fn [[v p :as job]]
                      (if (nil? job)
                        (do (close! results) nil)
                        (let [res (chan 1 xf ex-handler)]
                          (go
                            (>! res v)
                            (close! res))
                          (put! p res)
                          true)))
        ;; :async worker body: xf itself is responsible for filling res.
        run-async (fn [[v p :as job]]
                    (if (nil? job)
                      (do (close! results) nil)
                      (let [res (chan 1)]
                        (xf v res)
                        (put! p res)
                        true)))]
    ;; Workers: each keeps pulling jobs until its body returns nil.
    (dotimes [_ n]
      (case type
        :compute (go
                   (loop []
                     (when (run-compute (<! jobs))
                       (recur))))
        :async (go
                 (loop []
                   (when (run-async (<! jobs))
                     (recur))))))
    ;; Feeder: one job plus one queued promise channel per input item.
    (go
      (loop []
        (let [v (<! from)]
          (if (nil? v)
            (close! jobs)
            (let [p (chan 1)]
              (>! jobs [v p])
              (>! results p)
              (recur))))))
    ;; Collector: drain each result channel, in input order, onto `to`.
    (go
      (loop []
        (let [p (<! results)]
          (if (nil? p)
            (when close? (close! to))
            (let [res (<! p)]
              (loop []
                (let [v (<! res)]
                  (when (and (some? v) (>! to v))
                    (recur))))
              (recur))))))))
|
|
|
|
(defn pipeline-async
  "Moves elements from the from channel to the to channel, subject to
  the async function af, with parallelism n.

  af must be a function of two arguments: an input value and a channel
  on which to place the result(s). af must close! the channel before
  returning. The presumption is that af will return immediately,
  having launched some asynchronous operation whose
  completion/callback will manipulate the result channel.

  Outputs will be returned in order relative to the inputs. By default
  the to channel is closed when the from channel closes; pass close?
  to control that. Will stop consuming the from channel if the to
  channel closes."
  ([n to af from]
   (pipeline-async n to af from true))
  ([n to af from close?]
   (pipeline* n to af from close? nil :async)))
|
|
|
|
(defn pipeline
  "Moves elements from the from channel to the to channel, subject to
  the transducer xf, with parallelism n.

  Because it is parallel, the transducer is applied independently to
  each element, not across elements, and may produce zero or more
  outputs per input. Outputs will be returned in order relative to the
  inputs. By default the to channel is closed when the from channel
  closes; pass close? to control that. Will stop consuming the from
  channel if the to channel closes.

  Note this is supplied for API compatibility with the Clojure version.
  Values of N > 1 will not result in actual concurrency in a
  single-threaded runtime."
  ([n to xf from]
   (pipeline n to xf from true))
  ([n to xf from close?]
   (pipeline n to xf from close? nil))
  ([n to xf from close? ex-handler]
   (pipeline* n to xf from close? ex-handler :compute)))
|
|
|
|
(defn split
  "Given a predicate p and a source channel ch, returns a vector of two
  channels: the first receives the values for which p returned true,
  the second those for which it returned false.

  The out channels are unbuffered by default, or two buf-or-ns can be
  supplied. Both channels close after the source channel has closed."
  ([p ch] (split p ch nil nil))
  ([p ch t-buf-or-n f-buf-or-n]
   (let [tc (chan t-buf-or-n)
         fc (chan f-buf-or-n)]
     (go
       (loop []
         (let [item (<! ch)]
           (if (some? item)
             (let [dest (if (p item) tc fc)]
               ;; Stop routing once a put is refused.
               (when (>! dest item)
                 (recur)))
             (do (close! tc)
                 (close! fc))))))
     [tc fc])))
|
|
|
|
(defn reduce
  "f should be a function of 2 arguments. Returns a channel containing
  the single result of applying f to init and the first item from the
  channel, then applying f to that result and the 2nd item, etc. If
  the channel closes without yielding items, the result is init and f
  is not called. A `reduced` result from f ends the reduction early
  with the wrapped value. ch must close before reduce produces a
  result (unless f short-circuits)."
  [f init ch]
  (go
    (loop [acc init]
      (let [item (<! ch)]
        (if (some? item)
          (let [step (f acc item)]
            (if (reduced? step)
              @step
              (recur step)))
          acc)))))
|
|
|
|
(defn transduce
  "async/reduces a channel with a transformation (xform f).
  Returns a channel containing the result, obtained by calling the
  transformed reducing function's one-argument arity on the reduction
  result. ch must close before transduce produces a result."
  [xform f init ch]
  (let [rf (xform f)]
    (go
      (rf (<! (reduce rf init ch))))))
|
|
|
|
(defn onto-chan
  "Puts the contents of coll into the supplied channel.

  By default the channel is closed after the items are copied; pass
  close? to control that.

  Returns a channel which will close after the items are copied."
  ([ch coll] (onto-chan ch coll true))
  ([ch coll close?]
   (go
     (loop [remaining (seq coll)]
       ;; Stops either when coll is exhausted or when a put is refused.
       (if (and remaining (>! ch (first remaining)))
         (recur (next remaining))
         (when close?
           (close! ch)))))))
|
|
|
|
|
|
(defn to-chan
  "Creates and returns a channel which contains the contents of coll,
  closing when exhausted. The channel's buffer size is the element
  count of coll, capped at 100."
  [coll]
  (let [size (bounded-count 100 coll)
        ch (chan size)]
    (onto-chan ch coll)
    ch))
|
|
|
|
|
|
(defprotocol Mux
  "Implemented by mult, mix and pub objects in this namespace."
  (muxch* [_]
    "Returns the channel the object was created with (the source
    channel for mult and pub, the out channel for mix)."))
|
|
|
|
(defprotocol Mult
  "Tap management for a mult."
  (tap* [m ch close?]
    "Registers ch as a tap, remembering its close? flag.")
  (untap* [m ch]
    "Removes ch from the taps.")
  (untap-all* [m]
    "Removes every tap."))
|
|
|
|
(defn mult
  "Creates and returns a mult(iple) of the supplied channel. Channels
  containing copies of the channel can be created with 'tap', and
  detached with 'untap'.

  Each item is distributed to all taps in parallel and synchronously,
  i.e. each tap must accept before the next item is distributed. Use
  buffering/windowing to prevent slow taps from holding up the mult.

  Items received when there are no taps get dropped.

  If a tap puts to a closed channel, it will be removed from the mult."
  [ch]
  (let [cs (atom {}) ;; ch->close?
        m (reify
            Mux
            (muxch* [_] ch)

            Mult
            (tap* [_ ch close?] (swap! cs assoc ch close?) nil)
            (untap* [_ ch] (swap! cs dissoc ch) nil)
            (untap-all* [_] (reset! cs {}) nil))
        ;; dchan receives one signal per distributed item, once every tap
        ;; has reported back through `done`.
        dchan (chan 1)
        ;; Number of taps that still have to report for the current item.
        dctr (atom nil)
        done (fn [_] (when (zero? (swap! dctr dec))
                       (put! dchan true)))]
    (go-loop []
      (let [val (<! ch)]
        (if (nil? val)
          ;; Source closed: close the taps that asked for it and stop.
          (doseq [[c close?] @cs]
            (when close? (close! c)))
          (let [chs (keys @cs)]
            (reset! dctr (count chs))
            (doseq [c chs]
              ;; put! only returns a falsey value when the put completed
              ;; immediately, and in that case it has already invoked
              ;; `done` on this thread. Calling `done` again here would
              ;; decrement dctr twice for the same tap and release the
              ;; wait below before the remaining taps have accepted.
              (when-not (put! c val done)
                (untap* m c)))
            ;;wait for all
            (when (seq chs)
              (<! dchan))
            (recur)))))
    m))
|
|
|
|
(defn tap
  "Copies the mult source onto the supplied channel and returns that
  channel.

  By default the channel is closed when the source closes; pass
  close? to control that."
  ([mult ch]
   (tap mult ch true))
  ([mult ch close?]
   (tap* mult ch close?)
   ch))
|
|
|
|
(defn untap
  "Detaches the target channel ch from the mult."
  [mult ch]
  (untap* mult ch))
|
|
|
|
(defn untap-all
  "Detaches every target channel from the mult."
  [mult]
  (untap-all* mult))
|
|
|
|
(defprotocol Mix
  "Input management for a mix."
  (admix* [m ch]
    "Adds ch as an input.")
  (unmix* [m ch]
    "Removes ch as an input.")
  (unmix-all* [m]
    "Removes all inputs.")
  (toggle* [m state-map]
    "Merges state-map (channel -> channel-state-map) into the current
    per-channel state.")
  (solo-mode* [m mode]
    "Sets the solo mode."))
|
|
|
|
(defn ioc-alts!
  "State-machine side of alts!. Stores cont-block as the next state,
  then runs do-alts over ports with opts. If an operation completes
  immediately its [val port] result is stored as the state's value and
  :recur is returned; otherwise nil is returned and the state machine
  is re-run from the completion callback once a result arrives."
  [state cont-block ports & {:as opts}]
  (ioc/aset-all! state helpers/STATE-IDX cont-block)
  (let [resume (fn [val]
                 (ioc/aset-all! state helpers/VALUE-IDX val)
                 (helpers/run-state-machine-wrapped state))]
    (when-let [immediate (cljs.core.async/do-alts resume ports opts)]
      (ioc/aset-all! state helpers/VALUE-IDX @immediate)
      :recur)))
|
|
|
|
(defn mix
  "Creates and returns a mix of one or more input channels which will
  be put on the supplied out channel. Input sources can be added to
  the mix with 'admix', and removed with 'unmix'. A mix supports
  soloing, muting and pausing multiple inputs atomically using
  'toggle', and can solo using either muting or pausing as determined
  by 'solo-mode'.

  Each channel can have zero or more boolean modes set via 'toggle':

  :solo - when true, only this (and other soloed) channel(s) will appear
          in the mix output channel. :mute and :pause states of soloed
          channels are ignored. If solo-mode is :mute, non-soloed
          channels are muted, if :pause, non-soloed channels are
          paused.

  :mute - muted channels will have their contents consumed but not included in the mix
  :pause - paused channels will not have their contents consumed (and thus also not included in the mix)"
  [out]
  (let [cs (atom {}) ;; ch->attrs-map
        solo-modes #{:mute :pause}
        solo-mode (atom :mute)
        ;; Change notifications carry no data: the loop below re-reads
        ;; the atoms whenever it sees one. A one-slot sliding buffer
        ;; therefore coalesces bursts of changes instead of queueing one
        ;; pending put per admix/unmix/toggle call.
        change (chan (sliding-buffer 1))
        changed #(put! change true)
        ;; Set of channels whose attrs map has a truthy value for attr.
        pick (fn [attr chs]
               (reduce-kv
                (fn [ret c v]
                  (if (attr v)
                    (conj ret c)
                    ret))
                #{} chs))
        ;; Snapshot of what to read from and how to route it, derived
        ;; from the current channel attrs and solo mode.
        calc-state (fn []
                     (let [chs @cs
                           mode @solo-mode
                           solos (pick :solo chs)
                           pauses (pick :pause chs)]
                       {:solos solos
                        :mutes (pick :mute chs)
                        :reads (conj
                                (if (and (= mode :pause) (seq solos))
                                  (vec solos)
                                  (vec (remove pauses (keys chs))))
                                change)}))
        m (reify
            Mux
            (muxch* [_] out)
            Mix
            (admix* [_ ch] (swap! cs assoc ch {}) (changed))
            (unmix* [_ ch] (swap! cs dissoc ch) (changed))
            (unmix-all* [_] (reset! cs {}) (changed))
            (toggle* [_ state-map]
              (swap! cs (partial merge-with cljs.core/merge) state-map)
              (changed))
            (solo-mode* [_ mode]
              (assert (solo-modes mode) (str "mode must be one of: " solo-modes))
              (reset! solo-mode mode)
              (changed)))]
    (go-loop [{:keys [solos mutes reads] :as state} (calc-state)]
      (let [[v c] (alts! reads)]
        (if (or (nil? v) (= c change))
          ;; Either an input closed (drop it) or the configuration
          ;; changed: recompute the routing state.
          (do (when (nil? v)
                (swap! cs dissoc c))
              (recur (calc-state)))
          (if (or (solos c)
                  (and (empty? solos) (not (mutes c))))
            ;; Forward the value; the loop ends if out refuses the put.
            (when (>! out v)
              (recur state))
            (recur state)))))
    m))
|
|
|
|
(defn admix
  "Registers ch as an input of the mix."
  [mix ch]
  (admix* mix ch))
|
|
|
|
(defn unmix
  "Drops ch from the inputs of the mix."
  [mix ch]
  (unmix* mix ch))
|
|
|
|
(defn unmix-all
  "Drops every input from the mix."
  [mix]
  (unmix-all* mix))
|
|
|
|
(defn toggle
  "Atomically sets the state(s) of one or more channels in a mix.

  state-map maps channels to channel-state-maps; a channel-state-map
  maps attrs to booleans, where attr is one or more of :mute, :pause
  or :solo. Any states supplied are merged with the current state.

  Note that channels can be added to a mix via toggle, which can be
  used to add channels in a particular (e.g. paused) state."
  [mix state-map]
  (toggle* mix state-map))
|
|
|
|
(defn solo-mode
  "Changes the solo mode of the mix. mode must be one of :mute or
  :pause."
  [mix mode]
  (solo-mode* mix mode))
|
|
|
|
|
|
(defprotocol Pub
  "Subscription management for a pub."
  (sub* [p v ch close?]
    "Subscribes ch to topic v, with the given close? flag.")
  (unsub* [p v ch]
    "Unsubscribes ch from topic v.")
  (unsub-all* [p] [p v]
    "Removes all subscriptions, or only those of topic v."))
|
|
|
|
(defn pub
  "Creates and returns a pub(lication) of the supplied channel,
  partitioned into topics by the topic-fn. topic-fn is applied to each
  value on the channel and its result determines the 'topic' on which
  that value will be put. Channels can be subscribed to receive copies
  of topics using 'sub', and unsubscribed using 'unsub'. Each topic is
  handled by an internal mult on a dedicated channel. By default these
  internal channels are unbuffered, but a buf-fn can be supplied
  which, given a topic, creates a buffer with desired properties.

  Each item is distributed to all subs in parallel and synchronously,
  i.e. each sub must accept before the next item is distributed. Use
  buffering/windowing to prevent slow subs from holding up the pub.

  Items received when there are no matching subs get dropped.

  Note that if buf-fns are used then each topic is handled
  asynchronously, i.e. if a channel is subscribed to more than one
  topic it should not expect them to be interleaved identically with
  the source."
  ([ch topic-fn]
   (pub ch topic-fn (constantly nil)))
  ([ch topic-fn buf-fn]
   (let [mults (atom {}) ;; topic->mult
         ;; Returns the mult for topic, creating and registering it on
         ;; first use.
         ensure-mult (fn [topic]
                       (or (get @mults topic)
                           (let [add-topic (fn [ms]
                                             (if (ms topic)
                                               ms
                                               (assoc ms topic (mult (chan (buf-fn topic))))))]
                             (get (swap! mults add-topic) topic))))
         p (reify
             Mux
             (muxch* [_] ch)

             Pub
             (sub* [p topic ch close?]
               (tap (ensure-mult topic) ch close?))
             (unsub* [p topic ch]
               (when-let [m (get @mults topic)]
                 (untap m ch)))
             (unsub-all* [_] (reset! mults {}))
             (unsub-all* [_ topic] (swap! mults dissoc topic)))]
     (go
       (loop []
         (let [val (<! ch)]
           (if (nil? val)
             ;; Source closed: close every topic's dedicated channel.
             (doseq [m (vals @mults)]
               (close! (muxch* m)))
             (let [topic (topic-fn val)]
               (when-let [m (get @mults topic)]
                 ;; A refused put means the topic channel is closed, so
                 ;; the topic is forgotten.
                 (when-not (>! (muxch* m) val)
                   (swap! mults dissoc topic)))
               (recur))))))
     p)))
|
|
|
|
(defn sub
  "Subscribes the channel ch to a topic of the pub p.

  By default the channel is closed when the source closes; pass
  close? to control that."
  ([p topic ch]
   (sub p topic ch true))
  ([p topic ch close?]
   (sub* p topic ch close?)))
|
|
|
|
(defn unsub
  "Removes the subscription of channel ch to a topic of the pub p."
  [p topic ch]
  (unsub* p topic ch))
|
|
|
|
(defn unsub-all
  "Removes all subscriptions from a pub, or only those of one topic
  when topic is given."
  ([p]
   (unsub-all* p))
  ([p topic]
   (unsub-all* p topic)))
|
|
|
|
|
|
;;;;
|
|
|
|
(defn map
  "Given a function f and a collection of source channels, returns a
  channel which contains the values produced by applying f to the set
  of first items taken from each source channel, followed by applying
  f to the set of second items from each channel, until any one of the
  channels is closed, at which point the output channel will be
  closed. The returned channel is unbuffered by default, or a buf-or-n
  can be supplied."
  ([f chs] (map f chs nil))
  ([f chs buf-or-n]
   (let [chs (vec chs)
         out (chan buf-or-n)
         cnt (count chs)
         ;; One slot per source channel for the current round.
         rets (object-array cnt)
         ;; Receives a copy of `rets` once every slot has been filled.
         dchan (chan 1)
         ;; Number of sources still outstanding in the current round.
         dctr (atom nil)
         collector (fn [i]
                     (fn [ret]
                       (aset rets i ret)
                       (when (zero? (swap! dctr dec))
                         (put! dchan (.slice rets 0)))))
         done (mapv collector (range cnt))]
     (go
       (loop []
         (reset! dctr cnt)
         (dotimes [i cnt]
           (try
             (take! (chs i) (done i))
             (catch js/Object e
               (swap! dctr dec))))
         (let [round (<! dchan)]
           ;; A nil in the round means some source has closed.
           (if (some nil? round)
             (close! out)
             (do (>! out (apply f round))
                 (recur))))))
     out)))
|
|
|
|
(defn merge
  "Given a collection of source channels, returns a channel which
  contains all values taken from them. The returned channel is
  unbuffered by default, or a buf-or-n can be supplied. The channel
  closes after all the source channels have closed."
  ([chs] (merge chs nil))
  ([chs buf-or-n]
   (let [out (chan buf-or-n)]
     (go
       (loop [live (vec chs)]
         (if (pos? (count live))
           (let [[v src] (alts! live)]
             (if (nil? v)
               ;; src has closed: stop listening to it.
               (recur (filterv (fn [c] (not= src c)) live))
               (do (>! out v)
                   (recur live))))
           (close! out))))
     out)))
|
|
|
|
(defn into
  "Returns a channel containing the single (collection) result of
  conjoining the items taken from ch onto the supplied collection. ch
  must close before into produces a result."
  [coll ch]
  (reduce conj coll ch))
|
|
|
|
(defn take
  "Returns a channel that will return, at most, n items from ch. After
  n items have been returned, or ch has been closed, the returned
  channel will close.

  The output channel is unbuffered by default, unless buf-or-n is
  given."
  ([n ch]
   (take n ch nil))
  ([n ch buf-or-n]
   (let [out (chan buf-or-n)]
     (go
       (loop [taken 0]
         (when (< taken n)
           (let [item (<! ch)]
             (when (some? item)
               (>! out item)
               (recur (inc taken))))))
       (close! out))
     out)))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; deprecated - do not use ;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(defn map<
  "Deprecated - this function will be removed. Use transducer instead

  Wraps ch so that every non-nil value taken through the wrapper is
  passed through f first; puts and close go straight to ch."
  [f ch]
  (reify
    impl/Channel
    (close! [_] (impl/close! ch))
    (closed? [_] (impl/closed? ch))

    impl/ReadPort
    (take! [_ fn1]
      (let [mapping-handler (reify
                              impl/Handler
                              (active? [_] (impl/active? fn1))
                              (blockable? [_] true)
                              (commit [_]
                                (let [deliver (impl/commit fn1)]
                                  ;; nil (closed) is passed through untouched.
                                  (fn [v]
                                    (deliver (if (nil? v) nil (f v)))))))
            ret (impl/take! ch mapping-handler)]
        ;; An immediately available non-nil value is mapped and re-boxed.
        (if (and ret (some? @ret))
          (channels/box (f @ret))
          ret)))

    impl/WritePort
    (put! [_ val fn1] (impl/put! ch val fn1))))
|
|
|
|
(defn map>
  "Deprecated - this function will be removed. Use transducer instead

  Wraps ch so that every value put through the wrapper is passed
  through f first; takes, close and closed? go straight to ch."
  [f ch]
  (reify
    impl/Channel
    (close! [_] (impl/close! ch))
    ;; Implemented for parity with map< and filter>, so that callers in
    ;; this namespace that ask impl/closed? of a channel (filter>,
    ;; mapcat*) also work when handed a map> wrapper.
    (closed? [_] (impl/closed? ch))

    impl/ReadPort
    (take! [_ fn1] (impl/take! ch fn1))

    impl/WritePort
    (put! [_ val fn1]
      (impl/put! ch (f val) fn1))))
|
|
|
|
(defn filter>
  "Deprecated - this function will be removed. Use transducer instead

  Wraps ch so that only values satisfying p are actually put onto it.
  A rejected put completes immediately with a boxed boolean that is
  true unless ch is closed."
  [p ch]
  (reify
    impl/Channel
    (close! [_this] (impl/close! ch))
    (closed? [_this] (impl/closed? ch))

    impl/ReadPort
    (take! [_this fn1] (impl/take! ch fn1))

    impl/WritePort
    (put! [_this val fn1]
      (if-not (p val)
        (channels/box (not (impl/closed? ch)))
        (impl/put! ch val fn1)))))
|
|
|
|
(defn remove>
  "Deprecated - this function will be removed. Use transducer instead

  Same as filter> with the predicate negated."
  [p ch]
  (let [keep? (complement p)]
    (filter> keep? ch)))
|
|
|
|
(defn filter<
  "Deprecated - this function will be removed. Use transducer instead

  Returns a channel (unbuffered unless buf-or-n is given) receiving
  the values of ch that satisfy p; it closes when ch closes."
  ([p ch] (filter< p ch nil))
  ([p ch buf-or-n]
   (let [out (chan buf-or-n)]
     (go
       (loop []
         (let [item (<! ch)]
           (if (some? item)
             (do (when (p item)
                   (>! out item))
                 (recur))
             (close! out)))))
     out)))
|
|
|
|
(defn remove<
  "Deprecated - this function will be removed. Use transducer instead

  Same as filter< with the predicate negated."
  ([p ch]
   (remove< p ch nil))
  ([p ch buf-or-n]
   (let [keep? (complement p)]
     (filter< keep? ch buf-or-n))))
|
|
|
|
(defn- mapcat*
  "Copies from in to out, replacing each input value with the elements
  of (f value). Closes out when in closes, and stops looping once out
  is found closed after a batch."
  [f in out]
  (go
    (loop []
      (let [item (<! in)]
        (if (some? item)
          (do (doseq [expanded (f item)]
                (>! out expanded))
              (when-not (impl/closed? out)
                (recur)))
          (close! out))))))
|
|
|
|
(defn mapcat<
  "Deprecated - this function will be removed. Use transducer instead

  Returns a new channel (unbuffered unless buf-or-n is given) fed by
  mapcat* from in."
  ([f in]
   (mapcat< f in nil))
  ([f in buf-or-n]
   (let [result (chan buf-or-n)]
     (mapcat* f in result)
     result)))
|
|
|
|
(defn mapcat>
  "Deprecated - this function will be removed. Use transducer instead

  Returns a new channel (unbuffered unless buf-or-n is given) whose
  values are fed by mapcat* into out."
  ([f out]
   (mapcat> f out nil))
  ([f out buf-or-n]
   (let [source (chan buf-or-n)]
     (mapcat* f source out)
     source)))
|
|
|
|
(defn unique
  "Deprecated - this function will be removed. Use transducer instead

  Returns a channel (unbuffered unless buf-or-n is given) receiving
  the values of ch with consecutive duplicates dropped; it closes when
  ch closes."
  ([ch]
   (unique ch nil))
  ([ch buf-or-n]
   (let [out (chan buf-or-n)]
     (go
       (loop [prev nil]
         (let [item (<! ch)]
           (when (some? item)
             (if (= item prev)
               (recur prev)
               (do (>! out item)
                   (recur item))))))
       (close! out))
     out)))
|
|
|
|
(defn partition
  "Deprecated - this function will be removed. Use transducer instead

  Returns a channel (unbuffered unless buf-or-n is given) receiving
  vectors of n consecutive values from ch; it closes when ch closes."
  ([n ch]
   (partition n ch nil))
  ([n ch buf-or-n]
   (let [out (chan buf-or-n)]
     (go
       (loop [slots (make-array n)
              filled 0]
         (let [item (<! ch)]
           (if (nil? item)
             ;; NOTE(review): a trailing partial group is emitted as a
             ;; vector of the whole n-slot array, so slots that were
             ;; never filled are included as-is - confirm this is the
             ;; intended shape.
             (do (when (> filled 0)
                   (>! out (vec slots)))
                 (close! out))
             (do (aset ^objects slots filled item)
                 (let [next-filled (inc filled)]
                   (if (< next-filled n)
                     (recur slots next-filled)
                     (do (>! out (vec slots))
                         (recur (make-array n) 0)))))))))
     out)))
|
|
|
|
|
|
(defn partition-by
  "Deprecated - this function will be removed. Use transducer instead

  Returns a channel (unbuffered unless buf-or-n is given) receiving
  vectors of consecutive values from ch for which f returns equal
  results; it closes when ch closes."
  ([f ch]
   (partition-by f ch nil))
  ([f ch buf-or-n]
   (let [out (chan buf-or-n)]
     (go
       (loop [group (make-array 0)
              prev-key ::nothing]
         (let [item (<! ch)]
           (if (nil? item)
             ;; Source closed: flush whatever has been collected.
             (do (when (> (alength group) 0)
                   (>! out (vec group)))
                 (close! out))
             (let [item-key (f item)]
               (if (or (= item-key prev-key)
                       (keyword-identical? prev-key ::nothing))
                 (do (.push group item)
                     (recur group item-key))
                 ;; Key changed: emit the finished group, start a new one.
                 (do (>! out (vec group))
                     (let [fresh (make-array 0)]
                       (.push fresh item)
                       (recur fresh item-key)))))))))
     out)))
|