1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// A closure expression together with the singleton references it uses.
///
/// Every entry of `singleton_refs` is expected to be a `HydroNode::Reference`;
/// the `Clone` implementation and `emit_tokens` panic on any other variant.
pub struct ClosureExpr {
    /// The expression itself.
    pub(crate) expr: DebugExpr,
    /// Referenced nodes, each paired with a flag marking the reference as mutable.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
/// Borrowed view over a slice of `(node, is_mut)` pairs, used to serialize them as a sequence.
struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn has_mut_ref(&self) -> bool {
158 self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159 }
160
161 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162 Self {
163 expr: self.expr.clone(),
164 singleton_refs: self
165 .singleton_refs
166 .iter()
167 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168 .collect(),
169 }
170 }
171
172 pub fn transform_children(
173 &mut self,
174 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175 seen_tees: &mut SeenSharedNodes,
176 ) {
177 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178 transform(ref_node, seen_tees);
179 }
180 }
181
182 #[cfg(feature = "build")]
185 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186 if self.singleton_refs.is_empty() {
187 self.expr.0.to_token_stream()
188 } else {
189 assert!(
190 ident_stack.len() >= self.singleton_refs.len(),
191 "ident_stack has {} entries but expected at least {} for singleton_refs",
192 ident_stack.len(),
193 self.singleton_refs.len()
194 );
195 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197 let mut let_bindings = Vec::new();
198 for ((i, (ref_node, is_mut)), ref_ident) in
199 self.singleton_refs.iter().enumerate().zip(ref_idents)
200 {
201 let HydroNode::Reference { access_counter, .. } = ref_node else {
202 panic!("ClosureExpression expected references to `HydroNode::Reference`");
203 };
204 let group = access_counter.frozen_group();
205 let local_ident = handoff_ref_ident(i);
207 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209 let mut_token = is_mut.then(|| quote!(mut));
210 let binding = quote! {
211 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212 };
213 let_bindings.push(binding);
214 }
215
216 let expr = &self.expr.0;
217 quote! {
218 {
219 #( #let_bindings )*
220 #expr
221 }
222 }
223 }
224 }
225}
226
/// Newtype around a boxed `syn::Expr` that provides token-based `Debug`, a simplified
/// `Display`, and string-based serialization.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235 serializer.serialize_str(&self.to_string())
236 }
237}
238
239impl From<syn::Expr> for DebugExpr {
240 fn from(expr: syn::Expr) -> Self {
241 Self(Box::new(expr))
242 }
243}
244
245impl Deref for DebugExpr {
246 type Target = syn::Expr;
247
248 fn deref(&self) -> &Self::Target {
249 &self.0
250 }
251}
252
253impl ToTokens for DebugExpr {
254 fn to_tokens(&self, tokens: &mut TokenStream) {
255 self.0.to_tokens(tokens);
256 }
257}
258
259impl Debug for DebugExpr {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.to_token_stream())
262 }
263}
264
265impl Display for DebugExpr {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let original = self.0.as_ref().clone();
268 let simplified = simplify_q_macro(original);
269
270 write!(f, "q!({})", quote::quote!(#simplified))
273 }
274}
275
276fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279 && is_stageleft_runtime_support_call(&path_expr.path)
281 && let syn::Expr::Block(b) = &call.args[0]
282 && b.block.stmts.len() == 3
283 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284 {
286 let mut e = e.clone();
287 while let syn::Expr::Block(ref mut block) = e
288 && block.block.stmts.len() == 1
289 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290 {
291 e = inner_e;
292 }
293
294 e
295 } else {
296 expr
297 }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301 if let Some(last_segment) = path.segments.last() {
303 let fn_name = last_segment.ident.to_string();
304 path.segments.len() > 2
305 && path.segments[0].ident == "stageleft"
306 && path.segments[1].ident == "runtime_support"
307 && fn_name.contains("_type_hint")
308 } else {
309 false
310 }
311}
312
/// Newtype around a boxed `syn::Type` that provides token-based `Debug` output and
/// string-based serialization.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320 fn from(t: syn::Type) -> Self {
321 Self(Box::new(t))
322 }
323}
324
325impl Deref for DebugType {
326 type Target = syn::Type;
327
328 fn deref(&self) -> &Self::Target {
329 &self.0
330 }
331}
332
333impl ToTokens for DebugType {
334 fn to_tokens(&self, tokens: &mut TokenStream) {
335 self.0.to_tokens(tokens);
336 }
337}
338
339impl Debug for DebugType {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 write!(f, "{}", self.0.to_token_stream())
342 }
343}
344
345impl serde::Serialize for DebugType {
346 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348 }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352 backtrace: &Backtrace,
353 serializer: S,
354) -> Result<S::Ok, S::Error> {
355 match backtrace.format_span() {
356 Some(span) => serializer.serialize_some(&span),
357 None => serializer.serialize_none(),
358 }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362 ident: &syn::Ident,
363 serializer: S,
364) -> Result<S::Ok, S::Error> {
365 serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state of a network-like IR element.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; holds the sink/source expressions and the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375 match self {
376 DebugInstantiate::Building => {
377 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378 }
379 DebugInstantiate::Finalized(_) => {
380 panic!(
381 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382 )
383 }
384 }
385 }
386}
387
/// Payload of `DebugInstantiate::Finalized`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot connect callback; `connect_network` takes it out of the `Option` and calls it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402 fn from(f: DebugInstantiateFinalized) -> Self {
403 Self::Finalized(Box::new(f))
404 }
405}
406
407impl Debug for DebugInstantiate {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "<network instantiate>")
410 }
411}
412
impl Hash for DebugInstantiate {
    /// Writes nothing to the hasher, so this value never influences the hash of a
    /// containing structure.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
418
419impl Clone for DebugInstantiate {
420 fn clone(&self) -> Self {
421 match self {
422 DebugInstantiate::Building => DebugInstantiate::Building,
423 DebugInstantiate::Finalized(_) => {
424 panic!("DebugInstantiate::Finalized should not be cloned")
425 }
426 }
427 }
428}
429
/// Resolution state of a `HydroSource::ClusterMembers` source.
///
/// `HydroRoot::compile_network` moves `Uninit` to either `Stream` or `Tee`, and panics
/// if it meets a state that is already resolved.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved.
    Uninit,
    /// Resolved to a membership stream expression; `compile_network` assigns this the
    /// first time a given (location, cluster) pair is seen.
    Stream(DebugExpr),
    /// Assigned by `compile_network` when the same (location, cluster) pair was already
    /// seen; carries the location and the cluster's `LocationId`.
    Tee(LocationId, LocationId),
}
450
/// The kinds of source that can feed a `HydroNode::Source`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression (presumably one producing a stream — confirm).
    Stream(DebugExpr),
    /// Source fed by an external network; carries no data here.
    ExternalNetwork(),
    /// Source described by an expression (presumably one producing an iterator — confirm).
    Iter(DebugExpr),
    /// Carries no data here. NOTE(review): semantics are not visible in this chunk.
    Spin(),
    /// Membership of a cluster, plus its resolution state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded input identified by name; `compile_network` requires a `Stream` collection kind.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded input identified by name; `compile_network` requires a `Singleton` collection kind.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
/// Backend interface used to emit DFIR for IR constructs that depend on the backend.
///
/// The `SecondaryMap<LocationKey, FlatGraphBuilder>` implementation in this file shows
/// the baseline behavior: most methods simply forward `in_ident` to `out_ident`.
/// Other implementations are not visible here.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend flag; the `SecondaryMap` implementation returns `false`.
    /// NOTE(review): the exact meaning is not visible in this chunk.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR that connects `in_ident` (at `in_location`) to `out_ident`
    /// (at `out_location`) for a batch operation.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the DFIR that connects `in_ident` to `out_ident` when yielding from a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR that connects `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that connects `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR that connects `in_ident` to `out_ident` for a non-determinism observation.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the DFIR that merges `first_ident` and `second_ident` into `out_ident`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network link: a sender at `from` that drains `input_ident`
    /// into `sink`, and a receiver at `to` that defines `out_ident` from `source`.
    /// `serialize` / `deserialize`, when present, are applied on the respective side.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`, optionally
    /// applying `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sender at `on` that drains `input_ident` into `sink_expr`, optionally
    /// applying `serialize` first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a hook for `in_ident` and returns an identifier for it.
    /// The `SecondaryMap` implementation always returns `None`.
    /// NOTE(review): the meaning of the returned identifier is not visible in this chunk.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits the DFIR that connects `in_ident` to `out_ident` for a consistency assertion.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR that connects `in_ident` to `out_ident` for an observation tied to
    /// mutable access. NOTE(review): inferred from the name only — confirm.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );
}
603
/// Baseline `DfirBuilder`: one `FlatGraphBuilder` per root location key.
///
/// Most operations are plain pass-throughs (`out = in`); only batching of bounded
/// singleton-like collections, ordered merging and the network/external endpoints emit
/// anything more.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up the builder for the root of `location`, creating a default one if absent.
    ///
    /// # Panics
    ///
    /// Panics if the location's key has been removed from the backing slot map.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let key = location.root().key();
        self.entry(key).expect("location was removed").or_default()
    }

    /// Bounded singleton / optional / keyed-singleton inputs are routed through
    /// `persist::<'static>()`; every other input is forwarded unchanged.
    ///
    /// # Panics
    ///
    /// Panics if a persisted input is not at a top-level location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Forwards the input unchanged.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Forwards the input unchanged.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Forwards the input unchanged.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Forwards the input unchanged.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a `union()` whose input 0 is `first_ident` and input 1 is `second_ident`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sender at `from` (tagged `send<tag_id>`) and the receiver at `to`
    /// (tagged `recv<tag_id>`), inserting a `map` stage on either side when the
    /// corresponding (de)serializer is provided.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&format!("send{}", tag_id)),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&format!("send{}", tag_id)),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&format!("recv{}", tag_id)),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&format!("recv{}", tag_id)),
                );
            }
        }
    }

    /// Emits a `source_stream` receiver at `on`, tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&format!("recv{}", tag_id)),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&format!("recv{}", tag_id)),
                );
            }
        }
    }

    /// Emits a `dest_sink` sender at `on`, tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&format!("send{}", tag_id)),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&format!("send{}", tag_id)),
                );
            }
        }
    }

    /// This backend emits no fold hooks.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Forwards the input unchanged.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Forwards the input unchanged.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }
}
905
/// Either a DFIR builder backend or a pair of callbacks.
///
/// NOTE(review): how each variant is consumed is not visible in this chunk.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// A mutable borrow of a `DfirBuilder` backend.
    Builders(&'a mut dyn DfirBuilder),
    /// A callback taking a `HydroRoot` and one taking a `HydroNode`, each also given
    /// the statement-id counter.
    Callback(L, N),
}
915
/// A root of the IR: every variant consumes an `input` node and carries operator metadata.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Applies closure `f` to the input.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port on an external location; `instantiate_fn` is finalized
    /// by `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Embedded output identified by `ident`; `compile_network` requires the input to
    /// have a `Stream` collection kind on a process or cluster.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Carries only an input. NOTE(review): presumably discards it — confirm.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
