perf(mem_wal): parallelize fresh-tier source planning and execution #7257
Merged
hamersaw merged 2 commits on Jun 16, 2026
The LSM FTS and vector search planners built each source's plan in a sequential `for` loop and unioned the arms under a single `SortPreservingMergeExec`. The merge polls every union arm from one task, so per-arm CPU (posting/index decode, BM25 and distance scoring) serialized even though the underlying IO awaits interleave, and wall time grew linearly with the flushed-generation count.

Build the per-source plans concurrently with `try_join_all`, run the cross-source block-list PK hashing concurrently, and wrap the union in a round-robin `RepartitionExec` (`spawn_union_arms`) so each arm gets its own driver task. Rows stay disjoint across partitions, so the per-partition TopK + sort-preserving merge semantics are unchanged.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
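The claim that disjoint rows keep the per-partition TopK + merge semantics unchanged can be illustrated with a minimal, std-only sketch. It is not the Lance or DataFusion code: `arm_top_k`, `parallel_top_k`, and `sequential_top_k` are hypothetical names, rows are simplified to `(pk, score)` pairs, and scoped OS threads stand in for the async driver tasks the commit message describes.

```rust
use std::thread;

/// Hypothetical stand-in for one union arm: rank its rows and keep the k best.
fn arm_top_k(rows: &[(u64, f32)], k: usize) -> Vec<(u64, f32)> {
    let mut scored = rows.to_vec();
    // Highest score first; total_cmp gives a total order over f32.
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(k);
    scored
}

/// Run every arm on its own scoped thread, then merge the per-arm winners.
/// Assumes the sources hold disjoint rows, as the commit message states.
fn parallel_top_k(sources: &[Vec<(u64, f32)>], k: usize) -> Vec<(u64, f32)> {
    let per_arm: Vec<Vec<(u64, f32)>> = thread::scope(|s| {
        let handles: Vec<_> = sources
            .iter()
            .map(|rows| s.spawn(move || arm_top_k(rows, k)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("arm thread panicked"))
            .collect()
    });
    // At most k rows per arm survive, so the final pass is small.
    let merged: Vec<(u64, f32)> = per_arm.into_iter().flatten().collect();
    arm_top_k(&merged, k)
}

/// Reference result: a single pass over all rows on one thread.
fn sequential_top_k(sources: &[Vec<(u64, f32)>], k: usize) -> Vec<(u64, f32)> {
    let all: Vec<(u64, f32)> = sources.iter().flatten().copied().collect();
    arm_top_k(&all, k)
}

fn main() {
    let sources: Vec<Vec<(u64, f32)>> = vec![
        vec![(1, 0.9), (2, 0.1), (3, 0.5)],
        vec![(4, 0.8), (5, 0.7)],
        vec![(6, 0.2), (7, 0.95)],
    ];
    let k = 3;
    // Per-arm top-k followed by a merge agrees with the global top-k.
    assert_eq!(parallel_top_k(&sources, k), sequential_top_k(&sources, k));
    println!("{:?}", parallel_top_k(&sources, k));
}
```

Because every arm contributes at most `k` rows, the final merge handles at most `k` times the number of arms, which is why the result does not depend on how the arms are scheduled.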
LuQQiu
reviewed
Jun 15, 2026
) -> lance_core::Result<std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
    use datafusion::physical_plan::repartition::RepartitionExec;
    let n = union.properties().partitioning.partition_count();
    let repart = RepartitionExec::try_new(
Contributor
`RepartitionExec` may be a little heavy for solving a concurrency issue; it's a shuffle.
Does it improve the result a lot?
If we already do per-source TopK (it seems so, could you confirm?), the final candidate set may not be that large, so could we use `SortExec` directly?
Contributor
Author
You're right, the `RepartitionExec` is a bit heavy for what we actually want to achieve here. I removed it and now rely on the underlying constructs to give us execution parallelism.
The fresh-tier FTS and vector planners wrapped the per-source union in a round-robin `RepartitionExec` to give each arm its own driver task. That fan-out is redundant: the downstream `SortPreservingMergeExec` already spawns one task per input partition (one per union arm) via `spawn_buffered` on the multi-thread runtime, and the heavy per-arm CPU (IVF_HNSW partition search, BM25/WAND scoring) already runs on the CPU pool via `spawn_cpu` at the leaf. The extra exchange only added a channel hop and a round-robin reshuffle for no measurable gain.

Remove `spawn_union_arms` and return the `UnionExec` directly. The concurrent per-source plan building (`try_join_all`) is unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
LuQQiu
approved these changes
Jun 16, 2026
Summary
The LSM FTS and vector search planners (`LsmFtsSearchPlanner`, `LsmVectorSearchPlanner`) built each source's plan in a sequential `for` loop and unioned the arms under a single `SortPreservingMergeExec`. The merge polls every union arm from one task, so per-arm CPU (posting/index decode, BM25 and distance scoring) serialized even though the underlying IO awaits interleave, and wall time grew linearly with the flushed-generation count.

This:
- builds the per-source plans concurrently with `try_join_all` (FTS + vector),
- runs the cross-source block-list PK hashing concurrently (`block_list.rs`),
- wraps the union in a round-robin `RepartitionExec` via a new `spawn_union_arms` helper so each arm gets its own driver task.

Rows stay disjoint across partitions, so the per-partition TopK + sort-preserving merge semantics are unchanged.
Changes
- `scanner/exec.rs`: `spawn_union_arms` helper (round-robin repartition over the union).
- `scanner/fts_search.rs`, `scanner/vector_search.rs`: concurrent per-source plan builds + `spawn_union_arms` over the union.
- `scanner/block_list.rs`: concurrent flushed-generation PK-hash loads.

Validation
Validated end-to-end against a WAL FTS benchmark on minikube with object storage behind a 10ms/GET latency proxy. Read latency over a fresh tier as a function of flushed-generation count, p50:
Per-generation slope dropped from ~290ms/gen to ~12ms/gen.
🤖 Generated with Claude Code