1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11 TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::stream::{AtLeastOnce, ExactlyOnce};
17use crate::unsafety::NonDet;
18use crate::{Bounded, NoOrder, Optional, Stream, TotalOrder, Unbounded};
19
20pub struct Singleton<Type, Loc, Bound> {
21 pub(crate) location: Loc,
22 pub(crate) ir_node: RefCell<HydroNode>,
23
24 _phantom: PhantomData<(Type, Loc, Bound)>,
25}
26
27impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
28where
29 L: Location<'a>,
30{
31 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
32 Singleton::new(singleton.location, singleton.ir_node.into_inner())
33 }
34}
35
36impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
37where
38 L: Location<'a>,
39{
40 fn defer_tick(self) -> Self {
41 Singleton::defer_tick(self)
42 }
43}
44
45impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
46where
47 L: Location<'a>,
48{
49 type Location = Tick<L>;
50
51 fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
52 Singleton::new(
53 location.clone(),
54 HydroNode::Chain {
55 first: Box::new(HydroNode::CycleSource {
56 ident,
57 metadata: location.new_node_metadata::<T>(),
58 }),
59 second: initial
60 .continue_if(location.optional_first_tick(q!(())))
61 .ir_node
62 .into_inner()
63 .into(),
64 metadata: location.new_node_metadata::<T>(),
65 },
66 )
67 }
68}
69
70impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75 assert_eq!(
76 self.location.id(),
77 expected_location,
78 "locations do not match"
79 );
80 self.location
81 .flow_state()
82 .borrow_mut()
83 .leaves
84 .as_mut()
85 .expect(FLOW_USED_MESSAGE)
86 .push(HydroLeaf::CycleSink {
87 ident,
88 input: Box::new(self.ir_node.into_inner()),
89 metadata: self.location.new_node_metadata::<T>(),
90 });
91 }
92}
93
94impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
95where
96 L: Location<'a>,
97{
98 type Location = Tick<L>;
99
100 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
101 Singleton::new(
102 location.clone(),
103 HydroNode::CycleSource {
104 ident,
105 metadata: location.new_node_metadata::<T>(),
106 },
107 )
108 }
109}
110
111impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
112where
113 L: Location<'a>,
114{
115 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
116 assert_eq!(
117 self.location.id(),
118 expected_location,
119 "locations do not match"
120 );
121 self.location
122 .flow_state()
123 .borrow_mut()
124 .leaves
125 .as_mut()
126 .expect(FLOW_USED_MESSAGE)
127 .push(HydroLeaf::CycleSink {
128 ident,
129 input: Box::new(self.ir_node.into_inner()),
130 metadata: self.location.new_node_metadata::<T>(),
131 });
132 }
133}
134
135impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
136where
137 L: Location<'a> + NoTick,
138{
139 type Location = L;
140
141 fn create_source(ident: syn::Ident, location: L) -> Self {
142 Singleton::new(
143 location.clone(),
144 HydroNode::Persist {
145 inner: Box::new(HydroNode::CycleSource {
146 ident,
147 metadata: location.new_node_metadata::<T>(),
148 }),
149 metadata: location.new_node_metadata::<T>(),
150 },
151 )
152 }
153}
154
155impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
156where
157 L: Location<'a> + NoTick,
158{
159 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
160 assert_eq!(
161 self.location.id(),
162 expected_location,
163 "locations do not match"
164 );
165 let metadata = self.location.new_node_metadata::<T>();
166 self.location
167 .flow_state()
168 .borrow_mut()
169 .leaves
170 .as_mut()
171 .expect(FLOW_USED_MESSAGE)
172 .push(HydroLeaf::CycleSink {
173 ident,
174 input: Box::new(HydroNode::Unpersist {
175 inner: Box::new(self.ir_node.into_inner()),
176 metadata: metadata.clone(),
177 }),
178 metadata,
179 });
180 }
181}
182
183impl<'a, T, L, B> Clone for Singleton<T, L, B>
184where
185 T: Clone,
186 L: Location<'a>,
187{
188 fn clone(&self) -> Self {
189 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
190 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
191 *self.ir_node.borrow_mut() = HydroNode::Tee {
192 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
193 metadata: self.location.new_node_metadata::<T>(),
194 };
195 }
196
197 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
198 Singleton {
199 location: self.location.clone(),
200 ir_node: HydroNode::Tee {
201 inner: TeeNode(inner.0.clone()),
202 metadata: metadata.clone(),
203 }
204 .into(),
205 _phantom: PhantomData,
206 }
207 } else {
208 unreachable!()
209 }
210 }
211}
212
213impl<'a, T, L, B> Singleton<T, L, B>
214where
215 L: Location<'a>,
216{
217 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
218 Singleton {
219 location,
220 ir_node: RefCell::new(ir_node),
221 _phantom: PhantomData,
222 }
223 }
224
225 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
226 where
227 F: Fn(T) -> U + 'a,
228 {
229 let f = f.splice_fn1_ctx(&self.location).into();
230 Singleton::new(
231 self.location.clone(),
232 HydroNode::Map {
233 f,
234 input: Box::new(self.ir_node.into_inner()),
235 metadata: self.location.new_node_metadata::<U>(),
236 },
237 )
238 }
239
240 pub fn flat_map_ordered<U, I, F>(
241 self,
242 f: impl IntoQuotedMut<'a, F, L>,
243 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
244 where
245 I: IntoIterator<Item = U>,
246 F: Fn(T) -> I + 'a,
247 {
248 let f = f.splice_fn1_ctx(&self.location).into();
249 Stream::new(
250 self.location.clone(),
251 HydroNode::FlatMap {
252 f,
253 input: Box::new(self.ir_node.into_inner()),
254 metadata: self.location.new_node_metadata::<U>(),
255 },
256 )
257 }
258
259 pub fn flat_map_unordered<U, I, F>(
260 self,
261 f: impl IntoQuotedMut<'a, F, L>,
262 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
263 where
264 I: IntoIterator<Item = U>,
265 F: Fn(T) -> I + 'a,
266 {
267 let f = f.splice_fn1_ctx(&self.location).into();
268 Stream::new(
269 self.location.clone(),
270 HydroNode::FlatMap {
271 f,
272 input: Box::new(self.ir_node.into_inner()),
273 metadata: self.location.new_node_metadata::<U>(),
274 },
275 )
276 }
277
278 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
279 where
280 T: IntoIterator<Item = U>,
281 {
282 self.flat_map_ordered(q!(|x| x))
283 }
284
285 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
286 where
287 T: IntoIterator<Item = U>,
288 {
289 self.flat_map_unordered(q!(|x| x))
290 }
291
292 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
293 where
294 F: Fn(&T) -> bool + 'a,
295 {
296 let f = f.splice_fn1_borrow_ctx(&self.location).into();
297 Optional::new(
298 self.location.clone(),
299 HydroNode::Filter {
300 f,
301 input: Box::new(self.ir_node.into_inner()),
302 metadata: self.location.new_node_metadata::<T>(),
303 },
304 )
305 }
306
307 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308 where
309 F: Fn(T) -> Option<U> + 'a,
310 {
311 let f = f.splice_fn1_ctx(&self.location).into();
312 Optional::new(
313 self.location.clone(),
314 HydroNode::FilterMap {
315 f,
316 input: Box::new(self.ir_node.into_inner()),
317 metadata: self.location.new_node_metadata::<U>(),
318 },
319 )
320 }
321
322 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
323 where
324 Self: ZipResult<'a, O, Location = L>,
325 {
326 check_matching_location(&self.location, &Self::other_location(&other));
327
328 if L::is_top_level() {
329 let left_ir_node = self.ir_node.into_inner();
330 let left_ir_node_metadata = left_ir_node.metadata().clone();
331 let right_ir_node = Self::other_ir_node(other);
332 let right_ir_node_metadata = right_ir_node.metadata().clone();
333
334 Self::make(
335 self.location.clone(),
336 HydroNode::Persist {
337 inner: Box::new(HydroNode::CrossSingleton {
338 left: Box::new(HydroNode::Unpersist {
339 inner: Box::new(left_ir_node),
340 metadata: left_ir_node_metadata,
341 }),
342 right: Box::new(HydroNode::Unpersist {
343 inner: Box::new(right_ir_node),
344 metadata: right_ir_node_metadata,
345 }),
346 metadata: self
347 .location
348 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
349 }),
350 metadata: self
351 .location
352 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
353 },
354 )
355 } else {
356 Self::make(
357 self.location.clone(),
358 HydroNode::CrossSingleton {
359 left: Box::new(self.ir_node.into_inner()),
360 right: Box::new(Self::other_ir_node(other)),
361 metadata: self
362 .location
363 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
364 },
365 )
366 }
367 }
368
369 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
370 where
371 Self: ZipResult<
372 'a,
373 Optional<(), L, Bounded>,
374 Location = L,
375 Out = Optional<(T, ()), L, Bounded>,
376 >,
377 {
378 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
379 }
380
381 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
382 where
383 Singleton<T, L, B>: ZipResult<
384 'a,
385 Optional<(), L, Bounded>,
386 Location = L,
387 Out = Optional<(T, ()), L, Bounded>,
388 >,
389 {
390 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
391 }
392
393 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
396 {
397 let mut node = self.ir_node.borrow_mut();
398 let metadata = node.metadata_mut();
399 metadata.tag = Some(name.to_string());
400 }
401 self
402 }
403}
404
405impl<'a, T, L, B> Singleton<T, Atomic<L>, B>
406where
407 L: Location<'a> + NoTick,
408{
409 pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
418 Singleton::new(
419 self.location.clone().tick,
420 HydroNode::Unpersist {
421 inner: Box::new(self.ir_node.into_inner()),
422 metadata: self.location.new_node_metadata::<T>(),
423 },
424 )
425 }
426
427 pub fn end_atomic(self) -> Optional<T, L, B> {
428 Optional::new(self.location.tick.l, self.ir_node.into_inner())
429 }
430}
431
432impl<'a, T, L, B> Singleton<T, L, B>
433where
434 L: Location<'a> + NoTick + NoAtomic,
435{
436 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
437 Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
438 }
439
440 pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
449 where
450 L: NoTick,
451 {
452 self.atomic(tick).snapshot(nondet)
453 }
454
455 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
463 let tick = self.location.tick();
464 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
465 }
466
467 pub fn sample_every(
477 self,
478 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
479 nondet: NonDet,
480 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
481 let samples = self.location.source_interval(interval, nondet);
482 let tick = self.location.tick();
483
484 self.snapshot(&tick, nondet)
485 .continue_if(samples.batch(&tick, nondet).first())
486 .all_ticks()
487 .weakest_retries()
488 }
489}
490
491impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
492where
493 L: Location<'a>,
494{
495 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
496 Stream::new(
497 self.location.outer().clone(),
498 HydroNode::Persist {
499 inner: Box::new(self.ir_node.into_inner()),
500 metadata: self.location.new_node_metadata::<T>(),
501 },
502 )
503 }
504
505 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
506 Stream::new(
507 Atomic {
508 tick: self.location.clone(),
509 },
510 HydroNode::Persist {
511 inner: Box::new(self.ir_node.into_inner()),
512 metadata: self.location.new_node_metadata::<T>(),
513 },
514 )
515 }
516
517 pub fn latest(self) -> Singleton<T, L, Unbounded> {
518 Singleton::new(
519 self.location.outer().clone(),
520 HydroNode::Persist {
521 inner: Box::new(self.ir_node.into_inner()),
522 metadata: self.location.new_node_metadata::<T>(),
523 },
524 )
525 }
526
527 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
528 Singleton::new(
529 Atomic {
530 tick: self.location.clone(),
531 },
532 HydroNode::Persist {
533 inner: Box::new(self.ir_node.into_inner()),
534 metadata: self.location.new_node_metadata::<T>(),
535 },
536 )
537 }
538
539 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
540 Singleton::new(
541 self.location.clone(),
542 HydroNode::DeferTick {
543 input: Box::new(self.ir_node.into_inner()),
544 metadata: self.location.new_node_metadata::<T>(),
545 },
546 )
547 }
548
549 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
550 Stream::new(
551 self.location.clone(),
552 HydroNode::Persist {
553 inner: Box::new(self.ir_node.into_inner()),
554 metadata: self.location.new_node_metadata::<T>(),
555 },
556 )
557 }
558
559 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
560 Optional::new(
561 self.location.clone(),
562 HydroNode::Delta {
563 inner: Box::new(self.ir_node.into_inner()),
564 metadata: self.location.new_node_metadata::<T>(),
565 },
566 )
567 }
568
569 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
570 Stream::new(self.location, self.ir_node.into_inner())
571 }
572}
573
574pub trait ZipResult<'a, Other> {
575 type Out;
576 type ElementType;
577 type Location;
578
579 fn other_location(other: &Other) -> Self::Location;
580 fn other_ir_node(other: Other) -> HydroNode;
581
582 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
583}
584
585impl<'a, T, U, L, B> ZipResult<'a, Singleton<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
586where
587 U: Clone,
588 L: Location<'a>,
589{
590 type Out = Singleton<(T, U), Tick<L>, B>;
591 type ElementType = (T, U);
592 type Location = Tick<L>;
593
594 fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
595 other.location.clone()
596 }
597
598 fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
599 other.ir_node.into_inner()
600 }
601
602 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
603 Singleton::new(location, ir_node)
604 }
605}
606
607impl<'a, T, U, L, B> ZipResult<'a, Optional<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
608where
609 U: Clone,
610 L: Location<'a>,
611{
612 type Out = Optional<(T, U), Tick<L>, B>;
613 type ElementType = (T, U);
614 type Location = Tick<L>;
615
616 fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
617 other.location.clone()
618 }
619
620 fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
621 other.ir_node.into_inner()
622 }
623
624 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
625 Optional::new(location, ir_node)
626 }
627}
628
629#[cfg(test)]
630mod tests {
631 use futures::{SinkExt, StreamExt};
632 use hydro_deploy::Deployment;
633 use stageleft::q;
634
635 use crate::*;
636
637 #[tokio::test]
638 async fn tick_cycle_cardinality() {
639 let mut deployment = Deployment::new();
640
641 let flow = FlowBuilder::new();
642 let node = flow.process::<()>();
643 let external = flow.external::<()>();
644
645 let (input_send, input) = node.source_external_bincode(&external);
646
647 let node_tick = node.tick();
648 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
649 let counts = singleton
650 .clone()
651 .into_stream()
652 .count()
653 .continue_if(input.batch(&node_tick, nondet!()).first())
654 .all_ticks()
655 .send_bincode_external(&external);
656 complete_cycle.complete_next_tick(singleton);
657
658 let nodes = flow
659 .with_process(&node, deployment.Localhost())
660 .with_external(&external, deployment.Localhost())
661 .deploy(&mut deployment);
662
663 deployment.deploy().await.unwrap();
664
665 let mut tick_trigger = nodes.connect_sink_bincode(input_send).await;
666 let mut external_out = nodes.connect_source_bincode(counts).await;
667
668 deployment.start().await.unwrap();
669
670 tick_trigger.send(()).await.unwrap();
671
672 assert_eq!(external_out.next().await.unwrap(), 1);
673
674 tick_trigger.send(()).await.unwrap();
675
676 assert_eq!(external_out.next().await.unwrap(), 1);
677 }
678}