Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ def dump(file_name: Optional[str], data_dict: dict) -> None:
with contextlib.suppress(ValueError):
fname.create_dataset(
name="/" + group_dict[data_key],
data=np.void(cloudpickle.dumps(data_value)),
data=np.frombuffer(
cloudpickle.dumps(data_value), dtype=np.uint8
),
compression="gzip",
)


Expand All @@ -55,26 +58,26 @@ def load(file_name: str) -> dict:
with h5py.File(file_name, "r") as hdf:
data_dict = {}
if "function" in hdf:
data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
data_dict["fn"] = cloudpickle.loads(hdf["/function"][()].tobytes())
else:
raise TypeError("Function not found in HDF5 file.")
if "input_args" in hdf:
data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
data_dict["args"] = cloudpickle.loads(hdf["/input_args"][()].tobytes())
else:
data_dict["args"] = ()
if "input_kwargs" in hdf:
data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
data_dict["kwargs"] = cloudpickle.loads(hdf["/input_kwargs"][()].tobytes())
else:
data_dict["kwargs"] = {}
if "resource_dict" in hdf:
data_dict["resource_dict"] = cloudpickle.loads(
np.void(hdf["/resource_dict"])
hdf["/resource_dict"][()].tobytes()
)
else:
data_dict["resource_dict"] = {}
if "error_log_file" in hdf:
data_dict["error_log_file"] = cloudpickle.loads(
np.void(hdf["/error_log_file"])
hdf["/error_log_file"][()].tobytes()
)
return data_dict

Expand All @@ -93,9 +96,9 @@ def get_output(file_name: str) -> tuple[bool, bool, Any]:
def get_output_helper(file_name: str) -> tuple[bool, bool, Any]:
with h5py.File(file_name, "r") as hdf:
if "output" in hdf:
return True, True, cloudpickle.loads(np.void(hdf["/output"]))
return True, True, cloudpickle.loads(hdf["/output"][()].tobytes())
elif "error" in hdf:
return True, False, cloudpickle.loads(np.void(hdf["/error"]))
return True, False, cloudpickle.loads(hdf["/error"][()].tobytes())
else:
return False, False, None

Expand All @@ -122,7 +125,7 @@ def get_runtime(file_name: str) -> float:
"""
with h5py.File(file_name, "r") as hdf:
if "runtime" in hdf:
return cloudpickle.loads(np.void(hdf["/runtime"]))
return cloudpickle.loads(hdf["/runtime"][()].tobytes())
else:
return 0.0

Expand All @@ -140,7 +143,7 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:
if file_name is not None and os.path.exists(file_name):
with h5py.File(file_name, "r") as hdf:
if "queue_id" in hdf:
return cloudpickle.loads(np.void(hdf["/queue_id"]))
return cloudpickle.loads(hdf["/queue_id"][()].tobytes())
return None


Expand Down Expand Up @@ -230,7 +233,7 @@ def _get_content_of_file(file_name: str) -> dict:
"""
with h5py.File(file_name, "r") as hdf:
return {
key: cloudpickle.loads(np.void(hdf["/" + key]))
key: cloudpickle.loads(hdf["/" + key][()].tobytes())
for key in group_dict.values()
if key in hdf
}
211 changes: 211 additions & 0 deletions tests/unit/standalone/test_hdf_backwards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import contextlib
import os
import shutil
import unittest
from concurrent.futures import Future
from typing import Optional

import numpy as np


try:
from executorlib.standalone.hdf import (
load,
get_output,
get_runtime,
get_queue_id,
get_future_from_cache,
group_dict,
)

import h5py
import cloudpickle

def dump(file_name: Optional[str], data_dict: dict) -> None:
"""
Previous dump function just copied here for backwards compatibility.
"""
if file_name is not None:
file_name_abs = os.path.abspath(file_name)
os.makedirs(os.path.dirname(file_name_abs), exist_ok=True)
with h5py.File(file_name_abs, "a") as fname:
for data_key, data_value in data_dict.items():
if data_key in group_dict:
with contextlib.suppress(ValueError):
fname.create_dataset(
name="/" + group_dict[data_key],
data=np.void(cloudpickle.dumps(data_value)),
)
Comment on lines +34 to +38

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid suppressing all ValueError in legacy dump helper; it can hide real write/serialization failures.

Line 34 currently suppresses any ValueError, not just duplicate-dataset cases. In these compatibility tests, that can mask broken fixture creation and produce misleading downstream failures.

