In [1]
已复制!
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License.
# 版权所有 (c) 2024 Microsoft Corporation。 # 根据 MIT 许可证获得许可。
In [2]
已复制!
from typing import Any
import numpy as np
import yaml
from graphrag.config.models.vector_store_schema_config import VectorStoreSchemaConfig
from graphrag.data_model.types import TextEmbedder
# GraphRAG vector store components
from graphrag.vector_stores.base import (
BaseVectorStore,
VectorStoreDocument,
VectorStoreSearchResult,
)
from graphrag.vector_stores.factory import VectorStoreFactory
from typing import Any import numpy as np import yaml from graphrag.config.models.vector_store_schema_config import VectorStoreSchemaConfig from graphrag.data_model.types import TextEmbedder # GraphRAG vector store components from graphrag.vector_stores.base import ( BaseVectorStore, VectorStoreDocument, VectorStoreSearchResult, ) from graphrag.vector_stores.factory import VectorStoreFactory
步骤 2:了解 BaseVectorStore 接口¶
在使用自定义向量存储之前,让我们检查 `BaseVectorStore` 接口,以了解需要实现哪些方法。
In [3]
已复制!
# List every abstract method that a BaseVectorStore subclass must provide.
import inspect

print("BaseVectorStore Abstract Methods:")
print("=" * 40)

abstract_methods = []
for member_name, member in inspect.getmembers(
    BaseVectorStore, predicate=inspect.isfunction
):
    # Abstract callables carry the __isabstractmethod__ flag; skip the rest.
    if not getattr(member, "__isabstractmethod__", False):
        continue
    entry = f"• {member_name}{inspect.signature(member)}"
    abstract_methods.append(entry)
    print(entry)

print(f"\nTotal abstract methods to implement: {len(abstract_methods)}")
# 让我们检查 BaseVectorStore 类以了解所需方法 import inspect print("BaseVectorStore 抽象方法:") print("=" * 40) abstract_methods = [] for name, method in inspect.getmembers(BaseVectorStore, predicate=inspect.isfunction): if getattr(method, "__isabstractmethod__", False): signature = inspect.signature(method) abstract_methods.append(f"• {name}{signature}") print(f"• {name}{signature}") print(f"\n需要实现的抽象方法总数: {len(abstract_methods)}")
BaseVectorStore Abstract Methods: ======================================== • connect(self, **kwargs: Any) -> None • filter_by_id(self, include_ids: list[str] | list[int]) -> Any • load_documents(self, documents: list[graphrag.vector_stores.base.VectorStoreDocument], overwrite: bool = True) -> None • search_by_id(self, id: str) -> graphrag.vector_stores.base.VectorStoreDocument • similarity_search_by_text(self, text: str, text_embedder: collections.abc.Callable[[str], list[float]], k: int = 10, **kwargs: Any) -> list[graphrag.vector_stores.base.VectorStoreSearchResult] • similarity_search_by_vector(self, query_embedding: list[float], k: int = 10, **kwargs: Any) -> list[graphrag.vector_stores.base.VectorStoreSearchResult] Total abstract methods to implement: 6
步骤 3:实现自定义向量存储¶
现在让我们实现一个简单的内存向量存储作为示例。此向量存储将:
- 使用 Python 数据结构在内存中存储文档和向量
- 支持所有必需的 BaseVectorStore 方法
注意:这只是一个简化的演示示例。生产向量存储通常会使用优化的库(如 FAISS)、更复杂的索引和持久存储。
In [4]
已复制!
class SimpleInMemoryVectorStore(BaseVectorStore):
    """A simple in-memory vector store implementation for demonstration purposes.

    Documents and their embeddings are kept in plain dictionaries keyed by the
    stringified document id. Similarity search is a brute-force cosine
    similarity scan over every stored vector.

    WARNING: This is for demonstration only - not suitable for production use.
    For production, consider using optimized vector databases like LanceDB,
    Azure AI Search, or other specialized vector stores.
    """

    # Internal storage for documents and vectors, both keyed by str(doc.id)
    documents: dict[str, VectorStoreDocument]
    vectors: dict[str, np.ndarray]
    # Set to True by connect(); load/search operations refuse to run before that
    connected: bool

    def __init__(self, **kwargs: Any):
        """Initialize the in-memory vector store.

        Args:
            **kwargs: Forwarded unchanged to ``BaseVectorStore.__init__``.
        """
        super().__init__(**kwargs)
        self.documents = {}
        self.vectors = {}
        self.connected = False
        print(f"🚀 SimpleInMemoryVectorStore initialized for index: {self.index_name}")

    def connect(self, **kwargs: Any) -> None:
        """Connect to the vector storage (no-op for in-memory store).

        Only flips the ``connected`` flag; ``kwargs`` are accepted for
        interface compatibility and ignored.
        """
        self.connected = True
        print(f"✅ Connected to in-memory vector store: {self.index_name}")

    def load_documents(
        self, documents: list[VectorStoreDocument], overwrite: bool = True
    ) -> None:
        """Load documents into the vector store.

        Args:
            documents: Documents to store. Documents whose ``vector`` is
                ``None`` are skipped.
            overwrite: When True, all previously stored documents and vectors
                are discarded first. When False, new documents are added and
                an existing id is replaced by the incoming document.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        if not self.connected:
            msg = "Vector store not connected. Call connect() first."
            raise RuntimeError(msg)

        if overwrite:
            self.documents.clear()
            self.vectors.clear()

        loaded_count = 0
        for doc in documents:
            if doc.vector is None:
                continue
            doc_id = str(doc.id)
            self.documents[doc_id] = doc
            self.vectors[doc_id] = np.array(doc.vector, dtype=np.float32)
            loaded_count += 1

        print(f"📚 Loaded {loaded_count} documents into vector store")

    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors.

        Returns 0.0 when either vector has zero norm, which avoids a division
        by zero.
        """
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def similarity_search_by_vector(
        self, query_embedding: list[float], k: int = 10, **kwargs: Any
    ) -> list[VectorStoreSearchResult]:
        """Perform similarity search using a query vector.

        Args:
            query_embedding: The query vector.
            k: Maximum number of results to return. Values <= 0 yield an
                empty list.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            Up to ``k`` results ordered by descending cosine similarity.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        if not self.connected:
            msg = "Vector store not connected. Call connect() first."
            raise RuntimeError(msg)

        # A negative k would otherwise slice as "all but the last |k|" hits.
        if k <= 0 or not self.vectors:
            return []

        query_vec = np.array(query_embedding, dtype=np.float32)

        # Score the query against every stored vector (brute force).
        similarities = [
            (doc_id, self._cosine_similarity(query_vec, stored_vec))
            for doc_id, stored_vec in self.vectors.items()
        ]

        # Sort by similarity (descending) and take top k
        similarities.sort(key=lambda pair: pair[1], reverse=True)

        return [
            VectorStoreSearchResult(document=self.documents[doc_id], score=score)
            for doc_id, score in similarities[:k]
        ]

    def similarity_search_by_text(
        self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
    ) -> list[VectorStoreSearchResult]:
        """Perform similarity search using text (which gets embedded first).

        Args:
            text: The query text.
            text_embedder: Callable that turns ``text`` into an embedding.
            k: Maximum number of results to return.
            **kwargs: Forwarded to ``similarity_search_by_vector``.

        Returns:
            Search results for the embedded text, or an empty list when the
            embedder produces no embedding.
        """
        query_embedding = text_embedder(text)

        # Without this guard an empty embedding has zero norm, so every
        # document would score 0.0 and an arbitrary top-k would come back.
        if query_embedding is None or len(query_embedding) == 0:
            return []

        return self.similarity_search_by_vector(query_embedding, k, **kwargs)

    def filter_by_id(self, include_ids: list[str] | list[int]) -> Any:
        """Build a query filter to filter documents by id.

        For this simple implementation, we return the list of IDs (converted
        to strings) as the filter.
        """
        return [str(id_) for id_ in include_ids]

    def search_by_id(self, id: str) -> VectorStoreDocument:
        """Search for a document by id.

        Raises:
            KeyError: If no document is stored under ``str(id)``.
        """
        doc_id = str(id)
        if doc_id not in self.documents:
            msg = f"Document with id '{id}' not found"
            raise KeyError(msg)
        return self.documents[doc_id]

    def get_stats(self) -> dict[str, Any]:
        """Get statistics about the vector store (custom method).

        ``vector_dimension`` is the length of the first stored vector, or 0
        when the store is empty.
        """
        return {
            "index_name": self.index_name,
            "document_count": len(self.documents),
            "vector_count": len(self.vectors),
            "connected": self.connected,
            "vector_dimension": len(next(iter(self.vectors.values())))
            if self.vectors
            else 0,
        }


print("✅ SimpleInMemoryVectorStore class defined!")
class SimpleInMemoryVectorStore(BaseVectorStore): """一个简单的内存向量存储实现,用于演示目的。此向量存储将文档及其嵌入存储在内存中,并使用余弦相似度提供基本的相似性搜索功能。警告:这仅用于演示 - 不适用于生产环境。对于生产环境,请考虑使用优化的向量数据库,如 LanceDB、Azure AI Search 或其他专门的向量存储。 """ # 用于文档和向量的内部存储 documents: dict[str, VectorStoreDocument] vectors: dict[str, np.ndarray] connected: bool def __init__(self, **kwargs: Any): """初始化内存向量存储。""" super().__init__(**kwargs) self.documents: dict[str, VectorStoreDocument] = {} self.vectors: dict[str, np.ndarray] = {} self.connected = False print(f"🚀 SimpleInMemoryVectorStore 已为索引初始化: {self.index_name}") def connect(self, **kwargs: Any) -> None: """连接到向量存储(对于内存存储,此操作为空)。""" self.connected = True print(f"✅ 已连接到内存向量存储: {self.index_name}") def load_documents( self, documents: list[VectorStoreDocument], overwrite: bool = True ) -> None: """将文档加载到向量存储中。""" if not self.connected: msg = "向量存储未连接。请先调用 connect()。" raise RuntimeError(msg) if overwrite: self.documents.clear() self.vectors.clear() loaded_count = 0 for doc in documents: if doc.vector is not None: doc_id = str(doc.id) self.documents[doc_id] = doc self.vectors[doc_id] = np.array(doc.vector, dtype=np.float32) loaded_count += 1 print(f"📚 已加载 {loaded_count} 个文档到向量存储中") def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float: """计算两个向量之间的余弦相似度。""" # 归一化向量 norm1 = np.linalg.norm(vec1) norm2 = np.linalg.norm(vec2) if norm1 == 0 or norm2 == 0: return 0.0 return float(np.dot(vec1, vec2) / (norm1 * norm2)) def similarity_search_by_vector( self, query_embedding: list[float], k: int = 10, **kwargs: Any ) -> list[VectorStoreSearchResult]: """使用查询向量执行相似性搜索。""" if not self.connected: msg = "向量存储未连接。请先调用 connect()。" raise RuntimeError(msg) if not self.vectors: return [] query_vec = np.array(query_embedding, dtype=np.float32) similarities = [] # 计算与所有存储向量的相似度 for doc_id, stored_vec in self.vectors.items(): similarity = self._cosine_similarity(query_vec, stored_vec) similarities.append((doc_id, similarity)) # 按相似度(降序)排序并取前 k 个 similarities.sort(key=lambda x: 
x[1], reverse=True) top_k = similarities[:k] # 创建搜索结果 results = [] for doc_id, score in top_k: document = self.documents[doc_id] result = VectorStoreSearchResult(document=document, score=score) results.append(result) return results def similarity_search_by_text( self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any ) -> list[VectorStoreSearchResult]: """使用文本执行相似性搜索(文本首先被嵌入)。""" # 首先嵌入文本 query_embedding = text_embedder(text) # 使用嵌入向量进行向量搜索 return self.similarity_search_by_vector(query_embedding, k, **kwargs) def filter_by_id(self, include_ids: list[str] | list[int]) -> Any: """构建查询过滤器以按 ID 过滤文档。对于这个简单的实现,我们返回 ID 列表作为过滤器。 """ return [str(id_) for id_ in include_ids] def search_by_id(self, id: str) -> VectorStoreDocument: """按 ID 搜索文档。""" doc_id = str(id) if doc_id not in self.documents: msg = f"未找到 ID 为 '{id}' 的文档" raise KeyError(msg) return self.documents[doc_id] def get_stats(self) -> dict[str, Any]: """获取向量存储的统计信息(自定义方法)。""" return { "index_name": self.index_name, "document_count": len(self.documents), "vector_count": len(self.vectors), "connected": self.connected, "vector_dimension": len(next(iter(self.vectors.values()))) if self.vectors else 0, } print("✅ SimpleInMemoryVectorStore 类已定义!")
✅ SimpleInMemoryVectorStore class defined!
步骤 4:注册自定义向量存储¶
现在让我们向 `VectorStoreFactory` 注册我们的自定义向量存储,以便它可以在整个 GraphRAG 中使用。
In [5]
已复制!
# Identifier under which the custom store is exposed to GraphRAG
CUSTOM_VECTOR_STORE_TYPE = "simple_memory"

# Make the class available through the factory
VectorStoreFactory.register(CUSTOM_VECTOR_STORE_TYPE, SimpleInMemoryVectorStore)
print(f"✅ Registered custom vector store with type: '{CUSTOM_VECTOR_STORE_TYPE}'")

# Confirm that the factory now reports the new type
available_types = VectorStoreFactory.get_vector_store_types()
print(f"\n📋 Available vector store types: {available_types}")

is_supported = VectorStoreFactory.is_supported_type(CUSTOM_VECTOR_STORE_TYPE)
print(f"🔍 Is our custom type supported? {is_supported}")
# 使用唯一标识符注册我们的自定义向量存储 CUSTOM_VECTOR_STORE_TYPE = "simple_memory" # 注册向量存储类 VectorStoreFactory.register(CUSTOM_VECTOR_STORE_TYPE, SimpleInMemoryVectorStore) print(f"✅ 已使用类型 '{CUSTOM_VECTOR_STORE_TYPE}' 注册自定义向量存储") # 验证注册 available_types = VectorStoreFactory.get_vector_store_types() print(f"\n📋 可用的向量存储类型: {available_types}") print( f"🔍 我们的自定义类型是否受支持? {VectorStoreFactory.is_supported_type(CUSTOM_VECTOR_STORE_TYPE)}" )
✅ Registered custom vector store with type: 'simple_memory' 📋 Available vector store types: ['lancedb', 'azure_ai_search', 'cosmosdb', 'simple_memory'] 🔍 Is our custom type supported? True
步骤 5:测试自定义向量存储¶
让我们创建一些示例数据并测试我们的自定义向量存储实现。
In [6]
已复制!
# Create sample documents with mock embeddings
def create_mock_embedding(dimension: int = 384) -> list[float]:
"""Create a random embedding vector for testing."""
return np.random.normal(0, 1, dimension).tolist()
# Sample documents: one (id, text, category, source) row per document, each
# paired with its own freshly generated mock embedding.
sample_documents = [
    VectorStoreDocument(
        id=doc_id,
        text=doc_text,
        vector=create_mock_embedding(),
        attributes={"category": category, "source": source},
    )
    for doc_id, doc_text, category, source in (
        (
            "doc_1",
            "GraphRAG is a powerful knowledge graph extraction and reasoning framework.",
            "technology",
            "documentation",
        ),
        (
            "doc_2",
            "Vector stores enable efficient similarity search over high-dimensional data.",
            "technology",
            "research",
        ),
        (
            "doc_3",
            "Machine learning models can process and understand natural language text.",
            "AI",
            "article",
        ),
        (
            "doc_4",
            "Custom implementations allow for specialized behavior and integration.",
            "development",
            "tutorial",
        ),
    )
]

print(f"📝 Created {len(sample_documents)} sample documents")
# 创建带有模拟嵌入的示例文档 def create_mock_embedding(dimension: int = 384) -> list[float]: """创建用于测试的随机嵌入向量。""" return np.random.normal(0, 1, dimension).tolist() # 示例文档 sample_documents = [ VectorStoreDocument( id="doc_1", text="GraphRAG 是一个强大的知识图谱提取和推理框架。", vector=create_mock_embedding(), attributes={"category": "technology", "source": "documentation"}, ), VectorStoreDocument( id="doc_2", text="向量存储实现了对高维数据的高效相似性搜索。", vector=create_mock_embedding(), attributes={"category": "technology", "source": "research"}, ), VectorStoreDocument( id="doc_3", text="机器学习模型可以处理和理解自然语言文本。", vector=create_mock_embedding(), attributes={"category": "AI", "source": "article"}, ), VectorStoreDocument( id="doc_4", text="自定义实现允许专门的行为和集成。", vector=create_mock_embedding(), attributes={"category": "development", "source": "tutorial"}, ), ] print(f"📝 已创建 {len(sample_documents)} 个示例文档")
📝 Created 4 sample documents
In [7]
已复制!
# Build a store instance through the factory rather than calling the class directly
schema = VectorStoreSchemaConfig(index_name="test_collection")
vector_store = VectorStoreFactory.create_vector_store(
    CUSTOM_VECTOR_STORE_TYPE,
    vector_store_schema_config=schema,
)

store_class_name = type(vector_store).__name__
print(f"✅ Created vector store instance: {store_class_name}")

initial_stats = vector_store.get_stats()
print(f"📊 Initial stats: {initial_stats}")
# 测试使用工厂模式创建向量存储 schema = VectorStoreSchemaConfig(index_name="test_collection") # 使用工厂模式创建向量存储实例 vector_store = VectorStoreFactory.create_vector_store( CUSTOM_VECTOR_STORE_TYPE, vector_store_schema_config=schema ) print(f"✅ 已创建向量存储实例: {type(vector_store).__name__}") print(f"📊 初始统计信息: {vector_store.get_stats()}")
🚀 SimpleInMemoryVectorStore initialized for index: test_collection ✅ Created vector store instance: SimpleInMemoryVectorStore 📊 Initial stats: {'index_name': 'test_collection', 'document_count': 0, 'vector_count': 0, 'connected': False, 'vector_dimension': 0}
In [8]
已复制!
# Open the store, ingest the sample corpus, then report the new state
vector_store.connect()
vector_store.load_documents(documents=sample_documents)

updated_stats = vector_store.get_stats()
print(f"📊 Updated stats: {updated_stats}")
# 连接并加载文档 vector_store.connect() vector_store.load_documents(sample_documents) print(f"📊 更新后的统计信息: {vector_store.get_stats()}")
✅ Connected to in-memory vector store: test_collection 📚 Loaded 4 documents into vector store 📊 Updated stats: {'index_name': 'test_collection', 'document_count': 4, 'vector_count': 4, 'connected': True, 'vector_dimension': 384}
In [9]
已复制!
# Run a similarity search with a random query vector (testing only)
query_vector = create_mock_embedding()
search_results = vector_store.similarity_search_by_vector(query_vector, k=3)

print(f"🔍 Found {len(search_results)} similar documents:\n")

for rank, hit in enumerate(search_results, start=1):
    hit_doc = hit.document
    # One print per hit; the trailing empty string yields the blank separator line.
    print(
        f"{rank}. ID: {hit_doc.id}",
        f" Text: {hit_doc.text[:60]}...",
        f" Similarity Score: {hit.score:.4f}",
        f" Category: {hit_doc.attributes.get('category', 'N/A')}",
        "",
        sep="\n",
    )
# 测试相似性搜索 query_vector = create_mock_embedding() # 用于测试的随机查询向量 search_results = vector_store.similarity_search_by_vector( query_vector, k=3, # 获取前 3 个相似文档 ) print(f"🔍 找到 {len(search_results)} 个相似文档:\n") for i, result in enumerate(search_results, 1): doc = result.document print(f"{i}. ID: {doc.id}") print(f" 文本: {doc.text[:60]}...") print(f" 相似度分数: {result.score:.4f}") print(f" 类别: {doc.attributes.get('category', 'N/A')}") print()
🔍 Found 3 similar documents: 1. ID: doc_1 Text: GraphRAG is a powerful knowledge graph extraction and reason... Similarity Score: 0.0373 Category: technology 2. ID: doc_4 Text: Custom implementations allow for specialized behavior and in... Similarity Score: -0.0061 Category: development 3. ID: doc_2 Text: Vector stores enable efficient similarity search over high-d... Similarity Score: -0.0230 Category: technology
In [10]
已复制!
# Look up a single document by its id
try:
    found_doc = vector_store.search_by_id("doc_2")
except KeyError as err:
    print(f"❌ Error: {err}")
else:
    print("✅ Found document by ID:")
    print(f" ID: {found_doc.id}")
    print(f" Text: {found_doc.text}")
    print(f" Attributes: {found_doc.attributes}")

# Build an id filter from a list of ids
wanted_ids = ["doc_1", "doc_3"]
id_filter = vector_store.filter_by_id(wanted_ids)
print(f"\n🔧 ID filter result: {id_filter}")
# 测试按 ID 搜索 try: found_doc = vector_store.search_by_id("doc_2") print("✅ 通过 ID 找到文档:") print(f" ID: {found_doc.id}") print(f" 文本: {found_doc.text}") print(f" 属性: {found_doc.attributes}") except KeyError as e: print(f"❌ 错误: {e}") # 测试按 ID 过滤 id_filter = vector_store.filter_by_id(["doc_1", "doc_3"]) print(f"\n🔧 ID 过滤结果: {id_filter}")
✅ Found document by ID: ID: doc_2 Text: Vector stores enable efficient similarity search over high-dimensional data. Attributes: {'category': 'technology', 'source': 'research'} 🔧 ID filter result: ['doc_1', 'doc_3']
步骤 6:GraphRAG 的配置¶
现在让我们看看如何在设置文件中配置 GraphRAG 以使用您的自定义向量存储。
In [11]
已复制!
# Example GraphRAG yaml settings, assembled section by section

# Vector store section: points GraphRAG at the custom store type
vector_store_section = {
    "default_vector_store": {
        "type": CUSTOM_VECTOR_STORE_TYPE,  # "simple_memory"
        "collection_name": "graphrag_entities",
        # Add any custom parameters your vector store needs
        "custom_parameter": "custom_value",
    }
}

# Models section (other GraphRAG configuration would sit alongside it)
models_section = {
    "default_embedding_model": {
        "type": "openai_embedding",
        "model": "text-embedding-3-small",
    }
}

example_settings = {
    "vector_store": vector_store_section,
    "models": models_section,
}

# Render as block-style YAML suitable for settings.yml
yaml_config = yaml.dump(example_settings, default_flow_style=False, indent=2)

print("📄 Example settings.yml configuration:")
print("=" * 40)
print(yaml_config)
# 示例 GraphRAG yaml 设置 example_settings = { "vector_store": { "default_vector_store": { "type": CUSTOM_VECTOR_STORE_TYPE, # "simple_memory" "collection_name": "graphrag_entities", # 添加您的向量存储所需的任何自定义参数 "custom_parameter": "custom_value", } }, # 其他 GraphRAG 配置... "models": { "default_embedding_model": { "type": "openai_embedding", "model": "text-embedding-3-small", } }, } # 转换为 YAML 格式以用于 settings.yml yaml_config = yaml.dump(example_settings, default_flow_style=False, indent=2) print("📄 settings.yml 配置示例:") print("=" * 40) print(yaml_config)
📄 Example settings.yml configuration: ======================================== models: default_embedding_model: model: text-embedding-3-small type: openai_embedding vector_store: default_vector_store: collection_name: graphrag_entities custom_parameter: custom_value type: simple_memory
步骤 7:与 GraphRAG 管道集成¶
以下是您的自定义向量存储在典型 GraphRAG 管道中的使用方式。
In [12]
已复制!
# Example of how GraphRAG would use your custom vector store
def simulate_graphrag_pipeline():
    """Simulate how GraphRAG would use the custom vector store.

    Walks through four stages: create and connect a store via the factory,
    load ten mock entity documents, run a top-5 similarity search with a
    random query embedding, and collect the matching documents.

    Returns:
        The documents of the retrieved search results.
    """
    print("🚀 Simulating GraphRAG pipeline with custom vector store...\n")

    # 1. GraphRAG creates vector store using factory
    entity_schema = VectorStoreSchemaConfig(index_name="graphrag_entities")
    entity_store = VectorStoreFactory.create_vector_store(
        CUSTOM_VECTOR_STORE_TYPE,
        vector_store_schema_config=entity_schema,
        similarity_threshold=0.3,
    )
    entity_store.connect()
    print("✅ Step 1: Vector store created and connected")

    # 2. During indexing, GraphRAG loads extracted entities
    entity_documents = [
        VectorStoreDocument(
            id=f"entity_{n}",
            text=f"Entity {n} description: Important concept in the knowledge graph",
            vector=create_mock_embedding(),
            attributes={"type": "entity", "importance": n % 3 + 1},
        )
        for n in range(10)
    ]
    entity_store.load_documents(entity_documents)
    print(f"✅ Step 2: Loaded {len(entity_documents)} entity documents")

    # 3. During query time, GraphRAG searches for relevant entities
    relevant_entities = entity_store.similarity_search_by_vector(
        create_mock_embedding(), k=5
    )
    print(f"✅ Step 3: Found {len(relevant_entities)} relevant entities for query")

    # 4. GraphRAG uses these entities for context building
    context_entities = [hit.document for hit in relevant_entities]
    print("✅ Step 4: Context built using retrieved entities")

    print(f"📊 Final stats: {entity_store.get_stats()}")
    return context_entities


# Run the simulation
context = simulate_graphrag_pipeline()
print(f"\n🎯 Retrieved {len(context)} entities for context building")
# GraphRAG 如何使用您的自定义向量存储示例 def simulate_graphrag_pipeline(): """模拟 GraphRAG 如何使用自定义向量存储。""" print("🚀 正在使用自定义向量存储模拟 GraphRAG 管道...\n") # 1. GraphRAG 使用工厂模式创建向量存储 schema = VectorStoreSchemaConfig(index_name="graphrag_entities") store = VectorStoreFactory.create_vector_store( CUSTOM_VECTOR_STORE_TYPE, vector_store_schema_config=schema, similarity_threshold=0.3, ) store.connect() print("✅ 步骤 1: 向量存储已创建并连接") # 2. 在索引期间,GraphRAG 加载提取的实体 entity_documents = [ VectorStoreDocument( id=f"entity_{i}", text=f"实体 {i} 描述: 知识图谱中的重要概念", vector=create_mock_embedding(), attributes={"type": "entity", "importance": i % 3 + 1}, ) for i in range(10) ] store.load_documents(entity_documents) print(f"✅ 步骤 2: 已加载 {len(entity_documents)} 个实体文档") # 3. 在查询时,GraphRAG 搜索相关实体 query_embedding = create_mock_embedding() relevant_entities = store.similarity_search_by_vector(query_embedding, k=5) print(f"✅ 步骤 3: 找到 {len(relevant_entities)} 个与查询相关的实体") # 4. GraphRAG 使用这些实体构建上下文 context_entities = [result.document for result in relevant_entities] print("✅ 步骤 4: 已使用检索到的实体构建上下文") print(f"📊 最终统计信息: {store.get_stats()}") return context_entities # 运行模拟 context = simulate_graphrag_pipeline() print(f"\n🎯 已为上下文构建检索到 {len(context)} 个实体")
🚀 Simulating GraphRAG pipeline with custom vector store... 🚀 SimpleInMemoryVectorStore initialized for index: graphrag_entities ✅ Connected to in-memory vector store: graphrag_entities ✅ Step 1: Vector store created and connected 📚 Loaded 10 documents into vector store ✅ Step 2: Loaded 10 entity documents ✅ Step 3: Found 5 relevant entities for query ✅ Step 4: Context built using retrieved entities 📊 Final stats: {'index_name': 'graphrag_entities', 'document_count': 10, 'vector_count': 10, 'connected': True, 'vector_dimension': 384} 🎯 Retrieved 5 entities for context building
步骤 8:测试和验证¶
让我们创建一个全面的测试套件,以确保我们的向量存储正常工作。
In [13]
已复制!
def test_custom_vector_store():
    """Comprehensive test suite for the custom vector store.

    Exercises loading, similarity search, lookup by id, id filtering and the
    not-connected error paths of the store registered under
    ``CUSTOM_VECTOR_STORE_TYPE``.

    Raises:
        AssertionError: If any check fails.
    """
    print("🧪 Running comprehensive vector store tests...\n")

    # Test 1: Basic functionality
    print("Test 1: Basic functionality")
    store = VectorStoreFactory.create_vector_store(
        CUSTOM_VECTOR_STORE_TYPE,
        vector_store_schema_config=VectorStoreSchemaConfig(index_name="test"),
    )
    store.connect()

    # Load test documents
    test_docs = sample_documents[:2]
    store.load_documents(test_docs)

    assert len(store.documents) == 2, "Should have 2 documents"
    assert len(store.vectors) == 2, "Should have 2 vectors"
    print("✅ Basic functionality test passed")

    # Test 2: Search functionality
    print("\nTest 2: Search functionality")
    query_vec = create_mock_embedding()
    results = store.similarity_search_by_vector(query_vec, k=5)

    assert len(results) <= 2, "Should not return more results than documents"
    assert all(isinstance(r, VectorStoreSearchResult) for r in results), (
        "Should return VectorStoreSearchResult objects"
    )
    assert all(-1 <= r.score <= 1 for r in results), (
        "Similarity scores should be between -1 and 1"
    )
    print("✅ Search functionality test passed")

    # Test 3: Search by ID
    print("\nTest 3: Search by ID")
    found_doc = store.search_by_id("doc_1")
    assert found_doc.id == "doc_1", "Should find correct document"

    # The failure is raised explicitly: a bare `assert False` is stripped
    # under `python -O`, which would let a missing exception go unnoticed.
    try:
        store.search_by_id("nonexistent")
    except KeyError:
        pass  # Expected
    else:
        raise AssertionError("Should raise KeyError for nonexistent ID")

    print("✅ Search by ID test passed")

    # Test 4: Filter functionality
    print("\nTest 4: Filter functionality")
    filter_result = store.filter_by_id(["doc_1", "doc_2"])
    assert filter_result == ["doc_1", "doc_2"], "Should return filtered IDs"
    print("✅ Filter functionality test passed")

    # Test 5: Error handling
    print("\nTest 5: Error handling")
    disconnected_store = VectorStoreFactory.create_vector_store(
        CUSTOM_VECTOR_STORE_TYPE,
        vector_store_schema_config=VectorStoreSchemaConfig(index_name="test2"),
    )

    try:
        disconnected_store.load_documents(test_docs)
    except RuntimeError:
        pass  # Expected
    else:
        raise AssertionError("Should raise error when not connected")

    try:
        disconnected_store.similarity_search_by_vector(query_vec)
    except RuntimeError:
        pass  # Expected
    else:
        raise AssertionError("Should raise error when not connected")

    print("✅ Error handling test passed")

    print("\n🎉 All tests passed! Your custom vector store is working correctly.")


# Run the tests
test_custom_vector_store()
def test_custom_vector_store(): """自定义向量存储的综合测试套件。""" print("🧪 正在运行综合向量存储测试...\n") # 测试 1: 基本功能 print("测试 1: 基本功能") store = VectorStoreFactory.create_vector_store( CUSTOM_VECTOR_STORE_TYPE, vector_store_schema_config=VectorStoreSchemaConfig(index_name="test"), ) store.connect() # 加载测试文档 test_docs = sample_documents[:2] store.load_documents(test_docs) assert len(store.documents) == 2, "应该有 2 个文档" assert len(store.vectors) == 2, "应该有 2 个向量" print("✅ 基本功能测试通过") # 测试 2: 搜索功能 print("\n测试 2: 搜索功能") query_vec = create_mock_embedding() results = store.similarity_search_by_vector(query_vec, k=5) assert len(results) <= 2, "不应返回超过文档数量的结果" assert all(isinstance(r, VectorStoreSearchResult) for r in results), ( "应返回 VectorStoreSearchResult 对象" ) assert all(-1 <= r.score <= 1 for r in results), ( "相似度分数应在 -1 到 1 之间" ) print("✅ 搜索功能测试通过") # 测试 3: 按 ID 搜索 print("\n测试 3: 按 ID 搜索") found_doc = store.search_by_id("doc_1") assert found_doc.id == "doc_1", "应找到正确的文档" try: store.search_by_id("nonexistent") assert False, "对于不存在的 ID 应引发 KeyError" except KeyError: pass # 预期 print("✅ 按 ID 搜索测试通过") # 测试 4: 过滤功能 print("\n测试 4: 过滤功能") filter_result = store.filter_by_id(["doc_1", "doc_2"]) assert filter_result == ["doc_1", "doc_2"], "应返回过滤后的 ID" print("✅ 过滤功能测试通过") # 测试 5: 错误处理 print("\n测试 5: 错误处理") disconnected_store = VectorStoreFactory.create_vector_store( CUSTOM_VECTOR_STORE_TYPE, vector_store_schema_config=VectorStoreSchemaConfig(index_name="test2"), ) try: disconnected_store.load_documents(test_docs) assert False, "未连接时应引发错误" except RuntimeError: pass # 预期 try: disconnected_store.similarity_search_by_vector(query_vec) assert False, "未连接时应引发错误" except RuntimeError: pass # 预期 print("✅ 错误处理测试通过") print("\n🎉 所有测试通过!您的自定义向量存储正常工作。") # 运行测试 test_custom_vector_store()
🧪 Running comprehensive vector store tests... Test 1: Basic functionality 🚀 SimpleInMemoryVectorStore initialized for index: test ✅ Connected to in-memory vector store: test 📚 Loaded 2 documents into vector store ✅ Basic functionality test passed Test 2: Search functionality ✅ Search functionality test passed Test 3: Search by ID ✅ Search by ID test passed Test 4: Filter functionality ✅ Filter functionality test passed Test 5: Error handling 🚀 SimpleInMemoryVectorStore initialized for index: test2 ✅ Error handling test passed 🎉 All tests passed! Your custom vector store is working correctly.
总结和后续步骤¶
恭喜!您已成功学习如何在 GraphRAG 中实现和注册自定义向量存储。以下是您完成的任务:
您构建了什么¶
- ✅ 自定义向量存储类:实现了包含所有必需方法的 `SimpleInMemoryVectorStore`
- ✅ 工厂集成:使用 `VectorStoreFactory` 注册了您的向量存储
- ✅ 全面测试:使用完整的测试套件验证了功能
- ✅ 配置示例:学习了如何配置 GraphRAG 以使用您的向量存储
主要收获¶
- 接口合规性:始终实现 `BaseVectorStore` 中的所有方法
- 工厂模式:使用 `VectorStoreFactory.register()` 使您的向量存储可用
- 配置:向量存储在 GraphRAG 设置文件中配置
- 测试:在部署之前彻底测试所有功能
后续步骤¶
查看 API 概述笔记本,了解如何通过 graphrag API 索引和查询数据。
资源¶
祝您构建愉快!🚀