Orquestração de Agentes com LangChain e CrewAI: Do Conceito à Produção

AI2You

AI2You | Evolução Humana & IA

2026-03-05

Visualização futurista em 3D isométrica de dois grafos de orquestração de IA interconectados no espaço profundo. À esquerda, uma rede de nós estilo máquina de estados com conexões em azul elétrico; à direita, uma hierarquia de agentes representados por formas geométricas abstratas. Fluxos de dados em ciano unem as duas estruturas em um pipeline central, com raios de luz volumétricos e partículas sobre fundo azul-marinho escuro.
Framework prático para orquestrar sistemas multi-agentes em produção com LangChain/LangGraph e CrewAI — incluindo state management, tolerância a falhas, observabilidade e critérios de decisão de arquitetura.

AI2YOU — AI-FIRST TECHNICAL SERIES

Para Engenheiros de IA, Tech Leads e CTOs que tomam decisões de arquitetura em produção.

1. Você Já Construiu um Agente. Agora Precisa Construir uma Orquestra.

Um agente ReAct que consulta uma API e formata uma resposta é um problema resolvido. Os tutoriais cobrem bem esse terreno. O que a documentação oficial raramente cobre é o que acontece quando você tem oito desses agentes que precisam colaborar, compartilhar estado, se recuperar de falhas uns dos outros e produzir outputs auditáveis em um sistema que processa 400 requisições por hora.

Esse é um problema de categoria diferente.

A transição de agente para sistema multi-agente (MAS) não é uma questão de escalar o que já funciona. É uma re-arquitetura completa do modelo mental. Você para de pensar em "qual prompt produz o melhor output" e começa a pensar em protocolos de comunicação, gestão de estado distribuído, hierarquia de decisão e estratégias de recuperação de falhas.

A evidência empírica é dura: 73% dos projetos MAS falham na fase de integração — não na prova de conceito, não no modelo, mas no momento em que agentes independentes precisam funcionar como um sistema coeso em produção (dado ilustrativo, consistente com a literatura de engenharia de software distribuído). O ponto de falha mais comum não é técnico no sentido de "o modelo alucionou". É arquitetural: estado corrompido entre execuções, ausência de retry logic determinístico, falta de observabilidade quando algo dá errado às 3h da manhã.

Este artigo é um contrato: ao final, você terá um framework prático para tomar decisões de arquitetura entre LangChain/LangGraph e CrewAI, com código de produção comentado, padrões de tolerância a falhas e uma matriz de decisão que funciona para times reais. Sem exemplos de "hello world". Sem promessas de ROI sem base técnica.

2. Fundamentos da Orquestração

2.1 Definição Operacional

Orquestração não é coordenação de prompts em cadeia. Uma chain LangChain clássica — prompt | llm | parser — é composição sequencial de funções. Útil, mas determinístico e frágil: qualquer etapa que falha derruba o pipeline inteiro, não existe noção de estado compartilhado entre chamadas, e não há mecanismo para um componente "pedir ajuda" a outro.

Orquestração é a camada que gerencia:

  • Quem executa cada sub-tarefa
  • Quando a execução ocorre (dependências, paralelismo)
  • O quê é passado entre agentes (contrato de interface)
  • O que fazer quando qualquer coisa falha

A analogia do maestro é precisa por um motivo específico: o maestro não toca nenhum instrumento. Ele garante que o oboé entre no compasso correto, que o contrabaixo não afogue o solo de violino, e que quando o trompetista erra uma nota, a peça continue. Em termos de sistema: baixa latência de coordenação, alta tolerância a falhas individuais, coerência do output global.

Os 4 pilares inegociáveis de qualquer MAS em produção:

PilarProblema que resolveAusência causa
ComunicaçãoComo agentes passam dados entre siEstado inconsistente, re-processamento desnecessário
EstadoPersistência de contexto entre execuçõesPerda de progresso, reprocessamento custoso
HierarquiaQuem decide, executa, validaConflitos de responsabilidade, outputs não-auditáveis
RecuperaçãoO que fazer quando um agente falhaCascata de falhas, sistema não-determinístico

2.2 LangChain vs. CrewAI — Posicionamento Correto

A pergunta errada é "qual é melhor". A pergunta correta é "qual resolve o problema específico desta arquitetura".

LangChain/LangGraph é um framework de baixo nível. Você define explicitamente cada nó do grafo, cada aresta condicional, cada transição de estado. O LangGraph compila seu grafo em uma máquina de estados determinística. Você tem controle total — e total responsabilidade por cada detalhe.

