Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub(crate) expr: DebugExpr,
46    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
47    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
48    /// The `access_counter` was assigned at staging time in code order.
49    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn has_mut_ref(&self) -> bool {
158        self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159    }
160
161    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162        Self {
163            expr: self.expr.clone(),
164            singleton_refs: self
165                .singleton_refs
166                .iter()
167                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168                .collect(),
169        }
170    }
171
172    pub fn transform_children(
173        &mut self,
174        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175        seen_tees: &mut SeenSharedNodes,
176    ) {
177        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178            transform(ref_node, seen_tees);
179        }
180    }
181
182    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
183    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
184    #[cfg(feature = "build")]
185    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186        if self.singleton_refs.is_empty() {
187            self.expr.0.to_token_stream()
188        } else {
189            assert!(
190                ident_stack.len() >= self.singleton_refs.len(),
191                "ident_stack has {} entries but expected at least {} for singleton_refs",
192                ident_stack.len(),
193                self.singleton_refs.len()
194            );
195            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197            let mut let_bindings = Vec::new();
198            for ((i, (ref_node, is_mut)), ref_ident) in
199                self.singleton_refs.iter().enumerate().zip(ref_idents)
200            {
201                let HydroNode::Reference { access_counter, .. } = ref_node else {
202                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
203                };
204                let group = access_counter.frozen_group();
205                // TODO(mingwei): proper spanning?
206                let local_ident = handoff_ref_ident(i);
207                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209                let mut_token = is_mut.then(|| quote!(mut));
210                let binding = quote! {
211                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212                };
213                let_bindings.push(binding);
214            }
215
216            let expr = &self.expr.0;
217            quote! {
218                {
219                    #( #let_bindings )*
220                    #expr
221                }
222            }
223        }
224    }
225}
226
227/// Wrapper that displays only the tokens of a parsed expr.
228///
229/// Boxes `syn::Type` which is ~240 bytes.
230#[derive(Clone, Hash)]
231pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235        serializer.serialize_str(&self.to_string())
236    }
237}
238
239impl From<syn::Expr> for DebugExpr {
240    fn from(expr: syn::Expr) -> Self {
241        Self(Box::new(expr))
242    }
243}
244
245impl Deref for DebugExpr {
246    type Target = syn::Expr;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252
253impl ToTokens for DebugExpr {
254    fn to_tokens(&self, tokens: &mut TokenStream) {
255        self.0.to_tokens(tokens);
256    }
257}
258
259impl Debug for DebugExpr {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.to_token_stream())
262    }
263}
264
265impl Display for DebugExpr {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let original = self.0.as_ref().clone();
268        let simplified = simplify_q_macro(original);
269
270        // For now, just use quote formatting without trying to parse as a statement
271        // This avoids the syn::parse_quote! issues entirely
272        write!(f, "q!({})", quote::quote!(#simplified))
273    }
274}
275
276/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
277fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279        // Look for calls to stageleft::runtime_support::fn*
280        && is_stageleft_runtime_support_call(&path_expr.path)
281        && let syn::Expr::Block(b) = &call.args[0]
282        && b.block.stmts.len() == 3
283        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284    // skip the first two, which are imports
285    {
286        let mut e = e.clone();
287        while let syn::Expr::Block(ref mut block) = e
288            && block.block.stmts.len() == 1
289            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290        {
291            e = inner_e;
292        }
293
294        e
295    } else {
296        expr
297    }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301    // Check if this is a call to stageleft::runtime_support::fn*
302    if let Some(last_segment) = path.segments.last() {
303        let fn_name = last_segment.ident.to_string();
304        path.segments.len() > 2
305            && path.segments[0].ident == "stageleft"
306            && path.segments[1].ident == "runtime_support"
307            && fn_name.contains("_type_hint")
308    } else {
309        false
310    }
311}
312
313/// Debug displays the type's tokens.
314///
315/// Boxes `syn::Type` which is ~320 bytes.
316#[derive(Clone, PartialEq, Eq, Hash)]
317pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320    fn from(t: syn::Type) -> Self {
321        Self(Box::new(t))
322    }
323}
324
325impl Deref for DebugType {
326    type Target = syn::Type;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333impl ToTokens for DebugType {
334    fn to_tokens(&self, tokens: &mut TokenStream) {
335        self.0.to_tokens(tokens);
336    }
337}
338
339impl Debug for DebugType {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        write!(f, "{}", self.0.to_token_stream())
342    }
343}
344
345impl serde::Serialize for DebugType {
346    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348    }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352    backtrace: &Backtrace,
353    serializer: S,
354) -> Result<S::Ok, S::Error> {
355    match backtrace.format_span() {
356        Some(span) => serializer.serialize_some(&span),
357        None => serializer.serialize_none(),
358    }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362    ident: &syn::Ident,
363    serializer: S,
364) -> Result<S::Ok, S::Error> {
365    serializer.serialize_str(&ident.to_string())
366}
367
368pub enum DebugInstantiate {
369    Building,
370    Finalized(Box<DebugInstantiateFinalized>),
371}
372
373impl serde::Serialize for DebugInstantiate {
374    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375        match self {
376            DebugInstantiate::Building => {
377                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378            }
379            DebugInstantiate::Finalized(_) => {
380                panic!(
381                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382                )
383            }
384        }
385    }
386}
387
388#[cfg_attr(
389    not(feature = "build"),
390    expect(
391        dead_code,
392        reason = "sink, source unused without `feature = \"build\"`."
393    )
394)]
395pub struct DebugInstantiateFinalized {
396    sink: syn::Expr,
397    source: syn::Expr,
398    connect_fn: Option<Box<dyn FnOnce()>>,
399}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402    fn from(f: DebugInstantiateFinalized) -> Self {
403        Self::Finalized(Box::new(f))
404    }
405}
406
407impl Debug for DebugInstantiate {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "<network instantiate>")
410    }
411}
412
413impl Hash for DebugInstantiate {
414    fn hash<H: Hasher>(&self, _state: &mut H) {
415        // Do nothing
416    }
417}
418
419impl Clone for DebugInstantiate {
420    fn clone(&self) -> Self {
421        match self {
422            DebugInstantiate::Building => DebugInstantiate::Building,
423            DebugInstantiate::Finalized(_) => {
424                panic!("DebugInstantiate::Finalized should not be cloned")
425            }
426        }
427    }
428}
429
430/// Tracks the instantiation state of a `ClusterMembers` source.
431///
432/// During `compile_network`, the first `ClusterMembers` node for a given
433/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
434/// receives the expression returned by `Deploy::cluster_membership_stream`.
435/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
436/// during code-gen they simply reference the tee output of the first node
437/// instead of creating a redundant `source_stream`.
438#[derive(Debug, Hash, Clone, serde::Serialize)]
439pub enum ClusterMembersState {
440    /// Not yet instantiated.
441    Uninit,
442    /// The primary instance: holds the stream expression and will emit
443    /// `source_stream(expr) -> tee()` during code-gen.
444    Stream(DebugExpr),
445    /// A secondary instance that references the tee output of the primary.
446    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
447    /// can derive the deterministic tee ident without extra state.
448    Tee(LocationId, LocationId),
449}
450
451/// A source in a Hydro graph, where data enters the graph.
452#[derive(Debug, Hash, Clone, serde::Serialize)]
453pub enum HydroSource {
454    Stream(DebugExpr),
455    ExternalNetwork(),
456    Iter(DebugExpr),
457    Spin(),
458    ClusterMembers(LocationId, ClusterMembersState),
459    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
460    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
461}
462
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// The production implementation is `impl DfirBuilder for SecondaryMap<LocationKey,
/// FlatGraphBuilder>`. It keeps one builder per root location and emits most of the hooks
/// below as plain passthroughs (`out = in;`).
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits code that binds `out_ident` (at `out_location`) to the collection `in_ident`
    /// (at `in_location`, of kind `in_kind`), presumably when a collection is batched into
    /// a tick.
    ///
    /// Production behavior: bounded `Singleton` / `Optional` / `KeyedSingleton` inputs are
    /// wrapped in `persist::<'static>()` (asserting `in_location` is top-level); everything
    /// else is a passthrough. `op_meta` and `fold_hooked_idents` are unused there.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits code that binds `out_ident` (at `out_location`) to `in_ident` (at
    /// `in_location`), presumably when data leaves a tick. A passthrough in production.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits code marking entry of `in_ident` into an atomic section, bound to `out_ident`.
    /// A passthrough in production.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits code marking exit of `in_ident` from an atomic section, bound to `out_ident`.
    /// A passthrough in production.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits code where `out_ident` (of kind `out_kind`) observes `in_ident` (of kind
    /// `in_kind`) at a point where nondeterminism may be exposed. A passthrough in
    /// production; the meaning of `trusted` is not visible here (TODO confirm).
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits code merging `first_ident` and `second_ident` into `out_ident`.
    ///
    /// Production behavior: a `union()` with `first_ident` on input port 0 and
    /// `second_ident` on port 1, tagged with `operator_tag`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network channel from `from` to `to`.
    ///
    /// Production behavior:
    /// - On `from`, emits `input_ident`, optionally mapped through `serialize`, then
    ///   `dest_sink(sink)`.
    /// - On `to`, binds `out_ident` to `source_stream(source)`, optionally mapped through
    ///   `deserialize`.
    /// - The two statements are tagged `send{tag_id}` and `recv{tag_id}`.
    /// - `networking_info` is unused.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits, on `on`, a statement binding `out_ident` to `source_stream(source_expr)`,
    /// optionally mapped through `deserialize` (tagged `recv{tag_id}` in production).
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits, on `on`, a statement sending `input_ident` (optionally mapped through
    /// `serialize`) into `dest_sink(sink_expr)` (tagged `send{tag_id}` in production).
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
    /// Returns the new input ident to use for the fold if a hook was emitted.
    /// The production implementation never emits a hook (always `None`).
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Inserts necessary code to validate a manual assertion that at this point the
    /// input live collection is consistent. In production, this is a no-op, but in simulation
    /// this will (not yet implemented) inject assertions that validate consistency.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Observes non-determinism introduced by a mut closure operating on a non-strict
    /// (unordered / at-least-once) input. In production this is identity; in simulation
    /// it delegates to `observe_nondet` with the strict output kind.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );
}
603
/// Appends `#out = #input;` to `builder`, i.e. a DFIR passthrough from `input` to `out`.
#[cfg(feature = "build")]
fn add_passthrough_dfir(builder: &mut FlatGraphBuilder, out: &syn::Ident, input: &syn::Ident) {
    builder.add_dfir(
        parse_quote! {
            #out = #input;
        },
        None,
        None,
    );
}

