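[RAG in Practice] Milvus Vector Database Tutorial: How to Implement Metadata Filtering in Retrieval

TrustRAG project 🌟: https://github.com/gomate-community/TrustRAG

A configurable, modular RAG framework

Prerequisites

  • Install Docker

Installing Milvus

Milvus provides a Docker Compose configuration file in the Milvus repository. To install Milvus with Docker Compose, just run: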

# Download the configuration file
$ wget https://github.com/milvus-io/milvus/releases/download/v2.5.3/milvus-standalone-docker-compose.yml -O docker-compose.yml

# Start Milvus
$ sudo docker compose up -d

Creating milvus-etcd  ... done
Creating milvus-minio ... done
Creating milvus-standalone ... done

The downloaded docker-compose.yml has the following content:

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.16
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    healthcheck:
      test: ["CMD", "etcdctl", "endpoint", "health"]
      interval: 30s
      timeout: 20s
      retries: 3

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.5.3
    command: ["milvus", "run", "standalone"]
    security_opt:
    - seccomp:unconfined
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      interval: 30s
      start_period: 90s
      timeout: 20s
      retries: 3
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "etcd"
      - "minio"

networks:
  default:
    name: milvus

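After Milvus starts, containers named milvus-standalone, milvus-minio, and milvus-etcd are up.

  • The milvus-etcd container does not expose any ports to the host and maps its data to volumes/etcd in the current folder.
  • The milvus-minio container serves ports 9000 and 9001 locally with the default authentication credentials and maps its data to volumes/minio in the current folder.
  • The milvus-standalone container serves port 19530 locally with the default settings and maps its data to volumes/milvus in the current folder.

The MinIO console is available at http://localhost:9001/browser; both the username and the password are minioadmin.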
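To confirm that the ports described above are actually reachable from the host, a quick check along these lines may help. It is a minimal sketch using only the Python standard library; the helper name `is_port_open` is hypothetical, and the port list is simply read off the compose file above.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts
        return False


if __name__ == "__main__":
    # Ports published by the compose file: Milvus client port, Milvus health
    # endpoint, MinIO API and MinIO console
    published = [("milvus", 19530), ("milvus-health", 9091), ("minio", 9000), ("minio-console", 9001)]
    for name, port in published:
        state = "open" if is_port_open("127.0.0.1", port) else "closed"
        print(f"{name:14s} {port:5d} {state}")
```

An open port only shows that something is listening; the healthcheck URLs in the compose file are the more precise signal that each service is ready.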

Stopping and removing Milvus

The containers can be stopped and removed, together with their data volumes, as follows:

sudo docker compose down
sudo rm -rf volumes

Installing the Attu GUI management tool

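Running Attu from Docker

Steps to start the Attu container

```bash
docker run -p 8000:3000 -e MILVUS_URL={milvus server IP}:19530 zilliz/attu:v2.4
```

Make sure the Attu container can reach the Milvus IP address. Once the container is running, open a web browser and go to http://{ Attu IP }:8000 to view the Attu GUI.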

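Optional environment variables for running Attu in Docker

| Parameter | Example | Required | Description |
| --- | --- | --- | --- |
| MILVUS_URL | 192.168.0.1:19530 | No | Milvus server URL |
| DATABASE | your database | No | Default database name |
| ATTU_LOG_LEVEL | info | No | Log level for Attu |
| ROOT_CERT_PATH | /path/to/root/cert | No | Path to the root certificate |
| PRIVATE_KEY_PATH | /path/to/private/key | No | Path to the private key |
| CERT_CHAIN_PATH | /path/to/cert/chain | No | Path to the certificate chain |
| SERVER_NAME | your_server_name | No | Name of your server |
| SERVER_PORT | 8080 | No | Port the server listens on; defaults to 3000 if unset |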

Note that the MILVUS_URL address must be reachable from inside the Attu Docker container, so "127.0.0.1" or "localhost" will not work.
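The loopback restriction is easy to trip over, so it can be worth validating the address before launching the container. The following is a minimal sketch under the assumption that Attu runs on Docker's default bridge network; `is_loopback` and `build_attu_command` are hypothetical helper names, and the image tag is the one used in the command above.

```python
import ipaddress
from typing import List


def is_loopback(host: str) -> bool:
    """Return True for 'localhost' and for loopback IP literals such as 127.0.0.1."""
    if host.lower() == "localhost":
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal (for example a DNS name); treat it as non-loopback
        return False


def build_attu_command(milvus_host: str, milvus_port: int = 19530, host_port: int = 8000) -> List[str]:
    """Build the `docker run` argument list for Attu, rejecting loopback Milvus hosts."""
    if is_loopback(milvus_host):
        raise ValueError(f"{milvus_host!r} resolves to the Attu container itself, not to Milvus")
    return [
        "docker", "run",
        "-p", f"{host_port}:3000",
        "-e", f"MILVUS_URL={milvus_host}:{milvus_port}",
        "zilliz/attu:v2.4",
    ]


print(" ".join(build_attu_command("192.168.0.1")))
# docker run -p 8000:3000 -e MILVUS_URL=192.168.0.1:19530 zilliz/attu:v2.4
```

The resulting list can be passed to `subprocess.run` if the container should be started from Python.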

Running the Docker container with environment variables

Attu SSL example

docker run -p 8000:3000 \
-v /your-tls-file-path:/app/tls \
-e ATTU_LOG_LEVEL=info  \
-e ROOT_CERT_PATH=/app/tls/ca.pem \
-e PRIVATE_KEY_PATH=/app/tls/client.key \
-e CERT_CHAIN_PATH=/app/tls/client.pem \
-e SERVER_NAME=your_server_name \
zilliz/attu:dev

Custom server port example

This command runs the Docker container on the host network and specifies a custom port for the server to listen on.

docker run --network host \
-v /your-tls-file-path:/app/tls \
-e ATTU_LOG_LEVEL=info  \
-e SERVER_NAME=your_server_name \
-e SERVER_PORT=8080 \
zilliz/attu:dev

After installation, open http://localhost:8000/#/connect

Working with Milvus through pymilvus

Install the dependencies:

pip install --upgrade pymilvus openai requests tqdm

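Preparing the data

We use the FAQ pages from the Milvus 2.4.x documentation as the private knowledge in our RAG; they are a good data source for a simple RAG pipeline.

Download the zip file and extract the documents into the folder `milvus_docs`.

```bash
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
```

We load all markdown files from the folder milvus_docs/en/faq. For each document, we simply split the content on "# ", which roughly separates the main sections of each markdown file.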

from glob import glob

text_lines = []

for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    text_lines += file_text.split("# ")
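To make the effect of splitting on "# " concrete, here is a small sketch on a made-up markdown string. The helper `split_markdown` is hypothetical and not part of the tutorial; it additionally drops chunks that are empty after stripping, which the loop above does not do.

```python
from typing import List


def split_markdown(text: str, separator: str = "# ") -> List[str]:
    """Split markdown text on the separator and drop chunks that are empty after stripping."""
    chunks = (chunk.strip() for chunk in text.split(separator))
    return [chunk for chunk in chunks if chunk]


sample = "# Title\n\nIntro\n\n## Q1\n\nA1\n"

# Plain split: note the empty first chunk and the '#' left over from '## '
print(sample.split("# "))      # ['', 'Title\n\nIntro\n\n#', 'Q1\n\nA1\n']
print(split_markdown(sample))  # ['Title\n\nIntro\n\n#', 'Q1\n\nA1']
```

The leftover "#" characters explain why the retrieved chunks shown later end in "###": the separator consumes only the last "#" of a heading marker and the following space.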

Preparing the embedding model

We initialize the OpenAI client to prepare the embedding model.

from openai import OpenAI

openai_client = OpenAI()

Define a function that generates text embeddings with the OpenAI client. We use the text-embedding-3-small model as an example.

def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Generate a test embedding and print its dimension and first few elements.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

Output:

1536
[0.00988506618887186, -0.005540902726352215, 0.0068014683201909065, -0.03810417652130127, -0.018254263326525688, -0.041231658309698105, -0.007651153020560741, 0.03220026567578316, 0.01892443746328354, 0.00010708322952268645]

Creating the collection

from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

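As for the arguments of MilvusClient:

  • Setting uri to a local file (for example ./milvus.db) is the most convenient option, because it automatically uses Milvus Lite to store all data in that file.
  • If you have a large amount of data, you can set up a more performant Milvus server on Docker or Kubernetes. In that setup, use the server URI, for example http://localhost:19530, as your uri.
  • If you want to use Zilliz Cloud, the fully managed cloud service for Milvus, adjust uri and token, which correspond to the Public Endpoint and API key in Zilliz Cloud.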
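The three options above differ only in the keyword arguments handed to MilvusClient. A minimal sketch of that choice follows; the function `milvus_client_params` and its `mode` names are hypothetical, and the defaults are the example values from this article.

```python
from typing import Any, Dict, Optional


def milvus_client_params(mode: str, endpoint: Optional[str] = None, token: Optional[str] = None) -> Dict[str, Any]:
    """Return keyword arguments for MilvusClient for one of the three setups described above."""
    if mode == "lite":
        # Local file, handled by Milvus Lite
        return {"uri": endpoint or "./milvus_demo.db"}
    if mode == "server":
        # Standalone or cluster deployment on Docker or Kubernetes
        return {"uri": endpoint or "http://localhost:19530"}
    if mode == "cloud":
        # Zilliz Cloud: both the public endpoint and the API key are required
        if not endpoint or not token:
            raise ValueError("Zilliz Cloud needs both a public endpoint and an API key")
        return {"uri": endpoint, "token": token}
    raise ValueError(f"unknown mode: {mode!r}")


print(milvus_client_params("lite"))    # {'uri': './milvus_demo.db'}
print(milvus_client_params("server"))  # {'uri': 'http://localhost:19530'}
```

The result would then be unpacked into the client, for example `MilvusClient(**milvus_client_params("server"))`.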

Check whether the collection already exists, and drop it if it does.

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

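Create a new collection with the specified parameters.

If we do not specify any field information, Milvus automatically creates a default id field as the primary key and a vector field to store the vector data. A reserved JSON field stores any fields that are not defined in the schema, together with their values.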

milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # Inner product distance
    consistency_level="Strong",  # Strong consistency level
)

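Loading data into Milvus

Iterate over the text lines, create the embeddings, and insert the data into Milvus.

The text field added here is not defined in the collection schema. It is automatically added to the reserved JSON dynamic field, and at a high level it can be treated like a regular field.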

from tqdm import tqdm

data = []

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": emb_text(line), "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Output:

Creating embeddings: 100%|██████████| 72/72 [00:27<00:00,  2.67it/s]

{'insert_count': 72,
 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71],
 'cost': 0}
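The remark above about the text field can be pictured with a small model of a row: keys that match the schema are stored as regular fields, and everything else ends up in the dynamic field. This is only a conceptual sketch, not how Milvus stores data internally; `SCHEMA_FIELDS` and `split_row` are hypothetical names, and the `city` key is an illustrative extra metadata field.

```python
from typing import Any, Dict, Tuple

# Fields created by the quick-setup create_collection call shown earlier
SCHEMA_FIELDS = {"id", "vector"}


def split_row(row: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Separate schema-defined fields from keys that would go to the dynamic field."""
    fixed = {k: v for k, v in row.items() if k in SCHEMA_FIELDS}
    dynamic = {k: v for k, v in row.items() if k not in SCHEMA_FIELDS}
    return fixed, dynamic


row = {"id": 0, "vector": [0.1, 0.2], "text": "Where does Milvus store data?", "city": "Berlin"}
fixed, dynamic = split_row(row)
print(sorted(fixed))    # ['id', 'vector']
print(sorted(dynamic))  # ['city', 'text']
```

Because dynamic keys behave like ordinary fields at query time, they can be listed in output_fields and used in filter expressions, which is what the metadata filtering later in this article relies on.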

Retrieving data for a query

Let's specify a frequently asked question about Milvus.

question = "How is data stored in milvus?"

Search the collection for the question and retrieve the top 3 semantic matches.

search_res = milvus_client.search(
    collection_name=collection_name,
    data=[
        emb_text(question)
    ],  # Use the `emb_text` function to convert the question to an embedding vector
    limit=3,  # Return the top 3 results
    search_params={"metric_type": "IP", "params": {}},  # Inner product distance
    output_fields=["text"],  # Return the text field
)
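What the search with metric type IP computes can be illustrated with a brute-force version on toy vectors: score every stored vector by its inner product with the query and keep the k highest scores. This is only a sketch of the ranking rule with made-up two-dimensional vectors; Milvus typically answers such queries through an index rather than a full scan, and `top_k` is a hypothetical helper.

```python
from typing import List, Sequence, Tuple


def inner_product(a: Sequence[float], b: Sequence[float]) -> float:
    """Sum of element-wise products; with IP, a larger value means a closer match."""
    return sum(x * y for x, y in zip(a, b))


def top_k(query: Sequence[float], vectors: Sequence[Sequence[float]], k: int = 3) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k vectors with the highest inner product."""
    scored = [(idx, inner_product(query, vec)) for idx, vec in enumerate(vectors)]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


vectors = [[1.0, 0.0], [0.6, 0.8], [0.0, 1.0], [-1.0, 0.0]]
print(top_k([1.0, 0.0], vectors, k=3))  # [(0, 1.0), (1, 0.6), (2, 0.0)]
```

For unit-length vectors, the inner product equals the cosine similarity, which is why the distances in the output below can be read as similarity scores.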

Let's take a look at the search results for the query.

import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

Output:

[
    [
        " Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
        0.7883545756340027
    ],
    [
        "How does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###",
        0.6757288575172424
    ],
    [
        "How much does Milvus cost?\n\nMilvus is a 100% free open-source project.\n\nPlease adhere to [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0) when using Milvus for production or distribution purposes.\n\nZilliz, the company behind Milvus, also offers a fully managed cloud version of the platform for those that don't want to build and maintain their own distributed instance. [Zilliz Cloud](https://zilliz.com/cloud) automatically maintains data reliability and allows users to pay only for what they use.\n\n###",
        0.6421123147010803
    ]
]

Using an LLM to get the RAG response

Convert the retrieved documents into a single string.

context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
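Once the context string exists, the usual next step in a RAG pipeline is to combine it with the question into a prompt for the language model. The sketch below shows one possible shape for that step; the prompt wording, the `build_prompt` helper, and the sample retrieved lines are assumptions for illustration, and the actual call to a chat model is left out.

```python
def build_prompt(context: str, question: str) -> str:
    """Combine retrieved context and the user question into a single prompt string."""
    return (
        "Use the information enclosed in <context> tags to answer the question "
        "enclosed in <question> tags.\n"
        f"<context>\n{context}\n</context>\n"
        f"<question>\n{question}\n</question>"
    )


# Hypothetical stand-ins for the (text, distance) pairs retrieved above
retrieved = [
    ("Milvus stores inserted data in object storage.", 0.79),
    ("Metadata is stored in etcd.", 0.68),
]
context = "\n".join(text for text, _distance in retrieved)
prompt = build_prompt(context, "How is data stored in milvus?")
print(prompt)
```

Keeping the context and the question in clearly delimited blocks makes it easier for the model to tell retrieved material apart from the user's request.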

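Implementing semantic retrieval with metadata filtering

Taking the official tutorial as a starting point, I wrapped and adapted Milvus in TrustRAG:

Official tutorial: https://milvus.io/docs/build-rag-with-milvus.md
TrustRAG implementation of MilvusEngine: https://github.com/gomate-community/TrustRAG/blob/main/trustrag/modules/engine/milvus.py

MilvusEngine is implemented as follows: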

from typing import Any, Dict, List, Optional, Union

import numpy as np
from pymilvus import MilvusClient

from trustrag.modules.retrieval.embedding import EmbeddingGenerator
class MilvusEngine:
    def __init__(
        self,
        collection_name: str,
        embedding_generator: EmbeddingGenerator,
        milvus_client_params: Dict[str, Any] = {"uri": "./milvus_demo.db"},
        vector_size: int = 1536,
        metric_type: str = "IP",  # Inner product distance
        consistency_level: str = "Strong",  # Strong consistency level
    ):
        """
        Initialize the Milvus vector store.

        :param collection_name: Name of the Milvus collection.
        :param embedding_generator: An instance of EmbeddingGenerator to generate embeddings.
        :param milvus_client_params: Dictionary of parameters to pass to MilvusClient.
        :param vector_size: Size of the vectors.
        :param metric_type: Distance metric for vector comparison (default is inner product).
        :param consistency_level: Consistency level for the collection (default is strong).
        """
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.metric_type = metric_type
        self.consistency_level = consistency_level
        self.embedding_generator = embedding_generator

        # Initialize MilvusClient with provided parameters
        self.client = MilvusClient(**milvus_client_params)

        # Create collection if it doesn't exist
        if not self.client.has_collection(self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                dimension=self.vector_size,
                metric_type=self.metric_type,
                consistency_level=self.consistency_level,
            )

    def upload_vectors(
        self, vectors: Union[np.ndarray, List[List[float]]],
        payload: List[Dict[str, Any]],
        batch_size: int = 256
    ):
        """
        Upload vectors and payload to the Milvus collection.

        :param vectors: A numpy array or list of vectors to upload.
        :param payload: A list of dictionaries containing the payload for each vector.
        :param batch_size: Number of vectors to upload in a single batch.
        """
        if not isinstance(vectors, np.ndarray):
            vectors = np.array(vectors)
        if len(vectors) != len(payload):
            raise ValueError("Vectors and payload must have the same length.")

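        data = []
        for i, (vector, payload_item) in enumerate(zip(vectors, payload)):
            data.append({"id": i, "vector": vector.tolist(), **payload_item})

        # Insert in batches of batch_size so that no single request grows too large
        for start in range(0, len(data), batch_size):
            self.client.insert(
                collection_name=self.collection_name,
                data=data[start:start + batch_size],
            )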

    def search(
        self, text: str,
        query_filter: str = None,
        limit: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Search for the closest vectors in the collection based on the input text.

        :param text: The text query to search for.
        :param query_filter: Optional filter to apply to the search.
        :param limit: Number of closest results to return.
        :return: List of payloads from the closest vectors.
        """
        # Generate embedding using the provided embedding generator
        vector = self.embedding_generator.generate_embeddings([text])

        # Search for closest vectors in the collection
        search_result = self.client.search(
            collection_name=self.collection_name,
            data=[vector[0]],  # Use the first (and only) embedding
            limit=limit,
            search_params={"metric_type": self.metric_type, "params": {}},
            output_fields=["*"],  # Return all fields
            filter=query_filter,
        )

        # Extract payloads from search results
        payloads = [hit["entity"] for hit in search_result[0]]
        return payloads

    def build_filter(self, conditions: List[Dict[str, Any]]) -> Optional[str]:
        """
        Build a Milvus filter expression from a list of equality conditions.

        :param conditions: A list of conditions, where each condition is a dictionary with:
                          - key: The field name to filter on.
                          - value: The value to match (a string or a number).
        :return: A Milvus filter expression string, or None if there are no conditions.
        """
        filter_conditions = []
        for condition in conditions:
            key = condition.get("key")
            value = condition.get("value")
            if key and value is not None:
                # Quote string values only; numbers stay unquoted so they match numeric fields
                literal = f"'{value}'" if isinstance(value, str) else value
                filter_conditions.append(f"{key} == {literal}")

        return " and ".join(filter_conditions) if filter_conditions else None
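The build_filter method above only produces equality conditions. If other comparisons are needed, one possible extension is a standalone helper that honours an optional "operator" key in each condition. This is a sketch and not part of TrustRAG; the function names and the supported operator set are assumptions, and the generated strings follow the Milvus boolean expression syntax for comparisons, `like`, and `in`.

```python
import json
from typing import Any, Dict, List, Optional

# Operators this sketch is willing to emit
COMPARISON_OPERATORS = {"==", "!=", ">", ">=", "<", "<=", "like", "in"}


def format_value(value: Any) -> str:
    """Render a Python value as a literal: strings double-quoted and escaped, lists as [a, b]."""
    return json.dumps(value, ensure_ascii=False)


def build_filter_expr(conditions: List[Dict[str, Any]]) -> Optional[str]:
    """Join conditions with 'and'; each condition may carry an 'operator' (default '==')."""
    parts = []
    for condition in conditions:
        key, value = condition.get("key"), condition.get("value")
        operator = condition.get("operator", "==")
        if not key or value is None:
            continue
        if operator not in COMPARISON_OPERATORS:
            raise ValueError(f"unsupported operator: {operator!r}")
        # The key is interpolated as-is, so it should come from trusted code
        parts.append(f"{key} {operator} {format_value(value)}")
    return " and ".join(parts) if parts else None


print(build_filter_expr([{"key": "city", "value": "Berlin"}]))
# city == "Berlin"
print(build_filter_expr([
    {"key": "city", "value": "Ber%", "operator": "like"},
    {"key": "likes", "value": 50, "operator": ">"},
]))
# city like "Ber%" and likes > 50
```

Using json.dumps for values keeps numbers unquoted and escapes quotes inside strings, which avoids the most common ways of producing an invalid expression.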

Usage:

from trustrag.modules.retrieval.embedding import SentenceTransformerEmbedding
from trustrag.modules.engine.milvus import MilvusEngine
if __name__ == '__main__':
    # Initialize MilvusEngine
    local_embedding_generator = SentenceTransformerEmbedding(model_name_or_path=r"H:\pretrained_models\mteb\all-MiniLM-L6-v2", device="cuda")
    milvus_engine = MilvusEngine(
        collection_name="my_collection",
        embedding_generator=local_embedding_generator,
        milvus_client_params={"uri": "http://localhost:19530"},
        vector_size=384
    )

    documents = [
        {"name": "SaferCodes", "images": "https://safer.codes/img/brand/logo-icon.png",
         "alt": "SaferCodes Logo QR codes generator system forms for COVID-19",
         "description": "QR codes systems for COVID-19.\nSimple tools for bars, restaurants, offices, and other small proximity businesses.",
         "link": "https://safer.codes", "city": "Chicago"},
        {"name": "Human Practice",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/373036-94d1e190f12f2c919c3566ecaecbda68-thumb_jpg.jpg?buster=1396498835",
         "alt": "Human Practice -  health care information technology",
         "description": "Point-of-care word of mouth\nPreferral is a mobile platform that channels physicians\u2019 interest in networking with their peers to build referrals within a hospital system.\nHospitals are in a race to employ physicians, even though they lose billions each year ($40B in 2014) on employment. Why ...",
         "link": "http://humanpractice.com", "city": "Chicago"},
        {"name": "StyleSeek",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/3747-bb0338d641617b54f5234a1d3bfc6fd0-thumb_jpg.jpg?buster=1329158692",
         "alt": "StyleSeek -  e-commerce fashion mass customization online shopping",
         "description": "Personalized e-commerce for lifestyle products\nStyleSeek is a personalized e-commerce site for lifestyle products.\nIt works across the style spectrum by enabling users (both men and women) to create and refine their unique StyleDNA.\nStyleSeek also promotes new products via its email newsletter, 100% personalized ...",
         "link": "http://styleseek.com", "city": "Chicago"},
        {"name": "Scout",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/190790-dbe27fe8cda0614d644431f853b64e8f-thumb_jpg.jpg?buster=1389652078",
         "alt": "Scout -  security consumer electronics internet of things",
         "description": "Hassle-free Home Security\nScout is a self-installed, wireless home security system. We've created a more open, affordable and modern system than what is available on the market today. With month-to-month contracts and portable devices, Scout is a renter-friendly solution for the other ...",
         "link": "http://www.scoutalarm.com", "city": "Chicago"},
        {"name": "Invitation codes", "images": "https://invitation.codes/img/inv-brand-fb3.png",
         "alt": "Invitation App - Share referral codes community ",
         "description": "The referral community\nInvitation App is a social network where people post their referral codes and collect rewards on autopilot.",
         "link": "https://invitation.codes", "city": "Chicago"},
        {"name": "Hyde Park Angels",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/61114-35cd9d9689b70b4dc1d0b3c5f11c26e7-thumb_jpg.jpg?buster=1427395222",
         "alt": "Hyde Park Angels - ",
         "description": "Hyde Park Angels is the largest and most active angel group in the Midwest. With a membership of over 100 successful entrepreneurs, executives, and venture capitalists, the organization prides itself on providing critical strategic expertise to entrepreneurs and ...",
         "link": "http://hydeparkangels.com", "city": "Chicago"},
        {"name": "GiveForward",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/1374-e472ccec267bef9432a459784455c133-thumb_jpg.jpg?buster=1397666635",
         "alt": "GiveForward -  health care startups crowdfunding",
         "description": "Crowdfunding for medical and life events\nGiveForward lets anyone to create a free fundraising page for a friend or loved one's uncovered medical bills, memorial fund, adoptions or any other life events in five minutes or less. Millions of families have used GiveForward to raise more than $165M to let ...",
         "link": "http://giveforward.com", "city": "Chicago"},
        {"name": "MentorMob",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/19374-3b63fcf38efde624dd79c5cbd96161db-thumb_jpg.jpg?buster=1315734490",
         "alt": "MentorMob -  digital media education ventures for good crowdsourcing",
         "description": "Google of Learning, indexed by experts\nProblem: Google doesn't index for learning. Nearly 1 billion Google searches are done for \"how to\" learn various topics every month, from photography to entrepreneurship, forcing learners to waste their time sifting through the millions of results.\nMentorMob is ...",
         "link": "http://www.mentormob.com", "city": "Chicago"},
        {"name": "The Boeing Company",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/49394-df6be7a1eca80e8e73cc6699fee4f772-thumb_jpg.jpg?buster=1406172049",
         "alt": "The Boeing Company -  manufacturing transportation", "description": "",
         "link": "http://www.boeing.com", "city": "Berlin"},
        {"name": "NowBoarding \u2708\ufe0f",
         "images": "https://static.above.flights/img/lowcost/envelope_blue.png",
         "alt": "Lowcost Email cheap flights alerts",
         "description": "Invite-only mailing list.\n\nWe search the best weekend and long-haul flight deals\nso you can book before everyone else.",
         "link": "https://nowboarding.club/", "city": "Berlin"},
        {"name": "Rocketmiles",
         "images": "https://d1qb2nb5cznatu.cloudfront.net/startups/i/158571-e53ddffe9fb3ed5e57080db7134117d0-thumb_jpg.jpg?buster=1361371304",
         "alt": "Rocketmiles -  e-commerce online travel loyalty programs hotels",
         "description": "Fueling more vacations\nWe enable our customers to travel more, travel better and travel further. 20M+ consumers stock away miles & points to satisfy their wanderlust.\nFlying around or using credit cards are the only good ways to fill the stockpile today. We've built the third way. Customers ...",
         "link": "http://www.Rocketmiles.com", "city": "Berlin"}

    ]
    # Uncomment the following lines on the first run to embed and upload the documents
    # vectors = milvus_engine.embedding_generator.generate_embeddings([doc["description"] for doc in documents])
    # print(vectors.shape)
    # payload = [doc for doc in documents]

    # Upload vectors and payload
    # milvus_engine.upload_vectors(vectors=vectors, payload=payload)

    # Plain semantic search without a filter
    results = milvus_engine.search(
        text="vacations travel",
        limit=5
    )
    # Print the results
    for result in results:
        print(result)

    # Define the filter conditions
    conditions = [
        {"key": "city", "value": "Berlin"},  # city == 'Berlin'; build_filter only supports equality
    ]


    # Build the filter expression
    filter_expr = milvus_engine.build_filter(conditions)
    print("Filter Expression:", filter_expr)

    # Run the search again, this time with the metadata filter applied
    results = milvus_engine.search(
        text="vacations travel",
        query_filter=filter_expr,
        limit=5
    )

    # Print the results
    for result in results:
        print(result)


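Without a filter, the search ranks all uploaded documents; with the filter city == 'Berlin', only documents whose city is Berlin can be returned.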


Original article: https://blog.csdn.net/yanqianglifei/article/details/145258088
