1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::ir::{DebugExpr, HydroLeaf, HydroNode, HydroSource};
12
13#[derive(Debug, Clone)]
15pub enum NodeLabel {
16 Static(String),
18 WithExprs {
20 op_name: String,
21 exprs: Vec<DebugExpr>,
22 },
23}
24
25impl NodeLabel {
26 pub fn static_label(s: String) -> Self {
28 Self::Static(s)
29 }
30
31 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
33 Self::WithExprs { op_name, exprs }
34 }
35}
36
37impl std::fmt::Display for NodeLabel {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Static(s) => write!(f, "{}", s),
41 Self::WithExprs { op_name, exprs } => {
42 if exprs.is_empty() {
43 write!(f, "{}()", op_name)
44 } else {
45 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
46 write!(f, "{}({})", op_name, expr_strs.join(", "))
47 }
48 }
49 }
50 }
51}
52
53pub struct IndentedGraphWriter<W> {
56 pub write: W,
57 pub indent: usize,
58 pub config: HydroWriteConfig,
59}
60
61impl<W> IndentedGraphWriter<W> {
62 pub fn new(write: W) -> Self {
64 Self {
65 write,
66 indent: 0,
67 config: HydroWriteConfig::default(),
68 }
69 }
70
71 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
73 Self {
74 write,
75 indent: 0,
76 config: config.clone(),
77 }
78 }
79}
80
81impl<W: Write> IndentedGraphWriter<W> {
82 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
84 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
85 }
86}
87
/// Error type for graph writing; an alias for [`std::fmt::Error`].
pub type GraphWriteError = std::fmt::Error;
90
91#[auto_impl(&mut, Box)]
93pub trait HydroGraphWrite {
94 type Err: Error;
96
97 fn write_prologue(&mut self) -> Result<(), Self::Err>;
99
100 fn write_node_definition(
102 &mut self,
103 node_id: usize,
104 node_label: &NodeLabel,
105 node_type: HydroNodeType,
106 location_id: Option<usize>,
107 location_type: Option<&str>,
108 ) -> Result<(), Self::Err>;
109
110 fn write_edge(
112 &mut self,
113 src_id: usize,
114 dst_id: usize,
115 edge_type: HydroEdgeType,
116 label: Option<&str>,
117 ) -> Result<(), Self::Err>;
118
119 fn write_location_start(
121 &mut self,
122 location_id: usize,
123 location_type: &str,
124 ) -> Result<(), Self::Err>;
125
126 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
128
129 fn write_location_end(&mut self) -> Result<(), Self::Err>;
131
132 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
134}
135
/// Category of a graph node, passed to the writer with each node definition.
#[derive(Clone, Copy, Debug)]
pub enum HydroNodeType {
    /// Data sources (source, external input, cycle source).
    Source,
    /// Single-stream operators such as map/filter, plus chain and placeholder nodes.
    Transform,
    /// Operators that combine two inputs (join, cross product, difference, anti-join).
    Join,
    /// Operators such as fold, reduce, scan and sort.
    Aggregation,
    /// Network operators.
    Network,
    /// Terminal nodes built from leaves.
    Sink,
    /// Tee nodes, whose output is shared by several consumers.
    Tee,
}
147
/// Category of a graph edge, passed to the writer with each edge.
#[derive(Clone, Copy, Debug)]
pub enum HydroEdgeType {
    /// Ordinary data flow between operators.
    Stream,
    /// Edge feeding a persist node.
    Persistent,
    /// Edge feeding a network node.
    Network,
    /// Edge feeding a cycle sink.
    Cycle,
}
156
/// Options controlling how a graph is rendered.
#[derive(Clone, Debug)]
pub struct HydroWriteConfig {
    /// Not read in this module; presumably toggles metadata output in the
    /// individual writers (confirm against them).
    pub show_metadata: bool,
    /// When `true`, nodes are additionally grouped by location through the
    /// `write_location_start` / `write_node` / `write_location_end` calls.
    pub show_location_groups: bool,
    /// Not read in this module; presumably selects abbreviated node labels in
    /// the individual writers (confirm against them).
    pub use_short_labels: bool,
    /// `(id, name)` pairs; presumably display names for process locations.
    pub process_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs; presumably display names for cluster locations.
    pub cluster_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs; presumably display names for external locations.
    pub external_id_name: Vec<(usize, String)>,
}
167
168impl Default for HydroWriteConfig {
169 fn default() -> Self {
170 Self {
171 show_metadata: false,
172 show_location_groups: true,
173 use_short_labels: true, process_id_name: vec![],
175 cluster_id_name: vec![],
176 external_id_name: vec![],
177 }
178 }
179}
180
181#[derive(Debug, Default)]
183pub struct HydroGraphStructure {
184 pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, pub locations: HashMap<usize, String>, pub next_node_id: usize,
188}
189
190impl HydroGraphStructure {
191 pub fn new() -> Self {
192 Self::default()
193 }
194
195 pub fn add_node(
196 &mut self,
197 label: NodeLabel,
198 node_type: HydroNodeType,
199 location: Option<usize>,
200 ) -> usize {
201 let node_id = self.next_node_id;
202 self.next_node_id += 1;
203 self.nodes.insert(node_id, (label, node_type, location));
204 node_id
205 }
206
207 pub fn add_edge(
208 &mut self,
209 src: usize,
210 dst: usize,
211 edge_type: HydroEdgeType,
212 label: Option<String>,
213 ) {
214 self.edges.push((src, dst, edge_type, label));
215 }
216
217 pub fn add_location(&mut self, location_id: usize, location_type: String) {
218 self.locations.insert(location_id, location_type);
219 }
220}
221
/// Returns the lower-cased operator name of a printed node: everything before
/// the first `(` in `full_label`, or the whole label when it has no `(`.
pub fn extract_op_name(full_label: String) -> String {
    // `find` returns a byte offset on a char boundary, so the slice is safe.
    let op_name = match full_label.find('(') {
        Some(paren_at) => &full_label[..paren_at],
        None => full_label.as_str(),
    };
    op_name.to_lowercase()
}
231
/// Derives a short display label from a full printed node label.
///
/// The operator name (text before the first `(`, lower-cased) is returned
/// as-is, except for two special cases that look at the rest of the label:
///
/// * `source` becomes `source_iter`, `source_stream`, `external_network` or
///   `spin` when the label mentions `Iter`, `Stream`, `ExternalNetwork` or
///   `Spin` (checked in that order); otherwise it stays `source`.
/// * `network` becomes `network(recv)` when the label mentions `deser`,
///   `network(send)` when it only mentions `ser`, and stays `network` otherwise.
pub fn extract_short_label(full_label: &str) -> String {
    // `split` always yields at least one piece, so the operator name is always
    // available; no fallback path is needed.
    let op_name = full_label.split('(').next().unwrap_or(full_label);
    let base_name = op_name.to_lowercase();

    let special_case = match base_name.as_str() {
        "source" => {
            if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            }
        }
        "network" => {
            // "deser" must be tested first because it also contains "ser".
            if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            }
        }
        _ => return base_name,
    };
    special_case.to_string()
}
273
274fn extract_location_id(metadata: &crate::ir::HydroIrMetadata) -> (Option<usize>, Option<String>) {
276 use crate::location::LocationId;
277 match &metadata.location_kind {
278 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280 LocationId::Tick(_, inner) => match inner.as_ref() {
281 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
282 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
283 _ => (None, None),
284 },
285 }
286}
287
288fn setup_location(
290 structure: &mut HydroGraphStructure,
291 metadata: &crate::ir::HydroIrMetadata,
292) -> Option<usize> {
293 let (location_id, location_type) = extract_location_id(metadata);
294 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
295 structure.add_location(loc_id, loc_type);
296 }
297 location_id
298}
299
300impl HydroLeaf {
301 pub fn write_graph<W>(
303 &self,
304 mut graph_write: W,
305 config: &HydroWriteConfig,
306 ) -> Result<(), W::Err>
307 where
308 W: HydroGraphWrite,
309 {
310 let mut structure = HydroGraphStructure::new();
311 let mut seen_tees = HashMap::new();
312
313 let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
315
316 graph_write.write_prologue()?;
318
319 for (&node_id, (label, node_type, location)) in &structure.nodes {
321 let (location_id, location_type) = if let Some(loc_id) = location {
322 (
323 Some(*loc_id),
324 structure.locations.get(loc_id).map(|s| s.as_str()),
325 )
326 } else {
327 (None, None)
328 };
329
330 graph_write.write_node_definition(
333 node_id,
334 label,
335 *node_type,
336 location_id,
337 location_type,
338 )?;
339 }
340
341 if config.show_location_groups {
343 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
344 for (&node_id, (_, _, location)) in &structure.nodes {
345 if let Some(location_id) = location {
346 nodes_by_location
347 .entry(*location_id)
348 .or_default()
349 .push(node_id);
350 }
351 }
352
353 for (&location_id, node_ids) in &nodes_by_location {
354 if let Some(location_type) = structure.locations.get(&location_id) {
355 graph_write.write_location_start(location_id, location_type)?;
356 for &node_id in node_ids {
357 graph_write.write_node(node_id)?;
358 }
359 graph_write.write_location_end()?;
360 }
361 }
362 }
363
364 for (src_id, dst_id, edge_type, label) in &structure.edges {
366 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
367 }
368
369 graph_write.write_epilogue()?;
370 Ok(())
371 }
372
373 pub fn build_graph_structure(
375 &self,
376 structure: &mut HydroGraphStructure,
377 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
378 config: &HydroWriteConfig,
379 ) -> usize {
380 fn build_sink_node(
382 structure: &mut HydroGraphStructure,
383 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
384 config: &HydroWriteConfig,
385 input: &HydroNode,
386 metadata: Option<&crate::ir::HydroIrMetadata>,
387 label: NodeLabel,
388 edge_type: HydroEdgeType,
389 ) -> usize {
390 let input_id = input.build_graph_structure(structure, seen_tees, config);
391 let location_id = metadata.and_then(|m| setup_location(structure, m));
392 let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
393 structure.add_edge(input_id, sink_id, edge_type, None);
394 sink_id
395 }
396
397 match self {
398 HydroLeaf::ForEach { f, input, metadata } => build_sink_node(
400 structure,
401 seen_tees,
402 config,
403 input,
404 Some(metadata),
405 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
406 HydroEdgeType::Stream,
407 ),
408
409 HydroLeaf::SendExternal {
410 to_external_id,
411 to_key,
412 input,
413 ..
414 } => build_sink_node(
415 structure,
416 seen_tees,
417 config,
418 input,
419 None,
420 NodeLabel::with_exprs(
421 format!("send_external({}:{})", to_external_id, to_key),
422 vec![],
423 ),
424 HydroEdgeType::Stream,
425 ),
426
427 HydroLeaf::DestSink {
428 sink,
429 input,
430 metadata,
431 } => build_sink_node(
432 structure,
433 seen_tees,
434 config,
435 input,
436 Some(metadata),
437 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
438 HydroEdgeType::Stream,
439 ),
440
441 HydroLeaf::CycleSink {
443 ident,
444 input,
445 metadata,
446 ..
447 } => build_sink_node(
448 structure,
449 seen_tees,
450 config,
451 input,
452 Some(metadata),
453 NodeLabel::static_label(format!("cycle_sink({})", ident)),
454 HydroEdgeType::Cycle,
455 ),
456 }
457 }
458}
459
460impl HydroNode {
461 pub fn build_graph_structure(
463 &self,
464 structure: &mut HydroGraphStructure,
465 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
466 config: &HydroWriteConfig,
467 ) -> usize {
468 use crate::location::LocationId;
469
470 struct TransformParams<'a> {
474 structure: &'a mut HydroGraphStructure,
475 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
476 config: &'a HydroWriteConfig,
477 input: &'a HydroNode,
478 metadata: &'a crate::ir::HydroIrMetadata,
479 op_name: String,
480 node_type: HydroNodeType,
481 edge_type: HydroEdgeType,
482 }
483
484 fn build_simple_transform(params: TransformParams) -> usize {
486 let input_id = params.input.build_graph_structure(
487 params.structure,
488 params.seen_tees,
489 params.config,
490 );
491 let location_id = setup_location(params.structure, params.metadata);
492 let node_id = params.structure.add_node(
493 NodeLabel::Static(params.op_name.to_string()),
494 params.node_type,
495 location_id,
496 );
497 params
498 .structure
499 .add_edge(input_id, node_id, params.edge_type, None);
500 node_id
501 }
502
503 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
505 let input_id = params.input.build_graph_structure(
506 params.structure,
507 params.seen_tees,
508 params.config,
509 );
510 let location_id = setup_location(params.structure, params.metadata);
511 let node_id = params.structure.add_node(
512 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
513 params.node_type,
514 location_id,
515 );
516 params
517 .structure
518 .add_edge(input_id, node_id, params.edge_type, None);
519 node_id
520 }
521
522 fn build_dual_expr_transform(
524 params: TransformParams,
525 expr1: &DebugExpr,
526 expr2: &DebugExpr,
527 ) -> usize {
528 let input_id = params.input.build_graph_structure(
529 params.structure,
530 params.seen_tees,
531 params.config,
532 );
533 let location_id = setup_location(params.structure, params.metadata);
534 let node_id = params.structure.add_node(
535 NodeLabel::with_exprs(
536 params.op_name.to_string(),
537 vec![expr1.clone(), expr2.clone()],
538 ),
539 params.node_type,
540 location_id,
541 );
542 params
543 .structure
544 .add_edge(input_id, node_id, params.edge_type, None);
545 node_id
546 }
547
548 fn build_source_node(
550 structure: &mut HydroGraphStructure,
551 metadata: &crate::ir::HydroIrMetadata,
552 label: String,
553 ) -> usize {
554 let location_id = setup_location(structure, metadata);
555 structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
556 }
557
558 match self {
559 HydroNode::Placeholder => structure.add_node(
560 NodeLabel::Static("PLACEHOLDER".to_string()),
561 HydroNodeType::Transform,
562 None,
563 ),
564
565 HydroNode::Source {
566 source, metadata, ..
567 } => {
568 let label = match source {
569 HydroSource::Stream(expr) => format!("source_stream({})", expr),
570 HydroSource::ExternalNetwork() => "external_network()".to_string(),
571 HydroSource::Iter(expr) => format!("source_iter({})", expr),
572 HydroSource::Spin() => "spin()".to_string(),
573 };
574 build_source_node(structure, metadata, label)
575 }
576
577 HydroNode::ExternalInput {
578 from_external_id,
579 from_key,
580 metadata,
581 ..
582 } => build_source_node(
583 structure,
584 metadata,
585 format!("external_input({}:{})", from_external_id, from_key),
586 ),
587
588 HydroNode::CycleSource {
589 ident, metadata, ..
590 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
591
592 HydroNode::Tee { inner, metadata } => {
593 let ptr = inner.as_ptr();
594 if let Some(&existing_id) = seen_tees.get(&ptr) {
595 return existing_id;
596 }
597
598 let input_id = inner
599 .0
600 .borrow()
601 .build_graph_structure(structure, seen_tees, config);
602 let location_id = setup_location(structure, metadata);
603
604 let tee_id = structure.add_node(
605 NodeLabel::Static(extract_op_name(self.print_root())),
606 HydroNodeType::Tee,
607 location_id,
608 );
609
610 seen_tees.insert(ptr, tee_id);
611
612 structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
613
614 tee_id
615 }
616
617 HydroNode::Delta { inner, metadata }
619 | HydroNode::DeferTick {
620 input: inner,
621 metadata,
622 }
623 | HydroNode::Enumerate {
624 input: inner,
625 metadata,
626 ..
627 }
628 | HydroNode::Unique {
629 input: inner,
630 metadata,
631 }
632 | HydroNode::ResolveFutures {
633 input: inner,
634 metadata,
635 }
636 | HydroNode::ResolveFuturesOrdered {
637 input: inner,
638 metadata,
639 } => build_simple_transform(TransformParams {
640 structure,
641 seen_tees,
642 config,
643 input: inner,
644 metadata,
645 op_name: extract_op_name(self.print_root()),
646 node_type: HydroNodeType::Transform,
647 edge_type: HydroEdgeType::Stream,
648 }),
649
650 HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
652 structure,
653 seen_tees,
654 config,
655 input: inner,
656 metadata,
657 op_name: extract_op_name(self.print_root()),
658 node_type: HydroNodeType::Transform,
659 edge_type: HydroEdgeType::Persistent,
660 }),
661
662 HydroNode::Sort {
664 input: inner,
665 metadata,
666 } => build_simple_transform(TransformParams {
667 structure,
668 seen_tees,
669 config,
670 input: inner,
671 metadata,
672 op_name: extract_op_name(self.print_root()),
673 node_type: HydroNodeType::Aggregation,
674 edge_type: HydroEdgeType::Stream,
675 }),
676
677 HydroNode::Map { f, input, metadata }
679 | HydroNode::Filter { f, input, metadata }
680 | HydroNode::FlatMap { f, input, metadata }
681 | HydroNode::FilterMap { f, input, metadata }
682 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
683 TransformParams {
684 structure,
685 seen_tees,
686 config,
687 input,
688 metadata,
689 op_name: extract_op_name(self.print_root()),
690 node_type: HydroNodeType::Transform,
691 edge_type: HydroEdgeType::Stream,
692 },
693 f,
694 ),
695
696 HydroNode::Reduce { f, input, metadata }
698 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
699 TransformParams {
700 structure,
701 seen_tees,
702 config,
703 input,
704 metadata,
705 op_name: extract_op_name(self.print_root()),
706 node_type: HydroNodeType::Aggregation,
707 edge_type: HydroEdgeType::Stream,
708 },
709 f,
710 ),
711
712 HydroNode::Join {
714 left,
715 right,
716 metadata,
717 }
718 | HydroNode::CrossProduct {
719 left,
720 right,
721 metadata,
722 }
723 | HydroNode::CrossSingleton {
724 left,
725 right,
726 metadata,
727 } => {
728 let left_id = left.build_graph_structure(structure, seen_tees, config);
729 let right_id = right.build_graph_structure(structure, seen_tees, config);
730 let location_id = setup_location(structure, metadata);
731 let node_id = structure.add_node(
732 NodeLabel::Static(extract_op_name(self.print_root())),
733 HydroNodeType::Join,
734 location_id,
735 );
736 structure.add_edge(
737 left_id,
738 node_id,
739 HydroEdgeType::Stream,
740 Some("left".to_string()),
741 );
742 structure.add_edge(
743 right_id,
744 node_id,
745 HydroEdgeType::Stream,
746 Some("right".to_string()),
747 );
748 node_id
749 }
750
751 HydroNode::Difference {
753 pos: left,
754 neg: right,
755 metadata,
756 }
757 | HydroNode::AntiJoin {
758 pos: left,
759 neg: right,
760 metadata,
761 } => {
762 let left_id = left.build_graph_structure(structure, seen_tees, config);
763 let right_id = right.build_graph_structure(structure, seen_tees, config);
764 let location_id = setup_location(structure, metadata);
765 let node_id = structure.add_node(
766 NodeLabel::Static(extract_op_name(self.print_root())),
767 HydroNodeType::Join,
768 location_id,
769 );
770 structure.add_edge(
771 left_id,
772 node_id,
773 HydroEdgeType::Stream,
774 Some("pos".to_string()),
775 );
776 structure.add_edge(
777 right_id,
778 node_id,
779 HydroEdgeType::Stream,
780 Some("neg".to_string()),
781 );
782 node_id
783 }
784
785 HydroNode::Fold {
787 init,
788 acc,
789 input,
790 metadata,
791 }
792 | HydroNode::FoldKeyed {
793 init,
794 acc,
795 input,
796 metadata,
797 }
798 | HydroNode::Scan {
799 init,
800 acc,
801 input,
802 metadata,
803 } => {
804 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
807 TransformParams {
808 structure,
809 seen_tees,
810 config,
811 input,
812 metadata,
813 op_name: extract_op_name(self.print_root()),
814 node_type,
815 edge_type: HydroEdgeType::Stream,
816 },
817 init,
818 acc,
819 )
820 }
821
822 HydroNode::ReduceKeyedWatermark {
824 f,
825 input,
826 watermark,
827 metadata,
828 } => {
829 let input_id = input.build_graph_structure(structure, seen_tees, config);
830 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
831 let location_id = setup_location(structure, metadata);
832 let join_node_id = structure.add_node(
833 NodeLabel::Static(extract_op_name(self.print_root())),
834 HydroNodeType::Join,
835 location_id,
836 );
837 structure.add_edge(
838 input_id,
839 join_node_id,
840 HydroEdgeType::Stream,
841 Some("input".to_string()),
842 );
843 structure.add_edge(
844 watermark_id,
845 join_node_id,
846 HydroEdgeType::Stream,
847 Some("watermark".to_string()),
848 );
849
850 let node_id = structure.add_node(
851 NodeLabel::with_exprs(
852 extract_op_name(self.print_root()).to_string(),
853 vec![f.clone()],
854 ),
855 HydroNodeType::Aggregation,
856 location_id,
857 );
858 structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
859 node_id
860 }
861
862 HydroNode::Network {
863 serialize_fn,
864 deserialize_fn,
865 input,
866 metadata,
867 ..
868 } => {
869 let input_id = input.build_graph_structure(structure, seen_tees, config);
870 let _from_location_id = setup_location(structure, metadata);
871
872 let to_location_id = match metadata.location_kind.root() {
873 LocationId::Process(id) => {
874 structure.add_location(*id, "Process".to_string());
875 Some(*id)
876 }
877 LocationId::Cluster(id) => {
878 structure.add_location(*id, "Cluster".to_string());
879 Some(*id)
880 }
881 _ => None,
882 };
883
884 let mut label = "network(".to_string();
885 if serialize_fn.is_some() {
886 label.push_str("ser");
887 }
888 if deserialize_fn.is_some() {
889 if serialize_fn.is_some() {
890 label.push_str(" + ");
891 }
892 label.push_str("deser");
893 }
894 label.push(')');
895
896 let network_id = structure.add_node(
897 NodeLabel::Static(label),
898 HydroNodeType::Network,
899 to_location_id,
900 );
901 structure.add_edge(
902 input_id,
903 network_id,
904 HydroEdgeType::Network,
905 Some(format!("to {:?}", to_location_id)),
906 );
907 network_id
908 }
909
910 HydroNode::Unpersist { inner, .. } => {
912 inner.build_graph_structure(structure, seen_tees, config)
914 }
915
916 HydroNode::Chain {
917 first,
918 second,
919 metadata,
920 } => {
921 let first_id = first.build_graph_structure(structure, seen_tees, config);
922 let second_id = second.build_graph_structure(structure, seen_tees, config);
923 let location_id = setup_location(structure, metadata);
924 let chain_id = structure.add_node(
925 NodeLabel::Static(extract_op_name(self.print_root())),
926 HydroNodeType::Transform,
927 location_id,
928 );
929 structure.add_edge(
930 first_id,
931 chain_id,
932 HydroEdgeType::Stream,
933 Some("first".to_string()),
934 );
935 structure.add_edge(
936 second_id,
937 chain_id,
938 HydroEdgeType::Stream,
939 Some("second".to_string()),
940 );
941 chain_id
942 }
943
944 HydroNode::Counter {
945 tag: _,
946 duration,
947 input,
948 metadata,
949 } => build_single_expr_transform(
950 TransformParams {
951 structure,
952 seen_tees,
953 config,
954 input,
955 metadata,
956 op_name: extract_op_name(self.print_root()),
957 node_type: HydroNodeType::Transform,
958 edge_type: HydroEdgeType::Stream,
959 },
960 duration,
961 ),
962 }
963 }
964}
965
/// Generates `pub fn $render_fn(leaves, config) -> String`, which runs the
/// given `$write_fn` against a fresh `String` and returns the result.
///
/// The generated function panics if `$write_fn` returns an error.
macro_rules! render_hydro_ir {
    ($render_fn:ident, $write_fn:ident) => {
        pub fn $render_fn(leaves: &[HydroLeaf], config: &HydroWriteConfig) -> String {
            let mut rendered = String::new();
            $write_fn(&mut rendered, leaves, config).unwrap();
            rendered
        }
    };
}
977
/// Generates `pub fn $write_fn(output, leaves, config) -> std::fmt::Result`.
///
/// The generated function builds a writer of type `$writer` by calling
/// `$make_writer(output, config)` and renders `leaves` through it with
/// `write_hydro_ir_graph`.
macro_rules! write_hydro_ir {
    ($write_fn:ident, $writer:ty, $make_writer:expr) => {
        pub fn $write_fn(
            output: impl std::fmt::Write,
            leaves: &[HydroLeaf],
            config: &HydroWriteConfig,
        ) -> std::fmt::Result {
            let mut graph_write: $writer = $make_writer(output, config);
            write_hydro_ir_graph(&mut graph_write, leaves, config)
        }
    };
}
991
992render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
993write_hydro_ir!(
994 write_hydro_ir_mermaid,
995 HydroMermaid<_>,
996 HydroMermaid::new_with_config
997);
998
999render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1000write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1001
1002render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1003write_hydro_ir!(
1004 write_hydro_ir_reactflow,
1005 HydroReactFlow<_>,
1006 HydroReactFlow::new
1007);
1008
1009fn write_hydro_ir_graph<W>(
1010 mut graph_write: W,
1011 leaves: &[HydroLeaf],
1012 config: &HydroWriteConfig,
1013) -> Result<(), W::Err>
1014where
1015 W: HydroGraphWrite,
1016{
1017 let mut structure = HydroGraphStructure::new();
1018 let mut seen_tees = HashMap::new();
1019
1020 for leaf in leaves {
1022 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1023 }
1024
1025 graph_write.write_prologue()?;
1027
1028 for (&node_id, (label, node_type, location)) in &structure.nodes {
1029 let (location_id, location_type) = if let Some(loc_id) = location {
1030 (
1031 Some(*loc_id),
1032 structure.locations.get(loc_id).map(|s| s.as_str()),
1033 )
1034 } else {
1035 (None, None)
1036 };
1037 graph_write.write_node_definition(
1038 node_id,
1039 label,
1040 *node_type,
1041 location_id,
1042 location_type,
1043 )?;
1044 }
1045
1046 if config.show_location_groups {
1047 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1048 for (&node_id, (_, _, location)) in &structure.nodes {
1049 if let Some(location_id) = location {
1050 nodes_by_location
1051 .entry(*location_id)
1052 .or_default()
1053 .push(node_id);
1054 }
1055 }
1056
1057 for (&location_id, node_ids) in &nodes_by_location {
1058 if let Some(location_type) = structure.locations.get(&location_id) {
1059 graph_write.write_location_start(location_id, location_type)?;
1060 for &node_id in node_ids {
1061 graph_write.write_node(node_id)?;
1062 }
1063 graph_write.write_location_end()?;
1064 }
1065 }
1066 }
1067
1068 for (src_id, dst_id, edge_type, label) in &structure.edges {
1069 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1070 }
1071
1072 graph_write.write_epilogue()?;
1073 Ok(())
1074}