Source code for dapla_metadata.standards.name_validator

import asyncio
import logging
import re
from collections.abc import AsyncGenerator

from upath import UPath
from upath.types import ReadablePathLike

from dapla_metadata.datasets.dapla_dataset_path_info import DaplaDatasetPathInfo
from dapla_metadata.datasets.dataset_parser import SUPPORTED_DATASET_FILE_SUFFIXES
from dapla_metadata.standards.utils.constants import FILE_DOES_NOT_EXIST
from dapla_metadata.standards.utils.constants import FILE_IGNORED
from dapla_metadata.standards.utils.constants import IGNORED_FOLDERS
from dapla_metadata.standards.utils.constants import INVALID_SYMBOLS
from dapla_metadata.standards.utils.constants import MISSING_DATA_STATE
from dapla_metadata.standards.utils.constants import MISSING_DATASET_SHORT_NAME
from dapla_metadata.standards.utils.constants import MISSING_PERIOD
from dapla_metadata.standards.utils.constants import MISSING_SHORT_NAME
from dapla_metadata.standards.utils.constants import MISSING_VERSION
from dapla_metadata.standards.utils.constants import NAME_STANDARD_SUCCESS
from dapla_metadata.standards.utils.constants import NAME_STANDARD_VIOLATION
from dapla_metadata.standards.utils.constants import PATH_IGNORED
from dapla_metadata.standards.utils.constants import SSB_NAMING_STANDARD_REPORT
from dapla_metadata.standards.utils.constants import SSB_NAMING_STANDARD_REPORT_FILES
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_RESULT_AVERAGE,
)
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_RESULT_BEST,
)
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_RESULT_GOOD,
)
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_RESULT_LOW,
)
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_RESULT_NO_SCORE,
)
from dapla_metadata.standards.utils.constants import SSB_NAMING_STANDARD_REPORT_SUCCESS
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_SUCCESS_RATE,
)
from dapla_metadata.standards.utils.constants import (
    SSB_NAMING_STANDARD_REPORT_VIOLATIONS,
)

logger = logging.getLogger(__name__)


class ValidationResult:
    """Result object for name standard validation."""

    def __init__(
        self,
        success: bool,
        file_path: str,
    ) -> None:
        """Initialize the validation result."""
        self.success = success
        self.file_path = file_path
        self.messages: list[str] = []
        self.violations: list[str] = []

    def add_message(self, message: str) -> None:
        """Add message to list."""
        if message not in self.messages:
            self.messages.append(message)

    def add_violation(self, violation: str) -> None:
        """Add violation to list."""
        if violation not in self.violations:
            self.violations.append(violation)
        if self.success:
            self.success = False

    def __repr__(self) -> str:
        """Representation for debugging."""
        return f"ValidationResult(success={self.success}, file_path={self.file_path}, messages={self.messages}, violations={self.violations})"

    def to_dict(self) -> dict:
        """Return result as a dictionary."""
        return {
            "success": self.success,
            "file_path": self.file_path,
            "messages": self.messages,
            "violations": self.violations,
        }
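A minimal usage sketch, not part of the module, showing how a ValidationResult accumulates messages and violations; the file path and violation text below are invented for illustration:

# Illustrative only: hypothetical file path and violation text.
result = ValidationResult(
    success=True,
    file_path="bucket/stat/inndata/data_p2021_v1.parquet",
)
result.add_message("checked manually")  # duplicate messages are ignored on repeat calls
result.add_violation("example violation")  # recording a violation flips success to False
assert result.success is False
assert result.to_dict()["violations"] == ["example violation"]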
class NamingStandardReport:
    """Report object for name standard validation."""

    def __init__(self, validation_results: list[ValidationResult]) -> None:
        """Initialize the naming standard report."""
        self.validation_results = validation_results
        self.num_files_validated = len(validation_results)
        self.num_success = len(
            [result for result in validation_results if result.success is True],
        )
        self.num_failures = len(
            [result for result in validation_results if result.success is False],
        )

    def generate_report(self) -> str:
        """Format the report as a string."""
        return (
            f"{SSB_NAMING_STANDARD_REPORT}\n"
            f"=============================\n"
            f"{self.evaluate_result()}"
            f"{SSB_NAMING_STANDARD_REPORT_SUCCESS_RATE}: {self.success_rate():.2f}%\n"
            f"{SSB_NAMING_STANDARD_REPORT_FILES}: {self.num_files_validated}\n"
            f"{SSB_NAMING_STANDARD_REPORT_SUCCESS}: {self.num_success}\n"
            f"{SSB_NAMING_STANDARD_REPORT_VIOLATIONS}s: {self.num_failures}\n"
        )

    def success_rate(self) -> int | float | None:
        """Calculate the success rate as a percentage.

        Returns:
            int | float | None: The success rate as a percentage, or None if no files were validated.
        """
        if self.num_files_validated == 0:
            return None
        return self.num_success / self.num_files_validated * 100

    def evaluate_result(self) -> str:
        """Returns an appropriate message based on the success rate."""
        rate = self.success_rate()
        if rate is not None:
            if 95 <= rate <= 100:
                return SSB_NAMING_STANDARD_REPORT_RESULT_BEST
            if 70 < rate < 95:
                return SSB_NAMING_STANDARD_REPORT_RESULT_GOOD
            if 40 <= rate <= 70:
                return SSB_NAMING_STANDARD_REPORT_RESULT_AVERAGE
            if rate < 40:
                return SSB_NAMING_STANDARD_REPORT_RESULT_LOW
        return SSB_NAMING_STANDARD_REPORT_RESULT_NO_SCORE
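A hedged sketch of building a report from two hand-made results; the printed wording comes from the SSB_NAMING_STANDARD_REPORT_* constants, so only the counts are asserted here:

# Illustrative only: results constructed by hand rather than by _validate_file.
results = [
    ValidationResult(success=True, file_path="a_p2021_v1.parquet"),
    ValidationResult(success=False, file_path="b.parquet"),
]
report = NamingStandardReport(results)
assert report.num_files_validated == 2
assert report.success_rate() == 50.0
print(report.generate_report())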
def _has_invalid_symbols(path: ReadablePathLike) -> bool:
    """Return True if the string contains illegal symbols.

    Examples:
        >>> _has_invalid_symbols("åregang-øre")
        True
        >>> _has_invalid_symbols("Azor89")
        False
        >>> _has_invalid_symbols("ssbÆ-dapla-example-data-produkt-prod/ledstill/oppdrag/skjema_p2018_p2020_v1")
        True
        >>> _has_invalid_symbols("ssb-dapla-example-data-produkt-prod/ledstill/oppdrag/skjema_p2018_p2020_v1")
        False
        >>> _has_invalid_symbols("ssb-dapla-example-data-produkt-prod/ledstill/inndata/skjema_p2018_p202_v1/aar=2018/data.parquet")
        False
    """
    # TODO @mmwinther: The = symbol is allowed to avoid failures on subdirectories of partitioned parquet datasets.
    # DPMETA-824
    return bool(re.search(r"[^a-zA-Z0-9\./:_\-=]", str(path).strip()))


def _check_violations(
    file: UPath,
) -> list[str]:
    """Check for missing attributes and invalid symbols."""
    path_info = DaplaDatasetPathInfo(file)
    checks = {
        MISSING_SHORT_NAME: path_info.statistic_short_name,
        MISSING_DATA_STATE: path_info.dataset_state,
        MISSING_PERIOD: path_info.contains_data_from,
        MISSING_DATASET_SHORT_NAME: path_info.dataset_short_name,
        MISSING_VERSION: path_info.dataset_version,
        INVALID_SYMBOLS: not _has_invalid_symbols(file),
    }
    return [message for message, value in checks.items() if not value]


async def _validate_file(
    file: UPath,
    check_file_exists: bool = False,
) -> ValidationResult:
    """Check for naming standard violations.

    Returns:
        A ValidationResult object containing messages and violations.
    """
    logger.info("Validating file: %s", file)
    if file.suffix not in SUPPORTED_DATASET_FILE_SUFFIXES:
        logger.info("Skipping validation on non-dataset file: %s", file)
        return await _ignored_file_type_result(file)
    result = ValidationResult(success=True, file_path=str(file))
    if check_file_exists and not file.exists():
        result.add_message(
            FILE_DOES_NOT_EXIST,
        )
    result.violations = await asyncio.get_running_loop().run_in_executor(
        None,
        lambda: _check_violations(file),
    )
    if result.violations:
        result.success = False
        result.add_message(NAME_STANDARD_VIOLATION)
    else:
        result.success = True
        result.add_message(
            NAME_STANDARD_SUCCESS,
        )
    return result


async def _ignored_folder_result(file: UPath) -> ValidationResult:
    r = ValidationResult(success=True, file_path=str(file))
    r.add_message(PATH_IGNORED)
    return r


async def _ignored_file_type_result(file: UPath) -> ValidationResult:
    r = ValidationResult(success=True, file_path=str(file))
    r.add_message(FILE_IGNORED)
    return r
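Since _check_violations maps each violation message to the DaplaDatasetPathInfo attribute it depends on, calling it directly shows which parts of a path are missing. A rough sketch using a path in the style of the doctests above; the exact messages returned depend on the constants and on how DaplaDatasetPathInfo parses the path:

# Illustrative only: prints whichever violation messages apply to this path
# (an empty list would mean the path is fully compliant).
from upath import UPath

print(
    _check_violations(
        UPath(
            "ssb-dapla-example-data-produkt-prod/ledstill/oppdrag/skjema_p2018_p2020_v1.parquet",
        ),
    ),
)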
async def validate_directory(
    path: ReadablePathLike,
) -> AsyncGenerator[AsyncGenerator | asyncio.Task]:
    """Validate a file or recursively validate all files in a directory."""
    path = UPath(path)
    if set(path.parts).intersection(IGNORED_FOLDERS):
        logger.info("File path ignored: %s", path)
        yield asyncio.create_task(_ignored_folder_result(path))
    elif path.suffix:
        yield asyncio.create_task(_validate_file(path, check_file_exists=True))
    else:
        for obj in await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: path.glob("*"),
        ):
            if obj.suffix:
                yield asyncio.create_task(_validate_file(obj), name=obj.name)
            else:
                logger.debug("Recursing into: %s", obj)
                yield validate_directory(obj)
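Because validate_directory yields a mix of asyncio.Task objects and nested async generators for subdirectories, a consumer has to unwrap both. A minimal consumption sketch; the collect_results and main helpers are hypothetical and not part of this module:

# Illustrative only: awaits yielded tasks and recurses into nested generators.
async def collect_results(generator) -> list[ValidationResult]:
    results: list[ValidationResult] = []
    async for item in generator:
        if isinstance(item, asyncio.Task):
            results.append(await item)
        else:  # a nested AsyncGenerator produced for a subdirectory
            results.extend(await collect_results(item))
    return results


async def main(path: str) -> None:
    results = await collect_results(validate_directory(path))
    print(NamingStandardReport(results).generate_report())


# asyncio.run(main("ssb-dapla-example-data-produkt-prod"))  # hypothetical bucket name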