LangChain LCEL: Advanced Patterns for Production AI Applications
Master LangChain Expression Language for composable, streaming AI pipelines
LangChain Expression Language (LCEL) is the modern way to build composable LLM pipelines. This guide covers advanced LCEL patterns: parallel execution, streaming, dynamic routing, conditional chains, retry and fallback logic, tool use orchestration, and testing strategies. Includes production patterns for RAG applications, multi-step agents, and complex data transformation pipelines with real performance benchmarks.
Why LCEL Over Legacy LangChain
Old LangChain: LLMChain, SequentialChain, RouterChain—separate classes with inconsistent interfaces. Hard to compose, hard to stream, hard to debug.
LCEL (LangChain Expression Language): pipe operator (|) composes any runnable components. Unified interface: every component has invoke(), stream(), batch(), ainvoke(), astream(), abatch(). Build chains like Unix pipes.
python
# Old way
from langchain.chains import LLMChain
chain = LLMChain(llm=llm, prompt=prompt)

# LCEL way
chain = prompt | llm | output_parser
Every LCEL chain supports streaming, async execution, and batching out of the box. Retry logic is not on by default, but any chain can opt in with with_retry().
Core LCEL Building Blocks
Runnables
Any callable that implements the Runnable interface: PromptTemplate, ChatOpenAI, OutputParser, Lambda functions (RunnableLambda), dictionaries (RunnableParallel).
python
from langchain_core.runnables import RunnableLambda, RunnableParallel

# Lambda as runnable
double = RunnableLambda(lambda x: x * 2)

# Parallel execution
parallel_chain = RunnableParallel({
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain
})
# All three chains run in parallel, results merged into dict
Streaming
Every LCEL chain supports streaming through stream() and astream(). Use astream() for async streaming:
python
from langchain_core.output_parsers import StrOutputParser

async def stream_response(question: str):
    chain = prompt | llm | StrOutputParser()
    async for chunk in chain.astream({"question": question}):
        yield chunk  # Yields as tokens are generated

# With streaming callbacks
from langchain.callbacks import StreamingStdOutCallbackHandler
llm = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()])
Advanced Patterns
Pattern 1: Dynamic Routing
python
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda

route = RunnableBranch(
    (lambda x: x["topic"] == "technical", technical_chain),
    (lambda x: x["topic"] == "billing", billing_chain),
    default_chain  # fallback
)

# Alternatively, use an LLM to determine the route
router_chain = router_prompt | llm | JsonOutputParser()

def route_based_on_llm(input):
    route_decision = router_chain.invoke(input)
    if route_decision["route"] == "technical":
        return technical_chain.invoke(input)
    elif route_decision["route"] == "billing":
        return billing_chain.invoke(input)
    return general_chain.invoke(input)

intelligent_router = RunnableLambda(route_based_on_llm)
Pattern 2: Retry and Fallback
python
from langchain_openai import ChatOpenAI
from openai import RateLimitError  # raised by the OpenAI client on HTTP 429

# Retry on failure
chain_with_retry = chain.with_retry(
    retry_if_exception_type=(RateLimitError,),
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

# Fallback to a different model if the primary fails
primary_chain = prompt | ChatOpenAI(model="gpt-4o")
fallback_chain = prompt | ChatOpenAI(model="gpt-4o-mini")

chain_with_fallback = primary_chain.with_fallbacks([fallback_chain])
Pattern 3: RAG Pipeline with LCEL
python
import asyncio

from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.retrievers import EnsembleRetriever

# Hybrid retriever (semantic + keyword)
retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.6, 0.4]
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Stream the answer as it's generated (async for must run inside a coroutine)
async def main():
    async for token in rag_chain.astream("What is quantum computing?"):
        print(token, end="", flush=True)

asyncio.run(main())
Pattern 4: Multi-Step Reasoning Chain
python
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_core.runnables import RunnableLambda

# Step 1: Extract key information
extract_chain = extract_prompt | llm | JsonOutputParser()

# Step 2: Research each extracted topic
def research_topics(extracted: dict) -> dict:
    results = {}
    for topic in extracted["topics"]:
        results[topic] = research_chain.invoke({"topic": topic})
    return {**extracted, "research": results}

# Step 3: Synthesize final answer
synthesis_chain = synthesis_prompt | llm | StrOutputParser()

# Full pipeline
pipeline = (
    extract_chain
    | RunnableLambda(research_topics)
    | synthesis_chain
)
Pattern 5: Tool Use with Streaming
python
import asyncio

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search the company database for relevant information."""
    return db.search(query)

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email to a customer."""
    return email_client.send(to, subject, body)

tools = [search_database, send_email]
agent = create_tool_calling_agent(llm, tools, agent_prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# Stream agent execution
async def run_agent():
    async for event in executor.astream_events(
        {"input": "Research order #123 and email John the status"},
        version="v2",
    ):
        if event["event"] == "on_tool_end":
            print(f"Tool used: {event['name']}, result: {event['data'].get('output')}")
        elif event["event"] == "on_chat_model_stream":
            # Token-level chunks are emitted by the chat model, not the outer chain
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(run_agent())
Testing LCEL Chains
Unit Testing with Mocked LLMs
python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

def test_classification_chain():
    # Mock LLM that returns deterministic output
    mock_llm = RunnableLambda(lambda _: "positive")
    test_chain = classification_prompt | mock_llm | StrOutputParser()
    result = test_chain.invoke({"text": "I love this product!"})
    assert result == "positive"
Integration Testing with Recording
Use LangSmith to record real LLM calls, then replay them for regression testing.
Performance Optimization
Batch processing: LCEL batch() method runs multiple inputs concurrently with configurable concurrency:
python
results = chain.batch(inputs, config={"max_concurrency": 20})
Caching: add a caching layer to avoid redundant LLM calls:
python
from langchain.globals import set_llm_cache
from langchain_community.cache import SQLiteCache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Identical inputs return cached results
Streaming for UX: even if backend processing takes 5 seconds, streaming gives users immediate feedback. Always use streaming for user-facing chat applications.