/// Production code-gen: one `FlatGraphBuilder` per root location and no simulation hooks.
///
/// Tick boundaries, atomic sections, nondeterminism observation and consistency assertions
/// all become plain passthroughs. The one exception is batching a bounded singleton-like
/// collection, which is persisted.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        let builder = self.get_dfir_mut(in_location.root());
        if !needs_persist {
            add_passthrough_dfir(builder, out_ident, &in_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), out_ident, &in_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), out_ident, &in_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), out_ident, &in_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), out_ident, &in_ident);
    }

    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // Optional `map(..)` stages; a `None` stage interpolates to no tokens at all.
        let serialize_stage = serialize.map(|f| quote!(map(#f) ->));
        let deserialize_stage = deserialize.map(|f| quote!(-> map(#f)));

        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        self.get_dfir_mut(from).add_dfir(
            parse_quote! {
                #input_ident -> #serialize_stage dest_sink(#sink);
            },
            None,
            Some(&send_tag),
        );

        let recv_tag = format!("recv{tag_id}");
        self.get_dfir_mut(to).add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source) #deserialize_stage;
            },
            None,
            Some(&recv_tag),
        );
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let deserialize_stage = deserialize.map(|f| quote!(-> map(#f)));
        let recv_tag = format!("recv{tag_id}");
        self.get_dfir_mut(on).add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source_expr) #deserialize_stage;
            },
            None,
            Some(&recv_tag),
        );
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let serialize_stage = serialize.map(|f| quote!(map(#f) ->));
        let send_tag = format!("send{tag_id}");
        self.get_dfir_mut(on).add_dfir(
            parse_quote! {
                #input_ident -> #serialize_stage dest_sink(#sink_expr);
            },
            None,
            Some(&send_tag),
        );
    }

    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        // Production never buffers/permutes fold inputs.
        None
    }

    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), out_ident, &in_ident);
    }

    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), out_ident, &in_ident);
    }
}
905
/// Code-gen target: either a [`DfirBuilder`] to emit DFIR into, or a pair of callbacks
/// taking a root (`L`) or a node (`N`) together with a `StmtId` counter.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<
    'a,
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
> {
    Builders(&'a mut dyn DfirBuilder),
    Callback(L, N),
}
915
916/// An root in a Hydro graph, which is an pipeline that doesn't emit
917/// any downstream values. Traversals over the dataflow graph and
918/// generating DFIR IR start from roots.
919#[derive(Debug, Hash, serde::Serialize)]
920pub enum HydroRoot {
921    ForEach {
922        f: ClosureExpr,
923        input: Box<HydroNode>,
924        op_metadata: HydroIrOpMetadata,
925    },
926    SendExternal {
927        to_external_key: LocationKey,
928        to_port_id: ExternalPortId,
929        to_many: bool,
930        unpaired: bool,
931        serialize_fn: Option<DebugExpr>,
932        instantiate_fn: DebugInstantiate,
933        input: Box<HydroNode>,
934        op_metadata: HydroIrOpMetadata,
935    },
936    DestSink {
937        sink: DebugExpr,
938        input: Box<HydroNode>,
939        op_metadata: HydroIrOpMetadata,
940    },
941    CycleSink {
942        cycle_id: CycleId,
943        input: Box<HydroNode>,
944        op_metadata: HydroIrOpMetadata,
945    },
946    EmbeddedOutput {
947        #[serde(serialize_with = "serialize_ident")]
948        ident: syn::Ident,
949        input: Box<HydroNode>,
950        op_metadata: HydroIrOpMetadata,
951    },
952    Null {
953        input: Box<HydroNode>,
954        op_metadata: HydroIrOpMetadata,
955    },
956}
957
958impl HydroRoot {
#[cfg(feature = "build")]
#[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
/// Instantiates every network-like edge reachable from this root using deployment backend `D`.
///
/// Walks the graph with [`Self::transform_bottom_up`] (well-formedness checks disabled) and:
/// - `HydroRoot::SendExternal`: builds the sink towards the external and finalizes
///   `instantiate_fn` (only with the `tokio` feature; otherwise this panics);
/// - `HydroRoot::EmbeddedOutput`: registers the output via `D::register_embedded_output`;
/// - `HydroNode::Network`: delegates to `instantiate_network::<D>` and finalizes `instantiate_fn`;
/// - `HydroNode::ExternalInput`: builds the source from the external plus a connect
///   closure and finalizes `instantiate_fn`;
/// - embedded stream/singleton sources: registers them with `env`;
/// - `ClusterMembers` sources: the first occurrence per `(at_location, target)` pair gets a
///   membership stream; later occurrences become tees of it.
///
/// Statements that `D` needs at a location are appended to `extra_stmts` under that location's
/// key. Connect closures are stored in the finalized `instantiate_fn` but not run here;
/// [`Self::connect_network`] runs them.
///
/// # Panics
/// - If a process, cluster, or external used by an edge is missing from
///   `processes` / `clusters` / `externals`.
/// - If an edge is already finalized, or a `ClusterMembers` source is not `Uninit`.
/// - If the root location of an external edge is neither a process nor a cluster.
/// - If an embedded output/source has the wrong collection kind or location.
/// - Without the `tokio` feature, on any `SendExternal` root.
pub fn compile_network<'a, D>(
    &mut self,
    extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
    seen_tees: &mut SeenSharedNodes,
    seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    externals: &SparseSecondaryMap<LocationKey, D::External>,
    env: &mut D::InstantiateEnv,
) where
    D: Deploy<'a>,
{
    // Both traversal closures below need mutable access to `extra_stmts` and `env`, so the
    // mutable borrows are moved into RefCells that either closure can borrow on demand.
    let refcell_extra_stmts = RefCell::new(extra_stmts);
    let refcell_env = RefCell::new(env);
    let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
    self.transform_bottom_up(
        &mut |l| {
            // The fields are only bound when `tokio` is enabled, since only that
            // configuration can instantiate the external connection.
            if let HydroRoot::SendExternal {
                #[cfg(feature = "tokio")]
                input,
                #[cfg(feature = "tokio")]
                to_external_key,
                #[cfg(feature = "tokio")]
                to_port_id,
                #[cfg(feature = "tokio")]
                to_many,
                #[cfg(feature = "tokio")]
                unpaired,
                #[cfg(feature = "tokio")]
                instantiate_fn,
                ..
            } = l
            {
                #[cfg(feature = "tokio")]
                let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => {
                        let to_node = externals
                            .get(*to_external_key)
                            .unwrap_or_else(|| {
                                panic!("A external used in the graph was not instantiated: {}", to_external_key)
                            })
                            .clone();

                        // The sending side is the root location of the input stream.
                        match input.metadata().location_id.root() {
                            &LocationId::Process(process_key) => {
                                if *to_many {
                                    // Many-client sink: only a port name is needed, and there is
                                    // nothing to connect afterwards (no-op connect closure).
                                    (
                                        (
                                            D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
                                            parse_quote!(DUMMY),
                                        ),
                                        Box::new(|| {}) as Box<dyn FnOnce()>,
                                    )
                                } else {
                                    let from_node = processes
                                        .get(process_key)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_key)
                                        })
                                        .clone();

                                    // Single external client: allocate one port on each side.
                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    if *unpaired {
                                        use stageleft::quote_type;
                                        use tokio_util::codec::LengthDelimitedCodec;

                                        // Register the external-side port under `to_port_id`.
                                        to_node.register(*to_port_id, source_port.clone());

                                        // Also instantiate the external->process direction on the
                                        // same port pair (note the swapped roles). The returned
                                        // source expression is discarded; NOTE(review): presumably
                                        // only the statements it appends to `extra_stmts` matter.
                                        let _ = D::e2o_source(
                                            refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                            &to_node, &source_port,
                                            &from_node, &sink_port,
                                            &quote_type::<LengthDelimitedCodec>(),
                                            format!("{}_{}", *to_external_key, *to_port_id)
                                        );
                                    }

                                    // SendExternal has no in-graph source, so `DUMMY` fills that slot.
                                    (
                                        (
                                            D::o2e_sink(
                                                &from_node,
                                                &sink_port,
                                                &to_node,
                                                &source_port,
                                                format!("{}_{}", *to_external_key, *to_port_id)
                                            ),
                                            parse_quote!(DUMMY),
                                        ),
                                        // Only the unpaired case has a connection to establish here.
                                        if *unpaired {
                                            D::e2o_connect(
                                                &to_node,
                                                &source_port,
                                                &from_node,
                                                &sink_port,
                                                *to_many,
                                                NetworkHint::Auto,
                                            )
                                        } else {
                                            Box::new(|| {}) as Box<dyn FnOnce()>
                                        },
                                    )
                                }
                            }
                            LocationId::Cluster(cluster_key) => {
                                let from_node = clusters
                                    .get(*cluster_key)
                                    .unwrap_or_else(|| {
                                        panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
                                    })
                                    .clone();

                                let sink_port = from_node.next_port();
                                let source_port = to_node.next_port();

                                // Unlike the process case, only the port registration happens
                                // here; no reverse-direction source is instantiated.
                                if *unpaired {
                                    to_node.register(*to_port_id, source_port.clone());
                                }

                                (
                                    (
                                        D::m2e_sink(
                                            &from_node,
                                            &sink_port,
                                            &to_node,
                                            &source_port,
                                            format!("{}_{}", *to_external_key, *to_port_id)
                                        ),
                                        parse_quote!(DUMMY),
                                    ),
                                    Box::new(|| {}) as Box<dyn FnOnce()>,
                                )
                            }
                            // Any other root location kind cannot send to an external.
                            _ => panic!()
                        }
                    },

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                #[cfg(not(feature = "tokio"))]
                {
                    panic!("Cannot instantiate external inputs without tokio");
                };

                // Store the finalized sink/source; `connect_fn` is run later by `connect_network`.
                #[cfg(feature = "tokio")]
                {
                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                };
            } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
                // Embedded outputs must carry a Stream; register its element type with the
                // deployment env under the owning process/cluster.
                let element_type = match &input.metadata().collection_kind {
                    CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
                    _ => panic!("Embedded output must have Stream collection kind"),
                };
                let location_key = match input.metadata().location_id.root() {
                    LocationId::Process(key) | LocationId::Cluster(key) => *key,
                    _ => panic!("Embedded output must be on a process or cluster"),
                };
                D::register_embedded_output(
                    &mut refcell_env.borrow_mut(),
                    location_key,
                    ident,
                    &element_type,
                );
            }
        },
        &mut |n| {
            // Location-to-location edge: the sender is the input's root location and the
            // receiver is this node's root location.
            if let HydroNode::Network {
                name,
                networking_info,
                input,
                instantiate_fn,
                metadata,
                ..
            } = n
            {
                let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => instantiate_network::<D>(
                        &mut refcell_env.borrow_mut(),
                        input.metadata().location_id.root(),
                        metadata.location_id.root(),
                        processes,
                        clusters,
                        name.as_deref(),
                        networking_info,
                    ),

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                *instantiate_fn = DebugInstantiateFinalized {
                    sink: sink_expr,
                    source: source_expr,
                    connect_fn: Some(connect_fn),
                }
                .into();
            } else if let HydroNode::ExternalInput {
                from_external_key,
                from_port_id,
                from_many,
                codec_type,
                port_hint,
                instantiate_fn,
                metadata,
                ..
            } = n
            {
                let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => {
                        let from_node = externals
                            .get(*from_external_key)
                            .unwrap_or_else(|| {
                                panic!(
                                    "A external used in the graph was not instantiated: {}",
                                    from_external_key,
                                )
                            })
                            .clone();

                        // The receiving side is this node's root location.
                        match metadata.location_id.root() {
                            &LocationId::Process(process_key) => {
                                let to_node = processes
                                    .get(process_key)
                                    .unwrap_or_else(|| {
                                        panic!("A process used in the graph was not instantiated: {}", process_key)
                                    })
                                    .clone();

                                let sink_port = from_node.next_port();
                                let source_port = to_node.next_port();

                                // Register the external-side (sink) port under `from_port_id`.
                                from_node.register(*from_port_id, sink_port.clone());

                                // ExternalInput has no in-graph sink, so `DUMMY` fills that slot.
                                (
                                    (
                                        parse_quote!(DUMMY),
                                        if *from_many {
                                            D::e2o_many_source(
                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                &to_node, &source_port,
                                                codec_type.0.as_ref(),
                                                format!("{}_{}", *from_external_key, *from_port_id)
                                            )
                                        } else {
                                            D::e2o_source(
                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                &from_node, &sink_port,
                                                &to_node, &source_port,
                                                codec_type.0.as_ref(),
                                                format!("{}_{}", *from_external_key, *from_port_id)
                                            )
                                        },
                                    ),
                                    D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                )
                            }
                            LocationId::Cluster(cluster_key) => {
                                let to_node = clusters
                                    .get(*cluster_key)
                                    .unwrap_or_else(|| {
                                        panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
                                    })
                                    .clone();

                                let sink_port = from_node.next_port();
                                let source_port = to_node.next_port();

                                from_node.register(*from_port_id, sink_port.clone());

                                (
                                    (
                                        parse_quote!(DUMMY),
                                        D::e2m_source(
                                            refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
                                            &from_node, &sink_port,
                                            &to_node, &source_port,
                                            codec_type.0.as_ref(),
                                            format!("{}_{}", *from_external_key, *from_port_id)
                                        ),
                                    ),
                                    D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
                                )
                            }
                            // Any other root location kind cannot receive from an external.
                            _ => panic!()
                        }
                    },

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                *instantiate_fn = DebugInstantiateFinalized {
                    sink: sink_expr,
                    source: source_expr,
                    connect_fn: Some(connect_fn),
                }
                .into();
            } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
                // Embedded stream input: register its element type with the deployment env.
                let element_type = match &metadata.collection_kind {
                    CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
                    _ => panic!("Embedded source must have Stream collection kind"),
                };
                let location_key = match metadata.location_id.root() {
                    LocationId::Process(key) | LocationId::Cluster(key) => *key,
                    _ => panic!("Embedded source must be on a process or cluster"),
                };
                D::register_embedded_stream_input(
                    &mut refcell_env.borrow_mut(),
                    location_key,
                    ident,
                    &element_type,
                );
            } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
                // Embedded singleton input: same as above, but for a Singleton collection.
                let element_type = match &metadata.collection_kind {
                    CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
                    _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
                };
                let location_key = match metadata.location_id.root() {
                    LocationId::Process(key) | LocationId::Cluster(key) => *key,
                    _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
                };
                D::register_embedded_singleton_input(
                    &mut refcell_env.borrow_mut(),
                    location_key,
                    ident,
                    &element_type,
                );
            } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
                match state {
                    ClusterMembersState::Uninit => {
                        let at_location = metadata.location_id.root().clone();
                        let key = (at_location.clone(), location_id.key());
                        if refcell_seen_cluster_members.borrow_mut().insert(key) {
                            // First occurrence: call cluster_membership_stream and mark as Stream.
                            let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
                                D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
                                &(),
                            );
                            *state = ClusterMembersState::Stream(expr.into());
                        } else {
                            // Already instantiated for this (at, target) pair: just tee.
                            *state = ClusterMembersState::Tee(at_location, location_id.clone());
                        }
                    }
                    ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
                        panic!("cluster members already finalized");
                    }
                }
            }
        },
        seen_tees,
        false,
    );
}
1320
1321    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1322        self.transform_bottom_up(
1323            &mut |l| {
1324                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1325                    match instantiate_fn {
1326                        DebugInstantiate::Building => panic!("network not built"),
1327
1328                        DebugInstantiate::Finalized(finalized) => {
1329                            (finalized.connect_fn.take().unwrap())();
1330                        }
1331                    }
1332                }
1333            },
1334            &mut |n| {
1335                if let HydroNode::Network { instantiate_fn, .. }
1336                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1337                {
1338                    match instantiate_fn {
1339                        DebugInstantiate::Building => panic!("network not built"),
1340
1341                        DebugInstantiate::Finalized(finalized) => {
1342                            (finalized.connect_fn.take().unwrap())();
1343                        }
1344                    }
1345                }
1346            },
1347            seen_tees,
1348            false,
1349        );
1350    }
1351
1352    pub fn transform_bottom_up(
1353        &mut self,
1354        transform_root: &mut impl FnMut(&mut HydroRoot),
1355        transform_node: &mut impl FnMut(&mut HydroNode),
1356        seen_tees: &mut SeenSharedNodes,
1357        check_well_formed: bool,
1358    ) {
1359        self.transform_children(
1360            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1361            seen_tees,
1362        );
1363
1364        transform_root(self);
1365    }
1366
1367    pub fn transform_children(
1368        &mut self,
1369        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1370        seen_tees: &mut SeenSharedNodes,
1371    ) {
1372        match self {
1373            HydroRoot::ForEach { f, input, .. } => {
1374                f.transform_children(&mut transform, seen_tees);
1375                transform(input, seen_tees);
1376            }
1377            HydroRoot::SendExternal { input, .. }
1378            | HydroRoot::DestSink { input, .. }
1379            | HydroRoot::CycleSink { input, .. }
1380            | HydroRoot::EmbeddedOutput { input, .. }
1381            | HydroRoot::Null { input, .. } => {
1382                transform(input, seen_tees);
1383            }
1384        }
1385    }
1386
1387    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1388        match self {
1389            HydroRoot::ForEach {
1390                f,
1391                input,
1392                op_metadata,
1393            } => HydroRoot::ForEach {
1394                f: f.deep_clone(seen_tees),
1395                input: Box::new(input.deep_clone(seen_tees)),
1396                op_metadata: op_metadata.clone(),
1397            },
1398            HydroRoot::SendExternal {
1399                to_external_key,
1400                to_port_id,
1401                to_many,
1402                unpaired,
1403                serialize_fn,
1404                instantiate_fn,
1405                input,
1406                op_metadata,
1407            } => HydroRoot::SendExternal {
1408                to_external_key: *to_external_key,
1409                to_port_id: *to_port_id,
1410                to_many: *to_many,
1411                unpaired: *unpaired,
1412                serialize_fn: serialize_fn.clone(),
1413                instantiate_fn: instantiate_fn.clone(),
1414                input: Box::new(input.deep_clone(seen_tees)),
1415                op_metadata: op_metadata.clone(),
1416            },
1417            HydroRoot::DestSink {
1418                sink,
1419                input,
1420                op_metadata,
1421            } => HydroRoot::DestSink {
1422                sink: sink.clone(),
1423                input: Box::new(input.deep_clone(seen_tees)),
1424                op_metadata: op_metadata.clone(),
1425            },
1426            HydroRoot::CycleSink {
1427                cycle_id,
1428                input,
1429                op_metadata,
1430            } => HydroRoot::CycleSink {
1431                cycle_id: *cycle_id,
1432                input: Box::new(input.deep_clone(seen_tees)),
1433                op_metadata: op_metadata.clone(),
1434            },
1435            HydroRoot::EmbeddedOutput {
1436                ident,
1437                input,
1438                op_metadata,
1439            } => HydroRoot::EmbeddedOutput {
1440                ident: ident.clone(),
1441                input: Box::new(input.deep_clone(seen_tees)),
1442                op_metadata: op_metadata.clone(),
1443            },
1444            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1445                input: Box::new(input.deep_clone(seen_tees)),
1446                op_metadata: op_metadata.clone(),
1447            },
1448        }
1449    }
1450
#[cfg(feature = "build")]
/// Emits DFIR for this root and the nodes beneath it into `graph_builders`.
///
/// Thin wrapper over [`Self::emit_core`] using `BuildersOrCallback::Builders`. The callback
/// type parameters are pinned to plain `fn` pointers only to satisfy the generic signature;
/// the `Callback` variant is never constructed here.
pub fn emit(
    &mut self,
    graph_builders: &mut dyn DfirBuilder,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut crate::Counter<StmtId>,
    fold_hooked_idents: &mut HashSet<String>,
) {
    type RootCallback = fn(&mut HydroRoot, &mut crate::Counter<StmtId>);
    type NodeCallback = fn(&mut HydroNode, &mut crate::Counter<StmtId>);

    let mut target = BuildersOrCallback::<RootCallback, NodeCallback>::Builders(graph_builders);
    self.emit_core(
        &mut target,
        seen_tees,
        built_tees,
        next_stmt_id,
        fold_hooked_idents,
    );
}
1471
1472    #[cfg(feature = "build")]
1473    pub fn emit_core(
1474        &mut self,
1475        builders_or_callback: &mut BuildersOrCallback<
1476            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1477            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1478        >,
1479        seen_tees: &mut SeenSharedNodes,
1480        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1481        next_stmt_id: &mut crate::Counter<StmtId>,
1482        fold_hooked_idents: &mut HashSet<String>,
1483    ) {
1484        match self {
1485            HydroRoot::ForEach { f, input, .. } => {
1486                let input_ident = input.emit_core(
1487                    builders_or_callback,
1488                    seen_tees,
1489                    built_tees,
1490                    next_stmt_id,
1491                    fold_hooked_idents,
1492                );
1493
1494                let input_ident = maybe_observe_for_mut(
1495                    f,
1496                    input_ident,
1497                    &input.metadata().location_id,
1498                    &input.metadata().collection_kind,
1499                    &input.metadata().op,
1500                    builders_or_callback,
1501                    next_stmt_id,
1502                );
1503
1504                let stmt_id = next_stmt_id.get_and_increment();
1505
1506                match builders_or_callback {
1507                    BuildersOrCallback::Builders(graph_builders) => {
1508                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1509
1510                        // Look up each captured ref's ident from built_tees
1511                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1512                            let HydroNode::Reference { inner, .. } = ref_node else {
1513                                panic!("singleton_refs should only contain HydroNode::Reference");
1514                            };
1515                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1516                            let idents = built_tees.get(&ptr).expect(
1517                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1518                            );
1519                            ident_stack.push(idents[0].clone());
1520                        }
1521
1522                        let f_tokens = f.emit_tokens(&mut ident_stack);
1523
1524                        graph_builders
1525                            .get_dfir_mut(&input.metadata().location_id)
1526                            .add_dfir(
1527                                parse_quote! {
1528                                    #input_ident -> for_each(#f_tokens);
1529                                },
1530                                None,
1531                                Some(&stmt_id.to_string()),
1532                            );
1533                    }
1534                    BuildersOrCallback::Callback(leaf_callback, _) => {
1535                        leaf_callback(self, next_stmt_id);
1536                    }
1537                }
1538            }
1539
1540            HydroRoot::SendExternal {
1541                serialize_fn,
1542                instantiate_fn,
1543                input,
1544                ..
1545            } => {
1546                let input_ident = input.emit_core(
1547                    builders_or_callback,
1548                    seen_tees,
1549                    built_tees,
1550                    next_stmt_id,
1551                    fold_hooked_idents,
1552                );
1553
1554                let stmt_id = next_stmt_id.get_and_increment();
1555
1556                match builders_or_callback {
1557                    BuildersOrCallback::Builders(graph_builders) => {
1558                        let (sink_expr, _) = match instantiate_fn {
1559                            DebugInstantiate::Building => (
1560                                syn::parse_quote!(DUMMY_SINK),
1561                                syn::parse_quote!(DUMMY_SOURCE),
1562                            ),
1563
1564                            DebugInstantiate::Finalized(finalized) => {
1565                                (finalized.sink.clone(), finalized.source.clone())
1566                            }
1567                        };
1568
1569                        graph_builders.create_external_output(
1570                            &input.metadata().location_id,
1571                            sink_expr,
1572                            &input_ident,
1573                            serialize_fn.as_ref(),
1574                            stmt_id,
1575                        );
1576                    }
1577                    BuildersOrCallback::Callback(leaf_callback, _) => {
1578                        leaf_callback(self, next_stmt_id);
1579                    }
1580                }
1581            }
1582
1583            HydroRoot::DestSink { sink, input, .. } => {
1584                let input_ident = input.emit_core(
1585                    builders_or_callback,
1586                    seen_tees,
1587                    built_tees,
1588                    next_stmt_id,
1589                    fold_hooked_idents,
1590                );
1591
1592                let stmt_id = next_stmt_id.get_and_increment();
1593
1594                match builders_or_callback {
1595                    BuildersOrCallback::Builders(graph_builders) => {
1596                        graph_builders
1597                            .get_dfir_mut(&input.metadata().location_id)
1598                            .add_dfir(
1599                                parse_quote! {
1600                                    #input_ident -> dest_sink(#sink);
1601                                },
1602                                None,
1603                                Some(&stmt_id.to_string()),
1604                            );
1605                    }
1606                    BuildersOrCallback::Callback(leaf_callback, _) => {
1607                        leaf_callback(self, next_stmt_id);
1608                    }
1609                }
1610            }
1611
1612            HydroRoot::CycleSink {
1613                cycle_id, input, ..
1614            } => {
1615                let input_ident = input.emit_core(
1616                    builders_or_callback,
1617                    seen_tees,
1618                    built_tees,
1619                    next_stmt_id,
1620                    fold_hooked_idents,
1621                );
1622
1623                match builders_or_callback {
1624                    BuildersOrCallback::Builders(graph_builders) => {
1625                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1626                            CollectionKind::KeyedSingleton {
1627                                key_type,
1628                                value_type,
1629                                ..
1630                            }
1631                            | CollectionKind::KeyedStream {
1632                                key_type,
1633                                value_type,
1634                                ..
1635                            } => {
1636                                parse_quote!((#key_type, #value_type))
1637                            }
1638                            CollectionKind::Stream { element_type, .. }
1639                            | CollectionKind::Singleton { element_type, .. }
1640                            | CollectionKind::Optional { element_type, .. } => {
1641                                parse_quote!(#element_type)
1642                            }
1643                        };
1644
1645                        let cycle_id_ident = cycle_id.as_ident();
1646                        graph_builders
1647                            .get_dfir_mut(&input.metadata().location_id)
1648                            .add_dfir(
1649                                parse_quote! {
1650                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1651                                },
1652                                None,
1653                                None,
1654                            );
1655                    }
1656                    // No ID, no callback
1657                    BuildersOrCallback::Callback(_, _) => {}
1658                }
1659            }
1660
1661            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1662                let input_ident = input.emit_core(
1663                    builders_or_callback,
1664                    seen_tees,
1665                    built_tees,
1666                    next_stmt_id,
1667                    fold_hooked_idents,
1668                );
1669
1670                let stmt_id = next_stmt_id.get_and_increment();
1671
1672                match builders_or_callback {
1673                    BuildersOrCallback::Builders(graph_builders) => {
1674                        graph_builders
1675                            .get_dfir_mut(&input.metadata().location_id)
1676                            .add_dfir(
1677                                parse_quote! {
1678                                    #input_ident -> for_each(&mut #ident);
1679                                },
1680                                None,
1681                                Some(&stmt_id.to_string()),
1682                            );
1683                    }
1684                    BuildersOrCallback::Callback(leaf_callback, _) => {
1685                        leaf_callback(self, next_stmt_id);
1686                    }
1687                }
1688            }
1689
1690            HydroRoot::Null { input, .. } => {
1691                let input_ident = input.emit_core(
1692                    builders_or_callback,
1693                    seen_tees,
1694                    built_tees,
1695                    next_stmt_id,
1696                    fold_hooked_idents,
1697                );
1698
1699                let stmt_id = next_stmt_id.get_and_increment();
1700
1701                match builders_or_callback {
1702                    BuildersOrCallback::Builders(graph_builders) => {
1703                        graph_builders
1704                            .get_dfir_mut(&input.metadata().location_id)
1705                            .add_dfir(
1706                                parse_quote! {
1707                                    #input_ident -> for_each(|_| {});
1708                                },
1709                                None,
1710                                Some(&stmt_id.to_string()),
1711                            );
1712                    }
1713                    BuildersOrCallback::Callback(leaf_callback, _) => {
1714                        leaf_callback(self, next_stmt_id);
1715                    }
1716                }
1717            }
1718        }
1719    }
1720
1721    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1722        match self {
1723            HydroRoot::ForEach { op_metadata, .. }
1724            | HydroRoot::SendExternal { op_metadata, .. }
1725            | HydroRoot::DestSink { op_metadata, .. }
1726            | HydroRoot::CycleSink { op_metadata, .. }
1727            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1728            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1729        }
1730    }
1731
1732    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1733        match self {
1734            HydroRoot::ForEach { op_metadata, .. }
1735            | HydroRoot::SendExternal { op_metadata, .. }
1736            | HydroRoot::DestSink { op_metadata, .. }
1737            | HydroRoot::CycleSink { op_metadata, .. }
1738            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1739            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1740        }
1741    }
1742
1743    pub fn input(&self) -> &HydroNode {
1744        match self {
1745            HydroRoot::ForEach { input, .. }
1746            | HydroRoot::SendExternal { input, .. }
1747            | HydroRoot::DestSink { input, .. }
1748            | HydroRoot::CycleSink { input, .. }
1749            | HydroRoot::EmbeddedOutput { input, .. }
1750            | HydroRoot::Null { input, .. } => input,
1751        }
1752    }
1753
1754    pub fn input_metadata(&self) -> &HydroIrMetadata {
1755        self.input().metadata()
1756    }
1757
1758    pub fn print_root(&self) -> String {
1759        match self {
1760            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1761            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1762            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1763            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1764            HydroRoot::EmbeddedOutput { ident, .. } => {
1765                format!("EmbeddedOutput({})", ident)
1766            }
1767            HydroRoot::Null { .. } => "Null".to_owned(),
1768        }
1769    }
1770
1771    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1772        match self {
1773            HydroRoot::ForEach { f, .. } => {
1774                transform(&mut f.expr);
1775            }
1776            HydroRoot::DestSink { sink, .. } => {
1777                transform(sink);
1778            }
1779            HydroRoot::SendExternal { .. }
1780            | HydroRoot::CycleSink { .. }
1781            | HydroRoot::EmbeddedOutput { .. }
1782            | HydroRoot::Null { .. } => {}
1783        }
1784    }
1785}
1786
/// Returns the clock id of the tick that `loc` lives in, looking through any
/// `Atomic` wrappers; `None` if `loc` is not inside a tick.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => break Some(*id),
            LocationId::Atomic(inner) => current = &**inner,
            _ => break None,
        }
    }
}
1795
/// Rewrites every tick id nested anywhere inside `loc` (through `Tick` and `Atomic`
/// wrappers) to its union-find representative in `uf`.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let mut cursor = loc;
    loop {
        cursor = match cursor {
            LocationId::Tick(id, inner) => {
                *id = uf_find(uf, *id);
                &mut **inner
            }
            LocationId::Atomic(inner) => &mut **inner,
            // Base locations carry no tick id; nothing further to rewrite.
            LocationId::Process(_) | LocationId::Cluster(_) => return,
        };
    }
}
1809
/// Union-find lookup: returns the representative of `x`'s set.
///
/// An id with no entry in `parent` is its own representative. Every id visited on the
/// way up is re-pointed directly at the representative (path compression).
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Walk up until reaching an id that is absent from the map or points to itself.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Second walk: point every non-root id on the path straight at the root.
    let mut node = x;
    while node != root {
        let up = parent[&node];
        parent.insert(node, root);
        node = up;
    }
    root
}
1820
/// Union-find merge: makes `b`'s representative the parent of `a`'s representative.
/// Does nothing if they are already in the same set.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let (root_a, root_b) = (uf_find(parent, a), uf_find(parent, b));
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1829
/// Traverse the IR to build a union-find that unifies tick IDs connected
/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
/// rewrite all `LocationId`s to use the representative tick ID.
///
/// `Chain`, `ChainFirst` and `MergeOrdered` nodes additionally unify the tick of each
/// of their two inputs with their own tick. A pair of locations is only unified when
/// both of them are inside a tick.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Unions the ticks of `from` and `to`, provided both locations are inside a tick.
    fn link(uf: &mut HashMap<ClockId, ClockId>, from: &LocationId, to: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(from), tick_of(to)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect unifications between each node's tick and its inputs' ticks.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                let own = &metadata.location_id;
                link(&mut uf, &first.metadata().location_id, own);
                link(&mut uf, &second.metadata().location_id, own);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite every node's LocationId to use the representative tick ID.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| remap_location(&mut node.metadata_mut().location_id, &mut uf),
        false,
    );
}
1893
/// Lowers every root of the IR into DFIR, returning a map from location key to that
/// location's `FlatGraphBuilder`.
///
/// One set of bookkeeping state (`seen_tees`, `built_tees`, the statement-id counter and
/// `fold_hooked_idents`) is threaded through all roots, in order.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1912
/// Drives `emit_core` over every root using the callback form of `BuildersOrCallback`,
/// so `transform_root` / `transform_node` are invoked (with the shared statement-id
/// counter) instead of any DFIR being built.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1934
1935pub fn transform_bottom_up(
1936    ir: &mut [HydroRoot],
1937    transform_root: &mut impl FnMut(&mut HydroRoot),
1938    transform_node: &mut impl FnMut(&mut HydroNode),
1939    check_well_formed: bool,
1940) {
1941    let mut seen_tees = HashMap::new();
1942    ir.iter_mut().for_each(|leaf| {
1943        leaf.transform_bottom_up(
1944            transform_root,
1945            transform_node,
1946            &mut seen_tees,
1947            check_well_formed,
1948        );
1949    });
1950}
1951
1952pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1953    let mut seen_tees = HashMap::new();
1954    ir.iter()
1955        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1956        .collect()
1957}
1958
1959type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1960thread_local! {
1961    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1962    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1963    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1964    /// on subsequent encounters, preventing infinite loops.
1965    static SERIALIZED_SHARED: PrintedTees
1966        = const { RefCell::new(None) };
1967}
1968
1969pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1970    PRINTED_TEES.with(|printed_tees| {
1971        let mut printed_tees_mut = printed_tees.borrow_mut();
1972        *printed_tees_mut = Some((0, HashMap::new()));
1973        drop(printed_tees_mut);
1974
1975        let ret = f();
1976
1977        let mut printed_tees_mut = printed_tees.borrow_mut();
1978        *printed_tees_mut = None;
1979
1980        ret
1981    })
1982}
1983
1984/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1985/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1986/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1987/// back-reference.  The tracking state is restored when `f` returns or panics.
1988pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1989    let _guard = SerializedSharedGuard::enter();
1990    f()
1991}
1992
1993/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1994/// making `serialize_dedup_shared` re-entrant and panic-safe.
1995struct SerializedSharedGuard {
1996    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1997}
1998
1999impl SerializedSharedGuard {
2000    fn enter() -> Self {
2001        let previous = SERIALIZED_SHARED.with(|cell| {
2002            let mut guard = cell.borrow_mut();
2003            guard.replace((0, HashMap::new()))
2004        });
2005        Self { previous }
2006    }
2007}
2008
2009impl Drop for SerializedSharedGuard {
2010    fn drop(&mut self) {
2011        SERIALIZED_SHARED.with(|cell| {
2012            *cell.borrow_mut() = self.previous.take();
2013        });
2014    }
2015}
2016
/// A reference-counted, interior-mutable handle to a `HydroNode` that can be reachable
/// from several places in the IR (it is the `inner` of `Tee`, `Reference` and
/// `Partition` nodes). Identity is the address of the shared `RefCell`.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2018
2019impl serde::Serialize for SharedNode {
2020    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2021    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2022    /// same subtree every time and, if the graph ever contains a cycle, loop
2023    /// forever.
2024    ///
2025    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2026    /// integer id.  The first time we see a pointer we assign it the next id and
2027    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2028    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2029    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2030    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2031        SERIALIZED_SHARED.with(|cell| {
2032            let mut guard = cell.borrow_mut();
2033            // (next_id, pointer → assigned_id)
2034            let state = guard.as_mut().ok_or_else(|| {
2035                serde::ser::Error::custom(
2036                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2037                )
2038            })?;
2039            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2040
2041            if let Some(&id) = state.1.get(&ptr) {
2042                drop(guard);
2043                use serde::ser::SerializeMap;
2044                let mut map = serializer.serialize_map(Some(1))?;
2045                map.serialize_entry("$shared_ref", &id)?;
2046                map.end()
2047            } else {
2048                let id = state.0;
2049                state.0 += 1;
2050                state.1.insert(ptr, id);
2051                drop(guard);
2052
2053                use serde::ser::SerializeMap;
2054                let mut map = serializer.serialize_map(Some(2))?;
2055                map.serialize_entry("$shared", &id)?;
2056                map.serialize_entry("node", &*self.0.borrow())?;
2057                map.end()
2058            }
2059        })
2060    }
2061}
2062
2063impl SharedNode {
2064    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2065        Rc::as_ptr(&self.0)
2066    }
2067}
2068
2069impl Debug for SharedNode {
2070    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2071        PRINTED_TEES.with(|printed_tees| {
2072            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2073            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2074
2075            if let Some(printed_tees_mut) = printed_tees_mut {
2076                if let Some(existing) = printed_tees_mut
2077                    .1
2078                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2079                {
2080                    write!(f, "<shared {}>", existing)
2081                } else {
2082                    let next_id = printed_tees_mut.0;
2083                    printed_tees_mut.0 += 1;
2084                    printed_tees_mut
2085                        .1
2086                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2087                    drop(printed_tees_mut_borrow);
2088                    write!(f, "<shared {}>: ", next_id)?;
2089                    Debug::fmt(&self.0.borrow(), f)
2090                }
2091            } else {
2092                drop(printed_tees_mut_borrow);
2093                write!(f, "<shared>: ")?;
2094                Debug::fmt(&self.0.borrow(), f)
2095            }
2096        })
2097    }
2098}
2099
2100impl Hash for SharedNode {
2101    fn hash<H: Hasher>(&self, state: &mut H) {
2102        self.0.borrow_mut().hash(state);
2103    }
2104}
2105
/// A counter for tracking singleton access groups on a `HydroNode::Reference`.
///
/// Each mutable access increments the counter (before and after) to isolate itself in its own group;
/// immutable accesses share the current group.
#[derive(Debug)]
pub enum AccessCounter {
    /// Still handing out groups; the cell holds the number of the currently open group.
    Counting(Cell<u32>),
    /// A fixed group number, as produced by `next_group` or `freeze`.
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter whose current group is `0`.
    pub fn new() -> Self {
        Self::Counting(Cell::new(0))
    }

