hydro_lang/keyed_stream.rs
1use std::hash::Hash;
2use std::marker::PhantomData;
3
4use stageleft::{IntoQuotedMut, QuotedWithContext, q};
5
6use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
7use crate::ir::HydroNode;
8use crate::keyed_singleton::KeyedSingleton;
9use crate::location::tick::NoAtomic;
10use crate::location::{LocationId, NoTick, check_matching_location};
11use crate::manual_expr::ManualExpr;
12use crate::stream::{ExactlyOnce, MinRetries};
13use crate::unsafety::NonDet;
14use crate::*;
15
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`,
/// where the order of keys is non-deterministic but the order *within* each group may
/// be deterministic.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: the boundedness marker, forwarded unchanged to the underlying [`Stream`]
///   (for example [`Unbounded`])
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::stream::AtLeastOnce`])
pub struct KeyedStream<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> {
    /// The flat stream of `(key, value)` pairs backing this keyed stream. It is always typed
    /// as [`NoOrder`]; the per-group ordering guarantee is carried by `_phantom_order`.
    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retries>,
    /// Zero-sized marker that carries the per-group `Order` type parameter.
    pub(crate) _phantom_order: PhantomData<Order>,
}
32
33impl<'a, K, V, L, B, R> From<KeyedStream<K, V, L, B, TotalOrder, R>>
34 for KeyedStream<K, V, L, B, NoOrder, R>
35where
36 L: Location<'a>,
37{
38 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
39 KeyedStream {
40 underlying: stream.underlying,
41 _phantom_order: Default::default(),
42 }
43 }
44}
45
46impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound, Order, Retries> Clone
47 for KeyedStream<K, V, Loc, Bound, Order, Retries>
48{
49 fn clone(&self) -> Self {
50 KeyedStream {
51 underlying: self.underlying.clone(),
52 _phantom_order: PhantomData,
53 }
54 }
55}
56
57impl<'a, K, V, L, B, O, R> CycleCollection<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
58where
59 L: Location<'a> + NoTick,
60{
61 type Location = L;
62
63 fn create_source(ident: syn::Ident, location: L) -> Self {
64 Stream::create_source(ident, location).into_keyed()
65 }
66}
67
68impl<'a, K, V, L, B, O, R> CycleComplete<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
69where
70 L: Location<'a> + NoTick,
71{
72 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
73 self.underlying.complete(ident, expected_location);
74 }
75}
76
77impl<'a, K, V, L: Location<'a>, B, O, R> KeyedStream<K, V, L, B, O, R> {
78 /// Explicitly "casts" the keyed stream to a type with a different ordering
79 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
80 /// by the type-system.
81 ///
82 /// # Non-Determinism
83 /// This function is used as an escape hatch, and any mistakes in the
84 /// provided ordering guarantee will propagate into the guarantees
85 /// for the rest of the program.
86 pub fn assume_ordering<O2>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
87 KeyedStream {
88 underlying: self.underlying,
89 _phantom_order: PhantomData,
90 }
91 }
92
93 /// Explicitly "casts" the keyed stream to a type with a different retries
94 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
95 /// be proven by the type-system.
96 ///
97 /// # Non-Determinism
98 /// This function is used as an escape hatch, and any mistakes in the
99 /// provided retries guarantee will propagate into the guarantees
100 /// for the rest of the program.
101 pub fn assume_retries<R2>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
102 KeyedStream {
103 underlying: self.underlying.assume_retries::<R2>(nondet),
104 _phantom_order: PhantomData,
105 }
106 }
107
108 /// Flattens the keyed stream into a single stream of key-value pairs, with non-deterministic
109 /// element ordering.
110 ///
111 /// # Example
112 /// ```rust
113 /// # use hydro_lang::*;
114 /// # use futures::StreamExt;
115 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
116 /// process
117 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
118 /// .into_keyed()
119 /// .entries()
120 /// # }, |mut stream| async move {
121 /// // (1, 2), (1, 3), (2, 4) in any order
122 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
123 /// # assert_eq!(stream.next().await.unwrap(), w);
124 /// # }
125 /// # }));
126 /// ```
127 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
128 self.underlying
129 }
130
131 /// Flattens the keyed stream into a single stream of only the values, with non-deterministic
132 /// element ordering.
133 ///
134 /// # Example
135 /// ```rust
136 /// # use hydro_lang::*;
137 /// # use futures::StreamExt;
138 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
139 /// process
140 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
141 /// .into_keyed()
142 /// .values()
143 /// # }, |mut stream| async move {
144 /// // 2, 3, 4 in any order
145 /// # for w in vec![2, 3, 4] {
146 /// # assert_eq!(stream.next().await.unwrap(), w);
147 /// # }
148 /// # }));
149 /// ```
150 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
151 self.underlying.map(q!(|(_, v)| v))
152 }
153
154 /// Transforms each value by invoking `f` on each element, with keys staying the same
155 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
156 ///
157 /// If you do not want to modify the stream and instead only want to view
158 /// each item use [`KeyedStream::inspect`] instead.
159 ///
160 /// # Example
161 /// ```rust
162 /// # use hydro_lang::*;
163 /// # use futures::StreamExt;
164 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
165 /// process
166 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
167 /// .into_keyed()
168 /// .map(q!(|v| v + 1))
169 /// # .entries()
170 /// # }, |mut stream| async move {
171 /// // { 1: [3, 4], 2: [5] }
172 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
173 /// # assert_eq!(stream.next().await.unwrap(), w);
174 /// # }
175 /// # }));
176 /// ```
177 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
178 where
179 F: Fn(V) -> U + 'a,
180 {
181 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
182 KeyedStream {
183 underlying: self.underlying.map(q!({
184 let orig = f;
185 move |(k, v)| (k, orig(v))
186 })),
187 _phantom_order: Default::default(),
188 }
189 }
190
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
193 ///
194 /// If you do not want to modify the stream and instead only want to view
195 /// each item use [`KeyedStream::inspect_with_key`] instead.
196 ///
197 /// # Example
198 /// ```rust
199 /// # use hydro_lang::*;
200 /// # use futures::StreamExt;
201 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
202 /// process
203 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
204 /// .into_keyed()
205 /// .map_with_key(q!(|(k, v)| k + v))
206 /// # .entries()
207 /// # }, |mut stream| async move {
208 /// // { 1: [3, 4], 2: [6] }
209 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
210 /// # assert_eq!(stream.next().await.unwrap(), w);
211 /// # }
212 /// # }));
213 /// ```
214 pub fn map_with_key<U, F>(
215 self,
216 f: impl IntoQuotedMut<'a, F, L> + Copy,
217 ) -> KeyedStream<K, U, L, B, O, R>
218 where
219 F: Fn((K, V)) -> U + 'a,
220 K: Clone,
221 {
222 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
223 KeyedStream {
224 underlying: self.underlying.map(q!({
225 let orig = f;
226 move |(k, v)| {
227 let out = orig((k.clone(), v));
228 (k, out)
229 }
230 })),
231 _phantom_order: Default::default(),
232 }
233 }
234
235 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
236 /// `f`, preserving the order of the elements within the group.
237 ///
238 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
239 /// not modify or take ownership of the values. If you need to modify the values while filtering
240 /// use [`KeyedStream::filter_map`] instead.
241 ///
242 /// # Example
243 /// ```rust
244 /// # use hydro_lang::*;
245 /// # use futures::StreamExt;
246 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
247 /// process
248 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
249 /// .into_keyed()
250 /// .filter(q!(|&x| x > 2))
251 /// # .entries()
252 /// # }, |mut stream| async move {
253 /// // { 1: [3], 2: [4] }
254 /// # for w in vec![(1, 3), (2, 4)] {
255 /// # assert_eq!(stream.next().await.unwrap(), w);
256 /// # }
257 /// # }));
258 /// ```
259 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
260 where
261 F: Fn(&V) -> bool + 'a,
262 {
263 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
264 KeyedStream {
265 underlying: self.underlying.filter(q!({
266 let orig = f;
267 move |(_k, v)| orig(v)
268 })),
269 _phantom_order: Default::default(),
270 }
271 }
272
273 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
274 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
275 ///
276 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
277 /// not modify or take ownership of the values. If you need to modify the values while filtering
278 /// use [`KeyedStream::filter_map_with_key`] instead.
279 ///
280 /// # Example
281 /// ```rust
282 /// # use hydro_lang::*;
283 /// # use futures::StreamExt;
284 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
285 /// process
286 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
287 /// .into_keyed()
288 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
289 /// # .entries()
290 /// # }, |mut stream| async move {
291 /// // { 1: [3], 2: [4] }
292 /// # for w in vec![(1, 3), (2, 4)] {
293 /// # assert_eq!(stream.next().await.unwrap(), w);
294 /// # }
295 /// # }));
296 /// ```
297 pub fn filter_with_key<F>(
298 self,
299 f: impl IntoQuotedMut<'a, F, L> + Copy,
300 ) -> KeyedStream<K, V, L, B, O, R>
301 where
302 F: Fn(&(K, V)) -> bool + 'a,
303 {
304 KeyedStream {
305 underlying: self.underlying.filter(f),
306 _phantom_order: Default::default(),
307 }
308 }
309
310 /// An operator that both filters and maps each value, with keys staying the same.
311 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
312 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
313 ///
314 /// # Example
315 /// ```rust
316 /// # use hydro_lang::*;
317 /// # use futures::StreamExt;
318 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
319 /// process
320 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
321 /// .into_keyed()
322 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
323 /// # .entries()
324 /// # }, |mut stream| async move {
325 /// // { 1: [2], 2: [4] }
326 /// # for w in vec![(1, 2), (2, 4)] {
327 /// # assert_eq!(stream.next().await.unwrap(), w);
328 /// # }
329 /// # }));
330 /// ```
331 pub fn filter_map<U, F>(
332 self,
333 f: impl IntoQuotedMut<'a, F, L> + Copy,
334 ) -> KeyedStream<K, U, L, B, O, R>
335 where
336 F: Fn(V) -> Option<U> + 'a,
337 {
338 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
339 KeyedStream {
340 underlying: self.underlying.filter_map(q!({
341 let orig = f;
342 move |(k, v)| orig(v).map(|o| (k, o))
343 })),
344 _phantom_order: Default::default(),
345 }
346 }
347
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
350 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
351 ///
352 /// # Example
353 /// ```rust
354 /// # use hydro_lang::*;
355 /// # use futures::StreamExt;
356 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
357 /// process
358 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
359 /// .into_keyed()
360 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
361 /// # .entries()
362 /// # }, |mut stream| async move {
363 /// // { 2: [2] }
364 /// # for w in vec![(2, 2)] {
365 /// # assert_eq!(stream.next().await.unwrap(), w);
366 /// # }
367 /// # }));
368 /// ```
369 pub fn filter_map_with_key<U, F>(
370 self,
371 f: impl IntoQuotedMut<'a, F, L> + Copy,
372 ) -> KeyedStream<K, U, L, B, O, R>
373 where
374 F: Fn((K, V)) -> Option<U> + 'a,
375 K: Clone,
376 {
377 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
378 KeyedStream {
379 underlying: self.underlying.filter_map(q!({
380 let orig = f;
381 move |(k, v)| {
382 let out = orig((k.clone(), v));
383 out.map(|o| (k, o))
384 }
385 })),
386 _phantom_order: Default::default(),
387 }
388 }
389
390 /// An operator which allows you to "inspect" each element of a stream without
391 /// modifying it. The closure `f` is called on a reference to each value. This is
392 /// mainly useful for debugging, and should not be used to generate side-effects.
393 ///
394 /// # Example
395 /// ```rust
396 /// # use hydro_lang::*;
397 /// # use futures::StreamExt;
398 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
399 /// process
400 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
401 /// .into_keyed()
402 /// .inspect(q!(|v| println!("{}", v)))
403 /// # .entries()
404 /// # }, |mut stream| async move {
405 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
406 /// # assert_eq!(stream.next().await.unwrap(), w);
407 /// # }
408 /// # }));
409 /// ```
410 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
411 where
412 F: Fn(&V) + 'a,
413 {
414 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
415 KeyedStream {
416 underlying: self.underlying.inspect(q!({
417 let orig = f;
418 move |(_k, v)| orig(v)
419 })),
420 _phantom_order: Default::default(),
421 }
422 }
423
424 /// An operator which allows you to "inspect" each element of a stream without
425 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
426 /// mainly useful for debugging, and should not be used to generate side-effects.
427 ///
428 /// # Example
429 /// ```rust
430 /// # use hydro_lang::*;
431 /// # use futures::StreamExt;
432 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
433 /// process
434 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
435 /// .into_keyed()
    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
