ssb_timeseries.io

The IO module provides abstractions for READ and WRITE operations so that Dataset does not have to care about the mechanics.

TO DO: turn Dataset.io into a Protocol class?

Essential configs:

TIMESERIES_CONFIG: str = os.environ.get("TIMESERIES_CONFIG")
CONFIG = config.Config(configuration_file=TIMESERIES_CONFIG)

Default configs may be created by running

poetry run timeseries-config {home | jovyan | gcs}

See config module docs for details.

exception DatasetIoException

Bases: Exception

Exception for dataset io errors.

class FileSystem(set_name, set_type, as_of_utc=None, process_stage='statistikk', sharing=None)

Bases: object

A filesystem abstraction for Dataset IO.

Initialise filesystem abstraction for dataset.

Calculate directory structure based on dataset type and name.

Parameters:
  • set_name (str)

  • set_type (SeriesType)

  • as_of_utc (datetime | None)

  • process_stage (str)

  • sharing (dict | None)

__init__(set_name, set_type, as_of_utc=None, process_stage='statistikk', sharing=None)

Initialise filesystem abstraction for dataset.

Calculate directory structure based on dataset type and name.

Parameters:
  • set_name (str)

  • set_type (SeriesType)

  • as_of_utc (datetime | None)

  • process_stage (str)

  • sharing (dict | None)

Return type:

None
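
A minimal usage sketch for constructing the abstraction. The import path and constructor for SeriesType are assumptions (check the properties module), and the dataset name is hypothetical.

from datetime import datetime, timezone

from ssb_timeseries.io import FileSystem
from ssb_timeseries.properties import SeriesType  # assumed import path

# 'example_set' is a hypothetical dataset name; SeriesType.simple() is an
# assumed constructor -- verify against the properties module docs.
fs = FileSystem(
    set_name="example_set",
    set_type=SeriesType.simple(),
    as_of_utc=datetime(2024, 1, 1, tzinfo=timezone.utc),
)

print(fs.root)              # base path from configuration
print(fs.data_fullpath)     # type path / set name / data file
print(fs.datafile_exists())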

property data_dir: str

The data directory for the dataset. This is a subdirectory under the type path.

property data_file: str

The name of the data file for the dataset.

property data_fullpath: str

The full path to the data file.

datafile_exists()

Check if the data file exists.

Return type:

bool

classmethod dir(*args, **kwargs)

Check that target directory is under BUCKET. If so, create it if it does not exist.

Return type:

str

Parameters:
  • args (str)

  • kwargs (bool)

last_version_number_by_regex(directory, pattern='*')

Check directory and get max version number from files matching regex pattern.

Return type:

str

Parameters:
  • directory (str)

  • pattern (str)

list_versions(file_pattern='*', pattern='as_of')

Check data directory and list version marker ('as-of' or 'name') of data files.

Return type:

list[str | datetime]

Parameters:
  • file_pattern (str)

  • pattern (str)

property metadata_dir: str

The location of the metadata file for the dataset.

In the initial implementation, with data and metadata in separate files, it made sense for this to be the same as the data directory. Most likely, a future version will change this approach and store the metadata both as header information in the data file and in a central metadata directory.

property metadata_file: str

The name of the metadata file for the dataset.

property metadata_fullpath: str

The full path to the metadata file.

metadatafile_exists()

Check if the metadata file exists.

Return type:

bool

parquet_schema(meta)

Dataset-specific helper: translate tags to parquet schema metadata before the generic 'write_parquet' call.

Return type:

Schema | None

Parameters:

meta (dict[str, Any])

parquet_schema_from_df(df)

Dataset-specific helper: translate tags to parquet schema metadata before the generic 'write_parquet' call.

Return type:

Schema | None

Parameters:

df (DataFrame)

read_data(interval=<function Interval.all>)

Read data from the filesystem. Return empty dataframe if not found.

Return type:

DataFrame

Parameters:

interval (Interval)

read_metadata()

Read tags from the metadata file.

Return type:

dict

property root: str

The root path is the basis for all other paths.

save(meta, data=None)

Save data and metadata to disk.

Return type:

None

Parameters:
  • meta (dict)

  • data (DataFrame)
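
A hedged save/read round trip. The column name, tag keys and SeriesType constructor are illustrative assumptions, not the library's documented conventions.

from ssb_timeseries.io import FileSystem
from ssb_timeseries.properties import SeriesType  # assumed import path

import pandas as pd

fs = FileSystem("example_set", SeriesType.simple())  # hypothetical dataset

# Illustrative columns and tag keys only; the real conventions are defined by
# the dataset and meta modules.
data = pd.DataFrame(
    {
        "valid_at": pd.to_datetime(["2024-01-31", "2024-02-29"]),
        "series_a": [1.0, 2.0],
    }
)
meta = {"name": "example_set", "series": {"series_a": {"unit": "NOK"}}}

fs.save(meta=meta, data=data)  # writes the data file and the metadata file
tags = fs.read_metadata()      # tag dict read back from the metadata file
df = fs.read_data()            # empty dataframe if no data file exists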

property set_type_dir: str

Under the time series root there is a directory for each data type. Names concatenate the constituents of the type: temporality and versioning.

sharing_directory(bucket)

Get name of sharing directory based on dataset parameters and configuration.

Creates the directory if it does not exist.

Return type:

str | PathLike[str]

Parameters:

bucket (str)

snapshot(product, process_stage, sharing=None, as_of_tz=None, period_from=None, period_to=None)

Copies snapshots to bucket(s) according to processing stage and sharing configuration.

For this to work, .stage and .sharing configurations should be set for the dataset, e.g.:

.sharing = [
    {'team': 's123', 'path': '<s123-bucket>'},
    {'team': 's234', 'path': '<s234-bucket>'},
    {'team': 's345', 'path': '<s345-bucket>'}
]
.stage = 'statistikk'

Return type:

None

Parameters:
  • product (str)

  • process_stage (str)

  • sharing (dict | None)

  • as_of_tz (datetime | None)

  • period_from (datetime | None)

  • period_to (datetime | None)
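
A hedged sketch of a snapshot call. Team codes, bucket paths and the product name are placeholders, and the SeriesType import is an assumption; note that the docstring example above passes sharing as a list of dicts even though the parameter is annotated as dict.

from datetime import datetime, timezone

from ssb_timeseries.io import FileSystem
from ssb_timeseries.properties import SeriesType  # assumed import path

fs = FileSystem("example_set", SeriesType.simple())  # hypothetical dataset

# Placeholder team codes and bucket paths, mirroring the docstring example.
fs.snapshot(
    product="example-product",      # hypothetical product name
    process_stage="statistikk",
    sharing=[
        {"team": "s123", "path": "<s123-bucket>"},
        {"team": "s234", "path": "<s234-bucket>"},
    ],
    as_of_tz=datetime(2024, 1, 1, tzinfo=timezone.utc),
)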

snapshot_directory(product, process_stage='statistikk')

Get name of snapshot directory.

Uses dataset parameters, configuration, product and process stage.

Return type:

str | PathLike[str]

Parameters:
  • product (str)

  • process_stage (str)

snapshot_filename(product, process_stage, as_of_utc=None, period_from='', period_to='')

Get full path of snapshot file.

Uses dataset parameters, configuration, product, process stage and as-of time. Relies on snapshot_directory() to get the directory name.

Return type:

str | PathLike[str]

Parameters:
  • product (str)

  • process_stage (str)

  • as_of_utc (datetime | None)

  • period_from (str)

  • period_to (str)

property type_path: str

All sets of the same data type are stored in the same sub directory under the timeseries root.

write_data(new, tags=None)

Write data to the filesystem. If versioning is AS_OF, write to a new file. If versioning is NONE, write to the existing file.

Return type:

None

Parameters:
  • new (DataFrame)

  • tags (dict | None)

write_metadata(meta)

Write tags to the metadata file.

Return type:

None

Parameters:

meta (dict)

class SearchResult(name: str, type_directory: str)

Bases: NamedTuple

Result item for search.

Create new instance of SearchResult(name, type_directory)

Parameters:
  • name (str)

  • type_directory (str)

name: str

Alias for field number 0

type_directory: str

Alias for field number 1

find_datasets(pattern='', exclude='metadata')

Search for files under the timeseries root.

Return type:

list[SearchResult]

Parameters:
  • pattern (str | PathLike[str])

  • exclude (str)
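
A short sketch of searching for datasets. The pattern value is a placeholder; whether it is interpreted as a glob or a substring should be checked against the implementation.

from ssb_timeseries.io import find_datasets, list_datasets

# '*example*' is a placeholder pattern; result items are SearchResult tuples.
for found in find_datasets(pattern="*example*"):
    print(found.name, found.type_directory)

everything = list_datasets()  # all datasets under the timeseries root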

find_metadata_files(repository=None, pattern='', contains='', equals='')

Search for metadata json files in the 'catalog' directory.

Only one of the arguments 'contains' or 'equals' can be provided at a time. If neither is provided, all files are returned.

Return type:

list[str]

Parameters:
  • repository (list[str | PathLike[str]] | str | PathLike[str] | None)

  • pattern (str)

  • contains (str)

  • equals (str)

for_all_datasets_move_metadata_files(pattern='')

Search for files under the timeseries root.

Return type:

list[SearchResult]

Parameters:

pattern (str | PathLike[str])

list_datasets()

List all datasets under timeseries root.

Return type:

list[SearchResult]

merge_data(old, new, date_cols)

Merge new data into old data.

Return type:

Table | DataFrame | DataFrame

Parameters:
  • old (Table | DataFrame | DataFrame)

  • new (Table | DataFrame | DataFrame)

  • date_cols (set[str])
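
A hedged sketch of a merge call with pandas frames. The docstring does not state which side wins for overlapping dates, so that behaviour should be verified before relying on it.

import pandas as pd

from ssb_timeseries.io import merge_data

old = pd.DataFrame(
    {"valid_at": pd.to_datetime(["2024-01-31", "2024-02-29"]), "series_a": [1.0, 2.0]}
)
new = pd.DataFrame(
    {"valid_at": pd.to_datetime(["2024-02-29", "2024-03-31"]), "series_a": [2.5, 3.0]}
)

# date_cols names the columns that identify a row in time; precedence for the
# overlapping 2024-02-29 row is not documented above.
merged = merge_data(old, new, date_cols={"valid_at"})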

parquet_schema(data_type, meta)

Dataset-specific helper: translate tags to parquet schema metadata before the generic 'write_parquet' call.

Return type:

Schema | None

Parameters:
  • data_type (SeriesType)

  • meta (dict[str, Any])

tags_from_json(dict_with_json_string, byte_encoded=True)

Reverse ‘tags_to_json()’: return tag dict from dict that has been coerced to bytes.

Mutliple dict fields into a single field: {json: <json-string>}. May or may not have been byte encoded.

Return type:

dict

Parameters:
  • dict_with_json_string (dict)

  • byte_encoded (bool)

tags_from_json_file(file_or_files)

Read one or more json files.

Return type:

dict[str, dict[str, str | list[str]] | dict[str, dict[str, str | list[str]]]] | list[dict[str, dict[str, str | list[str]] | dict[str, dict[str, str | list[str]]]]]

Parameters:

file_or_files (str | PathLike[str] | list[str | PathLike[str]])

tags_to_json(x)

Turn tag dict into a dict where keys and values are coercible to bytes.

See: https://arrow.apache.org/docs/python/generated/pyarrow.schema.html

The simple solution is to put it all into a single field: {json: <json-string>}

Return type:

dict[str, str]

Parameters:

x (dict[str, str | list[str]])
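
A round-trip sketch for the two helpers. The tag dict is illustrative, and byte_encoded=False is assumed to be the right flag when the value has not been round-tripped through parquet metadata (where it would be bytes).

from ssb_timeseries.io import tags_from_json, tags_to_json

tags = {"name": "example_set", "series": ["series_a", "series_b"]}  # illustrative

packed = tags_to_json(tags)  # everything collapsed into {json: <json-string>}

restored = tags_from_json(packed, byte_encoded=False)
# restored is expected to equal the original tag dict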

version_from_file_name(file_name, pattern='as_of', group=2)

For known name patterns, extract version marker.

Return type:

str

Parameters:
  • file_name (str)

  • pattern (str | Versioning)

  • group (int)