prefect_kubernetes.utilities

Utilities for working with the Python Kubernetes API.

Functions

convert_manifest_to_model

Recursively converts a dict representation of a Kubernetes resource into the corresponding Python client model, including the nested models that compose it, according to the openapi_types of the class named by v1_model_name.

If manifest is a path-like object with a .yaml or .yml extension, it will be treated as a path to a Kubernetes resource manifest and loaded into a dict.

Parameters:

Name | Type | Description | Default
manifest | Union[Path, str, KubernetesManifest] | A path to a Kubernetes resource manifest or its dict representation. | required
v1_model_name | str | The name of a Kubernetes client model to convert the manifest to. | required

Returns:

Type | Description
V1KubernetesModel | A populated instance of a Kubernetes client model with type v1_model_name, or None if manifest is empty.

Raises:

Type | Description
ValueError | If v1_model_name is not a valid Kubernetes client model name.
ValueError | If manifest is path-like and does not end in .yaml or .yml.
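
The input handling described above can be sketched in isolation. This is a minimal illustration, not the library's code: `classify_manifest` is a hypothetical helper that only reports which branch an input would take, where the real function would return None, load the file, or convert the dict.

```python
from pathlib import Path
from typing import Any, Dict, Union


def classify_manifest(manifest: Union[Path, str, Dict[str, Any], None]) -> str:
    """Hypothetical helper: report how a manifest argument would be treated."""
    if not manifest:
        # Empty or missing manifests short-circuit (the real function returns None).
        return "empty"
    if isinstance(manifest, (Path, str)):
        # Path-like input must end in .yaml or .yml, otherwise it is rejected.
        if not str(manifest).endswith((".yaml", ".yml")):
            raise ValueError("Manifest must be a valid dict or path to a .yaml file.")
        return "path"
    # Anything else is assumed to already be the dict representation.
    return "dict"
```

Under these assumptions, `classify_manifest("job.yml")` takes the path branch, while `classify_manifest("job.json")` raises ValueError, matching the second entry under Raises.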

Source code in prefect_kubernetes/utilities.py
def convert_manifest_to_model(
    manifest: Union[Path, str, KubernetesManifest], v1_model_name: str
) -> V1KubernetesModel:
    """Recursively converts a `dict` representation of a Kubernetes resource to the
    corresponding Python model containing the Python models that compose it,
    according to the `openapi_types` on the class retrieved with `v1_model_name`.

    If `manifest` is a path-like object with a `.yaml` or `.yml` extension, it will be
    treated as a path to a Kubernetes resource manifest and loaded into a `dict`.

    Args:
        manifest: A path to a Kubernetes resource manifest or its `dict` representation.
        v1_model_name: The name of a Kubernetes client model to convert the manifest to.

    Returns:
        A populated instance of a Kubernetes client model with type `v1_model_name`.

    Raises:
        ValueError: If `v1_model_name` is not a valid Kubernetes client model name.
        ValueError: If `manifest` is path-like and does not end in `.yaml` or `.yml`.
    """
    if not manifest:
        return None

    if not (isinstance(v1_model_name, str) and v1_model_name in set(dir(k8s_models))):
        raise ValueError(
            "`v1_model_name` must be the name of a valid Kubernetes client model, "
            f"received: {v1_model_name!r}"
        )

    if isinstance(manifest, (Path, str)):
        str_path = str(manifest)
        if not str_path.endswith((".yaml", ".yml")):
            raise ValueError(
                "Manifest must be a valid dict or path to a .yaml or .yml file."
            )
        manifest = KubernetesJob.job_from_file(manifest)

    converted_manifest = {}
    v1_model = getattr(k8s_models, v1_model_name)
    valid_supplied_fields = (  # valid and specified fields for current `v1_model_name`
        (k, v)
        for k, v in v1_model.openapi_types.items()
        if v1_model.attribute_map[k] in manifest  # map goes 🐍 -> 🐫, user supplies 🐫
    )

    for field, value_type in valid_supplied_fields:
        if value_type.startswith("V1"):  # field value is another model
            converted_manifest[field] = convert_manifest_to_model(
                manifest[v1_model.attribute_map[field]], value_type
            )
        elif value_type.startswith("list[V1"):  # field value is a list of models
            field_item_type = value_type.replace("list[", "").replace("]", "")
            try:
                converted_manifest[field] = [
                    convert_manifest_to_model(item, field_item_type)
                    for item in manifest[v1_model.attribute_map[field]]
                ]
            except TypeError:
                converted_manifest[field] = manifest[v1_model.attribute_map[field]]
        elif value_type in base_types:  # field value is a primitive Python type
            converted_manifest[field] = manifest[v1_model.attribute_map[field]]

    return v1_model(**converted_manifest)
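
To make the recursion easier to follow, here is a self-contained sketch of the same idea that runs without the Kubernetes client installed. Everything in it is a stand-in: the `V1Demo*` classes, `MODELS`, `BASE_TYPES`, and `to_model` are hypothetical names that only imitate the `openapi_types` and `attribute_map` attributes the real models expose.

```python
from typing import Any, Dict

BASE_TYPES = {"str", "int", "bool"}  # assumed subset of primitive type names


class DemoModel:
    """Hypothetical stand-in for a generated Kubernetes client model."""

    openapi_types: Dict[str, str] = {}
    attribute_map: Dict[str, str] = {}

    def __init__(self, **fields: Any) -> None:
        for name in self.openapi_types:
            setattr(self, name, fields.get(name))


class V1DemoPort(DemoModel):
    openapi_types = {"container_port": "int"}
    attribute_map = {"container_port": "containerPort"}


class V1DemoContainer(DemoModel):
    openapi_types = {"name": "str", "ports": "list[V1DemoPort]"}
    attribute_map = {"name": "name", "ports": "ports"}


class V1DemoSpec(DemoModel):
    openapi_types = {"host_network": "bool", "container": "V1DemoContainer"}
    attribute_map = {"host_network": "hostNetwork", "container": "container"}


MODELS = {cls.__name__: cls for cls in (V1DemoPort, V1DemoContainer, V1DemoSpec)}


def to_model(manifest: Dict[str, Any], model_name: str) -> DemoModel:
    model = MODELS[model_name]
    converted = {}
    for field, type_name in model.openapi_types.items():
        key = model.attribute_map[field]  # snake_case field -> camelCase key
        if key not in manifest:
            continue  # only fields the user supplied are converted
        if type_name.startswith("V1"):  # nested model: recurse
            converted[field] = to_model(manifest[key], type_name)
        elif type_name.startswith("list[V1"):  # list of models: recurse per item
            item_type = type_name[len("list["):-1]
            converted[field] = [to_model(item, item_type) for item in manifest[key]]
        elif type_name in BASE_TYPES:  # primitive: copy as-is
            converted[field] = manifest[key]
    return model(**converted)


spec = to_model(
    {"hostNetwork": True, "container": {"name": "web", "ports": [{"containerPort": 80}]}},
    "V1DemoSpec",
)
```

In this sketch `spec.container` ends up as a `V1DemoContainer` instance and `spec.container.ports` as a list of `V1DemoPort` instances, which is the same shape of result the function above produces for real client models.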

enable_socket_keep_alive

Sets the keep-alive flags on the Kubernetes client object. Neither the kubernetes library nor urllib3, which kubernetes uses internally, offers a way to enable keep-alive messages, so the flags are passed as socket options to the client's underlying connection pool.

Source code in prefect_kubernetes/utilities.py
def enable_socket_keep_alive(client: ApiClient) -> None:
    """
    Sets the keep-alive flags on the Kubernetes client object.
    Neither the kubernetes library nor urllib3, which kubernetes uses
    internally, offers a way to enable keep-alive messages, so the flags are
    passed as socket options to the client's underlying connection pool.
    """

    socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]

    if hasattr(socket, "TCP_KEEPINTVL"):
        socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30))

    if hasattr(socket, "TCP_KEEPCNT"):
        socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6))

    if hasattr(socket, "TCP_KEEPIDLE"):
        socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 6))

    if sys.platform == "darwin":
        # TCP_KEEP_ALIVE not available on socket module in macOS, but defined in
        # https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/netinet/tcp.h#L215
        TCP_KEEP_ALIVE = 0x10
        socket_options.append((socket.IPPROTO_TCP, TCP_KEEP_ALIVE, 30))

    client.rest_client.pool_manager.connection_pool_kw[
        "socket_options"
    ] = socket_options
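
Each entry in `socket_options` is a `(level, option, value)` tuple of the kind accepted by `socket.setsockopt`. As a rough sketch of what these flags do once a connection is opened, the following applies the same kind of list to a plain standard-library socket. The helper names `build_keep_alive_options` and `apply_options` are hypothetical, and the macOS-specific branch is left out for brevity.

```python
import socket
from typing import List, Tuple

SocketOption = Tuple[int, int, int]


def build_keep_alive_options() -> List[SocketOption]:
    """Build keep-alive options, skipping constants the platform lacks."""
    options: List[SocketOption] = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    if hasattr(socket, "TCP_KEEPINTVL"):
        options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30))
    if hasattr(socket, "TCP_KEEPCNT"):
        options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6))
    if hasattr(socket, "TCP_KEEPIDLE"):
        options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 6))
    return options


def apply_options(sock: socket.socket, options: List[SocketOption]) -> None:
    """Apply each (level, option, value) tuple to the socket."""
    for level, name, value in options:
        sock.setsockopt(level, name, value)


# No connection is made; the options are set on an unconnected TCP socket.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    apply_options(sock, build_keep_alive_options())
    keep_alive_enabled = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
```

The `hasattr` guards matter because the `TCP_KEEP*` constants are not defined on every platform, which is also why the function above checks for each one before appending it.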