"""
The RDFToLiPD class helps in converting an RDF Graph to a LiPD file.
It uses the SCHEMA dictionary (from globals/schema.py) to do the conversion
"""
import copy
import os
import re
import csv
import bagit
import json
import tempfile
import zipfile
from rdflib.graph import URIRef
from ..globals.urls import NSURL
from ..globals.blacklist import REVERSE_BLACKLIST
from ..globals.schema import SCHEMA
from ..globals.synonyms import RSYNONYMS
from .utils import ucfirst, lcfirst, unzip_string
[docs]
class RDFToLiPD:
"""
The RDFToLiPD class helps in converting an RDF Graph to a LiPD file.
It uses the SCHEMA dictionary (from globals/schema.py) to do the conversion
"""
def __init__(self, graph):
self.graph = graph
self.lipd_csvs = {}
self.graphurl = NSURL
self.namespace = NSURL + "/"
[docs]
def convert(self, dsname, lipdfile):
'''Convert RDF graph to a LiPD file
Parameters
----------
graph : rdflib.ConjunctiveGraph
the RDF graph object
'''
lipd = self.convert_to_json(dsname)
with tempfile.TemporaryDirectory(prefix="rdf_to_lipd_") as tmpdir:
# Create a temporary data directory
datadir = f"{tmpdir}/{dsname}"
os.makedirs(datadir)
# Create the csv files and metadata jsonld
self._create_csvs(lipd, datadir)
with(open(f"{datadir}/metadata.jsonld", "w")) as f:
json.dump(lipd, f, indent=4, default=str)
# Convert data directory to a bag
bagit.make_bag(datadir, checksums=['md5'])
# Zip the bag
self._zip_directory(datadir, lipdfile)
return lipd
[docs]
def convert_to_json(self, dsname):
'''Convert RDF graph to a LiPD file
Parameters
----------
graph : rdflib.ConjunctiveGraph
the RDF graph object
'''
self.schema = copy.deepcopy(SCHEMA)
self.rschema = self._get_schema_reverse_map()
self.allfacts = {}
self._get_indexed_facts(self.namespace + dsname)
lipd = self._convert_to_lipd(self.namespace + dsname, "Dataset", "Dataset", pagesdone={})
lipd = self._post_processing(lipd)
return lipd
def _get_table_data(self, table):
csvdata = []
numrows = 0
table["columns"] = sorted(table["columns"], key=lambda x: x["number"] if ("number" in x and type(x["number"]) is int) else 99999)
for col in table["columns"]:
if "values" in col:
numrows = len(col["values"])
break
for rowidx in range(numrows):
row = []
for col in table["columns"]:
if "values" in col:
colindices = []
if "number" in col:
colnum = col["number"]
colindices.append(colnum)
else:
colindices.append(1)
for colindex in colindices:
if type(colindex) is list:
sorted(colindex)
firstidx = colindex[0]
for subindex in colindex:
idx = subindex - 1
if idx >= len(row):
row.extend([""] * (idx - len(row) + 1))
row[idx] = col["values"][rowidx][subindex-firstidx]
else:
idx = colindex - 1
if idx >= len(row):
row.extend([""] * (idx - len(row) + 1))
row[idx] = col["values"][rowidx]
csvdata.append(row)
# Delete the values field (since we've extracted the csv data)
for col in table["columns"]:
if "values" in col:
del col["values"]
return csvdata
def _create_csvs(self, lipd, datadir):
csvs = {}
datakeys = ["paleoData", "chronData"]
for datakey in datakeys:
if datakey in lipd:
for data in lipd[datakey]:
if "measurementTable" in data:
for table in data["measurementTable"]:
csvs[table["filename"]] = self._get_table_data(table)
if "model" in data:
for model in data["model"]:
if "ensembleTable" in model:
for table in model["ensembleTable"]:
csvs[table["filename"]] = self._get_table_data(table)
if "summaryTable" in model:
for table in model["summaryTable"]:
csvs[table["filename"]] = self._get_table_data(table)
for csvname, csvdata in csvs.items():
# writing to csv file
with open(f"{datadir}/{csvname}", 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerows(csvdata)
def _zip_directory(self, datadir, lipdfile):
# Zip the bag
with zipfile.ZipFile(lipdfile, 'w') as zipf:
for root, dirs, files in os.walk(datadir):
for file in files:
zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.join(datadir, '..')))
'''
def _zip_lipd_dir(self, lipddir, lipdfile):
if lipdfile.startswith("http"):
# If this is a URL
# Handle special characters in url (if any)
res = urlparse(lipdfile)
lipdurl = urlunparse(res._replace(path=quote(res.path)))
# Open url and unzip
resp = urlopen(lipdurl)
with zipfile.ZipFile(BytesIO(resp.read())) as zip_ref:
zip_ref.extractall(unzipdir)
else:
# If this is a local file
# Unzip file
with zipfile.ZipFile(lipdfile, 'r') as zip_ref:
zip_ref.extractall(unzipdir)
'''
def _get_property_details(self, pname, schema) :
details = { "name": pname }
# Create property
if schema and (pname in schema) :
for skey,svalue in schema[pname].items() :
details[skey] = svalue
return details
def _get_rdf_property_details(self, pname, fullkey, schema) :
key = pname
pname = lcfirst(pname)
details = { "name": pname }
# Check for full key in schema
if schema and (fullkey in schema):
for ind, svalue in schema[fullkey].items():
details[ind] = svalue
# Or check for just pname in schema
elif schema and (key in schema):
for ind, svalue in schema[key].items():
details[ind] = svalue
return details;
def _get_schema_reverse_map(self) :
newschema = {}
for schid,sch in self.schema.items() :
newsch = {}
for prop,details in sch.items() :
if (prop[0] == "@") :
continue
if (("skip_auto_convert_to_json" in details)) :
continue
pdetails = self._get_property_details(prop, sch)
pname = pdetails["name"]
pdetails["name"] = prop
newsch[pname] = pdetails
if (("category" in pdetails)) :
catpname = pname + "." + ucfirst(pdetails["category"])
newsch[catpname] = pdetails
if (("schema" in pdetails)) :
schpname = pname + "." + ucfirst(pdetails["schema"])
newsch[schpname] = pdetails
newschema[schid] = newsch
return newschema
def _local_name(self, url):
return str(url).split("#")[-1]
def _get_prop_values_from_query_result_p_o(self, qres):
result = {}
for row in qres:
pname = self._local_name(row.p)
if pname not in result:
result[pname] = []
value = {}
if isinstance(row.o, URIRef):
value["@type"] = "uri"
value["@id"] = str(row.o)
else:
value["@type"] = "literal"
value["@value"] = row.o.value
value["@datatype"] = str(row.o.datatype)
result[pname].append(value)
return result
def _get_facts(self, id):
query = f"SELECT ?p ?o WHERE {{ <{id}> ?p ?o }}"
qres = self.graph.query(query)
return self._get_prop_values_from_query_result_p_o(qres)
def _get_indexed_facts(self, id):
if id in self.allfacts:
return
facts = self._get_facts(id)
self.allfacts[id] = facts
for pname, pfacts in facts.items():
for pfact in pfacts:
if pfact["@type"] == "uri":
if pname != "type":
self._get_indexed_facts(pfact["@id"])
def _convert_to_lipd(self, id, category, schemaname, pagesdone={}) :
if id in self.allfacts:
facts = self.allfacts[id]
if id in pagesdone:
return pagesdone[id]
schema = self.rschema[schemaname] if (schemaname in self.rschema) else None
if (schemaname and not category) :
category = schemaname
if "type" in facts:
cats = facts["type"]
for cat in cats:
if (cat["@type"] == "uri") :
category = self._local_name(cat["@id"])
break
obj = {
"@id":id,
"@category":category,
"@schema":schemaname
}
pagesdone[id] = obj
for pname, pfacts in facts.items() :
if pname in REVERSE_BLACKLIST:
continue
prop = pname
prop = re.sub("/\\s/", "_", prop)
# Get a sample value page category, and use to make a property key
propkey = prop
for value in pfacts:
if value["@type"] == "uri" :
if value["@id"] in self.allfacts :
pfact = self.allfacts[value["@id"]]
if "type" in pfact:
valcats = pfact["type"]
for valcat in valcats:
if (valcat["@type"] == "uri") :
valcatname = self._local_name(valcat["@id"])
propkey = prop + "." + valcatname
break
details = self._get_rdf_property_details(prop, propkey, schema)
name = details["name"]
ptype = details["type"] if ("type" in details) else None
cat = details["category"] if ("category" in details) else None
sch = details["schema"] if ("schema" in details) else None
if (cat and not sch) :
sch = cat
toJson = details["toJson"] if ("toJson" in details) else None
multiple = details["multiple"] if ("multiple" in details) else False
if len(pfacts) > 0 :
if (multiple) :
obj[name] = []
for pfact in pfacts :
if pfact["@type"] == "uri" :
val = self._convert_to_lipd(pfact["@id"], cat, sch, pagesdone)
else:
val = pfact["@value"]
if (toJson) :
fn = getattr(self, toJson)
val = fn(val)
# If there is already a value present
# - Then this need to be marked as "multiple"
if (not multiple) and (name in obj) and (not type(obj[name]) is list):
multiple = True
obj[name] = [obj[name]]
if (multiple) :
obj[name].append(val)
else :
obj[name] = val
return obj
else :
return re.sub("/_/", " ", id)
def _post_processing(self, obj, parent=None):
if not(type(obj) is dict):
return obj
if not("@schema" in obj):
return obj
# FIXME: Hack to avoid recursion
schemaname = obj["@schema"]
tschema = self.schema[schemaname] if schemaname in self.schema else None
if tschema and "@toJson_pre" in tschema:
for func in tschema["@toJson_pre"]:
fn = getattr(self, func)
obj = fn(obj, parent)
for key,value in obj.items():
if type(value) is list:
for i in range(len(value)):
obj[key][i] = self._post_processing(value[i], obj)
else:
obj[key] = self._post_processing(value, obj)
if tschema and "@toJson" in tschema:
for func in tschema["@toJson"]:
fn = getattr(self, func)
obj = fn(obj, parent)
# FIXME: Only remove hasValues.. LiPD CSVs should be created outside this function beforehand
if "hasValues" in obj:
valuestr = obj["hasValues"]
obj["values"] = json.loads(valuestr)
del obj["hasValues"]
del obj["@id"]
del obj["@schema"]
del obj["@category"]
if "type" in obj:
del obj["type"]
return obj
#########################################
# Converters
#########################################
def _location_to_json(self, geo, parent = None) :
geojson = {
"geometry":
{
"coordinates" : [0, 0, 0],
},
"properties": {}
}
if "coordinates" in geo :
latlong = geo["coordinates"].split(",")
geojson["geometry"]["coordinates"] = [ float(latlong[1]), float(latlong[0]), float(latlong[2]) if len(latlong)>2 else 0 ]
geojson["geometry"]["type"] = "Point"
if "long" in geo :
geojson["geometry"]["coordinates"][0] = float(geo["long"])
if "longitude" in geo :
geojson["geometry"]["coordinates"][0] = float(geo["longitude"])
if "lat" in geo :
geojson["geometry"]["coordinates"][1] = float(geo["lat"])
if "latitude" in geo :
geojson["geometry"]["coordinates"][1] = float(geo["latitude"])
if "alt" in geo and geo["alt"] != "NA":
geojson["geometry"]["coordinates"][2] = float(geo["alt"])
if "elevation" in geo and geo["elevation"] != "NA":
geojson["geometry"]["coordinates"][2] = float(geo["elevation"])
for prop,value in geo.items() :
if prop[0] == "@" :
continue
if prop == "locationType" :
geojson["type"] = geo["locationType"]
else :
if prop == "coordinates" or prop == "coordinatesFor":
# Ignore
pass
else :
if re.search(r"^(geo|wgs84):", prop) :
# Ignore
pass
elif prop in ["long", "lat", "alt"] :
pass
else :
geojson["properties"][prop] = value
return geojson
def _get_google_spreadsheet_key(self, url:str, parent = None) :
return url.replace("https://docs.google.com/spreadsheets/d/", "")
def _remove_found_in_table(self, var, parent = None) :
if "foundInTable" in var :
del var["foundInTable"]
return var
def _remove_found_in_dataset(self, var, parent = None) :
if "foundInDataset" in var :
del var["foundInDataset"]
return var
def _unwrap_uncertainty(self, var, parent = None) :
if (("hasUncertainty" in var)) :
unc = var["hasUncertainty"]
if (("hasValue" in unc)) :
var["uncertainty"] = float(unc["hasValue"])
del unc["hasValue"]
for key,value in unc.items() :
if (key[0] != "@") :
var[key] = value
del var["hasUncertainty"]
return var
def _unwrap_integration_time(self, interp, parent = None) :
if (("integrationTime" in interp)) :
intime = interp["integrationTime"]
if (("hasValue" in intime)) :
interp["integrationTime"] = float(intime["hasValue"])
del intime["hasValue"]
for key,value in intime.items() :
if (key[0] != "@") :
interp["integrationTime" + ucfirst(key)] = value
del interp["hasIntegrationTime"]
return interp
def _collect_variables_by_id(self, item, arr) :
if not (type(item) is dict):
return arr
# Data is a Hash
if ("@category" in item) and ("@id" in item) and re.match("/Variable\$/", item["@category"]) :
arr[item["@id"]] = item
else :
for key,value in item.items() :
if (key[0] != "@") :
arr = self._collect_variables_by_id(item[key], arr)
return arr
def _set_archive_type_label(self, ds, parent = None) :
if "hasArchiveType" in ds :
if "@id" in ds["hasArchiveType"]:
id = ds["hasArchiveType"]["@id"]
if id in RSYNONYMS:
ds["archiveType"] = RSYNONYMS[id]
else:
ds["archiveType"] = ds["hasArchiveType"]["label"]
del ds["hasArchiveType"]
return ds
def _set_variable_name_from_standard_variable_label(self, var, parent = None) :
if "hasStandardVariable" in var :
if "@id" in var["hasStandardVariable"]:
id = var["hasStandardVariable"]["@id"]
if id in RSYNONYMS:
var["variableName"] = RSYNONYMS[id]
else:
var["variableName"] = var["hasStandardVariable"]["label"]
del var["hasStandardVariable"]
return var
def _set_units_label(self, var, parent = None) :
if "hasUnits" in var :
if "@id" in var["hasUnits"]:
id = var["hasUnits"]["@id"]
if id in RSYNONYMS:
var["units"] = RSYNONYMS[id]
else:
var["units"] = var["hasUnits"]["label"]
del var["hasUnits"]
return var
def _set_proxy_label(self, var, parent = None) :
if "hasProxy" in var :
if "@id" in var["hasProxy"]:
id = var["hasProxy"]["@id"]
if id in RSYNONYMS:
var["proxy"] = RSYNONYMS[id]
else:
var["proxy"] = var["hasProxy"]["label"]
del var["hasProxy"]
return var
def _set_proxy_general_label(self, var, parent = None) :
if "hasProxyGeneral" in var :
if "@id" in var["hasProxyGeneral"]:
id = var["hasProxyGeneral"]["@id"]
if id in RSYNONYMS:
var["proxyGeneral"] = RSYNONYMS[id]
else:
var["proxyGeneral"] = var["hasProxyGeneral"]["label"]
del var["hasProxyGeneral"]
return var
def _set_interpretation_variable_label(self, interp, parent = None) :
if "hasVariable" in interp :
if "@id" in interp["hasVariable"]:
id = interp["hasVariable"]["@id"]
if id in RSYNONYMS:
interp["variable"] = RSYNONYMS[id]
else:
interp["variable"] = interp["hasVariable"]["label"]
del interp["hasVariable"]
return interp
def _set_seasonality_labels(self, interp, parent = None) :
convs = {
"hasSeasonality": "seasonality",
"hasSeasonalityGeneral": "seasonalityGeneral",
"hasSeasonalityOriginal": "seasonalityOriginal"
}
for pid in convs:
if pid in interp :
if "@id" in interp[pid]:
id = interp[pid]["@id"]
nid = convs[pid]
if id in RSYNONYMS:
interp[nid] = RSYNONYMS[id]
else:
interp[nid] = interp[pid]["label"]
del interp[pid]
return interp
def _create_publication_identifier(self, pub, parent = None) :
identifiers = []
if (("hasDOI" in pub)) :
identifier = {}
identifier["type"] = "doi"
identifier["id"] = pub["hasDOI"]
if (("link" in pub)) :
for link in pub["link"].values() :
if (re.match("/dx.doi.org/", link)) :
identifier["url"] = link
del pub["link"]
del pub["hasDOI"]
identifiers.append(identifier)
pub["identifier"] = identifiers
return pub
def _values_to_array(self, resolution, parent = None) :
if (("values" in resolution)) :
return resolution["values"].split(",")
def _unarray_column_number(self, var, parent = None) :
if not var:
return var
if ("number" in var) and (type(var["number"]) is list) and (len(var["number"]) == 1) :
var["number"] = var["number"][0]
if ("number" in var) and (type(var["number"]) is str) :
var["number"] = json.loads(var["number"])
return var
def _extract_variable_values(self, var, parent = None) :
if "hasValues" in var:
valuestr = var["hasValues"]
values = json.loads(valuestr)
if type(values) is dict and "base64_zlib" in values:
values = unzip_string(values["base64_zlib"])
var["hasValues"] = values
return var
def _changes_to_json(self, change, parent=None):
newChange = {}
if "name" in change:
newChange[change["name"]] = change.get("notes", [])
return newChange
return None