ssb_timeseries.io
The IO module provides abstractions for READ and WRITE operations so that Dataset does not have to care about the mechanics.
TO DO: turn Dataset.io into a Protocol class?
- Essential configs:
TIMESERIES_CONFIG: str = os.environ.get("TIMESERIES_CONFIG")
CONFIG = config.Config(configuration_file=TIMESERIES_CONFIG)
- Default configs may be created by running
poetry run timeseries-config {home | jovyan | gcs}
See config module docs for details.
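As a hedged illustration (the path below is a placeholder, and the import path of the config module is assumed from the package layout), the configuration file can be pointed to before the package is used:

    import os

    # Hypothetical location of a configuration file; substitute your own.
    os.environ["TIMESERIES_CONFIG"] = "/home/user/timeseries_config.json"

    from ssb_timeseries import config  # assumed import path for the config module

    CONFIG = config.Config(configuration_file=os.environ["TIMESERIES_CONFIG"])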
- exception DatasetIoException¶
Bases:
Exception
Exception for dataset io errors.
- class FileSystem(set_name, set_type, as_of_utc=None, process_stage='statistikk', sharing=None)¶
Bases:
object
A filesystem abstraction for Dataset IO.
Initialise filesystem abstraction for dataset.
Calculate directory structure based on dataset type and name.
- Parameters:
set_name (str)
set_type (SeriesType)
as_of_utc (datetime | None)
process_stage (str)
sharing (dict | None)
- __init__(set_name, set_type, as_of_utc=None, process_stage='statistikk', sharing=None)¶
Initialise filesystem abstraction for dataset.
Calculate directory structure based on dataset type and name.
- Parameters:
set_name (str)
set_type (SeriesType)
as_of_utc (datetime | None)
process_stage (str)
sharing (dict | None)
- Return type:
None
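A minimal construction sketch (the import path and factory method for SeriesType are assumptions, not stated on this page; the dataset name is hypothetical). The resulting fs instance is reused in the sketches below:

    from datetime import datetime, timezone

    from ssb_timeseries.io import FileSystem
    from ssb_timeseries.properties import SeriesType  # assumed import path

    fs = FileSystem(
        set_name="example_set",                # hypothetical dataset name
        set_type=SeriesType.simple(),          # assumed SeriesType factory
        as_of_utc=datetime.now(timezone.utc),
    )
    print(fs.root)           # basis for all other paths
    print(fs.data_fullpath)  # full path to the data file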
- property data_dir: str¶
The data directory for the dataset. This is a subdirectory under the type path.
- property data_file: str¶
The name of the data file for the dataset.
- property data_fullpath: str¶
The full path to the data file.
- datafile_exists()¶
Check if the data file exists.
- Return type:
bool
- classmethod dir(*args, **kwargs)¶
Check that target directory is under BUCKET. If so, create it if it does not exist.
- Return type:
str
- Parameters:
args (str)
kwargs (bool)
- last_version_number_by_regex(directory, pattern='*')¶
Check directory and get max version number from files matching regex pattern.
- Return type:
str
- Parameters:
directory (str)
pattern (str)
- list_versions(file_pattern='*', pattern='as_of')¶
Check data directory and list version marker ('as-of' or 'name') of data files.
- Return type:
list[str | datetime]
- Parameters:
file_pattern (str)
pattern (str | Versioning)
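For illustration, reusing the fs instance from the sketch above and the documented defaults:

    # List version markers for data files; 'as_of' versioned sets yield
    # datetimes, other versioning schemes yield strings.
    versions = fs.list_versions(file_pattern="*", pattern="as_of")
    for v in versions:
        print(v)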
- property metadata_dir: str¶
The location of the metadata file for the dataset.
In the initial implementation, with data and metadata in separate files, it made sense for this to be the same as the data directory. However, a future version will most likely change this approach and store metadata as header information in the data file, with the same information kept in a central metadata directory.
- property metadata_file: str¶
The name of the metadata file for the dataset.
- property metadata_fullpath: str¶
The full path to the metadata file.
- metadatafile_exists()¶
Check if the metadata file exists.
- Return type:
bool
- parquet_schema(meta)¶
Dataset specific helper: translate tags to parquet schema metadata before the generic call 'write_parquet'.
- Return type:
Schema | None
- Parameters:
meta (dict[str, Any])
- parquet_schema_from_df(df)¶
Dataset specific helper: translate tags to parquet schema metadata before the generic call 'write_parquet'.
- Return type:
Schema | None
- Parameters:
df (DataFrame)
- read_data(interval=<function Interval.all>)¶
Read data from the filesystem. Return empty dataframe if not found.
- Return type:
DataFrame
- Parameters:
interval (Interval)
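Reading with the documented default interval, again using the fs instance from the earlier sketch:

    # Returns an empty DataFrame if no data file exists yet.
    df = fs.read_data()
    print(len(df))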
- read_metadata()¶
Read tags from the metadata file.
- Return type:
dict
- property root: str¶
The root path is the basis for all other paths.
- save(meta, data=None)¶
Save data and metadata to disk.
- Return type:
None
- Parameters:
meta (dict)
data (DataFrame)
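A hedged sketch of persisting data and metadata together (the tag keys are hypothetical):

    meta = {"name": "example_set", "series": ["x", "y"]}  # hypothetical tags

    fs.save(meta=meta, data=df)  # write both metadata and data
    fs.save(meta=meta)           # metadata only; data defaults to None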
- property set_type_dir: str¶
Under the time series root there is a directory for each data type. Names concatenate the constituents of the type: temporality and versioning.
- sharing_directory(bucket)¶
Get name of sharing directory based on dataset parameters and configuration.
Creates the directory if it does not exist.
- Return type:
str | PathLike[str]
- Parameters:
bucket (str)
- snapshot(product, process_stage, sharing=None, as_of_tz=None, period_from=None, period_to=None)¶
Copies snapshots to bucket(s) according to processing stage and sharing configuration.
For this to work, .stage and .sharing configurations should be set for the dataset, e.g.:
.sharing = [
    {'team': 's123', 'path': '<s1234-bucket>'},
    {'team': 's234', 'path': '<s234-bucket>'},
    {'team': 's345', 'path': '<s345-bucket>'},
]
.stage = 'statistikk'
- Return type:
None
- Parameters:
product (str)
process_stage (str)
sharing (dict | None)
as_of_tz (datetime | None)
period_from (datetime | None)
period_to (datetime | None)
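A hedged sketch of a snapshot call (the product name, bucket path and timestamp are placeholders; the sharing structure follows the example above):

    from datetime import datetime, timezone

    fs.snapshot(
        product="example_product",  # hypothetical product name
        process_stage="statistikk",
        sharing=[{"team": "s123", "path": "<s123-bucket>"}],  # placeholder bucket
        as_of_tz=datetime.now(timezone.utc),
    )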
- snapshot_directory(product, process_stage='statistikk')¶
Get name of snapshot directory.
Uses dataset parameters, configuration, product and process stage.
- Return type:
str | PathLike[str]
- Parameters:
product (str)
process_stage (str)
- snapshot_filename(product, process_stage, as_of_utc=None, period_from='', period_to='')¶
Get full path of snapshot file.
Uses dataset parameters, configuration, product, process stage and as-of time. Relies on snapshot_directory() to get the directory name first.
- Return type:
str | PathLike[str]
- Parameters:
product (str)
process_stage (str)
as_of_utc (datetime | None)
period_from (str)
period_to (str)
- property type_path: str¶
All sets of the same data type are stored in the same subdirectory under the timeseries root.
- write_data(new, tags=None)¶
Write data to the filesystem. If versioning is AS_OF, write to a new file. If versioning is NONE, write to the existing file.
- Return type:
None
- Parameters:
new (DataFrame)
tags (dict | None)
- write_metadata(meta)¶
Write tags to the metadata file.
- Return type:
None
- Parameters:
meta (dict)
- class SearchResult(name: str, type_directory: str)¶
Bases:
NamedTuple
Result item for search.
Create new instance of SearchResult(name, type_directory)
- Parameters:
name (str)
type_directory (str)
- name: str¶
Alias for field number 0
- type_directory: str¶
Alias for field number 1
- find_datasets(pattern='', exclude='metadata')¶
Search for files under the timeseries root.
- Return type:
list[SearchResult]
- Parameters:
pattern (str | PathLike[str])
exclude (str)
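An illustrative search (the pattern is a placeholder):

    from ssb_timeseries.io import find_datasets

    for result in find_datasets(pattern="example"):
        print(result.name, result.type_directory)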
- find_metadata_files(repository=None, pattern='', contains='', equals='')¶
Search for metadata json files in the 'catalog' directory.
Only one of the arguments 'contains' or 'equals' can be provided at the same time. If neither is provided, all files are returned.
- Return type:
list[str]
- Parameters:
repository (list[str | PathLike[str]] | str | PathLike[str] | None)
pattern (str)
contains (str)
equals (str)
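An illustrative call (the search term is a placeholder):

    from ssb_timeseries.io import find_metadata_files

    # Return metadata json files matching the 'contains' criterion;
    # 'contains' and 'equals' are mutually exclusive.
    files = find_metadata_files(contains="example_set")
    print(files)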
- for_all_datasets_move_metadata_files(pattern='')¶
Search for files under the timeseries root.
- Return type:
list[SearchResult]
- Parameters:
pattern (str | PathLike[str])
- list_datasets()¶
List all datasets under timeseries root.
- Return type:
list[SearchResult]
- merge_data(old, new, date_cols)¶
Merge new data into old data.
- Return type:
Table | DataFrame | DataFrame
- Parameters:
old (Table | DataFrame | DataFrame)
new (Table | DataFrame | DataFrame)
date_cols (set[str])
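A hedged sketch with pandas frames (the column names are hypothetical; Arrow tables are also accepted per the signature above):

    import pandas as pd

    from ssb_timeseries.io import merge_data

    old = pd.DataFrame({"valid_at": pd.to_datetime(["2024-01-01"]), "x": [1.0]})
    new = pd.DataFrame({"valid_at": pd.to_datetime(["2024-02-01"]), "x": [2.0]})

    # Combine old and new rows, keyed on the date column(s).
    merged = merge_data(old, new, date_cols={"valid_at"})
    print(merged)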
- parquet_schema(data_type, meta)¶
Dataset specific helper: translate tags to parquet schema metadata before the generic call 'write_parquet'.
- Return type:
Schema | None
- Parameters:
data_type (SeriesType)
meta (dict[str, Any])
- tags_from_json(dict_with_json_string, byte_encoded=True)¶
Reverse 'tags_to_json()': return the tag dict from a dict that has been coerced to bytes.
Multiple dict fields are collapsed into a single field: {json: <json-string>}. The value may or may not have been byte encoded.
- Return type:
dict
- Parameters:
dict_with_json_string (dict)
byte_encoded (bool)
- tags_from_json_file(file_or_files)¶
Read one or more json files.
- Return type:
dict
[str
,dict
[str
,str
|list
[str
]] |dict
[str
,dict
[str
,str
|list
[str
]]]] |list
[dict
[str
,dict
[str
,str
|list
[str
]] |dict
[str
,dict
[str
,str
|list
[str
]]]]]- Parameters:
file_or_files (str | PathLike[str] | list[str | PathLike[str]])
- tags_to_json(x)¶
Turn tag dict into a dict where keys and values are coercible to bytes.
See: https://arrow.apache.org/docs/python/generated/pyarrow.schema.html
The simple solution is to put it all into a single field: {json: <json-string>}
- Return type:
dict[str, str]
- Parameters:
x (dict[str, str | list[str]])
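A hedged round-trip with tags_from_json() above (the tag keys and values are hypothetical, and whether the intermediate value is byte encoded here is an assumption):

    from ssb_timeseries.io import tags_from_json, tags_to_json

    tags = {"name": "example_set", "series": ["x", "y"]}  # hypothetical tags

    encoded = tags_to_json(tags)  # collapsed into a single {json: <json-string>} field
    decoded = tags_from_json(encoded, byte_encoded=False)  # assumes plain strings here
    print(decoded)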
- version_from_file_name(file_name, pattern='as_of', group=2)¶
For known name patterns, extract version marker.
- Return type:
str
- Parameters:
file_name (str)
pattern (str | Versioning)
group (int)
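An illustrative call; the file name below is made up and may not match the library's actual naming convention:

    from ssb_timeseries.io import version_from_file_name

    # Hypothetical file name with an 'as_of' style version marker.
    version = version_from_file_name(
        "example_set_v_2024-01-01_data.parquet",
        pattern="as_of",
    )
    print(version)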