    /// Assign the next access group for this reference.
    ///
    /// Immutable accesses receive the currently open group. A mutable access takes the
    /// group *after* the current one and then advances the counter past it, so it shares
    /// its group with neither earlier nor later accesses.
    ///
    /// # Panics
    ///
    /// Panics if `self` is `Frozen`.
    pub fn next_group(&self, is_mut: bool) -> Self {
        let Self::Counting(counter) = self else {
            panic!("Cannot count on `AccessCounter::Frozen`");
        };
        let current = counter.get();
        if !is_mut {
            return Self::Frozen(current);
        }
        let exclusive = current + 1;
        counter.set(exclusive + 1);
        Self::Frozen(exclusive)
    }

    /// Snapshots the current group number into a `Frozen` counter; `self` is unchanged.
    pub fn freeze(&self) -> Self {
        let group = match self {
            Self::Counting(counter) => counter.get(),
            Self::Frozen(group) => *group,
        };
        Self::Frozen(group)
    }

    /// Returns the group number held by a frozen counter.
    ///
    /// # Panics
    ///
    /// Panics if `self` is still `Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            Self::Frozen(group) => *group,
            Self::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        AccessCounter::new()
    }
}

impl Hash for AccessCounter {
    fn hash<H: Hasher>(&self, _hasher: &mut H) {
        // Deliberately writes nothing: the access counter is runtime bookkeeping and must
        // not influence the hash of the node that owns it.
    }
}
2165
2166impl serde::Serialize for AccessCounter {
2167    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2168        let count = match self {
2169            AccessCounter::Counting(count) => count.get(),
2170            AccessCounter::Frozen(count) => *count,
2171        };
2172        count.serialize(serializer)
2173    }
2174}
2175
/// Boundedness of a `Stream`, `Optional` or `KeyedStream` collection.
///
/// `CollectionKind::is_bounded` treats only `Bounded` as bounded.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2181
/// Ordering guarantee of a stream's elements (or of a keyed stream's values).
///
/// `CollectionKind::is_strict` requires `TotalOrder`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2187
/// Retry/duplication guarantee of a stream's elements (or of a keyed stream's values).
///
/// `CollectionKind::is_strict` requires `ExactlyOnce`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2193
/// Boundedness of a `KeyedSingleton` collection.
///
/// Only `Bounded` counts as bounded for `CollectionKind::is_bounded`. NOTE(review): the
/// intermediate variants presumably describe weaker monotonic/bounded-value guarantees;
/// TODO confirm their exact semantics.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2202
/// Boundedness of a `Singleton` collection.
///
/// Only `Bounded` counts as bounded for `CollectionKind::is_bounded`. NOTE(review):
/// `Monotonic` presumably means the value only grows over time; TODO confirm.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2209
/// The kind of collection an IR node produces, together with its guarantees and
/// element type(s).
///
/// When a keyed kind is materialized as a plain DFIR stream (e.g. for a cycle sink),
/// each element is a `(key_type, value_type)` tuple; the other kinds use `element_type`.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    KeyedStream {
        bound: BoundKind,
        // Ordering/retry guarantees apply to the values (per key, presumably — TODO confirm).
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2239
2240impl CollectionKind {
2241    pub fn is_bounded(&self) -> bool {
2242        matches!(
2243            self,
2244            CollectionKind::Stream {
2245                bound: BoundKind::Bounded,
2246                ..
2247            } | CollectionKind::Singleton {
2248                bound: SingletonBoundKind::Bounded,
2249                ..
2250            } | CollectionKind::Optional {
2251                bound: BoundKind::Bounded,
2252                ..
2253            } | CollectionKind::KeyedStream {
2254                bound: BoundKind::Bounded,
2255                ..
2256            } | CollectionKind::KeyedSingleton {
2257                bound: KeyedSingletonBoundKind::Bounded,
2258                ..
2259            }
2260        )
2261    }
2262
2263    /// Returns whether this collection kind is already "strict" (TotalOrder + ExactlyOnce),
2264    /// meaning no non-determinism needs to be observed for mut closures.
2265    pub fn is_strict(&self) -> bool {
2266        match self {
2267            CollectionKind::Stream { order, retry, .. } => {
2268                *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2269            }
2270            CollectionKind::KeyedStream {
2271                value_order,
2272                value_retry,
2273                ..
2274            } => {
2275                *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2276            }
2277            // Singletons/Optionals/KeyedSingletons do not have observable
2278            // non-determinism other than snapshots / batching
2279            CollectionKind::Singleton { .. }
2280            | CollectionKind::Optional { .. }
2281            | CollectionKind::KeyedSingleton { .. } => true,
2282        }
2283    }
2284
2285    /// Creates a "strict" version of this kind with TotalOrder and ExactlyOnce.
2286    pub fn strict_kind(&self) -> CollectionKind {
2287        match self {
2288            CollectionKind::Stream {
2289                bound,
2290                element_type,
2291                ..
2292            } => CollectionKind::Stream {
2293                bound: bound.clone(),
2294                order: StreamOrder::TotalOrder,
2295                retry: StreamRetry::ExactlyOnce,
2296                element_type: element_type.clone(),
2297            },
2298            CollectionKind::KeyedStream {
2299                bound,
2300                key_type,
2301                value_type,
2302                ..
2303            } => CollectionKind::KeyedStream {
2304                bound: bound.clone(),
2305                value_order: StreamOrder::TotalOrder,
2306                value_retry: StreamRetry::ExactlyOnce,
2307                key_type: key_type.clone(),
2308                value_type: value_type.clone(),
2309            },
2310            other => other.clone(),
2311        }
2312    }
2313}
2314
2315#[derive(Clone, serde::Serialize)]
2316pub struct HydroIrMetadata {
2317    pub location_id: LocationId,
2318    pub collection_kind: CollectionKind,
2319    pub consistency: Option<ClusterConsistency>,
2320    pub cardinality: Option<usize>,
2321    pub tag: Option<String>,
2322    pub op: HydroIrOpMetadata,
2323}
2324
2325// HydroIrMetadata shouldn't be used to hash or compare
2326impl Hash for HydroIrMetadata {
2327    fn hash<H: Hasher>(&self, _: &mut H) {}
2328}
2329
2330impl PartialEq for HydroIrMetadata {
2331    fn eq(&self, _: &Self) -> bool {
2332        true
2333    }
2334}
2335
2336impl Eq for HydroIrMetadata {}
2337
2338impl Debug for HydroIrMetadata {
2339    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2340        f.debug_struct("HydroIrMetadata")
2341            .field("location_id", &self.location_id)
2342            .field("collection_kind", &self.collection_kind)
2343            .finish()
2344    }
2345}
2346
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the operator was created; serialized under the key
    /// `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// `None` when created via `new`. NOTE(review): presumably filled in later with a
    /// measured CPU usage (e.g. by a profiling pass) — confirm with the writers.
    pub cpu_usage: Option<f64>,
    /// `None` when created via `new`. NOTE(review): presumably the CPU usage of the
    /// network-receive side of this operator — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    /// `None` when created via `new`; assigned elsewhere (TODO confirm by whom).
    pub id: Option<usize>,
}

impl HydroIrOpMetadata {
    /// Creates operator metadata whose backtrace is captured relative to the caller of
    /// `new`; all other fields start as `None`.
    ///
    /// The number of stack frames between the user's call site and `get_backtrace` is
    /// load-bearing: adding or removing wrapper calls here requires adjusting the skip count.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // Skip one extra frame (presumably this `new` itself).
        Self::new_with_skip(1)
    }

    /// Like `new`, but skips `skip_count` additional caller frames in the captured backtrace.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the base offset of 2 presumably skips `get_backtrace`'s own frame
            // and this function's frame — confirm against `Backtrace::get_backtrace`.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}

impl Debug for HydroIrOpMetadata {
    // Intentionally prints no fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HydroIrOpMetadata").finish()
    }
}

