Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ excel = [
networkx = [
"networkx>=3.0",
]
sparql = [
"rdflib>=7.6.0",
]
validation = [
"pyshacl>=0.30",
"rdflib>=7.6.0",
]
dev = [
"pytest>=7.0",
"pytest-xdist",
Expand All @@ -36,6 +43,7 @@ dev = [
"networkx>=3.0",
"openpyxl>=3.1.5",
"rdflib>=7.6.0",
"pyshacl>=0.30",
]
# vis-network JS is vendored in triplets/cgmes_tools/static — graph drawing needs no extra deps
visualization = [
Expand Down
102 changes: 102 additions & 0 deletions tests/test_sparql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Tests for the rdflib reference SPARQL engine (triplets.sparql)."""
import pytest

pytest.importorskip("rdflib")

import pandas
import triplets
from pathlib import Path

# Directory holding the Svedala IGM instance files used by these tests.
SVEDALA_DIR = Path("test_data/relicapgrid/Instance/Grid/IGM_Svedala")
# One path string per profile file (EQ, SSH, TP, SV), in that order.
SVEDALA_FILES = [
    str(SVEDALA_DIR / file_name)
    for file_name in (
        "20220615T2230Z__Svedala_EQ_1.xml",
        "20220615T2230Z_2D_Svedala_SSH_1.xml",
        "20220615T2230Z_2D_Svedala_TP_1.xml",
        "20220615T2230Z_2D_Svedala_SV_1.xml",
    )
]
SKIP_REASON = "Svedala test data not available"

# SPARQL prologue shared by every query in this module (note the trailing space).
PREFIXES = (
    "PREFIX cim: <http://iec.ch/TC57/CIM100#> "
    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
)


@pytest.fixture(scope="module")
def svedala():
    """Load every file in SVEDALA_FILES into one triplet frame, once per module.

    Skips the requesting test when the Svedala data directory does not exist.
    """
    if SVEDALA_DIR.exists():
        return pandas.read_RDF(SVEDALA_FILES)
    pytest.skip(SKIP_REASON)


@pytest.fixture(scope="module")
def svedala_eq():
    """Load only the Svedala EQ profile file, once per module.

    Skips the requesting test when the EQ file does not exist.
    """
    eq_path = SVEDALA_DIR / "20220615T2230Z__Svedala_EQ_1.xml"
    if eq_path.exists():
        return pandas.read_RDF([str(eq_path)])
    pytest.skip(SKIP_REASON)


def test_select_count_matches_tableview(svedala):
    """The SPARQL COUNT of ACLineSegment equals the type_tableview row count."""
    query = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }"
    counted = svedala.sparql.query(query)
    assert isinstance(counted, pandas.DataFrame)

    sparql_count = int(counted["n"].iloc[0])
    tableview_rows = len(svedala.triplets.type_tableview("ACLineSegment"))
    assert sparql_count == tableview_rows


def test_select_returns_columns_and_rows(svedala):
    """A SELECT with LIMIT 5 yields one column per projected variable and five rows."""
    query = PREFIXES + "SELECT ?s ?name WHERE { ?s cim:IdentifiedObject.name ?name } LIMIT 5"
    frame = svedala.sparql.query(query)
    assert list(frame.columns) == ["s", "name"]
    assert len(frame) == 5


def test_ask(svedala):
    """ASK queries come back as the Python singletons True and False."""
    substation_exists = svedala.sparql.query(PREFIXES + "ASK { ?s rdf:type cim:Substation }")
    assert substation_exists is True

    bogus_class_exists = svedala.sparql.query(PREFIXES + "ASK { ?s rdf:type cim:NoSuchClass }")
    assert bogus_class_exists is False


def test_construct_returns_triplets(svedala):
    """CONSTRUCT results come back in the triplet layout with KEY == "Type" rows."""
    query = PREFIXES + (
        "CONSTRUCT { ?s rdf:type cim:ACLineSegment } "
        "WHERE { ?s rdf:type cim:ACLineSegment }"
    )
    constructed = svedala.sparql.query(query)
    assert list(constructed.columns) == ["ID", "KEY", "VALUE", "INSTANCE_ID"]
    assert (constructed["KEY"] == "Type").all()


def test_typed_values_with_rdf_map(svedala):
    """When rdf_map is supplied, Conductor.length is returned as a Python float."""
    from triplets.export_schema import schemas

    query = PREFIXES + "SELECT ?l WHERE { ?s cim:Conductor.length ?l } LIMIT 1"
    lengths = svedala.sparql.query(query, rdf_map=schemas.ENTSOE_CGMES_3_0_0_552_ED1)
    first_length = lengths["l"].iloc[0]
    assert isinstance(first_length, float)


def test_scope_restricts_to_named_graph(svedala):
    """Scoping to a non-EQ instance finds no ACLineSegment (they live in EQ).

    The EQ instance is the INSTANCE_ID of the first ACLineSegment type row;
    the out-of-scope instance is any other INSTANCE_ID in the loaded data.
    """
    is_line_type = (svedala["KEY"] == "Type") & (svedala["VALUE"] == "ACLineSegment")
    instances = svedala[is_line_type]["INSTANCE_ID"]
    eq_instance = str(instances.astype(str).iloc[0])

    # Sort so the chosen out-of-scope instance is the same on every run
    # (plain set iteration order varies with string hash randomization).
    other_instances = sorted(set(svedala["INSTANCE_ID"].astype(str).unique()) - {eq_instance})
    assert other_instances, "expected at least one instance besides the EQ one"
    other = other_instances[0]

    q = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }"
    in_scope = int(svedala.sparql.query(q, scope=[eq_instance])["n"].iloc[0])
    out_scope = int(svedala.sparql.query(q, scope=[other])["n"].iloc[0])
    assert in_scope > 0
    assert out_scope == 0


def test_polars_input_parity(svedala):
    """The same COUNT query gives the same number for pandas and polars input."""
    polars = pytest.importorskip("polars")
    query = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }"

    def count_lines(data):
        return int(triplets.sparql.query(data, query)["n"].iloc[0])

    pandas_count = count_lines(svedala)
    polars_count = count_lines(polars.from_pandas(svedala))
    assert pandas_count == polars_count


def test_duckdb_input(svedala):
    """A DuckDB connection holding a `triplets` table is accepted as query input.

    The SPARQL count of ACLineSegment must equal the type_tableview row count
    of the pandas frame the table was created from.
    """
    duckdb = pytest.importorskip("duckdb")
    q = PREFIXES + "SELECT (COUNT(?s) AS ?n) WHERE { ?s rdf:type cim:ACLineSegment }"
    con = duckdb.connect()
    try:
        con.register("src", svedala)
        con.execute("CREATE TABLE triplets AS SELECT * FROM src")
        assert int(triplets.sparql.query(con, q)["n"].iloc[0]) == len(svedala.triplets.type_tableview("ACLineSegment"))
    finally:
        # Release the in-memory database even when the assertion fails.
        con.close()
91 changes: 91 additions & 0 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Tests for the pyshacl reference SHACL engine (triplets.validation)."""
import os
import pytest

pytest.importorskip("rdflib")
pytest.importorskip("pyshacl")

import pandas
import triplets
from pathlib import Path

# Svedala IGM test data; only the EQ profile file is used by this module.
SVEDALA_DIR = Path("test_data/relicapgrid/Instance/Grid/IGM_Svedala")
SVEDALA_EQ = str(SVEDALA_DIR / "20220615T2230Z__Svedala_EQ_1.xml")
SKIP_REASON = "Svedala test data not available"

# Inline shape (written to tmp) — deterministic, no committed shape files / external repo.
# It targets cim:ACLineSegment with two property constraints: a name must be
# present (sh:minCount 1) and Conductor.length must be typed xsd:float.
INLINE_SHAPE = """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix cim: <http://iec.ch/TC57/CIM100#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

cim:ACLineSegmentShape a sh:NodeShape ;
sh:targetClass cim:ACLineSegment ;
sh:property [ sh:path cim:IdentifiedObject.name ; sh:minCount 1 ;
sh:message "ACLineSegment must have a name" ] ;
sh:property [ sh:path cim:Conductor.length ; sh:datatype xsd:float ;
sh:message "Conductor.length must be xsd:float" ] .
"""

# Real CGMES SHACL shapes — external, skip-guarded (not vendored into the repo).
# The directory is read from the TRIPLETS_CGMES_SHACL environment variable.
# NOTE(review): the fallback below is an absolute path under one user's home
# directory, so without the variable the dependent test will normally be skipped
# — consider a repo-relative or empty default.
CGMES_SHACL_DIR = Path(os.environ.get(
"TRIPLETS_CGMES_SHACL",
"/home/kvilgo/GIT/application-profiles-library/CGMES/CurrentRelease/SHACL"))
CGMES_EQ_SHACL = CGMES_SHACL_DIR / "61970-301_Equipment-AP-Con-Complex-SHACL.ttl"


@pytest.fixture(scope="module")
def svedala_eq():
    """Load the Svedala EQ profile once per module; skip when the file is missing."""
    eq_file_present = Path(SVEDALA_EQ).exists()
    if eq_file_present:
        return pandas.read_RDF([SVEDALA_EQ])
    pytest.skip(SKIP_REASON)


@pytest.fixture(scope="module")
def shape_file(tmp_path_factory):
    """Write INLINE_SHAPE to a temporary ``inline.ttl`` and return its path as a string.

    Parameters
    ----------
    tmp_path_factory : pytest fixture
        Used to create a module-lifetime ``shapes`` temp directory.
    """
    path = tmp_path_factory.mktemp("shapes") / "inline.ttl"
    # Turtle documents are UTF-8; do not depend on the platform default encoding.
    path.write_text(INLINE_SHAPE, encoding="utf-8")
    return str(path)


def test_typed_data_conforms(svedala_eq, shape_file):
    """With rdf_map the inline shape reports no violations, returned as an empty DataFrame."""
    from triplets.export_schema import schemas

    report = svedala_eq.shacl.validate(
        shape_file,
        rdf_map=schemas.ENTSOE_CGMES_3_0_0_552_ED1,
    )
    assert isinstance(report, pandas.DataFrame)
    assert len(report) == 0


def test_untyped_data_trips_datatype(svedala_eq, shape_file):
    """Without rdf_map every reported violation is of type sh:datatype."""
    report = svedala_eq.shacl.validate(shape_file)
    assert len(report) > 0

    only_datatype_violations = (report["VIOLATION_TYPE"] == "sh:datatype").all()
    assert only_datatype_violations


def test_violations_columns(svedala_eq, shape_file):
    """The report has exactly VIOLATION_COLUMNS and IDs carry no urn:uuid: prefix."""
    from triplets.validation.shacl_report import VIOLATION_COLUMNS

    report = svedala_eq.shacl.validate(shape_file)
    assert list(report.columns) == VIOLATION_COLUMNS

    # focusNode stripped to bare UUID (no urn:uuid:)
    prefixed_ids = report["ID"].str.startswith("urn:uuid:")
    assert not prefixed_ids.any()


def test_scope_excludes_out_of_scope_instances(svedala_eq, shape_file):
    """Scoping to the loaded instance reports violations; an unknown instance reports none."""
    loaded_instance = str(svedala_eq["INSTANCE_ID"].astype(str).iloc[0])
    unknown_instance = "00000000-0000-0000-0000-000000000000"

    in_scope = svedala_eq.shacl.validate(shape_file, scope=[loaded_instance])
    # the EQ instance has the ACLineSegments
    assert len(in_scope) > 0

    out_scope = svedala_eq.shacl.validate(shape_file, scope=[unknown_instance])
    assert len(out_scope) == 0


@pytest.mark.performance  # pyshacl on the full complex CGMES SHACL takes minutes — opt-in
@pytest.mark.skipif(
    not CGMES_EQ_SHACL.exists(),
    reason="external CGMES SHACL shapes not available",
)
def test_real_cgmes_eq_shapes(svedala_eq):
    """Run the external CGMES Equipment SHACL file on Svedala EQ and check the report columns."""
    from triplets.export_schema import schemas
    from triplets.validation.shacl_report import VIOLATION_COLUMNS

    shapes_path = str(CGMES_EQ_SHACL)
    report = svedala_eq.shacl.validate(
        shapes_path,
        rdf_map=schemas.ENTSOE_CGMES_3_0_0_552_ED1,
    )
    assert list(report.columns) == VIOLATION_COLUMNS
6 changes: 5 additions & 1 deletion triplets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
from . import cli
from . import tools
from . import export
from . import _accessor # registers df.triplets.* namespace # noqa: F401
from . import sparql # noqa: F401 (df.sparql.* namespace; rdflib lazy-loaded per call)
from . import validation # noqa: F401 (df.shacl.* namespace; pyshacl lazy-loaded per call)
from . import _accessor # registers df.triplets.* / df.sparql.* / df.shacl.* namespaces # noqa: F401

# Submodule names exported by ``from triplets import *``.
__all__ = [
'cgmes_tools',
'rdf_parser',
'export_schema',
'rdfs_tools',
'cli',
'sparql',
'validation',
]

from ._version import get_versions
Expand Down
27 changes: 26 additions & 1 deletion triplets/_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
import pandas

from . import tools, export
from . import tools, export, sparql, validation

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -141,6 +141,31 @@ def __init__(self, df):
logger.debug("polars not installed, skipping triplets namespace accessor")


# ── df.sparql.* / df.shacl.* (separate root accessors) ──────────────────────
# These take triplet data in any flavor; the engines load it into rdflib.
# (DuckDB connections already have a native .query() — call
# triplets.sparql.query(connection, ...) directly instead of a method.)

def _register_root_accessor(name, methods):
    """Expose ``df.<name>.<method>`` on pandas DataFrames, and on polars when available.

    Parameters
    ----------
    name : str
        Accessor namespace to register (e.g. ``"sparql"``).
    methods : iterable of (module, str)
        Pairs of a module and the name of the function in it to expose; each
        is wrapped with ``_delegate(module, method)`` and attached under the
        same name.
    """
    class _Accessor:
        def __init__(self, df):
            self._df = df

    accessor_name = f"{name.capitalize()}Accessor"
    _Accessor.__name__ = accessor_name
    for module, method in methods:
        setattr(_Accessor, method, _delegate(module, method))

    register_pandas = pandas.api.extensions.register_dataframe_accessor(name)
    register_pandas(_Accessor)
    if polars:
        # polars gets its own subclass so each library holds a distinct class object.
        register_polars = polars.api.register_dataframe_namespace(name)
        register_polars(type(f"Polars{accessor_name}", (_Accessor,), {}))
    logger.debug("Registered df.%s accessor", name)


# Register the two extra namespaces at import time:
#   df.sparql.query    -> delegates to sparql.query
#   df.shacl.validate  -> delegates to validation.validate
_register_root_accessor("sparql", [(sparql, "query")])
_register_root_accessor("shacl", [(validation, "validate")])


# ── DuckDB ────────────────────────────────────────────────────────────────────
if duckdb:
from .tools import duckdb_engine
Expand Down
74 changes: 74 additions & 0 deletions triplets/_rdflib_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Shared rdflib loading for the SPARQL and SHACL reference engines.

Both engines turn triplet data into an in-memory rdflib graph by going through
the existing N-Quads export (datatype-annotated, INSTANCE_ID as named graph).
No temp files: the export is taken in memory as a BytesIO and parsed directly.
"""
import logging

logger = logging.getLogger(__name__)


def load_dataset(data, rdf_map=None):
    """Build an rdflib.Dataset from triplet data via the in-memory N-Quads export.

    Parameters
    ----------
    data : pandas/polars DataFrame, pyarrow Table/RecordBatch, or DuckDB connection
        Triplet dataset with columns [ID, KEY, VALUE, INSTANCE_ID].
    rdf_map : dict or str, optional
        Export schema passed through to the N-Quads export — enables correct
        xsd datatypes / enum namespaces in the loaded graph. Loading works
        without it.

    Returns
    -------
    rdflib.Dataset
        Created with default_union=True so queries/validation see the union
        across the per-INSTANCE_ID named graphs.
    """
    # Imported per call so rdflib is only required when an engine is actually used.
    import rdflib
    from .export import export_to_nquads

    nquads = export_to_nquads(_to_loadable(data), rdf_map=rdf_map, export_to_memory=True)
    nquads.seek(0)  # rewind the buffer before handing it to the parser

    union_dataset = rdflib.Dataset(default_union=True)
    union_dataset.parse(source=nquads, format="nquads")
    logger.debug("loaded rdflib Dataset: %d quads", len(union_dataset))
    return union_dataset


def _to_loadable(data):
"""export_to_nquads handles pandas/polars; convert arrow/duckdb to pandas first."""
module = type(data).__module__
if module.startswith("pyarrow"):
return data.to_pandas(types_mapper=__import__("pandas").ArrowDtype)
if module.startswith(("duckdb", "_duckdb")):
return data.execute("SELECT * FROM triplets").df()
return data # pandas / polars — export_to_nquads takes these directly


def scoped_graph(dataset, scope=None):
    """Return the graph to query/validate: full union, or just the scoped instances.

    Parameters
    ----------
    dataset : rdflib.Dataset
    scope : iterable of INSTANCE_ID (str), or a single INSTANCE_ID str, optional
        When given, return a concrete Graph holding the union of those
        instances' named graphs (the quad's graph component does the
        filtering). A concrete Graph — not a view — is required because
        pyshacl clones/iterates the data graph and does not read a
        ReadOnlyGraphAggregate. The copy is only the reduced scope, which is
        the point of scoping. When None, the full default-union dataset is used.

    Returns
    -------
    rdflib.Dataset or rdflib.Graph
        *dataset* itself when scope is None, otherwise a new Graph.
    """
    if scope is None:
        return dataset

    import rdflib

    # A bare string is one INSTANCE_ID; iterating it would look up one named
    # graph per character and silently produce an empty result.
    if isinstance(scope, str):
        scope = [scope]

    graph = rdflib.Graph()
    for instance_id in scope:
        # Named graphs are keyed by the instance's urn:uuid: URI.
        graph += dataset.get_context(rdflib.URIRef(f"urn:uuid:{instance_id}"))
    return graph
11 changes: 8 additions & 3 deletions triplets/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,23 @@ def export_to_csv(data, path=None, multivalue=True, export_to_memory=False, sing
single_file=single_file, base_filename=base_filename)


def export_to_nquads(data, path, rdf_map=None, engine="auto"):
def export_to_nquads(data, path=None, rdf_map=None, engine="auto", export_to_memory=False):
"""Export triplet DataFrame to N-Quads file.

Parameters
----------
path : str, optional
Output file path (.nq). Ignored when export_to_memory=True.
rdf_map : dict or str, optional
Export schema for proper enum detection and literal datatype
annotations. If None, enums exported as literals.
engine : str, default "auto"
"polars" (lazy expression plan, ~4x faster) or "pandas".
"auto" picks polars when installed, converting pandas input
(~17 ms per million rows); falls back to pandas otherwise.
export_to_memory : bool, default False
If True, return an in-memory BytesIO (with .name) instead of
writing to disk — same convention as export_to_csv / export_to_cimxml.
"""
_check_columns(data)
if engine == "auto":
Expand All @@ -85,12 +90,12 @@ def export_to_nquads(data, path, rdf_map=None, engine="auto"):
from .nquads_polars import export_to_nquads as _fn
if not _is_polars(data):
data = polars.from_pandas(data)
return _fn(data, path, rdf_map=rdf_map)
return _fn(data, path, rdf_map=rdf_map, export_to_memory=export_to_memory)

from .nquads_pandas import export_to_nquads as _fn
if _is_polars(data):
data = data.to_pandas(use_pyarrow_extension_array=True)
return _fn(data, path, rdf_map=rdf_map)
return _fn(data, path, rdf_map=rdf_map, export_to_memory=export_to_memory)


# ── CIM XML engine dispatch ──────────────────────────────────────────────────
Expand Down
Loading