diff --git a/pyproject.toml b/pyproject.toml index 98474cb..1ef02cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,13 @@ excel = [ networkx = [ "networkx>=3.0", ] +sparql = [ + "rdflib>=7.6.0", +] +validation = [ + "pyshacl>=0.30", + "rdflib>=7.6.0", +] dev = [ "pytest>=7.0", "pytest-xdist", @@ -36,6 +43,7 @@ dev = [ "networkx>=3.0", "openpyxl>=3.1.5", "rdflib>=7.6.0", + "pyshacl>=0.30", ] # vis-network JS is vendored in triplets/cgmes_tools/static — graph drawing needs no extra deps visualization = [ diff --git a/tests/test_sparql.py b/tests/test_sparql.py new file mode 100644 index 0000000..757fd19 --- /dev/null +++ b/tests/test_sparql.py @@ -0,0 +1,102 @@ +"""Tests for the rdflib reference SPARQL engine (triplets.sparql).""" +import pytest + +pytest.importorskip("rdflib") + +import pandas +import triplets +from pathlib import Path + +SVEDALA_DIR = Path("test_data/relicapgrid/Instance/Grid/IGM_Svedala") +SVEDALA_FILES = [ + str(SVEDALA_DIR / "20220615T2230Z__Svedala_EQ_1.xml"), + str(SVEDALA_DIR / "20220615T2230Z_2D_Svedala_SSH_1.xml"), + str(SVEDALA_DIR / "20220615T2230Z_2D_Svedala_TP_1.xml"), + str(SVEDALA_DIR / "20220615T2230Z_2D_Svedala_SV_1.xml"), +] +SKIP_REASON = "Svedala test data not available" + +PREFIXES = ("PREFIX cim: " + "PREFIX rdf: ") + + +@pytest.fixture(scope="module") +def svedala(): + if not SVEDALA_DIR.exists(): + pytest.skip(SKIP_REASON) + return pandas.read_RDF(SVEDALA_FILES) + + +@pytest.fixture(scope="module") +def svedala_eq(): + eq = SVEDALA_DIR / "20220615T2230Z__Svedala_EQ_1.xml" + if not eq.exists(): + pytest.skip(SKIP_REASON) + return pandas.read_RDF([str(eq)]) + + +def test_select_count_matches_tableview(svedala): + """SPARQL count of a type == type_tableview row count (cross-engine consistency).""" + result = svedala.sparql.query( + PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }") + assert isinstance(result, pandas.DataFrame) + assert int(result["n"].iloc[0]) == len(svedala.triplets.type_tableview("ACLineSegment")) + + +def test_select_returns_columns_and_rows(svedala): + result = svedala.sparql.query( + PREFIXES + "SELECT ?s ?name WHERE { ?s cim:IdentifiedObject.name ?name } LIMIT 5") + assert list(result.columns) == ["s", "name"] + assert len(result) == 5 + + +def test_ask(svedala): + assert svedala.sparql.query(PREFIXES + "ASK { ?s rdf:type cim:Substation }") is True + assert svedala.sparql.query(PREFIXES + "ASK { ?s rdf:type cim:NoSuchClass }") is False + + +def test_construct_returns_triplets(svedala): + result = svedala.sparql.query( + PREFIXES + "CONSTRUCT { ?s rdf:type cim:ACLineSegment } WHERE { ?s rdf:type cim:ACLineSegment }") + assert list(result.columns) == ["ID", "KEY", "VALUE", "INSTANCE_ID"] + assert (result["KEY"] == "Type").all() + + +def test_typed_values_with_rdf_map(svedala): + """With rdf_map, numeric literals come back as python floats (xsd datatype survives).""" + from triplets.export_schema import schemas + result = svedala.sparql.query( + PREFIXES + "SELECT ?l WHERE { ?s cim:Conductor.length ?l } LIMIT 1", + rdf_map=schemas.ENTSOE_CGMES_3_0_0_552_ED1) + assert isinstance(result["l"].iloc[0], float) + + +def test_scope_restricts_to_named_graph(svedala): + """Scoping to the SSH instance finds no ACLineSegment (they live in EQ).""" + instances = svedala[(svedala["KEY"] == "Type") & (svedala["VALUE"] == "ACLineSegment")]["INSTANCE_ID"] + eq_instance = str(instances.astype(str).iloc[0]) + all_instances = set(svedala["INSTANCE_ID"].astype(str).unique()) + other = next(i for i in all_instances if i != eq_instance) + + q = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }" + in_scope = int(svedala.sparql.query(q, scope=[eq_instance])["n"].iloc[0]) + out_scope = int(svedala.sparql.query(q, scope=[other])["n"].iloc[0]) + assert in_scope > 0 + assert out_scope == 0 + + +def test_polars_input_parity(svedala): + polars = pytest.importorskip("polars") + q = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }" + pandas_n = int(triplets.sparql.query(svedala, q)["n"].iloc[0]) + polars_n = int(triplets.sparql.query(polars.from_pandas(svedala), q)["n"].iloc[0]) + assert pandas_n == polars_n + + +def test_duckdb_input(svedala): + duckdb = pytest.importorskip("duckdb") + con = duckdb.connect() + con.register("src", svedala) + con.execute("CREATE TABLE triplets AS SELECT * FROM src") + q = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }" + assert int(triplets.sparql.query(con, q)["n"].iloc[0]) == len(svedala.triplets.type_tableview("ACLineSegment")) diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 0000000..bd770a5 --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,91 @@ +"""Tests for the pyshacl reference SHACL engine (triplets.validation).""" +import os +import pytest + +pytest.importorskip("rdflib") +pytest.importorskip("pyshacl") + +import pandas +import triplets +from pathlib import Path + +SVEDALA_DIR = Path("test_data/relicapgrid/Instance/Grid/IGM_Svedala") +SVEDALA_EQ = str(SVEDALA_DIR / "20220615T2230Z__Svedala_EQ_1.xml") +SKIP_REASON = "Svedala test data not available" + +# Inline shape (written to tmp) — deterministic, no committed shape files / external repo. +INLINE_SHAPE = """ +@prefix sh: . +@prefix cim: . +@prefix xsd: . + +cim:ACLineSegmentShape a sh:NodeShape ; + sh:targetClass cim:ACLineSegment ; + sh:property [ sh:path cim:IdentifiedObject.name ; sh:minCount 1 ; + sh:message "ACLineSegment must have a name" ] ; + sh:property [ sh:path cim:Conductor.length ; sh:datatype xsd:float ; + sh:message "Conductor.length must be xsd:float" ] . +""" + +# Real CGMES SHACL shapes — external, skip-guarded (not vendored into the repo). +CGMES_SHACL_DIR = Path(os.environ.get( + "TRIPLETS_CGMES_SHACL", + "/home/kvilgo/GIT/application-profiles-library/CGMES/CurrentRelease/SHACL")) +CGMES_EQ_SHACL = CGMES_SHACL_DIR / "61970-301_Equipment-AP-Con-Complex-SHACL.ttl" + + +@pytest.fixture(scope="module") +def svedala_eq(): + if not Path(SVEDALA_EQ).exists(): + pytest.skip(SKIP_REASON) + return pandas.read_RDF([SVEDALA_EQ]) + + +@pytest.fixture(scope="module") +def shape_file(tmp_path_factory): + path = tmp_path_factory.mktemp("shapes") / "inline.ttl" + path.write_text(INLINE_SHAPE) + return str(path) + + +def test_typed_data_conforms(svedala_eq, shape_file): + """With rdf_map, Conductor.length is xsd:float → datatype constraint passes.""" + from triplets.export_schema import schemas + violations = svedala_eq.shacl.validate(shape_file, rdf_map=schemas.ENTSOE_CGMES_3_0_0_552_ED1) + assert isinstance(violations, pandas.DataFrame) + assert len(violations) == 0 + + +def test_untyped_data_trips_datatype(svedala_eq, shape_file): + """Without rdf_map, Conductor.length is a plain string → xsd:float violations.""" + violations = svedala_eq.shacl.validate(shape_file) + assert len(violations) > 0 + assert (violations["VIOLATION_TYPE"] == "sh:datatype").all() + + +def test_violations_columns(svedala_eq, shape_file): + from triplets.validation.shacl_report import VIOLATION_COLUMNS + violations = svedala_eq.shacl.validate(shape_file) + assert list(violations.columns) == VIOLATION_COLUMNS + # focusNode stripped to bare UUID (no urn:uuid:) + assert not violations["ID"].str.startswith("urn:uuid:").any() + + +def test_scope_excludes_out_of_scope_instances(svedala_eq, shape_file): + """Scoping to an instance without ACLineSegments yields no violations.""" + instance = str(svedala_eq["INSTANCE_ID"].astype(str).iloc[0]) + in_scope = svedala_eq.shacl.validate(shape_file, scope=[instance]) + assert len(in_scope) > 0 # the EQ instance has the ACLineSegments + out_scope = svedala_eq.shacl.validate(shape_file, scope=["00000000-0000-0000-0000-000000000000"]) + assert len(out_scope) == 0 + + +@pytest.mark.performance # pyshacl on the full complex CGMES SHACL takes minutes — opt-in +@pytest.mark.skipif(not CGMES_EQ_SHACL.exists(), + reason="external CGMES SHACL shapes not available") +def test_real_cgmes_eq_shapes(svedala_eq): + """Validate Svedala EQ against the real CGMES Equipment SHACL profile.""" + from triplets.export_schema import schemas + from triplets.validation.shacl_report import VIOLATION_COLUMNS + violations = svedala_eq.shacl.validate(str(CGMES_EQ_SHACL), rdf_map=schemas.ENTSOE_CGMES_3_0_0_552_ED1) + assert list(violations.columns) == VIOLATION_COLUMNS diff --git a/triplets/__init__.py b/triplets/__init__.py index ffd55a8..a9fcc72 100644 --- a/triplets/__init__.py +++ b/triplets/__init__.py @@ -6,7 +6,9 @@ from . import cli from . import tools from . import export -from . import _accessor # registers df.triplets.* namespace # noqa: F401 +from . import sparql # noqa: F401 (df.sparql.* namespace; rdflib lazy-loaded per call) +from . import validation # noqa: F401 (df.shacl.* namespace; pyshacl lazy-loaded per call) +from . import _accessor # registers df.triplets.* / df.sparql.* / df.shacl.* namespaces # noqa: F401 __all__ = [ 'cgmes_tools', @@ -14,6 +16,8 @@ 'export_schema', 'rdfs_tools', 'cli', + 'sparql', + 'validation', ] from ._version import get_versions diff --git a/triplets/_accessor.py b/triplets/_accessor.py index 343abcb..c32cdcb 100644 --- a/triplets/_accessor.py +++ b/triplets/_accessor.py @@ -18,7 +18,7 @@ import logging import pandas -from . import tools, export +from . import tools, export, sparql, validation logger = logging.getLogger(__name__) @@ -141,6 +141,31 @@ def __init__(self, df): logger.debug("polars not installed, skipping triplets namespace accessor") +# ── df.sparql.* / df.shacl.* (separate root accessors) ────────────────────── +# These take triplet data in any flavor; the engines load it into rdflib. +# (DuckDB connections already have a native .query() — call +# triplets.sparql.query(connection, ...) directly instead of a method.) + +def _register_root_accessor(name, methods): + """Register one extra root accessor (df..) on pandas + polars.""" + class _Accessor: + def __init__(self, df): + self._df = df + + _Accessor.__name__ = f"{name.capitalize()}Accessor" + for module, method in methods: + setattr(_Accessor, method, _delegate(module, method)) + + pandas.api.extensions.register_dataframe_accessor(name)(_Accessor) + if polars: + polars.api.register_dataframe_namespace(name)(type(f"Polars{_Accessor.__name__}", (_Accessor,), {})) + logger.debug("Registered df.%s accessor", name) + + +_register_root_accessor("sparql", [(sparql, "query")]) +_register_root_accessor("shacl", [(validation, "validate")]) + + # ── DuckDB ──────────────────────────────────────────────────────────────────── if duckdb: from .tools import duckdb_engine diff --git a/triplets/_rdflib_loader.py b/triplets/_rdflib_loader.py new file mode 100644 index 0000000..1beda23 --- /dev/null +++ b/triplets/_rdflib_loader.py @@ -0,0 +1,74 @@ +"""Shared rdflib loading for the SPARQL and SHACL reference engines. + +Both engines turn triplet data into an in-memory rdflib graph by going through +the existing N-Quads export (datatype-annotated, INSTANCE_ID as named graph). +No temp files: the export is taken in memory as a BytesIO and parsed directly. +""" +import logging + +logger = logging.getLogger(__name__) + + +def load_dataset(data, rdf_map=None): + """Triplet data (any flavor) → rdflib.Dataset with named graphs per INSTANCE_ID. + + Parameters + ---------- + data : pandas/polars DataFrame, pyarrow Table/RecordBatch, or DuckDB connection + Triplet dataset with columns [ID, KEY, VALUE, INSTANCE_ID]. + rdf_map : dict or str, optional + Export schema — enables correct xsd datatypes / enum namespaces in the + loaded graph. Optional (schema-optional principle): works without it. + + Returns + ------- + rdflib.Dataset + default_union=True so queries/validation see the union across the + per-INSTANCE_ID named graphs. + """ + import rdflib + from .export import export_to_nquads + + data = _to_loadable(data) + buffer = export_to_nquads(data, rdf_map=rdf_map, export_to_memory=True) + buffer.seek(0) + + dataset = rdflib.Dataset(default_union=True) + dataset.parse(source=buffer, format="nquads") + logger.debug("loaded rdflib Dataset: %d quads", len(dataset)) + return dataset + + +def _to_loadable(data): + """export_to_nquads handles pandas/polars; convert arrow/duckdb to pandas first.""" + module = type(data).__module__ + if module.startswith("pyarrow"): + return data.to_pandas(types_mapper=__import__("pandas").ArrowDtype) + if module.startswith(("duckdb", "_duckdb")): + return data.execute("SELECT * FROM triplets").df() + return data # pandas / polars — export_to_nquads takes these directly + + +def scoped_graph(dataset, scope=None): + """Return the graph to query/validate: full union, or just the scoped instances. + + Parameters + ---------- + dataset : rdflib.Dataset + scope : iterable of INSTANCE_ID (str), optional + When given, return a concrete Graph holding the union of those + instances' named graphs (the quad's graph component does the + filtering). A concrete Graph — not a view — is required because + pyshacl clones/iterates the data graph and does not read a + ReadOnlyGraphAggregate. The copy is only the reduced scope, which is + the point of scoping. When None, the full default-union dataset is used. + """ + if scope is None: + return dataset + + import rdflib + + graph = rdflib.Graph() + for instance_id in scope: + graph += dataset.get_context(rdflib.URIRef(f"urn:uuid:{instance_id}")) + return graph diff --git a/triplets/export/__init__.py b/triplets/export/__init__.py index dccbdd7..37ac37f 100644 --- a/triplets/export/__init__.py +++ b/triplets/export/__init__.py @@ -58,11 +58,13 @@ def export_to_csv(data, path=None, multivalue=True, export_to_memory=False, sing single_file=single_file, base_filename=base_filename) -def export_to_nquads(data, path, rdf_map=None, engine="auto"): +def export_to_nquads(data, path=None, rdf_map=None, engine="auto", export_to_memory=False): """Export triplet DataFrame to N-Quads file. Parameters ---------- + path : str, optional + Output file path (.nq). Ignored when export_to_memory=True. rdf_map : dict or str, optional Export schema for proper enum detection and literal datatype annotations. If None, enums exported as literals. @@ -70,6 +72,9 @@ def export_to_nquads(data, path, rdf_map=None, engine="auto"): "polars" (lazy expression plan, ~4x faster) or "pandas". "auto" picks polars when installed, converting pandas input (~17 ms per million rows); falls back to pandas otherwise. + export_to_memory : bool, default False + If True, return an in-memory BytesIO (with .name) instead of + writing to disk — same convention as export_to_csv / export_to_cimxml. """ _check_columns(data) if engine == "auto": @@ -85,12 +90,12 @@ def export_to_nquads(data, path, rdf_map=None, engine="auto"): from .nquads_polars import export_to_nquads as _fn if not _is_polars(data): data = polars.from_pandas(data) - return _fn(data, path, rdf_map=rdf_map) + return _fn(data, path, rdf_map=rdf_map, export_to_memory=export_to_memory) from .nquads_pandas import export_to_nquads as _fn if _is_polars(data): data = data.to_pandas(use_pyarrow_extension_array=True) - return _fn(data, path, rdf_map=rdf_map) + return _fn(data, path, rdf_map=rdf_map, export_to_memory=export_to_memory) # ── CIM XML engine dispatch ────────────────────────────────────────────────── diff --git a/triplets/export/nquads_pandas.py b/triplets/export/nquads_pandas.py index 5700fcf..67a10e5 100644 --- a/triplets/export/nquads_pandas.py +++ b/triplets/export/nquads_pandas.py @@ -1,5 +1,7 @@ """N-Quads export using pandas — schema-aware value classification.""" +from io import BytesIO + import pandas from .nquads_utils import ( @@ -8,19 +10,21 @@ ) -def export_to_nquads(data, path, rdf_map=None): +def export_to_nquads(data, path=None, rdf_map=None, export_to_memory=False): """Export triplet DataFrame to N-Quads file. Parameters ---------- data : pandas.DataFrame Triplet dataset with columns [ID, KEY, VALUE, INSTANCE_ID]. - path : str - Output file path (.nq). + path : str, optional + Output file path (.nq). Ignored when export_to_memory=True. rdf_map : dict or str, optional Export schema for proper enum/association detection and literal datatype annotations ("400"^^<...XMLSchema#float>). If None, enumerations won't get namespace and literals stay untyped. + export_to_memory : bool, default False + If True, return an in-memory BytesIO (with .name) instead of writing to disk. """ enum_keys, key_namespaces, key_datatypes = build_key_metadata(rdf_map) if rdf_map else (set(), {}, {}) @@ -40,6 +44,12 @@ def export_to_nquads(data, path, rdf_map=None): graphs = inst_col.apply(make_graph) quads = subjects + " " + predicates + " " + objects + " " + graphs + " ." + content = "\n".join(quads.values) + "\n" + + if export_to_memory: + buffer = BytesIO(content.encode("utf-8")) + buffer.name = "export.nq" + return buffer with open(path, "w") as f: - f.write("\n".join(quads.values) + "\n") + f.write(content) diff --git a/triplets/export/nquads_polars.py b/triplets/export/nquads_polars.py index 88ee665..a560050 100644 --- a/triplets/export/nquads_polars.py +++ b/triplets/export/nquads_polars.py @@ -1,6 +1,7 @@ """N-Quads export using polars — lazy expression plan, fully vectorized.""" import logging +from io import BytesIO import polars as pl @@ -19,18 +20,20 @@ def _iri_or_uuid(column): .otherwise(pl.format("", pl.col(column)))) -def export_to_nquads(data, path, rdf_map=None): +def export_to_nquads(data, path=None, rdf_map=None, export_to_memory=False): """Export triplet DataFrame to N-Quads file. Parameters ---------- data : polars.DataFrame Triplet dataset with columns [ID, KEY, VALUE, INSTANCE_ID]. - path : str - Output file path (.nq). + path : str, optional + Output file path (.nq). Ignored when export_to_memory=True. rdf_map : dict or str, optional Export schema for proper enum/association detection and literal datatype annotations ("400"^^<...XMLSchema#float>). + export_to_memory : bool, default False + If True, return an in-memory BytesIO (with .name) instead of writing to disk. """ enum_keys, key_namespaces, key_datatypes = build_key_metadata(rdf_map) if rdf_map else (set(), {}, {}) @@ -85,4 +88,11 @@ def export_to_nquads(data, path, rdf_map=None): # materialization, no outer pl.format (~2.3x faster than collect → to_list # → "\n".join). quote_style="never" keeps each term verbatim (literals # carry their own quotes / internal spaces, no CSV escaping wanted). + if export_to_memory: + buffer = BytesIO() + quads.write_csv(buffer, include_header=False, quote_style="never", separator=" ") + buffer.name = "export.nq" + buffer.seek(0) + return buffer + quads.write_csv(path, include_header=False, quote_style="never", separator=" ") diff --git a/triplets/sparql/__init__.py b/triplets/sparql/__init__.py new file mode 100644 index 0000000..57bd7a4 --- /dev/null +++ b/triplets/sparql/__init__.py @@ -0,0 +1,80 @@ +"""SPARQL querying over triplet data. + +Engines (registry dispatch, mirroring triplets.parser): +- rdflib — reference, built-in SPARQL 1.1, always available with the `sparql` extra +- (future) qlever — performance option (C++); would take auto priority once added + +Data is loaded via the N-Quads export into an rdflib Dataset (INSTANCE_ID as +named graph). No oxigraph engine: our native tooling is C/C++/Cython and qlever +is the chosen performance path. +""" +from __future__ import annotations + +import logging +from importlib import import_module +from typing import Any + +logger = logging.getLogger(__name__) + +# Engine name → module (lazy import). Auto preference: first importable. +_ENGINE_MODULES = { + "rdflib": ".sparql_rdflib", +} +_ENGINE_ALIASES = { + "reference": "rdflib", +} +_ENGINES: dict[str, Any] = {} # loaded-module cache + + +def register_engine(name: str, module: Any) -> None: + """Register a custom SPARQL engine for future extensibility.""" + _ENGINES[name] = module + + +def _load_engine(name: str): + if name in _ENGINES: + return _ENGINES[name] + module_name = _ENGINE_MODULES.get(name) + if module_name is None: + raise ValueError(f"Unknown sparql engine: {name}. Known: {', '.join(_ENGINE_MODULES)}") + try: + _ENGINES[name] = import_module(module_name, __package__) + except ImportError as e: + raise ImportError(f"{name} sparql engine not available. " + "Install with: pip install triplets[sparql]. " + f"Original error: {e}") from e + return _ENGINES[name] + + +def get_engine(name: str = "auto"): + """Resolve SPARQL engine name (with aliases) and return (name, module).""" + if name == "auto": + for candidate in _ENGINE_MODULES: + try: + return candidate, _load_engine(candidate) + except ImportError: + continue + resolved = _ENGINE_ALIASES.get(name, name) + logger.debug(f"sparql engine: {resolved}") + return resolved, _load_engine(resolved) + + +def query(data, query_string, rdf_map=None, scope=None, engine="auto", return_type="pandas"): + """Run a SPARQL query over triplet data. + + Parameters + ---------- + data : triplet DataFrame (pandas/polars), arrow, or DuckDB connection + query_string : str + SPARQL query. SELECT → DataFrame (columns = projected vars), + ASK → bool, CONSTRUCT/DESCRIBE → triplet DataFrame. + rdf_map : dict or str, optional + Export schema — enables xsd-typed literals in the queried graph (optional). + scope : iterable of INSTANCE_ID, optional + Restrict the queried data to these instances' named graphs; all data + stays loaded for reference resolution. None = full union. + engine : str, default "auto" + "rdflib" (reference). "auto" picks the best available. + """ + engine_name, engine_mod = get_engine(engine) + return engine_mod.query(data, query_string, rdf_map=rdf_map, scope=scope, return_type=return_type) diff --git a/triplets/sparql/sparql_rdflib.py b/triplets/sparql/sparql_rdflib.py new file mode 100644 index 0000000..fb0ab06 --- /dev/null +++ b/triplets/sparql/sparql_rdflib.py @@ -0,0 +1,87 @@ +"""SPARQL reference engine — rdflib's built-in SPARQL 1.1. + +Correctness-first reference; the data is loaded into an in-memory rdflib +Dataset via the N-Quads export. Faster engines (qlever) come later behind the +same dispatcher. +""" +import logging + +import pandas + +from .._rdflib_loader import load_dataset, scoped_graph +from ..export.nquads_utils import CIM_NS, RDF_TYPE + +logger = logging.getLogger(__name__) + +_UUID_PREFIX = "urn:uuid:" + + +def query(data, query_string, rdf_map=None, scope=None, return_type="pandas"): + """Execute query_string over data; shape the result by query type.""" + dataset = load_dataset(data, rdf_map=rdf_map) + graph = scoped_graph(dataset, scope) + result = graph.query(query_string) + + if result.type == "ASK": + return bool(result.askAnswer) + if result.type in ("CONSTRUCT", "DESCRIBE"): + return _graph_to_triplets(result.graph) + return _select_to_dataframe(result) + + +def _select_to_dataframe(result): + """SELECT result → DataFrame (columns = projected vars, python-typed cells).""" + variables = list(result.vars) + columns = [str(v) for v in variables] + rows = [[_term_to_py(row[v]) for v in variables] for row in result] + return pandas.DataFrame(rows, columns=columns) + + +def _term_to_py(term): + """rdflib term → python value. Literals keep their xsd-mapped type; IRIs → str.""" + if term is None: + return None + if type(term).__name__ == "Literal": + return term.toPython() # int/float/datetime/str per xsd datatype + return str(term) # URIRef / BNode + + +def _graph_to_triplets(graph): + """CONSTRUCT/DESCRIBE result graph → triplet DataFrame (ID/KEY/VALUE). + + Inverse of the N-Quads export conventions: strips urn:uuid: from subjects, + CIM namespace from predicates, maps rdf:type → 'Type'. INSTANCE_ID is empty + (a constructed graph has no source instance). + """ + rows = [] + for subject, predicate, obj in graph: + rows.append({ + "ID": _strip_uuid(str(subject)), + "KEY": _shorten_predicate(str(predicate)), + "VALUE": _shorten_object(obj), + "INSTANCE_ID": None, + }) + return pandas.DataFrame(rows, columns=["ID", "KEY", "VALUE", "INSTANCE_ID"]) + + +def _strip_uuid(value): + return value[len(_UUID_PREFIX):] if value.startswith(_UUID_PREFIX) else value + + +def _shorten_predicate(predicate): + if predicate == RDF_TYPE: + return "Type" + if predicate.startswith(CIM_NS): + return predicate[len(CIM_NS):] + return predicate + + +def _shorten_object(obj): + if type(obj).__name__ == "Literal": + return str(obj) + value = str(obj) + if value.startswith(_UUID_PREFIX): + return value[len(_UUID_PREFIX):] + if value.startswith(CIM_NS): + return value[len(CIM_NS):] + return value diff --git a/triplets/validation/__init__.py b/triplets/validation/__init__.py new file mode 100644 index 0000000..5ab435f --- /dev/null +++ b/triplets/validation/__init__.py @@ -0,0 +1,79 @@ +"""SHACL validation over triplet data. + +Engines (registry dispatch, mirroring triplets.parser): +- pyshacl — reference, spec-complete, rdflib-based; always available with the + `validation` extra +- (future) pandas / polars / duckdb — experimental compiled-IR executors + +Data is loaded via the N-Quads export into an rdflib graph. pyshacl is the +correctness baseline the future engines are cross-checked against. +""" +from __future__ import annotations + +import logging +from importlib import import_module +from typing import Any + +logger = logging.getLogger(__name__) + +# Engine name → module (lazy import). Auto preference: first importable. +_ENGINE_MODULES = { + "pyshacl": ".shacl_pyshacl", +} +_ENGINE_ALIASES = { + "reference": "pyshacl", +} +_ENGINES: dict[str, Any] = {} # loaded-module cache + + +def register_engine(name: str, module: Any) -> None: + """Register a custom validation engine for future extensibility.""" + _ENGINES[name] = module + + +def _load_engine(name: str): + if name in _ENGINES: + return _ENGINES[name] + module_name = _ENGINE_MODULES.get(name) + if module_name is None: + raise ValueError(f"Unknown validation engine: {name}. Known: {', '.join(_ENGINE_MODULES)}") + try: + _ENGINES[name] = import_module(module_name, __package__) + except ImportError as e: + raise ImportError(f"{name} validation engine not available. " + "Install with: pip install triplets[validation]. " + f"Original error: {e}") from e + return _ENGINES[name] + + +def get_engine(name: str = "auto"): + """Resolve validation engine name (with aliases) and return (name, module).""" + if name == "auto": + for candidate in _ENGINE_MODULES: + try: + return candidate, _load_engine(candidate) + except ImportError: + continue + resolved = _ENGINE_ALIASES.get(name, name) + logger.debug(f"validation engine: {resolved}") + return resolved, _load_engine(resolved) + + +def validate(data, shapes, rdf_map=None, scope=None, engine="auto", **kwargs): + """Validate triplet data against SHACL shapes; return a violations DataFrame. + + Parameters + ---------- + data : triplet DataFrame (pandas/polars), arrow, or DuckDB connection + shapes : str | path | list of paths | rdflib.Graph + SHACL shapes (format auto-detected by extension). + rdf_map : dict or str, optional + Export schema — xsd-typed literals in the data graph (optional). + scope : iterable of INSTANCE_ID, optional + Validate only these instances' named graphs; all data stays loaded for + reference resolution. None = full union. + engine : str, default "auto" + "pyshacl" (reference). "auto" picks the best available. + """ + engine_name, engine_mod = get_engine(engine) + return engine_mod.validate(data, shapes, rdf_map=rdf_map, scope=scope, **kwargs) diff --git a/triplets/validation/shacl_pyshacl.py b/triplets/validation/shacl_pyshacl.py new file mode 100644 index 0000000..53a46ec --- /dev/null +++ b/triplets/validation/shacl_pyshacl.py @@ -0,0 +1,58 @@ +"""SHACL reference engine — pyshacl (spec-complete, rdflib-based). + +Correctness-first reference; the data is loaded into an in-memory rdflib graph +via the N-Quads export. Faster experimental engines (pandas/polars/duckdb) +come later behind the same dispatcher. +""" +import logging + +from .._rdflib_loader import load_dataset, scoped_graph +from .shacl_report import report_to_violations + +logger = logging.getLogger(__name__) + +_SHAPE_FORMATS = {".ttl": "turtle", ".rdf": "xml", ".xml": "xml", ".nt": "nt", ".jsonld": "json-ld"} + + +def validate(data, shapes, rdf_map=None, scope=None, inference="none", + advanced=True, abort_on_first=False, return_type="pandas"): + """Validate triplet data against SHACL shapes; return a violations DataFrame. + + Parameters + ---------- + data : triplet DataFrame (pandas/polars), arrow, or DuckDB connection + shapes : str | path | list of paths | rdflib.Graph + SHACL shapes. File format auto-detected by extension (.ttl/.rdf/.xml). + rdf_map : dict or str, optional + Export schema — xsd-typed literals in the data graph (optional). + scope : iterable of INSTANCE_ID, optional + Validate only these instances (named graphs); all data stays loaded for + reference resolution. None = full union (all profiles). + inference, advanced, abort_on_first : passed to pyshacl.validate. + """ + from pyshacl import validate as pyshacl_validate + + data_graph = scoped_graph(load_dataset(data, rdf_map=rdf_map), scope) + shapes_graph = _load_shapes(shapes) + + conforms, report_graph, _report_text = pyshacl_validate( + data_graph, shacl_graph=shapes_graph, + inference=inference, advanced=advanced, abort_on_first=abort_on_first, + ) + logger.debug("SHACL conforms=%s", conforms) + return report_to_violations(report_graph) + + +def _load_shapes(shapes): + """str/path | list of paths | rdflib.Graph → one rdflib.Graph of shapes.""" + import rdflib + + if isinstance(shapes, rdflib.Graph): + return shapes + + paths = [shapes] if isinstance(shapes, (str, bytes)) or hasattr(shapes, "__fspath__") else list(shapes) + graph = rdflib.Graph() + for path in paths: + suffix = str(path)[str(path).rfind("."):].lower() + graph.parse(str(path), format=_SHAPE_FORMATS.get(suffix, "turtle")) + return graph diff --git a/triplets/validation/shacl_report.py b/triplets/validation/shacl_report.py new file mode 100644 index 0000000..be64546 --- /dev/null +++ b/triplets/validation/shacl_report.py @@ -0,0 +1,107 @@ +"""Map a pyshacl/SHACL ValidationReport graph to the canonical violations DataFrame. + +Canonical violations schema (identical across all current and future SHACL +engines, so the later vectorized engines can produce it natively): + [ID, KEY, VALUE, VIOLATION_TYPE, MESSAGE, SEVERITY, SOURCE_SHAPE] +""" +import logging + +import pandas + +from ..export.nquads_utils import CIM_NS, RDF_TYPE + +logger = logging.getLogger(__name__) + +VIOLATION_COLUMNS = ["ID", "KEY", "VALUE", "VIOLATION_TYPE", "MESSAGE", "SEVERITY", "SOURCE_SHAPE"] + +_UUID_PREFIX = "urn:uuid:" +_SH = "http://www.w3.org/ns/shacl#" + +# sh:sourceConstraintComponent URI suffix → short violation type +_COMPONENT_MAP = { + "MinCountConstraintComponent": "sh:minCount", + "MaxCountConstraintComponent": "sh:maxCount", + "DatatypeConstraintComponent": "sh:datatype", + "MinLengthConstraintComponent": "sh:minLength", + "MaxLengthConstraintComponent": "sh:maxLength", + "PatternConstraintComponent": "sh:pattern", + "MinInclusiveConstraintComponent": "sh:minInclusive", + "MaxInclusiveConstraintComponent": "sh:maxInclusive", + "ClassConstraintComponent": "sh:class", + "NodeKindConstraintComponent": "sh:nodeKind", + "InConstraintComponent": "sh:in", + "HasValueConstraintComponent": "sh:hasValue", + "EqualsConstraintComponent": "sh:equals", + "DisjointConstraintComponent": "sh:disjoint", + "LessThanConstraintComponent": "sh:lessThan", + "ClosedConstraintComponent": "sh:closed", + "OrConstraintComponent": "sh:or", + "AndConstraintComponent": "sh:and", + "NotConstraintComponent": "sh:not", + "SPARQLConstraintComponent": "sh:sparql", +} + + +def report_to_violations(report_graph): + """ValidationReport rdflib graph → violations DataFrame (single columnar pass).""" + import rdflib + + sh = rdflib.Namespace(_SH) + # collect into per-column lists, build the DataFrame once (no per-row concat) + columns = {name: [] for name in VIOLATION_COLUMNS} + + for result in report_graph.subjects(rdflib.RDF.type, sh.ValidationResult): + path = report_graph.value(result, sh.resultPath) + value = report_graph.value(result, sh.value) + component = report_graph.value(result, sh.sourceConstraintComponent) + severity = report_graph.value(result, sh.resultSeverity) + shape = report_graph.value(result, sh.sourceShape) + message = report_graph.value(result, sh.resultMessage) + + columns["ID"].append(_strip_uuid(report_graph.value(result, sh.focusNode))) + columns["KEY"].append(_shorten(path)) + columns["VALUE"].append(_term_value(value)) + columns["VIOLATION_TYPE"].append(_component(component)) + columns["MESSAGE"].append(str(message) if message is not None else None) + columns["SEVERITY"].append(_local_name(severity) if severity is not None else "Violation") + columns["SOURCE_SHAPE"].append(str(shape) if shape is not None else None) + + return pandas.DataFrame(columns, columns=VIOLATION_COLUMNS) + + +def _strip_uuid(term): + if term is None: + return None + value = str(term) + return value[len(_UUID_PREFIX):] if value.startswith(_UUID_PREFIX) else value + + +def _shorten(term): + """Predicate/path IRI → short KEY (CIM local name, rdf:type → 'Type').""" + if term is None: + return None + value = str(term) + if value == RDF_TYPE: + return "Type" + if value.startswith(CIM_NS): + return value[len(CIM_NS):] + return value + + +def _term_value(term): + if term is None: + return None + if type(term).__name__ == "Literal": + return str(term) + return _strip_uuid(term) + + +def _component(term): + if term is None: + return "sh:unknown" + suffix = str(term).split("#")[-1] + return _COMPONENT_MAP.get(suffix, f"sh:{suffix}") + + +def _local_name(term): + return str(term).split("#")[-1]