1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::{AtLeastOnce, ExactlyOnce, NoOrder};
16use crate::unsafety::NonDet;
17use crate::{Bounded, Location, Singleton, Stream, Tick, TotalOrder, Unbounded};
18
19pub struct Optional<Type, Loc, Bound> {
20 pub(crate) location: Loc,
21 pub(crate) ir_node: RefCell<HydroNode>,
22
23 _phantom: PhantomData<(Type, Loc, Bound)>,
24}
25
26impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
27where
28 L: Location<'a>,
29{
30 fn defer_tick(self) -> Self {
31 Optional::defer_tick(self)
32 }
33}
34
35impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
36where
37 L: Location<'a>,
38{
39 type Location = Tick<L>;
40
41 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
42 Optional::new(
43 location.clone(),
44 HydroNode::CycleSource {
45 ident,
46 metadata: location.new_node_metadata::<T>(),
47 },
48 )
49 }
50}
51
52impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
53where
54 L: Location<'a>,
55{
56 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
57 assert_eq!(
58 self.location.id(),
59 expected_location,
60 "locations do not match"
61 );
62 self.location
63 .flow_state()
64 .borrow_mut()
65 .leaves
66 .as_mut()
67 .expect(FLOW_USED_MESSAGE)
68 .push(HydroLeaf::CycleSink {
69 ident,
70 input: Box::new(self.ir_node.into_inner()),
71 metadata: self.location.new_node_metadata::<T>(),
72 });
73 }
74}
75
76impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
77where
78 L: Location<'a>,
79{
80 type Location = Tick<L>;
81
82 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
83 Optional::new(
84 location.clone(),
85 HydroNode::CycleSource {
86 ident,
87 metadata: location.new_node_metadata::<T>(),
88 },
89 )
90 }
91}
92
93impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
94where
95 L: Location<'a>,
96{
97 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98 assert_eq!(
99 self.location.id(),
100 expected_location,
101 "locations do not match"
102 );
103 self.location
104 .flow_state()
105 .borrow_mut()
106 .leaves
107 .as_mut()
108 .expect(FLOW_USED_MESSAGE)
109 .push(HydroLeaf::CycleSink {
110 ident,
111 input: Box::new(self.ir_node.into_inner()),
112 metadata: self.location.new_node_metadata::<T>(),
113 });
114 }
115}
116
117impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
118where
119 L: Location<'a> + NoTick,
120{
121 type Location = L;
122
123 fn create_source(ident: syn::Ident, location: L) -> Self {
124 Optional::new(
125 location.clone(),
126 HydroNode::Persist {
127 inner: Box::new(HydroNode::CycleSource {
128 ident,
129 metadata: location.new_node_metadata::<T>(),
130 }),
131 metadata: location.new_node_metadata::<T>(),
132 },
133 )
134 }
135}
136
137impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
138where
139 L: Location<'a> + NoTick,
140{
141 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
142 assert_eq!(
143 self.location.id(),
144 expected_location,
145 "locations do not match"
146 );
147 let metadata = self.location.new_node_metadata::<T>();
148 self.location
149 .flow_state()
150 .borrow_mut()
151 .leaves
152 .as_mut()
153 .expect(FLOW_USED_MESSAGE)
154 .push(HydroLeaf::CycleSink {
155 ident,
156 input: Box::new(HydroNode::Unpersist {
157 inner: Box::new(self.ir_node.into_inner()),
158 metadata: metadata.clone(),
159 }),
160 metadata,
161 });
162 }
163}
164
165impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
166where
167 L: Location<'a>,
168{
169 fn from(singleton: Optional<T, L, Bounded>) -> Self {
170 Optional::new(singleton.location, singleton.ir_node.into_inner())
171 }
172}
173
174impl<'a, T, L, B> From<Singleton<T, L, B>> for Optional<T, L, B>
175where
176 L: Location<'a>,
177{
178 fn from(singleton: Singleton<T, L, B>) -> Self {
179 Optional::some(singleton)
180 }
181}
182
183impl<'a, T, L, B> Clone for Optional<T, L, B>
184where
185 T: Clone,
186 L: Location<'a>,
187{
188 fn clone(&self) -> Self {
189 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
190 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
191 *self.ir_node.borrow_mut() = HydroNode::Tee {
192 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
193 metadata: self.location.new_node_metadata::<T>(),
194 };
195 }
196
197 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
198 Optional {
199 location: self.location.clone(),
200 ir_node: HydroNode::Tee {
201 inner: TeeNode(inner.0.clone()),
202 metadata: metadata.clone(),
203 }
204 .into(),
205 _phantom: PhantomData,
206 }
207 } else {
208 unreachable!()
209 }
210 }
211}
212
213impl<'a, T, L, B> Optional<T, L, B>
214where
215 L: Location<'a>,
216{
217 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
218 Optional {
219 location,
220 ir_node: RefCell::new(ir_node),
221 _phantom: PhantomData,
222 }
223 }
224
225 pub fn some(singleton: Singleton<T, L, B>) -> Self {
226 Optional::new(singleton.location, singleton.ir_node.into_inner())
227 }
228
229 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
248 where
249 F: Fn(T) -> U + 'a,
250 {
251 let f = f.splice_fn1_ctx(&self.location).into();
252 Optional::new(
253 self.location.clone(),
254 HydroNode::Map {
255 f,
256 input: Box::new(self.ir_node.into_inner()),
257 metadata: self.location.new_node_metadata::<U>(),
258 },
259 )
260 }
261
262 pub fn flat_map_ordered<U, I, F>(
263 self,
264 f: impl IntoQuotedMut<'a, F, L>,
265 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
266 where
267 I: IntoIterator<Item = U>,
268 F: Fn(T) -> I + 'a,
269 {
270 let f = f.splice_fn1_ctx(&self.location).into();
271 Stream::new(
272 self.location.clone(),
273 HydroNode::FlatMap {
274 f,
275 input: Box::new(self.ir_node.into_inner()),
276 metadata: self.location.new_node_metadata::<U>(),
277 },
278 )
279 }
280
281 pub fn flat_map_unordered<U, I, F>(
282 self,
283 f: impl IntoQuotedMut<'a, F, L>,
284 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
285 where
286 I: IntoIterator<Item = U>,
287 F: Fn(T) -> I + 'a,
288 {
289 let f = f.splice_fn1_ctx(&self.location).into();
290 Stream::new(
291 self.location.clone(),
292 HydroNode::FlatMap {
293 f,
294 input: Box::new(self.ir_node.into_inner()),
295 metadata: self.location.new_node_metadata::<U>(),
296 },
297 )
298 }
299
300 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
301 where
302 T: IntoIterator<Item = U>,
303 {
304 self.flat_map_ordered(q!(|v| v))
305 }
306
307 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
308 where
309 T: IntoIterator<Item = U>,
310 {
311 self.flat_map_unordered(q!(|v| v))
312 }
313
314 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
315 where
316 F: Fn(&T) -> bool + 'a,
317 {
318 let f = f.splice_fn1_borrow_ctx(&self.location).into();
319 Optional::new(
320 self.location.clone(),
321 HydroNode::Filter {
322 f,
323 input: Box::new(self.ir_node.into_inner()),
324 metadata: self.location.new_node_metadata::<T>(),
325 },
326 )
327 }
328
329 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
330 where
331 F: Fn(T) -> Option<U> + 'a,
332 {
333 let f = f.splice_fn1_ctx(&self.location).into();
334 Optional::new(
335 self.location.clone(),
336 HydroNode::FilterMap {
337 f,
338 input: Box::new(self.ir_node.into_inner()),
339 metadata: self.location.new_node_metadata::<U>(),
340 },
341 )
342 }
343
344 pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
345 check_matching_location(&self.location, &other.location);
346
347 if L::is_top_level() {
348 Optional::new(
349 self.location.clone(),
350 HydroNode::Persist {
351 inner: Box::new(HydroNode::Chain {
352 first: Box::new(HydroNode::Unpersist {
353 inner: Box::new(self.ir_node.into_inner()),
354 metadata: self.location.new_node_metadata::<T>(),
355 }),
356 second: Box::new(HydroNode::Unpersist {
357 inner: Box::new(other.ir_node.into_inner()),
358 metadata: self.location.new_node_metadata::<T>(),
359 }),
360 metadata: self.location.new_node_metadata::<T>(),
361 }),
362 metadata: self.location.new_node_metadata::<T>(),
363 },
364 )
365 } else {
366 Optional::new(
367 self.location.clone(),
368 HydroNode::Chain {
369 first: Box::new(self.ir_node.into_inner()),
370 second: Box::new(other.ir_node.into_inner()),
371 metadata: self.location.new_node_metadata::<T>(),
372 },
373 )
374 }
375 }
376
377 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
378 where
379 O: Clone,
380 {
381 let other: Optional<O, L, B> = other.into();
382 check_matching_location(&self.location, &other.location);
383
384 if L::is_top_level() {
385 Optional::new(
386 self.location.clone(),
387 HydroNode::Persist {
388 inner: Box::new(HydroNode::CrossSingleton {
389 left: Box::new(HydroNode::Unpersist {
390 inner: Box::new(self.ir_node.into_inner()),
391 metadata: self.location.new_node_metadata::<T>(),
392 }),
393 right: Box::new(HydroNode::Unpersist {
394 inner: Box::new(other.ir_node.into_inner()),
395 metadata: self.location.new_node_metadata::<O>(),
396 }),
397 metadata: self.location.new_node_metadata::<(T, O)>(),
398 }),
399 metadata: self.location.new_node_metadata::<(T, O)>(),
400 },
401 )
402 } else {
403 Optional::new(
404 self.location.clone(),
405 HydroNode::CrossSingleton {
406 left: Box::new(self.ir_node.into_inner()),
407 right: Box::new(other.ir_node.into_inner()),
408 metadata: self.location.new_node_metadata::<(T, O)>(),
409 },
410 )
411 }
412 }
413
414 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
415 check_matching_location(&self.location, &other.location);
416
417 if L::is_top_level() {
418 Singleton::new(
419 self.location.clone(),
420 HydroNode::Persist {
421 inner: Box::new(HydroNode::Chain {
422 first: Box::new(HydroNode::Unpersist {
423 inner: Box::new(self.ir_node.into_inner()),
424 metadata: self.location.new_node_metadata::<T>(),
425 }),
426 second: Box::new(HydroNode::Unpersist {
427 inner: Box::new(other.ir_node.into_inner()),
428 metadata: self.location.new_node_metadata::<T>(),
429 }),
430 metadata: self.location.new_node_metadata::<T>(),
431 }),
432 metadata: self.location.new_node_metadata::<T>(),
433 },
434 )
435 } else {
436 Singleton::new(
437 self.location.clone(),
438 HydroNode::Chain {
439 first: Box::new(self.ir_node.into_inner()),
440 second: Box::new(other.ir_node.into_inner()),
441 metadata: self.location.new_node_metadata::<T>(),
442 },
443 )
444 }
445 }
446
447 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
448 where
449 T: Clone,
450 {
451 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
452 let core_ir = HydroNode::Persist {
453 inner: Box::new(HydroNode::Source {
454 source: HydroSource::Iter(none.into()),
455 metadata: self.location.root().new_node_metadata::<Option<T>>(),
456 }),
457 metadata: self.location.new_node_metadata::<Option<T>>(),
458 };
459
460 let none_singleton = if L::is_top_level() {
461 Singleton::new(
462 self.location.clone(),
463 HydroNode::Persist {
464 inner: Box::new(core_ir),
465 metadata: self.location.new_node_metadata::<Option<T>>(),
466 },
467 )
468 } else {
469 Singleton::new(self.location.clone(), core_ir)
470 };
471
472 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
473 }
474
475 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
478 {
479 let mut node = self.ir_node.borrow_mut();
480 let metadata = node.metadata_mut();
481 metadata.tag = Some(name.to_string());
482 }
483 self
484 }
485}
486
487impl<'a, T, L> Optional<T, L, Bounded>
488where
489 L: Location<'a>,
490{
491 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
492 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
493 }
494
495 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
496 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
497 }
498
499 pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
500 where
501 Singleton<U, L, Bounded>: ZipResult<
502 'a,
503 Optional<(), L, Bounded>,
504 Location = L,
505 Out = Optional<(U, ()), L, Bounded>,
506 >,
507 {
508 value.continue_if(self)
509 }
510
511 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
512 if L::is_top_level() {
513 panic!("Converting an optional to a stream is not yet supported at the top level");
514 }
515
516 Stream::new(self.location, self.ir_node.into_inner())
517 }
518}
519
520impl<'a, T, L, B> Optional<T, Atomic<L>, B>
521where
522 L: Location<'a> + NoTick,
523{
524 pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
533 Optional::new(
534 self.location.clone().tick,
535 HydroNode::Unpersist {
536 inner: Box::new(self.ir_node.into_inner()),
537 metadata: self.location.new_node_metadata::<T>(),
538 },
539 )
540 }
541
542 pub fn end_atomic(self) -> Optional<T, L, B> {
543 Optional::new(self.location.tick.l, self.ir_node.into_inner())
544 }
545}
546
547impl<'a, T, L, B> Optional<T, L, B>
548where
549 L: Location<'a> + NoTick + NoAtomic,
550{
551 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
552 Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
553 }
554
555 pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
564 self.atomic(tick).snapshot(nondet)
565 }
566
567 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
575 let tick = self.location.tick();
576 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
577 }
578
579 pub fn sample_every(
589 self,
590 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
591 nondet: NonDet,
592 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
593 let samples = self.location.source_interval(interval, nondet);
594 let tick = self.location.tick();
595
596 self.snapshot(&tick, nondet)
597 .continue_if(samples.batch(&tick, nondet).first())
598 .all_ticks()
599 .weakest_retries()
600 }
601}
602
603impl<'a, T, L> Optional<T, Tick<L>, Bounded>
604where
605 L: Location<'a>,
606{
607 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
608 Stream::new(
609 self.location.outer().clone(),
610 HydroNode::Persist {
611 inner: Box::new(self.ir_node.into_inner()),
612 metadata: self.location.new_node_metadata::<T>(),
613 },
614 )
615 }
616
617 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
618 Stream::new(
619 Atomic {
620 tick: self.location.clone(),
621 },
622 HydroNode::Persist {
623 inner: Box::new(self.ir_node.into_inner()),
624 metadata: self.location.new_node_metadata::<T>(),
625 },
626 )
627 }
628
629 pub fn latest(self) -> Optional<T, L, Unbounded> {
630 Optional::new(
631 self.location.outer().clone(),
632 HydroNode::Persist {
633 inner: Box::new(self.ir_node.into_inner()),
634 metadata: self.location.new_node_metadata::<T>(),
635 },
636 )
637 }
638
639 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
640 Optional::new(
641 Atomic {
642 tick: self.location.clone(),
643 },
644 HydroNode::Persist {
645 inner: Box::new(self.ir_node.into_inner()),
646 metadata: self.location.new_node_metadata::<T>(),
647 },
648 )
649 }
650
651 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
652 Optional::new(
653 self.location.clone(),
654 HydroNode::DeferTick {
655 input: Box::new(self.ir_node.into_inner()),
656 metadata: self.location.new_node_metadata::<T>(),
657 },
658 )
659 }
660
661 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
662 Stream::new(
663 self.location.clone(),
664 HydroNode::Persist {
665 inner: Box::new(self.ir_node.into_inner()),
666 metadata: self.location.new_node_metadata::<T>(),
667 },
668 )
669 }
670
671 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
672 Optional::new(
673 self.location.clone(),
674 HydroNode::Delta {
675 inner: Box::new(self.ir_node.into_inner()),
676 metadata: self.location.new_node_metadata::<T>(),
677 },
678 )
679 }
680}