[DataFlow runtime · M6] MooncakeFeatureStore zero-copy transport (put_from/get_into)#614
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a zero-copy transport mechanism for MooncakeFeatureStore as the default path, storing individual tensors directly as raw buffers instead of a single pickled blob. It adds support for raw DMA transfers via put_from and get_into with buffer registration, falling back to the legacy pickle path if the backend lacks these APIs. The changes also include comprehensive unit tests for zero-copy and cross-process behaviors. Feedback on the PR highlights a potential bug in _try_physical_free where partial deletion failures can block subsequent retries if the store rejects already-deleted keys, suggesting a dynamic update to _sample_names. Additionally, the reviewer noted that the newly added helper method _tensor_keys is unused and should be removed.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _try_physical_free(self, sample_id: str) -> bool: | ||
| """Remove the remote object. False on a (retryable) RPC failure.""" | ||
| return self._store_remove(self._key(sample_id)) | ||
| """Remove the remote object(s). False on a (retryable) RPC failure. | ||
|
|
||
| Zero-copy: one object per tensor, so remove every per-tensor key of the | ||
| sample's current generation. Pickle: a single object. | ||
| """ | ||
| if not self._zero_copy: | ||
| return self._store_remove(self._key(sample_id)) | ||
| gen = self._generation.get(sample_id) | ||
| if gen is None: | ||
| return True # nothing tracked to remove (already freed) | ||
| ok = True | ||
| for name in self._sample_names.get(sample_id, []): | ||
| if not self._store_remove(self._tkey(sample_id, gen, name)): | ||
| ok = False | ||
| return ok |
There was a problem hiding this comment.
In zero-copy mode, a sample consists of multiple per-tensor objects. If _try_physical_free fails halfway (e.g., due to a transient network issue during one of the _store_remove calls), it returns False, and the sample is placed in _release_pending. During subsequent gc() retries, _try_physical_free is called again and will attempt to remove all keys in _sample_names from the beginning. If the underlying Mooncake store returns an error (non-zero status) when attempting to remove already-deleted keys, _store_remove will return False for those keys. This causes _try_physical_free to repeatedly fail, preventing the remaining keys from ever being cleaned up properly, leading to orphaned objects and leaks. To fix this, we should dynamically update self._sample_names[sample_id] to only keep the names of the tensors that failed to be removed. This avoids redundant removal attempts on already-deleted keys and ensures robust retry behavior.
def _try_physical_free(self, sample_id: str) -> bool:\n \"\"\"Remove the remote object(s). False on a (retryable) RPC failure.\n\n Zero-copy: one object per tensor, so remove every per-tensor key of the\n sample's current generation. Pickle: a single object.\n \"\"\"\n if not self._zero_copy:\n return self._store_remove(self._key(sample_id))\n gen = self._generation.get(sample_id)\n if gen is None:\n return True # nothing tracked to remove (already freed)\n names = self._sample_names.get(sample_id, [])\n failed_names = []\n for name in names:\n if not self._store_remove(self._tkey(sample_id, gen, name)):\n failed_names.append(name)\n if failed_names:\n self._sample_names[sample_id] = failed_names\n return False\n return True| def _tensor_keys(self, sample_id: str, gen: int) -> List[str]: | ||
| return [ | ||
| self._tkey(sample_id, gen, n) | ||
| for n in self._sample_names.get(sample_id, []) | ||
| ] |
73023aa to
207e0d4
Compare
Replace the torch.save/torch.load pickle round-trip with Mooncake's native
raw-buffer DMA. One hard-pinned object per *tensor*, keyed
{store_id}/{sid}/g{gen}/{name}: put() writes each tensor straight from its
storage via put_from(ptr); get() reads each straight into a tensor allocated
from the ref's FeatureSpec via get_into(ptr). Shape/dtype travel on the ref, so
there is no serialized header. The generation lives in the key (like
SharedDirFeatureStore's filename gen), so a re-put supersedes the old key set
and a stale ref's keys are gone -> get() raises (B5). Hard-pin is preserved
(put_from carries the ReplicateConfig). Falls back to the pickle blob path when
the backend lacks put_from/get_into (zero_copy=False or an older mooncake).
Source + receive buffers are registered with the transfer engine around the DMA:
RDMA rejects an unregistered address (AddressNotRegistered, -800); TCP ignores
the registration. Validated cross-node on 2x H200 over BOTH tcp and rdma --
producer put_from on node 0, consumer get_into + FSDP train on node 1.
Tests: the fake now simulates put_from/get_into via ctypes, so the full contract
runs on the zero-copy path (28 tests, real gated test green on a live master).
Adds zero-copy specifics (per-tensor keys, no-pickle-on-the-wire, re-put
supersede, pickle fallback) + cross-process contract tests (producer/consumer as
separate instances over one backend) + abort-under-lease-defer and
re-put-remove-failure edge cases.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
6b3e016 to
1a3278b
Compare
207e0d4 to
f057907
Compare
…or_keys _store_get_tensor only checked get_into's return for a negative error code, so a short read (0 <= rc < nb) into the freshly torch.empty'd receive buffer was accepted, handing the trainer a tensor with an uninitialized garbage tail. Unlike the pickle path (torch.load reconstructs whole tensors), the raw-buffer path cannot otherwise detect under-fill. get_into returns the bytes read (a full read == nb), so require rc == nb and raise KeyError on a short read. Adds a zero-copy regression test that truncates the stored blob. Also remove the dead _tensor_keys() helper: it has no call sites, while _try_physical_free and _sample_exists each inline the same per-tensor-key comprehension. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Stacks on #612 (
dataflow-up-15-mooncake). Merge bottom-up. First PR of a short series toward online disaggregated training (no hot-switch): this one delivers the zero-copy transport; follow-ups addbuild_disagg_online_eagle3_runtime+ cross-pool consume-once/backpressure, then a two-axis staleness gate.What
Replaces the
torch.save/torch.loadpickle round-trip inMooncakeFeatureStorewith Mooncake's native raw-buffer DMA:{store_id}/{sample_id}/g{gen}/{name}.put()writes each tensor straight from its storage viaput_from(ptr, nbytes, ReplicateConfig);get()reads each straight into a tensor allocated from the ref'sFeatureSpecviaget_into(ptr, nbytes). Shape/dtype travel on the ref → no serialized header.SharedDirFeatureStore's filename gen): a re-put supersedes the old key set, so a stale ref's keys are gone →get()raises (B5).put_fromcarries theReplicateConfig).zero_copy=False(or a backend withoutput_from/get_into) transparently uses the pickle blob path.Source and receive buffers are registered with the transfer engine around the DMA — RDMA rejects an unregistered address (
AddressNotRegistered, status -800); TCP ignores it.Validation (2× H200, cross-node)
TestMooncakeFeatureStoreRealagainst a livemooncake_master(now exercising the zero-copy path).put_fromon node 0 → consumerget_into+ FSDP on node 1) over bothtcpandrdma, end-to-end throughbuild_disagg_eagle3_runtime.Tests
The in-memory fake now simulates
put_from/get_intoviactypes, so the whole contract runs on the zero-copy path. Adds zero-copy specifics (per-tensor keys, no-pickle-on-the-wire, re-put supersede, pickle fallback), cross-process contract tests (producer/consumer as separate instances over one backend — the real disagg topology), andabort-under-lease-defer / re-put-with-failed-remove edge cases. One@unittest.expectedFailuredocuments the cross-process-abort-under-lease-defer tombstone gap (closes with the shared metadata index follow-up).🤖 Generated with Claude Code