impl Hash for HydroIrOpMetadata {
    // Intentionally writes nothing, so operator metadata never affects a node's hash.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2386
2387/// An intermediate node in a Hydro graph, which consumes data
2388/// from upstream nodes and emits data to downstream nodes.
2389#[derive(Debug, Hash, serde::Serialize)]
2390pub enum HydroNode {
2391    Placeholder,
2392
2393    /// Manually "casts" between two different collection kinds.
2394    ///
2395    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2396    /// correctness checks. In particular, the user must ensure that every possible
2397    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2398    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2399    /// collection. This ensures that the simulator does not miss any possible outputs.
2400    Cast {
2401        inner: Box<HydroNode>,
2402        metadata: HydroIrMetadata,
2403    },
2404
2405    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2406    /// interpretation of the input stream.
2407    ///
2408    /// In production, this simply passes through the input, but in simulation, this operator
2409    /// explicitly selects a randomized interpretation.
2410    ObserveNonDet {
2411        inner: Box<HydroNode>,
2412        trusted: bool, // if true, we do not need to simulate non-determinism
2413        metadata: HydroIrMetadata,
2414    },
2415
2416    Source {
2417        source: HydroSource,
2418        metadata: HydroIrMetadata,
2419    },
2420
2421    SingletonSource {
2422        value: DebugExpr,
2423        first_tick_only: bool,
2424        metadata: HydroIrMetadata,
2425    },
2426
2427    CycleSource {
2428        cycle_id: CycleId,
2429        metadata: HydroIrMetadata,
2430    },
2431
2432    Tee {
2433        inner: SharedNode,
2434        metadata: HydroIrMetadata,
2435    },
2436
2437    /// A reference materialization point. Wraps a SharedNode so that:
2438    /// - The pipe output delivers data to one consumer
2439    /// - `#var` references can borrow the value from the slot
2440    ///
2441    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2442    /// `-> handoff()` depending on `kind`.
2443    ///
2444    /// Uses the same `built_tees` dedup pattern as `Tee`.
2445    Reference {
2446        inner: SharedNode,
2447        kind: crate::handoff_ref::HandoffRefKind,
2448        access_counter: AccessCounter,
2449        metadata: HydroIrMetadata,
2450    },
2451
2452    Partition {
2453        inner: SharedNode,
2454        f: ClosureExpr,
2455        is_true: bool,
2456        metadata: HydroIrMetadata,
2457    },
2458
2459    BeginAtomic {
2460        inner: Box<HydroNode>,
2461        metadata: HydroIrMetadata,
2462    },
2463
2464    EndAtomic {
2465        inner: Box<HydroNode>,
2466        metadata: HydroIrMetadata,
2467    },
2468
2469    Batch {
2470        inner: Box<HydroNode>,
2471        metadata: HydroIrMetadata,
2472    },
2473
2474    YieldConcat {
2475        inner: Box<HydroNode>,
2476        metadata: HydroIrMetadata,
2477    },
2478
2479    Chain {
2480        first: Box<HydroNode>,
2481        second: Box<HydroNode>,
2482        metadata: HydroIrMetadata,
2483    },
2484
2485    MergeOrdered {
2486        first: Box<HydroNode>,
2487        second: Box<HydroNode>,
2488        metadata: HydroIrMetadata,
2489    },
2490
2491    ChainFirst {
2492        first: Box<HydroNode>,
2493        second: Box<HydroNode>,
2494        metadata: HydroIrMetadata,
2495    },
2496
2497    CrossProduct {
2498        left: Box<HydroNode>,
2499        right: Box<HydroNode>,
2500        metadata: HydroIrMetadata,
2501    },
2502
2503    CrossSingleton {
2504        left: Box<HydroNode>,
2505        right: Box<HydroNode>,
2506        metadata: HydroIrMetadata,
2507    },
2508
2509    Join {
2510        left: Box<HydroNode>,
2511        right: Box<HydroNode>,
2512        metadata: HydroIrMetadata,
2513    },
2514
2515    /// Asymmetric join where the right (build) side is bounded.
2516    /// The build side is accumulated (stratum-delayed) into a hash table,
2517    /// then the left (probe) side streams through preserving its ordering.
2518    JoinHalf {
2519        left: Box<HydroNode>,
2520        right: Box<HydroNode>,
2521        metadata: HydroIrMetadata,
2522    },
2523
2524    Difference {
2525        pos: Box<HydroNode>,
2526        neg: Box<HydroNode>,
2527        metadata: HydroIrMetadata,
2528    },
2529
2530    AntiJoin {
2531        pos: Box<HydroNode>,
2532        neg: Box<HydroNode>,
2533        metadata: HydroIrMetadata,
2534    },
2535
2536    ResolveFutures {
2537        input: Box<HydroNode>,
2538        metadata: HydroIrMetadata,
2539    },
2540    ResolveFuturesBlocking {
2541        input: Box<HydroNode>,
2542        metadata: HydroIrMetadata,
2543    },
2544    ResolveFuturesOrdered {
2545        input: Box<HydroNode>,
2546        metadata: HydroIrMetadata,
2547    },
2548
2549    Map {
2550        f: ClosureExpr,
2551        input: Box<HydroNode>,
2552        metadata: HydroIrMetadata,
2553    },
2554    FlatMap {
2555        f: ClosureExpr,
2556        input: Box<HydroNode>,
2557        metadata: HydroIrMetadata,
2558    },
2559    FlatMapStreamBlocking {
2560        f: ClosureExpr,
2561        input: Box<HydroNode>,
2562        metadata: HydroIrMetadata,
2563    },
2564    Filter {
2565        f: ClosureExpr,
2566        input: Box<HydroNode>,
2567        metadata: HydroIrMetadata,
2568    },
2569    FilterMap {
2570        f: ClosureExpr,
2571        input: Box<HydroNode>,
2572        metadata: HydroIrMetadata,
2573    },
2574
2575    DeferTick {
2576        input: Box<HydroNode>,
2577        metadata: HydroIrMetadata,
2578    },
2579    Enumerate {
2580        input: Box<HydroNode>,
2581        metadata: HydroIrMetadata,
2582    },
2583    Inspect {
2584        f: ClosureExpr,
2585        input: Box<HydroNode>,
2586        metadata: HydroIrMetadata,
2587    },
2588
2589    Unique {
2590        input: Box<HydroNode>,
2591        metadata: HydroIrMetadata,
2592    },
2593
2594    Sort {
2595        input: Box<HydroNode>,
2596        metadata: HydroIrMetadata,
2597    },
2598    Fold {
2599        init: ClosureExpr,
2600        acc: ClosureExpr,
2601        input: Box<HydroNode>,
2602        metadata: HydroIrMetadata,
2603    },
2604
2605    Scan {
2606        init: ClosureExpr,
2607        acc: ClosureExpr,
2608        input: Box<HydroNode>,
2609        metadata: HydroIrMetadata,
2610    },
2611    ScanAsyncBlocking {
2612        init: ClosureExpr,
2613        acc: ClosureExpr,
2614        input: Box<HydroNode>,
2615        metadata: HydroIrMetadata,
2616    },
2617    FoldKeyed {
2618        init: ClosureExpr,
2619        acc: ClosureExpr,
2620        input: Box<HydroNode>,
2621        metadata: HydroIrMetadata,
2622    },
2623
2624    Reduce {
2625        f: ClosureExpr,
2626        input: Box<HydroNode>,
2627        metadata: HydroIrMetadata,
2628    },
2629    ReduceKeyed {
2630        f: ClosureExpr,
2631        input: Box<HydroNode>,
2632        metadata: HydroIrMetadata,
2633    },
2634    ReduceKeyedWatermark {
2635        f: ClosureExpr,
2636        input: Box<HydroNode>,
2637        watermark: Box<HydroNode>,
2638        metadata: HydroIrMetadata,
2639    },
2640
2641    Network {
2642        name: Option<String>,
2643        networking_info: crate::networking::NetworkingInfo,
2644        serialize_fn: Option<DebugExpr>,
2645        instantiate_fn: DebugInstantiate,
2646        deserialize_fn: Option<DebugExpr>,
2647        input: Box<HydroNode>,
2648        metadata: HydroIrMetadata,
2649    },
2650
2651    ExternalInput {
2652        from_external_key: LocationKey,
2653        from_port_id: ExternalPortId,
2654        from_many: bool,
2655        codec_type: DebugType,
2656        #[serde(skip)]
2657        port_hint: NetworkHint,
2658        instantiate_fn: DebugInstantiate,
2659        deserialize_fn: Option<DebugExpr>,
2660        metadata: HydroIrMetadata,
2661    },
2662
2663    Counter {
2664        tag: String,
2665        duration: DebugExpr,
2666        prefix: String,
2667        input: Box<HydroNode>,
2668        metadata: HydroIrMetadata,
2669    },
2670
2671    AssertIsConsistent {
2672        inner: Box<HydroNode>,
2673        trusted: bool,
2674        metadata: HydroIrMetadata,
2675    },
2676
2677    UnboundSingleton {
2678        inner: Box<HydroNode>,
2679        metadata: HydroIrMetadata,
2680    },
2681}
2682
/// Memo table for traversals over shared nodes, keyed by the address of the ORIGINAL shared
/// `RefCell`. `HydroNode::transform_children` maps each original cell to the new cell that holds
/// its transformed node; `HydroNode::deep_clone` maps it to the new cell that holds its copy.
/// The entry is inserted before the node is processed, so later (or re-entrant) visits of the
/// same original cell resolve to the same new cell.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared-node `RefCell` to a `LocationId`.
// NOTE(review): not used in this part of the file; presumably records the location resolved
// for each shared node during a location pass — confirm against its users.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2685
/// Conditionally routes `in_ident` through an `observe_for_mut` step.
///
/// The step is needed only when `f` captures a mutable singleton reference
/// (`f.has_mut_ref()`) AND the incoming collection kind is not strict
/// (`!in_kind.is_strict()`). In that case a statement id is drawn from `next_stmt_id`, a fresh
/// `stream_{id}` identifier is returned, and — in `Builders` mode only — the graph builders are
/// asked to emit the `observe_for_mut` node. The id is consumed in `Callback` mode as well.
/// Otherwise `in_ident` is returned untouched and no statement id is consumed.
#[cfg(feature = "build")]
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    let needs_observe = f.has_mut_ref() && !in_kind.is_strict();
    if !needs_observe {
        return in_ident;
    }

