Arquitetura de Memória para Sistemas Multi-Agentes: Do Contexto Efêmero ao Conhecimento Persistente

AI2You

AI2You | Evolução Humana & IA

2026-03-06

Ilustração 3D de uma arquitetura de memória em quatro camadas para sistemas multi-agentes — Redis, Vector Store, Neo4j e SQL — representadas como slabs luminosos em tons de azul elétrico e teal sobre fundo azul-marinho escuro, com fluxos de dados verticais entre as camadas.
Stack completa de memória para MAS em produção: Redis para estado efêmero, Vector DB para recuperação semântica, Neo4j para conhecimento relacional e MCP como protocolo de interoperabilidade — com código Python e estimativas de custo por camada.

AI2YOU — AI-FIRST TECHNICAL SERIES

Para Engenheiros de IA, Tech Leads e CTOs que implementam sistemas multi-agentes em produção.

1. Um Agente sem Memória é um Funcionário que Esquece Tudo a Cada Conversa

Imagine contratar um analista de crédito que, toda vez que você abre uma nova aba no navegador, esquece completamente o cliente com quem acabou de trabalhar — o histórico, as decisões tomadas, os documentos analisados. Você precisaria reenviar tudo. Ele precisaria reler tudo. O processo recomeçaria do zero.

É exatamente isso que acontece com agentes de IA sem arquitetura de memória.

O custo não é filosófico. É mensurável em tokens, latência e dinheiro. Considere um pipeline realista: 10 agentes colaborando em análise de contratos, cada execução consome em média 4.000 tokens de contexto reprocessado que já estava disponível em execuções anteriores. A 100 execuções por dia, usando gpt-4o-mini a $0,15 por 1M tokens de input (preço confirmado OpenAI, março 2026 — verificar atualizações em openai.com/pricing):

markdown
Tokens desperdiçados/dia = 10 agentes × 4.000 tokens × 100 execuções = 4.000.000 tokens/dia Custo diário desperdiçado = 4.000.000 × $0,15 / 1.000.000 = $0,60/dia Custo mensal desperdiçado = $0,60 × 30 = $18/mês por pipeline

Parece pouco. Mas em uma empresa com 40 pipelines ativos, esse número se torna $720/mês em tokens que reprocessam informação que o sistema já conhecia. Com caching de memória adequado, a estimativa de redução é de 40–70% desse custo (dado ilustrativo, baseado em benchmarks de semantic caching publicados pela Zilliz e Redis Labs — verificar benchmarks atuais em zilliz.com/blog).

O custo real, porém, não está nos tokens. Está na latência acumulada e na inconsistência de estado: agentes que tomam decisões contraditórias porque não compartilham o que aprenderam nas execuções anteriores.

Este artigo é um contrato técnico: ao final, você terá uma stack de memória completa em 4 camadas — Redis, Vector DB, Graph DB e MCP — com código Python de produção para cada camada, critérios de seleção tecnológica justificados e estimativas de custo operacional por escala.

2. Tipos de Memória em IA: Taxonomia Operacional

Antes de escolher tecnologia, é necessário entender o que você está armazenando. Confundir tipos de memória é o erro mais frequente em arquiteturas MAS — e resulta em sistemas que usam Vector DB para o que deveria estar no Redis, e Redis para o que deveria estar no grafo.

2.1 Memória de Curto Prazo (Short-term)

A context window é a memória de curto prazo do agente. Ela é cara, limitada e volátil por design. O anti-pattern mais comum é o context stuffing: colocar todo o histórico disponível no contexto achando que mais informação produz melhor raciocínio.

Pesquisas sobre "lost in the middle" demonstram o oposto: LLMs têm desempenho significativamente pior em informações posicionadas no meio do contexto comparado ao início e ao fim. Um contexto de 128k tokens com informação relevante no meio pode ter recall inferior a um contexto de 8k tokens com a mesma informação no início (Liu et al., 2023 — "Lost in the Middle: How Language Models Use Long Contexts", arXiv:2307.03172, publicado em TACL 2024).

Estratégias de otimização:

Sliding window: mantém apenas as N mensagens mais recentes, descartando o histórico mais antigo.

