(ns unheard.util.missionary
  (:require [missionary.core :as m]
            [clojure.set :as set])
  (:import [missionary Cancelled]))

(defn differentiate
  "Returns a stateful one-argument function. Each call with `curr` returns
  `(subtract curr prev)`, where `prev` is the argument of the previous call,
  or `zero` on the first call.

  The returned function keeps its state in a plain one-slot object array and
  performs no synchronization of its own."
  [zero subtract]
  (let [state (object-array 1)]
    (aset state 0 zero)
    (fn [curr]
      (let [prev (aget state 0)]
        ;; Remember the current value before computing the difference so the
        ;; next call sees it as `prev`.
        (aset state 0 curr)
        (subtract curr prev)))))

(defn set-diffs
  "Describes how to get from set `y` to set `x` as a map of
  element -> boolean: `true` for elements present in `x` but not in `y`
  (added), `false` for elements present in `y` but not in `x` (removed).
  Elements present in both sets do not appear in the result."
  [x y]
  (-> {}
      (into (map (juxt identity (constantly true))) (set/difference x y))
      (into (map (juxt identity (constantly false))) (set/difference y x))))

(comment
  (set-diffs #{1} #{1 2 3}))

(defn set-events
  "Turns a flow of sets into a flow of change events. Each emitted value is a
  map entry `[element added?]` taken from the `set-diffs` between the newly
  received set and the previously received one; the first set is compared
  against the empty set."
  [flow-of-sets]
  (m/ap
    ;; The differentiator is seeded with an empty *set* (not a map):
    ;; clojure.set functions are only specified for set arguments.
    (m/?> (m/seed ((differentiate #{} set-diffs)
                   (m/?> flow-of-sets))))))

(defn reconcile-merge
  "Read from interleaved, concurrent flows represented by the input
  flow-of-sets-of-flows.

  Given:

    t0 #{flow-1}
    t1 #{flow-1}
    t2 #{flow-1 flow-2 flow-3}
    t3 #{flow-2 flow-3}
    t4 #{flow-2 flow-4}

  Produces:

    t0 flow-1                 ;; flow 1 starts
    t1   |                    ;;
    t2 flow-1 flow-2 flow-3   ;; flows 2 and 3 start
    t3          |    flow-3   ;; flow 1 stops
    t4        flow-2 flow-4   ;; flow 3 stops; flow 4 starts
    t5                        ;; flows 2 and 4 stop

  Values merge together into a single flow."
  ;; Thanks to @leonoel
  ;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769
  [flow-of-sets-of-flows]
  (m/ap
    ;; One group per distinct flow; `lifecycle` carries that flow's
    ;; [flow added?] events. Groups are forked without a concurrency bound.
    (let [[flow lifecycle] (m/?> ##Inf
                                 (m/group-by key
                                             (set-events flow-of-sets-of-flows)))]
      (try
        ;; Only the first two lifecycle values are consumed: the `true` that
        ;; announces the flow and the `false` that retires it. On `true` the
        ;; flow's values are forwarded; on `false` nothing is emitted.
        (if (m/?< (m/eduction (map val) (take 2) lifecycle))
          (m/?> flow)
          (m/amb))
        ;; A branch that gets cancelled (presumably when `?<` switches to the
        ;; next lifecycle value -- confirm against missionary docs) ends
        ;; silently instead of propagating the failure.
        (catch Cancelled _
          (m/amb))))))