Skip to content

state_utils

Utils for getting and setting the state.

StateObjectHandler

Handles operations around persisting the state.

Source code in dapla_team_cli/pr/state/state_utils.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class StateObjectHandler:
    """Handles operations around persisting the state."""

    def __init__(self, bucket_name: str, user_project: str) -> None:
        """Initializes the class."""
        self.bucket_name = bucket_name
        self.user_project = user_project
        self.client: Optional[storage.Client] = None
        self.bucket: Optional[storage.Bucket] = None
        self.current_state: Optional[State] = None

    def get_client(self) -> storage.Client:
        """Return a GCS storage client."""
        if self.client is None:
            self.client = storage.Client()
        return self.client

    def get_bucket(self) -> storage.Bucket:
        """Return a bucket object."""
        if self.bucket is None:
            self.bucket = self.get_client().bucket(bucket_name=self.bucket_name, user_project=self.user_project)
        return self.bucket

    def get_user_state(self) -> Optional[State]:
        """Prompt the user to select a run from those available and return that run as a state object."""
        state_object_name = self.user_select_run(show_other_users=True)
        if not state_object_name:
            return None

        return self.fetch_state(state_object_name=state_object_name)

    def fetch_state(self, state_object_name: Optional[str] = None) -> State:
        """Fetch a state object and deserialize it."""
        if not state_object_name:
            state_object_name = self.user_select_run()

        blob = self.get_bucket().blob(state_object_name)
        json_data = json.loads(blob.download_as_string())
        self.current_state = State(**json_data)
        return self.current_state

    def set_state(self, state: State) -> None:
        """Serialize the state object and write it to a json file in persistent storage."""
        self.current_state = state
        blob = self.get_bucket().blob(f"{state.name}.json")
        blob.upload_from_string(state.json())

    def list_blobs(self, show_other_users: bool) -> list[str]:
        """Fetch a list of blobs from a bucket."""
        blobs = self.get_bucket().list_blobs()
        sorted_blobs = sorted(blobs, key=lambda b: b.updated, reverse=True)
        batch_blobs = [f"{blob.name}, {blob.updated.replace(microsecond=0)}" for blob in sorted_blobs]

        if not show_other_users:
            result = subprocess.run(
                ["gcloud", "config", "list", "account", "--format", "value(core.account)"], stdout=subprocess.PIPE, check=True
            )  # noqa: S603
            run_invoker = result.stdout.decode().strip().replace("@ssb.no", "")
            batch_blobs = [blob_data for blob_data in batch_blobs if run_invoker in blob_data]

        return batch_blobs

    def user_select_run(self, show_other_users: bool = False) -> StateObjectName:
        """Prompt the user to select a run from a list of available options."""
        batch_blobs = self.list_blobs(show_other_users)

        if batch_blobs is None or len(batch_blobs) == 0:
            print("No state-files were found. Please create a new state-file with 'dpteam batch ready'.")
            sys.exit(1)

        if len(batch_blobs) == 1:
            state_object = batch_blobs[0]
        else:
            state_object = questionary.select(
                "Which run do you want to perform this action on?",
                choices=batch_blobs + [questionary.Separator(), questionary.Choice(FormattedText([("bold fg:ansired", CANCEL)]))],  # type: ignore
            ).ask()

        if state_object in [None, CANCEL]:
            return None

        state_object_name = state_object.split(",")[0]
        print(f"[blue] Using state file with name {state_object_name}")
        return str(state_object_name)

__init__(bucket_name, user_project)

Initializes the class.

Source code in dapla_team_cli/pr/state/state_utils.py
26
27
28
29
30
31
32
def __init__(self, bucket_name: str, user_project: str) -> None:
    """Initializes the class."""
    self.bucket_name = bucket_name
    self.user_project = user_project
    self.client: Optional[storage.Client] = None
    self.bucket: Optional[storage.Bucket] = None
    self.current_state: Optional[State] = None

