LangChain LCEL: Advanced Patterns for Production AI Applications
Master LangChain Expression Language for composable, streaming AI pipelines
Why LCEL Over Legacy LangChain
Legacy LangChain relied on LLMChain, SequentialChain, and RouterChain: separate classes with inconsistent interfaces that were hard to compose, hard to stream, and hard to debug.
LCEL (LangChain Expression Language) uses the pipe operator (|) to compose any runnable components. The interface is unified: every component has invoke(), stream(), batch(), ainvoke(), astream(), and abatch(), so you build chains like Unix pipes.
python
# Old way
from langchain.chains import LLMChain
chain = LLMChain(llm=llm, prompt=prompt)

# LCEL way
chain = prompt | llm | output_parser
Every LCEL chain supports streaming, async execution, and batching out of the box, and retry logic can be added with with_retry().
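To make the pipe idea concrete, here is a toy sketch of how a `|` operator can compose steps. It is not LangChain's implementation: the `Step` class and the `fake_llm` stand-in are hypothetical and use only the standard library.

```python
from typing import Any, Callable, Iterable, List


class Step:
    """Toy stand-in for a runnable: wraps a function and supports `|`."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new step that feeds a's output into b
        return Step(lambda value: other.invoke(self.invoke(value)))

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def batch(self, values: Iterable[Any]) -> List[Any]:
        # Sequential here for simplicity; a real implementation could run concurrently
        return [self.invoke(v) for v in values]


fill_prompt = Step(lambda d: f"Summarize: {d['text']}")
fake_llm = Step(lambda prompt: prompt.upper())  # hypothetical stand-in for a model call
parse = Step(lambda text: text.strip())

toy_chain = fill_prompt | fake_llm | parse
single = toy_chain.invoke({"text": "hi"})
many = toy_chain.batch([{"text": "a"}, {"text": "b"}])
```

Because the composed object exposes the same methods as its parts, anything built with `|` can itself be piped into a larger chain.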
Core LCEL Building Blocks
Runnables
Any callable that implements the Runnable interface: PromptTemplate, ChatOpenAI, OutputParser, Lambda functions (RunnableLambda), dictionaries (RunnableParallel).
python
from langchain_core.runnables import RunnableLambda, RunnableParallel

# Lambda as runnable
double = RunnableLambda(lambda x: x * 2)

# Parallel execution
parallel_chain = RunnableParallel({
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain
})
# All three chains run in parallel, results merged into dict
Streaming
LCEL chains support streaming out of the box. Use astream() for async streaming:
python
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

async def stream_response(question: str):
    chain = prompt | llm | StrOutputParser()
    async for chunk in chain.astream({"question": question}):
        yield chunk  # Yields as tokens are generated

# With streaming callbacks
from langchain_core.callbacks import StreamingStdOutCallbackHandler
llm = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()])
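The consuming side of an async stream is often where bugs hide, so here is a minimal standard-library sketch of it. `fake_astream` is a hypothetical stand-in for `chain.astream(...)`; the consumer logic would be the same with a real chain.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_astream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for chain.astream(...): yields one word at a time."""
    for word in text.split():
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield word + " "


async def collect(stream: AsyncIterator[str]) -> str:
    """Forward each chunk as it arrives and return the assembled answer."""
    parts: List[str] = []
    async for chunk in stream:
        print(chunk, end="", flush=True)  # a web app would write to the response here
        parts.append(chunk)
    return "".join(parts)


answer = asyncio.run(collect(fake_astream("tokens arrive one by one")))
```

Keeping the accumulated text alongside the forwarded chunks makes it easy to log or store the full answer once the stream ends.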
Advanced Patterns
Pattern 1: Dynamic Routing
python
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda

route = RunnableBranch(
    (lambda x: x["topic"] == "technical", technical_chain),
    (lambda x: x["topic"] == "billing", billing_chain),
    default_chain,  # fallback
)

# Alternatively, use LLM to determine route
router_chain = router_prompt | llm | JsonOutputParser()

def route_based_on_llm(input):
    route_decision = router_chain.invoke(input)
    if route_decision["route"] == "technical":
        return technical_chain.invoke(input)
    elif route_decision["route"] == "billing":
        return billing_chain.invoke(input)
    return general_chain.invoke(input)

intelligent_router = RunnableLambda(route_based_on_llm)
Pattern 2: Retry and Fallback
python
from openai import RateLimitError
from langchain_openai import ChatOpenAI

# Retry on failure
chain_with_retry = chain.with_retry(
    retry_if_exception_type=(RateLimitError,),
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)

# Fallback to different model if primary fails
primary_chain = prompt | ChatOpenAI(model="gpt-4o")
fallback_chain = prompt | ChatOpenAI(model="gpt-4o-mini")

# with_fallbacks() returns a RunnableWithFallbacks
chain_with_fallback = primary_chain.with_fallbacks([fallback_chain])
Pattern 3: RAG Pipeline with LCEL
python
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.retrievers import EnsembleRetriever

# Hybrid retriever (semantic + keyword)
retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.6, 0.4]
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Stream the answer as it's generated
async def main():
    async for token in rag_chain.astream("What is quantum computing?"):
        print(token, end="", flush=True)

asyncio.run(main())
Pattern 4: Multi-Step Reasoning Chain
python
# Step 1: Extract key information
extract_chain = extract_prompt | llm | JsonOutputParser()

# Step 2: Research each extracted topic
def research_topics(extracted: dict) -> dict:
    results = {}
    for topic in extracted["topics"]:
        results[topic] = research_chain.invoke({"topic": topic})
    return {**extracted, "research": results}

# Step 3: Synthesize final answer
synthesis_chain = synthesis_prompt | llm | StrOutputParser()

# Full pipeline
pipeline = (
    extract_chain
    | RunnableLambda(research_topics)
    | synthesis_chain
)
Pattern 5: Tool Use with Streaming
python
import asyncio
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search the company database for relevant information."""
    return db.search(query)

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email to a customer."""
    return email_client.send(to, subject, body)

agent = create_tool_calling_agent(llm, [search_database, send_email], agent_prompt)
executor = AgentExecutor(agent=agent, tools=[search_database, send_email], verbose=True)

# Stream agent execution
async def run_agent():
    async for event in executor.astream_events(
        {"input": "Research order #123 and email John the status"},
        version="v2",
    ):
        if event["event"] == "on_tool_end":
            print(f"Tool used: {event['name']}, result: {event['data'].get('output')}")
        elif event["event"] == "on_chat_model_stream":
            # Token-level output from the chat model
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(run_agent())
Testing LCEL Chains
Unit Testing with Mocked LLMs
python
from langchain_core.runnables import RunnableLambda

def test_classification_chain():
    # Mock LLM that returns deterministic output
    mock_llm = RunnableLambda(lambda _: "positive")
    test_chain = classification_prompt | mock_llm | StrOutputParser()
    result = test_chain.invoke({"text": "I love this product!"})
    assert result == "positive"
Integration Testing with Recording
Use LangSmith to record real LLM calls, then replay them for regression testing.
Performance Optimization
Batch processing: the batch() method runs multiple inputs concurrently, with a configurable concurrency limit:
python
results = chain.batch(inputs, config={"max_concurrency": 20})
Caching: add a caching layer to avoid redundant LLM calls:
python
from langchain.globals import set_llm_cache
from langchain_community.cache import SQLiteCache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Identical inputs return cached results
Streaming for UX: even if backend processing takes 5 seconds, streaming gives users immediate feedback. Always use streaming for user-facing chat applications.