1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::hash::Hash;
5use std::marker::PhantomData;
6use std::ops::Deref;
7use std::rc::Rc;
8
9use stageleft::{IntoQuotedMut, QuotedWithContext, q};
10use syn::parse_quote;
11use tokio::time::Instant;
12
13use crate::builder::FLOW_USED_MESSAGE;
14use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
15use crate::ir::{HydroLeaf, HydroNode, TeeNode};
16use crate::keyed_stream::KeyedStream;
17use crate::location::tick::{Atomic, NoAtomic};
18use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
19use crate::manual_expr::ManualExpr;
20use crate::unsafety::NonDet;
21use crate::*;
22
23pub mod networking;
24
25/// Marks the stream as being totally ordered, which means that there are
26/// no sources of non-determinism (other than intentional ones) that will
27/// affect the order of elements.
28pub enum TotalOrder {}
29
30/// Marks the stream as having no order, which means that the order of
31/// elements may be affected by non-determinism.
32///
33/// This restricts certain operators, such as `fold` and `reduce`, to only
34/// be used with commutative aggregation functions.
35pub enum NoOrder {}
36
37/// Helper trait for determining the weakest of two orderings.
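/// For example, the `Min` of [`TotalOrder`] and [`NoOrder`] is [`NoOrder`], since it is
/// the weaker guarantee.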
38#[sealed::sealed]
39pub trait MinOrder<Other> {
40    /// The weaker of the two orderings.
41    type Min;
42}
43
44#[sealed::sealed]
45impl<T> MinOrder<T> for T {
46    type Min = T;
47}
48
49#[sealed::sealed]
50impl MinOrder<NoOrder> for TotalOrder {
51    type Min = NoOrder;
52}
53
54#[sealed::sealed]
55impl MinOrder<TotalOrder> for NoOrder {
56    type Min = NoOrder;
57}
58
59/// Marks the stream as having deterministic message cardinality, with no
60/// possibility of duplicates.
61pub enum ExactlyOnce {}
62
63/// Marks the stream as having non-deterministic message cardinality, which
64/// means that duplicates may occur, but messages will not be dropped.
65pub enum AtLeastOnce {}
66
67/// Helper trait for determining the weakest of two retry guarantees.
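/// For example, the `Min` of [`ExactlyOnce`] and [`AtLeastOnce`] is [`AtLeastOnce`], since
/// it is the weaker guarantee.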
68#[sealed::sealed]
69pub trait MinRetries<Other> {
70    /// The weaker of the two retry guarantees.
71    type Min;
72}
73
74#[sealed::sealed]
75impl<T> MinRetries<T> for T {
76    type Min = T;
77}
78
79#[sealed::sealed]
80impl MinRetries<ExactlyOnce> for AtLeastOnce {
81    type Min = AtLeastOnce;
82}
83
84#[sealed::sealed]
85impl MinRetries<AtLeastOnce> for ExactlyOnce {
86    type Min = AtLeastOnce;
87}
88
89/// An ordered sequence stream of elements of type `T`.
90///
91/// Type Parameters:
92/// - `Type`: the type of elements in the stream
93/// - `Loc`: the location where the stream is being materialized
94/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
95///   or [`Unbounded`]
96/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
97///   or [`NoOrder`] (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`]
///   or [`AtLeastOnce`] (default is [`ExactlyOnce`])
98pub struct Stream<Type, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> {
99    pub(crate) location: Loc,
100    pub(crate) ir_node: RefCell<HydroNode>,
101
102    _phantom: PhantomData<(Type, Loc, Bound, Order, Retries)>,
103}
104
105impl<'a, T, L, O, R> From<Stream<T, L, Bounded, O, R>> for Stream<T, L, Unbounded, O, R>
106where
107    L: Location<'a>,
108{
109    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
110        Stream {
111            location: stream.location,
112            ir_node: stream.ir_node,
113            _phantom: PhantomData,
114        }
115    }
116}
117
118impl<'a, T, L, B, R> From<Stream<T, L, B, TotalOrder, R>> for Stream<T, L, B, NoOrder, R>
119where
120    L: Location<'a>,
121{
122    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
123        Stream {
124            location: stream.location,
125            ir_node: stream.ir_node,
126            _phantom: PhantomData,
127        }
128    }
129}
130
131impl<'a, T, L, B, O> From<Stream<T, L, B, O, ExactlyOnce>> for Stream<T, L, B, O, AtLeastOnce>
132where
133    L: Location<'a>,
134{
135    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
136        Stream {
137            location: stream.location,
138            ir_node: stream.ir_node,
139            _phantom: PhantomData,
140        }
141    }
142}
143
144impl<'a, T, L, O, R> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
145where
146    L: Location<'a>,
147{
148    fn defer_tick(self) -> Self {
149        Stream::defer_tick(self)
150    }
151}
152
153impl<'a, T, L, O, R> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
154where
155    L: Location<'a>,
156{
157    type Location = Tick<L>;
158
159    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
160        Stream::new(
161            location.clone(),
162            HydroNode::CycleSource {
163                ident,
164                metadata: location.new_node_metadata::<T>(),
165            },
166        )
167    }
168}
169
170impl<'a, T, L, O, R> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
171where
172    L: Location<'a>,
173{
174    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
175        assert_eq!(
176            self.location.id(),
177            expected_location,
178            "locations do not match"
179        );
180        self.location
181            .flow_state()
182            .borrow_mut()
183            .leaves
184            .as_mut()
185            .expect(FLOW_USED_MESSAGE)
186            .push(HydroLeaf::CycleSink {
187                ident,
188                input: Box::new(self.ir_node.into_inner()),
189                metadata: self.location.new_node_metadata::<T>(),
190            });
191    }
192}
193
194impl<'a, T, L, B, O, R> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
195where
196    L: Location<'a> + NoTick,
197{
198    type Location = L;
199
200    fn create_source(ident: syn::Ident, location: L) -> Self {
201        Stream::new(
202            location.clone(),
203            HydroNode::Persist {
204                inner: Box::new(HydroNode::CycleSource {
205                    ident,
206                    metadata: location.new_node_metadata::<T>(),
207                }),
208                metadata: location.new_node_metadata::<T>(),
209            },
210        )
211    }
212}
213
214impl<'a, T, L, B, O, R> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
215where
216    L: Location<'a> + NoTick,
217{
218    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
219        assert_eq!(
220            self.location.id(),
221            expected_location,
222            "locations do not match"
223        );
224        let metadata = self.location.new_node_metadata::<T>();
225        self.location
226            .flow_state()
227            .borrow_mut()
228            .leaves
229            .as_mut()
230            .expect(FLOW_USED_MESSAGE)
231            .push(HydroLeaf::CycleSink {
232                ident,
233                input: Box::new(HydroNode::Unpersist {
234                    inner: Box::new(self.ir_node.into_inner()),
235                    metadata: metadata.clone(),
236                }),
237                metadata,
238            });
239    }
240}
241
242impl<'a, T, L, B, O, R> Clone for Stream<T, L, B, O, R>
243where
244    T: Clone,
245    L: Location<'a>,
246{
247    fn clone(&self) -> Self {
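        // On the first clone, convert the underlying IR node into a `Tee` so that this
        // handle and all clones share the same node through the `Rc` inside `TeeNode`.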
248        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
249            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
250            *self.ir_node.borrow_mut() = HydroNode::Tee {
251                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
252                metadata: self.location.new_node_metadata::<T>(),
253            };
254        }
255
256        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
257            Stream {
258                location: self.location.clone(),
259                ir_node: HydroNode::Tee {
260                    inner: TeeNode(inner.0.clone()),
261                    metadata: metadata.clone(),
262                }
263                .into(),
264                _phantom: PhantomData,
265            }
266        } else {
267            unreachable!()
268        }
269    }
270}
271
272impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
273where
274    L: Location<'a>,
275{
276    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
277        Stream {
278            location,
279            ir_node: RefCell::new(ir_node),
280            _phantom: PhantomData,
281        }
282    }
283
284    /// Produces a stream based on invoking `f` on each element.
285    /// If you do not want to modify the stream and instead only want to view
286    /// each item, use [`Stream::inspect`] instead.
287    ///
288    /// # Example
289    /// ```rust
290    /// # use hydro_lang::*;
291    /// # use futures::StreamExt;
292    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
293    /// let words = process.source_iter(q!(vec!["hello", "world"]));
294    /// words.map(q!(|x| x.to_uppercase()))
295    /// # }, |mut stream| async move {
296    /// # for w in vec!["HELLO", "WORLD"] {
297    /// #     assert_eq!(stream.next().await.unwrap(), w);
298    /// # }
299    /// # }));
300    /// ```
301    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
302    where
303        F: Fn(T) -> U + 'a,
304    {
305        let f = f.splice_fn1_ctx(&self.location).into();
306        Stream::new(
307            self.location.clone(),
308            HydroNode::Map {
309                f,
310                input: Box::new(self.ir_node.into_inner()),
311                metadata: self.location.new_node_metadata::<U>(),
312            },
313        )
314    }
315
316    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
317    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
318    /// for the output type `U` must produce items in a **deterministic** order.
319    ///
320    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
321    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
322    ///
323    /// # Example
324    /// ```rust
325    /// # use hydro_lang::*;
326    /// # use futures::StreamExt;
327    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
328    /// process
329    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
330    ///     .flat_map_ordered(q!(|x| x))
331    /// # }, |mut stream| async move {
332    /// // 1, 2, 3, 4
333    /// # for w in (1..5) {
334    /// #     assert_eq!(stream.next().await.unwrap(), w);
335    /// # }
336    /// # }));
337    /// ```
338    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
339    where
340        I: IntoIterator<Item = U>,
341        F: Fn(T) -> I + 'a,
342    {
343        let f = f.splice_fn1_ctx(&self.location).into();
344        Stream::new(
345            self.location.clone(),
346            HydroNode::FlatMap {
347                f,
348                input: Box::new(self.ir_node.into_inner()),
349                metadata: self.location.new_node_metadata::<U>(),
350            },
351        )
352    }
353
354    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
355    /// for the output type `U` to produce items in any order.
356    ///
357    /// # Example
358    /// ```rust
359    /// # use hydro_lang::{*, stream::ExactlyOnce};
360    /// # use futures::StreamExt;
361    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
362    /// process
363    ///     .source_iter(q!(vec![
364    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
365    ///         std::collections::HashSet::from_iter(vec![3, 4]),
366    ///     ]))
367    ///     .flat_map_unordered(q!(|x| x))
368    /// # }, |mut stream| async move {
369    /// // 1, 2, 3, 4, but in no particular order
370    /// # let mut results = Vec::new();
371    /// # for w in (1..5) {
372    /// #     results.push(stream.next().await.unwrap());
373    /// # }
374    /// # results.sort();
375    /// # assert_eq!(results, vec![1, 2, 3, 4]);
376    /// # }));
377    /// ```
378    pub fn flat_map_unordered<U, I, F>(
379        self,
380        f: impl IntoQuotedMut<'a, F, L>,
381    ) -> Stream<U, L, B, NoOrder, R>
382    where
383        I: IntoIterator<Item = U>,
384        F: Fn(T) -> I + 'a,
385    {
386        let f = f.splice_fn1_ctx(&self.location).into();
387        Stream::new(
388            self.location.clone(),
389            HydroNode::FlatMap {
390                f,
391                input: Box::new(self.ir_node.into_inner()),
392                metadata: self.location.new_node_metadata::<U>(),
393            },
394        )
395    }
396
397    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
398    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
399    ///
400    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
401    /// not deterministic, use [`Stream::flatten_unordered`] instead.
402    ///
    /// # Example
403    /// ```rust
404    /// # use hydro_lang::*;
405    /// # use futures::StreamExt;
406    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
407    /// process
408    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
409    ///     .flatten_ordered()
410    /// # }, |mut stream| async move {
411    /// // 1, 2, 3, 4
412    /// # for w in (1..5) {
413    /// #     assert_eq!(stream.next().await.unwrap(), w);
414    /// # }
415    /// # }));
416    /// ```
417    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
418    where
419        T: IntoIterator<Item = U>,
420    {
421        self.flat_map_ordered(q!(|d| d))
422    }
423
424    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
425    /// for the element type `T` to produce items in any order.
426    ///
427    /// # Example
428    /// ```rust
429    /// # use hydro_lang::{*, stream::ExactlyOnce};
430    /// # use futures::StreamExt;
431    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
432    /// process
433    ///     .source_iter(q!(vec![
434    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
435    ///         std::collections::HashSet::from_iter(vec![3, 4]),
436    ///     ]))
437    ///     .flatten_unordered()
438    /// # }, |mut stream| async move {
439    /// // 1, 2, 3, 4, but in no particular order
440    /// # let mut results = Vec::new();
441    /// # for w in (1..5) {
442    /// #     results.push(stream.next().await.unwrap());
443    /// # }
444    /// # results.sort();
445    /// # assert_eq!(results, vec![1, 2, 3, 4]);
446    /// # }));
    /// ```
447    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
448    where
449        T: IntoIterator<Item = U>,
450    {
451        self.flat_map_unordered(q!(|d| d))
452    }
453
454    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
455    /// `f`, preserving the order of the elements.
456    ///
457    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
458    /// not modify or take ownership of the values. If you need to modify the values while filtering,
459    /// use [`Stream::filter_map`] instead.
460    ///
461    /// # Example
462    /// ```rust
463    /// # use hydro_lang::*;
464    /// # use futures::StreamExt;
465    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
466    /// process
467    ///     .source_iter(q!(vec![1, 2, 3, 4]))
468    ///     .filter(q!(|&x| x > 2))
469    /// # }, |mut stream| async move {
470    /// // 3, 4
471    /// # for w in (3..5) {
472    /// #     assert_eq!(stream.next().await.unwrap(), w);
473    /// # }
474    /// # }));
475    /// ```
476    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
477    where
478        F: Fn(&T) -> bool + 'a,
479    {
480        let f = f.splice_fn1_borrow_ctx(&self.location).into();
481        Stream::new(
482            self.location.clone(),
483            HydroNode::Filter {
484                f,
485                input: Box::new(self.ir_node.into_inner()),
486                metadata: self.location.new_node_metadata::<T>(),
487            },
488        )
489    }
490
491    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
492    ///
493    /// # Example
494    /// ```rust
495    /// # use hydro_lang::*;
496    /// # use futures::StreamExt;
497    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
498    /// process
499    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
500    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
501    /// # }, |mut stream| async move {
502    /// // 1, 2
503    /// # for w in (1..3) {
504    /// #     assert_eq!(stream.next().await.unwrap(), w);
505    /// # }
506    /// # }));
    /// ```
507    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
508    where
509        F: Fn(T) -> Option<U> + 'a,
510    {
511        let f = f.splice_fn1_ctx(&self.location).into();
512        Stream::new(
513            self.location.clone(),
514            HydroNode::FilterMap {
515                f,
516                input: Box::new(self.ir_node.into_inner()),
517                metadata: self.location.new_node_metadata::<U>(),
518            },
519        )
520    }
521
522    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
523    /// where `x` is the final value of `other`, a bounded [`Singleton`].
524    ///
525    /// # Example
526    /// ```rust
527    /// # use hydro_lang::*;
528    /// # use futures::StreamExt;
529    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
530    /// let tick = process.tick();
531    /// let batch = process
532    ///   .source_iter(q!(vec![1, 2, 3, 4]))
533    ///   .batch(&tick, nondet!(/** test */));
534    /// let count = batch.clone().count(); // `count()` returns a singleton
535    /// batch.cross_singleton(count).all_ticks()
536    /// # }, |mut stream| async move {
537    /// // (1, 4), (2, 4), (3, 4), (4, 4)
538    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
539    /// #     assert_eq!(stream.next().await.unwrap(), w);
540    /// # }
541    /// # }));
    /// ```
542    pub fn cross_singleton<O2>(
543        self,
544        other: impl Into<Optional<O2, L, Bounded>>,
545    ) -> Stream<(T, O2), L, B, O, R>
546    where
547        O2: Clone,
548    {
549        let other: Optional<O2, L, Bounded> = other.into();
550        check_matching_location(&self.location, &other.location);
551
552        Stream::new(
553            self.location.clone(),
554            HydroNode::CrossSingleton {
555                left: Box::new(self.ir_node.into_inner()),
556                right: Box::new(other.ir_node.into_inner()),
557                metadata: self.location.new_node_metadata::<(T, O2)>(),
558            },
559        )
560    }
561
562    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
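    ///
    /// # Example
    /// A minimal sketch, following the pattern of the other doctests in this module; the
    /// signal here is non-empty, so the batch passes through unchanged.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2, 3]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let signal = batch.clone().count().filter(q!(|c| *c > 0));
    /// batch.continue_if(signal).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```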
563    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
564        self.cross_singleton(signal.map(q!(|_u| ())))
565            .map(q!(|(d, _signal)| d))
566    }
567
568    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
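    ///
    /// # Example
    /// A minimal sketch, following the pattern of the other doctests in this module; the
    /// blocking signal here is empty, so the batch passes through unchanged.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2, 3]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let blocker = batch.clone().filter(q!(|x| *x > 10)).first();
    /// batch.continue_unless(blocker).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```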
569    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
570        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
571    }
572
573    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
574    /// tupled pairs in a non-deterministic order.
575    ///
576    /// # Example
577    /// ```rust
578    /// # use hydro_lang::*;
579    /// # use std::collections::HashSet;
580    /// # use futures::StreamExt;
581    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
582    /// let tick = process.tick();
583    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
584    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
585    /// stream1.cross_product(stream2)
586    /// # }, |mut stream| async move {
587    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
588    /// # stream.map(|i| assert!(expected.contains(&i)));
589    /// # }));
    /// ```
590    pub fn cross_product<T2, O2>(
591        self,
592        other: Stream<T2, L, B, O2, R>,
593    ) -> Stream<(T, T2), L, B, NoOrder, R>
594    where
595        T: Clone,
596        T2: Clone,
597    {
598        check_matching_location(&self.location, &other.location);
599
600        Stream::new(
601            self.location.clone(),
602            HydroNode::CrossProduct {
603                left: Box::new(self.ir_node.into_inner()),
604                right: Box::new(other.ir_node.into_inner()),
605                metadata: self.location.new_node_metadata::<(T, T2)>(),
606            },
607        )
608    }
609
610    /// Takes one stream as input and filters out any duplicate occurrences. The output
611    /// contains all unique values from the input.
612    ///
613    /// # Example
614    /// ```rust
615    /// # use hydro_lang::*;
616    /// # use futures::StreamExt;
617    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
618    /// let tick = process.tick();
619    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
620    /// # }, |mut stream| async move {
621    /// # for w in vec![1, 2, 3, 4] {
622    /// #     assert_eq!(stream.next().await.unwrap(), w);
623    /// # }
624    /// # }));
    /// ```
625    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
626    where
627        T: Eq + Hash,
628    {
629        Stream::new(
630            self.location.clone(),
631            HydroNode::Unique {
632                input: Box::new(self.ir_node.into_inner()),
633                metadata: self.location.new_node_metadata::<T>(),
634            },
635        )
636    }
637
638    /// Outputs everything in this stream that is *not* contained in the `other` stream.
639    ///
640    /// The `other` stream must be [`Bounded`], since this function will wait until
641    /// all its elements are available before producing any output.
642    /// # Example
643    /// ```rust
644    /// # use hydro_lang::*;
645    /// # use futures::StreamExt;
646    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
647    /// let tick = process.tick();
648    /// let stream = process
649    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
650    ///   .batch(&tick, nondet!(/** test */));
651    /// let batch = process
652    ///   .source_iter(q!(vec![1, 2]))
653    ///   .batch(&tick, nondet!(/** test */));
654    /// stream.filter_not_in(batch).all_ticks()
655    /// # }, |mut stream| async move {
656    /// # for w in vec![3, 4] {
657    /// #     assert_eq!(stream.next().await.unwrap(), w);
658    /// # }
659    /// # }));
    /// ```
660    pub fn filter_not_in<O2>(
661        self,
662        other: Stream<T, L, Bounded, O2, R>,
663    ) -> Stream<T, L, Bounded, O, R>
664    where
665        T: Eq + Hash,
666    {
667        check_matching_location(&self.location, &other.location);
668
669        Stream::new(
670            self.location.clone(),
671            HydroNode::Difference {
672                pos: Box::new(self.ir_node.into_inner()),
673                neg: Box::new(other.ir_node.into_inner()),
674                metadata: self.location.new_node_metadata::<T>(),
675            },
676        )
677    }
678
679    /// An operator which allows you to "inspect" each element of a stream without
680    /// modifying it. The closure `f` is called on a reference to each item. This is
681    /// mainly useful for debugging, and should not be used to generate side-effects.
682    ///
683    /// # Example
684    /// ```rust
685    /// # use hydro_lang::*;
686    /// # use futures::StreamExt;
687    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
688    /// let nums = process.source_iter(q!(vec![1, 2]));
689    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
690    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
691    /// # }, |mut stream| async move {
692    /// # for w in vec![1, 2] {
693    /// #     assert_eq!(stream.next().await.unwrap(), w);
694    /// # }
695    /// # }));
696    /// ```
697    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
698    where
699        F: Fn(&T) + 'a,
700    {
701        let f = f.splice_fn1_borrow_ctx(&self.location).into();
702
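        // At the top level, the IR input is a persisted stream, so unwrap it with
        // `Unpersist`, apply `Inspect`, and re-wrap with `Persist` (see the comment in `fold`).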
703        if L::is_top_level() {
704            Stream::new(
705                self.location.clone(),
706                HydroNode::Persist {
707                    inner: Box::new(HydroNode::Inspect {
708                        f,
709                        input: Box::new(HydroNode::Unpersist {
710                            inner: Box::new(self.ir_node.into_inner()),
711                            metadata: self.location.new_node_metadata::<T>(),
712                        }),
713                        metadata: self.location.new_node_metadata::<T>(),
714                    }),
715                    metadata: self.location.new_node_metadata::<T>(),
716                },
717            )
718        } else {
719            Stream::new(
720                self.location.clone(),
721                HydroNode::Inspect {
722                    f,
723                    input: Box::new(self.ir_node.into_inner()),
724                    metadata: self.location.new_node_metadata::<T>(),
725                },
726            )
727        }
728    }
729
730    /// An operator which allows you to "name" a `HydroNode`.
731    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
732    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
733        {
734            let mut node = self.ir_node.borrow_mut();
735            let metadata = node.metadata_mut();
736            metadata.tag = Some(name.to_string());
737        }
738        self
739    }
740
741    /// Explicitly "casts" the stream to a type with a different ordering
742    /// guarantee. Useful in unsafe code where the ordering cannot be proven
743    /// by the type-system.
744    ///
745    /// # Non-Determinism
746    /// This function is used as an escape hatch, and any mistakes in the
747    /// provided ordering guarantee will propagate into the guarantees
748    /// for the rest of the program.
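    ///
    /// # Example
    /// A minimal sketch, following the pattern of the other doctests in this module: the
    /// stream is first weakened to [`NoOrder`] and then cast back, with the justification
    /// recorded in the `nondet!` documentation.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])).weakest_ordering();
    /// numbers.assume_ordering::<TotalOrder>(nondet!(/** single local source, not re-ordered */))
    /// # }, |mut stream| async move {
    /// # for w in 1..5 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```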
749    pub fn assume_ordering<O2>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
750        Stream::new(self.location, self.ir_node.into_inner())
751    }
752
753    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
754    /// which is always safe because that is the weakest possible guarantee.
755    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
756        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
757        self.assume_ordering::<NoOrder>(nondet)
758    }
759
760    /// Explicitly "casts" the stream to a type with a different retries
761    /// guarantee. Useful in unsafe code where the lack of retries cannot
762    /// be proven by the type-system.
763    ///
764    /// # Non-Determinism
765    /// This function is used as an escape hatch, and any mistakes in the
766    /// provided retries guarantee will propagate into the guarantees
767    /// for the rest of the program.
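    ///
    /// # Example
    /// A minimal sketch, following the pattern of the other doctests in this module: the
    /// stream is first weakened to [`AtLeastOnce`] and then cast back, with the justification
    /// recorded in the `nondet!` documentation.
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])).weakest_retries();
    /// numbers.assume_retries::<ExactlyOnce>(nondet!(/** local source, no retries */))
    /// # }, |mut stream| async move {
    /// # for w in 1..5 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```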
768    pub fn assume_retries<R2>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
769        Stream::new(self.location, self.ir_node.into_inner())
770    }
771
772    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
773    /// which is always safe because that is the weakest possible guarantee.
774    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
775        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
776        self.assume_retries::<AtLeastOnce>(nondet)
777    }
778
779    /// Weakens the retries guarantee provided by the stream to be the weaker of the
780    /// current guarantee and `R2`. This is safe because the output guarantee will
781    /// always be weaker than the input.
782    pub fn weaken_retries<R2>(self) -> Stream<T, L, B, O, <R as MinRetries<R2>>::Min>
783    where
784        R: MinRetries<R2>,
785    {
786        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
787        self.assume_retries::<<R as MinRetries<R2>>::Min>(nondet)
788    }
789}
790
791impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
792where
793    L: Location<'a>,
794{
795    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
796    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`].
797    pub fn weaker_retries<R2>(self) -> Stream<T, L, B, O, R2> {
798        self.assume_retries(
799            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
800        )
801    }
802}
803
804impl<'a, T, L, B, O, R> Stream<&T, L, B, O, R>
805where
806    L: Location<'a>,
807{
808    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
809    ///
810    /// # Example
811    /// ```rust
812    /// # use hydro_lang::*;
813    /// # use futures::StreamExt;
814    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
815    /// process.source_iter(q!(&[1, 2, 3])).cloned()
816    /// # }, |mut stream| async move {
817    /// // 1, 2, 3
818    /// # for w in vec![1, 2, 3] {
819    /// #     assert_eq!(stream.next().await.unwrap(), w);
820    /// # }
821    /// # }));
822    /// ```
823    pub fn cloned(self) -> Stream<T, L, B, O, R>
824    where
825        T: Clone,
826    {
827        self.map(q!(|d| d.clone()))
828    }
829}
830
831impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
832where
833    L: Location<'a>,
834{
835    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
836    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
837    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
838    ///
839    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
840    /// and there may be duplicates.
841    ///
842    /// # Example
843    /// ```rust
844    /// # use hydro_lang::*;
845    /// # use futures::StreamExt;
846    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
847    /// let tick = process.tick();
848    /// let bools = process.source_iter(q!(vec![false, true, false]));
849    /// let batch = bools.batch(&tick, nondet!(/** test */));
850    /// batch
851    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
852    ///     .all_ticks()
853    /// # }, |mut stream| async move {
854    /// // true
855    /// # assert_eq!(stream.next().await.unwrap(), true);
856    /// # }));
857    /// ```
858    pub fn fold_commutative_idempotent<A, I, F>(
859        self,
860        init: impl IntoQuotedMut<'a, I, L>,
861        comb: impl IntoQuotedMut<'a, F, L>,
862    ) -> Singleton<A, L, B>
863    where
864        I: Fn() -> A + 'a,
865        F: Fn(&mut A, T),
866    {
867        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
868        self.assume_ordering(nondet)
869            .assume_retries(nondet)
870            .fold(init, comb)
871    }
872
873    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
874    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
875    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
876    /// reference, so that it can be modified in place.
877    ///
878    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
879    /// and there may be duplicates.
880    ///
881    /// # Example
882    /// ```rust
883    /// # use hydro_lang::*;
884    /// # use futures::StreamExt;
885    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
886    /// let tick = process.tick();
887    /// let bools = process.source_iter(q!(vec![false, true, false]));
888    /// let batch = bools.batch(&tick, nondet!(/** test */));
889    /// batch
890    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
891    ///     .all_ticks()
892    /// # }, |mut stream| async move {
893    /// // true
894    /// # assert_eq!(stream.next().await.unwrap(), true);
895    /// # }));
896    /// ```
897    pub fn reduce_commutative_idempotent<F>(
898        self,
899        comb: impl IntoQuotedMut<'a, F, L>,
900    ) -> Optional<T, L, B>
901    where
902        F: Fn(&mut T, T) + 'a,
903    {
904        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
905        self.assume_ordering(nondet)
906            .assume_retries(nondet)
907            .reduce(comb)
908    }
909
910    /// Computes the maximum element in the stream as an [`Optional`], which
911    /// will be empty until the first element in the input arrives.
912    ///
913    /// # Example
914    /// ```rust
915    /// # use hydro_lang::*;
916    /// # use futures::StreamExt;
917    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
918    /// let tick = process.tick();
919    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
920    /// let batch = numbers.batch(&tick, nondet!(/** test */));
921    /// batch.max().all_ticks()
922    /// # }, |mut stream| async move {
923    /// // 4
924    /// # assert_eq!(stream.next().await.unwrap(), 4);
925    /// # }));
926    /// ```
927    pub fn max(self) -> Optional<T, L, B>
928    where
929        T: Ord,
930    {
931        self.reduce_commutative_idempotent(q!(|curr, new| {
932            if new > *curr {
933                *curr = new;
934            }
935        }))
936    }
937
938    /// Computes the maximum element in the stream as an [`Optional`], where the
939    /// maximum is determined according to the `key` function. The [`Optional`] will
940    /// be empty until the first element in the input arrives.
941    ///
942    /// # Example
943    /// ```rust
944    /// # use hydro_lang::*;
945    /// # use futures::StreamExt;
946    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
947    /// let tick = process.tick();
948    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
949    /// let batch = numbers.batch(&tick, nondet!(/** test */));
950    /// batch.max_by_key(q!(|x| -x)).all_ticks()
951    /// # }, |mut stream| async move {
952    /// // 1
953    /// # assert_eq!(stream.next().await.unwrap(), 1);
954    /// # }));
955    /// ```
956    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
957    where
958        K: Ord,
959        F: Fn(&T) -> K + 'a,
960    {
961        let f = key.splice_fn1_borrow_ctx(&self.location);
962
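        // Wrap the key function in a reduce-style closure that keeps whichever element
        // has the larger key, so `Reduce` computes the maximum by key.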
963        let wrapped: syn::Expr = parse_quote!({
964            let key_fn = #f;
965            move |curr, new| {
966                if key_fn(&new) > key_fn(&*curr) {
967                    *curr = new;
968                }
969            }
970        });
971
972        let mut core = HydroNode::Reduce {
973            f: wrapped.into(),
974            input: Box::new(self.ir_node.into_inner()),
975            metadata: self.location.new_node_metadata::<T>(),
976        };
977
978        if L::is_top_level() {
979            core = HydroNode::Persist {
980                inner: Box::new(core),
981                metadata: self.location.new_node_metadata::<T>(),
982            };
983        }
984
985        Optional::new(self.location, core)
986    }
987
988    /// Computes the minimum element in the stream as an [`Optional`], which
989    /// will be empty until the first element in the input arrives.
990    ///
991    /// # Example
992    /// ```rust
993    /// # use hydro_lang::*;
994    /// # use futures::StreamExt;
995    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
996    /// let tick = process.tick();
997    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
998    /// let batch = numbers.batch(&tick, nondet!(/** test */));
999    /// batch.min().all_ticks()
1000    /// # }, |mut stream| async move {
1001    /// // 1
1002    /// # assert_eq!(stream.next().await.unwrap(), 1);
1003    /// # }));
1004    /// ```
1005    pub fn min(self) -> Optional<T, L, B>
1006    where
1007        T: Ord,
1008    {
1009        self.reduce_commutative_idempotent(q!(|curr, new| {
1010            if new < *curr {
1011                *curr = new;
1012            }
1013        }))
1014    }
1015}
1016
1017impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
1018where
1019    L: Location<'a>,
1020{
1021    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1022    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1023    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1024    ///
1025    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1026    ///
1027    /// # Example
1028    /// ```rust
1029    /// # use hydro_lang::*;
1030    /// # use futures::StreamExt;
1031    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1032    /// let tick = process.tick();
1033    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1034    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1035    /// batch
1036    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1037    ///     .all_ticks()
1038    /// # }, |mut stream| async move {
1039    /// // 10
1040    /// # assert_eq!(stream.next().await.unwrap(), 10);
1041    /// # }));
1042    /// ```
1043    pub fn fold_commutative<A, I, F>(
1044        self,
1045        init: impl IntoQuotedMut<'a, I, L>,
1046        comb: impl IntoQuotedMut<'a, F, L>,
1047    ) -> Singleton<A, L, B>
1048    where
1049        I: Fn() -> A + 'a,
1050        F: Fn(&mut A, T),
1051    {
1052        let nondet = nondet!(/** the combinator function is commutative */);
1053        self.assume_ordering(nondet).fold(init, comb)
1054    }
1055
1056    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1057    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1058    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1059    /// reference, so that it can be modified in place.
1060    ///
1061    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1062    ///
1063    /// # Example
1064    /// ```rust
1065    /// # use hydro_lang::*;
1066    /// # use futures::StreamExt;
1067    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1068    /// let tick = process.tick();
1069    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1070    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1071    /// batch
1072    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1073    ///     .all_ticks()
1074    /// # }, |mut stream| async move {
1075    /// // 10
1076    /// # assert_eq!(stream.next().await.unwrap(), 10);
1077    /// # }));
1078    /// ```
1079    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1080    where
1081        F: Fn(&mut T, T) + 'a,
1082    {
1083        let nondet = nondet!(/** the combinator function is commutative */);
1084        self.assume_ordering(nondet).reduce(comb)
1085    }
1086
1087    /// Computes the number of elements in the stream as a [`Singleton`].
1088    ///
1089    /// # Example
1090    /// ```rust
1091    /// # use hydro_lang::*;
1092    /// # use futures::StreamExt;
1093    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1094    /// let tick = process.tick();
1095    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1096    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1097    /// batch.count().all_ticks()
1098    /// # }, |mut stream| async move {
1099    /// // 4
1100    /// # assert_eq!(stream.next().await.unwrap(), 4);
1101    /// # }));
1102    /// ```
1103    pub fn count(self) -> Singleton<usize, L, B> {
1104        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1105    }
1106}
1107
1108impl<'a, T, L, B, R> Stream<T, L, B, TotalOrder, R>
1109where
1110    L: Location<'a>,
1111{
1112    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1113    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1114    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1115    ///
1116    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1117    ///
1118    /// # Example
1119    /// ```rust
1120    /// # use hydro_lang::*;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1123    /// let tick = process.tick();
1124    /// let bools = process.source_iter(q!(vec![false, true, false]));
1125    /// let batch = bools.batch(&tick, nondet!(/** test */));
1126    /// batch
1127    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1128    ///     .all_ticks()
1129    /// # }, |mut stream| async move {
1130    /// // true
1131    /// # assert_eq!(stream.next().await.unwrap(), true);
1132    /// # }));
1133    /// ```
1134    pub fn fold_idempotent<A, I, F>(
1135        self,
1136        init: impl IntoQuotedMut<'a, I, L>,
1137        comb: impl IntoQuotedMut<'a, F, L>,
1138    ) -> Singleton<A, L, B>
1139    where
1140        I: Fn() -> A + 'a,
1141        F: Fn(&mut A, T),
1142    {
1143        let nondet = nondet!(/** the combinator function is idempotent */);
1144        self.assume_retries(nondet).fold(init, comb)
1145    }
1146
1147    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1148    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1149    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1150    /// reference, so that it can be modified in place.
1151    ///
1152    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1153    ///
1154    /// # Example
1155    /// ```rust
1156    /// # use hydro_lang::*;
1157    /// # use futures::StreamExt;
1158    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1159    /// let tick = process.tick();
1160    /// let bools = process.source_iter(q!(vec![false, true, false]));
1161    /// let batch = bools.batch(&tick, nondet!(/** test */));
1162    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1163    /// # }, |mut stream| async move {
1164    /// // true
1165    /// # assert_eq!(stream.next().await.unwrap(), true);
1166    /// # }));
1167    /// ```
1168    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1169    where
1170        F: Fn(&mut T, T) + 'a,
1171    {
1172        let nondet = nondet!(/** the combinator function is idempotent */);
1173        self.assume_retries(nondet).reduce(comb)
1174    }
1175
1176    /// Computes the first element in the stream as an [`Optional`], which
1177    /// will be empty until the first element in the input arrives.
1178    ///
1179    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1180    /// re-ordering of elements may cause the first element to change.
1181    ///
1182    /// # Example
1183    /// ```rust
1184    /// # use hydro_lang::*;
1185    /// # use futures::StreamExt;
1186    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1187    /// let tick = process.tick();
1188    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1189    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1190    /// batch.first().all_ticks()
1191    /// # }, |mut stream| async move {
1192    /// // 1
1193    /// # assert_eq!(stream.next().await.unwrap(), 1);
1194    /// # }));
1195    /// ```
1196    pub fn first(self) -> Optional<T, L, B> {
1197        self.reduce_idempotent(q!(|_, _| {}))
1198    }
1199
1200    /// Computes the last element in the stream as an [`Optional`], which
1201    /// will be empty until an element in the input arrives.
1202    ///
1203    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1204    /// re-ordering of elements may cause the last element to change.
1205    ///
1206    /// # Example
1207    /// ```rust
1208    /// # use hydro_lang::*;
1209    /// # use futures::StreamExt;
1210    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1211    /// let tick = process.tick();
1212    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1213    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1214    /// batch.last().all_ticks()
1215    /// # }, |mut stream| async move {
1216    /// // 4
1217    /// # assert_eq!(stream.next().await.unwrap(), 4);
1218    /// # }));
1219    /// ```
1220    pub fn last(self) -> Optional<T, L, B> {
1221        self.reduce_idempotent(q!(|curr, new| *curr = new))
1222    }
1223}
1224
1225impl<'a, T, L, B> Stream<T, L, B, TotalOrder, ExactlyOnce>
1226where
1227    L: Location<'a>,
1228{
1229    /// Returns a stream with the current count tupled with each element in the input stream.
1230    ///
1231    /// # Example
1232    /// ```rust
1233    /// # use hydro_lang::{*, stream::ExactlyOnce};
1234    /// # use futures::StreamExt;
1235    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1236    /// let tick = process.tick();
1237    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1238    /// numbers.enumerate()
1239    /// # }, |mut stream| async move {
1240    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1241    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1242    /// #     assert_eq!(stream.next().await.unwrap(), w);
1243    /// # }
1244    /// # }));
1245    /// ```
1246    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1247        if L::is_top_level() {
1248            Stream::new(
1249                self.location.clone(),
1250                HydroNode::Persist {
1251                    inner: Box::new(HydroNode::Enumerate {
1252                        is_static: true,
1253                        input: Box::new(HydroNode::Unpersist {
1254                            inner: Box::new(self.ir_node.into_inner()),
1255                            metadata: self.location.new_node_metadata::<T>(),
1256                        }),
1257                        metadata: self.location.new_node_metadata::<(usize, T)>(),
1258                    }),
1259                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1260                },
1261            )
1262        } else {
1263            Stream::new(
1264                self.location.clone(),
1265                HydroNode::Enumerate {
1266                    is_static: false,
1267                    input: Box::new(self.ir_node.into_inner()),
1268                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1269                },
1270            )
1271        }
1272    }
1273
1274    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1275    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1276    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1277    ///
1278    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1279    /// to depend on the order of elements in the stream.
1280    ///
1281    /// # Example
1282    /// ```rust
1283    /// # use hydro_lang::*;
1284    /// # use futures::StreamExt;
1285    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1286    /// let tick = process.tick();
1287    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1288    /// let batch = words.batch(&tick, nondet!(/** test */));
1289    /// batch
1290    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1291    ///     .all_ticks()
1292    /// # }, |mut stream| async move {
1293    /// // "HELLOWORLD"
1294    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1295    /// # }));
1296    /// ```
1297    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1298        self,
1299        init: impl IntoQuotedMut<'a, I, L>,
1300        comb: impl IntoQuotedMut<'a, F, L>,
1301    ) -> Singleton<A, L, B> {
1302        let init = init.splice_fn0_ctx(&self.location).into();
1303        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1304
1305        let mut core = HydroNode::Fold {
1306            init,
1307            acc: comb,
1308            input: Box::new(self.ir_node.into_inner()),
1309            metadata: self.location.new_node_metadata::<A>(),
1310        };
1311
1312        if L::is_top_level() {
1313            // top-level (possibly unbounded) singletons are represented as
1314            // a stream which produces all values from all ticks every tick,
1315            // so Unpersist will always give the latest aggregation
1316            core = HydroNode::Persist {
1317                inner: Box::new(core),
1318                metadata: self.location.new_node_metadata::<A>(),
1319            };
1320        }
1321
1322        Singleton::new(self.location, core)
1323    }
1324
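    /// Collects all elements of the stream into a [`Vec`], in the order they arrive.
    ///
    /// # Example
    /// A minimal sketch, following the pattern of the other doctests in this module.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.collect_vec().all_ticks()
    /// # }, |mut stream| async move {
    /// // vec![1, 2, 3, 4]
    /// # assert_eq!(stream.next().await.unwrap(), vec![1, 2, 3, 4]);
    /// # }));
    /// ```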
1325    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1326        self.fold(
1327            q!(|| vec![]),
1328            q!(|acc, v| {
1329                acc.push(v);
1330            }),
1331        )
1332    }
1333
1334    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1335    /// and emitting each intermediate result.
1336    ///
1337    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1338    /// containing all intermediate accumulated values. The scan operation can also terminate early
1339    /// by returning `None`.
1340    ///
1341    /// The function takes a mutable reference to the accumulator and the current element, and returns
1342    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1343    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1344    ///
1345    /// # Examples
1346    ///
1347    /// Basic usage - running sum:
1348    /// ```rust
1349    /// # use hydro_lang::*;
1350    /// # use futures::StreamExt;
1351    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1352    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1353    ///     q!(|| 0),
1354    ///     q!(|acc, x| {
1355    ///         *acc += x;
1356    ///         Some(*acc)
1357    ///     }),
1358    /// )
1359    /// # }, |mut stream| async move {
1360    /// // Output: 1, 3, 6, 10
1361    /// # for w in vec![1, 3, 6, 10] {
1362    /// #     assert_eq!(stream.next().await.unwrap(), w);
1363    /// # }
1364    /// # }));
1365    /// ```
1366    ///
1367    /// Early termination example:
1368    /// ```rust
1369    /// # use hydro_lang::*;
1370    /// # use futures::StreamExt;
1371    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1372    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1373    ///     q!(|| 1),
1374    ///     q!(|state, x| {
1375    ///         *state = *state * x;
1376    ///         if *state > 6 {
1377    ///             None // Terminate the stream
1378    ///         } else {
1379    ///             Some(-*state)
1380    ///         }
1381    ///     }),
1382    /// )
1383    /// # }, |mut stream| async move {
1384    /// // Output: -1, -2, -6
1385    /// # for w in vec![-1, -2, -6] {
1386    /// #     assert_eq!(stream.next().await.unwrap(), w);
1387    /// # }
1388    /// # }));
1389    /// ```
1390    pub fn scan<A, U, I, F>(
1391        self,
1392        init: impl IntoQuotedMut<'a, I, L>,
1393        f: impl IntoQuotedMut<'a, F, L>,
1394    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1395    where
1396        I: Fn() -> A + 'a,
1397        F: Fn(&mut A, T) -> Option<U> + 'a,
1398    {
1399        let init = init.splice_fn0_ctx(&self.location).into();
1400        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1401
1402        if L::is_top_level() {
1403            Stream::new(
1404                self.location.clone(),
1405                HydroNode::Persist {
1406                    inner: Box::new(HydroNode::Scan {
1407                        init,
1408                        acc: f,
1409                        input: Box::new(HydroNode::Unpersist {
1410                            inner: Box::new(self.ir_node.into_inner()),
1411                            metadata: self.location.new_node_metadata::<U>(),
1412                        }),
1413                        metadata: self.location.new_node_metadata::<U>(),
1414                    }),
1415                    metadata: self.location.new_node_metadata::<U>(),
1416                },
1417            )
1418        } else {
1419            Stream::new(
1420                self.location.clone(),
1421                HydroNode::Scan {
1422                    init,
1423                    acc: f,
1424                    input: Box::new(self.ir_node.into_inner()),
1425                    metadata: self.location.new_node_metadata::<U>(),
1426                },
1427            )
1428        }
1429    }
1430
1431    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1432    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1433    /// until the first element in the input arrives.
1434    ///
1435    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1436    /// to depend on the order of elements in the stream.
1437    ///
1438    /// # Example
1439    /// ```rust
1440    /// # use hydro_lang::*;
1441    /// # use futures::StreamExt;
1442    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1443    /// let tick = process.tick();
1444    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1445    /// let batch = words.batch(&tick, nondet!(/** test */));
1446    /// batch
1447    ///     .map(q!(|x| x.to_string()))
1448    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1449    ///     .all_ticks()
1450    /// # }, |mut stream| async move {
1451    /// // "HELLOWORLD"
1452    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1453    /// # }));
1454    /// ```
1455    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1456        self,
1457        comb: impl IntoQuotedMut<'a, F, L>,
1458    ) -> Optional<T, L, B> {
1459        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1460        let mut core = HydroNode::Reduce {
1461            f,
1462            input: Box::new(self.ir_node.into_inner()),
1463            metadata: self.location.new_node_metadata::<T>(),
1464        };
1465
1466        if L::is_top_level() {
1467            core = HydroNode::Persist {
1468                inner: Box::new(core),
1469                metadata: self.location.new_node_metadata::<T>(),
1470            };
1471        }
1472
1473        Optional::new(self.location, core)
1474    }
1475}
1476
1477impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O, R> Stream<T, L, Unbounded, O, R> {
1478    /// Produces a new stream that interleaves the elements of the two input streams.
1479    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1480    ///
1481    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1482    /// [`Bounded`], you can use [`Stream::chain`] instead.
1483    ///
1484    /// # Example
1485    /// ```rust
1486    /// # use hydro_lang::*;
1487    /// # use futures::StreamExt;
1488    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1489    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1490    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1491    /// # }, |mut stream| async move {
1492    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1493    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1494    /// #     assert_eq!(stream.next().await.unwrap(), w);
1495    /// # }
1496    /// # }));
1497    /// ```
1498    pub fn interleave<O2, R2: MinRetries<R>>(
1499        self,
1500        other: Stream<T, L, Unbounded, O2, R2>,
1501    ) -> Stream<T, L, Unbounded, NoOrder, R::Min>
1502    where
1503        R: MinRetries<R2, Min = R2::Min>,
1504    {
1505        let tick = self.location.tick();
1506        // Because the outputs are unordered, we can interleave batches from both streams.
1507        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1508        self.batch(&tick, nondet_batch_interleaving)
1509            .weakest_ordering()
1510            .weaken_retries::<R2>()
1511            .chain(
1512                other
1513                    .batch(&tick, nondet_batch_interleaving)
1514                    .weakest_ordering()
1515                    .weaken_retries::<R>(),
1516            )
1517            .all_ticks()
1518    }
1519}
1520
1521impl<'a, T, L, O, R> Stream<T, L, Bounded, O, R>
1522where
1523    L: Location<'a>,
1524{
1525    /// Produces a new stream that emits the input elements in sorted order.
1526    ///
1527    /// The input stream can have any ordering guarantee, but the output stream
1528    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1529    /// elements in the input stream are available, so it requires the input stream
1530    /// to be [`Bounded`].
1531    ///
1532    /// # Example
1533    /// ```rust
1534    /// # use hydro_lang::*;
1535    /// # use futures::StreamExt;
1536    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1537    /// let tick = process.tick();
1538    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1539    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1540    /// batch.sort().all_ticks()
1541    /// # }, |mut stream| async move {
1542    /// // 1, 2, 3, 4
1543    /// # for w in (1..5) {
1544    /// #     assert_eq!(stream.next().await.unwrap(), w);
1545    /// # }
1546    /// # }));
1547    /// ```
1548    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1549    where
1550        T: Ord,
1551    {
1552        Stream::new(
1553            self.location.clone(),
1554            HydroNode::Sort {
1555                input: Box::new(self.ir_node.into_inner()),
1556                metadata: self.location.new_node_metadata::<T>(),
1557            },
1558        )
1559    }
1560
1561    /// Produces a new stream that first emits the elements of the `self` stream,
1562    /// and then emits the elements of the `other` stream. The output stream has
1563    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1564    /// [`TotalOrder`] guarantee.
1565    ///
1566    /// Currently, both input streams must be [`Bounded`]. This operator will block
1567    /// on the first stream until all its elements are available. In a future version,
1568    /// we will relax the requirement on the `other` stream.
1569    ///
1570    /// # Example
1571    /// ```rust
1572    /// # use hydro_lang::*;
1573    /// # use futures::StreamExt;
1574    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1575    /// let tick = process.tick();
1576    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1577    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1578    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1579    /// # }, |mut stream| async move {
1580    /// // 2, 3, 4, 5, 1, 2, 3, 4
1581    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1582    /// #     assert_eq!(stream.next().await.unwrap(), w);
1583    /// # }
1584    /// # }));
1585    /// ```
1586    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2, R>) -> Stream<T, L, Bounded, O::Min, R>
1587    where
1588        O: MinOrder<O2>,
1589    {
1590        check_matching_location(&self.location, &other.location);
1591
1592        Stream::new(
1593            self.location.clone(),
1594            HydroNode::Chain {
1595                first: Box::new(self.ir_node.into_inner()),
1596                second: Box::new(other.ir_node.into_inner()),
1597                metadata: self.location.new_node_metadata::<T>(),
1598            },
1599        )
1600    }
1601
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the two input streams.
    /// Unlike [`Stream::cross_product`], the output preserves a [`TotalOrder`] guarantee when both inputs
    /// are totally ordered, because the product is computed as a nested loop.
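    ///
    /// # Example
    /// A minimal sketch, using the same test harness as the other examples in this module; since the
    /// exact pairing order is a consequence of the nested-loop evaluation, the hidden assertion sorts
    /// the output before comparing.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let lefts = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let rights = process
    ///     .source_iter(q!(vec![10, 20]))
    ///     .batch(&tick, nondet!(/** test */));
    /// lefts.cross_product_nested_loop(rights).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 10), (1, 20), (2, 10), (2, 20)
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 10), (2, 20)]);
    /// # }));
    /// ```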
1605    pub fn cross_product_nested_loop<T2, O2>(
1606        self,
1607        other: Stream<T2, L, Bounded, O2, R>,
1608    ) -> Stream<(T, T2), L, Bounded, O::Min, R>
1609    where
1610        T: Clone,
1611        T2: Clone,
1612        O: MinOrder<O2>,
1613    {
1614        check_matching_location(&self.location, &other.location);
1615
1616        Stream::new(
1617            self.location.clone(),
1618            HydroNode::CrossProduct {
1619                left: Box::new(self.ir_node.into_inner()),
1620                right: Box::new(other.ir_node.into_inner()),
1621                metadata: self.location.new_node_metadata::<(T, T2)>(),
1622            },
1623        )
1624    }
1625}
1626
1627impl<'a, K, V1, L, B, O, R> Stream<(K, V1), L, B, O, R>
1628where
1629    L: Location<'a>,
1630{
1631    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1632    /// by equi-joining the two streams on the key attribute `K`.
1633    ///
1634    /// # Example
1635    /// ```rust
1636    /// # use hydro_lang::*;
1637    /// # use std::collections::HashSet;
1638    /// # use futures::StreamExt;
1639    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1640    /// let tick = process.tick();
1641    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1642    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1643    /// stream1.join(stream2)
1644    /// # }, |mut stream| async move {
1645    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1646    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1647    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
1649    pub fn join<V2, O2>(
1650        self,
1651        n: Stream<(K, V2), L, B, O2, R>,
1652    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, R>
1653    where
1654        K: Eq + Hash,
1655    {
1656        check_matching_location(&self.location, &n.location);
1657
1658        Stream::new(
1659            self.location.clone(),
1660            HydroNode::Join {
1661                left: Box::new(self.ir_node.into_inner()),
1662                right: Box::new(n.ir_node.into_inner()),
1663                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1664            },
1665        )
1666    }
1667
1668    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1669    /// computes the anti-join of the items in the input -- i.e. returns
1670    /// unique items in the first input that do not have a matching key
1671    /// in the second input.
1672    ///
1673    /// # Example
1674    /// ```rust
1675    /// # use hydro_lang::*;
1676    /// # use futures::StreamExt;
1677    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1678    /// let tick = process.tick();
1679    /// let stream = process
1680    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1681    ///   .batch(&tick, nondet!(/** test */));
1682    /// let batch = process
1683    ///   .source_iter(q!(vec![1, 2]))
1684    ///   .batch(&tick, nondet!(/** test */));
1685    /// stream.anti_join(batch).all_ticks()
1686    /// # }, |mut stream| async move {
1687    /// # for w in vec![(3, 'c'), (4, 'd')] {
1688    /// #     assert_eq!(stream.next().await.unwrap(), w);
1689    /// # }
    /// # }));
    /// ```
1691    pub fn anti_join<O2, R2>(self, n: Stream<K, L, Bounded, O2, R2>) -> Stream<(K, V1), L, B, O, R>
1692    where
1693        K: Eq + Hash,
1694    {
1695        check_matching_location(&self.location, &n.location);
1696
1697        Stream::new(
1698            self.location.clone(),
1699            HydroNode::AntiJoin {
1700                pos: Box::new(self.ir_node.into_inner()),
1701                neg: Box::new(n.ir_node.into_inner()),
1702                metadata: self.location.new_node_metadata::<(K, V1)>(),
1703            },
1704        )
1705    }
1706}
1707
1708impl<'a, K, V, L: Location<'a>, B, O, R> Stream<(K, V), L, B, O, R> {
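    /// Converts this stream of key-value pairs into a [`KeyedStream`], grouping values by key so that
    /// per-key operators (such as keyed folds and reduces) can be applied.
    ///
    /// # Example
    /// A minimal sketch, mirroring the [`Stream::fold_keyed`] example but going through the keyed API:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .into_keyed()
    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```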
1709    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1710        KeyedStream {
1711            underlying: self.weakest_ordering(),
1712            _phantom_order: Default::default(),
1713        }
1714    }
1715}
1716
1717impl<'a, K, V, L, B> Stream<(K, V), L, B, TotalOrder, ExactlyOnce>
1718where
1719    K: Eq + Hash,
1720    L: Location<'a>,
1721{
1722    /// A special case of [`Stream::scan`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1723    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1724    /// in the second element are transformed via the `f` combinator.
1725    ///
    /// Unlike [`Stream::fold_keyed`], which only returns the final accumulated value, `scan_keyed` produces a new
    /// stream containing all intermediate accumulated values, paired with their key. The aggregation for a key can
    /// also be reset early by returning `None`.
    ///
    /// The function takes a mutable reference to the per-key accumulator and the current value, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `(key, value)` is emitted to the output stream.
    /// If the function returns `None`, the accumulator for that key is discarded and nothing is emitted; a later
    /// value with the same key starts again from `init`.
1733    ///
1734    /// # Example
1735    /// ```rust
1736    /// # use hydro_lang::*;
1737    /// # use futures::StreamExt;
1738    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1739    /// process
1740    ///     .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
1741    ///     .scan_keyed(
1742    ///         q!(|| 0),
1743    ///         q!(|acc, x| {
1744    ///             *acc += x;
1745    ///             Some(*acc)
1746    ///         }),
1747    ///     )
1748    /// # }, |mut stream| async move {
1749    /// // Output: (0, 1), (0, 3), (1, 3), (1, 7)
1750    /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
1751    /// #     assert_eq!(stream.next().await.unwrap(), w);
1752    /// # }
1753    /// # }));
1754    /// ```
1755    pub fn scan_keyed<A, U, I, F>(
1756        self,
1757        init: impl IntoQuotedMut<'a, I, L> + Copy,
1758        f: impl IntoQuotedMut<'a, F, L> + Copy,
1759    ) -> Stream<(K, U), L, B, TotalOrder, ExactlyOnce>
1760    where
1761        K: Clone,
1762        I: Fn() -> A + 'a,
1763        F: Fn(&mut A, V) -> Option<U> + 'a,
1764    {
1765        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1766        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1767        self.scan(
1768            q!(|| HashMap::new()),
1769            q!(move |acc, (k, v)| {
1770                let existing_state = acc.entry(k.clone()).or_insert_with(&init);
1771                if let Some(out) = f(existing_state, v) {
1772                    Some(Some((k, out)))
1773                } else {
1774                    acc.remove(&k);
1775                    Some(None)
1776                }
1777            }),
1778        )
1779        .flatten_ordered()
1780    }
1781
    /// Like [`Stream::fold_keyed`], in the spirit of SQL's GROUP BY and aggregation constructs, but the aggregation
    /// function returns a boolean; when it returns true, the aggregated result for that key is complete and is
    /// released to downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input stream
    /// is [`Unbounded`], the outputs of the fold can be processed like normal stream elements.
1786    ///
1787    /// # Example
1788    /// ```rust
1789    /// # use hydro_lang::*;
1790    /// # use futures::StreamExt;
1791    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1792    /// process
1793    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1794    ///     .fold_keyed_early_stop(
1795    ///         q!(|| 0),
1796    ///         q!(|acc, x| {
1797    ///             *acc += x;
1798    ///             x % 2 == 0
1799    ///         }),
1800    ///     )
1801    /// # }, |mut stream| async move {
1802    /// // Output: (0, 2), (1, 9)
1803    /// # for w in vec![(0, 2), (1, 9)] {
1804    /// #     assert_eq!(stream.next().await.unwrap(), w);
1805    /// # }
1806    /// # }));
1807    /// ```
1808    pub fn fold_keyed_early_stop<A, I, F>(
1809        self,
1810        init: impl IntoQuotedMut<'a, I, L> + Copy,
1811        f: impl IntoQuotedMut<'a, F, L> + Copy,
1812    ) -> Stream<(K, A), L, B, TotalOrder, ExactlyOnce>
1813    where
1814        K: Clone,
1815        I: Fn() -> A + 'a,
1816        F: Fn(&mut A, V) -> bool + 'a,
1817    {
1818        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1819        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1820        self.scan(
1821            q!(|| HashMap::new()),
1822            q!(move |acc, (k, v)| {
1823                let existing_state = acc.entry(k.clone()).or_insert_with(&init);
1824                if f(existing_state, v) {
1825                    let out = acc.remove(&k).unwrap();
1826                    Some(Some((k, out)))
1827                } else {
1828                    Some(None)
1829                }
1830            }),
1831        )
1832        .flatten_ordered()
1833    }
1834}
1835
1836impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1837where
1838    K: Eq + Hash,
1839    L: Location<'a>,
1840{
1841    #[deprecated = "use .into_keyed().fold(...) instead"]
1842    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1843    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1844    /// in the second element are accumulated via the `comb` closure.
1845    ///
1846    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1847    /// to depend on the order of elements in the stream.
1848    ///
1849    /// If the input and output value types are the same and do not require initialization then use
1850    /// [`Stream::reduce_keyed`].
1851    ///
1852    /// # Example
1853    /// ```rust
1854    /// # use hydro_lang::*;
1855    /// # use futures::StreamExt;
1856    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1857    /// let tick = process.tick();
1858    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1859    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1860    /// batch
1861    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1862    ///     .all_ticks()
1863    /// # }, |mut stream| async move {
1864    /// // (1, 5), (2, 7)
1865    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1866    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1867    /// # }));
1868    /// ```
1869    pub fn fold_keyed<A, I, F>(
1870        self,
1871        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1872        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1873    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1874    where
1875        I: Fn() -> A + 'a,
1876        F: Fn(&mut A, V) + 'a,
1877    {
1878        self.into_keyed().fold(init, comb).entries()
1879    }
1880
1881    #[deprecated = "use .into_keyed().reduce(...) instead"]
1882    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1883    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1884    /// in the second element are accumulated via the `comb` closure.
1885    ///
1886    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1887    /// to depend on the order of elements in the stream.
1888    ///
1889    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1890    ///
1891    /// # Example
1892    /// ```rust
1893    /// # use hydro_lang::*;
1894    /// # use futures::StreamExt;
1895    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1896    /// let tick = process.tick();
1897    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1898    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1899    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1900    /// # }, |mut stream| async move {
1901    /// // (1, 5), (2, 7)
1902    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1903    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1904    /// # }));
1905    /// ```
1906    pub fn reduce_keyed<F>(
1907        self,
1908        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1909    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1910    where
1911        F: Fn(&mut V, V) + 'a,
1912    {
1913        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1914
1915        Stream::new(
1916            self.location.clone(),
1917            HydroNode::ReduceKeyed {
1918                f,
1919                input: Box::new(self.ir_node.into_inner()),
1920                metadata: self.location.new_node_metadata::<(K, V)>(),
1921            },
1922        )
1923    }
1924}
1925
1926impl<'a, K, V, L, O, R> Stream<(K, V), Tick<L>, Bounded, O, R>
1927where
1928    K: Eq + Hash,
1929    L: Location<'a>,
1930{
1931    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
1932    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1933    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1934    /// in the second element are accumulated via the `comb` closure.
1935    ///
1936    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1937    /// as there may be non-deterministic duplicates.
1938    ///
1939    /// If the input and output value types are the same and do not require initialization then use
1940    /// [`Stream::reduce_keyed_commutative_idempotent`].
1941    ///
1942    /// # Example
1943    /// ```rust
1944    /// # use hydro_lang::*;
1945    /// # use futures::StreamExt;
1946    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1947    /// let tick = process.tick();
1948    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
1949    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1950    /// batch
1951    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1952    ///     .all_ticks()
1953    /// # }, |mut stream| async move {
1954    /// // (1, false), (2, true)
1955    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1956    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1957    /// # }));
1958    /// ```
1959    pub fn fold_keyed_commutative_idempotent<A, I, F>(
1960        self,
1961        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1962        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1963    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1964    where
1965        I: Fn() -> A + 'a,
1966        F: Fn(&mut A, V) + 'a,
1967    {
1968        self.into_keyed()
1969            .fold_commutative_idempotent(init, comb)
1970            .entries()
1971    }
1972
1973    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1974    /// # Example
1975    /// ```rust
1976    /// # use hydro_lang::*;
1977    /// # use futures::StreamExt;
1978    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1979    /// let tick = process.tick();
1980    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1981    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1982    /// batch.keys().all_ticks()
1983    /// # }, |mut stream| async move {
1984    /// // 1, 2
1985    /// # assert_eq!(stream.next().await.unwrap(), 1);
1986    /// # assert_eq!(stream.next().await.unwrap(), 2);
1987    /// # }));
1988    /// ```
1989    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1990        self.into_keyed()
1991            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
1992            .keys()
1993    }
1994
1995    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
1996    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1997    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1998    /// in the second element are accumulated via the `comb` closure.
1999    ///
2000    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2001    /// as there may be non-deterministic duplicates.
2002    ///
2003    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2004    ///
2005    /// # Example
2006    /// ```rust
2007    /// # use hydro_lang::*;
2008    /// # use futures::StreamExt;
2009    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2010    /// let tick = process.tick();
2011    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2012    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2013    /// batch
2014    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2015    ///     .all_ticks()
2016    /// # }, |mut stream| async move {
2017    /// // (1, false), (2, true)
2018    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2019    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2020    /// # }));
2021    /// ```
2022    pub fn reduce_keyed_commutative_idempotent<F>(
2023        self,
2024        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2025    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2026    where
2027        F: Fn(&mut V, V) + 'a,
2028    {
2029        self.into_keyed()
2030            .reduce_commutative_idempotent(comb)
2031            .entries()
2032    }
2033}
2034
2035impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2036where
2037    K: Eq + Hash,
2038    L: Location<'a>,
2039{
2040    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2041    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2042    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2043    /// in the second element are accumulated via the `comb` closure.
2044    ///
2045    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2046    ///
2047    /// If the input and output value types are the same and do not require initialization then use
2048    /// [`Stream::reduce_keyed_commutative`].
2049    ///
2050    /// # Example
2051    /// ```rust
2052    /// # use hydro_lang::*;
2053    /// # use futures::StreamExt;
2054    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2055    /// let tick = process.tick();
2056    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2057    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2058    /// batch
2059    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2060    ///     .all_ticks()
2061    /// # }, |mut stream| async move {
2062    /// // (1, 5), (2, 7)
2063    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2064    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2065    /// # }));
2066    /// ```
2067    pub fn fold_keyed_commutative<A, I, F>(
2068        self,
2069        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2070        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2071    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2072    where
2073        I: Fn() -> A + 'a,
2074        F: Fn(&mut A, V) + 'a,
2075    {
2076        self.into_keyed().fold_commutative(init, comb).entries()
2077    }
2078
2079    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2080    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2081    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2082    /// in the second element are accumulated via the `comb` closure.
2083    ///
2084    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2085    ///
2086    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2087    ///
2088    /// # Example
2089    /// ```rust
2090    /// # use hydro_lang::*;
2091    /// # use futures::StreamExt;
2092    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2093    /// let tick = process.tick();
2094    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2095    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2096    /// batch
2097    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2098    ///     .all_ticks()
2099    /// # }, |mut stream| async move {
2100    /// // (1, 5), (2, 7)
2101    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2102    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2103    /// # }));
2104    /// ```
2105    pub fn reduce_keyed_commutative<F>(
2106        self,
2107        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2108    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2109    where
2110        F: Fn(&mut V, V) + 'a,
2111    {
2112        self.into_keyed().reduce_commutative(comb).entries()
2113    }
2114}
2115
2116impl<'a, K, V, L, R> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2117where
2118    K: Eq + Hash,
2119    L: Location<'a>,
2120{
2121    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2122    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2123    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2124    /// in the second element are accumulated via the `comb` closure.
2125    ///
2126    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2127    ///
2128    /// If the input and output value types are the same and do not require initialization then use
2129    /// [`Stream::reduce_keyed_idempotent`].
2130    ///
2131    /// # Example
2132    /// ```rust
2133    /// # use hydro_lang::*;
2134    /// # use futures::StreamExt;
2135    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2136    /// let tick = process.tick();
2137    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2138    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2139    /// batch
2140    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2141    ///     .all_ticks()
2142    /// # }, |mut stream| async move {
2143    /// // (1, false), (2, true)
2144    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2145    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2146    /// # }));
2147    /// ```
2148    pub fn fold_keyed_idempotent<A, I, F>(
2149        self,
2150        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2151        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2152    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2153    where
2154        I: Fn() -> A + 'a,
2155        F: Fn(&mut A, V) + 'a,
2156    {
2157        self.into_keyed().fold_idempotent(init, comb).entries()
2158    }
2159
2160    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2161    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2162    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2163    /// in the second element are accumulated via the `comb` closure.
2164    ///
2165    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2166    ///
2167    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2168    ///
2169    /// # Example
2170    /// ```rust
2171    /// # use hydro_lang::*;
2172    /// # use futures::StreamExt;
2173    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2174    /// let tick = process.tick();
2175    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2176    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2177    /// batch
2178    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2179    ///     .all_ticks()
2180    /// # }, |mut stream| async move {
2181    /// // (1, false), (2, true)
2182    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2183    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2184    /// # }));
2185    /// ```
2186    pub fn reduce_keyed_idempotent<F>(
2187        self,
2188        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2189    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2190    where
2191        F: Fn(&mut V, V) + 'a,
2192    {
2193        self.into_keyed().reduce_idempotent(comb).entries()
2194    }
2195}
2196
2197impl<'a, T, L, B, O, R> Stream<T, Atomic<L>, B, O, R>
2198where
2199    L: Location<'a> + NoTick,
2200{
2201    /// Returns a stream corresponding to the latest batch of elements being atomically
2202    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2203    /// the order of the input.
2204    ///
2205    /// # Non-Determinism
2206    /// The batch boundaries are non-deterministic and may change across executions.
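    ///
    /// # Example
    /// A minimal sketch, assuming the same test harness as the other examples in this module:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .atomic(&tick)
    ///     .batch(nondet!(/** test */))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in 1..4 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```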
2207    pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2208        Stream::new(
2209            self.location.clone().tick,
2210            HydroNode::Unpersist {
2211                inner: Box::new(self.ir_node.into_inner()),
2212                metadata: self.location.new_node_metadata::<T>(),
2213            },
2214        )
2215    }
2216
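    /// Exits the atomic context, returning the stream at the underlying non-atomic location.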
2217    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2218        Stream::new(self.location.tick.l, self.ir_node.into_inner())
2219    }
2220
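    /// Returns the [`Tick`] whose batches this atomic stream is aligned with.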
2221    pub fn atomic_source(&self) -> Tick<L> {
2222        self.location.tick.clone()
2223    }
2224}
2225
2226impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
2227where
2228    L: Location<'a> + NoTick + NoAtomic,
2229{
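    /// Shifts this stream into an atomic context associated with `tick`, so that downstream
    /// processing of its elements is aligned with that tick's batches.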
2230    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2231        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2232    }
2233
    /// Consumes a stream of `Future<T>` and produces a new stream of the resolved `T` outputs.
    /// Outputs are emitted as soon as each future completes, regardless of the input arrival order.
2236    ///
2237    /// # Example
2238    /// ```rust
2239    /// # use std::collections::HashSet;
2240    /// # use futures::StreamExt;
2241    /// # use hydro_lang::*;
2242    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2243    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2244    ///     .map(q!(|x| async move {
2245    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2246    ///         x
2247    ///     }))
2248    ///     .resolve_futures()
2249    /// #   },
2250    /// #   |mut stream| async move {
2251    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2252    /// #       let mut output = HashSet::new();
2253    /// #       for _ in 1..10 {
2254    /// #           output.insert(stream.next().await.unwrap());
2255    /// #       }
2256    /// #       assert_eq!(
2257    /// #           output,
2258    /// #           HashSet::<i32>::from_iter(1..10)
2259    /// #       );
2260    /// #   },
    /// # ));
    /// ```
2262    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2263    where
2264        T: Future<Output = T2>,
2265    {
2266        Stream::new(
2267            self.location.clone(),
2268            HydroNode::ResolveFutures {
2269                input: Box::new(self.ir_node.into_inner()),
2270                metadata: self.location.new_node_metadata::<T2>(),
2271            },
2272        )
2273    }
2274
2275    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2276    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2277    /// the order of the input.
2278    ///
2279    /// # Non-Determinism
2280    /// The batch boundaries are non-deterministic and may change across executions.
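    ///
    /// # Example
    /// A minimal sketch, following the other examples in this module; the number of elements per batch
    /// is non-deterministic, but re-releasing every batch with `all_ticks` recovers the full input.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in 1..5 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```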
2281    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2282        self.atomic(tick).batch(nondet)
2283    }
2284
2285    /// Given a time interval, returns a stream corresponding to samples taken from the
2286    /// stream roughly at that interval. The output will have elements in the same order
2287    /// as the input, but with arbitrary elements skipped between samples. There is also
2288    /// no guarantee on the exact timing of the samples.
2289    ///
2290    /// # Non-Determinism
2291    /// The output stream is non-deterministic in which elements are sampled, since this
2292    /// is controlled by a clock.
2293    pub fn sample_every(
2294        self,
2295        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2296        nondet: NonDet,
2297    ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2298        let samples = self.location.source_interval(interval, nondet);
2299
2300        let tick = self.location.tick();
2301        self.batch(&tick, nondet)
2302            .continue_if(samples.batch(&tick, nondet).first())
2303            .all_ticks()
2304            .weakest_retries()
2305    }
2306
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a new element within that duration.
2309    ///
2310    /// # Non-Determinism
2311    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2312    /// samples take place, timeouts may be non-deterministically generated or missed,
2313    /// and the notification of the timeout may be delayed as well. There is also no
2314    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2315    /// detected based on when the next sample is taken.
2316    pub fn timeout(
2317        self,
2318        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2319        nondet: NonDet,
2320    ) -> Optional<(), L, Unbounded> {
2321        let tick = self.location.tick();
2322
2323        let latest_received = self.assume_retries(nondet).fold_commutative(
2324            q!(|| None),
2325            q!(|latest, _| {
2326                *latest = Some(Instant::now());
2327            }),
2328        );
2329
2330        latest_received
2331            .snapshot(&tick, nondet)
2332            .filter_map(q!(move |latest_received| {
2333                if let Some(latest_received) = latest_received {
2334                    if Instant::now().duration_since(latest_received) > duration {
2335                        Some(())
2336                    } else {
2337                        None
2338                    }
2339                } else {
2340                    Some(())
2341                }
2342            }))
2343            .latest()
2344    }
2345}
2346
2347impl<'a, F, T, L, B, O, R> Stream<F, L, B, O, R>
2348where
2349    L: Location<'a> + NoTick + NoAtomic,
2350    F: Future<Output = T>,
2351{
    /// Consumes a stream of `Future<T>` and produces a new stream of the resolved `T` outputs.
    /// Outputs are emitted in the same order as the input stream, even if later futures complete first.
2354    ///
2355    /// # Example
2356    /// ```rust
2357    /// # use std::collections::HashSet;
2358    /// # use futures::StreamExt;
2359    /// # use hydro_lang::*;
2360    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2361    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2362    ///     .map(q!(|x| async move {
2363    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2364    ///         x
2365    ///     }))
2366    ///     .resolve_futures_ordered()
2367    /// #   },
2368    /// #   |mut stream| async move {
2369    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2370    /// #       let mut output = Vec::new();
2371    /// #       for _ in 1..10 {
2372    /// #           output.push(stream.next().await.unwrap());
2373    /// #       }
2374    /// #       assert_eq!(
2375    /// #           output,
2376    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2377    /// #       );
2378    /// #   },
    /// # ));
    /// ```
2380    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2381        Stream::new(
2382            self.location.clone(),
2383            HydroNode::ResolveFuturesOrdered {
2384                input: Box::new(self.ir_node.into_inner()),
2385                metadata: self.location.new_node_metadata::<T>(),
2386            },
2387        )
2388    }
2389}
2390
2391impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
2392where
2393    L: Location<'a> + NoTick,
2394{
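    /// Consumes the stream by invoking `f` on each element as it arrives. This is a terminal
    /// (leaf) operator in the dataflow graph, so nothing is returned.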
2395    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2396        let f = f.splice_fn1_ctx(&self.location).into();
2397        let metadata = self.location.new_node_metadata::<T>();
2398        self.location
2399            .flow_state()
2400            .borrow_mut()
2401            .leaves
2402            .as_mut()
2403            .expect(FLOW_USED_MESSAGE)
2404            .push(HydroLeaf::ForEach {
2405                input: Box::new(HydroNode::Unpersist {
2406                    inner: Box::new(self.ir_node.into_inner()),
2407                    metadata: metadata.clone(),
2408                }),
2409                f,
2410                metadata,
2411            });
2412    }
2413
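    /// Consumes the stream by forwarding each element into the provided [`futures::Sink`],
    /// such as a channel or network connection. This is a terminal (leaf) operator.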
2414    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2415    where
2416        S: 'a + futures::Sink<T> + Unpin,
2417    {
2418        self.location
2419            .flow_state()
2420            .borrow_mut()
2421            .leaves
2422            .as_mut()
2423            .expect(FLOW_USED_MESSAGE)
2424            .push(HydroLeaf::DestSink {
2425                sink: sink.splice_typed_ctx(&self.location).into(),
2426                input: Box::new(self.ir_node.into_inner()),
2427                metadata: self.location.new_node_metadata::<T>(),
2428            });
2429    }
2430}
2431
2432impl<'a, T, L, O, R> Stream<T, Tick<L>, Bounded, O, R>
2433where
2434    L: Location<'a>,
2435{
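    /// Releases the elements of this per-tick stream outside the tick as an [`Unbounded`] stream,
    /// emitting each tick's batch of elements in successive order.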
2436    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2437        Stream::new(
2438            self.location.outer().clone(),
2439            HydroNode::Persist {
2440                inner: Box::new(self.ir_node.into_inner()),
2441                metadata: self.location.new_node_metadata::<T>(),
2442            },
2443        )
2444    }
2445
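    /// Like [`Stream::all_ticks`], but keeps the result inside the [`Atomic`] context associated
    /// with this tick, so downstream processing remains atomic with respect to the tick.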
2446    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2447        Stream::new(
2448            Atomic {
2449                tick: self.location.clone(),
2450            },
2451            HydroNode::Persist {
2452                inner: Box::new(self.ir_node.into_inner()),
2453                metadata: self.location.new_node_metadata::<T>(),
2454            },
2455        )
2456    }
2457
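    /// Accumulates the elements of this stream across ticks, so that each tick observes all
    /// elements received in the current and all previous ticks.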
2458    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2459    where
2460        T: Clone,
2461    {
2462        Stream::new(
2463            self.location.clone(),
2464            HydroNode::Persist {
2465                inner: Box::new(self.ir_node.into_inner()),
2466                metadata: self.location.new_node_metadata::<T>(),
2467            },
2468        )
2469    }
2470
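    /// Delays the elements of this stream by one tick, emitting them in the next tick instead
    /// of the current one.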
2471    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2472        Stream::new(
2473            self.location.clone(),
2474            HydroNode::DeferTick {
2475                input: Box::new(self.ir_node.into_inner()),
2476                metadata: self.location.new_node_metadata::<T>(),
2477            },
2478        )
2479    }
2480
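    /// Produces only the elements that are new in the current tick, acting as the inverse of
    /// [`Stream::persist`].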
2481    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2482        Stream::new(
2483            self.location.clone(),
2484            HydroNode::Delta {
2485                inner: Box::new(self.ir_node.into_inner()),
2486                metadata: self.location.new_node_metadata::<T>(),
2487            },
2488        )
2489    }
2490}
2491
2492#[cfg(test)]
2493mod tests {
2494    use futures::StreamExt;
2495    use hydro_deploy::Deployment;
2496    use serde::{Deserialize, Serialize};
2497    use stageleft::q;
2498
2499    use crate::FlowBuilder;
2500    use crate::location::Location;
2501
2502    struct P1 {}
2503    struct P2 {}
2504
2505    #[derive(Serialize, Deserialize, Debug)]
2506    struct SendOverNetwork {
2507        n: u32,
2508    }
2509
2510    #[tokio::test]
2511    async fn first_ten_distributed() {
2512        let mut deployment = Deployment::new();
2513
2514        let flow = FlowBuilder::new();
2515        let first_node = flow.process::<P1>();
2516        let second_node = flow.process::<P2>();
2517        let external = flow.external::<P2>();
2518
2519        let numbers = first_node.source_iter(q!(0..10));
2520        let out_port = numbers
2521            .map(q!(|n| SendOverNetwork { n }))
2522            .send_bincode(&second_node)
2523            .send_bincode_external(&external);
2524
2525        let nodes = flow
2526            .with_process(&first_node, deployment.Localhost())
2527            .with_process(&second_node, deployment.Localhost())
2528            .with_external(&external, deployment.Localhost())
2529            .deploy(&mut deployment);
2530
2531        deployment.deploy().await.unwrap();
2532
2533        let mut external_out = nodes.connect_source_bincode(out_port).await;
2534
2535        deployment.start().await.unwrap();
2536
2537        for i in 0..10 {
2538            assert_eq!(external_out.next().await.unwrap().n, i);
2539        }
2540    }
2541
2542    #[tokio::test]
2543    async fn first_cardinality() {
2544        let mut deployment = Deployment::new();
2545
2546        let flow = FlowBuilder::new();
2547        let node = flow.process::<()>();
2548        let external = flow.external::<()>();
2549
2550        let node_tick = node.tick();
2551        let count = node_tick
2552            .singleton(q!([1, 2, 3]))
2553            .into_stream()
2554            .flatten_ordered()
2555            .first()
2556            .into_stream()
2557            .count()
2558            .all_ticks()
2559            .send_bincode_external(&external);
2560
2561        let nodes = flow
2562            .with_process(&node, deployment.Localhost())
2563            .with_external(&external, deployment.Localhost())
2564            .deploy(&mut deployment);
2565
2566        deployment.deploy().await.unwrap();
2567
2568        let mut external_out = nodes.connect_source_bincode(count).await;
2569
2570        deployment.start().await.unwrap();
2571
2572        assert_eq!(external_out.next().await.unwrap(), 1);
2573    }
2574}