Skip to content

prefect_gcp.secret_manager

Classes

GcpSecret

Bases: SecretBlock

Manages a secret in Google Cloud Platform's Secret Manager.

Attributes:

Name Type Description
gcp_credentials GcpCredentials

Credentials to use for authentication with GCP.

secret_name str

Name of the secret to manage.

secret_version str

Version number of the secret to use, or "latest".

Source code in prefect_gcp/secret_manager.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
class GcpSecret(SecretBlock):
    """
    Manages a secret in Google Cloud Platform's Secret Manager.

    Attributes:
        gcp_credentials: Credentials to use for authentication with GCP.
        secret_name: Name of the secret to manage.
        secret_version: Version number of the secret to use, or "latest".
    """

    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/10424e311932e31c477ac2b9ef3d53cefbaad708-250x250.png"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-gcp/secret_manager/#prefect_gcp.secret_manager.GcpSecret"  # noqa: E501

    gcp_credentials: GcpCredentials
    secret_name: str = Field(default=..., description="Name of the secret to manage.")
    secret_version: str = Field(
        default="latest", description="Version number of the secret to use."
    )

    @sync_compatible
    async def read_secret(self) -> bytes:
        """
        Reads the secret data from the secret storage service.

        Returns:
            The secret data as bytes.
        """
        client = self.gcp_credentials.get_secret_manager_client()
        project = self.gcp_credentials.project
        name = f"projects/{project}/secrets/{self.secret_name}/versions/{self.secret_version}"  # noqa
        request = AccessSecretVersionRequest(name=name)

        self.logger.debug(f"Preparing to read secret data from {name!r}.")
        response = await run_sync_in_worker_thread(
            client.access_secret_version, request=request
        )
        secret = response.payload.data
        self.logger.info(f"The secret {name!r} data was successfully read.")
        return secret

    @sync_compatible
    async def write_secret(self, secret_data: bytes) -> str:
        """
        Writes the secret data to the secret storage service; if it doesn't exist
        it will be created.

        Args:
            secret_data: The secret to write.

        Returns:
            The path that the secret was written to.
        """
        client = self.gcp_credentials.get_secret_manager_client()
        project = self.gcp_credentials.project
        parent = f"projects/{project}/secrets/{self.secret_name}"
        payload = SecretPayload(data=secret_data)
        add_request = AddSecretVersionRequest(parent=parent, payload=payload)

        self.logger.debug(f"Preparing to write secret data to {parent!r}.")
        try:
            response = await run_sync_in_worker_thread(
                client.add_secret_version, request=add_request
            )
        except NotFound:
            self.logger.info(
                f"The secret {parent!r} does not exist yet, creating it now."
            )
            create_parent = f"projects/{project}"
            secret_id = self.secret_name
            secret = Secret(replication=Replication(automatic=Replication.Automatic()))
            create_request = CreateSecretRequest(
                parent=create_parent, secret_id=secret_id, secret=secret
            )
            await run_sync_in_worker_thread(
                client.create_secret, request=create_request
            )

            self.logger.debug(f"Preparing to write secret data to {parent!r} again.")
            response = await run_sync_in_worker_thread(
                client.add_secret_version, request=add_request
            )

        self.logger.info(f"The secret data was written successfully to {parent!r}.")
        return response.name

    @sync_compatible
    async def delete_secret(self) -> str:
        """
        Deletes the secret from the secret storage service.

        Returns:
            The path that the secret was deleted from.
        """
        client = self.gcp_credentials.get_secret_manager_client()
        project = self.gcp_credentials.project

        name = f"projects/{project}/secrets/{self.secret_name}"
        request = DeleteSecretRequest(name=name)

        self.logger.debug(f"Preparing to delete the secret {name!r}.")
        await run_sync_in_worker_thread(client.delete_secret, request=request)
        self.logger.info(f"The secret {name!r} was successfully deleted.")
        return name

Functions

delete_secret async

Deletes the secret from the secret storage service.

Returns:

Type Description
str

The path that the secret was deleted from.

Source code in prefect_gcp/secret_manager.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
@sync_compatible
async def delete_secret(self) -> str:
    """
    Deletes the secret from the secret storage service.

    Returns:
        The path that the secret was deleted from.
    """
    client = self.gcp_credentials.get_secret_manager_client()
    project = self.gcp_credentials.project

    name = f"projects/{project}/secrets/{self.secret_name}"
    request = DeleteSecretRequest(name=name)

    self.logger.debug(f"Preparing to delete the secret {name!r}.")
    await run_sync_in_worker_thread(client.delete_secret, request=request)
    self.logger.info(f"The secret {name!r} was successfully deleted.")
    return name
