LangChain#
Basics#
LangChain provides abstractions for each major prompting technique, with wrappers available in Python and JavaScript
has integrations with commercial and open source LLM providers
prompt templates make prompts reusable, and they can be stored in the LangChain Hub
Interfaces#
- Chat Model
LLM providers like OpenAI differentiate messages sent to and from the model into roles
System role: for instructions the model should use to answer a user question
User role: for user’s query and other content produced by the user
Assistant role: for content generated by the model
temperature
: controls sampling algorithm, lower values produce more predictable outputs, and higher values do better for creative tasks
max_tokens
: limits the size and cost of the output
chat models make use of different types of chat message interfaces associated with each role
HumanMessage
: message sent from human, user role
AIMessage
: message sent from AI, assistant role
SystemMessage
: message setting the instructions for AI, system role
ChatMessage
: message for arbitrary setting of role
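A minimal sketch of how the message classes map onto the roles above, assuming an OpenAI chat model (model name and prompts are illustrative):

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, max_tokens=100)
messages = [
    SystemMessage("You are a concise assistant."),  # system role
    HumanMessage("What is LangChain?")              # user role
]
answer = model.invoke(messages)  # returns an AIMessage (assistant role)
print(answer.content)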
- LLM
takes a string input, sends it to the model provider, and returns the model prediction as output
LangChain interacts with LLMs using function calling or traditional prompting
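A minimal sketch of the string-in/string-out interface, assuming the OpenAI completion-style wrapper (the prompt text is illustrative):

from langchain_openai import OpenAI

llm = OpenAI()  # completion-style model: string in, string out
prediction = llm.invoke("The sky is")
print(prediction)  # plain string, not a message object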
- Prompt Template
allow prompts to be constructed with dynamic inputs
use
ChatPromptTemplate
for AI chat applications

from langchain_core.prompts import PromptTemplate, ChatPromptTemplate

template = PromptTemplate.from_template("""Answer the question based on the context below.

Context: {context}

Question: {question}

Answer: """)

# Using ChatPromptTemplate will associate messages with roles
template = ChatPromptTemplate.from_messages([
    ('system', 'Answer the question based on the context below.'),
    ('human', 'Context: {context}'),
    ('human', 'Question: {question}')
])

prompt = template.invoke({
    "context": "This is context",
    "question": "Question based on context?"
})
- Runnable
invoke()
: single input to output
batch()
: multiple inputs to multiple outputs
stream()
: stream output from a single input as it's produced
each method has
asyncio
equivalents
utilities for retries, fallbacks, schemas, and runtime configurability are available
model.invoke('Hi there!')

model.batch(['Hi there!', 'Bye!'])

for token in model.stream('Bye!'):
    print(token)
LLM Output#
LLMs can generate output in specific formats, such as JSON, XML, and CSV
- JSON Output
need to define schema using Pydantic, and include it in the prompt
schema is converted to
JSONSchema
object, and used to validate the output from the LLM

from pydantic import BaseModel
from langchain_openai import ChatOpenAI

class AnswerWithJustification(BaseModel):
    '''An answer to the user's question along with justification for the answer.'''
    answer: str
    justification: str

llm = ChatOpenAI(model="supported_model", temperature=0)
structured_llm = llm.with_structured_output(AnswerWithJustification)

msg = structured_llm.invoke("""Question""")
- Output Parsers
classes to structure LLM responses
can be used to provide output format instructions in the prompt
textual output can be rendered to a more structured format
from langchain_core.output_parsers import CommaSeparatedListOutputParser

parser = CommaSeparatedListOutputParser()
items = parser.invoke("apple, banana, cherry")
# ['apple', 'banana', 'cherry']
Component Composition#
- Imperative Composition
calling components directly, e.g.
model.invoke()
Parallel execution: threads or coroutines in Python, and
Promise.all
in JavaScript
Streaming: using
yield
Async execution: with async functions
useful for writing custom logic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain
from langchain_openai import ChatOpenAI

template = ChatPromptTemplate.from_messages([
    ('system', 'You are a helpful assistant.'),
    ('human', '{question}')
])
model = ChatOpenAI(model="gpt-3.5-turbo")

# combine components in a function
@chain
def chatbot(values):
    prompt = template.invoke(values)
    for token in model.stream(prompt):
        yield token

for part in chatbot.stream({"question": "Question"}):
    print(part.content, end=' ')
- Declarative Composition
using LCEL (LangChain Expression Language)
LCEL compositions are compiled to an optimised execution plan
Streaming, Parallel and Async executions are automatic
useful for assembling existing components with limited customisation
template = ChatPromptTemplate.from_messages([
    ('system', 'You are a helpful assistant.'),
    ('human', '{question}')
])
model = ChatOpenAI(model="gpt-3.5-turbo")

# combine components with the | operator
chatbot = template | model

for part in chatbot.stream({"question": "Question"}):
    print(part.content, end=' ')
RAG#
Data Indexing#
indexing prepares external data so it can be used to enhance LLM output with relevant context
involves processing the external data source and storing embeddings in a vector store
at query time, embed the user's query, retrieve similar documents, and pass them as context in the prompt
Retrieving: getting relevant embeddings and data stored in the vector store based on user’s query
Generation: synthesising original prompt with the retrieved relevant documents
Ingestion: converting documents into embeddings, and storing in vector store
Context Window: size of input and output tokens LLMs and embedding models can handle
- Document Loader
can load files such as txt, csv, json, Markdown, and integrate with platforms such as Slack and Notion
can use
WebBaseLoader
to load HTML, or
PyPDFLoader
with the
pypdf
package
loaded data is stored in the
Document
class, and needs to be split into chunks semantically
RecursiveCharacterTextSplitter
can split text based on a list of separators in order
default separator order:
\n\n
(paragraph),
\n
(line), space (word)
splits into paragraphs that are within the chunk size
for paragraphs longer than the chunk size, split by the next separator
each chunk is a
Document
with metadata of the original document
can be used for other content, such as code in various languages and Markdown, with the relevant separators
from langchain_text_splitters import Language, RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader

loader = TextLoader("./main.py")
docs = loader.load()

splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON, chunk_size=50, chunk_overlap=0
)
python_docs = splitter.split_documents(docs)
- Embedding
converting text into numbers (vectors) from which the original text cannot be recovered
both the text and its embedding are stored, since embedding is a lossy representation
Dense embeddings: low-dimensional vectors with mostly non-zero values
Sparse embeddings: high-dimensional vectors with mostly zero values
never combine embeddings from different models
words or sentences that are close in meaning should be close in the embedding space
cosine similarity is usually used to measure the degree of similarity
Embeddings
class interfaces with text embedding models and generates vector representations
can embed both documents and queries
embedding multiple documents at the same time is more efficient
from langchain_openai import OpenAIEmbeddings

model = OpenAIEmbeddings()
embeddings = model.embed_documents([
    "Hi there!",
    "Oh, hello!",
    "What's your name?",
    "My friends call me World",
    "Hello World!"
])
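As a small follow-up to the snippet above, the query can be embedded and compared against the document embeddings with cosine similarity (the helper below is a hand-rolled sketch, not a LangChain API):

import numpy as np

query_embedding = model.embed_query("Hello there!")

def cosine_similarity(a, b):
    a, b = np.array(a), np.array(b)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

scores = [cosine_similarity(query_embedding, e) for e in embeddings]
most_similar = int(np.argmax(scores))  # index of the closest document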
- Vector Store
database to store vectors and perform complex calculations
handle unstructured data, including text and images
has capabilities such as multi-tenancy and metadata filtering
PostgreSQL can be used as vector store with
pgvector
extension
add_documents()
: create embeddings for each document, and store them

from langchain_core.documents import Document
from langchain_postgres import PGVector

connection = 'PostgreSQL_Connection'
db = PGVector.from_documents(docs, embeddings_model, connection=connection)

db.similarity_search("query", k=N)

db.add_documents(
    [
        Document(
            page_content="Content",
            metadata={"key": "value"}
        )
    ],
    ids=ids
)

db.delete(ids=['1'])
- Indexing API
uses
RecordManager
to track document writes into the vector store
stores the document hash, write time, and source ID
provides cleanup modes to delete existing documents
None
: manual clean up of old content
incremental
&
full
: delete previous versions if the content of the source document or derived documents changes
full
: delete any documents not included in the documents currently being indexed

from langchain.indexes import SQLRecordManager, index

collection_name = "my_docs"
embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small")
namespace = "my_docs_namespace"

vectorstore = PGVector(
    embeddings=embeddings_model,
    collection_name=collection_name,
    connection=connection,
    use_jsonb=True
)

record_manager = SQLRecordManager(
    namespace,
    db_url=connection
)
record_manager.create_schema()

docs = [
    Document(
        page_content="content 1",
        metadata={"id": 1, "source": "source_1.txt"}
    ),
    Document(
        page_content="content 2",
        metadata={"id": 2, "source": "source_2.txt"}
    )
]

index_1 = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source"
)
print("Index attempt 1: ", index_1)

index_2 = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source"
)
# attempting to index again will not add the documents
print("Index attempt 2: ", index_2)

docs[0].page_content = "modified"
index_3 = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source"
)
# new version is written, and all old versions sharing the same source are deleted
print("Index attempt 3: ", index_3)
Indexing Optimisations#
MultiVectorRetriever
decouples the documents used for retrieval from the documents used for answer synthesis
e.g. in a document of text and tables, embed summaries of table elements with an id reference to the full raw table, which is stored in a separate Docstore
enables providing the model with the full context to answer the user's question
import uuid

from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain.storage import InMemoryStore
from langchain_postgres import PGVector
from langchain.retrievers.multi_vector import MultiVectorRetriever

# load the document, split it, create embeddings and the LLM model

prompt_text = "Summarize the following document:\n\n{doc}"
prompt = ChatPromptTemplate.from_template(prompt_text)
summarise_chain = {
    "doc": lambda x: x.page_content} | prompt | llm | StrOutputParser()
summaries = summarise_chain.batch(chunks, {"max_concurrency": 5})

vectorstore = PGVector(
    embeddings=embeddings_model,
    collection_name=collection_name,
    connection=connection,
    use_jsonb=True
)

# for parent documents
store = InMemoryStore()
id_key = "doc_id"

retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    docstore=store,
    id_key=id_key
)

doc_ids = [str(uuid.uuid4()) for _ in chunks]
summary_docs = [
    Document(page_content=s, metadata={id_key: doc_ids[i]})
    for i, s in enumerate(summaries)
]
retriever.vectorstore.add_documents(summary_docs)

# store the original documents, linked to summaries via doc_ids
retriever.docstore.mset(list(zip(doc_ids, chunks)))

# vector store retrieves the summaries
sub_docs = retriever.vectorstore.similarity_search(
    "topic", k=2)

# retriever returns the larger source document chunks
retrieved_docs = retriever.invoke("topic")
- RAPTOR
Recursive Abstractive Processing for Tree-Organised Retrieval
creating document summaries for higher-level concepts, embedding and clustering them and summarising each cluster
recursively done to produce a tree of higher-level summaries
then the summaries and initial documents are indexed together
- ColBERT
effective embeddings approach for better retrieval
generate contextual embeddings for each token in the document and query
calculate and score similarity between each query token and all document tokens
sum the max similarity score of each query embedding to any of the document embeddings to get a score for each document
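A toy numpy sketch of the MaxSim scoring described above (the token embeddings are random placeholders; a real ColBERT model would produce them):

import numpy as np

def maxsim_score(query_emb: np.ndarray, doc_emb: np.ndarray) -> float:
    # normalise token embeddings so dot products are cosine similarities
    q = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)
    d = doc_emb / np.linalg.norm(doc_emb, axis=1, keepdims=True)
    sim = q @ d.T  # (query_tokens, doc_tokens) similarity matrix
    # max similarity of each query token to any document token, summed
    return float(sim.max(axis=1).sum())

rng = np.random.default_rng(0)
query = rng.normal(size=(4, 128))  # 4 query token embeddings
docs = [rng.normal(size=(20, 128)), rng.normal(size=(35, 128))]
scores = [maxsim_score(query, doc) for doc in docs]
best_doc = int(np.argmax(scores))  # document with the highest score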
Query Transformation#
modifying user input to be more or less abstract to generate accurate LLM output
- Rewrite-Retrieve-Read
prompts the LLM to rewrite the user’s query before performing retrieval
remove irrelevant information in the query with the help of LLM
but will add additional latency in the chain due to more LLM calls
rewrite_prompt = ChatPromptTemplate.from_template("""
Provide a better search query for web search engine to answer the given question, end the queries with '**'.

Question: {x}

Answer: """)

def parse_rewriter_output(message):
    return message.content.strip('"').strip("**")

rewriter = rewrite_prompt | llm | parse_rewriter_output

@chain
def qa_rrr(input):
    new_query = rewriter.invoke(input)
    docs = retriever.invoke(new_query)
    formatted = prompt.invoke({"context": docs, "question": input})
    answer = llm.invoke(formatted)
    return answer

qa_rrr.invoke("Query with irrelevant information")
- Multi-Query Retrieval
tell LLM to generate multiple queries based on the user’s initial one
each query is retrieved in parallel and inserted as prompt context for final output
useful when a single question may rely on multiple perspectives for an answer
should deduplicate documents as single retriever is used with multiple queries
perspectives_prompt = ChatPromptTemplate.from_template("""
You are an AI language model assistant. Your task is to generate five different versions of the given user question to retrieve relevant documents from a vector database. By generating multiple perspectives on the user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search. Provide these alternative questions separated by newlines.

Original question: {question}
""")

def parse_queries_output(message):
    return message.content.split('\n')

query_gen = perspectives_prompt | llm | parse_queries_output

def get_unique_union(document_lists):
    deduped_docs = {
        doc.page_content: doc
        for sublist in document_lists
        for doc in sublist
    }
    return list(deduped_docs.values())

retrieval_chain = query_gen | retriever.batch | get_unique_union

@chain
def multi_query_qa(input):
    docs = retrieval_chain.invoke(input)
    formatted = prompt.invoke({"context": docs, "question": input})
    ans = llm.invoke(formatted)
    return ans

multi_query_qa.invoke("Question")
- RAG-Fusion
similar to the Multi-Query retrieval
retrieved documents are re-ranked at the final step with RRF (Reciprocal Rank Fusion) algorithm, pulling the most relevant documents to the top
RRF is ideal for combining results from queries with different scales or distributions of scores
# reuses multi_query_qa() and parse_queries_output() from Multi-Query Retrieval

def reciprocal_rank_fusion(results: list[list], k=60):
    fused_scores = {}
    documents = {}
    for docs in results:
        for rank, doc in enumerate(docs):
            doc_str = doc.page_content
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
                documents[doc_str] = doc
            fused_scores[doc_str] += 1 / (rank + k)
    reranked_doc_strs = sorted(
        fused_scores, key=lambda d: fused_scores[d], reverse=True)
    return [documents[doc_str] for doc_str in reranked_doc_strs]

prompt_rag_fusion = ChatPromptTemplate.from_template("""
You are a helpful assistant that generates multiple search queries based on a single input query.

Generate multiple search queries related to: {question}

Output (4 queries):
""")

query_gen = prompt_rag_fusion | llm | parse_queries_output
retrieval_chain = query_gen | retriever.batch | reciprocal_rank_fusion

multi_query_qa.invoke("Question")
- HyDE
Hypothetical Document Embeddings
create hypothetical document based on user’s query, embed it, and retrieve relevant documents based on vector similarity
prompt_hyde = ChatPromptTemplate.from_template("""
Please write a passage to answer the question.

Question: {question}

Passage: """)

prompt = ChatPromptTemplate.from_template("""
Answer the following question based on this context:

{context}

Question: {question}
""")

# the hypothetical passage is generated from the HyDE prompt
generate_doc = prompt_hyde | llm | StrOutputParser()
retrieval_chain = generate_doc | retriever

@chain
def qa(input):
    docs = retrieval_chain.invoke(input)
    formatted = prompt.invoke({"context": docs, "question": input})
    answer = llm.invoke(formatted)
    return answer

qa.invoke("Question")
Query Routing#
to forward user’s query to the relevant data source
- Logical Routing
let LLM decide which data source to apply based on the query
function-calling models are used to help classify each query
need to define a schema that the model can use to generate arguments of a function based on the query
extracted data source can be passed into other functions for additional logic
suitable when a defined list of data sources is available
from typing import Literal

from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

class RouteQuery(BaseModel):
    datasource: Literal["source_1", "source_2"] = Field(
        ...,
        description="""Given a user question, choose which datasource would be most relevant for answering their question"""
    )

def choose_route(result):
    if "source_1" in result.datasource.lower():
        return "chain for source_1"
    else:
        return "chain for source_2"

structured_llm = llm.with_structured_output(RouteQuery)

system = """You are an expert at routing a user question to the appropriate data source.
Based on the programming language the question is referring to, route it to the relevant data source.
"""
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}")
    ]
)

router = prompt | structured_llm
question = "Question"

# chaining for additional logic
full_chain = router | RunnableLambda(choose_route)
result = full_chain.invoke({"question": question})
- Semantic Routing
embed prompts for the various data sources together with the query, and use vector similarity search to pick the most similar prompt
from langchain_core.prompts import PromptTemplate
from langchain.utils.math import cosine_similarity

template_1 = """Template 1

Here is a question:
{query}
"""
template_2 = """Template 2

Here is a question:
{query}
"""

prompt_templates = [template_1, template_2]
prompt_embeddings = embedding_model.embed_documents(prompt_templates)

@chain
def prompt_router(query):
    query_embedding = embedding_model.embed_query(query)
    similarity = cosine_similarity([query_embedding], prompt_embeddings)[0]
    most_similar = prompt_templates[similarity.argmax()]
    return PromptTemplate.from_template(most_similar)

semantic_router = (
    prompt_router
    | llm
    | StrOutputParser()
)

semantic_router.invoke("Question")
Query Construction#
convert natural language query into language of database or data source
- Text-to-Metadata Filter
can attach metadata key-value pairs to vectors in an index during embedding process
filter expressions will be used during query
SelfQueryRetriever
uses an LLM to extract and execute relevant metadata filters based on the user's query and a predefined metadata schema
the retriever sends a query-generation prompt, parses the metadata filter and rewritten query, converts the metadata filter for the vector store, and runs the similarity search against the vector store
from langchain.chains.query_constructor.schema import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever

fields = [
    AttributeInfo(
        name="NAME",
        description="DESC",
        type="string or list[string]"
    ),
]
description = "DESC"

retriever = SelfQueryRetriever.from_llm(llm, db, description, fields)
retriever.invoke("Question")
- Text-to-SQL
Database description: provide LLM with accurate description of the database, such as
CREATE TABLE
description for each table with column names and types, and can also include example rows from the table
Few-shot examples: append standard static examples in the prompt to guide the agent on how it should build queries based on questions
always run queries as a database user with read-only permissions
the database user running the queries should have access only to the necessary tables
add a time-out to the queries to protect against expensive queries
from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool
from langchain_community.utilities import SQLDatabase
from langchain.chains.sql_database.query import create_sql_query_chain

db = SQLDatabase.from_uri(connection)
write_query = create_sql_query_chain(llm, db)
execute_query = QuerySQLDatabaseTool(db=db)

chain = write_query | execute_query
chain.invoke({"question": "Question"})
LangGraph#
Graph#
LangGraph is an open source library by LangChain
enables developers to implement multiactor, multistep, and stateful cognitive architectures called graphs
State: data received from outside, modified and produced by the app
Node: Python or JavaScript functions, receiving current state and returning updated state
Edge: connection between nodes, can be fixed path or conditional
need to define the state of the graph first
state keys without a reducer annotation will be overwritten on each update
without explicit instruction, execution stops when there are no more nodes to run
graph is compiled into a runnable object
from typing import Annotated, TypedDict

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

builder = StateGraph(State)
llm = ChatOpenAI(model="gpt-3.5-turbo")

def chatbot(state: State):
    answer = llm.invoke(state["messages"])
    return {"messages": [answer]}

builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

graph = builder.compile()

input = {"messages": [HumanMessage('hi!')]}
for chunk in graph.stream(input):
    print(chunk)
Memory#
LLMs are stateless; the prior prompt and model response are lost when a new request is made
including previous conversations and context in the final prompt can give memory
chat history can be stored as a list of messages; append the most recent messages after each turn and insert them into the prompt
appending the full chat history to the prompt has scalability issues
Checkpointer: storage adapter for in-memory, SQLite, Postgres, Redis, and MySQL
Thread: also called interaction, auto created when first used
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.runnables.config import RunnableConfig

# stores the state at the end of each step
graph = builder.compile(checkpointer=MemorySaver())

thread_1 = RunnableConfig({"configurable": {"thread_id": "1"}})

result_1 = graph.invoke(
    {"messages": [HumanMessage("hi, my name is Jack!")]},
    thread_1
)
result_2 = graph.invoke(
    {"messages": [HumanMessage("what is my name?")]},
    thread_1)
Multiactor#
application with multiple actors needs a coordination layer to define actors, hand off work, and schedule execution of each actor
each actor should help update a single central state
with a single central state, a snapshot can be made, execution can be paused and human-in-the-loop control can be implemented
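A minimal sketch of two actors coordinated through a single central state (the actor outputs are placeholders):

from typing import Annotated, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]  # single central state shared by all actors

def researcher(state: State):
    # actor 1 contributes its findings to the shared state
    return {"messages": [("ai", "research notes")]}

def writer(state: State):
    # actor 2 reads everything so far and adds the final draft
    return {"messages": [("ai", "final draft")]}

builder = StateGraph(State)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_edge(START, "researcher")
builder.add_edge("researcher", "writer")
builder.add_edge("writer", END)

# the checkpointer snapshots the central state after each step,
# which is what enables pausing and human-in-the-loop control
graph = builder.compile(checkpointer=MemorySaver())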
Chat History#
chat history messages should be in a format that helps the model generate accurate responses
- Trimming Messages
limit the number of messages that are retrieved from history and appended to the prompt
ideal to load and store the most recent messages
trim_messages()
: can specify how many tokens to keep or remove from chat history, and supports different strategies

from langchain_core.messages import (AIMessage, HumanMessage,
                                     SystemMessage, trim_messages)
from langchain_openai import ChatOpenAI

trimmer = trim_messages(
    max_tokens=65,
    strategy="last",  # prioritise the most recent messages
    token_counter=ChatOpenAI(model="gpt-4o"),  # use the tokeniser appropriate to that model
    include_system=True,  # keep the system message
    allow_partial=False,  # whether to cut a message's content to make it fit
    start_on="human"  # start the trimmed history on a HumanMessage, never on an orphan AIMessage
)

messages = [
    SystemMessage(content="you're a good assistant"),
    HumanMessage(content="hi! i'm bob"),
    AIMessage(content="hi"),
    HumanMessage(content="i like vanilla ice cream"),
    AIMessage(content="nice"),
    HumanMessage(content="what's 2 + 2?"),
    AIMessage(content="4"),
    HumanMessage(content="thanks"),
    AIMessage(content="no problem!"),
    HumanMessage(content="having fun?"),
    AIMessage(content="yes"),
]

trimmer.invoke(messages)
- Filtering Messages
filter_messages()
: filter by type, ID, or name
can also be composed with other components in a chain
from langchain_core.messages import filter_messages

messages = [
    SystemMessage(content="you're a good assistant", id="1"),
    HumanMessage(content="hi! i'm bob", id="2"),
    AIMessage(content="hi", id="3"),
    HumanMessage(content="i like vanilla ice cream", name="bob", id="4"),
    AIMessage(content="nice", id="5"),
    HumanMessage(content="what's 2 + 2?", name="alice", id="6"),
    AIMessage(content="4", id="7"),
    HumanMessage(content="thanks", name="alice", id="8"),
    AIMessage(content="no problem!", id="9"),
    HumanMessage(content="having fun?", name="bob", id="10"),
    AIMessage(content="yes", id="11"),
]

# imperative use: returns the filtered list
filter_messages(messages, include_types="human")

# declarative use: returns a runnable that can be composed in a chain
filter_ = filter_messages(include_types=[
    HumanMessage, AIMessage], exclude_ids=["3"])
chain = filter_ | model
- Merging Consecutive Messages
models such as Anthropic chat models do not support consecutive messages of the same type
merge_message_runs()
: allows merging consecutive messages of the same type
if a message's content is a list, it is merged as a list
can also be composed with other components in a chain
from langchain_core.messages import merge_message_runs

messages = [
    SystemMessage(content="you're a good assistant"),
    SystemMessage(content="you always respond with a joke"),
    HumanMessage(
        [{"type": "text", "text": "hello"}]
    ),
    HumanMessage("world")
]

merged = merge_message_runs(messages)
# SystemMessage(content="you're a good assistant\nyou always respond with a joke"),
# HumanMessage(content=[{"type": "text", "text": "hello"}, "world"])

# declarative use composes with other components
merger_ = merge_message_runs()
chain = merger_ | model
Subgraphs#
graphs that are used as part of another graph
to build multi-agent systems, reuse a set of nodes in multiple graphs, and let different teams work on different parts of the graph
- Direct Subgraph Call
adding a node that calls the subgraph directly to the parent
both should share state keys to communicate, and do not need to transform state
extra keys passed to the subgraph node will be ignored
extra keys from the subgraph will be ignored by the parent
class State(TypedDict):
    foo: str  # shared with subgraph

class SubgraphState(TypedDict):
    foo: str  # shared with parent
    bar: str

def subgraph_node(state: SubgraphState):
    return {"foo": state["foo"] + "bar"}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node)
subgraph = subgraph_builder.compile()

builder = StateGraph(State)
builder.add_node("subgraph", subgraph)
graph = builder.compile()
- Function Subgraph Call
adding a node with a function that invokes the subgraph to the parent
both with different state schemas
function needs to transform parent state to the subgraph state before invoking the subgraph and transform the result back to the parent state before returning
class State(TypedDict):
    foo: str

class SubgraphState(TypedDict):
    bar: str
    baz: str

def subgraph_node(state: SubgraphState):
    return {"bar": state["bar"] + "baz"}

def node(state: State):
    response = subgraph.invoke({"bar": state["foo"]})
    return {"foo": response["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node)
subgraph = subgraph_builder.compile()

builder = StateGraph(State)
builder.add_node(node)
graph = builder.compile()
LangGraph Platform#
managed service to deploy and host LangGraph agents
horizontally scales task queues, servers, and a Postgres checkpointer for efficiency
allows collaboration of deploying and monitoring agentic AI apps
LangGraph Studio: to debug, edit and test agents visually, can share agent with team members
Data Models#
- Assistants
configured instance of
CompiledGraph
has instance-specific configuration and metadata
multiple assistants can reference the same graph, but have different configuration and metadata
- Threads
contains the accumulated state of a group of runs
checkpoint: state of a thread at particular time
state of the underlying graph of the assistant will be persisted to the thread
current and historical state can be retrieved
a thread needs to be created before executing a run to persist state
- Runs
invocation of an assistant
each run can have its own input, configuration and metadata
can be executed on a thread (see the client sketch after this list)
- Cron Jobs
allow to run graphs on a schedule
user must specify schedule, assistant, and input
a new thread will be created and given the input to run
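A rough sketch of how these data models map onto the langgraph_sdk client, assuming a local LangGraph Platform deployment with a graph registered under the name "agent" (the URL, graph name, and input shape are assumptions):

import asyncio

from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

async def run_once():
    assistant = await client.assistants.create(graph_id="agent")  # assistant: configured instance of a graph
    thread = await client.threads.create()                        # thread: persists state across runs
    async for chunk in client.runs.stream(                        # run: one invocation of the assistant
        thread["thread_id"],
        assistant["assistant_id"],
        input={"messages": [{"role": "human", "content": "hi!"}]},
        stream_mode="updates",
    ):
        print(chunk.event, chunk.data)

asyncio.run(run_once())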
Features#
- Streaming
streaming mode determines what data is streamed back to the client
Values: stream full state of the graph after each super-step is executed
Messages: stream complete messages and tokens, mostly for chat apps, and can only use this mode if graph contains a
messages
key
Updates: stream state updates of the graph after each node execution
Events: stream all events during graph execution, can be used to do token-by-token streaming for LLMs
Debug: stream debug events during graph execution
- Human-in-the-loop
LangGraph Platform allows human intervention to prevent unwanted outcomes
- Double Texting
Reject: reject the new input and do not allow double texting
Enqueue: complete the first run, then send the new input as a separate run
Interrupt: save and interrupt current execution, and continue to run with new input
Rollback: roll back all work and run with new input
- Stateless Runs
take the input, create a thread, run the agent without checkpoints, and clean up the thread
stateless runs are retried while keeping memory intact
for background runs, the entire run will be retried if the task worker dies halfway
- Webhooks
LangGraph Platform supports completion webhooks
Cognitive Architectures#
Degree of Autonomy, LLM Call Architecture, Chain Architecture, Router Architecture
cognitive architectures can be called a recipe for the steps to be taken by an LLM app
Agency: capacity to act autonomously
Reliability: degree to which agency’s outputs can be trusted
Major Architectures: Code (does not use LLMs, same as regular software), LLM Call, Chain, Router, State Machine, Autonomous
Degree of Autonomy#
measured by evaluating how much of the app's behaviour is determined by the LLM
check if LLM has decided the output of a step, the next step to take, and what steps are available to take
LLM Call Architecture#
a single LLM call only, useful when a larger app makes use of an LLM for one step
builder = StateGraph(State)
builder.add_node("chatbot", lambda state: chatbot(
    state, llm))
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
Chain Architecture#
multiple LLM calls in a predefined sequence, also called flow engineering
builder = StateGraph(State, input_schema=Input, output_schema=Output)
builder.add_node("generate_sql", lambda state: generate_sql(
    state, llm_low_temp, generate_prompt))  # type: ignore
builder.add_node("explain_sql", lambda state: explain_sql(
    state, llm_high_temp, explain_prompt))  # type: ignore
builder.add_edge(START, "generate_sql")
builder.add_edge("generate_sql", "explain_sql")
builder.add_edge("explain_sql", END)
Router Architecture#
using LLM to define the sequence of steps to take
def router_node(state: State, llm, prompt) -> State:
    user_message = HumanMessage(state["user_query"])
    messages = [prompt, *state["messages"], user_message]
    res = llm.invoke(messages)
    return {
        "domain": res.content,
        "messages": [user_message, res]
    }

def pick_retriever(state: State) -> Literal["retrieve_medical_records", "retrieve_insurance_faqs"]:
    if state["domain"] == "records":
        return "retrieve_medical_records"
    else:
        return "retrieve_insurance_faqs"

builder = StateGraph(State, input_schema=Input, output_schema=Output)
builder.add_node("router", lambda state: router_node(
    state, llm_low_temp, router_prompt))
builder.add_node("retrieve_medical_records", lambda state: retrieve_medical_records(
    state, medical_records_retriever))
builder.add_node("retrieve_insurance_faqs", lambda state: retrieve_insurance_faqs(
    state, insurance_faqs_retriever))
builder.add_node("generate_answer", lambda state: generate_answer(
    state, llm_high_temp, medical_records_prompt, insurance_faqs_prompt))

builder.add_edge(START, "router")
builder.add_conditional_edges("router", pick_retriever)
builder.add_edge("retrieve_medical_records", "generate_answer")
builder.add_edge("retrieve_insurance_faqs", "generate_answer")
builder.add_edge("generate_answer", END)
Agent Architecture#
Standard Agent, Always Tool Calling First, Managing Multiple Tools, Reflection, Multi-agent
Agent: something that acts
uses an LLM to pick from one or more possible courses of action, given context of current or desired next state
implemented by combining Tool Calling and Chain-of-Thought prompting techniques
LLM-driven Loop: plan actions and execute, LLM will decide when to stop looping
use a conditional edge to implement a loop as it can end the graph
Standard Agent#
LLM is always called first to decide a tool, adapting the behaviour to each user query
but flexibility can also cause unpredictability
import ast
from typing import Annotated, TypedDict

from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import HumanMessage
from langchain_core.runnables import Runnable
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

class State(TypedDict):
    messages: Annotated[list, add_messages]

@tool
def calculator(query: str) -> str:
    """A simple calculator tool. Input should be a mathematical expression."""
    return ast.literal_eval(query)

def llm_node(state: State, llm) -> State:
    res = llm.invoke(state["messages"])
    return {"messages": res}

search = DuckDuckGoSearchRun()
tools = [search, calculator]
llm: Runnable = ChatOpenAI(
    model="gpt-4.1-mini", temperature=0).bind_tools(tools)

builder = StateGraph(State)
builder.add_node("llm", lambda state: llm_node(state, llm))  # type: ignore
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")

graph = builder.compile()

input: State = {
    "messages": [
        HumanMessage("""Question""")
    ]
}
for c in graph.stream(input):
    print(c)
Always Tool Calling First#
having a clear rule that a certain tool should always be called first
can reduce overall latency, and prevent erroneous LLM decisions
but it can also make things worse if there is no clear rule
from uuid import uuid4

from langchain_core.messages import AIMessage, ToolCall

# does not call the LLM, only creates a tool call for the search tool
def first_llm(state: State) -> State:
    query = state["messages"][-1].content
    search_tool_call = ToolCall(name="duckduckgo_search", args={
        "query": query}, id=uuid4().hex)
    return {
        "messages": AIMessage(content="", tool_calls=[search_tool_call])
    }

builder.add_node("first_llm", lambda state: first_llm(state))  # type: ignore
builder.add_node("llm", lambda state: llm_node(state, llm))  # type: ignore
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "first_llm")
builder.add_edge("first_llm", "tools")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")
Managing Multiple Tools#
LLMs struggle to choose the right one when given many tools
can use a RAG step to preselect the most relevant tools for current query
giving LLM only a subset of tools can reduce cost, but RAG step adds latency
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

def llm_node(state: State, llm, tools) -> State:
    selected_tools = [
        tool for tool in tools if tool.name in state["selected_tools"]]
    res = llm.bind_tools(selected_tools).invoke(state["messages"])
    return {"messages": res}

def select_tools(state: State, tools_retriever) -> State:
    query = state["messages"][-1].content
    tool_docs = tools_retriever.invoke(query)
    return {
        "selected_tools": [doc.metadata["name"] for doc in tool_docs]
    }

embeddings = OpenAIEmbeddings()
llm: Runnable = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

tools_retriever = InMemoryVectorStore.from_documents(
    [Document(tool.description, metadata={
        "name": tool.name}) for tool in tools],
    embeddings
).as_retriever()

builder = StateGraph(State)
builder.add_node("select_tools", lambda state: select_tools(
    state, tools_retriever))  # type: ignore
builder.add_node("llm", lambda state: llm_node(
    state, llm, tools))  # type: ignore
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "select_tools")
builder.add_edge("select_tools", "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")
Reflection#
also called self-critique
allowing LLM to analyse past output, including past reflections, and refine it
need to have a loop between a creator prompt and a reviser prompt
can be combined with other prompting techniques
always costs higher latency, but is likely to increase the quality of the final output
def generate(state: State, llm, prompt) -> State:
    ans = llm.invoke([prompt] + state["messages"])
    return {"messages": [ans]}

def reflect(state: State, llm, prompt) -> State:
    # invert the messages
    cls_map = {AIMessage: HumanMessage, HumanMessage: AIMessage}
    translated = [prompt, state["messages"][0]] + [
        cls_map[msg.__class__](content=msg.content)  # calling a constructor
        for msg in state["messages"][1:]
    ]
    ans = llm.invoke(translated)
    # treat output as human feedback for the generator
    return {"messages": [HumanMessage(content=ans.content)]}

def should_continue(state: State):
    if len(state["messages"]) > 6:
        return END
    else:
        return "reflect"

builder.add_node("generate", lambda state: generate(
    state, llm, generate_prompt))
builder.add_node("reflect", lambda state: reflect(
    state, llm, reflection_prompt))
builder.add_edge(START, "generate")
builder.add_conditional_edges("generate", should_continue, {
    "reflect": "reflect"  # only explicit mapping shows on graph image
})
builder.add_edge("reflect", "generate")
Multi-agent#
composed of multiple smaller, independent agents
prevents an agent with multiple tools from making poor decisions
an agent can be as simple as a prompt and an LLM call, or as complex as a ReAct agent
- Network Strategy
agents can communicate, and any agent can decide which to call next
- Hierarchical Strategy
system with a supervisor of supervisors
for more complex control flows
- Custom Multi-Agent Workflow
each agent communicates with only a subset of agents
parts of the flow are deterministic
only selected agents can decide which others to call next
- Supervisor Strategy
each agent communicates with the supervisor agent
supervisor decides which agent to call next
supervisor agent can be an LLM call with tools
a subagent can be its own graph with internal state that only outputs a summary of its work
each subagent can be made to decide whether to return output directly to the user or not (see the sketch after this list)
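A minimal sketch of the supervisor strategy with two illustrative subagents; the Route schema, agent names, and prompts are assumptions, not a prescribed API:

from typing import Annotated, Literal, TypedDict

from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from pydantic import BaseModel

class State(TypedDict):
    messages: Annotated[list, add_messages]
    next: str

class Route(BaseModel):
    next: Literal["researcher", "writer", "__end__"]

llm = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

def supervisor(state: State):
    # the supervisor is an LLM call that picks the next agent (or finishes)
    decision = llm.with_structured_output(Route).invoke(
        [SystemMessage("Decide who acts next: researcher, writer, or __end__.")]
        + state["messages"]
    )
    return {"next": decision.next}

def researcher(state: State):
    ans = llm.invoke([SystemMessage("You are a researcher. Gather facts.")] + state["messages"])
    return {"messages": [ans]}

def writer(state: State):
    ans = llm.invoke([SystemMessage("You are a writer. Draft the final answer.")] + state["messages"])
    return {"messages": [ans]}

def route(state: State):
    return state["next"]

builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route, {
    "researcher": "researcher",
    "writer": "writer",
    "__end__": END
})
# each subagent reports back to the supervisor
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", "supervisor")
graph = builder.compile()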
LLM Patterns#
Structured Output, Streaming Output, Human in the Loop, Double Texting Modes
Agent: high agency, lower reliability
Chain: low agency, higher reliability
LLM apps should minimise latency (time to get the final answer), the need for human input (interruptions), and variance (variation between invocations)
Structured Output#
LLM should produce output in a predefined format
different models implement different strategies
a lower temperature is a good fit, as it reduces the chance of the LLM producing invalid output
- Prompting
asking the LLM to return output in the desired format
output is not guaranteed to be in that format (see the parser-based sketch after the JSON Mode example below)
- Tool Calling
available for LLMs fine-tuned to pick from a list of output schemas
need to give LLM a name, description, and schema for desired output format
- JSON Mode
available in LLMs that can be constrained to output a valid JSON document
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

class Joke(BaseModel):
    setup: str = Field(description="The setup of the joke")
    punchline: str = Field(description="The punchline to the joke")

llm = ChatOpenAI(model="gpt-4.1-mini")
llm = llm.with_structured_output(Joke)
llm.invoke("Tell me a joke about cats")
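For the prompting approach in the list above, a sketch using PydanticOutputParser to inject format instructions and parse the reply (reusing the Joke schema; the model name is illustrative):

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate

parser = PydanticOutputParser(pydantic_object=Joke)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the user query.\n{format_instructions}"),
    ("human", "{query}")
]).partial(format_instructions=parser.get_format_instructions())

# not guaranteed to be valid: the parser raises if the model ignores the instructions
chain = prompt | ChatOpenAI(model="gpt-4.1-mini") | parser
chain.invoke({"query": "Tell me a joke about cats"})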
Streaming Output#
higher latency is acceptable if there is progress/intermediate output while the app is still running
- Stream Modes in LangGraph
updates
: default mode
values
: yields the current state of the graph every time it changes, i.e. after each set of nodes finishes executing
debug
: yields detailed events every time a graph changes
checkpoint
event: when a new checkpoint of current state is saved to the database
task
event: when a node is about to start running
task_result
events: when a node finishes running
stream modes can be combined (sketched after the code below)
for c in graph.stream(input, stream_mode="updates"):
    print(c)
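A small sketch of combining stream modes: passing a list makes each chunk arrive as a (mode, payload) tuple:

for mode, chunk in graph.stream(input, stream_mode=["updates", "debug"]):
    print(mode, chunk)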
- Streaming Token-by-Token
useful for apps such as interactive chatbot
output = app.astream_events(input, version="v2")
async for event in output:
    if event["event"] == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content)
Human in the Loop#
higher-agency architectures can have human intervention of interrupting, approving, forking or undoing
store the state at the end of each step and combine new input with the previous state by using a checkpointer in the graph
the graph remembering the previous state is the key to human-in-the-loop
Control Modes: interrupt, authorise, resume, restart, edit state, fork
combine different control modes to get better applications
- Interrupt
using an event or signal allows interruption to be controlled from outside the running app
import asyncio
from contextlib import aclosing

graph = builder.compile(checkpointer=MemorySaver())
event = asyncio.Event()
config = {"configurable": {"thread_id": "1"}}

# inside an async function
async with aclosing(graph.astream(input, config)) as stream:
    async for chunk in stream:
        if event.is_set():
            break
        else:
            pass

# setting the event from elsewhere interrupts the stream
event.set()
- Authorise
defined to give control to the user every time a specific node is about to be called, usually used for tool confirmation
output = graph.astream(input, config, interrupt_before=["tools"])
async for c in output:
    ...  # process output
- Resume
invoke the graph with None as input to continue processing the previous non-None input
output = graph.astream(None, config, interrupt_before=["tools"])
async for c in output:
    ...  # process output
- Restart
invoke with new input to start a graph from the first node
will keep the current state, and merge it with new input
just change
thread_id
to start a new interaction from a blank state

config = {"configurable": {"thread_id": "1"}}
output = graph.astream(new_input, config)
async for c in output:
    ...  # process output
- Edit State
update the state of the graph before resuming
inspect the state first and update accordingly
will create a new checkpoint with the update
state = graph.get_state(config)

update = {}
graph.update_state(config, update)
- Fork
use the past states to get alternative answer
history = [
    state for state in graph.get_state_history(config)
]

graph.invoke(None, history[2].config)
Double Texting Modes#
LLM may get new input before the previous one is processed
also called multitasking LLMs
- Refuse
simplest strategy to reject concurrent inputs
concurrency management is handed off to the caller
- Handle Independently
treat new inputs as independent invocations, creating new threads and producing output
the user receives the results as separate invocations; this approach can be scaled to large sizes
e.g. chatbot interacting with two different users concurrently
- Queue
inputs are queued and handled when current one is finished
can receive multiple concurrent requests, and will be handled sequentially
may take time to process the queue, which may grow unbounded and inputs can be stale
not useful when new inputs depend on previous answers
- Interrupt
stop current one and restart with the new input
previous input can be completely ignored
the completed state is kept, but any pending state updates are discarded
keeps the last completed step, along with the current in-progress one
waits for the current node to finish (but not the subsequent ones), then saves and interrupts
new input is handled quickly, reducing latency and stale outputs
the state needs to be designed to be stored partially
can have unpredictable final result as incomplete progress context might be used for the new input
- Fork & Merge
handle new input in parallel, forking the state of the thread, and merge the final states
state needs to be designed to be merged without conflicts
e.g., use conflict-free replicated data types (CRDTs), conflict resolution algorithms or manually resolve conflicts
Deployment#
Prompting Basics#
prompts help the model understand context and generate relevant answers to queries
prompt engineering: adapting an existing LLM for specific task
Temperature: controls the randomness of LLM output
prompting techniques are most useful when combined with others
LLMs#
- Fine-Tuned
created by taking a base LLM and further training it on a proprietary dataset for a specific task
- Instruction-Tuned
fine-tuned with task-specific datasets and RLHF
- Dialogue-Tuned
enhanced instruction-tuned LLMs
uses dialogue dataset and chat format
text is divided into parts associated with a role
System role: for instructions and framing the task
User role: actual task or question
Assistant role: for outputs of the model
Zero-Shot Prompting#
simply telling the LLM to perform the desired task
usually works for simple questions
will need to iterate on prompts and responses to get a reliable system
- Chain-of-Thought
instructing the model to take time to think step by step
prepending the prompt with instructions for the LLM to describe how it could arrive at the answer (see the sketch after this list)
- Retrieval-Augmented Generation
RAG: finding relevant context, and including them in the prompt
should be combined with CoT
- Tool Calling
prepending the prompt with a list of external functions LLM can use
developer should parse the output, and call functions that the LLM wants to use
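A minimal zero-shot chain-of-thought sketch (the prompt wording, model name, and question are illustrative):

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a careful assistant. Think step by step, "
               "then give the final answer on the last line."),
    ("human", "{question}")
])
chain = prompt | ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
chain.invoke({"question": "A train travels 60 km in 45 minutes. What is its average speed in km/h?"})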
Few-Shot Prompting#
providing LLM with examples of other questions and correct answers
enables LLM to learn how to perform a new task without going through additional training or fine-tuning
less powerful than fine-tuning, but more flexible and can do it at query time
- Static
include a predetermined list of a small number of examples in the prompt (see the sketch at the end of this list)
- Dynamic
from a dataset of many examples, choose the most relevant ones for each new query
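A static few-shot sketch using FewShotChatMessagePromptTemplate (the examples, prompts, and model name are illustrative); a dynamic variant would swap the fixed examples for an example selector that picks the most relevant ones per query:

from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate
from langchain_openai import ChatOpenAI

# static few-shot: a fixed set of example question/answer pairs
examples = [
    {"input": "2 + 2", "output": "4"},
    {"input": "2 + 3", "output": "5"},
]
example_prompt = ChatPromptTemplate.from_messages([
    ("human", "{input}"),
    ("ai", "{output}")
])
few_shot = FewShotChatMessagePromptTemplate(
    examples=examples,
    example_prompt=example_prompt
)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a wondrous wizard of math."),
    few_shot,
    ("human", "{input}")
])
chain = prompt | ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
chain.invoke({"input": "What is 7 + 5?"})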