summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJake Zerrer <him@jakezerrer.com>2025-11-24 12:10:42 -0500
committerJake Zerrer <him@jakezerrer.com>2025-11-25 14:11:43 -0500
commit1a3fefa99bc49f0cbedf0318b802ac334a6be16b (patch)
tree264137f05ee8f4b6904d142fe7f1a159e9761274 /src
parent1f566b3c8af9bcdcc6206a0b3279a90ad73b1702 (diff)
Create reconcile-merge missionary utility
Diffstat (limited to 'src')
-rw-r--r--src/unheard/util/missionary.clj57
1 files changed, 57 insertions, 0 deletions
diff --git a/src/unheard/util/missionary.clj b/src/unheard/util/missionary.clj
new file mode 100644
index 0000000..c7cc903
--- /dev/null
+++ b/src/unheard/util/missionary.clj
@@ -0,0 +1,57 @@
+(ns unheard.util.missionary
+ (:require [missionary.core :as m]
+ [clojure.set :as set])
+ (:import [missionary Cancelled]))
+
+(defn differentiate
+ [zero subtract]
+ (let [state (object-array 1)]
+ (aset state 0 zero)
+ (fn [curr]
+ (let [prev (aget state 0)]
+ (aset state 0 curr)
+ (subtract curr prev)))))
+
+(defn set-diffs
+ [x y]
+ (-> {}
+ (into (map (juxt identity (constantly true))) (set/difference x y))
+ (into (map (juxt identity (constantly false))) (set/difference y x))))
+
+(comment
+ (set-diffs #{1} #{1 2 3}))
+
+(defn set-events
+ [flow-of-sets]
+ (m/ap (m/?> (m/seed ((differentiate {} set-diffs) (m/?> flow-of-sets))))))
+
+(defn reconcile-merge
+ "Read from interleaved, concurrent flows represented by
+ the input flow-of-sets-of-flows.
+
+ Given:
+ t0 #{flow-1}
+ t1 #{flow-1}
+ t2 #{flow-1 flow-2 flow-3}
+ t3 #{flow-2 flow-3}
+ t4 #{flow-2 flow-4}
+
+ Produces:
+ t0 flow-1 ;; flow 1 starts
+ t1 | ;;
+ t2 flow-1 flow-2 flow-3 ;; flows 2 and 3 start
+ t3 | flow-3 ;; flow 1 stops
+ t4 flow-2 flow-4 ;; flow 3 stops; flow 4 starts
+ t5 ;; flows 2 and 4 stop
+
+ Values merge together into a single flow."
+ ;; Thanks to @leonoel
+ ;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769
+ [flow-of-sets-of-flows]
+ (m/ap (let [[flow lifecycle]
+ (m/?> ##Inf
+ (m/group-by key (set-events flow-of-sets-of-flows)))]
+ (try (if (m/?< (m/eduction (map val) (take 2) lifecycle))
+ (m/?> flow)
+ (m/amb))
+ (catch Cancelled _ (m/amb))))))