read_secret async

Reads the secret data from the secret storage service.

Returns:

Type Description
bytes

The secret data as bytes.

Source code in prefect_gcp/secret_manager.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
@sync_compatible
async def read_secret(self) -> bytes:
    """
    Reads the secret data from the secret storage service.

    Returns:
        The secret data as bytes.
    """
    client = self.gcp_credentials.get_secret_manager_client()
    project = self.gcp_credentials.project
    name = f"projects/{project}/secrets/{self.secret_name}/versions/{self.secret_version}"  # noqa
    request = AccessSecretVersionRequest(name=name)

    self.logger.debug(f"Preparing to read secret data from {name!r}.")
    response = await run_sync_in_worker_thread(
        client.access_secret_version, request=request
    )
    secret = response.payload.data
    self.logger.info(f"The secret {name!r} data was successfully read.")
    return secret
write_secret async

Writes the secret data to the secret storage service; if it doesn't exist it will be created.

Parameters:

Name Type Description Default
secret_data bytes

The secret to write.

required

Returns:

Type Description
str

The path that the secret was written to.

Source code in prefect_gcp/secret_manager.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@sync_compatible
async def write_secret(self, secret_data: bytes) -> str:
    """
    Writes the secret data to the secret storage service; if it doesn't exist
    it will be created.

    Args:
        secret_data: The secret to write.

    Returns:
        The path that the secret was written to.
    """
    client = self.gcp_credentials.get_secret_manager_client()
    project = self.gcp_credentials.project
    parent = f"projects/{project}/secrets/{self.secret_name}"
    payload = SecretPayload(data=secret_data)
    add_request = AddSecretVersionRequest(parent=parent, payload=payload)

    self.logger.debug(f"Preparing to write secret data to {parent!r}.")
    try:
        response = await run_sync_in_worker_thread(
            client.add_secret_version, request=add_request
        )
    except NotFound:
        self.logger.info(
            f"The secret {parent!r} does not exist yet, creating it now."
        )
        create_parent = f"projects/{project}"
        secret_id = self.secret_name
        secret = Secret(replication=Replication(automatic=Replication.Automatic()))
        create_request = CreateSecretRequest(
            parent=create_parent, secret_id=secret_id, secret=secret
        )
        await run_sync_in_worker_thread(
            client.create_secret, request=create_request
        )

        self.logger.debug(f"Preparing to write secret data to {parent!r} again.")
        response = await run_sync_in_worker_thread(
            client.add_secret_version, request=add_request
        )

    self.logger.info(f"The secret data was written successfully to {parent!r}.")
    return response.name

Functions

create_secret async

Creates a secret in Google Cloud Platform's Secret Manager.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to retrieve.

required
gcp_credentials GcpCredentials

Credentials to use for authentication with GCP.

required
timeout float

The number of seconds the transport should wait for the server response.

60
project Optional[str]

Name of the project to use; overrides the gcp_credentials project if provided.

None

Returns:

Type Description
str

The path of the created secret.

Example
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.secret_manager import create_secret

@flow()
def example_cloud_storage_create_secret_flow():
    gcp_credentials = GcpCredentials(project="project")
    secret_path = create_secret("secret_name", gcp_credentials)
    return secret_path

