AI2YOU — AI-FIRST TECHNICAL SERIES
Para Engenheiros de IA, Tech Leads e CTOs que implementam sistemas multi-agentes em produção.
1. Um Agente sem Memória é um Funcionário que Esquece Tudo a Cada Conversa
Imagine contratar um analista de crédito que, toda vez que você abre uma nova aba no navegador, esquece completamente o cliente com quem acabou de trabalhar — o histórico, as decisões tomadas, os documentos analisados. Você precisaria reenviar tudo. Ele precisaria reler tudo. O processo recomeçaria do zero.
É exatamente isso que acontece com agentes de IA sem arquitetura de memória.
O custo não é filosófico. É mensurável em tokens, latência e dinheiro. Considere um pipeline realista: 10 agentes colaborando em análise de contratos, cada execução consome em média 4.000 tokens de contexto reprocessado que já estava disponível em execuções anteriores. A 100 execuções por dia, usando gpt-4o-mini a $0,15 por 1M tokens de input (preço confirmado OpenAI, março 2026 — verificar atualizações em openai.com/pricing):
Tokens desperdiçados/dia = 10 agentes × 4.000 tokens × 100 execuções
= 4.000.000 tokens/dia
Custo diário desperdiçado = 4.000.000 × $0,15 / 1.000.000 = $0,60/dia
Custo mensal desperdiçado = $0,60 × 30 = $18/mês por pipeline
Parece pouco. Mas em uma empresa com 40 pipelines ativos, esse número se torna $720/mês em tokens que reprocessam informação que o sistema já conhecia. Com caching de memória adequado, a estimativa de redução é de 40–70% desse custo (dado ilustrativo, baseado em benchmarks de semantic caching publicados pela Zilliz e Redis Labs — verificar benchmarks atuais em zilliz.com/blog).
O custo real, porém, não está nos tokens. Está na latência acumulada e na inconsistência de estado: agentes que tomam decisões contraditórias porque não compartilham o que aprenderam nas execuções anteriores.
Este artigo é um contrato técnico: ao final, você terá uma stack de memória completa em 4 camadas — Redis, Vector DB, Graph DB e MCP — com código Python de produção para cada camada, critérios de seleção tecnológica justificados e estimativas de custo operacional por escala.
2. Tipos de Memória em IA: Taxonomia Operacional
Antes de escolher tecnologia, é necessário entender o que você está armazenando. Confundir tipos de memória é o erro mais frequente em arquiteturas MAS — e resulta em sistemas que usam Vector DB para o que deveria estar no Redis, e Redis para o que deveria estar no grafo.
2.1 Memória de Curto Prazo (Short-term)
A context window é a memória de curto prazo do agente. Ela é cara, limitada e volátil por design. O anti-pattern mais comum é o context stuffing: colocar todo o histórico disponível no contexto achando que mais informação produz melhor raciocínio.
Pesquisas sobre "lost in the middle" demonstram o oposto: LLMs têm desempenho significativamente pior em informações posicionadas no meio do contexto comparado ao início e ao fim. Um contexto de 128k tokens com informação relevante no meio pode ter recall inferior a um contexto de 8k tokens com a mesma informação no início (Liu et al., 2023 — "Lost in the Middle: How Language Models Use Long Contexts", arXiv:2307.03172, publicado em TACL 2024).
Estratégias de otimização:
Sliding window: mantém apenas as N mensagens mais recentes, descartando o histórico mais antigo.
1# langchain==0.3.x | redis==5.x
2
3import logging
4from typing import Optional
5from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
6from langchain_community.chat_message_histories import RedisChatMessageHistory
7
8logger = logging.getLogger(__name__)
9
10
11class SlidingWindowMemory:
12 """
13 Memória de curto prazo com janela deslizante sobre Redis.
14
15 Mantém os N turnos mais recentes. Turnos mais antigos são
16 descartados — não arquivados. Para arquivamento, use
17 consolidação para Vector Store antes do descarte.
18 """
19
20 def __init__(
21 self,
22 session_id: str,
23 redis_url: str,
24 window_size: int = 10,
25 ttl: int = 3600,
26 ) -> None:
27 self.window_size = window_size
28 self.history = RedisChatMessageHistory(
29 session_id=session_id,
30 url=redis_url,
31 ttl=ttl,
32 )
33
34 def add_exchange(self, human: str, ai: str) -> None:
35 """Adiciona um par humano/AI e trunca a janela se necessário."""
36 self.history.add_user_message(human)
37 self.history.add_ai_message(ai)
38 self._truncate()
39
40 def get_context(self) -> list[BaseMessage]:
41 """Retorna as mensagens dentro da janela ativa."""
42 messages = self.history.messages
43 return messages[-self.window_size * 2:] # N turnos = 2N mensagens
44
45 def _truncate(self) -> None:
46 """Remove mensagens além do limite da janela."""
47 messages = self.history.messages
48 excess = len(messages) - (self.window_size * 2)
49 if excess > 0:
50 logger.info(f"sliding_window_truncate excess={excess} session={self.history.session_id}")
51 # RedisChatMessageHistory não suporta delete por índice nativamente
52 # — reconstrua a lista e sobreescreva
53 self.history.clear()
54 for msg in messages[excess:]:
55 if isinstance(msg, HumanMessage):
56 self.history.add_user_message(msg.content)
57 elif isinstance(msg, AIMessage):
58 self.history.add_ai_message(msg.content)
2.2 Memória de Longo Prazo (Long-term)
Vector stores são o índice semântico persistente do sistema. A pergunta não é "qual usar" mas "qual trade-off é aceitável para este caso":
| Critério | Pinecone | Weaviate | Chroma | pgvector |
|---|
| Latência p99 (query) | ~20ms | ~30ms | ~50ms local | ~100ms |
| Custo/1M vetores | ~$0,08/mês | Self-hosted: $0 | Self-hosted: $0 | Incluído no RDS |
| Self-hosted | ❌ SaaS apenas | ✅ | ✅ | ✅ |
| Filtros de metadados | ✅ Robusto | ✅ Robusto | 🟡 Limitado | ✅ Via SQL |
| Escala horizontal | ✅ Gerenciado | ✅ Com esforço | ❌ | 🟡 Via RDS |
| Ideal para | Produção gerenciada | Self-hosted enterprise | Dev/PoC | Stack PostgreSQL existente |
(latências são estimativas ilustrativas baseadas em benchmarks públicos — ANN Benchmarks e documentação oficial dos providers)
Estratégias de chunking para recall máximo:
- Fixed-size: simples, previsível, falha em documentos com seções de tamanho variável
- Semantic: divide por mudança de tópico — maior recall, maior custo de indexação
- Hierarchical: chunk pequeno para precisão + chunk grande para contexto — melhor dos dois, maior complexidade
2.3 Memória de Trabalho (Working Memory)
A working memory é o estado temporário compartilhado entre agentes durante uma execução. O padrão canônico é o blackboard: uma estrutura de dados centralizada onde qualquer agente pode ler e escrever, e todos enxergam o estado global da tarefa.
1# redis==5.x
2
3import json
4import logging
5from typing import Any, Optional
6from datetime import datetime
7
8logger = logging.getLogger(__name__)
9
10
11class AgentBlackboard:
12 """
13 Blackboard compartilhado entre agentes via Redis Hash.
14
15 Cada execução tem seu próprio namespace isolado por execution_id.
16 Escritas são atômicas via HSET. Leituras são sempre consistentes
17 dentro do mesmo Redis instance.
18 """
19
20 def __init__(self, redis_client, execution_id: str) -> None:
21 self.redis = redis_client
22 self.key = f"blackboard:{execution_id}"
23 self.ttl = 86400 # 24h — limpa execuções abandonadas
24
25 def write(self, agent_id: str, field: str, value: Any) -> None:
26 """
27 Escreve um campo no blackboard com metadata de proveniência.
28
29 O campo é prefixado com agent_id para rastreabilidade:
30 'planner:task_decomposition', 'worker_a:extraction_result'.
31 """
32 payload = json.dumps({
33 "value": value,
34 "agent_id": agent_id,
35 "written_at": datetime.utcnow().isoformat(),
36 })
37 self.redis.hset(self.key, f"{agent_id}:{field}", payload)
38 self.redis.expire(self.key, self.ttl)
39 logger.info(f"blackboard_write key={self.key} agent={agent_id} field={field}")
40
41 def read(self, agent_id: str, field: str) -> Optional[Any]:
42 """Lê um campo escrito por um agente específico."""
43 raw = self.redis.hget(self.key, f"{agent_id}:{field}")
44 if not raw:
45 return None
46 return json.loads(raw)["value"]
47
48 def read_all(self) -> dict[str, Any]:
49 """Retorna todo o estado do blackboard para o Critic."""
50 raw = self.redis.hgetall(self.key)
51 return {k.decode(): json.loads(v)["value"] for k, v in raw.items()}
Scratchpads são diferentes do blackboard: são privados por agente, invisíveis para outros agentes, e descartados ao fim da execução. Use scratchpads para raciocínio intermediário que não precisa ser compartilhado — isso reduz ruído no blackboard e melhora a qualidade do raciocínio do agente individual.
2.4 Memória Episódica vs Semântica
A distinção mais negligenciada em MAS corporativo:
Memória episódica é o log do que aconteceu: "Na sessão X, o Agente A decidiu rejeitar o documento Y porque o campo Z estava ausente." É temporal, contextual, ordenada.
Memória semântica é o que o sistema sabe: "Documentos do tipo Y exigem o campo Z como obrigatório pela regulação BACEN 4.557." É atemporal, factual, estruturada.
Confundir os dois produz sistemas que tratam fatos como logs (re-derivando conhecimento a cada execução) ou logs como fatos (generalizando decisões específicas em regras incorretas).
Caso prático — onboarding financeiro:
- Episódico: "O cliente João Silva enviou o balanço patrimonial às 14h32 do dia 03/03."
- Semântico: "Clientes PJ com faturamento > R$ 10M exigem balanço auditado por Big Four."
O primeiro vai para o banco de dados de execução (PostgreSQL). O segundo vai para o grafo de conhecimento (Neo4j). Nunca o contrário.
3. Arquitetura de Memória para MAS: Stack em 4 Camadas
3.1 O Problema do Estado Compartilhado
Sistemas multi-agentes com memória compartilhada são sistemas distribuídos. As garantias do teorema CAP se aplicam integralmente:
- Consistência forte (todos os agentes veem o mesmo estado ao mesmo tempo): necessária para memória de execução crítica — um agente não pode aprovar uma operação que outro acabou de rejeitar
- Disponibilidade + eventual consistency: aceitável para memória semântica — se o Agente B ainda não enxerga o fato que o Agente A acabou de indexar, ele derivará o mesmo fato independentemente e o índice será atualizado na próxima sincronização
O problema de split brain em agentes paralelos: dois Workers escrevendo no mesmo namespace do Vector Store simultaneamente podem indexar versões conflitantes do mesmo documento. Solução: writes no Vector Store passam pelo Planner (ponto único de escrita) ou usam optimistic locking com versionamento por agent_id + timestamp.
3.2 Stack de Memória em 4 Camadas
1┌──────────────────────────────────────────────────────────────────┐
2│ LAYER 1: Memória de Conversa │
3│ Redis / Memcached │
4│ TTL: 1h–24h | Latência: <5ms | Uso: contexto ativo da sessão │
5├──────────────────────────────────────────────────────────────────┤
6│ LAYER 2: Memória de Contexto │
7│ Vector DB (Pinecone / Weaviate / pgvector) │
8│ TTL: 30–90 dias | Latência: 20–100ms | Uso: recuperação RAG │
9├──────────────────────────────────────────────────────────────────┤
10│ LAYER 3: Memória de Conhecimento │
11│ Graph DB (Neo4j) │
12│ TTL: permanente | Latência: 10–50ms | Uso: fatos e relações │
13├──────────────────────────────────────────────────────────────────┤
14│ LAYER 4: Memória de Execução │
15│ SQL/NoSQL (PostgreSQL / MongoDB) │
16│ TTL: regulatório (5–7 anos) | Uso: audit trail, compliance │
17└──────────────────────────────────────────────────────────────────┘
18 ▲ ▲ ▲ ▲
19 │ │ │ │
20 [Agent API] [RAG queries] [Knowledge [Audit
21 [Blackboard] [Semantic queries] writes]
22 search]
Sinal de que você está usando a camada errada:
- Layer 1 com dados > 10KB por chave: mova para Layer 2
- Layer 2 para dados que nunca mudam semanticamente: mova para Layer 3
- Layer 3 para logs de execução temporários: mova para Layer 4
- Layer 4 para recuperação semântica em tempo real: crie índice em Layer 2
3.3 Padrões de Acesso por Tipo de Agente
1# langchain==0.3.x | redis==5.x | langchain-pinecone==0.2.x
2
3import logging
4from enum import Enum
5from typing import Any, Optional
6from dataclasses import dataclass, field
7
8logger = logging.getLogger(__name__)
9
10
11class MemoryScope(Enum):
12 PRIVATE = "private" # Apenas o agente atual
13 SHARED = "shared" # Todos os agentes da execução
14 HIERARCHICAL = "hier" # Agentes com permissão explícita
15
16
17@dataclass
18class MemoryEntry:
19 """Unidade atômica de memória com metadata de proveniência."""
20 content: Any
21 agent_id: str
22 session_id: str
23 execution_id: str
24 memory_type: str # "episodic" | "semantic" | "working"
25 scope: MemoryScope
26 confidence_score: float # 0.0–1.0 — confiança do agente no conteúdo
27 ttl_seconds: Optional[int] = None
28 tags: list[str] = field(default_factory=list)
29
30
31class AgentMemoryManager:
32 """
33 Interface unificada para as 4 camadas de memória.
34
35 Encapsula a lógica de roteamento: o agente declara o que
36 quer armazenar — o Manager decide em qual layer persistir.
37 """
38
39 def __init__(
40 self,
41 agent_id: str,
42 redis_client,
43 vector_store,
44 graph_driver,
45 sql_session,
46 ) -> None:
47 self.agent_id = agent_id
48 self._redis = redis_client
49 self._vector = vector_store
50 self._graph = graph_driver
51 self._sql = sql_session
52
53 def store(self, entry: MemoryEntry) -> str:
54 """
55 Roteia o armazenamento para a camada correta baseado em
56 memory_type e scope.
57
58 Returns: memory_id para recuperação posterior.
59 """
60 if entry.scope == MemoryScope.PRIVATE:
61 return self._store_private(entry)
62 elif entry.memory_type == "working":
63 return self._store_blackboard(entry)
64 elif entry.memory_type == "semantic":
65 return self._store_vector(entry)
66 elif entry.memory_type == "episodic":
67 return self._store_sql(entry)
68 else:
69 raise ValueError(f"Combinação inválida: {entry.memory_type}/{entry.scope}")
70
71 def _store_private(self, entry: MemoryEntry) -> str:
72 """Scratchpad privado — Redis com TTL de sessão."""
73 key = f"scratch:{self.agent_id}:{entry.execution_id}"
74 import json
75 self._redis.setex(key, entry.ttl_seconds or 3600, json.dumps({
76 "content": entry.content,
77 "tags": entry.tags,
78 }))
79 logger.info(f"memory_store type=private agent={self.agent_id}")
80 return key
81
82 def _store_blackboard(self, entry: MemoryEntry) -> str:
83 """Blackboard compartilhado — Redis Hash por execution_id."""
84 key = f"blackboard:{entry.execution_id}"
85 field_name = f"{self.agent_id}:{':'.join(entry.tags)}"
86 import json
87 self._redis.hset(key, field_name, json.dumps(entry.content))
88 self._redis.expire(key, entry.ttl_seconds or 86400)
89 return f"{key}:{field_name}"
90
91 def _store_vector(self, entry: MemoryEntry) -> str:
92 """Memória semântica — Vector Store com metadados de agente."""
93 import uuid
94 memory_id = str(uuid.uuid4())
95 self._vector.add_texts(
96 texts=[str(entry.content)],
97 metadatas=[{
98 "memory_id": memory_id,
99 "agent_id": entry.agent_id,
100 "session_id": entry.session_id,
101 "confidence": entry.confidence_score,
102 "tags": ",".join(entry.tags),
103 }],
104 ids=[memory_id],
105 )
106 logger.info(f"memory_store type=semantic agent={self.agent_id} id={memory_id}")
107 return memory_id
108
109 def _store_sql(self, entry: MemoryEntry) -> str:
110 """Memória episódica — PostgreSQL para audit trail."""
111 from datetime import datetime
112 record = {
113 "agent_id": entry.agent_id,
114 "session_id": entry.session_id,
115 "execution_id": entry.execution_id,
116 "content": str(entry.content),
117 "tags": entry.tags,
118 "created_at": datetime.utcnow(),
119 }
120 result = self._sql.execute(
121 "INSERT INTO agent_memory_log VALUES (:agent_id, :session_id, "
122 ":execution_id, :content, :tags, :created_at) RETURNING id",
123 record
124 )
125 return str(result.fetchone()[0])
3.4 Estratégia de Write-Through vs Write-Back
Write-through: cada ação do agente persiste imediatamente em todas as camadas relevantes antes de continuar. Latência maior, consistência garantida.
Write-back: o agente acumula memórias em buffer local e persiste em batch ao final da execução ou em intervalos regulares. Menor latência, risco de perda em falha antes do flush.
Regra de decisão:
- Memória de execução crítica (compliance, audit, decisões irreversíveis): write-through obrigatório
- Memória semântica de baixo risco (contexto, preferências, histórico não-crítico): write-back aceitável
- Memória de working/blackboard: write-through — outros agentes dependem da consistência imediata
4. Implementação Prática: Código de Produção por Camada
4.1 Vector Store como Memória Semântica
1# langchain==0.3.x | langchain-pinecone==0.2.x | openai==1.x
2
3import logging
4import uuid
5from typing import Optional
6from langchain_pinecone import PineconeVectorStore
7from langchain_openai import OpenAIEmbeddings
8from langchain_core.documents import Document
9
10logger = logging.getLogger(__name__)
11
12embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
13
14
15class SemanticMemoryStore:
16 """
17 Memória semântica de longo prazo usando Pinecone.
18
19 Armazena memórias com metadados de agente para recuperação
20 filtrada por agent_id, session, tipo e score de confiança.
21 """
22
23 def __init__(self, index_name: str) -> None:
24 self.store = PineconeVectorStore(
25 index_name=index_name,
26 embedding=embeddings,
27 )
28
29 def remember(
30 self,
31 content: str,
32 agent_id: str,
33 session_id: str,
34 memory_type: str,
35 confidence: float = 1.0,
36 tags: Optional[list[str]] = None,
37 ) -> str:
38 """
39 Indexa uma memória com metadata completa de proveniência.
40
41 Antes de indexar, verifica deduplicação por similaridade:
42 se existe memória > 0.97 de similaridade do mesmo agente,
43 atualiza o confidence_score em vez de criar duplicata.
44 """
45 memory_id = str(uuid.uuid4())
46
47 # Verificação de deduplicação
48 existing = self.store.similarity_search_with_score(
49 query=content,
50 k=1,
51 filter={"agent_id": agent_id, "memory_type": memory_type},
52 )
53
54 if existing and existing[0][1] > 0.97:
55 logger.info(f"semantic_memory_deduplicated agent={agent_id} similarity={existing[0][1]:.3f}")
56 return existing[0][0].metadata["memory_id"]
57
58 doc = Document(
59 page_content=content,
60 metadata={
61 "memory_id": memory_id,
62 "agent_id": agent_id,
63 "session_id": session_id,
64 "memory_type": memory_type, # semantic | episodic | working
65 "confidence_score": confidence,
66 "tags": ",".join(tags or []),
67 }
68 )
69
70 self.store.add_documents([doc], ids=[memory_id])
71 logger.info(f"semantic_memory_stored agent={agent_id} id={memory_id}")
72 return memory_id
73
74 def recall(
75 self,
76 query: str,
77 agent_id: str,
78 k: int = 5,
79 min_confidence: float = 0.7,
80 use_mmr: bool = False,
81 ) -> list[Document]:
82 """
83 Recupera memórias relevantes filtradas por agente.
84
85 MMR (Maximal Marginal Relevance): use quando a diversidade
86 dos resultados importa mais que a similaridade pura.
87 Similarity search: use quando você quer os k mais similares,
88 aceitando redundância.
89 """
90 filter_dict = {
91 "agent_id": agent_id,
92 "confidence_score": {"$gte": min_confidence},
93 }
94
95 if use_mmr:
96 # MMR: balanceia relevância e diversidade
97 # fetch_k > k — busca mais candidatos, seleciona os mais diversos
98 return self.store.max_marginal_relevance_search(
99 query=query,
100 k=k,
101 fetch_k=k * 3,
102 filter=filter_dict,
103 )
104
105 return self.store.similarity_search(
106 query=query,
107 k=k,
108 filter=filter_dict,
109 )
4.2 Redis como Cache de Contexto de Alta Velocidade
1# redis==5.x | langchain-redis==0.1.x
2
3import json
4import logging
5from typing import Optional, Any
6from datetime import datetime
7import redis
8
9logger = logging.getLogger(__name__)
10
11
12class AgentContextCache:
13 """
14 Cache de contexto de alta velocidade para estado de agente.
15
16 Estruturas Redis por tipo de dado:
17 - Hash: estado estruturado do agente (chave-valor)
18 - List: histórico de mensagens (FIFO com limite)
19 - Sorted Set: memórias ranqueadas por relevância/recência
20 - Pub/Sub: notificação entre agentes sobre updates
21 """
22
23 TTL = {
24 "session_active": 3600, # 1h — sessão ativa
25 "context_recent": 86400, # 24h — contexto recente
26 "task_state": None, # Sem TTL — persiste até conclusão explícita
27 }
28
29 def __init__(self, redis_url: str) -> None:
30 self.r = redis.from_url(redis_url, decode_responses=True)
31 self.pubsub = self.r.pubsub()
32
33 # ── Estado estruturado ────────────────────────────────────────────
34
35 def set_agent_state(self, agent_id: str, execution_id: str, state: dict) -> None:
36 """Persiste estado completo do agente como Redis Hash."""
37 key = f"agent_state:{execution_id}:{agent_id}"
38 self.r.hset(key, mapping={k: json.dumps(v) for k, v in state.items()})
39 self.r.expire(key, self.TTL["task_state"] or 0)
40 logger.info(f"agent_state_set key={key} fields={list(state.keys())}")
41
42 def get_agent_state(self, agent_id: str, execution_id: str) -> dict:
43 """Recupera estado completo do agente."""
44 key = f"agent_state:{execution_id}:{agent_id}"
45 raw = self.r.hgetall(key)
46 return {k: json.loads(v) for k, v in raw.items()}
47
48 # ── Histórico de mensagens ────────────────────────────────────────
49
50 def append_message(
51 self,
52 session_id: str,
53 role: str,
54 content: str,
55 max_history: int = 20,
56 ) -> None:
57 """
58 Appenda mensagem ao histórico com RPUSH + LTRIM.
59
60 LTRIM garante que a lista nunca excede max_history entries
61 sem necessidade de leitura prévia — operação O(1).
62 """
63 key = f"history:{session_id}"
64 message = json.dumps({"role": role, "content": content,
65 "ts": datetime.utcnow().isoformat()})
66 pipe = self.r.pipeline()
67 pipe.rpush(key, message)
68 pipe.ltrim(key, -max_history, -1)
69 pipe.expire(key, self.TTL["session_active"])
70 pipe.execute()
71
72 def get_history(self, session_id: str) -> list[dict]:
73 """Recupera histórico completo da sessão."""
74 key = f"history:{session_id}"
75 return [json.loads(m) for m in self.r.lrange(key, 0, -1)]
76
77 # ── Memórias ranqueadas ───────────────────────────────────────────
78
79 def rank_memory(self, agent_id: str, memory_id: str, score: float) -> None:
80 """
81 Insere memória em Sorted Set ranqueado por relevância.
82
83 Score composto: combine similaridade semântica + recência
84 em um único float para rankeamento unificado.
85 """
86 key = f"ranked_memories:{agent_id}"
87 self.r.zadd(key, {memory_id: score})
88 self.r.expire(key, self.TTL["context_recent"])
89
90 def get_top_memories(self, agent_id: str, top_k: int = 5) -> list[str]:
91 """Retorna os memory_ids com maior score (mais relevantes)."""
92 key = f"ranked_memories:{agent_id}"
93 return self.r.zrevrange(key, 0, top_k - 1)
94
95 # ── Pub/Sub para notificação entre agentes ────────────────────────
96
97 def notify_memory_update(self, execution_id: str, agent_id: str, field: str) -> None:
98 """Notifica outros agentes que a memória compartilhada foi atualizada."""
99 channel = f"memory_updates:{execution_id}"
100 payload = json.dumps({"agent_id": agent_id, "field": field,
101 "ts": datetime.utcnow().isoformat()})
102 self.r.publish(channel, payload)
103 logger.info(f"memory_update_published channel={channel} agent={agent_id}")
4.3 Neo4j para Conhecimento Relacional
1# neo4j==5.x | langchain-neo4j==0.1.x
2
3import logging
4from typing import Optional
5from neo4j import GraphDatabase, Driver
6
7logger = logging.getLogger(__name__)
8
9
10class KnowledgeGraph:
11 """
12 Grafo de conhecimento estruturado para MAS.
13
14 Schema:
15 - (Agent)-[:KNOWS]->(Fact)
16 - (Agent)-[:DECIDED {reason, confidence}]->(Decision)
17 - (Session)-[:CONTAINS]->(Decision)
18 - (Fact)-[:CONTRADICTS]->(Fact)
19 - (Entity)-[:REFERENCED_IN]->(Session)
20 """
21
22 def __init__(self, uri: str, user: str, password: str) -> None:
23 self.driver: Driver = GraphDatabase.driver(uri, auth=(user, password))
24
25 def store_fact(
26 self,
27 agent_id: str,
28 subject: str,
29 predicate: str,
30 obj: str,
31 confidence: float = 1.0,
32 source_session: Optional[str] = None,
33 ) -> None:
34 """
35 Armazena um fato semântico como tripla (sujeito, predicado, objeto).
36
37 Antes de inserir, verifica contradição com fatos existentes
38 sobre o mesmo sujeito e predicado.
39 """
40 with self.driver.session() as session:
41 # Verifica contradição
42 existing = session.run(
43 """
44 MATCH (f:Fact {subject: $subject, predicate: $predicate})
45 WHERE f.object <> $object
46 RETURN f
47 """,
48 subject=subject, predicate=predicate, object=obj
49 ).data()
50
51 if existing:
52 logger.warning(
53 f"knowledge_contradiction subject={subject} predicate={predicate} "
54 f"existing={existing[0]['f']['object']} new={obj}"
55 )
56 # Cria relação CONTRADICTS entre os dois fatos
57 session.run(
58 """
59 MATCH (f1:Fact {subject: $subject, predicate: $predicate})
60 MERGE (f2:Fact {subject: $subject, predicate: $predicate, object: $object})
61 MERGE (f1)-[:CONTRADICTS {detected_by: $agent_id}]->(f2)
62 """,
63 subject=subject, predicate=predicate, object=obj, agent_id=agent_id
64 )
65 return
66
67 session.run(
68 """
69 MERGE (a:Agent {id: $agent_id})
70 MERGE (f:Fact {subject: $subject, predicate: $predicate, object: $object})
71 ON CREATE SET f.confidence = $confidence, f.created_at = datetime()
72 MERGE (a)-[:KNOWS {source_session: $source_session}]->(f)
73 """,
74 agent_id=agent_id, subject=subject, predicate=predicate,
75 object=obj, confidence=confidence, source_session=source_session
76 )
77 logger.info(f"fact_stored agent={agent_id} {subject}-[{predicate}]->{obj}")
78
79 def recall_about_entity(
80 self,
81 entity: str,
82 agent_id: Optional[str] = None,
83 last_n_sessions: int = 3,
84 ) -> list[dict]:
85 """
86 Recupera tudo que o sistema sabe sobre uma entidade,
87 opcionalmente filtrado pelo agente e últimas N sessões.
88 """
89 with self.driver.session() as session:
90 query = """
91 MATCH (a:Agent)-[:KNOWS]->(f:Fact)
92 WHERE f.subject = $entity
93 OR f.object = $entity
94 """
95 params: dict = {"entity": entity}
96
97 if agent_id:
98 query += " AND a.id = $agent_id"
99 params["agent_id"] = agent_id
100
101 query += " RETURN f.subject, f.predicate, f.object, f.confidence ORDER BY f.confidence DESC"
102
103 return session.run(query, **params).data()
104
105 def detect_contradictions(self) -> list[dict]:
106 """
107 Retorna todos os pares de fatos contraditórios no grafo.
108 Deve ser executado pelo Critic antes de decisões críticas.
109 """
110 with self.driver.session() as session:
111 return session.run(
112 """
113 MATCH (f1:Fact)-[:CONTRADICTS]->(f2:Fact)
114 RETURN f1.subject, f1.predicate, f1.object AS value_a,
115 f2.object AS value_b
116 """
117 ).data()
5. MCP e Memória Persistente: Interoperabilidade por Design
5.1 O Padrão Model Context Protocol
MCP resolve um problema de acoplamento: sem ele, cada agente precisa conhecer a API específica do Pinecone, do Redis e do Neo4j. Com MCP, os agentes falam com um MemoryServer padronizado, e a implementação subjacente é transparente.
A consequência arquitetural é significativa: trocar de Pinecone para Weaviate não requer refatorar os agentes — apenas o servidor MCP. O contrato de interface permanece estável.
5.2 Implementação de Servidor MCP para Memória
1# mcp==1.x | fastmcp==0.4.x
2
3import logging
4from typing import Any
5import fastmcp
6
7logger = logging.getLogger(__name__)
8
9mcp = fastmcp.FastMCP("MemoryServer")
10
11
12@mcp.tool()
13async def store_memory(
14 content: str,
15 agent_id: str,
16 session_id: str,
17 memory_type: str,
18 scope: str = "shared",
19 confidence: float = 1.0,
20 tags: list[str] | None = None,
21) -> dict:
22 """
23 Armazena uma memória no layer apropriado.
24
25 O servidor decide o layer baseado em memory_type e scope —
26 o agente não precisa conhecer os detalhes de persistência.
27 """
28 # Importar o AgentMemoryManager configurado com os clients reais
29 from memory_manager import get_memory_manager
30 manager = get_memory_manager(agent_id)
31
32 from memory_manager import MemoryEntry, MemoryScope
33 entry = MemoryEntry(
34 content=content,
35 agent_id=agent_id,
36 session_id=session_id,
37 execution_id=session_id, # simplificado
38 memory_type=memory_type,
39 scope=MemoryScope(scope),
40 confidence_score=confidence,
41 tags=tags or [],
42 )
43
44 memory_id = manager.store(entry)
45 logger.info(f"mcp_store_memory agent={agent_id} type={memory_type} id={memory_id}")
46 return {"memory_id": memory_id, "status": "stored"}
47
48
49@mcp.tool()
50async def retrieve_memory(
51 query: str,
52 agent_id: str,
53 memory_type: str = "semantic",
54 top_k: int = 5,
55 min_confidence: float = 0.7,
56) -> list[dict]:
57 """
58 Recupera memórias relevantes para uma query.
59
60 Roteamento: semântico → Vector Store, episódico → SQL,
61 working → Redis blackboard.
62 """
63 from memory_manager import get_memory_manager
64 manager = get_memory_manager(agent_id)
65
66 if memory_type == "semantic":
67 docs = manager._vector.recall(
68 query=query,
69 agent_id=agent_id,
70 k=top_k,
71 min_confidence=min_confidence,
72 )
73 return [{"content": d.page_content, "metadata": d.metadata} for d in docs]
74
75 logger.warning(f"mcp_retrieve unsupported type={memory_type}")
76 return []
77
78
79@mcp.tool()
80async def forget_memory(memory_id: str, agent_id: str) -> dict:
81 """
82 Remove uma memória específica do Vector Store.
83
84 Use com cautela: remoção de memória semântica é irreversível.
85 Prefira decrementar confidence_score antes de deletar.
86 """
87 from memory_manager import get_memory_manager
88 manager = get_memory_manager(agent_id)
89 manager._vector.store.delete(ids=[memory_id])
90 logger.info(f"mcp_forget_memory agent={agent_id} id={memory_id}")
91 return {"memory_id": memory_id, "status": "deleted"}
5.3 Integração LangGraph + MCP Memory Tool
1# langchain==0.3.x | langgraph==0.2.x | mcp==1.x
2
3from langchain_core.tools import StructuredTool
4from langchain_mcp_adapters import MCPToolkit
5
6# Conecta ao servidor MCP de memória
7toolkit = MCPToolkit(server_url="http://localhost:8000/mcp")
8memory_tools = toolkit.get_tools()
9
10# Os tools MCP ficam disponíveis para qualquer nó do LangGraph
11# como qualquer outro tool — zero acoplamento com a implementação
12
13def agent_node_with_memory(state: dict) -> dict:
14 """
15 Nó de agente com acesso a memória via MCP.
16
17 O agente decide quando e o que lembrar — não é automático.
18 Memory-on-demand produz índices mais limpos que memory-on-every-step.
19 """
20 from langchain_openai import ChatOpenAI
21 llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(memory_tools)
22
23 response = llm.invoke(state["messages"])
24 return {"messages": state["messages"] + [response]}
6. Otimização e Custos
6.1 Estratégias de Compactação de Contexto
Summarization progressiva: memórias antigas viram resumos, resumos viram fatos, fatos entram no grafo. Pipeline de consolidação agendado:
1# apscheduler==3.x | langchain==0.3.x
2
3import logging
4from apscheduler.schedulers.asyncio import AsyncIOScheduler
5from langchain_openai import ChatOpenAI
6
7logger = logging.getLogger(__name__)
8llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
9scheduler = AsyncIOScheduler()
10
11
12@scheduler.scheduled_job("interval", hours=6)
13async def consolidate_old_memories() -> None:
14 """
15 Job de consolidação: memórias com mais de 7 dias e
16 confidence_score < 0.5 são sumarizadas e removidas do índice.
17
18 Memórias com confidence_score > 0.9 são promovidas para o
19 grafo de conhecimento como fatos estruturados.
20 """
21 from memory_manager import get_all_stale_memories, promote_to_graph, archive_memory
22
23 stale = await get_all_stale_memories(older_than_days=7, max_confidence=0.5)
24 logger.info(f"consolidation_start stale_count={len(stale)}")
25
26 for memory in stale:
27 if memory["confidence_score"] > 0.9:
28 # Promove para grafo — extrai tripla (sujeito, predicado, objeto)
29 await promote_to_graph(memory)
30 else:
31 # Arquiva em SQL e remove do Vector Store
32 await archive_memory(memory)
33
34 logger.info("consolidation_done")
Forget curve inspirada em Ebbinghaus: o confidence_score de memórias não acessadas decrementa automaticamente com o tempo — memórias não utilizadas eventualmente são removidas sem intervenção manual.
6.2 Breakdown de Custos por Camada
Estimativa para sistema com 100 agentes ativos, 1.000 sessões/dia, 90 dias de retenção (preços verificados março 2026 — consultar providers antes de orçar, valores sujeitos a alteração):
| Layer | Tecnologia | Custo/mês (verificado) | Variável de escala | Fonte |
|---|
| Conversa | Redis Cloud (1GB) | ~$22 | Sessões ativas simultâneas | redis.io/pricing |
| Contexto | Pinecone Standard | mín. 50+0,33/GB storage + $16/1M read units | Vetores indexados + volume de queries | pinecone.io/pricing |
| Conhecimento | Neo4j Aura Professional | a partir de $65 | Nós e relações no grafo | neo4j.com/pricing |
| Execução | PostgreSQL RDS (db.t3.medium) | ~$50–100 | Linhas de audit log | aws.amazon.com/rds/pricing |
| Embeddings | OpenAI text-embedding-3-small | ~$10–40 | Tokens indexados/mês | openai.com/pricing |
| Total estimado | | ~$197–577/mês | | |
Com caching semântico (GPTCache ou similar), a estimativa de redução no custo de embeddings é de 30–60% em workloads com consultas repetitivas (dado ilustrativo).
6.3 Trade-offs de Latência vs Consistência por Caso de Uso
| Tipo de processo | Consistência necessária | Layer primário | Latência esperada |
|---|
| Compliance / KYC | Forte | Layer 4 (SQL) + Layer 1 (Redis) | 50–200ms |
| Análise de documento | Eventual | Layer 2 (Vector) + Layer 3 (Graph) | 30–120ms |
| Onboarding conversacional | Eventual | Layer 1 (Redis) | 5–20ms |
| Decisão de crédito | Forte | Layer 4 + Layer 3 (Neo4j) | 80–300ms |
| Geração de relatório | Eventual | Layer 2 (Vector) | 20–80ms |
7. Conclusão: Memória como Diferencial Competitivo
Sistemas que aprendem entre sessões — que acumulam conhecimento, detectam contradições e recuperam contexto relevante sem reprocessar — têm uma vantagem operacional não-replicável a curto prazo. Cada execução torna o sistema mais preciso. Cada sessão reduz o custo marginal da próxima.
Sistemas sem memória adequada recomeçam do zero a cada vez. O custo operacional não escala — ele se multiplica.
A stack de 4 camadas apresentada neste artigo não é a única arquitetura válida. É um ponto de partida defensável: cada layer tem um propósito claro, tecnologias comprovadas em produção, e critérios explícitos de quando migrar dados entre camadas.
Checklist de Implementação
Se você está desenhando ou auditando a arquitetura de memória do seu sistema multi-agente — escolha de Vector Store, estratégia de estado compartilhado, integração com MCP ou compliance de retenção.
Palavras-chave: Arquitetura de Memória para Agentes de IA, memória persistente multi-agentes, vector store LangChain agentes, Redis memória agente IA, MCP memória persistente, RAG ativo sistemas multi-agentes, Neo4j agentes de IA.
Publicado por AI2You — AI-First Technical Series | ai2you.online/pt/blog