hydro_lang/stream/
networking.rs

1use std::marker::PhantomData;
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6use syn::parse_quote;
7
8use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode};
9use crate::keyed_singleton::KeyedSingleton;
10use crate::keyed_stream::KeyedStream;
11use crate::location::external_process::ExternalBincodeStream;
12use crate::location::tick::NoAtomic;
13use crate::location::{MembershipEvent, NoTick};
14use crate::staging_util::get_this_crate;
15use crate::stream::ExactlyOnce;
16use crate::{
17    Cluster, External, Location, MemberId, NonDet, Process, Stream, TotalOrder, Unbounded, nondet,
18};
19
20// same as the one in `hydro_std`, but internal use only
21fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
22    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
23) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
24    membership
25        .fold(
26            q!(|| false),
27            q!(|present, event| {
28                match event {
29                    MembershipEvent::Joined => *present = true,
30                    MembershipEvent::Left => *present = false,
31                }
32            }),
33        )
34        .filter_map(q!(|v| if v { Some(()) } else { None }))
35}
36
37pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
38    let root = get_this_crate();
39
40    if is_demux {
41        parse_quote! {
42            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::MemberId<_>, #t_type), _>(
43                |(id, data)| {
44                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
45                }
46            )
47        }
48    } else {
49        parse_quote! {
50            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
51                |data| {
52                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
53                }
54            )
55        }
56    }
57}
58
59fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
60    serialize_bincode_with_type(is_demux, &quote_type::<T>())
61}
62
63pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
64    let root = get_this_crate();
65
66    if let Some(c_type) = tagged {
67        parse_quote! {
68            |res| {
69                let (id, b) = res.unwrap();
70                (#root::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
71            }
72        }
73    } else {
74        parse_quote! {
75            |res| {
76                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
77            }
78        }
79    }
80}
81
82pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
83    deserialize_bincode_with_type(tagged, &quote_type::<T>())
84}
85
86impl<'a, T, L, B, O, R> Stream<T, Cluster<'a, L>, B, O, R> {
87    pub fn send_bincode<L2>(
88        self,
89        other: &Process<'a, L2>,
90    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
91    where
92        T: Serialize + DeserializeOwned,
93    {
94        let serialize_pipeline = Some(serialize_bincode::<T>(false));
95
96        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
97
98        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
99            other.clone(),
100            HydroNode::Network {
101                serialize_fn: serialize_pipeline.map(|e| e.into()),
102                instantiate_fn: DebugInstantiate::Building,
103                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
104                input: Box::new(self.ir_node.into_inner()),
105                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
106            },
107        );
108
109        raw_stream.into_keyed()
110    }
111
112    pub fn broadcast_bincode<L2: 'a>(
113        self,
114        other: &Cluster<'a, L2>,
115        nondet_membership: NonDet,
116    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
117    where
118        T: Clone + Serialize + DeserializeOwned,
119    {
120        let ids = track_membership(self.location.source_cluster_members(other));
121        let join_tick = self.location.tick();
122        let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
123
124        current_members
125            .weaker_retries()
126            .assume_ordering::<TotalOrder>(
127                nondet!(/** we send to each member independently, order does not matter */),
128            )
129            .cross_product_nested_loop(
130                self.batch(&join_tick, nondet_membership)
131                    .assume_ordering::<TotalOrder>(
132                        nondet!(/** we weaken the ordering back later */),
133                    ),
134            )
135            .assume_ordering::<O>(nondet!(/** strictly weaker than TotalOrder */))
136            .all_ticks()
137            .demux_bincode(other)
138    }
139}
140
141impl<'a, T, L, L2, B, O, R> Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R> {
142    pub fn demux_bincode(
143        self,
144        other: &Cluster<'a, L2>,
145    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
146    where
147        T: Serialize + DeserializeOwned,
148    {
149        self.into_keyed().demux_bincode(other)
150    }
151}
152
153impl<'a, T, L, L2, B, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R> {
154    pub fn demux_bincode(
155        self,
156        other: &Cluster<'a, L2>,
157    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
158    where
159        T: Serialize + DeserializeOwned,
160    {
161        let serialize_pipeline = Some(serialize_bincode::<T>(true));
162
163        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
164
165        Stream::new(
166            other.clone(),
167            HydroNode::Network {
168                serialize_fn: serialize_pipeline.map(|e| e.into()),
169                instantiate_fn: DebugInstantiate::Building,
170                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
171                input: Box::new(self.underlying.ir_node.into_inner()),
172                metadata: other.new_node_metadata::<T>(),
173            },
174        )
175    }
176}
177
178impl<'a, T, L, B> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
179    pub fn round_robin_bincode<L2: 'a>(
180        self,
181        other: &Cluster<'a, L2>,
182        nondet_membership: NonDet,
183    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
184    where
185        T: Serialize + DeserializeOwned,
186    {
187        let ids = track_membership(self.location.source_cluster_members(other));
188        let join_tick = self.location.tick();
189        let current_members = ids
190            .snapshot(&join_tick, nondet_membership)
191            .keys()
192            .assume_ordering(
193                nondet!(/** safe to assume ordering because each output is independent */),
194            )
195            .collect_vec();
196
197        self.enumerate()
198            .batch(&join_tick, nondet_membership)
199            .cross_singleton(current_members)
200            .map(q!(|(data, members)| (
201                members[data.0 % members.len()],
202                data.1
203            )))
204            .all_ticks()
205            .demux_bincode(other)
206    }
207}
208
209impl<'a, T, L, L2, B, O, R> Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R> {
210    pub fn demux_bincode(
211        self,
212        other: &Cluster<'a, L2>,
213    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
214    where
215        T: Serialize + DeserializeOwned,
216    {
217        self.into_keyed().demux_bincode(other)
218    }
219}
220
221impl<'a, T, L, L2, B, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R> {
222    pub fn demux_bincode(
223        self,
224        other: &Cluster<'a, L2>,
225    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
226    where
227        T: Serialize + DeserializeOwned,
228    {
229        let serialize_pipeline = Some(serialize_bincode::<T>(true));
230
231        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
232
233        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
234            other.clone(),
235            HydroNode::Network {
236                serialize_fn: serialize_pipeline.map(|e| e.into()),
237                instantiate_fn: DebugInstantiate::Building,
238                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
239                input: Box::new(self.underlying.ir_node.into_inner()),
240                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
241            },
242        );
243
244        raw_stream.into_keyed()
245    }
246}
247
248impl<'a, T, L, B, O, R> Stream<T, Process<'a, L>, B, O, R> {
249    pub fn send_bincode<L2>(
250        self,
251        other: &Process<'a, L2>,
252    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
253    where
254        T: Serialize + DeserializeOwned,
255    {
256        let serialize_pipeline = Some(serialize_bincode::<T>(false));
257
258        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
259
260        Stream::new(
261            other.clone(),
262            HydroNode::Network {
263                serialize_fn: serialize_pipeline.map(|e| e.into()),
264                instantiate_fn: DebugInstantiate::Building,
265                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
266                input: Box::new(self.ir_node.into_inner()),
267                metadata: other.new_node_metadata::<T>(),
268            },
269        )
270    }
271
272    pub fn broadcast_bincode<L2: 'a>(
273        self,
274        other: &Cluster<'a, L2>,
275        nondet_membership: NonDet,
276    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
277    where
278        T: Clone + Serialize + DeserializeOwned,
279    {
280        let ids = track_membership(self.location.source_cluster_members(other));
281        let join_tick = self.location.tick();
282        let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
283
284        current_members
285            .weaker_retries()
286            .assume_ordering::<TotalOrder>(
287                nondet!(/** we send to each member independently, order does not matter */),
288            )
289            .cross_product_nested_loop(
290                self.batch(&join_tick, nondet_membership)
291                    .assume_ordering::<TotalOrder>(
292                        nondet!(/** we weaken the ordering back later */),
293                    ),
294            )
295            .assume_ordering::<O>(nondet!(/** strictly weaker than TotalOrder */))
296            .all_ticks()
297            .demux_bincode(other)
298    }
299
300    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
301    where
302        T: Serialize + DeserializeOwned,
303    {
304        let serialize_pipeline = Some(serialize_bincode::<T>(false));
305
306        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
307
308        let external_key = flow_state_borrow.next_external_out;
309        flow_state_borrow.next_external_out += 1;
310
311        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
312
313        leaves.push(HydroLeaf::SendExternal {
314            to_external_id: other.id,
315            to_key: external_key,
316            to_many: false,
317            serialize_fn: serialize_pipeline.map(|e| e.into()),
318            instantiate_fn: DebugInstantiate::Building,
319            input: Box::new(HydroNode::Unpersist {
320                inner: Box::new(self.ir_node.into_inner()),
321                metadata: self.location.new_node_metadata::<T>(),
322            }),
323        });
324
325        ExternalBincodeStream {
326            process_id: other.id,
327            port_id: external_key,
328            _phantom: PhantomData,
329        }
330    }
331}