example_cloud_storage_create_secret_flow()
Source code in prefect_gcp/secret_manager.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@task
async def create_secret(
    secret_name: str,
    gcp_credentials: "GcpCredentials",
    timeout: float = 60,
    project: Optional[str] = None,
) -> str:
    """
    Creates a secret in Google Cloud Platform's Secret Manager.

    Args:
        secret_name: Name of the secret to retrieve.
        gcp_credentials: Credentials to use for authentication with GCP.
        timeout: The number of seconds the transport should wait
            for the server response.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.

    Returns:
        The path of the created secret.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.secret_manager import create_secret

        @flow()
        def example_cloud_storage_create_secret_flow():
            gcp_credentials = GcpCredentials(project="project")
            secret_path = create_secret("secret_name", gcp_credentials)
            return secret_path

        example_cloud_storage_create_secret_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Creating the %s secret", secret_name)

    client = gcp_credentials.get_secret_manager_client()
    project = project or gcp_credentials.project

    parent = f"projects/{project}"
    secret_settings = {"replication": {"automatic": {}}}

    partial_create = partial(
        client.create_secret,
        parent=parent,
        secret_id=secret_name,
        secret=secret_settings,
        timeout=timeout,
    )
    response = await to_thread.run_sync(partial_create)
    return response.name

delete_secret async

Deletes the specified secret from Google Cloud Platform's Secret Manager.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to delete.

required
gcp_credentials GcpCredentials

Credentials to use for authentication with GCP.

required
timeout float

The number of seconds the transport should wait for the server response.

60
project Optional[str]

Name of the project to use; overrides the gcp_credentials project if provided.

None

Returns:

Type Description
str

The path of the deleted secret.

Example
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.secret_manager import delete_secret

@flow()
def example_cloud_storage_delete_secret_flow():
    gcp_credentials = GcpCredentials(project="project")
    secret_path = delete_secret("secret_name", gcp_credentials)
    return secret_path

example_cloud_storage_delete_secret_flow()
Source code in prefect_gcp/secret_manager.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
@task
async def delete_secret(
    secret_name: str,
    gcp_credentials: "GcpCredentials",
    timeout: float = 60,
    project: Optional[str] = None,
) -> str:
    """
    Deletes the specified secret from Google Cloud Platform's Secret Manager.

    Args:
        secret_name: Name of the secret to delete.
        gcp_credentials: Credentials to use for authentication with GCP.
        timeout: The number of seconds the transport should wait
            for the server response.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.

    Returns:
        The path of the deleted secret.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.secret_manager import delete_secret

        @flow()
        def example_cloud_storage_delete_secret_flow():
            gcp_credentials = GcpCredentials(project="project")
            secret_path = delete_secret("secret_name", gcp_credentials)
            return secret_path

        example_cloud_storage_delete_secret_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Deleting %s secret", secret_name)

    client = gcp_credentials.get_secret_manager_client()
    project = project or gcp_credentials.project

    name = f"projects/{project}/secrets/{secret_name}/"
    partial_delete = partial(client.delete_secret, name=name, timeout=timeout)
    await to_thread.run_sync(partial_delete)
    return name

delete_secret_version async

Deletes a version of a given secret from Google Cloud Platform's Secret Manager.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to retrieve.

required
version_id int

Version number of the secret to use; "latest" can NOT be used.

required
gcp_credentials GcpCredentials

Credentials to use for authentication with GCP.

required
timeout float

The number of seconds the transport should wait for the server response.

60
project Optional[str]

Name of the project to use; overrides the gcp_credentials project if provided.

None

Returns:

Type Description
str

The path of the deleted secret version.

Example
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.secret_manager import delete_secret_version

@flow()
def example_cloud_storage_delete_secret_version_flow():
    gcp_credentials = GcpCredentials(project="project")
    secret_value = delete_secret_version("secret_name", 1, gcp_credentials)
    return secret_value