437 /// # .entries()
438 /// # }, |mut stream| async move {
439 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
440 /// # assert_eq!(stream.next().await.unwrap(), w);
441 /// # }
442 /// # }));
443 /// ```
444 pub fn inspect_with_key<F>(
445 self,
446 f: impl IntoQuotedMut<'a, F, L>,
447 ) -> KeyedStream<K, V, L, B, O, R>
448 where
449 F: Fn(&(K, V)) + 'a,
450 {
451 KeyedStream {
452 underlying: self.underlying.inspect(f),
453 _phantom_order: Default::default(),
454 }
455 }
456
457 /// An operator which allows you to "name" a `HydroNode`.
458 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
459 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
460 {
461 let mut node = self.underlying.ir_node.borrow_mut();
462 let metadata = node.metadata_mut();
463 metadata.tag = Some(name.to_string());
464 }
465 self
466 }
467}
468
469impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R> {
470 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
471 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
472 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
473 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
474 ///
475 /// Currently, both input streams must be [`Unbounded`].
476 ///
477 /// # Example
478 /// ```rust
479 /// # use hydro_lang::*;
480 /// # use futures::StreamExt;
481 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
482 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
483 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
484 /// numbers1.interleave(numbers2)
485 /// # .entries()
486 /// # }, |mut stream| async move {
487 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
488 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
489 /// # assert_eq!(stream.next().await.unwrap(), w);
490 /// # }
491 /// # }));
492 /// ```
493 pub fn interleave<O2, R2: MinRetries<R>>(
494 self,
495 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
496 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>
497 where
498 R: MinRetries<R2, Min = R2::Min>,
499 {
500 self.entries().interleave(other.entries()).into_keyed()
501 }
502}
503
504impl<'a, K, V, L, B> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
505where
506 K: Eq + Hash,
507 L: Location<'a>,
508{
    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
510 ///
511 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
512 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
513 /// early by returning `None`.
514 ///
515 /// The function takes a mutable reference to the accumulator and the current element, and returns
516 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
517 /// If the function returns `None`, the stream is terminated and no more elements are processed.
518 ///
519 /// # Example
520 /// ```rust
521 /// # use hydro_lang::*;
522 /// # use futures::StreamExt;
523 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
524 /// process
525 /// .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
526 /// .into_keyed()
527 /// .scan(
528 /// q!(|| 0),
529 /// q!(|acc, x| {
530 /// *acc += x;
531 /// Some(*acc)
532 /// }),
533 /// )
534 /// # .entries()
535 /// # }, |mut stream| async move {
536 /// // Output: { 0: [1, 3], 1: [3, 7] }
537 /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
538 /// # assert_eq!(stream.next().await.unwrap(), w);
539 /// # }
540 /// # }));
541 /// ```
542 pub fn scan<A, U, I, F>(
543 self,
544 init: impl IntoQuotedMut<'a, I, L> + Copy,
545 f: impl IntoQuotedMut<'a, F, L> + Copy,
546 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
547 where
548 K: Clone,
549 I: Fn() -> A + 'a,
550 F: Fn(&mut A, V) -> Option<U> + 'a,
551 {
552 KeyedStream {
553 underlying: self
554 .underlying
555 .assume_ordering::<TotalOrder>(
556 nondet!(/** keyed scan does not rely on order of keys */),
557 )
558 .scan_keyed(init, f)
559 .into(),
560 _phantom_order: Default::default(),
561 }
562 }
563
564 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed in-order across the values
565 /// in each group. But the aggregation function returns a boolean, which when true indicates that the aggregated
566 /// result is complete and can be released to downstream computation. Unlike [`Stream::fold_keyed`], this means that
567 /// even if the input stream is [`crate::Unbounded`], the outputs of the fold can be processed like normal stream elements.
568 ///
569 /// # Example
570 /// ```rust
571 /// # use hydro_lang::*;
572 /// # use futures::StreamExt;
573 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
574 /// process
575 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
576 /// .into_keyed()
577 /// .fold_early_stop(
578 /// q!(|| 0),
579 /// q!(|acc, x| {
580 /// *acc += x;
581 /// x % 2 == 0
582 /// }),
583 /// )
584 /// # .entries()
585 /// # }, |mut stream| async move {
586 /// // Output: { 0: 2, 1: 9 }
587 /// # for w in vec![(0, 2), (1, 9)] {
588 /// # assert_eq!(stream.next().await.unwrap(), w);
589 /// # }
590 /// # }));
591 /// ```
592 pub fn fold_early_stop<A, I, F>(
593 self,
594 init: impl IntoQuotedMut<'a, I, L> + Copy,
595 f: impl IntoQuotedMut<'a, F, L> + Copy,
596 ) -> KeyedStream<K, A, L, B, TotalOrder, ExactlyOnce>
597 where
598 K: Clone,
599 I: Fn() -> A + 'a,
600 F: Fn(&mut A, V) -> bool + 'a,
601 {
602 KeyedStream {
603 underlying: {
604 self.underlying
605 .assume_ordering::<TotalOrder>(
606 nondet!(/** keyed scan does not rely on order of keys */),
607 )
608 .fold_keyed_early_stop(init, f)
609 .into()
610 },
611 _phantom_order: Default::default(),
612 }
613 }
614
615 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
616 ///
617 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
618 /// to depend on the order of elements in the group.
619 ///
620 /// If the input and output value types are the same and do not require initialization then use
621 /// [`KeyedStream::reduce`].
622 ///
623 /// # Example
624 /// ```rust
625 /// # use hydro_lang::*;
626 /// # use futures::StreamExt;
627 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
628 /// let tick = process.tick();
629 /// let numbers = process
630 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
631 /// .into_keyed();
632 /// let batch = numbers.batch(&tick, nondet!(/** test */));
633 /// batch
634 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
635 /// .entries()
636 /// .all_ticks()
637 /// # }, |mut stream| async move {
638 /// // (1, 5), (2, 7)
639 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
640 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
641 /// # }));
642 /// ```
643 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
644 self,
645 init: impl IntoQuotedMut<'a, I, L>,
646 comb: impl IntoQuotedMut<'a, F, L>,
647 ) -> KeyedSingleton<K, A, L, B> {
648 let init = init.splice_fn0_ctx(&self.underlying.location).into();
649 let comb = comb
650 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
651 .into();
652
653 let out_ir = HydroNode::FoldKeyed {
654 init,
655 acc: comb,
656 input: Box::new(self.underlying.ir_node.into_inner()),
657 metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
658 };
659
660 KeyedSingleton {
661 underlying: Stream::new(self.underlying.location, out_ir),
662 }
663 }
664
665 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
666 ///
667 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
668 /// to depend on the order of elements in the stream.
669 ///
670 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
671 ///
672 /// # Example
673 /// ```rust
674 /// # use hydro_lang::*;
675 /// # use futures::StreamExt;
676 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
677 /// let tick = process.tick();
678 /// let numbers = process
679 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
680 /// .into_keyed();
681 /// let batch = numbers.batch(&tick, nondet!(/** test */));
682 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
683 /// # }, |mut stream| async move {
684 /// // (1, 5), (2, 7)
685 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
686 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
687 /// # }));
688 /// ```
689 pub fn reduce<F: Fn(&mut V, V) + 'a>(
690 self,
691 comb: impl IntoQuotedMut<'a, F, L>,
692 ) -> KeyedSingleton<K, V, L, B> {
693 let f = comb
694 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
695 .into();
696
697 let out_ir = HydroNode::ReduceKeyed {
698 f,
699 input: Box::new(self.underlying.ir_node.into_inner()),
700 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
701 };
702
703 KeyedSingleton {
704 underlying: Stream::new(self.underlying.location, out_ir),
705 }
706 }
707
708 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
709 ///
710 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
711 /// to depend on the order of elements in the stream.
712 ///
713 /// # Example
714 /// ```rust
715 /// # use hydro_lang::*;
716 /// # use futures::StreamExt;
717 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
718 /// let tick = process.tick();
719 /// let watermark = tick.singleton(q!(1));
720 /// let numbers = process
721 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
722 /// .into_keyed();
723 /// let batch = numbers.batch(&tick, nondet!(/** test */));
724 /// batch
725 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
726 /// .entries()
727 /// .all_ticks()
728 /// # }, |mut stream| async move {
729 /// // (2, 204)
730 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
731 /// # }));
732 /// ```
733 pub fn reduce_watermark<O, F>(
734 self,
735 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
736 comb: impl IntoQuotedMut<'a, F, L>,
737 ) -> KeyedSingleton<K, V, L, B>
738 where
739 O: Clone,
740 F: Fn(&mut V, V) + 'a,
741 {
742 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
743 check_matching_location(&self.underlying.location.root(), other.location.outer());
744 let f = comb
745 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
746 .into();
747
748 let out_ir = Stream::new(
749 self.underlying.location.clone(),
750 HydroNode::ReduceKeyedWatermark {
751 f,
752 input: Box::new(self.underlying.ir_node.into_inner()),
753 watermark: Box::new(other.ir_node.into_inner()),
754 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
755 },
756 );
757
758 KeyedSingleton { underlying: out_ir }
759 }
760}
761
762impl<'a, K, V, L, B, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
763where
764 K: Eq + Hash,
765 L: Location<'a>,
766{
767 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
768 ///
769 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
770 ///
771 /// If the input and output value types are the same and do not require initialization then use
772 /// [`KeyedStream::reduce_commutative`].
773 ///
774 /// # Example
775 /// ```rust
776 /// # use hydro_lang::*;
777 /// # use futures::StreamExt;
778 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
779 /// let tick = process.tick();
780 /// let numbers = process
781 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
782 /// .into_keyed();
783 /// let batch = numbers.batch(&tick, nondet!(/** test */));
784 /// batch
785 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
786 /// .entries()
787 /// .all_ticks()
788 /// # }, |mut stream| async move {
789 /// // (1, 5), (2, 7)
790 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
791 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
792 /// # }));
793 /// ```
794 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
795 self,
796 init: impl IntoQuotedMut<'a, I, L>,
797 comb: impl IntoQuotedMut<'a, F, L>,
798 ) -> KeyedSingleton<K, A, L, B> {
799 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
800 .fold(init, comb)
801 }
802
803 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
804 ///
805 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
806 ///
807 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
808 ///
809 /// # Example
810 /// ```rust
811 /// # use hydro_lang::*;
812 /// # use futures::StreamExt;
813 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
814 /// let tick = process.tick();
815 /// let numbers = process
816 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
817 /// .into_keyed();
818 /// let batch = numbers.batch(&tick, nondet!(/** test */));
819 /// batch
820 /// .reduce_commutative(q!(|acc, x| *acc += x))
821 /// .entries()
822 /// .all_ticks()
823 /// # }, |mut stream| async move {
824 /// // (1, 5), (2, 7)
825 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
826 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
827 /// # }));
828 /// ```
829 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
830 self,
831 comb: impl IntoQuotedMut<'a, F, L>,
832 ) -> KeyedSingleton<K, V, L, B> {
833 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
834 .reduce(comb)
835 }
836
837 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
838 ///
839 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
840 ///
841 /// # Example
842 /// ```rust
843 /// # use hydro_lang::*;
844 /// # use futures::StreamExt;
845 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
846 /// let tick = process.tick();
847 /// let watermark = tick.singleton(q!(1));
848 /// let numbers = process
849 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
850 /// .into_keyed();
851 /// let batch = numbers.batch(&tick, nondet!(/** test */));
852 /// batch
853 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
854 /// .entries()
855 /// .all_ticks()
856 /// # }, |mut stream| async move {
857 /// // (2, 204)
858 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
859 /// # }));
860 /// ```
861 pub fn reduce_watermark_commutative<O2, F>(
862 self,
863 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
864 comb: impl IntoQuotedMut<'a, F, L>,
865 ) -> KeyedSingleton<K, V, L, B>
866 where
867 O2: Clone,
868 F: Fn(&mut V, V) + 'a,
869 {
870 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
871 .reduce_watermark(other, comb)
872 }
873}
874
875impl<'a, K, V, L, B, R> KeyedStream<K, V, L, B, TotalOrder, R>
876where
877 K: Eq + Hash,
878 L: Location<'a>,
879{
880 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
881 ///
882 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
883 ///
884 /// If the input and output value types are the same and do not require initialization then use
885 /// [`KeyedStream::reduce_idempotent`].
886 ///
887 /// # Example
888 /// ```rust
889 /// # use hydro_lang::*;
890 /// # use futures::StreamExt;
891 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
892 /// let tick = process.tick();
893 /// let numbers = process
894 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
895 /// .into_keyed();
896 /// let batch = numbers.batch(&tick, nondet!(/** test */));
897 /// batch
898 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
899 /// .entries()
900 /// .all_ticks()
901 /// # }, |mut stream| async move {
902 /// // (1, false), (2, true)
903 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
904 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
905 /// # }));
906 /// ```
907 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
908 self,
909 init: impl IntoQuotedMut<'a, I, L>,
910 comb: impl IntoQuotedMut<'a, F, L>,
911 ) -> KeyedSingleton<K, A, L, B> {
912 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
913 .fold(init, comb)
914 }
915
916 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
917 ///
918 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
919 ///
920 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
921 ///
922 /// # Example
923 /// ```rust
924 /// # use hydro_lang::*;
925 /// # use futures::StreamExt;
926 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
927 /// let tick = process.tick();
928 /// let numbers = process
929 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
930 /// .into_keyed();
931 /// let batch = numbers.batch(&tick, nondet!(/** test */));
932 /// batch
933 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
934 /// .entries()
935 /// .all_ticks()
936 /// # }, |mut stream| async move {
937 /// // (1, false), (2, true)
938 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
939 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
940 /// # }));
941 /// ```
942 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
943 self,
944 comb: impl IntoQuotedMut<'a, F, L>,
945 ) -> KeyedSingleton<K, V, L, B> {
946 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
947 .reduce(comb)
948 }
949
950 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
951 ///
952 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
953 ///
954 /// # Example
955 /// ```rust
956 /// # use hydro_lang::*;
957 /// # use futures::StreamExt;
958 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
959 /// let tick = process.tick();
960 /// let watermark = tick.singleton(q!(1));
961 /// let numbers = process
962 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
963 /// .into_keyed();
964 /// let batch = numbers.batch(&tick, nondet!(/** test */));
965 /// batch
966 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
967 /// .entries()
968 /// .all_ticks()
969 /// # }, |mut stream| async move {
970 /// // (2, true)
971 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
972 /// # }));
973 /// ```
974 pub fn reduce_watermark_idempotent<O2, F>(
975 self,
976 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
977 comb: impl IntoQuotedMut<'a, F, L>,
978 ) -> KeyedSingleton<K, V, L, B>
979 where
980 O2: Clone,
981 F: Fn(&mut V, V) + 'a,
982 {
983 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
984 .reduce_watermark(other, comb)
985 }
986}
987
988impl<'a, K, V, L, B, O, R> KeyedStream<K, V, L, B, O, R>
989where
990 K: Eq + Hash,
991 L: Location<'a>,
992{
993 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
994 ///
995 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
996 /// as there may be non-deterministic duplicates.
997 ///
998 /// If the input and output value types are the same and do not require initialization then use
999 /// [`KeyedStream::reduce_commutative_idempotent`].
1000 ///
1001 /// # Example
1002 /// ```rust
1003 /// # use hydro_lang::*;
1004 /// # use futures::StreamExt;
1005 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1006 /// let tick = process.tick();
1007 /// let numbers = process
1008 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1009 /// .into_keyed();
1010 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1011 /// batch
1012 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1013 /// .entries()
1014 /// .all_ticks()
1015 /// # }, |mut stream| async move {
1016 /// // (1, false), (2, true)
1017 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1018 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1019 /// # }));
1020 /// ```
1021 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1022 self,
1023 init: impl IntoQuotedMut<'a, I, L>,
1024 comb: impl IntoQuotedMut<'a, F, L>,
1025 ) -> KeyedSingleton<K, A, L, B> {
1026 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1027 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1028 .fold(init, comb)
1029 }
1030
1031 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1032 ///
1033 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1034 /// as there may be non-deterministic duplicates.
1035 ///
1036 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1037 ///
1038 /// # Example
1039 /// ```rust
1040 /// # use hydro_lang::*;
1041 /// # use futures::StreamExt;
1042 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1043 /// let tick = process.tick();
1044 /// let numbers = process
1045 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1046 /// .into_keyed();
1047 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1048 /// batch
1049 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1050 /// .entries()
1051 /// .all_ticks()
1052 /// # }, |mut stream| async move {
1053 /// // (1, false), (2, true)
1054 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1055 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1056 /// # }));
1057 /// ```
1058 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1059 self,
1060 comb: impl IntoQuotedMut<'a, F, L>,
1061 ) -> KeyedSingleton<K, V, L, B> {
1062 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1063 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1064 .reduce(comb)
1065 }
1066
1067 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1068 ///
1069 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1070 /// as there may be non-deterministic duplicates.
1071 ///
1072 /// # Example
1073 /// ```rust
1074 /// # use hydro_lang::*;
1075 /// # use futures::StreamExt;
1076 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1077 /// let tick = process.tick();
1078 /// let watermark = tick.singleton(q!(1));
1079 /// let numbers = process
1080 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1081 /// .into_keyed();
1082 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1083 /// batch
1084 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1085 /// .entries()
1086 /// .all_ticks()
1087 /// # }, |mut stream| async move {
1088 /// // (2, true)
1089 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1090 /// # }));
1091 /// ```
1092 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1093 self,
1094 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1095 comb: impl IntoQuotedMut<'a, F, L>,
1096 ) -> KeyedSingleton<K, V, L, B>
1097 where
1098 O2: Clone,
1099 F: Fn(&mut V, V) + 'a,
1100 {
1101 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1102 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1103 .reduce_watermark(other, comb)
1104 }
1105
1106 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1107 /// whose keys are not in the bounded stream.
1108 ///
1109 /// # Example
1110 /// ```rust
1111 /// # use hydro_lang::*;
1112 /// # use futures::StreamExt;
1113 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1114 /// let tick = process.tick();
1115 /// let keyed_stream = process
1116 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1117 /// .batch(&tick, nondet!(/** test */))
1118 /// .into_keyed();
1119 /// let keys_to_remove = process
1120 /// .source_iter(q!(vec![1, 2]))
1121 /// .batch(&tick, nondet!(/** test */));
1122 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1123 /// # .entries()
1124 /// # }, |mut stream| async move {
1125 /// // { 3: ['c'], 4: ['d'] }
1126 /// # for w in vec![(3, 'c'), (4, 'd')] {
1127 /// # assert_eq!(stream.next().await.unwrap(), w);
1128 /// # }
1129 /// # }));
1130 pub fn filter_key_not_in<O2, R2>(self, other: Stream<K, L, Bounded, O2, R2>) -> Self {
1131 KeyedStream {
1132 underlying: self.entries().anti_join(other),
1133 _phantom_order: Default::default(),
1134 }
1135 }
1136}
1137
1138impl<'a, K, V, L, B, O, R> KeyedStream<K, V, L, B, O, R>
1139where
1140 L: Location<'a> + NoTick + NoAtomic,
1141{
1142 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1143 KeyedStream {
1144 underlying: self.underlying.atomic(tick),
1145 _phantom_order: Default::default(),
1146 }
1147 }
1148
1149 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1150 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1151 /// the order of the input.
1152 ///
1153 /// # Non-Determinism
1154 /// The batch boundaries are non-deterministic and may change across executions.
1155 pub fn batch(
1156 self,
1157 tick: &Tick<L>,
1158 nondet: NonDet,
1159 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1160 self.atomic(tick).batch(nondet)
1161 }
1162}
1163
1164impl<'a, K, V, L, B, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
1165where
1166 L: Location<'a> + NoTick + NoAtomic,
1167{
1168 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1169 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1170 /// the order of the input.
1171 ///
1172 /// # Non-Determinism
1173 /// The batch boundaries are non-deterministic and may change across executions.
1174 pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1175 KeyedStream {
1176 underlying: self.underlying.batch(nondet),
1177 _phantom_order: Default::default(),
1178 }
1179 }
1180}
1181
1182impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1183where
1184 L: Location<'a>,
1185{
1186 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1187 KeyedStream {
1188 underlying: self.underlying.all_ticks(),
1189 _phantom_order: Default::default(),
1190 }
1191 }
1192}
1193
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::location::Location;
    use crate::{FlowBuilder, nondet};

    /// Runs `reduce_watermark` with a constant watermark of `1` over entries with keys
    /// `0`, `1` and `2`, and checks that the first entry observed by the external process
    /// is `(2, 204)`, i.e. the sum of the two `(2, 102)` inputs.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        // One process hosts the dataflow; the external endpoint receives its output.
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // The watermark is the constant `1`.
        let watermark = node_tick.singleton(q!(1));

        // Sum the values per key under the watermark, snapshot the result in the tick and
        // ship the entries to the external process.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The output port is connected before the deployment is started.
        let mut out = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // Only the first received entry is checked.
        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that is fed back through a tick
    /// cycle, plus an extra `(3, 103)` entry gated on an externally sent trigger. Checks that
    /// `(2, 204)` is received first and that `(3, 103)` is received after the trigger is sent.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // Channel on which the test sends `()` messages into the process.
        let (tick_send, tick_trigger) = node.source_external_bincode(&external);

        let node_tick = node.tick();
        // The cycle is seeded with `1` and completed with `watermark + 1` for the next tick
        // (presumably making the watermark advance by one per tick — confirm against
        // `cycle_with_initial` / `complete_next_tick`).
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // `(3, 103)` is batched into the tick and passed through `continue_if` on the first
        // element of the trigger batch (presumably so it only flows in a tick where a trigger
        // message is present — confirm against `continue_if`).
        let tick_triggered_input = node
            .source_iter(q!([(3, 103)]))
            .batch(&node_tick, nondet!(/** test */))
            .continue_if(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        // Interleave the static input with the trigger-gated input, then sum per key under
        // the cycling watermark.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Both external ports are connected before the deployment is started.
        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
        let mut out_recv = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Send the trigger; the gated `(3, 103)` entry is expected next.
        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
}