(ns unheard.util.missionary (:require [missionary.core :as m] [clojure.set :as set]) (:import [missionary Cancelled])) (defn differentiate [zero subtract] ;; Note that this is a stateful function (let [state (object-array 1)] (aset state 0 zero) (fn [curr] (let [prev (aget state 0)] (aset state 0 curr) (subtract curr prev))))) (defn set-diffs [x y] (-> {} (into (map (juxt identity (constantly true))) (set/difference x y)) (into (map (juxt identity (constantly false))) (set/difference y x)))) (defn set-events [flow-of-sets] (m/ap (m/?> (m/seed ((differentiate {} set-diffs) (m/?> flow-of-sets)))))) (defn reconcile-merge "Read from interleaved, concurrent flows represented by the input flow-of-sets-of-flows. Given: t0 #{flow-1} t1 #{flow-1} t2 #{flow-1 flow-2 flow-3} t3 #{flow-2 flow-3} t4 #{flow-2 flow-4} Produces: t0 flow-1 ;; flow 1 starts t1 | ;; t2 flow-1 flow-2 flow-3 ;; flows 2 and 3 start t3 | flow-3 ;; flow 1 stops t4 flow-2 flow-4 ;; flow 3 stops; flow 4 starts t5 ;; flows 2 and 4 stop Values merge together into a single flow." ;; Thanks to @leonoel for inspiration ;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769 [flow-of-sets-of-flows] (m/ap (let [[flow lifecycle] (m/?> ##Inf (m/group-by key (set-events flow-of-sets-of-flows))) id (Object.)] (try (let [v (m/?< (m/eduction (map val) (take 2) lifecycle))] (if v {:id id, :state :up, :value (m/?> flow)} {:id id, :state :down})) (catch Cancelled _ (m/amb))))))