Skip to content

Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty streams#2684

Open
gmethvin wants to merge 1 commit intoapache:mainfrom
gmethvin:eager-future-sink
Open

Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty streams#2684
gmethvin wants to merge 1 commit intoapache:mainfrom
gmethvin:eager-future-sink

Conversation

@gmethvin
Copy link
Member

Sink.futureSink delegates to lazyFutureSink/LazySink, which defers inner sink materialization until the first element arrives. If upstream completes with no elements, the materialized Future[M] fails with NeverMaterializedException. This is problematic when a Future[Sink] needs to handle potentially empty streams (e.g. Play Framework's Accumulator.flatten for bodyless HTTP requests, see playframework/playframework#13793).

This PR adds EagerFutureSink, which materializes the inner sink when the future resolves rather than on the first element:

  • Scala: Sink.eagerFutureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]]
  • Java: Sink.eagerCompletionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]]

I'm not sure if it was a deliberate choice to make futureSink delegate to lazyFutureSink, but in 2.0, futureSink/completionStageSink could be changed to delegate to EagerFutureSink instead, since lazyFutureSink/lazyCompletionStageSink already exist for users who want first-element-triggered materialization.

@gmethvin gmethvin changed the title Add Sink.eagerFutureSink to prevent NeverMaterializedException on empty streams Add Sink.eagerFutureSink to support empty streams without NeverMaterializedException Feb 26, 2026
@gmethvin gmethvin changed the title Add Sink.eagerFutureSink to support empty streams without NeverMaterializedException Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty streams Feb 26, 2026
@pjfanning
Copy link
Member

Thanks @gmethvin, this seems reasonable.

Can you use this source header on new files? https://github.com/apache/pekko/blob/main/project/AddMetaInfLicenseFiles.scala#L1-L16

Can you add @since 1.5.0 on the new public API methods?

@mdedetrich
Copy link
Contributor

mdedetrich commented Feb 26, 2026

Sink.futureSink delegates to lazyFutureSink/LazySink, which defers inner sink materialization until the first element arrives. If upstream completes with no elements, the materialized Future[M] fails with NeverMaterializedException. This is problematic when a Future[Sink] needs to handle potentially empty streams (e.g. Play Framework's Accumulator.flatten for bodyless HTTP requests, see playframework/playframework#13793).

So FYI, the context behind Sink.futureSink is that its meant to be used in cases where you have a Sink that outputs a single value (lets say a File based Sink that returns a Future[IOResult]) and you want to turn that into a Source so that you have a single stream that represents your logic flow.

Hence the reason why Sink.futureSink expects a value is that its a given that you will use that value in your future Source (in this example it would be a Source[IOResult] that only outputs a single element) and if no result is outputted by the Sink then the implication is that there is some sought of error as your Sink didn't actually properly produce a value.

This PR adds EagerFutureSink, which materializes the inner sink when the future resolves rather than on the first element:

  • Scala: Sink.eagerFutureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]]
  • Java: Sink.eagerCompletionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]]

Ill have a look at the PR in more detail later, but at the risk of bikeshedding Sink.maybeFutureSink/Sink.futureCompletionSink could be an alternative naming. I think in both cases (my suggested name and whats implemented in this PR) both have pros/cons, Sink.maybeFutureSink emphasizes that its same as Sink.futureSink/Sink.futureCompletionSink just that if there isn't a value and the future completes where as Sink.eagerFutureSink emphasizes the evaluation strategy, i.e. it eagerly awaits completion of the future. Would love to get other people's thoughts here.

@mdedetrich
Copy link
Contributor

@gmethvin The tests are failing

@pjfanning
Copy link
Member

It's hard to come up with a snappy name here. I don't have a strong opinion. Maybe this pretty literal naming might work.

  • Sink.futureSinkAllowingEmptyStream (or supporting)
  • Sink.completionStageSinkAllowingEmptyStream

@gmethvin
Copy link
Member Author

Are we considering a broader renaming of lazy* operators across the API? As long as lazy continues to mean “defers creation/materialization until the first element arrives” (or first demand), eager feels like the natural counterpart here.

futureSink would arguably be the best name for this, as it's consistent with existing stream naming where laziness is opt-in via a lazy prefix (Source.lazySingle, Source.lazyFuture, Source.lazyFutureSource, Flow.lazyFutureFlow). I needed the eager prefix only because futureSink is already used for the current lazy semantics (futureSink delegating to lazyFutureSink).

@pjfanning
Copy link
Member

One possibility is to introduce a boolean parameter on the futureSink method to say if want to support empty streams. In 1.x, we can default to false and in 2.x we can change this to true.
Would a defaulted new param cause bin compat issues in 1.x? If it does then this probably a non-starter.

In 2.x, we can choose to make a major change but if we want to get this into 1.5.0, we need a low impact change. Maybe we could focus on a 1.x friendly change first and then discuss if we want to make a bigger change in 2.x.

@gmethvin
Copy link
Member Author

One possibility is to introduce a boolean parameter on the futureSink method to say if want to support empty streams. In 1.x, we can default to false and in 2.x we can change this to true.
Would a defaulted new param cause bin compat issues in 1.x? If it does then this probably a non-starter.

You'd need to overload the method with the extra param. Just adding a new param with a default value would change the method signature. If you have to add the new method anyway I feel like an explicitly-named method is better.

Also I feel like the lazy/eager semantics are important to communicate in the naming, since that's what the user cares about when choosing the operator. Otherwise they would just always choose the version that can handle empty streams.

