AI2YOU — AI-FIRST TECHNICAL SERIES
Para Engenheiros de IA, Tech Leads e CTOs que tomam decisões de arquitetura em produção.
1. Você Já Construiu um Agente. Agora Precisa Construir uma Orquestra.
Um agente ReAct que consulta uma API e formata uma resposta é um problema resolvido. Os tutoriais cobrem bem esse terreno. O que a documentação oficial raramente cobre é o que acontece quando você tem oito desses agentes que precisam colaborar, compartilhar estado, se recuperar de falhas uns dos outros e produzir outputs auditáveis em um sistema que processa 400 requisições por hora.
Esse é um problema de categoria diferente.
A transição de agente para sistema multi-agente (MAS) não é uma questão de escalar o que já funciona. É uma re-arquitetura completa do modelo mental. Você para de pensar em "qual prompt produz o melhor output" e começa a pensar em protocolos de comunicação, gestão de estado distribuído, hierarquia de decisão e estratégias de recuperação de falhas.
A evidência empírica é dura: 73% dos projetos MAS falham na fase de integração — não na prova de conceito, não no modelo, mas no momento em que agentes independentes precisam funcionar como um sistema coeso em produção (dado ilustrativo, consistente com a literatura de engenharia de software distribuído). O ponto de falha mais comum não é técnico no sentido de "o modelo alucionou". É arquitetural: estado corrompido entre execuções, ausência de retry logic determinístico, falta de observabilidade quando algo dá errado às 3h da manhã.
Este artigo é um contrato: ao final, você terá um framework prático para tomar decisões de arquitetura entre LangChain/LangGraph e CrewAI, com código de produção comentado, padrões de tolerância a falhas e uma matriz de decisão que funciona para times reais. Sem exemplos de "hello world". Sem promessas de ROI sem base técnica.
2. Fundamentos da Orquestração
2.1 Definição Operacional
Orquestração não é coordenação de prompts em cadeia. Uma chain LangChain clássica — prompt | llm | parser — é composição sequencial de funções. Útil, mas determinístico e frágil: qualquer etapa que falha derruba o pipeline inteiro, não existe noção de estado compartilhado entre chamadas, e não há mecanismo para um componente "pedir ajuda" a outro.
Orquestração é a camada que gerencia:
- Quem executa cada sub-tarefa
- Quando a execução ocorre (dependências, paralelismo)
- O quê é passado entre agentes (contrato de interface)
- O que fazer quando qualquer coisa falha
A analogia do maestro é precisa por um motivo específico: o maestro não toca nenhum instrumento. Ele garante que o oboé entre no compasso correto, que o contrabaixo não afogue o solo de violino, e que quando o trompetista erra uma nota, a peça continue. Em termos de sistema: baixa latência de coordenação, alta tolerância a falhas individuais, coerência do output global.
Os 4 pilares inegociáveis de qualquer MAS em produção:
| Pilar | Problema que resolve | Ausência causa |
|---|
| Comunicação | Como agentes passam dados entre si | Estado inconsistente, re-processamento desnecessário |
| Estado | Persistência de contexto entre execuções | Perda de progresso, reprocessamento custoso |
| Hierarquia | Quem decide, executa, valida | Conflitos de responsabilidade, outputs não-auditáveis |
| Recuperação | O que fazer quando um agente falha | Cascata de falhas, sistema não-determinístico |
2.2 LangChain vs. CrewAI — Posicionamento Correto
A pergunta errada é "qual é melhor". A pergunta correta é "qual resolve o problema específico desta arquitetura".
LangChain/LangGraph é um framework de baixo nível. Você define explicitamente cada nó do grafo, cada aresta condicional, cada transição de estado. O LangGraph compila seu grafo em uma máquina de estados determinística. Você tem controle total — e total responsabilidade por cada detalhe.
CrewAI é uma abstração declarativa. Você define papéis de negócio (Pesquisador, Analista, Estrategista), tarefas e um processo de colaboração. O framework gerencia o fluxo de execução. Você abre mão de controle granular em troca de velocidade de desenvolvimento e legibilidade do código.
Matriz de decisão:
| Critério | LangChain/LangGraph | CrewAI | Híbrido |
|---|
| Controle granular do grafo | ✅ Total | ❌ Abstrato | ✅ Parcial |
| Velocidade de prototipação | 🟡 Média | ✅ Alta | 🟡 Média |
| Complexidade do grafo | ✅ Suporta grafos complexos | 🟡 Linear/Hierárquico | ✅ Flexível |
| Abstração de papéis de negócio | ❌ Manual | ✅ Nativa | ✅ Via CrewAI |
| Observabilidade nativa | ✅ LangSmith | 🟡 Básica | ✅ LangSmith |
| Tolerância a falhas built-in | 🟡 Manual | 🟡 max_iter | ✅ Camadas |
| Curva de aprendizado | 🔴 Alta | ✅ Baixa | 🔴 Alta |
| Times pequenos (1-3 eng.) | 🟡 Viável | ✅ Recomendado | ❌ Custoso |
| Requisitos de auditoria | ✅ Trace completo | 🟡 Limitado | ✅ Trace completo |
3. Arquitetura com LangChain/LangGraph
3.1 Estrutura Base com LangGraph
O modelo mental do LangGraph: um StateGraph é um grafo direcionado onde cada nó é uma função Python que recebe o estado atual e retorna uma atualização de estado. Arestas definem o fluxo. Arestas condicionais permitem roteamento dinâmico baseado no estado.
O exemplo abaixo implementa um sistema de análise de documentos com três agentes especializados:
1# langchain==0.3.x | langgraph==0.2.x | langchain-openai==0.2.x
2
3import logging
4import uuid
5from typing import TypedDict, Annotated, Literal
6from operator import add
7
8from langchain_openai import ChatOpenAI
9from langchain_core.messages import HumanMessage, SystemMessage
10from langgraph.graph import StateGraph, END
11from langgraph.checkpoint.sqlite import SqliteSaver
12
13# Logging estruturado — nunca print() em produção
14logging.basicConfig(
15 level=logging.INFO,
16 format='{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s"}'
17)
18logger = logging.getLogger(__name__)
19
20
21class DocumentState(TypedDict):
22 """Estado compartilhado entre todos os agentes do pipeline."""
23 correlation_id: str # ID único da execução para rastreamento
24 raw_content: str # Documento de entrada
25 extracted_data: dict # Output do agente extrator
26 analysis: str # Output do agente analisador
27 final_report: str # Output do agente redator
28 errors: Annotated[list, add] # Acumulador de erros — não sobrescreve
29 retry_count: int # Contador de retentativas por nó
30 status: Literal["running", "completed", "failed"]
31
32
33llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
34
35
36def extractor_node(state: DocumentState) -> dict:
37 """
38 Extrai entidades estruturadas do documento bruto.
39
40 Contrato de output: dict com chaves 'entities', 'dates', 'amounts'.
41 Falhas são sinalizadas via campo 'errors' — nunca levantam exceções.
42 """
43 cid = state["correlation_id"]
44 logger.info(f"extractor_start correlation_id={cid}")
45
46 try:
47 response = llm.invoke([
48 SystemMessage(content=(
49 "Extraia do documento: entidades nomeadas, datas e valores monetários. "
50 "Retorne JSON com chaves: entities (list), dates (list), amounts (list)."
51 )),
52 HumanMessage(content=state["raw_content"])
53 ])
54
55 import json
56 extracted = json.loads(response.content)
57 logger.info(f"extractor_done correlation_id={cid} entities={len(extracted.get('entities', []))}")
58 return {"extracted_data": extracted}
59
60 except Exception as e:
61 logger.error(f"extractor_error correlation_id={cid} error={str(e)}")
62 return {
63 "extracted_data": {},
64 "errors": [{"node": "extractor", "error": str(e), "cid": cid}]
65 }
66
67
68def analyzer_node(state: DocumentState) -> dict:
69 """
70 Analisa os dados extraídos e produz insights estruturados.
71
72 Depende de extracted_data não-vazio. Se vazio, retorna erro
73 sem chamar o LLM — evita custo desnecessário.
74 """
75 cid = state["correlation_id"]
76
77 if not state["extracted_data"]:
78 logger.warning(f"analyzer_skip correlation_id={cid} reason=empty_extracted_data")
79 return {
80 "analysis": "",
81 "errors": [{"node": "analyzer", "error": "extracted_data vazio", "cid": cid}]
82 }
83
84 logger.info(f"analyzer_start correlation_id={cid}")
85
86 response = llm.invoke([
87 SystemMessage(content=(
88 "Com base nos dados extraídos, identifique: "
89 "1) Padrões temporais relevantes, "
90 "2) Anomalias nos valores monetários, "
91 "3) Relações entre entidades. "
92 "Seja conciso e técnico."
93 )),
94 HumanMessage(content=str(state["extracted_data"]))
95 ])
96
97 logger.info(f"analyzer_done correlation_id={cid}")
98 return {"analysis": response.content}
99
100
101def writer_node(state: DocumentState) -> dict:
102 """
103 Consolida extração e análise em relatório executivo estruturado.
104
105 Inclui seção de limitações quando há erros acumulados no estado.
106 """
107 cid = state["correlation_id"]
108 has_errors = len(state.get("errors", [])) > 0
109
110 logger.info(f"writer_start correlation_id={cid} has_errors={has_errors}")
111
112 error_context = ""
113 if has_errors:
114 error_context = f"\n\nNOTA: {len(state['errors'])} erro(s) ocorreram durante o processamento. "
115 error_context += "Inclua uma seção 'Limitações' no relatório."
116
117 response = llm.invoke([
118 SystemMessage(content=(
119 "Gere um relatório executivo estruturado com: "
120 "Sumário Executivo, Achados Principais, Análise de Risco, Recomendações."
121 + error_context
122 )),
123 HumanMessage(content=(
124 f"DADOS EXTRAÍDOS:\n{state['extracted_data']}\n\n"
125 f"ANÁLISE:\n{state['analysis']}"
126 ))
127 ])
128
129 logger.info(f"writer_done correlation_id={cid}")
130 return {
131 "final_report": response.content,
132 "status": "completed"
133 }
134
135
136def should_continue(state: DocumentState) -> Literal["analyzer", "writer", END]:
137 """
138 Aresta condicional: decide o próximo nó com base no estado atual.
139
140 Lógica: se extração falhou completamente, vai direto ao writer
141 para gerar relatório de falha. Caso contrário, fluxo normal.
142 """
143 if not state["extracted_data"] and len(state.get("errors", [])) > 0:
144 # Falha crítica na extração — pula análise, gera relatório de erro
145 return "writer"
146 return "analyzer"
147
148
149def build_document_pipeline() -> StateGraph:
150 """Compila e retorna o grafo de processamento de documentos."""
151 graph = StateGraph(DocumentState)
152
153 # Registrar nós
154 graph.add_node("extractor", extractor_node)
155 graph.add_node("analyzer", analyzer_node)
156 graph.add_node("writer", writer_node)
157
158 # Definir ponto de entrada
159 graph.set_entry_point("extractor")
160
161 # Aresta condicional após extração
162 graph.add_conditional_edges(
163 "extractor",
164 should_continue,
165 {
166 "analyzer": "analyzer",
167 "writer": "writer",
168 }
169 )
170
171 # Arestas determinísticas
172 graph.add_edge("analyzer", "writer")
173 graph.add_edge("writer", END)
174
175 return graph
176
177
178# Uso com checkpointer para persistência de estado
179def run_pipeline(document: str) -> DocumentState:
180 """
181 Executa o pipeline com persistência de estado via SQLite.
182
183 O thread_id permite retomar execuções interrompidas.
184 """
185 checkpointer = SqliteSaver.from_conn_string(":memory:") # use path real em produção
186 pipeline = build_document_pipeline().compile(checkpointer=checkpointer)
187
188 initial_state: DocumentState = {
189 "correlation_id": str(uuid.uuid4()),
190 "raw_content": document,
191 "extracted_data": {},
192 "analysis": "",
193 "final_report": "",
194 "errors": [],
195 "retry_count": 0,
196 "status": "running",
197 }
198
199 config = {"configurable": {"thread_id": initial_state["correlation_id"]}}
200 result = pipeline.invoke(initial_state, config=config)
201 return result
3.2 Padrões de Orquestração com Trade-offs
Sequential — pipeline linear, cada nó recebe o output do anterior.
1# langchain==0.3.x | langgraph==0.2.x
2# Adequado para: processos com dependências estritas de ordem
3# Limitação: latência total = soma das latências individuais
4
5graph.set_entry_point("node_a")
6graph.add_edge("node_a", "node_b")
7graph.add_edge("node_b", "node_c")
8graph.add_edge("node_c", END)
Parallel (fan-out/fan-in) — múltiplos Workers executando simultaneamente com merge dos resultados.
1# Reduz latência para: max(latência_worker_mais_lento)
2# Complexidade: lógica de merge pode ser não-determinística
3
4from langgraph.graph import Send
5
6def fan_out_node(state: dict) -> list[Send]:
7 """Distribui sub-tarefas para Workers paralelos."""
8 tasks = state["tasks"]
9 return [Send("worker_node", {"task": task, "parent_id": state["id"]})
10 for task in tasks]
11
12def merge_node(state: dict) -> dict:
13 """Consolida resultados — cuidado com race conditions no estado."""
14 return {"merged_results": state["partial_results"]}
Hierarchical — agente supervisor decide qual Worker invocar com base no contexto.
1# Adequado para: domínios onde o roteamento não pode ser pré-determinado
2# Limitação: o supervisor é um ponto único de falha e de custo
3
4def supervisor_node(state: dict) -> dict:
5 """
6 Supervisor decide o próximo agente. Usa structured output
7 para garantir que a decisão seja parseável deterministicamente.
8 """
9 from pydantic import BaseModel
10
11 class RoutingDecision(BaseModel):
12 next_agent: Literal["research_worker", "analysis_worker", "writer_worker", "FINISH"]
13 reasoning: str
14
15 structured_llm = llm.with_structured_output(RoutingDecision)
16 decision = structured_llm.invoke(state["messages"])
17 return {"next": decision.next_agent, "routing_log": decision.reasoning}
3.3 State Management em Detalhe
O SqliteSaver é adequado para desenvolvimento e cargas baixas. Em produção com concorrência:
1# langchain==0.3.x | langgraph==0.2.x | redis==5.x
2
3from langgraph.checkpoint.redis import RedisSaver
4
5# Produção: Redis com TTL para evitar acúmulo de estados órfãos
6checkpointer = RedisSaver.from_conn_string(
7 "redis://localhost:6379",
8 ttl={"default": 86400} # 24h — ajuste por tipo de processo
9)
10
11# Pattern de handoff: estado explícito de "pronto para o próximo agente"
12class HandoffState(TypedDict):
13 phase: Literal["extraction", "analysis", "writing", "done"]
14 phase_output: dict # Output da fase atual
15 phase_metadata: dict # Latência, tokens, modelo usado
16 handoff_validated: bool # Critic validou antes do handoff
4. Arquitetura com CrewAI
4.1 Modelo Declarativo de Papéis
CrewAI inverte o paradigma: em vez de definir um grafo técnico, você define responsabilidades de negócio. Um Agent é um papel com um role (cargo), goal (objetivo) e backstory (contexto que molda o comportamento do LLM).
O exemplo abaixo implementa uma Crew de inteligência de mercado:
1# crewai==0.80.x | langchain-openai==0.2.x
2
3import logging
4from typing import Optional
5from pydantic import BaseModel
6
7from crewai import Agent, Task, Crew, Process
8from crewai.tools import BaseTool
9from langchain_openai import ChatOpenAI
10
11logger = logging.getLogger(__name__)
12
13
14# --- Ferramenta customizada ---
15
16class WebSearchTool(BaseTool):
17 """
18 Wrapper de busca web para uso pelos agentes.
19
20 Em produção, substitua por integração real (Tavily, Serper, etc).
21 """
22 name: str = "web_search"
23 description: str = "Busca informações atualizadas na web sobre um tópico."
24
25 def _run(self, query: str) -> str:
26 # Integração real aqui
27 logger.info(f"web_search query={query}")
28 return f"[Resultados simulados para: {query}]"
29
30
31# --- Schema de output estruturado ---
32
33class MarketIntelligenceReport(BaseModel):
34 """Schema Pydantic para output estruturado da Crew."""
35 executive_summary: str
36 key_competitors: list[str]
37 market_size_estimate: str
38 strategic_recommendations: list[str]
39 confidence_score: float # 0.0 - 1.0
40
41
42# --- Definição dos Agentes ---
43
44llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
45
46researcher = Agent(
47 role="Senior Market Research Specialist",
48 goal=(
49 "Coletar dados factuais e atualizados sobre mercado, concorrentes e tendências. "
50 "Priorize fontes primárias. Sinalize quando dados são estimativas."
51 ),
52 backstory=(
53 "Você é um analista de inteligência competitiva com 10 anos de experiência "
54 "em mercados de tecnologia B2B. Você é cético, rigoroso e nunca fabrica dados."
55 ),
56 tools=[WebSearchTool()],
57 llm=llm,
58 max_iter=5, # Limite de iterações — controle de custo
59 verbose=True,
60 allow_delegation=False # Researcher não delega — executa diretamente
61)
62
63analyst = Agent(
64 role="Strategic Intelligence Analyst",
65 goal=(
66 "Transformar dados brutos de mercado em insights acionáveis. "
67 "Identifique padrões, anomalias e oportunidades não-óbvias."
68 ),
69 backstory=(
70 "Você é um analista sênior especializado em síntese de dados complexos. "
71 "Você pensa em sistemas, não em pontos de dados isolados."
72 ),
73 llm=llm,
74 max_iter=3,
75 verbose=True,
76 allow_delegation=False
77)
78
79strategist = Agent(
80 role="Go-to-Market Strategist",
81 goal=(
82 "Converter insights de mercado em recomendações estratégicas concretas "
83 "com critérios de priorização explícitos."
84 ),
85 backstory=(
86 "Você é um estrategista com foco em execução. Suas recomendações sempre "
87 "incluem: o quê fazer, por quê, em que ordem e como medir sucesso."
88 ),
89 llm=llm,
90 max_iter=3,
91 verbose=True,
92 allow_delegation=True # Strategist pode delegar revisões ao Analyst
93)
94
95
96# --- Definição das Tasks ---
97
98research_task = Task(
99 description=(
100 "Pesquise o mercado de {market_segment} com foco em: "
101 "1) Principais players e market share estimado, "
102 "2) Tendências de crescimento nos últimos 18 meses, "
103 "3) Movimentos recentes de M&A ou funding. "
104 "Documente cada fonte utilizada."
105 ),
106 expected_output=(
107 "Relatório de pesquisa com dados brutos organizados por categoria. "
108 "Inclua grau de confiança (alto/médio/baixo) para cada dado."
109 ),
110 agent=researcher
111)
112
113analysis_task = Task(
114 description=(
115 "Com base no relatório de pesquisa, produza: "
116 "1) Análise de posicionamento dos 3 principais competidores, "
117 "2) Identificação de gaps de mercado não endereçados, "
118 "3) Avaliação de ameaças e oportunidades (formato matriz)."
119 ),
120 expected_output=(
121 "Análise estruturada com seções distintas para cada entregável. "
122 "Cada insight deve ser suportado por dados do relatório de pesquisa."
123 ),
124 agent=analyst,
125 context=[research_task] # Dependência explícita
126)
127
128strategy_task = Task(
129 description=(
130 "Com base na análise de mercado, desenvolva recomendações estratégicas "
131 "priorizadas por impacto e viabilidade de execução em 90 dias."
132 ),
133 expected_output=(
134 "Relatório executivo no formato MarketIntelligenceReport com: "
135 "sumário executivo, competidores-chave, estimativa de mercado, "
136 "recomendações priorizadas e score de confiança geral."
137 ),
138 agent=strategist,
139 context=[research_task, analysis_task],
140 output_pydantic=MarketIntelligenceReport # Output estruturado e parseável
141)
142
143
144# --- Assembly da Crew ---
145
146market_intel_crew = Crew(
147 agents=[researcher, analyst, strategist],
148 tasks=[research_task, analysis_task, strategy_task],
149 process=Process.sequential, # Ordem garantida: research → analysis → strategy
150 verbose=True,
151 memory=True, # Habilita memória entre tasks
152 max_rpm=10, # Rate limiting — evita throttling da API
153)
154
155
156def run_market_intelligence(market_segment: str) -> MarketIntelligenceReport:
157 """
158 Executa a Crew de inteligência de mercado para um segmento específico.
159
160 Returns:
161 MarketIntelligenceReport com output estruturado e validado via Pydantic.
162 """
163 logger.info(f"crew_start segment={market_segment}")
164 result = market_intel_crew.kickoff(inputs={"market_segment": market_segment})
165 logger.info(f"crew_done segment={market_segment}")
166 return result.pydantic
4.2 Processos de Colaboração
Hierarchical com Manager LLM — CrewAI instancia automaticamente um agente gerente que decide a ordem e delegação de tasks:
1# crewai==0.80.x
2from crewai import LLM
3
4manager_llm = LLM(model="gpt-4o", temperature=0) # Manager precisa de alta precisão
5
6hierarchical_crew = Crew(
7 agents=[researcher, analyst, strategist],
8 tasks=[research_task, analysis_task, strategy_task],
9 process=Process.hierarchical,
10 manager_llm=manager_llm,
11 verbose=True
12)
Configuração de Memória em Detalhe:
1# crewai==0.80.x — Memória requer configuração explícita de embeddings
2
3from langchain_openai import OpenAIEmbeddings
4
5crew_with_memory = Crew(
6 agents=[researcher, analyst, strategist],
7 tasks=[research_task, analysis_task, strategy_task],
8 process=Process.sequential,
9 memory=True,
10 # Short-term: contexto da execução atual (in-memory)
11 # Long-term: RAG sobre execuções passadas (ChromaDB por padrão)
12 # Entity: grafo de entidades mencionadas
13 embedder={
14 "provider": "openai",
15 "config": {"model": "text-embedding-3-small"}
16 },
17 verbose=True
18)
4.3 Configuração Avançada para Produção
Human-in-the-loop para decisões de alto risco:
1# crewai==0.80.x
2# Human input interrompe a execução e aguarda input via stdin
3# Em produção: integre com webhook ou sistema de aprovação
4
5approval_task = Task(
6 description="Valide se a estratégia proposta está alinhada com os objetivos do negócio.",
7 expected_output="Aprovação ou lista de ajustes necessários.",
8 agent=strategist,
9 human_input=True # Pausa execução para revisão humana
10)
5. Desafios Reais em Produção
5.1 Gerenciamento de Falhas — Não Ignore Isso
O padrão mais comum de falha em MAS não é o agente retornando lixo — é o agente não retornando nada devido a timeout, rate limit ou erro de rede. Retry logic determinístico é não-negociável:
1# langchain==0.3.x | tenacity==8.x
2
3import logging
4from functools import wraps
5from tenacity import (
6 retry,
7 stop_after_attempt,
8 wait_exponential,
9 retry_if_exception_type,
10 before_sleep_log
11)
12from openai import RateLimitError, APITimeoutError, APIConnectionError
13
14logger = logging.getLogger(__name__)
15
16RETRYABLE_EXCEPTIONS = (RateLimitError, APITimeoutError, APIConnectionError)
17
18
19def with_agent_retry(max_attempts: int = 3, min_wait: float = 1.0, max_wait: float = 30.0):
20 """
21 Decorator de retry com backoff exponencial para nós de agente.
22
23 Estratégia: jitter aleatório no wait evita thundering herd
24 quando múltiplos agentes falham simultaneamente.
25 """
26 def decorator(func):
27 @retry(
28 stop=stop_after_attempt(max_attempts),
29 wait=wait_exponential(multiplier=min_wait, max=max_wait),
30 retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
31 before_sleep=before_sleep_log(logger, logging.WARNING),
32 reraise=True
33 )
34 @wraps(func)
35 def wrapper(*args, **kwargs):
36 return func(*args, **kwargs)
37 return wrapper
38 return decorator
39
40
41class CircuitBreaker:
42 """
43 Circuit breaker para chamadas a APIs externas.
44
45 Estados: CLOSED (normal) → OPEN (falhas consecutivas) → HALF_OPEN (testando)
46 Evita cascata de falhas quando API downstream está degradada.
47 """
48 def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
49 self.failure_count = 0
50 self.failure_threshold = failure_threshold
51 self.recovery_timeout = recovery_timeout
52 self.state = "CLOSED"
53 self.last_failure_time: float = 0
54
55 def call(self, func, *args, **kwargs):
56 import time
57
58 if self.state == "OPEN":
59 if time.time() - self.last_failure_time > self.recovery_timeout:
60 self.state = "HALF_OPEN"
61 logger.info("circuit_breaker state=HALF_OPEN")
62 else:
63 raise RuntimeError(f"Circuit breaker OPEN — aguardando recovery")
64
65 try:
66 result = func(*args, **kwargs)
67 if self.state == "HALF_OPEN":
68 self.state = "CLOSED"
69 self.failure_count = 0
70 logger.info("circuit_breaker state=CLOSED")
71 return result
72 except Exception as e:
73 self.failure_count += 1
74 self.last_failure_time = time.time()
75 if self.failure_count >= self.failure_threshold:
76 self.state = "OPEN"
77 logger.error(f"circuit_breaker state=OPEN failures={self.failure_count}")
78 raise
79
80
81# Agente com fallback: se o modelo principal falha, usa modelo menor
82@with_agent_retry(max_attempts=3)
83def resilient_agent_node(state: dict) -> dict:
84 """Nó de agente com retry automático e fallback de modelo."""
85 try:
86 primary_llm = ChatOpenAI(model="gpt-4o", temperature=0)
87 return _execute_agent_logic(primary_llm, state)
88 except Exception as e:
89 logger.warning(f"primary_model_failed error={str(e)} falling_back=gpt-4o-mini")
90 fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
91 return _execute_agent_logic(fallback_llm, state)
5.2 Observabilidade Não é Opcional
1# langchain==0.3.x | python-json-logger==2.x
2
3import json
4import time
5import uuid
6from contextlib import contextmanager
7from pythonjsonlogger import jsonlogger
8
9# Configuração de logging estruturado em JSON
10handler = logging.StreamHandler()
11handler.setFormatter(jsonlogger.JsonFormatter(
12 fmt="%(asctime)s %(levelname)s %(name)s %(message)s"
13))
14logger = logging.getLogger("mas.agent")
15logger.addHandler(handler)
16
17
18@contextmanager
19def agent_trace(agent_name: str, correlation_id: str):
20 """
21 Context manager para tracing de execução de agentes.
22
23 Captura: latência, status, custo estimado, tokens utilizados.
24 Compatível com LangSmith via callbacks quando configurado.
25 """
26 span_id = str(uuid.uuid4())[:8]
27 start_time = time.perf_counter()
28
29 logger.info("agent_start", extra={
30 "agent": agent_name,
31 "correlation_id": correlation_id,
32 "span_id": span_id,
33 "event": "span_start"
34 })
35
36 try:
37 yield span_id
38 elapsed_ms = (time.perf_counter() - start_time) * 1000
39 logger.info("agent_success", extra={
40 "agent": agent_name,
41 "correlation_id": correlation_id,
42 "span_id": span_id,
43 "latency_ms": round(elapsed_ms, 2),
44 "event": "span_end",
45 "status": "success"
46 })
47 except Exception as e:
48 elapsed_ms = (time.perf_counter() - start_time) * 1000
49 logger.error("agent_error", extra={
50 "agent": agent_name,
51 "correlation_id": correlation_id,
52 "span_id": span_id,
53 "latency_ms": round(elapsed_ms, 2),
54 "event": "span_end",
55 "status": "error",
56 "error_type": type(e).__name__,
57 "error_msg": str(e)
58 })
59 raise
Formato de log para auditoria — cada entrada é um objeto JSON independente, parseável por qualquer sistema de log aggregation (Datadog, CloudWatch, Loki):
1{
2 "asctime": "2026-03-05T14:32:01.234Z",
3 "levelname": "INFO",
4 "agent": "analyzer_node",
5 "correlation_id": "a3f7c2d1-8b4e-4f9a-b2c1-d5e8f0a1b3c4",
6 "span_id": "7f2c3a1b",
7 "latency_ms": 1243.7,
8 "event": "span_end",
9 "status": "success",
10 "tokens_used": 847,
11 "model": "gpt-4o-mini",
12 "estimated_cost_usd": 0.000423
13}
5.3 Custos e Rate Limiting
O custo de um pipeline MAS não é a soma dos custos individuais — é amplificado por retries, contextos redundantes entre agentes e chamadas desnecessárias quando o estado já satisfaz a condição de saída.
1# langchain==0.3.x | gptcache==0.1.x
2
3from gptcache import cache
4from gptcache.adapter import openai as cached_openai
5from gptcache.embedding import Onnx
6
7# Semantic caching: requisições semanticamente similares
8# reutilizam respostas anteriores — redução de 30-60% em custo
9# em workflows com perguntas repetitivas (dado ilustrativo)
10onnx = Onnx()
11cache.init(embedding_func=onnx.to_embeddings)
12cache.set_openai_key()
13
14# Estimativa de custo por padrão de orquestração
15# (baseado em gpt-4o-mini a $0.15/1M input tokens — verificar preço atual)
16COST_ESTIMATES = {
17 "sequential_5_agents": "~$0.002-0.008 por execução",
18 "parallel_5_agents": "~$0.002-0.008 por execução (mesma chamada, menor latência)",
19 "hierarchical_supervisor": "~$0.005-0.020 por execução (+custo do supervisor)",
20 "crew_sequential_3_agents": "~$0.003-0.012 por execução"
21}
6. Decisão de Arquitetura — Tabela Comparativa Completa
| Critério | LangChain/LangGraph | CrewAI | Híbrido |
|---|
| Controle do grafo de execução | Total — você define cada aresta | Abstrato — framework gerencia | LangGraph para sub-grafos críticos |
| Velocidade de prototipação | 3-5 dias para MAS básico | 1-2 dias para MAS básico | 4-7 dias |
| Abstração de papéis de negócio | Manual — requer mapeamento explícito | Nativa — role/goal/backstory | Via CrewAI na camada de negócio |
| Observabilidade nativa | LangSmith (trace completo) | Básica (verbose logs) | LangSmith para todo o sistema |
| Tolerância a falhas built-in | Nenhuma — implemente você mesmo | max_iter, max_rpm | Camadas: LangGraph + tenacity |
| Curva de aprendizado | Alta — requer conhecimento de grafos | Baixa — declarativo e intuitivo | Alta |
| Ecossistema de integrações | 500+ integrações nativas | ~100 integrações | Melhor dos dois |
| Adequado para times pequenos | Viável com esforço | Recomendado | Custoso de manter |
| Requisitos de auditoria regulatória | Completo via LangSmith | Limitado | Completo |
| Grafos com lógica condicional complexa | Nativo | Não suportado | LangGraph para essa camada |
| Output estruturado (Pydantic) | Via structured output do LLM | Nativo via output_pydantic | Ambos suportam |
| Human-in-the-loop | Via interrupt/resume no LangGraph | Via human_input=True | Ambos suportam |
Regra de decisão em 3 linhas:
- Use LangGraph quando o grafo de execução tem lógica condicional complexa, requisitos de auditoria regulatória rigorosos, ou quando o time tem engenheiros sênior disponíveis para manter a infraestrutura.
- Use CrewAI quando o domínio de negócio é bem mapeável em papéis, o time é pequeno, o prazo é curto, e controle granular do grafo não é requisito.
- Use híbrido quando a Crew de negócio (CrewAI) precisa de sub-grafos técnicos confiáveis para tarefas críticas — CrewAI orquestra o fluxo de negócio, LangGraph executa as etapas que exigem determinismo e observabilidade completa.
7. Conclusão
Três insights que não estão na documentação oficial e só emergem em produção:
1. O estado compartilhado é o contrato mais importante do sistema. Antes de escrever qualquer código de agente, defina o schema completo do estado. Mudanças tardias no TypedDict ou no schema Pydantic quebram checkpoints persistidos e exigem migrações. Trate o estado como você trataria um schema de banco de dados.
2. O Critic (validador) reduz custo total, não aumenta. A intuição de que "mais um agente = mais custo" é incorreta quando o Critic elimina reprocessamentos causados por outputs inválidos chegando nas etapas seguintes. Em pipelines com mais de 4 agentes, um Critic bem calibrado reduz o custo total em 15-35% (dado ilustrativo).
3. CrewAI e LangGraph não competem — estratificam. O padrão mais robusto observado em produção usa CrewAI para definir "o quê fazer" (orquestração de negócio) e LangGraph para definir "como fazer com garantias" (sub-grafos de execução crítica). A separação de responsabilidades é limpa e o código resultante é mais legível do que monolitos em qualquer dos dois frameworks.
Próximos passos concretos:
- Implemente o
DocumentState com um processo real de baixo risco — não crie o estado "perfeito" de primeira. Ele vai evoluir.
- Configure LangSmith ou equivalente antes de ir a produção — debugar MAS sem tracing é ordens de magnitude mais custoso do que com.
- Escreva testes unitários para cada nó de agente com estados de entrada fixos — nodes são funções puras, são testáveis.
Palavras-chave: Orquestração de Agentes LangChain CrewAI, LangGraph produção, CrewAI tutorial avançado, Multi-Agent Systems Python, arquitetura de agentes IA, LLM orchestration framework.
Publicado por AI2You — AI-First Technical Series | ai2you.online/pt/blog