summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJake Zerrer <him@jakezerrer.com>2025-11-25 14:53:20 -0500
committerJake Zerrer <him@jakezerrer.com>2025-11-25 15:24:39 -0500
commit20ce2f424552a6124575c79c1ce5476addbaba95 (patch)
tree3230c1c7efd7a212cbcdb0b17b0dad1f622a69b1 /src
parent1a3fefa99bc49f0cbedf0318b802ac334a6be16b (diff)
Implement interval tree logic
Diffstat (limited to 'src')
-rw-r--r--src/unheard/instrument/util.clj33
-rw-r--r--src/unheard/midi.clj382
-rw-r--r--src/unheard/theory.clj53
-rw-r--r--src/unheard/time_object.clj22
-rw-r--r--src/unheard/util/missionary.clj17
5 files changed, 252 insertions, 255 deletions
diff --git a/src/unheard/instrument/util.clj b/src/unheard/instrument/util.clj
index 0faa5b5..7c99f8b 100644
--- a/src/unheard/instrument/util.clj
+++ b/src/unheard/instrument/util.clj
@@ -3,16 +3,16 @@
(:import [javax.sound.midi ShortMessage]))
(def matching-control-change
- (filter (fn [^ShortMessage m] (= (.getCommand m) ShortMessage/CONTROL_CHANGE))))
+ (filter (fn [^ShortMessage m]
+ (= (.getCommand m) ShortMessage/CONTROL_CHANGE))))
-(defn matching-channel [ch]
+(defn matching-channel
+ [ch]
(filter (fn [^ShortMessage m] (= (.getChannel m) ch))))
-(defn matching-data-1 [d1]
- (filter (fn [^ShortMessage m] (= (.getData1 m) d1))))
+(defn matching-data-1 [d1] (filter (fn [^ShortMessage m] (= (.getData1 m) d1))))
-(def get-data-2
- (map (fn [^ShortMessage m] (.getData2 m))))
+(def get-data-2 (map (fn [^ShortMessage m] (.getData2 m))))
(defn matching-control
"Returns a function filtering flow of ShortMessage `f` down to control
@@ -25,15 +25,12 @@
"
[init ch k]
(fn [f]
- ;; TODO: Should be signal
- (m/stream
- (m/reductions {} init
- (m/eduction
- (comp
- matching-control-change
- (matching-channel ch)
- (matching-data-1 k)
- get-data-2
- ;; TODO git-bug f109911
- (dedupe))
- f)))))
+ (m/stream (m/reductions {}
+ init
+ (m/eduction (comp matching-control-change
+ (matching-channel ch)
+ (matching-data-1 k)
+ get-data-2
+ ;; TODO git-bug f109911
+ (dedupe))
+ f)))))
diff --git a/src/unheard/midi.clj b/src/unheard/midi.clj
index 93fd6c0..f0debad 100644
--- a/src/unheard/midi.clj
+++ b/src/unheard/midi.clj
@@ -1,34 +1,41 @@
(ns unheard.midi
(:require [missionary.core :as m]
[taoensso.trove :as log])
- (:import [javax.sound.midi MidiSystem Receiver ShortMessage MidiDevice$Info MidiDevice Transmitter MidiMessage]
- [uk.co.xfactorylibrarians.coremidi4j CoreMidiDeviceProvider CoreMidiNotification]))
+ (:import [javax.sound.midi MidiSystem Receiver ShortMessage MidiDevice$Info
+ MidiDevice Transmitter MidiMessage]
+ [uk.co.xfactorylibrarians.coremidi4j CoreMidiDeviceProvider
+ CoreMidiNotification]))
-(defn get-all-midi-device-info []
- (CoreMidiDeviceProvider/getMidiDeviceInfo))
+(defn get-all-midi-device-info [] (CoreMidiDeviceProvider/getMidiDeviceInfo))
(def device-infos
"A publisher containing the latest result of MidiSystem#getMidiDeviceInfo."
(m/signal
- (m/cp
- (m/?<
- (m/ap
- ;; TODO: getMidiDeviceInfo could theoretically block
- ;; Move to m/blk
- (let [devices (atom (CoreMidiDeviceProvider/getMidiDeviceInfo))
- >devices (m/watch devices)
- notification-listener
- (reify CoreMidiNotification
- (midiSystemUpdated [_this]
- (reset! devices (CoreMidiDeviceProvider/getMidiDeviceInfo))))]
- (m/amb=
- (do
- (m/? (m/via m/blk (CoreMidiDeviceProvider/addNotificationListener notification-listener)))
- (m/amb))
- (m/?< >devices)
- (try (m/? m/never)
- (finally
- (m/? (m/via m/blk (CoreMidiDeviceProvider/removeNotificationListener notification-listener))))))))))))
+ (m/cp
+ (m/?<
+ (m/ap
+ ;; TODO: getMidiDeviceInfo could theoretically block. Move to
+ ;; m/blk
+ (let [devices (atom (CoreMidiDeviceProvider/getMidiDeviceInfo))
+ >devices (m/watch devices)
+ notification-listener
+ (reify
+ CoreMidiNotification
+ (midiSystemUpdated [_this]
+ (reset! devices
+ (CoreMidiDeviceProvider/getMidiDeviceInfo))))]
+ (m/amb=
+ (do (m/? (m/via m/blk
+ (CoreMidiDeviceProvider/addNotificationListener
+ notification-listener)))
+ (m/amb))
+ (m/?< >devices)
+ (try (m/? m/never)
+ (finally
+ (m/? (m/via
+ m/blk
+ (CoreMidiDeviceProvider/removeNotificationListener
+ notification-listener))))))))))))
;; Move to tools.repl
(defn print-all-midi-devices
@@ -48,28 +55,29 @@
(->> devices
(filter (fn [^MidiDevice$Info di]
(let [d (MidiSystem/getMidiDevice di)]
- (and
- (= device-name (.getName di))
- (or (not tx?) (= -1 (.getMaxTransmitters d)))
- (or (not rx?) (= -1 (.getMaxReceivers d)))))))
+ (and (= device-name (.getName di))
+ (or (not tx?) (= -1 (.getMaxTransmitters d)))
+ (or (not rx?) (= -1 (.getMaxReceivers d)))))))
(map #(MidiSystem/getMidiDevice %))))
;; TODO: git-bug d317eca
(defn with-device
"Open `device` and then run task returned by invoking `tfn` with `device` as its sole argument."
[^MidiDevice device tfn]
- (m/sp
- (try
- (m/? (m/via m/blk (.open device)))
- (log/log! {:level :info, :id :midi/device-opened, :data {:device (str device)}})
- (m/? (tfn device))
- (finally
- (log/log! {:level :info, :id :midi/closing-device})
- ;; NOTE:
- ;; Be careful, (.close device) will wait for (.send receiver ...) to return.
- ;; This can lead to deadlocks during cancellation.
- (m/? (m/via m/blk (.close device)))
- (log/log! {:level :info, :id :midi/device-closed, :data {:device (str device)}})))))
+ (m/sp (try (m/? (m/via m/blk (.open device)))
+ (log/log! {:level :info,
+ :id :midi/device-opened,
+ :data {:device (str device)}})
+ (m/? (tfn device))
+ (finally (log/log! {:level :info, :id :midi/closing-device})
+ ;; NOTE:
+ ;; Be careful, (.close device) will wait for (.send
+ ;; receiver ...) to return. This can lead to
+ ;; deadlocks during cancellation.
+ (m/? (m/via m/blk (.close device)))
+ (log/log! {:level :info,
+ :id :midi/device-closed,
+ :data {:device (str device)}})))))
(defn with-tx
"Feed a transmitter device (e.g. a MIDI keyboard) into a consumer `t`.
@@ -84,42 +92,41 @@
Returns a task."
[^MidiDevice device t]
(m/sp
- (let [^Transmitter transmitter
- (m/? (m/via m/blk (.getTransmitter device)))
- rv (m/mbx)
- receiver
- (reify Receiver
- (send [_this midi-message _timestamp]
- (log/log! {:level :debug, :id :midi/sending-message})
- ;; NOTE:
- ;; Be careful, (.close device) will wait for (.send receiver ...) to return.
- ;; This can lead to deadlocks during cancellation.
- ;;
- ;; TODO: git-bug a1652f9
- (rv midi-message)
- (log/log! {:level :debug, :id :midi/message-sent}))
- (close [_this]))]
- (log/log! {:level :debug, :id :midi/rx-object-defined, :data {:receiver (str receiver)}})
- (try
- (log/log! {:level :debug, :id :midi/setting-receiver})
- (m/? (m/via m/blk (.setReceiver transmitter receiver)))
- (log/log! {:level :debug, :id :midi/receiver-set})
- (m/?
- (t (m/stream
- (m/ap
- (loop []
- (m/amb
- (do
- (log/log! {:level :debug, :id :midi/tx-awaiting-value})
- (m/amb))
- (let [v (m/? rv)]
- (log/log! {:level :debug, :id :midi/tx-received-value, :data {:value (str v)}})
- v)
- (recur)))))))
- (finally
- (log/log! {:level :info, :id :midi/closing-tx})
- (m/? (m/via m/blk (.close transmitter)))
- (log/log! {:level :info, :id :midi/tx-closed}))))))
+ (let [^Transmitter transmitter (m/? (m/via m/blk (.getTransmitter device)))
+ rv (m/rdv)
+ receiver (reify
+ Receiver
+ (send [_this midi-message _timestamp]
+ (log/log! {:level :debug, :id :midi/sending-message})
+ ;; NOTE:
+ ;; Be careful, (.close device) will wait for
+ ;; (.send receiver ...) to return. This can lead
+ ;; to deadlocks during cancellation.
+ ;;
+ ;; TODO: git-bug a1652f9
+ (m/? (rv midi-message))
+ (log/log! {:level :debug, :id :midi/message-sent}))
+ (close [_this]))]
+ (log/log! {:level :debug,
+ :id :midi/rx-object-defined,
+ :data {:receiver (str receiver)}})
+ (try (log/log! {:level :debug, :id :midi/setting-receiver})
+ (m/? (m/via m/blk (.setReceiver transmitter receiver)))
+ (log/log! {:level :debug, :id :midi/receiver-set})
+ (m/? (t (m/stream
+ (m/ap (loop []
+ (m/amb (do (log/log! {:level :debug,
+ :id :midi/tx-awaiting-value})
+ (m/amb))
+ (let [v (m/? rv)]
+ (log/log! {:level :debug,
+ :id :midi/tx-received-value,
+ :data {:value (str v)}})
+ v)
+ (recur)))))))
+ (finally (log/log! {:level :info, :id :midi/closing-tx})
+ (m/? (m/via m/blk (.close transmitter)))
+ (log/log! {:level :info, :id :midi/tx-closed}))))))
(def UNSCHEDULED-EVENT -1)
@@ -135,39 +142,34 @@
Returns a task.
"
[^MidiDevice device f]
- (m/sp
- (let [^Receiver receiver
- (m/? (m/via m/blk (.getReceiver device)))]
- (log/log! {:level :info, :id :midi/receiver-mounted})
- (try
- (m/?
- (m/reduce {} nil
- (m/ap
- (let [^MidiMessage v (m/?< f)]
- (log/log! {:level :debug, :id :midi/rx-received-value, :data {:value (str v)}})
- (m/? (m/via m/blk (.send receiver v UNSCHEDULED-EVENT)))
- (log/log! {:level :debug, :id :midi/send-returned})))))
- (finally
- (log/log! {:level :info, :id :midi/closing-rx})
- (m/? (m/via m/blk (.close receiver)))
- (log/log! {:level :info, :id :midi/rx-closed}))))))
+ (m/sp (let [^Receiver receiver (m/? (m/via m/blk (.getReceiver device)))]
+ (log/log! {:level :info, :id :midi/receiver-mounted})
+ (try (m/? (m/reduce {}
+ nil
+ (m/ap (let [^MidiMessage v (m/?< f)]
+ (log/log! {:level :debug,
+ :id :midi/rx-received-value,
+ :data {:value (str v)}})
+ (m/? (m/via m/blk
+ (.send receiver v UNSCHEDULED-EVENT)))
+ (log/log! {:level :debug,
+ :id :midi/send-returned})))))
+ (finally (log/log! {:level :info, :id :midi/closing-rx})
+ (m/? (m/via m/blk (.close receiver)))
+ (log/log! {:level :info, :id :midi/rx-closed}))))))
(defn >bus
"Opens device named `name`.
Device will consume `flow`, a flow of Message objects."
[>name flow]
- (m/ap
- (let [device
- (first
- (select-devices (get-all-midi-device-info)
- (m/?< name) false true))]
- (if device
- (m/?
- (with-device device
- (fn [d]
- (with-rx d flow))))
- (m/amb)))))
+ (m/ap (let [device (first (select-devices (get-all-midi-device-info)
+ (m/?< name)
+ false
+ true))]
+ (if device
+ (m/? (with-device device (fn [d] (with-rx d flow))))
+ (m/amb)))))
(defn <bus
"Opens device named `name`.
@@ -176,62 +178,54 @@
`flow-handler` should return a flow."
[name flow-handler]
- (m/sp
- (let [device
- (first
- (select-devices (get-all-midi-device-info)
- name
- true false))]
- (if device
- (m/?
- (with-device device
- (fn [d]
- (with-tx d
- (fn [f]
- (m/reduce prn nil (flow-handler f)))))))
- (m/amb)))
- ))
+ (m/sp (let [device
+ (first
+ (select-devices (get-all-midi-device-info) name true false))]
+ (if device
+ (m/? (with-device
+ device
+ (fn [d]
+ (with-tx d (fn [f] (m/reduce prn nil (flow-handler f)))))))
+ (m/amb)))))
;; TODO: Move elsewhere
(defn echo
"Echo test."
[name from-ch to-ch]
(m/sp
- (let [rv (m/rdv)]
- (m/?
- (m/join vector
- (<bus name
- (fn [f]
- (m/ap
- (let [v (m/?< f)]
- (if (= (class v) ShortMessage)
- (let [v ^ShortMessage v]
- (if (and (= from-ch (.getChannel v))
- (#{ShortMessage/NOTE_ON ShortMessage/NOTE_OFF} (.getCommand v)))
- (let [new-msg (ShortMessage. (.getCommand v) to-ch
- (.getData1 v)
- (.getData2 v))]
- (m/? (rv new-msg)))
- (m/amb)))
- (m/amb)))
-
- (log/log! {:level :debug, :id :midi/value-sent}))))
- (>bus name
- (m/ap
- (m/amb=
- (m/? m/never)
- (loop []
- (log/log! {:level :debug, :id :midi/echo-rx-awaiting-value})
- (m/amb
- (m/? rv)
- (recur)))))))))))
+ (let [rv (m/rdv)]
+ (m/?
+ (m/join
+ vector
+ (<bus name
+ (fn [f]
+ (m/ap (let [v (m/?< f)]
+ (if (= (class v) ShortMessage)
+ (let [v ^ShortMessage v]
+ (if (and (= from-ch (.getChannel v))
+ (#{ShortMessage/NOTE_ON
+ ShortMessage/NOTE_OFF}
+ (.getCommand v)))
+ (let [new-msg (ShortMessage. (.getCommand v)
+ to-ch
+ (.getData1 v)
+ (.getData2 v))]
+ (m/? (rv new-msg)))
+ (m/amb)))
+ (m/amb)))
+ (log/log! {:level :debug, :id :midi/value-sent}))))
+ (>bus name
+ (m/ap (m/amb= (m/? m/never)
+ (loop []
+ (log/log! {:level :debug,
+ :id :midi/echo-rx-awaiting-value})
+ (m/amb (m/? rv) (recur)))))))))))
(defn controller
;; NOTE: The structure of `config` currently assumes a fairly specific
- ;; structure. It might be better for `config` to be a simple `kv` structure,
- ;; where `k` can be e.g. a tuple [:knob 1], a single value [:mod-wheel],
- ;; etc.
-
+ ;; structure. It might be better for `config` to be a simple `kv`
+ ;; structure, where `k` can be e.g. a tuple [:knob 1], a single value
+ ;; [:mod-wheel], etc.
"Given a flow `f` and a controller config `config`, return a map of
controller flows taking from `f`.
@@ -241,11 +235,10 @@
accepting a flow of ShortMessages as its sole argument, and returning
a flow of values associated with the control."
[f config]
-
(into {}
(map (fn [[group instance]]
{group (into {} (map (fn [[id flow]] {id (flow f)}) instance))})
- config)))
+ config)))
;; TODO git-bug c947320
(def short-message->notes
@@ -263,60 +256,61 @@
(and (= ShortMessage/CONTROL_CHANGE command)
(= 123 (.getData1 input))
(= 0 (.getData2 input)))
- (do
- (vreset! prev #{})
- (rf result #{}))
+ (do (vreset! prev #{}) (rf result #{}))
(= ShortMessage/NOTE_ON command)
- (let [prev-v @prev
- next (conj (into #{} prev-v) (.getData1 input))]
- (vreset! prev next)
- (rf result next))
+ (let [prev-v @prev
+ next (conj (into #{} prev-v) (.getData1 input))]
+ (vreset! prev next)
+ (rf result next))
(= ShortMessage/NOTE_OFF command)
- (let [prev-v @prev
- next (disj (into #{} prev-v) (.getData1 input))]
- (vreset! prev next)
- (rf result next))
- :else
- result)))))))
+ (let [prev-v @prev
+ next (disj (into #{} prev-v) (.getData1 input))]
+ (vreset! prev next)
+ (rf result next))
+ :else result)))))))
;; TODO: Move this logic into bus fn
(defn short-messages
[>device-name]
(m/stream
- (m/reductions {} nil
- (m/ap
- (let [device-name (m/?< >device-name)
- short-messages (atom nil)
- >short-messages (m/watch short-messages)]
- (m/amb=
- (do (reset! short-messages nil)
- (m/?
- (<bus device-name
- (fn [v]
- (m/ap
- (try (let [msg (m/?< v)]
- (reset! short-messages msg))
- (catch missionary.Cancelled c
- ;; When the upstream flow is cancelled, we emit "All notes off" to consumers
- (doseq [ch (range 0 16)]
- (reset! short-messages (ShortMessage. ShortMessage/CONTROL_CHANGE ch 123 0)))
- (throw c))))))))
- (if-let [m (m/?< >short-messages)]
- m
- (m/amb))))))))
-
-(defn notes [short-messages]
- (m/signal
- (m/cp
- (m/?<
- (m/ap
- (m/amb= #{}
- (m/?< (m/eduction short-message->notes short-messages))))))))
+ (m/reductions
+ {}
+ nil
+ (m/ap
+ (let [device-name (m/?< >device-name)
+ short-messages (atom nil)
+ >short-messages (m/watch short-messages)]
+ (m/amb= (do (reset! short-messages nil)
+ (m/? (<bus device-name
+ (fn [v]
+ (m/ap (try
+ (let [msg (m/?< v)]
+ (reset! short-messages msg))
+ (catch missionary.Cancelled c
+ ;; When the upstream flow is
+ ;; cancelled, we emit "All
+ ;; notes off" to consumers
+ (doseq [ch (range 0 16)]
+ (reset! short-messages
+ (ShortMessage.
+ ShortMessage/CONTROL_CHANGE
+ ch
+ 123
+ 0)))
+ (throw c))))))))
+ (if-let [m (m/?< >short-messages)]
+ m
+ (m/amb))))))))
+
+(defn notes
+ [short-messages]
+ (m/signal (m/ap (m/?< (m/ap (m/amb= #{}
+ (m/?< (m/eduction short-message->notes
+ short-messages))))))))
(comment
(def dn (atom "CoreMIDI4J - Minilab3 MIDI"))
(def >dn (m/watch dn))
-
(def cancel ((m/reduce prn nil (notes (short-messages >dn))) prn prn))
(reset! dn "CoreMIDI4J - IAC Bus")
(reset! dn nil)
diff --git a/src/unheard/theory.clj b/src/unheard/theory.clj
index 6ceff3e..e7ed9eb 100644
--- a/src/unheard/theory.clj
+++ b/src/unheard/theory.clj
@@ -1,39 +1,42 @@
(ns unheard.theory
(:require [missionary.core :as m]
- [unheard.time-object :refer [time-object]]
- [clojure.set :refer [union]]))
+ [unheard.time-object :refer
+ [time-object lift phrase timeline point-query]]
+ [clojure.set :refer [union]]
+ [unheard.util.missionary :refer [reconcile-merge]]))
(defn note
[>clock start duration >value]
- (time-object start
- duration
- [(gensym)
- (m/cp (let [[c v] (m/?< (m/latest vector >clock >value))]
- (if (<= start c (dec (+ start duration))) #{v} #{})))]))
+ (lift (time-object start
+ duration
+ (m/stream
+ (m/ap
+ (let [[c v] (m/?> (m/relieve
+ (m/latest vector >clock >value)))]
+ v))))))
-;; Reducing function that returns diffs :add v :remove v
-;; Reducing function that unfolds to values
+;; BUG: 2d7f861
+(defn read
+ [>clock timeline]
+ (m/relieve
+ (m/reductions {} nil
+ (m/eduction (map vals)
+ (m/reductions
+ (fn [acc {:keys [id state value]}]
+ (if (= :up state)
+ (assoc acc id value)
+ (dissoc acc id)))
+ {}
+ (reconcile-merge (point-query timeline >clock)))))))
(comment
- (require '[unheard.time-object :refer [lift phrase timeline point-query]])
(def c (atom 0))
(def >c (m/signal (m/watch c)))
(def v (atom 0))
- (def >v (m/watch v))
- (def n (note >c 4 8 >v))
- (def song (phrase (lift n)))
- (def t (timeline (song 0)))
- (def r (point-query t >c))
- (def r
- (m/ap (try (m/?<
- ;; TODO: Simplify
- (apply m/latest
- vector
- (point-query t >c)
- (vals (m/?< (m/eduction (map #(into {} %))
- (point-query t >c))))))
- (catch missionary.Cancelled _ (m/amb)))))
- (def cancel ((m/reduce prn nil r) prn prn))
+ (def >v (m/signal (m/watch v)))
+ (def song (phrase (note >c 4 8 >v) (note >c 6 6 >v)))
+ (def t (timeline song))
+ (def cancel ((m/reduce prn nil (read >c t)) prn prn))
(cancel)
(swap! c dec)
(swap! c inc)
diff --git a/src/unheard/time_object.clj b/src/unheard/time_object.clj
index d2e888f..eea950b 100644
--- a/src/unheard/time_object.clj
+++ b/src/unheard/time_object.clj
@@ -19,6 +19,7 @@
;; BUG c9be408
(defn phrase
+ ;; TODO: Description
[& children]
(fn [start]
{:start start,
@@ -27,19 +28,20 @@
(update time-object :start (partial + start)))}))
(comment
- (def a (phrase (lift (time-object 0 4 :x))))
+ (def a (phrase (lift (time-object 0 4 :x)) (lift (time-object 0 4 :a))))
(def b
(phrase (a 0)
(a 1)
- (lift (time-object 10 2 :x))
+ (lift (time-object 10 2 :x) (time-object 10 2 :b))
(lift (time-object 0 2 :y))))
(def c (phrase (b 0) (b 3)))
(c 0))
(defn timeline
"Primary timeline bookkeeping mehanism."
- [{:keys [time-objects]}]
- (let [m imap/empty]
+ [phrase]
+ (let [{:keys [time-objects]} (phrase 0)
+ m imap/empty]
(if (seq? time-objects)
(loop [time-objects time-objects
m m]
@@ -50,18 +52,20 @@
m)))
(comment
- (def t (timeline (a 0)))
- (def t (timeline (c 0)))
- (get t 2))
+ (def t (timeline a))
+ (def t (timeline c))
+ (get t 1))
(defn point-query
"Query a timeline. Returns a flow of time objects."
[timeline >at]
- (m/ap (let [at (m/?< >at)] (get timeline at))))
+ (m/stream (m/ap (let [at (m/?< >at)] (get timeline at)))))
+
(comment
(def at (atom 0))
(def >at (m/watch at))
(def cancel ((m/reduce prn nil (point-query t >at)) prn prn))
- (reset! at 0)
+ (reset! at 1)
+ (reset! at 14)
(cancel))
diff --git a/src/unheard/util/missionary.clj b/src/unheard/util/missionary.clj
index c7cc903..3d4a548 100644
--- a/src/unheard/util/missionary.clj
+++ b/src/unheard/util/missionary.clj
@@ -5,6 +5,7 @@
(defn differentiate
[zero subtract]
+ ;; Note that this is a stateful function
(let [state (object-array 1)]
(aset state 0 zero)
(fn [curr]
@@ -18,9 +19,6 @@
(into (map (juxt identity (constantly true))) (set/difference x y))
(into (map (juxt identity (constantly false))) (set/difference y x))))
-(comment
- (set-diffs #{1} #{1 2 3}))
-
(defn set-events
[flow-of-sets]
(m/ap (m/?> (m/seed ((differentiate {} set-diffs) (m/?> flow-of-sets))))))
@@ -45,13 +43,14 @@
t5 ;; flows 2 and 4 stop
Values merge together into a single flow."
- ;; Thanks to @leonoel
+ ;; Thanks to @leonoel for inspiration
;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769
[flow-of-sets-of-flows]
(m/ap (let [[flow lifecycle]
- (m/?> ##Inf
- (m/group-by key (set-events flow-of-sets-of-flows)))]
- (try (if (m/?< (m/eduction (map val) (take 2) lifecycle))
- (m/?> flow)
- (m/amb))
+ (m/?> ##Inf (m/group-by key (set-events flow-of-sets-of-flows)))
+ id (Object.)]
+ (try (let [v (m/?< (m/eduction (map val) (take 2) lifecycle))]
+ (if v
+ {:id id, :state :up, :value (m/?> flow)}
+ {:id id, :state :down}))
(catch Cancelled _ (m/amb))))))