CrewAI é uma abstração declarativa. Você define papéis de negócio (Pesquisador, Analista, Estrategista), tarefas e um processo de colaboração. O framework gerencia o fluxo de execução. Você abre mão de controle granular em troca de velocidade de desenvolvimento e legibilidade do código.

Matriz de decisão:

CritérioLangChain/LangGraphCrewAIHíbrido
Controle granular do grafo✅ Total❌ Abstrato✅ Parcial
Velocidade de prototipação🟡 Média✅ Alta🟡 Média
Complexidade do grafo✅ Suporta grafos complexos🟡 Linear/Hierárquico✅ Flexível
Abstração de papéis de negócio❌ Manual✅ Nativa✅ Via CrewAI
Observabilidade nativa✅ LangSmith🟡 Básica✅ LangSmith
Tolerância a falhas built-in🟡 Manual🟡 max_iter✅ Camadas
Curva de aprendizado🔴 Alta✅ Baixa🔴 Alta
Times pequenos (1-3 eng.)🟡 Viável✅ Recomendado❌ Custoso
Requisitos de auditoria✅ Trace completo🟡 Limitado✅ Trace completo

3. Arquitetura com LangChain/LangGraph

3.1 Estrutura Base com LangGraph

O modelo mental do LangGraph: um StateGraph é um grafo direcionado onde cada é uma função Python que recebe o estado atual e retorna uma atualização de estado. Arestas definem o fluxo. Arestas condicionais permitem roteamento dinâmico baseado no estado.

O exemplo abaixo implementa um sistema de análise de documentos com três agentes especializados:

