
LangChain LCEL: Advanced Patterns for Production AI Applications

Master LangChain Expression Language for composable, streaming AI pipelines

Why LCEL Over Legacy LangChain

Legacy LangChain relied on LLMChain, SequentialChain, and RouterChain: separate classes with inconsistent interfaces that were hard to compose, hard to stream, and hard to debug.

LCEL (LangChain Expression Language) uses the pipe operator (|) to compose any runnable components. Every component shares one interface: invoke(), stream(), and batch(), plus the async variants ainvoke(), astream(), and abatch(). You build chains the way you build Unix pipes.

python

# Old way
from langchain.chains import LLMChain
chain = LLMChain(llm=llm, prompt=prompt)

# LCEL way
chain = prompt | llm | output_parser

Every LCEL chain supports streaming, async execution, and batching out of the box. Retry logic is not applied automatically, but any chain can opt in with a single with_retry() call.
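To make the pipe analogy concrete, here is a minimal stdlib-only sketch of how `|` composition can be built on Python's `__or__` hook. The `Step` class and the three stand-in components are hypothetical and are not LangChain's implementation; they only illustrate why every composed chain ends up with the same `invoke()` and `batch()` methods as its parts.

```python
from typing import Any, Callable, List


class Step:
    """Toy stand-in for a runnable: wraps a function and supports `|`."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

    def batch(self, values: List[Any]) -> List[Any]:
        # Sequential for clarity; a real implementation may run these concurrently.
        return [self.invoke(v) for v in values]

    def __or__(self, other: "Step") -> "Step":
        # `a | b` builds a new Step that feeds a's output into b.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Hypothetical stand-ins for a prompt, a model, and an output parser.
prompt = Step(lambda d: f"Question: {d['question']}")
llm = Step(lambda text: {"content": text.upper()})
output_parser = Step(lambda msg: msg["content"])

chain = prompt | llm | output_parser
result = chain.invoke({"question": "what is lcel?"})
batch_results = chain.batch([{"question": "a"}, {"question": "b"}])
```

Because the composed object is itself a `Step`, it can be piped into further steps without any special casing, which is the property the unified interface is meant to provide.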

Core LCEL Building Blocks

Runnables

A Runnable is any object that implements the Runnable interface: PromptTemplate, ChatOpenAI, output parsers, plain functions wrapped in RunnableLambda, and dictionaries, which become a RunnableParallel.

python
from langchain_core.runnables import RunnableLambda, RunnableParallel

# Lambda as runnable
double = RunnableLambda(lambda x: x * 2)

# Parallel execution
parallel_chain = RunnableParallel({
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain,
})

# All three chains run in parallel, results merged into dict
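The fan-out-and-merge behavior described above can be sketched with the standard library alone. This is an illustration of the idea, not LangChain's code: `run_parallel` and the three lambda branches are hypothetical stand-ins for `summary_chain`, `keyword_chain`, and `sentiment_chain`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(branches: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Run every branch on the same input and merge the results by key."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        # Submit all branches first so they can overlap, then collect results.
        futures = {key: pool.submit(fn, value) for key, fn in branches.items()}
        return {key: future.result() for key, future in futures.items()}


# Hypothetical stand-ins for the three chains.
branches = {
    "summary": lambda text: text[:20],
    "keywords": lambda text: sorted({w for w in text.lower().split() if len(w) > 5}),
    "sentiment": lambda text: "positive" if "love" in text.lower() else "neutral",
}

parallel_result = run_parallel(branches, "I love composable pipelines for streaming")
```

Each branch receives the same input, so total latency is roughly that of the slowest branch rather than the sum of all three.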

Streaming

Every LCEL chain exposes stream() and astream(), so streaming needs no extra wiring. Use astream() for async streaming:

python
async def stream_response(question: str):
    chain = prompt | llm | StrOutputParser()
    async for chunk in chain.astream({"question": question}):
        yield chunk  # Yields as tokens are generated

# With streaming callbacks
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()])
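The consuming side of an async stream can be tried without a model. In the sketch below, `fake_token_stream` is a hypothetical stand-in for `chain.astream(...)`; it shows how an `async for` loop receives chunks one at a time and how the pieces join back into the full response.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_token_stream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for chain.astream(): yields one token at a time."""
    for token in text.split(" "):
        await asyncio.sleep(0)  # hand control back to the event loop, as a network wait would
        yield token + " "


async def collect_stream(text: str) -> List[str]:
    chunks: List[str] = []
    async for chunk in fake_token_stream(text):
        chunks.append(chunk)  # a web handler would forward each chunk to the client here
    return chunks


chunks = asyncio.run(collect_stream("LCEL streams tokens incrementally"))
full_text = "".join(chunks).strip()
```

In a web framework the `chunks.append(...)` line would typically be a `yield` into a streaming HTTP response, so the client sees text before generation finishes.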

Advanced Patterns

Pattern 1: Dynamic Routing

python
from langchain_core.runnables import RunnableBranch

route = RunnableBranch(
    (lambda x: x["topic"] == "technical", technical_chain),
    (lambda x: x["topic"] == "billing", billing_chain),
    default_chain,  # fallback
)

# Alternatively, use LLM to determine route
router_chain = router_prompt | llm | JsonOutputParser()

def route_based_on_llm(inputs):
    route_decision = router_chain.invoke(inputs)
    if route_decision["route"] == "technical":
        return technical_chain.invoke(inputs)
    elif route_decision["route"] == "billing":
        return billing_chain.invoke(inputs)
    return general_chain.invoke(inputs)

intelligent_router = RunnableLambda(route_based_on_llm)
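The branching rule used here is "first matching condition wins, otherwise the default runs". A small stdlib sketch of that rule follows; `make_branch` and the three handler functions are hypothetical stand-ins, not LangChain APIs.

```python
from typing import Any, Callable, Dict, List, Tuple

Payload = Dict[str, Any]
Condition = Callable[[Payload], bool]
Handler = Callable[[Payload], str]


def make_branch(branches: List[Tuple[Condition, Handler]], default: Handler) -> Handler:
    """Return a router that runs the first handler whose condition is true."""

    def route(payload: Payload) -> str:
        for condition, handler in branches:
            if condition(payload):
                return handler(payload)
        return default(payload)  # no condition matched

    return route


# Hypothetical stand-ins for technical_chain, billing_chain, and the default chain.
def technical_handler(payload: Payload) -> str:
    return f"[technical] {payload['question']}"


def billing_handler(payload: Payload) -> str:
    return f"[billing] {payload['question']}"


def general_handler(payload: Payload) -> str:
    return f"[general] {payload['question']}"


router = make_branch(
    [
        (lambda p: p["topic"] == "technical", technical_handler),
        (lambda p: p["topic"] == "billing", billing_handler),
    ],
    default=general_handler,
)

billing_answer = router({"topic": "billing", "question": "refund?"})
other_answer = router({"topic": "shipping", "question": "where is it?"})
```

Because conditions are checked in order, more specific conditions should come before broader ones.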

Pattern 2: Retry and Fallback

python
from langchain_core.runnables import RunnableWithFallbacks  # type returned by with_fallbacks()
from openai import RateLimitError

# Retry on failure
chain_with_retry = chain.with_retry(
    retry_if_exception_type=(RateLimitError,),
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)

# Fallback to a different model if the primary fails
primary_chain = prompt | ChatOpenAI(model="gpt-4o")
fallback_chain = prompt | ChatOpenAI(model="gpt-4o-mini")

chain_with_fallback = primary_chain.with_fallbacks([fallback_chain])
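For readers who want to see what retry-with-jitter and fallback amount to mechanically, here is a stdlib-only sketch. `RateLimited`, `call_with_retry`, `call_with_fallbacks`, and the fake models are all hypothetical names chosen for illustration; the delays and attempt counts are assumptions, not LangChain's defaults.

```python
import random
import time
from typing import Callable, Sequence, Tuple, Type


class RateLimited(Exception):
    """Hypothetical error standing in for a provider's rate-limit exception."""


def call_with_retry(
    fn: Callable[[str], str],
    arg: str,
    retry_on: Tuple[Type[BaseException], ...] = (RateLimited,),
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> str:
    """Call fn, retrying the listed exceptions with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(arg)
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
            time.sleep(delay)
    raise RuntimeError("max_attempts must be at least 1")


def call_with_fallbacks(
    primary: Callable[[str], str], fallbacks: Sequence[Callable[[str], str]], arg: str
) -> str:
    """Try the primary first, then each fallback in order; re-raise the last error."""
    last_error: Exception = RuntimeError("no candidates")
    for candidate in (primary, *fallbacks):
        try:
            return candidate(arg)
        except Exception as exc:
            last_error = exc
    raise last_error


attempts = {"count": 0}


def flaky_model(prompt: str) -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimited("slow down")
    return f"primary: {prompt}"


def broken_model(prompt: str) -> str:
    raise RuntimeError("model unavailable")


def backup_model(prompt: str) -> str:
    return f"backup: {prompt}"


retried = call_with_retry(flaky_model, "hello")
fallback_result = call_with_fallbacks(broken_model, [backup_model], "hello")
```

Retry suits transient errors such as rate limits, while fallback suits failures that repeating the same call is unlikely to fix. The two can be combined by retrying each candidate before moving to the next.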

Pattern 3: RAG Pipeline with LCEL

python
from langchain_core.runnables import RunnablePassthrough
from langchain.retrievers import EnsembleRetriever

# Hybrid retriever (semantic + keyword)
retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.6, 0.4],
)

def format_docs(docs):
    return "\n\n".join(d.page_content for d in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Stream the answer as it's generated (async for must run inside an async function)
async def stream_answer():
    async for token in rag_chain.astream("What is quantum computing?"):
        print(token, end="", flush=True)
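The `weights=[0.6, 0.4]` argument is easier to reason about with the merging rule in view. The LangChain documentation describes EnsembleRetriever as reranking with Reciprocal Rank Fusion; the sketch below is a simplified weighted version of that algorithm. The function name, the document IDs, and the constant `c = 60` (a conventional choice) are assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def weighted_rrf(rankings: Sequence[List[str]], weights: Sequence[float], c: int = 60) -> List[str]:
    """Fuse ranked lists: each document scores weight / (rank + c) per list it appears in."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (rank + c)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


# Hypothetical results from a vector retriever and a BM25 retriever.
vector_hits = ["doc_a", "doc_b", "doc_c"]
bm25_hits = ["doc_c", "doc_a", "doc_d"]

fused = weighted_rrf([vector_hits, bm25_hits], weights=[0.6, 0.4])
```

Documents found by both retrievers (`doc_a`, `doc_c`) rise above documents found by only one, and the weights tilt ties toward the semantic ranking.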

Pattern 4: Multi-Step Reasoning Chain

python

# Step 1: Extract key information
extract_chain = extract_prompt | llm | JsonOutputParser()

# Step 2: Research each extracted topic
def research_topics(extracted: dict) -> dict:
    results = {}
    for topic in extracted["topics"]:
        results[topic] = research_chain.invoke({"topic": topic})
    return {**extracted, "research": results}

# Step 3: Synthesize final answer
synthesis_chain = synthesis_prompt | llm | StrOutputParser()

# Full pipeline
pipeline = (
    extract_chain
    | RunnableLambda(research_topics)
    | synthesis_chain
)
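What matters most in this pattern is the shape of the dictionary handed from one step to the next. The sketch below traces that shape with plain functions; `extract`, `research_topic`, and `synthesize` are hypothetical stand-ins for the three chains, and the topic list is made-up sample data.

```python
from typing import Any, Dict


def extract(question: str) -> Dict[str, Any]:
    # Stand-in for extract_chain: parsed JSON that includes a "topics" list.
    return {"question": question, "topics": ["qubits", "error correction"]}


def research_topic(topic: str) -> str:
    # Stand-in for research_chain.invoke({"topic": topic}).
    return f"notes on {topic}"


def research_topics(extracted: Dict[str, Any]) -> Dict[str, Any]:
    results = {topic: research_topic(topic) for topic in extracted["topics"]}
    # Keep the extracted fields and add the research under a new key.
    return {**extracted, "research": results}


def synthesize(state: Dict[str, Any]) -> str:
    # Stand-in for synthesis_chain: reads the keys produced by the previous step.
    lines = [f"- {topic}: {notes}" for topic, notes in state["research"].items()]
    return f"Answer to '{state['question']}':\n" + "\n".join(lines)


state = research_topics(extract("How do quantum computers stay stable?"))
answer = synthesize(state)
```

In the real pipeline, the synthesis prompt's template variables need to match the keys in `state`; a mismatch there is a common source of missing-key errors.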

Pattern 5: Tool Use with Streaming

python
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search the company database for relevant information."""
    return db.search(query)

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email to a customer."""
    return email_client.send(to, subject, body)

tools = [search_database, send_email]
agent = create_tool_calling_agent(llm, tools, agent_prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# Stream agent execution (async for must run inside an async function)
async def stream_agent():
    async for event in executor.astream_events(
        {"input": "Research order #123 and email John the status"},
        version="v2",
    ):
        if event["event"] == "on_tool_end":
            print(f"Tool used: {event['name']}, result: {event['data']['output']}")
        elif event["event"] == "on_chat_model_stream":
            # Token-level chunks emitted by the chat model
            print(event["data"]["chunk"].content, end="", flush=True)
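Event-handling logic like this can be exercised offline by feeding it a scripted event stream. In the sketch below, `fake_events` is a hypothetical generator whose dictionaries mimic the `event`, `name`, and `data` keys of streamed events. As a simplification, chunks are plain strings here, whereas real chat model chunks are message objects with a `.content` attribute.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple


async def fake_events() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical scripted events shaped like the ones an agent run emits."""
    events = [
        {"event": "on_tool_start", "name": "search_database", "data": {"input": {"query": "order #123"}}},
        {"event": "on_tool_end", "name": "search_database", "data": {"output": "Order #123 shipped on Monday"}},
        {"event": "on_tool_end", "name": "send_email", "data": {"output": "sent"}},
        {"event": "on_chat_model_stream", "name": "llm", "data": {"chunk": "Order #123 "}},
        {"event": "on_chat_model_stream", "name": "llm", "data": {"chunk": "has shipped."}},
    ]
    for event in events:
        await asyncio.sleep(0)
        yield event


async def consume(events: AsyncIterator[Dict[str, Any]]) -> Tuple[List[Tuple[str, Any]], str]:
    """Collect tool results and streamed text separately."""
    tool_results: List[Tuple[str, Any]] = []
    text_parts: List[str] = []
    async for event in events:
        kind = event["event"]
        if kind == "on_tool_end":
            tool_results.append((event["name"], event["data"]["output"]))
        elif kind == "on_chat_model_stream":
            text_parts.append(event["data"]["chunk"])
        # Other event types (for example on_tool_start) are ignored.
    return tool_results, "".join(text_parts)


tool_results, streamed_text = asyncio.run(consume(fake_events()))
```

Keeping the dispatch in a function that accepts any async iterator makes it straightforward to unit test without calling a model or a tool.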

Testing LCEL Chains

Unit Testing with Mocked LLMs

python
from langchain_core.runnables import RunnableLambda

def test_classification_chain():
    # Mock LLM that returns deterministic output
    mock_llm = RunnableLambda(lambda _: "positive")
    test_chain = classification_prompt | mock_llm | StrOutputParser()
    result = test_chain.invoke({"text": "I love this product!"})
    assert result == "positive"

Integration Testing with Recording

Use LangSmith to record real LLM calls, then replay for regression testing:

  • Record: run chain against real LLM, save inputs/outputs to LangSmith dataset
  • Replay: run chain against recorded inputs, compare outputs to saved responses
  • Alert: flag when output changes significantly
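The record, replay, and alert steps can be prototyped locally before wiring up a hosted service. The sketch below stores cases in a JSON file and uses `difflib` similarity as the "changed significantly" check. It does not use the LangSmith API; the function names, the file layout, and the 0.8 threshold are all assumptions.

```python
import json
import tempfile
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Callable, Dict, List


def record(chain: Callable[[str], str], inputs: List[str], path: Path) -> None:
    """Run the chain on each input and save input/output pairs."""
    cases = [{"input": text, "output": chain(text)} for text in inputs]
    path.write_text(json.dumps(cases, indent=2), encoding="utf-8")


def replay(chain: Callable[[str], str], path: Path, threshold: float = 0.8) -> List[Dict[str, Any]]:
    """Re-run recorded inputs and return cases whose output drifted below the threshold."""
    flagged = []
    for case in json.loads(path.read_text(encoding="utf-8")):
        new_output = chain(case["input"])
        similarity = SequenceMatcher(None, case["output"], new_output).ratio()
        if similarity < threshold:
            flagged.append({"input": case["input"], "similarity": round(similarity, 2)})
    return flagged


# Hypothetical chain versions: v2 changes its behavior for refund questions.
def chain_v1(text: str) -> str:
    return f"Summary: {text}"


def chain_v2(text: str) -> str:
    return "Cannot help with that." if "refund" in text else f"Summary: {text}"


with tempfile.TemporaryDirectory() as tmp:
    dataset = Path(tmp) / "recorded_cases.json"
    record(chain_v1, ["refund policy", "shipping times"], dataset)
    unchanged = replay(chain_v1, dataset)
    regressions = replay(chain_v2, dataset)
```

String similarity is a crude signal for free-form LLM output, so a real setup might swap in an embedding distance or an LLM-based grader at the same point.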

Performance Optimization

Batch processing: the LCEL batch() method runs multiple inputs concurrently with configurable concurrency:

python
results = chain.batch(inputs, config={"max_concurrency": 20})
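A concurrency cap of this kind can be illustrated with an `asyncio.Semaphore`. The sketch below is not LangChain's implementation; `bounded_batch` and `fake_chain` are hypothetical, and the `stats` dictionary exists only to show that the limit holds.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def bounded_batch(
    fn: Callable[[str], Awaitable[str]], inputs: List[str], max_concurrency: int
) -> List[str]:
    """Run fn over all inputs with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: str) -> str:
        async with semaphore:  # waits here while the limit is reached
            return await fn(item)

    # gather() returns results in the same order as the inputs.
    return list(await asyncio.gather(*(run_one(item) for item in inputs)))


stats: Dict[str, int] = {"in_flight": 0, "peak": 0}


async def fake_chain(text: str) -> str:
    """Hypothetical stand-in for an async chain call; tracks concurrent calls."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # simulated network latency
    stats["in_flight"] -= 1
    return text.upper()


results = asyncio.run(bounded_batch(fake_chain, [f"q{i}" for i in range(10)], max_concurrency=3))
```

The limit is worth tuning against the provider's rate limits: too high a value tends to trade throughput for rate-limit errors and retries.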
    

Caching: add a caching layer to avoid redundant LLM calls:

python
from langchain.globals import set_llm_cache
from langchain_community.cache import SQLiteCache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# Identical inputs return cached results
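The idea behind a SQLite-backed LLM cache fits in a few lines of standard-library code. The sketch below keys responses on the model name and the prompt. The class name and table layout are hypothetical and do not reflect LangChain's actual schema.

```python
import sqlite3
from typing import Callable, Dict


class SQLitePromptCache:
    """Toy response cache keyed on (model, prompt)."""

    def __init__(self, database_path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(database_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS llm_cache ("
            "model TEXT, prompt TEXT, response TEXT, PRIMARY KEY (model, prompt))"
        )

    def get_or_compute(self, model: str, prompt: str, compute: Callable[[str], str]) -> str:
        row = self.conn.execute(
            "SELECT response FROM llm_cache WHERE model = ? AND prompt = ?",
            (model, prompt),
        ).fetchone()
        if row is not None:
            return row[0]  # cache hit: skip the model call entirely
        response = compute(prompt)
        self.conn.execute("INSERT INTO llm_cache VALUES (?, ?, ?)", (model, prompt, response))
        self.conn.commit()
        return response


calls: Dict[str, int] = {"count": 0}


def fake_llm(prompt: str) -> str:
    """Hypothetical stand-in for an expensive model call."""
    calls["count"] += 1
    return f"answer to: {prompt}"


cache = SQLitePromptCache()
first = cache.get_or_compute("demo-model", "What is LCEL?", fake_llm)
second = cache.get_or_compute("demo-model", "What is LCEL?", fake_llm)
third = cache.get_or_compute("demo-model", "What is RAG?", fake_llm)
```

An exact-match cache only helps when prompts repeat byte for byte, so it pays off most for deterministic templates and test suites.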

Streaming for UX: even if backend processing takes 5 seconds, streaming gives users immediate feedback. Always use streaming for user-facing chat applications.