python
1# langchain==0.3.x | redis==5.x 2 3import logging 4from typing import Optional 5from langchain_core.messages import BaseMessage, HumanMessage, AIMessage 6from langchain_community.chat_message_histories import RedisChatMessageHistory 7 8logger = logging.getLogger(__name__) 9 10 11class SlidingWindowMemory: 12 """ 13 Memória de curto prazo com janela deslizante sobre Redis. 14 15 Mantém os N turnos mais recentes. Turnos mais antigos são 16 descartados — não arquivados. Para arquivamento, use 17 consolidação para Vector Store antes do descarte. 18 """ 19 20 def __init__( 21 self, 22 session_id: str, 23 redis_url: str, 24 window_size: int = 10, 25 ttl: int = 3600, 26 ) -> None: 27 self.window_size = window_size 28 self.history = RedisChatMessageHistory( 29 session_id=session_id, 30 url=redis_url, 31 ttl=ttl, 32 ) 33 34 def add_exchange(self, human: str, ai: str) -> None: 35 """Adiciona um par humano/AI e trunca a janela se necessário.""" 36 self.history.add_user_message(human) 37 self.history.add_ai_message(ai) 38 self._truncate() 39 40 def get_context(self) -> list[BaseMessage]: 41 """Retorna as mensagens dentro da janela ativa.""" 42 messages = self.history.messages 43 return messages[-self.window_size * 2:] # N turnos = 2N mensagens 44 45 def _truncate(self) -> None: 46 """Remove mensagens além do limite da janela.""" 47 messages = self.history.messages 48 excess = len(messages) - (self.window_size * 2) 49 if excess > 0: 50 logger.info(f"sliding_window_truncate excess={excess} session={self.history.session_id}") 51 # RedisChatMessageHistory não suporta delete por índice nativamente 52 # — reconstrua a lista e sobreescreva 53 self.history.clear() 54 for msg in messages[excess:]: 55 if isinstance(msg, HumanMessage): 56 self.history.add_user_message(msg.content) 57 elif isinstance(msg, AIMessage): 58 self.history.add_ai_message(msg.content)

2.2 Memória de Longo Prazo (Long-term)

Vector stores são o índice semântico persistente do sistema. A pergunta não é "qual usar" mas "qual trade-off é aceitável para este caso":

CritérioPineconeWeaviateChromapgvector
Latência p99 (query)~20ms~30ms~50ms local~100ms
Custo/1M vetores~$0,08/mêsSelf-hosted: $0Self-hosted: $0Incluído no RDS
Self-hosted❌ SaaS apenas
Filtros de metadados✅ Robusto✅ Robusto🟡 Limitado✅ Via SQL
Escala horizontal✅ Gerenciado✅ Com esforço🟡 Via RDS
Ideal paraProdução gerenciadaSelf-hosted enterpriseDev/PoCStack PostgreSQL existente

(latências são estimativas ilustrativas baseadas em benchmarks públicos — ANN Benchmarks e documentação oficial dos providers)

Estratégias de chunking para recall máximo:

  • Fixed-size: simples, previsível, falha em documentos com seções de tamanho variável
  • Semantic: divide por mudança de tópico — maior recall, maior custo de indexação
  • Hierarchical: chunk pequeno para precisão + chunk grande para contexto — melhor dos dois, maior complexidade

2.3 Memória de Trabalho (Working Memory)

A working memory é o estado temporário compartilhado entre agentes durante uma execução. O padrão canônico é o blackboard: uma estrutura de dados centralizada onde qualquer agente pode ler e escrever, e todos enxergam o estado global da tarefa.

python
1# redis==5.x 2 3import json 4import logging 5from typing import Any, Optional 6from datetime import datetime 7 8logger = logging.getLogger(__name__) 9 10 11class AgentBlackboard: 12 """ 13 Blackboard compartilhado entre agentes via Redis Hash. 14 15 Cada execução tem seu próprio namespace isolado por execution_id. 16 Escritas são atômicas via HSET. Leituras são sempre consistentes 17 dentro do mesmo Redis instance. 18 """ 19 20 def __init__(self, redis_client, execution_id: str) -> None: 21 self.redis = redis_client 22 self.key = f"blackboard:{execution_id}" 23 self.ttl = 86400 # 24h — limpa execuções abandonadas 24 25 def write(self, agent_id: str, field: str, value: Any) -> None: 26 """ 27 Escreve um campo no blackboard com metadata de proveniência. 28 29 O campo é prefixado com agent_id para rastreabilidade: 30 'planner:task_decomposition', 'worker_a:extraction_result'. 31 """ 32 payload = json.dumps({ 33 "value": value, 34 "agent_id": agent_id, 35 "written_at": datetime.utcnow().isoformat(), 36 }) 37 self.redis.hset(self.key, f"{agent_id}:{field}", payload) 38 self.redis.expire(self.key, self.ttl) 39 logger.info(f"blackboard_write key={self.key} agent={agent_id} field={field}") 40 41 def read(self, agent_id: str, field: str) -> Optional[Any]: 42 """Lê um campo escrito por um agente específico.""" 43 raw = self.redis.hget(self.key, f"{agent_id}:{field}") 44 if not raw: 45 return None 46 return json.loads(raw)["value"] 47 48 def read_all(self) -> dict[str, Any]: 49 """Retorna todo o estado do blackboard para o Critic.""" 50 raw = self.redis.hgetall(self.key) 51 return {k.decode(): json.loads(v)["value"] for k, v in raw.items()}

Scratchpads são diferentes do blackboard: são privados por agente, invisíveis para outros agentes, e descartados ao fim da execução. Use scratchpads para raciocínio intermediário que não precisa ser compartilhado — isso reduz ruído no blackboard e melhora a qualidade do raciocínio do agente individual.

2.4 Memória Episódica vs Semântica

A distinção mais negligenciada em MAS corporativo:

Memória episódica é o log do que aconteceu: "Na sessão X, o Agente A decidiu rejeitar o documento Y porque o campo Z estava ausente." É temporal, contextual, ordenada.

