[Feature] Switch from numpy void() to frombuffer()#984
Conversation
📝 WalkthroughWalkthroughThis PR changes HDF5 persistence to store cloudpickle byte streams as gzip-compressed ChangesHDF5 Serialization Format Upgrade
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
for more information, see https://pre-commit.ci
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #984 +/- ##
=======================================
Coverage 94.24% 94.24%
=======================================
Files 39 39
Lines 2119 2119
=======================================
Hits 1997 1997
Misses 122 122 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/executorlib/standalone/hdf.py (1)
39-43: ⚡ Quick winExtract shared pickle/HDF encode-decode helpers to prevent format drift.
The serialization/deserialization expression is duplicated across many sites; centralizing it will make future format changes safer.
Proposed refactor
@@ import cloudpickle import h5py import numpy as np @@ +def _serialize_to_uint8(value: Any) -> np.ndarray: + return np.frombuffer(cloudpickle.dumps(value), dtype=np.uint8) + + +def _deserialize_from_key(hdf: h5py.File, key: str) -> Any: + return cloudpickle.loads(hdf[f"/{key}"][()].tobytes()) + + def dump(file_name: Optional[str], data_dict: dict) -> None: @@ fname.create_dataset( name="/" + group_dict[data_key], - data=np.frombuffer( - cloudpickle.dumps(data_value), dtype=np.uint8 - ), + data=_serialize_to_uint8(data_value), compression="gzip", ) @@ - data_dict["fn"] = cloudpickle.loads(hdf["/function"][()].tobytes()) + data_dict["fn"] = _deserialize_from_key(hdf, "function") @@ - data_dict["args"] = cloudpickle.loads(hdf["/input_args"][()].tobytes()) + data_dict["args"] = _deserialize_from_key(hdf, "input_args")Also applies to: 59-79, 97-99, 126-126, 144-144, 230-230
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/executorlib/standalone/hdf.py` around lines 39 - 43, Replace the repeated serialization/deserialization expression (cloudpickle.dumps(...) wrapped with np.frombuffer(..., dtype=np.uint8) and compression="gzip") with two shared helpers—e.g., encode_pickle_for_hdf(obj) that returns the uint8 ndarray ready to write to HDF and returns any needed metadata, and decode_pickle_from_hdf(uint8_array) that calls cloudpickle.loads on the buffer when reading; update every site currently doing cloudpickle.dumps + np.frombuffer + compression="gzip" (the duplicated expression) to call encode_pickle_for_hdf when writing and decode_pickle_from_hdf when reading so all places use a single implementation and a single compression/format contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/executorlib/standalone/hdf.py`:
- Around line 39-43: Replace the repeated serialization/deserialization
expression (cloudpickle.dumps(...) wrapped with np.frombuffer(...,
dtype=np.uint8) and compression="gzip") with two shared helpers—e.g.,
encode_pickle_for_hdf(obj) that returns the uint8 ndarray ready to write to HDF
and returns any needed metadata, and decode_pickle_from_hdf(uint8_array) that
calls cloudpickle.loads on the buffer when reading; update every site currently
doing cloudpickle.dumps + np.frombuffer + compression="gzip" (the duplicated
expression) to call encode_pickle_for_hdf when writing and
decode_pickle_from_hdf when reading so all places use a single implementation
and a single compression/format contract.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4d5ec173-654e-4ee6-90a8-c48f7fcbfa04
📒 Files selected for processing (1)
src/executorlib/standalone/hdf.py
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/unit/standalone/test_hdf_backwards.py`:
- Around line 76-86: The test creates a cache file named via file_name =
os.path.join(cache_directory, "test_mixed_i.h5") but later calls
get_future_from_cache(..., cache_key="test_mixed"), causing a mismatch; update
either the dump call or the cache_key so they match (e.g. change the filename
stem to "test_mixed.h5" or change cache_key to "test_mixed_i") so dump(...) and
get_future_from_cache(...) refer to the same cache key/file when using the dump
and get_future_from_cache functions.
- Around line 34-38: The code is suppressing all ValueError around
fname.create_dataset which can hide real serialization/write errors; instead,
check for an existing dataset name before creating to only skip duplicates:
construct the target name as name = "/" + group_dict[data_key], then if name not
in fname call fname.create_dataset(name=name,
data=np.void(cloudpickle.dumps(data_value))); otherwise skip (or optionally log)
so only true duplicate cases are avoided and other ValueErrors still surface.
Reference: contextlib.suppress, fname.create_dataset, np.void,
cloudpickle.dumps, group_dict, data_key.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3d10f469-e13f-4af7-bf87-8a8d3a8b113e
📒 Files selected for processing (2)
src/executorlib/standalone/hdf.pytests/unit/standalone/test_hdf_backwards.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/executorlib/standalone/hdf.py
| with contextlib.suppress(ValueError): | ||
| fname.create_dataset( | ||
| name="/" + group_dict[data_key], | ||
| data=np.void(cloudpickle.dumps(data_value)), | ||
| ) |
There was a problem hiding this comment.
Avoid suppressing all ValueError in legacy dump helper; it can hide real write/serialization failures.
Line 34 currently suppresses any ValueError, not just duplicate-dataset cases. In these compatibility tests, that can mask broken fixture creation and produce misleading downstream failures.
Suggested fix
- with contextlib.suppress(ValueError):
- fname.create_dataset(
- name="/" + group_dict[data_key],
- data=np.void(cloudpickle.dumps(data_value)),
- )
+ dataset_name = "/" + group_dict[data_key]
+ if dataset_name in fname:
+ continue
+ fname.create_dataset(
+ name=dataset_name,
+ data=np.void(cloudpickle.dumps(data_value)),
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| with contextlib.suppress(ValueError): | |
| fname.create_dataset( | |
| name="/" + group_dict[data_key], | |
| data=np.void(cloudpickle.dumps(data_value)), | |
| ) | |
| dataset_name = "/" + group_dict[data_key] | |
| if dataset_name in fname: | |
| continue | |
| fname.create_dataset( | |
| name=dataset_name, | |
| data=np.void(cloudpickle.dumps(data_value)), | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/unit/standalone/test_hdf_backwards.py` around lines 34 - 38, The code
is suppressing all ValueError around fname.create_dataset which can hide real
serialization/write errors; instead, check for an existing dataset name before
creating to only skip duplicates: construct the target name as name = "/" +
group_dict[data_key], then if name not in fname call
fname.create_dataset(name=name, data=np.void(cloudpickle.dumps(data_value)));
otherwise skip (or optionally log) so only true duplicate cases are avoided and
other ValueErrors still surface. Reference: contextlib.suppress,
fname.create_dataset, np.void, cloudpickle.dumps, group_dict, data_key.
| file_name = os.path.join(cache_directory, "test_mixed_i.h5") | ||
| a = 1 | ||
| b = 2 | ||
| dump( | ||
| file_name=file_name, | ||
| data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}}, | ||
| ) | ||
| future = get_future_from_cache( | ||
| cache_directory=cache_directory, | ||
| cache_key="test_mixed", | ||
| ) |
There was a problem hiding this comment.
test_get_future_from_file uses inconsistent cache filename/key and will fail deterministically.
Line 76 writes test_mixed_i.h5, but Line 85 queries cache_key="test_mixed" (which resolves to a different cache file stem). This makes the test assert the happy path while setting up a missing-file path.
Suggested fix
- file_name = os.path.join(cache_directory, "test_mixed_i.h5")
+ file_name = os.path.join(cache_directory, "test_mixed.h5")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| file_name = os.path.join(cache_directory, "test_mixed_i.h5") | |
| a = 1 | |
| b = 2 | |
| dump( | |
| file_name=file_name, | |
| data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}}, | |
| ) | |
| future = get_future_from_cache( | |
| cache_directory=cache_directory, | |
| cache_key="test_mixed", | |
| ) | |
| file_name = os.path.join(cache_directory, "test_mixed.h5") | |
| a = 1 | |
| b = 2 | |
| dump( | |
| file_name=file_name, | |
| data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}}, | |
| ) | |
| future = get_future_from_cache( | |
| cache_directory=cache_directory, | |
| cache_key="test_mixed", | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/unit/standalone/test_hdf_backwards.py` around lines 76 - 86, The test
creates a cache file named via file_name = os.path.join(cache_directory,
"test_mixed_i.h5") but later calls get_future_from_cache(...,
cache_key="test_mixed"), causing a mismatch; update either the dump call or the
cache_key so they match (e.g. change the filename stem to "test_mixed.h5" or
change cache_key to "test_mixed_i") so dump(...) and get_future_from_cache(...)
refer to the same cache key/file when using the dump and get_future_from_cache
functions.
p.void is a fixed-size raw byte scalar. Its size is stored internally using NumPy’s npy_intp / Python Py_ssize_t-like sizing, but in practice some NumPy scalar/array paths still hit a ~2 GiB signed 32-bit limit (2**31 - 1) for a single element / scalar buffer. So yes: the limit you are hitting is plausibly on the order of 2 GB, not a cloudpickle limit.
For large pickles, don’t store the whole pickle as one np.void. Store it as a byte array dataset instead:
Read it back:
Summary by CodeRabbit
Bug Fixes
Chores
Tests