"""Handle reading, updating and writing of metadata."""
from __future__ import annotations
import copy
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING
from typing import cast
import datadoc_model.all_optional.model as all_optional_model
import datadoc_model.required.model as required_model
from datadoc_model.all_optional.model import DataSetStatus
from upath import UPath
from dapla_metadata._shared import config
from dapla_metadata.dapla import user_info
from dapla_metadata.datasets._merge import DatasetConsistencyStatus
from dapla_metadata.datasets._merge import check_dataset_consistency
from dapla_metadata.datasets._merge import check_variables_consistency
from dapla_metadata.datasets._merge import merge_metadata
from dapla_metadata.datasets._merge import report_metadata_consistency
from dapla_metadata.datasets.compatibility._utils import (
is_metadata_in_container_structure,
)
from dapla_metadata.datasets.compatibility.model_backwards_compatibility import (
upgrade_metadata,
)
from dapla_metadata.datasets.dapla_dataset_path_info import DaplaDatasetPathInfo
from dapla_metadata.datasets.dataset_parser import DatasetParser
from dapla_metadata.datasets.dataset_parser import pretty_print_supported_types
from dapla_metadata.datasets.model_validation import ValidateDatadocMetadata
from dapla_metadata.datasets.statistic_subject_mapping import StatisticSubjectMapping
from dapla_metadata.datasets.utility.constants import (
DEFAULT_SPATIAL_COVERAGE_DESCRIPTION,
)
from dapla_metadata.datasets.utility.constants import NUM_OBLIGATORY_DATASET_FIELDS
from dapla_metadata.datasets.utility.constants import NUM_OBLIGATORY_VARIABLES_FIELDS
from dapla_metadata.datasets.utility.urn import convert_uris_to_urns
from dapla_metadata.datasets.utility.urn import klass_urn_converter
from dapla_metadata.datasets.utility.urn import vardef_urn_converter
from dapla_metadata.datasets.utility.utils import OptionalDatadocMetadataType
from dapla_metadata.datasets.utility.utils import PseudonymizationType
from dapla_metadata.datasets.utility.utils import VariableListType
from dapla_metadata.datasets.utility.utils import VariableType
from dapla_metadata.datasets.utility.utils import build_dataset_path
from dapla_metadata.datasets.utility.utils import build_metadata_document_path
from dapla_metadata.datasets.utility.utils import calculate_percentage
from dapla_metadata.datasets.utility.utils import derive_assessment_from_state
from dapla_metadata.datasets.utility.utils import get_timestamp_now
from dapla_metadata.datasets.utility.utils import (
num_obligatory_dataset_fields_completed,
)
from dapla_metadata.datasets.utility.utils import (
num_obligatory_variables_fields_completed,
)
from dapla_metadata.datasets.utility.utils import read_variables_from_metadata_document
from dapla_metadata.datasets.utility.utils import set_dataset_owner
from dapla_metadata.datasets.utility.utils import set_default_values_dataset
from dapla_metadata.datasets.utility.utils import set_default_values_pseudonymization
from dapla_metadata.datasets.utility.utils import set_default_values_variables
if TYPE_CHECKING:
from datetime import datetime
from upath.types import ReadablePathLike
logger = logging.getLogger(__name__)
[docs]
class Datadoc:
"""Handle reading, updating and writing of metadata.
If a metadata document exists, it is this information that is loaded. Nothing
is inferred from the dataset. If only a dataset path is supplied the metadata
document path is build based on the dataset path.
Example: /path/to/dataset.parquet -> /path/to/dataset__DOC.json
Attributes:
dataset_path: A file path to the path to where the dataset is stored.
metadata_document_path: A path to a metadata document if it exists.
statistic_subject_mapping: An instance of StatisticSubjectMapping.
"""
def __init__(
self,
dataset_path: ReadablePathLike | None = None,
metadata_document_path: ReadablePathLike | None = None,
statistic_subject_mapping: StatisticSubjectMapping | None = None,
errors_as_warnings: bool = False,
validate_required_fields_on_existing_metadata: bool = False,
) -> None:
"""Initialize the Datadoc instance.
If a dataset path is supplied, it attempts to locate and load the
corresponding metadata document. If no dataset path is provided, the class
is instantiated without loading any metadata.
Args:
dataset_path: The file path to the dataset. Defaults to None.
metadata_document_path: The file path to the metadata document.
Defaults to None.
statistic_subject_mapping: An instance of StatisticSubjectMapping.
Defaults to None
errors_as_warnings: Disable raising exceptions if inconsistencies
are found between existing and extracted metadata.
validate_required_fields_on_existing_metadata: Use a Pydantic model
which validates whether required fields are present when reading
in an existing metadata file.
"""
self._statistic_subject_mapping = statistic_subject_mapping
self.errors_as_warnings = errors_as_warnings
self.validate_required_fields_on_existing_metadata = (
validate_required_fields_on_existing_metadata
)
self.metadata_model = (
required_model
if self.validate_required_fields_on_existing_metadata
else all_optional_model
)
self.metadata_document: UPath | None = None
self.container: (
all_optional_model.MetadataContainer
| required_model.MetadataContainer
| None
) = None
self.dataset_path: UPath | None = None
self.dataset = all_optional_model.Dataset()
self.variables: VariableListType = []
self.variables_lookup: dict[str, VariableType] = {}
self.explicitly_defined_metadata_document = False
self.dataset_consistency_status: list[DatasetConsistencyStatus] = []
self.concrete_data_types_lookup: dict[str, str] = {}
if metadata_document_path:
self.metadata_document = UPath(metadata_document_path)
self.explicitly_defined_metadata_document = True
if not self.metadata_document.exists():
msg = f"Metadata document does not exist! Provided path: {self.metadata_document}"
raise ValueError(
msg,
)
if dataset_path:
self.dataset_path = UPath(dataset_path)
if not metadata_document_path:
self.metadata_document = build_metadata_document_path(
self.dataset_path,
)
if metadata_document_path or dataset_path:
self._extract_metadata_from_files()
def _extract_metadata_from_files(self) -> None:
"""Read metadata from an existing metadata document or create one.
If a metadata document exists, it reads and extracts metadata from it.
If no metadata document is found, it creates metadata from scratch by
extracting information from the dataset file.
This method ensures that:
- Metadata is extracted from an existing document if available.
- If metadata is not available, it is extracted from the dataset file.
- The dataset ID is set if not already present.
- Default values are set for variables, particularly the variable role on
creation.
- Default values for variables ID and 'is_personal_data' are set if the
values are None.
- The 'contains_personal_data' attribute is set to False if not specified.
- A lookup dictionary for variables is created based on their short names.
"""
extracted_metadata: all_optional_model.DatadocMetadata | None = None
existing_metadata: OptionalDatadocMetadataType = None
if self.metadata_document and self.metadata_document.exists():
existing_metadata = self._extract_metadata_from_existing_document(
self.metadata_document,
)
if self.dataset_path:
extracted_metadata = self._extract_metadata_from_dataset(self.dataset_path)
self.dataset_consistency_status.extend(
self.check_illegal_variable_data_type(
extracted_metadata.variables or [], self.concrete_data_types_lookup
)
)
if (
self.dataset_path
and self.metadata_document
and extracted_metadata
and existing_metadata
):
if extracted_metadata.dataset and existing_metadata.dataset:
self.dataset_consistency_status.extend(
check_dataset_consistency(
UPath(str(extracted_metadata.dataset.file_path)),
UPath(str(existing_metadata.dataset.file_path)),
)
)
self.dataset_consistency_status.extend(
check_variables_consistency(
extracted_metadata.variables or [],
existing_metadata.variables or [],
)
)
report_metadata_consistency(
self.dataset_consistency_status,
errors_as_warnings=self.errors_as_warnings,
)
# Merge existing metadata with a new dataset
merged_metadata = merge_metadata(
extracted_metadata,
existing_metadata,
explicitly_defined_metadata_document=self.explicitly_defined_metadata_document,
)
# Ensure the document path corresponds to the dataset path
self.metadata_document = build_metadata_document_path(
self.dataset_path,
)
self._set_metadata(merged_metadata)
return
report_metadata_consistency(
self.dataset_consistency_status,
errors_as_warnings=self.errors_as_warnings,
message="Problems were detected with the metadata.",
)
self._set_metadata(existing_metadata or extracted_metadata)
[docs]
def check_illegal_variable_data_type(
self, variables: VariableListType, concrete_data_types_lookup: dict[str, str]
) -> list[DatasetConsistencyStatus]:
"""Check whether any of the variable types are unsupported.
When we encounter a variable which is unsupported, the `DatasetParser` sets the variable `data_type` to `None`.
This function detects that situation and creates a friendly error message to inform of the situation.
"""
return [
DatasetConsistencyStatus(
message=f"Unsupported data type for variable '{v.short_name}' type: '{concrete_data_types_lookup.get(v.short_name, 'unknown')}' from dataset {self.dataset_path}\nPlease change the type of the variable to one of the supported options:\n{pretty_print_supported_types()}",
success=False,
)
for v in variables
if v.short_name and not v.data_type
]
def _set_metadata(
self,
metadata: OptionalDatadocMetadataType,
) -> None:
if not metadata or not (metadata.dataset and metadata.variables):
msg = "Could not read metadata"
raise ValueError(msg)
self.dataset = cast("all_optional_model.Dataset", metadata.dataset)
self.variables = metadata.variables
set_default_values_variables(self.variables)
set_default_values_dataset(cast("all_optional_model.Dataset", self.dataset))
set_dataset_owner(self.dataset)
convert_uris_to_urns(self.variables, "definition_uri", [vardef_urn_converter])
convert_uris_to_urns(
self.variables, "classification_uri", [klass_urn_converter]
)
self._create_variables_lookup()
def _create_variables_lookup(self) -> None:
self.variables_lookup = {
v.short_name: v for v in self.variables if v.short_name
}
def _extract_metadata_from_existing_document(
self,
document: UPath,
) -> OptionalDatadocMetadataType:
"""Read metadata from an existing metadata document.
If an existing metadata document is available, this method reads and
loads the metadata from it. It validates and upgrades the metadata as
necessary. If we have read in a file with an empty "datadoc" structure
the process ends.
A typical example causing a empty datadoc is a file produced from a
pseudonymization process.
Args:
document: A path to the existing metadata document.
Raises:
json.JSONDecodeError: If the metadata document cannot be parsed.
pydantic.ValidationError: If the data does not successfully validate.
"""
fresh_metadata = {}
try:
with document.open(mode="r", encoding="utf-8") as file:
fresh_metadata = json.load(file)
logger.info("Opened existing metadata file %s", document)
fresh_metadata = upgrade_metadata(
fresh_metadata,
)
if is_metadata_in_container_structure(fresh_metadata):
self.container = (
self.metadata_model.MetadataContainer.model_validate_json(
json.dumps(fresh_metadata),
)
)
datadoc_metadata = fresh_metadata["datadoc"]
else:
datadoc_metadata = fresh_metadata
if datadoc_metadata is None:
return None
existing = self.metadata_model.DatadocMetadata.model_validate_json(
json.dumps(datadoc_metadata),
)
# Always override the stored dataset path to ensure it matches
if existing.dataset:
existing.dataset.file_path = str(
self.dataset_path or build_dataset_path(document)
)
except json.JSONDecodeError:
logger.warning(
"Could not open existing metadata file %s. \
Falling back to collecting data from the dataset",
document,
exc_info=True,
)
return None
else:
return existing
def _extract_subject_field_from_path(
self,
dapla_dataset_path_info: DaplaDatasetPathInfo,
) -> str | None:
"""Extract the statistic short name from the dataset file path.
Map the extracted statistic short name to its corresponding statistical
subject.
Args:
dapla_dataset_path_info: The object representing the decomposed file
path.
Returns:
The code for the statistical subject or None if we couldn't map to one.
"""
if self._statistic_subject_mapping is None:
with ThreadPoolExecutor(max_workers=12) as executor:
return StatisticSubjectMapping(
executor,
config.get_statistical_subject_source_url(),
).get_secondary_subject(
dapla_dataset_path_info.statistic_short_name,
)
else:
return self._statistic_subject_mapping.get_secondary_subject(
dapla_dataset_path_info.statistic_short_name,
)
def _extract_metadata_from_dataset(
self,
dataset: UPath,
) -> all_optional_model.DatadocMetadata:
"""Obtain what metadata we can from the dataset itself.
This makes it easier for the user by 'pre-filling' certain fields.
Certain elements are dependent on the dataset being saved according
to SSB's standard.
Args:
dataset: The path to the dataset file, which can be a local or
cloud path.
Side Effects:
Updates the following instance attributes:
- ds_schema: An instance of DatasetParser initialized for the
given dataset file.
- dataset: An instance of model.Dataset with pre-filled metadata
fields.
- variables: A list of fields extracted from the dataset schema.
"""
dapla_dataset_path_info = DaplaDatasetPathInfo(dataset)
metadata = all_optional_model.DatadocMetadata()
metadata.dataset = all_optional_model.Dataset(
short_name=dapla_dataset_path_info.dataset_short_name,
dataset_state=dapla_dataset_path_info.dataset_state,
dataset_status=DataSetStatus.DRAFT,
assessment=(
derive_assessment_from_state(
dapla_dataset_path_info.dataset_state,
)
if dapla_dataset_path_info.dataset_state is not None
else None
),
version=dapla_dataset_path_info.dataset_version,
contains_data_from=dapla_dataset_path_info.contains_data_from,
contains_data_until=dapla_dataset_path_info.contains_data_until,
file_path=str(self.dataset_path),
metadata_created_by=user_info.get_user_info_for_current_platform().short_email,
subject_field=self._extract_subject_field_from_path(
dapla_dataset_path_info,
),
spatial_coverage_description=DEFAULT_SPATIAL_COVERAGE_DESCRIPTION,
)
metadata.variables = DatasetParser.for_file(dataset).get_fields()
try:
self.concrete_data_types_lookup = DatasetParser.for_file(
dataset
).get_concrete_data_types()
except RuntimeError:
logger.exception(
"Failed to get concrete data types for dataset %s", dataset
)
return metadata
[docs]
def datadoc_model(
self,
) -> all_optional_model.MetadataContainer | required_model.MetadataContainer:
"""Return the underlying datadoc model."""
datadoc = self.metadata_model.DatadocMetadata(
percentage_complete=self.percent_complete,
dataset=self.dataset,
variables=self.variables,
)
if self.container:
res = copy.deepcopy(self.container)
res.datadoc = datadoc
return res
return self.metadata_model.MetadataContainer(datadoc=datadoc)
@property
def percent_complete(self) -> int:
"""The percentage of obligatory metadata completed.
A metadata field is counted as complete when any non-None value is
assigned. Used for a live progress bar in the UI, as well as being
saved in the datadoc as a simple quality indicator.
"""
num_all_fields = NUM_OBLIGATORY_DATASET_FIELDS + (
NUM_OBLIGATORY_VARIABLES_FIELDS * len(self.variables)
)
num_set_fields = num_obligatory_dataset_fields_completed(
self.dataset,
) + num_obligatory_variables_fields_completed(self.variables)
return calculate_percentage(num_set_fields, num_all_fields)
[docs]
def add_pseudonymization(
self,
variable_short_name: str,
pseudonymization: PseudonymizationType | None = None,
) -> None:
"""Adds a new pseudo variable to the list of pseudonymized variables.
If `pseudonymization` is not supplied, an empty Pseudonymization structure
will be created and assigned to the variable.
If an encryption algorithm is recognized (one of the standard Dapla algorithms), default values are filled
for any missing fields.
Args:
variable_short_name: The short name for the variable that one wants to update the pseudo for.
pseudonymization: The updated pseudonymization.
"""
variable = self.variables_lookup[variable_short_name]
if pseudonymization:
set_default_values_pseudonymization(variable, pseudonymization)
else:
if self.validate_required_fields_on_existing_metadata:
msg = "Can't add empty pseudonymization object when validating required fields! Try setting `validate_required_fields_on_existing_metadata` to `False`."
raise ValueError(msg)
variable.pseudonymization = all_optional_model.Pseudonymization()
[docs]
def remove_pseudonymization(self, variable_short_name: str) -> None:
"""Removes a pseudo variable by using the shortname.
Updates the pseudo variable lookup by creating a new one.
Args:
variable_short_name: The short name for the variable that one wants to remove the pseudo for.
"""
if self.variables_lookup[variable_short_name].pseudonymization is not None:
self.variables_lookup[variable_short_name].pseudonymization = None
def _update_variable(
self, target_short_name: str, source_variable: VariableType
) -> None:
"""Updates a variable by short_name.
Args:
target_short_name: The short name for the variable that one wants to update.
source_variable: The variable data to update with.
"""
for i, variable in enumerate(self.variables):
if (
hasattr(variable, "short_name")
and variable.short_name == target_short_name
):
self.variables[i] = source_variable # type: ignore[assignment]
return
msg = f"Variable with short_name '{target_short_name}' not found."
raise ValueError(msg)
[docs]
def copy_variable(
self,
metadata_document_path: ReadablePathLike,
target_short_name: str,
source_short_name: str | None = None,
) -> None:
"""Copies the variables from the given dataset to the current dataset.
Args:
metadata_document_path: The path to the metadata document one wants to copy from.
target_short_name: The short name for the variable that one wants to copy to.
source_short_name: The short name for the variable that one wants to copy from. If None, the target short name is used.
"""
if target_short_name not in self.variables_lookup:
msg = f"Target variable {target_short_name} does not exist in the metadata document you are copying into!"
raise ValueError(msg)
source_short_name = source_short_name or target_short_name
source_variables = read_variables_from_metadata_document(metadata_document_path)
source_variables_lookup = {
v.short_name: v for v in source_variables if v.short_name is not None
}
if source_short_name not in source_variables_lookup:
msg = f"{source_short_name} does not exist!"
raise ValueError(msg)
source_variable = source_variables_lookup[source_short_name]
# Always override the data type to ensure it matches the physical dataset.
source_variable.data_type = self.variables_lookup[ # type: ignore[assignment]
target_short_name
].data_type
self.variables_lookup[target_short_name] = source_variable
source_variable = all_optional_model.Variable.model_validate(source_variable)
self._update_variable(target_short_name, source_variable)