blob: c7cc903d99c4099aae2305e7ee12073dfae877af (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
(ns unheard.util.missionary
(:require [missionary.core :as m]
[clojure.set :as set])
(:import [missionary Cancelled]))
(defn differentiate
  "Builds a stateful one-argument function that reports how each input
  differs from the input that preceded it.

  zero     - value used as the previous input on the very first call.
  subtract - two-argument function, invoked as (subtract current previous).

  Each call records its argument as the new previous value and returns
  whatever subtract returns. The state is private to the returned function,
  so separate calls to differentiate yield independent differs."
  [zero subtract]
  (let [previous (volatile! zero)]
    (fn [current]
      (let [before @previous]
        ;; Advance the stored value first, then compute the difference, so the
        ;; state moves forward even if subtract throws.
        (vreset! previous current)
        (subtract current before)))))
(defn set-diffs
  "Describes how set x differs from set y as a map of item -> boolean.

  Items present in x but missing from y map to true; items present in y but
  missing from x map to false. Items found in both sets are omitted."
  [x y]
  (let [only-in-x (set/difference x y)
        only-in-y (set/difference y x)
        ;; Transducer that pairs every item with a fixed flag.
        flagged   (fn [flag] (map (fn [item] [item flag])))]
    (-> {}
        (into (flagged true) only-in-x)
        (into (flagged false) only-in-y))))
;; REPL scratchpad: forms inside `comment` are not evaluated on load.
(comment
;; 2 and 3 are only in the second set, so both map to false:
;; => {2 false, 3 false}
(set-diffs #{1} #{1 2 3}))
(defn set-events
  "Turns a flow of sets into a flow of change events.

  Every set taken from flow-of-sets is compared with the set that came
  before it (the first one is compared with the empty set). The comparison
  is done by set-diffs, and each entry of its result is emitted on its own:
  [item true] for an item that appeared, [item false] for an item that
  disappeared. A set equal to its predecessor produces no events."
  [flow-of-sets]
  ;; The differ is seeded with the empty *set* (not an empty map): the seed is
  ;; handed to clojure.set/difference, which is only specified for sets.
  ;; NOTE(review): the differ is built inside the `m/ap` body, ahead of the
  ;; fork on flow-of-sets - presumably so that each run of the returned flow
  ;; starts from a fresh empty set; keep it there and confirm against
  ;; missionary's `ap` semantics before moving it.
  (m/ap (m/?> (m/seed ((differentiate #{} set-diffs) (m/?> flow-of-sets))))))
(defn reconcile-merge
  "Read from interleaved, concurrent flows represented by
  the input flow-of-sets-of-flows.

  Given:
  t0 #{flow-1}
  t1 #{flow-1}
  t2 #{flow-1 flow-2 flow-3}
  t3 #{flow-2 flow-3}
  t4 #{flow-2 flow-4}

  Produces:
  t0 flow-1                       ;; flow 1 starts
  t1   |                          ;;
  t2 flow-1 flow-2 flow-3         ;; flows 2 and 3 start
  t3          |    flow-3         ;; flow 1 stops
  t4        flow-2        flow-4  ;; flow 3 stops; flow 4 starts
  t5                              ;; flows 2 and 4 stop

  Values merge together into a single flow."
  ;; Thanks to @leonoel
  ;; https://clojurians.slack.com/archives/CL85MBPEF/p1763756594982769
  [flow-of-sets-of-flows]
  ;; set-events yields [flow true] / [flow false] entries; grouping them by key
  ;; gives, per member flow, a `lifecycle` flow of that member's own entries.
  ;; `m/?>` with ##Inf forks on every group without a parallelism bound, so all
  ;; members are handled concurrently.
  (m/ap (let [[flow lifecycle]
              (m/?> ##Inf
                    (m/group-by key (set-events flow-of-sets-of-flows)))]
          ;; Only the boolean of at most the first two lifecycle entries is
          ;; looked at. `m/?<` forks on each of them and, per missionary's
          ;; documentation, cancels the branch of the previous value when a
          ;; new one arrives.
          (try (if (m/?< (m/eduction (map val) (take 2) lifecycle))
                 ;; true: the member was added - forward its values.
                 (m/?> flow)
                 ;; false: the member was removed - contribute nothing.
                 (m/amb))
               ;; A branch interrupted by cancellation ends without emitting
               ;; anything instead of failing the merged flow.
               (catch Cancelled _ (m/amb))))))
|