Source code for dapla_metadata.standards.standard_validators

import asyncio
import logging
import os
import time
from collections.abc import AsyncGenerator

from dapla_metadata.standards.name_validator import NamingStandardReport
from dapla_metadata.standards.name_validator import ValidationResult
from dapla_metadata.standards.name_validator import validate_directory

logger = logging.getLogger(__name__)


async def check_naming_standard(
    file_path: str | os.PathLike[str],
) -> list[ValidationResult]:
    """Check whether a given path follows the SSB naming standard.

    This function checks whether the provided `file_path` and subdirectories
    thereof comply with the naming standard. Currently we only examine
    '.parquet' files. Other files are ignored.

    Args:
        file_path: The path to a bucket, directory, or specific file to
            validate. This can be in the following forms:

            - A bucket URL in the form 'gs://ssb-dapla-felles-data-produkt-test'
            - An absolute path to a mounted bucket in the form '/buckets/produkt'
            - Any subdirectory or file thereof

            We also accept paths which don't yet exist so that you can test if
            a path will comply.

    Returns:
        list[ValidationResult]: A list of validation results, including success
            status, checked file path, messages, and any detected violations.

    Examples:
        >>> check_naming_standard("/data/example_file.parquet").success
        False

        >>> check_naming_standard("/buckets/produkt/datadoc/utdata/person_data_p2021_v2.parquet").success
        True
    """
    results = []

    # Begin validation.
    # For each file this returns a task which we can wait on to complete.
    # For each directory this returns another AsyncGenerator which must be
    # unpacked below.
    tasks: list[asyncio.Task] = []
    async for t in flatten_generator(validate_directory(str(file_path))):
        tasks.append(t)  # noqa: PERF401

    # 5 minute timeout for safety
    start_time = time.time()
    while time.time() < start_time + (5 * 60):
        for item in tasks:
            if isinstance(item, AsyncGenerator):
                # Drill down into lower directories to get the validation
                # tasks from them
                tasks.remove(item)
                new_tasks = [t async for t in item]
                logger.debug("New Tasks: %s %s", len(new_tasks), new_tasks)
                tasks.extend(new_tasks)
            elif isinstance(item, asyncio.Task):
                if item.done():
                    logger.info("Validated %s", item.get_name())
                    tasks.remove(item)
                    results.append(item.result())

        logger.debug("Tasks: %s %s", len(tasks), tasks)
        logger.debug("Results: %s", len(results))

        if len(tasks) == 0:
            logger.info("Completed validation")
            break

        # Allow time for other processing to be performed
        await asyncio.sleep(0.001)

    return results
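
A minimal usage sketch, not part of this module: `check_naming_standard` is a coroutine, so it must be awaited or driven by an event loop, for example via `asyncio.run`. The directory path is taken from the docstring example above and is only illustrative, and the only attribute read on each result, `success`, is the one shown in that docstring.

    import asyncio

    from dapla_metadata.standards.standard_validators import check_naming_standard

    # Illustrative path borrowed from the docstring example above.
    results = asyncio.run(
        check_naming_standard("/buckets/produkt/datadoc/utdata"),
    )
    for result in results:
        print(result.success)
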
async def flatten_generator(gen: AsyncGenerator) -> AsyncGenerator[asyncio.Task, None]:
    """Recursively flatten nested async generators."""
    async for item in gen:
        if isinstance(item, AsyncGenerator):
            async for sub_item in flatten_generator(item):
                yield sub_item
        else:
            yield item
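
A small sketch of what `flatten_generator` does, assuming it is imported from this module. For illustration it yields plain integers rather than the `asyncio.Task` objects used by `check_naming_standard`; the generators `inner`, `outer`, and the driver `main` are hypothetical names.

    import asyncio

    from dapla_metadata.standards.standard_validators import flatten_generator

    async def inner():
        yield 2
        yield 3

    async def outer():
        yield 1
        yield inner()  # a nested async generator that gets flattened
        yield 4

    async def main():
        return [item async for item in flatten_generator(outer())]

    print(asyncio.run(main()))  # [1, 2, 3, 4]
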
def generate_validation_report(
    validation_results: list[ValidationResult],
) -> NamingStandardReport:
    """Generate and print a formatted naming standard validation report.

    This function takes a list of `ValidationResult` objects, creates a
    `NamingStandardReport` instance, and prints the generated report.

    Args:
        validation_results: A list of ValidationResult objects that contain
            the outcomes of the name standard checks.

    Returns:
        NamingStandardReport: An instance of `NamingStandardReport` containing
            the validation results.
    """
    report = NamingStandardReport(validation_results=validation_results)
    print(report.generate_report())  # noqa: T201
    return report
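
An end-to-end sketch combining the two public entry points, under the same assumptions as the examples above (the path is illustrative and `main` is a hypothetical driver):

    import asyncio

    from dapla_metadata.standards.standard_validators import (
        check_naming_standard,
        generate_validation_report,
    )

    async def main():
        results = await check_naming_standard("/buckets/produkt/datadoc/utdata")
        # Prints the formatted report and returns the NamingStandardReport instance.
        return generate_validation_report(results)

    report = asyncio.run(main())
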