957
958impl HydroRoot {
959 #[cfg(feature = "build")]
960 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
961 pub fn compile_network<'a, D>(
962 &mut self,
963 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
964 seen_tees: &mut SeenSharedNodes,
965 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
966 processes: &SparseSecondaryMap<LocationKey, D::Process>,
967 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
968 externals: &SparseSecondaryMap<LocationKey, D::External>,
969 env: &mut D::InstantiateEnv,
970 ) where
971 D: Deploy<'a>,
972 {
973 let refcell_extra_stmts = RefCell::new(extra_stmts);
974 let refcell_env = RefCell::new(env);
975 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
976 self.transform_bottom_up(
977 &mut |l| {
978 if let HydroRoot::SendExternal {
979 #[cfg(feature = "tokio")]
980 input,
981 #[cfg(feature = "tokio")]
982 to_external_key,
983 #[cfg(feature = "tokio")]
984 to_port_id,
985 #[cfg(feature = "tokio")]
986 to_many,
987 #[cfg(feature = "tokio")]
988 unpaired,
989 #[cfg(feature = "tokio")]
990 instantiate_fn,
991 ..
992 } = l
993 {
994 #[cfg(feature = "tokio")]
995 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
996 DebugInstantiate::Building => {
997 let to_node = externals
998 .get(*to_external_key)
999 .unwrap_or_else(|| {
1000 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1001 })
1002 .clone();
1003
1004 match input.metadata().location_id.root() {
1005 &LocationId::Process(process_key) => {
1006 if *to_many {
1007 (
1008 (
1009 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1010 parse_quote!(DUMMY),
1011 ),
1012 Box::new(|| {}) as Box<dyn FnOnce()>,
1013 )
1014 } else {
1015 let from_node = processes
1016 .get(process_key)
1017 .unwrap_or_else(|| {
1018 panic!("A process used in the graph was not instantiated: {}", process_key)
1019 })
1020 .clone();
1021
1022 let sink_port = from_node.next_port();
1023 let source_port = to_node.next_port();
1024
1025 if *unpaired {
1026 use stageleft::quote_type;
1027 use tokio_util::codec::LengthDelimitedCodec;
1028
1029 to_node.register(*to_port_id, source_port.clone());
1030
1031 let _ = D::e2o_source(
1032 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1033 &to_node, &source_port,
1034 &from_node, &sink_port,
1035 "e_type::<LengthDelimitedCodec>(),
1036 format!("{}_{}", *to_external_key, *to_port_id)
1037 );
1038 }
1039
1040 (
1041 (
1042 D::o2e_sink(
1043 &from_node,
1044 &sink_port,
1045 &to_node,
1046 &source_port,
1047 format!("{}_{}", *to_external_key, *to_port_id)
1048 ),
1049 parse_quote!(DUMMY),
1050 ),
1051 if *unpaired {
1052 D::e2o_connect(
1053 &to_node,
1054 &source_port,
1055 &from_node,
1056 &sink_port,
1057 *to_many,
1058 NetworkHint::Auto,
1059 )
1060 } else {
1061 Box::new(|| {}) as Box<dyn FnOnce()>
1062 },
1063 )
1064 }
1065 }
1066 LocationId::Cluster(cluster_key) => {
1067 let from_node = clusters
1068 .get(*cluster_key)
1069 .unwrap_or_else(|| {
1070 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1071 })
1072 .clone();
1073
1074 let sink_port = from_node.next_port();
1075 let source_port = to_node.next_port();
1076
1077 if *unpaired {
1078 to_node.register(*to_port_id, source_port.clone());
1079 }
1080
1081 (
1082 (
1083 D::m2e_sink(
1084 &from_node,
1085 &sink_port,
1086 &to_node,
1087 &source_port,
1088 format!("{}_{}", *to_external_key, *to_port_id)
1089 ),
1090 parse_quote!(DUMMY),
1091 ),
1092 Box::new(|| {}) as Box<dyn FnOnce()>,
1093 )
1094 }
1095 _ => panic!()
1096 }
1097 },
1098
1099 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1100 };
1101
1102 #[cfg(not(feature = "tokio"))]
1103 {
1104 panic!("Cannot instantiate external inputs without tokio");
1105 };
1106
1107 #[cfg(feature = "tokio")]
1108 {
1109 *instantiate_fn = DebugInstantiateFinalized {
1110 sink: sink_expr,
1111 source: source_expr,
1112 connect_fn: Some(connect_fn),
1113 }
1114 .into();
1115 };
1116 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1117 let element_type = match &input.metadata().collection_kind {
1118 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1119 _ => panic!("Embedded output must have Stream collection kind"),
1120 };
1121 let location_key = match input.metadata().location_id.root() {
1122 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1123 _ => panic!("Embedded output must be on a process or cluster"),
1124 };
1125 D::register_embedded_output(
1126 &mut refcell_env.borrow_mut(),
1127 location_key,
1128 ident,
1129 &element_type,
1130 );
1131 }
1132 },
1133 &mut |n| {
1134 if let HydroNode::Network {
1135 name,
1136 networking_info,
1137 input,
1138 instantiate_fn,
1139 metadata,
1140 ..
1141 } = n
1142 {
1143 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1144 DebugInstantiate::Building => instantiate_network::<D>(
1145 &mut refcell_env.borrow_mut(),
1146 input.metadata().location_id.root(),
1147 metadata.location_id.root(),
1148 processes,
1149 clusters,
1150 name.as_deref(),
1151 networking_info,
1152 ),
1153
1154 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1155 };
1156
1157 *instantiate_fn = DebugInstantiateFinalized {
1158 sink: sink_expr,
1159 source: source_expr,
1160 connect_fn: Some(connect_fn),
1161 }
1162 .into();
1163 } else if let HydroNode::ExternalInput {
1164 from_external_key,
1165 from_port_id,
1166 from_many,
1167 codec_type,
1168 port_hint,
1169 instantiate_fn,
1170 metadata,
1171 ..
1172 } = n
1173 {
1174 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1175 DebugInstantiate::Building => {
1176 let from_node = externals
1177 .get(*from_external_key)
1178 .unwrap_or_else(|| {
1179 panic!(
1180 "A external used in the graph was not instantiated: {}",
1181 from_external_key,
1182 )
1183 })
1184 .clone();
1185
1186 match metadata.location_id.root() {
1187 &LocationId::Process(process_key) => {
1188 let to_node = processes
1189 .get(process_key)
1190 .unwrap_or_else(|| {
1191 panic!("A process used in the graph was not instantiated: {}", process_key)
1192 })
1193 .clone();
1194
1195 let sink_port = from_node.next_port();
1196 let source_port = to_node.next_port();
1197
1198 from_node.register(*from_port_id, sink_port.clone());
1199
1200 (
1201 (
1202 parse_quote!(DUMMY),
1203 if *from_many {
1204 D::e2o_many_source(
1205 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1206 &to_node, &source_port,
1207 codec_type.0.as_ref(),
1208 format!("{}_{}", *from_external_key, *from_port_id)
1209 )
1210 } else {
1211 D::e2o_source(
1212 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1213 &from_node, &sink_port,
1214 &to_node, &source_port,
1215 codec_type.0.as_ref(),
1216 format!("{}_{}", *from_external_key, *from_port_id)
1217 )
1218 },
1219 ),
1220 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1221 )
1222 }
1223 LocationId::Cluster(cluster_key) => {
1224 let to_node = clusters
1225 .get(*cluster_key)
1226 .unwrap_or_else(|| {
1227 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1228 })
1229 .clone();
1230
1231 let sink_port = from_node.next_port();
1232 let source_port = to_node.next_port();
1233
1234 from_node.register(*from_port_id, sink_port.clone());
1235
1236 (
1237 (
1238 parse_quote!(DUMMY),
1239 D::e2m_source(
1240 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1241 &from_node, &sink_port,
1242 &to_node, &source_port,
1243 codec_type.0.as_ref(),
1244 format!("{}_{}", *from_external_key, *from_port_id)
1245 ),
1246 ),
1247 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1248 )
1249 }
1250 _ => panic!()
1251 }
1252 },
1253
1254 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1255 };
1256
1257 *instantiate_fn = DebugInstantiateFinalized {
1258 sink: sink_expr,
1259 source: source_expr,
1260 connect_fn: Some(connect_fn),
1261 }
1262 .into();
1263 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1264 let element_type = match &metadata.collection_kind {
1265 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1266 _ => panic!("Embedded source must have Stream collection kind"),
1267 };
1268 let location_key = match metadata.location_id.root() {
1269 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1270 _ => panic!("Embedded source must be on a process or cluster"),
1271 };
1272 D::register_embedded_stream_input(
1273 &mut refcell_env.borrow_mut(),
1274 location_key,
1275 ident,
1276 &element_type,
1277 );
1278 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1279 let element_type = match &metadata.collection_kind {
1280 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1281 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1282 };
1283 let location_key = match metadata.location_id.root() {
1284 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1285 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1286 };
1287 D::register_embedded_singleton_input(
1288 &mut refcell_env.borrow_mut(),
1289 location_key,
1290 ident,
1291 &element_type,
1292 );
1293 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1294 match state {
1295 ClusterMembersState::Uninit => {
1296 let at_location = metadata.location_id.root().clone();
1297 let key = (at_location.clone(), location_id.key());
1298 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1299 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1301 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1302 &(),
1303 );
1304 *state = ClusterMembersState::Stream(expr.into());
1305 } else {
1306 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1308 }
1309 }
1310 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1311 panic!("cluster members already finalized");
1312 }
1313 }
1314 }
1315 },
1316 seen_tees,
1317 false,
1318 );
1319 }
1320
1321 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1322 self.transform_bottom_up(
1323 &mut |l| {
1324 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1325 match instantiate_fn {
1326 DebugInstantiate::Building => panic!("network not built"),
1327
1328 DebugInstantiate::Finalized(finalized) => {
1329 (finalized.connect_fn.take().unwrap())();
1330 }
1331 }
1332 }
1333 },
1334 &mut |n| {
1335 if let HydroNode::Network { instantiate_fn, .. }
1336 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1337 {
1338 match instantiate_fn {
1339 DebugInstantiate::Building => panic!("network not built"),
1340
1341 DebugInstantiate::Finalized(finalized) => {
1342 (finalized.connect_fn.take().unwrap())();
1343 }
1344 }
1345 }
1346 },
1347 seen_tees,
1348 false,
1349 );
1350 }
1351
1352 pub fn transform_bottom_up(
1353 &mut self,
1354 transform_root: &mut impl FnMut(&mut HydroRoot),
1355 transform_node: &mut impl FnMut(&mut HydroNode),
1356 seen_tees: &mut SeenSharedNodes,
1357 check_well_formed: bool,
1358 ) {
1359 self.transform_children(
1360 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1361 seen_tees,
1362 );
1363
1364 transform_root(self);
1365 }
1366
1367 pub fn transform_children(
1368 &mut self,
1369 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1370 seen_tees: &mut SeenSharedNodes,
1371 ) {
1372 match self {
1373 HydroRoot::ForEach { f, input, .. } => {
1374 f.transform_children(&mut transform, seen_tees);
1375 transform(input, seen_tees);
1376 }
1377 HydroRoot::SendExternal { input, .. }
1378 | HydroRoot::DestSink { input, .. }
1379 | HydroRoot::CycleSink { input, .. }
1380 | HydroRoot::EmbeddedOutput { input, .. }
1381 | HydroRoot::Null { input, .. } => {
1382 transform(input, seen_tees);
1383 }
1384 }
1385 }
1386
1387 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1388 match self {
1389 HydroRoot::ForEach {
1390 f,
1391 input,
1392 op_metadata,
1393 } => HydroRoot::ForEach {
1394 f: f.deep_clone(seen_tees),
1395 input: Box::new(input.deep_clone(seen_tees)),
1396 op_metadata: op_metadata.clone(),
1397 },
1398 HydroRoot::SendExternal {
1399 to_external_key,
1400 to_port_id,
1401 to_many,
1402 unpaired,
1403 serialize_fn,
1404 instantiate_fn,
1405 input,
1406 op_metadata,
1407 } => HydroRoot::SendExternal {
1408 to_external_key: *to_external_key,
1409 to_port_id: *to_port_id,
1410 to_many: *to_many,
1411 unpaired: *unpaired,
1412 serialize_fn: serialize_fn.clone(),
1413 instantiate_fn: instantiate_fn.clone(),
1414 input: Box::new(input.deep_clone(seen_tees)),
1415 op_metadata: op_metadata.clone(),
1416 },
1417 HydroRoot::DestSink {
1418 sink,
1419 input,
1420 op_metadata,
1421 } => HydroRoot::DestSink {
1422 sink: sink.clone(),
1423 input: Box::new(input.deep_clone(seen_tees)),
1424 op_metadata: op_metadata.clone(),
1425 },
1426 HydroRoot::CycleSink {
1427 cycle_id,
1428 input,
1429 op_metadata,
1430 } => HydroRoot::CycleSink {
1431 cycle_id: *cycle_id,
1432 input: Box::new(input.deep_clone(seen_tees)),
1433 op_metadata: op_metadata.clone(),
1434 },
1435 HydroRoot::EmbeddedOutput {
1436 ident,
1437 input,
1438 op_metadata,
1439 } => HydroRoot::EmbeddedOutput {
1440 ident: ident.clone(),
1441 input: Box::new(input.deep_clone(seen_tees)),
1442 op_metadata: op_metadata.clone(),
1443 },
1444 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1445 input: Box::new(input.deep_clone(seen_tees)),
1446 op_metadata: op_metadata.clone(),
1447 },
1448 }
1449 }
1450
1451 #[cfg(feature = "build")]
1452 pub fn emit(
1453 &mut self,
1454 graph_builders: &mut dyn DfirBuilder,
1455 seen_tees: &mut SeenSharedNodes,
1456 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1457 next_stmt_id: &mut crate::Counter<StmtId>,
1458 fold_hooked_idents: &mut HashSet<String>,
1459 ) {
1460 self.emit_core(
1461 &mut BuildersOrCallback::<
1462 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1463 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1464 >::Builders(graph_builders),
1465 seen_tees,
1466 built_tees,
1467 next_stmt_id,
1468 fold_hooked_idents,
1469 );
1470 }
1471
    /// Emits this root, either into DFIR graph builders or by reporting it to
    /// a callback, depending on `builders_or_callback`.
    ///
    /// Every arm first emits the root's `input` subtree through
    /// `HydroNode::emit_core`, which yields the identifier of the input
    /// stream. In `Builders` mode a DFIR statement consuming that identifier
    /// is added to the builder for the input's location. In `Callback` mode
    /// the root callback is invoked with `self` and `next_stmt_id` instead
    /// (except for `CycleSink`, see below).
    ///
    /// Statement ids are drawn from `next_stmt_id` in a fixed order, so the
    /// statement order in this function must not be changed casually.
    ///
    /// # Panics
    /// In `Builders` mode, the `ForEach` arm panics if an entry of
    /// `f.singleton_refs` is not a `HydroNode::Reference`, or if its shared
    /// node has no entry in `built_tees`.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut crate::Counter<StmtId>,
        fold_hooked_idents: &mut HashSet<String>,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                // May allocate an extra statement id and swap in a new stream
                // ident; see `maybe_observe_for_mut` for the exact condition.
                let input_ident = maybe_observe_for_mut(
                    f,
                    input_ident,
                    &input.metadata().location_id,
                    &input.metadata().collection_kind,
                    &input.metadata().op,
                    builders_or_callback,
                    next_stmt_id,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        let mut ident_stack: Vec<syn::Ident> = Vec::new();

                        // Collect, in order, the first ident recorded in
                        // `built_tees` for each shared node the closure references.
                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
                            let HydroNode::Reference { inner, .. } = ref_node else {
                                panic!("singleton_refs should only contain HydroNode::Reference");
                            };
                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
                            let idents = built_tees.get(&ptr).expect(
                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
                            );
                            ident_stack.push(idents[0].clone());
                        }

                        let f_tokens = f.emit_tokens(&mut ident_stack);

                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f_tokens);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is used here; placeholder
                        // expressions stand in while the network is still
                        // `Building`.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            // Unlike the other arms, `CycleSink` consumes no statement id and
            // does not invoke the root callback.
            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; all
                        // other kinds flow as their element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        // Bind the cycle's identifier to the input stream.
                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::Null { input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Drain the input with a no-op consumer.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }
        }
    }
1720
1721 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1722 match self {
1723 HydroRoot::ForEach { op_metadata, .. }
1724 | HydroRoot::SendExternal { op_metadata, .. }
1725 | HydroRoot::DestSink { op_metadata, .. }
1726 | HydroRoot::CycleSink { op_metadata, .. }
1727 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1728 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1729 }
1730 }
1731
1732 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1733 match self {
1734 HydroRoot::ForEach { op_metadata, .. }
1735 | HydroRoot::SendExternal { op_metadata, .. }
1736 | HydroRoot::DestSink { op_metadata, .. }
1737 | HydroRoot::CycleSink { op_metadata, .. }
1738 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1739 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1740 }
1741 }
1742
1743 pub fn input(&self) -> &HydroNode {
1744 match self {
1745 HydroRoot::ForEach { input, .. }
1746 | HydroRoot::SendExternal { input, .. }
1747 | HydroRoot::DestSink { input, .. }
1748 | HydroRoot::CycleSink { input, .. }
1749 | HydroRoot::EmbeddedOutput { input, .. }
1750 | HydroRoot::Null { input, .. } => input,
1751 }
1752 }
1753
1754 pub fn input_metadata(&self) -> &HydroIrMetadata {
1755 self.input().metadata()
1756 }
1757
1758 pub fn print_root(&self) -> String {
1759 match self {
1760 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1761 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1762 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1763 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1764 HydroRoot::EmbeddedOutput { ident, .. } => {
1765 format!("EmbeddedOutput({})", ident)
1766 }
1767 HydroRoot::Null { .. } => "Null".to_owned(),
1768 }
1769 }
1770
1771 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1772 match self {
1773 HydroRoot::ForEach { f, .. } => {
1774 transform(&mut f.expr);
1775 }
1776 HydroRoot::DestSink { sink, .. } => {
1777 transform(sink);
1778 }
1779 HydroRoot::SendExternal { .. }
1780 | HydroRoot::CycleSink { .. }
1781 | HydroRoot::EmbeddedOutput { .. }
1782 | HydroRoot::Null { .. } => {}
1783 }
1784 }
1785}
1786
/// Returns the clock id of the outermost `Tick` found by peeling `Atomic`
/// wrappers off `loc`, or `None` if another kind of location is reached first.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Look through the atomic wrapper and keep searching.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1795
/// Rewrites every `Tick` clock id inside `loc` to its union-find
/// representative in `uf`, descending through nested `Tick` and `Atomic`
/// layers until a `Process` or `Cluster` is reached.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let inner = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };
    remap_location(inner, uf);
}
1809
/// Union-find lookup: returns the representative of `x`.
///
/// An id with no entry in `parent` (or one mapped to itself) is its own
/// representative. Every id on the path from `x` to the representative is
/// re-pointed directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: follow parent links up to the representative.
    let mut root = x;
    while let Some(&next) = parent.get(&root) {
        if next == root {
            break;
        }
        root = next;
    }

    // Pass 2: compress the path. Each non-root id on the path has an entry,
    // whose old value is the next id to visit.
    let mut node = x;
    while node != root {
        node = parent.insert(node, root).unwrap_or(root);
    }

    root
}
1820
/// Union-find merge: makes the representative of `b` the parent of the
/// representative of `a`. Does nothing when both already share one.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1829
/// Merges tick clocks that are connected through certain nodes and rewrites
/// all node locations to use one representative clock per merged group.
///
/// Pass 1 unions the tick of each operand with the tick of the node itself for
/// `Batch`, `YieldConcat`, `Chain`, `ChainFirst` and `MergeOrdered` (only when
/// both sides actually have a tick, see `tick_of`). Pass 2 remaps the
/// `location_id` of every node's metadata through the resulting union-find.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the clock equivalences.
    {
        let mut link = |from: &LocationId, to: &LocationId| {
            if let (Some(a), Some(b)) = (tick_of(from), tick_of(to)) {
                uf_union(&mut uf, a, b);
            }
        };

        transform_bottom_up(
            ir,
            &mut |_| {},
            &mut |node: &mut HydroNode| match node {
                HydroNode::Batch { inner, metadata }
                | HydroNode::YieldConcat { inner, metadata } => {
                    link(&inner.metadata().location_id, &metadata.location_id);
                }
                HydroNode::Chain {
                    first,
                    second,
                    metadata,
                }
                | HydroNode::ChainFirst {
                    first,
                    second,
                    metadata,
                }
                | HydroNode::MergeOrdered {
                    first,
                    second,
                    metadata,
                } => {
                    link(&first.metadata().location_id, &metadata.location_id);
                    link(&second.metadata().location_id, &metadata.location_id);
                }
                _ => {}
            },
            false,
        );
    }

    // Pass 2: rewrite every node location to the representative clocks.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1893
