From 072779d22490fdfc29c547e17477b80756458ed6 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 8 May 2026 11:39:27 +0200 Subject: [PATCH 1/4] Switch from numpy void() to frombuffer() --- src/executorlib/standalone/hdf.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index dffc5d4b2..22d414685 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -36,7 +36,8 @@ def dump(file_name: Optional[str], data_dict: dict) -> None: if data_key in group_dict: fname.create_dataset( name="/" + group_dict[data_key], - data=np.void(cloudpickle.dumps(data_value)), + data=np.frombuffer(cloudpickle.dumps(data_value), dtype=np.uint8), + compression="gzip", ) @@ -53,26 +54,26 @@ def load(file_name: str) -> dict: with h5py.File(file_name, "r") as hdf: data_dict = {} if "function" in hdf: - data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"])) + data_dict["fn"] = cloudpickle.loads(np.frombuffer(hdf["/function"], dtype=np.uint8)) else: raise TypeError("Function not found in HDF5 file.") if "input_args" in hdf: - data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"])) + data_dict["args"] = cloudpickle.loads(np.frombuffer(hdf["/input_args"], dtype=np.uint8)) else: data_dict["args"] = () if "input_kwargs" in hdf: - data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"])) + data_dict["kwargs"] = cloudpickle.loads(np.frombuffer(hdf["/input_kwargs"], dtype=np.uint8)) else: data_dict["kwargs"] = {} if "resource_dict" in hdf: data_dict["resource_dict"] = cloudpickle.loads( - np.void(hdf["/resource_dict"]) + np.frombuffer(hdf["/resource_dict"], dtype=np.uint8) ) else: data_dict["resource_dict"] = {} if "error_log_file" in hdf: data_dict["error_log_file"] = cloudpickle.loads( - np.void(hdf["/error_log_file"]) + np.frombuffer(hdf["/error_log_file"], dtype=np.uint8) ) return data_dict @@ -91,9 +92,9 @@ def get_output(file_name: str) -> tuple[bool, bool, Any]: def get_output_helper(file_name: str) -> tuple[bool, bool, Any]: with h5py.File(file_name, "r") as hdf: if "output" in hdf: - return True, True, cloudpickle.loads(np.void(hdf["/output"])) + return True, True, cloudpickle.loads(np.frombuffer(hdf["/output"], dtype=np.uint8)) elif "error" in hdf: - return True, False, cloudpickle.loads(np.void(hdf["/error"])) + return True, False, cloudpickle.loads(np.frombuffer(hdf["/error"], dtype=np.uint8)) else: return False, False, None @@ -120,7 +121,7 @@ def get_runtime(file_name: str) -> float: """ with h5py.File(file_name, "r") as hdf: if "runtime" in hdf: - return cloudpickle.loads(np.void(hdf["/runtime"])) + return cloudpickle.loads(np.frombuffer(hdf["/runtime"], dtype=np.uint8)) else: return 0.0 @@ -138,7 +139,7 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: if file_name is not None and os.path.exists(file_name): with h5py.File(file_name, "r") as hdf: if "queue_id" in hdf: - return cloudpickle.loads(np.void(hdf["/queue_id"])) + return cloudpickle.loads(np.frombuffer(hdf["/queue_id"], dtype=np.uint8)) return None @@ -224,7 +225,7 @@ def _get_content_of_file(file_name: str) -> dict: """ with h5py.File(file_name, "r") as hdf: return { - key: cloudpickle.loads(np.void(hdf["/" + key])) + key: cloudpickle.loads(np.frombuffer(hdf["/" + key], dtype=np.uint8)) for key in group_dict.values() if key in hdf } From bcac15939a8cd57cb17d352165d645fbab1d67d5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 8 May 2026 11:43:49 +0200 Subject: [PATCH 2/4] fix reader --- src/executorlib/standalone/hdf.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index 22d414685..df4d4b291 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -54,26 +54,26 @@ def load(file_name: str) -> dict: with h5py.File(file_name, "r") as hdf: data_dict = {} if "function" in hdf: - data_dict["fn"] = cloudpickle.loads(np.frombuffer(hdf["/function"], dtype=np.uint8)) + data_dict["fn"] = cloudpickle.loads(hdf["/function"][()].tobytes()) else: raise TypeError("Function not found in HDF5 file.") if "input_args" in hdf: - data_dict["args"] = cloudpickle.loads(np.frombuffer(hdf["/input_args"], dtype=np.uint8)) + data_dict["args"] = cloudpickle.loads(hdf["/input_args"][()].tobytes()) else: data_dict["args"] = () if "input_kwargs" in hdf: - data_dict["kwargs"] = cloudpickle.loads(np.frombuffer(hdf["/input_kwargs"], dtype=np.uint8)) + data_dict["kwargs"] = cloudpickle.loads(hdf["/input_kwargs"][()].tobytes()) else: data_dict["kwargs"] = {} if "resource_dict" in hdf: data_dict["resource_dict"] = cloudpickle.loads( - np.frombuffer(hdf["/resource_dict"], dtype=np.uint8) + hdf["/resource_dict"][()].tobytes() ) else: data_dict["resource_dict"] = {} if "error_log_file" in hdf: data_dict["error_log_file"] = cloudpickle.loads( - np.frombuffer(hdf["/error_log_file"], dtype=np.uint8) + hdf["/error_log_file"][()].tobytes() ) return data_dict @@ -92,9 +92,9 @@ def get_output(file_name: str) -> tuple[bool, bool, Any]: def get_output_helper(file_name: str) -> tuple[bool, bool, Any]: with h5py.File(file_name, "r") as hdf: if "output" in hdf: - return True, True, cloudpickle.loads(np.frombuffer(hdf["/output"], dtype=np.uint8)) + return True, True, cloudpickle.loads(hdf["/output"][()].tobytes()) elif "error" in hdf: - return True, False, cloudpickle.loads(np.frombuffer(hdf["/error"], dtype=np.uint8)) + return True, False, cloudpickle.loads(hdf["/error"][()].tobytes()) else: return False, False, None @@ -121,7 +121,7 @@ def get_runtime(file_name: str) -> float: """ with h5py.File(file_name, "r") as hdf: if "runtime" in hdf: - return cloudpickle.loads(np.frombuffer(hdf["/runtime"], dtype=np.uint8)) + return cloudpickle.loads(hdf["/runtime"][()].tobytes()) else: return 0.0 @@ -139,7 +139,7 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: if file_name is not None and os.path.exists(file_name): with h5py.File(file_name, "r") as hdf: if "queue_id" in hdf: - return cloudpickle.loads(np.frombuffer(hdf["/queue_id"], dtype=np.uint8)) + return cloudpickle.loads(hdf["/queue_id"][()].tobytes()) return None @@ -225,7 +225,7 @@ def _get_content_of_file(file_name: str) -> dict: """ with h5py.File(file_name, "r") as hdf: return { - key: cloudpickle.loads(np.frombuffer(hdf["/" + key], dtype=np.uint8)) + key: cloudpickle.loads(hdf["/" + key][()].tobytes()) for key in group_dict.values() if key in hdf } From c65a65ba244c5172772a0ae34e8dba09ab4adac2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 8 May 2026 09:44:11 +0000 Subject: [PATCH 3/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/standalone/hdf.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index df4d4b291..2f1a0d717 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -36,7 +36,9 @@ def dump(file_name: Optional[str], data_dict: dict) -> None: if data_key in group_dict: fname.create_dataset( name="/" + group_dict[data_key], - data=np.frombuffer(cloudpickle.dumps(data_value), dtype=np.uint8), + data=np.frombuffer( + cloudpickle.dumps(data_value), dtype=np.uint8 + ), compression="gzip", ) From 86cccd0986a3f2e80dba0896f40f639cb642be86 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 12 Jun 2026 17:52:04 +0200 Subject: [PATCH 4/4] Add backwards compatibility tests --- tests/unit/standalone/test_hdf_backwards.py | 211 ++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 tests/unit/standalone/test_hdf_backwards.py diff --git a/tests/unit/standalone/test_hdf_backwards.py b/tests/unit/standalone/test_hdf_backwards.py new file mode 100644 index 000000000..2461224ed --- /dev/null +++ b/tests/unit/standalone/test_hdf_backwards.py @@ -0,0 +1,211 @@ +import contextlib +import os +import shutil +import unittest +from concurrent.futures import Future +from typing import Optional + +import numpy as np + + +try: + from executorlib.standalone.hdf import ( + load, + get_output, + get_runtime, + get_queue_id, + get_future_from_cache, + group_dict, + ) + + import h5py + import cloudpickle + + def dump(file_name: Optional[str], data_dict: dict) -> None: + """ + Previous dump function just copied here for backwards compatibility. + """ + if file_name is not None: + file_name_abs = os.path.abspath(file_name) + os.makedirs(os.path.dirname(file_name_abs), exist_ok=True) + with h5py.File(file_name_abs, "a") as fname: + for data_key, data_value in data_dict.items(): + if data_key in group_dict: + with contextlib.suppress(ValueError): + fname.create_dataset( + name="/" + group_dict[data_key], + data=np.void(cloudpickle.dumps(data_value)), + ) + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True + + +def my_funct(a, b): + return a + b + + +@unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5io tests are skipped." +) +class TestSharedFunctions(unittest.TestCase): + def test_hdf_mixed(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_mixed.h5") + a = 1 + b = 2 + dump( + file_name=file_name, + data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}}, + ) + data_dict = load(file_name=file_name) + self.assertTrue("fn" in data_dict.keys()) + self.assertEqual(data_dict["args"], [a]) + self.assertEqual(data_dict["kwargs"], {"b": b}) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(no_error) + self.assertFalse(flag) + self.assertIsNone(output) + + def test_get_future_from_file(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_mixed_i.h5") + a = 1 + b = 2 + dump( + file_name=file_name, + data_dict={"fn": my_funct, "args": [a], "kwargs": {"b": b}}, + ) + future = get_future_from_cache( + cache_directory=cache_directory, + cache_key="test_mixed", + ) + self.assertTrue(isinstance(future, Future)) + self.assertFalse(future.done()) + + def test_get_output_file_missing(self): + cache_directory = os.path.abspath("executorlib_cache") + with self.assertRaises(FileNotFoundError): + get_output(file_name=os.path.join(cache_directory, "does_not_exist.h5")) + + def test_get_future_from_file_missing(self): + cache_directory = os.path.abspath("executorlib_cache") + with self.assertRaises(FileNotFoundError): + get_future_from_cache( + cache_directory=cache_directory, + cache_key="does_not_exist", + ) + + def test_hdf_args(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_args.h5") + a = 1 + b = 2 + dump(file_name=file_name, data_dict={"fn": my_funct, "args": [a, b]}) + data_dict = load(file_name=file_name) + self.assertTrue("fn" in data_dict.keys()) + self.assertEqual(data_dict["args"], [a, b]) + self.assertEqual(data_dict["kwargs"], {}) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(flag) + self.assertFalse(no_error) + self.assertIsNone(output) + + def test_hdf_kwargs(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_kwargs.h5") + a = 1 + b = 2 + dump( + file_name=file_name, + data_dict={ + "fn": my_funct, + "args": (), + "kwargs": {"a": a, "b": b}, + "queue_id": 123, + "error_log_file": "error.out", + }, + ) + data_dict = load(file_name=file_name) + self.assertTrue("fn" in data_dict.keys()) + self.assertEqual(data_dict["args"], ()) + self.assertEqual(data_dict["kwargs"], {"a": a, "b": b}) + self.assertEqual(get_queue_id(file_name=file_name), 123) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(flag) + self.assertFalse(no_error) + self.assertIsNone(output) + + def test_hdf_missing_funct(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_missing_funct.h5") + dump( + file_name=file_name, + data_dict={ + "queue_id": 123, + }, + ) + with self.assertRaises(TypeError): + load(file_name=file_name) + + def test_hdf_missing_args(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_missing_args.h5") + dump( + file_name=file_name, + data_dict={ + "fn": my_funct, + }, + ) + data_dict = load(file_name=file_name) + self.assertTrue("fn" in data_dict.keys()) + self.assertEqual(data_dict["args"], ()) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(flag) + self.assertFalse(no_error) + self.assertIsNone(output) + + def test_hdf_queue_id(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_queue.h5") + queue_id = 123 + dump( + file_name=file_name, + data_dict={"queue_id": queue_id}, + ) + self.assertEqual(get_queue_id(file_name=file_name), 123) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(flag) + self.assertFalse(no_error) + self.assertIsNone(output) + + def test_hdf_error(self): + cache_directory = os.path.abspath("executorlib_cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_error.h5") + error = ValueError() + dump( + file_name=file_name, + data_dict={"error": error}, + ) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertTrue(flag) + self.assertFalse(no_error) + self.assertTrue(isinstance(output, error.__class__)) + + def tearDown(self): + shutil.rmtree("executorlib_cache", ignore_errors=True)