Suggested fix
-                        with contextlib.suppress(ValueError):
-                            fname.create_dataset(
-                                name="/" + group_dict[data_key],
-                                data=np.void(cloudpickle.dumps(data_value)),
-                            )
+                        dataset_name = "/" + group_dict[data_key]
+                        if dataset_name in fname:
+                            continue
+                        fname.create_dataset(
+                            name=dataset_name,
+                            data=np.void(cloudpickle.dumps(data_value)),
+                        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with contextlib.suppress(ValueError):
fname.create_dataset(
name="/" + group_dict[data_key],
data=np.void(cloudpickle.dumps(data_value)),
)
dataset_name = "/" + group_dict[data_key]
if dataset_name in fname:
continue
fname.create_dataset(
name=dataset_name,
data=np.void(cloudpickle.dumps(data_value)),
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_hdf_backwards.py` around lines 34 - 38, The code
is suppressing all ValueError around fname.create_dataset which can hide real
serialization/write errors; instead, check for an existing dataset name before
creating to only skip duplicates: construct the target name as name = "/" +
group_dict[data_key], then if name not in fname call
fname.create_dataset(name=name, data=np.void(cloudpickle.dumps(data_value)));
otherwise skip (or optionally log) so only true duplicate cases are avoided and
other ValueErrors still surface. Reference: contextlib.suppress,
fname.create_dataset, np.void, cloudpickle.dumps, group_dict, data_key.


skip_h5py_test = False
except ImportError:
skip_h5py_test = True


def my_funct(a, b):
return a + b


@unittest.skipIf(
skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
)
class TestSharedFunctions(unittest.TestCase):
def test_hdf_mixed(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_mixed.h5")
a = 1
b = 2
dump(
file_name=file_name,
data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}},
)
data_dict = load(file_name=file_name)
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], [a])
self.assertEqual(data_dict["kwargs"], {"b": b})
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(no_error)
self.assertFalse(flag)
self.assertIsNone(output)

def test_get_future_from_file(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_mixed_i.h5")
a = 1
b = 2
dump(
file_name=file_name,
data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}},
)
future = get_future_from_cache(
cache_directory=cache_directory,
cache_key="test_mixed",
)
Comment on lines +76 to +86

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

test_get_future_from_file uses inconsistent cache filename/key and will fail deterministically.

Line 76 writes test_mixed_i.h5, but Line 85 queries cache_key="test_mixed" (which resolves to a different cache file stem). This makes the test assert the happy path while setting up a missing-file path.

Suggested fix
-        file_name = os.path.join(cache_directory, "test_mixed_i.h5")
+        file_name = os.path.join(cache_directory, "test_mixed.h5")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
file_name = os.path.join(cache_directory, "test_mixed_i.h5")
a = 1
b = 2
dump(
file_name=file_name,
data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}},
)
future = get_future_from_cache(
cache_directory=cache_directory,
cache_key="test_mixed",
)
file_name = os.path.join(cache_directory, "test_mixed.h5")
a = 1
b = 2
dump(
file_name=file_name,
data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}},
)
future = get_future_from_cache(
cache_directory=cache_directory,
cache_key="test_mixed",
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_hdf_backwards.py` around lines 76 - 86, The test
creates a cache file named via file_name = os.path.join(cache_directory,
"test_mixed_i.h5") but later calls get_future_from_cache(...,
cache_key="test_mixed"), causing a mismatch; update either the dump call or the
cache_key so they match (e.g. change the filename stem to "test_mixed.h5" or
change cache_key to "test_mixed_i") so dump(...) and get_future_from_cache(...)
refer to the same cache key/file when using the dump and get_future_from_cache
functions.

self.assertTrue(isinstance(future, Future))
self.assertFalse(future.done())

def test_get_output_file_missing(self):
cache_directory = os.path.abspath("executorlib_cache")
with self.assertRaises(FileNotFoundError):
get_output(file_name=os.path.join(cache_directory, "does_not_exist.h5"))

def test_get_future_from_file_missing(self):
cache_directory = os.path.abspath("executorlib_cache")
with self.assertRaises(FileNotFoundError):
get_future_from_cache(
cache_directory=cache_directory,
cache_key="does_not_exist",
)

def test_hdf_args(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_args.h5")
a = 1
b = 2
dump(file_name=file_name, data_dict={"fn": my_funct, "args": [a, b]})
data_dict = load(file_name=file_name)
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], [a, b])
self.assertEqual(data_dict["kwargs"], {})
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_kwargs(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_kwargs.h5")
a = 1
b = 2
dump(
file_name=file_name,
data_dict={
"fn": my_funct,
"args": (),
"kwargs": {"a": a, "b": b},
"queue_id": 123,
"error_log_file": "error.out",
},
)
data_dict = load(file_name=file_name)
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], ())
self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_missing_funct(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_missing_funct.h5")
dump(
file_name=file_name,
data_dict={
"queue_id": 123,
},
)
with self.assertRaises(TypeError):
load(file_name=file_name)

def test_hdf_missing_args(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_missing_args.h5")
dump(
file_name=file_name,
data_dict={
"fn": my_funct,
},
)
data_dict = load(file_name=file_name)
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], ())
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_queue_id(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_queue.h5")
queue_id = 123
dump(
file_name=file_name,
data_dict={"queue_id": queue_id},
)
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_error(self):
cache_directory = os.path.abspath("executorlib_cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_error.h5")
error = ValueError()
dump(
file_name=file_name,
data_dict={"error": error},
)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertTrue(flag)
self.assertFalse(no_error)
self.assertTrue(isinstance(output, error.__class__))

def tearDown(self):
shutil.rmtree("executorlib_cache", ignore_errors=True)
Loading