Memória semântica é o que o sistema sabe: "Documentos do tipo Y exigem o campo Z como obrigatório pela regulação BACEN 4.557." É atemporal, factual, estruturada.

Confundir os dois produz sistemas que tratam fatos como logs (re-derivando conhecimento a cada execução) ou logs como fatos (generalizando decisões específicas em regras incorretas).

Caso prático — onboarding financeiro:

  • Episódico: "O cliente João Silva enviou o balanço patrimonial às 14h32 do dia 03/03."
  • Semântico: "Clientes PJ com faturamento > R$ 10M exigem balanço auditado por Big Four."

O primeiro vai para o banco de dados de execução (PostgreSQL). O segundo vai para o grafo de conhecimento (Neo4j). Nunca o contrário.

3. Arquitetura de Memória para MAS: Stack em 4 Camadas

3.1 O Problema do Estado Compartilhado

Sistemas multi-agentes com memória compartilhada são sistemas distribuídos. As garantias do teorema CAP se aplicam integralmente:

  • Consistência forte (todos os agentes veem o mesmo estado ao mesmo tempo): necessária para memória de execução crítica — um agente não pode aprovar uma operação que outro acabou de rejeitar
  • Disponibilidade + eventual consistency: aceitável para memória semântica — se o Agente B ainda não enxerga o fato que o Agente A acabou de indexar, ele derivará o mesmo fato independentemente e o índice será atualizado na próxima sincronização

O problema de split brain em agentes paralelos: dois Workers escrevendo no mesmo namespace do Vector Store simultaneamente podem indexar versões conflitantes do mesmo documento. Solução: writes no Vector Store passam pelo Planner (ponto único de escrita) ou usam optimistic locking com versionamento por agent_id + timestamp.

3.2 Stack de Memória em 4 Camadas

markdown
1┌──────────────────────────────────────────────────────────────────┐ 2│ LAYER 1: Memória de Conversa │ 3│ Redis / Memcached │ 4│ TTL: 1h–24h | Latência: <5ms | Uso: contexto ativo da sessão │ 5├──────────────────────────────────────────────────────────────────┤ 6│ LAYER 2: Memória de Contexto │ 7│ Vector DB (Pinecone / Weaviate / pgvector) │ 8│ TTL: 30–90 dias | Latência: 20–100ms | Uso: recuperação RAG │ 9├──────────────────────────────────────────────────────────────────┤ 10│ LAYER 3: Memória de Conhecimento │ 11│ Graph DB (Neo4j) │ 12│ TTL: permanente | Latência: 10–50ms | Uso: fatos e relações │ 13├──────────────────────────────────────────────────────────────────┤ 14│ LAYER 4: Memória de Execução │ 15│ SQL/NoSQL (PostgreSQL / MongoDB) │ 16│ TTL: regulatório (5–7 anos) | Uso: audit trail, compliance │ 17└──────────────────────────────────────────────────────────────────┘ 18 ▲ ▲ ▲ ▲ 19 │ │ │ │ 20 [Agent API] [RAG queries] [Knowledge [Audit 21 [Blackboard] [Semantic queries] writes] 22 search]

Sinal de que você está usando a camada errada:

  • Layer 1 com dados > 10KB por chave: mova para Layer 2
  • Layer 2 para dados que nunca mudam semanticamente: mova para Layer 3
  • Layer 3 para logs de execução temporários: mova para Layer 4
  • Layer 4 para recuperação semântica em tempo real: crie índice em Layer 2

3.3 Padrões de Acesso por Tipo de Agente