    let stmt_id = next_stmt_id.get_and_increment();
    let observed = syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
    match builders_or_callback {
        BuildersOrCallback::Builders(builders) => {
            builders.observe_for_mut(in_location, in_ident, in_kind, &observed, op_meta);
        }
        // Callback mode only needs the id to be consumed; nothing is emitted.
        BuildersOrCallback::Callback(..) => {}
    }
    observed
}
2714
2715impl HydroNode {
2716    pub fn transform_bottom_up(
2717        &mut self,
2718        transform: &mut impl FnMut(&mut HydroNode),
2719        seen_tees: &mut SeenSharedNodes,
2720        check_well_formed: bool,
2721    ) {
2722        self.transform_children(
2723            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2724            seen_tees,
2725        );
2726
2727        transform(self);
2728
2729        let self_location = self.metadata().location_id.root();
2730
2731        if check_well_formed {
2732            match &*self {
2733                HydroNode::Network { .. } => {}
2734                _ => {
2735                    self.input_metadata().iter().for_each(|i| {
2736                        if i.location_id.root() != self_location {
2737                            panic!(
2738                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2739                                i,
2740                                i.location_id.root(),
2741                                self,
2742                                self_location
2743                            )
2744                        }
2745                    });
2746                }
2747            }
2748        }
2749    }
2750
2751    #[inline(always)]
2752    pub fn transform_children(
2753        &mut self,
2754        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2755        seen_tees: &mut SeenSharedNodes,
2756    ) {
2757        match self {
2758            HydroNode::Placeholder => {
2759                panic!();
2760            }
2761
2762            HydroNode::Source { .. }
2763            | HydroNode::SingletonSource { .. }
2764            | HydroNode::CycleSource { .. }
2765            | HydroNode::ExternalInput { .. } => {}
2766
2767            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2768                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2769                    *inner = SharedNode(transformed.clone());
2770                } else {
2771                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2772                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2773                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2774                    transform(&mut orig, seen_tees);
2775                    *transformed_cell.borrow_mut() = orig;
2776                    *inner = SharedNode(transformed_cell);
2777                }
2778            }
2779
2780            HydroNode::Partition { inner, f, .. } => {
2781                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2782                    *inner = SharedNode(transformed.clone());
2783                } else {
2784                    f.transform_children(&mut transform, seen_tees);
2785                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2786                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2787                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2788                    transform(&mut orig, seen_tees);
2789                    *transformed_cell.borrow_mut() = orig;
2790                    *inner = SharedNode(transformed_cell);
2791                }
2792            }
2793
2794            HydroNode::Cast { inner, .. }
2795            | HydroNode::ObserveNonDet { inner, .. }
2796            | HydroNode::BeginAtomic { inner, .. }
2797            | HydroNode::EndAtomic { inner, .. }
2798            | HydroNode::Batch { inner, .. }
2799            | HydroNode::YieldConcat { inner, .. }
2800            | HydroNode::UnboundSingleton { inner, .. }
2801            | HydroNode::AssertIsConsistent { inner, .. } => {
2802                transform(inner.as_mut(), seen_tees);
2803            }
2804
2805            HydroNode::Chain { first, second, .. } => {
2806                transform(first.as_mut(), seen_tees);
2807                transform(second.as_mut(), seen_tees);
2808            }
2809
2810            HydroNode::MergeOrdered { first, second, .. } => {
2811                transform(first.as_mut(), seen_tees);
2812                transform(second.as_mut(), seen_tees);
2813            }
2814
2815            HydroNode::ChainFirst { first, second, .. } => {
2816                transform(first.as_mut(), seen_tees);
2817                transform(second.as_mut(), seen_tees);
2818            }
2819
2820            HydroNode::CrossSingleton { left, right, .. }
2821            | HydroNode::CrossProduct { left, right, .. }
2822            | HydroNode::Join { left, right, .. }
2823            | HydroNode::JoinHalf { left, right, .. } => {
2824                transform(left.as_mut(), seen_tees);
2825                transform(right.as_mut(), seen_tees);
2826            }
2827
2828            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2829                transform(pos.as_mut(), seen_tees);
2830                transform(neg.as_mut(), seen_tees);
2831            }
2832
2833            HydroNode::Map { f, input, .. } => {
2834                f.transform_children(&mut transform, seen_tees);
2835                transform(input.as_mut(), seen_tees);
2836            }
2837            HydroNode::FlatMap { f, input, .. }
2838            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2839            | HydroNode::Filter { f, input, .. }
2840            | HydroNode::FilterMap { f, input, .. }
2841            | HydroNode::Inspect { f, input, .. }
2842            | HydroNode::Reduce { f, input, .. }
2843            | HydroNode::ReduceKeyed { f, input, .. } => {
2844                f.transform_children(&mut transform, seen_tees);
2845                transform(input.as_mut(), seen_tees);
2846            }
2847            HydroNode::ReduceKeyedWatermark {
2848                f,
2849                input,
2850                watermark,
2851                ..
2852            } => {
2853                f.transform_children(&mut transform, seen_tees);
2854                transform(input.as_mut(), seen_tees);
2855                transform(watermark.as_mut(), seen_tees);
2856            }
2857            HydroNode::Fold {
2858                init, acc, input, ..
2859            }
2860            | HydroNode::Scan {
2861                init, acc, input, ..
2862            }
2863            | HydroNode::ScanAsyncBlocking {
2864                init, acc, input, ..
2865            }
2866            | HydroNode::FoldKeyed {
2867                init, acc, input, ..
2868            } => {
2869                init.transform_children(&mut transform, seen_tees);
2870                acc.transform_children(&mut transform, seen_tees);
2871                transform(input.as_mut(), seen_tees);
2872            }
2873            HydroNode::ResolveFutures { input, .. }
2874            | HydroNode::ResolveFuturesBlocking { input, .. }
2875            | HydroNode::ResolveFuturesOrdered { input, .. }
2876            | HydroNode::Sort { input, .. }
2877            | HydroNode::DeferTick { input, .. }
2878            | HydroNode::Enumerate { input, .. }
2879            | HydroNode::Unique { input, .. }
2880            | HydroNode::Network { input, .. }
2881            | HydroNode::Counter { input, .. } => {
2882                transform(input.as_mut(), seen_tees);
2883            }
2884        }
2885    }
2886
2887    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2888        match self {
2889            HydroNode::Placeholder => HydroNode::Placeholder,
2890            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2891                inner: Box::new(inner.deep_clone(seen_tees)),
2892                metadata: metadata.clone(),
2893            },
2894            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2895                inner: Box::new(inner.deep_clone(seen_tees)),
2896                metadata: metadata.clone(),
2897            },
2898            HydroNode::ObserveNonDet {
2899                inner,
2900                trusted,
2901                metadata,
2902            } => HydroNode::ObserveNonDet {
2903                inner: Box::new(inner.deep_clone(seen_tees)),
2904                trusted: *trusted,
2905                metadata: metadata.clone(),
2906            },
2907            HydroNode::AssertIsConsistent {
2908                inner,
2909                trusted,
2910                metadata,
2911            } => HydroNode::AssertIsConsistent {
2912                inner: Box::new(inner.deep_clone(seen_tees)),
2913                trusted: *trusted,
2914                metadata: metadata.clone(),
2915            },
2916            HydroNode::Source { source, metadata } => HydroNode::Source {
2917                source: source.clone(),
2918                metadata: metadata.clone(),
2919            },
2920            HydroNode::SingletonSource {
2921                value,
2922                first_tick_only,
2923                metadata,
2924            } => HydroNode::SingletonSource {
2925                value: value.clone(),
2926                first_tick_only: *first_tick_only,
2927                metadata: metadata.clone(),
2928            },
2929            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2930                cycle_id: *cycle_id,
2931                metadata: metadata.clone(),
2932            },
2933            HydroNode::Tee { inner, metadata }
2934            | HydroNode::Reference {
2935                inner, metadata, ..
2936            } => {
2937                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2938                    SharedNode(transformed.clone())
2939                } else {
2940                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2941                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2942                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2943                    *new_rc.borrow_mut() = cloned;
2944                    SharedNode(new_rc)
2945                };
2946                if let HydroNode::Reference {
2947                    kind,
2948                    access_counter,
2949                    ..
2950                } = self
2951                {
2952                    HydroNode::Reference {
2953                        inner: cloned_inner,
2954                        kind: *kind,
2955                        access_counter: access_counter.freeze(),
2956                        metadata: metadata.clone(),
2957                    }
2958                } else {
2959                    HydroNode::Tee {
2960                        inner: cloned_inner,
2961                        metadata: metadata.clone(),
2962                    }
2963                }
2964            }
2965            HydroNode::Partition {
2966                inner,
2967                f,
2968                is_true,
2969                metadata,
2970            } => {
2971                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2972                    HydroNode::Partition {
2973                        inner: SharedNode(transformed.clone()),
2974                        f: f.deep_clone(seen_tees),
2975                        is_true: *is_true,
2976                        metadata: metadata.clone(),
2977                    }
2978                } else {
2979                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2980                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2981                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2982                    *new_rc.borrow_mut() = cloned;
2983                    HydroNode::Partition {
2984                        inner: SharedNode(new_rc),
2985                        f: f.deep_clone(seen_tees),
2986                        is_true: *is_true,
2987                        metadata: metadata.clone(),
2988                    }
2989                }
2990            }
2991            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2992                inner: Box::new(inner.deep_clone(seen_tees)),
2993                metadata: metadata.clone(),
2994            },
2995            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2996                inner: Box::new(inner.deep_clone(seen_tees)),
2997                metadata: metadata.clone(),
2998            },
2999            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
3000                inner: Box::new(inner.deep_clone(seen_tees)),
3001                metadata: metadata.clone(),
3002            },
3003            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
3004                inner: Box::new(inner.deep_clone(seen_tees)),
3005                metadata: metadata.clone(),
3006            },
3007            HydroNode::Chain {
3008                first,
3009                second,
3010                metadata,
3011            } => HydroNode::Chain {
3012                first: Box::new(first.deep_clone(seen_tees)),
3013                second: Box::new(second.deep_clone(seen_tees)),
3014                metadata: metadata.clone(),
3015            },
3016            HydroNode::MergeOrdered {
3017                first,
3018                second,
3019                metadata,
3020            } => HydroNode::MergeOrdered {
3021                first: Box::new(first.deep_clone(seen_tees)),
3022                second: Box::new(second.deep_clone(seen_tees)),
3023                metadata: metadata.clone(),
3024            },
3025            HydroNode::ChainFirst {
3026                first,
3027                second,
3028                metadata,
3029            } => HydroNode::ChainFirst {
3030                first: Box::new(first.deep_clone(seen_tees)),
3031                second: Box::new(second.deep_clone(seen_tees)),
3032                metadata: metadata.clone(),
3033            },
3034            HydroNode::CrossProduct {
3035                left,
3036                right,
3037                metadata,
3038            } => HydroNode::CrossProduct {
3039                left: Box::new(left.deep_clone(seen_tees)),
3040                right: Box::new(right.deep_clone(seen_tees)),
3041                metadata: metadata.clone(),
3042            },
3043            HydroNode::CrossSingleton {
3044                left,
3045                right,
3046                metadata,
3047            } => HydroNode::CrossSingleton {
3048                left: Box::new(left.deep_clone(seen_tees)),
3049                right: Box::new(right.deep_clone(seen_tees)),
3050                metadata: metadata.clone(),
3051            },
3052            HydroNode::Join {
3053                left,
3054                right,
3055                metadata,
3056            } => HydroNode::Join {
3057                left: Box::new(left.deep_clone(seen_tees)),
3058                right: Box::new(right.deep_clone(seen_tees)),
3059                metadata: metadata.clone(),
3060            },
3061            HydroNode::JoinHalf {
3062                left,
3063                right,
3064                metadata,
3065            } => HydroNode::JoinHalf {
3066                left: Box::new(left.deep_clone(seen_tees)),
3067                right: Box::new(right.deep_clone(seen_tees)),
3068                metadata: metadata.clone(),
3069            },
3070            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3071                pos: Box::new(pos.deep_clone(seen_tees)),
3072                neg: Box::new(neg.deep_clone(seen_tees)),
3073                metadata: metadata.clone(),
3074            },
3075            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3076                pos: Box::new(pos.deep_clone(seen_tees)),
3077                neg: Box::new(neg.deep_clone(seen_tees)),
3078                metadata: metadata.clone(),
3079            },
3080            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3081                input: Box::new(input.deep_clone(seen_tees)),
3082                metadata: metadata.clone(),
3083            },
3084            HydroNode::ResolveFuturesBlocking { input, metadata } => {
3085                HydroNode::ResolveFuturesBlocking {
3086                    input: Box::new(input.deep_clone(seen_tees)),
3087                    metadata: metadata.clone(),
3088                }
3089            }
3090            HydroNode::ResolveFuturesOrdered { input, metadata } => {
3091                HydroNode::ResolveFuturesOrdered {
3092                    input: Box::new(input.deep_clone(seen_tees)),
3093                    metadata: metadata.clone(),
3094                }
3095            }
3096            HydroNode::Map { f, input, metadata } => HydroNode::Map {
3097                f: f.deep_clone(seen_tees),
3098                input: Box::new(input.deep_clone(seen_tees)),
3099                metadata: metadata.clone(),
3100            },
3101            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3102                f: f.deep_clone(seen_tees),
3103                input: Box::new(input.deep_clone(seen_tees)),
3104                metadata: metadata.clone(),
3105            },
3106            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3107                HydroNode::FlatMapStreamBlocking {
3108                    f: f.deep_clone(seen_tees),
3109                    input: Box::new(input.deep_clone(seen_tees)),
3110                    metadata: metadata.clone(),
3111                }
3112            }
3113            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3114                f: f.deep_clone(seen_tees),
3115                input: Box::new(input.deep_clone(seen_tees)),
3116                metadata: metadata.clone(),
3117            },
3118            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3119                f: f.deep_clone(seen_tees),
3120                input: Box::new(input.deep_clone(seen_tees)),
3121                metadata: metadata.clone(),
3122            },
3123            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3124                input: Box::new(input.deep_clone(seen_tees)),
3125                metadata: metadata.clone(),
3126            },
3127            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3128                input: Box::new(input.deep_clone(seen_tees)),
3129                metadata: metadata.clone(),
3130            },
3131            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3132                f: f.deep_clone(seen_tees),
3133                input: Box::new(input.deep_clone(seen_tees)),
3134                metadata: metadata.clone(),
3135            },
3136            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3137                input: Box::new(input.deep_clone(seen_tees)),
3138                metadata: metadata.clone(),
3139            },
3140            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3141                input: Box::new(input.deep_clone(seen_tees)),
3142                metadata: metadata.clone(),
3143            },
3144            HydroNode::Fold {
3145                init,
3146                acc,
3147                input,
3148                metadata,
3149            } => HydroNode::Fold {
3150                init: init.deep_clone(seen_tees),
3151                acc: acc.deep_clone(seen_tees),
3152                input: Box::new(input.deep_clone(seen_tees)),
3153                metadata: metadata.clone(),
3154            },
3155            HydroNode::Scan {
3156                init,
3157                acc,
3158                input,
3159                metadata,
3160            } => HydroNode::Scan {
3161                init: init.deep_clone(seen_tees),
3162                acc: acc.deep_clone(seen_tees),
3163                input: Box::new(input.deep_clone(seen_tees)),
3164                metadata: metadata.clone(),
3165            },
3166            HydroNode::ScanAsyncBlocking {
3167                init,
3168                acc,
3169                input,
3170                metadata,
3171            } => HydroNode::ScanAsyncBlocking {
3172                init: init.deep_clone(seen_tees),
3173                acc: acc.deep_clone(seen_tees),
3174                input: Box::new(input.deep_clone(seen_tees)),
3175                metadata: metadata.clone(),
3176            },
3177            HydroNode::FoldKeyed {
3178                init,
3179                acc,
3180                input,
3181                metadata,
3182            } => HydroNode::FoldKeyed {
3183                init: init.deep_clone(seen_tees),
3184                acc: acc.deep_clone(seen_tees),
3185                input: Box::new(input.deep_clone(seen_tees)),
3186                metadata: metadata.clone(),
3187            },
3188            HydroNode::ReduceKeyedWatermark {
3189                f,
3190                input,
3191                watermark,
3192                metadata,
3193            } => HydroNode::ReduceKeyedWatermark {
3194                f: f.deep_clone(seen_tees),
3195                input: Box::new(input.deep_clone(seen_tees)),
3196                watermark: Box::new(watermark.deep_clone(seen_tees)),
3197                metadata: metadata.clone(),
3198            },
3199            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3200                f: f.deep_clone(seen_tees),
3201                input: Box::new(input.deep_clone(seen_tees)),
3202                metadata: metadata.clone(),
3203            },
3204            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3205                f: f.deep_clone(seen_tees),
3206                input: Box::new(input.deep_clone(seen_tees)),
3207                metadata: metadata.clone(),
3208            },
3209            HydroNode::Network {
3210                name,
3211                networking_info,
3212                serialize_fn,
3213                instantiate_fn,
3214                deserialize_fn,
3215                input,
3216                metadata,
3217            } => HydroNode::Network {
3218                name: name.clone(),
3219                networking_info: networking_info.clone(),
3220                serialize_fn: serialize_fn.clone(),
3221                instantiate_fn: instantiate_fn.clone(),
3222                deserialize_fn: deserialize_fn.clone(),
3223                input: Box::new(input.deep_clone(seen_tees)),
3224                metadata: metadata.clone(),
3225            },
3226            HydroNode::ExternalInput {
3227                from_external_key,
3228                from_port_id,
3229                from_many,
3230                codec_type,
3231                port_hint,
3232                instantiate_fn,
3233                deserialize_fn,
3234                metadata,
3235            } => HydroNode::ExternalInput {
3236                from_external_key: *from_external_key,
3237                from_port_id: *from_port_id,
3238                from_many: *from_many,
3239                codec_type: codec_type.clone(),
3240                port_hint: *port_hint,
3241                instantiate_fn: instantiate_fn.clone(),
3242                deserialize_fn: deserialize_fn.clone(),
3243                metadata: metadata.clone(),
3244            },
3245            HydroNode::Counter {
3246                tag,
3247                duration,
3248                prefix,
3249                input,
3250                metadata,
3251            } => HydroNode::Counter {
3252                tag: tag.clone(),
3253                duration: duration.clone(),
3254                prefix: prefix.clone(),
3255                input: Box::new(input.deep_clone(seen_tees)),
3256                metadata: metadata.clone(),
3257            },
3258        }
3259    }
3260
3261    #[cfg(feature = "build")]
3262    pub fn emit_core(
3263        &mut self,
3264        builders_or_callback: &mut BuildersOrCallback<
3265            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3266            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3267        >,
3268        seen_tees: &mut SeenSharedNodes,
3269        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3270        next_stmt_id: &mut crate::Counter<StmtId>,
3271        fold_hooked_idents: &mut HashSet<String>,
3272    ) -> syn::Ident {
3273        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3274
3275        self.transform_bottom_up(
3276            &mut |node: &mut HydroNode| {
3277                let out_location = node.metadata().location_id.clone();
3278                match node {
3279                    HydroNode::Placeholder => {
3280                        panic!()
3281                    }
3282
3283                    HydroNode::Cast { .. } => {
3284                        // Cast passes through the input ident unchanged
3285                        // The input ident is already on the stack from processing the child
3286                        let _ = next_stmt_id.get_and_increment();
3287                        match builders_or_callback {
3288                            BuildersOrCallback::Builders(_) => {}
3289                            BuildersOrCallback::Callback(_, node_callback) => {
3290                                node_callback(node, next_stmt_id);
3291                            }
3292                        }
3293                        // input_ident stays on stack as output
3294                    }
3295
3296                    HydroNode::UnboundSingleton { .. } => {
3297                        let inner_ident = ident_stack.pop().unwrap();
3298
3299                        let stmt_id = next_stmt_id.get_and_increment();
3300                        let out_ident =
3301                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3302
3303                        match builders_or_callback {
3304                            BuildersOrCallback::Builders(graph_builders) => {
3305                                if graph_builders.singleton_intermediates() {
3306                                    let builder = graph_builders.get_dfir_mut(&out_location);
3307                                    builder.add_dfir(
3308                                        parse_quote! {
3309                                            #out_ident = #inner_ident;
3310                                        },
3311                                        None,
3312                                        None,
3313                                    );
3314                                } else {
3315                                    let builder = graph_builders.get_dfir_mut(&out_location);
3316                                    builder.add_dfir(
3317                                        parse_quote! {
3318                                            #out_ident = #inner_ident -> persist::<'static>();
3319                                        },
3320                                        None,
3321                                        None,
3322                                    );
3323                                }
3324                            }
3325                            BuildersOrCallback::Callback(_, node_callback) => {
3326                                node_callback(node, next_stmt_id);
3327                            }
3328                        }
3329
3330                        ident_stack.push(out_ident);
3331                    }
3332
3333                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3334                        let inner_ident = ident_stack.pop().unwrap();
3335
3336                        let stmt_id = next_stmt_id.get_and_increment();
3337                        let out_ident =
3338                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3339
3340                        match builders_or_callback {
3341                            BuildersOrCallback::Builders(graph_builders) => {
3342                                graph_builders.assert_is_consistent(
3343                                    *trusted,
3344                                    &inner.metadata().location_id,
3345                                    inner_ident,
3346                                    &out_ident,
3347                                );
3348                            }
3349                            BuildersOrCallback::Callback(_, node_callback) => {
3350                                node_callback(node, next_stmt_id);
3351                            }
3352                        }
3353
3354                        ident_stack.push(out_ident);
3355                    }
3356
3357                    HydroNode::ObserveNonDet {
3358                        inner,
3359                        trusted,
3360                        metadata,
3361                        ..
3362                    } => {
3363                        let inner_ident = ident_stack.pop().unwrap();
3364
3365                        let stmt_id = next_stmt_id.get_and_increment();
3366                        let observe_ident =
3367                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3368
3369                        match builders_or_callback {
3370                            BuildersOrCallback::Builders(graph_builders) => {
3371                                graph_builders.observe_nondet(
3372                                    *trusted,
3373                                    &inner.metadata().location_id,
3374                                    inner_ident,
3375                                    &inner.metadata().collection_kind,
3376                                    &observe_ident,
3377                                    &metadata.collection_kind,
3378                                    &metadata.op,
3379                                );
3380                            }
3381                            BuildersOrCallback::Callback(_, node_callback) => {
3382                                node_callback(node, next_stmt_id);
3383                            }
3384                        }
3385
3386                        ident_stack.push(observe_ident);
3387                    }
3388
3389                    HydroNode::Batch {
3390                        inner, metadata, ..
3391                    } => {
3392                        let inner_ident = ident_stack.pop().unwrap();
3393
3394                        let stmt_id = next_stmt_id.get_and_increment();
3395                        let batch_ident =
3396                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3397
3398                        match builders_or_callback {
3399                            BuildersOrCallback::Builders(graph_builders) => {
3400                                graph_builders.batch(
3401                                    inner_ident,
3402                                    &inner.metadata().location_id,
3403                                    &inner.metadata().collection_kind,
3404                                    &batch_ident,
3405                                    &out_location,
3406                                    &metadata.op,
3407                                    fold_hooked_idents,
3408                                );
3409                            }
3410                            BuildersOrCallback::Callback(_, node_callback) => {
3411                                node_callback(node, next_stmt_id);
3412                            }
3413                        }
3414
3415                        ident_stack.push(batch_ident);
3416                    }
3417
3418                    HydroNode::YieldConcat { inner, .. } => {
3419                        let inner_ident = ident_stack.pop().unwrap();
3420
3421                        let stmt_id = next_stmt_id.get_and_increment();
3422                        let yield_ident =
3423                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3424
3425                        match builders_or_callback {
3426                            BuildersOrCallback::Builders(graph_builders) => {
3427                                graph_builders.yield_from_tick(
3428                                    inner_ident,
3429                                    &inner.metadata().location_id,
3430                                    &inner.metadata().collection_kind,
3431                                    &yield_ident,
3432                                    &out_location,
3433                                );
3434                            }
3435                            BuildersOrCallback::Callback(_, node_callback) => {
3436                                node_callback(node, next_stmt_id);
3437                            }
3438                        }
3439
3440                        ident_stack.push(yield_ident);
3441                    }
3442
3443                    HydroNode::BeginAtomic { inner, metadata } => {
3444                        let inner_ident = ident_stack.pop().unwrap();
3445
3446                        let stmt_id = next_stmt_id.get_and_increment();
3447                        let begin_ident =
3448                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3449
3450                        match builders_or_callback {
3451                            BuildersOrCallback::Builders(graph_builders) => {
3452                                graph_builders.begin_atomic(
3453                                    inner_ident,
3454                                    &inner.metadata().location_id,
3455                                    &inner.metadata().collection_kind,
3456                                    &begin_ident,
3457                                    &out_location,
3458                                    &metadata.op,
3459                                );
3460                            }
3461                            BuildersOrCallback::Callback(_, node_callback) => {
3462                                node_callback(node, next_stmt_id);
3463                            }
3464                        }
3465
3466                        ident_stack.push(begin_ident);
3467                    }
3468
3469                    HydroNode::EndAtomic { inner, .. } => {
3470                        let inner_ident = ident_stack.pop().unwrap();
3471
3472                        let stmt_id = next_stmt_id.get_and_increment();
3473                        let end_ident =
3474                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3475
3476                        match builders_or_callback {
3477                            BuildersOrCallback::Builders(graph_builders) => {
3478                                graph_builders.end_atomic(
3479                                    inner_ident,
3480                                    &inner.metadata().location_id,
3481                                    &inner.metadata().collection_kind,
3482                                    &end_ident,
3483                                );
3484                            }
3485                            BuildersOrCallback::Callback(_, node_callback) => {
3486                                node_callback(node, next_stmt_id);
3487                            }
3488                        }
3489
3490                        ident_stack.push(end_ident);
3491                    }
3492
3493                    HydroNode::Source {
3494                        source, metadata, ..
3495                    } => {
3496                        if let HydroSource::ExternalNetwork() = source {
3497                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3498                        } else {
3499                            let stmt_id = next_stmt_id.get_and_increment();
3500                            let source_ident =
3501                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3502
3503                            let source_stmt = match source {
3504                                HydroSource::Stream(expr) => {
3505                                    debug_assert!(metadata.location_id.is_top_level());
3506                                    parse_quote! {
3507                                        #source_ident = source_stream(#expr);
3508                                    }
3509                                }
3510
3511                                HydroSource::ExternalNetwork() => {
3512                                    unreachable!()
3513                                }
3514
3515                                HydroSource::Iter(expr) => {
3516                                    if metadata.location_id.is_top_level() {
3517                                        parse_quote! {
3518                                            #source_ident = source_iter(#expr);
3519                                        }
3520                                    } else {
3521                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
3522                                        parse_quote! {
3523                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3524                                        }
3525                                    }
3526                                }
3527
3528                                HydroSource::Spin() => {
3529                                    debug_assert!(metadata.location_id.is_top_level());
3530                                    parse_quote! {
3531                                        #source_ident = spin();
3532                                    }
3533                                }
3534
3535                                HydroSource::ClusterMembers(target_loc, state) => {
3536                                    debug_assert!(metadata.location_id.is_top_level());
3537
3538                                    let members_tee_ident = syn::Ident::new(
3539                                        &format!(
3540                                            "__cluster_members_tee_{}_{}",
3541                                            metadata.location_id.root().key(),
3542                                            target_loc.key(),
3543                                        ),
3544                                        Span::call_site(),
3545                                    );
3546
3547                                    match state {
3548                                        ClusterMembersState::Stream(d) => {
3549                                            parse_quote! {
3550                                                #members_tee_ident = source_stream(#d) -> tee();
3551                                                #source_ident = #members_tee_ident;
3552                                            }
3553                                        },
3554                                        ClusterMembersState::Uninit => syn::parse_quote! {
3555                                            #source_ident = source_stream(DUMMY);
3556                                        },
3557                                        ClusterMembersState::Tee(..) => parse_quote! {
3558                                            #source_ident = #members_tee_ident;
3559                                        },
3560                                    }
3561                                }
3562
3563                                HydroSource::Embedded(ident) => {
3564                                    parse_quote! {
3565                                        #source_ident = source_stream(#ident);
3566                                    }
3567                                }
3568
3569                                HydroSource::EmbeddedSingleton(ident) => {
3570                                    parse_quote! {
3571                                        #source_ident = source_iter([#ident]);
3572                                    }
3573                                }
3574                            };
3575
3576                            match builders_or_callback {
3577                                BuildersOrCallback::Builders(graph_builders) => {
3578                                    let builder = graph_builders.get_dfir_mut(&out_location);
3579                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3580                                }
3581                                BuildersOrCallback::Callback(_, node_callback) => {
3582                                    node_callback(node, next_stmt_id);
3583                                }
3584                            }
3585
3586                            ident_stack.push(source_ident);
3587                        }
3588                    }
3589
3590                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3591                        let stmt_id = next_stmt_id.get_and_increment();
3592                        let source_ident =
3593                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3594
3595                        match builders_or_callback {
3596                            BuildersOrCallback::Builders(graph_builders) => {
3597                                let builder = graph_builders.get_dfir_mut(&out_location);
3598
3599                                if *first_tick_only {
3600                                    assert!(
3601                                        !metadata.location_id.is_top_level(),
3602                                        "first_tick_only SingletonSource must be inside a tick"
3603                                    );
3604                                }
3605
3606                                if *first_tick_only
3607                                    || (metadata.location_id.is_top_level()
3608                                        && metadata.collection_kind.is_bounded())
3609                                {
3610                                    builder.add_dfir(
3611                                        parse_quote! {
3612                                            #source_ident = source_iter([#value]);
3613                                        },
3614                                        None,
3615                                        Some(&stmt_id.to_string()),
3616                                    );
3617                                } else {
3618                                    builder.add_dfir(
3619                                        parse_quote! {
3620                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3621                                        },
3622                                        None,
3623                                        Some(&stmt_id.to_string()),
3624                                    );
3625                                }
3626                            }
3627                            BuildersOrCallback::Callback(_, node_callback) => {
3628                                node_callback(node, next_stmt_id);
3629                            }
3630                        }
3631
3632                        ident_stack.push(source_ident);
3633                    }
3634
3635                    HydroNode::CycleSource { cycle_id, .. } => {
3636                        let ident = cycle_id.as_ident();
3637
3638                        // consume a stmt id even though we did not emit anything so that we can instrument this
3639                        let _ = next_stmt_id.get_and_increment();
3640
3641                        match builders_or_callback {
3642                            BuildersOrCallback::Builders(_) => {}
3643                            BuildersOrCallback::Callback(_, node_callback) => {
3644                                node_callback(node, next_stmt_id);
3645                            }
3646                        }
3647
3648                        ident_stack.push(ident);
3649                    }
3650
3651                    HydroNode::Tee { inner, .. } => {
3652                        // we consume a stmt id regardless of if we emit the tee() operator,
3653                        // so that during rewrites we touch all recipients of the tee()
3654                        let stmt_id = next_stmt_id.get_and_increment();
3655
3656                        let ret_ident = if let Some(built_idents) =
3657                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3658                        {
3659                            match builders_or_callback {
3660                                BuildersOrCallback::Builders(_) => {}
3661                                BuildersOrCallback::Callback(_, node_callback) => {
3662                                    node_callback(node, next_stmt_id);
3663                                }
3664                            }
3665
3666                            built_idents[0].clone()
3667                        } else {
3668                            // The inner node was already processed by transform_bottom_up,
3669                            // so its ident is on the stack
3670                            let inner_ident = ident_stack.pop().unwrap();
3671
3672                            let tee_ident =
3673                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3674
3675                            built_tees.insert(
3676                                inner.0.as_ref() as *const RefCell<HydroNode>,
3677                                vec![tee_ident.clone()],
3678                            );
3679
3680                            match builders_or_callback {
3681                                BuildersOrCallback::Builders(graph_builders) => {
3682                                    // NOTE: With `forward_ref`, the fold codegen may not have
3683                                    // run yet when we reach this tee, so `fold_hooked_idents`
3684                                    // might not contain the inner ident. In that case we won't
3685                                    // propagate the "hooked" status to the tee and the
3686                                    // downstream singleton batch will use the normal
3687                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3688                                    // This is not a soundness issue: the fallback hook still
3689                                    // produces correct behavior, just with a redundant decision
3690                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3691                                    // fix ordering so forward_ref folds are always processed
3692                                    // before their downstream tees.
3693                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3694                                        fold_hooked_idents.insert(tee_ident.to_string());
3695                                    }
3696                                    let builder = graph_builders.get_dfir_mut(&out_location);
3697                                    builder.add_dfir(
3698                                        parse_quote! {
3699                                            #tee_ident = #inner_ident -> tee();
3700                                        },
3701                                        None,
3702                                        Some(&stmt_id.to_string()),
3703                                    );
3704                                }
3705                                BuildersOrCallback::Callback(_, node_callback) => {
3706                                    node_callback(node, next_stmt_id);
3707                                }
3708                            }
3709
3710                            tee_ident
3711                        };
3712
3713                        ident_stack.push(ret_ident);
3714                    }
3715
3716                    HydroNode::Reference { inner, kind, .. } => {
3717                        // we consume a stmt id regardless of if we emit the operator,
3718                        // so that during rewrites we touch all recipients
3719                        let stmt_id = next_stmt_id.get_and_increment();
3720
3721                        let ret_ident = if let Some(built_idents) =
3722                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3723                        {
3724                            built_idents[0].clone()
3725                        } else {
3726                            let inner_ident = ident_stack.pop().unwrap();
3727
3728                            let ref_ident =
3729                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3730
3731                            built_tees.insert(
3732                                inner.0.as_ref() as *const RefCell<HydroNode>,
3733                                vec![ref_ident.clone()],
3734                            );
3735
3736                            match builders_or_callback {
3737                                BuildersOrCallback::Builders(graph_builders) => {
3738                                    let builder = graph_builders.get_dfir_mut(&out_location);
3739                                    let op_ident = syn::Ident::new(
3740                                        match kind {
3741                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3742                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3743                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3744                                        },
3745                                        Span::call_site(),
3746                                    );
3747                                    builder.add_dfir(
3748                                        parse_quote! {
3749                                            #ref_ident = #inner_ident -> #op_ident();
3750                                        },
3751                                        None,
3752                                        Some(&stmt_id.to_string()),
3753                                    );
3754                                }
3755                                BuildersOrCallback::Callback(_, node_callback) => {
3756                                    node_callback(node, next_stmt_id);
3757                                }
3758                            }
3759
3760                            ref_ident
3761                        };
3762
3763                        ident_stack.push(ret_ident);
3764                    }
3765
3766                    HydroNode::Partition {
3767                        inner, f, is_true, metadata,
3768                    } => {
3769                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3770                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3771                        let stmt_id = next_stmt_id.get_and_increment();
3772
3773                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3774                            match builders_or_callback {
3775                                BuildersOrCallback::Builders(_) => {}
3776                                BuildersOrCallback::Callback(_, node_callback) => {
3777                                    node_callback(node, next_stmt_id);
3778                                }
3779                            }
3780
3781                            let idx = if is_true { 0 } else { 1 };
3782                            built_idents[idx].clone()
3783                        } else {
3784                            // The inner node was already processed by transform_bottom_up,
3785                            // so its ident is on the stack
3786                            let inner_ident = ident_stack.pop().unwrap();
3787                            let f_tokens = f.emit_tokens(&mut ident_stack);
3788
3789                            let inner_ident = {
3790                                let inner_borrow = inner.0.borrow();
3791                                maybe_observe_for_mut(
3792                                    f, inner_ident,
3793                                    &inner_borrow.metadata().location_id,
3794                                    &inner_borrow.metadata().collection_kind,
3795                                    &metadata.op,
3796                                    builders_or_callback, next_stmt_id,
3797                                )
3798                            };
3799
3800                            let partition_ident = syn::Ident::new(
3801                                &format!("stream_{}_partition", stmt_id),
3802                                Span::call_site(),
3803                            );
3804                            let true_ident = syn::Ident::new(
3805                                &format!("stream_{}_true", stmt_id),
3806                                Span::call_site(),
3807                            );
3808                            let false_ident = syn::Ident::new(
3809                                &format!("stream_{}_false", stmt_id),
3810                                Span::call_site(),
3811                            );
3812
3813                            built_tees.insert(
3814                                ptr,
3815                                vec![true_ident.clone(), false_ident.clone()],
3816                            );
3817
3818                            let stmt_id = next_stmt_id.get_and_increment();
3819                            match builders_or_callback {
3820                                BuildersOrCallback::Builders(graph_builders) => {
3821                                    let builder = graph_builders.get_dfir_mut(&out_location);
3822                                    builder.add_dfir(
3823                                        parse_quote! {
3824                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3825                                            #true_ident = #partition_ident[0];
3826                                            #false_ident = #partition_ident[1];
3827                                        },
3828                                        None,
3829                                        Some(&stmt_id.to_string()),
3830                                    );
3831                                }
3832                                BuildersOrCallback::Callback(_, node_callback) => {
3833                                    node_callback(node, next_stmt_id);
3834                                }
3835                            }
3836
3837                            if is_true { true_ident } else { false_ident }
3838                        };
3839
3840                        ident_stack.push(ret_ident);
3841                    }
3842
3843                    HydroNode::Chain { .. } => {
3844                        // Children are processed left-to-right, so second is on top
3845                        let second_ident = ident_stack.pop().unwrap();
3846                        let first_ident = ident_stack.pop().unwrap();
3847
3848                        let stmt_id = next_stmt_id.get_and_increment();
3849                        let chain_ident =
3850                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3851
3852                        match builders_or_callback {
3853                            BuildersOrCallback::Builders(graph_builders) => {
3854                                let builder = graph_builders.get_dfir_mut(&out_location);
3855                                builder.add_dfir(
3856                                    parse_quote! {
3857                                        #chain_ident = chain();
3858                                        #first_ident -> [0]#chain_ident;
3859                                        #second_ident -> [1]#chain_ident;
3860                                    },
3861                                    None,
3862                                    Some(&stmt_id.to_string()),
3863                                );
3864                            }
3865                            BuildersOrCallback::Callback(_, node_callback) => {
3866                                node_callback(node, next_stmt_id);
3867                            }
3868                        }
3869
3870                        ident_stack.push(chain_ident);
3871                    }
3872
3873                    HydroNode::MergeOrdered { first, metadata, .. } => {
3874                        let second_ident = ident_stack.pop().unwrap();
3875                        let first_ident = ident_stack.pop().unwrap();
3876
3877                        let stmt_id = next_stmt_id.get_and_increment();
3878                        let merge_ident =
3879                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3880
3881                        match builders_or_callback {
3882                            BuildersOrCallback::Builders(graph_builders) => {
3883                                graph_builders.merge_ordered(
3884                                    &first.metadata().location_id,
3885                                    first_ident,
3886                                    second_ident,
3887                                    &merge_ident,
3888                                    &first.metadata().collection_kind,
3889                                    &metadata.op,
3890                                    Some(&stmt_id.to_string()),
3891                                );
3892                            }
3893                            BuildersOrCallback::Callback(_, node_callback) => {
3894                                node_callback(node, next_stmt_id);
3895                            }
3896                        }
3897
3898                        ident_stack.push(merge_ident);
3899                    }
3900
3901                    HydroNode::ChainFirst { .. } => {
3902                        let second_ident = ident_stack.pop().unwrap();
3903                        let first_ident = ident_stack.pop().unwrap();
3904
3905                        let stmt_id = next_stmt_id.get_and_increment();
3906                        let chain_ident =
3907                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3908
3909                        match builders_or_callback {
3910                            BuildersOrCallback::Builders(graph_builders) => {
3911                                let builder = graph_builders.get_dfir_mut(&out_location);
3912                                builder.add_dfir(
3913                                    parse_quote! {
3914                                        #chain_ident = chain_first_n(1);
3915                                        #first_ident -> [0]#chain_ident;
3916                                        #second_ident -> [1]#chain_ident;
3917                                    },
3918                                    None,
3919                                    Some(&stmt_id.to_string()),
3920                                );
3921                            }
3922                            BuildersOrCallback::Callback(_, node_callback) => {
3923                                node_callback(node, next_stmt_id);
3924                            }
3925                        }
3926
3927                        ident_stack.push(chain_ident);
3928                    }
3929
3930                    HydroNode::CrossSingleton { right, .. } => {
3931                        let right_ident = ident_stack.pop().unwrap();
3932                        let left_ident = ident_stack.pop().unwrap();
3933
3934                        let stmt_id = next_stmt_id.get_and_increment();
3935                        let cross_ident =
3936                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3937
3938                        match builders_or_callback {
3939                            BuildersOrCallback::Builders(graph_builders) => {
3940                                let builder = graph_builders.get_dfir_mut(&out_location);
3941
3942                                if right.metadata().location_id.is_top_level()
3943                                    && right.metadata().collection_kind.is_bounded()
3944                                {
3945                                    builder.add_dfir(
3946                                        parse_quote! {
3947                                            #cross_ident = cross_singleton::<'static>();
3948                                            #left_ident -> [input]#cross_ident;
3949                                            #right_ident -> [single]#cross_ident;
3950                                        },
3951                                        None,
3952                                        Some(&stmt_id.to_string()),
3953                                    );
3954                                } else {
3955                                    builder.add_dfir(
3956                                        parse_quote! {
3957                                            #cross_ident = cross_singleton();
3958                                            #left_ident -> [input]#cross_ident;
3959                                            #right_ident -> [single]#cross_ident;
3960                                        },
3961                                        None,
3962                                        Some(&stmt_id.to_string()),
3963                                    );
3964                                }
3965                            }
3966                            BuildersOrCallback::Callback(_, node_callback) => {
3967                                node_callback(node, next_stmt_id);
3968                            }
3969                        }
3970
3971                        ident_stack.push(cross_ident);
3972                    }
3973
3974                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3975                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3976                            parse_quote!(cross_join_multiset)
3977                        } else {
3978                            parse_quote!(join_multiset)
3979                        };
3980
3981                        let (HydroNode::CrossProduct { left, right, .. }
3982                        | HydroNode::Join { left, right, .. }) = node
3983                        else {
3984                            unreachable!()
3985                        };
3986
3987                        let is_top_level = left.metadata().location_id.is_top_level()
3988                            && right.metadata().location_id.is_top_level();
3989                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3990                            quote!('static)
3991                        } else {
3992                            quote!('tick)
3993                        };
3994
3995                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3996                            quote!('static)
3997                        } else {
3998                            quote!('tick)
3999                        };
4000
4001                        let right_ident = ident_stack.pop().unwrap();
4002                        let left_ident = ident_stack.pop().unwrap();
4003
4004                        let stmt_id = next_stmt_id.get_and_increment();
4005                        let stream_ident =
4006                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4007
4008                        match builders_or_callback {
4009                            BuildersOrCallback::Builders(graph_builders) => {
4010                                let builder = graph_builders.get_dfir_mut(&out_location);
4011                                builder.add_dfir(
4012                                    if is_top_level {
4013                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
4014                                        // a multiset_delta() to negate the replay behavior
4015                                        parse_quote! {
4016                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4017                                            #left_ident -> [0]#stream_ident;
4018                                            #right_ident -> [1]#stream_ident;
4019                                        }
4020                                    } else {
4021                                        parse_quote! {
4022                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4023                                            #left_ident -> [0]#stream_ident;
4024                                            #right_ident -> [1]#stream_ident;
4025                                        }
4026                                    }
4027                                    ,
4028                                    None,
4029                                    Some(&stmt_id.to_string()),
4030                                );
4031                            }
4032                            BuildersOrCallback::Callback(_, node_callback) => {
4033                                node_callback(node, next_stmt_id);
4034                            }
4035                        }
4036
4037                        ident_stack.push(stream_ident);
4038                    }
4039
4040                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4041                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4042                            parse_quote!(difference)
4043                        } else {
4044                            parse_quote!(anti_join)
4045                        };
4046
4047                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4048                            node
4049                        else {
4050                            unreachable!()
4051                        };
4052
4053                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4054                            quote!('static)
4055                        } else {
4056                            quote!('tick)
4057                        };
4058
4059                        let neg_ident = ident_stack.pop().unwrap();
4060                        let pos_ident = ident_stack.pop().unwrap();
4061
4062                        let stmt_id = next_stmt_id.get_and_increment();
4063                        let stream_ident =
4064                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4065
4066                        match builders_or_callback {
4067                            BuildersOrCallback::Builders(graph_builders) => {
4068                                let builder = graph_builders.get_dfir_mut(&out_location);
4069                                builder.add_dfir(
4070                                    parse_quote! {
4071                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
4072                                        #pos_ident -> [pos]#stream_ident;
4073                                        #neg_ident -> [neg]#stream_ident;
4074                                    },
4075                                    None,
4076                                    Some(&stmt_id.to_string()),
4077                                );
4078                            }
4079                            BuildersOrCallback::Callback(_, node_callback) => {
4080                                node_callback(node, next_stmt_id);
4081                            }
4082                        }
4083
4084                        ident_stack.push(stream_ident);
4085                    }
4086
4087                    HydroNode::JoinHalf { .. } => {
4088                        let HydroNode::JoinHalf { right, .. } = node else {
4089                            unreachable!()
4090                        };
4091
4092                        assert!(
4093                            right.metadata().collection_kind.is_bounded(),
4094                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4095                            right.metadata().collection_kind
4096                        );
4097
4098                        let build_lifetime = if right.metadata().location_id.is_top_level() {
4099                            quote!('static)
4100                        } else {
4101                            quote!('tick)
4102                        };
4103
4104                        let build_ident = ident_stack.pop().unwrap();
4105                        let probe_ident = ident_stack.pop().unwrap();
4106
4107                        let stmt_id = next_stmt_id.get_and_increment();
4108                        let stream_ident =
4109                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4110
4111                        match builders_or_callback {
4112                            BuildersOrCallback::Builders(graph_builders) => {
4113                                let builder = graph_builders.get_dfir_mut(&out_location);
4114                                builder.add_dfir(
4115                                    parse_quote! {
4116                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4117                                        #probe_ident -> [probe]#stream_ident;
4118                                        #build_ident -> [build]#stream_ident;
4119                                    },
4120                                    None,
4121                                    Some(&stmt_id.to_string()),
4122                                );
4123                            }
4124                            BuildersOrCallback::Callback(_, node_callback) => {
4125                                node_callback(node, next_stmt_id);
4126                            }
4127                        }
4128
4129                        ident_stack.push(stream_ident);
4130                    }
4131
4132                    HydroNode::ResolveFutures { .. } => {
4133                        let input_ident = ident_stack.pop().unwrap();
4134
4135                        let stmt_id = next_stmt_id.get_and_increment();
4136                        let futures_ident =
4137                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4138
4139                        match builders_or_callback {
4140                            BuildersOrCallback::Builders(graph_builders) => {
4141                                let builder = graph_builders.get_dfir_mut(&out_location);
4142                                builder.add_dfir(
4143                                    parse_quote! {
4144                                        #futures_ident = #input_ident -> resolve_futures();
4145                                    },
4146                                    None,
4147                                    Some(&stmt_id.to_string()),
4148                                );
4149                            }
4150                            BuildersOrCallback::Callback(_, node_callback) => {
4151                                node_callback(node, next_stmt_id);
4152                            }
4153                        }
4154
4155                        ident_stack.push(futures_ident);
4156                    }
4157
4158                    HydroNode::ResolveFuturesBlocking { .. } => {
4159                        let input_ident = ident_stack.pop().unwrap();
4160
4161                        let stmt_id = next_stmt_id.get_and_increment();
4162                        let futures_ident =
4163                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4164
4165                        match builders_or_callback {
4166                            BuildersOrCallback::Builders(graph_builders) => {
4167                                let builder = graph_builders.get_dfir_mut(&out_location);
4168                                builder.add_dfir(
4169                                    parse_quote! {
4170                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4171                                    },
4172                                    None,
4173                                    Some(&stmt_id.to_string()),
4174                                );
4175                            }
4176                            BuildersOrCallback::Callback(_, node_callback) => {
4177                                node_callback(node, next_stmt_id);
4178                            }
4179                        }
4180
4181                        ident_stack.push(futures_ident);
4182                    }
4183
4184                    HydroNode::ResolveFuturesOrdered { .. } => {
4185                        let input_ident = ident_stack.pop().unwrap();
4186
4187                        let stmt_id = next_stmt_id.get_and_increment();
4188                        let futures_ident =
4189                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4190
4191                        match builders_or_callback {
4192                            BuildersOrCallback::Builders(graph_builders) => {
4193                                let builder = graph_builders.get_dfir_mut(&out_location);
4194                                builder.add_dfir(
4195                                    parse_quote! {
4196                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4197                                    },
4198                                    None,
4199                                    Some(&stmt_id.to_string()),
4200                                );
4201                            }
4202                            BuildersOrCallback::Callback(_, node_callback) => {
4203                                node_callback(node, next_stmt_id);
4204                            }
4205                        }
4206
4207                        ident_stack.push(futures_ident);
4208                    }
4209
4210                    HydroNode::Map {
4211                        f,
4212                        input,
4213                        metadata,
4214                    } => {
4215                        // Pop input ident (pushed last by transform_children).
4216                        let input_ident = ident_stack.pop().unwrap();
4217                        let f_tokens = f.emit_tokens(&mut ident_stack);
4218
4219                        let input_ident = maybe_observe_for_mut(
4220                            f,
4221                            input_ident,
4222                            &input.metadata().location_id,
4223                            &input.metadata().collection_kind,
4224                            &metadata.op,
4225                            builders_or_callback,
4226                            next_stmt_id,
4227                        );
4228
4229                        let stmt_id = next_stmt_id.get_and_increment();
4230                        let map_ident =
4231                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4232
4233                        match builders_or_callback {
4234                            BuildersOrCallback::Builders(graph_builders) => {
4235                                let builder = graph_builders.get_dfir_mut(&out_location);
4236                                builder.add_dfir(
4237                                    parse_quote! {
4238                                        #map_ident = #input_ident -> map(#f_tokens);
4239                                    },
4240                                    None,
4241                                    Some(&stmt_id.to_string()),
4242                                );
4243                            }
4244                            BuildersOrCallback::Callback(_, node_callback) => {
4245                                node_callback(node, next_stmt_id);
4246                            }
4247                        }
4248
4249                        ident_stack.push(map_ident);
4250                    }
4251
4252                    HydroNode::FlatMap { f, input, metadata } => {
4253                        let input_ident = ident_stack.pop().unwrap();
4254                        let f_tokens = f.emit_tokens(&mut ident_stack);
4255
4256                        let input_ident = maybe_observe_for_mut(
4257                            f, input_ident,
4258                            &input.metadata().location_id,
4259                            &input.metadata().collection_kind,
4260                            &metadata.op,
4261                            builders_or_callback, next_stmt_id,
4262                        );
4263
4264                        let stmt_id = next_stmt_id.get_and_increment();
4265                        let flat_map_ident =
4266                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4267
4268                        match builders_or_callback {
4269                            BuildersOrCallback::Builders(graph_builders) => {
4270                                let builder = graph_builders.get_dfir_mut(&out_location);
4271                                builder.add_dfir(
4272                                    parse_quote! {
4273                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4274                                    },
4275                                    None,
4276                                    Some(&stmt_id.to_string()),
4277                                );
4278                            }
4279                            BuildersOrCallback::Callback(_, node_callback) => {
4280                                node_callback(node, next_stmt_id);
4281                            }
4282                        }
4283
4284                        ident_stack.push(flat_map_ident);
4285                    }
4286
4287                    HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4288                        let input_ident = ident_stack.pop().unwrap();
4289                        let f_tokens = f.emit_tokens(&mut ident_stack);
4290
4291                        let input_ident = maybe_observe_for_mut(
4292                            f, input_ident,
4293                            &input.metadata().location_id,
4294                            &input.metadata().collection_kind,
4295                            &metadata.op,
4296                            builders_or_callback, next_stmt_id,
4297                        );
4298
4299                        let stmt_id = next_stmt_id.get_and_increment();
4300                        let flat_map_stream_blocking_ident =
4301                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4302
4303                        match builders_or_callback {
4304                            BuildersOrCallback::Builders(graph_builders) => {
4305                                let builder = graph_builders.get_dfir_mut(&out_location);
4306                                builder.add_dfir(
4307                                    parse_quote! {
4308                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4309                                    },
4310                                    None,
4311                                    Some(&stmt_id.to_string()),
4312                                );
4313                            }
4314                            BuildersOrCallback::Callback(_, node_callback) => {
4315                                node_callback(node, next_stmt_id);
4316                            }
4317                        }
4318
4319                        ident_stack.push(flat_map_stream_blocking_ident);
4320                    }
4321
4322                    HydroNode::Filter { f, input, metadata } => {
4323                        let input_ident = ident_stack.pop().unwrap();
4324                        let f_tokens = f.emit_tokens(&mut ident_stack);
4325
4326                        let input_ident = maybe_observe_for_mut(
4327                            f, input_ident,
4328                            &input.metadata().location_id,
4329                            &input.metadata().collection_kind,
4330                            &metadata.op,
4331                            builders_or_callback, next_stmt_id,
4332                        );
4333
4334                        let stmt_id = next_stmt_id.get_and_increment();
4335                        let filter_ident =
4336                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4337
4338                        match builders_or_callback {
4339                            BuildersOrCallback::Builders(graph_builders) => {
4340                                let builder = graph_builders.get_dfir_mut(&out_location);
4341                                builder.add_dfir(
4342                                    parse_quote! {
4343                                        #filter_ident = #input_ident -> filter(#f_tokens);
4344                                    },
4345                                    None,
4346                                    Some(&stmt_id.to_string()),
4347                                );
4348                            }
4349                            BuildersOrCallback::Callback(_, node_callback) => {
4350                                node_callback(node, next_stmt_id);
4351                            }
4352                        }
4353
4354                        ident_stack.push(filter_ident);
4355                    }
4356
4357                    HydroNode::FilterMap { f, input, metadata } => {
4358                        let input_ident = ident_stack.pop().unwrap();
4359                        let f_tokens = f.emit_tokens(&mut ident_stack);
4360
4361                        let input_ident = maybe_observe_for_mut(
4362                            f, input_ident,
4363                            &input.metadata().location_id,
4364                            &input.metadata().collection_kind,
4365                            &metadata.op,
4366                            builders_or_callback, next_stmt_id,
4367                        );
4368
4369                        let stmt_id = next_stmt_id.get_and_increment();
4370                        let filter_map_ident =
4371                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4372
4373                        match builders_or_callback {
4374                            BuildersOrCallback::Builders(graph_builders) => {
4375                                let builder = graph_builders.get_dfir_mut(&out_location);
4376                                builder.add_dfir(
4377                                    parse_quote! {
4378                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4379                                    },
4380                                    None,
4381                                    Some(&stmt_id.to_string()),
4382                                );
4383                            }
4384                            BuildersOrCallback::Callback(_, node_callback) => {
4385                                node_callback(node, next_stmt_id);
4386                            }
4387                        }
4388
4389                        ident_stack.push(filter_map_ident);
4390                    }
4391
4392                    HydroNode::Sort { .. } => {
4393                        let input_ident = ident_stack.pop().unwrap();
4394
4395                        let stmt_id = next_stmt_id.get_and_increment();
4396                        let sort_ident =
4397                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4398
4399                        match builders_or_callback {
4400                            BuildersOrCallback::Builders(graph_builders) => {
4401                                let builder = graph_builders.get_dfir_mut(&out_location);
4402                                builder.add_dfir(
4403                                    parse_quote! {
4404                                        #sort_ident = #input_ident -> sort();
4405                                    },
4406                                    None,
4407                                    Some(&stmt_id.to_string()),
4408                                );
4409                            }
4410                            BuildersOrCallback::Callback(_, node_callback) => {
4411                                node_callback(node, next_stmt_id);
4412                            }
4413                        }
4414
4415                        ident_stack.push(sort_ident);
4416                    }
4417
4418                    HydroNode::DeferTick { .. } => {
4419                        let input_ident = ident_stack.pop().unwrap();
4420
4421                        let stmt_id = next_stmt_id.get_and_increment();
4422                        let defer_tick_ident =
4423                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4424
4425                        match builders_or_callback {
4426                            BuildersOrCallback::Builders(graph_builders) => {
4427                                let builder = graph_builders.get_dfir_mut(&out_location);
4428                                builder.add_dfir(
4429                                    parse_quote! {
4430                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4431                                    },
4432                                    None,
4433                                    Some(&stmt_id.to_string()),
4434                                );
4435                            }
4436                            BuildersOrCallback::Callback(_, node_callback) => {
4437                                node_callback(node, next_stmt_id);
4438                            }
4439                        }
4440
4441                        ident_stack.push(defer_tick_ident);
4442                    }
4443
4444                    HydroNode::Enumerate { input, .. } => {
4445                        let input_ident = ident_stack.pop().unwrap();
4446
4447                        let stmt_id = next_stmt_id.get_and_increment();
4448                        let enumerate_ident =
4449                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4450
4451                        match builders_or_callback {
4452                            BuildersOrCallback::Builders(graph_builders) => {
4453                                let builder = graph_builders.get_dfir_mut(&out_location);
4454                                let lifetime = if input.metadata().location_id.is_top_level() {
4455                                    quote!('static)
4456                                } else {
4457                                    quote!('tick)
4458                                };
4459                                builder.add_dfir(
4460                                    parse_quote! {
4461                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4462                                    },
4463                                    None,
4464                                    Some(&stmt_id.to_string()),
4465                                );
4466                            }
4467                            BuildersOrCallback::Callback(_, node_callback) => {
4468                                node_callback(node, next_stmt_id);
4469                            }
4470                        }
4471
4472                        ident_stack.push(enumerate_ident);
4473                    }
4474
                    // Inspect: lowered to DFIR `inspect(f)` attached to the input stream.
                    HydroNode::Inspect { f, input, metadata } => {
                        // The input stream's ident is popped first, before `f`'s tokens are emitted.
                        let input_ident = ident_stack.pop().unwrap();
                        // NOTE(review): `emit_tokens` takes the ident stack, presumably to consume
                        // idents of singleton refs captured by `f` — confirm in `ClosureExpr`.
                        let f_tokens = f.emit_tokens(&mut ident_stack);

                        // May substitute a different input ident before the inspect is attached
                        // (the name suggests handling of mutable captures — confirm in
                        // `maybe_observe_for_mut`). It is given `next_stmt_id` and runs before this
                        // node's `stmt_id` is allocated, so keep this ordering.
                        let input_ident = maybe_observe_for_mut(
                            f, input_ident,
                            &input.metadata().location_id,
                            &input.metadata().collection_kind,
                            &metadata.op,
                            builders_or_callback, next_stmt_id,
                        );

                        let stmt_id = next_stmt_id.get_and_increment();
                        let inspect_ident =
                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

                        match builders_or_callback {
                            BuildersOrCallback::Builders(graph_builders) => {
                                let builder = graph_builders.get_dfir_mut(&out_location);
                                builder.add_dfir(
                                    parse_quote! {
                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
                                    },
                                    None,
                                    Some(&stmt_id.to_string()),
                                );
                            }
                            // Callback mode emits no DFIR; the node is handed to the callback.
                            BuildersOrCallback::Callback(_, node_callback) => {
                                node_callback(node, next_stmt_id);
                            }
                        }

                        ident_stack.push(inspect_ident);
                    }
4509
4510                    HydroNode::Unique { input, .. } => {
4511                        let input_ident = ident_stack.pop().unwrap();
4512
4513                        let stmt_id = next_stmt_id.get_and_increment();
4514                        let unique_ident =
4515                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4516
4517                        match builders_or_callback {
4518                            BuildersOrCallback::Builders(graph_builders) => {
4519                                let builder = graph_builders.get_dfir_mut(&out_location);
4520                                let lifetime = if input.metadata().location_id.is_top_level() {
4521                                    quote!('static)
4522                                } else {
4523                                    quote!('tick)
4524                                };
4525
4526                                builder.add_dfir(
4527                                    parse_quote! {
4528                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4529                                    },
4530                                    None,
4531                                    Some(&stmt_id.to_string()),
4532                                );
4533                            }
4534                            BuildersOrCallback::Callback(_, node_callback) => {
4535                                node_callback(node, next_stmt_id);
4536                            }
4537                        }
4538
4539                        ident_stack.push(unique_ident);
4540                    }
4541
                    // Fold-family nodes (Fold, FoldKeyed, Scan, ScanAsyncBlocking) lowered to DFIR.
                    // The operator name depends on the variant (and, for Fold/FoldKeyed, on whether
                    // the input is top-level and bounded); the lifetime is `'static` for top-level
                    // inputs and `'tick` otherwise. When `singleton_intermediates()` is enabled,
                    // Fold/FoldKeyed get special lowerings (see the branches below) that may route
                    // the input through `emit_fold_hook`.
                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
                        // Operator selection: top-level bounded Fold uses `fold_no_replay`;
                        // top-level bounded FoldKeyed is not supported yet.
                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
                            if input.metadata().location_id.is_top_level()
                                && input.metadata().collection_kind.is_bounded()
                            {
                                parse_quote!(fold_no_replay)
                            } else {
                                parse_quote!(fold)
                            }
                        } else if matches!(node, HydroNode::Scan { .. }) {
                            parse_quote!(scan)
                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
                            parse_quote!(scan_async_blocking)
                        } else if let HydroNode::FoldKeyed { input, .. } = node {
                            if input.metadata().location_id.is_top_level()
                                && input.metadata().collection_kind.is_bounded()
                            {
                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
                            } else {
                                parse_quote!(fold_keyed)
                            }
                        } else {
                            unreachable!()
                        };

                        let (HydroNode::Fold { input, .. }
                        | HydroNode::FoldKeyed { input, .. }
                        | HydroNode::Scan { input, .. }
                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
                        else {
                            unreachable!()
                        };

                        let lifetime = if input.metadata().location_id.is_top_level() {
                            quote!('static)
                        } else {
                            quote!('tick)
                        };

                        let input_ident = ident_stack.pop().unwrap();

                        let (HydroNode::Fold { init, acc, .. }
                        | HydroNode::FoldKeyed { init, acc, .. }
                        | HydroNode::Scan { init, acc, .. }
                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
                        else {
                            unreachable!()
                        };

                        // NOTE(review): `acc` is emitted before `init`; since both take the ident
                        // stack, this order presumably mirrors how their captured singleton-ref
                        // idents were pushed — confirm before reordering.
                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
                        let init_tokens = init.emit_tokens(&mut ident_stack);

                        let stmt_id = next_stmt_id.get_and_increment();
                        let fold_ident =
                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

                        match builders_or_callback {
                            BuildersOrCallback::Builders(graph_builders) => {
                                // Top-level, non-atomic, unbounded Fold with singleton
                                // intermediates: emit `init()` once via `source_iter` on chain port
                                // 0, plus one state snapshot per update from a `scan` on port 1.
                                if matches!(node, HydroNode::Fold { .. })
                                    && node.metadata().location_id.is_top_level()
                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                                    && graph_builders.singleton_intermediates()
                                    && !node.metadata().collection_kind.is_bounded()
                                {
                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
                                    // May return a replacement input ident; `None` means the
                                    // original input is used directly.
                                    let hooked_input_ident = graph_builders.emit_fold_hook(
                                        &input.metadata().location_id,
                                        &input_ident,
                                        &input.metadata().collection_kind,
                                        &node.metadata().op,
                                    );

                                    // With a hook, each item is a `Vec` batch: an empty batch emits
                                    // nothing, otherwise the whole batch is folded and one snapshot
                                    // is emitted. Without a hook, every value emits a snapshot.
                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
                                        let acc: syn::Expr = parse_quote!({
                                            let mut __inner = #acc_tokens;
                                            move |__state, __batch: Vec<_>| {
                                                if __batch.is_empty() {
                                                    return None;
                                                }
                                                for __value in __batch {
                                                    __inner(__state, __value);
                                                }
                                                Some(__state.clone())
                                            }
                                        });
                                        (hooked, acc)
                                    } else {
                                        let acc: syn::Expr = parse_quote!({
                                            let mut __inner = #acc_tokens;
                                            move |__state, __value| {
                                                __inner(__state, __value);
                                                Some(__state.clone())
                                            }
                                        });
                                        (&input_ident, acc)
                                    };

                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
                                            #fold_ident = chain();
                                        },
                                        None,
                                        Some(&stmt_id.to_string()),
                                    );

                                    // Remember hook-driven outputs (consumer of this set is not
                                    // visible in this section).
                                    if hooked_input_ident.is_some() {
                                        fold_hooked_idents.insert(fold_ident.to_string());
                                    }
                                // Top-level, non-atomic, unbounded FoldKeyed with singleton
                                // intermediates: a `scan` over a HashMap of per-key accumulators
                                // that emits `(key, updated_value)` after every input.
                                } else if matches!(node, HydroNode::FoldKeyed { .. })
                                    && node.metadata().location_id.is_top_level()
                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                                    && graph_builders.singleton_intermediates()
                                    && !node.metadata().collection_kind.is_bounded()
                                {
                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
                                    let hooked_input_ident = graph_builders.emit_fold_hook(
                                        &input.metadata().location_id,
                                        &input_ident,
                                        &input.metadata().collection_kind,
                                        &node.metadata().op,
                                    );
                                    let builder = graph_builders.get_dfir_mut(&out_location);

                                    let wrapped_acc: syn::Expr = parse_quote!({
                                        let mut __init = #init_tokens;
                                        let mut __inner = #acc_tokens;
                                        move |__state, __kv: (_, _)| {
                                            // TODO(shadaj): we can avoid the clone when the entry exists
                                            let __state = __state
                                                .entry(::std::clone::Clone::clone(&__kv.0))
                                                .or_insert_with(|| (__init)());
                                            __inner(__state, __kv.1);
                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
                                        }
                                    });

                                    // A hooked input is flattened before the keyed scan.
                                    if let Some(hooked_input_ident) = hooked_input_ident {
                                        builder.add_dfir(
                                            parse_quote! {
                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
                                            },
                                            None,
                                            Some(&stmt_id.to_string()),
                                        );

                                        fold_hooked_idents.insert(fold_ident.to_string());
                                    } else {
                                        builder.add_dfir(
                                            parse_quote! {
                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
                                            },
                                            None,
                                            Some(&stmt_id.to_string()),
                                        );
                                    }
                                // Tick-scoped Fold/FoldKeyed with singleton intermediates: keep
                                // the selected operator, but feed it from the hook if one exists.
                                } else if (matches!(node, HydroNode::Fold { .. })
                                    || matches!(node, HydroNode::FoldKeyed { .. }))
                                    && !node.metadata().location_id.is_top_level()
                                    && graph_builders.singleton_intermediates()
                                {
                                    let input_ref = match &*node {
                                        HydroNode::Fold { input, .. } => input,
                                        HydroNode::FoldKeyed { input, .. } => input,
                                        _ => unreachable!(),
                                    };
                                    let hooked_input_ident = graph_builders.emit_fold_hook(
                                        &input_ref.metadata().location_id,
                                        &input_ident,
                                        &input_ref.metadata().collection_kind,
                                        &node.metadata().op,
                                    );

                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
                                        },
                                        None,
                                        Some(&stmt_id.to_string()),
                                    );
                                } else {
                                    // Default lowering: the selected operator applied directly.
                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
                                        },
                                        None,
                                        Some(&stmt_id.to_string()),
                                    );
                                }
                            }
                            // Callback mode emits no DFIR; the node is handed to the callback.
                            BuildersOrCallback::Callback(_, node_callback) => {
                                node_callback(node, next_stmt_id);
                            }
                        }

                        ident_stack.push(fold_ident);
                    }
