1use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11#[cfg(feature = "tokio")]
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::{Generate, KeyedStream};
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::batch_atomic::BatchAtomic;
27use crate::live_collections::singleton::SingletonBound;
28#[cfg(stageleft_runtime)]
29use crate::location::dynamic::{DynLocation, LocationId};
30use crate::location::tick::{Atomic, DeferTick};
31use crate::location::{Location, Tick, TopLevel, check_matching_location};
32use crate::manual_expr::ManualExpr;
33use crate::nondet::{NonDet, nondet};
34use crate::prelude::manual_proof;
35use crate::properties::{
36 AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
37 ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
38 ValidMutCommutativityFor, ValidMutIdempotenceFor,
39};
40
41pub mod networking;
42
43#[sealed::sealed]
45pub trait Ordering:
46 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
47{
48 const ORDERING_KIND: StreamOrder;
50}
51
52pub enum TotalOrder {}
56
57#[sealed::sealed]
58impl Ordering for TotalOrder {
59 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
60}
61
62pub enum NoOrder {}
68
69#[sealed::sealed]
70impl Ordering for NoOrder {
71 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
72}
73
74#[sealed::sealed]
78pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
79#[sealed::sealed]
80impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
81
82#[sealed::sealed]
84pub trait MinOrder<Other: ?Sized> {
85 type Min: Ordering;
87}
88
89#[sealed::sealed]
90impl<O: Ordering> MinOrder<O> for TotalOrder {
91 type Min = O;
92}
93
94#[sealed::sealed]
95impl<O: Ordering> MinOrder<O> for NoOrder {
96 type Min = NoOrder;
97}
98
99#[sealed::sealed]
101pub trait Retries:
102 MinRetries<Self, Min = Self>
103 + MinRetries<ExactlyOnce, Min = Self>
104 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
105{
106 const RETRIES_KIND: StreamRetry;
108}
109
110pub enum ExactlyOnce {}
113
114#[sealed::sealed]
115impl Retries for ExactlyOnce {
116 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
117}
118
119pub enum AtLeastOnce {}
122
123#[sealed::sealed]
124impl Retries for AtLeastOnce {
125 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
126}
127
128#[sealed::sealed]
132pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
133#[sealed::sealed]
134impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
135
136#[sealed::sealed]
138pub trait MinRetries<Other: ?Sized> {
139 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
141}
142
143#[sealed::sealed]
144impl<R: Retries> MinRetries<R> for ExactlyOnce {
145 type Min = R;
146}
147
148#[sealed::sealed]
149impl<R: Retries> MinRetries<R> for AtLeastOnce {
150 type Min = AtLeastOnce;
151}
152
153#[sealed::sealed]
154#[diagnostic::on_unimplemented(
155 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
156 label = "required here",
157 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
158)]
159pub trait IsOrdered: Ordering {}
161
162#[sealed::sealed]
163#[diagnostic::do_not_recommend]
164impl IsOrdered for TotalOrder {}
165
166#[sealed::sealed]
167#[diagnostic::on_unimplemented(
168 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
169 label = "required here",
170 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
171)]
172pub trait IsExactlyOnce: Retries {}
174
175#[sealed::sealed]
176#[diagnostic::do_not_recommend]
177impl IsExactlyOnce for ExactlyOnce {}
178
179pub struct Stream<
199 Type,
200 Loc,
201 Bound: Boundedness = Unbounded,
202 Order: Ordering = TotalOrder,
203 Retry: Retries = ExactlyOnce,
204> {
205 pub(crate) location: Loc,
206 pub(crate) ir_node: RefCell<HydroNode>,
207 pub(crate) flow_state: FlowState,
208
209 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
210}
211
212impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
213 fn drop(&mut self) {
214 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
215 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
216 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
217 input: Box::new(ir_node),
218 op_metadata: HydroIrOpMetadata::new(),
219 });
220 }
221 }
222}
223
224impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
225 for Stream<T, L, Unbounded, O, R>
226where
227 L: Location<'a>,
228{
229 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
230 let new_meta = stream
231 .location
232 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
233
234 Stream {
235 location: stream.location.clone(),
236 flow_state: stream.flow_state.clone(),
237 ir_node: RefCell::new(HydroNode::Cast {
238 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
239 metadata: new_meta,
240 }),
241 _phantom: PhantomData,
242 }
243 }
244}
245
246impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
247 for Stream<T, L, B, NoOrder, R>
248where
249 L: Location<'a>,
250{
251 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
252 stream.weaken_ordering()
253 }
254}
255
256impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
257 for Stream<T, L, B, O, AtLeastOnce>
258where
259 L: Location<'a>,
260{
261 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
262 stream.weaken_retries()
263 }
264}
265
266impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
267where
268 L: Location<'a>,
269{
270 fn defer_tick(self) -> Self {
271 Stream::defer_tick(self)
272 }
273}
274
275impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
276 for Stream<T, Tick<L>, Bounded, O, R>
277where
278 L: Location<'a>,
279{
280 type Location = Tick<L>;
281
282 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
283 Stream::new(
284 location.clone(),
285 HydroNode::CycleSource {
286 cycle_id,
287 metadata: location.new_node_metadata(Self::collection_kind()),
288 },
289 )
290 }
291}
292
293impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
294 for Stream<T, Tick<L>, Bounded, O, R>
295where
296 L: Location<'a>,
297{
298 type Location = Tick<L>;
299
300 fn location(&self) -> &Self::Location {
301 self.location()
302 }
303
304 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
305 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
306 location.clone(),
307 HydroNode::DeferTick {
308 input: Box::new(HydroNode::CycleSource {
309 cycle_id,
310 metadata: location.new_node_metadata(Self::collection_kind()),
311 }),
312 metadata: location.new_node_metadata(Self::collection_kind()),
313 },
314 );
315
316 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
317 }
318}
319
320impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
321 for Stream<T, Tick<L>, Bounded, O, R>
322where
323 L: Location<'a>,
324{
325 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
326 assert_eq!(
327 Location::id(&self.location),
328 expected_location,
329 "locations do not match"
330 );
331 self.location
332 .flow_state()
333 .borrow_mut()
334 .push_root(HydroRoot::CycleSink {
335 cycle_id,
336 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
337 op_metadata: HydroIrOpMetadata::new(),
338 });
339 }
340}
341
342impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
343 for Stream<T, L, B, O, R>
344where
345 L: Location<'a>,
346{
347 type Location = L;
348
349 fn create_source(cycle_id: CycleId, location: L) -> Self {
350 Stream::new(
351 location.clone(),
352 HydroNode::CycleSource {
353 cycle_id,
354 metadata: location.new_node_metadata(Self::collection_kind()),
355 },
356 )
357 }
358}
359
360impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
361 for Stream<T, L, B, O, R>
362where
363 L: Location<'a>,
364{
365 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
366 assert_eq!(
367 Location::id(&self.location),
368 expected_location,
369 "locations do not match"
370 );
371 self.location
372 .flow_state()
373 .borrow_mut()
374 .push_root(HydroRoot::CycleSink {
375 cycle_id,
376 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
377 op_metadata: HydroIrOpMetadata::new(),
378 });
379 }
380}
381
382impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
383where
384 T: Clone,
385 L: Location<'a>,
386{
387 fn clone(&self) -> Self {
388 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
389 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
390 *self.ir_node.borrow_mut() = HydroNode::Tee {
391 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
392 metadata: self.location.new_node_metadata(Self::collection_kind()),
393 };
394 }
395
396 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
397 unreachable!()
398 };
399 Stream {
400 location: self.location.clone(),
401 flow_state: self.flow_state.clone(),
402 ir_node: HydroNode::Tee {
403 inner: SharedNode(inner.0.clone()),
404 metadata: metadata.clone(),
405 }
406 .into(),
407 _phantom: PhantomData,
408 }
409 }
410}
411
412impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
413where
414 L: Location<'a>,
415{
416 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
417 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
418 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
419
420 let flow_state = location.flow_state().clone();
421 Stream {
422 location,
423 flow_state,
424 ir_node: RefCell::new(ir_node),
425 _phantom: PhantomData,
426 }
427 }
428
429 pub fn location(&self) -> &L {
431 &self.location
432 }
433
434 pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
439 where
440 B: IsBounded,
441 {
442 crate::handoff_ref::StreamRef::new(&self.ir_node)
443 }
444
445 pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
448 where
449 B: IsBounded,
450 {
451 crate::handoff_ref::StreamMut::new(&self.ir_node)
452 }
453
454 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
457 where
458 L: Location<'a>,
459 {
460 if L::consistency()
461 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
462 {
463 Stream::new(
465 self.location.drop_consistency(),
466 self.ir_node.replace(HydroNode::Placeholder),
467 )
468 } else {
469 Stream::new(
470 self.location.drop_consistency(),
471 HydroNode::Cast {
472 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
473 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
474 T,
475 L::DropConsistency,
476 B,
477 O,
478 R,
479 >::collection_kind(
480 )),
481 },
482 )
483 }
484 }
485
486 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
490 self,
491 _proof: impl crate::properties::ConsistencyProof,
492 ) -> Stream<T, L2, B, O, R>
493 where
494 L: Location<'a>,
495 {
496 if L::consistency() == L2::consistency() {
497 Stream::new(
498 self.location.with_consistency_of(),
499 self.ir_node.replace(HydroNode::Placeholder),
500 )
501 } else {
502 Stream::new(
503 self.location.with_consistency_of(),
504 HydroNode::AssertIsConsistent {
505 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
506 trusted: false,
507 metadata: self
508 .location
509 .clone()
510 .with_consistency_of::<L2>()
511 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
512 },
513 )
514 }
515 }
516
517 pub(crate) fn assert_has_consistency_of_trusted<
518 L2: Location<'a, DropConsistency = L::DropConsistency>,
519 >(
520 self,
521 _proof: impl crate::properties::ConsistencyProof,
522 ) -> Stream<T, L2, B, O, R>
523 where
524 L: Location<'a>,
525 {
526 if L::consistency() == L2::consistency() {
527 Stream::new(
528 self.location.with_consistency_of(),
529 self.ir_node.replace(HydroNode::Placeholder),
530 )
531 } else {
532 Stream::new(
533 self.location.with_consistency_of(),
534 HydroNode::AssertIsConsistent {
535 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
536 trusted: true,
537 metadata: self
538 .location
539 .clone()
540 .with_consistency_of::<L2>()
541 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
542 },
543 )
544 }
545 }
546
547 pub(crate) fn collection_kind() -> CollectionKind {
548 CollectionKind::Stream {
549 bound: B::BOUND_KIND,
550 order: O::ORDERING_KIND,
551 retry: R::RETRIES_KIND,
552 element_type: quote_type::<T>().into(),
553 }
554 }
555
556 pub fn map<U, F, C, I, const WAS_MUT: bool>(
576 self,
577 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
578 ) -> Stream<U, L, B, O, R>
579 where
580 F: FnMut(T) -> U + 'a,
581 C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
582 I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
583 {
584 let f = crate::handoff_ref::with_ref_capture(|| {
585 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
586 proof.register_proof(&expr);
587 expr.into()
588 });
589 Stream::new(
590 self.location.clone(),
591 HydroNode::Map {
592 f,
593 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594 metadata: self
595 .location
596 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
597 },
598 )
599 }
600
601 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
626 self,
627 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
628 ) -> Stream<U, L, B, O, R>
629 where
630 I: IntoIterator<Item = U>,
631 F: FnMut(T) -> I + 'a,
632 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
633 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
634 {
635 let f = crate::handoff_ref::with_ref_capture(|| {
636 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
637 proof.register_proof(&expr);
638 expr.into()
639 });
640 Stream::new(
641 self.location.clone(),
642 HydroNode::FlatMap {
643 f,
644 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
645 metadata: self
646 .location
647 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
648 },
649 )
650 }
651
652 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
679 self,
680 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
681 ) -> Stream<U, L, B, NoOrder, R>
682 where
683 I: IntoIterator<Item = U>,
684 F: FnMut(T) -> I + 'a,
685 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
686 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
687 {
688 let f = crate::handoff_ref::with_ref_capture(|| {
689 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
690 proof.register_proof(&expr);
691 expr.into()
692 });
693 Stream::new(
694 self.location.clone(),
695 HydroNode::FlatMap {
696 f,
697 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
698 metadata: self
699 .location
700 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
701 },
702 )
703 }
704
705 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
728 where
729 T: IntoIterator<Item = U>,
730 {
731 self.flat_map_ordered(q!(|d| d))
732 }
733
734 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
761 where
762 T: IntoIterator<Item = U>,
763 {
764 self.flat_map_unordered(q!(|d| d))
765 }
766
767 pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
771 self,
772 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
773 ) -> Stream<U, L, B, O, R>
774 where
775 S: futures::Stream<Item = U>,
776 F: FnMut(T) -> S + 'a,
777 C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
778 Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
779 {
780 let f = crate::handoff_ref::with_ref_capture(|| {
781 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
782 proof.register_proof(&expr);
783 expr.into()
784 });
785 Stream::new(
786 self.location.clone(),
787 HydroNode::FlatMapStreamBlocking {
788 f,
789 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
790 metadata: self
791 .location
792 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
793 },
794 )
795 }
796
797 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
801 where
802 T: futures::Stream<Item = U>,
803 {
804 self.flat_map_stream_blocking(q!(|d| d))
805 }
806
807 pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
832 self,
833 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
834 ) -> Self
835 where
836 F: FnMut(&T) -> bool + 'a,
837 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
838 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
839 {
840 let f = crate::handoff_ref::with_ref_capture(|| {
841 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
842 proof.register_proof(&expr);
843 expr.into()
844 });
845 Stream::new(
846 self.location.clone(),
847 HydroNode::Filter {
848 f,
849 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
850 metadata: self.location.new_node_metadata(Self::collection_kind()),
851 },
852 )
853 }
854
855 pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
890 self,
891 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
892 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
893 where
894 F: FnMut(&T) -> bool + 'a,
895 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
896 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
897 {
898 let f = crate::handoff_ref::with_ref_capture(|| {
899 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
900 proof.register_proof(&expr);
901 expr.into()
902 });
903 let shared = SharedNode(Rc::new(RefCell::new(
904 self.ir_node.replace(HydroNode::Placeholder),
905 )));
906
907 let true_stream = Stream::new(
908 self.location.clone(),
909 HydroNode::Partition {
910 inner: SharedNode(shared.0.clone()),
911 f: f.clone(),
912 is_true: true,
913 metadata: self.location.new_node_metadata(Self::collection_kind()),
914 },
915 );
916
917 let false_stream = Stream::new(
918 self.location.clone(),
919 HydroNode::Partition {
920 inner: SharedNode(shared.0),
921 f,
922 is_true: false,
923 metadata: self.location.new_node_metadata(Self::collection_kind()),
924 },
925 );
926
927 (true_stream, false_stream)
928 }
929
930 pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
950 self,
951 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
952 ) -> Stream<U, L, B, O, R>
953 where
954 F: FnMut(T) -> Option<U> + 'a,
955 C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
956 Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
957 {
958 let f = crate::handoff_ref::with_ref_capture(|| {
959 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
960 proof.register_proof(&expr);
961 expr.into()
962 });
963 Stream::new(
964 self.location.clone(),
965 HydroNode::FilterMap {
966 f,
967 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
968 metadata: self
969 .location
970 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
971 },
972 )
973 }
974
975 pub fn cross_singleton<O2>(
1000 self,
1001 other: impl Into<Optional<O2, L, Bounded>>,
1002 ) -> Stream<(T, O2), L, B, O, R>
1003 where
1004 O2: Clone,
1005 {
1006 let other: Optional<O2, L, Bounded> = other.into();
1007 check_matching_location(&self.location, &other.location);
1008
1009 Stream::new(
1010 self.location.clone(),
1011 HydroNode::CrossSingleton {
1012 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1013 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1014 metadata: self
1015 .location
1016 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1017 },
1018 )
1019 }
1020
1021 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1053 self.cross_singleton(signal.filter(q!(|b| *b)))
1054 .map(q!(|(d, _)| d))
1055 }
1056
1057 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1092 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1093 self.filter_if(signal.is_some())
1094 }
1095
1096 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1131 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1132 self.filter_if(other.is_none())
1133 }
1134
1135 pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1160 self,
1161 other: Stream<T2, L, B2, O2, R2>,
1162 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1163 where
1164 T: Clone,
1165 T2: Clone,
1166 R: MinRetries<R2>,
1167 {
1168 self.map(q!(|v| ((), v)))
1169 .join(other.map(q!(|v| ((), v))))
1170 .map(q!(|((), (v1, v2))| (v1, v2)))
1171 }
1172
1173 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1192 where
1193 T: Eq + Hash,
1194 {
1195 Stream::new(
1196 self.location.clone(),
1197 HydroNode::Unique {
1198 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199 metadata: self
1200 .location
1201 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1202 },
1203 )
1204 }
1205
1206 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1232 where
1233 T: Eq + Hash,
1234 B2: IsBounded,
1235 {
1236 check_matching_location(&self.location, &other.location);
1237
1238 Stream::new(
1239 self.location.clone(),
1240 HydroNode::Difference {
1241 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1242 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1243 metadata: self
1244 .location
1245 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1246 },
1247 )
1248 }
1249
1250 pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1271 self,
1272 f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1273 ) -> Self
1274 where
1275 F: FnMut(&T) + 'a,
1276 C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1277 Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1278 {
1279 let f = crate::handoff_ref::with_ref_capture(|| {
1280 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1281 proof.register_proof(&expr);
1282 expr.into()
1283 });
1284
1285 Stream::new(
1286 self.location.clone(),
1287 HydroNode::Inspect {
1288 f,
1289 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1290 metadata: self.location.new_node_metadata(Self::collection_kind()),
1291 },
1292 )
1293 }
1294
1295 pub fn for_each<F: FnMut(T) + 'a, C, I>(
1311 self,
1312 f: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, I>>,
1313 ) where
1314 C: ValidCommutativityFor<O>,
1315 I: ValidIdempotenceFor<R>,
1316 {
1317 let f = crate::handoff_ref::with_ref_capture(|| {
1318 let (f, proof) = f.splice_fnmut1_ctx_props(&self.location);
1319 proof.register_proof(&f);
1320 f.into()
1321 });
1322 self.location
1323 .flow_state()
1324 .borrow_mut()
1325 .push_root(HydroRoot::ForEach {
1326 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1327 f,
1328 op_metadata: HydroIrOpMetadata::new(),
1329 });
1330 }
1331
1332 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1338 where
1339 O: IsOrdered,
1340 R: IsExactlyOnce,
1341 S: 'a + futures::Sink<T> + Unpin,
1342 {
1343 self.location
1344 .flow_state()
1345 .borrow_mut()
1346 .push_root(HydroRoot::DestSink {
1347 sink: sink.splice_typed_ctx(&self.location).into(),
1348 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1349 op_metadata: HydroIrOpMetadata::new(),
1350 });
1351 }
1352
1353 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1373 where
1374 O: IsOrdered,
1375 R: IsExactlyOnce,
1376 {
1377 Stream::new(
1378 self.location.clone(),
1379 HydroNode::Enumerate {
1380 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1381 metadata: self.location.new_node_metadata(Stream::<
1382 (usize, T),
1383 L,
1384 B,
1385 TotalOrder,
1386 ExactlyOnce,
1387 >::collection_kind()),
1388 },
1389 )
1390 }
1391
1392 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1416 self,
1417 init: impl IntoQuotedMut<'a, I, L>,
1418 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1419 ) -> Singleton<A, L, B2>
1420 where
1421 I: Fn() -> A + 'a,
1422 F: 'a + Fn(&mut A, T),
1423 C: ValidCommutativityFor<O>,
1424 Idemp: ValidIdempotenceFor<R>,
1425 B: ApplyMonotoneStream<M, B2>,
1426 {
1427 let init = init.splice_fn0_ctx(&self.location).into();
1428 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1429 proof.register_proof(&comb);
1430
1431 let nondet = nondet!();
1434 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1435
1436 let core = HydroNode::Fold {
1437 init,
1438 acc: comb.into(),
1439 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1440 metadata: retried
1441 .location
1442 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1443 };
1448
1449 Singleton::new(retried.location.clone(), core)
1450 .assert_has_consistency_of(manual_proof!())
1451 }
1452
1453 pub fn reduce<F, C, Idemp>(
1476 self,
1477 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1478 ) -> Optional<T, L, B>
1479 where
1480 F: Fn(&mut T, T) + 'a,
1481 C: ValidCommutativityFor<O>,
1482 Idemp: ValidIdempotenceFor<R>,
1483 {
1484 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1485 proof.register_proof(&f);
1486
1487 let nondet = nondet!();
1488 let ordered_etc: Stream<T, L::DropConsistency, B> =
1489 self.assume_retries(nondet).assume_ordering(nondet);
1490
1491 let core = HydroNode::Reduce {
1492 f: f.into(),
1493 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1494 metadata: ordered_etc
1495 .location
1496 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1497 };
1498
1499 Optional::new(ordered_etc.location.clone(), core)
1500 .assert_has_consistency_of(manual_proof!())
1501 }
1502
1503 pub fn max(self) -> Optional<T, L, B>
1523 where
1524 T: Ord,
1525 {
1526 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1527 .assume_ordering_trusted_bounded::<TotalOrder>(
1528 nondet!(),
1529 )
1530 .reduce(q!(|curr, new| {
1531 if new > *curr {
1532 *curr = new;
1533 }
1534 }))
1535 }
1536
1537 pub fn min(self) -> Optional<T, L, B>
1557 where
1558 T: Ord,
1559 {
1560 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1561 .assume_ordering_trusted_bounded::<TotalOrder>(
1562 nondet!(),
1563 )
1564 .reduce(q!(|curr, new| {
1565 if new < *curr {
1566 *curr = new;
1567 }
1568 }))
1569 }
1570
1571 pub fn first(self) -> Optional<T, L, B>
1594 where
1595 O: IsOrdered,
1596 {
1597 self.make_totally_ordered()
1598 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1599 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1600 .reduce(q!(|_, _| {}))
1601 }
1602
1603 pub fn last(self) -> Optional<T, L, B>
1626 where
1627 O: IsOrdered,
1628 {
1629 self.make_totally_ordered()
1630 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1631 .reduce(q!(|curr, new| *curr = new))
1632 }
1633
1634 pub fn limit(
1657 self,
1658 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1659 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1660 where
1661 O: IsOrdered,
1662 R: IsExactlyOnce,
1663 {
1664 self.generator(
1665 q!(|| 0usize),
1666 q!(move |count, item| {
1667 if *count == n {
1668 Generate::Break
1669 } else {
1670 *count += 1;
1671 if *count == n {
1672 Generate::Return(item)
1673 } else {
1674 Generate::Yield(item)
1675 }
1676 }
1677 }),
1678 )
1679 }
1680
1681 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1707 where
1708 O: IsOrdered,
1709 R: IsExactlyOnce,
1710 {
1711 self.make_totally_ordered().make_exactly_once().fold(
1712 q!(|| vec![]),
1713 q!(|acc, v| {
1714 acc.push(v);
1715 }),
1716 )
1717 }
1718
1719 pub fn scan<A, U, I, F>(
1780 self,
1781 init: impl IntoQuotedMut<'a, I, L>,
1782 f: impl IntoQuotedMut<'a, F, L>,
1783 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1784 where
1785 O: IsOrdered,
1786 R: IsExactlyOnce,
1787 I: Fn() -> A + 'a,
1788 F: Fn(&mut A, T) -> Option<U> + 'a,
1789 {
1790 let init = init.splice_fn0_ctx(&self.location).into();
1791 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1792
1793 Stream::new(
1794 self.location.clone(),
1795 HydroNode::Scan {
1796 init,
1797 acc: f,
1798 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1799 metadata: self.location.new_node_metadata(
1800 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1801 ),
1802 },
1803 )
1804 }
1805
1806 pub fn scan_async_blocking<A, U, I, F, Fut>(
1840 self,
1841 init: impl IntoQuotedMut<'a, I, L>,
1842 f: impl IntoQuotedMut<'a, F, L>,
1843 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1844 where
1845 O: IsOrdered,
1846 R: IsExactlyOnce,
1847 I: Fn() -> A + 'a,
1848 F: Fn(&mut A, T) -> Fut + 'a,
1849 Fut: Future<Output = Option<U>> + 'a,
1850 {
1851 let init = init.splice_fn0_ctx(&self.location).into();
1852 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1853
1854 Stream::new(
1855 self.location.clone(),
1856 HydroNode::ScanAsyncBlocking {
1857 init,
1858 acc: f,
1859 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1860 metadata: self.location.new_node_metadata(
1861 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1862 ),
1863 },
1864 )
1865 }
1866
1867 pub fn generator<A, U, I, F>(
1907 self,
1908 init: impl IntoQuotedMut<'a, I, L> + Copy,
1909 f: impl IntoQuotedMut<'a, F, L> + Copy,
1910 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1911 where
1912 O: IsOrdered,
1913 R: IsExactlyOnce,
1914 I: Fn() -> A + 'a,
1915 F: Fn(&mut A, T) -> Generate<U> + 'a,
1916 {
1917 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1918 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1919
1920 let this = self.make_totally_ordered().make_exactly_once();
1921
1922 let scan_init = q!(|| None)
1927 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1928 .into();
1929 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1930 if state.is_none() {
1931 *state = Some(Some(init()));
1932 }
1933 match state {
1934 Some(Some(state_value)) => match f(state_value, v) {
1935 Generate::Yield(out) => Some(Some(out)),
1936 Generate::Return(out) => {
1937 *state = Some(None);
1938 Some(Some(out))
1939 }
1940 Generate::Break => None,
1944 Generate::Continue => Some(None),
1945 },
1946 _ => None,
1948 }
1949 })
1950 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1951 .into();
1952
1953 let scan_node = HydroNode::Scan {
1954 init: scan_init,
1955 acc: scan_f,
1956 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1957 metadata: this.location.new_node_metadata(Stream::<
1958 Option<U>,
1959 L,
1960 B,
1961 TotalOrder,
1962 ExactlyOnce,
1963 >::collection_kind()),
1964 };
1965
1966 let flatten_f = q!(|d| d)
1967 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1968 .into();
1969 let flatten_node = HydroNode::FlatMap {
1970 f: flatten_f,
1971 input: Box::new(scan_node),
1972 metadata: this
1973 .location
1974 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1975 };
1976
1977 Stream::new(this.location.clone(), flatten_node)
1978 }
1979
1980 #[cfg(feature = "tokio")]
1989 pub fn sample_every(
1990 self,
1991 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1992 nondet: NonDet,
1993 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1994 where
1995 L: TopLevel<'a>,
1996 {
1997 let samples = self.location.source_interval(interval);
1998
1999 let tick = self.location.tick();
2000 self.batch(&tick, nondet)
2001 .filter_if(samples.batch(&tick, nondet).first().is_some())
2002 .all_ticks()
2003 .weaken_retries()
2004 }
2005
2006 #[cfg(feature = "tokio")]
2016 pub fn timeout(
2017 self,
2018 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2019 nondet: NonDet,
2020 ) -> Optional<(), L::DropConsistency, Unbounded>
2021 where
2022 L: TopLevel<'a>,
2023 {
2024 let tick = self.location.tick();
2025
2026 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2027 q!(|| None),
2028 q!(
2029 |latest, _| {
2030 *latest = Some(Instant::now());
2031 },
2032 commutative = manual_proof!()
2033 ),
2034 );
2035
2036 latest_received
2037 .snapshot(&tick, nondet)
2038 .filter_map(q!(move |latest_received| {
2039 if let Some(latest_received) = latest_received {
2040 if Instant::now().duration_since(latest_received) > duration {
2041 Some(())
2042 } else {
2043 None
2044 }
2045 } else {
2046 Some(())
2047 }
2048 }))
2049 .latest()
2050 }
2051
2052 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2058 let id = self.location.flow_state().borrow_mut().next_clock_id();
2059 let out_location = Atomic {
2060 tick: Tick {
2061 id,
2062 l: self.location.clone(),
2063 },
2064 };
2065 Stream::new(
2066 out_location.clone(),
2067 HydroNode::BeginAtomic {
2068 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2069 metadata: out_location
2070 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2071 },
2072 )
2073 }
2074
2075 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2083 self,
2084 tick: &Tick<L2>,
2085 _nondet: NonDet,
2086 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2087 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2088 Stream::new(
2089 tick.drop_consistency(),
2090 HydroNode::Batch {
2091 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2092 metadata: tick
2093 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2094 },
2095 )
2096 }
2097
2098 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2101 {
2102 let mut node = self.ir_node.borrow_mut();
2103 let metadata = node.metadata_mut();
2104 metadata.tag = Some(name.to_owned());
2105 }
2106 self
2107 }
2108
2109 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2113 where
2114 B: IsBounded,
2115 {
2116 Optional::new(
2117 self.location.clone(),
2118 HydroNode::Cast {
2119 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2120 metadata: self
2121 .location
2122 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2123 },
2124 )
2125 }
2126
2127 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2128 if O::ORDERING_KIND == O2::ORDERING_KIND {
2129 Stream::new(
2130 self.location.clone(),
2131 self.ir_node.replace(HydroNode::Placeholder),
2132 )
2133 } else {
2134 panic!(
2135 "Runtime ordering {:?} did not match requested cast {:?}.",
2136 O::ORDERING_KIND,
2137 O2::ORDERING_KIND
2138 )
2139 }
2140 }
2141
2142 pub fn assume_ordering<O2: Ordering>(
2151 self,
2152 _nondet: NonDet,
2153 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2154 if O::ORDERING_KIND == O2::ORDERING_KIND {
2155 self.use_ordering_type().weaken_consistency()
2156 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2157 let target_location = self.location().drop_consistency();
2159 Stream::new(
2160 target_location.clone(),
2161 HydroNode::Cast {
2162 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2163 metadata: target_location
2164 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2165 },
2166 )
2167 } else {
2168 let target_location = self.location().drop_consistency();
2169 Stream::new(
2170 target_location.clone(),
2171 HydroNode::ObserveNonDet {
2172 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2173 trusted: false,
2174 metadata: target_location
2175 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2176 },
2177 )
2178 }
2179 }
2180
2181 fn assume_ordering_trusted_bounded<O2: Ordering>(
2184 self,
2185 nondet: NonDet,
2186 ) -> Stream<T, L, B, O2, R> {
2187 if B::BOUNDED {
2188 self.assume_ordering_trusted(nondet)
2189 } else {
2190 let self_location = self.location.clone();
2191 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2192 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2193 }
2194 }
2195
2196 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2199 self,
2200 _nondet: NonDet,
2201 ) -> Stream<T, L, B, O2, R> {
2202 if O::ORDERING_KIND == O2::ORDERING_KIND {
2203 self.use_ordering_type()
2204 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2205 Stream::new(
2207 self.location.clone(),
2208 HydroNode::Cast {
2209 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2210 metadata: self
2211 .location
2212 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2213 },
2214 )
2215 } else {
2216 Stream::new(
2217 self.location.clone(),
2218 HydroNode::ObserveNonDet {
2219 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2220 trusted: true,
2221 metadata: self
2222 .location
2223 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2224 },
2225 )
2226 }
2227 }
2228
2229 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2230 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2233 self.weaken_ordering::<NoOrder>()
2234 }
2235
2236 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2239 let nondet = nondet!();
2240 self.assume_ordering_trusted::<O2>(nondet)
2241 }
2242
2243 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2246 where
2247 O: IsOrdered,
2248 {
2249 self.assume_ordering_trusted(nondet!())
2250 }
2251
2252 pub fn assume_retries<R2: Retries>(
2261 self,
2262 _nondet: NonDet,
2263 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2264 if R::RETRIES_KIND == R2::RETRIES_KIND {
2265 Stream::new(
2266 self.location.drop_consistency(),
2267 self.ir_node.replace(HydroNode::Placeholder),
2268 )
2269 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2270 let target_location = self.location.drop_consistency();
2272 Stream::new(
2273 target_location.clone(),
2274 HydroNode::Cast {
2275 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2276 metadata: target_location
2277 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2278 },
2279 )
2280 } else {
2281 let target_location = self.location.drop_consistency();
2282 Stream::new(
2283 target_location.clone(),
2284 HydroNode::ObserveNonDet {
2285 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2286 trusted: false,
2287 metadata: target_location
2288 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2289 },
2290 )
2291 }
2292 }
2293
2294 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2297 if R::RETRIES_KIND == R2::RETRIES_KIND {
2298 Stream::new(
2299 self.location.clone(),
2300 self.ir_node.replace(HydroNode::Placeholder),
2301 )
2302 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2303 Stream::new(
2305 self.location.clone(),
2306 HydroNode::Cast {
2307 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2308 metadata: self
2309 .location
2310 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2311 },
2312 )
2313 } else {
2314 Stream::new(
2315 self.location.clone(),
2316 HydroNode::ObserveNonDet {
2317 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2318 trusted: true,
2319 metadata: self
2320 .location
2321 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2322 },
2323 )
2324 }
2325 }
2326
2327 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2328 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2331 self.weaken_retries::<AtLeastOnce>()
2332 }
2333
2334 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2337 let nondet = nondet!();
2338 self.assume_retries_trusted::<R2>(nondet)
2339 }
2340
2341 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2344 where
2345 R: IsExactlyOnce,
2346 {
2347 self.assume_retries_trusted(nondet!())
2348 }
2349
2350 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2353 where
2354 B: IsBounded,
2355 {
2356 self.weaken_boundedness()
2357 }
2358
2359 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2362 if B::BOUNDED == B2::BOUNDED {
2363 Stream::new(
2364 self.location.clone(),
2365 self.ir_node.replace(HydroNode::Placeholder),
2366 )
2367 } else {
2368 Stream::new(
2370 self.location.clone(),
2371 HydroNode::Cast {
2372 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2373 metadata: self
2374 .location
2375 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2376 },
2377 )
2378 }
2379 }
2380}
2381
2382impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2383where
2384 L: Location<'a>,
2385{
2386 pub fn cloned(self) -> Stream<T, L, B, O, R>
2404 where
2405 T: Clone,
2406 {
2407 self.map(q!(|d| d.clone()))
2408 }
2409}
2410
2411impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2412where
2413 L: Location<'a>,
2414{
2415 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2434 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2435 ))
2437 .fold(
2438 q!(|| 0usize),
2439 q!(
2440 |count, _| *count += 1,
2441 monotone = manual_proof!()
2442 ),
2443 )
2444 }
2445}
2446
2447impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2448 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2472 self,
2473 other: Stream<T, L, Unbounded, O2, R2>,
2474 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2475 where
2476 R: MinRetries<R2>,
2477 {
2478 Stream::new(
2479 self.location.clone(),
2480 HydroNode::Chain {
2481 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2482 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2483 metadata: self.location.new_node_metadata(Stream::<
2484 T,
2485 L,
2486 Unbounded,
2487 NoOrder,
2488 <R as MinRetries<R2>>::Min,
2489 >::collection_kind()),
2490 },
2491 )
2492 }
2493
2494 #[deprecated(note = "use `merge_unordered` instead")]
2496 pub fn interleave<O2: Ordering, R2: Retries>(
2497 self,
2498 other: Stream<T, L, Unbounded, O2, R2>,
2499 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2500 where
2501 R: MinRetries<R2>,
2502 {
2503 self.merge_unordered(other)
2504 }
2505}
2506
2507impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2508 pub fn merge_ordered<R2: Retries>(
2536 self,
2537 other: Stream<T, L, B, TotalOrder, R2>,
2538 _nondet: NonDet,
2539 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2540 where
2541 R: MinRetries<R2>,
2542 {
2543 let target_location = self.location().drop_consistency();
2544 Stream::new(
2545 target_location.clone(),
2546 HydroNode::MergeOrdered {
2547 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2548 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2549 metadata: target_location.new_node_metadata(Stream::<
2550 T,
2551 L::DropConsistency,
2552 B,
2553 TotalOrder,
2554 <R as MinRetries<R2>>::Min,
2555 >::collection_kind()),
2556 },
2557 )
2558 }
2559}
2560
2561impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2562where
2563 L: Location<'a>,
2564{
2565 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2591 where
2592 B: IsBounded,
2593 T: Ord,
2594 {
2595 let this = self.make_bounded();
2596 Stream::new(
2597 this.location.clone(),
2598 HydroNode::Sort {
2599 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2600 metadata: this
2601 .location
2602 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2603 },
2604 )
2605 }
2606
2607 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2635 self,
2636 other: Stream<T, L, B2, O2, R2>,
2637 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2638 where
2639 B: IsBounded,
2640 O: MinOrder<O2>,
2641 R: MinRetries<R2>,
2642 {
2643 check_matching_location(&self.location, &other.location);
2644
2645 Stream::new(
2646 self.location.clone(),
2647 HydroNode::Chain {
2648 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2649 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2650 metadata: self.location.new_node_metadata(Stream::<
2651 T,
2652 L,
2653 B2,
2654 <O as MinOrder<O2>>::Min,
2655 <R as MinRetries<R2>>::Min,
2656 >::collection_kind()),
2657 },
2658 )
2659 }
2660
2661 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2665 self,
2666 other: Stream<T2, L, Bounded, O2, R2>,
2667 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2668 where
2669 B: IsBounded,
2670 T: Clone,
2671 T2: Clone,
2672 R: MinRetries<R2>,
2673 {
2674 let this = self.make_bounded();
2675 check_matching_location(&this.location, &other.location);
2676
2677 Stream::new(
2678 this.location.clone(),
2679 HydroNode::CrossProduct {
2680 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2681 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2682 metadata: this.location.new_node_metadata(Stream::<
2683 (T, T2),
2684 L,
2685 Bounded,
2686 <O2 as MinOrder<O>>::Min,
2687 <R as MinRetries<R2>>::Min,
2688 >::collection_kind()),
2689 },
2690 )
2691 }
2692
2693 pub fn repeat_with_keys<K, V2>(
2731 self,
2732 keys: KeyedSingleton<K, V2, L, Bounded>,
2733 ) -> KeyedStream<K, T, L, Bounded, O, R>
2734 where
2735 B: IsBounded,
2736 K: Clone,
2737 T: Clone,
2738 {
2739 keys.keys()
2740 .assume_ordering_trusted::<TotalOrder>(
2741 nondet!(),
2742 )
2743 .cross_product_nested_loop(self.make_bounded())
2744 .into_keyed()
2745 }
2746
2747 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2784 where
2785 T: Future,
2786 {
2787 Stream::new(
2788 self.location.clone(),
2789 HydroNode::ResolveFuturesBlocking {
2790 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2791 metadata: self
2792 .location
2793 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2794 },
2795 )
2796 }
2797
2798 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2818 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2819 where
2820 B: IsBounded,
2821 {
2822 self.make_bounded()
2823 .assume_ordering_trusted::<TotalOrder>(
2824 nondet!(),
2825 )
2826 .first()
2827 .is_none()
2828 }
2829}
2830
2831impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2832where
2833 L: Location<'a>,
2834{
2835 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2860 self,
2861 n: Stream<(K, V2), L, B2, O2, R2>,
2862 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2863 where
2864 K: Eq + Hash + Clone,
2865 R: MinRetries<R2>,
2866 V1: Clone,
2867 V2: Clone,
2868 {
2869 check_matching_location(&self.location, &n.location);
2870
2871 let ir_node = if B2::BOUNDED {
2872 HydroNode::JoinHalf {
2873 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2874 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2875 metadata: self.location.new_node_metadata(Stream::<
2876 (K, (V1, V2)),
2877 L,
2878 B,
2879 B2::PreserveOrderIfBounded<O>,
2880 <R as MinRetries<R2>>::Min,
2881 >::collection_kind()),
2882 }
2883 } else {
2884 HydroNode::Join {
2885 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2886 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2887 metadata: self.location.new_node_metadata(Stream::<
2888 (K, (V1, V2)),
2889 L,
2890 B,
2891 B2::PreserveOrderIfBounded<O>,
2892 <R as MinRetries<R2>>::Min,
2893 >::collection_kind()),
2894 }
2895 };
2896
2897 Stream::new(self.location.clone(), ir_node)
2898 }
2899
2900 pub fn anti_join<O2: Ordering, R2: Retries>(
2926 self,
2927 n: Stream<K, L, Bounded, O2, R2>,
2928 ) -> Stream<(K, V1), L, B, O, R>
2929 where
2930 K: Eq + Hash,
2931 {
2932 check_matching_location(&self.location, &n.location);
2933
2934 Stream::new(
2935 self.location.clone(),
2936 HydroNode::AntiJoin {
2937 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2938 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2939 metadata: self
2940 .location
2941 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2942 },
2943 )
2944 }
2945}
2946
2947impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2948 Stream<(K, V), L, B, O, R>
2949{
2950 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2977 KeyedStream::new(
2978 self.location.clone(),
2979 HydroNode::Cast {
2980 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2981 metadata: self
2982 .location
2983 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2984 },
2985 )
2986 }
2987}
2988
2989impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2990where
2991 K: Eq + Hash,
2992 L: Location<'a>,
2993{
2994 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3013 self.into_keyed()
3014 .fold(
3015 q!(|| ()),
3016 q!(
3017 |_, _| {},
3018 commutative = manual_proof!(),
3019 idempotent = manual_proof!()
3020 ),
3021 )
3022 .keys()
3023 }
3024}
3025
3026impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3027where
3028 L: Location<'a>,
3029{
3030 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3037 self,
3038 tick: &Tick<L2>,
3039 _nondet: NonDet,
3040 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3041 Stream::new(
3042 tick.drop_consistency(),
3043 HydroNode::Batch {
3044 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3045 metadata: tick
3046 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3047 },
3048 )
3049 }
3050
3051 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3054 Stream::new(
3055 self.location.tick.l.clone(),
3056 HydroNode::EndAtomic {
3057 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3058 metadata: self
3059 .location
3060 .tick
3061 .l
3062 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3063 },
3064 )
3065 }
3066}
3067
3068impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3069where
3070 L: TopLevel<'a>,
3071 F: Future<Output = T>,
3072{
3073 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3104 Stream::new(
3105 self.location.clone(),
3106 HydroNode::ResolveFutures {
3107 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3108 metadata: self
3109 .location
3110 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3111 },
3112 )
3113 }
3114
3115 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3146 Stream::new(
3147 self.location.clone(),
3148 HydroNode::ResolveFuturesOrdered {
3149 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3150 metadata: self
3151 .location
3152 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3153 },
3154 )
3155 }
3156}
3157
3158impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3159where
3160 L: Location<'a>,
3161{
3162 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3165 Stream::new(
3166 self.location.outer().clone(),
3167 HydroNode::YieldConcat {
3168 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3169 metadata: self
3170 .location
3171 .outer()
3172 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3173 },
3174 )
3175 }
3176
3177 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3184 let out_location = Atomic {
3185 tick: self.location.clone(),
3186 };
3187
3188 Stream::new(
3189 out_location.clone(),
3190 HydroNode::YieldConcat {
3191 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3192 metadata: out_location
3193 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3194 },
3195 )
3196 }
3197
3198 pub fn across_ticks<Out: BatchAtomic<'a>>(
3234 self,
3235 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3236 ) -> Out::Batched {
3237 thunk(self.all_ticks_atomic()).batched_atomic()
3238 }
3239
3240 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3279 Stream::new(
3280 self.location.clone(),
3281 HydroNode::DeferTick {
3282 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3283 metadata: self
3284 .location
3285 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3286 },
3287 )
3288 }
3289}
3290
3291#[cfg(test)]
3292mod tests {
3293 #[cfg(feature = "deploy")]
3294 use futures::{SinkExt, StreamExt};
3295 #[cfg(feature = "deploy")]
3296 use hydro_deploy::Deployment;
3297 #[cfg(feature = "deploy")]
3298 use serde::{Deserialize, Serialize};
3299 #[cfg(any(feature = "deploy", feature = "sim"))]
3300 use stageleft::q;
3301
3302 #[cfg(any(feature = "deploy", feature = "sim"))]
3303 use crate::compile::builder::FlowBuilder;
3304 #[cfg(feature = "deploy")]
3305 use crate::live_collections::sliced::sliced;
3306 #[cfg(feature = "deploy")]
3307 use crate::live_collections::stream::ExactlyOnce;
3308 #[cfg(feature = "sim")]
3309 use crate::live_collections::stream::NoOrder;
3310 #[cfg(any(feature = "deploy", feature = "sim"))]
3311 use crate::live_collections::stream::TotalOrder;
3312 #[cfg(any(feature = "deploy", feature = "sim"))]
3313 use crate::location::Location;
3314 #[cfg(feature = "sim")]
3315 use crate::networking::TCP;
3316 #[cfg(any(feature = "deploy", feature = "sim"))]
3317 use crate::nondet::nondet;
3318
3319 mod backtrace_chained_ops;
3320
3321 #[cfg(feature = "deploy")]
3322 struct P1 {}
3323 #[cfg(feature = "deploy")]
3324 struct P2 {}
3325
3326 #[cfg(feature = "deploy")]
3327 #[derive(Serialize, Deserialize, Debug)]
3328 struct SendOverNetwork {
3329 n: u32,
3330 }
3331
3332 #[cfg(feature = "deploy")]
3333 #[tokio::test]
3334 async fn first_ten_distributed() {
3335 use crate::networking::TCP;
3336
3337 let mut deployment = Deployment::new();
3338
3339 let mut flow = FlowBuilder::new();
3340 let first_node = flow.process::<P1>();
3341 let second_node = flow.process::<P2>();
3342 let external = flow.external::<P2>();
3343
3344 let numbers = first_node.source_iter(q!(0..10));
3345 let out_port = numbers
3346 .map(q!(|n| SendOverNetwork { n }))
3347 .send(&second_node, TCP.fail_stop().bincode())
3348 .send_bincode_external(&external);
3349
3350 let nodes = flow
3351 .with_process(&first_node, deployment.Localhost())
3352 .with_process(&second_node, deployment.Localhost())
3353 .with_external(&external, deployment.Localhost())
3354 .deploy(&mut deployment);
3355
3356 deployment.deploy().await.unwrap();
3357
3358 let mut external_out = nodes.connect(out_port).await;
3359
3360 deployment.start().await.unwrap();
3361
3362 for i in 0..10 {
3363 assert_eq!(external_out.next().await.unwrap().n, i);
3364 }
3365 }
3366
3367 #[cfg(feature = "deploy")]
3368 #[tokio::test]
3369 async fn first_cardinality() {
3370 let mut deployment = Deployment::new();
3371
3372 let mut flow = FlowBuilder::new();
3373 let node = flow.process::<()>();
3374 let external = flow.external::<()>();
3375
3376 let node_tick = node.tick();
3377 let count = node_tick
3378 .singleton(q!([1, 2, 3]))
3379 .into_stream()
3380 .flatten_ordered()
3381 .first()
3382 .into_stream()
3383 .count()
3384 .all_ticks()
3385 .send_bincode_external(&external);
3386
3387 let nodes = flow
3388 .with_process(&node, deployment.Localhost())
3389 .with_external(&external, deployment.Localhost())
3390 .deploy(&mut deployment);
3391
3392 deployment.deploy().await.unwrap();
3393
3394 let mut external_out = nodes.connect(count).await;
3395
3396 deployment.start().await.unwrap();
3397
3398 assert_eq!(external_out.next().await.unwrap(), 1);
3399 }
3400
3401 #[cfg(feature = "deploy")]
3402 #[tokio::test]
3403 async fn unbounded_reduce_remembers_state() {
3404 let mut deployment = Deployment::new();
3405
3406 let mut flow = FlowBuilder::new();
3407 let node = flow.process::<()>();
3408 let external = flow.external::<()>();
3409
3410 let (input_port, input) = node.source_external_bincode(&external);
3411 let out = input
3412 .reduce(q!(|acc, v| *acc += v))
3413 .sample_eager(nondet!())
3414 .send_bincode_external(&external);
3415
3416 let nodes = flow
3417 .with_process(&node, deployment.Localhost())
3418 .with_external(&external, deployment.Localhost())
3419 .deploy(&mut deployment);
3420
3421 deployment.deploy().await.unwrap();
3422
3423 let mut external_in = nodes.connect(input_port).await;
3424 let mut external_out = nodes.connect(out).await;
3425
3426 deployment.start().await.unwrap();
3427
3428 external_in.send(1).await.unwrap();
3429 assert_eq!(external_out.next().await.unwrap(), 1);
3430
3431 external_in.send(2).await.unwrap();
3432 assert_eq!(external_out.next().await.unwrap(), 3);
3433 }
3434
3435 #[cfg(feature = "deploy")]
3436 #[tokio::test]
3437 async fn top_level_bounded_cross_singleton() {
3438 let mut deployment = Deployment::new();
3439
3440 let mut flow = FlowBuilder::new();
3441 let node = flow.process::<()>();
3442 let external = flow.external::<()>();
3443
3444 let (input_port, input) =
3445 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3446
3447 let out = input
3448 .cross_singleton(
3449 node.source_iter(q!(vec![1, 2, 3]))
3450 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3451 )
3452 .send_bincode_external(&external);
3453
3454 let nodes = flow
3455 .with_process(&node, deployment.Localhost())
3456 .with_external(&external, deployment.Localhost())
3457 .deploy(&mut deployment);
3458
3459 deployment.deploy().await.unwrap();
3460
3461 let mut external_in = nodes.connect(input_port).await;
3462 let mut external_out = nodes.connect(out).await;
3463
3464 deployment.start().await.unwrap();
3465
3466 external_in.send(1).await.unwrap();
3467 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3468
3469 external_in.send(2).await.unwrap();
3470 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3471 }
3472
3473 #[cfg(feature = "deploy")]
3474 #[tokio::test]
3475 async fn top_level_bounded_reduce_cardinality() {
3476 let mut deployment = Deployment::new();
3477
3478 let mut flow = FlowBuilder::new();
3479 let node = flow.process::<()>();
3480 let external = flow.external::<()>();
3481
3482 let (input_port, input) =
3483 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3484
3485 let out = sliced! {
3486 let input = use(input, nondet!());
3487 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!());
3488 input.cross_singleton(v.into_stream().count())
3489 }
3490 .send_bincode_external(&external);
3491
3492 let nodes = flow
3493 .with_process(&node, deployment.Localhost())
3494 .with_external(&external, deployment.Localhost())
3495 .deploy(&mut deployment);
3496
3497 deployment.deploy().await.unwrap();
3498
3499 let mut external_in = nodes.connect(input_port).await;
3500 let mut external_out = nodes.connect(out).await;
3501
3502 deployment.start().await.unwrap();
3503
3504 external_in.send(1).await.unwrap();
3505 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3506
3507 external_in.send(2).await.unwrap();
3508 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3509 }
3510
3511 #[cfg(feature = "deploy")]
3512 #[tokio::test]
3513 async fn top_level_bounded_into_singleton_cardinality() {
3514 let mut deployment = Deployment::new();
3515
3516 let mut flow = FlowBuilder::new();
3517 let node = flow.process::<()>();
3518 let external = flow.external::<()>();
3519
3520 let (input_port, input) =
3521 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3522
3523 let out = sliced! {
3524 let input = use(input, nondet!());
3525 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!());
3526 input.cross_singleton(v.into_stream().count())
3527 }
3528 .send_bincode_external(&external);
3529
3530 let nodes = flow
3531 .with_process(&node, deployment.Localhost())
3532 .with_external(&external, deployment.Localhost())
3533 .deploy(&mut deployment);
3534
3535 deployment.deploy().await.unwrap();
3536
3537 let mut external_in = nodes.connect(input_port).await;
3538 let mut external_out = nodes.connect(out).await;
3539
3540 deployment.start().await.unwrap();
3541
3542 external_in.send(1).await.unwrap();
3543 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3544
3545 external_in.send(2).await.unwrap();
3546 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3547 }
3548
3549 #[cfg(feature = "deploy")]
3550 #[tokio::test]
3551 async fn atomic_fold_replays_each_tick() {
3552 let mut deployment = Deployment::new();
3553
3554 let mut flow = FlowBuilder::new();
3555 let node = flow.process::<()>();
3556 let external = flow.external::<()>();
3557
3558 let (input_port, input) =
3559 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3560 let tick = node.tick();
3561
3562 let out = input
3563 .batch(&tick, nondet!())
3564 .cross_singleton(
3565 node.source_iter(q!(vec![1, 2, 3]))
3566 .atomic()
3567 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3568 .snapshot_atomic(&tick, nondet!()),
3569 )
3570 .all_ticks()
3571 .send_bincode_external(&external);
3572
3573 let nodes = flow
3574 .with_process(&node, deployment.Localhost())
3575 .with_external(&external, deployment.Localhost())
3576 .deploy(&mut deployment);
3577
3578 deployment.deploy().await.unwrap();
3579
3580 let mut external_in = nodes.connect(input_port).await;
3581 let mut external_out = nodes.connect(out).await;
3582
3583 deployment.start().await.unwrap();
3584
3585 external_in.send(1).await.unwrap();
3586 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3587
3588 external_in.send(2).await.unwrap();
3589 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3590 }
3591
3592 #[cfg(feature = "deploy")]
3593 #[tokio::test]
3594 async fn unbounded_scan_remembers_state() {
3595 let mut deployment = Deployment::new();
3596
3597 let mut flow = FlowBuilder::new();
3598 let node = flow.process::<()>();
3599 let external = flow.external::<()>();
3600
3601 let (input_port, input) = node.source_external_bincode(&external);
3602 let out = input
3603 .scan(
3604 q!(|| 0),
3605 q!(|acc, v| {
3606 *acc += v;
3607 Some(*acc)
3608 }),
3609 )
3610 .send_bincode_external(&external);
3611
3612 let nodes = flow
3613 .with_process(&node, deployment.Localhost())
3614 .with_external(&external, deployment.Localhost())
3615 .deploy(&mut deployment);
3616
3617 deployment.deploy().await.unwrap();
3618
3619 let mut external_in = nodes.connect(input_port).await;
3620 let mut external_out = nodes.connect(out).await;
3621
3622 deployment.start().await.unwrap();
3623
3624 external_in.send(1).await.unwrap();
3625 assert_eq!(external_out.next().await.unwrap(), 1);
3626
3627 external_in.send(2).await.unwrap();
3628 assert_eq!(external_out.next().await.unwrap(), 3);
3629 }
3630
3631 #[cfg(feature = "deploy")]
3632 #[tokio::test]
3633 async fn unbounded_enumerate_remembers_state() {
3634 let mut deployment = Deployment::new();
3635
3636 let mut flow = FlowBuilder::new();
3637 let node = flow.process::<()>();
3638 let external = flow.external::<()>();
3639
3640 let (input_port, input) = node.source_external_bincode(&external);
3641 let out = input.enumerate().send_bincode_external(&external);
3642
3643 let nodes = flow
3644 .with_process(&node, deployment.Localhost())
3645 .with_external(&external, deployment.Localhost())
3646 .deploy(&mut deployment);
3647
3648 deployment.deploy().await.unwrap();
3649
3650 let mut external_in = nodes.connect(input_port).await;
3651 let mut external_out = nodes.connect(out).await;
3652
3653 deployment.start().await.unwrap();
3654
3655 external_in.send(1).await.unwrap();
3656 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3657
3658 external_in.send(2).await.unwrap();
3659 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3660 }
3661
3662 #[cfg(feature = "deploy")]
3663 #[tokio::test]
3664 async fn unbounded_unique_remembers_state() {
3665 let mut deployment = Deployment::new();
3666
3667 let mut flow = FlowBuilder::new();
3668 let node = flow.process::<()>();
3669 let external = flow.external::<()>();
3670
3671 let (input_port, input) =
3672 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3673 let out = input.unique().send_bincode_external(&external);
3674
3675 let nodes = flow
3676 .with_process(&node, deployment.Localhost())
3677 .with_external(&external, deployment.Localhost())
3678 .deploy(&mut deployment);
3679
3680 deployment.deploy().await.unwrap();
3681
3682 let mut external_in = nodes.connect(input_port).await;
3683 let mut external_out = nodes.connect(out).await;
3684
3685 deployment.start().await.unwrap();
3686
3687 external_in.send(1).await.unwrap();
3688 assert_eq!(external_out.next().await.unwrap(), 1);
3689
3690 external_in.send(2).await.unwrap();
3691 assert_eq!(external_out.next().await.unwrap(), 2);
3692
3693 external_in.send(1).await.unwrap();
3694 external_in.send(3).await.unwrap();
3695 assert_eq!(external_out.next().await.unwrap(), 3);
3696 }
3697
3698 #[cfg(feature = "sim")]
3699 #[test]
3700 #[should_panic]
3701 fn sim_batch_nondet_size() {
3702 let mut flow = FlowBuilder::new();
3703 let node = flow.process::<()>();
3704
3705 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3706
3707 let tick = node.tick();
3708 let out_recv = input
3709 .batch(&tick, nondet!())
3710 .count()
3711 .all_ticks()
3712 .sim_output();
3713
3714 flow.sim().exhaustive(async || {
3715 in_send.send(());
3716 in_send.send(());
3717 in_send.send(());
3718
3719 assert_eq!(out_recv.next().await.unwrap(), 3); });
3721 }
3722
3723 #[cfg(feature = "sim")]
3724 #[test]
3725 fn sim_batch_preserves_order() {
3726 let mut flow = FlowBuilder::new();
3727 let node = flow.process::<()>();
3728
3729 let (in_send, input) = node.sim_input();
3730
3731 let tick = node.tick();
3732 let out_recv = input
3733 .batch(&tick, nondet!())
3734 .all_ticks()
3735 .sim_output();
3736
3737 flow.sim().exhaustive(async || {
3738 in_send.send(1);
3739 in_send.send(2);
3740 in_send.send(3);
3741
3742 out_recv.assert_yields_only([1, 2, 3]).await;
3743 });
3744 }
3745
3746 #[cfg(feature = "sim")]
3747 #[test]
3748 #[should_panic]
3749 fn sim_batch_unordered_shuffles() {
3750 let mut flow = FlowBuilder::new();
3751 let node = flow.process::<()>();
3752
3753 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3754
3755 let tick = node.tick();
3756 let batch = input.batch(&tick, nondet!());
3757 let out_recv = batch
3758 .clone()
3759 .min()
3760 .zip(batch.max())
3761 .all_ticks()
3762 .sim_output();
3763
3764 flow.sim().exhaustive(async || {
3765 in_send.send_many_unordered([1, 2, 3]);
3766
3767 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3768 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3769 }
3770 });
3771 }
3772
3773 #[cfg(feature = "sim")]
3774 #[test]
3775 fn sim_batch_unordered_shuffles_count() {
3776 let mut flow = FlowBuilder::new();
3777 let node = flow.process::<()>();
3778
3779 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3780
3781 let tick = node.tick();
3782 let batch = input.batch(&tick, nondet!());
3783 let out_recv = batch.all_ticks().sim_output();
3784
3785 let instance_count = flow.sim().exhaustive(async || {
3786 in_send.send_many_unordered([1, 2, 3, 4]);
3787 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3788 });
3789
3790 assert_eq!(
3791 instance_count,
3792 75 )
3794 }
3795
3796 #[cfg(feature = "sim")]
3797 #[test]
3798 #[should_panic]
3799 fn sim_observe_order_batched() {
3800 let mut flow = FlowBuilder::new();
3801 let node = flow.process::<()>();
3802
3803 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3804
3805 let tick = node.tick();
3806 let batch = input.batch(&tick, nondet!());
3807 let out_recv = batch
3808 .assume_ordering::<TotalOrder>(nondet!())
3809 .all_ticks()
3810 .sim_output();
3811
3812 flow.sim().exhaustive(async || {
3813 in_send.send_many_unordered([1, 2, 3, 4]);
3814 out_recv.assert_yields_only([1, 2, 3, 4]).await; });
3816 }
3817
3818 #[cfg(feature = "sim")]
3819 #[test]
3820 fn sim_observe_order_batched_count() {
3821 let mut flow = FlowBuilder::new();
3822 let node = flow.process::<()>();
3823
3824 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3825
3826 let tick = node.tick();
3827 let batch = input.batch(&tick, nondet!());
3828 let out_recv = batch
3829 .assume_ordering::<TotalOrder>(nondet!())
3830 .all_ticks()
3831 .sim_output();
3832
3833 let instance_count = flow.sim().exhaustive(async || {
3834 in_send.send_many_unordered([1, 2, 3, 4]);
3835 let _ = out_recv.collect::<Vec<_>>().await;
3836 });
3837
3838 assert_eq!(
3839 instance_count,
3840 192 )
3842 }
3843
3844 #[cfg(feature = "sim")]
3845 #[test]
3846 fn sim_unordered_count_instance_count() {
3847 let mut flow = FlowBuilder::new();
3848 let node = flow.process::<()>();
3849
3850 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3851
3852 let tick = node.tick();
3853 let out_recv = input
3854 .count()
3855 .snapshot(&tick, nondet!())
3856 .all_ticks()
3857 .sim_output();
3858
3859 let instance_count = flow.sim().exhaustive(async || {
3860 in_send.send_many_unordered([1, 2, 3, 4]);
3861 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3862 });
3863
3864 assert_eq!(
3865 instance_count,
3866 16 )
3868 }
3869
3870 #[cfg(feature = "sim")]
3871 #[test]
3872 fn sim_top_level_assume_ordering() {
3873 let mut flow = FlowBuilder::new();
3874 let node = flow.process::<()>();
3875
3876 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3877
3878 let out_recv = input
3879 .assume_ordering::<TotalOrder>(nondet!())
3880 .sim_output();
3881
3882 let instance_count = flow.sim().exhaustive(async || {
3883 in_send.send_many_unordered([1, 2, 3]);
3884 let mut out = out_recv.collect::<Vec<_>>().await;
3885 out.sort();
3886 assert_eq!(out, vec![1, 2, 3]);
3887 });
3888
3889 assert_eq!(instance_count, 6)
3890 }
3891
3892 #[cfg(feature = "sim")]
3893 #[test]
3894 fn sim_top_level_assume_ordering_cycle_back() {
3895 let mut flow = FlowBuilder::new();
3896 let node = flow.process::<()>();
3897 let node2 = flow.process::<()>();
3898
3899 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3900
3901 let (complete_cycle_back, cycle_back) =
3902 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3903 let ordered = input
3904 .merge_unordered(cycle_back)
3905 .assume_ordering::<TotalOrder>(nondet!());
3906 complete_cycle_back.complete(
3907 ordered
3908 .clone()
3909 .map(q!(|v| v + 1))
3910 .filter(q!(|v| v % 2 == 1))
3911 .send(&node2, TCP.fail_stop().bincode())
3912 .send(&node, TCP.fail_stop().bincode()),
3913 );
3914
3915 let out_recv = ordered.sim_output();
3916
3917 let mut saw = false;
3918 let instance_count = flow.sim().exhaustive(async || {
3919 in_send.send_many_unordered([0, 2]);
3920 let out = out_recv.collect::<Vec<_>>().await;
3921
3922 if out.starts_with(&[0, 1, 2]) {
3923 saw = true;
3924 }
3925 });
3926
3927 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3928 assert_eq!(instance_count, 6);
3929 }
3930
3931 #[cfg(feature = "sim")]
3932 #[test]
3933 fn sim_top_level_assume_ordering_cycle_back_tick() {
3934 let mut flow = FlowBuilder::new();
3935 let node = flow.process::<()>();
3936 let node2 = flow.process::<()>();
3937
3938 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3939
3940 let (complete_cycle_back, cycle_back) =
3941 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3942 let ordered = input
3943 .merge_unordered(cycle_back)
3944 .assume_ordering::<TotalOrder>(nondet!());
3945 complete_cycle_back.complete(
3946 ordered
3947 .clone()
3948 .batch(&node.tick(), nondet!())
3949 .all_ticks()
3950 .map(q!(|v| v + 1))
3951 .filter(q!(|v| v % 2 == 1))
3952 .send(&node2, TCP.fail_stop().bincode())
3953 .send(&node, TCP.fail_stop().bincode()),
3954 );
3955
3956 let out_recv = ordered.sim_output();
3957
3958 let mut saw = false;
3959 let instance_count = flow.sim().exhaustive(async || {
3960 in_send.send_many_unordered([0, 2]);
3961 let out = out_recv.collect::<Vec<_>>().await;
3962
3963 if out.starts_with(&[0, 1, 2]) {
3964 saw = true;
3965 }
3966 });
3967
3968 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3969 assert_eq!(instance_count, 58);
3970 }
3971
3972 #[cfg(feature = "sim")]
3973 #[test]
3974 fn sim_top_level_assume_ordering_multiple() {
3975 let mut flow = FlowBuilder::new();
3976 let node = flow.process::<()>();
3977 let node2 = flow.process::<()>();
3978
3979 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3980 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3981
3982 let (complete_cycle_back, cycle_back) =
3983 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3984 let input1_ordered = input
3985 .clone()
3986 .merge_unordered(cycle_back)
3987 .assume_ordering::<TotalOrder>(nondet!());
3988 let foo = input1_ordered
3989 .clone()
3990 .map(q!(|v| v + 3))
3991 .weaken_ordering::<NoOrder>()
3992 .merge_unordered(input2)
3993 .assume_ordering::<TotalOrder>(nondet!());
3994
3995 complete_cycle_back.complete(
3996 foo.filter(q!(|v| *v == 3))
3997 .send(&node2, TCP.fail_stop().bincode())
3998 .send(&node, TCP.fail_stop().bincode()),
3999 );
4000
4001 let out_recv = input1_ordered.sim_output();
4002
4003 let mut saw = false;
4004 let instance_count = flow.sim().exhaustive(async || {
4005 in_send.send_many_unordered([0, 1]);
4006 let out = out_recv.collect::<Vec<_>>().await;
4007
4008 if out.starts_with(&[0, 3, 1]) {
4009 saw = true;
4010 }
4011 });
4012
4013 assert!(saw, "did not see an instance with 0, 3, 1 in order");
4014 assert_eq!(instance_count, 24);
4015 }
4016
4017 #[cfg(feature = "sim")]
4018 #[test]
4019 fn sim_atomic_assume_ordering_cycle_back() {
4020 let mut flow = FlowBuilder::new();
4021 let node = flow.process::<()>();
4022 let node2 = flow.process::<()>();
4023
4024 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4025
4026 let (complete_cycle_back, cycle_back) =
4027 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4028 let ordered = input
4029 .merge_unordered(cycle_back)
4030 .atomic()
4031 .assume_ordering::<TotalOrder>(nondet!())
4032 .end_atomic();
4033 complete_cycle_back.complete(
4034 ordered
4035 .clone()
4036 .map(q!(|v| v + 1))
4037 .filter(q!(|v| v % 2 == 1))
4038 .send(&node2, TCP.fail_stop().bincode())
4039 .send(&node, TCP.fail_stop().bincode()),
4040 );
4041
4042 let out_recv = ordered.sim_output();
4043
4044 let instance_count = flow.sim().exhaustive(async || {
4045 in_send.send_many_unordered([0, 2]);
4046 let out = out_recv.collect::<Vec<_>>().await;
4047 assert_eq!(out.len(), 4);
4048 });
4049 assert_eq!(instance_count, 22);
4050 }
4051
4052 #[cfg(feature = "deploy")]
4053 #[tokio::test]
4054 async fn partition_evens_odds() {
4055 let mut deployment = Deployment::new();
4056
4057 let mut flow = FlowBuilder::new();
4058 let node = flow.process::<()>();
4059 let external = flow.external::<()>();
4060
4061 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4062 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4063 let evens_port = evens.send_bincode_external(&external);
4064 let odds_port = odds.send_bincode_external(&external);
4065
4066 let nodes = flow
4067 .with_process(&node, deployment.Localhost())
4068 .with_external(&external, deployment.Localhost())
4069 .deploy(&mut deployment);
4070
4071 deployment.deploy().await.unwrap();
4072
4073 let mut evens_out = nodes.connect(evens_port).await;
4074 let mut odds_out = nodes.connect(odds_port).await;
4075
4076 deployment.start().await.unwrap();
4077
4078 let mut even_results = Vec::new();
4079 for _ in 0..3 {
4080 even_results.push(evens_out.next().await.unwrap());
4081 }
4082 even_results.sort();
4083 assert_eq!(even_results, vec![2, 4, 6]);
4084
4085 let mut odd_results = Vec::new();
4086 for _ in 0..3 {
4087 odd_results.push(odds_out.next().await.unwrap());
4088 }
4089 odd_results.sort();
4090 assert_eq!(odd_results, vec![1, 3, 5]);
4091 }
4092
4093 #[cfg(feature = "deploy")]
4094 #[tokio::test]
4095 async fn unconsumed_inspect_still_runs() {
4096 use crate::deploy::DeployCrateWrapper;
4097
4098 let mut deployment = Deployment::new();
4099
4100 let mut flow = FlowBuilder::new();
4101 let node = flow.process::<()>();
4102
4103 node.source_iter(q!(0..5))
4106 .inspect(q!(|x| println!("inspect: {}", x)));
4107
4108 let nodes = flow
4109 .with_process(&node, deployment.Localhost())
4110 .deploy(&mut deployment);
4111
4112 deployment.deploy().await.unwrap();
4113
4114 let mut stdout = nodes.get_process(&node).stdout();
4115
4116 deployment.start().await.unwrap();
4117
4118 let mut lines = Vec::new();
4119 for _ in 0..5 {
4120 lines.push(stdout.recv().await.unwrap());
4121 }
4122 lines.sort();
4123 assert_eq!(
4124 lines,
4125 vec![
4126 "inspect: 0",
4127 "inspect: 1",
4128 "inspect: 2",
4129 "inspect: 3",
4130 "inspect: 4",
4131 ]
4132 );
4133 }
4134
4135 #[cfg(feature = "sim")]
4136 #[test]
4137 fn sim_limit() {
4138 let mut flow = FlowBuilder::new();
4139 let node = flow.process::<()>();
4140
4141 let (in_send, input) = node.sim_input();
4142
4143 let out_recv = input.limit(q!(3)).sim_output();
4144
4145 flow.sim().exhaustive(async || {
4146 in_send.send(1);
4147 in_send.send(2);
4148 in_send.send(3);
4149 in_send.send(4);
4150 in_send.send(5);
4151
4152 out_recv.assert_yields_only([1, 2, 3]).await;
4153 });
4154 }
4155
4156 #[cfg(feature = "sim")]
4157 #[test]
4158 fn sim_limit_zero() {
4159 let mut flow = FlowBuilder::new();
4160 let node = flow.process::<()>();
4161
4162 let (in_send, input) = node.sim_input();
4163
4164 let out_recv = input.limit(q!(0)).sim_output();
4165
4166 flow.sim().exhaustive(async || {
4167 in_send.send(1);
4168 in_send.send(2);
4169
4170 out_recv.assert_yields_only::<i32, _>([]).await;
4171 });
4172 }
4173
4174 #[cfg(feature = "sim")]
4175 #[test]
4176 fn sim_merge_ordered() {
4177 let mut flow = FlowBuilder::new();
4178 let node = flow.process::<()>();
4179
4180 let (in_send, input) = node.sim_input();
4181 let (in_send2, input2) = node.sim_input();
4182
4183 let out_recv = input
4184 .merge_ordered(input2, nondet!())
4185 .sim_output();
4186
4187 let mut saw_out_of_order = false;
4188 let instances = flow.sim().exhaustive(async || {
4189 in_send.send(1);
4190 in_send.send(2);
4191 in_send2.send(3);
4192 in_send2.send(4);
4193
4194 let out = out_recv.collect::<Vec<_>>().await;
4195
4196 if out == [1, 3, 2, 4] {
4197 saw_out_of_order = true;
4198 }
4199
4200 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4203 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4204 assert_eq!(
4205 first_elements,
4206 vec![1, 2],
4207 "first input order violated: {:?}",
4208 out
4209 );
4210 assert_eq!(
4211 second_elements,
4212 vec![3, 4],
4213 "second input order violated: {:?}",
4214 out
4215 );
4216
4217 first_elements.append(&mut second_elements);
4218 first_elements.sort();
4219 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4220 });
4221
4222 assert!(saw_out_of_order);
4223 assert_eq!(instances, 6);
4224 }
4225
4226 #[cfg(feature = "sim")]
4229 #[test]
4230 fn sim_merge_ordered_one_empty() {
4231 let mut flow = FlowBuilder::new();
4232 let node = flow.process::<()>();
4233
4234 let (in_send, input) = node.sim_input();
4235 let (_in_send2, input2) = node.sim_input();
4236
4237 let out_recv = input
4238 .merge_ordered(input2, nondet!())
4239 .sim_output();
4240
4241 let instances = flow.sim().exhaustive(async || {
4242 in_send.send(1);
4243 in_send.send(2);
4244
4245 let out = out_recv.collect::<Vec<_>>().await;
4246 assert_eq!(out, vec![1, 2]);
4247 });
4248
4249 assert_eq!(instances, 1);
4251 }
4252
4253 #[cfg(feature = "sim")]
4259 #[test]
4260 fn sim_merge_ordered_cycle_back() {
4261 let mut flow = FlowBuilder::new();
4262 let node = flow.process::<()>();
4263
4264 let (in_send, input) = node.sim_input();
4265
4266 let (complete_cycle_back, cycle_back) =
4268 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4269
4270 let merged = input.merge_ordered(cycle_back, nondet!());
4272
4273 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4275
4276 let out_recv = merged.sim_output();
4277
4278 let mut saw_cycle_before_second = false;
4281 flow.sim().exhaustive(async || {
4282 in_send.send(1);
4283 in_send.send(2);
4284
4285 let out = out_recv.collect::<Vec<_>>().await;
4286
4287 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4289 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4290 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4291
4292 if out == [1, 10, 2] {
4294 saw_cycle_before_second = true;
4295 }
4296
4297 let mut sorted = out;
4298 sorted.sort();
4299 assert_eq!(sorted, vec![1, 2, 10]);
4300 });
4301
4302 assert!(
4303 saw_cycle_before_second,
4304 "never saw the cycled element arrive before the second input element"
4305 );
4306 }
4307
4308 #[cfg(feature = "sim")]
4312 #[test]
4313 fn sim_merge_ordered_delayed() {
4314 let mut flow = FlowBuilder::new();
4315 let node = flow.process::<()>();
4316
4317 let (in_send, input) = node.sim_input();
4318 let (in_send2, input2) = node.sim_input();
4319
4320 let out_recv = input
4321 .merge_ordered(input2, nondet!())
4322 .sim_output();
4323
4324 let mut saw_delayed_interleaving = false;
4325 flow.sim().exhaustive(async || {
4326 in_send.send(1);
4328 in_send2.send(3);
4329 in_send2.send(4);
4330
4331 let first_batch = out_recv.collect::<Vec<_>>().await;
4333
4334 in_send.send(2);
4336 let second_batch = out_recv.collect::<Vec<_>>().await;
4337
4338 let mut all: Vec<_> = first_batch
4339 .iter()
4340 .chain(second_batch.iter())
4341 .copied()
4342 .collect();
4343
4344 if all == [1, 3, 4, 2] {
4346 saw_delayed_interleaving = true;
4347 }
4348
4349 all.sort();
4350 assert_eq!(all, vec![1, 2, 3, 4]);
4351 });
4352
4353 assert!(saw_delayed_interleaving);
4354 }
4355
4356 #[cfg(feature = "deploy")]
4361 #[tokio::test]
4362 async fn deploy_merge_ordered_delayed() {
4363 let mut deployment = Deployment::new();
4364
4365 let mut flow = FlowBuilder::new();
4366 let node = flow.process::<()>();
4367 let external = flow.external::<()>();
4368
4369 let (input_a_port, input_a) = node.source_external_bincode(&external);
4370 let (input_b_port, input_b) = node.source_external_bincode(&external);
4371
4372 let out = input_a
4373 .assume_ordering(nondet!())
4374 .merge_ordered(
4375 input_b.assume_ordering(nondet!()),
4376 nondet!(),
4377 )
4378 .send_bincode_external(&external);
4379
4380 let nodes = flow
4381 .with_process(&node, deployment.Localhost())
4382 .with_external(&external, deployment.Localhost())
4383 .deploy(&mut deployment);
4384
4385 deployment.deploy().await.unwrap();
4386
4387 let mut ext_a = nodes.connect(input_a_port).await;
4388 let mut ext_b = nodes.connect(input_b_port).await;
4389 let mut ext_out = nodes.connect(out).await;
4390
4391 deployment.start().await.unwrap();
4392
4393 ext_a.send(1).await.unwrap();
4395 ext_b.send(3).await.unwrap();
4396 ext_b.send(4).await.unwrap();
4397
4398 let mut received = Vec::new();
4400 for _ in 0..3 {
4401 received.push(ext_out.next().await.unwrap());
4402 }
4403
4404 ext_a.send(2).await.unwrap();
4406 received.push(ext_out.next().await.unwrap());
4407
4408 received.sort();
4410 assert_eq!(received, vec![1, 2, 3, 4]);
4411 }
4412
4413 #[cfg(feature = "deploy")]
4414 #[tokio::test]
4415 async fn monotone_fold_threshold() {
4416 use crate::properties::manual_proof;
4417
4418 let mut deployment = Deployment::new();
4419
4420 let mut flow = FlowBuilder::new();
4421 let node = flow.process::<()>();
4422 let external = flow.external::<()>();
4423
4424 let in_unbounded: super::Stream<_, _> =
4425 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4426 let sum = in_unbounded.fold(
4427 q!(|| 0),
4428 q!(
4429 |sum, v| {
4430 *sum += v;
4431 },
4432 monotone = manual_proof!()
4433 ),
4434 );
4435
4436 let threshold_out = sum
4437 .threshold_greater_or_equal(node.singleton(q!(7)))
4438 .send_bincode_external(&external);
4439
4440 let nodes = flow
4441 .with_process(&node, deployment.Localhost())
4442 .with_external(&external, deployment.Localhost())
4443 .deploy(&mut deployment);
4444
4445 deployment.deploy().await.unwrap();
4446
4447 let mut threshold_out = nodes.connect(threshold_out).await;
4448
4449 deployment.start().await.unwrap();
4450
4451 assert_eq!(threshold_out.next().await.unwrap(), 7);
4452 }
4453
4454 #[cfg(feature = "deploy")]
4455 #[tokio::test]
4456 async fn monotone_count_threshold() {
4457 let mut deployment = Deployment::new();
4458
4459 let mut flow = FlowBuilder::new();
4460 let node = flow.process::<()>();
4461 let external = flow.external::<()>();
4462
4463 let in_unbounded: super::Stream<_, _> =
4464 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4465 let sum = in_unbounded.count();
4466
4467 let threshold_out = sum
4468 .threshold_greater_or_equal(node.singleton(q!(3)))
4469 .send_bincode_external(&external);
4470
4471 let nodes = flow
4472 .with_process(&node, deployment.Localhost())
4473 .with_external(&external, deployment.Localhost())
4474 .deploy(&mut deployment);
4475
4476 deployment.deploy().await.unwrap();
4477
4478 let mut threshold_out = nodes.connect(threshold_out).await;
4479
4480 deployment.start().await.unwrap();
4481
4482 assert_eq!(threshold_out.next().await.unwrap(), 3);
4483 }
4484
4485 #[cfg(feature = "deploy")]
4486 #[tokio::test]
4487 async fn monotone_map_order_preserving_threshold() {
4488 use crate::properties::manual_proof;
4489
4490 let mut deployment = Deployment::new();
4491
4492 let mut flow = FlowBuilder::new();
4493 let node = flow.process::<()>();
4494 let external = flow.external::<()>();
4495
4496 let in_unbounded: super::Stream<_, _> =
4497 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4498 let sum = in_unbounded.fold(
4499 q!(|| 0),
4500 q!(
4501 |sum, v| {
4502 *sum += v;
4503 },
4504 monotone = manual_proof!()
4505 ),
4506 );
4507
4508 let doubled = sum.map(q!(
4510 |v| v * 2,
4511 order_preserving = manual_proof!()
4512 ));
4513
4514 let threshold_out = doubled
4515 .threshold_greater_or_equal(node.singleton(q!(14)))
4516 .send_bincode_external(&external);
4517
4518 let nodes = flow
4519 .with_process(&node, deployment.Localhost())
4520 .with_external(&external, deployment.Localhost())
4521 .deploy(&mut deployment);
4522
4523 deployment.deploy().await.unwrap();
4524
4525 let mut threshold_out = nodes.connect(threshold_out).await;
4526
4527 deployment.start().await.unwrap();
4528
4529 assert_eq!(threshold_out.next().await.unwrap(), 14);
4530 }
4531
4532 #[cfg(any(feature = "deploy", feature = "sim"))]
4535 mod join_ordering_type_tests {
4536 use crate::live_collections::boundedness::{Bounded, Unbounded};
4537 use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4538 use crate::location::{Location, Process};
4539
4540 #[expect(dead_code, reason = "compile-time type test")]
4541 fn join_unbounded_with_bounded_preserves_order<'a>(
4542 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4543 right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4544 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4545 left.join(right)
4546 }
4547
4548 #[expect(dead_code, reason = "compile-time type test")]
4549 fn join_unbounded_with_unbounded_is_no_order<'a>(
4550 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4551 right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4552 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4553 left.join(right)
4554 }
4555
4556 #[expect(dead_code, reason = "compile-time type test")]
4557 fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4558 left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4559 right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4560 ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4561 left.join(right)
4562 }
4563
4564 #[expect(dead_code, reason = "compile-time type test")]
4565 fn join_unbounded_noorder_with_bounded<'a>(
4566 left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4567 right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4568 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4569 left.join(right)
4570 }
4571
4572 #[expect(dead_code, reason = "compile-time type test")]
4575 fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4576 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4577 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4578 ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4579 left.cross_product(right)
4580 }
4581
4582 #[expect(dead_code, reason = "compile-time type test")]
4583 fn cross_product_bounded_with_bounded_preserves_order<'a>(
4584 left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4585 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4586 ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4587 left.cross_product(right)
4588 }
4589
4590 #[expect(dead_code, reason = "compile-time type test")]
4591 fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4592 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4593 right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4594 ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4595 left.cross_product(right)
4596 }
4597 } #[cfg(feature = "sim")]
4602 #[test]
4603 fn cross_product_mixed_boundedness_correctness() {
4604 use stageleft::q;
4605
4606 use crate::compile::builder::FlowBuilder;
4607 use crate::nondet::nondet;
4608
4609 let mut flow = FlowBuilder::new();
4610 let process = flow.process::<()>();
4611 let tick = process.tick();
4612
4613 let left = process.source_iter(q!(vec![1, 2]));
4614 let right = process
4615 .source_iter(q!(vec!['a', 'b']))
4616 .batch(&tick, nondet!())
4617 .all_ticks();
4618
4619 let out = left.cross_product(right).sim_output();
4620
4621 flow.sim().exhaustive(async || {
4622 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4623 .await;
4624 });
4625 }
4626
4627 #[cfg(feature = "sim")]
4628 #[test]
4629 fn join_mixed_boundedness_correctness() {
4630 use stageleft::q;
4631
4632 use crate::compile::builder::FlowBuilder;
4633 use crate::nondet::nondet;
4634
4635 let mut flow = FlowBuilder::new();
4636 let process = flow.process::<()>();
4637 let tick = process.tick();
4638
4639 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4640 let right = process
4641 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4642 .batch(&tick, nondet!())
4643 .all_ticks();
4644
4645 let out = left.join(right).sim_output();
4646
4647 flow.sim().exhaustive(async || {
4648 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4649 .await;
4650 });
4651 }
4652
4653 #[cfg(feature = "sim")]
4654 #[test]
4655 fn sim_merge_unordered_independent_atomics() {
4656 let mut flow = FlowBuilder::new();
4657 let node = flow.process::<()>();
4658
4659 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4660 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4661
4662 let out = input1
4663 .atomic()
4664 .merge_unordered(input2.atomic())
4665 .end_atomic()
4666 .sim_output();
4667
4668 flow.sim().exhaustive(async || {
4669 in1_send.send(1);
4670 in2_send.send(2);
4671
4672 out.assert_yields_only_unordered(vec![1, 2]).await;
4673 });
4674 }
4675
4676 #[cfg(feature = "deploy")]
4677 #[tokio::test]
4678 async fn test_stream_ref() {
4679 let mut deployment = Deployment::new();
4680
4681 let mut flow = FlowBuilder::new();
4682 let external = flow.external::<()>();
4683 let p1 = flow.process::<()>();
4684
4685 let my_stream = p1.source_iter(q!(1..=5i32));
4687
4688 let stream_ref = my_stream.by_ref();
4689
4690 let out_port = p1
4692 .source_iter(q!([()]))
4693 .map(q!(|_| stream_ref.len() as i32))
4694 .send_bincode_external(&external);
4695
4696 my_stream.for_each(q!(|_| {}));
4698
4699 let nodes = flow
4700 .with_default_optimize()
4701 .with_process(&p1, deployment.Localhost())
4702 .with_external(&external, deployment.Localhost())
4703 .deploy(&mut deployment);
4704
4705 deployment.deploy().await.unwrap();
4706
4707 let mut out_recv = nodes.connect(out_port).await;
4708
4709 deployment.start().await.unwrap();
4710
4711 let result = out_recv.next().await.unwrap();
4712 assert_eq!(result, 5);
4714 }
4715
4716 #[cfg(feature = "deploy")]
4717 #[tokio::test]
4718 async fn test_stream_ref_contents() {
4719 let mut deployment = Deployment::new();
4720
4721 let mut flow = FlowBuilder::new();
4722 let external = flow.external::<()>();
4723 let p1 = flow.process::<()>();
4724
4725 let my_stream = p1.source_iter(q!(1..=3i32));
4727
4728 let stream_ref = my_stream.by_ref();
4729
4730 let out_port = p1
4732 .source_iter(q!([()]))
4733 .map(q!(|_| stream_ref.iter().sum::<i32>()))
4734 .send_bincode_external(&external);
4735
4736 my_stream.for_each(q!(|_| {}));
4737
4738 let nodes = flow
4739 .with_default_optimize()
4740 .with_process(&p1, deployment.Localhost())
4741 .with_external(&external, deployment.Localhost())
4742 .deploy(&mut deployment);
4743
4744 deployment.deploy().await.unwrap();
4745
4746 let mut out_recv = nodes.connect(out_port).await;
4747
4748 deployment.start().await.unwrap();
4749
4750 let result = out_recv.next().await.unwrap();
4751 assert_eq!(result, 6);
4753 }
4754
4755 #[cfg(feature = "deploy")]
4756 #[tokio::test]
4757 async fn test_stream_ref_no_consumer() {
4758 let mut deployment = Deployment::new();
4759
4760 let mut flow = FlowBuilder::new();
4761 let external = flow.external::<()>();
4762 let p1 = flow.process::<()>();
4763
4764 let my_stream = p1.source_iter(q!(1..=4i32));
4766
4767 let stream_ref = my_stream.by_ref();
4768
4769 let out_port = p1
4770 .source_iter(q!([()]))
4771 .map(q!(|_| stream_ref.len() as i32))
4772 .send_bincode_external(&external);
4773
4774 let nodes = flow
4775 .with_default_optimize()
4776 .with_process(&p1, deployment.Localhost())
4777 .with_external(&external, deployment.Localhost())
4778 .deploy(&mut deployment);
4779
4780 deployment.deploy().await.unwrap();
4781
4782 let mut out_recv = nodes.connect(out_port).await;
4783
4784 deployment.start().await.unwrap();
4785
4786 let result = out_recv.next().await.unwrap();
4787 assert_eq!(result, 4);
4788 }
4789
4790 #[cfg(feature = "deploy")]
4791 #[tokio::test]
4792 async fn test_stream_mut() {
4793 let mut deployment = Deployment::new();
4794
4795 let mut flow = FlowBuilder::new();
4796 let external = flow.external::<()>();
4797 let p1 = flow.process::<()>();
4798
4799 let my_stream = p1.source_iter(q!(1..=5i32));
4801
4802 let stream_mut = my_stream.by_mut();
4803
4804 let out_port = p1
4806 .source_iter(q!([()]))
4807 .map(q!(|_| {
4808 stream_mut.retain(|x| *x > 3);
4809 stream_mut.len() as i32
4810 }))
4811 .send_bincode_external(&external);
4812
4813 my_stream.for_each(q!(|_| {}));
4814
4815 let nodes = flow
4816 .with_default_optimize()
4817 .with_process(&p1, deployment.Localhost())
4818 .with_external(&external, deployment.Localhost())
4819 .deploy(&mut deployment);
4820
4821 deployment.deploy().await.unwrap();
4822
4823 let mut out_recv = nodes.connect(out_port).await;
4824
4825 deployment.start().await.unwrap();
4826
4827 let result = out_recv.next().await.unwrap();
4828 assert_eq!(result, 2);
4830 }
4831
4832 #[cfg(feature = "sim")]
4836 #[test]
4837 fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4838 use crate::live_collections::sliced::sliced;
4839 use crate::live_collections::stream::ExactlyOnce;
4840 use crate::properties::manual_proof;
4841
4842 let mut flow = FlowBuilder::new();
4843 let node = flow.process::<()>();
4844
4845 let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4846
4847 let out_recv = sliced! {
4848 let batch = use(trigger, nondet!());
4849 let counter = batch.location().source_iter(q!(vec![0i32]))
4850 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4851 let counter_mut = counter.by_mut();
4852 let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4853 items.map(q!(
4854 |x| {
4855 *counter_mut += x;
4856 *counter_mut
4857 },
4858 commutative = manual_proof!()
4859 ))
4860 }
4861 .sim_output();
4862
4863 let count = flow.sim().exhaustive(async || {
4864 trigger_send.send(1);
4865 let _all: Vec<i32> = out_recv.collect_sorted().await;
4866 });
4867
4868 assert_eq!(
4869 count, 2,
4870 "Expected 2 simulation instances due to mut on unordered input, got {}",
4871 count
4872 );
4873 }
4874
4875 #[cfg(feature = "sim")]
4879 #[test]
4880 #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4881 fn sim_map_with_mut_on_unordered_top_level() {
4882 use crate::properties::manual_proof;
4883
4884 let mut flow = FlowBuilder::new();
4885 let node = flow.process::<()>();
4886
4887 let counter = node
4888 .source_iter(q!(vec![0i32]))
4889 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4890 let counter_mut = counter.by_mut();
4891
4892 let out_recv = node
4893 .source_iter(q!(vec![1i32, 2]))
4894 .weaken_ordering::<NoOrder>()
4895 .map(q!(
4896 |x| {
4897 *counter_mut += x;
4898 *counter_mut
4899 },
4900 commutative = manual_proof!()
4901 ))
4902 .assume_ordering::<TotalOrder>(nondet!())
4903 .sim_output();
4904
4905 counter.into_stream().for_each(q!(|_| {}));
4906
4907 let count = flow.sim().exhaustive(async || {
4908 let _all: Vec<i32> = out_recv.collect().await;
4909 });
4910
4911 assert_eq!(
4912 count, 2,
4913 "Expected 2 simulation instances due to mut on unordered input, got {}",
4914 count
4915 );
4916 }
4917}