LangChain#

  1. Basics

  2. RAG

  3. LangGraph

  4. LangGraph Platform

  5. Cognitive Architectures

  6. Agent Architectures

  7. LLM Patterns

  8. Deployment

  9. Prompting Basics


Basics#

  • Interfaces, LLM Output, Component Composition

  • LangChain provides abstractions for each major prompting technique, with wrappers available in Python and JavaScript

  • has integrations with commercial and open source LLM providers

  • prompt templates enable reusing prompts and storing them in the LangChain Hub

Interfaces#

  • Chat Model
    • LLM providers like OpenAI differentiate messages sent to and from the model into roles

    • System role: for instructions the model should use to answer a user question

    • User role: for user’s query and other content produced by the user

    • Assistant role: for content generated by the model

    • temperature: controls the randomness of sampling; lower values produce more predictable output, while higher values suit creative tasks

    • max_tokens: limits the size and cost of output

    • chat models make use of different types of chat message interfaces associated with each role

    • HumanMessage: message sent from human, user role

    • AIMessage: message sent from AI, assistant role

    • SystemMessage: message setting the instructions for AI, system role

    • ChatMessage: message for arbitrary setting of role
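
    • a minimal sketch of these message types in use, assuming ChatOpenAI as the provider:

    from langchain_core.messages import HumanMessage, SystemMessage
    from langchain_openai import ChatOpenAI

    model = ChatOpenAI(model="gpt-3.5-turbo")

    messages = [
        SystemMessage("You are a helpful assistant."),   # system role
        HumanMessage("What is the capital of France?")   # user role
    ]

    response = model.invoke(messages)   # returns an AIMessage (assistant role)
    print(response.content)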

  • LLM
    • takes a string input, sends it to the model provider, and returns the model's completion as output

    • LangChain interacts with LLMs using function calling or traditional prompting

  • Prompt Template
    • allow constructing prompts with dynamic inputs

    • use ChatPromptTemplate for AI chat applications

    from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
    
    template = PromptTemplate.from_template("""Answer the question based on the context
    below.
    
    Context: {context}
    Question: {question}
    Answer: """)
    
    # Using ChatPromptTemplate will associate with roles
    template = ChatPromptTemplate.from_messages([
        ('system', 'Answer the question based on the context below.'),
        ('human', 'Context: {context}'),
        ('human', 'Question: {question}')
    ])
    
    prompt = template.invoke({
        "context": "This is context",
        "question": "Question based on context?"
    })
    
  • Runnable
    • invoke(): single input to output

    • batch(): multiple inputs to multiple outputs

    • stream(): stream output from a single input as it’s produced

    • each method has an asyncio equivalent (ainvoke, abatch, astream)

    • utilities for retries, fallbacks, schemas, and runtime configurability are available

    model.invoke('Hi there!')
    
    model.batch(['Hi there!', 'Bye!'])
    
    for token in model.stream('Bye!'):
        print(token)
    

LLM Output#

  • LLMs can generate output in a specific format, such as JSON, XML, or CSV

  • JSON Output
    • need to define a schema using Pydantic, and include it in the prompt

    • the schema is converted to a JSON Schema object and used to validate the output from the LLM

    from pydantic import BaseModel
    from langchain_openai import ChatOpenAI

    class AnswerWithJustification(BaseModel):
        '''An answer to the user's question along with justification for the answer.'''
        answer: str
        justification: str
    
    llm = ChatOpenAI(model="supported_model", temperature=0)
    
    structured_llm = llm.with_structured_output(AnswerWithJustification)
    msg = structured_llm.invoke("""Question""")
    
  • Output Parsers
    • classes to structure LLM responses

    • can be used to provide output format instructions in the prompt

    • textual output can be rendered to a more structured format

    from langchain_core.output_parsers import CommaSeparatedListOutputParser
    
    parser = CommaSeparatedListOutputParser()
    items = parser.invoke("apple, banana, cherry") # ['apple', 'banana', 'cherry']
    

Component Composition#

  • Imperative Composition
    • calling components directly, e.g. model.invoke()

    • Parallel execution: threads or coroutines in Python, and Promise.all in JavaScript

    • Streaming: using yield

    • Async execution: with async functions

    • useful for writing custom logic

    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.runnables import chain
    from langchain_openai import ChatOpenAI

    template = ChatPromptTemplate.from_messages([
        ('system', 'You are a helpful assistant.'),
        ('human', '{question}')
    ])
    
    model = ChatOpenAI(model="gpt-3.5-turbo")
    
    # combine components in a function
    @chain
    def chatbot(values):
        prompt = template.invoke(values)
        for token in model.stream(prompt):
            yield token
    
    for part in chatbot.stream({"question": "Question"}):
        print(part.content, end=' ')
    
  • Declarative Composition
    • using LCEL (LangChain Expression Language)

    • LCEL compositions are compiled to an optimised execution plan

    • Streaming, Parallel and Async executions are automatic

    • useful for assembling existing components with limited customisation

    template = ChatPromptTemplate.from_messages([
        ('system', 'You are a helpful assistant.'),
        ('human', '{question}')
    ])
    
    model = ChatOpenAI(model="gpt-3.5-turbo")
    
    # combine components with | operator
    chatbot = template | model
    
    for part in chatbot.stream({"question": "Question"}):
        print(part.content, end=' ')
    


RAG#

Data Indexing#

  • indexing is a technique to enhance LLM output by providing context from external sources

  • processing external data source, and storing embeddings in a vector store

  • embed the user's query, retrieve similar documents, and pass them as context in the prompt

  • Retrieving: getting relevant embeddings and data stored in the vector store based on user's query

  • Generation: combining the original prompt with the retrieved relevant documents to produce the answer

  • Ingestion: converting documents into embeddings, and storing in vector store

  • Context Window: the number of input and output tokens an LLM or embedding model can handle

  • Document Loader
    • can load files such as txt, csv, json, Markdown, and integrate with platforms such as Slack and Notion

    • can use WebBaseLoader to load HTML, or PyPDFLoader with pypdf package

    • loaded data is stored in Document objects, and needs to be split into semantically meaningful chunks
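
    • a short example of loading documents; the file path and URL are placeholders, and WebBaseLoader needs the beautifulsoup4 package:

    from langchain_community.document_loaders import TextLoader, WebBaseLoader

    # load a local text file into a list of Document objects
    docs = TextLoader("./notes.txt").load()

    # load and parse an HTML page
    web_docs = WebBaseLoader("https://example.com").load()

    print(web_docs[0].metadata)   # e.g. source URL and page title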

  • RecursiveCharacterTextSplitter
    • can split text based on a list of separators in order

    • default separator order: \n\n (paragraph), \n (line), space (word)

    • split paragraphs that are within the chunk size

    • for paragraphs longer than the chunk size, split by the next separator

    • each chunk is a Document with metadata of the original document

    • can be used for other formats, such as programming languages and Markdown, with the relevant separators

    from langchain_text_splitters import Language, RecursiveCharacterTextSplitter
    from langchain_community.document_loaders import TextLoader
    
    loader = TextLoader("./main.py")
    docs = loader.load()
    
    splitter = RecursiveCharacterTextSplitter.from_language(
        language=Language.PYTHON,
        chunk_size=50,
        chunk_overlap=0
    )
    
    python_docs = splitter.split_documents(docs)
    
  • Embedding
    • converting text into numeric vectors from which the original text cannot be recovered

    • both the text and its embedding are stored, since the embedding is a lossy representation

    • Dense embeddings: low-dimensional vectors with mostly non-zero values

    • Sparse embeddings: high-dimensional vectors with mostly zero values

    • never combine embeddings from different models

    • words or sentences that are close in meaning should be close in the embedding space

    • cosine similarity is usually used for degree of similarity

    • the Embeddings class interfaces with text embedding models and generates vector representations

    • can embed documents and queries

    • embedding multiple documents at the same time is more efficient

    from langchain_openai import OpenAIEmbeddings
    
    model = OpenAIEmbeddings()
    
    embeddings = model.embed_documents([
        "Hi there!",
        "Oh, hello!",
        "What's your name?",
        "My friends call me World",
        "Hello World!"
    ])
    
  • Vector Store
    • database to store vectors and perform similarity calculations over them

    • handle unstructured data, including text and images

    • has capabilities such as multi-tenancy and metadata filtering

    • PostgreSQL can be used as vector store with pgvector extension

    • add_documents(): create embeddings for each document, and store them

    from langchain_core.documents import Document
    from langchain_postgres import PGVector

    connection = 'PostgreSQL_Connection'

    db = PGVector.from_documents(docs, embeddings_model, connection=connection)
    
    db.similarity_search("query", k=N)
    
    db.add_documents(
        [
            Document(
                page_content="Content",
                metadata={"key": "value"}
            )
        ],
        ids=ids
    )
    
    db.delete(ids=['1'])
    
  • Indexing API
    • uses RecordManager to track document writes into the vector store

    • stores document hash, write time, and source ID

    • provides cleanup modes to delete existing documents

    • None: manual clean up of old content

    • Incremental & Full: delete previous versions if the content of the source document or derived documents changes

    • Full: delete any documents not included in documents currently being indexed

    from langchain.indexes import SQLRecordManager, index
    
    collection_name = "my_docs"
    embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small")
    namespace = "my_docs_namespace"
    
    vectorstore = PGVector(
        embeddings=embeddings_model,
        collection_name=collection_name,
        connection=connection,
        use_jsonb=True
    )
    
    record_manager = SQLRecordManager(
        namespace,
        db_url=connection
    )
    
    record_manager.create_schema()
    
    docs = [
        Document(
            page_content="content 1",
            metadata={"id": 1, "source": "source_1.txt"}
        ),
        Document(
            page_content="content 2",
            metadata={"id": 2, "source": "source_2.txt"}
        )
    ]
    
    index_1 = index(
        docs,
        record_manager,
        vectorstore,
        cleanup="incremental",
        source_id_key="source"
    )
    
    print("Index attempt 1: ", index_1)
    
    index_2 = index(
        docs,
        record_manager,
        vectorstore,
        cleanup="incremental",
        source_id_key="source"
    )
    
    # attempting to index again will not add the documents
    print("Index attempt 2: ", index_2)
    
    docs[0].page_content = "modified"
    
    index_3 = index(
        docs,
        record_manager,
        vectorstore,
        cleanup="incremental",
        source_id_key="source"
    )
    
    # new version is written, and all old versions sharing the same source are deleted
    print("Index attempt 3: ", index_3)
    

Indexing Optimisations#

  • MultiVectorRetriever
    • decouples the documents used for retrieval from the documents used for answer synthesis

    • e.g. in a document of text and tables, embed summaries of table elements with an id reference to the full raw table, which is stored in a separate Docstore

    • enables providing the model with the full context to answer the user's question

    import uuid
    from langchain_core.documents import Document
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain.storage import InMemoryStore
    from langchain_postgres import PGVector
    from langchain.retrievers.multi_vector import MultiVectorRetriever
    
    # load the document, split, create embeddings and LLM model
    
    prompt_text = "Summarize the following document:\n\n{doc}"
    
    prompt = ChatPromptTemplate.from_template(prompt_text)
    
    summarise_chain = {
        "doc": lambda x: x.page_content} | prompt | llm | StrOutputParser()
    
    summaries = summarise_chain.batch(chunks, {"max_concurrency": 5})
    
    vectorstore = PGVector(
        embeddings=embeddings_model,
        collection_name=collection_name,
        connection=connection,
        use_jsonb=True
    )
    
    # for parent documents
    store = InMemoryStore()
    id_key = "doc_id"
    
    retriever = MultiVectorRetriever(
        vectorstore=vectorstore,
        docstore=store,
        id_key=id_key
    )
    
    doc_ids = [str(uuid.uuid4()) for _ in chunks]
    
    summary_docs = [
        Document(page_content=s, metadata={id_key: doc_ids[i]})
        for i, s in enumerate(summaries)
    ]
    
    retriever.vectorstore.add_documents(summary_docs)
    
    # store the original documents, linked to summaries via doc_ids
    retriever.docstore.mset(list(zip(doc_ids, chunks)))
    
    # vector store retrieves the summaries
    sub_docs = retriever.vectorstore.similarity_search(
        "topic", k=2)
    
    # retriever return the larger source document chunks
    retrieved_docs = retriever.invoke("topic")
    
  • RAPTOR
    • Recursive Abstractive Processing for Tree-Organised Retrieval

    • create document summaries that capture higher-level concepts, embed and cluster them, and summarise each cluster

    • recursively done to produce a tree of higher-level summaries

    • then the summaries and initial documents are indexed together
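
    • a rough sketch of one level of the RAPTOR recursion, assuming scikit-learn for clustering and existing llm and embeddings_model objects:

    import numpy as np
    from sklearn.cluster import KMeans
    from langchain_core.documents import Document

    def raptor_level(docs, n_clusters=5):
        """Cluster document embeddings and summarise each cluster once."""
        vectors = np.array(embeddings_model.embed_documents(
            [d.page_content for d in docs]))
        labels = KMeans(n_clusters=n_clusters).fit_predict(vectors)

        summaries = []
        for cluster in range(n_clusters):
            text = "\n\n".join(
                d.page_content for d, label in zip(docs, labels) if label == cluster)
            summary = llm.invoke(f"Summarise the following documents:\n\n{text}")
            summaries.append(Document(page_content=summary.content))
        return summaries

    # recurse on the summaries for higher levels, then index the original
    # chunks together with every level of summaries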

  • ColBERT
    • effective embeddings approach for better retrieval

    • generate contextual embeddings for each token in the document and query

    • calculate and score similarity between each query token and all document tokens

    • sum the max similarity score of each query embedding to any of the document embeddings to get a score for each document
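
    • a small NumPy sketch of the MaxSim scoring described above, assuming the token embeddings are already computed and L2-normalised:

    import numpy as np

    def colbert_score(query_embs: np.ndarray, doc_embs: np.ndarray) -> float:
        # query_embs: (n_query_tokens, dim), doc_embs: (n_doc_tokens, dim)
        sim = query_embs @ doc_embs.T          # cosine similarity of every token pair
        return float(sim.max(axis=1).sum())    # best document match per query token, summed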

Query Transformation#

  • modifying user input to be more or less abstract to generate accurate LLM output

  • Rewrite-Retrieve-Read
    • prompts the LLM to rewrite the user’s query before performing retrieval

    • remove irrelevant information in the query with the help of LLM

    • but will add additional latency in the chain due to more LLM calls

    rewrite_prompt = ChatPromptTemplate.from_template("""
    Provide a better search query for web search engine to answer the given
    question, end the queries with '**'. Question: {x} Answer:
    """)
    
    def parse_rewriter_output(message):
        return message.content.strip('"').strip("**")
    
    rewriter = rewrite_prompt | llm | parse_rewriter_output
    
    @chain
    def qa_rrr(input):
        new_query = rewriter.invoke(input)
        docs = retriever.invoke(new_query)
        formatted = prompt.invoke({"context": docs, "question": input})
        answer = llm.invoke(formatted)
        return answer
    
    qa_rrr.invoke("Query with irrelevant information")
    
  • Multi-Query Retrieval
    • tell LLM to generate multiple queries based on the user’s initial one

    • each query is retrieved in parallel and inserted as prompt context for final output

    • useful when a single question may rely on multiple perspectives for an answer

    • should deduplicate documents as single retriever is used with multiple queries

    perspectives_prompt = ChatPromptTemplate.from_template("""
    You are an AI language model assistant. Your task is to generate five
    different versions of the given user question to retrieve relevant documents
    from a vector database. By generating multiple perspectives on the user
    question, your goal is to help the user overcome some of the limitations of
    the distance-based similarity search. Provide these alternative questions
    separated by newlines. Original question: {question}
    """)
    
    def parse_queries_output(message):
        return message.content.split('\n')
    
    query_gen = perspectives_prompt | llm | parse_queries_output
    
    def get_unique_union(document_lists):
        deduped_docs = {
            doc.page_content: doc
            for sublist in document_lists for doc in sublist
        }
    
        return list(deduped_docs.values())
    
    retrieval_chain = query_gen | retriever.batch | get_unique_union
    
    @chain
    def multi_query_qa(input):
        docs = retrieval_chain.invoke(input)
        formatted = prompt.invoke({"context": docs, "question": input})
        ans = llm.invoke(formatted)
        return ans
    
    multi_query_qa.invoke("Question")
    
  • RAG-Fusion
    • similar to the Multi-Query retrieval

    • retrieved documents are re-ranked at the final step with RRF (Reciprocal Rank Fusion) algorithm, pulling the most relevant documents to the top

    • RRF is ideal for combining results from queries with different scales or distributions of scores

    # reuses parse_queries_output and multi_query_qa from the Multi-Query example

    def reciprocal_rank_fusion(results: list[list], k=60):
        fused_scores = {}
        documents = {}

        for docs in results:
            for rank, doc in enumerate(docs):
                doc_str = doc.page_content
                if doc_str not in fused_scores:
                    fused_scores[doc_str] = 0
                    documents[doc_str] = doc

                fused_scores[doc_str] += 1 / (rank + k)

        reranked_doc_strs = sorted(
            fused_scores, key=lambda d: fused_scores[d], reverse=True)

        return [documents[doc_str] for doc_str in reranked_doc_strs]

    prompt_rag_fusion = ChatPromptTemplate.from_template("""
    You are a helpful assistant that generates multiple search queries based on
    a single input query.\n
    Generate multiple search queries related to: {question} \n
    Output (4 queries):
    """)

    query_gen = prompt_rag_fusion | llm | parse_queries_output

    retrieval_chain = query_gen | retriever.batch | reciprocal_rank_fusion

    multi_query_qa.invoke("Question")
    
  • HyDE
    • Hypothetical Document Embeddings

    • create hypothetical document based on user’s query, embed it, and retrieve relevant documents based on vector similarity

    prompt_hyde = ChatPromptTemplate.from_template("""
    Please write a passage to answer the question.\n
    Question: {question} \n
    Passage:
    """)
    
    prompt = ChatPromptTemplate.from_template("""
    Answer the following question based on this context:
    
    {context}
    
    Question: {question}
    """)
    
    generate_doc = prompt_hyde | llm | StrOutputParser()
    
    retrieval_chain = generate_doc | retriever
    
    @chain
    def qa(input):
        docs = retrieval_chain.invoke(input)
        formatted = prompt.invoke({"context": docs, "question": input})
        answer = llm.invoke(formatted)
        return answer
    
    qa.invoke("Question")
    

Query Routing#

  • to forward user’s query to the relevant data source

  • Logical Routing
    • let LLM decide which data source to apply based on the query

    • function-calling models are used to help classify each query

    • need to define a schema that the model can use to generate arguments of a function based on the query

    • extracted data source can be passed into other functions for additional logic

    • suitable when a defined list of data sources is available

    from pydantic import BaseModel, Field
    from typing import Literal
    from langchain_core.runnables import RunnableLambda
    
    class RouteQuery(BaseModel):
        datasource: Literal["source_1", "source_2"] = Field(
            ...,
            description="""Given a user question, choose which datasource would be
            most relevant for answering their question
            """)
    
    def choose_route(result):
        if "source_1" in result.datasource.lower():
            return "chain for source_1"
        else:
            return "chain for source_2"
    
    structured_llm = llm.with_structured_output(RouteQuery)
    
    system = """You are an expert at routing a user question to the appropriate
    data source.
    
    Based on the programming language the question is referring to, route it to
    the relevant data source.
    """
    
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "{question}")
        ]
    )
    
    router = prompt | structured_llm
    
    question = "Question"
    
    # chaining for additional logic
    full_chain = router | RunnableLambda(choose_route)
    
    result = full_chain.invoke({"question": question})
    
  • Semantic Routing
    • embed prompts for the various data sources along with the query, and use vector similarity search to pick the most similar prompt

    from langchain_core.prompts import PromptTemplate
    from langchain.utils.math import cosine_similarity
    
    template_1 = """Template 1
    Here is a question:
    {query}
    """
    
    template_2 = """Template 2
    Here is a question:
    {query}
    """
    
    prompt_templates = [template_1, template_2]
    prompt_embeddings = embedding_model.embed_documents(prompt_templates)
    
    @chain
    def prompt_router(query):
        query_embedding = embedding_model.embed_query(query)
        similarity = cosine_similarity([query_embedding], prompt_embeddings)[0]
        most_similar = prompt_templates[similarity.argmax()]
        return PromptTemplate.from_template(most_similar)
    
    semantic_router = (
        prompt_router
        | llm
        | StrOutputParser()
    )
    
    semantic_router.invoke("Question")
    

Query Construction#

  • convert a natural language query into the query language of the database or data source

  • Text-to-Metadata Filter
    • can attach metadata key-value pairs to vectors in an index during embedding process

    • filter expressions will be used during query

    • SelfQueryRetriever uses LLM to extract and execute relevant metadata filters based on user’s query and predefined metadata schema

    • the retriever sends a query-construction prompt to the LLM, parses the metadata filter and rewritten query from the response, converts the filter into the vector store's syntax, and runs a similarity search against the vector store

    from langchain.chains.query_constructor.schema import AttributeInfo
    from langchain.retrievers.self_query.base import SelfQueryRetriever
    
    fields = [
        AttributeInfo(
            name="NAME",
            description="DESC",
            type="string or list[string]"
        ),
    ]
    
    description = "DESC"
    
    retriever = SelfQueryRetriever.from_llm(llm, db, description, fields)
    
    retriever.invoke("Question")
    
  • Text-to-SQL
    • Database description: provide LLM with accurate description of the database, such as CREATE TABLE description for each table with column names and types, and can also include example rows from the table

    • Few-shot examples: append standard static examples in the prompt to guide the agent on how it should build queries based on questions

    • always run queries with a user with read-only permissions

    • database user running the queries should have access only to the necessary tables

    • add a timeout to queries to protect against expensive queries

    from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool
    from langchain_community.utilities import SQLDatabase
    from langchain.chains.sql_database.query import create_sql_query_chain
    
    db = SQLDatabase.from_uri(connection)
    
    write_query = create_sql_query_chain(llm, db)
    execute_query = QuerySQLDatabaseTool(db=db)
    
    chain = write_query | execute_query
    chain.invoke('Question')
    


LangGraph#

Graph#

  • LangGraph is an open source library by LangChain

  • enables developers to implement multiactor, multistep, and stateful cognitive architectures called graphs

  • State: data received from outside, modified and produced by the app

  • Node: Python or JavaScript functions, receiving current state and returning updated state

  • Edge: connection between nodes, can be fixed path or conditional

  • need to define the state of the graph first

  • state keys without a reducer annotation will be overwritten on each update

  • without explicit instruction, execution stops when there are no more nodes to run

  • graph is compiled into a runnable object

from typing import Annotated, TypedDict
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

builder = StateGraph(State)
llm = ChatOpenAI(model="gpt-3.5-turbo")

def chatbot(state: State):
    answer = llm.invoke(state["messages"])
    return {"messages": [answer]}

builder.add_node("chatbot", chatbot)

builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

graph = builder.compile()

input = {"messages": [HumanMessage('hi!')]}

for chunk in graph.stream(input):
    print(chunk)

Memory#

  • LLMs are stateless; any prior prompt or model response is lost when a new request is made

  • including previous conversations and context in the final prompt can give memory

  • chat history can be stored as a list of messages; append the most recent messages after each turn, and insert them into the prompt for the next request

  • appending the full chat history to the prompt has scalability issues

  • Checkpointer: storage adapter for in-memory, SQLite, Postgres, Redis, and MySQL

  • Thread: also called interaction, auto created when first used

from langgraph.checkpoint.memory import MemorySaver
from langchain_core.runnables.config import RunnableConfig

# stores the state at the end of each step
graph = builder.compile(checkpointer=MemorySaver())

thread_1 = RunnableConfig({"configurable": {"thread_id": "1"}})
result_1 = graph.invoke(
    {"messages": [HumanMessage("hi, my name is Jack!")]}, thread_1
)

result_2 = graph.invoke(
    {"messages": [HumanMessage("what is my name?")]}, thread_1)

Multiactor#

  • application with multiple actors needs a coordination layer to define actors, hand off work, and schedule execution of each actor

  • each actor should help update a single central state

  • with a single central state, a snapshot can be made, execution can be paused and human-in-the-loop control can be implemented
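
  • a minimal sketch of two actors (nodes) updating one central state, where a reducer accumulates their updates instead of overwriting them:

import operator
from typing import Annotated, TypedDict
from langgraph.graph import END, START, StateGraph

class State(TypedDict):
    # operator.add appends each actor's update to the shared list
    findings: Annotated[list[str], operator.add]

def researcher(state: State):
    return {"findings": ["research notes"]}

def reviewer(state: State):
    return {"findings": ["review comments"]}

builder = StateGraph(State)
builder.add_node("researcher", researcher)
builder.add_node("reviewer", reviewer)
builder.add_edge(START, "researcher")
builder.add_edge("researcher", "reviewer")
builder.add_edge("reviewer", END)

graph = builder.compile()
print(graph.invoke({"findings": []}))
# {'findings': ['research notes', 'review comments']}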

Chat History#

  • chat history messages should be kept in a form that lets the model generate accurate responses

  • Trimming Messages
    • limit the number of messages that are retrieved from history and appended to the prompt

    • ideal to load and store the most recent messages

    • trim_messages(): can specify how many tokens to keep or remove from chat history, and has different strategies

    from langchain_core.messages import (AIMessage, HumanMessage, SystemMessage,
                                         trim_messages)
    from langchain_openai import ChatOpenAI
    
    trimmer = trim_messages(
        max_tokens=65,
        strategy="last", # prioritise most recent
        token_counter=ChatOpenAI(model="gpt-4o"), # use tokeniser appropriate to that model
        include_system=True, # keep system message
        allow_partial=False, # to cut the last message's content to fit or not
        start_on="human" # never remove AIMessage without removing corresponding HumanMessage
    )
    
    messages = [
        SystemMessage(content="you're a good assistant"),
        HumanMessage(content="hi! i'm bob"),
        AIMessage(content="hi"),
        HumanMessage(content="i like vanilla ice cream"),
        AIMessage(content="nice"),
        HumanMessage(content="what's 2 + 2?"),
        AIMessage(content="4"),
        HumanMessage(content="thanks"),
        AIMessage(content="no problem!"),
        HumanMessage(content="having fun?"),
        AIMessage(content="yes"),
    ]
    
    trimmer.invoke(messages)
    
  • Filtering Messages
    • filter_messages(): filter by type, ID, or name

    • can also be composed with other components in a chain

    from langchain_core.messages import filter_messages
    
    messages = [
        SystemMessage(content="you're a good assistant", id="1"),
        HumanMessage(content="hi! i'm bob", id="2"),
        AIMessage(content="hi", id="3"),
        HumanMessage(content="i like vanilla ice cream", name="bob", id="4"),
        AIMessage(content="nice", id="5"),
        HumanMessage(content="what's 2 + 2?", name="alice", id="6"),
        AIMessage(content="4", id="7"),
        HumanMessage(content="thanks", name="alice", id="8"),
        AIMessage(content="no problem!", id="9"),
        HumanMessage(content="having fun?", name="bob", id="10"),
        AIMessage(content="yes", id="11"),
    ]
    
    filter_messages(messages, include_types="human")
    
    # declarative use (without a messages argument) returns a Runnable for chaining
    filter_ = filter_messages(
        include_types=[HumanMessage, AIMessage], exclude_ids=["3"])

    chain = filter_ | model
    
  • Merging Consecutive Messages
    • models such as Anthropic chat models do not support consecutive messages of the same type

    • merge_message_runs(): allows merging consecutive messages of the same type

    • if a message's content is a list, the merged content remains a list

    • can also be composed with other components in a chain

    from langchain_core.messages import merge_message_runs
    
    messages = [
        SystemMessage(content="you're a good assistant"),
        SystemMessage(content="you always respond with a joke"),
        HumanMessage(
            [{"type": "text", "text": "hello"}]
        ),
        HumanMessage("world")
    ]
    
    merged = merge_message_runs(messages)

    # SystemMessage(content="you're a good assistant\nyou always respond with a joke"),
    # HumanMessage(content=[{"type": "text", "text": "hello"}, "world"])

    # declarative use (without arguments) returns a Runnable for chaining
    chain = merge_message_runs() | model
    

Subgraphs#

  • graphs that are used as part of another graph

  • to build multi-agent systems, reuse a set of nodes in multiple graphs, and let different teams work on different parts of the graph

  • Direct Subgraph Call
    • adding a node that calls the subgraph directly to the parent

    • both should share state keys to communicate, and do not need to transform state

    • passing extra keys to the subgraph node will be ignored

    • extra keys from the subgraph will be ignored by the parent

    class State(TypedDict):
        foo: str    # shared with subgraph
    
    class SubgraphState(TypedDict):
        foo: str    # shared with parent
        bar: str
    
    def subgraph_node(state: SubgraphState):
        return {"foo": state["foo"] + "bar"}
    
    subgraph_builder = StateGraph(SubgraphState)
    subgraph_builder.add_node(subgraph_node)
    subgraph_builder.add_edge(START, "subgraph_node")
    subgraph = subgraph_builder.compile()

    builder = StateGraph(State)
    builder.add_node("subgraph", subgraph)
    builder.add_edge(START, "subgraph")
    graph = builder.compile()
    
  • Function Subgraph Call
    • adding a node with a function that invokes the subgraph to the parent

    • both with different state schemas

    • function needs to transform parent state to the subgraph state before invoking the subgraph and transform the result back to the parent state before returning

    class State(TypedDict):
        foo: str
    
    class SubgraphState(TypedDict):
        bar: str
        baz: str
    
    def subgraph_node(state: SubgraphState):
        return {"bar": state["bar"] + "baz"}
    
    def node(state: State):
        response = subgraph.invoke({"bar": state["foo"]})
        return {"foo": response["bar"]}
    
    subgraph_builder = StateGraph(SubgraphState)
    subgraph_builder.add_node(subgraph_node)
    subgraph_builder.add_edge(START, "subgraph_node")
    subgraph = subgraph_builder.compile()

    builder = StateGraph(State)
    builder.add_node(node)
    builder.add_edge(START, "node")
    graph = builder.compile()
    

LangGraph Platform#

  • Data Models, Features

  • managed service to deploy and host LangGraph agents

  • horizontally scales task queues, servers, and a Postgres checkpointer for efficiency

  • allows teams to collaborate on deploying and monitoring agentic AI apps

  • LangGraph Studio: to debug, edit and test agents visually, can share agent with team members

Data Models#

  • Assistants
    • configured instance of CompiledGraph

    • has instance-specific configuration and metadata

    • multiple assistants can reference the same graph, but have different configuration and metadata

  • Threads
    • contains the accumulated state of a group of runs

    • checkpoint: the state of a thread at a particular point in time

    • state of the underlying graph of the assistant will be persisted to the thread

    • current and historical state can be retrieved

    • a thread needs to be created before executing a run to persist state

  • Runs
    • invocation of an assistant

    • each run can have its own input, configuration and metadata

    • can be executed on a thread

  • Cron Jobs
    • allow to run graphs on a schedule

    • user must specify schedule, assistant, and input

    • a new thread will be created and given the input to run
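
  • a rough sketch of these data models using the LangGraph Python SDK (the deployment URL and the assistant name "agent" are assumptions; method names follow the langgraph_sdk client):

from langgraph_sdk import get_client

async def run_assistant():
    client = get_client(url="http://localhost:2024")   # assumed deployment URL

    # a thread persists state across runs
    thread = await client.threads.create()

    # a run is one invocation of an assistant on that thread
    run = await client.runs.create(
        thread["thread_id"],
        "agent",   # assumed assistant name/ID configured in the deployment
        input={"messages": [{"role": "human", "content": "hi!"}]}
    )

    # wait for the run to finish and return its result
    return await client.runs.join(thread["thread_id"], run["run_id"])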

Features#

  • Streaming
    • streaming mode determines what data is streamed back to the client

    • Values: stream full state of the graph after each super-step is executed

    • Messages: stream complete messages and tokens, mostly for chat apps, and can only use this mode if graph contains a messages key

    • Updates: stream state updates of the graph after each node execution

    • Events: stream all events during graph execution, can be used to do token-by-token streaming for LLMs

    • Debug: stream debug events during graph execution
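
    • a hedged sketch of choosing a stream mode from the client (same langgraph_sdk client and assistant assumptions as the sketch above):

    async for chunk in client.runs.stream(
        thread["thread_id"],
        "agent",
        input={"messages": [{"role": "human", "content": "hi!"}]},
        stream_mode="values"   # or "messages", "updates", "events", "debug"
    ):
        print(chunk.event, chunk.data)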

  • Human-in-the-loop
    • LangGraph Platform allows human intervention to prevent unwanted outcomes

  • Double Texting
    • Reject: rejects the new input and does not allow double texting

    • Enqueue: completes the first run, then handles the new input as a separate run

    • Interrupt: save and interrupt current execution, and continue to run with new input

    • Rollback: roll back all work and run with new input

  • Stateless Runs
    • takes the input, creates a thread, runs the agent without checkpointing, and cleans up the thread

    • stateless runs are retried while keeping memory intact

    • for background runs, entire run will be retried if the task worker dies halfway

  • Webhooks
    • LangGraph Platform supports completion webhooks


Cognitive Architectures#

Degree of Autonomy#

  • measured by evaluating how much of the app's behaviour is determined by the LLM

  • check whether the LLM decides the output of a step, the next step to take, and which steps are available to take

LLM Call Architecture#

  • a single LLM call only; useful when a larger app makes use of an LLM for one step

builder = StateGraph(State)

builder.add_node("chatbot", lambda state: chatbot(
    state, llm))
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

Chain Architecture#

  • multiple LLM calls in a predefined sequence, also called flow engineering

builder = StateGraph(State, input_schema=Input, output_schema=Output)

builder.add_node("generate_sql", lambda state: generate_sql(
    state, llm_low_temp, generate_prompt))  # type: ignore
builder.add_node("explain_sql", lambda state: explain_sql(
    state, llm_high_temp, explain_prompt))  # type: ignore

builder.add_edge(START, "generate_sql")
builder.add_edge("generate_sql", "explain_sql")
builder.add_edge("explain_sql", END)

Router Architecture#

  • using LLM to define the sequence of steps to take

def router_node(state: State, llm, prompt) -> State:
    user_message = HumanMessage(state["user_query"])
    messages = [prompt, *state["messages"], user_message]
    res = llm.invoke(messages)
    return {
        "domain": res.content,
        "messages": [user_message, res]
    }

def pick_retriever(state: State) -> Literal["retrieve_medical_records",
                                            "retrieve_insurance_faqs"]:
    if state["domain"] == "records":
        return "retrieve_medical_records"
    else:
        return "retrieve_insurance_faqs"

builder = StateGraph(State, input_schema=Input, output_schema=Output)

builder.add_node("router", lambda state: router_node(
    state, llm_low_temp, router_prompt))

builder.add_node("retrieve_medical_records",
                 lambda state: retrieve_medical_records(
                     state, medical_records_retriever))

builder.add_node("retrieve_insurance_faqs",
                 lambda state: retrieve_insurance_faqs(
                     state, insurance_faqs_retriever))

builder.add_node("generate_answer",
                 lambda state: generate_answer(
                     state, llm_high_temp,
                     medical_records_prompt, insurance_faqs_prompt))

builder.add_edge(START, "router")
builder.add_conditional_edges("router", pick_retriever)
builder.add_edge("retrieve_medical_records", "generate_answer")
builder.add_edge("retrieve_insurance_faqs", "generate_answer")
builder.add_edge("generate_answer", END)

Agent Architecture#

  • Standard Agent, Always Tool Calling First, Managing Multiple Tools, Reflection, Multi-agent

  • Agent: something that acts

  • uses an LLM to pick from one or more possible courses of action, given context of current or desired next state

  • implemented by combining Tool Calling and Chain-of-Thought prompting techniques

  • LLM-driven Loop: plan actions and execute, LLM will decide when to stop looping

  • use a conditional edge to implement a loop as it can end the graph

Standard Agent#

  • LLM is always called first to decide a tool, adapting the behaviour to each user query

  • but flexibility can also cause unpredictability

import ast
from typing import Annotated, TypedDict
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import HumanMessage
from langchain_core.runnables import Runnable
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

class State(TypedDict):
    messages: Annotated[list, add_messages]


@tool
def calculator(query: str) -> str:
    """A simple calculator tool, Input should be a mathematical expression."""
    return ast.literal_eval(query)


def llm_node(state: State, llm) -> State:
    res = llm.invoke(state["messages"])
    return {"messages": res}

search = DuckDuckGoSearchRun()
tools = [search, calculator]

llm: Runnable = ChatOpenAI(
    model="gpt-4.1-mini", temperature=0).bind_tools(tools)

builder = StateGraph(State)

builder.add_node("llm", lambda state: llm_node(state, llm))  # type: ignore
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")

graph = builder.compile()

input: State = {
    "messages": [
        HumanMessage("""Question""")
    ]
}

for c in graph.stream(input):
    print(c)

Always Tool Calling First#

  • having a clear rule that certain tool should always be called first

  • can reduce overall latency, and prevent erroneous LLM decision

  • but it can make results worse if there is no clear rule

from uuid import uuid4
from langchain_core.messages import AIMessage, ToolCall

# does not call the LLM, only creates a tool call for the search tool
def first_llm(state: State) -> State:
    query = state["messages"][-1].content
    search_tool_call = ToolCall(name="duckduckgo_search", args={
                                "query": query}, id=uuid4().hex)
    return {
        "messages": AIMessage(content="", tool_calls=[search_tool_call])
    }

builder.add_node("first_llm",
                 lambda state: first_llm(state))  # type: ignore
builder.add_node("llm", lambda state: llm_node(state, llm))  # type: ignore
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "first_llm")
builder.add_edge("first_llm", "tools")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")

Managing Multiple Tools#

  • LLMs struggle to choose the right one when given many tools

  • can use a RAG step to preselect the most relevant tools for current query

  • giving LLM only a subset of tools can reduce cost, but RAG step adds latency

def llm_node(state: State, llm, tools) -> State:
    selected_tools = [
        tool for tool in tools if tool.name in state["selected_tools"]]
    res = llm.bind_tools(selected_tools).invoke(state["messages"])
    return {"messages": res}


def select_tools(state: State, tools_retriever) -> State:
    query = state["messages"][-1].content
    tool_docs = tools_retriever.invoke(query)
    return {
        "selected_tools": [doc.metadata["name"] for doc in tool_docs]
    }

from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
llm: Runnable = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

tools_retriever = InMemoryVectorStore.from_documents(
    [Document(tool.description, metadata={
              "name": tool.name}) for tool in tools],
    embeddings
).as_retriever()

builder = StateGraph(State)

builder.add_node("select_tools", lambda state: select_tools(
    state, tools_retriever))  # type: ignore
builder.add_node("llm", lambda state: llm_node(
    state, llm, tools))  # type: ignore
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "select_tools")
builder.add_edge("select_tools", "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")

Reflection#

  • also called self-critique

  • allowing LLM to analyse past output, including past reflections, and refine it

  • need to have a loop between a creator prompt and a reviser prompt

  • can be combined with other prompting techniques

  • always costs higher latency, but is likely to increase the quality of the final output

def generate(state: State, llm, prompt) -> State:
    ans = llm.invoke([prompt] + state["messages"])
    return {"messages": [ans]}


def reflect(state: State, llm, prompt) -> State:
    # invert the messages
    cls_map = {AIMessage: HumanMessage, HumanMessage: AIMessage}
    translated = [prompt, state["messages"][0]] + [
        cls_map[msg.__class__](content=msg.content) # calling a constructor
        for msg in state["messages"][1:]
    ]
    ans = llm.invoke(translated)
    # treat output as human feedback for generator
    return {"messages": [HumanMessage(content=ans.content)]}


def should_continue(state: State):
    if len(state["messages"]) > 6:
        return END
    else:
        return "reflect"

builder.add_node("generate", lambda state: generate(
    state, llm, generate_prompt))
builder.add_node("reflect", lambda state: reflect(
    state, llm, reflection_prompt))

builder.add_edge(START, "generate")
builder.add_conditional_edges("generate", should_continue, {
    "reflect": "reflect" # only explicit mapping shows on graph image
})
builder.add_edge("reflect", "generate")

Multi-agent#

  • composed of multiple smaller, independent agents

  • prevents an agent with too many tools from making poor decisions

  • an agent can be as simple as a prompt and an LLM call, or as complex as a ReAct agent

  • Network Strategy
    • agents can communicate, and any agent can decide which to call next

  • Hierarchical Strategy
    • system with a supervisor of supervisors

    • for more complex control flows

  • Custom Multi-Agent Workflow
    • each agent communicates with only a subset of agents

    • parts of the flow are deterministic

    • only selected agents can decide which others to call next

  • Supervisor Strategy
    • each agent communicates with the supervisor agent

    • supervisor decides which agent to call next

    • supervisor agent can be an LLM call with tools

    • a subagent can be its own graph with internal state that only outputs a summary of its work

    • each subagent can decide whether to return its output directly to the user
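
    • a minimal supervisor sketch: an LLM with structured output picks the next agent and a conditional edge routes there (llm, supervisor_prompt, and the two subagents are assumed to exist):

    from typing import Annotated, Literal, TypedDict
    from pydantic import BaseModel
    from langgraph.graph import END, START, StateGraph
    from langgraph.graph.message import add_messages

    class NextAgent(BaseModel):
        next: Literal["researcher", "writer", "__end__"]

    class State(TypedDict):
        messages: Annotated[list, add_messages]
        next: str

    def supervisor(state: State):
        decision = llm.with_structured_output(NextAgent).invoke(
            [supervisor_prompt] + state["messages"])
        return {"next": decision.next}

    def route(state: State):
        return END if state["next"] == "__end__" else state["next"]

    builder = StateGraph(State)
    builder.add_node("supervisor", supervisor)
    builder.add_node("researcher", researcher_agent)   # each subagent can be its own graph
    builder.add_node("writer", writer_agent)

    builder.add_edge(START, "supervisor")
    builder.add_conditional_edges("supervisor", route)
    builder.add_edge("researcher", "supervisor")       # subagents report back to the supervisor
    builder.add_edge("writer", "supervisor")

    graph = builder.compile()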


LLM Patterns#

Structured Output#

  • LLM should produce output in a predefined format

  • different models implement different strategies

  • a lower temperature is a good fit, as it reduces the chance of the LLM producing invalid output

  • Prompting
    • asking LLM to return output in desired format

    • not guaranteed for output to be in the format

  • Tool Calling
    • available for LLMs fine-tuned to pick from a list of output schemas

    • need to give LLM a name, description, and schema for desired output format

  • JSON Mode
    • available in LLMs that can be constrained to output a valid JSON document

    from pydantic import BaseModel, Field
    from langchain_openai import ChatOpenAI

    class Joke(BaseModel):
        setup: str = Field(description="The setup of the joke")
        punchline: str = Field(description="The punchline to the joke")
    
    llm = ChatOpenAI(model="gpt-4.1-mini")
    llm = llm.with_structured_output(Joke)
    
    llm.invoke("Tell me a joke about cats")
    

Streaming Output#

  • higher latency is acceptable if there is progress/intermediate output while the app is still running

  • Stream Modes in LangGraph
    • updates: default mode

    • values: yields the current state of the graph every time it changes, i.e. after each set of nodes finishes executing

    • debug: yields detailed events every time a graph changes

    • checkpoint event: when a new checkpoint of current state is saved to the database

    • task event: when a node is about to start running

    • task_result events: when a node finishes running

    • stream modes can be combined

    for c in graph.stream(input, stream_mode="updates"):
        print(c)
    
  • Streaming Token-by-Token
    • useful for apps such as interactive chatbot

    output = app.astream_events(input, version="v2")
    
    async for event in output:
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content)
    

Human in the Loop#

  • higher-agency architectures can have human intervention of interrupting, approving, forking or undoing

  • using a checkpointer in the graph, the state is stored at the end of each step and new input is combined with the previous state

  • the graph remembering the previous state is the key to human-in-the-loop

  • Control Modes: interrupt, authorise, resume, restart, edit state, fork

  • combine different control modes to get better applications

  • Interrupt
    • using an event or signal allows controlling interruption from outside the running app

    import asyncio
    from contextlib import aclosing

    graph = builder.compile(checkpointer=MemorySaver())

    event = asyncio.Event()

    config = {"configurable": {"thread_id": "1"}}

    async with aclosing(graph.astream(input, config)) as stream:
        async for chunk in stream:
            if event.is_set():
                break  # stop consuming the stream once interrupted

    # called from another task to interrupt the streaming loop above
    event.set()
    
  • Authorise
    • defined to give control to the user every time a specific node is about to be called, usually used for tool confirmation

    output = graph.astream(input, config, interrupt_before=["tools"])
    async for c in output:
        print(c)  # process output
    
  • Resume
    • invoke the graph with None as input to continue processing the previous input

    output = graph.astream(None, config, interrupt_before=["tools"])
    async for c in output:
        print(c)  # process output
    
  • Restart
    • invoke with new input to start a graph from the first node

    • will keep the current state, and merge it with new input

    • just change thread_id to start a new interaction from a blank state

    config = {"configurable": {"thread_id": "1"}}
    output = graph.astream(new_input, config)
    async for c in output:
        print(c)  # process output
    
  • Edit State
    • update the state of the graph before resuming

    • inspect the state first and update accordingly

    • will create a new checkpoint with the update

    state = graph.get_state(config)
    update = {}
    graph.update_state(config, update)
    
  • Fork
    • use past states to get an alternative answer

    history = [
        state for state in
        graph.get_state_history(config)
    ]
    
    graph.invoke(None, history[2].config)
    

Double Texting Modes#

  • LLM may get new input before the previous one is processed

  • also called multitasking LLMs

  • Refuse
    • simplest strategy to reject concurrent inputs

    • concurrency management is handed off to the caller

  • Handle Independently
    • treat new inputs as independent invocations, creating new threads and producing output

    • the user receives the results as separate invocations; this approach can scale to large sizes

    • e.g. chatbot interacting with two different users concurrently

  • Queue
    • inputs are queued and handled when current one is finished

    • can receive multiple concurrent requests, and will be handled sequentially

    • may take time to process the queue, which may grow unbounded and inputs can be stale

    • not useful when new inputs depend on previous answers

  • Interrupt
    • stop current one and restart with the new input

    • previous input can be completely ignored

    • the completed state is kept, but any pending state updates are discarded

    • keep the last completed step, along with current in-progress one

    • wait for current node to finish, but not the subsequent ones, save and interrupt

    • new input is handled quickly, reducing latency and stale outputs

    • the state needs to be designed to be stored partially

    • can have unpredictable final result as incomplete progress context might be used for the new input

  • Fork & Merge
    • handle new input in parallel, forking the state of the thread, and merge the final states

    • state needs to be designed to be merged without conflicts

    • e.g., use conflict-free replicated data types (CRDTs), conflict resolution algorithms or manually resolve conflicts


Deployment#


Prompting Basics#

  • LLMs, Zero-Shot Prompting, Few-Shot Prompting

  • prompts help the model understand context and generate relevant answers to queries

  • prompt engineering: adapting an existing LLM for a specific task

  • Temperature: controls the randomness of LLM output

  • prompting techniques are most useful when combined with others

LLMs#

  • Fine-Tuned
    • created by taking a base LLM and further training it on a proprietary dataset for a specific task

  • Instruction-Tuned
    • fine-tuned with task-specific datasets and RLHF

  • Dialogue-Tuned
    • enhanced instruction-tuned LLMs

    • uses dialogue dataset and chat format

    • text is divided into parts associated with a role

    • System role: for instructions and framing the task

    • User role: actual task or question

    • Assistant role: for outputs of the model

Zero-Shot Prompting#

  • simply telling the LLM to perform the desired task

  • usually works for simple questions

  • will need to iterate on prompts and responses to get a reliable system

  • Chain-of-Thought
    • instructing the model to take time to think step by step

    • prepending the prompt with instructions for the LLM to describe how it could arrive at the answer
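
    • a minimal zero-shot CoT prompt (the wording and question are illustrative, and llm is any chat model):

    prompt = (
        "Think step by step and explain your reasoning before giving the final answer.\n\n"
        "Question: A train travels 60 km in 45 minutes. What is its average speed in km/h?"
    )
    answer = llm.invoke(prompt)
    print(answer.content)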

  • Retrieval-Augmented Generation
    • RAG: finding relevant context, and including them in the prompt

    • should be combined with CoT

  • Tool Calling
    • prepending the prompt with a list of external functions LLM can use

    • developer should parse the output, and call functions that the LLM wants to use
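
    • with LangChain the function list can be bound to the model and the requested calls parsed from the response; a minimal sketch with a made-up get_weather tool:

    from langchain_core.tools import tool
    from langchain_openai import ChatOpenAI

    @tool
    def get_weather(city: str) -> str:
        """Return the current weather for a city."""
        return f"Sunny in {city}"   # placeholder implementation

    llm = ChatOpenAI(model="gpt-4.1-mini").bind_tools([get_weather])
    response = llm.invoke("What's the weather in Paris?")

    # parse which tools the model wants to call, then run them
    for call in response.tool_calls:
        print(call["name"], call["args"])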

Few-Shot Prompting#

  • providing LLM with examples of other questions and correct answers

  • enables LLM to learn how to perform a new task without going through additional training or fine-tuning

  • less powerful than fine-tuning, but more flexible and can do it at query time

  • Static
    • include a predetermined list of a small number of examples in the prompt

  • Dynamic
    • from a dataset of many examples, choose the most relevant ones for each new query
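
    • a static few-shot prompt built with ChatPromptTemplate (the examples are illustrative):

    from langchain_core.prompts import ChatPromptTemplate

    template = ChatPromptTemplate.from_messages([
        ("system", "Classify the sentiment of the review as positive or negative."),
        # static few-shot examples included directly in the prompt
        ("human", "The food was amazing!"),
        ("ai", "positive"),
        ("human", "Terrible service, never again."),
        ("ai", "negative"),
        ("human", "{review}")
    ])

    prompt = template.invoke({"review": "Pretty good overall."})
    # answer = llm.invoke(prompt)   # assumes an existing chat model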