python
1# langchain==0.3.x | langgraph==0.2.x | langchain-openai==0.2.x 2 3import logging 4import uuid 5from typing import TypedDict, Annotated, Literal 6from operator import add 7 8from langchain_openai import ChatOpenAI 9from langchain_core.messages import HumanMessage, SystemMessage 10from langgraph.graph import StateGraph, END 11from langgraph.checkpoint.sqlite import SqliteSaver 12 13# Logging estruturado — nunca print() em produção 14logging.basicConfig( 15 level=logging.INFO, 16 format='{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s"}' 17) 18logger = logging.getLogger(__name__) 19 20 21class DocumentState(TypedDict): 22 """Estado compartilhado entre todos os agentes do pipeline.""" 23 correlation_id: str # ID único da execução para rastreamento 24 raw_content: str # Documento de entrada 25 extracted_data: dict # Output do agente extrator 26 analysis: str # Output do agente analisador 27 final_report: str # Output do agente redator 28 errors: Annotated[list, add] # Acumulador de erros — não sobrescreve 29 retry_count: int # Contador de retentativas por nó 30 status: Literal["running", "completed", "failed"] 31 32 33llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) 34 35 36def extractor_node(state: DocumentState) -> dict: 37 """ 38 Extrai entidades estruturadas do documento bruto. 39 40 Contrato de output: dict com chaves 'entities', 'dates', 'amounts'. 41 Falhas são sinalizadas via campo 'errors' — nunca levantam exceções. 42 """ 43 cid = state["correlation_id"] 44 logger.info(f"extractor_start correlation_id={cid}") 45 46 try: 47 response = llm.invoke([ 48 SystemMessage(content=( 49 "Extraia do documento: entidades nomeadas, datas e valores monetários. " 50 "Retorne JSON com chaves: entities (list), dates (list), amounts (list)." 51 )), 52 HumanMessage(content=state["raw_content"]) 53 ]) 54 55 import json 56 extracted = json.loads(response.content) 57 logger.info(f"extractor_done correlation_id={cid} entities={len(extracted.get('entities', []))}") 58 return {"extracted_data": extracted} 59 60 except Exception as e: 61 logger.error(f"extractor_error correlation_id={cid} error={str(e)}") 62 return { 63 "extracted_data": {}, 64 "errors": [{"node": "extractor", "error": str(e), "cid": cid}] 65 } 66 67 68def analyzer_node(state: DocumentState) -> dict: 69 """ 70 Analisa os dados extraídos e produz insights estruturados. 71 72 Depende de extracted_data não-vazio. Se vazio, retorna erro 73 sem chamar o LLM — evita custo desnecessário. 74 """ 75 cid = state["correlation_id"] 76 77 if not state["extracted_data"]: 78 logger.warning(f"analyzer_skip correlation_id={cid} reason=empty_extracted_data") 79 return { 80 "analysis": "", 81 "errors": [{"node": "analyzer", "error": "extracted_data vazio", "cid": cid}] 82 } 83 84 logger.info(f"analyzer_start correlation_id={cid}") 85 86 response = llm.invoke([ 87 SystemMessage(content=( 88 "Com base nos dados extraídos, identifique: " 89 "1) Padrões temporais relevantes, " 90 "2) Anomalias nos valores monetários, " 91 "3) Relações entre entidades. " 92 "Seja conciso e técnico." 93 )), 94 HumanMessage(content=str(state["extracted_data"])) 95 ]) 96 97 logger.info(f"analyzer_done correlation_id={cid}") 98 return {"analysis": response.content} 99 100 101def writer_node(state: DocumentState) -> dict: 102 """ 103 Consolida extração e análise em relatório executivo estruturado. 104 105 Inclui seção de limitações quando há erros acumulados no estado. 106 """ 107 cid = state["correlation_id"] 108 has_errors = len(state.get("errors", [])) > 0 109 110 logger.info(f"writer_start correlation_id={cid} has_errors={has_errors}") 111 112 error_context = "" 113 if has_errors: 114 error_context = f"\n\nNOTA: {len(state['errors'])} erro(s) ocorreram durante o processamento. " 115 error_context += "Inclua uma seção 'Limitações' no relatório." 116 117 response = llm.invoke([ 118 SystemMessage(content=( 119 "Gere um relatório executivo estruturado com: " 120 "Sumário Executivo, Achados Principais, Análise de Risco, Recomendações." 121 + error_context 122 )), 123 HumanMessage(content=( 124 f"DADOS EXTRAÍDOS:\n{state['extracted_data']}\n\n" 125 f"ANÁLISE:\n{state['analysis']}" 126 )) 127 ]) 128 129 logger.info(f"writer_done correlation_id={cid}") 130 return { 131 "final_report": response.content, 132 "status": "completed" 133 } 134 135 136def should_continue(state: DocumentState) -> Literal["analyzer", "writer", END]: 137 """ 138 Aresta condicional: decide o próximo nó com base no estado atual. 139 140 Lógica: se extração falhou completamente, vai direto ao writer 141 para gerar relatório de falha. Caso contrário, fluxo normal. 142 """ 143 if not state["extracted_data"] and len(state.get("errors", [])) > 0: 144 # Falha crítica na extração — pula análise, gera relatório de erro 145 return "writer" 146 return "analyzer" 147 148 149def build_document_pipeline() -> StateGraph: 150 """Compila e retorna o grafo de processamento de documentos.""" 151 graph = StateGraph(DocumentState) 152 153 # Registrar nós 154 graph.add_node("extractor", extractor_node) 155 graph.add_node("analyzer", analyzer_node) 156 graph.add_node("writer", writer_node) 157 158 # Definir ponto de entrada 159 graph.set_entry_point("extractor") 160 161 # Aresta condicional após extração 162 graph.add_conditional_edges( 163 "extractor", 164 should_continue, 165 { 166 "analyzer": "analyzer", 167 "writer": "writer", 168 } 169 ) 170 171 # Arestas determinísticas 172 graph.add_edge("analyzer", "writer") 173 graph.add_edge("writer", END) 174 175 return graph 176 177 178# Uso com checkpointer para persistência de estado 179def run_pipeline(document: str) -> DocumentState: 180 """ 181 Executa o pipeline com persistência de estado via SQLite. 182 183 O thread_id permite retomar execuções interrompidas. 184 """ 185 checkpointer = SqliteSaver.from_conn_string(":memory:") # use path real em produção 186 pipeline = build_document_pipeline().compile(checkpointer=checkpointer) 187 188 initial_state: DocumentState = { 189 "correlation_id": str(uuid.uuid4()), 190 "raw_content": document, 191 "extracted_data": {}, 192 "analysis": "", 193 "final_report": "", 194 "errors": [], 195 "retry_count": 0, 196 "status": "running", 197 } 198 199 config = {"configurable": {"thread_id": initial_state["correlation_id"]}} 200 result = pipeline.invoke(initial_state, config=config) 201 return result

3.2 Padrões de Orquestração com Trade-offs

Sequential — pipeline linear, cada nó recebe o output do anterior.

