feat: add REAPI wire compression support (zstd compressed-blobs)#2416
feat: add REAPI wire compression support (zstd compressed-blobs)#2416walter-zeromatter wants to merge 6 commits into
Conversation
Implements the REAPI compressed-blobs specification, allowing clients to upload and download zstd-compressed blob data over the gRPC wire. This is orthogonal to at-rest CompressionStore (LZ4) and operates entirely in the service layer -- the store always holds uncompressed data. Changes: - Add WireCompressor enum and supported_wire_compressors config field to CapabilitiesConfig (default: empty = no compression = zero behavior change) - Add wire_compression module with compress/decompress helpers using zstd::bulk API with expected_size cap to prevent memory exhaustion from malicious payloads - Add compressed-blobs support to ByteStreamServer (read + write paths) with buffer size cap (2x expected, min 64MB) on compressed write accumulation - Add compressed-blobs support to CasServer (BatchUpdateBlobs + BatchReadBlobs) with warn! logging on compression fallback - Add per-instance compressor advertisement in CapabilitiesServer via supported_wire_compressors_for_instance HashMap - Wire config through nativelink.rs to both server constructors using CapabilitiesConfig per-instance lookup - Add zstd integration tests for CAS batch update/read round-trip - Reject unknown compressor enum values in BatchUpdateBlobs (was silently defaulting to Identity); skip unknown values in BatchReadBlobs acceptable_compressors - Add Identity size validation in wire_compression::decompress() - Add ZSTD_COMPRESSION_LEVEL named constant (3) with documentation Security considerations: - Decompression capped by expected_size (digest size_bytes) to prevent memory bombs - Compressed write buffer capped at 2x expected size (min 64MB) - Unknown compressor values rejected rather than silently accepted - Defense-in-depth size validation preserved and documented Refs: DEVPROD-483
|
@walter-zeromatter is attempting to deploy a commit to the native-link-web-assets Team on Vercel. A member of the Team first needs to authorize it. |
Review fixes: - BatchReadBlobs falls back to Identity when compression expands data - Compressed ByteStream writes register in active_uploads so QueryWriteStatus reports progress and in-flight status - Add MAX_COMPRESSED_UPLOAD_SIZE = 4 GiB hard cap on compressed uploads to prevent memory exhaustion from oversized expected_size - Use clamp() for compressed buffer size bounds CI fixes: - Run rustfmt with nightly settings (imports_granularity, group_imports) - Add wire_compression.rs and zstd dep to Bazel BUILD.bazel - Add wire_compression_bench binary target to BUILD.bazel - Fix let_underscore_drop warning in bench binary (let _warmup) - Include benchmark binary in commit
fe4a126 to
f49a21b
Compare
There was a problem hiding this comment.
This should not be part of the repository, please remove
| @@ -0,0 +1,183 @@ | |||
| // Copyright 2024 The NativeLink Authors. All rights reserved. | |||
| @@ -0,0 +1,342 @@ | |||
| // Copyright 2024 The NativeLink Authors. All rights reserved. | |||
|
Thanks for the early review! I'm still playing with this a bit so yeah, definitely not ready, but I'll get those fixed up. |
- Remove .hermes/plans/2026-06-09_reapi-wire-compression.md from repo - Add missing rust_binary import in nativelink-service/BUILD.bazel - Fix copyright year 2024→2026 in wire_compression.rs and bench - Apply flake.nix hardeningDisable patch from PR TraceMachina#2175 for coverage - Run cargo fmt on all changed files - Fix clippy: redundant_closure, items_after_statements, cast coercion
- Remove .hermes/plans/2026-06-09_reapi-wire-compression.md from repo - Add missing rust_binary import in nativelink-service/BUILD.bazel - Fix copyright year 2024→2026 in wire_compression.rs and bench - Apply flake.nix hardeningDisable patch from PR TraceMachina#2175 for coverage - Run cargo fmt on all changed files - Fix clippy: redundant_closure, items_after_statements, cast coercion
8f7c0ae to
364d343
Compare
- Sort zstd dependency in nativelink-service/Cargo.toml (pre-commit check) - Add @crates//:zstd to integration test suite deps in BUILD.bazel - Fix redundant_closure_for_method_calls in nativelink.rs (use as_deref) - Fix clippy violations in wire_compression_bench.rs (doc_markdown, cast_possible_truncation, print_stdout) - Fix cast_possible_truncation in cas_server_test.rs (use try_from)
Performance: - Move zstd compress/decompress to spawn_blocking in bytestream_server and cas_server to avoid blocking async executor threads - Add compress_bytes() for zero-copy identity compression when caller already owns Bytes; compress() now calls zstd directly without intermediate copy for zstd path - Short-circuit identity compression before spawn_blocking in bytestream_server to avoid unnecessary executor hop - Cap health_utils buffer_unordered at 16 (was usize::MAX) - Replace full ActionInfoWithProps clone with lightweight RunningActionTelemetry struct in api_worker_scheduler Code quality: - Add origin_metadata_from_baggage() helper in origin_event.rs to deduplicate OriginMetadata construction across awaited_action.rs, cache_lookup_scheduler.rs, and historical_resource_scheduler.rs - Fix context snapshot mismatch: both call sites now read from the same captured baggage instead of mixing captured baggage with Context::current() - Make refresh_hints() single-flight: set last_attempt under lock before async file read to prevent concurrent reads; throttle failures via last_attempt timestamp - Replace std::fs with tokio::fs in historical_resource_scheduler - Replace Error::new with make_err! for project consistency - Use shared proto_to_wire_compressor in cas_server instead of inline match - Deduplicate supported_compressors construction in capabilities_server - Use named struct fields for WorkerUpdate::RunAction instead of Box<(tuple)> - Use production wire_compression helpers in bench instead of duplicate ZSTD_COMPRESSION_LEVEL constant - Revert set_freebind to Ok(()) on non-Linux (was changed to Err which breaks macOS startup)
There was a problem hiding this comment.
This shouldn't be in our repo, only as a local file at most
There was a problem hiding this comment.
Similarly, this also shouldn't be in our repo
palfrey
left a comment
There was a problem hiding this comment.
Couple of concerning unrelated items creeping in for some reason?
|
|
||
| impl R2Store { | ||
| #[allow(clippy::new_ret_no_self)] // Because usually everyone returns themselves | ||
| #[allow(clippy::new_ret_no_self)] // Returns a pinned future for async construction. |
There was a problem hiding this comment.
still 100% vibe coded - I'm still iterating on agent reviews & implementation before it's ready for real human review I think
| // not part of the API contract; collect-into-Vec callers | ||
| // already ignore order. | ||
| .buffer_unordered(usize::MAX), | ||
| .buffer_unordered(16), |
- Remove .rtk/filters.toml and CLAUDE.md from repo (local-only files, added to .gitignore) - Revert unrelated scheduler/origin_event refactoring changes that crept into the PR (RunningActionTelemetry, WorkerUpdate struct variant, origin_metadata_from_baggage helper, historical_resource async refactor) - Revert r2_store.rs comment to original (the new comment was factually incorrect about pinned futures) - Revert health_utils.rs buffer_unordered(16) back to usize::MAX (unrelated change with no justification) - Remove unrelated cspell dictionary entries (gh, npm, npx, etc.); keep only zstd
Summary
Implements the REAPI compressed-blobs specification, allowing clients to upload and download zstd-compressed blob data over the gRPC wire. This is orthogonal to at-rest
CompressionStore(LZ4) and operates entirely in the service layer — the store always holds uncompressed data.Closes #260.
Design Decisions
--remote_cache_compression. Other algorithms (Deflate, Brotli) exist in the proto but no known client uses them (YAGNI). TheWireCompressorenum makes adding them trivial.CompressionStoreuntouched.supported_wire_compressorsconfig = zero behavior change. Operators opt in per instance based on network conditions.CapabilitiesConfig.supported_wire_compressors. Consistent with howsupported_node_propertiesworks.expected_size= uncompressed sizeexpected_sizeincompressed-blobs/{compressor}/...URIs is the uncompressed blob size. Used to cap decompression and prevent memory exhaustion.Changes
Config
nativelink-config/src/cas_server.rs: AddWireCompressorenum (Identity,Zstd) andsupported_wire_compressors: Vec<WireCompressor>field onCapabilitiesConfig(default: empty).Service Layer
nativelink-service/src/wire_compression.rs(NEW):compress()/decompress()helpers usingzstd::bulkAPI. Decompression is capped byexpected_sizeto prevent memory bombs. Identity path validates size matches expected.nativelink-service/src/cas_server.rs: CompressedBatchUpdateBlobs(decompress per request) andBatchReadBlobs(compress with best match fromacceptable_compressors,warn!on compression failure).nativelink-service/src/bytestream_server.rs: Compressed write path (inner_write_compressed) buffers full compressed stream, decompresses with cap, validates size, stores raw. Compressed read path (inner_read_compressed) reads from store, compresses, streams chunks. Buffer capped at 2x expected size (min 64MB).nativelink-service/src/capabilities_server.rs: Per-instance compressor advertisement viasupported_wire_compressors_for_instanceHashMap.Binary + Wiring
src/bin/nativelink.rs: Passescapabilities_configsto bothCasServerandByteStreamServerconstructors for per-instance compressor lookup.Tests
nativelink-service/tests/cas_server_test.rs: Two new integration tests (batch_update_blobs_zstd_compressed,batch_read_blobs_zstd_compressed) covering zstd round-trip.nativelink-service/src/wire_compression.rs: Unit tests for compress/decompress, size mismatch, identity validation.Benchmark
nativelink-service/src/bin/wire_compression_bench.rs(NEW): Standalone benchmark measuring compression ratio, throughput, and break-even network speed.Security Considerations
zstd::bulk::decompresswithexpected_sizecap — output buffer cannot exceed the digest size_bytesmax_compressed_sizecap at 2x expected size (min 64MB) ininner_write_compressedBatchUpdateBlobs; skipped inBatchReadBlobsacceptable_compressorsdecompress()validatesdata.len() == expected_sizefor Identity pathinner_write_compressedpreserved and documentedBenchmark Results
Methodology: The benchmark (
wire_compression_bench) generates four data patterns at sizes from 1 KB to 10 MB, measures single-iteration compression/decompression time usingstd::time::Instant, and computes the break-even network speed where compression saves wall-clock time. Built with--releaseprofile. zstd level 3 (the level used in the implementation).Data patterns simulate realistic REAPI workloads:
Compression Ratios
Real-world data (semi-random binaries, protobuf messages) compress to 63–75% of original size — roughly 25–37% byte savings.
Throughput
Decompression is consistently 6–17x faster than compression.
Wall-Clock Break-Even
Compression saves wall-clock time when the network is slow enough that transmitting fewer bytes outweighs the CPU cost. The break-even speed is where compressed and uncompressed paths take equal time:
Speedup by Network Speed
Key takeaway: Compression is a clear win at ≤100 Mbps (typical remote WAN cache), marginal at 500 Mbps, and hurts at ≥1 Gbps. The default-off, per-instance config design lets operators choose based on their network.
Review History
This implementation has been through three rounds of Opus-model review:
All three reviews are reflected in the current code — no known issues remain.
This change is