/// Emits every root in `ir` and returns the graph builders that were filled,
/// keyed by location.
///
/// The shared-node maps, the statement id counter and the fold-hook set are
/// created once here and shared by all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graph_builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graph_builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    graph_builders
}
1912
/// Walks `ir` with the same traversal used for emission, but reports roots and
/// nodes to the given callbacks instead of writing to graph builders.
///
/// Both callbacks also receive the statement id counter, which is shared
/// across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1934
1935pub fn transform_bottom_up(
1936 ir: &mut [HydroRoot],
1937 transform_root: &mut impl FnMut(&mut HydroRoot),
1938 transform_node: &mut impl FnMut(&mut HydroNode),
1939 check_well_formed: bool,
1940) {
1941 let mut seen_tees = HashMap::new();
1942 ir.iter_mut().for_each(|leaf| {
1943 leaf.transform_bottom_up(
1944 transform_root,
1945 transform_node,
1946 &mut seen_tees,
1947 check_well_formed,
1948 );
1949 });
1950}
1951
1952pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1953 let mut seen_tees = HashMap::new();
1954 ir.iter()
1955 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1956 .collect()
1957}
1958
// Per-thread bookkeeping for shared nodes: `None` while no scope is active,
// otherwise `(next id to hand out, id already assigned to each shared node)`.
// Shared nodes are keyed by the address of their `RefCell`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Read and updated by `Debug for SharedNode`; activated by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // Read and updated by `Serialize for SharedNode`; activated by
    // `serialize_dedup_shared` through `SerializedSharedGuard`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1968
1969pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1970 PRINTED_TEES.with(|printed_tees| {
1971 let mut printed_tees_mut = printed_tees.borrow_mut();
1972 *printed_tees_mut = Some((0, HashMap::new()));
1973 drop(printed_tees_mut);
1974
1975 let ret = f();
1976
1977 let mut printed_tees_mut = printed_tees.borrow_mut();
1978 *printed_tees_mut = None;
1979
1980 ret
1981 })
1982}
1983
1984pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1989 let _guard = SerializedSharedGuard::enter();
1990 f()
1991}
1992
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, and on drop puts
/// back the state that was active before the scope was opened.
struct SerializedSharedGuard {
    /// Contents of `SERIALIZED_SHARED` at the time `enter` was called.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1998
1999impl SerializedSharedGuard {
2000 fn enter() -> Self {
2001 let previous = SERIALIZED_SHARED.with(|cell| {
2002 let mut guard = cell.borrow_mut();
2003 guard.replace((0, HashMap::new()))
2004 });
2005 Self { previous }
2006 }
2007}
2008
2009impl Drop for SerializedSharedGuard {
2010 fn drop(&mut self) {
2011 SERIALIZED_SHARED.with(|cell| {
2012 *cell.borrow_mut() = self.previous.take();
2013 });
2014 }
2015}
2016
/// A reference-counted, interior-mutable handle to a `HydroNode`. Several IR
/// nodes may hold clones of the same `Rc`; identity is the address of the
/// `RefCell` (see `SharedNode::as_ptr`).
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2018
2019impl serde::Serialize for SharedNode {
2020 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2031 SERIALIZED_SHARED.with(|cell| {
2032 let mut guard = cell.borrow_mut();
2033 let state = guard.as_mut().ok_or_else(|| {
2035 serde::ser::Error::custom(
2036 "SharedNode serialization requires an active serialize_dedup_shared scope",
2037 )
2038 })?;
2039 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2040
2041 if let Some(&id) = state.1.get(&ptr) {
2042 drop(guard);
2043 use serde::ser::SerializeMap;
2044 let mut map = serializer.serialize_map(Some(1))?;
2045 map.serialize_entry("$shared_ref", &id)?;
2046 map.end()
2047 } else {
2048 let id = state.0;
2049 state.0 += 1;
2050 state.1.insert(ptr, id);
2051 drop(guard);
2052
2053 use serde::ser::SerializeMap;
2054 let mut map = serializer.serialize_map(Some(2))?;
2055 map.serialize_entry("$shared", &id)?;
2056 map.serialize_entry("node", &*self.0.borrow())?;
2057 map.end()
2058 }
2059 })
2060 }
2061}
2062
2063impl SharedNode {
2064 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2065 Rc::as_ptr(&self.0)
2066 }
2067}
2068
2069impl Debug for SharedNode {
2070 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2071 PRINTED_TEES.with(|printed_tees| {
2072 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2073 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2074
2075 if let Some(printed_tees_mut) = printed_tees_mut {
2076 if let Some(existing) = printed_tees_mut
2077 .1
2078 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2079 {
2080 write!(f, "<shared {}>", existing)
2081 } else {
2082 let next_id = printed_tees_mut.0;
2083 printed_tees_mut.0 += 1;
2084 printed_tees_mut
2085 .1
2086 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2087 drop(printed_tees_mut_borrow);
2088 write!(f, "<shared {}>: ", next_id)?;
2089 Debug::fmt(&self.0.borrow(), f)
2090 }
2091 } else {
2092 drop(printed_tees_mut_borrow);
2093 write!(f, "<shared>: ")?;
2094 Debug::fmt(&self.0.borrow(), f)
2095 }
2096 })
2097 }
2098}
2099
2100impl Hash for SharedNode {
2101 fn hash<H: Hasher>(&self, state: &mut H) {
2102 self.0.borrow_mut().hash(state);
2103 }
2104}
2105
/// Hands out access group numbers.
///
/// A `Counting` value is live and advances as mutable accesses are requested;
/// a `Frozen` value is a fixed group number.
#[derive(Debug)]
pub enum AccessCounter {
    Counting(Cell<u32>),
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at zero.
    pub fn new() -> Self {
        Self::Counting(Cell::new(0))
    }

    /// Returns the frozen group for the next access.
    ///
    /// A non-mutable access gets the current count and leaves the counter
    /// untouched. A mutable access gets `count + 1` and advances the counter
    /// to `count + 2`.
    ///
    /// # Panics
    /// Panics if `self` is already `Frozen`.
    pub fn next_group(&self, is_mut: bool) -> Self {
        match self {
            Self::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
            Self::Counting(counter) => {
                let current = counter.get();
                if !is_mut {
                    return Self::Frozen(current);
                }
                let group = current + 1;
                counter.set(group + 1);
                Self::Frozen(group)
            }
        }
    }

    /// Returns a `Frozen` snapshot of the current count; the receiver is left
    /// unchanged.
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            Self::Frozen(group) => *group,
            Self::Counting(counter) => counter.get(),
        };
        Self::Frozen(snapshot)
    }

    /// Returns the group number of a frozen counter.
    ///
    /// # Panics
    /// Panics if `self` is still `Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            Self::Frozen(group) => *group,
            Self::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}
2159
impl Hash for AccessCounter {
    /// Deliberately contributes nothing to the hash: the body is empty, so two
    /// values that differ only in their counter hash identically.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
2165
2166impl serde::Serialize for AccessCounter {
2167 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2168 let count = match self {
2169 AccessCounter::Counting(count) => count.get(),
2170 AccessCounter::Frozen(count) => *count,
2171 };
2172 count.serialize(serializer)
2173 }
2174}
2175
/// Boundedness marker used by `CollectionKind::Stream`, `Optional` and
/// `KeyedStream`. `CollectionKind::is_bounded` reports `true` only for
/// `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2181
/// Ordering marker for stream-like collections. `CollectionKind::is_strict`
/// requires `TotalOrder`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2187
/// Delivery marker for stream-like collections. `CollectionKind::is_strict`
/// requires `ExactlyOnce`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2193
/// Boundedness marker used by `CollectionKind::KeyedSingleton`.
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`.
// NOTE(review): the precise meaning of the intermediate variants is not
// visible in this part of the file — confirm before relying on their names.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2202
/// Boundedness marker used by `CollectionKind::Singleton`.
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2209
/// Describes what kind of collection an IR node produces, together with its
/// boundedness, its ordering/retry guarantees (stream kinds only) and the
/// element, key and value types involved.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with `key_type` keys and `value_type` values.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2239
2240impl CollectionKind {
2241 pub fn is_bounded(&self) -> bool {
2242 matches!(
2243 self,
2244 CollectionKind::Stream {
2245 bound: BoundKind::Bounded,
2246 ..
2247 } | CollectionKind::Singleton {
2248 bound: SingletonBoundKind::Bounded,
2249 ..
2250 } | CollectionKind::Optional {
2251 bound: BoundKind::Bounded,
2252 ..
2253 } | CollectionKind::KeyedStream {
2254 bound: BoundKind::Bounded,
2255 ..
2256 } | CollectionKind::KeyedSingleton {
2257 bound: KeyedSingletonBoundKind::Bounded,
2258 ..
2259 }
2260 )
2261 }
2262
2263 pub fn is_strict(&self) -> bool {
2266 match self {
2267 CollectionKind::Stream { order, retry, .. } => {
2268 *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2269 }
2270 CollectionKind::KeyedStream {
2271 value_order,
2272 value_retry,
2273 ..
2274 } => {
2275 *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2276 }
2277 CollectionKind::Singleton { .. }
2280 | CollectionKind::Optional { .. }
2281 | CollectionKind::KeyedSingleton { .. } => true,
2282 }
2283 }
2284
2285 pub fn strict_kind(&self) -> CollectionKind {
2287 match self {
2288 CollectionKind::Stream {
2289 bound,
2290 element_type,
2291 ..
2292 } => CollectionKind::Stream {
2293 bound: bound.clone(),
2294 order: StreamOrder::TotalOrder,
2295 retry: StreamRetry::ExactlyOnce,
2296 element_type: element_type.clone(),
2297 },
2298 CollectionKind::KeyedStream {
2299 bound,
2300 key_type,
2301 value_type,
2302 ..
2303 } => CollectionKind::KeyedStream {
2304 bound: bound.clone(),
2305 value_order: StreamOrder::TotalOrder,
2306 value_retry: StreamRetry::ExactlyOnce,
2307 key_type: key_type.clone(),
2308 value_type: value_type.clone(),
2309 },
2310 other => other.clone(),
2311 }
2312 }
2313}
2314
/// Metadata attached to every `HydroNode`.
///
/// Note that the `Hash`, `PartialEq` and `Debug` impls for this type are
/// hand-written and intentionally ignore most or all of these fields.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node's output lives.
    pub location_id: LocationId,
    /// What kind of collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): the semantics of the three optional fields below are not
    // visible in this part of the file — confirm against their writers.
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace, profiling numbers, id).
    pub op: HydroIrOpMetadata,
}
2324
impl Hash for HydroIrMetadata {
    /// Deliberately contributes nothing to the hash, which keeps it consistent
    /// with the always-equal `PartialEq` impl for this type.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2329
impl PartialEq for HydroIrMetadata {
    /// Always returns `true`: metadata never makes two values compare unequal.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
2337
2338impl Debug for HydroIrMetadata {
2339 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2340 f.debug_struct("HydroIrMetadata")
2341 .field("location_id", &self.location_id)
2342 .field("collection_kind", &self.collection_kind)
2343 .finish()
2344 }
2345}
2346
/// Metadata about the operator that created an IR node or root.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the
    /// name `span` through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // NOTE(review): units and producers of the fields below are not visible in
    // this part of the file; all start out as `None` in `new`.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
2357
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and every optional
    /// field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Like `new`, but passes `2 + skip_count` to `Backtrace::get_backtrace`.
    // NOTE(review): the skip count depends on the exact call depth between the
    // caller and `get_backtrace`; do not inline or wrap these functions
    // without re-checking it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2376
2377impl Debug for HydroIrOpMetadata {
2378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2379 f.debug_struct("HydroIrOpMetadata").finish()
2380 }
2381}
2382
impl Hash for HydroIrOpMetadata {
    /// Deliberately contributes nothing to the hash.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2386
/// An intermediate node of the Hydro IR.
///
/// Each variant (other than `Placeholder`) carries a `metadata` field
/// describing its output, and most carry one or more boxed or shared input
/// nodes. Closures and expressions are stored as `ClosureExpr` / `DebugExpr`.
// NOTE(review): the per-variant notes below describe structure visible in this
// declaration; the runtime semantics of each operator are defined by the
// emitter, which is outside this part of the file.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// A node with no fields.
    Placeholder,

    /// Wraps `inner`, carrying its own metadata.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wraps `inner` with a `trusted` flag.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A source described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A source producing the single expression `value`.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// The reading end of the cycle identified by `cycle_id`; the writing end
    /// is `HydroRoot::CycleSink`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// One consumer of a shared `inner` node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared `inner` node, with the kind of handoff and the
    /// access group bookkeeping for it.
    Reference {
        inner: SharedNode,
        kind: crate::handoff_ref::HandoffRefKind,
        access_counter: AccessCounter,
        metadata: HydroIrMetadata,
    },

    /// One side of a partition of a shared `inner` node by predicate `f`;
    /// `is_true` selects which side this node represents.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Binary operators over `first` and `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Binary operators over `left` and `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Binary operators over a positive (`pos`) and a negative (`neg`) input.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Future-resolving operators over a single `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Element-wise operators applying the closure `f` to `input`.
    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Aggregations built from an `init` closure and an accumulator `acc`.
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Aggregations built from a single combining closure `f`.
    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with an additional `watermark` input node.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A network hop carrying `input`. `instantiate_fn` starts as `Building`
    /// and must be `Finalized` before `HydroRoot::connect_network` runs.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Input arriving from an external location and port. `port_hint` is
    /// excluded from serialization.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wraps `inner` with a `trusted` flag.
    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2682
