Skip to content

[DataFlow runtime · M5 3/4] Durable recovery (SQLite + restart reconcile) + fault/soak gates#608

Open
maocheng23 wants to merge 25 commits into
mainfrom
dataflow-up-11-m5-recovery
Open

[DataFlow runtime · M5 3/4] Durable recovery (SQLite + restart reconcile) + fault/soak gates#608
maocheng23 wants to merge 25 commits into
mainfrom
dataflow-up-11-m5-recovery

Conversation

@maocheng23

Copy link
Copy Markdown
Collaborator

Adds SQLiteMetadataStore (WAL, synchronous=FULL) + all_committed_ids + reconcile_on_restart (B4), plus the M5 exit-gate fault-injection + sustained-lag soak tests.

Part of the DataFlow runtime M5/M6 stacked series (continues the M1–M4 work in #594#601 / #603). Stacked PRs — merge bottom-up (up-9 first). Lint (pre-commit) + runtime CPU test suite green.

🤖 Generated with Claude Code

maocheng23 and others added 6 commits June 26, 2026 20:48
…oncile (B4)

The recovery floor for at-least-once-with-idempotent-effects. SQLiteMetadataStore
persists committed SampleRefs (JSON, nested FeatureSpec round-tripped) and the
single {acked sample_ids, global_step, optimizer_durable} transaction. Release
state is *derived* from that marker on restart, never stored separately, so it
can't disagree with the optimizer step.

DataFlowController.reconcile_on_restart() rebuilds the queue from the durable
marker per the B4 table: acked+stepped -> released (idempotent free);
committed-unacked -> replay/requeue; never-committed -> rollout retries on id.

Gate: test_controller_dies_between_ack_and_release — crash in the after-ack,
before-release window yields no duplicate-train and no data loss.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
CPU-only, drive the real loop (store + backpressure + queue + durable ack):
- fault injection per testing/fault_injection.md: rollout dies before/after
  write, put-over-budget atomicity, get-missing-key terminal failure, idempotent
  release/abort cleanup, offline bad-dtype + mixed-schema rejection, control
  plane stays tensor-free on failure paths
- test_rollout_outruns_trainer_stays_bounded: 4:1 producer:consumer, residency
  never exceeds the hard cap, backpressure engages
- test_sustained_lag_soak_drains_to_baseline: trainer 5x behind stays under
  capacity, then drains to exact baseline (no leak)

M5 exit gate (leak + soak + B4 recovery) is GPU-free and green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…/B9)

Framework-first M6: build the seams a real Mooncake/RDMA backend slots behind,
contract-tested on CPU now (no multi-node infra required).

- SharedDirFeatureStore: a disaggregated FeatureStore over a shared directory.
  Producer and consumer are separate processes sharing only the dir; get()
  resolves from the ref + filesystem alone (true cross-process boundary), control
  plane still moves only SampleRef metadata. A real MooncakeFeatureStore swaps
  the shared-dir transport for RDMA behind this same API.
- B5 (no use-after-free): get() after release/abort raises; a generation guard
  rejects a stale ref after re-put; clone-on-fetch is the default.
- B9 (auth in disaggregated mode): AuthPolicy shared-secret gate at attach time
  and on the data path; missing/mismatched token is a PermissionError.
- Resharding contract: SampleRefQueue.get(partition=(index, num)) re-partitions a
  stable committed pool by a consumer-side hash of sample_id, so the same pool
  redistributes when DP width changes — no sample leased twice or dropped.

Numerical resharding equivalence (tp>1 & sp>1, >=4 ranks) is the GPU gate, added
next. Real RDMA Mooncake backend + cross-node deploy need infra not available
here; the seam + contract are locked down.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Spawns a 4-process tp2 x sp2 group; on each rank runs one offline EAGLE3 step
through both the legacy path and the new TrainerCore/strategy/FSDP-backend path
on identical USP-sharded data, asserting per-rank loss equivalence + grad-norm
reduction parity. This is the falsifiable scale-out gate (not FSDP-only).

