Skip to content


Credentials block for authenticating with Snowflake.


InvalidPemFormat (Exception)

Invalid PEM Format Certificate

Source code in prefect_snowflake/
class InvalidPemFormat(Exception):
    """Raised when a private key is not a valid PEM-formatted certificate."""

SnowflakeCredentials (CredentialsBlock) pydantic-model

Block used to manage authentication with Snowflake.


Name Type Description Default
account str

The snowflake account name.

user str

The user name used to authenticate.

password SecretStr

The password used to authenticate.

private_key SecretStr

The PEM used to authenticate.

authenticator str

The type of authenticator to use for initializing connection (oauth, externalbrowser, etc); refer to Snowflake documentation for details, and note that externalbrowser will only work in an environment where a browser is available.

token SecretStr

The OAuth or JWT token to provide when authenticator is set to oauth.

endpoint str

The Okta endpoint to use when authenticator is set to okta_endpoint, e.g. https://<okta_account_name>

role str

The name of the default role to use.

autocommit bool

Whether to automatically commit.



Load stored Snowflake credentials:

from prefect_snowflake import SnowflakeCredentials

snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

Source code in prefect_snowflake/
class SnowflakeCredentials(CredentialsBlock):
    Block used to manage authentication with Snowflake.

        account (str): The snowflake account name.
        user (str): The user name used to authenticate.
        password (SecretStr): The password used to authenticate.
        private_key (SecretStr): The PEM used to authenticate.
        authenticator (str): The type of authenticator to use for initializing
            connection (oauth, externalbrowser, etc); refer to
            [Snowflake documentation](
            for details, and note that `externalbrowser` will only
            work in an environment where a browser is available.
        token (SecretStr): The OAuth or JWT Token to provide when
            authenticator is set to OAuth.
        endpoint (str): The Okta endpoint to use when authenticator is
            set to `okta_endpoint`, e.g. `https://<okta_account_name>`.
        role (str): The name of the default role to use.
        autocommit (bool): Whether to automatically commit.

        Load stored Snowflake credentials:
        from prefect_snowflake import SnowflakeCredentials

        snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")
    """  # noqa E501

    _block_type_name = "Snowflake Credentials"
    _logo_url = ""  # noqa
    _documentation_url = ""  # noqa

    account: str = Field(
        ..., description="The snowflake account name.", example=""
    user: str = Field(..., description="The user name used to authenticate.")
    password: Optional[SecretStr] = Field(
        default=None, description="The password used to authenticate."
    private_key: Optional[SecretBytes] = Field(
        default=None, description="The PEM used to authenticate."
    private_key_path: Optional[Path] = Field(
        default=None, description="The path to the private key."
    private_key_passphrase: Optional[SecretStr] = Field(
        default=None, description="The password to use for the private key."
    authenticator: Literal[
    ] = Field(  # noqa
        description=("The type of authenticator to use for initializing connection."),
    token: Optional[SecretStr] = Field(
            "The OAuth or JWT Token to provide when authenticator is set to `oauth`."
    endpoint: Optional[str] = Field(
            "The Okta endpoint to use when authenticator is set to `okta_endpoint`."
    role: Optional[str] = Field(
        default=None, description="The name of the default role to use."
    autocommit: Optional[bool] = Field(
        default=None, description="Whether to automatically commit."

    def _validate_auth_kwargs(cls, values):
        Ensure an authorization value has been provided by the user.
        auth_params = (
        if not any(values.get(param) for param in auth_params):
            auth_str = ", ".join(auth_params)
            raise ValueError(
                f"One of the authentication keys must be provided: {auth_str}\n"
        elif values.get("private_key") and values.get("private_key_path"):
            raise ValueError(
                "Do not provide both private_key and private_key_path; select one."
        elif values.get("password") and values.get("private_key_passphrase"):
            raise ValueError(
                "Do not provide both password and private_key_passphrase; "
                "specify private_key_passphrase only instead."
        return values

    def _validate_token_kwargs(cls, values):
        Ensure an authorization value has been provided by the user.
        authenticator = values.get("authenticator")
        token = values.get("token")
        if authenticator == "oauth" and not token:
            raise ValueError(
                "If authenticator is set to `oauth`, `token` must be provided"
        return values

    def _validate_okta_kwargs(cls, values):
        Ensure an authorization value has been provided by the user.
        authenticator = values.get("authenticator")

        # did not want to make a breaking change so we will allow both
        # see
        if "okta_endpoint" in values.keys():
                "Please specify `endpoint` instead of `okta_endpoint`; "
                "`okta_endpoint` will be removed March 31, 2023.",
            # remove okta endpoint from fields
            okta_endpoint = values.pop("okta_endpoint")
            if "endpoint" not in values.keys():
                values["endpoint"] = okta_endpoint

        endpoint = values.get("endpoint")
        if authenticator == "okta_endpoint" and not endpoint:
            raise ValueError(
                "If authenticator is set to `okta_endpoint`, "
                "`endpoint` must be provided"
        return values

    def resolve_private_key(self) -> Optional[bytes]:
        Converts a PEM encoded private key into a DER binary key.

            DER encoded key if private_key has been provided otherwise returns None.

            InvalidPemFormat: If private key is not in PEM format.
        if self.private_key_path is None and self.private_key is None:
            return None
        elif self.private_key_path:
            private_key = self.private_key_path.read_bytes()
            private_key = self._decode_secret(self.private_key)

        if self.private_key_passphrase is not None:
            password = self._decode_secret(self.private_key_passphrase)
        elif self.password is not None:
                "Using the password field for private_key is deprecated "
                "and will not work after March 31, 2023; please use "
                "private_key_passphrase instead",
            password = self._decode_secret(self.password)
            password = None

        composed_private_key = self._compose_pem(private_key)
        return load_pem_private_key(

    @staticmethod
    def _decode_secret(secret: Union[SecretStr, SecretBytes]) -> Optional[bytes]:
        """
        Decode the provided secret into bytes. If the secret is not a
        string or bytes, or it is empty or whitespace, then return None.

        Declared as a static method because it takes no instance argument
        while being invoked as `self._decode_secret(...)`.

        Args:
            secret: The value to decode.

        Returns:
            The decoded secret as bytes, or None when there is nothing usable
            to decode.
        """
        # unwrap pydantic secret containers to their raw str/bytes payload
        if isinstance(secret, (SecretBytes, SecretStr)):
            secret = secret.get_secret_value()

        if not isinstance(secret, (bytes, str)) or not secret or secret.isspace():
            return None

        return secret if isinstance(secret, bytes) else secret.encode()

    @staticmethod
    def _compose_pem(private_key: bytes) -> bytes:
        """Validate structure of PEM certificate.

        The original key passed from Prefect is sometimes malformed.
        This function recomposes the key into a valid key that will
        pass the serialization step when resolving the key to a DER.

        Declared as a static method because it takes no instance argument
        while being invoked as `self._compose_pem(...)`.

        Args:
            private_key: A valid PEM format byte encoded string.

        Returns:
            byte encoded certificate.

        Raises:
            InvalidPemFormat: if private key is an invalid format.
        """
        pem_parts = re.match(_SIMPLE_PEM_CERTIFICATE_REGEX, private_key.decode())
        if pem_parts is None:
            raise InvalidPemFormat()

        # collapse every run of whitespace in the body into a single newline
        body = "\n".join(re.split(r"\s+", pem_parts[2].strip()))
        # reassemble header+body+footer
        return f"{pem_parts[1]}\n{body}\n{pem_parts[3]}".encode()

    def get_client(
        self, **connect_kwargs: Any
    ) -> snowflake.connector.SnowflakeConnection:
        """
        Returns an authenticated connection that can be used to query
        Snowflake databases.

        Any additional arguments passed to this method will be used to configure
        the SnowflakeConnection. For available parameters, please refer to the
        Snowflake Python connector documentation.

        Args:
            **connect_kwargs: Additional arguments to pass to
                `snowflake.connector.connect`.

        Returns:
            An authenticated Snowflake connection.

        Example:
            Get Snowflake connection with only block configuration:
            ```python
            from prefect_snowflake import SnowflakeCredentials

            snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

            connection = snowflake_credentials_block.get_client()
            ```

            Get Snowflake connector scoped to a specified database:
            ```python
            from prefect_snowflake import SnowflakeCredentials

            snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

            connection = snowflake_credentials_block.get_client(database="my_database")
            ```
        """  # noqa
        connect_params = {
            # required to track task's usage in the Snowflake Partner Network Portal
            "application": "Prefect_Snowflake_Collection",
            **self.dict(exclude_unset=True, exclude={"block_type_slug"}),
            # forward caller-supplied options; listed last so they override
            # the values stored on the block
            **connect_kwargs,
        }

        # replace secret wrappers with their raw values (keys are unchanged,
        # so assigning while iterating is safe)
        for key, value in connect_params.items():
            if isinstance(value, SecretField):
                connect_params[key] = value.get_secret_value()

        # set authenticator to the actual okta_endpoint
        if connect_params.get("authenticator") == "okta_endpoint":
            endpoint = connect_params.pop("endpoint", None) or connect_params.pop(
                "okta_endpoint", None
            )  # okta_endpoint is deprecated
            connect_params["authenticator"] = endpoint

        private_der_key = self.resolve_private_key()
        if private_der_key is not None:
            # key-pair auth: send the DER key and drop the password-style fields
            connect_params["private_key"] = private_der_key
            connect_params.pop("password", None)
            connect_params.pop("private_key_passphrase", None)

        return snowflake.connector.connect(**connect_params)


account: str pydantic-field required

The snowflake account name.

authenticator: Literal['snowflake', 'snowflake_jwt', 'externalbrowser', 'okta_endpoint', 'oauth', 'username_password_mfa'] pydantic-field

The type of authenticator to use for initializing connection.

autocommit: bool pydantic-field

Whether to automatically commit.

endpoint: str pydantic-field

The Okta endpoint to use when authenticator is set to okta_endpoint.

password: SecretStr pydantic-field

The password used to authenticate.

private_key: SecretBytes pydantic-field

The PEM used to authenticate.

private_key_passphrase: SecretStr pydantic-field

The password to use for the private key.

private_key_path: Path pydantic-field

The path to the private key.

role: str pydantic-field

The name of the default role to use.

token: SecretStr pydantic-field

The OAuth or JWT Token to provide when authenticator is set to oauth.

user: str pydantic-field required

The user name used to authenticate.


__json_encoder__ special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.


Returns an authenticated connection that can be used to query Snowflake databases.

Any additional arguments passed to this method will be used to configure the SnowflakeConnection. For available parameters, please refer to the Snowflake Python connector documentation.


Name Type Description Default
**connect_kwargs Any

Additional arguments to pass to snowflake.connector.connect.



Type Description

An authenticated Snowflake connection.


Get Snowflake connection with only block configuration:

from prefect_snowflake import SnowflakeCredentials

snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

connection = snowflake_credentials_block.get_client()

Get Snowflake connector scoped to a specified database:

from prefect_snowflake import SnowflakeCredentials

snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

connection = snowflake_credentials_block.get_client(database="my_database")

Source code in prefect_snowflake/
def get_client(
    self, **connect_kwargs: Any
) -> snowflake.connector.SnowflakeConnection:
    """
    Returns an authenticated connection that can be used to query
    Snowflake databases.

    Any additional arguments passed to this method will be used to configure
    the SnowflakeConnection. For available parameters, please refer to the
    Snowflake Python connector documentation.

    Args:
        **connect_kwargs: Additional arguments to pass to
            `snowflake.connector.connect`.

    Returns:
        An authenticated Snowflake connection.

    Example:
        Get Snowflake connection with only block configuration:
        ```python
        from prefect_snowflake import SnowflakeCredentials

        snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

        connection = snowflake_credentials_block.get_client()
        ```

        Get Snowflake connector scoped to a specified database:
        ```python
        from prefect_snowflake import SnowflakeCredentials

        snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

        connection = snowflake_credentials_block.get_client(database="my_database")
        ```
    """  # noqa
    connect_params = {
        # required to track task's usage in the Snowflake Partner Network Portal
        "application": "Prefect_Snowflake_Collection",
        **self.dict(exclude_unset=True, exclude={"block_type_slug"}),
        # forward caller-supplied options; listed last so they override
        # the values stored on the block
        **connect_kwargs,
    }

    # replace secret wrappers with their raw values (keys are unchanged,
    # so assigning while iterating is safe)
    for key, value in connect_params.items():
        if isinstance(value, SecretField):
            connect_params[key] = value.get_secret_value()

    # set authenticator to the actual okta_endpoint
    if connect_params.get("authenticator") == "okta_endpoint":
        endpoint = connect_params.pop("endpoint", None) or connect_params.pop(
            "okta_endpoint", None
        )  # okta_endpoint is deprecated
        connect_params["authenticator"] = endpoint

    private_der_key = self.resolve_private_key()
    if private_der_key is not None:
        # key-pair auth: send the DER key and drop the password-style fields
        connect_params["private_key"] = private_der_key
        connect_params.pop("password", None)
        connect_params.pop("private_key_passphrase", None)

    return snowflake.connector.connect(**connect_params)

Converts a PEM encoded private key into a DER binary key.


Type Description

Optional[bytes]: DER encoded key if private_key has been provided; otherwise None.


Type Description

InvalidPemFormat: If private key is not in PEM format.

Source code in prefect_snowflake/
def resolve_private_key(self) -> Optional[bytes]:
    Converts a PEM encoded private key into a DER binary key.

        DER encoded key if private_key has been provided otherwise returns None.

        InvalidPemFormat: If private key is not in PEM format.
    if self.private_key_path is None and self.private_key is None:
        return None
    elif self.private_key_path:
        private_key = self.private_key_path.read_bytes()
        private_key = self._decode_secret(self.private_key)

    if self.private_key_passphrase is not None:
        password = self._decode_secret(self.private_key_passphrase)
    elif self.password is not None:
            "Using the password field for private_key is deprecated "
            "and will not work after March 31, 2023; please use "
            "private_key_passphrase instead",
        password = self._decode_secret(self.password)
        password = None

    composed_private_key = self._compose_pem(private_key)
    return load_pem_private_key(