python
1# langchain==0.3.x | redis==5.x | langchain-pinecone==0.2.x 2 3import logging 4from enum import Enum 5from typing import Any, Optional 6from dataclasses import dataclass, field 7 8logger = logging.getLogger(__name__) 9 10 11class MemoryScope(Enum): 12 PRIVATE = "private" # Apenas o agente atual 13 SHARED = "shared" # Todos os agentes da execução 14 HIERARCHICAL = "hier" # Agentes com permissão explícita 15 16 17@dataclass 18class MemoryEntry: 19 """Unidade atômica de memória com metadata de proveniência.""" 20 content: Any 21 agent_id: str 22 session_id: str 23 execution_id: str 24 memory_type: str # "episodic" | "semantic" | "working" 25 scope: MemoryScope 26 confidence_score: float # 0.0–1.0 — confiança do agente no conteúdo 27 ttl_seconds: Optional[int] = None 28 tags: list[str] = field(default_factory=list) 29 30 31class AgentMemoryManager: 32 """ 33 Interface unificada para as 4 camadas de memória. 34 35 Encapsula a lógica de roteamento: o agente declara o que 36 quer armazenar — o Manager decide em qual layer persistir. 37 """ 38 39 def __init__( 40 self, 41 agent_id: str, 42 redis_client, 43 vector_store, 44 graph_driver, 45 sql_session, 46 ) -> None: 47 self.agent_id = agent_id 48 self._redis = redis_client 49 self._vector = vector_store 50 self._graph = graph_driver 51 self._sql = sql_session 52 53 def store(self, entry: MemoryEntry) -> str: 54 """ 55 Roteia o armazenamento para a camada correta baseado em 56 memory_type e scope. 57 58 Returns: memory_id para recuperação posterior. 59 """ 60 if entry.scope == MemoryScope.PRIVATE: 61 return self._store_private(entry) 62 elif entry.memory_type == "working": 63 return self._store_blackboard(entry) 64 elif entry.memory_type == "semantic": 65 return self._store_vector(entry) 66 elif entry.memory_type == "episodic": 67 return self._store_sql(entry) 68 else: 69 raise ValueError(f"Combinação inválida: {entry.memory_type}/{entry.scope}") 70 71 def _store_private(self, entry: MemoryEntry) -> str: 72 """Scratchpad privado — Redis com TTL de sessão.""" 73 key = f"scratch:{self.agent_id}:{entry.execution_id}" 74 import json 75 self._redis.setex(key, entry.ttl_seconds or 3600, json.dumps({ 76 "content": entry.content, 77 "tags": entry.tags, 78 })) 79 logger.info(f"memory_store type=private agent={self.agent_id}") 80 return key 81 82 def _store_blackboard(self, entry: MemoryEntry) -> str: 83 """Blackboard compartilhado — Redis Hash por execution_id.""" 84 key = f"blackboard:{entry.execution_id}" 85 field_name = f"{self.agent_id}:{':'.join(entry.tags)}" 86 import json 87 self._redis.hset(key, field_name, json.dumps(entry.content)) 88 self._redis.expire(key, entry.ttl_seconds or 86400) 89 return f"{key}:{field_name}" 90 91 def _store_vector(self, entry: MemoryEntry) -> str: 92 """Memória semântica — Vector Store com metadados de agente.""" 93 import uuid 94 memory_id = str(uuid.uuid4()) 95 self._vector.add_texts( 96 texts=[str(entry.content)], 97 metadatas=[{ 98 "memory_id": memory_id, 99 "agent_id": entry.agent_id, 100 "session_id": entry.session_id, 101 "confidence": entry.confidence_score, 102 "tags": ",".join(entry.tags), 103 }], 104 ids=[memory_id], 105 ) 106 logger.info(f"memory_store type=semantic agent={self.agent_id} id={memory_id}") 107 return memory_id 108 109 def _store_sql(self, entry: MemoryEntry) -> str: 110 """Memória episódica — PostgreSQL para audit trail.""" 111 from datetime import datetime 112 record = { 113 "agent_id": entry.agent_id, 114 "session_id": entry.session_id, 115 "execution_id": entry.execution_id, 116 "content": str(entry.content), 117 "tags": entry.tags, 118 "created_at": datetime.utcnow(), 119 } 120 result = self._sql.execute( 121 "INSERT INTO agent_memory_log VALUES (:agent_id, :session_id, " 122 ":execution_id, :content, :tags, :created_at) RETURNING id", 123 record 124 ) 125 return str(result.fetchone()[0])

3.4 Estratégia de Write-Through vs Write-Back

Write-through: cada ação do agente persiste imediatamente em todas as camadas relevantes antes de continuar. Latência maior, consistência garantida.

Write-back: o agente acumula memórias em buffer local e persiste em batch ao final da execução ou em intervalos regulares. Menor latência, risco de perda em falha antes do flush.

Regra de decisão:

  • Memória de execução crítica (compliance, audit, decisões irreversíveis): write-through obrigatório
  • Memória semântica de baixo risco (contexto, preferências, histórico não-crítico): write-back aceitável
  • Memória de working/blackboard: write-through — outros agentes dependem da consistência imediata

4. Implementação Prática: Código de Produção por Camada

4.1 Vector Store como Memória Semântica

