[SPARK-55890] Check arrow memory at end of tests #54689
Open
garlandz-db wants to merge 22 commits into apache:master from
Conversation
Contributor
@garlandz-db can you file a JIRA ticket?

Contributor (Author)
done
hvanhovell reviewed Mar 9, 2026

```scala
val attributes = attrs.map(exp => AttributeReference(exp.name, exp.dataType)())
val buffer = ArrowConverters
val iter = ArrowConverters
```
Contributor (Author)
Without this fix:

```
SparkConnectProtoSuite:
[info] org.apache.spark.sql.connect.planner.SparkConnectProtoSuite *** ABORTED *** (8 seconds, 710 milliseconds)
[info]   16896 did not equal 0 Arrow rootAllocator memory leak: 16896 bytes still allocated (ArrowAllocatorLeakCheck.scala:33)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertio
```
- SimpleSparkConnectService checks ArrowUtils.rootAllocator.getAllocatedMemory after shutdown and exits with code 77 (ArrowLeakExitCode) if it is non-zero
- RemoteSparkSession.stop() propagates exit code 77 via Runtime.getRuntime.halt() (the only way to fail CI from inside a JVM shutdown hook)
- ArrowLeakDetectionE2ETest verifies end-to-end detection using the SPARK_TEST_ARROW_LEAK env var to inject a synthetic unreleased allocation in the server process

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
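The exit-code convention above can be sketched in plain Java (the actual fix is in Scala, but the JVM APIs involved are Java). This is an illustrative stand-in, not the Spark code: `exitCodeFor` and `installLeakCheckHook` are hypothetical names; only the value 77 and the halt-inside-a-hook reasoning come from the commit.

```java
import java.util.function.LongSupplier;

// Sketch of the leak-check exit-code convention (assumed names, not Spark's).
final class ArrowLeakExitSketch {
    static final int ARROW_LEAK_EXIT_CODE = 77;

    // Pure decision: 0 means a clean shutdown, 77 signals a leak to CI.
    static int exitCodeFor(long allocatedBytes) {
        return allocatedBytes == 0 ? 0 : ARROW_LEAK_EXIT_CODE;
    }

    // Inside a JVM shutdown hook, System.exit() would deadlock (it waits for
    // hooks to finish), so Runtime.halt() is the only way to force the status.
    static void installLeakCheckHook(LongSupplier allocatedBytes) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int code = exitCodeFor(allocatedBytes.getAsLong());
            if (code != 0) {
                Runtime.getRuntime().halt(code);
            }
        }));
    }
}
```

Keeping the decision logic (`exitCodeFor`) separate from the hook installation makes the non-halting part unit-testable.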
- ArrowFileReadWrite.save/load: wrap the writer/reader in try-finally so allocators are always released (fixes 3072-byte cross-suite leaks)
- ArrowFileReadWrite.write: remove the redundant fileWriter.close() call (close() already closes it; the double-close was a no-op but confusing)
- ArrowAllocatorLeakCheck.afterAll: run super.afterAll() first so SparkSession/Spark Connect resources are released before the leak assertion (fixes false-positive 131072-byte failures in SessionEventsManagerSuite and related suites)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Batches

converter(rows.iterator) returns a CloseableIterator backed by ArrowBatchWithSchemaIterator, which holds a child allocator of ArrowUtils.rootAllocator. On the driver-side LocalTableScanExec path there is no TaskContext, so the task-completion listener that normally auto-closes the iterator never fires. Wrap it in try-finally so the allocator is always released.

This fixes the 131072-byte Arrow leak seen in SparkConnectServiceE2ESuite and PipelineEventStreamSuite when ArrowAllocatorLeakCheck is mixed in.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tProtoSuite

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…aladoc

- Add ArrowFileReadWriteSuite with the ArrowAllocatorLeakCheck mixin so the suite that directly exercises ArrowFileReadWrite.save/load also asserts no Arrow memory leaks after its own tests complete.
- Expand the ArrowAllocatorLeakCheck scaladoc with a mixin-order warning and correct/incorrect usage examples, since wrong ordering causes false-positive leak failures.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The LocalTableScanExec branch in processAsArrowBatches previously used converter(rows.iterator), which returns a mapped iterator (not AutoCloseable). When sendBatch threw (e.g. on client disconnect), the underlying ArrowBatchWithSchemaIterator was never closed, leaking 131072 bytes into ArrowUtils.rootAllocator and contaminating subsequent test suites.

Fix: bypass the converter wrapper in the LocalTableScanExec branch and directly create an ArrowBatchWithSchemaIterator, closing it in a finally block. Arrow's close() is idempotent (isClosed guard), so the double-close when the iterator exhausts itself is safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
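The general pattern behind this fix: when an iterator owns off-heap or native resources, drain it inside try and close it in finally, with an idempotent close so exhaustion-triggered auto-close plus the finally is safe. A minimal, self-contained Java sketch (BatchIterator, SendLoop, and the simulated allocator counter are illustrative stand-ins, not the Spark classes):

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for ArrowBatchWithSchemaIterator: an iterator that
// owns a resource and must be released exactly once.
final class BatchIterator implements Iterator<byte[]>, AutoCloseable {
    private final Iterator<byte[]> batches;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    static int openAllocations = 0; // simulated allocator accounting

    BatchIterator(List<byte[]> data) {
        this.batches = data.iterator();
        openAllocations++;
    }

    @Override public boolean hasNext() { return batches.hasNext(); }
    @Override public byte[] next() { return batches.next(); }

    // Idempotent close: the compare-and-set guard makes double-close safe,
    // analogous to the isClosed guard in Arrow's close().
    @Override public void close() {
        if (closed.compareAndSet(false, true)) {
            openAllocations--;
        }
    }
}

final class SendLoop {
    // Drain the iterator; close in finally so a mid-stream failure in
    // sendBatch (e.g. a client disconnect) cannot leak the allocation.
    static void sendAll(BatchIterator batches,
                        java.util.function.Consumer<byte[]> sendBatch) {
        try {
            while (batches.hasNext()) {
                sendBatch.accept(batches.next());
            }
        } finally {
            batches.close(); // runs on success, exception, and interrupt alike
        }
    }
}
```

Even if the caller closes the iterator again after an exception escapes, the accounting stays balanced because close() is guarded.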
In SparkConnectServerTest.clearAllExecutions(), after calling close() on each ExecuteHolder, wait for the execution thread to terminate before proceeding.

Root cause: the LocalTableScanExec branch in processAsArrowBatches calls eventsManager.postFinished() before serializing Arrow batches. Tests that wait for ExecuteStatus.Finished (e.g. the SPARK-45133 local relation test) exit before the execution thread finishes processing. clearAllExecutions() interrupted the thread but did not wait for it to stop, leaving an open ArrowBatchWithSchemaIterator (131072 bytes) in ArrowUtils.rootAllocator. The ArrowAllocatorLeakCheck in afterAll() then detected the residual allocation.

Fix: capture the holder list before close(), then eventuallyWithTimeout on isExecuteThreadRunnerAlive() so the thread's finally block runs batches.close() before the leak check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ch loop
When holder.close() interrupts the execution thread, the LocalTableScanExec
branch of processAsArrowBatches would continue processing all rows (1M+ in
the SPARK-45133 test) because the tight loop had no interrupt check. This
left the ArrowBatchWithSchemaIterator open for seconds after clearAllExecutions()
returned, causing ArrowAllocatorLeakCheck to see 131072 bytes still allocated.
Fix: check Thread.currentThread().isInterrupted() at each loop iteration and
throw InterruptedException immediately, ensuring the finally { batches.close() }
runs before the execution thread terminates.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
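The cooperative-interrupt pattern this commit adds can be shown in a few lines of plain Java. A sketch under stated assumptions: InterruptibleLoop and its fields are hypothetical names, and the per-row work is elided; only the isInterrupted check and the finally-runs-promptly guarantee mirror the fix.

```java
// Sketch: a tight loop that honors thread interruption so its finally block
// (standing in for batches.close()) runs immediately rather than after all rows.
final class InterruptibleLoop {
    boolean resourceClosed = false;
    int iterationsDone = 0;

    void run(int totalRows) {
        try {
            for (int i = 0; i < totalRows; i++) {
                // Without this check, an interrupt delivered by holder.close()
                // would not be noticed until all rows were processed.
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("stop requested");
                }
                iterationsDone++;
                // ... per-row serialization work would go here ...
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
        } finally {
            resourceClosed = true; // analogous to batches.close()
        }
    }
}
```

Restoring the interrupt status in the catch block is the standard courtesy so callers further up the stack can still observe the interruption.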
… thread interrupt

Move batch.close() into try-finally in ArrowBatchIterator.next() and ArrowBatchWithSchemaIterator.next() so retained Arrow buffers are always released even when MessageSerializer.serialize() or serializeBatch() throws ClosedByInterruptException (triggered when the NIO WritableByteChannelImpl channel is interrupted mid-write by clearAllExecutions()).

Also make ArrowBatchIterator.close() idempotent and thread-safe using AtomicBoolean.compareAndSet, with try-finally to ensure allocator.close() runs even if root.close() throws.

Co-authored-by: Isaac
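The idempotent, thread-safe close over two stacked resources described above is a small reusable shape. A hedged Java sketch (TwoResourceCloser and the Runnable stand-ins for root.close() and allocator.close() are illustrative, not the Spark code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: close() runs its body at most once even under concurrent calls,
// and the inner resource is released even if the outer one throws.
final class TwoResourceCloser {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Runnable closeRoot;      // stand-in for root.close()
    private final Runnable closeAllocator; // stand-in for allocator.close()

    TwoResourceCloser(Runnable closeRoot, Runnable closeAllocator) {
        this.closeRoot = closeRoot;
        this.closeAllocator = closeAllocator;
    }

    public void close() {
        // compareAndSet wins for exactly one caller; later or concurrent
        // calls see true and return without touching the resources again.
        if (closed.compareAndSet(false, true)) {
            try {
                closeRoot.run();
            } finally {
                closeAllocator.run(); // runs even if closeRoot threw
            }
        }
    }
}
```

A plain boolean flag would be enough for single-threaded callers; the AtomicBoolean matters when close() can race between an exhausting iterator and an interrupting cleanup thread.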
…ontext stops

Register the Arrow allocator check at shutdown-hook priority 10 (below SPARK_CONTEXT_SHUTDOWN_PRIORITY=50) so it runs after SparkContext has fully wound down. Poll for up to 2 minutes instead of checking immediately, since execution threads may still be flushing their last Arrow batch. Use Runtime.getRuntime.halt() instead of exit() to avoid deadlock inside a shutdown hook.

The synthetic SPARK_TEST_ARROW_LEAK injection also moves to the hook so SparkContext shutdown cannot release the child allocator before the check runs.

Co-authored-by: Isaac
…kConnectService

Co-authored-by: Isaac
…ed Cleaner release

Arrow's AllocationManager registers a java.lang.ref.Cleaner on ArrowBuf. When the returned buffer reference is discarded and GC runs during Thread.sleep() in the polling loop, the Cleaner fires, decrements getAllocatedMemory to 0, and the leak check sees no leak (exit 0 instead of 77).

Fix: store the buffer in a val that spans the polling loop and use Reference.reachabilityFence() in a finally block to prevent premature collection.

Co-authored-by: Isaac
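The fence pattern itself is a standard Java 9+ idiom for pinning an object across a region of code. A minimal sketch (the byte[] buffer and the polling body are stand-ins for the Arrow buffer and getAllocatedMemory(); note a later commit on this PR found the real culprit was elsewhere, but the pattern shown is the correct general use of reachabilityFence):

```java
import java.lang.ref.Reference;

// Sketch: keep `buf` strongly reachable for the whole polling loop.
final class KeepAliveSketch {
    static long pollWithBufferHeldAlive(byte[] buf, int polls) {
        long observed = 0;
        try {
            for (int i = 0; i < polls; i++) {
                // Stand-in for reading getAllocatedMemory() on each poll.
                observed += buf.length;
            }
        } finally {
            // Without this, the JIT may treat `buf` as unreachable after its
            // last use inside the loop, allowing a Cleaner to fire early.
            Reference.reachabilityFence(buf);
        }
        return observed;
    }
}
```

Placing the fence in a finally block is the documented idiom: it guarantees the object stays reachable until the fence executes, on every exit path.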
Move the synthetic leak injection from the shutdown hook to the stop handler (after sparkSession.close()), where rootAllocator is guaranteed to still be open. Store the buffer in an object-level @volatile field instead of a local variable so the JVM cannot reclaim it. Short-circuit the 2-minute polling loop in the hook when testLeakBuf is set, so the test does not time out waiting for memory that is intentionally never freed.

The previous approach (creating the leak inside the hook + reachabilityFence) failed because Arrow uses pure reference counting (no Cleaner/GC), so GC was not the issue; the hook itself was silently throwing when it tried to create a child allocator in an unexpected state.

Co-authored-by: Isaac
…y leak detection

- ArrowConverters.scala: remove the AtomicBoolean idempotent-close guard (not needed since the test fixes avoid double-close: iterator exhaustion via hasNext auto-closes, and single .next() callers use explicit try/finally)
- ArrowFileReadWrite.scala: remove the redundant fileWriter.close() from write() (SparkArrowFileWriter.close() already closes fileWriter + root + allocator; save() wraps write() in try/finally calling close())
- SparkConnectPlanExecution.scala: revert the LocalTableScanExec inlined iterator back to the simpler converter(rows.iterator).foreach form
transformLocalRelation exhausts the iterator via toArray(), which triggers auto-close (hasNext returns false); the finally block then calls rows.close() again. Double-closing Arrow's BufferAllocator corrupts its accounting, leaving non-zero bytes in the root allocator after tests complete. Guard with a closed flag to prevent the double-close.
Fixes Arrow off-heap memory leaks in Spark Connect and adds afterAll guards to detect future leaks pre-merge.
Leaks fixed:
- … iterator.
- … drained but never closed.

Detection guards added — afterAll assertions on ArrowUtils.rootAllocator.getAllocatedMemory == 0 in:
- … future subclass)
Why are the changes needed?
ArrowUtils.rootAllocator is a JVM-wide singleton. Every toBatchWithSchemaIterator and fromIPCStream call allocates a child allocator and Arrow buffers from it. If the iterator is not explicitly closed, those buffers are never freed, causing off-heap memory growth on the driver. A related leak in the deserialization path (fromIPCStream) was fixed in SPARK-54696. This PR closes the complementary serialization-path gap that SPARK-54696 missed, and adds test-time assertions to catch regressions before merge.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
```
build/sbt "connect/Test/testOnly org.apache.spark.sql.connect.planner.SparkConnectPlannerSuite org.apache.spark.sql.connect.planner.SparkConnectProtoSuite"
build/sbt "sql/Test/testOnly org.apache.spark.sql.execution.arrow.ArrowConvertersSuite"
```
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6