LangChain LCEL: Advanced Patterns for Production AI Applications

Master LangChain Expression Language for composable, streaming AI pipelines

Advanced · 38 minutes

LangChain Expression Language (LCEL) is the modern way to build composable LLM pipelines. This guide covers advanced LCEL patterns: parallel execution, streaming, dynamic routing, conditional chains, retry and fallback logic, tool use orchestration, and testing strategies. Includes production patterns for RAG applications, multi-step agents, and complex data transformation pipelines with real performance benchmarks.

LangChain, LCEL, LLM pipelines, AI engineering, Python

Why LCEL Over Legacy LangChain

Legacy LangChain relied on separate chain classes (LLMChain, SequentialChain, RouterChain) with inconsistent interfaces. They were hard to compose, hard to stream, and hard to debug.

LCEL (LangChain Expression Language) uses the pipe operator (|) to compose any runnable components. Every component exposes the same interface: invoke(), stream(), batch(), and the async counterparts ainvoke(), astream(), and abatch(). Chains are built like Unix pipes.

python
# Old way
from langchain.chains import LLMChain

chain = LLMChain(llm=llm, prompt=prompt)

# LCEL way
chain = prompt | llm | output_parser

Every LCEL chain supports streaming, async execution, and batching out of the box. Retry and fallback logic can be added to any chain with with_retry() and with_fallbacks().

Core LCEL Building Blocks

Runnables

A runnable is any object that implements the Runnable interface: prompt templates (PromptTemplate), chat models (ChatOpenAI), output parsers, plain functions wrapped in RunnableLambda, and dictionaries of runnables (coerced to RunnableParallel).

python
from langchain_core.runnables import RunnableLambda, RunnableParallel

# Lambda as runnable
double = RunnableLambda(lambda x: x * 2)

# Parallel execution
parallel_chain = RunnableParallel({
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain,
})
# All three chains run in parallel, results merged into dict

Streaming

Every LCEL chain supports streaming through stream() and astream() with no extra configuration. Use astream() for async streaming:

python
async def stream_response(question: str):
    chain = prompt | llm | StrOutputParser()
    async for chunk in chain.astream({"question": question}):
        yield chunk  # Yields as tokens are generated

# With streaming callbacks
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()])

Advanced Patterns

Pattern 1: Dynamic Routing

python
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda

route = RunnableBranch(
    (lambda x: x["topic"] == "technical", technical_chain),
    (lambda x: x["topic"] == "billing", billing_chain),
    default_chain,  # fallback
)

# Alternatively, use an LLM to determine the route
router_chain = router_prompt | llm | JsonOutputParser()

def route_based_on_llm(input):
    route_decision = router_chain.invoke(input)
    if route_decision["route"] == "technical":
        return technical_chain.invoke(input)
    elif route_decision["route"] == "billing":
        return billing_chain.invoke(input)
    return general_chain.invoke(input)

intelligent_router = RunnableLambda(route_based_on_llm)

Pattern 2: Retry and Fallback

python
from langchain_openai import ChatOpenAI
from openai import RateLimitError  # assumes the OpenAI SDK's error type

# Retry on failure
chain_with_retry = chain.with_retry(
    retry_if_exception_type=(RateLimitError,),
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)

# Fallback to a different model if the primary fails
primary_chain = prompt | ChatOpenAI(model="gpt-4o")
fallback_chain = prompt | ChatOpenAI(model="gpt-4o-mini")

chain_with_fallback = primary_chain.with_fallbacks([fallback_chain])

Pattern 3: RAG Pipeline with LCEL

python
import asyncio

from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.retrievers import EnsembleRetriever

# Hybrid retriever (semantic + keyword)
retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.6, 0.4],
)

def format_docs(docs):
    return "\n\n".join(d.page_content for d in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Stream the answer as it's generated
async def main():
    async for token in rag_chain.astream("What is quantum computing?"):
        print(token, end="", flush=True)

asyncio.run(main())

Pattern 4: Multi-Step Reasoning Chain

python
# Step 1: Extract key information
extract_chain = extract_prompt | llm | JsonOutputParser()

# Step 2: Research each extracted topic
def research_topics(extracted: dict) -> dict:
    results = {}
    for topic in extracted["topics"]:
        results[topic] = research_chain.invoke({"topic": topic})
    return {**extracted, "research": results}

# Step 3: Synthesize final answer
synthesis_chain = synthesis_prompt | llm | StrOutputParser()

# Full pipeline
pipeline = (
    extract_chain
    | RunnableLambda(research_topics)
    | synthesis_chain
)

Pattern 5: Tool Use with Streaming

python
import asyncio

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search the company database for relevant information."""
    return db.search(query)

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email to a customer."""
    return email_client.send(to, subject, body)

tools = [search_database, send_email]
agent = create_tool_calling_agent(llm, tools, agent_prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# Stream agent execution
async def main():
    async for event in executor.astream_events(
        {"input": "Research order #123 and email John the status"},
        version="v2",
    ):
        if event["event"] == "on_tool_end":
            print(f"Tool used: {event['name']}, result: {event['data'].get('output')}")
        elif event["event"] == "on_chat_model_stream":
            # Token-level chunks from the model are message chunks
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(main())

Testing LCEL Chains

Unit Testing with Mocked LLMs

python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

def test_classification_chain():
    # Mock LLM that returns deterministic output
    mock_llm = RunnableLambda(lambda _: "positive")
    test_chain = classification_prompt | mock_llm | StrOutputParser()
    result = test_chain.invoke({"text": "I love this product!"})
    assert result == "positive"

Integration Testing with Recording

Use LangSmith to record real LLM calls, then replay for regression testing:

  • Record: run chain against real LLM, save inputs/outputs to LangSmith dataset
  • Replay: run chain against recorded inputs, compare outputs to saved responses
  • Alert: flag when output changes significantly
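
The record, replay, and alert steps can be sketched without LangSmith. The example below is not the LangSmith API: it swaps the LangSmith dataset for a local JSON file and uses difflib similarity as a stand-in for "changes significantly". The function names, the 0.8 threshold, and the two toy chains are all assumptions made for illustration.

```python
import json
import tempfile
from difflib import SequenceMatcher
from pathlib import Path

def record(chain_fn, inputs, path: Path) -> None:
    """Run the chain on each input and save input/output pairs."""
    cases = [{"input": i, "output": chain_fn(i)} for i in inputs]
    path.write_text(json.dumps(cases, indent=2))

def replay(chain_fn, path: Path, threshold: float = 0.8) -> list[dict]:
    """Re-run recorded inputs and return cases whose output drifted too far."""
    flagged = []
    for case in json.loads(path.read_text()):
        new_output = chain_fn(case["input"])
        similarity = SequenceMatcher(None, case["output"], new_output).ratio()
        if similarity < threshold:
            flagged.append({**case, "new_output": new_output, "similarity": similarity})
    return flagged

# Hypothetical chains: v1 is the recorded baseline, v2 changes one answer.
def chain_v1(text: str) -> str:
    return "positive" if "love" in text else "negative"

def chain_v2(text: str) -> str:
    return "positive" if "love" in text else "neutral"

path = Path(tempfile.mkdtemp()) / "recorded_cases.json"
record(chain_v1, ["I love it", "It broke"], path)
unchanged = replay(chain_v1, path)
drifted = replay(chain_v2, path)
```

For free-text outputs, string similarity is a crude signal; an embedding distance or an LLM-based grader would be a more realistic comparison.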

Performance Optimization

Batch processing: the LCEL batch() method runs multiple inputs concurrently, with configurable concurrency:

python
results = chain.batch(inputs, config={"max_concurrency": 20})
    

Caching: add a caching layer to avoid redundant LLM calls:

python
from langchain_core.globals import set_llm_cache
from langchain_community.cache import SQLiteCache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Identical inputs return cached results

Streaming for UX: even if backend processing takes 5 seconds, streaming gives users immediate feedback. Always use streaming for user-facing chat applications.

Related tools

langchain, openai, langsmith, python