python
1# langchain==0.3.x | langchain-pinecone==0.2.x | openai==1.x 2 3import logging 4import uuid 5from typing import Optional 6from langchain_pinecone import PineconeVectorStore 7from langchain_openai import OpenAIEmbeddings 8from langchain_core.documents import Document 9 10logger = logging.getLogger(__name__) 11 12embeddings = OpenAIEmbeddings(model="text-embedding-3-small") 13 14 15class SemanticMemoryStore: 16 """ 17 Memória semântica de longo prazo usando Pinecone. 18 19 Armazena memórias com metadados de agente para recuperação 20 filtrada por agent_id, session, tipo e score de confiança. 21 """ 22 23 def __init__(self, index_name: str) -> None: 24 self.store = PineconeVectorStore( 25 index_name=index_name, 26 embedding=embeddings, 27 ) 28 29 def remember( 30 self, 31 content: str, 32 agent_id: str, 33 session_id: str, 34 memory_type: str, 35 confidence: float = 1.0, 36 tags: Optional[list[str]] = None, 37 ) -> str: 38 """ 39 Indexa uma memória com metadata completa de proveniência. 40 41 Antes de indexar, verifica deduplicação por similaridade: 42 se existe memória > 0.97 de similaridade do mesmo agente, 43 atualiza o confidence_score em vez de criar duplicata. 44 """ 45 memory_id = str(uuid.uuid4()) 46 47 # Verificação de deduplicação 48 existing = self.store.similarity_search_with_score( 49 query=content, 50 k=1, 51 filter={"agent_id": agent_id, "memory_type": memory_type}, 52 ) 53 54 if existing and existing[0][1] > 0.97: 55 logger.info(f"semantic_memory_deduplicated agent={agent_id} similarity={existing[0][1]:.3f}") 56 return existing[0][0].metadata["memory_id"] 57 58 doc = Document( 59 page_content=content, 60 metadata={ 61 "memory_id": memory_id, 62 "agent_id": agent_id, 63 "session_id": session_id, 64 "memory_type": memory_type, # semantic | episodic | working 65 "confidence_score": confidence, 66 "tags": ",".join(tags or []), 67 } 68 ) 69 70 self.store.add_documents([doc], ids=[memory_id]) 71 logger.info(f"semantic_memory_stored agent={agent_id} id={memory_id}") 72 return memory_id 73 74 def recall( 75 self, 76 query: str, 77 agent_id: str, 78 k: int = 5, 79 min_confidence: float = 0.7, 80 use_mmr: bool = False, 81 ) -> list[Document]: 82 """ 83 Recupera memórias relevantes filtradas por agente. 84 85 MMR (Maximal Marginal Relevance): use quando a diversidade 86 dos resultados importa mais que a similaridade pura. 87 Similarity search: use quando você quer os k mais similares, 88 aceitando redundância. 89 """ 90 filter_dict = { 91 "agent_id": agent_id, 92 "confidence_score": {"$gte": min_confidence}, 93 } 94 95 if use_mmr: 96 # MMR: balanceia relevância e diversidade 97 # fetch_k > k — busca mais candidatos, seleciona os mais diversos 98 return self.store.max_marginal_relevance_search( 99 query=query, 100 k=k, 101 fetch_k=k * 3, 102 filter=filter_dict, 103 ) 104 105 return self.store.similarity_search( 106 query=query, 107 k=k, 108 filter=filter_dict, 109 )

4.2 Redis como Cache de Contexto de Alta Velocidade

python
1# redis==5.x | langchain-redis==0.1.x 2 3import json 4import logging 5from typing import Optional, Any 6from datetime import datetime 7import redis 8 9logger = logging.getLogger(__name__) 10 11 12class AgentContextCache: 13 """ 14 Cache de contexto de alta velocidade para estado de agente. 15 16 Estruturas Redis por tipo de dado: 17 - Hash: estado estruturado do agente (chave-valor) 18 - List: histórico de mensagens (FIFO com limite) 19 - Sorted Set: memórias ranqueadas por relevância/recência 20 - Pub/Sub: notificação entre agentes sobre updates 21 """ 22 23 TTL = { 24 "session_active": 3600, # 1h — sessão ativa 25 "context_recent": 86400, # 24h — contexto recente 26 "task_state": None, # Sem TTL — persiste até conclusão explícita 27 } 28 29 def __init__(self, redis_url: str) -> None: 30 self.r = redis.from_url(redis_url, decode_responses=True) 31 self.pubsub = self.r.pubsub() 32 33 # ── Estado estruturado ──────────────────────────────────────────── 34 35 def set_agent_state(self, agent_id: str, execution_id: str, state: dict) -> None: 36 """Persiste estado completo do agente como Redis Hash.""" 37 key = f"agent_state:{execution_id}:{agent_id}" 38 self.r.hset(key, mapping={k: json.dumps(v) for k, v in state.items()}) 39 self.r.expire(key, self.TTL["task_state"] or 0) 40 logger.info(f"agent_state_set key={key} fields={list(state.keys())}") 41 42 def get_agent_state(self, agent_id: str, execution_id: str) -> dict: 43 """Recupera estado completo do agente.""" 44 key = f"agent_state:{execution_id}:{agent_id}" 45 raw = self.r.hgetall(key) 46 return {k: json.loads(v) for k, v in raw.items()} 47 48 # ── Histórico de mensagens ──────────────────────────────────────── 49 50 def append_message( 51 self, 52 session_id: str, 53 role: str, 54 content: str, 55 max_history: int = 20, 56 ) -> None: 57 """ 58 Appenda mensagem ao histórico com RPUSH + LTRIM. 59 60 LTRIM garante que a lista nunca excede max_history entries 61 sem necessidade de leitura prévia — operação O(1). 62 """ 63 key = f"history:{session_id}" 64 message = json.dumps({"role": role, "content": content, 65 "ts": datetime.utcnow().isoformat()}) 66 pipe = self.r.pipeline() 67 pipe.rpush(key, message) 68 pipe.ltrim(key, -max_history, -1) 69 pipe.expire(key, self.TTL["session_active"]) 70 pipe.execute() 71 72 def get_history(self, session_id: str) -> list[dict]: 73 """Recupera histórico completo da sessão.""" 74 key = f"history:{session_id}" 75 return [json.loads(m) for m in self.r.lrange(key, 0, -1)] 76 77 # ── Memórias ranqueadas ─────────────────────────────────────────── 78 79 def rank_memory(self, agent_id: str, memory_id: str, score: float) -> None: 80 """ 81 Insere memória em Sorted Set ranqueado por relevância. 82 83 Score composto: combine similaridade semântica + recência 84 em um único float para rankeamento unificado. 85 """ 86 key = f"ranked_memories:{agent_id}" 87 self.r.zadd(key, {memory_id: score}) 88 self.r.expire(key, self.TTL["context_recent"]) 89 90 def get_top_memories(self, agent_id: str, top_k: int = 5) -> list[str]: 91 """Retorna os memory_ids com maior score (mais relevantes).""" 92 key = f"ranked_memories:{agent_id}" 93 return self.r.zrevrange(key, 0, top_k - 1) 94 95 # ── Pub/Sub para notificação entre agentes ──────────────────────── 96 97 def notify_memory_update(self, execution_id: str, agent_id: str, field: str) -> None: 98 """Notifica outros agentes que a memória compartilhada foi atualizada.""" 99 channel = f"memory_updates:{execution_id}" 100 payload = json.dumps({"agent_id": agent_id, "field": field, 101 "ts": datetime.utcnow().isoformat()}) 102 self.r.publish(channel, payload) 103 logger.info(f"memory_update_published channel={channel} agent={agent_id}")