/// Memo table used while walking the IR: keyed by the address of a shared node's original
/// `RefCell`, it holds the replacement cell built for that node (the transformed cell in
/// `HydroNode::transform_children`, the copied cell in `HydroNode::deep_clone`), so that
/// every handle onto the same shared node ends up pointing at one common result.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Associates the address of a shared node's `RefCell` with a `LocationId`.
// NOTE(review): presumably the location recorded for an already-visited shared node —
// confirm against the users of this alias.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2685
#[cfg(feature = "build")]
/// Optionally routes `in_ident` through an extra observation statement before it is fed to
/// the closure `f`.
///
/// An observation is needed only when `f` reports a mutable reference (`has_mut_ref`) and
/// `in_kind` is not strict (`is_strict`). In that case one statement id is reserved, a new
/// `stream_{id}` identifier is created and returned, and — when `builders_or_callback` is in
/// `Builders` mode — `observe_for_mut` is called on the graph builders to connect `in_ident`
/// to the new identifier. In every other case `in_ident` is returned unchanged and no
/// statement id is consumed.
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    let needs_observation = f.has_mut_ref() && !in_kind.is_strict();
    if !needs_observation {
        return in_ident;
    }

    // The id is reserved even in callback mode so that statement numbering stays in step
    // between the two modes.
    let reserved_id = next_stmt_id.get_and_increment();
    let observed = syn::Ident::new(&format!("stream_{}", reserved_id), Span::call_site());

    if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
        graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observed, op_meta);
    }

    observed
}
2714
2715impl HydroNode {
2716 pub fn transform_bottom_up(
2717 &mut self,
2718 transform: &mut impl FnMut(&mut HydroNode),
2719 seen_tees: &mut SeenSharedNodes,
2720 check_well_formed: bool,
2721 ) {
2722 self.transform_children(
2723 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2724 seen_tees,
2725 );
2726
2727 transform(self);
2728
2729 let self_location = self.metadata().location_id.root();
2730
2731 if check_well_formed {
2732 match &*self {
2733 HydroNode::Network { .. } => {}
2734 _ => {
2735 self.input_metadata().iter().for_each(|i| {
2736 if i.location_id.root() != self_location {
2737 panic!(
2738 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2739 i,
2740 i.location_id.root(),
2741 self,
2742 self_location
2743 )
2744 }
2745 });
2746 }
2747 }
2748 }
2749 }
2750
2751 #[inline(always)]
2752 pub fn transform_children(
2753 &mut self,
2754 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2755 seen_tees: &mut SeenSharedNodes,
2756 ) {
2757 match self {
2758 HydroNode::Placeholder => {
2759 panic!();
2760 }
2761
2762 HydroNode::Source { .. }
2763 | HydroNode::SingletonSource { .. }
2764 | HydroNode::CycleSource { .. }
2765 | HydroNode::ExternalInput { .. } => {}
2766
2767 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2768 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2769 *inner = SharedNode(transformed.clone());
2770 } else {
2771 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2772 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2773 let mut orig = inner.0.replace(HydroNode::Placeholder);
2774 transform(&mut orig, seen_tees);
2775 *transformed_cell.borrow_mut() = orig;
2776 *inner = SharedNode(transformed_cell);
2777 }
2778 }
2779
2780 HydroNode::Partition { inner, f, .. } => {
2781 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2782 *inner = SharedNode(transformed.clone());
2783 } else {
2784 f.transform_children(&mut transform, seen_tees);
2785 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2786 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2787 let mut orig = inner.0.replace(HydroNode::Placeholder);
2788 transform(&mut orig, seen_tees);
2789 *transformed_cell.borrow_mut() = orig;
2790 *inner = SharedNode(transformed_cell);
2791 }
2792 }
2793
2794 HydroNode::Cast { inner, .. }
2795 | HydroNode::ObserveNonDet { inner, .. }
2796 | HydroNode::BeginAtomic { inner, .. }
2797 | HydroNode::EndAtomic { inner, .. }
2798 | HydroNode::Batch { inner, .. }
2799 | HydroNode::YieldConcat { inner, .. }
2800 | HydroNode::UnboundSingleton { inner, .. }
2801 | HydroNode::AssertIsConsistent { inner, .. } => {
2802 transform(inner.as_mut(), seen_tees);
2803 }
2804
2805 HydroNode::Chain { first, second, .. } => {
2806 transform(first.as_mut(), seen_tees);
2807 transform(second.as_mut(), seen_tees);
2808 }
2809
2810 HydroNode::MergeOrdered { first, second, .. } => {
2811 transform(first.as_mut(), seen_tees);
2812 transform(second.as_mut(), seen_tees);
2813 }
2814
2815 HydroNode::ChainFirst { first, second, .. } => {
2816 transform(first.as_mut(), seen_tees);
2817 transform(second.as_mut(), seen_tees);
2818 }
2819
2820 HydroNode::CrossSingleton { left, right, .. }
2821 | HydroNode::CrossProduct { left, right, .. }
2822 | HydroNode::Join { left, right, .. }
2823 | HydroNode::JoinHalf { left, right, .. } => {
2824 transform(left.as_mut(), seen_tees);
2825 transform(right.as_mut(), seen_tees);
2826 }
2827
2828 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2829 transform(pos.as_mut(), seen_tees);
2830 transform(neg.as_mut(), seen_tees);
2831 }
2832
2833 HydroNode::Map { f, input, .. } => {
2834 f.transform_children(&mut transform, seen_tees);
2835 transform(input.as_mut(), seen_tees);
2836 }
2837 HydroNode::FlatMap { f, input, .. }
2838 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2839 | HydroNode::Filter { f, input, .. }
2840 | HydroNode::FilterMap { f, input, .. }
2841 | HydroNode::Inspect { f, input, .. }
2842 | HydroNode::Reduce { f, input, .. }
2843 | HydroNode::ReduceKeyed { f, input, .. } => {
2844 f.transform_children(&mut transform, seen_tees);
2845 transform(input.as_mut(), seen_tees);
2846 }
2847 HydroNode::ReduceKeyedWatermark {
2848 f,
2849 input,
2850 watermark,
2851 ..
2852 } => {
2853 f.transform_children(&mut transform, seen_tees);
2854 transform(input.as_mut(), seen_tees);
2855 transform(watermark.as_mut(), seen_tees);
2856 }
2857 HydroNode::Fold {
2858 init, acc, input, ..
2859 }
2860 | HydroNode::Scan {
2861 init, acc, input, ..
2862 }
2863 | HydroNode::ScanAsyncBlocking {
2864 init, acc, input, ..
2865 }
2866 | HydroNode::FoldKeyed {
2867 init, acc, input, ..
2868 } => {
2869 init.transform_children(&mut transform, seen_tees);
2870 acc.transform_children(&mut transform, seen_tees);
2871 transform(input.as_mut(), seen_tees);
2872 }
2873 HydroNode::ResolveFutures { input, .. }
2874 | HydroNode::ResolveFuturesBlocking { input, .. }
2875 | HydroNode::ResolveFuturesOrdered { input, .. }
2876 | HydroNode::Sort { input, .. }
2877 | HydroNode::DeferTick { input, .. }
2878 | HydroNode::Enumerate { input, .. }
2879 | HydroNode::Unique { input, .. }
2880 | HydroNode::Network { input, .. }
2881 | HydroNode::Counter { input, .. } => {
2882 transform(input.as_mut(), seen_tees);
2883 }
2884 }
2885 }
2886
2887 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2888 match self {
2889 HydroNode::Placeholder => HydroNode::Placeholder,
2890 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2891 inner: Box::new(inner.deep_clone(seen_tees)),
2892 metadata: metadata.clone(),
2893 },
2894 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2895 inner: Box::new(inner.deep_clone(seen_tees)),
2896 metadata: metadata.clone(),
2897 },
2898 HydroNode::ObserveNonDet {
2899 inner,
2900 trusted,
2901 metadata,
2902 } => HydroNode::ObserveNonDet {
2903 inner: Box::new(inner.deep_clone(seen_tees)),
2904 trusted: *trusted,
2905 metadata: metadata.clone(),
2906 },
2907 HydroNode::AssertIsConsistent {
2908 inner,
2909 trusted,
2910 metadata,
2911 } => HydroNode::AssertIsConsistent {
2912 inner: Box::new(inner.deep_clone(seen_tees)),
2913 trusted: *trusted,
2914 metadata: metadata.clone(),
2915 },
2916 HydroNode::Source { source, metadata } => HydroNode::Source {
2917 source: source.clone(),
2918 metadata: metadata.clone(),
2919 },
2920 HydroNode::SingletonSource {
2921 value,
2922 first_tick_only,
2923 metadata,
2924 } => HydroNode::SingletonSource {
2925 value: value.clone(),
2926 first_tick_only: *first_tick_only,
2927 metadata: metadata.clone(),
2928 },
2929 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2930 cycle_id: *cycle_id,
2931 metadata: metadata.clone(),
2932 },
2933 HydroNode::Tee { inner, metadata }
2934 | HydroNode::Reference {
2935 inner, metadata, ..
2936 } => {
2937 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2938 SharedNode(transformed.clone())
2939 } else {
2940 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2941 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2942 let cloned = inner.0.borrow().deep_clone(seen_tees);
2943 *new_rc.borrow_mut() = cloned;
2944 SharedNode(new_rc)
2945 };
2946 if let HydroNode::Reference {
2947 kind,
2948 access_counter,
2949 ..
2950 } = self
2951 {
2952 HydroNode::Reference {
2953 inner: cloned_inner,
2954 kind: *kind,
2955 access_counter: access_counter.freeze(),
2956 metadata: metadata.clone(),
2957 }
2958 } else {
2959 HydroNode::Tee {
2960 inner: cloned_inner,
2961 metadata: metadata.clone(),
2962 }
2963 }
2964 }
2965 HydroNode::Partition {
2966 inner,
2967 f,
2968 is_true,
2969 metadata,
2970 } => {
2971 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2972 HydroNode::Partition {
2973 inner: SharedNode(transformed.clone()),
2974 f: f.deep_clone(seen_tees),
2975 is_true: *is_true,
2976 metadata: metadata.clone(),
2977 }
2978 } else {
2979 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2980 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2981 let cloned = inner.0.borrow().deep_clone(seen_tees);
2982 *new_rc.borrow_mut() = cloned;
2983 HydroNode::Partition {
2984 inner: SharedNode(new_rc),
2985 f: f.deep_clone(seen_tees),
2986 is_true: *is_true,
2987 metadata: metadata.clone(),
2988 }
2989 }
2990 }
2991 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2992 inner: Box::new(inner.deep_clone(seen_tees)),
2993 metadata: metadata.clone(),
2994 },
2995 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2996 inner: Box::new(inner.deep_clone(seen_tees)),
2997 metadata: metadata.clone(),
2998 },
2999 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
3000 inner: Box::new(inner.deep_clone(seen_tees)),
3001 metadata: metadata.clone(),
3002 },
3003 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
3004 inner: Box::new(inner.deep_clone(seen_tees)),
3005 metadata: metadata.clone(),
3006 },
3007 HydroNode::Chain {
3008 first,
3009 second,
3010 metadata,
3011 } => HydroNode::Chain {
3012 first: Box::new(first.deep_clone(seen_tees)),
3013 second: Box::new(second.deep_clone(seen_tees)),
3014 metadata: metadata.clone(),
3015 },
3016 HydroNode::MergeOrdered {
3017 first,
3018 second,
3019 metadata,
3020 } => HydroNode::MergeOrdered {
3021 first: Box::new(first.deep_clone(seen_tees)),
3022 second: Box::new(second.deep_clone(seen_tees)),
3023 metadata: metadata.clone(),
3024 },
3025 HydroNode::ChainFirst {
3026 first,
3027 second,
3028 metadata,
3029 } => HydroNode::ChainFirst {
3030 first: Box::new(first.deep_clone(seen_tees)),
3031 second: Box::new(second.deep_clone(seen_tees)),
3032 metadata: metadata.clone(),
3033 },
3034 HydroNode::CrossProduct {
3035 left,
3036 right,
3037 metadata,
3038 } => HydroNode::CrossProduct {
3039 left: Box::new(left.deep_clone(seen_tees)),
3040 right: Box::new(right.deep_clone(seen_tees)),
3041 metadata: metadata.clone(),
3042 },
3043 HydroNode::CrossSingleton {
3044 left,
3045 right,
3046 metadata,
3047 } => HydroNode::CrossSingleton {
3048 left: Box::new(left.deep_clone(seen_tees)),
3049 right: Box::new(right.deep_clone(seen_tees)),
3050 metadata: metadata.clone(),
3051 },
3052 HydroNode::Join {
3053 left,
3054 right,
3055 metadata,
3056 } => HydroNode::Join {
3057 left: Box::new(left.deep_clone(seen_tees)),
3058 right: Box::new(right.deep_clone(seen_tees)),
3059 metadata: metadata.clone(),
3060 },
3061 HydroNode::JoinHalf {
3062 left,
3063 right,
3064 metadata,
3065 } => HydroNode::JoinHalf {
3066 left: Box::new(left.deep_clone(seen_tees)),
3067 right: Box::new(right.deep_clone(seen_tees)),
3068 metadata: metadata.clone(),
3069 },
3070 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3071 pos: Box::new(pos.deep_clone(seen_tees)),
3072 neg: Box::new(neg.deep_clone(seen_tees)),
3073 metadata: metadata.clone(),
3074 },
3075 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3076 pos: Box::new(pos.deep_clone(seen_tees)),
3077 neg: Box::new(neg.deep_clone(seen_tees)),
3078 metadata: metadata.clone(),
3079 },
3080 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3081 input: Box::new(input.deep_clone(seen_tees)),
3082 metadata: metadata.clone(),
3083 },
3084 HydroNode::ResolveFuturesBlocking { input, metadata } => {
3085 HydroNode::ResolveFuturesBlocking {
3086 input: Box::new(input.deep_clone(seen_tees)),
3087 metadata: metadata.clone(),
3088 }
3089 }
3090 HydroNode::ResolveFuturesOrdered { input, metadata } => {
3091 HydroNode::ResolveFuturesOrdered {
3092 input: Box::new(input.deep_clone(seen_tees)),
3093 metadata: metadata.clone(),
3094 }
3095 }
3096 HydroNode::Map { f, input, metadata } => HydroNode::Map {
3097 f: f.deep_clone(seen_tees),
3098 input: Box::new(input.deep_clone(seen_tees)),
3099 metadata: metadata.clone(),
3100 },
3101 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3102 f: f.deep_clone(seen_tees),
3103 input: Box::new(input.deep_clone(seen_tees)),
3104 metadata: metadata.clone(),
3105 },
3106 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3107 HydroNode::FlatMapStreamBlocking {
3108 f: f.deep_clone(seen_tees),
3109 input: Box::new(input.deep_clone(seen_tees)),
3110 metadata: metadata.clone(),
3111 }
3112 }
3113 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3114 f: f.deep_clone(seen_tees),
3115 input: Box::new(input.deep_clone(seen_tees)),
3116 metadata: metadata.clone(),
3117 },
3118 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3119 f: f.deep_clone(seen_tees),
3120 input: Box::new(input.deep_clone(seen_tees)),
3121 metadata: metadata.clone(),
3122 },
3123 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3124 input: Box::new(input.deep_clone(seen_tees)),
3125 metadata: metadata.clone(),
3126 },
3127 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3128 input: Box::new(input.deep_clone(seen_tees)),
3129 metadata: metadata.clone(),
3130 },
3131 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3132 f: f.deep_clone(seen_tees),
3133 input: Box::new(input.deep_clone(seen_tees)),
3134 metadata: metadata.clone(),
3135 },
3136 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3137 input: Box::new(input.deep_clone(seen_tees)),
3138 metadata: metadata.clone(),
3139 },
3140 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3141 input: Box::new(input.deep_clone(seen_tees)),
3142 metadata: metadata.clone(),
3143 },
3144 HydroNode::Fold {
3145 init,
3146 acc,
3147 input,
3148 metadata,
3149 } => HydroNode::Fold {
3150 init: init.deep_clone(seen_tees),
3151 acc: acc.deep_clone(seen_tees),
3152 input: Box::new(input.deep_clone(seen_tees)),
3153 metadata: metadata.clone(),
3154 },
3155 HydroNode::Scan {
3156 init,
3157 acc,
3158 input,
3159 metadata,
3160 } => HydroNode::Scan {
3161 init: init.deep_clone(seen_tees),
3162 acc: acc.deep_clone(seen_tees),
3163 input: Box::new(input.deep_clone(seen_tees)),
3164 metadata: metadata.clone(),
3165 },
3166 HydroNode::ScanAsyncBlocking {
3167 init,
3168 acc,
3169 input,
3170 metadata,
3171 } => HydroNode::ScanAsyncBlocking {
3172 init: init.deep_clone(seen_tees),
3173 acc: acc.deep_clone(seen_tees),
3174 input: Box::new(input.deep_clone(seen_tees)),
3175 metadata: metadata.clone(),
3176 },
3177 HydroNode::FoldKeyed {
3178 init,
3179 acc,
3180 input,
3181 metadata,
3182 } => HydroNode::FoldKeyed {
3183 init: init.deep_clone(seen_tees),
3184 acc: acc.deep_clone(seen_tees),
3185 input: Box::new(input.deep_clone(seen_tees)),
3186 metadata: metadata.clone(),
3187 },
3188 HydroNode::ReduceKeyedWatermark {
3189 f,
3190 input,
3191 watermark,
3192 metadata,
3193 } => HydroNode::ReduceKeyedWatermark {
3194 f: f.deep_clone(seen_tees),
3195 input: Box::new(input.deep_clone(seen_tees)),
3196 watermark: Box::new(watermark.deep_clone(seen_tees)),
3197 metadata: metadata.clone(),
3198 },
3199 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3200 f: f.deep_clone(seen_tees),
3201 input: Box::new(input.deep_clone(seen_tees)),
3202 metadata: metadata.clone(),
3203 },
3204 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3205 f: f.deep_clone(seen_tees),
3206 input: Box::new(input.deep_clone(seen_tees)),
3207 metadata: metadata.clone(),
3208 },
3209 HydroNode::Network {
3210 name,
3211 networking_info,
3212 serialize_fn,
3213 instantiate_fn,
3214 deserialize_fn,
3215 input,
3216 metadata,
3217 } => HydroNode::Network {
3218 name: name.clone(),
3219 networking_info: networking_info.clone(),
3220 serialize_fn: serialize_fn.clone(),
3221 instantiate_fn: instantiate_fn.clone(),
3222 deserialize_fn: deserialize_fn.clone(),
3223 input: Box::new(input.deep_clone(seen_tees)),
3224 metadata: metadata.clone(),
3225 },
3226 HydroNode::ExternalInput {
3227 from_external_key,
3228 from_port_id,
3229 from_many,
3230 codec_type,
3231 port_hint,
3232 instantiate_fn,
3233 deserialize_fn,
3234 metadata,
3235 } => HydroNode::ExternalInput {
3236 from_external_key: *from_external_key,
3237 from_port_id: *from_port_id,
3238 from_many: *from_many,
3239 codec_type: codec_type.clone(),
3240 port_hint: *port_hint,
3241 instantiate_fn: instantiate_fn.clone(),
3242 deserialize_fn: deserialize_fn.clone(),
3243 metadata: metadata.clone(),
3244 },
3245 HydroNode::Counter {
3246 tag,
3247 duration,
3248 prefix,
3249 input,
3250 metadata,
3251 } => HydroNode::Counter {
3252 tag: tag.clone(),
3253 duration: duration.clone(),
3254 prefix: prefix.clone(),
3255 input: Box::new(input.deep_clone(seen_tees)),
3256 metadata: metadata.clone(),
3257 },
3258 }
3259 }
3260
3261 #[cfg(feature = "build")]
3262 pub fn emit_core(
3263 &mut self,
3264 builders_or_callback: &mut BuildersOrCallback<
3265 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3266 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3267 >,
3268 seen_tees: &mut SeenSharedNodes,
3269 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3270 next_stmt_id: &mut crate::Counter<StmtId>,
3271 fold_hooked_idents: &mut HashSet<String>,
3272 ) -> syn::Ident {
3273 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3274
3275 self.transform_bottom_up(
3276 &mut |node: &mut HydroNode| {
3277 let out_location = node.metadata().location_id.clone();
3278 match node {
3279 HydroNode::Placeholder => {
3280 panic!()
3281 }
3282
3283 HydroNode::Cast { .. } => {
3284 let _ = next_stmt_id.get_and_increment();
3287 match builders_or_callback {
3288 BuildersOrCallback::Builders(_) => {}
3289 BuildersOrCallback::Callback(_, node_callback) => {
3290 node_callback(node, next_stmt_id);
3291 }
3292 }
3293 }
3295
3296 HydroNode::UnboundSingleton { .. } => {
3297 let inner_ident = ident_stack.pop().unwrap();
3298
3299 let stmt_id = next_stmt_id.get_and_increment();
3300 let out_ident =
3301 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3302
3303 match builders_or_callback {
3304 BuildersOrCallback::Builders(graph_builders) => {
3305 if graph_builders.singleton_intermediates() {
3306 let builder = graph_builders.get_dfir_mut(&out_location);
3307 builder.add_dfir(
3308 parse_quote! {
3309 #out_ident = #inner_ident;
3310 },
3311 None,
3312 None,
3313 );
3314 } else {
3315 let builder = graph_builders.get_dfir_mut(&out_location);
3316 builder.add_dfir(
3317 parse_quote! {
3318 #out_ident = #inner_ident -> persist::<'static>();
3319 },
3320 None,
3321 None,
3322 );
3323 }
3324 }
3325 BuildersOrCallback::Callback(_, node_callback) => {
3326 node_callback(node, next_stmt_id);
3327 }
3328 }
3329
3330 ident_stack.push(out_ident);
3331 }
3332
3333 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3334 let inner_ident = ident_stack.pop().unwrap();
3335
3336 let stmt_id = next_stmt_id.get_and_increment();
3337 let out_ident =
3338 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3339
3340 match builders_or_callback {
3341 BuildersOrCallback::Builders(graph_builders) => {
3342 graph_builders.assert_is_consistent(
3343 *trusted,
3344 &inner.metadata().location_id,
3345 inner_ident,
3346 &out_ident,
3347 );
3348 }
3349 BuildersOrCallback::Callback(_, node_callback) => {
3350 node_callback(node, next_stmt_id);
3351 }
3352 }
3353
3354 ident_stack.push(out_ident);
3355 }
3356
3357 HydroNode::ObserveNonDet {
3358 inner,
3359 trusted,
3360 metadata,
3361 ..
3362 } => {
3363 let inner_ident = ident_stack.pop().unwrap();
3364
3365 let stmt_id = next_stmt_id.get_and_increment();
3366 let observe_ident =
3367 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3368
3369 match builders_or_callback {
3370 BuildersOrCallback::Builders(graph_builders) => {
3371 graph_builders.observe_nondet(
3372 *trusted,
3373 &inner.metadata().location_id,
3374 inner_ident,
3375 &inner.metadata().collection_kind,
3376 &observe_ident,
3377 &metadata.collection_kind,
3378 &metadata.op,
3379 );
3380 }
3381 BuildersOrCallback::Callback(_, node_callback) => {
3382 node_callback(node, next_stmt_id);
3383 }
3384 }
3385
3386 ident_stack.push(observe_ident);
3387 }
3388
3389 HydroNode::Batch {
3390 inner, metadata, ..
3391 } => {
3392 let inner_ident = ident_stack.pop().unwrap();
3393
3394 let stmt_id = next_stmt_id.get_and_increment();
3395 let batch_ident =
3396 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3397
3398 match builders_or_callback {
3399 BuildersOrCallback::Builders(graph_builders) => {
3400 graph_builders.batch(
3401 inner_ident,
3402 &inner.metadata().location_id,
3403 &inner.metadata().collection_kind,
3404 &batch_ident,
3405 &out_location,
3406 &metadata.op,
3407 fold_hooked_idents,
3408 );
3409 }
3410 BuildersOrCallback::Callback(_, node_callback) => {
3411 node_callback(node, next_stmt_id);
3412 }
3413 }
3414
3415 ident_stack.push(batch_ident);
3416 }
3417
3418 HydroNode::YieldConcat { inner, .. } => {
3419 let inner_ident = ident_stack.pop().unwrap();
3420
3421 let stmt_id = next_stmt_id.get_and_increment();
3422 let yield_ident =
3423 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3424
3425 match builders_or_callback {
3426 BuildersOrCallback::Builders(graph_builders) => {
3427 graph_builders.yield_from_tick(
3428 inner_ident,
3429 &inner.metadata().location_id,
3430 &inner.metadata().collection_kind,
3431 &yield_ident,
3432 &out_location,
3433 );
3434 }
3435 BuildersOrCallback::Callback(_, node_callback) => {
3436 node_callback(node, next_stmt_id);
3437 }
3438 }
3439
3440 ident_stack.push(yield_ident);
3441 }
3442
3443 HydroNode::BeginAtomic { inner, metadata } => {
3444 let inner_ident = ident_stack.pop().unwrap();
3445
3446 let stmt_id = next_stmt_id.get_and_increment();
3447 let begin_ident =
3448 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3449
3450 match builders_or_callback {
3451 BuildersOrCallback::Builders(graph_builders) => {
3452 graph_builders.begin_atomic(
3453 inner_ident,
3454 &inner.metadata().location_id,
3455 &inner.metadata().collection_kind,
3456 &begin_ident,
3457 &out_location,
3458 &metadata.op,
3459 );
3460 }
3461 BuildersOrCallback::Callback(_, node_callback) => {
3462 node_callback(node, next_stmt_id);
3463 }
3464 }
3465
3466 ident_stack.push(begin_ident);
3467 }
3468
3469 HydroNode::EndAtomic { inner, .. } => {
3470 let inner_ident = ident_stack.pop().unwrap();
3471
3472 let stmt_id = next_stmt_id.get_and_increment();
3473 let end_ident =
3474 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3475
3476 match builders_or_callback {
3477 BuildersOrCallback::Builders(graph_builders) => {
3478 graph_builders.end_atomic(
3479 inner_ident,
3480 &inner.metadata().location_id,
3481 &inner.metadata().collection_kind,
3482 &end_ident,
3483 );
3484 }
3485 BuildersOrCallback::Callback(_, node_callback) => {
3486 node_callback(node, next_stmt_id);
3487 }
3488 }
3489
3490 ident_stack.push(end_ident);
3491 }
3492
3493 HydroNode::Source {
3494 source, metadata, ..
3495 } => {
3496 if let HydroSource::ExternalNetwork() = source {
3497 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3498 } else {
3499 let stmt_id = next_stmt_id.get_and_increment();
3500 let source_ident =
3501 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3502
3503 let source_stmt = match source {
3504 HydroSource::Stream(expr) => {
3505 debug_assert!(metadata.location_id.is_top_level());
3506 parse_quote! {
3507 #source_ident = source_stream(#expr);
3508 }
3509 }
3510
3511 HydroSource::ExternalNetwork() => {
3512 unreachable!()
3513 }
3514
3515 HydroSource::Iter(expr) => {
3516 if metadata.location_id.is_top_level() {
3517 parse_quote! {
3518 #source_ident = source_iter(#expr);
3519 }
3520 } else {
3521 parse_quote! {
3523 #source_ident = source_iter(#expr) -> persist::<'static>();
3524 }
3525 }
3526 }
3527
3528 HydroSource::Spin() => {
3529 debug_assert!(metadata.location_id.is_top_level());
3530 parse_quote! {
3531 #source_ident = spin();
3532 }
3533 }
3534
3535 HydroSource::ClusterMembers(target_loc, state) => {
3536 debug_assert!(metadata.location_id.is_top_level());
3537
3538 let members_tee_ident = syn::Ident::new(
3539 &format!(
3540 "__cluster_members_tee_{}_{}",
3541 metadata.location_id.root().key(),
3542 target_loc.key(),
3543 ),
3544 Span::call_site(),
3545 );
3546
3547 match state {
3548 ClusterMembersState::Stream(d) => {
3549 parse_quote! {
3550 #members_tee_ident = source_stream(#d) -> tee();
3551 #source_ident = #members_tee_ident;
3552 }
3553 },
3554 ClusterMembersState::Uninit => syn::parse_quote! {
3555 #source_ident = source_stream(DUMMY);
3556 },
3557 ClusterMembersState::Tee(..) => parse_quote! {
3558 #source_ident = #members_tee_ident;
3559 },
3560 }
3561 }
3562
3563 HydroSource::Embedded(ident) => {
3564 parse_quote! {
3565 #source_ident = source_stream(#ident);
3566 }
3567 }
3568
3569 HydroSource::EmbeddedSingleton(ident) => {
3570 parse_quote! {
3571 #source_ident = source_iter([#ident]);
3572 }
3573 }
3574 };
3575
3576 match builders_or_callback {
3577 BuildersOrCallback::Builders(graph_builders) => {
3578 let builder = graph_builders.get_dfir_mut(&out_location);
3579 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3580 }
3581 BuildersOrCallback::Callback(_, node_callback) => {
3582 node_callback(node, next_stmt_id);
3583 }
3584 }
3585
3586 ident_stack.push(source_ident);
3587 }
3588 }
3589
3590 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3591 let stmt_id = next_stmt_id.get_and_increment();
3592 let source_ident =
3593 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3594
3595 match builders_or_callback {
3596 BuildersOrCallback::Builders(graph_builders) => {
3597 let builder = graph_builders.get_dfir_mut(&out_location);
3598
3599 if *first_tick_only {
3600 assert!(
3601 !metadata.location_id.is_top_level(),
3602 "first_tick_only SingletonSource must be inside a tick"
3603 );
3604 }
3605
3606 if *first_tick_only
3607 || (metadata.location_id.is_top_level()
3608 && metadata.collection_kind.is_bounded())
3609 {
3610 builder.add_dfir(
3611 parse_quote! {
3612 #source_ident = source_iter([#value]);
3613 },
3614 None,
3615 Some(&stmt_id.to_string()),
3616 );
3617 } else {
3618 builder.add_dfir(
3619 parse_quote! {
3620 #source_ident = source_iter([#value]) -> persist::<'static>();
3621 },
3622 None,
3623 Some(&stmt_id.to_string()),
3624 );
3625 }
3626 }
3627 BuildersOrCallback::Callback(_, node_callback) => {
3628 node_callback(node, next_stmt_id);
3629 }
3630 }
3631
3632 ident_stack.push(source_ident);
3633 }
3634
3635 HydroNode::CycleSource { cycle_id, .. } => {
3636 let ident = cycle_id.as_ident();
3637
3638 let _ = next_stmt_id.get_and_increment();
3640
3641 match builders_or_callback {
3642 BuildersOrCallback::Builders(_) => {}
3643 BuildersOrCallback::Callback(_, node_callback) => {
3644 node_callback(node, next_stmt_id);
3645 }
3646 }
3647
3648 ident_stack.push(ident);
3649 }
3650
3651 HydroNode::Tee { inner, .. } => {
3652 let stmt_id = next_stmt_id.get_and_increment();
3655
3656 let ret_ident = if let Some(built_idents) =
3657 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3658 {
3659 match builders_or_callback {
3660 BuildersOrCallback::Builders(_) => {}
3661 BuildersOrCallback::Callback(_, node_callback) => {
3662 node_callback(node, next_stmt_id);
3663 }
3664 }
3665
3666 built_idents[0].clone()
3667 } else {
3668 let inner_ident = ident_stack.pop().unwrap();
3671
3672 let tee_ident =
3673 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3674
3675 built_tees.insert(
3676 inner.0.as_ref() as *const RefCell<HydroNode>,
3677 vec![tee_ident.clone()],
3678 );
3679
3680 match builders_or_callback {
3681 BuildersOrCallback::Builders(graph_builders) => {
3682 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3694 fold_hooked_idents.insert(tee_ident.to_string());
3695 }
3696 let builder = graph_builders.get_dfir_mut(&out_location);
3697 builder.add_dfir(
3698 parse_quote! {
3699 #tee_ident = #inner_ident -> tee();
3700 },
3701 None,
3702 Some(&stmt_id.to_string()),
3703 );
3704 }
3705 BuildersOrCallback::Callback(_, node_callback) => {
3706 node_callback(node, next_stmt_id);
3707 }
3708 }
3709
3710 tee_ident
3711 };
3712
3713 ident_stack.push(ret_ident);
3714 }
3715
3716 HydroNode::Reference { inner, kind, .. } => {
3717 let stmt_id = next_stmt_id.get_and_increment();
3720
3721 let ret_ident = if let Some(built_idents) =
3722 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3723 {
3724 built_idents[0].clone()
3725 } else {
3726 let inner_ident = ident_stack.pop().unwrap();
3727
3728 let ref_ident =
3729 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3730
3731 built_tees.insert(
3732 inner.0.as_ref() as *const RefCell<HydroNode>,
3733 vec![ref_ident.clone()],
3734 );
3735
3736 match builders_or_callback {
3737 BuildersOrCallback::Builders(graph_builders) => {
3738 let builder = graph_builders.get_dfir_mut(&out_location);
3739 let op_ident = syn::Ident::new(
3740 match kind {
3741 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3742 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3743 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3744 },
3745 Span::call_site(),
3746 );
3747 builder.add_dfir(
3748 parse_quote! {
3749 #ref_ident = #inner_ident -> #op_ident();
3750 },
3751 None,
3752 Some(&stmt_id.to_string()),
3753 );
3754 }
3755 BuildersOrCallback::Callback(_, node_callback) => {
3756 node_callback(node, next_stmt_id);
3757 }
3758 }
3759
3760 ref_ident
3761 };
3762
3763 ident_stack.push(ret_ident);
3764 }
3765
// `HydroNode::Partition`: emits one `partition(..)` statement per shared inner node with a
// "true" output (index 0) and a "false" output (index 1); each visiting node receives the
// output selected by its `is_true` flag.
HydroNode::Partition {
    inner, f, is_true, metadata,
} => {
    let is_true = *is_true;
    // Cache key: address of the shared inner `RefCell`.
    let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
    // Consumed on both the cached and the uncached path.
    let stmt_id = next_stmt_id.get_and_increment();

    let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
        // The partition was already emitted; in callback mode the node is still reported.
        match builders_or_callback {
            BuildersOrCallback::Builders(_) => {}
            BuildersOrCallback::Callback(_, node_callback) => {
                node_callback(node, next_stmt_id);
            }
        }

        // Cached entry layout is `[true_ident, false_ident]`.
        let idx = if is_true { 0 } else { 1 };
        built_idents[idx].clone()
    } else {
        // Keep this order: the inner ident is popped before `emit_tokens` is handed
        // mutable access to the stack.
        let inner_ident = ident_stack.pop().unwrap();
        let f_tokens = f.emit_tokens(&mut ident_stack);

        // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the
        // operator input; it also receives the statement-id counter.
        let inner_ident = {
            let inner_borrow = inner.0.borrow();
            maybe_observe_for_mut(
                f, inner_ident,
                &inner_borrow.metadata().location_id,
                &inner_borrow.metadata().collection_kind,
                &metadata.op,
                builders_or_callback, next_stmt_id,
            )
        };

        // All three idents are named after the first statement id taken above.
        let partition_ident = syn::Ident::new(
            &format!("stream_{}_partition", stmt_id),
            Span::call_site(),
        );
        let true_ident = syn::Ident::new(
            &format!("stream_{}_true", stmt_id),
            Span::call_site(),
        );
        let false_ident = syn::Ident::new(
            &format!("stream_{}_false", stmt_id),
            Span::call_site(),
        );

        built_tees.insert(
            ptr,
            vec![true_ident.clone(), false_ident.clone()],
        );

        // NOTE(review): a second statement id is taken here and shadows the first, so
        // the idents above carry the first id while the statement is tagged with this
        // one — confirm this is intended.
        let stmt_id = next_stmt_id.get_and_increment();
        match builders_or_callback {
            BuildersOrCallback::Builders(graph_builders) => {
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
                        #true_ident = #partition_ident[0];
                        #false_ident = #partition_ident[1];
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
            BuildersOrCallback::Callback(_, node_callback) => {
                node_callback(node, next_stmt_id);
            }
        }

        if is_true { true_ident } else { false_ident }
    };

    ident_stack.push(ret_ident);
}
3842
// `HydroNode::Chain`: emits a `chain()` operator fed by the two inputs on the stack.
HydroNode::Chain { .. } => {
    // Inputs are popped in reverse: the second input is on top of the stack.
    let second_ident = ident_stack.pop().unwrap();
    let first_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let chain_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            // First input goes to port 0, second input to port 1.
            builder.add_dfir(
                parse_quote! {
                    #chain_ident = chain();
                    #first_ident -> [0]#chain_ident;
                    #second_ident -> [1]#chain_ident;
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(chain_ident);
}
3872
// `HydroNode::MergeOrdered`: delegates emission to `graph_builders.merge_ordered`, passing
// both input idents, the output ident and the first input's location and collection kind.
HydroNode::MergeOrdered { first, metadata, .. } => {
    // Inputs are popped in reverse: the second input is on top of the stack.
    let second_ident = ident_stack.pop().unwrap();
    let first_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let merge_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            // The statements actually produced are decided by `merge_ordered`
            // (defined elsewhere).
            graph_builders.merge_ordered(
                &first.metadata().location_id,
                first_ident,
                second_ident,
                &merge_ident,
                &first.metadata().collection_kind,
                &metadata.op,
                Some(&stmt_id.to_string()),
            );
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(merge_ident);
}
3900
// `HydroNode::ChainFirst`: emits a `chain_first_n(1)` operator fed by the two inputs on
// the stack.
HydroNode::ChainFirst { .. } => {
    // Inputs are popped in reverse: the second input is on top of the stack.
    let second_ident = ident_stack.pop().unwrap();
    let first_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let chain_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            // First input goes to port 0, second input to port 1.
            builder.add_dfir(
                parse_quote! {
                    #chain_ident = chain_first_n(1);
                    #first_ident -> [0]#chain_ident;
                    #second_ident -> [1]#chain_ident;
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(chain_ident);
}
3929
// `HydroNode::CrossSingleton`: emits a `cross_singleton()` operator whose `input` port is
// fed by the left ident and whose `single` port is fed by the right ident.
HydroNode::CrossSingleton { right, .. } => {
    // Inputs are popped in reverse: the right input is on top of the stack.
    let right_ident = ident_stack.pop().unwrap();
    let left_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let cross_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);

            // The two branches differ only in the `'static` lifetime argument, which is
            // used when the right side is both top-level and bounded.
            if right.metadata().location_id.is_top_level()
                && right.metadata().collection_kind.is_bounded()
            {
                builder.add_dfir(
                    parse_quote! {
                        #cross_ident = cross_singleton::<'static>();
                        #left_ident -> [input]#cross_ident;
                        #right_ident -> [single]#cross_ident;
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            } else {
                builder.add_dfir(
                    parse_quote! {
                        #cross_ident = cross_singleton();
                        #left_ident -> [input]#cross_ident;
                        #right_ident -> [single]#cross_ident;
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(cross_ident);
}
3973
// `HydroNode::CrossProduct` / `HydroNode::Join`: emits `cross_join_multiset` or
// `join_multiset` with one lifetime argument per side.
HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
    let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
        parse_quote!(cross_join_multiset)
    } else {
        parse_quote!(join_multiset)
    };

    // Re-destructure to reach the fields shared by both variants.
    let (HydroNode::CrossProduct { left, right, .. }
    | HydroNode::Join { left, right, .. }) = node
    else {
        unreachable!()
    };

    // True only when both sides are top-level; selects the `multiset_delta()` form below.
    let is_top_level = left.metadata().location_id.is_top_level()
        && right.metadata().location_id.is_top_level();
    // Each side independently gets `'static` when top-level and `'tick` otherwise.
    let left_lifetime = if left.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    let right_lifetime = if right.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    // Inputs are popped in reverse: the right input is on top of the stack.
    let right_ident = ident_stack.pop().unwrap();
    let left_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let stream_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                if is_top_level {
                    // Both sides top-level: the operator output is piped through
                    // `multiset_delta()`.
                    parse_quote! {
                        #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
                        #left_ident -> [0]#stream_ident;
                        #right_ident -> [1]#stream_ident;
                    }
                } else {
                    parse_quote! {
                        #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
                        #left_ident -> [0]#stream_ident;
                        #right_ident -> [1]#stream_ident;
                    }
                }
                ,
                None,
                Some(&stmt_id.to_string()),
            );
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(stream_ident);
}
4039
// `HydroNode::Difference` / `HydroNode::AntiJoin`: emits `difference` or `anti_join` with
// `pos` and `neg` input ports.
HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
    let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
        parse_quote!(difference)
    } else {
        parse_quote!(anti_join)
    };

    // Re-destructure to reach the `neg` field shared by both variants.
    let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
        node
    else {
        unreachable!()
    };

    // Only the negative side's lifetime varies; the positive side is always `'tick`.
    let neg_lifetime = if neg.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    // Inputs are popped in reverse: the negative input is on top of the stack.
    let neg_ident = ident_stack.pop().unwrap();
    let pos_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let stream_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #stream_ident = #operator::<'tick, #neg_lifetime>();
                    #pos_ident -> [pos]#stream_ident;
                    #neg_ident -> [neg]#stream_ident;
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(stream_ident);
}
4086
// `HydroNode::JoinHalf`: emits `join_multiset_half` with `probe` and `build` input ports.
//
// Panics if the right (build) side's collection kind is not bounded.
HydroNode::JoinHalf { .. } => {
    let HydroNode::JoinHalf { right, .. } = node else {
        unreachable!()
    };

    assert!(
        right.metadata().collection_kind.is_bounded(),
        "JoinHalf requires the right (build) side to be Bounded, got {:?}",
        right.metadata().collection_kind
    );

    // Only the build side's lifetime varies; the second lifetime argument is always `'tick`.
    let build_lifetime = if right.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    // Inputs are popped in reverse: the build (right) input is on top of the stack.
    let build_ident = ident_stack.pop().unwrap();
    let probe_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let stream_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
                    #probe_ident -> [probe]#stream_ident;
                    #build_ident -> [build]#stream_ident;
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(stream_ident);
}
4131
// `HydroNode::ResolveFutures`: pipes the input through `resolve_futures()`.
HydroNode::ResolveFutures { .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let futures_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #futures_ident = #input_ident -> resolve_futures();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(futures_ident);
}
4157
// `HydroNode::ResolveFuturesBlocking`: pipes the input through `resolve_futures_blocking()`.
HydroNode::ResolveFuturesBlocking { .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let futures_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #futures_ident = #input_ident -> resolve_futures_blocking();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(futures_ident);
}
4183
// `HydroNode::ResolveFuturesOrdered`: pipes the input through `resolve_futures_ordered()`.
HydroNode::ResolveFuturesOrdered { .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let futures_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #futures_ident = #input_ident -> resolve_futures_ordered();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(futures_ident);
}
4209
// `HydroNode::Map`: pipes the input through `map(f)`.
HydroNode::Map {
    f,
    input,
    metadata,
} => {
    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();
    let f_tokens = f.emit_tokens(&mut ident_stack);

    // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the operator
    // input. It receives the statement-id counter, so the id for this node is taken after it.
    let input_ident = maybe_observe_for_mut(
        f,
        input_ident,
        &input.metadata().location_id,
        &input.metadata().collection_kind,
        &metadata.op,
        builders_or_callback,
        next_stmt_id,
    );

    let stmt_id = next_stmt_id.get_and_increment();
    let map_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #map_ident = #input_ident -> map(#f_tokens);
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(map_ident);
}
4251
// `HydroNode::FlatMap`: pipes the input through `flat_map(f)`.
HydroNode::FlatMap { f, input, metadata } => {
    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();
    let f_tokens = f.emit_tokens(&mut ident_stack);

    // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the operator
    // input. It receives the statement-id counter, so the id for this node is taken after it.
    let input_ident = maybe_observe_for_mut(
        f, input_ident,
        &input.metadata().location_id,
        &input.metadata().collection_kind,
        &metadata.op,
        builders_or_callback, next_stmt_id,
    );

    let stmt_id = next_stmt_id.get_and_increment();
    let flat_map_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #flat_map_ident = #input_ident -> flat_map(#f_tokens);
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(flat_map_ident);
}
4286
// `HydroNode::FlatMapStreamBlocking`: pipes the input through `flat_map_stream_blocking(f)`.
HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();
    let f_tokens = f.emit_tokens(&mut ident_stack);

    // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the operator
    // input. It receives the statement-id counter, so the id for this node is taken after it.
    let input_ident = maybe_observe_for_mut(
        f, input_ident,
        &input.metadata().location_id,
        &input.metadata().collection_kind,
        &metadata.op,
        builders_or_callback, next_stmt_id,
    );

    let stmt_id = next_stmt_id.get_and_increment();
    let flat_map_stream_blocking_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(flat_map_stream_blocking_ident);
}
4321
// `HydroNode::Filter`: pipes the input through `filter(f)`.
HydroNode::Filter { f, input, metadata } => {
    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();
    let f_tokens = f.emit_tokens(&mut ident_stack);

    // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the operator
    // input. It receives the statement-id counter, so the id for this node is taken after it.
    let input_ident = maybe_observe_for_mut(
        f, input_ident,
        &input.metadata().location_id,
        &input.metadata().collection_kind,
        &metadata.op,
        builders_or_callback, next_stmt_id,
    );

    let stmt_id = next_stmt_id.get_and_increment();
    let filter_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #filter_ident = #input_ident -> filter(#f_tokens);
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(filter_ident);
}
4356
// `HydroNode::FilterMap`: pipes the input through `filter_map(f)`.
HydroNode::FilterMap { f, input, metadata } => {
    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();
    let f_tokens = f.emit_tokens(&mut ident_stack);

    // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the operator
    // input. It receives the statement-id counter, so the id for this node is taken after it.
    let input_ident = maybe_observe_for_mut(
        f, input_ident,
        &input.metadata().location_id,
        &input.metadata().collection_kind,
        &metadata.op,
        builders_or_callback, next_stmt_id,
    );

    let stmt_id = next_stmt_id.get_and_increment();
    let filter_map_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #filter_map_ident = #input_ident -> filter_map(#f_tokens);
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(filter_map_ident);
}
4391
// `HydroNode::Sort`: pipes the input through `sort()`.
HydroNode::Sort { .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let sort_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #sort_ident = #input_ident -> sort();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(sort_ident);
}
4417
// `HydroNode::DeferTick`: pipes the input through `defer_tick_lazy()`.
HydroNode::DeferTick { .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let defer_tick_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #defer_tick_ident = #input_ident -> defer_tick_lazy();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(defer_tick_ident);
}
4443
// `HydroNode::Enumerate`: pipes the input through `enumerate::<'static>()` when the input is
// top-level and `enumerate::<'tick>()` otherwise.
HydroNode::Enumerate { input, .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let enumerate_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            // The lifetime argument follows the input's location.
            let lifetime = if input.metadata().location_id.is_top_level() {
                quote!('static)
            } else {
                quote!('tick)
            };
            builder.add_dfir(
                parse_quote! {
                    #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(enumerate_ident);
}
4474
// `HydroNode::Inspect`: pipes the input through `inspect(f)`.
HydroNode::Inspect { f, input, metadata } => {
    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();
    let f_tokens = f.emit_tokens(&mut ident_stack);

    // `maybe_observe_for_mut` (defined elsewhere) returns the ident to use as the operator
    // input. It receives the statement-id counter, so the id for this node is taken after it.
    let input_ident = maybe_observe_for_mut(
        f, input_ident,
        &input.metadata().location_id,
        &input.metadata().collection_kind,
        &metadata.op,
        builders_or_callback, next_stmt_id,
    );

    let stmt_id = next_stmt_id.get_and_increment();
    let inspect_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                parse_quote! {
                    #inspect_ident = #input_ident -> inspect(#f_tokens);
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(inspect_ident);
}
4509
// `HydroNode::Unique`: pipes the input through `unique::<'static>()` when the input is
// top-level and `unique::<'tick>()` otherwise.
HydroNode::Unique { input, .. } => {
    let input_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let unique_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            // The lifetime argument follows the input's location.
            let lifetime = if input.metadata().location_id.is_top_level() {
                quote!('static)
            } else {
                quote!('tick)
            };

            builder.add_dfir(
                parse_quote! {
                    #unique_ident = #input_ident -> unique::<#lifetime>();
                },
                None,
                Some(&stmt_id.to_string()),
            );
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(unique_ident);
}
4541
// `HydroNode::Fold` / `FoldKeyed` / `Scan` / `ScanAsyncBlocking`: emits the matching
// aggregation operator. When `graph_builders.singleton_intermediates()` is enabled, `Fold`
// and `FoldKeyed` take one of the special paths in the builder branch below.
//
// Panics via `todo!` for a `FoldKeyed` whose input is both top-level and bounded.
HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
    // Operator used by the plain paths: `fold_no_replay` for a `Fold` over a top-level
    // bounded input, otherwise the operator named after the variant.
    let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            parse_quote!(fold_no_replay)
        } else {
            parse_quote!(fold)
        }
    } else if matches!(node, HydroNode::Scan { .. }) {
        parse_quote!(scan)
    } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
        parse_quote!(scan_async_blocking)
    } else if let HydroNode::FoldKeyed { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            todo!("Fold keyed on a top-level bounded collection is not yet supported")
        } else {
            parse_quote!(fold_keyed)
        }
    } else {
        unreachable!()
    };

    // Re-destructure to reach the `input` field shared by all four variants.
    let (HydroNode::Fold { input, .. }
    | HydroNode::FoldKeyed { input, .. }
    | HydroNode::Scan { input, .. }
    | HydroNode::ScanAsyncBlocking { input, .. }) = node
    else {
        unreachable!()
    };

    let lifetime = if input.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    let input_ident = ident_stack.pop().unwrap();

    let (HydroNode::Fold { init, acc, .. }
    | HydroNode::FoldKeyed { init, acc, .. }
    | HydroNode::Scan { init, acc, .. }
    | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
    else {
        unreachable!()
    };

    // Keep this order: `acc` is emitted before `init`, and both receive the ident stack.
    let acc_tokens = acc.emit_tokens(&mut ident_stack);
    let init_tokens = init.emit_tokens(&mut ident_stack);

    let stmt_id = next_stmt_id.get_and_increment();
    let fold_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            // Path 1: top-level, non-atomic, unbounded `Fold` with singleton intermediates.
            // Emitted as the initial value chained with a `scan` over the input.
            if matches!(node, HydroNode::Fold { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
                // `emit_fold_hook` (defined elsewhere) may return a replacement input ident.
                let hooked_input_ident = graph_builders.emit_fold_hook(
                    &input.metadata().location_id,
                    &input_ident,
                    &input.metadata().collection_kind,
                    &node.metadata().op,
                );

                let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
                    // Hooked input: each item is a `Vec` batch. An empty batch yields
                    // `None`; otherwise every value is folded in and the cloned state is
                    // returned.
                    let acc: syn::Expr = parse_quote!({
                        let mut __inner = #acc_tokens;
                        move |__state, __batch: Vec<_>| {
                            if __batch.is_empty() {
                                return None;
                            }
                            for __value in __batch {
                                __inner(__state, __value);
                            }
                            Some(__state.clone())
                        }
                    });
                    (hooked, acc)
                } else {
                    // Unhooked input: fold each value in and return the cloned state.
                    let acc: syn::Expr = parse_quote!({
                        let mut __inner = #acc_tokens;
                        move |__state, __value| {
                            __inner(__state, __value);
                            Some(__state.clone())
                        }
                    });
                    (&input_ident, acc)
                };

                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        source_iter([(#init_tokens)()]) -> [0]#fold_ident;
                        #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
                        #fold_ident = chain();
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );

                // Record the output ident when the hooked input was used.
                if hooked_input_ident.is_some() {
                    fold_hooked_idents.insert(fold_ident.to_string());
                }
            // Path 2: same conditions for `FoldKeyed`. Emitted as a `scan` over a `HashMap`
            // state that outputs `(key, cloned per-key state)` for every input pair.
            } else if matches!(node, HydroNode::FoldKeyed { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
                let hooked_input_ident = graph_builders.emit_fold_hook(
                    &input.metadata().location_id,
                    &input_ident,
                    &input.metadata().collection_kind,
                    &node.metadata().op,
                );
                let builder = graph_builders.get_dfir_mut(&out_location);

                // The per-key state is created with `init` on first sight of a key.
                let wrapped_acc: syn::Expr = parse_quote!({
                    let mut __init = #init_tokens;
                    let mut __inner = #acc_tokens;
                    move |__state, __kv: (_, _)| {
                        let __state = __state
                            .entry(::std::clone::Clone::clone(&__kv.0))
                            .or_insert_with(|| (__init)());
                        __inner(__state, __kv.1);
                        Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
                    }
                });

                if let Some(hooked_input_ident) = hooked_input_ident {
                    // The hooked input is passed through `flatten()` before the scan.
                    builder.add_dfir(
                        parse_quote! {
                            #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
                        },
                        None,
                        Some(&stmt_id.to_string()),
                    );

                    fold_hooked_idents.insert(fold_ident.to_string());
                } else {
                    builder.add_dfir(
                        parse_quote! {
                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
                        },
                        None,
                        Some(&stmt_id.to_string()),
                    );
                }
            // Path 3: non-top-level `Fold` / `FoldKeyed` with singleton intermediates.
            // The regular operator is used, fed by the hooked input when one is returned.
            } else if (matches!(node, HydroNode::Fold { .. })
                || matches!(node, HydroNode::FoldKeyed { .. }))
                && !node.metadata().location_id.is_top_level()
                && graph_builders.singleton_intermediates()
            {
                let input_ref = match &*node {
                    HydroNode::Fold { input, .. } => input,
                    HydroNode::FoldKeyed { input, .. } => input,
                    _ => unreachable!(),
                };
                let hooked_input_ident = graph_builders.emit_fold_hook(
                    &input_ref.metadata().location_id,
                    &input_ident,
                    &input_ref.metadata().collection_kind,
                    &node.metadata().op,
                );

                let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            // Default path: the plain operator applied directly to the input.
            } else {
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(fold_ident);
}
4744
// `HydroNode::Reduce` / `HydroNode::ReduceKeyed`: emits `reduce`, `reduce_no_replay` or
// `reduce_keyed` with a lifetime argument derived from the input's location.
//
// Panics via `todo!` for a `ReduceKeyed` over a top-level bounded input, and, in builder
// mode, for a top-level, non-atomic, unbounded node while singleton intermediates are
// enabled.
HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
    // `reduce_no_replay` is chosen for a `Reduce` over a top-level bounded input.
    let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            parse_quote!(reduce_no_replay)
        } else {
            parse_quote!(reduce)
        }
    } else if let HydroNode::ReduceKeyed { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            todo!(
                "Calling keyed reduce on a top-level bounded collection is not supported"
            )
        } else {
            parse_quote!(reduce_keyed)
        }
    } else {
        unreachable!()
    };

    // Re-destructure to reach the `input` field shared by both variants.
    let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
    else {
        unreachable!()
    };

    let lifetime = if input.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    // Keep this order: the input ident is popped before `emit_tokens` is handed mutable
    // access to the stack.
    let input_ident = ident_stack.pop().unwrap();

    let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
    else {
        unreachable!()
    };

    let f_tokens = f.emit_tokens(&mut ident_stack);

    let stmt_id = next_stmt_id.get_and_increment();
    let reduce_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            // The first two branches share the same guard apart from the variant and
            // differ only in their message.
            if matches!(node, HydroNode::Reduce { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                todo!(
                    "Reduce with optional intermediates is not yet supported in simulator"
                );
            } else if matches!(node, HydroNode::ReduceKeyed { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                todo!(
                    "Reduce keyed with optional intermediates is not yet supported in simulator"
                );
            } else {
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
        }
        // Callback mode adds no DFIR; it only reports the node and the id counter.
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(reduce_ident);
}
4830
4831 HydroNode::ReduceKeyedWatermark {
4832 f,
4833 input,
4834 metadata,
4835 ..
4836 } => {
4837 let lifetime = if input.metadata().location_id.is_top_level() {
4838 quote!('static)
4839 } else {
4840 quote!('tick)
4841 };
4842
4843 let watermark_ident = ident_stack.pop().unwrap();
4845 let input_ident = ident_stack.pop().unwrap();
4846 let f_tokens = f.emit_tokens(&mut ident_stack);
4847
4848 let stmt_id = next_stmt_id.get_and_increment();
4849 let chain_ident = syn::Ident::new(
4850 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4851 Span::call_site(),
4852 );
4853
4854 let fold_ident =
4855 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4856
4857 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4858 && input.metadata().collection_kind.is_bounded()
4859 {
4860 parse_quote!(fold_no_replay)
4861 } else {
4862 parse_quote!(fold)
4863 };
4864
4865 match builders_or_callback {
4866 BuildersOrCallback::Builders(graph_builders) => {
4867 if metadata.location_id.is_top_level()
4868 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4869 && graph_builders.singleton_intermediates()
4870 && !metadata.collection_kind.is_bounded()
4871 {
4872 todo!(
4873 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4874 )
4875 } else {
4876 let builder = graph_builders.get_dfir_mut(&out_location);
4877 builder.add_dfir(
4878 parse_quote! {
4879 #chain_ident = chain();
4880 #input_ident
4881 -> map(|x| (Some(x), None))
4882 -> [0]#chain_ident;
4883 #watermark_ident
4884 -> map(|watermark| (None, Some(watermark)))
4885 -> [1]#chain_ident;
4886
4887 #fold_ident = #chain_ident
4888 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4889 let __reduce_keyed_fn = #f_tokens;
4890 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4891 if let Some((k, v)) = opt_payload {
4892 if let Some(curr_watermark) = *opt_curr_watermark {
4893 if k < curr_watermark {
4894 return;
4895 }
4896 }
4897 match map.entry(k) {
4898 ::std::collections::hash_map::Entry::Vacant(e) => {
4899 e.insert(v);
4900 }
4901 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4902 __reduce_keyed_fn(e.get_mut(), v);
4903 }
4904 }
4905 } else {
4906 let watermark = opt_watermark.unwrap();
4907 if let Some(curr_watermark) = *opt_curr_watermark {
4908 if watermark <= curr_watermark {
4909 return;
4910 }
4911 }
4912 map.retain(|k, _| *k >= watermark);
4913 *opt_curr_watermark = Some(watermark);
4914 }
4915 }
4916 })
4917 -> flat_map(|(map, _curr_watermark)| map);
4918 },
4919 None,
4920 Some(&stmt_id.to_string()),
4921 );
4922 }
4923 }
4924 BuildersOrCallback::Callback(_, node_callback) => {
4925 node_callback(node, next_stmt_id);
4926 }
4927 }
4928
4929 ident_stack.push(fold_ident);
4930 }
4931
4932 HydroNode::Network {
4933 networking_info,
4934 serialize_fn: serialize_pipeline,
4935 instantiate_fn,
4936 deserialize_fn: deserialize_pipeline,
4937 input,
4938 ..
4939 } => {
4940 let input_ident = ident_stack.pop().unwrap();
4941
4942 let stmt_id = next_stmt_id.get_and_increment();
4943 let receiver_stream_ident =
4944 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4945
4946 match builders_or_callback {
4947 BuildersOrCallback::Builders(graph_builders) => {
4948 let (sink_expr, source_expr) = match instantiate_fn {
4949 DebugInstantiate::Building => (
4950 syn::parse_quote!(DUMMY_SINK),
4951 syn::parse_quote!(DUMMY_SOURCE),
4952 ),
4953
4954 DebugInstantiate::Finalized(finalized) => {
4955 (finalized.sink.clone(), finalized.source.clone())
4956 }
4957 };
4958
4959 graph_builders.create_network(
4960 &input.metadata().location_id,
4961 &out_location,
4962 input_ident,
4963 &receiver_stream_ident,
4964 serialize_pipeline.as_ref(),
4965 sink_expr,
4966 source_expr,
4967 deserialize_pipeline.as_ref(),
4968 stmt_id,
4969 networking_info,
4970 );
4971 }
4972 BuildersOrCallback::Callback(_, node_callback) => {
4973 node_callback(node, next_stmt_id);
4974 }
4975 }
4976
4977 ident_stack.push(receiver_stream_ident);
4978 }
4979
4980 HydroNode::ExternalInput {
4981 instantiate_fn,
4982 deserialize_fn: deserialize_pipeline,
4983 ..
4984 } => {
4985 let stmt_id = next_stmt_id.get_and_increment();
4986 let receiver_stream_ident =
4987 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4988
4989 match builders_or_callback {
4990 BuildersOrCallback::Builders(graph_builders) => {
4991 let (_, source_expr) = match instantiate_fn {
4992 DebugInstantiate::Building => (
4993 syn::parse_quote!(DUMMY_SINK),
4994 syn::parse_quote!(DUMMY_SOURCE),
4995 ),
4996
4997 DebugInstantiate::Finalized(finalized) => {
4998 (finalized.sink.clone(), finalized.source.clone())
4999 }
5000 };
5001
5002 graph_builders.create_external_source(
5003 &out_location,
5004 source_expr,
5005 &receiver_stream_ident,
5006 deserialize_pipeline.as_ref(),
5007 stmt_id,
5008 );
5009 }
5010 BuildersOrCallback::Callback(_, node_callback) => {
5011 node_callback(node, next_stmt_id);
5012 }
5013 }
5014
5015 ident_stack.push(receiver_stream_ident);
5016 }
5017
5018 HydroNode::Counter {
5019 tag,
5020 duration,
5021 prefix,
5022 ..
5023 } => {
5024 let input_ident = ident_stack.pop().unwrap();
5025
5026 let stmt_id = next_stmt_id.get_and_increment();
5027 let counter_ident =
5028 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5029
5030 match builders_or_callback {
5031 BuildersOrCallback::Builders(graph_builders) => {
5032 let arg = format!("{}({})", prefix, tag);
5033 let builder = graph_builders.get_dfir_mut(&out_location);
5034 builder.add_dfir(
5035 parse_quote! {
5036 #counter_ident = #input_ident -> _counter(#arg, #duration);
5037 },
5038 None,
5039 Some(&stmt_id.to_string()),
5040 );
5041 }
5042 BuildersOrCallback::Callback(_, node_callback) => {
5043 node_callback(node, next_stmt_id);
5044 }
5045 }
5046
5047 ident_stack.push(counter_ident);
5048 }
5049 }
5050 },
5051 seen_tees,
5052 false,
5053 );
5054
5055 let ret = ident_stack
5056 .pop()
5057 .expect("ident_stack should have exactly one element after traversal");
5058 assert!(
5059 ident_stack.is_empty(),
5060 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5061 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5062 ident_stack.len()
5063 );
5064 ret
5065 }
5066
5067 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5068 match self {
5069 HydroNode::Placeholder => {
5070 panic!()
5071 }
5072 HydroNode::Cast { .. }
5073 | HydroNode::ObserveNonDet { .. }
5074 | HydroNode::UnboundSingleton { .. }
5075 | HydroNode::AssertIsConsistent { .. } => {}
5076 HydroNode::Source { source, .. } => match source {
5077 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5078 HydroSource::ExternalNetwork()
5079 | HydroSource::Spin()
5080 | HydroSource::ClusterMembers(_, _)
5081 | HydroSource::Embedded(_)
5082 | HydroSource::EmbeddedSingleton(_) => {} },
5084 HydroNode::SingletonSource { value, .. } => {
5085 transform(value);
5086 }
5087 HydroNode::CycleSource { .. }
5088 | HydroNode::Tee { .. }
5089 | HydroNode::Reference { .. }
5090 | HydroNode::YieldConcat { .. }
5091 | HydroNode::BeginAtomic { .. }
5092 | HydroNode::EndAtomic { .. }
5093 | HydroNode::Batch { .. }
5094 | HydroNode::Chain { .. }
5095 | HydroNode::MergeOrdered { .. }
5096 | HydroNode::ChainFirst { .. }
5097 | HydroNode::CrossProduct { .. }
5098 | HydroNode::CrossSingleton { .. }
5099 | HydroNode::ResolveFutures { .. }
5100 | HydroNode::ResolveFuturesBlocking { .. }
5101 | HydroNode::ResolveFuturesOrdered { .. }
5102 | HydroNode::Join { .. }
5103 | HydroNode::JoinHalf { .. }
5104 | HydroNode::Difference { .. }
5105 | HydroNode::AntiJoin { .. }
5106 | HydroNode::DeferTick { .. }
5107 | HydroNode::Enumerate { .. }
5108 | HydroNode::Unique { .. }
5109 | HydroNode::Sort { .. } => {}
5110 HydroNode::Map { f, .. }
5111 | HydroNode::FlatMap { f, .. }
5112 | HydroNode::FlatMapStreamBlocking { f, .. }
5113 | HydroNode::Filter { f, .. }
5114 | HydroNode::FilterMap { f, .. }
5115 | HydroNode::Inspect { f, .. }
5116 | HydroNode::Partition { f, .. }
5117 | HydroNode::Reduce { f, .. }
5118 | HydroNode::ReduceKeyed { f, .. }
5119 | HydroNode::ReduceKeyedWatermark { f, .. } => {
5120 transform(&mut f.expr);
5121 }
5122 HydroNode::Fold { init, acc, .. }
5123 | HydroNode::Scan { init, acc, .. }
5124 | HydroNode::ScanAsyncBlocking { init, acc, .. }
5125 | HydroNode::FoldKeyed { init, acc, .. } => {
5126 transform(&mut init.expr);
5127 transform(&mut acc.expr);
5128 }
5129 HydroNode::Network {
5130 serialize_fn,
5131 deserialize_fn,
5132 ..
5133 } => {
5134 if let Some(serialize_fn) = serialize_fn {
5135 transform(serialize_fn);
5136 }
5137 if let Some(deserialize_fn) = deserialize_fn {
5138 transform(deserialize_fn);
5139 }
5140 }
5141 HydroNode::ExternalInput { deserialize_fn, .. } => {
5142 if let Some(deserialize_fn) = deserialize_fn {
5143 transform(deserialize_fn);
5144 }
5145 }
5146 HydroNode::Counter { duration, .. } => {
5147 transform(duration);
5148 }
5149 }
5150 }
5151
5152 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5153 &self.metadata().op
5154 }
5155
5156 pub fn metadata(&self) -> &HydroIrMetadata {
5157 match self {
5158 HydroNode::Placeholder => {
5159 panic!()
5160 }
5161 HydroNode::Cast { metadata, .. }
5162 | HydroNode::ObserveNonDet { metadata, .. }
5163 | HydroNode::AssertIsConsistent { metadata, .. }
5164 | HydroNode::UnboundSingleton { metadata, .. }
5165 | HydroNode::Source { metadata, .. }
5166 | HydroNode::SingletonSource { metadata, .. }
5167 | HydroNode::CycleSource { metadata, .. }
5168 | HydroNode::Tee { metadata, .. }
5169 | HydroNode::Reference { metadata, .. }
5170 | HydroNode::Partition { metadata, .. }
5171 | HydroNode::YieldConcat { metadata, .. }
5172 | HydroNode::BeginAtomic { metadata, .. }
5173 | HydroNode::EndAtomic { metadata, .. }
5174 | HydroNode::Batch { metadata, .. }
5175 | HydroNode::Chain { metadata, .. }
5176 | HydroNode::MergeOrdered { metadata, .. }
5177 | HydroNode::ChainFirst { metadata, .. }
5178 | HydroNode::CrossProduct { metadata, .. }
5179 | HydroNode::CrossSingleton { metadata, .. }
5180 | HydroNode::Join { metadata, .. }
5181 | HydroNode::JoinHalf { metadata, .. }
5182 | HydroNode::Difference { metadata, .. }
5183 | HydroNode::AntiJoin { metadata, .. }
5184 | HydroNode::ResolveFutures { metadata, .. }
5185 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5186 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5187 | HydroNode::Map { metadata, .. }
5188 | HydroNode::FlatMap { metadata, .. }
5189 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5190 | HydroNode::Filter { metadata, .. }
5191 | HydroNode::FilterMap { metadata, .. }
5192 | HydroNode::DeferTick { metadata, .. }
5193 | HydroNode::Enumerate { metadata, .. }
5194 | HydroNode::Inspect { metadata, .. }
5195 | HydroNode::Unique { metadata, .. }
5196 | HydroNode::Sort { metadata, .. }
5197 | HydroNode::Scan { metadata, .. }
5198 | HydroNode::ScanAsyncBlocking { metadata, .. }
5199 | HydroNode::Fold { metadata, .. }
5200 | HydroNode::FoldKeyed { metadata, .. }
5201 | HydroNode::Reduce { metadata, .. }
5202 | HydroNode::ReduceKeyed { metadata, .. }
5203 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5204 | HydroNode::ExternalInput { metadata, .. }
5205 | HydroNode::Network { metadata, .. }
5206 | HydroNode::Counter { metadata, .. } => metadata,
5207 }
5208 }
5209
5210 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5211 &mut self.metadata_mut().op
5212 }
5213
5214 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5215 match self {
5216 HydroNode::Placeholder => {
5217 panic!()
5218 }
5219 HydroNode::Cast { metadata, .. }
5220 | HydroNode::ObserveNonDet { metadata, .. }
5221 | HydroNode::AssertIsConsistent { metadata, .. }
5222 | HydroNode::UnboundSingleton { metadata, .. }
5223 | HydroNode::Source { metadata, .. }
5224 | HydroNode::SingletonSource { metadata, .. }
5225 | HydroNode::CycleSource { metadata, .. }
5226 | HydroNode::Tee { metadata, .. }
5227 | HydroNode::Reference { metadata, .. }
5228 | HydroNode::Partition { metadata, .. }
5229 | HydroNode::YieldConcat { metadata, .. }
5230 | HydroNode::BeginAtomic { metadata, .. }
5231 | HydroNode::EndAtomic { metadata, .. }
5232 | HydroNode::Batch { metadata, .. }
5233 | HydroNode::Chain { metadata, .. }
5234 | HydroNode::MergeOrdered { metadata, .. }
5235 | HydroNode::ChainFirst { metadata, .. }
5236 | HydroNode::CrossProduct { metadata, .. }
5237 | HydroNode::CrossSingleton { metadata, .. }
5238 | HydroNode::Join { metadata, .. }
5239 | HydroNode::JoinHalf { metadata, .. }
5240 | HydroNode::Difference { metadata, .. }
5241 | HydroNode::AntiJoin { metadata, .. }
5242 | HydroNode::ResolveFutures { metadata, .. }
5243 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5244 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5245 | HydroNode::Map { metadata, .. }
5246 | HydroNode::FlatMap { metadata, .. }
5247 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5248 | HydroNode::Filter { metadata, .. }
5249 | HydroNode::FilterMap { metadata, .. }
5250 | HydroNode::DeferTick { metadata, .. }
5251 | HydroNode::Enumerate { metadata, .. }
5252 | HydroNode::Inspect { metadata, .. }
5253 | HydroNode::Unique { metadata, .. }
5254 | HydroNode::Sort { metadata, .. }
5255 | HydroNode::Scan { metadata, .. }
5256 | HydroNode::ScanAsyncBlocking { metadata, .. }
5257 | HydroNode::Fold { metadata, .. }
5258 | HydroNode::FoldKeyed { metadata, .. }
5259 | HydroNode::Reduce { metadata, .. }
5260 | HydroNode::ReduceKeyed { metadata, .. }
5261 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5262 | HydroNode::ExternalInput { metadata, .. }
5263 | HydroNode::Network { metadata, .. }
5264 | HydroNode::Counter { metadata, .. } => metadata,
5265 }
5266 }
5267
5268 pub fn input(&self) -> Vec<&HydroNode> {
5269 match self {
5270 HydroNode::Placeholder => {
5271 panic!()
5272 }
5273 HydroNode::Source { .. }
5274 | HydroNode::SingletonSource { .. }
5275 | HydroNode::ExternalInput { .. }
5276 | HydroNode::CycleSource { .. }
5277 | HydroNode::Tee { .. }
5278 | HydroNode::Reference { .. }
5279 | HydroNode::Partition { .. } => {
5280 vec![]
5282 }
5283 HydroNode::Cast { inner, .. }
5284 | HydroNode::ObserveNonDet { inner, .. }
5285 | HydroNode::YieldConcat { inner, .. }
5286 | HydroNode::BeginAtomic { inner, .. }
5287 | HydroNode::EndAtomic { inner, .. }
5288 | HydroNode::Batch { inner, .. }
5289 | HydroNode::UnboundSingleton { inner, .. }
5290 | HydroNode::AssertIsConsistent { inner, .. } => {
5291 vec![inner]
5292 }
5293 HydroNode::Chain { first, second, .. } => {
5294 vec![first, second]
5295 }
5296 HydroNode::MergeOrdered { first, second, .. } => {
5297 vec![first, second]
5298 }
5299 HydroNode::ChainFirst { first, second, .. } => {
5300 vec![first, second]
5301 }
5302 HydroNode::CrossProduct { left, right, .. }
5303 | HydroNode::CrossSingleton { left, right, .. }
5304 | HydroNode::Join { left, right, .. }
5305 | HydroNode::JoinHalf { left, right, .. } => {
5306 vec![left, right]
5307 }
5308 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5309 vec![pos, neg]
5310 }
5311 HydroNode::Map { input, .. }
5312 | HydroNode::FlatMap { input, .. }
5313 | HydroNode::FlatMapStreamBlocking { input, .. }
5314 | HydroNode::Filter { input, .. }
5315 | HydroNode::FilterMap { input, .. }
5316 | HydroNode::Sort { input, .. }
5317 | HydroNode::DeferTick { input, .. }
5318 | HydroNode::Enumerate { input, .. }
5319 | HydroNode::Inspect { input, .. }
5320 | HydroNode::Unique { input, .. }
5321 | HydroNode::Network { input, .. }
5322 | HydroNode::Counter { input, .. }
5323 | HydroNode::ResolveFutures { input, .. }
5324 | HydroNode::ResolveFuturesBlocking { input, .. }
5325 | HydroNode::ResolveFuturesOrdered { input, .. }
5326 | HydroNode::Fold { input, .. }
5327 | HydroNode::FoldKeyed { input, .. }
5328 | HydroNode::Reduce { input, .. }
5329 | HydroNode::ReduceKeyed { input, .. }
5330 | HydroNode::Scan { input, .. }
5331 | HydroNode::ScanAsyncBlocking { input, .. } => {
5332 vec![input]
5333 }
5334 HydroNode::ReduceKeyedWatermark {
5335 input, watermark, ..
5336 } => {
5337 vec![input, watermark]
5338 }
5339 }
5340 }
5341
5342 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5343 self.input()
5344 .iter()
5345 .map(|input_node| input_node.metadata())
5346 .collect()
5347 }
5348
5349 pub fn is_shared_with_others(&self) -> bool {
5353 match self {
5354 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5355 Rc::strong_count(&inner.0) > 1
5356 }
5357 HydroNode::Reference { .. } => false,
5360 _ => false,
5361 }
5362 }
5363
5364 pub fn print_root(&self) -> String {
5365 match self {
5366 HydroNode::Placeholder => {
5367 panic!()
5368 }
5369 HydroNode::Cast { .. } => "Cast()".to_owned(),
5370 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5371 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5372 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5373 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5374 HydroNode::SingletonSource {
5375 value,
5376 first_tick_only,
5377 ..
5378 } => format!(
5379 "SingletonSource({:?}, first_tick_only={})",
5380 value, first_tick_only
5381 ),
5382 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5383 HydroNode::Tee { inner, .. } => {
5384 format!("Tee({})", inner.0.borrow().print_root())
5385 }
5386 HydroNode::Reference { inner, kind, .. } => {
5387 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5388 }
5389 HydroNode::Partition { f, is_true, .. } => {
5390 format!("Partition({:?}, is_true={})", f, is_true)
5391 }
5392 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5393 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5394 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5395 HydroNode::Batch { .. } => "Batch()".to_owned(),
5396 HydroNode::Chain { first, second, .. } => {
5397 format!("Chain({}, {})", first.print_root(), second.print_root())
5398 }
5399 HydroNode::MergeOrdered { first, second, .. } => {
5400 format!(
5401 "MergeOrdered({}, {})",
5402 first.print_root(),
5403 second.print_root()
5404 )
5405 }
5406 HydroNode::ChainFirst { first, second, .. } => {
5407 format!(
5408 "ChainFirst({}, {})",
5409 first.print_root(),
5410 second.print_root()
5411 )
5412 }
5413 HydroNode::CrossProduct { left, right, .. } => {
5414 format!(
5415 "CrossProduct({}, {})",
5416 left.print_root(),
5417 right.print_root()
5418 )
5419 }
5420 HydroNode::CrossSingleton { left, right, .. } => {
5421 format!(
5422 "CrossSingleton({}, {})",
5423 left.print_root(),
5424 right.print_root()
5425 )
5426 }
5427 HydroNode::Join { left, right, .. } => {
5428 format!("Join({}, {})", left.print_root(), right.print_root())
5429 }
5430 HydroNode::JoinHalf { left, right, .. } => {
5431 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5432 }
5433 HydroNode::Difference { pos, neg, .. } => {
5434 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5435 }
5436 HydroNode::AntiJoin { pos, neg, .. } => {
5437 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5438 }
5439 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5440 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5441 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5442 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5443 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5444 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5445 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5446 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5447 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5448 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5449 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5450 HydroNode::Unique { .. } => "Unique()".to_owned(),
5451 HydroNode::Sort { .. } => "Sort()".to_owned(),
5452 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5453 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5454 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5455 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5456 }
5457 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5458 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5459 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5460 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5461 HydroNode::Network { .. } => "Network()".to_owned(),
5462 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5463 HydroNode::Counter { tag, duration, .. } => {
5464 format!("Counter({:?}, {:?})", tag, duration)
5465 }
5466 }
5467 }
5468}
5469
/// Asks the deployment backend `D` for the sink/source pair that carries
/// traffic from `from_location` to `to_location`.
///
/// The deployed nodes for both endpoints are looked up in `processes` /
/// `clusters`, one fresh port is allocated on each (sender first, then
/// receiver), and the backend hook matching the endpoint kinds is invoked:
///
/// | from    | to      | hooks                               |
/// |---------|---------|-------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`   |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`   |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`   |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`   |
///
/// `name` and `networking_info` are forwarded to the `*_sink_source` hook
/// unchanged.
///
/// Returns `(sink, source, connect_fn)`: the sink and source expressions
/// produced by the `*_sink_source` hook and the closure produced by the
/// `*_connect` hook.
///
/// # Panics
///
/// - If a referenced process or cluster has no entry in `processes` /
///   `clusters`.
/// - If either endpoint is a `Tick` or `Atomic` location; only processes and
///   clusters can be network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches a clone of the deployed process for `key`, panicking with the
    // offending key when the graph references a process that was never
    // instantiated.
    let process = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Same as `process`, but for clusters.
    let cluster = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the sender is resolved before the receiver and the sender
    // port is allocated before the receiver port, so port numbering stays
    // stable.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process(from);
            let to_node = process(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process(from);
            let to_node = cluster(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster(from);
            let to_node = process(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster(from);
            let to_node = cluster(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only processes and clusters can be network endpoints. Say which
        // side was wrong instead of panicking without a message.
        (LocationId::Tick(_, _), _) => panic!(
            "cannot instantiate a network whose sender is a tick location; expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) => panic!(
            "cannot instantiate a network whose receiver is a tick location; expected a process or cluster"
        ),
        (LocationId::Atomic(_), _) => panic!(
            "cannot instantiate a network whose sender is an atomic location; expected a process or cluster"
        ),
        (_, LocationId::Atomic(_)) => panic!(
            "cannot instantiate a network whose receiver is an atomic location; expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
5611
5612#[cfg(test)]
5613mod serde_test;
5614
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins `size_of::<HydroNode>()` to 264 bytes so a change in the enum's
    // size is noticed. Ignored without the `build` feature, since the
    // expected size counts feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Pins `size_of::<HydroRoot>()` to 136 bytes; ignored without the `build`
    // feature for the same reason as `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // A plain binary expression must come back from `simplify_q_macro`
    // unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Feeds a real spliced `q!` closure through `simplify_q_macro` and
    // compares the printed tokens with the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same snapshot check for quoted code that is a block: a `let` precedes
    // the closure, so the quoted source does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}