Adds _fixtures.init_rank_distributed for multi-process TP x SP group setup.
Runs on the 4xH200 pod via rcli.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23 maocheng23 requested a review from FrankLeeeee as a code owner June 28, 2026 00:33
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

maocheng23 and others added 15 commits June 28, 2026 20:40
…ase_partition helpers

No behavior change: get() now reads as reclaim -> lease(any|partition) -> wait.
Clarifies that partition_key (reserved producer-side hint) and partition
(consumer-side resharding control) are unrelated.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…lease (B5)

Replace the {sid}.ckpt + .gen-sidecar / two-step-publish design with a
per-generation filename {sid}.g{gen}.ckpt, a single atomic publish, and a
generation-aware release()/_free_gen_locked so a stale handle can never free a
freshly re-put generation. Generation is derived from disk (monotonic across
instances) instead of a per-process counter. Includes retain_on_release
(offline re-iterable mode) and the stale-reput cross-instance regression test.

This folds the disagg-correctness hardening down into the seam PR so #609 ships
a correct SharedDirFeatureStore rather than one fixed two PRs later.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Adds the consumer/producer assembly for the M6 disaggregation seam
(SharedDirFeatureStore), plus a runnable 2-node example:

- launch.py: build_disagg_eagle3_runtime (consumer side) + factor the shared
  offline trainer assembly out of build_offline_eagle3_runtime, so colocated and
  disaggregated paths produce byte-identical batches/training.
- data_plane/disagg_ingest.py: ingest_offline_features (producer: load .ckpt ->
  SharedDirFeatureStore.put) + JSON ref-manifest (the tensor-free metadata bridge
  between pools; asserts the no-tensor invariant).
- examples/disagg/: run_disagg_eagle3.py (role-branched producer/consumer driver),
  run_qwen2.5_7b_eagle3_disagg.sh (rcli --per-node wrapper), README.
- tests/test_runtime/test_disagg_launch.py: CPU bit-exact differential (disagg
  store serves identical tensors to the colocated path; manifest round-trips
  tensor-free; B9 auth) + a GPU FSDP train smoke.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The thin launchers skip sanity_check(); the train_eagle3 builders read
args.target_batch_size/dp_size which only sanity_check derives. Call it on the
consumer after init_distributed (it needs the process group). Also wire
chat-template/cache-dir/learning-rate into the rcli wrapper.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Thread log_interval through build_offline/build_disagg_eagle3_runtime (default
50) so the example can emit a finer training curve; driver logs every 25 steps.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Offline training re-iterates the ref set across epochs, but SharedDirFeatureStore
consume-once-frees on release() -> epoch 2 get() raised KeyError. Add
retain_on_release (read-only mode): release() drops the lease but keeps the file,
mirroring LocalFeatureStore's file:// no-op release. The disagg consumer sets it;
online rollout keeps consume-once (default False). Whole-store cleanup at run end.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
DISAGG_ROLE=colocated runs the SAME model build + assembly via
build_offline_eagle3_runtime (LocalFeatureStore), so disagg vs colocated can be
compared on identical features/seed. Factored the shared model/optimizer build.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Disagg consumer vs colocated baseline on Qwen2.5-7B (identical features/seed):
training metrics (acceptance_rate/ploss/acc) match to ~5 sig figs; residual is
GPU floating-point noise, not the transport.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Lint-only: formats the files this PR adds/changes; no behavior change. The shell
wrapper is marked executable (check-shebang-scripts-are-executable).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…isaggregation

Adds the M6 fast-path FeatureStore backend named by disaggregated.py: backs the
data plane with the Mooncake distributed object store (cross-node RDMA zero-copy)
behind the unchanged FeatureStore API, so producer put() on one node and consumer
get() on another move bytes peer-to-peer instead of via a shared mount.