4.3 Neo4j para Conhecimento Relacional

python
1# neo4j==5.x | langchain-neo4j==0.1.x 2 3import logging 4from typing import Optional 5from neo4j import GraphDatabase, Driver 6 7logger = logging.getLogger(__name__) 8 9 10class KnowledgeGraph: 11 """ 12 Grafo de conhecimento estruturado para MAS. 13 14 Schema: 15 - (Agent)-[:KNOWS]->(Fact) 16 - (Agent)-[:DECIDED {reason, confidence}]->(Decision) 17 - (Session)-[:CONTAINS]->(Decision) 18 - (Fact)-[:CONTRADICTS]->(Fact) 19 - (Entity)-[:REFERENCED_IN]->(Session) 20 """ 21 22 def __init__(self, uri: str, user: str, password: str) -> None: 23 self.driver: Driver = GraphDatabase.driver(uri, auth=(user, password)) 24 25 def store_fact( 26 self, 27 agent_id: str, 28 subject: str, 29 predicate: str, 30 obj: str, 31 confidence: float = 1.0, 32 source_session: Optional[str] = None, 33 ) -> None: 34 """ 35 Armazena um fato semântico como tripla (sujeito, predicado, objeto). 36 37 Antes de inserir, verifica contradição com fatos existentes 38 sobre o mesmo sujeito e predicado. 39 """ 40 with self.driver.session() as session: 41 # Verifica contradição 42 existing = session.run( 43 """ 44 MATCH (f:Fact {subject: $subject, predicate: $predicate}) 45 WHERE f.object <> $object 46 RETURN f 47 """, 48 subject=subject, predicate=predicate, object=obj 49 ).data() 50 51 if existing: 52 logger.warning( 53 f"knowledge_contradiction subject={subject} predicate={predicate} " 54 f"existing={existing[0]['f']['object']} new={obj}" 55 ) 56 # Cria relação CONTRADICTS entre os dois fatos 57 session.run( 58 """ 59 MATCH (f1:Fact {subject: $subject, predicate: $predicate}) 60 MERGE (f2:Fact {subject: $subject, predicate: $predicate, object: $object}) 61 MERGE (f1)-[:CONTRADICTS {detected_by: $agent_id}]->(f2) 62 """, 63 subject=subject, predicate=predicate, object=obj, agent_id=agent_id 64 ) 65 return 66 67 session.run( 68 """ 69 MERGE (a:Agent {id: $agent_id}) 70 MERGE (f:Fact {subject: $subject, predicate: $predicate, object: $object}) 71 ON CREATE SET f.confidence = $confidence, f.created_at = datetime() 72 MERGE (a)-[:KNOWS {source_session: $source_session}]->(f) 73 """, 74 agent_id=agent_id, subject=subject, predicate=predicate, 75 object=obj, confidence=confidence, source_session=source_session 76 ) 77 logger.info(f"fact_stored agent={agent_id} {subject}-[{predicate}]->{obj}") 78 79 def recall_about_entity( 80 self, 81 entity: str, 82 agent_id: Optional[str] = None, 83 last_n_sessions: int = 3, 84 ) -> list[dict]: 85 """ 86 Recupera tudo que o sistema sabe sobre uma entidade, 87 opcionalmente filtrado pelo agente e últimas N sessões. 88 """ 89 with self.driver.session() as session: 90 query = """ 91 MATCH (a:Agent)-[:KNOWS]->(f:Fact) 92 WHERE f.subject = $entity 93 OR f.object = $entity 94 """ 95 params: dict = {"entity": entity} 96 97 if agent_id: 98 query += " AND a.id = $agent_id" 99 params["agent_id"] = agent_id 100 101 query += " RETURN f.subject, f.predicate, f.object, f.confidence ORDER BY f.confidence DESC" 102 103 return session.run(query, **params).data() 104 105 def detect_contradictions(self) -> list[dict]: 106 """ 107 Retorna todos os pares de fatos contraditórios no grafo. 108 Deve ser executado pelo Critic antes de decisões críticas. 109 """ 110 with self.driver.session() as session: 111 return session.run( 112 """ 113 MATCH (f1:Fact)-[:CONTRADICTS]->(f2:Fact) 114 RETURN f1.subject, f1.predicate, f1.object AS value_a, 115 f2.object AS value_b 116 """ 117 ).data()

