summaryrefslogtreecommitdiff
path: root/src/unheard/util/missionary.clj
blob: 3d4a548333eb4fd0a21833290068a2651314caa2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
(ns unheard.util.missionary
  (:require [missionary.core :as m]
            [clojure.set :as set])
  (:import [missionary Cancelled]))

(defn differentiate
  [zero subtract]
  ;; Note that this is a stateful function
  (let [state (object-array 1)]
    (aset state 0 zero)
    (fn [curr]
      (let [prev (aget state 0)]
        (aset state 0 curr)
        (subtract curr prev)))))

(defn set-diffs
  [x y]
  (-> {}
      (into (map (juxt identity (constantly true))) (set/difference x y))
      (into (map (juxt identity (constantly false))) (set/difference y x))))

(defn set-events
  [flow-of-sets]
  (m/ap (m/?> (m/seed ((differentiate {} set-diffs) (m/?> flow-of-sets))))))

(defn reconcile-merge
  "Read from interleaved, concurrent flows represented by
  the input flow-of-sets-of-flows.

  Given:
  t0 #{flow-1}
  t1 #{flow-1}
  t2 #{flow-1 flow-2 flow-3}
  t3 #{flow-2 flow-3}
  t4 #{flow-2 flow-4}

  Produces:
  t0 flow-1                      ;; flow 1 starts
  t1   |                         ;; 
  t2 flow-1 flow-2 flow-3        ;; flows 2 and 3 start
  t3          |    flow-3        ;; flow 1 stops
  t4        flow-2        flow-4 ;; flow 3 stops; flow 4 starts
  t5                             ;; flows 2 and 4 stop

  Values merge together into a single flow."
  ;; Thanks to @leonoel for inspiration
  ;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769
  [flow-of-sets-of-flows]
  (m/ap (let [[flow lifecycle]
                (m/?> ##Inf (m/group-by key (set-events flow-of-sets-of-flows)))
              id (Object.)]
          (try (let [v (m/?< (m/eduction (map val) (take 2) lifecycle))]
                 (if v
                   {:id id, :state :up, :value (m/?> flow)}
                   {:id id, :state :down}))
               (catch Cancelled _ (m/amb))))))