# Source code for pylipd.utils.lipd_to_rdf

"""
The LipdToRDF class helps in converting a LiPD file to an RDF Graph.
It uses the SCHEMA dictionary (from globals/schema.py) to do the conversion
"""

import copy
import json
import pickle
import re
import os
import os.path
import zipfile
import tempfile
import pandas as pd

from rdflib import RDFS
from rdflib.graph import ConjunctiveGraph, Literal, RDF, URIRef, BNode, Collection
from rdflib.namespace import XSD

from io import BytesIO
from urllib.request import urlopen
from urllib.parse import urlparse, urlunparse, quote

from ..globals.urls import NSURL, DATAURL, ONTONS, NAMESPACES
from ..globals.blacklist import BLACKLIST
from ..globals.synonyms import SYNONYMS
from ..globals.schema import SCHEMA

from .utils import expand_schema, ucfirst, lcfirst, camelCase, unCamelCase, escape, uniqid, sanitizeId

class LipdToRDF:
    """Convert a LiPD file to an RDF graph.

    The conversion is driven by the SCHEMA dictionary (from
    globals/schema.py). Each JSON-LD object of the LiPD archive is first
    flattened into a hash of ``{object id: object}`` entries and then written
    to ``self.graph`` as quads in the named graph ``self.graphurl``.
    """

    # Sensor types for records that carry a sensorGenus / sensorSpecies.
    _ORGANIC_SENSOR_BY_ARCHIVE = {
        "MarineSediment": "Foraminifera",
        "Coral": "Polyp",
        "Wood": "Vegetation",
        "MolluskShell": "Bivalves",
        "Sclerosponge": "Sponge",
    }

    # Sensor types selected by (archive, observation); checked before the
    # archive-only table so that specific pairs are never shadowed.
    _SENSOR_BY_ARCHIVE_OBSERVATION = {
        ("MarineSediment", "Uk37"): "Coccolithophores",
        ("MarineSediment", "Alkenone"): "Coccolithophores",
        ("MarineSediment", "TEX86"): "Archea",
        ("MarineSediment", "D18O"): "Foraminifera",
        ("MarineSediment", "Mg/Ca"): "Foraminifera",
        ("LakeSediment", "Uk37"): "Coccolithophores",
        ("LakeSediment", "Alkenone"): "Coccolithophores",
        ("LakeSediment", "TEX86"): "Archea",
        ("LakeSediment", "Midge"): "Chironomids",
        ("LakeSediment", "BSi"): "Diatoms",
        ("LakeSediment", "Chironomid"): "Chironomids",
        ("LakeSediment", "Reflectance"): "PhotosyntheticAlgae",
        ("LakeSediment", "Pollen"): "Watershed",
        ("LakeSediment", "VarveThickness"): "Catchment",
        ("GlacierIce", "Melt"): "IceSurface",
    }

    # Sensor types selected by archive alone (no genus/species given).
    _SENSOR_BY_ARCHIVE = {
        "Coral": "Polyp",
        "Wood": "Vegetation",
        "MolluskShell": "Bivalves",
        "Sclerosponge": "Sponge",
        "Speleothem": "Karst",
        "GlacierIce": "Snow",
        "Borehole": "Soil",
    }

    def __init__(self, standardize=True, add_labels=True):
        """Create a converter with an empty graph.

        Parameters
        ----------
        standardize : bool
            when True, synonym-mapped values reuse the standard individual id
            and label; when False a unique id is appended and the user's
            value is used as the label
        add_labels : bool
            when True, rdfs:label quads are added for synonym-mapped
            individuals
        """
        self.graph = ConjunctiveGraph()
        self.lipd_csvs = {}
        self.graphurl = NSURL
        self.namespace = NSURL + "/"
        self.standardize = standardize
        self.add_labels = add_labels
        # Deep copy because _get_property_details caches into the schema.
        self.schema = expand_schema(copy.deepcopy(SCHEMA))

    def convert(self, lipdpath):
        """Convert LiPD file to RDF Graph

        Parameters
        ----------
        lipdpath : str
            path to lipd file (the path could also be a url)
        """
        self.graph = ConjunctiveGraph()

        lpdname = os.path.basename(lipdpath).replace(".lpd", "")
        # Strip a trailing query string (when the path is a url)
        lpdname = re.sub(r"\?.+$", "", lpdname)
        self.graphurl = NSURL + "/" + lpdname

        with tempfile.TemporaryDirectory(prefix="lipd_to_rdf_") as tmpdir:
            self._unzip_lipd_file(lipdpath, tmpdir)
            for jsonpath, _ in self._find_files_with_extension(tmpdir, 'jsonld'):
                jsondir = os.path.dirname(jsonpath)
                # CSVs are looked up by file name while loading the json
                self.lipd_csvs = {}
                for csvpath, _ in self._find_files_with_extension(jsondir, 'csv'):
                    csvname = os.path.basename(csvpath)
                    try:
                        self.lipd_csvs[csvname] = pd.read_csv(csvpath, header=None)
                    except Exception:
                        # If normal load doesn't work, try to detect the number of columns and load it that way
                        print(f"WARNING: CSV file '{csvname}' might have inconsistent number of columns !!\nDetecting number of columns to load ..\n")
                        self.lipd_csvs[csvname] = self._detect_columns_and_load(csvpath)
                self._load_lipd_json_to_graph(jsonpath)

    def _detect_columns_and_load(self, filename):
        """Load a CSV using the largest comma-separated field count of any line."""
        # detect number of columns
        with open(filename) as f:
            num_columns = max((len(line.split(',')) for line in f), default=0)
        # load with pre-determined number of columns
        return pd.read_csv(filename, names=range(num_columns))

    def serialize(self, topath, type="rdf"):
        """Write LiPD RDF Graph to RDF file (or Pickle file)

        Nothing is written when the graph is falsy or ``type`` is neither
        "rdf" nor "pickle".

        Parameters
        ----------
        topath : str
            path to the output file
        type : str
            the output file type : rdf or pickle (we store the pickled rdf
            graph for efficiency sometimes)
        """
        # NOTE: the parameter name shadows the builtin, but it is part of the
        # public signature and is kept for backward compatibility.
        if not self.graph:
            return
        if type == "rdf":
            self.graph.serialize(topath, format="nquads", encoding="utf-8")
        elif type == "pickle":
            with open(topath, 'wb') as f:
                pickle.dump(self.graph, f)

    def _unzip_lipd_file(self, lipdfile, unzipdir):
        """Extract the LiPD zip archive (local path or http(s) url) into unzipdir."""
        if lipdfile.startswith("http"):
            # If this is a URL
            # Handle special characters in url (if any)
            res = urlparse(lipdfile)
            lipdurl = urlunparse(res._replace(path=quote(res.path)))
            # Open url and unzip; the response is closed once it has been read
            with urlopen(lipdurl) as resp:
                payload = resp.read()
            with zipfile.ZipFile(BytesIO(payload)) as zip_ref:
                zip_ref.extractall(unzipdir)
        else:
            # If this is a local file
            with zipfile.ZipFile(lipdfile, 'r') as zip_ref:
                zip_ref.extractall(unzipdir)

    def _parse_persons_string(self, author_string, parent=None):
        """Split an author string into a list of "First Last" style names.

        With a ";" delimiter each entry is one author, and an entry containing
        a comma is reordered from "Last, First". Without ";" the string is
        split on commas: an even number of parts is read as Last, First pairs,
        an odd number as one name per part.
        """
        if ";" in author_string:
            authors = []
            for author in re.split(r"\s*;\s*", author_string):
                if "," in author:
                    last_first = re.split(r"\s*,\s*", author)
                    authors.append(f"{last_first[1]} {last_first[0]}")
                else:
                    authors.append(author)
            return authors

        parts = re.split(r"\s*,\s*", author_string)
        if len(parts) % 2 == 0:
            # Even number : last name first name
            return [f"{parts[i + 1]} {parts[i]}" for i in range(0, len(parts), 2)]
        # Odd number : first name last name
        return list(parts)

    def _parse_persons(self, auths, parent=None):
        """Normalize author data into a list of ``{"name": ...}`` dicts.

        ``auths`` may be a single entry or a list; each entry is either a
        string or a dict with a "name" key. Entries without a name are skipped.
        """
        if type(auths) is not list:
            auths = [auths]

        authors = []
        for authstr in auths:
            if type(authstr) is dict:
                authname = authstr.get("name")
            else:
                authname = authstr
            if not authname:
                continue
            parsed = self._parse_persons_string(authname, parent)
            if type(parsed) is list:
                authors.extend(parsed)
            else:
                authors.append(parsed)
        return [{"name": auth} for auth in authors]

    def _parse_location(self, geo, parent=None):
        """Flatten a geo object into location properties.

        Reads ``geo["geometry"]["coordinates"]`` as (longitude, latitude
        [, elevation]) and copies either ``geo["properties"]`` or the remaining
        top-level keys of ``geo``. ``parent`` must be a dict with an "@id".
        """
        ngeo = {}
        ngeo["locationType"] = geo["type"] if "type" in geo else None
        ngeo["coordinatesFor"] = parent["@id"]

        coords = geo["geometry"]["coordinates"]
        if coords and len(coords) > 0:
            ngeo["coordinates"] = str(str(coords[1]) + ",") + str(coords[0])
            ngeo["wgs84:lat"] = coords[1]
            ngeo["hasLatitude"] = coords[1]
            ngeo["latitude"] = coords[1]
            ngeo["wgs84:long"] = coords[0]
            ngeo["longitude"] = coords[0]
            ngeo["hasLongitude"] = coords[0]
            if len(coords) > 2:
                ngeo["wgs84:alt"] = coords[2]
                ngeo["elevation"] = coords[2]
                ngeo["hasElevation"] = coords[2]

        if "properties" in geo and isinstance(geo["properties"], dict):
            for key, value in geo["properties"].items():
                ngeo[key] = value
        elif isinstance(geo, dict):
            for key, value in geo.items():
                # Do not add lat long if they are already added
                if key != "geometry" and f"wgs84:{key}" not in ngeo:
                    ngeo[key] = value
        return ngeo

    def _get_uncertainty(self, val, parent=None):
        """Wrap a bare uncertainty value into an uncertainty dict."""
        return {"hasValue": val, "analytical": val, "reproducibility": val}

    def _get_google_spreadsheet_url(self, key, parent=None):
        """Build the Google spreadsheet url for a spreadsheet key."""
        return "https://docs.google.com/spreadsheets/d/" + str(key) + ""

    def _get_parent_property(self, obj, prop):
        """Return ``prop`` from the nearest "@parent" ancestor that has it, else None."""
        parent = obj["@parent"]
        while parent:
            if prop in parent:
                return parent[prop]
            parent = parent["@parent"]
        return None

    def _get_parent_with_property_value(self, obj, prop, val):
        """Return the nearest "@parent" ancestor where ``prop == val``, else None."""
        parent = obj["@parent"]
        while parent:
            if prop in parent and parent[prop] == val:
                return parent
            parent = parent["@parent"]
        return None

    def _set_identifier_properties(self, pub, objhash):
        """Replace ``pub["identifier"]`` with hasDOI / hasISSN / hasISBN / hasLink lists."""
        prop_by_type = {"doi": "hasDOI", "issn": "hasISSN", "isbn": "hasISBN"}
        if "identifier" in pub:
            for identifier in pub["identifier"]:
                target = prop_by_type.get(identifier["type"])
                if target is not None:
                    pub.setdefault(target, []).append(identifier["id"])
                if "url" in identifier:
                    pub.setdefault("hasLink", []).append(identifier["url"])
            del pub["identifier"]
        return [pub, objhash, []]

    def _values_to_string(self, obj, objhash):
        """Join a list under ``obj["values"]`` into one comma-separated string."""
        if "values" in obj and type(obj["values"]) is list:
            # str() so that non-string items do not make join() fail
            obj["values"] = ", ".join(str(item) for item in obj["values"])
        return [obj, objhash, []]

    def _guess_sensor_type(self, archive, observation, sensor):
        """Guess the sensor class name from archive type and observation.

        When ``sensor`` has a sensorGenus or sensorSpecies the result comes
        from the organic table (default "OrganicSensor"). Otherwise the
        (archive, observation) table is tried first, then the archive-only
        table, and finally "InorganicSensor".
        """
        if 'sensorGenus' in sensor or 'sensorSpecies' in sensor:
            return self._ORGANIC_SENSOR_BY_ARCHIVE.get(archive, "OrganicSensor")

        specific = self._SENSOR_BY_ARCHIVE_OBSERVATION.get((archive, observation))
        if specific is not None:
            return specific
        return self._SENSOR_BY_ARCHIVE.get(archive, "InorganicSensor")

    def _get_observation(self, observation):
        """Normalize an observation name ("alkenone" maps to "Uk37")."""
        if observation is None:
            return None
        if observation.lower() == "alkenone":
            return "Uk37"
        return camelCase(observation)

    def _get_variable_id(self, obj, parentid):
        """Build ``<parentid>.<tsid>.<variablename>`` using case-insensitive keys.

        A unique id is generated when the object has no tsid.
        """
        iobj = dict((k.lower(), v) for k, v in obj.items())
        if "tsid" not in iobj:
            iobj["tsid"] = uniqid()
        return f"{parentid}.{iobj['tsid']}.{iobj.get('variablename', '')}"

    def _wrap_integration_time(self, obj, objhash):
        """Move ``integrationTime*`` keys of ``obj`` into a new IntegrationTime object.

        Returns ``[obj, objhash, new_ids]`` where ``new_ids`` lists the id of
        the created object (empty when nothing was moved).
        """
        objid = obj["@id"]
        # Deal with integrationTime
        pvals = {}
        # Iterate over a snapshot: keys are deleted from obj inside the loop,
        # which is an error while iterating the live dict view.
        for key, value in list(obj.items()):
            # NOTE(review): "\$" matches a literal dollar sign, so a plain
            # "integrationTime" key never matches here. Possibly an
            # end-of-string anchor was intended -- confirm before changing.
            if re.search(r"^integrationTime\$", key, re.IGNORECASE):
                pvals["hasValue"] = value
                del obj[key]
                continue
            m = re.search(r"^integrationTime(.+)", key)
            if m is not None:
                pvals[lcfirst(m.groups()[0])] = value
                del obj[key]

        if not pvals:
            return [obj, objhash, []]

        intimeid = objid + '.IntegrationTime'
        obj['integrationTime'] = intimeid
        intime = {
            '@id': intimeid,
            '@category': 'IntegrationTime',
            '@schema': 'IntegrationTime',
        }
        intime.update(pvals)
        objhash[intimeid] = intime
        return [obj, objhash, [intimeid]]

    def _add_interpretation_rank(self, obj, objhash):
        """Default ``obj["rank"]`` to the zero-based list position when it is not an int."""
        if "rank" not in obj or type(obj["rank"]) is not int:
            index = obj.get("@index")
            # "@index" is None for objects that were not part of a list;
            # there is no position to derive a rank from in that case.
            if isinstance(index, int):
                obj["rank"] = index - 1
        return [obj, objhash, []]

    def _wrap_uncertainty(self, obj, objhash):
        """Move ``uncertainty*`` keys of ``obj`` into a new Uncertainty object.

        Returns ``[obj, objhash, new_ids]`` where ``new_ids`` lists the id of
        the created object (empty when nothing was moved).
        """
        objid = obj["@id"]
        # Deal with uncertainty
        pvals = {}
        for key, value in obj.items():
            # NOTE(review): "\$" matches a literal dollar sign, so a plain
            # "uncertainty" key is handled by the second branch. Possibly an
            # end-of-string anchor was intended -- confirm before changing.
            if re.search(r"^uncertainty\$", key, re.IGNORECASE):
                pvals["hasValue"] = value
            elif re.search(r"^uncertainty", key, re.IGNORECASE):
                pvals[key] = value
        # Every moved value is keyed by its original key, except "hasValue".
        for key in [k for k in obj if re.search(r"^uncertainty", k, re.IGNORECASE)]:
            del obj[key]

        if not pvals:
            return [obj, objhash, []]

        uncid = "" + str(objid) + ".Uncertainty"
        obj["hasUncertainty"] = uncid
        uncertainty = {"@id": uncid, "@category": "Uncertainty"}
        uncertainty.update(pvals)
        objhash[uncid] = uncertainty
        return [obj, objhash, [uncid]]

    def _add_found_in_table(self, obj, objhash):
        """Record the id of the direct parent as ``foundInTable``."""
        obj["foundInTable"] = obj["@parent"]["@id"]
        return [obj, objhash, []]

    def _add_found_in_dataset(self, obj, objhash):
        """Record the id of the top-most "@parent" ancestor as ``foundInDataset``."""
        parent = obj["@parent"]
        top = parent
        while parent:
            top = parent
            parent = parent["@parent"]
        obj["foundInDataset"] = top["@id"]
        return [obj, objhash, []]

    # Unroll the list to a rdf first/rest structure
    def _unroll_values_list_to_rdf(self, lst: list, dtype):
        """Add ``lst`` to the graph as an RDF collection and return its blank node."""
        datatype = XSD[dtype] if dtype in XSD else None
        listitems = [Literal(item, datatype=datatype) for item in lst]
        listid = BNode()
        # Constructing the Collection writes the first/rest quads to the graph
        Collection(self.graph, listid, listitems)
        return listid

    def _add_variable_values(self, obj, objhash):
        """Attach the variable's CSV column(s) as a JSON string in ``hasValues``.

        ``obj["number"]`` (1-based column number or list of numbers, defaulting
        to "@index") is normalized to a list in place. The CSV is looked up in
        ``self.lipd_csvs`` by ``<parent id>.csv``.
        """
        csvname = obj["@parent"]["@id"] + ".csv"
        if "number" not in obj:
            obj["number"] = obj["@index"]
        if type(obj["number"]) is str:
            obj["number"] = int(obj["number"])
        if not isinstance(obj["number"], list):
            obj["number"] = [obj["number"]]
        indices = [int(col) - 1 for col in obj["number"]]

        if csvname not in self.lipd_csvs:
            return [obj, objhash, []]

        df = self.lipd_csvs[csvname]
        values = []
        if len(indices) == 1:
            # A column number past the end of the table yields no values
            if indices[0] < len(df.columns):
                values = df[indices[0]].tolist()
        else:
            values = df[indices].values.tolist()

        # TODO: Dumping to json string for now.
        # (rdf:Seq doesn't seem to be importing well in GraphDB)
        obj["hasValues"] = json.dumps(values)
        return [obj, objhash, []]

    def _add_standard_variable(self, obj, objhash):
        """Set ``hasStandardVariable`` when the variable name has a known synonym."""
        if "variableName" in obj:
            name = obj["variableName"]
            synonyms = SYNONYMS["VARIABLES"]["PaleoVariable"]
            if type(name) is str and name.lower() in synonyms:
                match = synonyms[name.lower()]
                obj["hasStandardVariable"] = match["id"]
                # Only add object label in the current graph if set
                if self.add_labels:
                    self._set_object_label(obj["hasStandardVariable"], match["label"])
        return [obj, objhash, []]

    def _stringify_column_numbers_array(self, obj, objhash):
        """Replace a multi-column ``number`` list with a JSON ``hasColumnNumber`` string."""
        number = obj.get("number")
        if isinstance(number, list) and len(number) > 1:
            obj["hasColumnNumber"] = json.dumps(number)
            del obj["number"]
        return [obj, objhash, []]

    def _modify_structure_if_needed(self, obj, objhash, schema):
        """Run the schema's "@fromJson" hooks on ``obj``.

        Each hook is a method name called as ``hook(obj, objhash)`` returning
        ``(obj, objhash, new_ids)``. Objects created by a hook are processed
        recursively with the schema named by their "@category".
        """
        if "@fromJson" in schema:
            for func in schema["@fromJson"]:
                fn = getattr(self, func)
                (obj, objhash, newids) = fn(obj, objhash)
                for newid in newids:
                    if newid not in objhash:
                        continue
                    newobj = objhash[newid]
                    if type(newobj) is dict and "@category" in newobj:
                        newschid = newobj["@category"]
                        newschema = self.schema[newschid] if (newschid in self.schema) else {}
                        (objhash[newid], objhash) = self._modify_structure_if_needed(newobj, objhash, newschema)
        return [obj, objhash]

    def _get_compound_key_id(self, compound_key, obj):
        """Follow a key path through nested dicts; return the leaf if it is not a dict."""
        tobj = obj
        for key in compound_key:
            if type(tobj) is dict and key in tobj:
                tobj = tobj[key]
            else:
                return None
        if type(tobj) is not dict:
            return tobj
        return None

    def _get_binding_key_id(self, key, obj):
        """Resolve a "a.b|c" binding: first truthy dotted-path value, else a unique id."""
        for optkey in key.split("|"):
            keyid = self._get_compound_key_id(optkey.split("."), obj)
            if keyid:
                return keyid
        return uniqid()

    def _get_function_key_id(self, fn, arg, curobjid):
        """Apply an id-pattern function ("trunc" or "uniqid") to the id built so far."""
        if fn == "trunc":
            # Drop the last `arg` characters; clamp so that truncating more
            # than the whole id yields "" rather than a negative slice end.
            return curobjid[0:max(0, len(curobjid) - int(arg))]
        elif fn == "uniqid":
            return str(curobjid) + uniqid(arg)
        return curobjid

    def _create_id_from_pattern(self, pattern, obj):
        """Build an object id from a schema "@id" pattern.

        Pattern items are ``{binding}`` placeholders, ``_fn(arg)`` function
        calls applied to the id built so far, or literal text.
        """
        objid = ""
        for key in pattern:
            m = re.search(r"{(.+)}", key)
            if m and len(m.groups()) > 0:
                objid += str(self._get_binding_key_id(m.groups()[0], obj))
                continue
            m = re.search(r"_(.+)\((.*)\)", key)
            if m and len(m.groups()) > 1:
                fn, arg = m.groups()[0], m.groups()[1]
                objid = str(self._get_function_key_id(fn, arg, objid))
            else:
                objid += str(key)
        return objid

    def _fix_title(self, titleid):
        """Return the id after replacing one fixed literal substring with "_"."""
        # NOTE(review): str.replace treats the first argument literally, so
        # this only affects ids containing that exact text -- confirm intent.
        return titleid.replace(r"@\\x{FFFD}@u", '_')

    def _get_object_id(self, obj, category, schema):
        """Compute the id of ``obj`` from the schema "@id" pattern, or a fallback."""
        if type(obj) is dict:
            objid = "Unknown." + uniqid(category)
        else:
            objid = ucfirst(obj).replace(" ", "_")
        if "@id" in schema:
            objid = self._create_id_from_pattern(schema["@id"], obj)
        return self._fix_title(objid)

    def _map_lipd_to_json(self, obj, parent, index, category, schemaname, hash):
        """Recursively flatten a LiPD JSON object into ``hash``.

        ``obj`` is annotated in place with "@parent", "@index", "@schema" and
        "@id". A flattened copy whose nested objects are replaced by their ids
        is stored under ``hash[objid]``.

        Returns
        -------
        The object id, or ``obj`` itself when it is not a dict.
        """
        schema = self.schema[schemaname] if (schemaname in self.schema) else {}
        if type(obj) is not dict:
            return obj

        obj["@parent"] = parent
        obj["@index"] = index
        obj["@schema"] = schemaname

        objid = self._get_object_id(obj, category, schema)
        if "@id" in obj:
            objid = obj["@id"]
        if objid in hash:
            # Already mapped
            return objid
        obj["@id"] = objid

        (obj, hash) = self._modify_structure_if_needed(obj, hash, schema)
        if "@category" in obj:
            category = obj["@category"]

        hash[objid] = {"@id": objid, "@category": category, "@schema": schemaname}
        item = hash[objid]

        if type(obj) is dict:
            for propkey, value in obj.items():
                if propkey[0] == "@":
                    continue
                if propkey in BLACKLIST:
                    continue

                details = {}
                pname = propkey
                if propkey in schema:
                    details = schema[propkey]
                    pname = details["name"] if ("name" in details) else propkey
                dtype = details["type"] if ("type" in details) else None
                cat = details["category"] if ("category" in details) else None
                sch = details["schema"] if ("schema" in details) else None
                fromJson = details["fromJson"] if ("fromJson" in details) else None
                if sch and not cat:
                    cat = sch

                if fromJson:
                    # The schema names a method that converts the raw value
                    fn = getattr(self, fromJson)
                    value = fn(value, obj)
                    if not value:
                        continue
                    if pname:
                        if type(value) is list:
                            subindex = 1
                            for subvalue in value:
                                if type(subvalue) is dict:
                                    if propkey not in item:
                                        item[propkey] = []
                                    item[propkey].append(self._map_lipd_to_json(subvalue, obj, subindex, cat, sch, hash))
                                    subindex += 1
                        elif type(value) is dict:
                            item[propkey] = self._map_lipd_to_json(value, obj, None, cat, sch, hash)
                        else:
                            item[propkey] = value
                    elif type(value) is dict:
                        # No property name: merge the converted dict into the item
                        for subpropkey, subvalue in value.items():
                            item[subpropkey] = subvalue
                    continue

                if not pname:
                    continue

                if type(value) is list:
                    subindex = 1
                    for subvalue in value:
                        if propkey not in item:
                            item[propkey] = []
                        item[propkey].append(self._map_lipd_to_json(subvalue, obj, subindex, cat, sch, hash))
                        subindex += 1
                elif type(value) is dict:
                    if propkey not in item:
                        item[propkey] = []
                    item[propkey].append(self._map_lipd_to_json(value, obj, None, cat, sch, hash))
                else:
                    item[propkey] = value
                    if dtype == "Individual" and value not in hash:
                        # Register a stub so the referenced individual gets created
                        hash[value] = {"@id": value, "@category": cat, "@schema": sch}
            hash[objid] = item
        return objid

    def _guess_data_value_type(self, val):
        """Guess a datatype name from the string form of a scalar value."""
        value = str(val)
        if re.search(r"^-?\d+$", value):
            return "float"  # "integer"
        if re.search(r"^-?\d+\.\d+$", value):
            return "float"
        if re.search(r"^[2][0-9]{3}[-][0-1][0-9][-][0-3][0-9]( |T)[0-9]{2}:[0-9]{2}:[0-9]{2}", value):
            return "datetime"
        if re.search(r"^[2][0-9]{3}[-][0-1][0-9][-][0-3][0-9]", value):
            return "date"
        if re.search(r"^(true|false)$", value, re.IGNORECASE):
            return "boolean"
        if re.search(r"^http", value):
            return "url"
        return "string"

    def _guessValueType(self, value):
        """Guess a datatype name for a value.

        Falsy values are "string", dicts are "Individual", and a list takes
        the type of its first element.
        """
        if not value:
            return "string"
        if type(value) is list:
            return self._guessValueType(value[0])
        if type(value) is dict:
            return "Individual"
        return self._guess_data_value_type(value)

    def _get_property_details(self, key, schema, value):
        """Return (and cache in ``schema``) the property details for ``key``.

        Details start as ``{"name": key}`` and are overridden by the schema
        entry. A "schema" entry forces type "Individual"; a missing type is
        guessed from ``value``. The result is stored back in ``schema[key]``
        marked "@@processed", so later calls return the cached entry.
        """
        if key in schema and "@@processed" in schema[key]:
            return schema[key]

        details = {"name": key}
        # Get details from schema
        if key in schema:
            details.update(schema[key])
        if "schema" in details:
            details["type"] = "Individual"
        if "type" not in details:
            details["type"] = self._guessValueType(value)

        details["@@processed"] = True
        schema[key] = details
        return details

    # Create individual
    def _create_individual(self, objid):
        """Return the individual uri for an object id."""
        return self.namespace + sanitizeId(objid)

    # Create class
    def _create_class(self, category):
        """Return the ontology class uri for a category name."""
        return ONTONS + sanitizeId(category)

    # Create property
    def _create_property(self, prop, dtype, cat, multiple):
        """Return ``[property uri, dtype, cat, multiple]`` for a property name.

        A "prefix:name" property uses the namespace registered for the prefix
        in NAMESPACES; anything else uses the ontology namespace.
        """
        nsprop = prop.split(":", 2)
        ns = ONTONS
        if len(nsprop) > 1 and nsprop[0] in NAMESPACES:
            ns = NAMESPACES[nsprop[0]]
            prop = nsprop[1]
        return [ns + lcfirst(sanitizeId(prop)), dtype, cat, multiple]

    # Set individual classes
    def _set_individual_classes(self, objid, category, extracats):
        """Add rdf:type quads for the category and any extra categories."""
        if objid and category:
            self.graph.add((
                URIRef(objid), RDF.type, URIRef(category), URIRef(self.graphurl)
            ))
        for ecat in extracats:
            if objid and ecat:
                self.graph.add((
                    URIRef(objid), RDF.type, URIRef(self._create_class(ecat)), URIRef(self.graphurl)
                ))

    # Set object label
    def _set_object_label(self, objid, label):
        """Add an rdfs:label quad when both id and label are truthy."""
        if objid and label:
            self.graph.add((
                URIRef(objid), RDFS.label, Literal(label), URIRef(self.graphurl)
            ))

    # Set property value
    def _set_property_value(self, objid, prop, value):
        """Add a quad for a property value (one per item for list values).

        ``prop`` is the ``[uri, dtype, cat, multiple]`` list produced by
        ``_create_property``.
        """
        if type(value) is list:
            for subvalue in value:
                self._set_property_value(objid, prop, subvalue)
            return

        (propid, dtype, cat, multiple) = prop
        # NOTE(review): falsy values (0, False, "") are skipped by this test.
        if not (objid and value):
            return

        if dtype == "float" or dtype == "integer":
            # Skip NaN / NA style markers
            if re.search("^.*[^a-zA-Z]?nan[^a-zA-Z]?.*$", str(value).lower()):
                return
            if re.search("^.*[^a-zA-Z]?na[^a-zA-Z]?.*$", str(value).lower()):
                return

        if type(value) is str:
            value = escape(value)

        if dtype == "boolean":
            value = str(value).lower()
            if value != "true":
                value = "false"
        elif dtype == "float":
            # Keep only the first number found in the value
            m = re.search(r"(\-?\d+\.?\d*)", str(value))
            value = m.group(1) if m else 0.0
        elif dtype == "integer":
            m = re.search(r"(\-?\d+)", str(value))
            value = m.group(1) if m else 0

        if dtype == "Individual":
            value = self._create_individual(value)
            objitem = URIRef(value)
        elif dtype == "EnumeratedIndividual":
            objitem = URIRef(value)
        elif dtype == "List":
            objitem = value
        else:
            objitem = Literal(value, datatype=(XSD[dtype] if dtype in XSD else None))

        # FIXME: Do not add if property doesn't allow multiple values and a value already exists
        if not multiple:
            existing = list(self.graph.triples((URIRef(objid), URIRef(propid), None)))
            if len(existing) > 0:
                return

        self.graph.add((
            URIRef(objid), URIRef(propid), objitem, URIRef(self.graphurl)
        ))

    def _create_individual_full(self, obj):
        """Write one flattened object (its types and all properties) to the graph."""
        category = obj["@category"]
        extracats = obj["@extracats"] if ("@extracats" in obj) else {}
        schemaname = obj["@schema"] if ("@schema" in obj) else category
        schema = self.schema[schemaname] if (schemaname in self.schema) else {}

        objid = obj["@id"]
        if not objid:
            return

        # Create category
        if category:
            category = self._create_class(category)
        objid = self._create_individual(objid)

        # Set Individual classes
        self._set_individual_classes(objid, category, extracats)

        for key, value in obj.items():
            if key[0] == "@":
                continue

            details = self._get_property_details(key, schema, value)
            prop = details["name"]
            dtype = details["type"]
            synonyms = details.get("synonyms", {})
            cat = details["category"] if ("category" in details) else None
            sch = details["schema"] if ("schema" in details) else None
            if sch and not cat:
                cat = sch
            multiple = details["multiple"] if ("multiple" in details) else False
            if not prop:
                continue

            # Create Property
            propDI = self._create_property(prop, dtype, cat, multiple)

            # Set property value
            if dtype == "Individual":
                if type(value) is str and synonyms:
                    # The value is a string and there are synonyms for this Individual
                    propDI[1] = "EnumeratedIndividual"  # Rename property type to be an enumeration
                    if value.lower() in synonyms:
                        # We have a synonym-mapping for the value to an Individual
                        match = synonyms[value.lower()]
                        synid = match["id"]
                        if not self.standardize:
                            # If we don't want to standardize, then create a unique id for the individual
                            synid += "." + uniqid()
                        self._set_property_value(objid, propDI, synid)
                        # Only add object label in the current graph if set
                        if self.add_labels:
                            # Standard label when standardizing, else the user label
                            label = match["label"] if self.standardize else value
                            self._set_object_label(synid, label)
                    else:
                        # We don't have a synonym-mapping for the value.
                        # Create an individual and set its label to the value
                        synid = self._create_individual(value) + "." + uniqid()
                        self._set_property_value(objid, propDI, synid)
                        self._set_object_label(synid, value)
                else:
                    # There are no synonyms, or the value is not a string. Just use it directly
                    self._set_property_value(objid, propDI, value)
            elif type(value) is dict or dtype != "File":
                self._set_property_value(objid, propDI, value)
            # else: "File" typed values are skipped (file upload is not enabled)

    def _find_files_with_extension(self, directory, extension):
        """Yield ``(path, name)`` for files under ``directory`` ending in ``.extension``.

        Sub-directories are searched recursively. A directory that cannot be
        read is reported with a message and skipped.
        """
        pattern = re.compile(r'\.' + extension + '$')
        try:
            for entry in os.scandir(directory):
                if entry.is_file() and pattern.search(entry.path):
                    yield entry.path, entry.name
                elif entry.is_dir():
                    # if its a directory, then repeat process as a nested function
                    yield from self._find_files_with_extension(entry.path, extension)
        except OSError as ose:
            print('Cannot access ' + directory + '. Probably a permissions error ', ose)

    def _load_lipd_json_to_graph(self, jsonpath, url=None):
        """Load one LiPD JSON-LD file into a fresh graph.

        The graph is replaced, the named-graph url is taken from
        "dataSetName" when present, and the dataset's ``hasUrl`` is set to
        ``url`` or to a default under DATAURL.
        """
        self.graph = ConjunctiveGraph()
        objhash = {}

        with open(jsonpath, encoding="utf-8") as f:
            obj = json.load(f)

        if "dataSetName" in obj:
            self.graphurl = NSURL + "/" + sanitizeId(obj["dataSetName"])

        self._map_lipd_to_json(obj, None, None, "Dataset", "Dataset", objhash)
        if url:
            objhash[obj["@id"]]["hasUrl"] = url
        else:
            objhash[obj["@id"]]["hasUrl"] = DATAURL + "/" + obj["@id"] + ".lpd"

        for item in objhash.values():
            self._create_individual_full(item)