5. MCP e Memória Persistente: Interoperabilidade por Design

5.1 O Padrão Model Context Protocol

MCP resolve um problema de acoplamento: sem ele, cada agente precisa conhecer a API específica do Pinecone, do Redis e do Neo4j. Com MCP, os agentes falam com um MemoryServer padronizado, e a implementação subjacente é transparente.

A consequência arquitetural é significativa: trocar de Pinecone para Weaviate não requer refatorar os agentes — apenas o servidor MCP. O contrato de interface permanece estável.

5.2 Implementação de Servidor MCP para Memória

python
1# mcp==1.x | fastmcp==0.4.x 2 3import logging 4from typing import Any 5import fastmcp 6 7logger = logging.getLogger(__name__) 8 9mcp = fastmcp.FastMCP("MemoryServer") 10 11 12@mcp.tool() 13async def store_memory( 14 content: str, 15 agent_id: str, 16 session_id: str, 17 memory_type: str, 18 scope: str = "shared", 19 confidence: float = 1.0, 20 tags: list[str] | None = None, 21) -> dict: 22 """ 23 Armazena uma memória no layer apropriado. 24 25 O servidor decide o layer baseado em memory_type e scope — 26 o agente não precisa conhecer os detalhes de persistência. 27 """ 28 # Importar o AgentMemoryManager configurado com os clients reais 29 from memory_manager import get_memory_manager 30 manager = get_memory_manager(agent_id) 31 32 from memory_manager import MemoryEntry, MemoryScope 33 entry = MemoryEntry( 34 content=content, 35 agent_id=agent_id, 36 session_id=session_id, 37 execution_id=session_id, # simplificado 38 memory_type=memory_type, 39 scope=MemoryScope(scope), 40 confidence_score=confidence, 41 tags=tags or [], 42 ) 43 44 memory_id = manager.store(entry) 45 logger.info(f"mcp_store_memory agent={agent_id} type={memory_type} id={memory_id}") 46 return {"memory_id": memory_id, "status": "stored"} 47 48 49@mcp.tool() 50async def retrieve_memory( 51 query: str, 52 agent_id: str, 53 memory_type: str = "semantic", 54 top_k: int = 5, 55 min_confidence: float = 0.7, 56) -> list[dict]: 57 """ 58 Recupera memórias relevantes para uma query. 59 60 Roteamento: semântico → Vector Store, episódico → SQL, 61 working → Redis blackboard. 62 """ 63 from memory_manager import get_memory_manager 64 manager = get_memory_manager(agent_id) 65 66 if memory_type == "semantic": 67 docs = manager._vector.recall( 68 query=query, 69 agent_id=agent_id, 70 k=top_k, 71 min_confidence=min_confidence, 72 ) 73 return [{"content": d.page_content, "metadata": d.metadata} for d in docs] 74 75 logger.warning(f"mcp_retrieve unsupported type={memory_type}") 76 return [] 77 78 79@mcp.tool() 80async def forget_memory(memory_id: str, agent_id: str) -> dict: 81 """ 82 Remove uma memória específica do Vector Store. 83 84 Use com cautela: remoção de memória semântica é irreversível. 85 Prefira decrementar confidence_score antes de deletar. 86 """ 87 from memory_manager import get_memory_manager 88 manager = get_memory_manager(agent_id) 89 manager._vector.store.delete(ids=[memory_id]) 90 logger.info(f"mcp_forget_memory agent={agent_id} id={memory_id}") 91 return {"memory_id": memory_id, "status": "deleted"}

5.3 Integração LangGraph + MCP Memory Tool

python
1# langchain==0.3.x | langgraph==0.2.x | mcp==1.x 2 3from langchain_core.tools import StructuredTool 4from langchain_mcp_adapters import MCPToolkit 5 6# Conecta ao servidor MCP de memória 7toolkit = MCPToolkit(server_url="http://localhost:8000/mcp") 8memory_tools = toolkit.get_tools() 9 10# Os tools MCP ficam disponíveis para qualquer nó do LangGraph 11# como qualquer outro tool — zero acoplamento com a implementação 12 13def agent_node_with_memory(state: dict) -> dict: 14 """ 15 Nó de agente com acesso a memória via MCP. 16 17 O agente decide quando e o que lembrar — não é automático. 18 Memory-on-demand produz índices mais limpos que memory-on-every-step. 19 """ 20 from langchain_openai import ChatOpenAI 21 llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(memory_tools) 22 23 response = llm.invoke(state["messages"]) 24 return {"messages": state["messages"] + [response]}

6. Otimização e Custos

6.1 Estratégias de Compactação de Contexto

Summarization progressiva: memórias antigas viram resumos, resumos viram fatos, fatos entram no grafo. Pipeline de consolidação agendado:

python
1# apscheduler==3.x | langchain==0.3.x 2 3import logging 4from apscheduler.schedulers.asyncio import AsyncIOScheduler 5from langchain_openai import ChatOpenAI 6 7logger = logging.getLogger(__name__) 8llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) 9scheduler = AsyncIOScheduler() 10 11 12@scheduler.scheduled_job("interval", hours=6) 13async def consolidate_old_memories() -> None: 14 """ 15 Job de consolidação: memórias com mais de 7 dias e 16 confidence_score < 0.5 são sumarizadas e removidas do índice. 17 18 Memórias com confidence_score > 0.9 são promovidas para o 19 grafo de conhecimento como fatos estruturados. 20 """ 21 from memory_manager import get_all_stale_memories, promote_to_graph, archive_memory 22 23 stale = await get_all_stale_memories(older_than_days=7, max_confidence=0.5) 24 logger.info(f"consolidation_start stale_count={len(stale)}") 25 26 for memory in stale: 27 if memory["confidence_score"] > 0.9: 28 # Promove para grafo — extrai tripla (sujeito, predicado, objeto) 29 await promote_to_graph(memory) 30 else: 31 # Arquiva em SQL e remove do Vector Store 32 await archive_memory(memory) 33 34 logger.info("consolidation_done")

Forget curve inspirada em Ebbinghaus: o confidence_score de memórias não acessadas decrementa automaticamente com o tempo — memórias não utilizadas eventualmente são removidas sem intervenção manual.

6.2 Breakdown de Custos por Camada

Estimativa para sistema com 100 agentes ativos, 1.000 sessões/dia, 90 dias de retenção (preços verificados março 2026 — consultar providers antes de orçar, valores sujeitos a alteração):

LayerTecnologiaCusto/mês (verificado)Variável de escalaFonte
ConversaRedis Cloud (1GB)~$22Sessões ativas simultâneasredis.io/pricing
ContextoPinecone Standardmín. 50+50 + 0,33/GB storage + $16/1M read unitsVetores indexados + volume de queriespinecone.io/pricing
ConhecimentoNeo4j Aura Professionala partir de $65Nós e relações no grafoneo4j.com/pricing
ExecuçãoPostgreSQL RDS (db.t3.medium)~$50–100Linhas de audit logaws.amazon.com/rds/pricing
EmbeddingsOpenAI text-embedding-3-small~$10–40Tokens indexados/mêsopenai.com/pricing
Total estimado~$197–577/mês

Com caching semântico (GPTCache ou similar), a estimativa de redução no custo de embeddings é de 30–60% em workloads com consultas repetitivas (dado ilustrativo).

6.3 Trade-offs de Latência vs Consistência por Caso de Uso

Tipo de processoConsistência necessáriaLayer primárioLatência esperada
Compliance / KYCForteLayer 4 (SQL) + Layer 1 (Redis)50–200ms
Análise de documentoEventualLayer 2 (Vector) + Layer 3 (Graph)30–120ms
Onboarding conversacionalEventualLayer 1 (Redis)5–20ms
Decisão de créditoForteLayer 4 + Layer 3 (Neo4j)80–300ms
Geração de relatórioEventualLayer 2 (Vector)20–80ms

7. Conclusão: Memória como Diferencial Competitivo

Sistemas que aprendem entre sessões — que acumulam conhecimento, detectam contradições e recuperam contexto relevante sem reprocessar — têm uma vantagem operacional não-replicável a curto prazo. Cada execução torna o sistema mais preciso. Cada sessão reduz o custo marginal da próxima.

Sistemas sem memória adequada recomeçam do zero a cada vez. O custo operacional não escala — ele se multiplica.

A stack de 4 camadas apresentada neste artigo não é a única arquitetura válida. É um ponto de partida defensável: cada layer tem um propósito claro, tecnologias comprovadas em produção, e critérios explícitos de quando migrar dados entre camadas.

Checklist de Implementação

  • Layer 1 (Redis) configurado com TTL explícito por tipo de memória (sessão, contexto, tarefa)
  • Vector Store com metadados de agent_id, session_id e confidence_score em todos os documentos
  • Separação explícita entre memória episódica (SQL) e semântica (Vector Store) — zero mistura
  • Blackboard com namespace por execution_id — sem vazamento de estado entre execuções paralelas
  • Servidor MCP abstraindo acesso aos stores — agentes não dependem de APIs específicas
  • Job de consolidação de memória agendado (a cada 6–24h, dependendo do volume)
  • Métricas de cache_hit_rate por agente e por layer coletadas e monitoradas
  • Estratégia de detecção de contradições no grafo executada pelo Critic antes de decisões críticas
  • Plano de retenção documentado: quanto tempo cada tipo de memória é mantido em cada layer

Se você está desenhando ou auditando a arquitetura de memória do seu sistema multi-agente — escolha de Vector Store, estratégia de estado compartilhado, integração com MCP ou compliance de retenção.

Palavras-chave: Arquitetura de Memória para Agentes de IA, memória persistente multi-agentes, vector store LangChain agentes, Redis memória agente IA, MCP memória persistente, RAG ativo sistemas multi-agentes, Neo4j agentes de IA.

Publicado por AI2You — AI-First Technical Series | ai2you.online/pt/blog