In 2.x, we can choose to make a major change but if we want to get this into 1.5.0, we need a low impact change. Maybe we could focus on a 1.x friendly change first and then discuss if we want to make a bigger change in 2.x.

Absolutely, that's why I proposed a new name. In 2.x maybe we could just remove futureSink and keep eagerFutureSink/lazyFutureSink. The naming would be inconsistent with the rest of the API but at least it would be clear.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an “eager” variant of futureSink / completionStageSink to avoid NeverMaterializedException when the upstream completes without emitting any elements (notably for integrations that must handle empty request bodies).

Changes:

  • Introduces Sink.eagerFutureSink (Scala) and Sink.eagerCompletionStageSink (Java) APIs.
  • Implements a new internal EagerFutureSink GraphStage and wires default stage attributes.
  • Adds Scala test coverage and updates operator docs/index + cross-links from existing lazy/future sink docs.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala Adds the Scala DSL eagerFutureSink factory + scaladoc.
stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala Adds the Java DSL eagerCompletionStageSink factory + Javadoc.
stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala Registers default attributes name for the new stage.
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala Adds the internal EagerFutureSink GraphStage implementation.
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala Adds behavioral tests for empty/non-empty, late future completion, and failure propagation.
stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala Ensures Scala/Java DSL factory consistency coverage includes the new API.
docs/src/main/paradox/stream/operators/index.md Adds the new operators to the docs index and operator list.
docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md Cross-links to eager variant for empty-stream behavior.
docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md Cross-links to eager variant for empty-stream behavior.
docs/src/main/paradox/stream/operators/Sink/futureSink.md Documents futureSink’s lazy semantics and points to eager alternative.
docs/src/main/paradox/stream/operators/Sink/completionStageSink.md Documents completionStageSink’s lazy semantics and points to eager alternative.
docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md New operator page for Scala eager sink.
docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md New operator page for Java eager sink.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@mdedetrich
Copy link
Contributor

One possibility is to introduce a boolean parameter on the futureSink method to say if want to support empty streams. In 1.x, we can default to false and in 2.x we can change this to true.
Would a defaulted new param cause bin compat issues in 1.x? If it does then this probably a non-starter.

You'd need to overload the method with the extra param. Just adding a new param with a default value would change the method signature. If you have to add the new method anyway I feel like an explicitly-named method is better.

Also I feel like the lazy/eager semantics are important to communicate in the naming, since that's what the user cares about when choosing the operator. Otherwise they would just always choose the version that can handle empty streams.

In 2.x, we can choose to make a major change but if we want to get this into 1.5.0, we need a low impact change. Maybe we could focus on a 1.x friendly change first and then discuss if we want to make a bigger change in 2.x.

Absolutely, that's why I proposed a new name. In 2.x maybe we could just remove futureSink and keep eagerFutureSink/lazyFutureSink. The naming would be inconsistent with the rest of the API but at least it would be clear.

So it is to be said that at least in pekko-streams there are 2 overloaded definitions of "lazy". One is an anonymous function that returns a future, with the anonymous function being deliberate as the future should only be executed at the specified time, hency the lazy and the other definition is, as you said, deferring the materialization.

There are 2 options we have here, one is that this change only goes in to Pekko 2.x where we reuse the futureSink method. The advantage of this change is that we will have completely consistent naming (which I think is a huge benefit) however the downsides is that there will be a behaviour change (even though this is allowed in 2.x) and the improvement will only be in Pekko 2.x. If we do this we would have to add a deprecation warning in Pekko 1.x telling users not to use futureSink as the behaviour will change in the future.

The other option is that as you said, we use a new eagerFutureSink and deprecate + remove futureSink in a later time.

I have a slight preference for the former although admittedly it would require a bit more work.

@gmethvin
Copy link
Member Author

gmethvin commented Mar 1, 2026

I'm open to either approach. The naming consistency of reclaiming futureSink in 2.x is appealing. But I think it's worth being precise about the behavioral difference, since "lazy vs eager" is also about side effects.

For value-producing sinks, the eager approach is arguably what the user wants anyway:

// Sink.seq on an empty stream
Source.empty[Int]
  .toMat(Sink.futureSink(Future.successful(Sink.seq[Int])))(Keep.right)
  .run().flatten
// Lazy: Future fails with NeverMaterializedException
// Eager: Future succeeds with Seq.empty

// Sink.fold on an empty stream
Source.empty[Int]
  .toMat(Sink.futureSink(Future.successful(Sink.fold(0)(_ + _))))(Keep.right)
  .run().flatten
// Lazy: NeverMaterializedException
// Eager: Future.successful(0)

I would expect that these users are already handling the NeverMaterializedException and mapping to an empty value, in which case that just becomes dead code, but it's not a breaking change.

But for side-effecting sinks, eager evaluation changes observable behavior:

// FileIO sink on an empty stream
Source.empty[ByteString]
  .toMat(Sink.futureSink(Future.successful(FileIO.toPath(path))))(Keep.right)
  .run().flatten
// Lazy: no file touched, NeverMaterializedException
// Eager: empty file created on disk, IOResult(0, Success(Done))

Anyone using futureSink with side-effecting sinks on potentially empty streams today might rely (intentionally or not) on the fact that the inner sink is never materialized. With eager semantics, those side effects would now fire.

The question is how likely it is that someone is depending on this difference and also ignores both the 1.x deprecation warning and the 2.x migration guide. My guess is this is rare but not zero.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants