
langchain_prefect.plugins

Module for defining Prefect plugins for langchain.
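For orientation, here is a minimal end-to-end sketch assembled from the examples below. The langchain import path is an assumption (it varies across langchain releases), so adjust it to your installed version.

from langchain.llms import OpenAI

from langchain_prefect.plugins import RecordLLMCalls

with RecordLLMCalls(tags={"quickstart"}):
    llm = OpenAI(temperature=0.9)
    llm(
        "What would be a good company name "
        "for a company that makes carbonated water?"
    )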

Classes

RecordLLMCalls

Bases: ContextDecorator

Context decorator for patching LLM calls with a prefect flow.

Source code in langchain_prefect/plugins.py
class RecordLLMCalls(ContextDecorator):
    """Context decorator for patching LLM calls with a prefect flow."""

    def __init__(self, **decorator_kwargs):
        """Context decorator for patching LLM calls with a prefect flow.

        Args:
            tags: Tags to apply to flow runs created by this context manager.
            flow_kwargs: Keyword arguments to pass to the flow decorator.
            max_prompt_tokens: The maximum number of tokens allowed in a prompt.

        Example:
            Create a flow with `a_custom_tag` upon calling `OpenAI.generate`:

            >>> with RecordLLMCalls(tags={"a_custom_tag"}):
            >>>    llm = OpenAI(temperature=0.9)
            >>>    llm(
            >>>        "What would be a good company name "
            >>>        "for a company that makes carbonated water?"
            >>>    )

            Track many LLM calls when using a langchain agent

            >>> llm = OpenAI(temperature=0)
            >>> tools = load_tools(["llm-math"], llm=llm)
            >>> agent = initialize_agent(tools, llm)

            >>> @flow
            >>> def my_flow():  # noqa: D103
            >>>     agent.run(
            >>>         "How old is the current Dalai Lama? "
            >>>         "What is his age divided by 2 (rounded to the nearest integer)?"
            >>>     )

            >>> with RecordLLMCalls():
            >>>     my_flow()

            Create an async flow upon calling `OpenAI.agenerate`:

            >>> with RecordLLMCalls():
            >>>    llm = OpenAI(temperature=0.9)
            >>>    await llm.agenerate(
            >>>        [
            >>>            "Good name for a company that makes colorful socks?",
            >>>            "Good name for a company that sells carbonated water?",
            >>>        ]
            >>>    )

            Create flow for LLM call and enforce a max number of tokens in the prompt:

            >>> with RecordLLMCalls(max_prompt_tokens=100):
            >>>    llm = OpenAI(temperature=0.9)
            >>>    llm(
            >>>        "What would be a good company name "
            >>>        "for a company that makes carbonated water?"
            >>>    )
        """
        self.decorator_kwargs = decorator_kwargs

    def __enter__(self):
        """Called when entering the context manager.

        This is what would need to be changed if Langchain started making
        LLM api calls in a different place.
        """
        self.patched_methods = []
        for subcls in BaseLanguageModel.__subclasses__():
            if subcls.__name__ == "BaseChatModel":
                for subsubcls in subcls.__subclasses__():
                    # patch `BaseChatModel` generate methods when used as callable
                    self._patch_method(subsubcls, "_generate", record_llm_call)
                    self._patch_method(subsubcls, "_agenerate", record_llm_call)

            self._patch_method(subcls, "generate", record_llm_call)
            self._patch_method(subcls, "agenerate", record_llm_call)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Reset methods when exiting the context manager."""
        for cls, method_name, original_method in self.patched_methods:
            setattr(cls, method_name, original_method)

    def _patch_method(self, cls, method_name, decorator):
        """Patch a method on a class with a decorator."""
        original_method = getattr(cls, method_name)
        modified_method = decorator(original_method, **self.decorator_kwargs)
        setattr(cls, method_name, modified_method)
        self.patched_methods.append((cls, method_name, original_method))
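Because RecordLLMCalls inherits from contextlib.ContextDecorator, it should also be usable as a function decorator, entering and exiting the patching context around each call of the decorated function. A minimal sketch (the ask helper is hypothetical):

from langchain_prefect.plugins import RecordLLMCalls

@RecordLLMCalls(tags={"decorated"})
def ask(llm, question):
    # every LLM call made inside this function is recorded as a Prefect flow run
    return llm(question)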

Functions

__enter__

Called when entering the context manager.

This is what would need to change if LangChain started making LLM API calls in a different place.

Source code in langchain_prefect/plugins.py
def __enter__(self):
    """Called when entering the context manager.

    This is what would need to be changed if Langchain started making
    LLM api calls in a different place.
    """
    self.patched_methods = []
    for subcls in BaseLanguageModel.__subclasses__():
        if subcls.__name__ == "BaseChatModel":
            for subsubcls in subcls.__subclasses__():
                # patch `BaseChatModel` generate methods when used as callable
                self._patch_method(subsubcls, "_generate", record_llm_call)
                self._patch_method(subsubcls, "_agenerate", record_llm_call)

        self._patch_method(subcls, "generate", record_llm_call)
        self._patch_method(subcls, "agenerate", record_llm_call)
__exit__

Reset methods when exiting the context manager.

Source code in langchain_prefect/plugins.py
135
136
137
138
def __exit__(self, exc_type, exc_val, exc_tb):
    """Reset methods when exiting the context manager."""
    for cls, method_name, original_method in self.patched_methods:
        setattr(cls, method_name, original_method)
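Together, these two methods amount to a temporary monkey patch: __enter__ swaps each generate method for a decorated copy and remembers the original, and __exit__ puts the originals back. A self-contained sketch of that cycle, using a toy class instead of a real langchain model:

patched = []

def patch_method(cls, method_name, decorator):
    """Replace cls.method_name with a decorated copy, remembering the original."""
    original = getattr(cls, method_name)
    setattr(cls, method_name, decorator(original))
    patched.append((cls, method_name, original))

def loud(fn):
    def wrapper(*args, **kwargs):
        print(f"calling {fn.__name__}")
        return fn(*args, **kwargs)
    return wrapper

class ToyLLM:
    def generate(self, prompt):
        return f"echo: {prompt}"

patch_method(ToyLLM, "generate", loud)
ToyLLM().generate("hi")  # prints "calling generate", returns "echo: hi"

for cls, name, original in patched:  # same restore step as __exit__
    setattr(cls, name, original)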
__init__

Context decorator for patching LLM calls with a prefect flow.

Parameters:

Name               Description                                                   Default
tags               Tags to apply to flow runs created by this context manager.  None
flow_kwargs        Keyword arguments to pass to the flow decorator.              None
max_prompt_tokens  The maximum number of tokens allowed in a prompt.             10000 (int(1e4))

All three are optional; they are collected via **decorator_kwargs and forwarded to record_llm_call.
Example

Create a flow with a_custom_tag upon calling OpenAI.generate:

    with RecordLLMCalls(tags={"a_custom_tag"}):
        llm = OpenAI(temperature=0.9)
        llm(
            "What would be a good company name "
            "for a company that makes carbonated water?"
        )

Track many LLM calls when using a langchain agent:

    llm = OpenAI(temperature=0)
    tools = load_tools(["llm-math"], llm=llm)
    agent = initialize_agent(tools, llm)

    @flow
    def my_flow():  # noqa: D103
        agent.run(
            "How old is the current Dalai Lama? "
            "What is his age divided by 2 (rounded to the nearest integer)?"
        )

    with RecordLLMCalls():
        my_flow()

Create an async flow upon calling OpenAI.agenerate:

    with RecordLLMCalls():
        llm = OpenAI(temperature=0.9)
        await llm.agenerate(
            [
                "Good name for a company that makes colorful socks?",
                "Good name for a company that sells carbonated water?",
            ]
        )

Create a flow for an LLM call and enforce a max number of tokens in the prompt:

    with RecordLLMCalls(max_prompt_tokens=100):
        llm = OpenAI(temperature=0.9)
        llm(
            "What would be a good company name "
            "for a company that makes carbonated water?"
        )
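None of the examples above exercise flow_kwargs. A plausible sketch, assuming you want the generated flows to retry and log prints (retries and log_prints are standard Prefect flow arguments; whether they make sense for your LLM calls is up to you):

    with RecordLLMCalls(flow_kwargs={"retries": 2, "log_prints": True}):
        llm = OpenAI(temperature=0.9)
        llm("Good name for a company that sells carbonated water?")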

Source code in langchain_prefect/plugins.py
def __init__(self, **decorator_kwargs):
    """Context decorator for patching LLM calls with a prefect flow.

    Args:
        tags: Tags to apply to flow runs created by this context manager.
        flow_kwargs: Keyword arguments to pass to the flow decorator.
        max_prompt_tokens: The maximum number of tokens allowed in a prompt.

    Example:
        Create a flow with `a_custom_tag` upon calling `OpenAI.generate`:

        >>> with RecordLLMCalls(tags={"a_custom_tag"}):
        >>>    llm = OpenAI(temperature=0.9)
        >>>    llm(
        >>>        "What would be a good company name "
        >>>        "for a company that makes carbonated water?"
        >>>    )

        Track many LLM calls when using a langchain agent

        >>> llm = OpenAI(temperature=0)
        >>> tools = load_tools(["llm-math"], llm=llm)
        >>> agent = initialize_agent(tools, llm)

        >>> @flow
        >>> def my_flow():  # noqa: D103
        >>>     agent.run(
        >>>         "How old is the current Dalai Lama? "
        >>>         "What is his age divided by 2 (rounded to the nearest integer)?"
        >>>     )

        >>> with RecordLLMCalls():
        >>>     my_flow()

        Create an async flow upon calling `OpenAI.agenerate`:

        >>> with RecordLLMCalls():
        >>>    llm = OpenAI(temperature=0.9)
        >>>    await llm.agenerate(
        >>>        [
        >>>            "Good name for a company that makes colorful socks?",
        >>>            "Good name for a company that sells carbonated water?",
        >>>        ]
        >>>    )

        Create flow for LLM call and enforce a max number of tokens in the prompt:

        >>> with RecordLLMCalls(max_prompt_tokens=100):
        >>>    llm = OpenAI(temperature=0.9)
        >>>    llm(
        >>>        "What would be a good company name "
        >>>        "for a company that makes carbonated water?"
        >>>    )
    """
    self.decorator_kwargs = decorator_kwargs

Functions

record_llm_call

Decorator for wrapping a Langchain LLM call with a prefect flow.

Source code in langchain_prefect/plugins.py
def record_llm_call(
    func: Callable[..., LLMResult],
    tags: set | None = None,
    max_prompt_tokens: int | None = int(1e4),
    flow_kwargs: dict | None = None,
) -> Callable[..., Flow]:
    """Decorator for wrapping a Langchain LLM call with a prefect flow."""

    tags = tags or set()

    @wraps(func)
    def wrapper(*args, **kwargs):
        """wrapper for LLM calls"""
        invocation_artifact = llm_invocation_summary(
            invocation_fn=func, *args, **kwargs
        )

        llm_endpoint = invocation_artifact.content["llm_endpoint"]
        prompts = invocation_artifact.content["prompts"]

        if max_prompt_tokens and (
            (N := num_tokens(get_prompt_content(prompts))) > max_prompt_tokens
        ):
            raise ValueError(
                f"Prompt is too long: it contains {N} tokens"
                f" and {max_prompt_tokens=}. Did not call {llm_endpoint!r}. "
                "If desired, increase `max_prompt_tokens`."
            )

        llm_generate = flow_wrapped_fn(func, flow_kwargs, *args, **kwargs)

        with prefect_tags(*[llm_endpoint, *tags]):
            return llm_generate.with_options(
                flow_run_name=f"Calling {llm_endpoint}"  # noqa: E501
            )(llm_input=invocation_artifact)

    return wrapper
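RecordLLMCalls applies this decorator for you through _patch_method; if you wanted to wrap a single method by hand, the equivalent call would look roughly like this (illustrative only, and the OpenAI import path is an assumption):

from langchain.llms import OpenAI

from langchain_prefect.plugins import record_llm_call

original = OpenAI.generate
OpenAI.generate = record_llm_call(original, tags={"manual"}, max_prompt_tokens=500)

# ... every OpenAI.generate call now runs inside a Prefect flow ...

OpenAI.generate = original  # restore the unpatched method when done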