python
1# langchain==0.3.x | langgraph==0.2.x 2# Adequado para: processos com dependências estritas de ordem 3# Limitação: latência total = soma das latências individuais 4 5graph.set_entry_point("node_a") 6graph.add_edge("node_a", "node_b") 7graph.add_edge("node_b", "node_c") 8graph.add_edge("node_c", END)

Parallel (fan-out/fan-in) — múltiplos Workers executando simultaneamente com merge dos resultados.

python
1# Reduz latência para: max(latência_worker_mais_lento) 2# Complexidade: lógica de merge pode ser não-determinística 3 4from langgraph.graph import Send 5 6def fan_out_node(state: dict) -> list[Send]: 7 """Distribui sub-tarefas para Workers paralelos.""" 8 tasks = state["tasks"] 9 return [Send("worker_node", {"task": task, "parent_id": state["id"]}) 10 for task in tasks] 11 12def merge_node(state: dict) -> dict: 13 """Consolida resultados — cuidado com race conditions no estado.""" 14 return {"merged_results": state["partial_results"]}

Hierarchical — agente supervisor decide qual Worker invocar com base no contexto.

python
1# Adequado para: domínios onde o roteamento não pode ser pré-determinado 2# Limitação: o supervisor é um ponto único de falha e de custo 3 4def supervisor_node(state: dict) -> dict: 5 """ 6 Supervisor decide o próximo agente. Usa structured output 7 para garantir que a decisão seja parseável deterministicamente. 8 """ 9 from pydantic import BaseModel 10 11 class RoutingDecision(BaseModel): 12 next_agent: Literal["research_worker", "analysis_worker", "writer_worker", "FINISH"] 13 reasoning: str 14 15 structured_llm = llm.with_structured_output(RoutingDecision) 16 decision = structured_llm.invoke(state["messages"]) 17 return {"next": decision.next_agent, "routing_log": decision.reasoning}

3.3 State Management em Detalhe

O SqliteSaver é adequado para desenvolvimento e cargas baixas. Em produção com concorrência:

python
1# langchain==0.3.x | langgraph==0.2.x | redis==5.x 2 3from langgraph.checkpoint.redis import RedisSaver 4 5# Produção: Redis com TTL para evitar acúmulo de estados órfãos 6checkpointer = RedisSaver.from_conn_string( 7 "redis://localhost:6379", 8 ttl={"default": 86400} # 24h — ajuste por tipo de processo 9) 10 11# Pattern de handoff: estado explícito de "pronto para o próximo agente" 12class HandoffState(TypedDict): 13 phase: Literal["extraction", "analysis", "writing", "done"] 14 phase_output: dict # Output da fase atual 15 phase_metadata: dict # Latência, tokens, modelo usado 16 handoff_validated: bool # Critic validou antes do handoff

4. Arquitetura com CrewAI

4.1 Modelo Declarativo de Papéis

CrewAI inverte o paradigma: em vez de definir um grafo técnico, você define responsabilidades de negócio. Um Agent é um papel com um role (cargo), goal (objetivo) e backstory (contexto que molda o comportamento do LLM).

O exemplo abaixo implementa uma Crew de inteligência de mercado:

python
1# crewai==0.80.x | langchain-openai==0.2.x 2 3import logging 4from typing import Optional 5from pydantic import BaseModel 6 7from crewai import Agent, Task, Crew, Process 8from crewai.tools import BaseTool 9from langchain_openai import ChatOpenAI 10 11logger = logging.getLogger(__name__) 12 13 14# --- Ferramenta customizada --- 15 16class WebSearchTool(BaseTool): 17 """ 18 Wrapper de busca web para uso pelos agentes. 19 20 Em produção, substitua por integração real (Tavily, Serper, etc). 21 """ 22 name: str = "web_search" 23 description: str = "Busca informações atualizadas na web sobre um tópico." 24 25 def _run(self, query: str) -> str: 26 # Integração real aqui 27 logger.info(f"web_search query={query}") 28 return f"[Resultados simulados para: {query}]" 29 30 31# --- Schema de output estruturado --- 32 33class MarketIntelligenceReport(BaseModel): 34 """Schema Pydantic para output estruturado da Crew.""" 35 executive_summary: str 36 key_competitors: list[str] 37 market_size_estimate: str 38 strategic_recommendations: list[str] 39 confidence_score: float # 0.0 - 1.0 40 41 42# --- Definição dos Agentes --- 43 44llm = ChatOpenAI(model="gpt-4o", temperature=0.1) 45 46researcher = Agent( 47 role="Senior Market Research Specialist", 48 goal=( 49 "Coletar dados factuais e atualizados sobre mercado, concorrentes e tendências. " 50 "Priorize fontes primárias. Sinalize quando dados são estimativas." 51 ), 52 backstory=( 53 "Você é um analista de inteligência competitiva com 10 anos de experiência " 54 "em mercados de tecnologia B2B. Você é cético, rigoroso e nunca fabrica dados." 55 ), 56 tools=[WebSearchTool()], 57 llm=llm, 58 max_iter=5, # Limite de iterações — controle de custo 59 verbose=True, 60 allow_delegation=False # Researcher não delega — executa diretamente 61) 62 63analyst = Agent( 64 role="Strategic Intelligence Analyst", 65 goal=( 66 "Transformar dados brutos de mercado em insights acionáveis. " 67 "Identifique padrões, anomalias e oportunidades não-óbvias." 68 ), 69 backstory=( 70 "Você é um analista sênior especializado em síntese de dados complexos. " 71 "Você pensa em sistemas, não em pontos de dados isolados." 72 ), 73 llm=llm, 74 max_iter=3, 75 verbose=True, 76 allow_delegation=False 77) 78 79strategist = Agent( 80 role="Go-to-Market Strategist", 81 goal=( 82 "Converter insights de mercado em recomendações estratégicas concretas " 83 "com critérios de priorização explícitos." 84 ), 85 backstory=( 86 "Você é um estrategista com foco em execução. Suas recomendações sempre " 87 "incluem: o quê fazer, por quê, em que ordem e como medir sucesso." 88 ), 89 llm=llm, 90 max_iter=3, 91 verbose=True, 92 allow_delegation=True # Strategist pode delegar revisões ao Analyst 93) 94 95 96# --- Definição das Tasks --- 97 98research_task = Task( 99 description=( 100 "Pesquise o mercado de {market_segment} com foco em: " 101 "1) Principais players e market share estimado, " 102 "2) Tendências de crescimento nos últimos 18 meses, " 103 "3) Movimentos recentes de M&A ou funding. " 104 "Documente cada fonte utilizada." 105 ), 106 expected_output=( 107 "Relatório de pesquisa com dados brutos organizados por categoria. " 108 "Inclua grau de confiança (alto/médio/baixo) para cada dado." 109 ), 110 agent=researcher 111) 112 113analysis_task = Task( 114 description=( 115 "Com base no relatório de pesquisa, produza: " 116 "1) Análise de posicionamento dos 3 principais competidores, " 117 "2) Identificação de gaps de mercado não endereçados, " 118 "3) Avaliação de ameaças e oportunidades (formato matriz)." 119 ), 120 expected_output=( 121 "Análise estruturada com seções distintas para cada entregável. " 122 "Cada insight deve ser suportado por dados do relatório de pesquisa." 123 ), 124 agent=analyst, 125 context=[research_task] # Dependência explícita 126) 127 128strategy_task = Task( 129 description=( 130 "Com base na análise de mercado, desenvolva recomendações estratégicas " 131 "priorizadas por impacto e viabilidade de execução em 90 dias." 132 ), 133 expected_output=( 134 "Relatório executivo no formato MarketIntelligenceReport com: " 135 "sumário executivo, competidores-chave, estimativa de mercado, " 136 "recomendações priorizadas e score de confiança geral." 137 ), 138 agent=strategist, 139 context=[research_task, analysis_task], 140 output_pydantic=MarketIntelligenceReport # Output estruturado e parseável 141) 142 143 144# --- Assembly da Crew --- 145 146market_intel_crew = Crew( 147 agents=[researcher, analyst, strategist], 148 tasks=[research_task, analysis_task, strategy_task], 149 process=Process.sequential, # Ordem garantida: research → analysis → strategy 150 verbose=True, 151 memory=True, # Habilita memória entre tasks 152 max_rpm=10, # Rate limiting — evita throttling da API 153) 154 155 156def run_market_intelligence(market_segment: str) -> MarketIntelligenceReport: 157 """ 158 Executa a Crew de inteligência de mercado para um segmento específico. 159 160 Returns: 161 MarketIntelligenceReport com output estruturado e validado via Pydantic. 162 """ 163 logger.info(f"crew_start segment={market_segment}") 164 result = market_intel_crew.kickoff(inputs={"market_segment": market_segment}) 165 logger.info(f"crew_done segment={market_segment}") 166 return result.pydantic

