chrontext.chrontext

class Engine:

The hybrid query engine of chrontext. Initialize Engine using:
  • A SPARQL Database: either in the form of a SPARQL endpoint or an embedded Oxigraph SPARQL database
  • A Virtualized Database: one of the supported databases:
      • A Python defined database (could be anything)
      • Google Cloud BigQuery
      • OPC UA History Access

Engine( resources: Dict[str, Template], virtualized_python_database: Optional[VirtualizedPythonDatabase] = None, virtualized_bigquery_database: Optional[VirtualizedBigQueryDatabase] = None, virtualized_opcua_database: Optional[ForwardRef('VirtualizedOPCUADatabase')] = None, sparql_endpoint: Optional[str] = None, sparql_embedded_oxigraph: Optional[pyoxigraph.Store] = None)

Construct a new hybrid query engine. Specify exactly one of sparql_endpoint and sparql_embedded_oxigraph.

Parameters
  • resources: The templates associated with each resource, keyed by resource name.
  • sparql_endpoint: A SPARQL endpoint (a URL)
  • sparql_embedded_oxigraph: An embedded Oxigraph SPARQL database (a pyoxigraph.Store object).
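
A minimal sketch of constructing an engine against an embedded Oxigraph store, using only the signatures documented on this page. The resource name my_resource, the ct prefix and its IRI, and the predicate names are hypothetical, and the top-level import path `from chrontext import ...` is an assumption. Both packages are imported inside the function, so they only need to be installed when the function is called.

```python
def build_engine(virtualized_db):
    """Build and initialize an Engine backed by an embedded Oxigraph store.

    `virtualized_db` is assumed to be a VirtualizedPythonDatabase instance.
    """
    # Lazy imports: chrontext and pyoxigraph must be installed at call time.
    from chrontext import (
        Engine, Parameter, Prefix, RDFType, Template, Triple, Variable,
    )
    from pyoxigraph import Store

    xsd = "http://www.w3.org/2001/XMLSchema#"
    # Hypothetical namespace for the resource and its predicates.
    ct = Prefix("ct", "https://example.org/chrontext#")

    ts_id = Variable("id")
    timestamp = Variable("timestamp")
    value = Variable("value")
    dp = Variable("dp")

    # One template per resource name; the key is the resource name.
    resources = {
        "my_resource": Template(
            iri=ct.suf("my_resource"),
            parameters=[
                Parameter(ts_id, rdf_type=RDFType.Literal(xsd + "string")),
                Parameter(timestamp, rdf_type=RDFType.Literal(xsd + "dateTime")),
                Parameter(value, rdf_type=RDFType.Literal(xsd + "double")),
            ],
            instances=[
                Triple(ts_id, ct.suf("hasDataPoint"), dp),
                Triple(dp, ct.suf("hasValue"), value),
                Triple(dp, ct.suf("hasTimestamp"), timestamp),
            ],
        )
    }

    # Exactly one of sparql_endpoint / sparql_embedded_oxigraph is given.
    engine = Engine(
        resources=resources,
        virtualized_python_database=virtualized_db,
        sparql_embedded_oxigraph=Store(),
    )
    engine.init()
    return engine
```

To use a remote SPARQL endpoint instead, the sparql_embedded_oxigraph argument would be replaced by sparql_endpoint with the endpoint URL.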
def init(self) -> None:

Initialize the hybrid query engine.

Returns
def query( self, query: str, native_dataframe: bool = False, include_datatypes: bool = False) -> Union[polars.dataframe.frame.DataFrame, chrontext.chrontext.SolutionMappings]:

Execute a query

Parameters
  • query: The SPARQL query.
  • native_dataframe: Return columns with chrontext-native formatting. Useful for round-trips into e.g. maplib.
  • include_datatypes: Return datatypes of the results DataFrame (returns SolutionMappings instead of DataFrame).
Returns

The query result.

def serve_postgres(self, catalog: Catalog):

Serve the data product catalog as a postgres endpoint. Contact DataTreehouse to try.

Parameters
  • catalog:
Returns
def serve_flight(self, address: str):
Parameters
  • address:
Returns
class VirtualizedPythonDatabase:

A virtualized database implemented in Python.

VirtualizedPythonDatabase( database: Any, resource_sql_map: Optional[Dict[str, Any]], sql_dialect: Optional[Literal['postgres', 'bigquery', 'databricks']])

See the tutorial in README.md for guidance on how to use this class. This API is subject to change; it will become possible to specify which parts of the SPARQL query may be pushed down into the database. For advanced use, the resource_sql_map may be omitted, in which case the VirtualizedQuery is passed to the query method. The user must then translate this VirtualizedQuery (built on SPARQL Algebra) to the target query language.

Parameters
  • database: An instance of a class containing a query method.
  • resource_sql_map: A dict providing a sqlalchemy Select for each resource.
  • sql_dialect: The SQL dialect accepted by the query method.
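
The shape of the object passed as database can be sketched with the standard library alone. This is an illustration under assumptions, not the library's prescribed implementation: the class name SqliteBackedDatabase and the table ts are hypothetical, the query method is assumed to receive SQL text, and sqlite3 merely stands in for a backend speaking one of the supported dialects. The value that query must return is not specified in this section (see the README tutorial); plain rows are returned here only to keep the sketch dependency-free.

```python
import sqlite3


class SqliteBackedDatabase:
    """Hypothetical stand-in for the object passed as `database`."""

    def __init__(self):
        # In-memory table holding a tiny time series.
        self.con = sqlite3.connect(":memory:")
        self.con.execute(
            "CREATE TABLE ts (id TEXT, timestamp TEXT, value REAL)"
        )
        self.con.executemany(
            "INSERT INTO ts VALUES (?, ?, ?)",
            [
                ("ts1", "2024-01-01T00:00:00", 1.5),
                ("ts1", "2024-01-01T00:01:00", 2.5),
            ],
        )

    def query(self, sql: str):
        # Assumption: the engine hands over SQL text in the configured dialect.
        return self.con.execute(sql).fetchall()


db = SqliteBackedDatabase()
rows = db.query("SELECT id, value FROM ts ORDER BY timestamp")
```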
class VirtualizedBigQueryDatabase:

A virtualized BigQuery database.

VirtualizedBigQueryDatabase( resource_sql_map: Dict[str, Union[sqlalchemy.sql.selectable.Select, sqlalchemy.sql.schema.Table]], key_json_path: str)

To be able to connect to BigQuery, provide the path to the JSON key. For each resource name in chrontext that you want to associate with BigQuery, provide an sqlalchemy Select or Table that contains each of the parameters referenced in the corresponding template provided to Engine.

See test_bigquery.py in the tests for usage.

Parameters
  • resource_sql_map: The sqlalchemy Select or Table associated with each resource name.
  • key_json_path: Path to the JSON key file used to connect to BigQuery.
class DataProduct:

A DataProduct is a SPARQL query which is annotated with types. It defines a virtual SQL table.

DataProduct(query: str, types: Dict[str, RDFType])

Create a new data product from a SPARQL query and the types of the columns. The SPARQL query should be a SELECT query that explicitly lists its projected variables (do not use *).

>>> dp1 = DataProduct(query=query, types={
...     "farm_name":RDFType.Literal("http://www.w3.org/2001/XMLSchema#string"),
...     "turbine_name":RDFType.Literal("http://www.w3.org/2001/XMLSchema#string"),
...     "t":RDFType.Literal("http://www.w3.org/2001/XMLSchema#dateTime"),
...     "v":RDFType.Literal("http://www.w3.org/2001/XMLSchema#double")})
Parameters
  • query: The SPARQL SELECT Query that defines the data product
  • types: The types of each of the variables in the data product
class Catalog:

A Catalog maps SPARQL queries to virtual SQL tables.

Catalog(data_products: Dict[str, DataProduct])

Create a new data product catalog, which defines virtual tables.

Parameters
  • data_products: The data products in the catalog. Keys are the table names.
class RDFType:

The type of a column containing an RDF variable. For instance, xsd:string is RDFType.Literal("http://www.w3.org/2001/XMLSchema#string").

def Literal(iri):
def IRI():
def BlankNode():
def Unknown():
def Nested(rdf_type):
def Multi(rdf_types):
class Prefix:

A prefix that can be used to ergonomically build IRIs.

Prefix(prefix, iri)

Create a new prefix.

Parameters
  • prefix: The name of the prefix
  • iri: The prefix IRI.
def suf(self, suffix: str) -> IRI:

Create an IRI by appending the suffix.

Parameters
  • suffix: The suffix to append.
Returns
class Variable:

A variable in a template.

Variable(name: str)

Create a new variable.

Parameters
  • name: The name of the variable.
name: str
class Literal:

An RDF literal.

Literal( value: str, data_type: IRI = None, language: str = None)

Create a new RDF Literal

Parameters
  • value: The lexical representation of the value.
  • data_type: The data type of the value (an IRI).
  • language: The language tag of the value.
def to_native(self) -> Union[int, float, bool, str, datetime.datetime, datetime.date]:
Returns
value: str
datatype
language: Optional[str]
class IRI:
IRI(iri: str)

Create a new IRI

Parameters
  • iri: IRI (without < and >).
iri: str

An IRI.

class Parameter:
Parameter( variable: Variable, optional: Optional[bool] = False, allow_blank: Optional[bool] = True, rdf_type: Optional[RDFType] = None, default_value: Union[Literal, IRI, chrontext.chrontext.BlankNode, NoneType] = None)

Create a new parameter for a Template.

Parameters
  • variable: The variable.
  • optional: Can the variable be unbound?
  • allow_blank: Can the variable be bound to a blank node?
  • rdf_type: The type of the variable. Can be nested.
  • default_value: Default value when no value provided.
default_value: Union[Literal, IRI, chrontext.chrontext.BlankNode, NoneType]

Parameters for template signatures.

optional: bool
rdf_type: Optional[RDFType]
variable: Variable
allow_blank: bool
class Argument:
Argument( term: Union[Variable, IRI, Literal], list_expand: Optional[bool] = False)

An argument for a template instance.

Parameters
  • term: The term.
  • list_expand: Should the argument be expanded? Used with the list_expander argument of instance.
variable
class Template:
Template( iri: IRI, parameters: List[Union[Parameter, Variable]], instances: List[Instance])

Create a new OTTR Template

Parameters
  • iri: The IRI of the template
  • parameters:
  • instances:
def instance( self, arguments: List[Union[Argument, Variable, IRI, Literal, NoneType]], list_expander: Literal['cross', 'zipMin', 'zipMax'] = None) -> Instance:
Parameters
  • arguments: The arguments to the template.
  • list_expander: (How) should we list-expand?
Returns
instances: List[Instance]

An OTTR Template. Note that accessing the parameters or instances fields returns copies. To change these fields, you must assign new lists of parameters or instances.

parameters: List[Parameter]
iri: str
class Instance:
Instance( iri: IRI, arguments: List[Union[Argument, Variable, IRI, Literal, chrontext.chrontext.BlankNode, NoneType]], list_expander: Optional[Literal['cross', 'zipMin', 'zipMax']] = None)

A template instance.

Parameters
  • iri: The IRI of the template to be instantiated.
  • arguments: The arguments for template instantiation.
  • list_expander: (How) should we do list expansion?
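
The three list_expander modes follow the OTTR list expansion semantics, which can be illustrated in plain Python without chrontext. The helper expand below is hypothetical and operates on ordinary Python lists rather than template arguments: cross takes the Cartesian product of the expanded lists, zipMin pairs elements up to the shortest list, and zipMax pairs up to the longest list, filling missing positions with None.

```python
from itertools import product, zip_longest


def expand(lists, mode):
    """Return the argument tuples produced by an OTTR-style list expander."""
    if mode == "cross":
        # Every combination of one element from each list.
        return list(product(*lists))
    if mode == "zipMin":
        # Pair positionally, stopping at the shortest list.
        return list(zip(*lists))
    if mode == "zipMax":
        # Pair positionally up to the longest list, padding with None.
        return list(zip_longest(*lists, fillvalue=None))
    raise ValueError(f"unknown list expander: {mode!r}")


names = ["a", "b", "c"]
values = [1, 2]

cross = expand([names, values], "cross")
zip_min = expand([names, values], "zipMin")
zip_max = expand([names, values], "zipMax")
```

Each resulting tuple corresponds to one expanded instance of the template.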
iri
class XSD:

The xsd namespace, for convenience.

XSD()

Create the xsd namespace helper.

short: IRI
double: IRI
int_: IRI
language: IRI
duration: IRI
date: IRI
boolean: IRI
dateTime: IRI
float: IRI
byte: IRI
dateTimeStamp: IRI
decimal: IRI
string: IRI
integer: IRI
long: IRI
def Triple( subject: Union[Argument, IRI, Variable, chrontext.chrontext.BlankNode], predicate: Union[Argument, IRI, Variable, chrontext.chrontext.BlankNode], object: Union[Argument, IRI, Variable, Literal, chrontext.chrontext.BlankNode], list_expander: Optional[Literal['cross', 'zipMin', 'zipMax']] = None):

An OTTR Triple Pattern used for creating templates. This is the basic pattern into which all template instances are rewritten. Equivalent to:

>>> ottr = Prefix("ottr", "http://ns.ottr.xyz/0.4/")
>>> Instance(ottr.suf("Triple"), [subject, predicate, object], list_expander)
Parameters
  • subject:
  • predicate:
  • object:
  • list_expander:
Returns
def a() -> IRI:
Returns

IRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")

class FlightClient:
FlightClient(uri: str, metadata: Dict[str, str] = None)
Parameters
  • uri: The URI of the Flight server (see engine.serve_flight())
  • metadata: gRPC metadata to add to each request
def query( self, query: str, native_dataframe: bool = False, include_datatypes: bool = False) -> Union[polars.dataframe.frame.DataFrame, chrontext.chrontext.SolutionMappings]:

Execute a query

Parameters
  • query: The SPARQL query.
  • native_dataframe: Return columns with chrontext-native formatting. Useful for round-trips into e.g. maplib.
  • include_datatypes: Return datatypes of the results DataFrame (returns SolutionMappings instead of DataFrame).
Returns

The query result.