4744
4745                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4746                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4747                            if input.metadata().location_id.is_top_level()
4748                                && input.metadata().collection_kind.is_bounded()
4749                            {
4750                                parse_quote!(reduce_no_replay)
4751                            } else {
4752                                parse_quote!(reduce)
4753                            }
4754                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4755                            if input.metadata().location_id.is_top_level()
4756                                && input.metadata().collection_kind.is_bounded()
4757                            {
4758                                todo!(
4759                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4760                                )
4761                            } else {
4762                                parse_quote!(reduce_keyed)
4763                            }
4764                        } else {
4765                            unreachable!()
4766                        };
4767
4768                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4769                        else {
4770                            unreachable!()
4771                        };
4772
4773                        let lifetime = if input.metadata().location_id.is_top_level() {
4774                            quote!('static)
4775                        } else {
4776                            quote!('tick)
4777                        };
4778
4779                        let input_ident = ident_stack.pop().unwrap();
4780
4781                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4782                        else {
4783                            unreachable!()
4784                        };
4785
4786                        let f_tokens = f.emit_tokens(&mut ident_stack);
4787
4788                        let stmt_id = next_stmt_id.get_and_increment();
4789                        let reduce_ident =
4790                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4791
4792                        match builders_or_callback {
4793                            BuildersOrCallback::Builders(graph_builders) => {
4794                                if matches!(node, HydroNode::Reduce { .. })
4795                                    && node.metadata().location_id.is_top_level()
4796                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4797                                    && graph_builders.singleton_intermediates()
4798                                    && !node.metadata().collection_kind.is_bounded()
4799                                {
4800                                    todo!(
4801                                        "Reduce with optional intermediates is not yet supported in simulator"
4802                                    );
4803                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4804                                    && node.metadata().location_id.is_top_level()
4805                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4806                                    && graph_builders.singleton_intermediates()
4807                                    && !node.metadata().collection_kind.is_bounded()
4808                                {
4809                                    todo!(
4810                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4811                                    );
4812                                } else {
4813                                    let builder = graph_builders.get_dfir_mut(&out_location);
4814                                    builder.add_dfir(
4815                                        parse_quote! {
4816                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4817                                        },
4818                                        None,
4819                                        Some(&stmt_id.to_string()),
4820                                    );
4821                                }
4822                            }
4823                            BuildersOrCallback::Callback(_, node_callback) => {
4824                                node_callback(node, next_stmt_id);
4825                            }
4826                        }
4827
4828                        ident_stack.push(reduce_ident);
4829                    }
4830
4831                    HydroNode::ReduceKeyedWatermark {
4832                        f,
4833                        input,
4834                        metadata,
4835                        ..
4836                    } => {
4837                        let lifetime = if input.metadata().location_id.is_top_level() {
4838                            quote!('static)
4839                        } else {
4840                            quote!('tick)
4841                        };
4842
4843                        // watermark is processed second, so it's on top
4844                        let watermark_ident = ident_stack.pop().unwrap();
4845                        let input_ident = ident_stack.pop().unwrap();
4846                        let f_tokens = f.emit_tokens(&mut ident_stack);
4847
4848                        let stmt_id = next_stmt_id.get_and_increment();
4849                        let chain_ident = syn::Ident::new(
4850                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4851                            Span::call_site(),
4852                        );
4853
4854                        let fold_ident =
4855                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4856
4857                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4858                            && input.metadata().collection_kind.is_bounded()
4859                        {
4860                            parse_quote!(fold_no_replay)
4861                        } else {
4862                            parse_quote!(fold)
4863                        };
4864
4865                        match builders_or_callback {
4866                            BuildersOrCallback::Builders(graph_builders) => {
4867                                if metadata.location_id.is_top_level()
4868                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4869                                    && graph_builders.singleton_intermediates()
4870                                    && !metadata.collection_kind.is_bounded()
4871                                {
4872                                    todo!(
4873                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4874                                    )
4875                                } else {
4876                                    let builder = graph_builders.get_dfir_mut(&out_location);
4877                                    builder.add_dfir(
4878                                        parse_quote! {
4879                                            #chain_ident = chain();
4880                                            #input_ident
4881                                                -> map(|x| (Some(x), None))
4882                                                -> [0]#chain_ident;
4883                                            #watermark_ident
4884                                                -> map(|watermark| (None, Some(watermark)))
4885                                                -> [1]#chain_ident;
4886
4887                                            #fold_ident = #chain_ident
4888                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4889                                                    let __reduce_keyed_fn = #f_tokens;
4890                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4891                                                        if let Some((k, v)) = opt_payload {
4892                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4893                                                                if k < curr_watermark {
4894                                                                    return;
4895                                                                }
4896                                                            }
4897                                                            match map.entry(k) {
4898                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4899                                                                    e.insert(v);
4900                                                                }
4901                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4902                                                                    __reduce_keyed_fn(e.get_mut(), v);
4903                                                                }
4904                                                            }
4905                                                        } else {
4906                                                            let watermark = opt_watermark.unwrap();
4907                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4908                                                                if watermark <= curr_watermark {
4909                                                                    return;
4910                                                                }
4911                                                            }
4912                                                            map.retain(|k, _| *k >= watermark);
4913                                                            *opt_curr_watermark = Some(watermark);
4914                                                        }
4915                                                    }
4916                                                })
4917                                                -> flat_map(|(map, _curr_watermark)| map);
4918                                        },
4919                                        None,
4920                                        Some(&stmt_id.to_string()),
4921                                    );
4922                                }
4923                            }
4924                            BuildersOrCallback::Callback(_, node_callback) => {
4925                                node_callback(node, next_stmt_id);
4926                            }
4927                        }
4928
4929                        ident_stack.push(fold_ident);
4930                    }
4931
4932                    HydroNode::Network {
4933                        networking_info,
4934                        serialize_fn: serialize_pipeline,
4935                        instantiate_fn,
4936                        deserialize_fn: deserialize_pipeline,
4937                        input,
4938                        ..
4939                    } => {
4940                        let input_ident = ident_stack.pop().unwrap();
4941
4942                        let stmt_id = next_stmt_id.get_and_increment();
4943                        let receiver_stream_ident =
4944                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4945
4946                        match builders_or_callback {
4947                            BuildersOrCallback::Builders(graph_builders) => {
4948                                let (sink_expr, source_expr) = match instantiate_fn {
4949                                    DebugInstantiate::Building => (
4950                                        syn::parse_quote!(DUMMY_SINK),
4951                                        syn::parse_quote!(DUMMY_SOURCE),
4952                                    ),
4953
4954                                    DebugInstantiate::Finalized(finalized) => {
4955                                        (finalized.sink.clone(), finalized.source.clone())
4956                                    }
4957                                };
4958
4959                                graph_builders.create_network(
4960                                    &input.metadata().location_id,
4961                                    &out_location,
4962                                    input_ident,
4963                                    &receiver_stream_ident,
4964                                    serialize_pipeline.as_ref(),
4965                                    sink_expr,
4966                                    source_expr,
4967                                    deserialize_pipeline.as_ref(),
4968                                    stmt_id,
4969                                    networking_info,
4970                                );
4971                            }
4972                            BuildersOrCallback::Callback(_, node_callback) => {
4973                                node_callback(node, next_stmt_id);
4974                            }
4975                        }
4976
4977                        ident_stack.push(receiver_stream_ident);
4978                    }
4979
4980                    HydroNode::ExternalInput {
4981                        instantiate_fn,
4982                        deserialize_fn: deserialize_pipeline,
4983                        ..
4984                    } => {
4985                        let stmt_id = next_stmt_id.get_and_increment();
4986                        let receiver_stream_ident =
4987                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4988
4989                        match builders_or_callback {
4990                            BuildersOrCallback::Builders(graph_builders) => {
4991                                let (_, source_expr) = match instantiate_fn {
4992                                    DebugInstantiate::Building => (
4993                                        syn::parse_quote!(DUMMY_SINK),
4994                                        syn::parse_quote!(DUMMY_SOURCE),
4995                                    ),
4996
4997                                    DebugInstantiate::Finalized(finalized) => {
4998                                        (finalized.sink.clone(), finalized.source.clone())
4999                                    }
5000                                };
5001
5002                                graph_builders.create_external_source(
5003                                    &out_location,
5004                                    source_expr,
5005                                    &receiver_stream_ident,
5006                                    deserialize_pipeline.as_ref(),
5007                                    stmt_id,
5008                                );
5009                            }
5010                            BuildersOrCallback::Callback(_, node_callback) => {
5011                                node_callback(node, next_stmt_id);
5012                            }
5013                        }
5014
5015                        ident_stack.push(receiver_stream_ident);
5016                    }
5017
5018                    HydroNode::Counter {
5019                        tag,
5020                        duration,
5021                        prefix,
5022                        ..
5023                    } => {
5024                        let input_ident = ident_stack.pop().unwrap();
5025
5026                        let stmt_id = next_stmt_id.get_and_increment();
5027                        let counter_ident =
5028                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5029
5030                        match builders_or_callback {
5031                            BuildersOrCallback::Builders(graph_builders) => {
5032                                let arg = format!("{}({})", prefix, tag);
5033                                let builder = graph_builders.get_dfir_mut(&out_location);
5034                                builder.add_dfir(
5035                                    parse_quote! {
5036                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
5037                                    },
5038                                    None,
5039                                    Some(&stmt_id.to_string()),
5040                                );
5041                            }
5042                            BuildersOrCallback::Callback(_, node_callback) => {
5043                                node_callback(node, next_stmt_id);
5044                            }
5045                        }
5046
5047                        ident_stack.push(counter_ident);
5048                    }
5049                }
5050            },
5051            seen_tees,
5052            false,
5053        );
5054
5055        let ret = ident_stack
5056            .pop()
5057            .expect("ident_stack should have exactly one element after traversal");
5058        assert!(
5059            ident_stack.is_empty(),
5060            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5061             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5062            ident_stack.len()
5063        );
5064        ret
5065    }
5066
5067    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5068        match self {
5069            HydroNode::Placeholder => {
5070                panic!()
5071            }
5072            HydroNode::Cast { .. }
5073            | HydroNode::ObserveNonDet { .. }
5074            | HydroNode::UnboundSingleton { .. }
5075            | HydroNode::AssertIsConsistent { .. } => {}
5076            HydroNode::Source { source, .. } => match source {
5077                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5078                HydroSource::ExternalNetwork()
5079                | HydroSource::Spin()
5080                | HydroSource::ClusterMembers(_, _)
5081                | HydroSource::Embedded(_)
5082                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
5083            },
5084            HydroNode::SingletonSource { value, .. } => {
5085                transform(value);
5086            }
5087            HydroNode::CycleSource { .. }
5088            | HydroNode::Tee { .. }
5089            | HydroNode::Reference { .. }
5090            | HydroNode::YieldConcat { .. }
5091            | HydroNode::BeginAtomic { .. }
5092            | HydroNode::EndAtomic { .. }
5093            | HydroNode::Batch { .. }
5094            | HydroNode::Chain { .. }
5095            | HydroNode::MergeOrdered { .. }
5096            | HydroNode::ChainFirst { .. }
5097            | HydroNode::CrossProduct { .. }
5098            | HydroNode::CrossSingleton { .. }
5099            | HydroNode::ResolveFutures { .. }
5100            | HydroNode::ResolveFuturesBlocking { .. }
5101            | HydroNode::ResolveFuturesOrdered { .. }
5102            | HydroNode::Join { .. }
5103            | HydroNode::JoinHalf { .. }
5104            | HydroNode::Difference { .. }
5105            | HydroNode::AntiJoin { .. }
5106            | HydroNode::DeferTick { .. }
5107            | HydroNode::Enumerate { .. }
5108            | HydroNode::Unique { .. }
5109            | HydroNode::Sort { .. } => {}
5110            HydroNode::Map { f, .. }
5111            | HydroNode::FlatMap { f, .. }
5112            | HydroNode::FlatMapStreamBlocking { f, .. }
5113            | HydroNode::Filter { f, .. }
5114            | HydroNode::FilterMap { f, .. }
5115            | HydroNode::Inspect { f, .. }
5116            | HydroNode::Partition { f, .. }
5117            | HydroNode::Reduce { f, .. }
5118            | HydroNode::ReduceKeyed { f, .. }
5119            | HydroNode::ReduceKeyedWatermark { f, .. } => {
5120                transform(&mut f.expr);
5121            }
5122            HydroNode::Fold { init, acc, .. }
5123            | HydroNode::Scan { init, acc, .. }
5124            | HydroNode::ScanAsyncBlocking { init, acc, .. }
5125            | HydroNode::FoldKeyed { init, acc, .. } => {
5126                transform(&mut init.expr);
5127                transform(&mut acc.expr);
5128            }
5129            HydroNode::Network {
5130                serialize_fn,
5131                deserialize_fn,
5132                ..
5133            } => {
5134                if let Some(serialize_fn) = serialize_fn {
5135                    transform(serialize_fn);
5136                }
5137                if let Some(deserialize_fn) = deserialize_fn {
5138                    transform(deserialize_fn);
5139                }
5140            }
5141            HydroNode::ExternalInput { deserialize_fn, .. } => {
5142                if let Some(deserialize_fn) = deserialize_fn {
5143                    transform(deserialize_fn);
5144                }
5145            }
5146            HydroNode::Counter { duration, .. } => {
5147                transform(duration);
5148            }
5149        }
5150    }
5151
5152    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5153        &self.metadata().op
5154    }
5155
5156    pub fn metadata(&self) -> &HydroIrMetadata {
5157        match self {
5158            HydroNode::Placeholder => {
5159                panic!()
5160            }
5161            HydroNode::Cast { metadata, .. }
5162            | HydroNode::ObserveNonDet { metadata, .. }
5163            | HydroNode::AssertIsConsistent { metadata, .. }
5164            | HydroNode::UnboundSingleton { metadata, .. }
5165            | HydroNode::Source { metadata, .. }
5166            | HydroNode::SingletonSource { metadata, .. }
5167            | HydroNode::CycleSource { metadata, .. }
5168            | HydroNode::Tee { metadata, .. }
5169            | HydroNode::Reference { metadata, .. }
5170            | HydroNode::Partition { metadata, .. }
5171            | HydroNode::YieldConcat { metadata, .. }
5172            | HydroNode::BeginAtomic { metadata, .. }
5173            | HydroNode::EndAtomic { metadata, .. }
5174            | HydroNode::Batch { metadata, .. }
5175            | HydroNode::Chain { metadata, .. }
5176            | HydroNode::MergeOrdered { metadata, .. }
5177            | HydroNode::ChainFirst { metadata, .. }
5178            | HydroNode::CrossProduct { metadata, .. }
5179            | HydroNode::CrossSingleton { metadata, .. }
5180            | HydroNode::Join { metadata, .. }
5181            | HydroNode::JoinHalf { metadata, .. }
5182            | HydroNode::Difference { metadata, .. }
5183            | HydroNode::AntiJoin { metadata, .. }
5184            | HydroNode::ResolveFutures { metadata, .. }
5185            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5186            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5187            | HydroNode::Map { metadata, .. }
5188            | HydroNode::FlatMap { metadata, .. }
5189            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5190            | HydroNode::Filter { metadata, .. }
5191            | HydroNode::FilterMap { metadata, .. }
5192            | HydroNode::DeferTick { metadata, .. }
5193            | HydroNode::Enumerate { metadata, .. }
5194            | HydroNode::Inspect { metadata, .. }
5195            | HydroNode::Unique { metadata, .. }
5196            | HydroNode::Sort { metadata, .. }
5197            | HydroNode::Scan { metadata, .. }
5198            | HydroNode::ScanAsyncBlocking { metadata, .. }
5199            | HydroNode::Fold { metadata, .. }
5200            | HydroNode::FoldKeyed { metadata, .. }
5201            | HydroNode::Reduce { metadata, .. }
5202            | HydroNode::ReduceKeyed { metadata, .. }
5203            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5204            | HydroNode::ExternalInput { metadata, .. }
5205            | HydroNode::Network { metadata, .. }
5206            | HydroNode::Counter { metadata, .. } => metadata,
5207        }
5208    }
5209
5210    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5211        &mut self.metadata_mut().op
5212    }
5213
5214    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5215        match self {
5216            HydroNode::Placeholder => {
5217                panic!()
5218            }
5219            HydroNode::Cast { metadata, .. }
5220            | HydroNode::ObserveNonDet { metadata, .. }
5221            | HydroNode::AssertIsConsistent { metadata, .. }
5222            | HydroNode::UnboundSingleton { metadata, .. }
5223            | HydroNode::Source { metadata, .. }
5224            | HydroNode::SingletonSource { metadata, .. }
5225            | HydroNode::CycleSource { metadata, .. }
5226            | HydroNode::Tee { metadata, .. }
5227            | HydroNode::Reference { metadata, .. }
5228            | HydroNode::Partition { metadata, .. }
5229            | HydroNode::YieldConcat { metadata, .. }
5230            | HydroNode::BeginAtomic { metadata, .. }
5231            | HydroNode::EndAtomic { metadata, .. }
5232            | HydroNode::Batch { metadata, .. }
5233            | HydroNode::Chain { metadata, .. }
5234            | HydroNode::MergeOrdered { metadata, .. }
5235            | HydroNode::ChainFirst { metadata, .. }
5236            | HydroNode::CrossProduct { metadata, .. }
5237            | HydroNode::CrossSingleton { metadata, .. }
5238            | HydroNode::Join { metadata, .. }
5239            | HydroNode::JoinHalf { metadata, .. }
5240            | HydroNode::Difference { metadata, .. }
5241            | HydroNode::AntiJoin { metadata, .. }
5242            | HydroNode::ResolveFutures { metadata, .. }
5243            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5244            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5245            | HydroNode::Map { metadata, .. }
5246            | HydroNode::FlatMap { metadata, .. }
5247            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5248            | HydroNode::Filter { metadata, .. }
5249            | HydroNode::FilterMap { metadata, .. }
5250            | HydroNode::DeferTick { metadata, .. }
5251            | HydroNode::Enumerate { metadata, .. }
5252            | HydroNode::Inspect { metadata, .. }
5253            | HydroNode::Unique { metadata, .. }
5254            | HydroNode::Sort { metadata, .. }
5255            | HydroNode::Scan { metadata, .. }
5256            | HydroNode::ScanAsyncBlocking { metadata, .. }
5257            | HydroNode::Fold { metadata, .. }
5258            | HydroNode::FoldKeyed { metadata, .. }
5259            | HydroNode::Reduce { metadata, .. }
5260            | HydroNode::ReduceKeyed { metadata, .. }
5261            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5262            | HydroNode::ExternalInput { metadata, .. }
5263            | HydroNode::Network { metadata, .. }
5264            | HydroNode::Counter { metadata, .. } => metadata,
5265        }
5266    }
5267
5268    pub fn input(&self) -> Vec<&HydroNode> {
5269        match self {
5270            HydroNode::Placeholder => {
5271                panic!()
5272            }
5273            HydroNode::Source { .. }
5274            | HydroNode::SingletonSource { .. }
5275            | HydroNode::ExternalInput { .. }
5276            | HydroNode::CycleSource { .. }
5277            | HydroNode::Tee { .. }
5278            | HydroNode::Reference { .. }
5279            | HydroNode::Partition { .. } => {
5280                // Tee/Partition should find their input in separate special ways
5281                vec![]
5282            }
5283            HydroNode::Cast { inner, .. }
5284            | HydroNode::ObserveNonDet { inner, .. }
5285            | HydroNode::YieldConcat { inner, .. }
5286            | HydroNode::BeginAtomic { inner, .. }
5287            | HydroNode::EndAtomic { inner, .. }
5288            | HydroNode::Batch { inner, .. }
5289            | HydroNode::UnboundSingleton { inner, .. }
5290            | HydroNode::AssertIsConsistent { inner, .. } => {
5291                vec![inner]
5292            }
5293            HydroNode::Chain { first, second, .. } => {
5294                vec![first, second]
5295            }
5296            HydroNode::MergeOrdered { first, second, .. } => {
5297                vec![first, second]
5298            }
5299            HydroNode::ChainFirst { first, second, .. } => {
5300                vec![first, second]
5301            }
5302            HydroNode::CrossProduct { left, right, .. }
5303            | HydroNode::CrossSingleton { left, right, .. }
5304            | HydroNode::Join { left, right, .. }
5305            | HydroNode::JoinHalf { left, right, .. } => {
5306                vec![left, right]
5307            }
5308            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5309                vec![pos, neg]
5310            }
5311            HydroNode::Map { input, .. }
5312            | HydroNode::FlatMap { input, .. }
5313            | HydroNode::FlatMapStreamBlocking { input, .. }
5314            | HydroNode::Filter { input, .. }
5315            | HydroNode::FilterMap { input, .. }
5316            | HydroNode::Sort { input, .. }
5317            | HydroNode::DeferTick { input, .. }
5318            | HydroNode::Enumerate { input, .. }
5319            | HydroNode::Inspect { input, .. }
5320            | HydroNode::Unique { input, .. }
5321            | HydroNode::Network { input, .. }
5322            | HydroNode::Counter { input, .. }
5323            | HydroNode::ResolveFutures { input, .. }
5324            | HydroNode::ResolveFuturesBlocking { input, .. }
5325            | HydroNode::ResolveFuturesOrdered { input, .. }
5326            | HydroNode::Fold { input, .. }
5327            | HydroNode::FoldKeyed { input, .. }
5328            | HydroNode::Reduce { input, .. }
5329            | HydroNode::ReduceKeyed { input, .. }
5330            | HydroNode::Scan { input, .. }
5331            | HydroNode::ScanAsyncBlocking { input, .. } => {
5332                vec![input]
5333            }
5334            HydroNode::ReduceKeyedWatermark {
5335                input, watermark, ..
5336            } => {
5337                vec![input, watermark]
5338            }
5339        }
5340    }
5341
5342    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5343        self.input()
5344            .iter()
5345            .map(|input_node| input_node.metadata())
5346            .collect()
5347    }
5348
5349    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5350    /// has other live references, meaning the upstream is already driven
5351    /// by another consumer and does not need a Null sink.
5352    pub fn is_shared_with_others(&self) -> bool {
5353        match self {
5354            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5355                Rc::strong_count(&inner.0) > 1
5356            }
5357            // A zero-output reference node is valid in DFIR (it drains itself at
5358            // end of tick), so it doesn't need to be driven by another consumer.
5359            HydroNode::Reference { .. } => false,
5360            _ => false,
5361        }
5362    }
5363
5364    pub fn print_root(&self) -> String {
5365        match self {
5366            HydroNode::Placeholder => {
5367                panic!()
5368            }
5369            HydroNode::Cast { .. } => "Cast()".to_owned(),
5370            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5371            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5372            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5373            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5374            HydroNode::SingletonSource {
5375                value,
5376                first_tick_only,
5377                ..
5378            } => format!(
5379                "SingletonSource({:?}, first_tick_only={})",
5380                value, first_tick_only
5381            ),
5382            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5383            HydroNode::Tee { inner, .. } => {
5384                format!("Tee({})", inner.0.borrow().print_root())
5385            }
5386            HydroNode::Reference { inner, kind, .. } => {
5387                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5388            }
5389            HydroNode::Partition { f, is_true, .. } => {
5390                format!("Partition({:?}, is_true={})", f, is_true)
5391            }
5392            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5393            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5394            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5395            HydroNode::Batch { .. } => "Batch()".to_owned(),
5396            HydroNode::Chain { first, second, .. } => {
5397                format!("Chain({}, {})", first.print_root(), second.print_root())
5398            }
5399            HydroNode::MergeOrdered { first, second, .. } => {
5400                format!(
5401                    "MergeOrdered({}, {})",
5402                    first.print_root(),
5403                    second.print_root()
5404                )
5405            }
5406            HydroNode::ChainFirst { first, second, .. } => {
5407                format!(
5408                    "ChainFirst({}, {})",
5409                    first.print_root(),
5410                    second.print_root()
5411                )
5412            }
5413            HydroNode::CrossProduct { left, right, .. } => {
5414                format!(
5415                    "CrossProduct({}, {})",
5416                    left.print_root(),
5417                    right.print_root()
5418                )
5419            }
5420            HydroNode::CrossSingleton { left, right, .. } => {
5421                format!(
5422                    "CrossSingleton({}, {})",
5423                    left.print_root(),
5424                    right.print_root()
5425                )
5426            }
5427            HydroNode::Join { left, right, .. } => {
5428                format!("Join({}, {})", left.print_root(), right.print_root())
5429            }
5430            HydroNode::JoinHalf { left, right, .. } => {
5431                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5432            }
5433            HydroNode::Difference { pos, neg, .. } => {
5434                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5435            }
5436            HydroNode::AntiJoin { pos, neg, .. } => {
5437                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5438            }
5439            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5440            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5441            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5442            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5443            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5444            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5445            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5446            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5447            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5448            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5449            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5450            HydroNode::Unique { .. } => "Unique()".to_owned(),
5451            HydroNode::Sort { .. } => "Sort()".to_owned(),
5452            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5453            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5454            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5455                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5456            }
5457            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5458            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5459            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5460            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5461            HydroNode::Network { .. } => "Network()".to_owned(),
5462            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5463            HydroNode::Counter { tag, duration, .. } => {
5464                format!("Counter({:?}, {:?})", tag, duration)
5465            }
5466        }
5467    }
5468}
5469
/// Creates the network channel carrying data from `from_location` to
/// `to_location` for deployment backend `D`.
///
/// The steps are:
/// 1. Look up the instantiated process/cluster for each endpoint.
/// 2. Allocate a fresh port on each endpoint (sender first, then receiver).
/// 3. Dispatch to the [`Deploy`] hooks matching the endpoint combination
///    (`o2o` / `o2m` / `m2o` / `m2m`, where `o` is a process and `m` a cluster).
///
/// Returns `(sink, source, connect_fn)`:
/// - `sink` is the sink expression for the sending side.
/// - `source` is the source expression for the receiving side.
/// - `connect_fn` is the boxed `FnOnce` returned by the matching `*_connect`
///   hook (presumably invoked later to wire the endpoints together; confirm
///   with the caller).
///
/// # Panics
///
/// Panics if either endpoint was not instantiated in `processes`/`clusters`, or
/// if either endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Resolve a location key to its instantiated node, panicking with a
    // descriptive message if the deployment never instantiated it.
    let process_node = |key: LocationKey| -> D::Process {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    let cluster_node = |key: LocationKey| -> D::Cluster {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            let (sink, source) = D::o2o_sink_source(
                env,
                &from_node,
                &sink_port,
                &to_node,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::o2o_connect(&from_node, &sink_port, &to_node, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            let (sink, source) = D::o2m_sink_source(
                env,
                &from_node,
                &sink_port,
                &to_node,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::o2m_connect(&from_node, &sink_port, &to_node, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            let (sink, source) = D::m2o_sink_source(
                env,
                &from_node,
                &sink_port,
                &to_node,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::m2o_connect(&from_node, &sink_port, &to_node, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            let (sink, source) = D::m2m_sink_source(
                env,
                &from_node,
                &sink_port,
                &to_node,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::m2m_connect(&from_node, &sink_port, &to_node, &source_port);
            (sink, source, connect_fn)
        }
        // Networks only connect top-level processes/clusters; tick and atomic
        // locations must be unwrapped before reaching this point.
        (LocationId::Tick(..) | LocationId::Atomic(..), _) => panic!(
            "cannot instantiate a network whose sending location is a Tick or Atomic location; \
             expected a Process or Cluster"
        ),
        (_, LocationId::Tick(..) | LocationId::Atomic(..)) => panic!(
            "cannot instantiate a network whose receiving location is a Tick or Atomic location; \
             expected a Process or Cluster"
        ),
    }
}
5611
5612#[cfg(test)]
5613mod serde_test;
5614
// Unit tests for this module: size regressions for the IR enums and the
// `simplify_q_macro` helper, which (per the snapshot tests below) rewrites
// stageleft-spliced expressions back into a `q!(...)` form.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Regression guard on the in-memory size of `HydroNode`. The expected value
    // includes feature-gated fields, so the test is ignored without `build`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Regression guard on the in-memory size of `HydroRoot`; like the check
    // above, it is only meaningful with the `build` feature enabled.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    #[test]
    fn test_simplify_q_macro_basic() {
        // An expression that contains no q!/stageleft call must come back unchanged.
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern.
        // Note: the snapshot file is keyed by this test's name, so renaming the
        // function requires renaming the stored snapshot.
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure whose quoted body is a block, so it does not start with a pipe.
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}