diff options
| author | Jake Zerrer <him@jakezerrer.com> | 2025-11-24 12:10:42 -0500 |
|---|---|---|
| committer | Jake Zerrer <him@jakezerrer.com> | 2025-11-25 14:11:43 -0500 |
| commit | 1a3fefa99bc49f0cbedf0318b802ac334a6be16b (patch) | |
| tree | 264137f05ee8f4b6904d142fe7f1a159e9761274 | |
| parent | 1f566b3c8af9bcdcc6206a0b3279a90ad73b1702 (diff) | |
Create reconcile-merge missionary utility
| -rw-r--r-- | src/unheard/util/missionary.clj | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/src/unheard/util/missionary.clj b/src/unheard/util/missionary.clj new file mode 100644 index 0000000..c7cc903 --- /dev/null +++ b/src/unheard/util/missionary.clj @@ -0,0 +1,57 @@ +(ns unheard.util.missionary + (:require [missionary.core :as m] + [clojure.set :as set]) + (:import [missionary Cancelled])) + +(defn differentiate + [zero subtract] + (let [state (object-array 1)] + (aset state 0 zero) + (fn [curr] + (let [prev (aget state 0)] + (aset state 0 curr) + (subtract curr prev))))) + +(defn set-diffs + [x y] + (-> {} + (into (map (juxt identity (constantly true))) (set/difference x y)) + (into (map (juxt identity (constantly false))) (set/difference y x)))) + +(comment + (set-diffs #{1} #{1 2 3})) + +(defn set-events + [flow-of-sets] + (m/ap (m/?> (m/seed ((differentiate {} set-diffs) (m/?> flow-of-sets)))))) + +(defn reconcile-merge + "Read from interleaved, concurrent flows represented by + the input flow-of-sets-of-flows. + + Given: + t0 #{flow-1} + t1 #{flow-1} + t2 #{flow-1 flow-2 flow-3} + t3 #{flow-2 flow-3} + t4 #{flow-2 flow-4} + + Produces: + t0 flow-1 ;; flow 1 starts + t1 | ;; + t2 flow-1 flow-2 flow-3 ;; flows 2 and 3 start + t3 | flow-3 ;; flow 1 stops + t4 flow-2 flow-4 ;; flow 3 stops; flow 4 starts + t5 ;; flows 2 and 4 stop + + Values merge together into a single flow." + ;; Thanks to @leonoel + ;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769 + [flow-of-sets-of-flows] + (m/ap (let [[flow lifecycle] + (m/?> ##Inf + (m/group-by key (set-events flow-of-sets-of-flows)))] + (try (if (m/?< (m/eduction (map val) (take 2) lifecycle)) + (m/?> flow) + (m/amb)) + (catch Cancelled _ (m/amb)))))) |
