[SPARK-55890] Check arrow memory at end of tests #54689

Open
garlandz-db wants to merge 22 commits into apache:master from garlandz-db:SASP-7529

Conversation

@garlandz-db
Contributor

@garlandz-db garlandz-db commented Mar 9, 2026

Fixes Arrow off-heap memory leaks in Spark Connect and adds afterAll guards to detect future leaks pre-merge.

Leaks fixed:

  1. SparkConnectPlannerSuite.scala (createLocalRelationProto) — test helper called .next() without closing the
    iterator.
  2. SparkConnectProtoSuite.scala (createLocalRelationProtoByAttributeReferences) — same pattern.
  3. ArrowConvertersSuite.scala ("two batches with different schema") — two ArrowBatchWithSchemaIterator instances
    drained but never closed.
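The leaky pattern behind all three fixes is the same: a `CloseableIterator` backed by a child allocator is created, partially or fully consumed, and then dropped without `close()`. A minimal self-contained sketch (with a hypothetical `ToyAllocator`/`ToyBatchIterator` standing in for `ArrowUtils.rootAllocator` and `ArrowBatchWithSchemaIterator`; the byte counts are illustrative):

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for ArrowUtils.rootAllocator's byte accounting.
object ToyAllocator {
  val allocated = new AtomicLong(0L)
}

// Toy analogue of the iterator returned by toBatchWithSchemaIterator:
// constructing it "allocates" off-heap bytes; close() releases them.
class ToyBatchIterator(batches: Seq[Array[Byte]])
    extends Iterator[Array[Byte]] with AutoCloseable {
  ToyAllocator.allocated.addAndGet(1024L)
  private val it = batches.iterator
  def hasNext: Boolean = it.hasNext
  def next(): Array[Byte] = it.next()
  def close(): Unit = { ToyAllocator.allocated.addAndGet(-1024L); () }
}

// Leaky pattern: take the first batch and drop the iterator on the floor.
def leakyFirstBatch(batches: Seq[Array[Byte]]): Array[Byte] =
  new ToyBatchIterator(batches).next()

// Fixed pattern: close the iterator in a finally block.
def fixedFirstBatch(batches: Seq[Array[Byte]]): Array[Byte] = {
  val iter = new ToyBatchIterator(batches)
  try iter.next() finally iter.close()
}
```

The fixed form guarantees the child allocator is released even if `next()` throws, which is what the try/finally wrappers added in the test helpers do.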

Detection guards added — afterAll assertions on ArrowUtils.rootAllocator.getAllocatedMemory == 0 in:

  • SparkConnectPlanTest (covers SparkConnectPlannerSuite, SparkConnectProtoSuite)
  • SparkConnectServerTest (covers SparkConnectServiceE2ESuite, SparkConnectServiceInternalServerSuite, and any
    future subclass)
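The guard itself is a small afterAll assertion. A self-contained sketch of the pattern, with ScalaTest's `BeforeAndAfterAll` replaced by a hand-rolled `ToyBeforeAndAfterAll` trait and a `ToyRootAllocator` standing in for `ArrowUtils.rootAllocator` (both hypothetical names):

```scala
// Stand-in for ArrowUtils.rootAllocator.getAllocatedMemory.
object ToyRootAllocator {
  @volatile var getAllocatedMemory: Long = 0L
}

// Stand-in for ScalaTest's BeforeAndAfterAll teardown chain.
trait ToyBeforeAndAfterAll {
  def afterAll(): Unit = ()   // base teardown (SparkSession shutdown, etc.)
}

trait ArrowLeakGuard extends ToyBeforeAndAfterAll {
  override def afterAll(): Unit = {
    super.afterAll()  // release suite resources before checking
    val leaked = ToyRootAllocator.getAllocatedMemory
    assert(leaked == 0L,
      s"Arrow rootAllocator memory leak: $leaked bytes still allocated")
  }
}
```

Any suite mixing in the guard aborts with a non-zero byte count if a test leaves Arrow buffers allocated, which is how the leaks above were surfaced.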

Why are the changes needed?

ArrowUtils.rootAllocator is a JVM-wide singleton. Every toBatchWithSchemaIterator and fromIPCStream call allocates a child allocator and Arrow buffers from it. If the iterator is not explicitly closed, those buffers are never freed, causing off-heap memory growth on the driver. A related leak in the deserialization path (fromIPCStream) was fixed in SPARK-54696. This PR closes the complementary serialization-path gap that SPARK-54696 missed, and adds test-time assertions to catch regressions before merge.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • SparkConnectPlannerSuite and SparkConnectProtoSuite pass with the new afterAll memory assertion. Before fixing leaks 2 and 3, SparkConnectProtoSuite aborted with 16896 bytes still allocated, confirming the guards work.
  • ArrowConvertersSuite passes with the iterator close fix (43 tests).
  • To manually validate detection: removing iter.close() from any fixed site causes the suite to abort with a non-zero allocation count.

build/sbt "connect/Test/testOnly org.apache.spark.sql.connect.planner.SparkConnectPlannerSuite
org.apache.spark.sql.connect.planner.SparkConnectProtoSuite"
build/sbt "sql/Test/testOnly org.apache.spark.sql.execution.arrow.ArrowConvertersSuite"

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@hvanhovell
Contributor

@garlandz-db can you file a JIRA ticket?

@garlandz-db garlandz-db changed the title Check arrow memory at end of tests SPARK-55890 Check arrow memory at end of tests Mar 9, 2026
@garlandz-db garlandz-db changed the title SPARK-55890 Check arrow memory at end of tests [SPARK-55890] Check arrow memory at end of tests Mar 9, 2026
@garlandz-db
Contributor Author

done


val attributes = attrs.map(exp => AttributeReference(exp.name, exp.dataType)())
val buffer = ArrowConverters
val iter = ArrowConverters
Contributor

This is leaking right?

Contributor Author

without this fix

 SparkConnectProtoSuite:
     [info] org.apache.spark.sql.connect.planner.SparkConnectProtoSuite *** ABORTED *** (8 seconds, 710 milliseconds)
     [info]   16896 did not equal 0 Arrow rootAllocator memory leak: 16896 bytes still allocated
     (ArrowAllocatorLeakCheck.scala:33)
     [info]   org.scalatest.exceptions.TestFailedException:
     [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
     [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
     [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
     [info]   at org.scalatest.Assertio

- SimpleSparkConnectService checks ArrowUtils.rootAllocator.getAllocatedMemory
  after shutdown and exits with code 77 (ArrowLeakExitCode) if non-zero
- RemoteSparkSession.stop() propagates exit code 77 via Runtime.getRuntime.halt()
  (the only way to fail CI from inside a JVM shutdown hook)
- ArrowLeakDetectionE2ETest verifies end-to-end detection using SPARK_TEST_ARROW_LEAK
  env var to inject a synthetic unreleased allocation in the server process

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
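The leak-to-exit-code mapping described above can be sketched as a pure function (the names `ArrowLeakExitCode` and `leakExitCode` are illustrative, not necessarily the PR's):

```scala
// Sentinel exit code signalling an Arrow allocator leak at shutdown.
object ArrowLeakExitCode { val value = 77 }

// Decide the process exit code from the allocator's residual byte count.
def leakExitCode(allocatedBytes: Long): Int =
  if (allocatedBytes != 0L) ArrowLeakExitCode.value else 0

// In the real change, the shutdown path would do something like:
//   val code = leakExitCode(ArrowUtils.rootAllocator.getAllocatedMemory)
//   if (code != 0) Runtime.getRuntime.halt(code)
// halt() rather than System.exit() because exit() inside a shutdown hook
// blocks waiting for shutdown hooks to finish, deadlocking the JVM.
```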
@garlandz-db garlandz-db requested a review from hvanhovell March 11, 2026 11:20
garlandz-db and others added 5 commits March 11, 2026 17:26
- ArrowFileReadWrite.save/load: wrap writer/reader in try-finally so
  allocators are always released (fixes 3072-byte cross-suite leaks)
- ArrowFileReadWrite.write: remove redundant fileWriter.close() call
  (close() already closes it; the double-close was a harmless no-op but confusing)
- ArrowAllocatorLeakCheck.afterAll: run super.afterAll() first so
  SparkSession/SparkConnect resources are released before leak assertion
  (fixes false-positive 131072-byte failures in SessionEventsManagerSuite
  and related suites)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Batches

converter(rows.iterator) returns a CloseableIterator backed by
ArrowBatchWithSchemaIterator, which holds a child allocator of
ArrowUtils.rootAllocator. On the driver-side LocalTableScanExec path
there is no TaskContext, so the task-completion listener that normally
auto-closes the iterator never fires. Wrap in try-finally so the
allocator is always released.

This fixes the 131072-byte Arrow leak seen in SparkConnectServiceE2ESuite
and PipelineEventStreamSuite when ArrowAllocatorLeakCheck is mixed in.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tProtoSuite

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…aladoc

- Add ArrowFileReadWriteSuite with ArrowAllocatorLeakCheck mixin so the
  suite that directly exercises ArrowFileReadWrite.save/load also asserts
  no Arrow memory leaks after its own tests complete.
- Expand ArrowAllocatorLeakCheck scaladoc with a mixin-order warning and
  correct/incorrect usage examples, since wrong ordering causes
  false-positive leak failures.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
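The mixin-order warning comes down to Scala's trait linearization: traits are linearized right to left, so the leak check must be the last mixin for its afterAll to wrap the rest of the teardown. A self-contained demo with toy traits (`ToySuite`, `SessionTeardown`, `LeakCheck` are illustrative stand-ins, not the real ScalaTest traits):

```scala
import scala.collection.mutable.ArrayBuffer

object MixinOrderDemo {
  val order = ArrayBuffer.empty[String]

  trait ToySuite { def afterAll(): Unit = () }
  trait SessionTeardown extends ToySuite {
    override def afterAll(): Unit = { order += "stop session"; super.afterAll() }
  }
  trait LeakCheck extends ToySuite {
    override def afterAll(): Unit = {
      super.afterAll()       // finish the remaining teardown first ...
      order += "leak check"  // ... then the allocator assertion would run
    }
  }

  // Correct order: LeakCheck mixed in last, so its afterAll runs outermost
  // and only asserts after the session teardown has released its buffers.
  class CorrectSuite extends ToySuite with SessionTeardown with LeakCheck

  def run(): Seq[String] = {
    order.clear()
    new CorrectSuite().afterAll()
    order.toSeq
  }
}
```

Flipping the mixin order would run the leak assertion before the session teardown, producing exactly the false-positive failures the scaladoc warns about.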
garlandz-db and others added 12 commits March 12, 2026 02:24
The LocalTableScanExec branch in processAsArrowBatches previously used
converter(rows.iterator) which returns a mapped iterator (not AutoCloseable).
When sendBatch threw (e.g., on client disconnect), the underlying
ArrowBatchWithSchemaIterator was never closed, leaking 131072 bytes into
ArrowUtils.rootAllocator and contaminating subsequent test suites.

Fix: bypass the converter wrapper in the LocalTableScanExec branch and
directly create an ArrowBatchWithSchemaIterator, closing it in a finally
block. Arrow's close() is idempotent (isClosed guard), so double-close
when the iterator exhausts itself is safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
In SparkConnectServerTest.clearAllExecutions(), after calling close()
on each ExecuteHolder, wait for the execution thread to terminate before
proceeding.

Root cause: the LocalTableScanExec branch in processAsArrowBatches calls
eventsManager.postFinished() before serializing Arrow batches. Tests that
wait for ExecuteStatus.Finished (e.g. SPARK-45133 local relation test)
exit before the execution thread finishes processing. clearAllExecutions()
interrupted the thread but did not wait for it to stop, leaving an open
ArrowBatchWithSchemaIterator (131072 bytes) in ArrowUtils.rootAllocator.
The ArrowAllocatorLeakCheck in afterAll() then detected the residual
allocation.

Fix: capture the holder list before close(), then eventuallyWithTimeout
on isExecuteThreadRunnerAlive() so the thread's finally-block runs
batches.close() before the leak check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ch loop

When holder.close() interrupts the execution thread, the LocalTableScanExec
branch of processAsArrowBatches would continue processing all rows (1M+ in
the SPARK-45133 test) because the tight loop had no interrupt check. This
left the ArrowBatchWithSchemaIterator open for seconds after clearAllExecutions()
returned, causing ArrowAllocatorLeakCheck to see 131072 bytes still allocated.

Fix: check Thread.currentThread().isInterrupted() at each loop iteration and
throw InterruptedException immediately, ensuring the finally { batches.close() }
runs before the execution thread terminates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
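The interrupt check in the tight loop can be sketched self-contained, with a toy `processRows` standing in for the row-serialization loop and an `AtomicBoolean` standing in for `batches.close()` (all names here are illustrative):

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Without the isInterrupted check, this loop would ignore the interrupt
// until the iterator was exhausted; with it, the finally block runs promptly.
def processRows(rows: Iterator[Int],
                started: CountDownLatch,
                closed: AtomicBoolean): Unit = {
  try {
    while (rows.hasNext) {
      if (Thread.currentThread().isInterrupted) throw new InterruptedException()
      rows.next()           // stand-in for serializing one row into a batch
      started.countDown()   // signal the test that the loop is running
    }
  } finally {
    closed.set(true)        // stand-in for batches.close()
  }
}
```

Interrupting the thread mid-loop makes it exit on the next iteration, so the finally-block cleanup runs seconds earlier than waiting for 1M+ rows to drain.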
… thread interrupt

Move batch.close() into try-finally in ArrowBatchIterator.next() and
ArrowBatchWithSchemaIterator.next() so retained Arrow buffers are always
released even when MessageSerializer.serialize() or serializeBatch() throws
ClosedByInterruptException (triggered when the NIO WritableByteChannelImpl
channel is interrupted mid-write by clearAllExecutions()).

Also make ArrowBatchIterator.close() idempotent and thread-safe using
AtomicBoolean.compareAndSet, with try-finally to ensure allocator.close()
runs even if root.close() throws.

Co-authored-by: Isaac
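The idempotent, thread-safe close described above can be sketched with a compareAndSet guard plus try-finally (`ToyChildAllocator` and `SafeCloseIterator` are hypothetical stand-ins, not Arrow classes; the 131072-byte figure mirrors the leaks seen in the suites):

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Toy child allocator: construction reserves bytes, close() releases them.
class ToyChildAllocator(accounting: AtomicLong) {
  accounting.addAndGet(131072L)
  def close(): Unit = { accounting.addAndGet(-131072L); () }
}

class SafeCloseIterator(accounting: AtomicLong) {
  private val allocator = new ToyChildAllocator(accounting)
  private val closed = new AtomicBoolean(false)

  def close(): Unit = {
    if (closed.compareAndSet(false, true)) { // only the first caller proceeds
      try {
        // root.close() would go here and may throw (e.g. on interrupt)
      } finally {
        allocator.close()                    // always release the allocator
      }
    }
  }
}
```

A second `close()` from another thread loses the compareAndSet race and returns without touching the allocator, so the accounting cannot be decremented twice.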
…ontext stops

Register the Arrow allocator check at shutdown-hook priority 10 (below
SPARK_CONTEXT_SHUTDOWN_PRIORITY=50) so it runs after SparkContext has fully
wound down. Poll for up to 2 minutes instead of checking immediately, since
execution threads may still be flushing their last Arrow batch. Use
Runtime.getRuntime.halt() instead of exit() to avoid deadlock inside a
shutdown hook. The synthetic SPARK_TEST_ARROW_LEAK injection also moves to
the hook so SparkContext shutdown cannot release the child allocator before
the check runs.

Co-authored-by: Isaac
…ed Cleaner release

Arrow's AllocationManager registers a java.lang.ref.Cleaner on ArrowBuf.
When the returned buffer reference is discarded and GC runs during
Thread.sleep() in the polling loop, the Cleaner fires, decrements
getAllocatedMemory to 0, and the leak check sees no leak (exit 0 instead
of 77). Fix: store the buffer in a val that spans the polling loop and
use Reference.reachabilityFence() in a finally block to prevent premature
collection.

Co-authored-by: Isaac
Move synthetic leak injection from the shutdown hook to the stop handler
(after sparkSession.close()), where rootAllocator is guaranteed to still be
open. Store the buffer in an object-level @volatile field instead of a local
variable so the JVM cannot reclaim it. Short-circuit the 2-minute polling
loop in the hook when testLeakBuf is set, so the test does not time out
waiting for memory that is intentionally never freed.

The previous approach (creating the leak inside the hook + reachabilityFence)
failed because Arrow uses pure reference counting (no Cleaner/GC), so GC was
not the issue; the hook itself was silently throwing when it tried to create a
child allocator in an unexpected state.

Co-authored-by: Isaac
…y leak detection

- ArrowConverters.scala: remove AtomicBoolean idempotent close guard (not needed
  since test fixes avoid double-close: iterator exhaustion via hasNext auto-closes,
  and single .next() callers use explicit try/finally)
- ArrowFileReadWrite.scala: remove redundant fileWriter.close() from write()
  (SparkArrowFileWriter.close() already closes fileWriter + root + allocator;
  save() wraps write() in try/finally calling close())
- SparkConnectPlanExecution.scala: revert LocalTableScanExec inlined iterator
  back to simpler converter(rows.iterator).foreach form

transformLocalRelation exhausts the iterator via toArray() which
triggers auto-close (hasNext→false), then the finally block calls
rows.close() again. Double-closing Arrow's BufferAllocator corrupts
its accounting, leaving non-zero bytes in the root allocator after
tests complete. Guard with a closed flag to prevent double-close.