向量嵌入
robotmem 使用嵌入向量来实现语义相似度搜索。支持两种后端:ONNX(本地,默认)和 Ollama(HTTP API)。
Embedder 协议
所有嵌入后端实现相同的协议:
class Embedder(Protocol):
@property
def available(self) -> bool: ...
@property
def unavailable_reason(self) -> str: ...
@property
def model(self) -> str: ...
@property
def dim(self) -> int: ...
async def embed_one(self, text: str) -> list[float]: ...
async def embed_batch(self, texts: list[str], batch_size: int = 32) -> list[list[float] | None]: ...
async def check_availability(self) -> bool: ...
async def close(self) -> None: ...
这种基于协议的设计允许通过工厂函数灵活切换后端:
embedder = create_embedder(config)
# config.embed_backend == "onnx" → FastEmbedEmbedder
# config.embed_backend == "ollama" → OllamaEmbedder
ONNX 后端(默认)
ONNX 后端使用 Qdrant 的 fastembed 进行本地 CPU 推理,无需任何外部服务依赖。
关键属性
| 属性 | 值 |
|---|---|
| 模型 | BAAI/bge-small-en-v1.5 |
| 维度 | 384 |
| 模型大小 | ~67MB(自动下载) |
| 延迟 | ~5ms/查询 |
| 依赖 | fastembed(108KB wheel,无 PyTorch) |
| 执行方式 | 纯 CPU,ONNX Runtime |
| 缓存 | ~/.cache/fastembed/ |
初始化
编码器在首次使用时延迟初始化(而非启动时加载):
class FastEmbedEmbedder:
def __init__(self, model, dim, cache_dir):
self._encoder = None # 延迟初始化
self._init_lock = threading.Lock()
def _ensure_encoder(self):
"""线程安全的延迟初始化 — 仅加载模型一次"""
if self._encoder is not None:
return
with self._init_lock:
if self._encoder is not None:
return
from fastembed import TextEmbedding
self._encoder = TextEmbedding(model_name=self._model_name)
关键设计决策:
- 延迟加载:模型仅在首次需要嵌入时才下载/加载
- 线程安全:threading.Lock() 防止重复加载模型
- 双重检查锁定:初始化后避免锁竞争
异步执行
ONNX 推理是同步的,因此使用 run_in_executor 包装以避免阻塞事件循环:
async def embed_one(self, text: str) -> list[float]:
loop = asyncio.get_running_loop()
embeddings = await loop.run_in_executor(
None, lambda: list(self._encoder.embed([text]))
)
return embeddings[0].tolist()
可用性检查
async def check_availability(self) -> bool:
# 1. 加载模型(延迟初始化)
self._ensure_encoder()
# 2. 测试嵌入 "ping"
test_result = await loop.run_in_executor(
None, lambda: list(self._encoder.embed(["ping"]))
)
# 3. 验证维度是否与配置匹配
if len(test_result[0]) == self._dim:
return True
失败模式:
- fastembed 未安装 → 明确的错误信息
- 维度不匹配 → 建议更新 onnx_dim 配置
- 模型下载失败 → 报告异常
配置
{
"embed_backend": "onnx",
"onnx_model": "BAAI/bge-small-en-v1.5",
"onnx_dim": 384,
"fastembed_cache_dir": ""
}
设置 fastembed_cache_dir 可覆盖默认的 ~/.cache/fastembed/ 位置。
Ollama 后端
Ollama 后端通过 HTTP API 连接到本地或远程 Ollama 服务器,支持原生 Ollama API 和 OpenAI 兼容端点。
关键属性
| 属性 | 值 |
|---|---|
| 默认模型 | nomic-embed-text |
| 维度 | 768 |
| 模型大小 | ~274MB |
| 延迟 | ~20-50ms/查询 |
| 依赖 | 运行中的 Ollama 服务器 |
| API 模式 | ollama(原生)或 openai_compat |
HTTP 客户端
使用 httpx.AsyncClient 并配备连接池:
self._client = httpx.AsyncClient(
base_url=self._ollama_url,
timeout=httpx.Timeout(connect=3.0, read=10.0, write=10.0, pool=10.0),
transport=httpx.AsyncHTTPTransport(
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
),
)
客户端通过 asyncio 锁延迟创建,防止竞态条件。
重试策略
每次嵌入调用都有指数退避重试:
第 1 次尝试 → 失败 → 等待 1.0s
第 2 次尝试 → 失败 → 等待 2.0s
第 3 次尝试 → 失败 → 抛出异常
参数: - 最大重试次数:3 - 退避基数:1.0s - 总超时:30s(即使有待执行的重试也会终止) - 并发批次限制:4(信号量控制)
async def _embed_one_inner(self, text):
for attempt in range(self._MAX_RETRIES):
try:
resp = await client.post(endpoint, json=payload)
resp.raise_for_status()
return self._parse_embeddings(resp.json())[0]
except (ConnectError, TimeoutException) as e:
wait = 1.0 * (2 ** attempt) # 1s, 2s, 4s
await asyncio.sleep(wait)
批量嵌入
对于多段文本,使用受控并发进行批量处理:
async def embed_batch(self, texts, batch_size=32):
batches = [texts[i:i+batch_size] for i in range(0, len(texts), batch_size)]
sem = asyncio.Semaphore(4) # 最多 4 个并发批次
results = await asyncio.gather(
*[_limited(b, i) for i, b in enumerate(batches)],
return_exceptions=True,
)
# 失败的批次 → 用 None 填充(部分成功)
失败的批次位置会用 None 填充,允许部分成功 — 由调用方决定如何处理缺失的嵌入。
API 兼容性
支持两种 API 模式:
| 模式 | 端点 | 请求格式 | 响应格式 |
|---|---|---|---|
ollama |
/api/embed |
{"model": "...", "input": "..."} |
{"embeddings": [[...]]} |
openai_compat |
/v1/embeddings |
{"model": "...", "input": "..."} |
{"data": [{"embedding": [...], "index": 0}]} |
OpenAI 兼容模式按 index 字段排序响应,以处理乱序返回的情况。
可用性检查(Ollama 模式)
三阶段验证:
阶段 1:身份检查
GET /api/version
→ 验证响应中包含 "version"(不是同端口上的其他服务)
阶段 2:模型检查
GET /api/tags
→ 验证目标模型已下载
阶段 3:嵌入测试
POST /api/embed {"model": "...", "input": "ping"}
→ 验证返回非空向量
每个阶段都有特定的错误信息:
- 端口上不是 Ollama → "检查端口冲突"
- 模型未找到 → "运行:ollama pull <model>"
- 嵌入超时 → "可能内存不足"
配置
{
"embed_backend": "ollama",
"embedding_model": "nomic-embed-text",
"embedding_dim": 768,
"ollama_url": "http://localhost:11434",
"embed_api": "ollama"
}
对于 OpenAI 兼容服务器:
{
"embed_backend": "ollama",
"embed_api": "openai_compat",
"embedding_model": "your-model",
"embedding_dim": 768,
"ollama_url": "http://your-server:8080"
}
服务冷却
当嵌入服务失败时,ServiceCooldown 机制防止重复的失败连接:
失败 1 → 冷却 60s
失败 2 → 冷却 120s
失败 3 → 冷却 240s
失败 4+ → 冷却 300s(最大值)
成功 → 重置计数器
class ServiceCooldown:
base_cooldown = 60.0 # 初始冷却时间
max_cooldown = 300.0 # 最大冷却时间(5 分钟)
backoff_factor = 2.0 # 指数倍增因子
@property
def current_backoff(self):
return min(
base * (factor ** (failures - 1)),
max_cooldown,
)
冷却期间:
- embedder.available 返回 False
- recall 降级为仅 BM25 模式
- 不发送任何嵌入请求
- 冷却过期 → 下次 check_availability() 调用时重试
向量存储
嵌入向量存储在两个位置:
| 位置 | 格式 | 用途 |
|---|---|---|
memories.embedding |
BLOB(float32 数组) | 备份/重建索引 |
memories_vec(vec0) |
vec0 虚拟表 | KNN 搜索 |
Python 浮点数与 SQLite blob 之间的转换:
import struct
def floats_to_blob(floats: list[float]) -> bytes:
return struct.pack(f'{len(floats)}f', *floats)
def blob_to_floats(blob: bytes) -> list[float]:
count = len(blob) // 4 # 每个 float32 占 4 字节
return list(struct.unpack(f'{count}f', blob))
后端对比
| 维度 | ONNX(默认) | Ollama |
|---|---|---|
| 配置 | 零配置 | 需要 ollama serve + 模型拉取 |
| 速度 | ~5ms/查询 | ~20-50ms/查询 |
| 模型大小 | 67MB(自动下载) | 274MB(nomic-embed-text) |
| CPU/GPU | 纯 CPU | CPU(GPU 可选) |
| 离线 | 首次下载后完全离线 | 需要本地 Ollama 服务器 |
| 维度 | 384d | 768d |
| 质量 | MTEB 检索 51.68 | 某些任务上更高 |
| 多语言 | 有限 | 多语言模型表现更好 |
| 失败模式 | 进程崩溃 → 需重启 | HTTP 超时 → 优雅降级 |
| 重试 | 无重试(本地,速度快) | 3 次重试,指数退避 |
何时选择 ONNX
- 嵌入式/边缘部署,无外部服务
- 低延迟要求(<10ms)
- 以英文为主的工作负载
- 偏好最简配置
何时选择 Ollama
- 多语言需求(中日韩等)
- 需要更高的嵌入质量
- 有 GPU 可用以加速推理
- 已在运行 Ollama 执行其他任务
优雅降级
嵌入系统设计为永远不会阻塞核心功能:
| 故障 | 影响 | 行为 |
|---|---|---|
| ONNX 模型未安装 | 无向量搜索 | 仅 BM25 搜索正常工作 |
| Ollama 服务器宕机 | 无向量搜索 | 仅 BM25,ServiceCooldown 已激活 |
| 单条记忆嵌入失败 | 该记忆无向量 | 仍可通过 BM25 找到 |
| sqlite-vec 未安装 | 无 vec0 表 | 仅 BM25,嵌入仍存储在 memories.embedding 中 |
learn 工具即使嵌入失败也始终成功 — 记忆在没有嵌入的情况下存储,之后可以通过 get_memories_missing_embedding() 进行回填。