4.2 Processos de Colaboração

Hierarchical com Manager LLM — CrewAI instancia automaticamente um agente gerente que decide a ordem e delegação de tasks:

python
1# crewai==0.80.x 2from crewai import LLM 3 4manager_llm = LLM(model="gpt-4o", temperature=0) # Manager precisa de alta precisão 5 6hierarchical_crew = Crew( 7 agents=[researcher, analyst, strategist], 8 tasks=[research_task, analysis_task, strategy_task], 9 process=Process.hierarchical, 10 manager_llm=manager_llm, 11 verbose=True 12)

Configuração de Memória em Detalhe:

python
1# crewai==0.80.x — Memória requer configuração explícita de embeddings 2 3from langchain_openai import OpenAIEmbeddings 4 5crew_with_memory = Crew( 6 agents=[researcher, analyst, strategist], 7 tasks=[research_task, analysis_task, strategy_task], 8 process=Process.sequential, 9 memory=True, 10 # Short-term: contexto da execução atual (in-memory) 11 # Long-term: RAG sobre execuções passadas (ChromaDB por padrão) 12 # Entity: grafo de entidades mencionadas 13 embedder={ 14 "provider": "openai", 15 "config": {"model": "text-embedding-3-small"} 16 }, 17 verbose=True 18)

4.3 Configuração Avançada para Produção

Human-in-the-loop para decisões de alto risco:

python
1# crewai==0.80.x 2# Human input interrompe a execução e aguarda input via stdin 3# Em produção: integre com webhook ou sistema de aprovação 4 5approval_task = Task( 6 description="Valide se a estratégia proposta está alinhada com os objetivos do negócio.", 7 expected_output="Aprovação ou lista de ajustes necessários.", 8 agent=strategist, 9 human_input=True # Pausa execução para revisão humana 10)

5. Desafios Reais em Produção

5.1 Gerenciamento de Falhas — Não Ignore Isso

O padrão mais comum de falha em MAS não é o agente retornando lixo — é o agente não retornando nada devido a timeout, rate limit ou erro de rede. Retry logic determinístico é não-negociável:

python
1# langchain==0.3.x | tenacity==8.x 2 3import logging 4from functools import wraps 5from tenacity import ( 6 retry, 7 stop_after_attempt, 8 wait_exponential, 9 retry_if_exception_type, 10 before_sleep_log 11) 12from openai import RateLimitError, APITimeoutError, APIConnectionError 13 14logger = logging.getLogger(__name__) 15 16RETRYABLE_EXCEPTIONS = (RateLimitError, APITimeoutError, APIConnectionError) 17 18 19def with_agent_retry(max_attempts: int = 3, min_wait: float = 1.0, max_wait: float = 30.0): 20 """ 21 Decorator de retry com backoff exponencial para nós de agente. 22 23 Estratégia: jitter aleatório no wait evita thundering herd 24 quando múltiplos agentes falham simultaneamente. 25 """ 26 def decorator(func): 27 @retry( 28 stop=stop_after_attempt(max_attempts), 29 wait=wait_exponential(multiplier=min_wait, max=max_wait), 30 retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS), 31 before_sleep=before_sleep_log(logger, logging.WARNING), 32 reraise=True 33 ) 34 @wraps(func) 35 def wrapper(*args, **kwargs): 36 return func(*args, **kwargs) 37 return wrapper 38 return decorator 39 40 41class CircuitBreaker: 42 """ 43 Circuit breaker para chamadas a APIs externas. 44 45 Estados: CLOSED (normal) → OPEN (falhas consecutivas) → HALF_OPEN (testando) 46 Evita cascata de falhas quando API downstream está degradada. 47 """ 48 def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0): 49 self.failure_count = 0 50 self.failure_threshold = failure_threshold 51 self.recovery_timeout = recovery_timeout 52 self.state = "CLOSED" 53 self.last_failure_time: float = 0 54 55 def call(self, func, *args, **kwargs): 56 import time 57 58 if self.state == "OPEN": 59 if time.time() - self.last_failure_time > self.recovery_timeout: 60 self.state = "HALF_OPEN" 61 logger.info("circuit_breaker state=HALF_OPEN") 62 else: 63 raise RuntimeError(f"Circuit breaker OPEN — aguardando recovery") 64 65 try: 66 result = func(*args, **kwargs) 67 if self.state == "HALF_OPEN": 68 self.state = "CLOSED" 69 self.failure_count = 0 70 logger.info("circuit_breaker state=CLOSED") 71 return result 72 except Exception as e: 73 self.failure_count += 1 74 self.last_failure_time = time.time() 75 if self.failure_count >= self.failure_threshold: 76 self.state = "OPEN" 77 logger.error(f"circuit_breaker state=OPEN failures={self.failure_count}") 78 raise 79 80 81# Agente com fallback: se o modelo principal falha, usa modelo menor 82@with_agent_retry(max_attempts=3) 83def resilient_agent_node(state: dict) -> dict: 84 """Nó de agente com retry automático e fallback de modelo.""" 85 try: 86 primary_llm = ChatOpenAI(model="gpt-4o", temperature=0) 87 return _execute_agent_logic(primary_llm, state) 88 except Exception as e: 89 logger.warning(f"primary_model_failed error={str(e)} falling_back=gpt-4o-mini") 90 fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) 91 return _execute_agent_logic(fallback_llm, state)

