Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty streams #2684
gmethvin wants to merge 1 commit into apache:main
Thanks @gmethvin, this seems reasonable. Can you use this source header on new files? https://github.com/apache/pekko/blob/main/project/AddMetaInfLicenseFiles.scala#L1-L16 Can you add
So FYI, the context behind Hence the reason why
I'll have a look at the PR in more detail later, but at the risk of bikeshedding
@gmethvin The tests are failing
It's hard to come up with a snappy name here. I don't have a strong opinion. Maybe this pretty literal naming might work.
Are we considering a broader renaming of
One possibility is to introduce a boolean parameter on the futureSink method to say whether we want to support empty streams. In 1.x, we can default it to false, and in 2.x we can change the default to true. In 2.x we can choose to make a major change, but if we want to get this into 1.5.0, we need a low-impact change. Maybe we could focus on a 1.x-friendly change first and then discuss whether we want to make a bigger change in 2.x.
You'd need to overload the method with the extra param. Just adding a new param with a default value would change the method signature. If you have to add the new method anyway I feel like an explicitly-named method is better. Also I feel like the lazy/eager semantics are important to communicate in the naming, since that's what the user cares about when choosing the operator. Otherwise they would just always choose the version that can handle empty streams.
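To make the signature point concrete, here is a minimal sketch using plain Scala and JVM reflection. The objects `SinkV1`, `SinkV2Default`, and `SinkV2Overload` are hypothetical stand-ins that return strings; they are not Pekko's `Sink`, and `supportEmpty` is an assumed parameter name taken from the suggestion above.

```scala
object BinaryCompatSketch {
  // Hypothetical 1.x shape: a single one-argument method.
  object SinkV1 {
    def futureSink(name: String): String = s"lazy($name)"
  }

  // Variant A: add a parameter with a default value.
  // Source code calling futureSink("x") still compiles, but the compiled
  // one-argument method is replaced by a two-argument one.
  object SinkV2Default {
    def futureSink(name: String, supportEmpty: Boolean = false): String =
      if (supportEmpty) s"eager($name)" else s"lazy($name)"
  }

  // Variant B: keep the old signature and add an overload beside it.
  object SinkV2Overload {
    def futureSink(name: String): String = futureSink(name, supportEmpty = false)
    def futureSink(name: String, supportEmpty: Boolean): String =
      if (supportEmpty) s"eager($name)" else s"lazy($name)"
  }

  // Parameter counts of every public method named exactly "futureSink".
  def arities(obj: AnyRef): Set[Int] =
    obj.getClass.getMethods
      .filter(_.getName == "futureSink")
      .map(_.getParameterCount)
      .toSet
}
```

With variant A the one-argument method disappears from the bytecode, so already-compiled callers (and Java callers, which do not see Scala defaults) would break. Variant B keeps it, at which point a separately named method costs no more than the overload.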
Absolutely, that's why I proposed a new name. In 2.x maybe we could just remove
Pull request overview
Adds an “eager” variant of futureSink / completionStageSink to avoid NeverMaterializedException when the upstream completes without emitting any elements (notably for integrations that must handle empty request bodies).
Changes:
- Introduces `Sink.eagerFutureSink` (Scala) and `Sink.eagerCompletionStageSink` (Java) APIs.
- Implements a new internal `EagerFutureSink` GraphStage and wires default stage attributes.
- Adds Scala test coverage and updates operator docs/index + cross-links from existing lazy/future sink docs.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala | Adds the Scala DSL eagerFutureSink factory + scaladoc. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala | Adds the Java DSL eagerCompletionStageSink factory + Javadoc. |
| stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala | Registers default attributes name for the new stage. |
| stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala | Adds the internal EagerFutureSink GraphStage implementation. |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala | Adds behavioral tests for empty/non-empty, late future completion, and failure propagation. |
| stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala | Ensures Scala/Java DSL factory consistency coverage includes the new API. |
| docs/src/main/paradox/stream/operators/index.md | Adds the new operators to the docs index and operator list. |
| docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md | Cross-links to eager variant for empty-stream behavior. |
| docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md | Cross-links to eager variant for empty-stream behavior. |
| docs/src/main/paradox/stream/operators/Sink/futureSink.md | Documents futureSink’s lazy semantics and points to eager alternative. |
| docs/src/main/paradox/stream/operators/Sink/completionStageSink.md | Documents completionStageSink’s lazy semantics and points to eager alternative. |
| docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md | New operator page for Scala eager sink. |
| docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md | New operator page for Java eager sink. |
So it is to be said that at least in pekko-streams there are 2 overloaded definitions of "lazy". One is an anonymous function that returns a future, with the anonymous function being deliberate as the future should only be executed at the specified time, hence the "lazy". The other definition is, as you said, deferring the materialization. There are 2 options we have here. One is that this change only goes into Pekko 2.x, where we reuse the The other option is that, as you said, we use a new I have a slight preference for the former, although admittedly it would require a bit more work.
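The two meanings of "lazy" can be separated with a small experiment on an empty stream. This is a sketch that assumes the current behaviour described in this PR (both `Sink.futureSink` and `Sink.lazyFutureSink` defer materialization to the first element); `makeSink` and the counters are hypothetical helpers added for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object LazyMeaningsSketch {
  // Returns (factory calls via futureSink, factory calls via lazyFutureSink).
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("lazy-meanings-sketch")
    val byValueCalls = new AtomicInteger(0)
    val byFunctionCalls = new AtomicInteger(0)

    // Hypothetical helper: counts how often a sink is requested.
    def makeSink(counter: AtomicInteger): Future[Sink[Int, Future[Done]]] = {
      counter.incrementAndGet()
      Future.successful(Sink.ignore)
    }

    try {
      // The Future is an ordinary argument, so makeSink runs while the
      // blueprint is built, before the stream is even started.
      val a = Source.empty[Int]
        .toMat(Sink.futureSink(makeSink(byValueCalls)))(Keep.right)
        .run()

      // The function is only invoked when the first element arrives,
      // which never happens on an empty stream.
      val b = Source.empty[Int]
        .toMat(Sink.lazyFutureSink(() => makeSink(byFunctionCalls)))(Keep.right)
        .run()

      // Both materialized futures fail; Await.ready waits without rethrowing.
      Await.ready(a, 3.seconds)
      Await.ready(b, 3.seconds)
      (byValueCalls.get(), byFunctionCalls.get())
    } finally system.terminate()
  }
}
```

Under those assumptions `run()` should return `(1, 0)`: the first sense of "lazy" (deferring creation of the future) applies only to `lazyFutureSink`, while the second sense (deferring materialization of the inner sink) applies to both.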
I'm open to either approach. The naming consistency of reclaiming

For value-producing sinks, the eager approach is arguably what the user wants anyway:

```scala
// Sink.seq on an empty stream
Source.empty[Int]
  .toMat(Sink.futureSink(Future.successful(Sink.seq[Int])))(Keep.right)
  .run().flatten
// Lazy: Future fails with NeverMaterializedException
// Eager: Future succeeds with Seq.empty

// Sink.fold on an empty stream
Source.empty[Int]
  .toMat(Sink.futureSink(Future.successful(Sink.fold[Int, Int](0)(_ + _))))(Keep.right)
  .run().flatten
// Lazy: NeverMaterializedException
// Eager: Future.successful(0)
```

I would expect that these users are already handling the

But for side-effecting sinks, eager evaluation changes observable behavior:

```scala
// FileIO sink on an empty stream
Source.empty[ByteString]
  .toMat(Sink.futureSink(Future.successful(FileIO.toPath(path))))(Keep.right)
  .run().flatten
// Lazy: no file touched, NeverMaterializedException
// Eager: empty file created on disk, IOResult(0, Success(Done))
```

Anyone using

The question is how likely it is that someone depends on this difference and also ignores both the 1.x deprecation warning and the 2.x migration guide. My guess is this is rare but not zero.
`Sink.futureSink` delegates to `lazyFutureSink`/`LazySink`, which defers inner sink materialization until the first element arrives. If upstream completes with no elements, the materialized `Future[M]` fails with `NeverMaterializedException`. This is problematic when a `Future[Sink]` needs to handle potentially empty streams (e.g. Play Framework's `Accumulator.flatten` for bodyless HTTP requests, see playframework/playframework#13793).

This PR adds `EagerFutureSink`, which materializes the inner sink when the future resolves rather than on the first element:

- `Sink.eagerFutureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]]`
- `Sink.eagerCompletionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]]`

I'm not sure if it was a deliberate choice to make `futureSink` delegate to `lazyFutureSink`, but in 2.0, `futureSink`/`completionStageSink` could be changed to delegate to `EagerFutureSink` instead, since `lazyFutureSink`/`lazyCompletionStageSink` already exist for users who want first-element-triggered materialization.
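For reference, a minimal sketch of the behaviour described above, together with the kind of workaround callers need today. It uses only the existing `Sink.futureSink` API; the object name and the choice to map the failure to an empty `Seq` are illustrative assumptions, not part of this PR.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.NeverMaterializedException
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object EmptyStreamSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("empty-stream-sketch")
    import system.dispatcher // ExecutionContext for flatten and recover

    try {
      val futureOfSink: Future[Sink[Int, Future[Seq[Int]]]] =
        Future.successful(Sink.seq[Int])

      val result: Future[Seq[Int]] =
        Source.empty[Int]
          .toMat(Sink.futureSink(futureOfSink))(Keep.right)
          .run()
          .flatten
          // Upstream completed before any element arrived, so the inner
          // sink was never materialized; treat that as an empty result.
          .recover { case _: NeverMaterializedException => Seq.empty[Int] }

      Await.result(result, 3.seconds)
    } finally system.terminate()
  }
}
```

With the proposed `Sink.eagerFutureSink` in place of `Sink.futureSink`, the `recover` step would presumably become unnecessary for this case, since the inner `Sink.seq` would be materialized and complete with an empty sequence on its own.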