summaryrefslogtreecommitdiff
path: root/src/unheard/util/missionary.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/unheard/util/missionary.clj')
-rw-r--r--src/unheard/util/missionary.clj17
1 files changed, 8 insertions, 9 deletions
diff --git a/src/unheard/util/missionary.clj b/src/unheard/util/missionary.clj
index c7cc903..3d4a548 100644
--- a/src/unheard/util/missionary.clj
+++ b/src/unheard/util/missionary.clj
@@ -5,6 +5,7 @@
(defn differentiate
[zero subtract]
+ ;; Note that this is a stateful function
(let [state (object-array 1)]
(aset state 0 zero)
(fn [curr]
@@ -18,9 +19,6 @@
(into (map (juxt identity (constantly true))) (set/difference x y))
(into (map (juxt identity (constantly false))) (set/difference y x))))
-(comment
- (set-diffs #{1} #{1 2 3}))
-
(defn set-events
[flow-of-sets]
(m/ap (m/?> (m/seed ((differentiate {} set-diffs) (m/?> flow-of-sets))))))
@@ -45,13 +43,14 @@
t5 ;; flows 2 and 4 stop
Values merge together into a single flow."
- ;; Thanks to @leonoel
+ ;; Thanks to @leonoel for inspiration
;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769
[flow-of-sets-of-flows]
(m/ap (let [[flow lifecycle]
- (m/?> ##Inf
- (m/group-by key (set-events flow-of-sets-of-flows)))]
- (try (if (m/?< (m/eduction (map val) (take 2) lifecycle))
- (m/?> flow)
- (m/amb))
+ (m/?> ##Inf (m/group-by key (set-events flow-of-sets-of-flows)))
+ id (Object.)]
+ (try (let [v (m/?< (m/eduction (map val) (take 2) lifecycle))]
+ (if v
+ {:id id, :state :up, :value (m/?> flow)}
+ {:id id, :state :down}))
(catch Cancelled _ (m/amb))))))