Orchestrazione Multi-Agent con LangGraph: Guida Pratica
Guida tecnica a LangGraph per costruire sistemi multi-agent robusti: StateGraph, nodi, edge condizionali, checkpointing e pattern di orchestrazione con codice Python reale.
Orchestrazione Multi-Agent con LangGraph: Guida Pratica
LangGraph è diventato lo standard de facto per costruire sistemi agentici complessi in Python. Non è la soluzione più semplice — ma è quella che scala bene in produzione.
In questa guida costruiamo un sistema multi-agent reale step by step, partendo dalla teoria del StateGraph fino al deploy in produzione con checkpointing.
Perché LangGraph e Non Solo LangChain?
LangChain è ottimo per catene lineari: A → B → C → risposta. LangGraph aggiunge i cicli: A → B → C → torna a A se la risposta non è soddisfacente.
Gli agenti, per definizione, richiedono cicli. ReAct è un ciclo. Un sistema con human-in-the-loop è un ciclo. Un agente che rivaluta il proprio piano è un ciclo. LangGraph gestisce tutto questo nativamente.
LangChain: A → B → C (pipeline lineare)
LangGraph:
A → B → C
↑ ↓
└── D ←─┘ (cicli, condizioni, parallelismo)
Concetti Fondamentali
State: Il Cuore del Sistema
Tutto in LangGraph ruota intorno allo State: un dizionario tipizzato che rappresenta il contesto corrente del sistema. Ogni nodo legge dallo state e ci scrive.
from typing import TypedDict, Annotated, list
import operator
class AgentState(TypedDict):
# Messaggio dell'utente originale
user_request: str
# Lista di messaggi (usare operator.add per append automatico)
messages: Annotated[list[dict], operator.add]
# Risultati delle ricerche
research_results: list[str]
# Piano corrente
current_plan: list[str]
# Iterazioni (per evitare loop infiniti)
iteration_count: int
# Output finale
final_answer: str
# Metadati per debugging
agent_trace: Annotated[list[str], operator.add]
Nodi: Le Unità di Lavoro
Un nodo è una funzione Python che prende lo state e ritorna un dizionario con gli aggiornamenti:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_mini = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def research_node(state: AgentState) -> dict:
"""
Nodo di ricerca: cerca informazioni rilevanti per il task.
"""
query = state["user_request"]
# Usa il modello per decidere cosa cercare
search_decision = llm_mini.invoke([
SystemMessage(content="Determina le query di ricerca ottimali per rispondere alla richiesta."),
HumanMessage(content=f"Richiesta: {query}\nElenca 3 query di ricerca in JSON array.")
])
try:
search_queries = json.loads(search_decision.content)
except:
search_queries = [query]
# Esegui le ricerche (stub)
results = []
for q in search_queries[:3]:
result = search_knowledge_base(q)
results.append(result)
return {
"research_results": results,
"agent_trace": [f"Research: trovati {len(results)} risultati per {len(search_queries)} query"],
}
def planning_node(state: AgentState) -> dict:
"""
Nodo di pianificazione: crea un piano basato sui risultati di ricerca.
"""
context = "\n".join(state["research_results"])
plan_response = llm.invoke([
SystemMessage(content="""Sei un pianificatore. Basandoti sul contesto fornito,
crea un piano step-by-step per rispondere alla richiesta dell'utente.
Restituisci un JSON array di stringhe, ogni stringa è uno step del piano."""),
HumanMessage(content=f"""
Richiesta: {state['user_request']}
Contesto disponibile:
{context}
Crea il piano:
""")
])
try:
plan = json.loads(plan_response.content)
except:
plan = [plan_response.content]
return {
"current_plan": plan,
"iteration_count": state.get("iteration_count", 0) + 1,
"agent_trace": [f"Planning: piano con {len(plan)} step creato"],
}
def execution_node(state: AgentState) -> dict:
"""
Nodo di esecuzione: esegue il piano e genera la risposta finale.
"""
plan_text = "\n".join([f"{i+1}. {step}" for i, step in enumerate(state["current_plan"])])
context = "\n".join(state["research_results"])
response = llm.invoke([
SystemMessage(content="Esegui il piano fornito e produci una risposta completa e accurata."),
HumanMessage(content=f"""
Richiesta originale: {state['user_request']}
Piano:
{plan_text}
Contesto:
{context}
Esegui il piano e rispondi:
""")
])
return {
"final_answer": response.content,
"agent_trace": ["Execution: risposta finale generata"],
}
def quality_check_node(state: AgentState) -> dict:
"""
Nodo di quality check: valuta se la risposta è soddisfacente.
"""
check_response = llm_mini.invoke([
HumanMessage(content=f"""
Richiesta originale: {state['user_request']}
Risposta generata: {state['final_answer']}
La risposta risponde completamente e accuratamente alla richiesta?
Rispondi SOLO con "APPROVATA" o "DA_RIFARE" seguito da una breve motivazione.
""")
])
is_approved = "APPROVATA" in check_response.content.upper()
return {
"agent_trace": [f"QualityCheck: {'approvata' if is_approved else 'da rifare'} - {check_response.content[:100]}"],
"messages": [{"role": "quality_check", "content": check_response.content, "approved": is_approved}],
}
Costruire il Grafo
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
def should_redo(state: AgentState) -> str:
"""
Edge condizionale: decide se ripetere o terminare.
"""
# Controlla l'ultimo quality check
messages = state.get("messages", [])
quality_messages = [m for m in messages if m.get("role") == "quality_check"]
if not quality_messages:
return "redo"
last_check = quality_messages[-1]
# Se approvata: vai alla fine
if last_check.get("approved"):
return "approved"
# Se troppe iterazioni: forza la fine
if state.get("iteration_count", 0) >= 3:
return "max_iterations"
return "redo"
# Costruzione del grafo
workflow = StateGraph(AgentState)
# Aggiungi i nodi
workflow.add_node("research", research_node)
workflow.add_node("planning", planning_node)
workflow.add_node("execution", execution_node)
workflow.add_node("quality_check", quality_check_node)
# Definisci il flusso
workflow.set_entry_point("research")
workflow.add_edge("research", "planning")
workflow.add_edge("planning", "execution")
workflow.add_edge("execution", "quality_check")
# Edge condizionale: dopo quality_check, approva o riparti
workflow.add_conditional_edges(
"quality_check",
should_redo,
{
"approved": END,
"max_iterations": END,
"redo": "planning", # Riparte dalla pianificazione con i risultati esistenti
}
)
# Checkpointing per persistenza
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)
Checkpointing: Lo Stato Sopravvive ai Crash
Il checkpointing è una delle killer feature di LangGraph: il grafo salva lo stato ad ogni step. Se qualcosa va storto, puoi riprendere esattamente da dove eri.
import uuid
def run_agent_with_checkpointing(user_request: str) -> dict:
"""
Esegue l'agente con checkpointing. Ogni run ha un thread_id univoco.
"""
thread_id = str(uuid.uuid4())
config = {
"configurable": {
"thread_id": thread_id,
}
}
initial_state = {
"user_request": user_request,
"messages": [],
"research_results": [],
"current_plan": [],
"iteration_count": 0,
"final_answer": "",
"agent_trace": [],
}
result = app.invoke(initial_state, config=config)
return {
"thread_id": thread_id,
"answer": result["final_answer"],
"iterations": result["iteration_count"],
"trace": result["agent_trace"],
}
def resume_from_checkpoint(thread_id: str, human_feedback: str = None) -> dict:
"""
Riprende l'esecuzione da uno stato salvato.
Utile per human-in-the-loop.
"""
config = {"configurable": {"thread_id": thread_id}}
# Recupera lo stato corrente
current_state = app.get_state(config)
if human_feedback:
# Aggiorna lo stato con il feedback umano
app.update_state(
config,
{"messages": [{"role": "human_feedback", "content": human_feedback}]}
)
# Riprendi l'esecuzione
result = app.invoke(None, config=config)
return result
Pattern: Human-in-the-Loop
LangGraph supporta nativamente l'interruzione del flusso per approvazione umana:
from langgraph.graph import StateGraph, END, interrupt_before
# Aggiungi un punto di interruzione prima dell'esecuzione
workflow_hitl = StateGraph(AgentState)
workflow_hitl.add_node("research", research_node)
workflow_hitl.add_node("planning", planning_node)
workflow_hitl.add_node("human_approval", lambda state: state) # Nodo passthrough
workflow_hitl.add_node("execution", execution_node)
workflow_hitl.set_entry_point("research")
workflow_hitl.add_edge("research", "planning")
workflow_hitl.add_edge("planning", "human_approval")
workflow_hitl.add_edge("human_approval", "execution")
workflow_hitl.add_edge("execution", END)
# Compila con interrupt_before per fermarsi prima di human_approval
app_hitl = workflow_hitl.compile(
checkpointer=MemorySaver(),
interrupt_before=["human_approval"] # Si ferma qui e aspetta
)
def run_with_human_approval(request: str) -> str:
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
# Step 1: Esegui fino all'interruzione
state = app_hitl.invoke(
{"user_request": request, "messages": [], "research_results": [],
"current_plan": [], "iteration_count": 0, "final_answer": "", "agent_trace": []},
config=config
)
# A questo punto l'esecuzione si è fermata. Mostra il piano all'umano.
current_state = app_hitl.get_state(config)
plan = current_state.values.get("current_plan", [])
print(f"\nPiano proposto dall'agente:")
for i, step in enumerate(plan):
print(f" {i+1}. {step}")
approval = input("\nApprovare il piano? (s/n): ")
if approval.lower() != "s":
print("Piano rifiutato. Agente interrotto.")
return None
# Step 2: Riprendi dopo approvazione
final_state = app_hitl.invoke(None, config=config)
return final_state["final_answer"]
Monitoring in Produzione con LangSmith
import os
from langsmith import Client as LangSmithClient
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-agent-production"
# Da questo momento, ogni invocazione viene tracciata automaticamente in LangSmith
# Puoi vedere:
# - Ogni step del grafo con input/output
# - Token usati per ogni chiamata LLM
# - Latenza per ogni nodo
# - Errori e retry
# - Valutazione automatica delle risposte
def run_production_agent(request: str, user_id: str) -> dict:
"""
Run in produzione con tracing completo.
"""
from langsmith import traceable
@traceable(name="agent_run", tags=["production"])
def _run():
return run_agent_with_checkpointing(request)
result = _run()
# Log manuale di metriche custom
client = LangSmithClient()
client.create_feedback(
run_id=result.get("thread_id"),
key="user_id",
value=user_id
)
return result
Conclusioni
LangGraph è lo strumento giusto quando:
- Il tuo workflow richiede cicli (retry, quality checks, iterazioni)
- Hai bisogno di checkpointing per resilienza
- Vuoi human-in-the-loop in punti specifici del flusso
- Il sistema è abbastanza complesso da richiedere orchestrazione esplicita
Per workflow semplici lineari, LangChain LCEL è più che sufficiente. Non aggiungere complessità quando non serve.
Il codice in questo articolo è un punto di partenza. In produzione, aggiungi: error handling granulare, timeout per ogni nodo, rate limiting, e monitoring con LangSmith o Langfuse.
Stai costruendo un sistema agente complesso? Contattaci.
Vuoi approfondire per il tuo business?
Richiedi un audit gratuito o prenota una call con un ingegnere.
Richiedi un audit