example_cloud_storage_delete_secret_version_flow()
Source code in prefect_gcp/secret_manager.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
@task
async def delete_secret_version(
    secret_name: str,
    version_id: int,
    gcp_credentials: "GcpCredentials",
    timeout: float = 60,
    project: Optional[str] = None,
) -> str:
    """
    Deletes a version of a given secret from Google Cloud Platform's Secret Manager.

    Args:
        secret_name: Name of the secret to retrieve.
        version_id: Version number of the secret to use; "latest" can NOT be used.
        gcp_credentials: Credentials to use for authentication with GCP.
        timeout: The number of seconds the transport should wait
            for the server response.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.

    Returns:
        The path of the deleted secret version.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.secret_manager import delete_secret_version

        @flow()
        def example_cloud_storage_delete_secret_version_flow():
            gcp_credentials = GcpCredentials(project="project")
            secret_value = delete_secret_version("secret_name", 1, gcp_credentials)
            return secret_value

        example_cloud_storage_delete_secret_version_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Reading %s version of %s secret", version_id, secret_name)

    client = gcp_credentials.get_secret_manager_client()
    project = project or gcp_credentials.project

    if version_id == "latest":
        raise ValueError("The version_id cannot be 'latest'")

    name = f"projects/{project}/secrets/{secret_name}/versions/{version_id}"
    partial_destroy = partial(client.destroy_secret_version, name=name, timeout=timeout)
    await to_thread.run_sync(partial_destroy)
    return name

read_secret async

Reads the value of a given secret from Google Cloud Platform's Secret Manager.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to retrieve.

required
gcp_credentials GcpCredentials

Credentials to use for authentication with GCP.

required
timeout float

The number of seconds the transport should wait for the server response.

60
project Optional[str]

Name of the project to use; overrides the gcp_credentials project if provided.

None

Returns:

Type Description
str

Contents of the specified secret.

Example
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.secret_manager import read_secret

@flow()
def example_cloud_storage_read_secret_flow():
    gcp_credentials = GcpCredentials(project="project")
    secret_value = read_secret("secret_name", gcp_credentials, version_id=1)
    return secret_value

example_cloud_storage_read_secret_flow()
Source code in prefect_gcp/secret_manager.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@task
async def read_secret(
    secret_name: str,
    gcp_credentials: "GcpCredentials",
    version_id: Union[str, int] = "latest",
    timeout: float = 60,
    project: Optional[str] = None,
) -> str:
    """
    Reads the value of a given secret from Google Cloud Platform's Secret Manager.

    Args:
        secret_name: Name of the secret to retrieve.
        gcp_credentials: Credentials to use for authentication with GCP.
        timeout: The number of seconds the transport should wait
            for the server response.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.

    Returns:
        Contents of the specified secret.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.secret_manager import read_secret

        @flow()
        def example_cloud_storage_read_secret_flow():
            gcp_credentials = GcpCredentials(project="project")
            secret_value = read_secret("secret_name", gcp_credentials, version_id=1)
            return secret_value

        example_cloud_storage_read_secret_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Reading %s version of %s secret", version_id, secret_name)

    client = gcp_credentials.get_secret_manager_client()
    project = project or gcp_credentials.project

    name = f"projects/{project}/secrets/{secret_name}/versions/{version_id}"
    partial_access = partial(client.access_secret_version, name=name, timeout=timeout)
    response = await to_thread.run_sync(partial_access)
    secret = response.payload.data.decode("UTF-8")
    return secret

update_secret async

Updates a secret in Google Cloud Platform's Secret Manager.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to retrieve.

required
secret_value Union[str, bytes]

Desired value of the secret. Can be either str or bytes.

required
gcp_credentials GcpCredentials

Credentials to use for authentication with GCP.

required
timeout float

The number of seconds the transport should wait for the server response.

60
project Optional[str]

Name of the project to use; overrides the gcp_credentials project if provided.

None

Returns:

Type Description
str

The path of the updated secret.

Example
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.secret_manager import update_secret

@flow()
def example_cloud_storage_update_secret_flow():
    gcp_credentials = GcpCredentials(project="project")
    secret_path = update_secret("secret_name", "secret_value", gcp_credentials)
    return secret_path

example_cloud_storage_update_secret_flow()
Source code in prefect_gcp/secret_manager.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@task
async def update_secret(
    secret_name: str,
    secret_value: Union[str, bytes],
    gcp_credentials: "GcpCredentials",
    timeout: float = 60,
    project: Optional[str] = None,
) -> str:
    """
    Updates a secret in Google Cloud Platform's Secret Manager.

    Args:
        secret_name: Name of the secret to retrieve.
        secret_value: Desired value of the secret. Can be either `str` or `bytes`.
        gcp_credentials: Credentials to use for authentication with GCP.
        timeout: The number of seconds the transport should wait
            for the server response.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.

    Returns:
        The path of the updated secret.

    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.secret_manager import update_secret

        @flow()
        def example_cloud_storage_update_secret_flow():
            gcp_credentials = GcpCredentials(project="project")
            secret_path = update_secret("secret_name", "secret_value", gcp_credentials)
            return secret_path

        example_cloud_storage_update_secret_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Updating the %s secret", secret_name)

    client = gcp_credentials.get_secret_manager_client()
    project = project or gcp_credentials.project

    parent = f"projects/{project}/secrets/{secret_name}"
    if isinstance(secret_value, str):
        secret_value = secret_value.encode("UTF-8")
    partial_add = partial(
        client.add_secret_version,
        parent=parent,
        payload={"data": secret_value},
        timeout=timeout,
    )
    response = await to_thread.run_sync(partial_add)
    return response.name