fetch_state(state_object_name=None)

Fetch a state object and deserialize it.

Source code in dapla_team_cli/pr/state/state_utils.py
54
55
56
57
58
59
60
61
62
def fetch_state(self, state_object_name: Optional[str] = None) -> State:
    """Fetch a state object and deserialize it."""
    if not state_object_name:
        state_object_name = self.user_select_run()

    blob = self.get_bucket().blob(state_object_name)
    json_data = json.loads(blob.download_as_string())
    self.current_state = State(**json_data)
    return self.current_state

get_bucket()

Return a bucket object.

Source code in dapla_team_cli/pr/state/state_utils.py
40
41
42
43
44
def get_bucket(self) -> storage.Bucket:
    """Return a bucket object."""
    if self.bucket is None:
        self.bucket = self.get_client().bucket(bucket_name=self.bucket_name, user_project=self.user_project)
    return self.bucket

get_client()

Return a GCS storage client.

Source code in dapla_team_cli/pr/state/state_utils.py
34
35
36
37
38
def get_client(self) -> storage.Client:
    """Return a GCS storage client."""
    if self.client is None:
        self.client = storage.Client()
    return self.client

get_user_state()

Prompt the user to select a run from those available and return that run as a state object.

Source code in dapla_team_cli/pr/state/state_utils.py
46
47
48
49
50
51
52
def get_user_state(self) -> Optional[State]:
    """Prompt the user to select a run from those available and return that run as a state object."""
    state_object_name = self.user_select_run(show_other_users=True)
    if not state_object_name:
        return None

    return self.fetch_state(state_object_name=state_object_name)

list_blobs(show_other_users)

Fetch a list of blobs from a bucket.

Source code in dapla_team_cli/pr/state/state_utils.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def list_blobs(self, show_other_users: bool) -> list[str]:
    """Fetch a list of blobs from a bucket."""
    blobs = self.get_bucket().list_blobs()
    sorted_blobs = sorted(blobs, key=lambda b: b.updated, reverse=True)
    batch_blobs = [f"{blob.name}, {blob.updated.replace(microsecond=0)}" for blob in sorted_blobs]

    if not show_other_users:
        result = subprocess.run(
            ["gcloud", "config", "list", "account", "--format", "value(core.account)"], stdout=subprocess.PIPE, check=True
        )  # noqa: S603
        run_invoker = result.stdout.decode().strip().replace("@ssb.no", "")
        batch_blobs = [blob_data for blob_data in batch_blobs if run_invoker in blob_data]

    return batch_blobs

set_state(state)

Serialize the state object and write it to a json file in persistent storage.

Source code in dapla_team_cli/pr/state/state_utils.py
64
65
66
67
68
def set_state(self, state: State) -> None:
    """Serialize the state object and write it to a json file in persistent storage."""
    self.current_state = state
    blob = self.get_bucket().blob(f"{state.name}.json")
    blob.upload_from_string(state.json())

user_select_run(show_other_users=False)

Prompt the user to select a run from a list of available options.

Source code in dapla_team_cli/pr/state/state_utils.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def user_select_run(self, show_other_users: bool = False) -> StateObjectName:
    """Prompt the user to select a run from a list of available options."""
    batch_blobs = self.list_blobs(show_other_users)

    if batch_blobs is None or len(batch_blobs) == 0:
        print("No state-files were found. Please create a new state-file with 'dpteam batch ready'.")
        sys.exit(1)

    if len(batch_blobs) == 1:
        state_object = batch_blobs[0]
    else:
        state_object = questionary.select(
            "Which run do you want to perform this action on?",
            choices=batch_blobs + [questionary.Separator(), questionary.Choice(FormattedText([("bold fg:ansired", CANCEL)]))],  # type: ignore
        ).ask()

    if state_object in [None, CANCEL]:
        return None

    state_object_name = state_object.split(",")[0]
    print(f"[blue] Using state file with name {state_object_name}")
    return str(state_object_name)