- One hard-pinned Mooncake object per sample (torch.save blob with embedded
  generation) so Mooncake's cache-LRU never silently evicts a committed-unacked
  feature; SpecForge is the sole lifetime authority via explicit remove().
- Carries the contract: B5 no-use-after-free (KeyError after release/abort,
  generation guard on re-put, clone-on-fetch), B9 shared-secret auth, consume-once
  free, retain_on_release for re-iterable offline epochs, max_resident_bytes
  backpressure, and max-hold gc.
- Re-instates the fallible-free retry seam SharedDirFeatureStore dropped: a failed
  remote remove() parks in _release_pending and gc() retries (Mooncake remove is a
  real RPC). Generation-aware lease accounting (a stale lease never pins the
  current generation).
- store= is injectable; mooncake is imported lazily, so the data plane imports
  without the package and the contract is unit-tested against an in-memory fake.

Scope: offline single-consumer path (in-process gen/lease index, mirroring
SharedDirFeatureStore's documented single-host limitation). Online multi-node
needs a shared metadata index — separate follow-up. Real RDMA e2e test gated on
the mooncake package; 15 fake-backed contract tests run on CPU.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…=mooncake)

The disagg example can now route features through MooncakeFeatureStore instead of
the shared mount: _store() builds the backend from DISAGG_BACKEND (default
shared_dir) + MOONCAKE_* env. Because a Mooncake object lives in the producer's
memory segment, the producer holds open until the consumer writes
<manifest>.consumed (or DISAGG_PRODUCER_HOLD_S elapses); shared_dir is unchanged.
Documents the backend switch + caveats in the README and the .sh wrapper (opt-in,
commented).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…NT_SIZE

The default 1 GiB segment only fits tiny feature sets; expose
MOONCAKE_GLOBAL_SEGMENT_SIZE / MOONCAKE_LOCAL_BUFFER_SIZE so the contributed
store memory can hold the hard-pinned feature set for a real run.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…deferred remove

Mooncake's remove() is lease-deferred (objects keep a short read-lease), so the
bytes can linger after release/abort and a just-read object's is_exist() stays 1
within the lease window — breaking the B5 'get after release raises' contract
that SharedDir/Local guarantee synchronously. Track (sample_id, generation)
freed in-process and reject get() of a freed ref immediately, while physical
reclamation stays lease-deferred / gc-retried. Adds a lease-defer regression
test (fake remove() reports success but keeps the object).

Found via real 2-node Mooncake e2e on sci-h200.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ility, reconcile gate, USP 4-rank test

- LocalFeatureStore: gc() counts freed_bytes on a successful release-pending
  retry; release() sid hoist + comment; get() lock-scope comment.
- SQLiteMetadataStore: synchronous=FULL; all_committed_ids ORDER BY rowid.
- controller.reconcile_on_restart gates release on optimizer_durable (not
  global_step is not None).
- test_equiv_4rank uses attention_backend='usp' + a flash-attn skip guard.
- regression tests in test_feature_store / test_recovery.

(The disaggregated.py per-generation rewrite that was previously bundled here now
lives in #609 alongside the SharedDirFeatureStore it hardens.)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Base automatically changed from dataflow-up-10-m5-backpressure to main June 29, 2026 13:39
Comment thread flow_compare.md

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Del this?

jiapingW added 2 commits June 29, 2026 23:56
[DataFlow runtime · M6 1/4] Disaggregation seam: SharedDirFeatureStore + resharding + auth
[DataFlow runtime · M6 2/4] Disaggregated offline EAGLE3 example + build_disagg_eagle3_runtime
jiapingW added 2 commits June 30, 2026 00:05
[DataFlow runtime · M6 3/4] M5/M6 adversarial-review hardening
[DataFlow runtime · M6 4/4] MooncakeFeatureStore — RDMA fast-path backend
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants