AI2YOU: AI-FIRST TECHNICAL SERIES
For AI Engineers, Tech Leads, and CTOs implementing multi-agent systems in production.
1. An Agent Without Memory Is an Employee Who Forgets Everything After Every Conversation
Imagine hiring a credit analyst who, every time you open a new browser tab, completely forgets the client they just worked with: the history, the decisions made, the documents reviewed. You would need to resend everything. They would need to re-read everything. The process would restart from zero.
That is exactly what happens with AI agents that lack a memory architecture.
The cost is not philosophical. It is measurable in tokens, latency, and money. Consider a realistic pipeline: 10 agents collaborating on contract analysis, each execution consuming an average of 4,000 tokens of reprocessed context that was already available from previous executions. At 100 executions per day, using gpt-4o-mini at $0.15 per 1M input tokens (confirmed OpenAI pricing, March 2026; check for updates at openai.com/pricing):
Wasted tokens/day = 10 agents × 4,000 tokens × 100 executions
= 4,000,000 tokens/day
Daily wasted cost = 4,000,000 × $0.15 / 1,000,000 = $0.60/day
Monthly wasted cost = $0.60 × 30 = $18/month per pipeline
That seems small. But in a company with 40 active pipelines, that figure becomes $720/month in tokens reprocessing information the system already knew. With proper memory caching, the estimated reduction is 40–70% of that cost (illustrative figure, based on semantic caching benchmarks published by Zilliz and Redis Labs; check current benchmarks at zilliz.com/blog).
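For budgeting your own pipelines, the arithmetic above generalizes into a small, self-contained cost model (the function and its parameter names are illustrative, not part of any library):

```python
def daily_waste_usd(
    agents: int,
    wasted_tokens_per_agent: int,
    executions_per_day: int,
    usd_per_million_input_tokens: float,
) -> float:
    """Cost of reprocessed context tokens per day, in USD."""
    tokens = agents * wasted_tokens_per_agent * executions_per_day
    return tokens * usd_per_million_input_tokens / 1_000_000


# The pipeline from the text: 10 agents, 4,000 wasted tokens, 100 runs/day
per_pipeline_day = daily_waste_usd(10, 4_000, 100, 0.15)
print(per_pipeline_day)            # 0.6 USD/day
print(per_pipeline_day * 30)       # 18.0 USD/month per pipeline
print(per_pipeline_day * 30 * 40)  # 720.0 USD/month across 40 pipelines
```

Swap in your own token price and pipeline count; the point is that waste scales linearly with every factor at once.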
The real cost, however, is not in the tokens. It is in accumulated latency and state inconsistency: agents making contradictory decisions because they do not share what they learned in previous executions.
This article is a technical contract: by the end, you will have a complete 4-layer memory stack (Redis, Vector DB, Graph DB, and MCP) with production Python code for each layer, justified technology selection criteria, and operational cost estimates at scale.
2. Types of AI Memory: An Operational Taxonomy
Before choosing technology, you need to understand what you are storing. Confusing memory types is the most frequent error in MAS architectures, and it produces systems that use a Vector DB for what should be in Redis, and Redis for what should be in the graph.
2.1 Short-term Memory
The context window is the agent's short-term memory. It is expensive, limited, and volatile by design. The most common anti-pattern is context stuffing: loading all available history into the context under the assumption that more information produces better reasoning.
Research on "lost in the middle" demonstrates the opposite: LLMs perform significantly worse on information positioned in the middle of the context compared to the beginning and end. A 128k-token context with relevant information in the middle can have lower recall than an 8k-token context with the same information at the start (Liu et al., 2023, "Lost in the Middle: How Language Models Use Long Contexts", arXiv:2307.03172, published in TACL 2024).
Optimization strategies:
Sliding window: keeps only the N most recent messages, discarding older history.
```python
# langchain==0.3.x | redis==5.x

import logging

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_community.chat_message_histories import RedisChatMessageHistory

logger = logging.getLogger(__name__)


class SlidingWindowMemory:
    """
    Short-term memory with a sliding window over Redis.

    Keeps the N most recent turns. Older turns are
    discarded, not archived. For archiving, consolidate
    to a Vector Store before discarding.
    """

    def __init__(
        self,
        session_id: str,
        redis_url: str,
        window_size: int = 10,
        ttl: int = 3600,
    ) -> None:
        self.window_size = window_size
        self.history = RedisChatMessageHistory(
            session_id=session_id,
            url=redis_url,
            ttl=ttl,
        )

    def add_exchange(self, human: str, ai: str) -> None:
        """Adds a human/AI pair and truncates the window if necessary."""
        self.history.add_user_message(human)
        self.history.add_ai_message(ai)
        self._truncate()

    def get_context(self) -> list[BaseMessage]:
        """Returns messages within the active window."""
        messages = self.history.messages
        return messages[-self.window_size * 2:]  # N turns = 2N messages

    def _truncate(self) -> None:
        """Removes messages beyond the window limit."""
        messages = self.history.messages
        excess = len(messages) - (self.window_size * 2)
        if excess > 0:
            logger.info(
                "sliding_window_truncate excess=%s session=%s",
                excess, self.history.session_id,
            )
            # RedisChatMessageHistory does not support delete-by-index
            # natively, so rebuild the list and overwrite.
            self.history.clear()
            for msg in messages[excess:]:
                if isinstance(msg, HumanMessage):
                    self.history.add_user_message(msg.content)
                elif isinstance(msg, AIMessage):
                    self.history.add_ai_message(msg.content)
```
2.2 Long-term Memory
Vector stores are the persistent semantic index of the system. The question is not "which one to use" but "which trade-off is acceptable for this case":
| Criterion | Pinecone | Weaviate | Chroma | pgvector |
|---|---|---|---|---|
| Latency p99 (query) | ~20ms | ~30ms | ~50ms local | ~100ms |
| Cost/1M vectors | ~$0.08/month | Self-hosted: $0 | Self-hosted: $0 | Included in RDS |
| Self-hosted | No (SaaS only) | Yes | Yes | Yes |
| Metadata filters | Robust | Robust | Limited | Via SQL |
| Horizontal scaling | Managed | With effort | No | Via RDS |
| Best for | Managed production | Self-hosted enterprise | Dev/PoC | Existing PostgreSQL stack |
(latencies are illustrative estimates based on public benchmarks: ANN Benchmarks and official provider documentation)
Chunking strategies for maximum recall:
- Fixed-size: simple, predictable; fails on documents with variable-length sections
- Semantic: splits by topic change; higher recall, higher indexing cost
- Hierarchical: small chunk for precision + large chunk for context; best of both, higher complexity
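As a baseline for comparison, the fixed-size strategy fits in a few lines of plain Python; the overlap exists so that sentences cut at a chunk boundary still appear whole in one of the two neighboring chunks (the function name and defaults are illustrative):

```python
def fixed_size_chunks(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Splits text into fixed-size chunks with character overlap."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap  # each chunk starts `step` chars after the last
    return [
        text[i:i + chunk_size]
        for i in range(0, len(text), step)
        if text[i:i + chunk_size]
    ]


chunks = fixed_size_chunks("a" * 1200, chunk_size=500, overlap=50)
print(len(chunks))  # 3 chunks: [0:500], [450:950], [900:1200]
```

Semantic and hierarchical chunking replace the fixed `step` with topic-boundary detection and a two-level index, respectively; the retrieval side stays the same.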
2.3 Working Memory
Working memory is the temporary shared state between agents during an execution. The canonical pattern is the blackboard: a centralized data structure where any agent can read and write, and all agents see the global task state.
```python
# redis==5.x

import json
import logging
from datetime import datetime
from typing import Any, Optional

logger = logging.getLogger(__name__)


class AgentBlackboard:
    """
    Shared blackboard between agents via a Redis Hash.

    Each execution has its own namespace isolated by execution_id.
    Writes are atomic via HSET. Reads are always consistent
    within the same Redis instance.
    """

    def __init__(self, redis_client, execution_id: str) -> None:
        self.redis = redis_client
        self.key = f"blackboard:{execution_id}"
        self.ttl = 86400  # 24h: cleans up abandoned executions

    def write(self, agent_id: str, field: str, value: Any) -> None:
        """
        Writes a field to the blackboard with provenance metadata.

        The field is prefixed with agent_id for traceability:
        'planner:task_decomposition', 'worker_a:extraction_result'.
        """
        payload = json.dumps({
            "value": value,
            "agent_id": agent_id,
            "written_at": datetime.utcnow().isoformat(),
        })
        self.redis.hset(self.key, f"{agent_id}:{field}", payload)
        self.redis.expire(self.key, self.ttl)
        logger.info("blackboard_write key=%s agent=%s field=%s",
                    self.key, agent_id, field)

    def read(self, agent_id: str, field: str) -> Optional[Any]:
        """Reads a field written by a specific agent."""
        raw = self.redis.hget(self.key, f"{agent_id}:{field}")
        if not raw:
            return None
        return json.loads(raw)["value"]

    def read_all(self) -> dict[str, Any]:
        """Returns the full blackboard state for the Critic."""
        raw = self.redis.hgetall(self.key)
        # Works whether the client returns bytes or str keys
        return {
            (k.decode() if isinstance(k, bytes) else k): json.loads(v)["value"]
            for k, v in raw.items()
        }
```
Scratchpads differ from the blackboard: they are private per agent, invisible to other agents, and discarded at the end of execution. Use scratchpads for intermediate reasoning that does not need to be shared; this reduces noise in the blackboard and improves the quality of the individual agent's reasoning.
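The contrast can be made concrete with a minimal in-process scratchpad: unlike the Redis-backed blackboard above, nothing here is visible to other agents, and `discard()` is expected at the end of the execution (the class is an illustrative sketch, not a library API):

```python
class AgentScratchpad:
    """Private, ephemeral working notes for a single agent."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self._notes: list[str] = []

    def jot(self, note: str) -> None:
        """Records an intermediate reasoning step, invisible to other agents."""
        self._notes.append(note)

    def review(self) -> list[str]:
        """Returns the agent's own notes, in order."""
        return list(self._notes)

    def discard(self) -> None:
        """Drops everything at end of execution; nothing is persisted."""
        self._notes.clear()


pad = AgentScratchpad("worker_a")
pad.jot("field Z missing on page 3 - verify before rejecting")
pad.jot("balance sheet totals reconcile")
print(len(pad.review()))  # 2
pad.discard()
print(pad.review())       # []
```

If a note turns out to matter to other agents, promote it explicitly to the blackboard rather than widening the scratchpad's visibility.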
2.4 Episodic vs Semantic Memory
The most neglected distinction in corporate MAS:
Episodic memory is the log of what happened: "In session X, Agent A decided to reject document Y because field Z was absent." It is temporal, contextual, ordered.
Semantic memory is what the system knows: "Documents of type Y require field Z as mandatory under BACEN regulation 4.557." It is atemporal, factual, structured.
Confusing the two produces systems that treat facts as logs (re-deriving knowledge at every execution) or logs as facts (generalizing specific decisions into incorrect rules).
A practical case from financial onboarding:
- Episodic: "Client JoΓ£o Silva submitted the balance sheet at 2:32 PM on 03/03."
- Semantic: "PJ clients with revenue > R$ 10M require a balance sheet audited by a Big Four firm."
The first goes to the execution database (PostgreSQL). The second goes to the knowledge graph (Neo4j). Never the other way around.
3. Memory Architecture for MAS: The 4-Layer Stack
3.1 The Shared State Problem
Multi-agent systems with shared memory are distributed systems, and the guarantees of the CAP theorem apply in full:
- Strong consistency (all agents see the same state at the same time): required for critical execution memory; one agent cannot approve an operation that another just rejected
- Availability + eventual consistency: acceptable for semantic memory; if Agent B does not yet see the fact Agent A just indexed, it will independently derive the same fact and the index will be updated on the next sync
The split-brain problem with parallel agents: two Workers writing to the same Vector Store namespace simultaneously can index conflicting versions of the same document. Solution: route writes to the Vector Store through the Planner (a single write point), or use optimistic locking with versioning by agent_id + timestamp.
3.2 The 4-Layer Memory Stack
```
┌────────────────────────────────────────────────────────────────┐
│ LAYER 1: Conversation Memory                                   │
│ Redis / Memcached                                              │
│ TTL: 1h–24h | Latency: <5ms | Use: active session context      │
├────────────────────────────────────────────────────────────────┤
│ LAYER 2: Context Memory                                        │
│ Vector DB (Pinecone / Weaviate / pgvector)                     │
│ TTL: 30–90 days | Latency: 20–100ms | Use: RAG retrieval       │
├────────────────────────────────────────────────────────────────┤
│ LAYER 3: Knowledge Memory                                      │
│ Graph DB (Neo4j)                                               │
│ TTL: permanent | Latency: 10–50ms | Use: facts and relations   │
├────────────────────────────────────────────────────────────────┤
│ LAYER 4: Execution Memory                                      │
│ SQL/NoSQL (PostgreSQL / MongoDB)                               │
│ TTL: regulatory (5–7 years) | Use: audit trail, compliance     │
└────────────────────────────────────────────────────────────────┘
       ▲               ▲                 ▲               ▲
       │               │                 │               │
  [Agent API]    [RAG queries]     [Knowledge       [Audit
  [Blackboard]   [Semantic          queries]         writes]
                  search]
```
Signals that you are using the wrong layer:
- Layer 1 with data > 10KB per key: move to Layer 2
- Layer 2 for data that never changes semantically: move to Layer 3
- Layer 3 for temporary execution logs: move to Layer 4
- Layer 4 for real-time semantic retrieval: create an index in Layer 2
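These rules of thumb can be encoded as a lightweight lint that runs before a write and flags the wrong layer (the thresholds and layer numbers mirror the list above; the function itself is illustrative):

```python
def suggest_layer(
    current_layer: int,
    payload_bytes: int,
    is_immutable_fact: bool,
    is_execution_log: bool,
    needs_semantic_search: bool,
) -> int:
    """Returns the layer a write should go to, given the signals above."""
    if current_layer == 1 and payload_bytes > 10_000:
        return 2  # oversized cache entries belong in the Vector DB
    if current_layer == 2 and is_immutable_fact:
        return 3  # data that never changes semantically belongs in the graph
    if current_layer == 3 and is_execution_log:
        return 4  # temporary execution logs belong in SQL/NoSQL
    if current_layer == 4 and needs_semantic_search:
        return 2  # real-time semantic retrieval needs a vector index
    return current_layer


print(suggest_layer(1, payload_bytes=25_000, is_immutable_fact=False,
                    is_execution_log=False, needs_semantic_search=False))  # 2
```

Running such a check in the memory manager's `store()` path turns a silent architecture drift into an explicit log line.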
3.3 Access Patterns by Agent Type
```python
# langchain==0.3.x | redis==5.x | langchain-pinecone==0.2.x

import json
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional

from sqlalchemy import text

logger = logging.getLogger(__name__)


class MemoryScope(Enum):
    PRIVATE = "private"      # Current agent only
    SHARED = "shared"        # All agents in the execution
    HIERARCHICAL = "hier"    # Agents with explicit permission


@dataclass
class MemoryEntry:
    """Atomic unit of memory with provenance metadata."""
    content: Any
    agent_id: str
    session_id: str
    execution_id: str
    memory_type: str          # "episodic" | "semantic" | "working"
    scope: MemoryScope
    confidence_score: float   # 0.0-1.0, the agent's confidence in the content
    ttl_seconds: Optional[int] = None
    tags: list[str] = field(default_factory=list)


class AgentMemoryManager:
    """
    Unified interface for the 4 memory layers.

    Encapsulates routing logic: the agent declares what it
    wants to store; the Manager decides which layer to persist to.
    """

    def __init__(
        self,
        agent_id: str,
        redis_client,
        vector_store,
        graph_driver,
        sql_session,
    ) -> None:
        self.agent_id = agent_id
        self._redis = redis_client
        self._vector = vector_store
        self._graph = graph_driver
        self._sql = sql_session

    def store(self, entry: MemoryEntry) -> str:
        """
        Routes storage to the correct layer based on
        memory_type and scope.

        Returns: memory_id for later retrieval.
        """
        if entry.scope == MemoryScope.PRIVATE:
            return self._store_private(entry)
        elif entry.memory_type == "working":
            return self._store_blackboard(entry)
        elif entry.memory_type == "semantic":
            return self._store_vector(entry)
        elif entry.memory_type == "episodic":
            return self._store_sql(entry)
        else:
            raise ValueError(f"Invalid combination: {entry.memory_type}/{entry.scope}")

    def _store_private(self, entry: MemoryEntry) -> str:
        """Private scratchpad: Redis with session TTL."""
        key = f"scratch:{self.agent_id}:{entry.execution_id}"
        self._redis.setex(key, entry.ttl_seconds or 3600, json.dumps({
            "content": entry.content,
            "tags": entry.tags,
        }))
        logger.info("memory_store type=private agent=%s", self.agent_id)
        return key

    def _store_blackboard(self, entry: MemoryEntry) -> str:
        """Shared blackboard: Redis Hash per execution_id."""
        key = f"blackboard:{entry.execution_id}"
        field_name = f"{self.agent_id}:{':'.join(entry.tags)}"
        self._redis.hset(key, field_name, json.dumps(entry.content))
        self._redis.expire(key, entry.ttl_seconds or 86400)
        return f"{key}:{field_name}"

    def _store_vector(self, entry: MemoryEntry) -> str:
        """Semantic memory: Vector Store with agent metadata."""
        memory_id = str(uuid.uuid4())
        self._vector.add_texts(
            texts=[str(entry.content)],
            metadatas=[{
                "memory_id": memory_id,
                "agent_id": entry.agent_id,
                "session_id": entry.session_id,
                "confidence": entry.confidence_score,
                "tags": ",".join(entry.tags),
            }],
            ids=[memory_id],
        )
        logger.info("memory_store type=semantic agent=%s id=%s",
                    self.agent_id, memory_id)
        return memory_id

    def _store_sql(self, entry: MemoryEntry) -> str:
        """Episodic memory: PostgreSQL for the audit trail."""
        result = self._sql.execute(
            text(
                "INSERT INTO agent_memory_log "
                "(agent_id, session_id, execution_id, content, tags, created_at) "
                "VALUES (:agent_id, :session_id, :execution_id, "
                ":content, :tags, :created_at) RETURNING id"
            ),
            {
                "agent_id": entry.agent_id,
                "session_id": entry.session_id,
                "execution_id": entry.execution_id,
                "content": str(entry.content),
                "tags": entry.tags,
                "created_at": datetime.utcnow(),
            },
        )
        return str(result.fetchone()[0])
```
3.4 Write-Through vs Write-Back Strategy
Write-through: each agent action persists immediately across all relevant layers before continuing. Higher latency, guaranteed consistency.
Write-back: the agent accumulates memories in a local buffer and persists in batch at the end of execution or at regular intervals. Lower latency, risk of loss on failure before flush.
Decision rule:
- Critical execution memory (compliance, audit, irreversible decisions): write-through mandatory
- Low-risk semantic memory (context, preferences, non-critical history): write-back acceptable
- Working/blackboard memory: write-through; other agents depend on immediate consistency
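A write-back buffer is essentially a list plus a flush policy. The sketch below flushes when a size threshold is reached, and `flush()` must also be called explicitly at the end of the execution so nothing is lost on a clean shutdown (names are illustrative):

```python
from typing import Any, Callable


class WriteBackBuffer:
    """Accumulates memory writes and persists them in batch."""

    def __init__(self, persist_batch: Callable[[list[Any]], None], max_size: int = 10) -> None:
        self._persist_batch = persist_batch  # e.g. a bulk insert into the Vector Store
        self._max_size = max_size
        self._buffer: list[Any] = []

    def add(self, entry: Any) -> None:
        """Buffers an entry; flushes automatically when the threshold is hit."""
        self._buffer.append(entry)
        if len(self._buffer) >= self._max_size:
            self.flush()

    def flush(self) -> None:
        """Persists everything buffered so far. Call at end of execution too."""
        if self._buffer:
            self._persist_batch(self._buffer)
            self._buffer = []


persisted: list[str] = []
buf = WriteBackBuffer(persisted.extend, max_size=3)
for note in ["m1", "m2", "m3", "m4"]:
    buf.add(note)   # auto-flush fires after m3
buf.flush()         # final flush picks up m4
print(persisted)    # ['m1', 'm2', 'm3', 'm4']
```

The failure mode named in the decision rule is visible here: anything still in `_buffer` when the process dies is gone, which is exactly why critical execution memory must bypass this pattern.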
4. Practical Implementation: Production Code per Layer
4.1 Vector Store as Semantic Memory
```python
# langchain==0.3.x | langchain-pinecone==0.2.x | openai==1.x

import logging
import uuid
from typing import Optional

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

logger = logging.getLogger(__name__)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")


class SemanticMemoryStore:
    """
    Long-term semantic memory using Pinecone.

    Stores memories with agent metadata for retrieval
    filtered by agent_id, session, type, and confidence score.
    """

    def __init__(self, index_name: str) -> None:
        self.store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
        )

    def remember(
        self,
        content: str,
        agent_id: str,
        session_id: str,
        memory_type: str,
        confidence: float = 1.0,
        tags: Optional[list[str]] = None,
    ) -> str:
        """
        Indexes a memory with complete provenance metadata.

        Before indexing, checks deduplication by similarity:
        if a memory with > 0.97 similarity exists for the same agent,
        returns the existing memory_id instead of creating a duplicate.
        """
        memory_id = str(uuid.uuid4())

        # Deduplication check (assumes a cosine-similarity index,
        # where a higher score means more similar)
        existing = self.store.similarity_search_with_score(
            query=content,
            k=1,
            filter={"agent_id": agent_id, "memory_type": memory_type},
        )

        if existing and existing[0][1] > 0.97:
            logger.info("semantic_memory_deduplicated agent=%s similarity=%.3f",
                        agent_id, existing[0][1])
            return existing[0][0].metadata["memory_id"]

        doc = Document(
            page_content=content,
            metadata={
                "memory_id": memory_id,
                "agent_id": agent_id,
                "session_id": session_id,
                "memory_type": memory_type,  # semantic | episodic | working
                "confidence_score": confidence,
                "tags": ",".join(tags or []),
            },
        )

        self.store.add_documents([doc], ids=[memory_id])
        logger.info("semantic_memory_stored agent=%s id=%s", agent_id, memory_id)
        return memory_id

    def recall(
        self,
        query: str,
        agent_id: str,
        k: int = 5,
        min_confidence: float = 0.7,
        use_mmr: bool = False,
    ) -> list[Document]:
        """
        Retrieves relevant memories filtered by agent.

        MMR (Maximal Marginal Relevance): use when diversity
        of results matters more than pure similarity.
        Similarity search: use when you want the k most similar,
        accepting redundancy.
        """
        filter_dict = {
            "agent_id": agent_id,
            "confidence_score": {"$gte": min_confidence},
        }

        if use_mmr:
            # MMR balances relevance and diversity:
            # fetch_k > k fetches more candidates, then selects the most diverse
            return self.store.max_marginal_relevance_search(
                query=query,
                k=k,
                fetch_k=k * 3,
                filter=filter_dict,
            )

        return self.store.similarity_search(
            query=query,
            k=k,
            filter=filter_dict,
        )
```
4.2 Redis as High-Speed Context Cache
```python
# redis==5.x | langchain-redis==0.1.x

import json
import logging
from datetime import datetime

import redis

logger = logging.getLogger(__name__)


class AgentContextCache:
    """
    High-speed context cache for agent state.

    Redis data structures by data type:
    - Hash: structured agent state (key-value)
    - List: message history (FIFO with limit)
    - Sorted Set: memories ranked by relevance/recency
    - Pub/Sub: inter-agent notification on updates
    """

    TTL = {
        "session_active": 3600,   # 1h: active session
        "context_recent": 86400,  # 24h: recent context
        "task_state": None,       # No TTL: persists until explicit completion
    }

    def __init__(self, redis_url: str) -> None:
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.pubsub = self.r.pubsub()

    # -- Structured state ----------------------------------------------------

    def set_agent_state(self, agent_id: str, execution_id: str, state: dict) -> None:
        """Persists full agent state as a Redis Hash."""
        key = f"agent_state:{execution_id}:{agent_id}"
        self.r.hset(key, mapping={k: json.dumps(v) for k, v in state.items()})
        ttl = self.TTL["task_state"]
        if ttl is not None:  # expire(key, 0) would delete the key immediately
            self.r.expire(key, ttl)
        logger.info("agent_state_set key=%s fields=%s", key, list(state.keys()))

    def get_agent_state(self, agent_id: str, execution_id: str) -> dict:
        """Retrieves full agent state."""
        key = f"agent_state:{execution_id}:{agent_id}"
        raw = self.r.hgetall(key)
        return {k: json.loads(v) for k, v in raw.items()}

    # -- Message history -----------------------------------------------------

    def append_message(
        self,
        session_id: str,
        role: str,
        content: str,
        max_history: int = 20,
    ) -> None:
        """
        Appends a message to history with RPUSH + LTRIM.

        LTRIM ensures the list never exceeds max_history entries
        without a prior read: an O(1) operation.
        """
        key = f"history:{session_id}"
        message = json.dumps({"role": role, "content": content,
                              "ts": datetime.utcnow().isoformat()})
        pipe = self.r.pipeline()
        pipe.rpush(key, message)
        pipe.ltrim(key, -max_history, -1)
        pipe.expire(key, self.TTL["session_active"])
        pipe.execute()

    def get_history(self, session_id: str) -> list[dict]:
        """Retrieves full session history."""
        key = f"history:{session_id}"
        return [json.loads(m) for m in self.r.lrange(key, 0, -1)]

    # -- Ranked memories -----------------------------------------------------

    def rank_memory(self, agent_id: str, memory_id: str, score: float) -> None:
        """
        Inserts a memory into a Sorted Set ranked by relevance.

        Composite score: combine semantic similarity + recency
        into a single float for unified ranking.
        """
        key = f"ranked_memories:{agent_id}"
        self.r.zadd(key, {memory_id: score})
        self.r.expire(key, self.TTL["context_recent"])

    def get_top_memories(self, agent_id: str, top_k: int = 5) -> list[str]:
        """Returns the memory_ids with the highest scores (most relevant)."""
        key = f"ranked_memories:{agent_id}"
        return self.r.zrevrange(key, 0, top_k - 1)

    # -- Pub/Sub for inter-agent notification --------------------------------

    def notify_memory_update(self, execution_id: str, agent_id: str, field: str) -> None:
        """Notifies other agents that shared memory has been updated."""
        channel = f"memory_updates:{execution_id}"
        payload = json.dumps({"agent_id": agent_id, "field": field,
                              "ts": datetime.utcnow().isoformat()})
        self.r.publish(channel, payload)
        logger.info("memory_update_published channel=%s agent=%s", channel, agent_id)
```
4.3 Neo4j for Relational Knowledge
```python
# neo4j==5.x | langchain-neo4j==0.1.x

import logging
from typing import Optional

from neo4j import Driver, GraphDatabase

logger = logging.getLogger(__name__)


class KnowledgeGraph:
    """
    Structured knowledge graph for MAS.

    Schema:
      - (Agent)-[:KNOWS]->(Fact)
      - (Agent)-[:DECIDED {reason, confidence}]->(Decision)
      - (Session)-[:CONTAINS]->(Decision)
      - (Fact)-[:CONTRADICTS]->(Fact)
      - (Entity)-[:REFERENCED_IN]->(Session)
    """

    def __init__(self, uri: str, user: str, password: str) -> None:
        self.driver: Driver = GraphDatabase.driver(uri, auth=(user, password))

    def store_fact(
        self,
        agent_id: str,
        subject: str,
        predicate: str,
        obj: str,
        confidence: float = 1.0,
        source_session: Optional[str] = None,
    ) -> None:
        """
        Stores a semantic fact as a triple (subject, predicate, object).

        Before inserting, checks for contradictions with existing facts
        on the same subject and predicate.
        """
        with self.driver.session() as session:
            # Check for a contradiction
            existing = session.run(
                """
                MATCH (f:Fact {subject: $subject, predicate: $predicate})
                WHERE f.object <> $object
                RETURN f
                """,
                subject=subject, predicate=predicate, object=obj,
            ).data()

            if existing:
                logger.warning(
                    "knowledge_contradiction subject=%s predicate=%s existing=%s new=%s",
                    subject, predicate, existing[0]["f"]["object"], obj,
                )
                # Create a CONTRADICTS relation between the two facts;
                # the WHERE keeps f1 from matching the newly merged f2
                session.run(
                    """
                    MATCH (f1:Fact {subject: $subject, predicate: $predicate})
                    WHERE f1.object <> $object
                    MERGE (f2:Fact {subject: $subject, predicate: $predicate, object: $object})
                    MERGE (f1)-[:CONTRADICTS {detected_by: $agent_id}]->(f2)
                    """,
                    subject=subject, predicate=predicate, object=obj, agent_id=agent_id,
                )
                return

            session.run(
                """
                MERGE (a:Agent {id: $agent_id})
                MERGE (f:Fact {subject: $subject, predicate: $predicate, object: $object})
                ON CREATE SET f.confidence = $confidence, f.created_at = datetime()
                MERGE (a)-[:KNOWS {source_session: $source_session}]->(f)
                """,
                agent_id=agent_id, subject=subject, predicate=predicate,
                object=obj, confidence=confidence, source_session=source_session,
            )
            logger.info("fact_stored agent=%s %s-[%s]->%s",
                        agent_id, subject, predicate, obj)

    def recall_about_entity(
        self,
        entity: str,
        agent_id: Optional[str] = None,
    ) -> list[dict]:
        """
        Retrieves everything the system knows about an entity,
        optionally filtered by agent.
        """
        with self.driver.session() as session:
            # Parentheses keep the OR from binding looser than a later AND
            query = """
                MATCH (a:Agent)-[:KNOWS]->(f:Fact)
                WHERE (f.subject = $entity OR f.object = $entity)
            """
            params: dict = {"entity": entity}

            if agent_id:
                query += " AND a.id = $agent_id"
                params["agent_id"] = agent_id

            query += (
                " RETURN f.subject, f.predicate, f.object, f.confidence"
                " ORDER BY f.confidence DESC"
            )

            return session.run(query, **params).data()

    def detect_contradictions(self) -> list[dict]:
        """
        Returns all contradictory fact pairs in the graph.
        Should be run by the Critic before critical decisions.
        """
        with self.driver.session() as session:
            return session.run(
                """
                MATCH (f1:Fact)-[:CONTRADICTS]->(f2:Fact)
                RETURN f1.subject, f1.predicate, f1.object AS value_a,
                       f2.object AS value_b
                """
            ).data()
```
5. MCP and Persistent Memory: Interoperability by Design
5.1 The Model Context Protocol Pattern
MCP solves a coupling problem: without it, each agent needs to know the specific API of Pinecone, Redis, and Neo4j. With MCP, agents speak to a standardized MemoryServer, and the underlying implementation is transparent.
The architectural consequence is significant: switching from Pinecone to Weaviate does not require refactoring agents, only the MCP server. The interface contract remains stable.
5.2 MCP Server Implementation for Memory
```python
# mcp==1.x | fastmcp==0.4.x

import logging

import fastmcp

logger = logging.getLogger(__name__)

mcp = fastmcp.FastMCP("MemoryServer")


@mcp.tool()
async def store_memory(
    content: str,
    agent_id: str,
    session_id: str,
    memory_type: str,
    scope: str = "shared",
    confidence: float = 1.0,
    tags: list[str] | None = None,
) -> dict:
    """
    Stores a memory in the appropriate layer.

    The server decides the layer based on memory_type and scope;
    the agent does not need to know the persistence details.
    """
    # memory_manager is the application module that wires up the
    # AgentMemoryManager from section 3.3
    from memory_manager import MemoryEntry, MemoryScope, get_memory_manager

    manager = get_memory_manager(agent_id)
    entry = MemoryEntry(
        content=content,
        agent_id=agent_id,
        session_id=session_id,
        execution_id=session_id,  # simplified: one execution per session
        memory_type=memory_type,
        scope=MemoryScope(scope),
        confidence_score=confidence,
        tags=tags or [],
    )

    memory_id = manager.store(entry)
    logger.info("mcp_store_memory agent=%s type=%s id=%s",
                agent_id, memory_type, memory_id)
    return {"memory_id": memory_id, "status": "stored"}


@mcp.tool()
async def retrieve_memory(
    query: str,
    agent_id: str,
    memory_type: str = "semantic",
    top_k: int = 5,
    min_confidence: float = 0.7,
) -> list[dict]:
    """
    Retrieves memories relevant to a query.

    Routing: semantic -> Vector Store, episodic -> SQL,
    working -> Redis blackboard.
    """
    from memory_manager import get_memory_manager

    manager = get_memory_manager(agent_id)

    if memory_type == "semantic":
        # Assumes the manager's vector layer is the SemanticMemoryStore
        # from section 4.1, which exposes recall()
        docs = manager._vector.recall(
            query=query,
            agent_id=agent_id,
            k=top_k,
            min_confidence=min_confidence,
        )
        return [{"content": d.page_content, "metadata": d.metadata} for d in docs]

    logger.warning("mcp_retrieve unsupported type=%s", memory_type)
    return []


@mcp.tool()
async def forget_memory(memory_id: str, agent_id: str) -> dict:
    """
    Removes a specific memory from the Vector Store.

    Use with caution: removal of semantic memory is irreversible.
    Prefer decrementing confidence_score before deleting.
    """
    from memory_manager import get_memory_manager

    manager = get_memory_manager(agent_id)
    manager._vector.store.delete(ids=[memory_id])
    logger.info("mcp_forget_memory agent=%s id=%s", agent_id, memory_id)
    return {"memory_id": memory_id, "status": "deleted"}
```
5.3 LangGraph + MCP Memory Tool Integration
```python
# langchain==0.3.x | langgraph==0.2.x | mcp==1.x

# The adapter API surface varies by langchain-mcp-adapters version;
# check the package docs for the exact client/toolkit class
from langchain_mcp_adapters import MCPToolkit
from langchain_openai import ChatOpenAI

# Connect to the memory MCP server
toolkit = MCPToolkit(server_url="http://localhost:8000/mcp")
memory_tools = toolkit.get_tools()

# MCP tools are available to any LangGraph node
# like any other tool: zero coupling with the implementation


def agent_node_with_memory(state: dict) -> dict:
    """
    Agent node with memory access via MCP.

    The agent decides when and what to remember; it is not automatic.
    Memory-on-demand produces cleaner indexes than memory-on-every-step.
    """
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(memory_tools)

    response = llm.invoke(state["messages"])
    return {"messages": state["messages"] + [response]}
```
6. Optimization and Costs
6.1 Context Compaction Strategies
Progressive summarization: old memories become summaries, summaries become facts, facts enter the graph. Scheduled consolidation pipeline:
```python
# apscheduler==3.x | langchain==0.3.x

import logging

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from langchain_openai import ChatOpenAI

logger = logging.getLogger(__name__)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # used to summarize archived memories
scheduler = AsyncIOScheduler()


@scheduler.scheduled_job("interval", hours=6)
async def consolidate_old_memories() -> None:
    """
    Consolidation job over memories older than 7 days:

    - confidence_score > 0.9: promoted to the knowledge graph
      as structured facts
    - confidence_score < 0.5: summarized, archived in SQL, and
      removed from the index
    - everything in between stays put until the next run
    """
    from memory_manager import archive_memory, get_all_stale_memories, promote_to_graph

    stale = await get_all_stale_memories(older_than_days=7)
    logger.info("consolidation_start stale_count=%s", len(stale))

    for memory in stale:
        if memory["confidence_score"] > 0.9:
            # Promote to graph: extract a (subject, predicate, object) triple
            await promote_to_graph(memory)
        elif memory["confidence_score"] < 0.5:
            # Archive in SQL and remove from the Vector Store
            await archive_memory(memory)

    logger.info("consolidation_done")
```
Ebbinghaus-inspired forgetting curve: the confidence_score of unaccessed memories decrements automatically over time, so unused memories are eventually removed without manual intervention.
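A minimal version of that decay is exponential in days since last access, mirroring the Ebbinghaus curve; memories that fall below a floor become candidates for the consolidation job above (the half-life value is an illustrative assumption, not a recommendation):

```python
import math


def decayed_confidence(score: float, days_since_access: float,
                       half_life_days: float = 14.0) -> float:
    """Exponential decay: the score halves every half_life_days without access."""
    return score * 0.5 ** (days_since_access / half_life_days)


print(round(decayed_confidence(0.9, 0), 3))   # 0.9 - just accessed, no decay
print(round(decayed_confidence(0.9, 14), 3))  # 0.45 - one half-life later
print(round(decayed_confidence(0.9, 42), 3))  # three half-lives later
```

Applying the decay lazily (at read time, from a stored `last_accessed_at`) avoids a separate write per memory per day.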
6.2 Cost Breakdown per Layer
Estimate for a system with 100 active agents, 1,000 sessions/day, 90-day retention (prices verified March 2026; consult providers before budgeting, values subject to change):
| Layer | Technology | Cost/month (verified) | Scale variable | Source |
|---|---|---|---|---|
| Conversation | Redis Cloud (1GB) | ~$22 | Simultaneous active sessions | redis.io/pricing |
| Context | Pinecone Standard | $50 min. + $0.33/GB storage + $16/1M read units | Indexed vectors + query volume | pinecone.io/pricing |
| Knowledge | Neo4j Aura Professional | from $65 | Nodes and relations in graph | neo4j.com/pricing |
| Execution | PostgreSQL RDS (db.t3.medium) | ~$50–100 | Audit log rows | aws.amazon.com/rds/pricing |
| Embeddings | OpenAI text-embedding-3-small | ~$10–40 | Tokens indexed/month | openai.com/pricing |
| Estimated total | | ~$197–577/month | | |
With semantic caching (GPTCache or similar), the estimated reduction in embedding cost is 30–60% in workloads with repetitive queries (illustrative figure).
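The mechanism behind that saving is simple: before embedding and calling the LLM, compare the incoming query against cached queries and reuse the stored answer above a similarity threshold. A backend-agnostic sketch, with the embedding function injected so the cache stays library-neutral (the threshold, names, and toy embedding are illustrative):

```python
import math
from typing import Callable, Optional


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticCache:
    """Returns a cached answer when a new query is near-identical to an old one."""

    def __init__(self, embed: Callable[[str], list[float]], threshold: float = 0.95) -> None:
        self._embed = embed          # e.g. text-embedding-3-small behind an API client
        self._threshold = threshold
        self._entries: list[tuple[list[float], str]] = []  # (embedding, answer)

    def get(self, query: str) -> Optional[str]:
        qv = self._embed(query)
        for vec, answer in self._entries:
            if cosine(qv, vec) >= self._threshold:
                return answer  # cache hit: no LLM call, no new index entry
        return None

    def put(self, query: str, answer: str) -> None:
        self._entries.append((self._embed(query), answer))


def toy_embed(s: str) -> list[float]:
    """Stand-in for a real embedding model (letter-frequency vector)."""
    return [float(s.count(c)) for c in "abcdefghijklmnopqrstuvwxyz "]


cache = SemanticCache(toy_embed, threshold=0.95)
cache.put("what is the kyc limit", "R$ 10M")
print(cache.get("what is the kyc limit"))  # 'R$ 10M' - repeated query hits
```

A production version would store entries in the Layer 2 vector index rather than a Python list, but the hit/miss logic is the same.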
6.3 Latency vs Consistency Trade-offs by Use Case
| Process type | Required consistency | Primary layer | Expected latency |
|---|---|---|---|
| Compliance / KYC | Strong | Layer 4 (SQL) + Layer 1 (Redis) | 50–200ms |
| Document analysis | Eventual | Layer 2 (Vector) + Layer 3 (Graph) | 30–120ms |
| Conversational onboarding | Eventual | Layer 1 (Redis) | 5–20ms |
| Credit decision | Strong | Layer 4 + Layer 3 (Neo4j) | 80–300ms |
| Report generation | Eventual | Layer 2 (Vector) | 20–80ms |
7. Conclusion: Memory as Competitive Advantage
Systems that learn between sessions (accumulating knowledge, detecting contradictions, and retrieving relevant context without reprocessing) have an operational advantage that cannot be replicated in the short term. Each execution makes the system more accurate. Each session reduces the marginal cost of the next.
Systems without adequate memory restart from zero every time. Operational cost does not scale; it multiplies.
The 4-layer stack presented in this article is not the only valid architecture. It is a defensible starting point: each layer has a clear purpose, technologies proven in production, and explicit criteria for when to migrate data between layers.
Implementation Checklist
Use the layers above as a checklist if you are designing or auditing the memory architecture of your multi-agent system: Vector Store selection, shared state strategy, MCP integration, and retention compliance.
Keywords: Memory Architecture for AI Agents, persistent memory multi-agent systems, vector store LangChain agents, Redis AI agent memory, MCP persistent memory, active RAG multi-agent systems, Neo4j AI agents.
Published by AI2You | AI-First Technical Series | ai2you.online/en/blog