Memory Architecture for Multi-Agent Systems: From Ephemeral Context to Persistent Knowledge

AI2You | Human Evolution & AI

2026-03-06

3D illustration of a four-layer memory architecture for multi-agent systems — Redis, Vector Store, Neo4j, and SQL — rendered as glowing slabs in electric blue and teal tones on a dark navy background, with vertical data streams flowing between memory tiers.
Complete memory stack for MAS in production: Redis for ephemeral state, Vector DB for semantic retrieval, Neo4j for relational knowledge, and MCP as the interoperability protocol — with Python code and cost estimates per layer.

AI2YOU — AI-FIRST TECHNICAL SERIES

For AI Engineers, Tech Leads, and CTOs implementing multi-agent systems in production.

1. An Agent Without Memory Is an Employee Who Forgets Everything After Every Conversation

Imagine hiring a credit analyst who, every time you open a new browser tab, completely forgets the client they just worked with β€” the history, the decisions made, the documents reviewed. You would need to resend everything. They would need to re-read everything. The process would restart from zero.

That is exactly what happens with AI agents that lack a memory architecture.

The cost is not philosophical. It is measurable in tokens, latency, and money. Consider a realistic pipeline: 10 agents collaborating on contract analysis, each execution consuming an average of 4,000 tokens of reprocessed context that was already available from previous executions. At 100 executions per day, using gpt-4o-mini at $0.15 per 1M input tokens (confirmed OpenAI pricing, March 2026 — check for updates at openai.com/pricing):

```markdown
Wasted tokens/day   = 10 agents × 4,000 tokens × 100 executions = 4,000,000 tokens/day
Daily wasted cost   = 4,000,000 × $0.15 / 1,000,000 = $0.60/day
Monthly wasted cost = $0.60 × 30 = $18/month per pipeline
```

That seems small. But in a company with 40 active pipelines, that figure becomes $720/month in tokens reprocessing information the system already knew. With proper memory caching, the estimated reduction is 40–70% of that cost (illustrative figure, based on semantic caching benchmarks published by Zilliz and Redis Labs — check current benchmarks at zilliz.com/blog).
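To make the arithmetic reproducible at any fleet size, here is a minimal sketch; the figures are the ones from the text, and the 40–70% range is the illustrative caching estimate, not a measured result:

```python
def wasted_cost_per_month(agents: int, tokens_per_exec: int, execs_per_day: int,
                          price_per_1m: float, days: int = 30) -> float:
    """Monthly cost of reprocessed context tokens for one pipeline."""
    daily_tokens = agents * tokens_per_exec * execs_per_day
    return daily_tokens * price_per_1m / 1_000_000 * days

# One pipeline from the example: 10 agents, 4k reprocessed tokens, 100 runs/day
pipeline = wasted_cost_per_month(10, 4_000, 100, price_per_1m=0.15)  # ≈ $18/month
fleet = pipeline * 40                                                # ≈ $720/month

# Illustrative 40–70% reduction from semantic caching
savings_low, savings_high = fleet * 0.40, fleet * 0.70
```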

The real cost, however, is not in the tokens. It is in accumulated latency and state inconsistency: agents making contradictory decisions because they do not share what they learned in previous executions.

This article is a technical contract: by the end, you will have a complete 4-layer memory stack — Redis, Vector DB, Graph DB, and MCP — with production Python code for each layer, justified technology selection criteria, and operational cost estimates at scale.

2. Types of AI Memory: An Operational Taxonomy

Before choosing technology, you need to understand what you are storing. Confusing memory types is the most frequent error in MAS architectures — and results in systems that use a Vector DB for what should be in Redis, and Redis for what should be in the graph.

2.1 Short-term Memory

The context window is the agent's short-term memory. It is expensive, limited, and volatile by design. The most common anti-pattern is context stuffing: loading all available history into the context under the assumption that more information produces better reasoning.

Research on "lost in the middle" demonstrates the opposite: LLMs perform significantly worse on information positioned in the middle of the context compared to the beginning and end. A 128k-token context with relevant information in the middle can have lower recall than an 8k-token context with the same information at the start (Liu et al., 2023 — "Lost in the Middle: How Language Models Use Long Contexts", arXiv:2307.03172, published in TACL 2024).

Optimization strategies:

Sliding window: keeps only the N most recent messages, discarding older history.

```python
# langchain==0.3.x | redis==5.x

import logging
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_community.chat_message_histories import RedisChatMessageHistory

logger = logging.getLogger(__name__)


class SlidingWindowMemory:
    """
    Short-term memory with sliding window over Redis.

    Keeps the N most recent turns. Older turns are
    discarded — not archived. For archiving, use
    consolidation to Vector Store before discarding.
    """

    def __init__(
        self,
        session_id: str,
        redis_url: str,
        window_size: int = 10,
        ttl: int = 3600,
    ) -> None:
        self.window_size = window_size
        self.history = RedisChatMessageHistory(
            session_id=session_id,
            url=redis_url,
            ttl=ttl,
        )

    def add_exchange(self, human: str, ai: str) -> None:
        """Adds a human/AI pair and truncates the window if necessary."""
        self.history.add_user_message(human)
        self.history.add_ai_message(ai)
        self._truncate()

    def get_context(self) -> list[BaseMessage]:
        """Returns messages within the active window."""
        messages = self.history.messages
        return messages[-self.window_size * 2:]  # N turns = 2N messages

    def _truncate(self) -> None:
        """Removes messages beyond the window limit."""
        messages = self.history.messages
        excess = len(messages) - (self.window_size * 2)
        if excess > 0:
            logger.info(
                f"sliding_window_truncate excess={excess} "
                f"session={self.history.session_id}"
            )
            # RedisChatMessageHistory does not support delete by index natively
            # — rebuild the list and overwrite
            self.history.clear()
            for msg in messages[excess:]:
                if isinstance(msg, HumanMessage):
                    self.history.add_user_message(msg.content)
                elif isinstance(msg, AIMessage):
                    self.history.add_ai_message(msg.content)
```

2.2 Long-term Memory

Vector stores are the persistent semantic index of the system. The question is not "which one to use" but "which trade-off is acceptable for this case":

| Criterion | Pinecone | Weaviate | Chroma | pgvector |
|---|---|---|---|---|
| Latency p99 (query) | ~20ms | ~30ms | ~50ms local | ~100ms |
| Cost/1M vectors | ~$0.08/month | Self-hosted: $0 | Self-hosted: $0 | Included in RDS |
| Self-hosted | ❌ SaaS only | ✅ | ✅ | ✅ |
| Metadata filters | ✅ Robust | ✅ Robust | 🟡 Limited | ✅ Via SQL |
| Horizontal scaling | ✅ Managed | ✅ With effort | ❌ | 🟡 Via RDS |
| Best for | Managed production | Self-hosted enterprise | Dev/PoC | Existing PostgreSQL stack |

(latencies are illustrative estimates based on public benchmarks — ANN Benchmarks and official provider documentation)

Chunking strategies for maximum recall:

  • Fixed-size: simple, predictable, fails on documents with variable-length sections
  • Semantic: splits by topic change — higher recall, higher indexing cost
  • Hierarchical: small chunk for precision + large chunk for context — best of both, higher complexity
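To make the trade-offs concrete, here is a minimal sketch of the fixed-size and hierarchical strategies; the function names and the 512/128 character sizes are illustrative, not prescribed by the article:

```python
def fixed_size_chunks(text: str, size: int = 512, overlap: int = 64) -> list[str]:
    """Fixed-size chunking: predictable, but ignores section boundaries."""
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


def hierarchical_chunks(text: str, parent_size: int = 512,
                        child_size: int = 128) -> list[dict]:
    """Hierarchical chunking: index the small child chunk for precision,
    return the large parent chunk for context at generation time."""
    chunks = []
    for p_idx, parent in enumerate(fixed_size_chunks(text, parent_size, overlap=0)):
        for child in fixed_size_chunks(parent, child_size, overlap=0):
            chunks.append({"index_text": child,      # embedded and searched
                           "context_text": parent,   # passed to the LLM
                           "parent_id": p_idx})
    return chunks
```

The semantic variant replaces the fixed `step` with topic-boundary detection, which is why it costs more at indexing time.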

2.3 Working Memory

Working memory is the temporary shared state between agents during an execution. The canonical pattern is the blackboard: a centralized data structure where any agent can read and write, and all agents see the global task state.

```python
# redis==5.x

import json
import logging
from typing import Any, Optional
from datetime import datetime

logger = logging.getLogger(__name__)


class AgentBlackboard:
    """
    Shared blackboard between agents via Redis Hash.

    Each execution has its own namespace isolated by execution_id.
    Writes are atomic via HSET. Reads are always consistent
    within the same Redis instance.
    """

    def __init__(self, redis_client, execution_id: str) -> None:
        self.redis = redis_client
        self.key = f"blackboard:{execution_id}"
        self.ttl = 86400  # 24h — cleans up abandoned executions

    def write(self, agent_id: str, field: str, value: Any) -> None:
        """
        Writes a field to the blackboard with provenance metadata.

        The field is prefixed with agent_id for traceability:
        'planner:task_decomposition', 'worker_a:extraction_result'.
        """
        payload = json.dumps({
            "value": value,
            "agent_id": agent_id,
            "written_at": datetime.utcnow().isoformat(),
        })
        self.redis.hset(self.key, f"{agent_id}:{field}", payload)
        self.redis.expire(self.key, self.ttl)
        logger.info(f"blackboard_write key={self.key} agent={agent_id} field={field}")

    def read(self, agent_id: str, field: str) -> Optional[Any]:
        """Reads a field written by a specific agent."""
        raw = self.redis.hget(self.key, f"{agent_id}:{field}")
        if not raw:
            return None
        return json.loads(raw)["value"]

    def read_all(self) -> dict[str, Any]:
        """Returns the full blackboard state for the Critic."""
        raw = self.redis.hgetall(self.key)
        return {k.decode(): json.loads(v)["value"] for k, v in raw.items()}
```

Scratchpads differ from the blackboard: they are private per agent, invisible to other agents, and discarded at the end of execution. Use scratchpads for intermediate reasoning that does not need to be shared — this reduces noise in the blackboard and improves the quality of the individual agent's reasoning.

2.4 Episodic vs Semantic Memory

The most neglected distinction in corporate MAS:

Episodic memory is the log of what happened: "In session X, Agent A decided to reject document Y because field Z was absent." It is temporal, contextual, ordered.

Semantic memory is what the system knows: "Documents of type Y require field Z as mandatory under BACEN regulation 4.557." It is atemporal, factual, structured.

Confusing the two produces systems that treat facts as logs (re-deriving knowledge at every execution) or logs as facts (generalizing specific decisions into incorrect rules).

Practical case β€” financial onboarding:

  • Episodic: "Client João Silva submitted the balance sheet at 2:32 PM on 03/03."
  • Semantic: "PJ clients with revenue > R$ 10M require a balance sheet audited by a Big Four firm."

The first goes to the execution database (PostgreSQL). The second goes to the knowledge graph (Neo4j). Never the other way around.
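The routing rule above can be expressed as a small pure function; this is a sketch, and the layer names it returns are illustrative labels, not an API from the article:

```python
def route_memory(memory_type: str, scope: str) -> str:
    """Decides the persistence layer from memory type and scope.

    episodic → SQL audit log; semantic → knowledge/vector layer;
    working → Redis blackboard; private scope always stays in Redis.
    """
    if scope == "private":
        return "redis_scratchpad"
    return {
        "working": "redis_blackboard",
        "semantic": "vector_or_graph",
        "episodic": "sql_audit_log",
    }[memory_type]

# The onboarding examples from the text:
# timestamped submission event  → episodic → SQL
# regulatory rule on PJ clients → semantic → graph
```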

3. Memory Architecture for MAS: The 4-Layer Stack

3.1 The Shared State Problem

Multi-agent systems with shared memory are distributed systems, and the trade-offs described by the CAP theorem apply in full:

  • Strong consistency (all agents see the same state at the same time): required for critical execution memory — one agent cannot approve an operation that another just rejected
  • Availability + eventual consistency: acceptable for semantic memory — if Agent B does not yet see the fact Agent A just indexed, it will independently derive the same fact and the index will be updated on the next sync

The split-brain problem with parallel agents: two Workers writing to the same Vector Store namespace simultaneously can index conflicting versions of the same document. Solution: writes to the Vector Store go through the Planner (single write point), or use optimistic locking with versioning by agent_id + timestamp.

3.2 The 4-Layer Memory Stack

```markdown
┌────────────────────────────────────────────────────────────────┐
│ LAYER 1: Conversation Memory                                   │
│ Redis / Memcached                                              │
│ TTL: 1h–24h | Latency: <5ms | Use: active session context      │
├────────────────────────────────────────────────────────────────┤
│ LAYER 2: Context Memory                                        │
│ Vector DB (Pinecone / Weaviate / pgvector)                     │
│ TTL: 30–90 days | Latency: 20–100ms | Use: RAG retrieval       │
├────────────────────────────────────────────────────────────────┤
│ LAYER 3: Knowledge Memory                                      │
│ Graph DB (Neo4j)                                               │
│ TTL: permanent | Latency: 10–50ms | Use: facts and relations   │
├────────────────────────────────────────────────────────────────┤
│ LAYER 4: Execution Memory                                      │
│ SQL/NoSQL (PostgreSQL / MongoDB)                               │
│ TTL: regulatory (5–7 years) | Use: audit trail, compliance     │
└────────────────────────────────────────────────────────────────┘
      ▲               ▲                  ▲               ▲
      │               │                  │               │
 [Agent API]    [RAG queries]       [Knowledge       [Audit
 [Blackboard]   [Semantic search]    writes]          queries]
```

Signs that you are using the wrong layer:

  • Layer 1 with data > 10KB per key: move to Layer 2
  • Layer 2 for data that never changes semantically: move to Layer 3
  • Layer 3 for temporary execution logs: move to Layer 4
  • Layer 4 for real-time semantic retrieval: create an index in Layer 2

3.3 Access Patterns by Agent Type

```python
# langchain==0.3.x | redis==5.x | langchain-pinecone==0.2.x

import json
import logging
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


class MemoryScope(Enum):
    PRIVATE = "private"       # Current agent only
    SHARED = "shared"         # All agents in the execution
    HIERARCHICAL = "hier"     # Agents with explicit permission


@dataclass
class MemoryEntry:
    """Atomic unit of memory with provenance metadata."""
    content: Any
    agent_id: str
    session_id: str
    execution_id: str
    memory_type: str          # "episodic" | "semantic" | "working"
    scope: MemoryScope
    confidence_score: float   # 0.0–1.0 — agent's confidence in the content
    ttl_seconds: Optional[int] = None
    tags: list[str] = field(default_factory=list)


class AgentMemoryManager:
    """
    Unified interface for the 4 memory layers.

    Encapsulates routing logic: the agent declares what it
    wants to store — the Manager decides which layer to persist to.
    """

    def __init__(
        self,
        agent_id: str,
        redis_client,
        vector_store,
        graph_driver,
        sql_session,
    ) -> None:
        self.agent_id = agent_id
        self._redis = redis_client
        self._vector = vector_store
        self._graph = graph_driver
        self._sql = sql_session

    def store(self, entry: MemoryEntry) -> str:
        """
        Routes storage to the correct layer based on
        memory_type and scope.

        Returns: memory_id for later retrieval.
        """
        if entry.scope == MemoryScope.PRIVATE:
            return self._store_private(entry)
        elif entry.memory_type == "working":
            return self._store_blackboard(entry)
        elif entry.memory_type == "semantic":
            return self._store_vector(entry)
        elif entry.memory_type == "episodic":
            return self._store_sql(entry)
        else:
            raise ValueError(f"Invalid combination: {entry.memory_type}/{entry.scope}")

    def _store_private(self, entry: MemoryEntry) -> str:
        """Private scratchpad — Redis with session TTL."""
        key = f"scratch:{self.agent_id}:{entry.execution_id}"
        self._redis.setex(key, entry.ttl_seconds or 3600, json.dumps({
            "content": entry.content,
            "tags": entry.tags,
        }))
        logger.info(f"memory_store type=private agent={self.agent_id}")
        return key

    def _store_blackboard(self, entry: MemoryEntry) -> str:
        """Shared blackboard — Redis Hash per execution_id."""
        key = f"blackboard:{entry.execution_id}"
        field_name = f"{self.agent_id}:{':'.join(entry.tags)}"
        self._redis.hset(key, field_name, json.dumps(entry.content))
        self._redis.expire(key, entry.ttl_seconds or 86400)
        return f"{key}:{field_name}"

    def _store_vector(self, entry: MemoryEntry) -> str:
        """Semantic memory — Vector Store with agent metadata."""
        memory_id = str(uuid.uuid4())
        self._vector.add_texts(
            texts=[str(entry.content)],
            metadatas=[{
                "memory_id": memory_id,
                "agent_id": entry.agent_id,
                "session_id": entry.session_id,
                "confidence": entry.confidence_score,
                "tags": ",".join(entry.tags),
            }],
            ids=[memory_id],
        )
        logger.info(f"memory_store type=semantic agent={self.agent_id} id={memory_id}")
        return memory_id

    def _store_sql(self, entry: MemoryEntry) -> str:
        """Episodic memory — PostgreSQL for audit trail."""
        record = {
            "agent_id": entry.agent_id,
            "session_id": entry.session_id,
            "execution_id": entry.execution_id,
            "content": str(entry.content),
            "tags": entry.tags,
            "created_at": datetime.utcnow(),
        }
        result = self._sql.execute(
            "INSERT INTO agent_memory_log VALUES (:agent_id, :session_id, "
            ":execution_id, :content, :tags, :created_at) RETURNING id",
            record,
        )
        return str(result.fetchone()[0])
```

3.4 Write-Through vs Write-Back Strategy

Write-through: each agent action persists immediately across all relevant layers before continuing. Higher latency, guaranteed consistency.

Write-back: the agent accumulates memories in a local buffer and persists in batch at the end of execution or at regular intervals. Lower latency, risk of loss on failure before flush.

Decision rule:

  • Critical execution memory (compliance, audit, irreversible decisions): write-through mandatory
  • Low-risk semantic memory (context, preferences, non-critical history): write-back acceptable
  • Working/blackboard memory: write-through — other agents depend on immediate consistency
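The write-back side of the decision rule can be sketched with a pluggable sink and a size-triggered flush; the flush threshold and sink interface below are illustrative, not from the article:

```python
from typing import Callable


class WriteBackBuffer:
    """Accumulates memory entries locally and persists them in batch.

    flush() fires automatically when the buffer reaches max_size, and
    must also be called explicitly at end of execution — anything still
    buffered on a crash is lost, which is why write-back is reserved
    for low-risk memory."""

    def __init__(self, persist_batch: Callable[[list[dict]], None],
                 max_size: int = 50) -> None:
        self._persist = persist_batch
        self._max = max_size
        self._buffer: list[dict] = []

    def store(self, entry: dict) -> None:
        self._buffer.append(entry)
        if len(self._buffer) >= self._max:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._persist(self._buffer)
            self._buffer = []
```

Write-through is the degenerate case `max_size=1`: every `store` becomes an immediate persist, trading latency for the consistency the critical paths require.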

4. Practical Implementation: Production Code per Layer

4.1 Vector Store as Semantic Memory

```python
# langchain==0.3.x | langchain-pinecone==0.2.x | openai==1.x

import logging
import uuid
from typing import Optional
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document

logger = logging.getLogger(__name__)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")


class SemanticMemoryStore:
    """
    Long-term semantic memory using Pinecone.

    Stores memories with agent metadata for retrieval
    filtered by agent_id, session, type, and confidence score.
    """

    def __init__(self, index_name: str) -> None:
        self.store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
        )

    def remember(
        self,
        content: str,
        agent_id: str,
        session_id: str,
        memory_type: str,
        confidence: float = 1.0,
        tags: Optional[list[str]] = None,
    ) -> str:
        """
        Indexes a memory with complete provenance metadata.

        Before indexing, checks deduplication by similarity:
        if a memory with > 0.97 similarity exists for the same agent,
        returns the existing memory_id instead of creating a duplicate.
        """
        memory_id = str(uuid.uuid4())

        # Deduplication check
        existing = self.store.similarity_search_with_score(
            query=content,
            k=1,
            filter={"agent_id": agent_id, "memory_type": memory_type},
        )

        if existing and existing[0][1] > 0.97:
            logger.info(
                f"semantic_memory_deduplicated agent={agent_id} "
                f"similarity={existing[0][1]:.3f}"
            )
            return existing[0][0].metadata["memory_id"]

        doc = Document(
            page_content=content,
            metadata={
                "memory_id": memory_id,
                "agent_id": agent_id,
                "session_id": session_id,
                "memory_type": memory_type,   # semantic | episodic | working
                "confidence_score": confidence,
                "tags": ",".join(tags or []),
            },
        )

        self.store.add_documents([doc], ids=[memory_id])
        logger.info(f"semantic_memory_stored agent={agent_id} id={memory_id}")
        return memory_id

    def recall(
        self,
        query: str,
        agent_id: str,
        k: int = 5,
        min_confidence: float = 0.7,
        use_mmr: bool = False,
    ) -> list[Document]:
        """
        Retrieves relevant memories filtered by agent.

        MMR (Maximal Marginal Relevance): use when diversity
        of results matters more than pure similarity.
        Similarity search: use when you want the k most similar,
        accepting redundancy.
        """
        filter_dict = {
            "agent_id": agent_id,
            "confidence_score": {"$gte": min_confidence},
        }

        if use_mmr:
            # MMR: balances relevance and diversity.
            # fetch_k > k — fetches more candidates, selects the most diverse
            return self.store.max_marginal_relevance_search(
                query=query,
                k=k,
                fetch_k=k * 3,
                filter=filter_dict,
            )

        return self.store.similarity_search(
            query=query,
            k=k,
            filter=filter_dict,
        )
```

4.2 Redis as High-Speed Context Cache

```python
# redis==5.x | langchain-redis==0.1.x

import json
import logging
from datetime import datetime
import redis

logger = logging.getLogger(__name__)


class AgentContextCache:
    """
    High-speed context cache for agent state.

    Redis data structures by data type:
    - Hash: structured agent state (key-value)
    - List: message history (FIFO with limit)
    - Sorted Set: memories ranked by relevance/recency
    - Pub/Sub: inter-agent notification on updates
    """

    TTL = {
        "session_active": 3600,    # 1h — active session
        "context_recent": 86400,   # 24h — recent context
        "task_state": None,        # No TTL — persists until explicit completion
    }

    def __init__(self, redis_url: str) -> None:
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.pubsub = self.r.pubsub()

    # ── Structured state ──────────────────────────────────────────────

    def set_agent_state(self, agent_id: str, execution_id: str, state: dict) -> None:
        """Persists full agent state as a Redis Hash."""
        key = f"agent_state:{execution_id}:{agent_id}"
        self.r.hset(key, mapping={k: json.dumps(v) for k, v in state.items()})
        # EXPIRE with 0 would delete the key immediately — only set a TTL
        # when one is actually configured
        if self.TTL["task_state"] is not None:
            self.r.expire(key, self.TTL["task_state"])
        logger.info(f"agent_state_set key={key} fields={list(state.keys())}")

    def get_agent_state(self, agent_id: str, execution_id: str) -> dict:
        """Retrieves full agent state."""
        key = f"agent_state:{execution_id}:{agent_id}"
        raw = self.r.hgetall(key)
        return {k: json.loads(v) for k, v in raw.items()}

    # ── Message history ───────────────────────────────────────────────

    def append_message(
        self,
        session_id: str,
        role: str,
        content: str,
        max_history: int = 20,
    ) -> None:
        """
        Appends a message to history with RPUSH + LTRIM.

        LTRIM ensures the list never exceeds max_history entries
        without a prior read.
        """
        key = f"history:{session_id}"
        message = json.dumps({"role": role, "content": content,
                              "ts": datetime.utcnow().isoformat()})
        pipe = self.r.pipeline()
        pipe.rpush(key, message)
        pipe.ltrim(key, -max_history, -1)
        pipe.expire(key, self.TTL["session_active"])
        pipe.execute()

    def get_history(self, session_id: str) -> list[dict]:
        """Retrieves full session history."""
        key = f"history:{session_id}"
        return [json.loads(m) for m in self.r.lrange(key, 0, -1)]

    # ── Ranked memories ───────────────────────────────────────────────

    def rank_memory(self, agent_id: str, memory_id: str, score: float) -> None:
        """
        Inserts a memory into a Sorted Set ranked by relevance.

        Composite score: combine semantic similarity + recency
        into a single float for unified ranking.
        """
        key = f"ranked_memories:{agent_id}"
        self.r.zadd(key, {memory_id: score})
        self.r.expire(key, self.TTL["context_recent"])

    def get_top_memories(self, agent_id: str, top_k: int = 5) -> list[str]:
        """Returns the memory_ids with the highest scores (most relevant)."""
        key = f"ranked_memories:{agent_id}"
        return self.r.zrevrange(key, 0, top_k - 1)

    # ── Pub/Sub for inter-agent notification ──────────────────────────

    def notify_memory_update(self, execution_id: str, agent_id: str, field: str) -> None:
        """Notifies other agents that shared memory has been updated."""
        channel = f"memory_updates:{execution_id}"
        payload = json.dumps({"agent_id": agent_id, "field": field,
                              "ts": datetime.utcnow().isoformat()})
        self.r.publish(channel, payload)
        logger.info(f"memory_update_published channel={channel} agent={agent_id}")
```

4.3 Neo4j for Relational Knowledge

```python
# neo4j==5.x | langchain-neo4j==0.1.x

import logging
from typing import Optional
from neo4j import GraphDatabase, Driver

logger = logging.getLogger(__name__)


class KnowledgeGraph:
    """
    Structured knowledge graph for MAS.

    Schema:
    - (Agent)-[:KNOWS]->(Fact)
    - (Agent)-[:DECIDED {reason, confidence}]->(Decision)
    - (Session)-[:CONTAINS]->(Decision)
    - (Fact)-[:CONTRADICTS]->(Fact)
    - (Entity)-[:REFERENCED_IN]->(Session)
    """

    def __init__(self, uri: str, user: str, password: str) -> None:
        self.driver: Driver = GraphDatabase.driver(uri, auth=(user, password))

    def store_fact(
        self,
        agent_id: str,
        subject: str,
        predicate: str,
        obj: str,
        confidence: float = 1.0,
        source_session: Optional[str] = None,
    ) -> None:
        """
        Stores a semantic fact as a triple (subject, predicate, object).

        Before inserting, checks for contradictions with existing facts
        on the same subject and predicate.
        """
        with self.driver.session() as session:
            # Check for contradiction
            existing = session.run(
                """
                MATCH (f:Fact {subject: $subject, predicate: $predicate})
                WHERE f.object <> $object
                RETURN f
                """,
                subject=subject, predicate=predicate, object=obj,
            ).data()

            if existing:
                logger.warning(
                    f"knowledge_contradiction subject={subject} predicate={predicate} "
                    f"existing={existing[0]['f']['object']} new={obj}"
                )
                # Creates a CONTRADICTS relation between the two facts
                session.run(
                    """
                    MATCH (f1:Fact {subject: $subject, predicate: $predicate})
                    MERGE (f2:Fact {subject: $subject, predicate: $predicate, object: $object})
                    MERGE (f1)-[:CONTRADICTS {detected_by: $agent_id}]->(f2)
                    """,
                    subject=subject, predicate=predicate, object=obj, agent_id=agent_id,
                )
                return

            session.run(
                """
                MERGE (a:Agent {id: $agent_id})
                MERGE (f:Fact {subject: $subject, predicate: $predicate, object: $object})
                ON CREATE SET f.confidence = $confidence, f.created_at = datetime()
                MERGE (a)-[:KNOWS {source_session: $source_session}]->(f)
                """,
                agent_id=agent_id, subject=subject, predicate=predicate,
                object=obj, confidence=confidence, source_session=source_session,
            )
            logger.info(f"fact_stored agent={agent_id} {subject}-[{predicate}]->{obj}")

    def recall_about_entity(
        self,
        entity: str,
        agent_id: Optional[str] = None,
    ) -> list[dict]:
        """
        Retrieves everything the system knows about an entity,
        optionally filtered by agent.
        """
        with self.driver.session() as session:
            # Parenthesized OR — otherwise the appended AND would bind
            # only to the second condition
            query = """
            MATCH (a:Agent)-[:KNOWS]->(f:Fact)
            WHERE (f.subject = $entity OR f.object = $entity)
            """
            params: dict = {"entity": entity}

            if agent_id:
                query += " AND a.id = $agent_id"
                params["agent_id"] = agent_id

            query += (
                " RETURN f.subject, f.predicate, f.object, f.confidence"
                " ORDER BY f.confidence DESC"
            )

            return session.run(query, **params).data()

    def detect_contradictions(self) -> list[dict]:
        """
        Returns all contradictory fact pairs in the graph.
        Should be run by the Critic before critical decisions.
        """
        with self.driver.session() as session:
            return session.run(
                """
                MATCH (f1:Fact)-[:CONTRADICTS]->(f2:Fact)
                RETURN f1.subject, f1.predicate, f1.object AS value_a,
                       f2.object AS value_b
                """
            ).data()
```

5. MCP and Persistent Memory: Interoperability by Design

5.1 The Model Context Protocol Pattern

MCP solves a coupling problem: without it, each agent needs to know the specific API of Pinecone, Redis, and Neo4j. With MCP, agents speak to a standardized MemoryServer, and the underlying implementation is transparent.

The architectural consequence is significant: switching from Pinecone to Weaviate does not require refactoring agents — only the MCP server. The interface contract remains stable.

5.2 MCP Server Implementation for Memory

```python
# mcp==1.x | fastmcp==0.4.x

import logging

import fastmcp

from memory_manager import MemoryEntry, MemoryScope, get_memory_manager

logger = logging.getLogger(__name__)

mcp = fastmcp.FastMCP("MemoryServer")


@mcp.tool()
async def store_memory(
    content: str,
    agent_id: str,
    session_id: str,
    memory_type: str,
    scope: str = "shared",
    confidence: float = 1.0,
    tags: list[str] | None = None,
) -> dict:
    """
    Stores a memory in the appropriate layer.

    The server decides the layer based on memory_type and scope —
    the agent does not need to know the persistence details.
    """
    manager = get_memory_manager(agent_id)

    entry = MemoryEntry(
        content=content,
        agent_id=agent_id,
        session_id=session_id,
        execution_id=session_id,  # simplified
        memory_type=memory_type,
        scope=MemoryScope(scope),
        confidence_score=confidence,
        tags=tags or [],
    )

    memory_id = manager.store(entry)
    logger.info(f"mcp_store_memory agent={agent_id} type={memory_type} id={memory_id}")
    return {"memory_id": memory_id, "status": "stored"}


@mcp.tool()
async def retrieve_memory(
    query: str,
    agent_id: str,
    memory_type: str = "semantic",
    top_k: int = 5,
    min_confidence: float = 0.7,
) -> list[dict]:
    """
    Retrieves memories relevant to a query.

    Routing: semantic → Vector Store, episodic → SQL,
    working → Redis blackboard.
    """
    manager = get_memory_manager(agent_id)

    if memory_type == "semantic":
        docs = manager._vector.recall(
            query=query,
            agent_id=agent_id,
            k=top_k,
            min_confidence=min_confidence,
        )
        return [{"content": d.page_content, "metadata": d.metadata} for d in docs]

    logger.warning(f"mcp_retrieve unsupported type={memory_type}")
    return []


@mcp.tool()
async def forget_memory(memory_id: str, agent_id: str) -> dict:
    """
    Removes a specific memory from the Vector Store.

    Use with caution: removal of semantic memory is irreversible.
    Prefer decrementing confidence_score before deleting.
    """
    manager = get_memory_manager(agent_id)
    manager._vector.store.delete(ids=[memory_id])
    logger.info(f"mcp_forget_memory agent={agent_id} id={memory_id}")
    return {"memory_id": memory_id, "status": "deleted"}
```

5.3 LangGraph + MCP Memory Tool Integration

```python
# langchain==0.3.x | langgraph==0.2.x | mcp==1.x

from langchain_mcp_adapters import MCPToolkit
from langchain_openai import ChatOpenAI

# Connects to the memory MCP server
toolkit = MCPToolkit(server_url="http://localhost:8000/mcp")
memory_tools = toolkit.get_tools()

# MCP tools are available to any LangGraph node
# like any other tool — zero coupling with the implementation


def agent_node_with_memory(state: dict) -> dict:
    """
    Agent node with memory access via MCP.

    The agent decides when and what to remember — it is not automatic.
    Memory-on-demand produces cleaner indexes than memory-on-every-step.
    """
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(memory_tools)

    response = llm.invoke(state["messages"])
    return {"messages": state["messages"] + [response]}
```

6. Optimization and Costs

6.1 Context Compaction Strategies

Progressive summarization: old memories become summaries, summaries become facts, facts enter the graph. Scheduled consolidation pipeline:

```python
# apscheduler==3.x | langchain==0.3.x

import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from langchain_openai import ChatOpenAI

logger = logging.getLogger(__name__)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
scheduler = AsyncIOScheduler()


@scheduler.scheduled_job("interval", hours=6)
async def consolidate_old_memories() -> None:
    """
    Consolidation job: memories older than 7 days with
    confidence_score < 0.5 are summarized and removed from the index.

    Memories with confidence_score > 0.9 are promoted to the
    knowledge graph as structured facts.
    """
    from memory_manager import get_all_stale_memories, promote_to_graph, archive_memory

    # Fetch everything older than 7 days; branch on confidence below
    stale = await get_all_stale_memories(older_than_days=7)
    logger.info(f"consolidation_start stale_count={len(stale)}")

    for memory in stale:
        if memory["confidence_score"] > 0.9:
            # Promote to graph — extract triple (subject, predicate, object)
            await promote_to_graph(memory)
        elif memory["confidence_score"] < 0.5:
            # Archive in SQL and remove from Vector Store
            await archive_memory(memory)

    logger.info("consolidation_done")
```

Ebbinghaus-inspired forgetting curve: the confidence_score of unaccessed memories decays automatically over time, so unused memories are eventually removed without manual intervention.
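Such a decay can be sketched as a simple exponential on confidence_score. The function name `decay_confidence` and the 14-day half-life below are illustrative assumptions, not values prescribed by the architecture:

```python
from datetime import datetime, timezone

def decay_confidence(score: float, last_accessed: datetime,
                     now: datetime, half_life_days: float = 14.0) -> float:
    """Exponential forgetting curve: an unaccessed memory loses half of its
    confidence_score every half_life_days; accessing it resets the clock."""
    idle_days = (now - last_accessed).total_seconds() / 86400
    return score * 0.5 ** (idle_days / half_life_days)

now = datetime(2026, 3, 1, tzinfo=timezone.utc)
fresh = decay_confidence(0.8, datetime(2026, 2, 27, tzinfo=timezone.utc), now)  # idle 2 days
half = decay_confidence(0.8, datetime(2026, 2, 15, tzinfo=timezone.utc), now)   # idle 14 days -> 0.4
stale = decay_confidence(0.8, datetime(2026, 1, 18, tzinfo=timezone.utc), now)  # idle 42 days
```

The consolidation job can then apply the decayed score, rather than the stored one, when deciding what to archive.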

6.2 Cost Breakdown per Layer

Estimate for a system with 100 active agents, 1,000 sessions/day, 90-day retention (prices verified March 2026; consult providers before budgeting, values subject to change):

| Layer | Technology | Cost/month (verified) | Scale variable | Source |
| --- | --- | --- | --- | --- |
| Conversation | Redis Cloud (1 GB) | ~$22 | Simultaneous active sessions | redis.io/pricing |
| Context | Pinecone Standard | min. $50 + $0.33/GB storage + $16/1M read units | Indexed vectors + query volume | pinecone.io/pricing |
| Knowledge | Neo4j Aura Professional | from $65 | Nodes and relations in graph | neo4j.com/pricing |
| Execution | PostgreSQL RDS (db.t3.medium) | ~$50–100 | Audit log rows | aws.amazon.com/rds/pricing |
| Embeddings | OpenAI text-embedding-3-small | ~$10–40 | Tokens indexed/month | openai.com/pricing |
| **Estimated total** | | **~$197–577/month** | | |
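As a sanity check, the total row can be reproduced from the per-layer ranges. The dictionary below mirrors the table; the high-end figures for Pinecone and Neo4j at scale are illustrative assumptions, not provider quotes:

```python
# Per-layer monthly cost ranges in USD, mirroring the table above.
# Low end = entry-level tier; high end = illustrative at-scale estimate.
LAYER_COSTS_USD = {
    "redis_cloud_1gb": (22, 22),
    "pinecone_standard": (50, 250),
    "neo4j_aura_professional": (65, 165),
    "postgresql_rds_t3_medium": (50, 100),
    "openai_embeddings_small": (10, 40),
}

def total_range(costs: dict[str, tuple[int, int]]) -> tuple[int, int]:
    """Sum low and high ends independently to get the stack-wide range."""
    low = sum(lo for lo, _ in costs.values())
    high = sum(hi for _, hi in costs.values())
    return low, high
```

Re-running this model with your own tier choices is cheaper than discovering the at-scale bill in production.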

With semantic caching (GPTCache or similar), the estimated reduction in embedding cost is 30–60% in workloads with repetitive queries (illustrative figure).

6.3 Latency vs Consistency Trade-offs by Use Case

| Process type | Required consistency | Primary layer | Expected latency |
| --- | --- | --- | --- |
| Compliance / KYC | Strong | Layer 4 (SQL) + Layer 1 (Redis) | 50–200 ms |
| Document analysis | Eventual | Layer 2 (Vector) + Layer 3 (Graph) | 30–120 ms |
| Conversational onboarding | Eventual | Layer 1 (Redis) | 5–20 ms |
| Credit decision | Strong | Layer 4 + Layer 3 (Neo4j) | 80–300 ms |
| Report generation | Eventual | Layer 2 (Vector) | 20–80 ms |
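The trade-offs above can be encoded as a small routing map that agents consult before reading memory, keeping the consistency decision explicit instead of buried in each agent. The process keys and layer names are illustrative:

```python
from enum import Enum

class Consistency(Enum):
    STRONG = "strong"
    EVENTUAL = "eventual"

# Routing map mirroring the trade-off table (process keys are examples)
ROUTES: dict[str, tuple[Consistency, list[str]]] = {
    "compliance_kyc": (Consistency.STRONG, ["sql", "redis"]),
    "document_analysis": (Consistency.EVENTUAL, ["vector", "graph"]),
    "conversational_onboarding": (Consistency.EVENTUAL, ["redis"]),
    "credit_decision": (Consistency.STRONG, ["sql", "graph"]),
    "report_generation": (Consistency.EVENTUAL, ["vector"]),
}

def memory_layers(process: str) -> list[str]:
    """Return the memory layers a given process should read from."""
    consistency, layers = ROUTES[process]
    if consistency is Consistency.STRONG and "sql" not in layers:
        # Strong-consistency processes must always touch the SQL layer
        raise ValueError(f"{process} requires the SQL layer")
    return layers
```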

7. Conclusion: Memory as Competitive Advantage

Systems that learn between sessions, accumulating knowledge, detecting contradictions, and retrieving relevant context without reprocessing, have an operational advantage that cannot be replicated in the short term. Each execution makes the system more accurate. Each session reduces the marginal cost of the next.

Systems without adequate memory restart from zero every time. Operational cost does not scale; it multiplies.

The 4-layer stack presented in this article is not the only valid architecture. It is a defensible starting point: each layer has a clear purpose, technologies proven in production, and explicit criteria for when to migrate data between layers.

Implementation Checklist

  • Layer 1 (Redis) configured with explicit TTL per memory type (session, context, task)
  • Vector Store with agent_id, session_id, and confidence_score metadata on all documents
  • Explicit separation between episodic memory (SQL) and semantic memory (Vector Store): zero mixing
  • Blackboard with namespace per execution_id: no state leakage between parallel executions
  • MCP server abstracting access to stores: agents do not depend on specific APIs
  • Memory consolidation job scheduled (every 6–24h depending on volume)
  • cache_hit_rate metrics per agent and per layer collected and monitored
  • Contradiction detection strategy in graph executed by Critic before critical decisions
  • Retention plan documented: how long each memory type is kept in each layer
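The first and fourth checklist items can be sketched together: explicit TTLs per memory type plus a key builder namespaced by execution_id. The TTL values and key format are illustrative assumptions, not prescribed by the stack:

```python
# Explicit TTLs per memory type (illustrative values; tune per workload)
TTL_SECONDS = {
    "session": 30 * 60,        # conversational state
    "context": 24 * 60 * 60,   # working context for a pipeline run
    "task": 2 * 60 * 60,       # intermediate task results
}

def memory_key(memory_type: str, execution_id: str, agent_id: str) -> str:
    """Build a Redis key namespaced by execution_id first,
    so parallel executions can never collide."""
    if memory_type not in TTL_SECONDS:
        raise ValueError(f"unknown memory type: {memory_type}")
    return f"mem:{memory_type}:{execution_id}:{agent_id}"

# With redis-py, a write then becomes:
#   r.set(memory_key("session", run_id, agent_id), payload,
#         ex=TTL_SECONDS["session"])
```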

If you are designing or auditing the memory architecture of your multi-agent system (Vector Store selection, shared state strategy, MCP integration, or retention compliance), get in touch with AI2You.

Keywords: Memory Architecture for AI Agents, persistent memory multi-agent systems, vector store LangChain agents, Redis AI agent memory, MCP persistent memory, active RAG multi-agent systems, Neo4j AI agents.

Published by AI2You | AI-First Technical Series | ai2you.online/en/blog


The Future is Collaborative

AI does not replace people. It enhances capabilities when properly targeted.