storage: add splitfdstream for efficient layer transfer with reflink support #651

giuseppe wants to merge 12 commits into containers:main
Conversation
force-pushed from 4cca2a3 to 3b6f591
mtrmac left a comment:
- 6% of the code base for an extremely niche use case :/
I'm afraid I can't spend time on this now; and anyway some other work on pkg/archive needs to happen first, and I expect the rebase to be somewhat non-trivial.
What other work would block this feature? I don't love the amount of extra code either, but it is almost all in new APIs/packages that are experimental and don't leak too much into the rest of the library. We went around this problem for a long time, and in the end this seems like an ~ok solution to avoid exposing too much of the containers/storage internals, by augmenting the tar format that is already plugged in everywhere.
force-pushed from 3b6f591 to 26cd4b6
To add more context: the goal of this feature is to be able to copy a layer, similarly to what we would do with

There are other interesting use cases enabled by an RPC, like facilitating usage from different languages (Rust in our case), or injecting into a build container to grab R/O access to layers/images, but that is out of scope for now.

I realize the complexity is not trivial just to get access to the raw files, but what alternatives do we have? From the store API PoV we could just extend it to return a map

Any suggestions?
(Just to avoid a misunderstanding, I didn't really read the PR. I have, at this point, ~no opinion on RPC vs. Go API.)
storage/pkg/archive/archive.go
Outdated
```go
return &tarReaderIterator{
	tr:     tr,
	trBuf:  trBuf,
	buffer: make([]byte, 1<<20),
```
Seems like 1<<20 should be in a const somewhere with a rationale?
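A hoisted constant might look like the sketch below (the name and rationale comment are hypothetical, not taken from the PR):

```go
package main

import "fmt"

// tarIteratorBufferSize is the scratch-buffer size used when copying tar
// entry contents. 1 MiB is large enough to amortize read/write syscall
// overhead on big layers without holding much memory per stream.
// (Hypothetical name; the PR currently inlines the literal 1<<20.)
const tarIteratorBufferSize = 1 << 20 // 1 MiB

func main() {
	fmt.Println(tarIteratorBufferSize) // 1048576
}
```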
```go
// splitFDStreamSocketDescriptor is the fd for the Unix socket used to
// receive file descriptors via SCM_RIGHTS in the re-exec child.
const splitFDStreamSocketDescriptor = 5
```
Hardcoding this seems a bit fragile, can't we pass it as an env var or a CLI arg?
```go
// FDs are streamed to the child process one-at-a-time over a Unix socket
// using SCM_RIGHTS, avoiding EMFILE from inheriting too many FDs at exec.
```
It doesn't have to be truly one at a time, see also https://github.com/cgwalters/jsonrpc-fdpass?tab=readme-ov-file#41-fd-batching
In my initial PoC work here I handled this by just sending the splitfdstream inline data itself as a fd (could be O_TMPFILE or memfd or pipe), and then sending all the associated fds over jsonrpc-fdpass which automatically handles buffering.
Ensuring we process just one recvmsg() at a time (only ~200 fds on Linux) seems really reasonable and unlikely to get anywhere close to modern fd limits.
So if we do choose jsonrpc-fdpass, I think we also need to standardize this "splitfdstream serialization".
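The batching idea can be sketched as pure slicing logic (hypothetical helper, not the PR's code): group fds into batches up to the per-message limit, so N files need ceil(N/limit) sendmsg calls instead of N.

```go
package main

import "fmt"

// maxFDsPerMessage is a conservative per-sendmsg batch size; the actual
// Linux SCM_RIGHTS ceiling is SCM_MAX_FD. The value here is illustrative.
const maxFDsPerMessage = 64

// batchFDs splits a list of fds into batches no larger than limit.
func batchFDs(fds []int, limit int) [][]int {
	var batches [][]int
	for len(fds) > 0 {
		n := len(fds)
		if n > limit {
			n = limit
		}
		batches = append(batches, fds[:n])
		fds = fds[n:]
	}
	return batches
}

func main() {
	fds := make([]int, 200)
	for i := range fds {
		fds[i] = i + 10
	}
	b := batchFDs(fds, maxFDsPerMessage)
	fmt.Println(len(b), len(b[0]), len(b[len(b)-1])) // 4 64 8
}
```

Each batch would then be attached as one SCM_RIGHTS control message, which is what keeps the fd count per recvmsg bounded.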
```go
	return err
}

// Try copy_file_range - kernel-level copy, more efficient than userspace
```
Surprising we weren't doing this before.
storage/pkg/splitfdstream/fdpass.go
Outdated
```go
// ReadLine reads bytes until a newline is encountered, using recvmsg.
// Any FDs received are closed since line-reading doesn't expect them.
func (p *FDPasser) ReadLine() ([]byte, error) {
```
Why would one want this?
```go
// JSON-RPC 2.0 protocol constants
const (
	JSONRPCVersion = "2.0"
```
BTW see also https://github.com/cgwalters/jsonrpc-fdpass-go
force-pushed from 23ed544 to 8d74f84
Still a PoC, so you can skip the review as it is not yet ready, but we can use it to analyze the complexity. Moved to use https://github.com/cgwalters/jsonrpc-fdpass-go. The code size is now almost half of the last revision.
force-pushed from 8d74f84 to 51a6c01
Packit jobs failed. @containers/packit-build please check.
force-pushed from 1b087bb to bc6c5ff
force-pushed from a0876b6 to d77fbc4
force-pushed from d77fbc4 to 6d605ac
force-pushed from 6d605ac to 0ebb56c
@giuseppe looks like a rebase may be needed here.
force-pushed from 88bf912 to a7806a8
@kolyshkin are these changes to the tar package a problem for the work you are doing there?
@kolyshkin can you PTAL?
```go
}
defer tarStream.Close()

// Write splitfdstream data directly to a memfd to avoid buffering in memory.
```
This type of comment I find a bit distracting because it's almost circular: a memfd is memory too. How about just:

> A memfd is a convenient way to buffer the metadata

Or maybe just no comment at all. Dunno.

Here and in other places, like:

```go
// Set up ID mappings
idMappings := options.IDMappings
```

the comment basically adds nothing.
```go
	return streamFile, fds, nil
}

// convertTarToSplitFDStream converts a tar stream to a splitfdstream by parsing
```
This obviously works, but it seems pretty inefficient, i.e. reading the entire contents of a large file into memory and just discarding it.
I guess we're doing this because the current API is "give a tarball" and this just builds on that without changing anything more fundamental?
```go
if buf == nil {
	buf = make([]byte, archive.CopyBufferSize)
}
// File not found in diff directory (e.g., naiveDiff was used),
```
When would the file not be found?
```go
// Write a single prefix for the total size, then stream
// data in chunks. The reader expects exactly one prefix
// per file entry.
if err := writer.WriteInlinePrefix(header.Size); err != nil {
```
Would perhaps be more elegant if writer.WriteInline returned a sub-io.Writer impl; then we could just do io.Copy to that?
common/pkg/json-proxy/handler.go
Outdated
```go
	rb, err = h.GetLayerInfoPiped(ctx, req.Args)
case "FinishPipe":
	rb, err = h.FinishPipe(ctx, req.Args)
case "GetSplitFDStreamSocket":
```
It seems like this may grow into a general replacement for the bespoke proxy API, right? How about calling it `OpenJsonRPCFdPass`?
```go
	"runtime"
	"sync"

	fdpass "github.com/cgwalters/jsonrpc-fdpass-go"
```
This can use https://github.com/bootc-dev/jsonrpc-fdpass-go now
```go
// The library limits to MaxFDsPerMessage per sendmsg, so remaining
// FDs are sent as follow-up "fds" notifications.
```
Hmm, the jsonrpc-fdpass protocol specifies sending more fds as whitespace, but it's possible the Go implementation isn't doing this right.
```go
// findManifest finds the image manifest from BigData keys.
// It looks for manifest-* keys containing an image manifest (has "config" field),
// filtering out manifest lists/indexes.
func findManifest(store Store, imageID string) ([]byte, error) {
```
Surely there's a common API for this?
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
force-pushed from 7af6099 to daa8688
I think this is overall OK as is. However, just to cross-reference from composefs/composefs-rs#228 (comment), I think we should get a bit more ambitious and try adding a full new proxy API based on jsonrpc-fdpass instead. There are a lot of warts and bugs we could fix at the same time (like the bad idea of auto-converting v2s2 to OCI, and other things from the historical PRs there). That would then make the architecture much clearer: there's a jsonrpc-fdpass API for

Just to also xref again: I think this API has the potential to solve problems like containers/skopeo#658 (comment) too. But of course, that's where the choice of IPC becomes a big deal.
Isn't that something we can do incrementally? I've tried to keep this as small as possible (e.g. dropped write support) to facilitate merging it, but I think we should bring that back and let the copy code handle this as a shortcut for more efficient copies to different stores.
cgwalters left a comment:
Assisted-by: OpenCode (Claude Opus 4)
Comments prefixed with "AI:" are unedited AI output.
Comments on lines outside the diff:
storage/store.go:3829
AI: Important: Shutdown() cleans up the graph driver (line 3883: s.graphDriver.Cleanup()) but never calls s.jsonRPCServer.Stop(). Any goroutines spawned by HandleConnection may still be running and holding references to the now-cleaned-up driver, which could cause panics or undefined behavior.
Shutdown should stop the JSON-RPC server and wait for connections to drain before cleaning up the driver.
```go
}

resp := fdpass.NewResponse(result, req.ID)
if err := sendRetry(sender, &fdpass.MessageWithFds{
```
AI: Important: After sendRetry succeeds, the server-side copies of streamFile and all fileFDs in allFDs are never closed. The sendmsg syscall duplicates the file descriptors into the kernel's SCM_RIGHTS message, so the sender's copies remain open. The Sender.Send() method in the transport library does not close them either (it only calls f.Fd() for the raw int).

This leaks 1 + N file descriptors per successful GetSplitFDStream call for the lifetime of the connection. On the error path (line 280-281) they're also not closed.

Suggested fix: add cleanup after send:

```go
for _, f := range allFDs {
	f.Close()
}
```

Or perhaps we take ownership of them on the jsonrpc-fdpass-go side?
```go
	}
}

func (s *JSONRPCServer) handleRequest(sender *fdpass.Sender, req *fdpass.Request, fds []*os.File) {
```
AI: Important: The fds parameter (file descriptors received from the client message) is accepted but never used and never closed in any code path. If a client sends file descriptors with any request, they will be leaked. Even if current methods don't expect incoming FDs, the server should close them defensively:

```go
defer func() {
	for _, f := range fds {
		f.Close()
	}
}()
```

```go
)

// sendRetry retries sender.Send on EAGAIN (non-blocking socket buffer full).
func sendRetry(sender *fdpass.Sender, msg *fdpass.MessageWithFds) error {
```
AI: Important: This is an unbounded busy-loop with only runtime.Gosched() as backoff. If the socket buffer stays full (e.g., the peer is slow or blocked), this will spin a CPU core indefinitely with no backpressure, no maximum retry count, and no context/deadline support.
At minimum this should use exponential backoff (e.g., time.Sleep starting at 1ms, doubling up to some cap), or ideally poll/select on the fd for writability. A context parameter would also allow callers to set deadlines.
Wait sorry and perhaps I'm missing something but why do we need this API at all? I think we should just be able to use the jsonrpc-fdpass-go library at a high level without doing any socket stuff directly, no?
```go
	return nil, nil, fmt.Errorf("failed to create client connection: %w", err)
}

serverConn, err := net.FileConn(serverFile)
```
AI: FD leak on error: if net.FileConn(serverFile) fails at this line, clientFile is not closed — only clientConn (which wraps a dup'd copy) and serverFile are cleaned up. The original fd held by clientFile leaks.
Fix: defer clientFile.Close() and defer serverFile.Close() immediately after creating them at lines 497-498. The redundant Close() calls at 514-515 are then just harmless no-ops.
```go
// Add to the WaitGroup before spawning the goroutine to avoid
// a race with Stop() -> connections.Wait().
s.jsonRPCServer.TrackConnection()
go s.jsonRPCServer.HandleConnection(serverConn)
```
AI: Important: SplitFDStreamSocket() acquires the graph lock via startUsingGraphDriver() but releases it at function return via defer s.stopUsingGraphDriver(). The spawned goroutine running HandleConnection will call s.driver.GetSplitFDStream() on the stored driver reference after the lock has been released.
This means the handler goroutine accesses the graph driver without the graph lock held. If another goroutine calls startUsingGraphDriver() which detects a modification and creates a new driver instance, the handler may be using a stale/replaced driver. The handler goroutine needs its own locking strategy (e.g., calling startUsingGraphDriver/stopUsingGraphDriver around each request).
```go
	"go.podman.io/storage/pkg/splitfdstream"
)

func TestGetSplitFDStreamStub(t *testing.T) {
```
AI: The test coverage here is very thin — only two trivial error paths (nil options, non-existent layer) and an interface assertion. There are no tests for the happy path, composefs xattr resolution, inline fallback, path traversal rejection, FD ordering, empty layers, or zero-length regular files.
Given the complexity and security sensitivity of this code (path traversal checks, FD lifecycle, tar reconstruction), more substantial test coverage would significantly increase confidence. An integration test with a real layer diff would be especially valuable.
```go
// InlineWriter writes a negative prefix for size bytes and returns an
// io.Writer that passes data straight through to the underlying stream.
// This lets callers use io.Copy instead of manual chunked loops.
func (w *SplitFDStreamWriter) InlineWriter(size int64) (io.Writer, error) {
```
AI: This writes a prefix declaring exactly size bytes will follow, then returns the raw underlying io.Writer with no enforcement. If a caller writes more or fewer bytes than size, the stream is silently corrupted — the reader will misinterpret subsequent data.
Current callers do use LimitReader, so this works in practice, but the API is a footgun for future callers. Consider either wrapping the returned writer in a LimitedWriter that enforces the contract, or at least adding a prominent doc comment warning.
```go
// Walk the layer chain using store.LayerParent
var layerIDs []string
layerID := topLayerID
for layerID != "" {
```
AI: The layer chain walk silently swallows errors from store.LayerParent (line 92 breaks on error). If the store returns a transient error mid-chain, the caller gets a truncated layer list with no error indication. This could lead to silent data loss during transfer — the receiving side would only get a subset of layers.
Also, this produces a leaf-first (reverse) ordering, while the manifest fallback (line 100-103) produces root-first. The consumer will see inconsistent ordering depending on which code path was taken.
```go
}

// Verify it's still a regular file
var fdStat unix.Stat_t
```
AI: After opening the file and verifying it is regular, the code does not verify that the file size matches header.Size. If the file on disk has been modified since the diff was computed (TOCTOU race or partial write), the consumer would get an FD reference but the actual file size wouldn't match what the tar header says, potentially causing data corruption on the receiving end.
Adding a size check (if fdStat.Size != header.Size) and falling back to inline would make this more robust.
```go
// TarEntryIterator abstracts iteration over tar entries.
// Standard implementation wraps tar.Reader; splitfdstream provides
// entries from its chunk-based format with reflink support.
type TarEntryIterator interface {
```
AI: (low) The doc comment on WriteContentTo (line 1102) says "Only called for TypeReg entries with Size > 0", but looking at the actual call site in UnpackFromIterator (line 1261-1263), writeContent is constructed unconditionally for every entry type, and inside extractTarFileEntry the guard is writeContent != nil (not "TypeReg with Size > 0"). So WriteContentTo will be called for zero-size regular files too. The doc comment should be corrected, or the code should add the Size > 0 guard.
Yes, there are obviously virtues to being incremental. Though a different but related problem is that, as is, it's harder to get real test coverage because we have to create a separate synthetic client using this API. But if it were plumbed through the proxy more directly, it'd be way easier to test e2e.
Extend the store with splitfdstream capabilities exposed via a UNIX socket for JSON-RPC communication. Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
Implement the SplitFDStreamDriver interface for the overlay driver, enabling efficient layer operations with reflink support. Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
Add a new json-proxy method that returns a Unix socket file descriptor to the client. The client then speaks the jsonrpc-fdpass protocol directly over that socket for splitfdstream operations, bypassing the json-proxy for bulk data transfer. Bump protocolVersion to "0.2.9". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
Add a new splitfdstream package that enables efficient container layer transfer between storage instances by separating tar metadata from file content. File content is passed as file descriptors.