5.2 Observabilidade Não é Opcional

python
1# langchain==0.3.x | python-json-logger==2.x 2 3import json 4import time 5import uuid 6from contextlib import contextmanager 7from pythonjsonlogger import jsonlogger 8 9# Configuração de logging estruturado em JSON 10handler = logging.StreamHandler() 11handler.setFormatter(jsonlogger.JsonFormatter( 12 fmt="%(asctime)s %(levelname)s %(name)s %(message)s" 13)) 14logger = logging.getLogger("mas.agent") 15logger.addHandler(handler) 16 17 18@contextmanager 19def agent_trace(agent_name: str, correlation_id: str): 20 """ 21 Context manager para tracing de execução de agentes. 22 23 Captura: latência, status, custo estimado, tokens utilizados. 24 Compatível com LangSmith via callbacks quando configurado. 25 """ 26 span_id = str(uuid.uuid4())[:8] 27 start_time = time.perf_counter() 28 29 logger.info("agent_start", extra={ 30 "agent": agent_name, 31 "correlation_id": correlation_id, 32 "span_id": span_id, 33 "event": "span_start" 34 }) 35 36 try: 37 yield span_id 38 elapsed_ms = (time.perf_counter() - start_time) * 1000 39 logger.info("agent_success", extra={ 40 "agent": agent_name, 41 "correlation_id": correlation_id, 42 "span_id": span_id, 43 "latency_ms": round(elapsed_ms, 2), 44 "event": "span_end", 45 "status": "success" 46 }) 47 except Exception as e: 48 elapsed_ms = (time.perf_counter() - start_time) * 1000 49 logger.error("agent_error", extra={ 50 "agent": agent_name, 51 "correlation_id": correlation_id, 52 "span_id": span_id, 53 "latency_ms": round(elapsed_ms, 2), 54 "event": "span_end", 55 "status": "error", 56 "error_type": type(e).__name__, 57 "error_msg": str(e) 58 }) 59 raise

Formato de log para auditoria — cada entrada é um objeto JSON independente, parseável por qualquer sistema de log aggregation (Datadog, CloudWatch, Loki):

json
1{ 2 "asctime": "2026-03-05T14:32:01.234Z", 3 "levelname": "INFO", 4 "agent": "analyzer_node", 5 "correlation_id": "a3f7c2d1-8b4e-4f9a-b2c1-d5e8f0a1b3c4", 6 "span_id": "7f2c3a1b", 7 "latency_ms": 1243.7, 8 "event": "span_end", 9 "status": "success", 10 "tokens_used": 847, 11 "model": "gpt-4o-mini", 12 "estimated_cost_usd": 0.000423 13}

5.3 Custos e Rate Limiting

O custo de um pipeline MAS não é a soma dos custos individuais — é amplificado por retries, contextos redundantes entre agentes e chamadas desnecessárias quando o estado já satisfaz a condição de saída.

