Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11#[cfg(feature = "tokio")]
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::{Generate, KeyedStream};
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::batch_atomic::BatchAtomic;
27use crate::live_collections::singleton::SingletonBound;
28#[cfg(stageleft_runtime)]
29use crate::location::dynamic::{DynLocation, LocationId};
30use crate::location::tick::{Atomic, DeferTick};
31use crate::location::{Location, Tick, TopLevel, check_matching_location};
32use crate::manual_expr::ManualExpr;
33use crate::nondet::{NonDet, nondet};
34use crate::prelude::manual_proof;
35use crate::properties::{
36    AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
37    ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
38    ValidMutCommutativityFor, ValidMutIdempotenceFor,
39};
40
41pub mod networking;
42
43/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
44#[sealed::sealed]
45pub trait Ordering:
46    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
47{
48    /// The [`StreamOrder`] corresponding to this type.
49    const ORDERING_KIND: StreamOrder;
50}
51
52/// Marks the stream as being totally ordered, which means that there are
53/// no sources of non-determinism (other than intentional ones) that will
54/// affect the order of elements.
55pub enum TotalOrder {}
56
57#[sealed::sealed]
58impl Ordering for TotalOrder {
59    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
60}
61
62/// Marks the stream as having no order, which means that the order of
63/// elements may be affected by non-determinism.
64///
65/// This restricts certain operators, such as `fold` and `reduce`, to only
66/// be used with commutative aggregation functions.
67pub enum NoOrder {}
68
69#[sealed::sealed]
70impl Ordering for NoOrder {
71    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
72}
73
74/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
75/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
76/// have `Self` guarantees instead.
77#[sealed::sealed]
78pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
79#[sealed::sealed]
80impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
81
82/// Helper trait for determining the weakest of two orderings.
83#[sealed::sealed]
84pub trait MinOrder<Other: ?Sized> {
85    /// The weaker of the two orderings.
86    type Min: Ordering;
87}
88
89#[sealed::sealed]
90impl<O: Ordering> MinOrder<O> for TotalOrder {
91    type Min = O;
92}
93
94#[sealed::sealed]
95impl<O: Ordering> MinOrder<O> for NoOrder {
96    type Min = NoOrder;
97}
98
99/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
100#[sealed::sealed]
101pub trait Retries:
102    MinRetries<Self, Min = Self>
103    + MinRetries<ExactlyOnce, Min = Self>
104    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
105{
106    /// The [`StreamRetry`] corresponding to this type.
107    const RETRIES_KIND: StreamRetry;
108}
109
110/// Marks the stream as having deterministic message cardinality, with no
111/// possibility of duplicates.
112pub enum ExactlyOnce {}
113
114#[sealed::sealed]
115impl Retries for ExactlyOnce {
116    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
117}
118
119/// Marks the stream as having non-deterministic message cardinality, which
120/// means that duplicates may occur, but messages will not be dropped.
121pub enum AtLeastOnce {}
122
123#[sealed::sealed]
124impl Retries for AtLeastOnce {
125    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
126}
127
128/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
129/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
130/// have `Self` guarantees instead.
131#[sealed::sealed]
132pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
133#[sealed::sealed]
134impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
135
136/// Helper trait for determining the weakest of two retry guarantees.
137#[sealed::sealed]
138pub trait MinRetries<Other: ?Sized> {
139    /// The weaker of the two retry guarantees.
140    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
141}
142
143#[sealed::sealed]
144impl<R: Retries> MinRetries<R> for ExactlyOnce {
145    type Min = R;
146}
147
148#[sealed::sealed]
149impl<R: Retries> MinRetries<R> for AtLeastOnce {
150    type Min = AtLeastOnce;
151}
152
153#[sealed::sealed]
154#[diagnostic::on_unimplemented(
155    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
156    label = "required here",
157    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
158)]
159/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
160pub trait IsOrdered: Ordering {}
161
162#[sealed::sealed]
163#[diagnostic::do_not_recommend]
164impl IsOrdered for TotalOrder {}
165
166#[sealed::sealed]
167#[diagnostic::on_unimplemented(
168    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
169    label = "required here",
170    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
171)]
172/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
173pub trait IsExactlyOnce: Retries {}
174
175#[sealed::sealed]
176#[diagnostic::do_not_recommend]
177impl IsExactlyOnce for ExactlyOnce {}
178
179/// Streaming sequence of elements with type `Type`.
180///
181/// This live collection represents a growing sequence of elements, with new elements being
182/// asynchronously appended to the end of the sequence. This can be used to model the arrival
183/// of network input, such as API requests, or streaming ingestion.
184///
185/// By default, all streams have deterministic ordering and each element is materialized exactly
186/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
187/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
188/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
189///
190/// Type Parameters:
191/// - `Type`: the type of elements in the stream
192/// - `Loc`: the location where the stream is being materialized
193/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
194/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
195///   (default is [`TotalOrder`])
196/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
197///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
198pub struct Stream<
199    Type,
200    Loc,
201    Bound: Boundedness = Unbounded,
202    Order: Ordering = TotalOrder,
203    Retry: Retries = ExactlyOnce,
204> {
205    pub(crate) location: Loc,
206    pub(crate) ir_node: RefCell<HydroNode>,
207    pub(crate) flow_state: FlowState,
208
209    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
210}
211
212impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
213    fn drop(&mut self) {
214        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
215        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
216            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
217                input: Box::new(ir_node),
218                op_metadata: HydroIrOpMetadata::new(),
219            });
220        }
221    }
222}
223
224impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
225    for Stream<T, L, Unbounded, O, R>
226where
227    L: Location<'a>,
228{
229    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
230        let new_meta = stream
231            .location
232            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
233
234        Stream {
235            location: stream.location.clone(),
236            flow_state: stream.flow_state.clone(),
237            ir_node: RefCell::new(HydroNode::Cast {
238                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
239                metadata: new_meta,
240            }),
241            _phantom: PhantomData,
242        }
243    }
244}
245
246impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
247    for Stream<T, L, B, NoOrder, R>
248where
249    L: Location<'a>,
250{
251    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
252        stream.weaken_ordering()
253    }
254}
255
256impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
257    for Stream<T, L, B, O, AtLeastOnce>
258where
259    L: Location<'a>,
260{
261    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
262        stream.weaken_retries()
263    }
264}
265
266impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
267where
268    L: Location<'a>,
269{
270    fn defer_tick(self) -> Self {
271        Stream::defer_tick(self)
272    }
273}
274
275impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
276    for Stream<T, Tick<L>, Bounded, O, R>
277where
278    L: Location<'a>,
279{
280    type Location = Tick<L>;
281
282    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
283        Stream::new(
284            location.clone(),
285            HydroNode::CycleSource {
286                cycle_id,
287                metadata: location.new_node_metadata(Self::collection_kind()),
288            },
289        )
290    }
291}
292
293impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
294    for Stream<T, Tick<L>, Bounded, O, R>
295where
296    L: Location<'a>,
297{
298    type Location = Tick<L>;
299
300    fn location(&self) -> &Self::Location {
301        self.location()
302    }
303
304    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
305        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
306            location.clone(),
307            HydroNode::DeferTick {
308                input: Box::new(HydroNode::CycleSource {
309                    cycle_id,
310                    metadata: location.new_node_metadata(Self::collection_kind()),
311                }),
312                metadata: location.new_node_metadata(Self::collection_kind()),
313            },
314        );
315
316        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
317    }
318}
319
320impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
321    for Stream<T, Tick<L>, Bounded, O, R>
322where
323    L: Location<'a>,
324{
325    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
326        assert_eq!(
327            Location::id(&self.location),
328            expected_location,
329            "locations do not match"
330        );
331        self.location
332            .flow_state()
333            .borrow_mut()
334            .push_root(HydroRoot::CycleSink {
335                cycle_id,
336                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
337                op_metadata: HydroIrOpMetadata::new(),
338            });
339    }
340}
341
342impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
343    for Stream<T, L, B, O, R>
344where
345    L: Location<'a>,
346{
347    type Location = L;
348
349    fn create_source(cycle_id: CycleId, location: L) -> Self {
350        Stream::new(
351            location.clone(),
352            HydroNode::CycleSource {
353                cycle_id,
354                metadata: location.new_node_metadata(Self::collection_kind()),
355            },
356        )
357    }
358}
359
360impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
361    for Stream<T, L, B, O, R>
362where
363    L: Location<'a>,
364{
365    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
366        assert_eq!(
367            Location::id(&self.location),
368            expected_location,
369            "locations do not match"
370        );
371        self.location
372            .flow_state()
373            .borrow_mut()
374            .push_root(HydroRoot::CycleSink {
375                cycle_id,
376                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
377                op_metadata: HydroIrOpMetadata::new(),
378            });
379    }
380}
381
382impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
383where
384    T: Clone,
385    L: Location<'a>,
386{
387    fn clone(&self) -> Self {
388        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
389            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
390            *self.ir_node.borrow_mut() = HydroNode::Tee {
391                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
392                metadata: self.location.new_node_metadata(Self::collection_kind()),
393            };
394        }
395
396        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
397            unreachable!()
398        };
399        Stream {
400            location: self.location.clone(),
401            flow_state: self.flow_state.clone(),
402            ir_node: HydroNode::Tee {
403                inner: SharedNode(inner.0.clone()),
404                metadata: metadata.clone(),
405            }
406            .into(),
407            _phantom: PhantomData,
408        }
409    }
410}
411
412impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
413where
414    L: Location<'a>,
415{
416    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
417        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
418        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
419
420        let flow_state = location.flow_state().clone();
421        Stream {
422            location,
423            flow_state,
424            ir_node: RefCell::new(ir_node),
425            _phantom: PhantomData,
426        }
427    }
428
429    /// Returns the [`Location`] where this stream is being materialized.
430    pub fn location(&self) -> &L {
431        &self.location
432    }
433
434    /// Creates a shared reference handle to this stream's handoff buffer that can be captured
435    /// inside `q!()` closures. The handle resolves to `&Vec<T>` at runtime.
436    ///
437    /// The stream must be bounded, otherwise reading it would be non-deterministic.
438    pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
439    where
440        B: IsBounded,
441    {
442        crate::handoff_ref::StreamRef::new(&self.ir_node)
443    }
444
445    /// Returns a mutable reference handle to this stream's handoff buffer that can be captured
446    /// inside `q!()` closures. The handle resolves to `&mut Vec<T>` at runtime.
447    pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
448    where
449        B: IsBounded,
450    {
451        crate::handoff_ref::StreamMut::new(&self.ir_node)
452    }
453
454    /// Weakens the consistency of this live collection to not guarantee any consistency across
455    /// cluster members (if this collection is on a cluster).
456    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
457    where
458        L: Location<'a>,
459    {
460        if L::consistency()
461            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
462        {
463            // already no consistency
464            Stream::new(
465                self.location.drop_consistency(),
466                self.ir_node.replace(HydroNode::Placeholder),
467            )
468        } else {
469            Stream::new(
470                self.location.drop_consistency(),
471                HydroNode::Cast {
472                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
473                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
474                        T,
475                        L::DropConsistency,
476                        B,
477                        O,
478                        R,
479                    >::collection_kind(
480                    )),
481                },
482            )
483        }
484    }
485
486    /// Casts this live collection to have the consistency guarantees specified in the given
487    /// location type parameter. The developer must ensure that the strengthened consistency
488    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
489    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
490        self,
491        _proof: impl crate::properties::ConsistencyProof,
492    ) -> Stream<T, L2, B, O, R>
493    where
494        L: Location<'a>,
495    {
496        if L::consistency() == L2::consistency() {
497            Stream::new(
498                self.location.with_consistency_of(),
499                self.ir_node.replace(HydroNode::Placeholder),
500            )
501        } else {
502            Stream::new(
503                self.location.with_consistency_of(),
504                HydroNode::AssertIsConsistent {
505                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
506                    trusted: false,
507                    metadata: self
508                        .location
509                        .clone()
510                        .with_consistency_of::<L2>()
511                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
512                },
513            )
514        }
515    }
516
517    pub(crate) fn assert_has_consistency_of_trusted<
518        L2: Location<'a, DropConsistency = L::DropConsistency>,
519    >(
520        self,
521        _proof: impl crate::properties::ConsistencyProof,
522    ) -> Stream<T, L2, B, O, R>
523    where
524        L: Location<'a>,
525    {
526        if L::consistency() == L2::consistency() {
527            Stream::new(
528                self.location.with_consistency_of(),
529                self.ir_node.replace(HydroNode::Placeholder),
530            )
531        } else {
532            Stream::new(
533                self.location.with_consistency_of(),
534                HydroNode::AssertIsConsistent {
535                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
536                    trusted: true,
537                    metadata: self
538                        .location
539                        .clone()
540                        .with_consistency_of::<L2>()
541                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
542                },
543            )
544        }
545    }
546
547    pub(crate) fn collection_kind() -> CollectionKind {
548        CollectionKind::Stream {
549            bound: B::BOUND_KIND,
550            order: O::ORDERING_KIND,
551            retry: R::RETRIES_KIND,
552            element_type: quote_type::<T>().into(),
553        }
554    }
555
556    /// Produces a stream based on invoking `f` on each element.
557    /// If you do not want to modify the stream and instead only want to view
558    /// each item use [`Stream::inspect`] instead.
559    ///
560    /// # Example
561    /// ```rust
562    /// # #[cfg(feature = "deploy")] {
563    /// # use hydro_lang::prelude::*;
564    /// # use futures::StreamExt;
565    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
566    /// let words = process.source_iter(q!(vec!["hello", "world"]));
567    /// words.map(q!(|x| x.to_uppercase()))
568    /// # }, |mut stream| async move {
569    /// # for w in vec!["HELLO", "WORLD"] {
570    /// #     assert_eq!(stream.next().await.unwrap(), w);
571    /// # }
572    /// # }));
573    /// # }
574    /// ```
575    pub fn map<U, F, C, I, const WAS_MUT: bool>(
576        self,
577        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
578    ) -> Stream<U, L, B, O, R>
579    where
580        F: FnMut(T) -> U + 'a,
581        C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
582        I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
583    {
584        let f = crate::handoff_ref::with_ref_capture(|| {
585            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
586            proof.register_proof(&expr);
587            expr.into()
588        });
589        Stream::new(
590            self.location.clone(),
591            HydroNode::Map {
592                f,
593                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594                metadata: self
595                    .location
596                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
597            },
598        )
599    }
600
601    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
602    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
603    /// for the output type `U` must produce items in a **deterministic** order.
604    ///
605    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
606    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
607    ///
608    /// # Example
609    /// ```rust
610    /// # #[cfg(feature = "deploy")] {
611    /// # use hydro_lang::prelude::*;
612    /// # use futures::StreamExt;
613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614    /// process
615    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
616    ///     .flat_map_ordered(q!(|x| x))
617    /// # }, |mut stream| async move {
618    /// // 1, 2, 3, 4
619    /// # for w in (1..5) {
620    /// #     assert_eq!(stream.next().await.unwrap(), w);
621    /// # }
622    /// # }));
623    /// # }
624    /// ```
625    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
626        self,
627        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
628    ) -> Stream<U, L, B, O, R>
629    where
630        I: IntoIterator<Item = U>,
631        F: FnMut(T) -> I + 'a,
632        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
633        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
634    {
635        let f = crate::handoff_ref::with_ref_capture(|| {
636            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
637            proof.register_proof(&expr);
638            expr.into()
639        });
640        Stream::new(
641            self.location.clone(),
642            HydroNode::FlatMap {
643                f,
644                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
645                metadata: self
646                    .location
647                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
648            },
649        )
650    }
651
652    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
653    /// for the output type `U` to produce items in any order.
654    ///
655    /// # Example
656    /// ```rust
657    /// # #[cfg(feature = "deploy")] {
658    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
659    /// # use futures::StreamExt;
660    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
661    /// process
662    ///     .source_iter(q!(vec![
663    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
664    ///         std::collections::HashSet::from_iter(vec![3, 4]),
665    ///     ]))
666    ///     .flat_map_unordered(q!(|x| x))
667    /// # }, |mut stream| async move {
668    /// // 1, 2, 3, 4, but in no particular order
669    /// # let mut results = Vec::new();
670    /// # for w in (1..5) {
671    /// #     results.push(stream.next().await.unwrap());
672    /// # }
673    /// # results.sort();
674    /// # assert_eq!(results, vec![1, 2, 3, 4]);
675    /// # }));
676    /// # }
677    /// ```
678    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
679        self,
680        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
681    ) -> Stream<U, L, B, NoOrder, R>
682    where
683        I: IntoIterator<Item = U>,
684        F: FnMut(T) -> I + 'a,
685        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
686        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
687    {
688        let f = crate::handoff_ref::with_ref_capture(|| {
689            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
690            proof.register_proof(&expr);
691            expr.into()
692        });
693        Stream::new(
694            self.location.clone(),
695            HydroNode::FlatMap {
696                f,
697                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
698                metadata: self
699                    .location
700                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
701            },
702        )
703    }
704
705    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
706    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
707    ///
708    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
709    /// not deterministic, use [`Stream::flatten_unordered`] instead.
710    ///
711    /// ```rust
712    /// # #[cfg(feature = "deploy")] {
713    /// # use hydro_lang::prelude::*;
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
716    /// process
717    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
718    ///     .flatten_ordered()
719    /// # }, |mut stream| async move {
720    /// // 1, 2, 3, 4
721    /// # for w in (1..5) {
722    /// #     assert_eq!(stream.next().await.unwrap(), w);
723    /// # }
724    /// # }));
725    /// # }
726    /// ```
727    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
728    where
729        T: IntoIterator<Item = U>,
730    {
731        self.flat_map_ordered(q!(|d| d))
732    }
733
734    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
735    /// for the element type `T` to produce items in any order.
736    ///
737    /// # Example
738    /// ```rust
739    /// # #[cfg(feature = "deploy")] {
740    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
741    /// # use futures::StreamExt;
742    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
743    /// process
744    ///     .source_iter(q!(vec![
745    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
746    ///         std::collections::HashSet::from_iter(vec![3, 4]),
747    ///     ]))
748    ///     .flatten_unordered()
749    /// # }, |mut stream| async move {
750    /// // 1, 2, 3, 4, but in no particular order
751    /// # let mut results = Vec::new();
752    /// # for w in (1..5) {
753    /// #     results.push(stream.next().await.unwrap());
754    /// # }
755    /// # results.sort();
756    /// # assert_eq!(results, vec![1, 2, 3, 4]);
757    /// # }));
758    /// # }
759    /// ```
760    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
761    where
762        T: IntoIterator<Item = U>,
763    {
764        self.flat_map_unordered(q!(|d| d))
765    }
766
767    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
768    /// then emit the elements of that stream one by one. When the inner stream yields
769    /// `Pending`, this operator yields as well.
770    pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
771        self,
772        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
773    ) -> Stream<U, L, B, O, R>
774    where
775        S: futures::Stream<Item = U>,
776        F: FnMut(T) -> S + 'a,
777        C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
778        Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
779    {
780        let f = crate::handoff_ref::with_ref_capture(|| {
781            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
782            proof.register_proof(&expr);
783            expr.into()
784        });
785        Stream::new(
786            self.location.clone(),
787            HydroNode::FlatMapStreamBlocking {
788                f,
789                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
790                metadata: self
791                    .location
792                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
793            },
794        )
795    }
796
797    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
798    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
799    /// yields as well.
800    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
801    where
802        T: futures::Stream<Item = U>,
803    {
804        self.flat_map_stream_blocking(q!(|d| d))
805    }
806
807    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
808    /// `f`, preserving the order of the elements.
809    ///
810    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
811    /// not modify or take ownership of the values. If you need to modify the values while filtering
812    /// use [`Stream::filter_map`] instead.
813    ///
814    /// # Example
815    /// ```rust
816    /// # #[cfg(feature = "deploy")] {
817    /// # use hydro_lang::prelude::*;
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
820    /// process
821    ///     .source_iter(q!(vec![1, 2, 3, 4]))
822    ///     .filter(q!(|&x| x > 2))
823    /// # }, |mut stream| async move {
824    /// // 3, 4
825    /// # for w in (3..5) {
826    /// #     assert_eq!(stream.next().await.unwrap(), w);
827    /// # }
828    /// # }));
829    /// # }
830    /// ```
831    pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
832        self,
833        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
834    ) -> Self
835    where
836        F: FnMut(&T) -> bool + 'a,
837        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
838        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
839    {
840        let f = crate::handoff_ref::with_ref_capture(|| {
841            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
842            proof.register_proof(&expr);
843            expr.into()
844        });
845        Stream::new(
846            self.location.clone(),
847            HydroNode::Filter {
848                f,
849                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
850                metadata: self.location.new_node_metadata(Self::collection_kind()),
851            },
852        )
853    }
854
855    /// Splits the stream into two streams based on a predicate, without cloning elements.
856    ///
857    /// Elements for which `f` returns `true` are sent to the first output stream,
858    /// and elements for which `f` returns `false` are sent to the second output stream.
859    ///
860    /// Unlike using `filter` twice, this only evaluates the predicate once per element
861    /// and does not require `T: Clone`.
862    ///
863    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
864    /// the predicate is only used for routing; the element itself is moved to the
865    /// appropriate output stream.
866    ///
867    /// # Example
868    /// ```rust
869    /// # #[cfg(feature = "deploy")] {
870    /// # use hydro_lang::prelude::*;
871    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
872    /// # use futures::StreamExt;
873    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
874    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
875    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
876    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
877    /// evens.map(q!(|x| (x, true)))
878    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
879    /// # }, |mut stream| async move {
880    /// # let mut results = Vec::new();
881    /// # for _ in 0..6 {
882    /// #     results.push(stream.next().await.unwrap());
883    /// # }
884    /// # results.sort();
885    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
886    /// # }));
887    /// # }
888    /// ```
889    pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
890        self,
891        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
892    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
893    where
894        F: FnMut(&T) -> bool + 'a,
895        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
896        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
897    {
898        let f = crate::handoff_ref::with_ref_capture(|| {
899            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
900            proof.register_proof(&expr);
901            expr.into()
902        });
903        let shared = SharedNode(Rc::new(RefCell::new(
904            self.ir_node.replace(HydroNode::Placeholder),
905        )));
906
907        let true_stream = Stream::new(
908            self.location.clone(),
909            HydroNode::Partition {
910                inner: SharedNode(shared.0.clone()),
911                f: f.clone(),
912                is_true: true,
913                metadata: self.location.new_node_metadata(Self::collection_kind()),
914            },
915        );
916
917        let false_stream = Stream::new(
918            self.location.clone(),
919            HydroNode::Partition {
920                inner: SharedNode(shared.0),
921                f,
922                is_true: false,
923                metadata: self.location.new_node_metadata(Self::collection_kind()),
924            },
925        );
926
927        (true_stream, false_stream)
928    }
929
930    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// process
939    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
940    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
941    /// # }, |mut stream| async move {
942    /// // 1, 2
943    /// # for w in (1..3) {
944    /// #     assert_eq!(stream.next().await.unwrap(), w);
945    /// # }
946    /// # }));
947    /// # }
948    /// ```
949    pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
950        self,
951        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
952    ) -> Stream<U, L, B, O, R>
953    where
954        F: FnMut(T) -> Option<U> + 'a,
955        C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
956        Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
957    {
958        let f = crate::handoff_ref::with_ref_capture(|| {
959            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
960            proof.register_proof(&expr);
961            expr.into()
962        });
963        Stream::new(
964            self.location.clone(),
965            HydroNode::FilterMap {
966                f,
967                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
968                metadata: self
969                    .location
970                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
971            },
972        )
973    }
974
975    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
976    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
977    /// If `other` is an empty [`Optional`], no values will be produced.
978    ///
979    /// # Example
980    /// ```rust
981    /// # #[cfg(feature = "deploy")] {
982    /// # use hydro_lang::prelude::*;
983    /// # use futures::StreamExt;
984    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
985    /// let tick = process.tick();
986    /// let batch = process
987    ///   .source_iter(q!(vec![1, 2, 3, 4]))
988    ///   .batch(&tick, nondet!(/** test */));
989    /// let count = batch.clone().count(); // `count()` returns a singleton
990    /// batch.cross_singleton(count).all_ticks()
991    /// # }, |mut stream| async move {
992    /// // (1, 4), (2, 4), (3, 4), (4, 4)
993    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
994    /// #     assert_eq!(stream.next().await.unwrap(), w);
995    /// # }
996    /// # }));
997    /// # }
998    /// ```
999    pub fn cross_singleton<O2>(
1000        self,
1001        other: impl Into<Optional<O2, L, Bounded>>,
1002    ) -> Stream<(T, O2), L, B, O, R>
1003    where
1004        O2: Clone,
1005    {
1006        let other: Optional<O2, L, Bounded> = other.into();
1007        check_matching_location(&self.location, &other.location);
1008
1009        Stream::new(
1010            self.location.clone(),
1011            HydroNode::CrossSingleton {
1012                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1013                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1014                metadata: self
1015                    .location
1016                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1017            },
1018        )
1019    }
1020
1021    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
1022    ///
1023    /// # Example
1024    /// ```rust
1025    /// # #[cfg(feature = "deploy")] {
1026    /// # use hydro_lang::prelude::*;
1027    /// # use futures::StreamExt;
1028    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1029    /// let tick = process.tick();
1030    /// // ticks are lazy by default, forces the second tick to run
1031    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1032    ///
1033    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
1034    /// let batch_first_tick = process
1035    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1036    ///   .batch(&tick, nondet!(/** test */));
1037    /// let batch_second_tick = process
1038    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1039    ///   .batch(&tick, nondet!(/** test */))
1040    ///   .defer_tick();
1041    /// batch_first_tick.chain(batch_second_tick)
1042    ///   .filter_if(signal)
1043    ///   .all_ticks()
1044    /// # }, |mut stream| async move {
1045    /// // [1, 2, 3, 4]
1046    /// # for w in vec![1, 2, 3, 4] {
1047    /// #     assert_eq!(stream.next().await.unwrap(), w);
1048    /// # }
1049    /// # }));
1050    /// # }
1051    /// ```
1052    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1053        self.cross_singleton(signal.filter(q!(|b| *b)))
1054            .map(q!(|(d, _)| d))
1055    }
1056
1057    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1058    ///
1059    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1060    /// leader of a cluster.
1061    ///
1062    /// # Example
1063    /// ```rust
1064    /// # #[cfg(feature = "deploy")] {
1065    /// # use hydro_lang::prelude::*;
1066    /// # use futures::StreamExt;
1067    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1068    /// let tick = process.tick();
1069    /// // ticks are lazy by default, forces the second tick to run
1070    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1071    ///
1072    /// let batch_first_tick = process
1073    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1074    ///   .batch(&tick, nondet!(/** test */));
1075    /// let batch_second_tick = process
1076    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1077    ///   .batch(&tick, nondet!(/** test */))
1078    ///   .defer_tick(); // appears on the second tick
1079    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1080    /// batch_first_tick.chain(batch_second_tick)
1081    ///   .filter_if_some(some_on_first_tick)
1082    ///   .all_ticks()
1083    /// # }, |mut stream| async move {
1084    /// // [1, 2, 3, 4]
1085    /// # for w in vec![1, 2, 3, 4] {
1086    /// #     assert_eq!(stream.next().await.unwrap(), w);
1087    /// # }
1088    /// # }));
1089    /// # }
1090    /// ```
1091    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1092    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1093        self.filter_if(signal.is_some())
1094    }
1095
1096    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1097    ///
1098    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1099    /// some local state.
1100    ///
1101    /// # Example
1102    /// ```rust
1103    /// # #[cfg(feature = "deploy")] {
1104    /// # use hydro_lang::prelude::*;
1105    /// # use futures::StreamExt;
1106    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1107    /// let tick = process.tick();
1108    /// // ticks are lazy by default, forces the second tick to run
1109    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1110    ///
1111    /// let batch_first_tick = process
1112    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1113    ///   .batch(&tick, nondet!(/** test */));
1114    /// let batch_second_tick = process
1115    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1116    ///   .batch(&tick, nondet!(/** test */))
1117    ///   .defer_tick(); // appears on the second tick
1118    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1119    /// batch_first_tick.chain(batch_second_tick)
1120    ///   .filter_if_none(some_on_first_tick)
1121    ///   .all_ticks()
1122    /// # }, |mut stream| async move {
1123    /// // [5, 6, 7, 8]
1124    /// # for w in vec![5, 6, 7, 8] {
1125    /// #     assert_eq!(stream.next().await.unwrap(), w);
1126    /// # }
1127    /// # }));
1128    /// # }
1129    /// ```
1130    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1131    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1132        self.filter_if(other.is_none())
1133    }
1134
1135    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1136    /// returning all tupled pairs.
1137    ///
1138    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1139    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1140    /// symmetric hash join is used and ordering is [`NoOrder`].
1141    ///
1142    /// # Example
1143    /// ```rust
1144    /// # #[cfg(feature = "deploy")] {
1145    /// # use hydro_lang::prelude::*;
1146    /// # use std::collections::HashSet;
1147    /// # use futures::StreamExt;
1148    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1149    /// let tick = process.tick();
1150    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1151    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1152    /// stream1.cross_product(stream2)
1153    /// # }, |mut stream| async move {
1154    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1155    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1156    /// # stream.map(|i| assert!(expected.contains(&i)));
1157    /// # }));
1158    /// # }
1159    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1160        self,
1161        other: Stream<T2, L, B2, O2, R2>,
1162    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1163    where
1164        T: Clone,
1165        T2: Clone,
1166        R: MinRetries<R2>,
1167    {
1168        self.map(q!(|v| ((), v)))
1169            .join(other.map(q!(|v| ((), v))))
1170            .map(q!(|((), (v1, v2))| (v1, v2)))
1171    }
1172
1173    /// Takes one stream as input and filters out any duplicate occurrences. The output
1174    /// contains all unique values from the input.
1175    ///
1176    /// # Example
1177    /// ```rust
1178    /// # #[cfg(feature = "deploy")] {
1179    /// # use hydro_lang::prelude::*;
1180    /// # use futures::StreamExt;
1181    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1182    /// let tick = process.tick();
1183    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1184    /// # }, |mut stream| async move {
1185    /// # for w in vec![1, 2, 3, 4] {
1186    /// #     assert_eq!(stream.next().await.unwrap(), w);
1187    /// # }
1188    /// # }));
1189    /// # }
1190    /// ```
1191    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1192    where
1193        T: Eq + Hash,
1194    {
1195        Stream::new(
1196            self.location.clone(),
1197            HydroNode::Unique {
1198                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199                metadata: self
1200                    .location
1201                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1202            },
1203        )
1204    }
1205
1206    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1207    ///
1208    /// The `other` stream must be [`Bounded`], since this function will wait until
1209    /// all its elements are available before producing any output.
1210    /// # Example
1211    /// ```rust
1212    /// # #[cfg(feature = "deploy")] {
1213    /// # use hydro_lang::prelude::*;
1214    /// # use futures::StreamExt;
1215    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216    /// let tick = process.tick();
1217    /// let stream = process
1218    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1219    ///   .batch(&tick, nondet!(/** test */));
1220    /// let batch = process
1221    ///   .source_iter(q!(vec![1, 2]))
1222    ///   .batch(&tick, nondet!(/** test */));
1223    /// stream.filter_not_in(batch).all_ticks()
1224    /// # }, |mut stream| async move {
1225    /// # for w in vec![3, 4] {
1226    /// #     assert_eq!(stream.next().await.unwrap(), w);
1227    /// # }
1228    /// # }));
1229    /// # }
1230    /// ```
1231    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1232    where
1233        T: Eq + Hash,
1234        B2: IsBounded,
1235    {
1236        check_matching_location(&self.location, &other.location);
1237
1238        Stream::new(
1239            self.location.clone(),
1240            HydroNode::Difference {
1241                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1242                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1243                metadata: self
1244                    .location
1245                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1246            },
1247        )
1248    }
1249
1250    /// An operator which allows you to "inspect" each element of a stream without
1251    /// modifying it. The closure `f` is called on a reference to each item. This is
1252    /// mainly useful for debugging, and should not be used to generate side-effects.
1253    ///
1254    /// # Example
1255    /// ```rust
1256    /// # #[cfg(feature = "deploy")] {
1257    /// # use hydro_lang::prelude::*;
1258    /// # use futures::StreamExt;
1259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1260    /// let nums = process.source_iter(q!(vec![1, 2]));
1261    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1262    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1263    /// # }, |mut stream| async move {
1264    /// # for w in vec![1, 2] {
1265    /// #     assert_eq!(stream.next().await.unwrap(), w);
1266    /// # }
1267    /// # }));
1268    /// # }
1269    /// ```
1270    pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1271        self,
1272        f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1273    ) -> Self
1274    where
1275        F: FnMut(&T) + 'a,
1276        C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1277        Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1278    {
1279        let f = crate::handoff_ref::with_ref_capture(|| {
1280            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1281            proof.register_proof(&expr);
1282            expr.into()
1283        });
1284
1285        Stream::new(
1286            self.location.clone(),
1287            HydroNode::Inspect {
1288                f,
1289                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1290                metadata: self.location.new_node_metadata(Self::collection_kind()),
1291            },
1292        )
1293    }
1294
1295    /// Executes the provided closure for every element in this stream.
1296    ///
1297    /// If the stream is unordered or has retries, the closure must demonstrate commutativity
1298    /// and/or idempotence via annotations:
1299    /// ```rust,ignore
1300    /// stream.for_each(q!(
1301    ///     |x| *flag_mut |= x,
1302    ///     commutative = manual_proof!(/** boolean OR is commutative */),
1303    ///     idempotent = manual_proof!(/** boolean OR is idempotent */)
1304    /// ));
1305    /// ```
1306    ///
1307    /// On a `TotalOrder + ExactlyOnce` stream, no annotations are needed.
1308    ///
1309    /// The closure may capture singletons via `by_ref()` or `by_mut()`.
1310    pub fn for_each<F: FnMut(T) + 'a, C, I>(
1311        self,
1312        f: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, I>>,
1313    ) where
1314        C: ValidCommutativityFor<O>,
1315        I: ValidIdempotenceFor<R>,
1316    {
1317        let f = crate::handoff_ref::with_ref_capture(|| {
1318            let (f, proof) = f.splice_fnmut1_ctx_props(&self.location);
1319            proof.register_proof(&f);
1320            f.into()
1321        });
1322        self.location
1323            .flow_state()
1324            .borrow_mut()
1325            .push_root(HydroRoot::ForEach {
1326                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1327                f,
1328                op_metadata: HydroIrOpMetadata::new(),
1329            });
1330    }
1331
1332    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1333    /// TCP socket to some other server. You should _not_ use this API for interacting with
1334    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1335    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1336    /// interaction with asynchronous sinks.
1337    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1338    where
1339        O: IsOrdered,
1340        R: IsExactlyOnce,
1341        S: 'a + futures::Sink<T> + Unpin,
1342    {
1343        self.location
1344            .flow_state()
1345            .borrow_mut()
1346            .push_root(HydroRoot::DestSink {
1347                sink: sink.splice_typed_ctx(&self.location).into(),
1348                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1349                op_metadata: HydroIrOpMetadata::new(),
1350            });
1351    }
1352
1353    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1354    ///
1355    /// # Example
1356    /// ```rust
1357    /// # #[cfg(feature = "deploy")] {
1358    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1359    /// # use futures::StreamExt;
1360    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1361    /// let tick = process.tick();
1362    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1363    /// numbers.enumerate()
1364    /// # }, |mut stream| async move {
1365    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1366    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1367    /// #     assert_eq!(stream.next().await.unwrap(), w);
1368    /// # }
1369    /// # }));
1370    /// # }
1371    /// ```
1372    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1373    where
1374        O: IsOrdered,
1375        R: IsExactlyOnce,
1376    {
1377        Stream::new(
1378            self.location.clone(),
1379            HydroNode::Enumerate {
1380                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1381                metadata: self.location.new_node_metadata(Stream::<
1382                    (usize, T),
1383                    L,
1384                    B,
1385                    TotalOrder,
1386                    ExactlyOnce,
1387                >::collection_kind()),
1388            },
1389        )
1390    }
1391
1392    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1393    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1394    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1395    ///
1396    /// Depending on the input stream guarantees, the closure may need to be commutative
1397    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1398    ///
1399    /// # Example
1400    /// ```rust
1401    /// # #[cfg(feature = "deploy")] {
1402    /// # use hydro_lang::prelude::*;
1403    /// # use futures::StreamExt;
1404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1405    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1406    /// words
1407    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1408    ///     .into_stream()
1409    /// # }, |mut stream| async move {
1410    /// // "HELLOWORLD"
1411    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1412    /// # }));
1413    /// # }
1414    /// ```
1415    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1416        self,
1417        init: impl IntoQuotedMut<'a, I, L>,
1418        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1419    ) -> Singleton<A, L, B2>
1420    where
1421        I: Fn() -> A + 'a,
1422        F: 'a + Fn(&mut A, T),
1423        C: ValidCommutativityFor<O>,
1424        Idemp: ValidIdempotenceFor<R>,
1425        B: ApplyMonotoneStream<M, B2>,
1426    {
1427        let init = init.splice_fn0_ctx(&self.location).into();
1428        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1429        proof.register_proof(&comb);
1430
1431        // Only assume_retries (for idempotence), not assume_ordering.
1432        // The fold hook in the simulator handles ordering non-determinism directly.
1433        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1434        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1435
1436        let core = HydroNode::Fold {
1437            init,
1438            acc: comb.into(),
1439            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1440            metadata: retried
1441                .location
1442                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1443            // we do not guarantee consistency at this point because if the algebraic properties
1444            // do not hold in practice, replica consistency may fail to be maintained, so we
1445            // would like the simulator to assert consistency; in the future, this will be dynamic
1446            // based on the proof mechanism
1447        };
1448
1449        Singleton::new(retried.location.clone(), core)
1450            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1451    }
1452
1453    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1454    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1455    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1456    /// reference, so that it can be modified in place.
1457    ///
1458    /// Depending on the input stream guarantees, the closure may need to be commutative
1459    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1460    ///
1461    /// # Example
1462    /// ```rust
1463    /// # #[cfg(feature = "deploy")] {
1464    /// # use hydro_lang::prelude::*;
1465    /// # use futures::StreamExt;
1466    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1467    /// let bools = process.source_iter(q!(vec![false, true, false]));
1468    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1469    /// # }, |mut stream| async move {
1470    /// // true
1471    /// # assert_eq!(stream.next().await.unwrap(), true);
1472    /// # }));
1473    /// # }
1474    /// ```
1475    pub fn reduce<F, C, Idemp>(
1476        self,
1477        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1478    ) -> Optional<T, L, B>
1479    where
1480        F: Fn(&mut T, T) + 'a,
1481        C: ValidCommutativityFor<O>,
1482        Idemp: ValidIdempotenceFor<R>,
1483    {
1484        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1485        proof.register_proof(&f);
1486
1487        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1488        let ordered_etc: Stream<T, L::DropConsistency, B> =
1489            self.assume_retries(nondet).assume_ordering(nondet);
1490
1491        let core = HydroNode::Reduce {
1492            f: f.into(),
1493            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1494            metadata: ordered_etc
1495                .location
1496                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1497        };
1498
1499        Optional::new(ordered_etc.location.clone(), core)
1500            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1501    }
1502
1503    /// Computes the maximum element in the stream as an [`Optional`], which
1504    /// will be empty until the first element in the input arrives.
1505    ///
1506    /// # Example
1507    /// ```rust
1508    /// # #[cfg(feature = "deploy")] {
1509    /// # use hydro_lang::prelude::*;
1510    /// # use futures::StreamExt;
1511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1512    /// let tick = process.tick();
1513    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1514    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1515    /// batch.max().all_ticks()
1516    /// # }, |mut stream| async move {
1517    /// // 4
1518    /// # assert_eq!(stream.next().await.unwrap(), 4);
1519    /// # }));
1520    /// # }
1521    /// ```
1522    pub fn max(self) -> Optional<T, L, B>
1523    where
1524        T: Ord,
1525    {
1526        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1527            .assume_ordering_trusted_bounded::<TotalOrder>(
1528                nondet!(/** max is commutative, but order affects intermediates */),
1529            )
1530            .reduce(q!(|curr, new| {
1531                if new > *curr {
1532                    *curr = new;
1533                }
1534            }))
1535    }
1536
1537    /// Computes the minimum element in the stream as an [`Optional`], which
1538    /// will be empty until the first element in the input arrives.
1539    ///
1540    /// # Example
1541    /// ```rust
1542    /// # #[cfg(feature = "deploy")] {
1543    /// # use hydro_lang::prelude::*;
1544    /// # use futures::StreamExt;
1545    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1546    /// let tick = process.tick();
1547    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1548    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1549    /// batch.min().all_ticks()
1550    /// # }, |mut stream| async move {
1551    /// // 1
1552    /// # assert_eq!(stream.next().await.unwrap(), 1);
1553    /// # }));
1554    /// # }
1555    /// ```
1556    pub fn min(self) -> Optional<T, L, B>
1557    where
1558        T: Ord,
1559    {
1560        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1561            .assume_ordering_trusted_bounded::<TotalOrder>(
1562                nondet!(/** max is commutative, but order affects intermediates */),
1563            )
1564            .reduce(q!(|curr, new| {
1565                if new < *curr {
1566                    *curr = new;
1567                }
1568            }))
1569    }
1570
1571    /// Computes the first element in the stream as an [`Optional`], which
1572    /// will be empty until the first element in the input arrives.
1573    ///
1574    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1575    /// re-ordering of elements may cause the first element to change.
1576    ///
1577    /// # Example
1578    /// ```rust
1579    /// # #[cfg(feature = "deploy")] {
1580    /// # use hydro_lang::prelude::*;
1581    /// # use futures::StreamExt;
1582    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1583    /// let tick = process.tick();
1584    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1585    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1586    /// batch.first().all_ticks()
1587    /// # }, |mut stream| async move {
1588    /// // 1
1589    /// # assert_eq!(stream.next().await.unwrap(), 1);
1590    /// # }));
1591    /// # }
1592    /// ```
1593    pub fn first(self) -> Optional<T, L, B>
1594    where
1595        O: IsOrdered,
1596    {
1597        self.make_totally_ordered()
1598            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1599            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1600            .reduce(q!(|_, _| {}))
1601    }
1602
1603    /// Computes the last element in the stream as an [`Optional`], which
1604    /// will be empty until an element in the input arrives.
1605    ///
1606    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1607    /// re-ordering of elements may cause the last element to change.
1608    ///
1609    /// # Example
1610    /// ```rust
1611    /// # #[cfg(feature = "deploy")] {
1612    /// # use hydro_lang::prelude::*;
1613    /// # use futures::StreamExt;
1614    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1615    /// let tick = process.tick();
1616    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1617    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1618    /// batch.last().all_ticks()
1619    /// # }, |mut stream| async move {
1620    /// // 4
1621    /// # assert_eq!(stream.next().await.unwrap(), 4);
1622    /// # }));
1623    /// # }
1624    /// ```
1625    pub fn last(self) -> Optional<T, L, B>
1626    where
1627        O: IsOrdered,
1628    {
1629        self.make_totally_ordered()
1630            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1631            .reduce(q!(|curr, new| *curr = new))
1632    }
1633
1634    /// Returns a stream containing at most the first `n` elements of the input stream,
1635    /// preserving the original order. Similar to `LIMIT` in SQL.
1636    ///
1637    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1638    /// retries, since the result depends on the order and cardinality of elements.
1639    ///
1640    /// # Example
1641    /// ```rust
1642    /// # #[cfg(feature = "deploy")] {
1643    /// # use hydro_lang::prelude::*;
1644    /// # use futures::StreamExt;
1645    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1646    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1647    /// numbers.limit(q!(3))
1648    /// # }, |mut stream| async move {
1649    /// // 10, 20, 30
1650    /// # for w in vec![10, 20, 30] {
1651    /// #     assert_eq!(stream.next().await.unwrap(), w);
1652    /// # }
1653    /// # }));
1654    /// # }
1655    /// ```
1656    pub fn limit(
1657        self,
1658        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1659    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1660    where
1661        O: IsOrdered,
1662        R: IsExactlyOnce,
1663    {
1664        self.generator(
1665            q!(|| 0usize),
1666            q!(move |count, item| {
1667                if *count == n {
1668                    Generate::Break
1669                } else {
1670                    *count += 1;
1671                    if *count == n {
1672                        Generate::Return(item)
1673                    } else {
1674                        Generate::Yield(item)
1675                    }
1676                }
1677            }),
1678        )
1679    }
1680
1681    /// Collects all the elements of this stream into a single [`Vec`] element.
1682    ///
1683    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1684    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1685    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1686    /// the vector at an arbitrary point in time.
1687    ///
1688    /// # Example
1689    /// ```rust
1690    /// # #[cfg(feature = "deploy")] {
1691    /// # use hydro_lang::prelude::*;
1692    /// # use futures::StreamExt;
1693    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1694    /// let tick = process.tick();
1695    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1696    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1697    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1698    /// # }, |mut stream| async move {
1699    /// // [ vec![1, 2, 3, 4] ]
1700    /// # for w in vec![vec![1, 2, 3, 4]] {
1701    /// #     assert_eq!(stream.next().await.unwrap(), w);
1702    /// # }
1703    /// # }));
1704    /// # }
1705    /// ```
1706    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1707    where
1708        O: IsOrdered,
1709        R: IsExactlyOnce,
1710    {
1711        self.make_totally_ordered().make_exactly_once().fold(
1712            q!(|| vec![]),
1713            q!(|acc, v| {
1714                acc.push(v);
1715            }),
1716        )
1717    }
1718
1719    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1720    /// and emitting each intermediate result.
1721    ///
1722    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1723    /// containing all intermediate accumulated values. The scan operation can also terminate early
1724    /// by returning `None`.
1725    ///
1726    /// The function takes a mutable reference to the accumulator and the current element, and returns
1727    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1728    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1729    ///
1730    /// # Examples
1731    ///
1732    /// Basic usage - running sum:
1733    /// ```rust
1734    /// # #[cfg(feature = "deploy")] {
1735    /// # use hydro_lang::prelude::*;
1736    /// # use futures::StreamExt;
1737    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1738    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1739    ///     q!(|| 0),
1740    ///     q!(|acc, x| {
1741    ///         *acc += x;
1742    ///         Some(*acc)
1743    ///     }),
1744    /// )
1745    /// # }, |mut stream| async move {
1746    /// // Output: 1, 3, 6, 10
1747    /// # for w in vec![1, 3, 6, 10] {
1748    /// #     assert_eq!(stream.next().await.unwrap(), w);
1749    /// # }
1750    /// # }));
1751    /// # }
1752    /// ```
1753    ///
1754    /// Early termination example:
1755    /// ```rust
1756    /// # #[cfg(feature = "deploy")] {
1757    /// # use hydro_lang::prelude::*;
1758    /// # use futures::StreamExt;
1759    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1760    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1761    ///     q!(|| 1),
1762    ///     q!(|state, x| {
1763    ///         *state = *state * x;
1764    ///         if *state > 6 {
1765    ///             None // Terminate the stream
1766    ///         } else {
1767    ///             Some(-*state)
1768    ///         }
1769    ///     }),
1770    /// )
1771    /// # }, |mut stream| async move {
1772    /// // Output: -1, -2, -6
1773    /// # for w in vec![-1, -2, -6] {
1774    /// #     assert_eq!(stream.next().await.unwrap(), w);
1775    /// # }
1776    /// # }));
1777    /// # }
1778    /// ```
1779    pub fn scan<A, U, I, F>(
1780        self,
1781        init: impl IntoQuotedMut<'a, I, L>,
1782        f: impl IntoQuotedMut<'a, F, L>,
1783    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1784    where
1785        O: IsOrdered,
1786        R: IsExactlyOnce,
1787        I: Fn() -> A + 'a,
1788        F: Fn(&mut A, T) -> Option<U> + 'a,
1789    {
1790        let init = init.splice_fn0_ctx(&self.location).into();
1791        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1792
1793        Stream::new(
1794            self.location.clone(),
1795            HydroNode::Scan {
1796                init,
1797                acc: f,
1798                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1799                metadata: self.location.new_node_metadata(
1800                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1801                ),
1802            },
1803        )
1804    }
1805
1806    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1807    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1808    /// by the function.
1809    ///
1810    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1811    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1812    /// emitted. If it resolves to `None`, the item is filtered out.
1813    ///
1814    /// # Examples
1815    ///
1816    /// ```rust
1817    /// # #[cfg(feature = "deploy")] {
1818    /// # use hydro_lang::prelude::*;
1819    /// # use futures::StreamExt;
1820    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1821    /// process
1822    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1823    ///     .scan_async_blocking(
1824    ///         q!(|| 0),
1825    ///         q!(|acc, x| {
1826    ///             *acc += x;
1827    ///             let val = *acc;
1828    ///             async move { Some(val) }
1829    ///         }),
1830    ///     )
1831    /// # }, |mut stream| async move {
1832    /// // Output: 1, 3, 6, 10
1833    /// # for w in vec![1, 3, 6, 10] {
1834    /// #     assert_eq!(stream.next().await.unwrap(), w);
1835    /// # }
1836    /// # }));
1837    /// # }
1838    /// ```
1839    pub fn scan_async_blocking<A, U, I, F, Fut>(
1840        self,
1841        init: impl IntoQuotedMut<'a, I, L>,
1842        f: impl IntoQuotedMut<'a, F, L>,
1843    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1844    where
1845        O: IsOrdered,
1846        R: IsExactlyOnce,
1847        I: Fn() -> A + 'a,
1848        F: Fn(&mut A, T) -> Fut + 'a,
1849        Fut: Future<Output = Option<U>> + 'a,
1850    {
1851        let init = init.splice_fn0_ctx(&self.location).into();
1852        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1853
1854        Stream::new(
1855            self.location.clone(),
1856            HydroNode::ScanAsyncBlocking {
1857                init,
1858                acc: f,
1859                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1860                metadata: self.location.new_node_metadata(
1861                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1862                ),
1863            },
1864        )
1865    }
1866
1867    /// Iteratively processes the elements of the stream using a state machine that can yield
1868    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1869    /// syntax in Rust, without requiring special syntax.
1870    ///
1871    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1872    /// state. The second argument defines the processing logic, taking in a mutable reference
1873    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1874    /// variants define what is emitted and whether further inputs should be processed.
1875    ///
1876    /// # Example
1877    /// ```rust
1878    /// # #[cfg(feature = "deploy")] {
1879    /// # use hydro_lang::prelude::*;
1880    /// # use futures::StreamExt;
1881    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1882    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1883    ///     q!(|| 0),
1884    ///     q!(|acc, x| {
1885    ///         *acc += x;
1886    ///         if *acc > 100 {
1887    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1888    ///         } else if *acc % 2 == 0 {
1889    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1890    ///         } else {
1891    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1892    ///         }
1893    ///     }),
1894    /// )
1895    /// # }, |mut stream| async move {
1896    /// // Output: "even", "done!"
1897    /// # let mut results = Vec::new();
1898    /// # for _ in 0..2 {
1899    /// #     results.push(stream.next().await.unwrap());
1900    /// # }
1901    /// # results.sort();
1902    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1903    /// # }));
1904    /// # }
1905    /// ```
1906    pub fn generator<A, U, I, F>(
1907        self,
1908        init: impl IntoQuotedMut<'a, I, L> + Copy,
1909        f: impl IntoQuotedMut<'a, F, L> + Copy,
1910    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1911    where
1912        O: IsOrdered,
1913        R: IsExactlyOnce,
1914        I: Fn() -> A + 'a,
1915        F: Fn(&mut A, T) -> Generate<U> + 'a,
1916    {
1917        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1918        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1919
1920        let this = self.make_totally_ordered().make_exactly_once();
1921
1922        // State is Option<Option<A>>:
1923        //   None = not yet initialized
1924        //   Some(Some(a)) = active with state a
1925        //   Some(None) = terminated
1926        let scan_init = q!(|| None)
1927            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1928            .into();
1929        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1930            if state.is_none() {
1931                *state = Some(Some(init()));
1932            }
1933            match state {
1934                Some(Some(state_value)) => match f(state_value, v) {
1935                    Generate::Yield(out) => Some(Some(out)),
1936                    Generate::Return(out) => {
1937                        *state = Some(None);
1938                        Some(Some(out))
1939                    }
1940                    // Unlike KeyedStream, we can terminate the scan directly on
1941                    // Break/Return because there is only one state (no other keys
1942                    // that still need processing).
1943                    Generate::Break => None,
1944                    Generate::Continue => Some(None),
1945                },
1946                // State is Some(None) after Return; terminate the scan.
1947                _ => None,
1948            }
1949        })
1950        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1951        .into();
1952
1953        let scan_node = HydroNode::Scan {
1954            init: scan_init,
1955            acc: scan_f,
1956            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1957            metadata: this.location.new_node_metadata(Stream::<
1958                Option<U>,
1959                L,
1960                B,
1961                TotalOrder,
1962                ExactlyOnce,
1963            >::collection_kind()),
1964        };
1965
1966        let flatten_f = q!(|d| d)
1967            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1968            .into();
1969        let flatten_node = HydroNode::FlatMap {
1970            f: flatten_f,
1971            input: Box::new(scan_node),
1972            metadata: this
1973                .location
1974                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1975        };
1976
1977        Stream::new(this.location.clone(), flatten_node)
1978    }
1979
1980    /// Given a time interval, returns a stream corresponding to samples taken from the
1981    /// stream roughly at that interval. The output will have elements in the same order
1982    /// as the input, but with arbitrary elements skipped between samples. There is also
1983    /// no guarantee on the exact timing of the samples.
1984    ///
1985    /// # Non-Determinism
1986    /// The output stream is non-deterministic in which elements are sampled, since this
1987    /// is controlled by a clock.
1988    #[cfg(feature = "tokio")]
1989    pub fn sample_every(
1990        self,
1991        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1992        nondet: NonDet,
1993    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1994    where
1995        L: TopLevel<'a>,
1996    {
1997        let samples = self.location.source_interval(interval);
1998
1999        let tick = self.location.tick();
2000        self.batch(&tick, nondet)
2001            .filter_if(samples.batch(&tick, nondet).first().is_some())
2002            .all_ticks()
2003            .weaken_retries()
2004    }
2005
2006    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2007    /// stream has not emitted a value since that duration.
2008    ///
2009    /// # Non-Determinism
2010    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2011    /// samples take place, timeouts may be non-deterministically generated or missed,
2012    /// and the notification of the timeout may be delayed as well. There is also no
2013    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2014    /// detected based on when the next sample is taken.
2015    #[cfg(feature = "tokio")]
2016    pub fn timeout(
2017        self,
2018        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2019        nondet: NonDet,
2020    ) -> Optional<(), L::DropConsistency, Unbounded>
2021    where
2022        L: TopLevel<'a>,
2023    {
2024        let tick = self.location.tick();
2025
2026        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2027            q!(|| None),
2028            q!(
2029                |latest, _| {
2030                    *latest = Some(Instant::now());
2031                },
2032                commutative = manual_proof!(/** TODO */)
2033            ),
2034        );
2035
2036        latest_received
2037            .snapshot(&tick, nondet)
2038            .filter_map(q!(move |latest_received| {
2039                if let Some(latest_received) = latest_received {
2040                    if Instant::now().duration_since(latest_received) > duration {
2041                        Some(())
2042                    } else {
2043                        None
2044                    }
2045                } else {
2046                    Some(())
2047                }
2048            }))
2049            .latest()
2050    }
2051
2052    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2053    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2054    ///
2055    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2056    /// processed before an acknowledgement is emitted.
2057    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2058        let id = self.location.flow_state().borrow_mut().next_clock_id();
2059        let out_location = Atomic {
2060            tick: Tick {
2061                id,
2062                l: self.location.clone(),
2063            },
2064        };
2065        Stream::new(
2066            out_location.clone(),
2067            HydroNode::BeginAtomic {
2068                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2069                metadata: out_location
2070                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2071            },
2072        )
2073    }
2074
2075    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2076    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2077    /// the order of the input. The output stream will execute in the [`Tick`] that was
2078    /// used to create the atomic section.
2079    ///
2080    /// # Non-Determinism
2081    /// The batch boundaries are non-deterministic and may change across executions.
2082    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2083        self,
2084        tick: &Tick<L2>,
2085        _nondet: NonDet,
2086    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2087        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2088        Stream::new(
2089            tick.drop_consistency(),
2090            HydroNode::Batch {
2091                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2092                metadata: tick
2093                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2094            },
2095        )
2096    }
2097
2098    /// An operator which allows you to "name" a `HydroNode`.
2099    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2100    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2101        {
2102            let mut node = self.ir_node.borrow_mut();
2103            let metadata = node.metadata_mut();
2104            metadata.tag = Some(name.to_owned());
2105        }
2106        self
2107    }
2108
2109    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2110    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2111    /// so uses must be carefully vetted.
2112    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2113    where
2114        B: IsBounded,
2115    {
2116        Optional::new(
2117            self.location.clone(),
2118            HydroNode::Cast {
2119                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2120                metadata: self
2121                    .location
2122                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2123            },
2124        )
2125    }
2126
2127    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2128        if O::ORDERING_KIND == O2::ORDERING_KIND {
2129            Stream::new(
2130                self.location.clone(),
2131                self.ir_node.replace(HydroNode::Placeholder),
2132            )
2133        } else {
2134            panic!(
2135                "Runtime ordering {:?} did not match requested cast {:?}.",
2136                O::ORDERING_KIND,
2137                O2::ORDERING_KIND
2138            )
2139        }
2140    }
2141
2142    /// Explicitly "casts" the stream to a type with a different ordering
2143    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2144    /// by the type-system.
2145    ///
2146    /// # Non-Determinism
2147    /// This function is used as an escape hatch, and any mistakes in the
2148    /// provided ordering guarantee will propagate into the guarantees
2149    /// for the rest of the program.
2150    pub fn assume_ordering<O2: Ordering>(
2151        self,
2152        _nondet: NonDet,
2153    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2154        if O::ORDERING_KIND == O2::ORDERING_KIND {
2155            self.use_ordering_type().weaken_consistency()
2156        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2157            // We can always weaken the ordering guarantee
2158            let target_location = self.location().drop_consistency();
2159            Stream::new(
2160                target_location.clone(),
2161                HydroNode::Cast {
2162                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2163                    metadata: target_location
2164                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2165                },
2166            )
2167        } else {
2168            let target_location = self.location().drop_consistency();
2169            Stream::new(
2170                target_location.clone(),
2171                HydroNode::ObserveNonDet {
2172                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2173                    trusted: false,
2174                    metadata: target_location
2175                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2176                },
2177            )
2178        }
2179    }
2180
2181    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2182    // intermediate states will not be revealed
2183    fn assume_ordering_trusted_bounded<O2: Ordering>(
2184        self,
2185        nondet: NonDet,
2186    ) -> Stream<T, L, B, O2, R> {
2187        if B::BOUNDED {
2188            self.assume_ordering_trusted(nondet)
2189        } else {
2190            let self_location = self.location.clone();
2191            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2192            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2193        }
2194    }
2195
2196    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2197    // is not observable
2198    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2199        self,
2200        _nondet: NonDet,
2201    ) -> Stream<T, L, B, O2, R> {
2202        if O::ORDERING_KIND == O2::ORDERING_KIND {
2203            self.use_ordering_type()
2204        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2205            // We can always weaken the ordering guarantee
2206            Stream::new(
2207                self.location.clone(),
2208                HydroNode::Cast {
2209                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2210                    metadata: self
2211                        .location
2212                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2213                },
2214            )
2215        } else {
2216            Stream::new(
2217                self.location.clone(),
2218                HydroNode::ObserveNonDet {
2219                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2220                    trusted: true,
2221                    metadata: self
2222                        .location
2223                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2224                },
2225            )
2226        }
2227    }
2228
2229    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2230    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2231    /// which is always safe because that is the weakest possible guarantee.
2232    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2233        self.weaken_ordering::<NoOrder>()
2234    }
2235
2236    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2237    /// enforcing that `O2` is weaker than the input ordering guarantee.
2238    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2239        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2240        self.assume_ordering_trusted::<O2>(nondet)
2241    }
2242
2243    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2244    /// implies that `O == TotalOrder`.
2245    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2246    where
2247        O: IsOrdered,
2248    {
2249        self.assume_ordering_trusted(nondet!(/** no-op */))
2250    }
2251
2252    /// Explicitly "casts" the stream to a type with a different retries
2253    /// guarantee. Useful in unsafe code where the lack of retries cannot
2254    /// be proven by the type-system.
2255    ///
2256    /// # Non-Determinism
2257    /// This function is used as an escape hatch, and any mistakes in the
2258    /// provided retries guarantee will propagate into the guarantees
2259    /// for the rest of the program.
2260    pub fn assume_retries<R2: Retries>(
2261        self,
2262        _nondet: NonDet,
2263    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2264        if R::RETRIES_KIND == R2::RETRIES_KIND {
2265            Stream::new(
2266                self.location.drop_consistency(),
2267                self.ir_node.replace(HydroNode::Placeholder),
2268            )
2269        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2270            // We can always weaken the retries guarantee
2271            let target_location = self.location.drop_consistency();
2272            Stream::new(
2273                target_location.clone(),
2274                HydroNode::Cast {
2275                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2276                    metadata: target_location
2277                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2278                },
2279            )
2280        } else {
2281            let target_location = self.location.drop_consistency();
2282            Stream::new(
2283                target_location.clone(),
2284                HydroNode::ObserveNonDet {
2285                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2286                    trusted: false,
2287                    metadata: target_location
2288                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2289                },
2290            )
2291        }
2292    }
2293
2294    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2295    // is not observable
2296    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2297        if R::RETRIES_KIND == R2::RETRIES_KIND {
2298            Stream::new(
2299                self.location.clone(),
2300                self.ir_node.replace(HydroNode::Placeholder),
2301            )
2302        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2303            // We can always weaken the retries guarantee
2304            Stream::new(
2305                self.location.clone(),
2306                HydroNode::Cast {
2307                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2308                    metadata: self
2309                        .location
2310                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2311                },
2312            )
2313        } else {
2314            Stream::new(
2315                self.location.clone(),
2316                HydroNode::ObserveNonDet {
2317                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2318                    trusted: true,
2319                    metadata: self
2320                        .location
2321                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2322                },
2323            )
2324        }
2325    }
2326
2327    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2328    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2329    /// which is always safe because that is the weakest possible guarantee.
2330    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2331        self.weaken_retries::<AtLeastOnce>()
2332    }
2333
2334    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2335    /// enforcing that `R2` is weaker than the input retries guarantee.
2336    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2337        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2338        self.assume_retries_trusted::<R2>(nondet)
2339    }
2340
2341    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2342    /// implies that `R == ExactlyOnce`.
2343    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2344    where
2345        R: IsExactlyOnce,
2346    {
2347        self.assume_retries_trusted(nondet!(/** no-op */))
2348    }
2349
2350    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2351    /// implies that `B == Bounded`.
2352    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2353    where
2354        B: IsBounded,
2355    {
2356        self.weaken_boundedness()
2357    }
2358
2359    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2360    /// which implies that `B == Bounded`.
2361    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2362        if B::BOUNDED == B2::BOUNDED {
2363            Stream::new(
2364                self.location.clone(),
2365                self.ir_node.replace(HydroNode::Placeholder),
2366            )
2367        } else {
2368            // We can always weaken the boundedness
2369            Stream::new(
2370                self.location.clone(),
2371                HydroNode::Cast {
2372                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2373                    metadata: self
2374                        .location
2375                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2376                },
2377            )
2378        }
2379    }
2380}
2381
2382impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2383where
2384    L: Location<'a>,
2385{
2386    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2387    ///
2388    /// # Example
2389    /// ```rust
2390    /// # #[cfg(feature = "deploy")] {
2391    /// # use hydro_lang::prelude::*;
2392    /// # use futures::StreamExt;
2393    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2394    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2395    /// # }, |mut stream| async move {
2396    /// // 1, 2, 3
2397    /// # for w in vec![1, 2, 3] {
2398    /// #     assert_eq!(stream.next().await.unwrap(), w);
2399    /// # }
2400    /// # }));
2401    /// # }
2402    /// ```
2403    pub fn cloned(self) -> Stream<T, L, B, O, R>
2404    where
2405        T: Clone,
2406    {
2407        self.map(q!(|d| d.clone()))
2408    }
2409}
2410
2411impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2412where
2413    L: Location<'a>,
2414{
2415    /// Computes the number of elements in the stream as a [`Singleton`].
2416    ///
2417    /// # Example
2418    /// ```rust
2419    /// # #[cfg(feature = "deploy")] {
2420    /// # use hydro_lang::prelude::*;
2421    /// # use futures::StreamExt;
2422    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2423    /// let tick = process.tick();
2424    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2425    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2426    /// batch.count().all_ticks()
2427    /// # }, |mut stream| async move {
2428    /// // 4
2429    /// # assert_eq!(stream.next().await.unwrap(), 4);
2430    /// # }));
2431    /// # }
2432    /// ```
2433    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2434        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2435            /// Order does not affect eventual count, and also does not affect intermediate states.
2436        ))
2437        .fold(
2438            q!(|| 0usize),
2439            q!(
2440                |count, _| *count += 1,
2441                monotone = manual_proof!(/** += 1 is monotone */)
2442            ),
2443        )
2444    }
2445}
2446
2447impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2448    /// Produces a new stream that merges the elements of the two input streams.
2449    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2450    ///
2451    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2452    /// [`Bounded`], you can use [`Stream::chain`] instead.
2453    ///
2454    /// # Example
2455    /// ```rust
2456    /// # #[cfg(feature = "deploy")] {
2457    /// # use hydro_lang::prelude::*;
2458    /// # use futures::StreamExt;
2459    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2460    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2461    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2462    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2463    /// # }, |mut stream| async move {
2464    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2465    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2466    /// #     assert_eq!(stream.next().await.unwrap(), w);
2467    /// # }
2468    /// # }));
2469    /// # }
2470    /// ```
2471    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2472        self,
2473        other: Stream<T, L, Unbounded, O2, R2>,
2474    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2475    where
2476        R: MinRetries<R2>,
2477    {
2478        Stream::new(
2479            self.location.clone(),
2480            HydroNode::Chain {
2481                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2482                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2483                metadata: self.location.new_node_metadata(Stream::<
2484                    T,
2485                    L,
2486                    Unbounded,
2487                    NoOrder,
2488                    <R as MinRetries<R2>>::Min,
2489                >::collection_kind()),
2490            },
2491        )
2492    }
2493
2494    /// Deprecated: use [`Stream::merge_unordered`] instead.
2495    #[deprecated(note = "use `merge_unordered` instead")]
2496    pub fn interleave<O2: Ordering, R2: Retries>(
2497        self,
2498        other: Stream<T, L, Unbounded, O2, R2>,
2499    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2500    where
2501        R: MinRetries<R2>,
2502    {
2503        self.merge_unordered(other)
2504    }
2505}
2506
2507impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2508    /// Produces a new stream that combines the elements of the two input streams,
2509    /// preserving the relative order of elements within each input.
2510    ///
2511    /// # Non-Determinism
2512    /// The order in which elements *across* the two streams will be interleaved is
2513    /// non-deterministic, so the order of elements will vary across runs. If the output
2514    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2515    /// but emits an unordered stream. For deterministic first-then-second ordering on
2516    /// bounded streams, use [`Stream::chain`].
2517    ///
2518    /// # Example
2519    /// ```rust
2520    /// # #[cfg(feature = "deploy")] {
2521    /// # use hydro_lang::prelude::*;
2522    /// # use futures::StreamExt;
2523    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2524    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2525    /// # process.source_iter(q!(vec![1, 3])).into();
2526    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2527    /// # }, |mut stream| async move {
2528    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2529    /// # for w in vec![1, 3, 2, 4] {
2530    /// #     assert_eq!(stream.next().await.unwrap(), w);
2531    /// # }
2532    /// # }));
2533    /// # }
2534    /// ```
2535    pub fn merge_ordered<R2: Retries>(
2536        self,
2537        other: Stream<T, L, B, TotalOrder, R2>,
2538        _nondet: NonDet,
2539    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2540    where
2541        R: MinRetries<R2>,
2542    {
2543        let target_location = self.location().drop_consistency();
2544        Stream::new(
2545            target_location.clone(),
2546            HydroNode::MergeOrdered {
2547                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2548                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2549                metadata: target_location.new_node_metadata(Stream::<
2550                    T,
2551                    L::DropConsistency,
2552                    B,
2553                    TotalOrder,
2554                    <R as MinRetries<R2>>::Min,
2555                >::collection_kind()),
2556            },
2557        )
2558    }
2559}
2560
2561impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2562where
2563    L: Location<'a>,
2564{
2565    /// Produces a new stream that emits the input elements in sorted order.
2566    ///
2567    /// The input stream can have any ordering guarantee, but the output stream
2568    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2569    /// elements in the input stream are available, so it requires the input stream
2570    /// to be [`Bounded`].
2571    ///
2572    /// # Example
2573    /// ```rust
2574    /// # #[cfg(feature = "deploy")] {
2575    /// # use hydro_lang::prelude::*;
2576    /// # use futures::StreamExt;
2577    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2578    /// let tick = process.tick();
2579    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2580    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2581    /// batch.sort().all_ticks()
2582    /// # }, |mut stream| async move {
2583    /// // 1, 2, 3, 4
2584    /// # for w in (1..5) {
2585    /// #     assert_eq!(stream.next().await.unwrap(), w);
2586    /// # }
2587    /// # }));
2588    /// # }
2589    /// ```
2590    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2591    where
2592        B: IsBounded,
2593        T: Ord,
2594    {
2595        let this = self.make_bounded();
2596        Stream::new(
2597            this.location.clone(),
2598            HydroNode::Sort {
2599                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2600                metadata: this
2601                    .location
2602                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2603            },
2604        )
2605    }
2606
2607    /// Produces a new stream that first emits the elements of the `self` stream,
2608    /// and then emits the elements of the `other` stream. The output stream has
2609    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2610    /// [`TotalOrder`] guarantee.
2611    ///
2612    /// Currently, both input streams must be [`Bounded`]. This operator will block
2613    /// on the first stream until all its elements are available. In a future version,
2614    /// we will relax the requirement on the `other` stream.
2615    ///
2616    /// # Example
2617    /// ```rust
2618    /// # #[cfg(feature = "deploy")] {
2619    /// # use hydro_lang::prelude::*;
2620    /// # use futures::StreamExt;
2621    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2622    /// let tick = process.tick();
2623    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2624    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2625    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2626    /// # }, |mut stream| async move {
2627    /// // 2, 3, 4, 5, 1, 2, 3, 4
2628    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2629    /// #     assert_eq!(stream.next().await.unwrap(), w);
2630    /// # }
2631    /// # }));
2632    /// # }
2633    /// ```
2634    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2635        self,
2636        other: Stream<T, L, B2, O2, R2>,
2637    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2638    where
2639        B: IsBounded,
2640        O: MinOrder<O2>,
2641        R: MinRetries<R2>,
2642    {
2643        check_matching_location(&self.location, &other.location);
2644
2645        Stream::new(
2646            self.location.clone(),
2647            HydroNode::Chain {
2648                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2649                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2650                metadata: self.location.new_node_metadata(Stream::<
2651                    T,
2652                    L,
2653                    B2,
2654                    <O as MinOrder<O2>>::Min,
2655                    <R as MinRetries<R2>>::Min,
2656                >::collection_kind()),
2657            },
2658        )
2659    }
2660
2661    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2662    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2663    /// because this is compiled into a nested loop.
2664    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2665        self,
2666        other: Stream<T2, L, Bounded, O2, R2>,
2667    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2668    where
2669        B: IsBounded,
2670        T: Clone,
2671        T2: Clone,
2672        R: MinRetries<R2>,
2673    {
2674        let this = self.make_bounded();
2675        check_matching_location(&this.location, &other.location);
2676
2677        Stream::new(
2678            this.location.clone(),
2679            HydroNode::CrossProduct {
2680                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2681                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2682                metadata: this.location.new_node_metadata(Stream::<
2683                    (T, T2),
2684                    L,
2685                    Bounded,
2686                    <O2 as MinOrder<O>>::Min,
2687                    <R as MinRetries<R2>>::Min,
2688                >::collection_kind()),
2689            },
2690        )
2691    }
2692
2693    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2694    /// `self` used as the values for *each* key.
2695    ///
2696    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2697    /// values. For example, it can be used to send the same set of elements to several cluster
2698    /// members, if the membership information is available as a [`KeyedSingleton`].
2699    ///
2700    /// # Example
2701    /// ```rust
2702    /// # #[cfg(feature = "deploy")] {
2703    /// # use hydro_lang::prelude::*;
2704    /// # use futures::StreamExt;
2705    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2706    /// # let tick = process.tick();
2707    /// let keyed_singleton = // { 1: (), 2: () }
2708    /// # process
2709    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2710    /// #     .into_keyed()
2711    /// #     .batch(&tick, nondet!(/** test */))
2712    /// #     .first();
2713    /// let stream = // [ "a", "b" ]
2714    /// # process
2715    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2716    /// #     .batch(&tick, nondet!(/** test */));
2717    /// stream.repeat_with_keys(keyed_singleton)
2718    /// # .entries().all_ticks()
2719    /// # }, |mut stream| async move {
2720    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2721    /// # let mut results = Vec::new();
2722    /// # for _ in 0..4 {
2723    /// #     results.push(stream.next().await.unwrap());
2724    /// # }
2725    /// # results.sort();
2726    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2727    /// # }));
2728    /// # }
2729    /// ```
2730    pub fn repeat_with_keys<K, V2>(
2731        self,
2732        keys: KeyedSingleton<K, V2, L, Bounded>,
2733    ) -> KeyedStream<K, T, L, Bounded, O, R>
2734    where
2735        B: IsBounded,
2736        K: Clone,
2737        T: Clone,
2738    {
2739        keys.keys()
2740            .assume_ordering_trusted::<TotalOrder>(
2741                nondet!(/** keyed stream does not depend on ordering of keys */),
2742            )
2743            .cross_product_nested_loop(self.make_bounded())
2744            .into_keyed()
2745    }
2746
2747    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2748    /// execution until all results are available. The output order is based on when futures
2749    /// complete, and may be different than the input order.
2750    ///
2751    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2752    /// while futures are pending, this variant blocks until the futures resolve.
2753    ///
2754    /// # Example
2755    /// ```rust
2756    /// # #[cfg(feature = "deploy")] {
2757    /// # use std::collections::HashSet;
2758    /// # use futures::StreamExt;
2759    /// # use hydro_lang::prelude::*;
2760    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2761    /// process
2762    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2763    ///     .map(q!(|x| async move {
2764    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2765    ///         x
2766    ///     }))
2767    ///     .resolve_futures_blocking()
2768    /// #   },
2769    /// #   |mut stream| async move {
2770    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2771    /// #       let mut output = HashSet::new();
2772    /// #       for _ in 1..10 {
2773    /// #           output.insert(stream.next().await.unwrap());
2774    /// #       }
2775    /// #       assert_eq!(
2776    /// #           output,
2777    /// #           HashSet::<i32>::from_iter(1..10)
2778    /// #       );
2779    /// #   },
2780    /// # ));
2781    /// # }
2782    /// ```
2783    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2784    where
2785        T: Future,
2786    {
2787        Stream::new(
2788            self.location.clone(),
2789            HydroNode::ResolveFuturesBlocking {
2790                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2791                metadata: self
2792                    .location
2793                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2794            },
2795        )
2796    }
2797
2798    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2799    ///
2800    /// # Example
2801    /// ```rust
2802    /// # #[cfg(feature = "deploy")] {
2803    /// # use hydro_lang::prelude::*;
2804    /// # use futures::StreamExt;
2805    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2806    /// let tick = process.tick();
2807    /// let empty: Stream<i32, _, Bounded> = process
2808    ///   .source_iter(q!(Vec::<i32>::new()))
2809    ///   .batch(&tick, nondet!(/** test */));
2810    /// empty.is_empty().all_ticks()
2811    /// # }, |mut stream| async move {
2812    /// // true
2813    /// # assert_eq!(stream.next().await.unwrap(), true);
2814    /// # }));
2815    /// # }
2816    /// ```
2817    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2818    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2819    where
2820        B: IsBounded,
2821    {
2822        self.make_bounded()
2823            .assume_ordering_trusted::<TotalOrder>(
2824                nondet!(/** is_empty intermediates unaffected by order */),
2825            )
2826            .first()
2827            .is_none()
2828    }
2829}
2830
2831impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2832where
2833    L: Location<'a>,
2834{
2835    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2836    /// by equi-joining the two streams on the key attribute `K`.
2837    ///
2838    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2839    /// and streams the left side through, preserving the left side's ordering. When both
2840    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2841    ///
2842    /// # Example
2843    /// ```rust
2844    /// # #[cfg(feature = "deploy")] {
2845    /// # use hydro_lang::prelude::*;
2846    /// # use std::collections::HashSet;
2847    /// # use futures::StreamExt;
2848    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2849    /// let tick = process.tick();
2850    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2851    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2852    /// stream1.join(stream2)
2853    /// # }, |mut stream| async move {
2854    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2855    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2856    /// # stream.map(|i| assert!(expected.contains(&i)));
2857    /// # }));
2858    /// # }
2859    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2860        self,
2861        n: Stream<(K, V2), L, B2, O2, R2>,
2862    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2863    where
2864        K: Eq + Hash + Clone,
2865        R: MinRetries<R2>,
2866        V1: Clone,
2867        V2: Clone,
2868    {
2869        check_matching_location(&self.location, &n.location);
2870
2871        let ir_node = if B2::BOUNDED {
2872            HydroNode::JoinHalf {
2873                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2874                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2875                metadata: self.location.new_node_metadata(Stream::<
2876                    (K, (V1, V2)),
2877                    L,
2878                    B,
2879                    B2::PreserveOrderIfBounded<O>,
2880                    <R as MinRetries<R2>>::Min,
2881                >::collection_kind()),
2882            }
2883        } else {
2884            HydroNode::Join {
2885                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2886                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2887                metadata: self.location.new_node_metadata(Stream::<
2888                    (K, (V1, V2)),
2889                    L,
2890                    B,
2891                    B2::PreserveOrderIfBounded<O>,
2892                    <R as MinRetries<R2>>::Min,
2893                >::collection_kind()),
2894            }
2895        };
2896
2897        Stream::new(self.location.clone(), ir_node)
2898    }
2899
2900    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2901    /// computes the anti-join of the items in the input -- i.e. returns
2902    /// unique items in the first input that do not have a matching key
2903    /// in the second input.
2904    ///
2905    /// # Example
2906    /// ```rust
2907    /// # #[cfg(feature = "deploy")] {
2908    /// # use hydro_lang::prelude::*;
2909    /// # use futures::StreamExt;
2910    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2911    /// let tick = process.tick();
2912    /// let stream = process
2913    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2914    ///   .batch(&tick, nondet!(/** test */));
2915    /// let batch = process
2916    ///   .source_iter(q!(vec![1, 2]))
2917    ///   .batch(&tick, nondet!(/** test */));
2918    /// stream.anti_join(batch).all_ticks()
2919    /// # }, |mut stream| async move {
2920    /// # for w in vec![(3, 'c'), (4, 'd')] {
2921    /// #     assert_eq!(stream.next().await.unwrap(), w);
2922    /// # }
2923    /// # }));
2924    /// # }
2925    pub fn anti_join<O2: Ordering, R2: Retries>(
2926        self,
2927        n: Stream<K, L, Bounded, O2, R2>,
2928    ) -> Stream<(K, V1), L, B, O, R>
2929    where
2930        K: Eq + Hash,
2931    {
2932        check_matching_location(&self.location, &n.location);
2933
2934        Stream::new(
2935            self.location.clone(),
2936            HydroNode::AntiJoin {
2937                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2938                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2939                metadata: self
2940                    .location
2941                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2942            },
2943        )
2944    }
2945}
2946
2947impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2948    Stream<(K, V), L, B, O, R>
2949{
2950    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2951    /// is used as the key and the second element is added to the entries associated with that key.
2952    ///
2953    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2954    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2955    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2956    /// total ordering _within_ each group but no ordering _across_ groups.
2957    ///
2958    /// # Example
2959    /// ```rust
2960    /// # #[cfg(feature = "deploy")] {
2961    /// # use hydro_lang::prelude::*;
2962    /// # use futures::StreamExt;
2963    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2964    /// process
2965    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2966    ///     .into_keyed()
2967    /// #   .entries()
2968    /// # }, |mut stream| async move {
2969    /// // { 1: [2, 3], 2: [4] }
2970    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2971    /// #     assert_eq!(stream.next().await.unwrap(), w);
2972    /// # }
2973    /// # }));
2974    /// # }
2975    /// ```
2976    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2977        KeyedStream::new(
2978            self.location.clone(),
2979            HydroNode::Cast {
2980                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2981                metadata: self
2982                    .location
2983                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2984            },
2985        )
2986    }
2987}
2988
2989impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2990where
2991    K: Eq + Hash,
2992    L: Location<'a>,
2993{
2994    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2995    /// # Example
2996    /// ```rust
2997    /// # #[cfg(feature = "deploy")] {
2998    /// # use hydro_lang::prelude::*;
2999    /// # use futures::StreamExt;
3000    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3001    /// let tick = process.tick();
3002    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
3003    /// let batch = numbers.batch(&tick, nondet!(/** test */));
3004    /// batch.keys().all_ticks()
3005    /// # }, |mut stream| async move {
3006    /// // 1, 2
3007    /// # assert_eq!(stream.next().await.unwrap(), 1);
3008    /// # assert_eq!(stream.next().await.unwrap(), 2);
3009    /// # }));
3010    /// # }
3011    /// ```
3012    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3013        self.into_keyed()
3014            .fold(
3015                q!(|| ()),
3016                q!(
3017                    |_, _| {},
3018                    commutative = manual_proof!(/** values are ignored */),
3019                    idempotent = manual_proof!(/** values are ignored */)
3020                ),
3021            )
3022            .keys()
3023    }
3024}
3025
3026impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3027where
3028    L: Location<'a>,
3029{
3030    /// Returns a stream corresponding to the latest batch of elements being atomically
3031    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
3032    /// the order of the input.
3033    ///
3034    /// # Non-Determinism
3035    /// The batch boundaries are non-deterministic and may change across executions.
3036    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3037        self,
3038        tick: &Tick<L2>,
3039        _nondet: NonDet,
3040    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3041        Stream::new(
3042            tick.drop_consistency(),
3043            HydroNode::Batch {
3044                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3045                metadata: tick
3046                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3047            },
3048        )
3049    }
3050
3051    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
3052    /// See [`Stream::atomic`] for more details.
3053    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3054        Stream::new(
3055            self.location.tick.l.clone(),
3056            HydroNode::EndAtomic {
3057                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3058                metadata: self
3059                    .location
3060                    .tick
3061                    .l
3062                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3063            },
3064        )
3065    }
3066}
3067
3068impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3069where
3070    L: TopLevel<'a>,
3071    F: Future<Output = T>,
3072{
3073    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3074    /// Future outputs are produced as available, regardless of input arrival order.
3075    ///
3076    /// # Example
3077    /// ```rust
3078    /// # #[cfg(feature = "deploy")] {
3079    /// # use std::collections::HashSet;
3080    /// # use futures::StreamExt;
3081    /// # use hydro_lang::prelude::*;
3082    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3083    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3084    ///     .map(q!(|x| async move {
3085    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3086    ///         x
3087    ///     }))
3088    ///     .resolve_futures()
3089    /// #   },
3090    /// #   |mut stream| async move {
3091    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3092    /// #       let mut output = HashSet::new();
3093    /// #       for _ in 1..10 {
3094    /// #           output.insert(stream.next().await.unwrap());
3095    /// #       }
3096    /// #       assert_eq!(
3097    /// #           output,
3098    /// #           HashSet::<i32>::from_iter(1..10)
3099    /// #       );
3100    /// #   },
3101    /// # ));
3102    /// # }
3103    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3104        Stream::new(
3105            self.location.clone(),
3106            HydroNode::ResolveFutures {
3107                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3108                metadata: self
3109                    .location
3110                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3111            },
3112        )
3113    }
3114
3115    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3116    /// Future outputs are produced in the same order as the input stream.
3117    ///
3118    /// # Example
3119    /// ```rust
3120    /// # #[cfg(feature = "deploy")] {
3121    /// # use std::collections::HashSet;
3122    /// # use futures::StreamExt;
3123    /// # use hydro_lang::prelude::*;
3124    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3125    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3126    ///     .map(q!(|x| async move {
3127    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3128    ///         x
3129    ///     }))
3130    ///     .resolve_futures_ordered()
3131    /// #   },
3132    /// #   |mut stream| async move {
3133    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3134    /// #       let mut output = Vec::new();
3135    /// #       for _ in 1..10 {
3136    /// #           output.push(stream.next().await.unwrap());
3137    /// #       }
3138    /// #       assert_eq!(
3139    /// #           output,
3140    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3141    /// #       );
3142    /// #   },
3143    /// # ));
3144    /// # }
3145    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3146        Stream::new(
3147            self.location.clone(),
3148            HydroNode::ResolveFuturesOrdered {
3149                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3150                metadata: self
3151                    .location
3152                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3153            },
3154        )
3155    }
3156}
3157
3158impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3159where
3160    L: Location<'a>,
3161{
3162    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3163    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3164    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3165        Stream::new(
3166            self.location.outer().clone(),
3167            HydroNode::YieldConcat {
3168                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3169                metadata: self
3170                    .location
3171                    .outer()
3172                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3173            },
3174        )
3175    }
3176
3177    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3178    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3179    ///
3180    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3181    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3182    /// stream's [`Tick`] context.
3183    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3184        let out_location = Atomic {
3185            tick: self.location.clone(),
3186        };
3187
3188        Stream::new(
3189            out_location.clone(),
3190            HydroNode::YieldConcat {
3191                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3192                metadata: out_location
3193                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3194            },
3195        )
3196    }
3197
3198    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3199    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3200    /// input.
3201    ///
3202    /// This API is particularly useful for stateful computation on batches of data, such as
3203    /// maintaining an accumulated state that is up to date with the current batch.
3204    ///
3205    /// # Example
3206    /// ```rust
3207    /// # #[cfg(feature = "deploy")] {
3208    /// # use hydro_lang::prelude::*;
3209    /// # use futures::StreamExt;
3210    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3211    /// let tick = process.tick();
3212    /// # // ticks are lazy by default, forces the second tick to run
3213    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3214    /// # let batch_first_tick = process
3215    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3216    /// #  .batch(&tick, nondet!(/** test */));
3217    /// # let batch_second_tick = process
3218    /// #   .source_iter(q!(vec![5, 6, 7]))
3219    /// #   .batch(&tick, nondet!(/** test */))
3220    /// #   .defer_tick(); // appears on the second tick
3221    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3222    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3223    ///
3224    /// input.batch(&tick, nondet!(/** test */))
3225    ///     .across_ticks(|s| s.count()).all_ticks()
3226    /// # }, |mut stream| async move {
3227    /// // [4, 7]
3228    /// assert_eq!(stream.next().await.unwrap(), 4);
3229    /// assert_eq!(stream.next().await.unwrap(), 7);
3230    /// # }));
3231    /// # }
3232    /// ```
3233    pub fn across_ticks<Out: BatchAtomic<'a>>(
3234        self,
3235        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3236    ) -> Out::Batched {
3237        thunk(self.all_ticks_atomic()).batched_atomic()
3238    }
3239
3240    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3241    /// always has the elements of `self` at tick `T - 1`.
3242    ///
3243    /// At tick `0`, the output stream is empty, since there is no previous tick.
3244    ///
3245    /// This operator enables stateful iterative processing with ticks, by sending data from one
3246    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3247    ///
3248    /// # Example
3249    /// ```rust
3250    /// # #[cfg(feature = "deploy")] {
3251    /// # use hydro_lang::prelude::*;
3252    /// # use futures::StreamExt;
3253    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3254    /// let tick = process.tick();
3255    /// // ticks are lazy by default, forces the second tick to run
3256    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3257    ///
3258    /// let batch_first_tick = process
3259    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3260    ///   .batch(&tick, nondet!(/** test */));
3261    /// let batch_second_tick = process
3262    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3263    ///   .batch(&tick, nondet!(/** test */))
3264    ///   .defer_tick(); // appears on the second tick
3265    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3266    ///
3267    /// changes_across_ticks.clone().filter_not_in(
3268    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3269    /// ).all_ticks()
3270    /// # }, |mut stream| async move {
3271    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3272    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3273    /// #     assert_eq!(stream.next().await.unwrap(), w);
3274    /// # }
3275    /// # }));
3276    /// # }
3277    /// ```
3278    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3279        Stream::new(
3280            self.location.clone(),
3281            HydroNode::DeferTick {
3282                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3283                metadata: self
3284                    .location
3285                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3286            },
3287        )
3288    }
3289}
3290
3291#[cfg(test)]
3292mod tests {
3293    #[cfg(feature = "deploy")]
3294    use futures::{SinkExt, StreamExt};
3295    #[cfg(feature = "deploy")]
3296    use hydro_deploy::Deployment;
3297    #[cfg(feature = "deploy")]
3298    use serde::{Deserialize, Serialize};
3299    #[cfg(any(feature = "deploy", feature = "sim"))]
3300    use stageleft::q;
3301
3302    #[cfg(any(feature = "deploy", feature = "sim"))]
3303    use crate::compile::builder::FlowBuilder;
3304    #[cfg(feature = "deploy")]
3305    use crate::live_collections::sliced::sliced;
3306    #[cfg(feature = "deploy")]
3307    use crate::live_collections::stream::ExactlyOnce;
3308    #[cfg(feature = "sim")]
3309    use crate::live_collections::stream::NoOrder;
3310    #[cfg(any(feature = "deploy", feature = "sim"))]
3311    use crate::live_collections::stream::TotalOrder;
3312    #[cfg(any(feature = "deploy", feature = "sim"))]
3313    use crate::location::Location;
3314    #[cfg(feature = "sim")]
3315    use crate::networking::TCP;
3316    #[cfg(any(feature = "deploy", feature = "sim"))]
3317    use crate::nondet::nondet;
3318
3319    mod backtrace_chained_ops;
3320
3321    #[cfg(feature = "deploy")]
3322    struct P1 {}
3323    #[cfg(feature = "deploy")]
3324    struct P2 {}
3325
3326    #[cfg(feature = "deploy")]
3327    #[derive(Serialize, Deserialize, Debug)]
3328    struct SendOverNetwork {
3329        n: u32,
3330    }
3331
3332    #[cfg(feature = "deploy")]
3333    #[tokio::test]
3334    async fn first_ten_distributed() {
3335        use crate::networking::TCP;
3336
3337        let mut deployment = Deployment::new();
3338
3339        let mut flow = FlowBuilder::new();
3340        let first_node = flow.process::<P1>();
3341        let second_node = flow.process::<P2>();
3342        let external = flow.external::<P2>();
3343
3344        let numbers = first_node.source_iter(q!(0..10));
3345        let out_port = numbers
3346            .map(q!(|n| SendOverNetwork { n }))
3347            .send(&second_node, TCP.fail_stop().bincode())
3348            .send_bincode_external(&external);
3349
3350        let nodes = flow
3351            .with_process(&first_node, deployment.Localhost())
3352            .with_process(&second_node, deployment.Localhost())
3353            .with_external(&external, deployment.Localhost())
3354            .deploy(&mut deployment);
3355
3356        deployment.deploy().await.unwrap();
3357
3358        let mut external_out = nodes.connect(out_port).await;
3359
3360        deployment.start().await.unwrap();
3361
3362        for i in 0..10 {
3363            assert_eq!(external_out.next().await.unwrap().n, i);
3364        }
3365    }
3366
3367    #[cfg(feature = "deploy")]
3368    #[tokio::test]
3369    async fn first_cardinality() {
3370        let mut deployment = Deployment::new();
3371
3372        let mut flow = FlowBuilder::new();
3373        let node = flow.process::<()>();
3374        let external = flow.external::<()>();
3375
3376        let node_tick = node.tick();
3377        let count = node_tick
3378            .singleton(q!([1, 2, 3]))
3379            .into_stream()
3380            .flatten_ordered()
3381            .first()
3382            .into_stream()
3383            .count()
3384            .all_ticks()
3385            .send_bincode_external(&external);
3386
3387        let nodes = flow
3388            .with_process(&node, deployment.Localhost())
3389            .with_external(&external, deployment.Localhost())
3390            .deploy(&mut deployment);
3391
3392        deployment.deploy().await.unwrap();
3393
3394        let mut external_out = nodes.connect(count).await;
3395
3396        deployment.start().await.unwrap();
3397
3398        assert_eq!(external_out.next().await.unwrap(), 1);
3399    }
3400
3401    #[cfg(feature = "deploy")]
3402    #[tokio::test]
3403    async fn unbounded_reduce_remembers_state() {
3404        let mut deployment = Deployment::new();
3405
3406        let mut flow = FlowBuilder::new();
3407        let node = flow.process::<()>();
3408        let external = flow.external::<()>();
3409
3410        let (input_port, input) = node.source_external_bincode(&external);
3411        let out = input
3412            .reduce(q!(|acc, v| *acc += v))
3413            .sample_eager(nondet!(/** test */))
3414            .send_bincode_external(&external);
3415
3416        let nodes = flow
3417            .with_process(&node, deployment.Localhost())
3418            .with_external(&external, deployment.Localhost())
3419            .deploy(&mut deployment);
3420
3421        deployment.deploy().await.unwrap();
3422
3423        let mut external_in = nodes.connect(input_port).await;
3424        let mut external_out = nodes.connect(out).await;
3425
3426        deployment.start().await.unwrap();
3427
3428        external_in.send(1).await.unwrap();
3429        assert_eq!(external_out.next().await.unwrap(), 1);
3430
3431        external_in.send(2).await.unwrap();
3432        assert_eq!(external_out.next().await.unwrap(), 3);
3433    }
3434
3435    #[cfg(feature = "deploy")]
3436    #[tokio::test]
3437    async fn top_level_bounded_cross_singleton() {
3438        let mut deployment = Deployment::new();
3439
3440        let mut flow = FlowBuilder::new();
3441        let node = flow.process::<()>();
3442        let external = flow.external::<()>();
3443
3444        let (input_port, input) =
3445            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3446
3447        let out = input
3448            .cross_singleton(
3449                node.source_iter(q!(vec![1, 2, 3]))
3450                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3451            )
3452            .send_bincode_external(&external);
3453
3454        let nodes = flow
3455            .with_process(&node, deployment.Localhost())
3456            .with_external(&external, deployment.Localhost())
3457            .deploy(&mut deployment);
3458
3459        deployment.deploy().await.unwrap();
3460
3461        let mut external_in = nodes.connect(input_port).await;
3462        let mut external_out = nodes.connect(out).await;
3463
3464        deployment.start().await.unwrap();
3465
3466        external_in.send(1).await.unwrap();
3467        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3468
3469        external_in.send(2).await.unwrap();
3470        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3471    }
3472
3473    #[cfg(feature = "deploy")]
3474    #[tokio::test]
3475    async fn top_level_bounded_reduce_cardinality() {
3476        let mut deployment = Deployment::new();
3477
3478        let mut flow = FlowBuilder::new();
3479        let node = flow.process::<()>();
3480        let external = flow.external::<()>();
3481
3482        let (input_port, input) =
3483            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3484
3485        let out = sliced! {
3486            let input = use(input, nondet!(/** test */));
3487            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3488            input.cross_singleton(v.into_stream().count())
3489        }
3490        .send_bincode_external(&external);
3491
3492        let nodes = flow
3493            .with_process(&node, deployment.Localhost())
3494            .with_external(&external, deployment.Localhost())
3495            .deploy(&mut deployment);
3496
3497        deployment.deploy().await.unwrap();
3498
3499        let mut external_in = nodes.connect(input_port).await;
3500        let mut external_out = nodes.connect(out).await;
3501
3502        deployment.start().await.unwrap();
3503
3504        external_in.send(1).await.unwrap();
3505        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3506
3507        external_in.send(2).await.unwrap();
3508        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3509    }
3510
3511    #[cfg(feature = "deploy")]
3512    #[tokio::test]
3513    async fn top_level_bounded_into_singleton_cardinality() {
3514        let mut deployment = Deployment::new();
3515
3516        let mut flow = FlowBuilder::new();
3517        let node = flow.process::<()>();
3518        let external = flow.external::<()>();
3519
3520        let (input_port, input) =
3521            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3522
3523        let out = sliced! {
3524            let input = use(input, nondet!(/** test */));
3525            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3526            input.cross_singleton(v.into_stream().count())
3527        }
3528        .send_bincode_external(&external);
3529
3530        let nodes = flow
3531            .with_process(&node, deployment.Localhost())
3532            .with_external(&external, deployment.Localhost())
3533            .deploy(&mut deployment);
3534
3535        deployment.deploy().await.unwrap();
3536
3537        let mut external_in = nodes.connect(input_port).await;
3538        let mut external_out = nodes.connect(out).await;
3539
3540        deployment.start().await.unwrap();
3541
3542        external_in.send(1).await.unwrap();
3543        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3544
3545        external_in.send(2).await.unwrap();
3546        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3547    }
3548
3549    #[cfg(feature = "deploy")]
3550    #[tokio::test]
3551    async fn atomic_fold_replays_each_tick() {
3552        let mut deployment = Deployment::new();
3553
3554        let mut flow = FlowBuilder::new();
3555        let node = flow.process::<()>();
3556        let external = flow.external::<()>();
3557
3558        let (input_port, input) =
3559            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3560        let tick = node.tick();
3561
3562        let out = input
3563            .batch(&tick, nondet!(/** test */))
3564            .cross_singleton(
3565                node.source_iter(q!(vec![1, 2, 3]))
3566                    .atomic()
3567                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3568                    .snapshot_atomic(&tick, nondet!(/** test */)),
3569            )
3570            .all_ticks()
3571            .send_bincode_external(&external);
3572
3573        let nodes = flow
3574            .with_process(&node, deployment.Localhost())
3575            .with_external(&external, deployment.Localhost())
3576            .deploy(&mut deployment);
3577
3578        deployment.deploy().await.unwrap();
3579
3580        let mut external_in = nodes.connect(input_port).await;
3581        let mut external_out = nodes.connect(out).await;
3582
3583        deployment.start().await.unwrap();
3584
3585        external_in.send(1).await.unwrap();
3586        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3587
3588        external_in.send(2).await.unwrap();
3589        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3590    }
3591
3592    #[cfg(feature = "deploy")]
3593    #[tokio::test]
3594    async fn unbounded_scan_remembers_state() {
3595        let mut deployment = Deployment::new();
3596
3597        let mut flow = FlowBuilder::new();
3598        let node = flow.process::<()>();
3599        let external = flow.external::<()>();
3600
3601        let (input_port, input) = node.source_external_bincode(&external);
3602        let out = input
3603            .scan(
3604                q!(|| 0),
3605                q!(|acc, v| {
3606                    *acc += v;
3607                    Some(*acc)
3608                }),
3609            )
3610            .send_bincode_external(&external);
3611
3612        let nodes = flow
3613            .with_process(&node, deployment.Localhost())
3614            .with_external(&external, deployment.Localhost())
3615            .deploy(&mut deployment);
3616
3617        deployment.deploy().await.unwrap();
3618
3619        let mut external_in = nodes.connect(input_port).await;
3620        let mut external_out = nodes.connect(out).await;
3621
3622        deployment.start().await.unwrap();
3623
3624        external_in.send(1).await.unwrap();
3625        assert_eq!(external_out.next().await.unwrap(), 1);
3626
3627        external_in.send(2).await.unwrap();
3628        assert_eq!(external_out.next().await.unwrap(), 3);
3629    }
3630
3631    #[cfg(feature = "deploy")]
3632    #[tokio::test]
3633    async fn unbounded_enumerate_remembers_state() {
3634        let mut deployment = Deployment::new();
3635
3636        let mut flow = FlowBuilder::new();
3637        let node = flow.process::<()>();
3638        let external = flow.external::<()>();
3639
3640        let (input_port, input) = node.source_external_bincode(&external);
3641        let out = input.enumerate().send_bincode_external(&external);
3642
3643        let nodes = flow
3644            .with_process(&node, deployment.Localhost())
3645            .with_external(&external, deployment.Localhost())
3646            .deploy(&mut deployment);
3647
3648        deployment.deploy().await.unwrap();
3649
3650        let mut external_in = nodes.connect(input_port).await;
3651        let mut external_out = nodes.connect(out).await;
3652
3653        deployment.start().await.unwrap();
3654
3655        external_in.send(1).await.unwrap();
3656        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3657
3658        external_in.send(2).await.unwrap();
3659        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3660    }
3661
3662    #[cfg(feature = "deploy")]
3663    #[tokio::test]
3664    async fn unbounded_unique_remembers_state() {
3665        let mut deployment = Deployment::new();
3666
3667        let mut flow = FlowBuilder::new();
3668        let node = flow.process::<()>();
3669        let external = flow.external::<()>();
3670
3671        let (input_port, input) =
3672            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3673        let out = input.unique().send_bincode_external(&external);
3674
3675        let nodes = flow
3676            .with_process(&node, deployment.Localhost())
3677            .with_external(&external, deployment.Localhost())
3678            .deploy(&mut deployment);
3679
3680        deployment.deploy().await.unwrap();
3681
3682        let mut external_in = nodes.connect(input_port).await;
3683        let mut external_out = nodes.connect(out).await;
3684
3685        deployment.start().await.unwrap();
3686
3687        external_in.send(1).await.unwrap();
3688        assert_eq!(external_out.next().await.unwrap(), 1);
3689
3690        external_in.send(2).await.unwrap();
3691        assert_eq!(external_out.next().await.unwrap(), 2);
3692
3693        external_in.send(1).await.unwrap();
3694        external_in.send(3).await.unwrap();
3695        assert_eq!(external_out.next().await.unwrap(), 3);
3696    }
3697
3698    #[cfg(feature = "sim")]
3699    #[test]
3700    #[should_panic]
3701    fn sim_batch_nondet_size() {
3702        let mut flow = FlowBuilder::new();
3703        let node = flow.process::<()>();
3704
3705        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3706
3707        let tick = node.tick();
3708        let out_recv = input
3709            .batch(&tick, nondet!(/** test */))
3710            .count()
3711            .all_ticks()
3712            .sim_output();
3713
3714        flow.sim().exhaustive(async || {
3715            in_send.send(());
3716            in_send.send(());
3717            in_send.send(());
3718
3719            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3720        });
3721    }
3722
3723    #[cfg(feature = "sim")]
3724    #[test]
3725    fn sim_batch_preserves_order() {
3726        let mut flow = FlowBuilder::new();
3727        let node = flow.process::<()>();
3728
3729        let (in_send, input) = node.sim_input();
3730
3731        let tick = node.tick();
3732        let out_recv = input
3733            .batch(&tick, nondet!(/** test */))
3734            .all_ticks()
3735            .sim_output();
3736
3737        flow.sim().exhaustive(async || {
3738            in_send.send(1);
3739            in_send.send(2);
3740            in_send.send(3);
3741
3742            out_recv.assert_yields_only([1, 2, 3]).await;
3743        });
3744    }
3745
3746    #[cfg(feature = "sim")]
3747    #[test]
3748    #[should_panic]
3749    fn sim_batch_unordered_shuffles() {
3750        let mut flow = FlowBuilder::new();
3751        let node = flow.process::<()>();
3752
3753        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3754
3755        let tick = node.tick();
3756        let batch = input.batch(&tick, nondet!(/** test */));
3757        let out_recv = batch
3758            .clone()
3759            .min()
3760            .zip(batch.max())
3761            .all_ticks()
3762            .sim_output();
3763
3764        flow.sim().exhaustive(async || {
3765            in_send.send_many_unordered([1, 2, 3]);
3766
3767            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3768                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3769            }
3770        });
3771    }
3772
3773    #[cfg(feature = "sim")]
3774    #[test]
3775    fn sim_batch_unordered_shuffles_count() {
3776        let mut flow = FlowBuilder::new();
3777        let node = flow.process::<()>();
3778
3779        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3780
3781        let tick = node.tick();
3782        let batch = input.batch(&tick, nondet!(/** test */));
3783        let out_recv = batch.all_ticks().sim_output();
3784
3785        let instance_count = flow.sim().exhaustive(async || {
3786            in_send.send_many_unordered([1, 2, 3, 4]);
3787            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3788        });
3789
3790        assert_eq!(
3791            instance_count,
3792            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3793        )
3794    }
3795
3796    #[cfg(feature = "sim")]
3797    #[test]
3798    #[should_panic]
3799    fn sim_observe_order_batched() {
3800        let mut flow = FlowBuilder::new();
3801        let node = flow.process::<()>();
3802
3803        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3804
3805        let tick = node.tick();
3806        let batch = input.batch(&tick, nondet!(/** test */));
3807        let out_recv = batch
3808            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3809            .all_ticks()
3810            .sim_output();
3811
3812        flow.sim().exhaustive(async || {
3813            in_send.send_many_unordered([1, 2, 3, 4]);
3814            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3815        });
3816    }
3817
3818    #[cfg(feature = "sim")]
3819    #[test]
3820    fn sim_observe_order_batched_count() {
3821        let mut flow = FlowBuilder::new();
3822        let node = flow.process::<()>();
3823
3824        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3825
3826        let tick = node.tick();
3827        let batch = input.batch(&tick, nondet!(/** test */));
3828        let out_recv = batch
3829            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3830            .all_ticks()
3831            .sim_output();
3832
3833        let instance_count = flow.sim().exhaustive(async || {
3834            in_send.send_many_unordered([1, 2, 3, 4]);
3835            let _ = out_recv.collect::<Vec<_>>().await;
3836        });
3837
3838        assert_eq!(
3839            instance_count,
3840            192 // 4! * 2^{4 - 1}
3841        )
3842    }
3843
3844    #[cfg(feature = "sim")]
3845    #[test]
3846    fn sim_unordered_count_instance_count() {
3847        let mut flow = FlowBuilder::new();
3848        let node = flow.process::<()>();
3849
3850        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3851
3852        let tick = node.tick();
3853        let out_recv = input
3854            .count()
3855            .snapshot(&tick, nondet!(/** test */))
3856            .all_ticks()
3857            .sim_output();
3858
3859        let instance_count = flow.sim().exhaustive(async || {
3860            in_send.send_many_unordered([1, 2, 3, 4]);
3861            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3862        });
3863
3864        assert_eq!(
3865            instance_count,
3866            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3867        )
3868    }
3869
3870    #[cfg(feature = "sim")]
3871    #[test]
3872    fn sim_top_level_assume_ordering() {
3873        let mut flow = FlowBuilder::new();
3874        let node = flow.process::<()>();
3875
3876        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3877
3878        let out_recv = input
3879            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3880            .sim_output();
3881
3882        let instance_count = flow.sim().exhaustive(async || {
3883            in_send.send_many_unordered([1, 2, 3]);
3884            let mut out = out_recv.collect::<Vec<_>>().await;
3885            out.sort();
3886            assert_eq!(out, vec![1, 2, 3]);
3887        });
3888
3889        assert_eq!(instance_count, 6)
3890    }
3891
3892    #[cfg(feature = "sim")]
3893    #[test]
3894    fn sim_top_level_assume_ordering_cycle_back() {
3895        let mut flow = FlowBuilder::new();
3896        let node = flow.process::<()>();
3897        let node2 = flow.process::<()>();
3898
3899        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3900
3901        let (complete_cycle_back, cycle_back) =
3902            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3903        let ordered = input
3904            .merge_unordered(cycle_back)
3905            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3906        complete_cycle_back.complete(
3907            ordered
3908                .clone()
3909                .map(q!(|v| v + 1))
3910                .filter(q!(|v| v % 2 == 1))
3911                .send(&node2, TCP.fail_stop().bincode())
3912                .send(&node, TCP.fail_stop().bincode()),
3913        );
3914
3915        let out_recv = ordered.sim_output();
3916
3917        let mut saw = false;
3918        let instance_count = flow.sim().exhaustive(async || {
3919            in_send.send_many_unordered([0, 2]);
3920            let out = out_recv.collect::<Vec<_>>().await;
3921
3922            if out.starts_with(&[0, 1, 2]) {
3923                saw = true;
3924            }
3925        });
3926
3927        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3928        assert_eq!(instance_count, 6);
3929    }
3930
3931    #[cfg(feature = "sim")]
3932    #[test]
3933    fn sim_top_level_assume_ordering_cycle_back_tick() {
3934        let mut flow = FlowBuilder::new();
3935        let node = flow.process::<()>();
3936        let node2 = flow.process::<()>();
3937
3938        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3939
3940        let (complete_cycle_back, cycle_back) =
3941            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3942        let ordered = input
3943            .merge_unordered(cycle_back)
3944            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3945        complete_cycle_back.complete(
3946            ordered
3947                .clone()
3948                .batch(&node.tick(), nondet!(/** test */))
3949                .all_ticks()
3950                .map(q!(|v| v + 1))
3951                .filter(q!(|v| v % 2 == 1))
3952                .send(&node2, TCP.fail_stop().bincode())
3953                .send(&node, TCP.fail_stop().bincode()),
3954        );
3955
3956        let out_recv = ordered.sim_output();
3957
3958        let mut saw = false;
3959        let instance_count = flow.sim().exhaustive(async || {
3960            in_send.send_many_unordered([0, 2]);
3961            let out = out_recv.collect::<Vec<_>>().await;
3962
3963            if out.starts_with(&[0, 1, 2]) {
3964                saw = true;
3965            }
3966        });
3967
3968        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3969        assert_eq!(instance_count, 58);
3970    }
3971
3972    #[cfg(feature = "sim")]
3973    #[test]
3974    fn sim_top_level_assume_ordering_multiple() {
3975        let mut flow = FlowBuilder::new();
3976        let node = flow.process::<()>();
3977        let node2 = flow.process::<()>();
3978
3979        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3980        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3981
3982        let (complete_cycle_back, cycle_back) =
3983            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3984        let input1_ordered = input
3985            .clone()
3986            .merge_unordered(cycle_back)
3987            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3988        let foo = input1_ordered
3989            .clone()
3990            .map(q!(|v| v + 3))
3991            .weaken_ordering::<NoOrder>()
3992            .merge_unordered(input2)
3993            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3994
3995        complete_cycle_back.complete(
3996            foo.filter(q!(|v| *v == 3))
3997                .send(&node2, TCP.fail_stop().bincode())
3998                .send(&node, TCP.fail_stop().bincode()),
3999        );
4000
4001        let out_recv = input1_ordered.sim_output();
4002
4003        let mut saw = false;
4004        let instance_count = flow.sim().exhaustive(async || {
4005            in_send.send_many_unordered([0, 1]);
4006            let out = out_recv.collect::<Vec<_>>().await;
4007
4008            if out.starts_with(&[0, 3, 1]) {
4009                saw = true;
4010            }
4011        });
4012
4013        assert!(saw, "did not see an instance with 0, 3, 1 in order");
4014        assert_eq!(instance_count, 24);
4015    }
4016
4017    #[cfg(feature = "sim")]
4018    #[test]
4019    fn sim_atomic_assume_ordering_cycle_back() {
4020        let mut flow = FlowBuilder::new();
4021        let node = flow.process::<()>();
4022        let node2 = flow.process::<()>();
4023
4024        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4025
4026        let (complete_cycle_back, cycle_back) =
4027            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4028        let ordered = input
4029            .merge_unordered(cycle_back)
4030            .atomic()
4031            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4032            .end_atomic();
4033        complete_cycle_back.complete(
4034            ordered
4035                .clone()
4036                .map(q!(|v| v + 1))
4037                .filter(q!(|v| v % 2 == 1))
4038                .send(&node2, TCP.fail_stop().bincode())
4039                .send(&node, TCP.fail_stop().bincode()),
4040        );
4041
4042        let out_recv = ordered.sim_output();
4043
4044        let instance_count = flow.sim().exhaustive(async || {
4045            in_send.send_many_unordered([0, 2]);
4046            let out = out_recv.collect::<Vec<_>>().await;
4047            assert_eq!(out.len(), 4);
4048        });
4049        assert_eq!(instance_count, 22);
4050    }
4051
4052    #[cfg(feature = "deploy")]
4053    #[tokio::test]
4054    async fn partition_evens_odds() {
4055        let mut deployment = Deployment::new();
4056
4057        let mut flow = FlowBuilder::new();
4058        let node = flow.process::<()>();
4059        let external = flow.external::<()>();
4060
4061        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4062        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4063        let evens_port = evens.send_bincode_external(&external);
4064        let odds_port = odds.send_bincode_external(&external);
4065
4066        let nodes = flow
4067            .with_process(&node, deployment.Localhost())
4068            .with_external(&external, deployment.Localhost())
4069            .deploy(&mut deployment);
4070
4071        deployment.deploy().await.unwrap();
4072
4073        let mut evens_out = nodes.connect(evens_port).await;
4074        let mut odds_out = nodes.connect(odds_port).await;
4075
4076        deployment.start().await.unwrap();
4077
4078        let mut even_results = Vec::new();
4079        for _ in 0..3 {
4080            even_results.push(evens_out.next().await.unwrap());
4081        }
4082        even_results.sort();
4083        assert_eq!(even_results, vec![2, 4, 6]);
4084
4085        let mut odd_results = Vec::new();
4086        for _ in 0..3 {
4087            odd_results.push(odds_out.next().await.unwrap());
4088        }
4089        odd_results.sort();
4090        assert_eq!(odd_results, vec![1, 3, 5]);
4091    }
4092
4093    #[cfg(feature = "deploy")]
4094    #[tokio::test]
4095    async fn unconsumed_inspect_still_runs() {
4096        use crate::deploy::DeployCrateWrapper;
4097
4098        let mut deployment = Deployment::new();
4099
4100        let mut flow = FlowBuilder::new();
4101        let node = flow.process::<()>();
4102
4103        // The return value of .inspect() is intentionally dropped.
4104        // Before the Null-root fix, this would silently do nothing.
4105        node.source_iter(q!(0..5))
4106            .inspect(q!(|x| println!("inspect: {}", x)));
4107
4108        let nodes = flow
4109            .with_process(&node, deployment.Localhost())
4110            .deploy(&mut deployment);
4111
4112        deployment.deploy().await.unwrap();
4113
4114        let mut stdout = nodes.get_process(&node).stdout();
4115
4116        deployment.start().await.unwrap();
4117
4118        let mut lines = Vec::new();
4119        for _ in 0..5 {
4120            lines.push(stdout.recv().await.unwrap());
4121        }
4122        lines.sort();
4123        assert_eq!(
4124            lines,
4125            vec![
4126                "inspect: 0",
4127                "inspect: 1",
4128                "inspect: 2",
4129                "inspect: 3",
4130                "inspect: 4",
4131            ]
4132        );
4133    }
4134
4135    #[cfg(feature = "sim")]
4136    #[test]
4137    fn sim_limit() {
4138        let mut flow = FlowBuilder::new();
4139        let node = flow.process::<()>();
4140
4141        let (in_send, input) = node.sim_input();
4142
4143        let out_recv = input.limit(q!(3)).sim_output();
4144
4145        flow.sim().exhaustive(async || {
4146            in_send.send(1);
4147            in_send.send(2);
4148            in_send.send(3);
4149            in_send.send(4);
4150            in_send.send(5);
4151
4152            out_recv.assert_yields_only([1, 2, 3]).await;
4153        });
4154    }
4155
4156    #[cfg(feature = "sim")]
4157    #[test]
4158    fn sim_limit_zero() {
4159        let mut flow = FlowBuilder::new();
4160        let node = flow.process::<()>();
4161
4162        let (in_send, input) = node.sim_input();
4163
4164        let out_recv = input.limit(q!(0)).sim_output();
4165
4166        flow.sim().exhaustive(async || {
4167            in_send.send(1);
4168            in_send.send(2);
4169
4170            out_recv.assert_yields_only::<i32, _>([]).await;
4171        });
4172    }
4173
4174    #[cfg(feature = "sim")]
4175    #[test]
4176    fn sim_merge_ordered() {
4177        let mut flow = FlowBuilder::new();
4178        let node = flow.process::<()>();
4179
4180        let (in_send, input) = node.sim_input();
4181        let (in_send2, input2) = node.sim_input();
4182
4183        let out_recv = input
4184            .merge_ordered(input2, nondet!(/** test */))
4185            .sim_output();
4186
4187        let mut saw_out_of_order = false;
4188        let instances = flow.sim().exhaustive(async || {
4189            in_send.send(1);
4190            in_send.send(2);
4191            in_send2.send(3);
4192            in_send2.send(4);
4193
4194            let out = out_recv.collect::<Vec<_>>().await;
4195
4196            if out == [1, 3, 2, 4] {
4197                saw_out_of_order = true;
4198            }
4199
4200            // Assert ordering preservation: elements from each input must
4201            // appear in their original relative order.
4202            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4203            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4204            assert_eq!(
4205                first_elements,
4206                vec![1, 2],
4207                "first input order violated: {:?}",
4208                out
4209            );
4210            assert_eq!(
4211                second_elements,
4212                vec![3, 4],
4213                "second input order violated: {:?}",
4214                out
4215            );
4216
4217            first_elements.append(&mut second_elements);
4218            first_elements.sort();
4219            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4220        });
4221
4222        assert!(saw_out_of_order);
4223        assert_eq!(instances, 6);
4224    }
4225
4226    /// Tests that merge_ordered passes through elements when only one input
4227    /// has data.
4228    #[cfg(feature = "sim")]
4229    #[test]
4230    fn sim_merge_ordered_one_empty() {
4231        let mut flow = FlowBuilder::new();
4232        let node = flow.process::<()>();
4233
4234        let (in_send, input) = node.sim_input();
4235        let (_in_send2, input2) = node.sim_input();
4236
4237        let out_recv = input
4238            .merge_ordered(input2, nondet!(/** test */))
4239            .sim_output();
4240
4241        let instances = flow.sim().exhaustive(async || {
4242            in_send.send(1);
4243            in_send.send(2);
4244
4245            let out = out_recv.collect::<Vec<_>>().await;
4246            assert_eq!(out, vec![1, 2]);
4247        });
4248
4249        // Only one possible interleaving when one input is empty
4250        assert_eq!(instances, 1);
4251    }
4252
4253    /// Tests that merge_ordered correctly handles feedback cycles.
4254    /// An element output from merge_ordered is filtered and cycled back to
4255    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4256    /// element to arrive and potentially be emitted before elements still
4257    /// waiting on the other input.
4258    #[cfg(feature = "sim")]
4259    #[test]
4260    fn sim_merge_ordered_cycle_back() {
4261        let mut flow = FlowBuilder::new();
4262        let node = flow.process::<()>();
4263
4264        let (in_send, input) = node.sim_input();
4265
4266        // Create a forward ref for the cycle back
4267        let (complete_cycle_back, cycle_back) =
4268            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4269
4270        // merge_ordered: input (external) with cycle_back
4271        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4272
4273        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4274        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4275
4276        let out_recv = merged.sim_output();
4277
4278        // Send 1 and 2. Element 1 should cycle back as 10.
4279        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4280        let mut saw_cycle_before_second = false;
4281        flow.sim().exhaustive(async || {
4282            in_send.send(1);
4283            in_send.send(2);
4284
4285            let out = out_recv.collect::<Vec<_>>().await;
4286
4287            // 10 must always come after 1 (causal dependency)
4288            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4289            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4290            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4291
4292            // Check if we see [1, 10, 2] — the cycled element beats the second input
4293            if out == [1, 10, 2] {
4294                saw_cycle_before_second = true;
4295            }
4296
4297            let mut sorted = out;
4298            sorted.sort();
4299            assert_eq!(sorted, vec![1, 2, 10]);
4300        });
4301
4302        assert!(
4303            saw_cycle_before_second,
4304            "never saw the cycled element arrive before the second input element"
4305        );
4306    }
4307
4308    /// Tests that merge_ordered correctly interleaves when one input has a
4309    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4310    /// element 2 should be able to appear after b's elements.
4311    #[cfg(feature = "sim")]
4312    #[test]
4313    fn sim_merge_ordered_delayed() {
4314        let mut flow = FlowBuilder::new();
4315        let node = flow.process::<()>();
4316
4317        let (in_send, input) = node.sim_input();
4318        let (in_send2, input2) = node.sim_input();
4319
4320        let out_recv = input
4321            .merge_ordered(input2, nondet!(/** test */))
4322            .sim_output();
4323
4324        let mut saw_delayed_interleaving = false;
4325        flow.sim().exhaustive(async || {
4326            // Send 1 from a, and 3, 4 from b
4327            in_send.send(1);
4328            in_send2.send(3);
4329            in_send2.send(4);
4330
4331            // Collect what's available so far
4332            let first_batch = out_recv.collect::<Vec<_>>().await;
4333
4334            // Now send the delayed element 2 from a
4335            in_send.send(2);
4336            let second_batch = out_recv.collect::<Vec<_>>().await;
4337
4338            let mut all: Vec<_> = first_batch
4339                .iter()
4340                .chain(second_batch.iter())
4341                .copied()
4342                .collect();
4343
4344            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4345            if all == [1, 3, 4, 2] {
4346                saw_delayed_interleaving = true;
4347            }
4348
4349            all.sort();
4350            assert_eq!(all, vec![1, 2, 3, 4]);
4351        });
4352
4353        assert!(saw_delayed_interleaving);
4354    }
4355
4356    /// Deploy test: merge_ordered with a delayed element on one input.
4357    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4358    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4359    /// both inputs are pulled and the delayed element arrives later.
4360    #[cfg(feature = "deploy")]
4361    #[tokio::test]
4362    async fn deploy_merge_ordered_delayed() {
4363        let mut deployment = Deployment::new();
4364
4365        let mut flow = FlowBuilder::new();
4366        let node = flow.process::<()>();
4367        let external = flow.external::<()>();
4368
4369        let (input_a_port, input_a) = node.source_external_bincode(&external);
4370        let (input_b_port, input_b) = node.source_external_bincode(&external);
4371
4372        let out = input_a
4373            .assume_ordering(nondet!(/** test */))
4374            .merge_ordered(
4375                input_b.assume_ordering(nondet!(/** test */)),
4376                nondet!(/** test */),
4377            )
4378            .send_bincode_external(&external);
4379
4380        let nodes = flow
4381            .with_process(&node, deployment.Localhost())
4382            .with_external(&external, deployment.Localhost())
4383            .deploy(&mut deployment);
4384
4385        deployment.deploy().await.unwrap();
4386
4387        let mut ext_a = nodes.connect(input_a_port).await;
4388        let mut ext_b = nodes.connect(input_b_port).await;
4389        let mut ext_out = nodes.connect(out).await;
4390
4391        deployment.start().await.unwrap();
4392
4393        // Send a=1, b=3, b=4
4394        ext_a.send(1).await.unwrap();
4395        ext_b.send(3).await.unwrap();
4396        ext_b.send(4).await.unwrap();
4397
4398        // Collect the first 3 elements
4399        let mut received = Vec::new();
4400        for _ in 0..3 {
4401            received.push(ext_out.next().await.unwrap());
4402        }
4403
4404        // Now send the delayed a=2
4405        ext_a.send(2).await.unwrap();
4406        received.push(ext_out.next().await.unwrap());
4407
4408        // All elements should be present
4409        received.sort();
4410        assert_eq!(received, vec![1, 2, 3, 4]);
4411    }
4412
4413    #[cfg(feature = "deploy")]
4414    #[tokio::test]
4415    async fn monotone_fold_threshold() {
4416        use crate::properties::manual_proof;
4417
4418        let mut deployment = Deployment::new();
4419
4420        let mut flow = FlowBuilder::new();
4421        let node = flow.process::<()>();
4422        let external = flow.external::<()>();
4423
4424        let in_unbounded: super::Stream<_, _> =
4425            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4426        let sum = in_unbounded.fold(
4427            q!(|| 0),
4428            q!(
4429                |sum, v| {
4430                    *sum += v;
4431                },
4432                monotone = manual_proof!(/** test */)
4433            ),
4434        );
4435
4436        let threshold_out = sum
4437            .threshold_greater_or_equal(node.singleton(q!(7)))
4438            .send_bincode_external(&external);
4439
4440        let nodes = flow
4441            .with_process(&node, deployment.Localhost())
4442            .with_external(&external, deployment.Localhost())
4443            .deploy(&mut deployment);
4444
4445        deployment.deploy().await.unwrap();
4446
4447        let mut threshold_out = nodes.connect(threshold_out).await;
4448
4449        deployment.start().await.unwrap();
4450
4451        assert_eq!(threshold_out.next().await.unwrap(), 7);
4452    }
4453
4454    #[cfg(feature = "deploy")]
4455    #[tokio::test]
4456    async fn monotone_count_threshold() {
4457        let mut deployment = Deployment::new();
4458
4459        let mut flow = FlowBuilder::new();
4460        let node = flow.process::<()>();
4461        let external = flow.external::<()>();
4462
4463        let in_unbounded: super::Stream<_, _> =
4464            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4465        let sum = in_unbounded.count();
4466
4467        let threshold_out = sum
4468            .threshold_greater_or_equal(node.singleton(q!(3)))
4469            .send_bincode_external(&external);
4470
4471        let nodes = flow
4472            .with_process(&node, deployment.Localhost())
4473            .with_external(&external, deployment.Localhost())
4474            .deploy(&mut deployment);
4475
4476        deployment.deploy().await.unwrap();
4477
4478        let mut threshold_out = nodes.connect(threshold_out).await;
4479
4480        deployment.start().await.unwrap();
4481
4482        assert_eq!(threshold_out.next().await.unwrap(), 3);
4483    }
4484
4485    #[cfg(feature = "deploy")]
4486    #[tokio::test]
4487    async fn monotone_map_order_preserving_threshold() {
4488        use crate::properties::manual_proof;
4489
4490        let mut deployment = Deployment::new();
4491
4492        let mut flow = FlowBuilder::new();
4493        let node = flow.process::<()>();
4494        let external = flow.external::<()>();
4495
4496        let in_unbounded: super::Stream<_, _> =
4497            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4498        let sum = in_unbounded.fold(
4499            q!(|| 0),
4500            q!(
4501                |sum, v| {
4502                    *sum += v;
4503                },
4504                monotone = manual_proof!(/** test */)
4505            ),
4506        );
4507
4508        // map with order_preserving should preserve monotonicity
4509        let doubled = sum.map(q!(
4510            |v| v * 2,
4511            order_preserving = manual_proof!(/** doubling preserves order */)
4512        ));
4513
4514        let threshold_out = doubled
4515            .threshold_greater_or_equal(node.singleton(q!(14)))
4516            .send_bincode_external(&external);
4517
4518        let nodes = flow
4519            .with_process(&node, deployment.Localhost())
4520            .with_external(&external, deployment.Localhost())
4521            .deploy(&mut deployment);
4522
4523        deployment.deploy().await.unwrap();
4524
4525        let mut threshold_out = nodes.connect(threshold_out).await;
4526
4527        deployment.start().await.unwrap();
4528
4529        assert_eq!(threshold_out.next().await.unwrap(), 14);
4530    }
4531
4532    // === Compile-time type tests for join/cross_product ordering ===
4533
4534    #[cfg(any(feature = "deploy", feature = "sim"))]
4535    mod join_ordering_type_tests {
4536        use crate::live_collections::boundedness::{Bounded, Unbounded};
4537        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4538        use crate::location::{Location, Process};
4539
4540        #[expect(dead_code, reason = "compile-time type test")]
4541        fn join_unbounded_with_bounded_preserves_order<'a>(
4542            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4543            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4544        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4545            left.join(right)
4546        }
4547
4548        #[expect(dead_code, reason = "compile-time type test")]
4549        fn join_unbounded_with_unbounded_is_no_order<'a>(
4550            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4551            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4552        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4553            left.join(right)
4554        }
4555
4556        #[expect(dead_code, reason = "compile-time type test")]
4557        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4558            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4559            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4560        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4561            left.join(right)
4562        }
4563
4564        #[expect(dead_code, reason = "compile-time type test")]
4565        fn join_unbounded_noorder_with_bounded<'a>(
4566            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4567            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4568        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4569            left.join(right)
4570        }
4571
4572        // === Compile-time type tests for cross_product ordering ===
4573
4574        #[expect(dead_code, reason = "compile-time type test")]
4575        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4576            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4577            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4578        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4579            left.cross_product(right)
4580        }
4581
4582        #[expect(dead_code, reason = "compile-time type test")]
4583        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4584            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4585            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4586        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4587            left.cross_product(right)
4588        }
4589
4590        #[expect(dead_code, reason = "compile-time type test")]
4591        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4592            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4593            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4594        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4595            left.cross_product(right)
4596        }
4597    } // mod join_ordering_type_tests
4598
4599    // === Runtime correctness tests for bounded join/cross_product ===
4600
4601    #[cfg(feature = "sim")]
4602    #[test]
4603    fn cross_product_mixed_boundedness_correctness() {
4604        use stageleft::q;
4605
4606        use crate::compile::builder::FlowBuilder;
4607        use crate::nondet::nondet;
4608
4609        let mut flow = FlowBuilder::new();
4610        let process = flow.process::<()>();
4611        let tick = process.tick();
4612
4613        let left = process.source_iter(q!(vec![1, 2]));
4614        let right = process
4615            .source_iter(q!(vec!['a', 'b']))
4616            .batch(&tick, nondet!(/** test */))
4617            .all_ticks();
4618
4619        let out = left.cross_product(right).sim_output();
4620
4621        flow.sim().exhaustive(async || {
4622            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4623                .await;
4624        });
4625    }
4626
4627    #[cfg(feature = "sim")]
4628    #[test]
4629    fn join_mixed_boundedness_correctness() {
4630        use stageleft::q;
4631
4632        use crate::compile::builder::FlowBuilder;
4633        use crate::nondet::nondet;
4634
4635        let mut flow = FlowBuilder::new();
4636        let process = flow.process::<()>();
4637        let tick = process.tick();
4638
4639        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4640        let right = process
4641            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4642            .batch(&tick, nondet!(/** test */))
4643            .all_ticks();
4644
4645        let out = left.join(right).sim_output();
4646
4647        flow.sim().exhaustive(async || {
4648            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4649                .await;
4650        });
4651    }
4652
4653    #[cfg(feature = "sim")]
4654    #[test]
4655    fn sim_merge_unordered_independent_atomics() {
4656        let mut flow = FlowBuilder::new();
4657        let node = flow.process::<()>();
4658
4659        let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4660        let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4661
4662        let out = input1
4663            .atomic()
4664            .merge_unordered(input2.atomic())
4665            .end_atomic()
4666            .sim_output();
4667
4668        flow.sim().exhaustive(async || {
4669            in1_send.send(1);
4670            in2_send.send(2);
4671
4672            out.assert_yields_only_unordered(vec![1, 2]).await;
4673        });
4674    }
4675
4676    #[cfg(feature = "deploy")]
4677    #[tokio::test]
4678    async fn test_stream_ref() {
4679        let mut deployment = Deployment::new();
4680
4681        let mut flow = FlowBuilder::new();
4682        let external = flow.external::<()>();
4683        let p1 = flow.process::<()>();
4684
4685        // Create a bounded stream (source_iter is bounded within a tick)
4686        let my_stream = p1.source_iter(q!(1..=5i32));
4687
4688        let stream_ref = my_stream.by_ref();
4689
4690        // Use the stream ref to get the vec's length
4691        let out_port = p1
4692            .source_iter(q!([()]))
4693            .map(q!(|_| stream_ref.len() as i32))
4694            .send_bincode_external(&external);
4695
4696        // Also consume the stream via pipe
4697        my_stream.for_each(q!(|_| {}));
4698
4699        let nodes = flow
4700            .with_default_optimize()
4701            .with_process(&p1, deployment.Localhost())
4702            .with_external(&external, deployment.Localhost())
4703            .deploy(&mut deployment);
4704
4705        deployment.deploy().await.unwrap();
4706
4707        let mut out_recv = nodes.connect(out_port).await;
4708
4709        deployment.start().await.unwrap();
4710
4711        let result = out_recv.next().await.unwrap();
4712        // stream has 5 elements
4713        assert_eq!(result, 5);
4714    }
4715
4716    #[cfg(feature = "deploy")]
4717    #[tokio::test]
4718    async fn test_stream_ref_contents() {
4719        let mut deployment = Deployment::new();
4720
4721        let mut flow = FlowBuilder::new();
4722        let external = flow.external::<()>();
4723        let p1 = flow.process::<()>();
4724
4725        // Create a bounded stream
4726        let my_stream = p1.source_iter(q!(1..=3i32));
4727
4728        let stream_ref = my_stream.by_ref();
4729
4730        // Sum the referenced vec's contents
4731        let out_port = p1
4732            .source_iter(q!([()]))
4733            .map(q!(|_| stream_ref.iter().sum::<i32>()))
4734            .send_bincode_external(&external);
4735
4736        my_stream.for_each(q!(|_| {}));
4737
4738        let nodes = flow
4739            .with_default_optimize()
4740            .with_process(&p1, deployment.Localhost())
4741            .with_external(&external, deployment.Localhost())
4742            .deploy(&mut deployment);
4743
4744        deployment.deploy().await.unwrap();
4745
4746        let mut out_recv = nodes.connect(out_port).await;
4747
4748        deployment.start().await.unwrap();
4749
4750        let result = out_recv.next().await.unwrap();
4751        // sum of 1+2+3 = 6
4752        assert_eq!(result, 6);
4753    }
4754
4755    #[cfg(feature = "deploy")]
4756    #[tokio::test]
4757    async fn test_stream_ref_no_consumer() {
4758        let mut deployment = Deployment::new();
4759
4760        let mut flow = FlowBuilder::new();
4761        let external = flow.external::<()>();
4762        let p1 = flow.process::<()>();
4763
4764        // Create a bounded stream — no pipe consumer, only ref
4765        let my_stream = p1.source_iter(q!(1..=4i32));
4766
4767        let stream_ref = my_stream.by_ref();
4768
4769        let out_port = p1
4770            .source_iter(q!([()]))
4771            .map(q!(|_| stream_ref.len() as i32))
4772            .send_bincode_external(&external);
4773
4774        let nodes = flow
4775            .with_default_optimize()
4776            .with_process(&p1, deployment.Localhost())
4777            .with_external(&external, deployment.Localhost())
4778            .deploy(&mut deployment);
4779
4780        deployment.deploy().await.unwrap();
4781
4782        let mut out_recv = nodes.connect(out_port).await;
4783
4784        deployment.start().await.unwrap();
4785
4786        let result = out_recv.next().await.unwrap();
4787        assert_eq!(result, 4);
4788    }
4789
4790    #[cfg(feature = "deploy")]
4791    #[tokio::test]
4792    async fn test_stream_mut() {
4793        let mut deployment = Deployment::new();
4794
4795        let mut flow = FlowBuilder::new();
4796        let external = flow.external::<()>();
4797        let p1 = flow.process::<()>();
4798
4799        // Create a bounded stream
4800        let my_stream = p1.source_iter(q!(1..=5i32));
4801
4802        let stream_mut = my_stream.by_mut();
4803
4804        // Mutably reference the buffer to retain only items > 3
4805        let out_port = p1
4806            .source_iter(q!([()]))
4807            .map(q!(|_| {
4808                stream_mut.retain(|x| *x > 3);
4809                stream_mut.len() as i32
4810            }))
4811            .send_bincode_external(&external);
4812
4813        my_stream.for_each(q!(|_| {}));
4814
4815        let nodes = flow
4816            .with_default_optimize()
4817            .with_process(&p1, deployment.Localhost())
4818            .with_external(&external, deployment.Localhost())
4819            .deploy(&mut deployment);
4820
4821        deployment.deploy().await.unwrap();
4822
4823        let mut out_recv = nodes.connect(out_port).await;
4824
4825        deployment.start().await.unwrap();
4826
4827        let result = out_recv.next().await.unwrap();
4828        // After retain(> 3): [4, 5] => len = 2
4829        assert_eq!(result, 2);
4830    }
4831
4832    /// A map with a mut singleton ref on an unordered input should produce > 1
4833    /// simulation instance because the ordering of elements through the mut closure
4834    /// is non-deterministic.
4835    #[cfg(feature = "sim")]
4836    #[test]
4837    fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4838        use crate::live_collections::sliced::sliced;
4839        use crate::live_collections::stream::ExactlyOnce;
4840        use crate::properties::manual_proof;
4841
4842        let mut flow = FlowBuilder::new();
4843        let node = flow.process::<()>();
4844
4845        let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4846
4847        let out_recv = sliced! {
4848            let batch = use(trigger, nondet!(/** test */));
4849            let counter = batch.location().source_iter(q!(vec![0i32]))
4850                .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4851            let counter_mut = counter.by_mut();
4852            let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4853            items.map(q!(
4854                |x| {
4855                    *counter_mut += x;
4856                    *counter_mut
4857                },
4858                commutative = manual_proof!(/** test */)
4859            ))
4860        }
4861        .sim_output();
4862
4863        let count = flow.sim().exhaustive(async || {
4864            trigger_send.send(1);
4865            let _all: Vec<i32> = out_recv.collect_sorted().await;
4866        });
4867
4868        assert_eq!(
4869            count, 2,
4870            "Expected 2 simulation instances due to mut on unordered input, got {}",
4871            count
4872        );
4873    }
4874
4875    /// A map with a mut singleton ref on a top-level unordered input should produce > 1
4876    /// simulation instance. Currently panics because observe_nondet doesn't support
4877    /// top-level bounded inputs yet.
4878    #[cfg(feature = "sim")]
4879    #[test]
4880    #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4881    fn sim_map_with_mut_on_unordered_top_level() {
4882        use crate::properties::manual_proof;
4883
4884        let mut flow = FlowBuilder::new();
4885        let node = flow.process::<()>();
4886
4887        let counter = node
4888            .source_iter(q!(vec![0i32]))
4889            .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4890        let counter_mut = counter.by_mut();
4891
4892        let out_recv = node
4893            .source_iter(q!(vec![1i32, 2]))
4894            .weaken_ordering::<NoOrder>()
4895            .map(q!(
4896                |x| {
4897                    *counter_mut += x;
4898                    *counter_mut
4899                },
4900                commutative = manual_proof!(/** test */)
4901            ))
4902            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4903            .sim_output();
4904
4905        counter.into_stream().for_each(q!(|_| {}));
4906
4907        let count = flow.sim().exhaustive(async || {
4908            let _all: Vec<i32> = out_recv.collect().await;
4909        });
4910
4911        assert_eq!(
4912            count, 2,
4913            "Expected 2 simulation instances due to mut on unordered input, got {}",
4914            count
4915        );
4916    }
4917}