python
1# langchain==0.3.x | gptcache==0.1.x 2 3from gptcache import cache 4from gptcache.adapter import openai as cached_openai 5from gptcache.embedding import Onnx 6 7# Semantic caching: requisições semanticamente similares 8# reutilizam respostas anteriores — redução de 30-60% em custo 9# em workflows com perguntas repetitivas (dado ilustrativo) 10onnx = Onnx() 11cache.init(embedding_func=onnx.to_embeddings) 12cache.set_openai_key() 13 14# Estimativa de custo por padrão de orquestração 15# (baseado em gpt-4o-mini a $0.15/1M input tokens — verificar preço atual) 16COST_ESTIMATES = { 17 "sequential_5_agents": "~$0.002-0.008 por execução", 18 "parallel_5_agents": "~$0.002-0.008 por execução (mesma chamada, menor latência)", 19 "hierarchical_supervisor": "~$0.005-0.020 por execução (+custo do supervisor)", 20 "crew_sequential_3_agents": "~$0.003-0.012 por execução" 21}

6. Decisão de Arquitetura — Tabela Comparativa Completa

CritérioLangChain/LangGraphCrewAIHíbrido
Controle do grafo de execuçãoTotal — você define cada arestaAbstrato — framework gerenciaLangGraph para sub-grafos críticos
Velocidade de prototipação3-5 dias para MAS básico1-2 dias para MAS básico4-7 dias
Abstração de papéis de negócioManual — requer mapeamento explícitoNativa — role/goal/backstoryVia CrewAI na camada de negócio
Observabilidade nativaLangSmith (trace completo)Básica (verbose logs)LangSmith para todo o sistema
Tolerância a falhas built-inNenhuma — implemente você mesmomax_iter, max_rpmCamadas: LangGraph + tenacity
Curva de aprendizadoAlta — requer conhecimento de grafosBaixa — declarativo e intuitivoAlta
Ecossistema de integrações500+ integrações nativas~100 integraçõesMelhor dos dois
Adequado para times pequenosViável com esforçoRecomendadoCustoso de manter
Requisitos de auditoria regulatóriaCompleto via LangSmithLimitadoCompleto
Grafos com lógica condicional complexaNativoNão suportadoLangGraph para essa camada
Output estruturado (Pydantic)Via structured output do LLMNativo via output_pydanticAmbos suportam
Human-in-the-loopVia interrupt/resume no LangGraphVia human_input=TrueAmbos suportam

Regra de decisão em 3 linhas:

  1. Use LangGraph quando o grafo de execução tem lógica condicional complexa, requisitos de auditoria regulatória rigorosos, ou quando o time tem engenheiros sênior disponíveis para manter a infraestrutura.
  2. Use CrewAI quando o domínio de negócio é bem mapeável em papéis, o time é pequeno, o prazo é curto, e controle granular do grafo não é requisito.
  3. Use híbrido quando a Crew de negócio (CrewAI) precisa de sub-grafos técnicos confiáveis para tarefas críticas — CrewAI orquestra o fluxo de negócio, LangGraph executa as etapas que exigem determinismo e observabilidade completa.

7. Conclusão

Três insights que não estão na documentação oficial e só emergem em produção:

1. O estado compartilhado é o contrato mais importante do sistema. Antes de escrever qualquer código de agente, defina o schema completo do estado. Mudanças tardias no TypedDict ou no schema Pydantic quebram checkpoints persistidos e exigem migrações. Trate o estado como você trataria um schema de banco de dados.

2. O Critic (validador) reduz custo total, não aumenta. A intuição de que "mais um agente = mais custo" é incorreta quando o Critic elimina reprocessamentos causados por outputs inválidos chegando nas etapas seguintes. Em pipelines com mais de 4 agentes, um Critic bem calibrado reduz o custo total em 15-35% (dado ilustrativo).

3. CrewAI e LangGraph não competem — estratificam. O padrão mais robusto observado em produção usa CrewAI para definir "o quê fazer" (orquestração de negócio) e LangGraph para definir "como fazer com garantias" (sub-grafos de execução crítica). A separação de responsabilidades é limpa e o código resultante é mais legível do que monolitos em qualquer dos dois frameworks.

Próximos passos concretos:

  1. Implemente o DocumentState com um processo real de baixo risco — não crie o estado "perfeito" de primeira. Ele vai evoluir.
  2. Configure LangSmith ou equivalente antes de ir a produção — debugar MAS sem tracing é ordens de magnitude mais custoso do que com.
  3. Escreva testes unitários para cada nó de agente com estados de entrada fixos — nodes são funções puras, são testáveis.

Palavras-chave: Orquestração de Agentes LangChain CrewAI, LangGraph produção, CrewAI tutorial avançado, Multi-Agent Systems Python, arquitetura de agentes IA, LLM orchestration framework.

Publicado por AI2You — AI-First Technical Series | ai2you.online/pt/blog