eino

Eino

Eino字节跳动,基于 Golang 的大模型应用综合开发框架

入门案例

用 Eino 及 Ollama 快速搭建一个 LLM 应用。

func main() {
template := prompt.FromMessages(schema.FString,
schema.SystemMessage("你是一个{role}。"),
schema.UserMessage("问题: {question}"),
)

ctx := context.Background()
messages, err := template.Format(ctx, map[string]any{
"role": "乐于助人的助手",
"question": "如何学习 golang?",
})

if err != nil {
panic(err)
}

chatModel, err := ollama.NewChatModel(ctx, &ollama.ChatModelConfig{
BaseURL: "http://localhost:11434", // Ollama 服务地址
Model: "qwen2.5:14b", // 模型名称
})
if err != nil {
panic(err)
}

result, err := chatModel.Generate(ctx, messages)
if err != nil {
panic(err)
}
fmt.Printf("result.Content: %v\n", result.Content)
}

流式传输:

func main() {
template := prompt.FromMessages(schema.FString,
schema.SystemMessage("你是一个{role}。"),
schema.UserMessage("问题: {question}"),
)

ctx := context.Background()
messages, err := template.Format(ctx, map[string]any{
"role": "乐于助人的助手",
"question": "如何学习 golang?",
})

if err != nil {
panic(err)
}

chatModel, err := ollama.NewChatModel(ctx, &ollama.ChatModelConfig{
BaseURL: "http://localhost:11434", // Ollama 服务地址
Model: "qwen2.5:14b", // 模型名称
})
if err != nil {
panic(err)
}

streamResult, err := chatModel.Stream(ctx, messages)
if err != nil {
panic(err)
}

reportStream(streamResult)

}

func reportStream(sr *schema.StreamReader[*schema.Message]) {
defer sr.Close()

i := 0
for {
message, err := sr.Recv()
if err == io.EOF { // 流式输出结束
return
}
if err != nil {
log.Fatalf("recv failed: %v", err)
}
log.Printf("message[%d]: %+v\n", i, message)
i++
}
}

核心概念

Components 组件

Document Loader

Document Loader 是一个用于加载文档的组件。它的主要作用是从不同来源(如网络 URL、本地文件等)加载文档内容,并将其转换为标准的文档格式。这个组件在处理需要从各种来源获取文档内容的场景中发挥重要作用,比如:

  • 从网络 URL 加载网页内容
  • 读取本地 PDF、Word 等格式的文档

示例:

func Run() {
ctx := context.Background()
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{
UseNameAsID: true,
})
if err != nil {
panic(err)
}

filePath := "Your File Path"
docs, err := loader.Load(ctx, document.Source{
URI: filePath,
})
if err != nil {
panic(err)
}

fmt.Println(docs[0].Content)
}

file.FileLoaderConfig 指定了 Loader 的参数配置,以及用哪个 Parser 来解析文档,FileLoader 核心方法 Load() 用于解析一个文档:

type FileLoaderConfig struct {
    UseNameAsID bool
    Parser      parser.Parser
}
// Parser is a document parser, can be used to parse a document from a reader.
type Parser interface {
    Parse(ctx context.Context, reader io.Reader, opts ...Option) ([]*schema.Document, error)
}
func (f *FileLoader) Load(ctx context.Context, src document.Source, opts ...document.LoaderOption) (docs []*schema.Document, err error)

自己实现一个按行解析的 parser.Parser

type LineParser struct{}

func (p *LineParser) Parse(ctx context.Context, reader io.Reader, opts ...parser.Option) ([]*schema.Document, error) {
var docs []*schema.Document

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

line := scanner.Text()

doc := &schema.Document{
Content: line,
}
docs = append(docs, doc)
}

if err := scanner.Err(); err != nil {
return nil, err
}
return docs, nil
}

func Run() {
ctx := context.Background()
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{
UseNameAsID: true,
Parser: &LineParser{},
})
if err != nil {
panic(err)
}

filePath := "Your file path"
docs, err := loader.Load(ctx, document.Source{
URI: filePath,
})
if err != nil {
panic(err)
}

fmt.Println(docs[0].Content)
}

Document 结构体是文档的标准格式:

  • ID:文档的唯一标识符,用于在系统中唯一标识一个文档
  • Content:文档的实际内容
  • MetaData:文档的元数据,可以存储如下信息:
    • 文档的来源信息
    • 文档的向量表示(用于向量检索)
    • 文档的分数(用于排序)
    • 文档的子索引(用于分层检索)
    • 其他自定义元数据

Embedding

Embedding 组件是一个用于将文本转换为向量表示的组件。它的主要作用是将文本内容映射到向量空间,使得语义相似的文本在向量空间中的距离较近。这个组件在以下场景中发挥重要作用:

  • 文本相似度计算
  • 语义搜索
  • 文本聚类分析
import (
"context"
"fmt"

"github.com/cloudwego/eino-ext/components/embedding/ollama"
)

func Run() {
ctx := context.Background()
embedder, _ := ollama.NewEmbedder(ctx, &ollama.EmbeddingConfig{
BaseURL: "http://118.193.43.101:11434",
Model: "bge-m3:567m",
Timeout: 0,
})
vectorIds, _ := embedder.EmbedStrings(ctx, []string{"hello", "how are you"})

fmt.Println(vectorIds)
}

Embedder 维护了 LLM 的一个 HTTP Client:

func NewEmbedder(ctx context.Context, config *EmbeddingConfig) (*Embedder, error) {
// ...

var httpClient *http.Client
if config.HTTPClient != nil {
httpClient = config.HTTPClient
} else {
httpClient = &http.Client{Timeout: config.Timeout}
}

baseURL, err := url.Parse(config.BaseURL)
if err != nil {
return nil, fmt.Errorf("invalid base URL: %w", err)
}
cli := api.NewClient(baseURL, httpClient) // ollama 官方提供的 apiCient
return &Embedder{
cli: cli,
conf: config,
}, nil
}

embedder.EmbedStrings 通过 ollama apiClient 调用生成对应的 embedding 向量。

Document Transformer

Document Transformer 是一个用于文档转换和处理的组件。它的主要作用是对输入的文档进行各种转换操作,如分割、过滤、合并等,从而得到满足特定需求的文档。这个组件可用于以下场景中:

  • 将长文档分割成小段落以便于处理
  • 根据特定规则过滤文档内容
  • 对文档内容进行结构化转换
  • 提取文档中的特定部分
import (
"context"
"fmt"

"github.com/cloudwego/eino-ext/components/document/transformer/splitter/markdown"
"github.com/cloudwego/eino/schema"
)

func Run() {
ctx := context.Background()
transformer, _ := markdown.NewHeaderSplitter(ctx, &markdown.HeaderConfig{
Headers: map[string]string{
"##": "",
},
})

markdownDoc := &schema.Document{
Content: "## Title 1 \nHello World\n## Title 2\nWorld Hello",
}

transformedDocs, _ := transformer.Transform(ctx, []*schema.Document{markdownDoc})

for idx, doc := range transformedDocs {
fmt.Printf("doc segment %v: %v", idx, doc.Content)
}
}

Transformer 是一个接口,对输入的文档进行转换。其中,src 是待处理的文档列表,返回值是处理完成的文档列表。

type Transformer interface {
Transform(ctx context.Context, src []*schema.Document, opts ...TransformerOption) ([]*schema.Document, error)
}

具体看一下 HeaderSplitter 是怎么实现的 Transformer 接口,来对文档进行处理的。

func (h *headerSplitter) Transform(ctx context.Context, docs []*schema.Document, opts ...document.TransformerOption) ([]*schema.Document, error) {
    var ret []*schema.Document
    for _, doc := range docs {
        result := h.splitText(ctx, doc.Content)
        for i := range result {
            nDoc := &schema.Document{
                ID:       h.idGenerator(ctx, doc.ID, i),
                Content:  result[i].chunk,
                MetaData: deepCopyAnyMap(doc.MetaData),
            }
            if nDoc.MetaData == nil {
                nDoc.MetaData = make(map[string]any, len(result[i].meta))
            }
            for k, v := range result[i].meta {
                nDoc.MetaData[k] = v
            }
            ret = append(ret, nDoc)
        }
    }
    return ret, nil
}
func (h *headerSplitter) splitText(ctx context.Context, text string) []splitResult {
    var recordedMetaList []metaRecord
    recordedMetaMap := make(map[string]string)
    var currentLines []string
    var bInCodeBlock bool
    var openingFence string
    var ret []splitResult
    lines := strings.Split(text, "\n")
    for _, line := range lines {
        if len(line) == 0 {
            continue
        }
        line = strings.TrimSpace(line)
        // check is in code block
        // check if the line starts with headers
        bNewHeader := false
        for header, name := range h.headers {
            if strings.HasPrefix(line, header) && (len(line) == len(header) || line[len(header)] == ' ') {
                if len(currentLines) > 0 {
                    ret = append(ret, splitResult{
                        chunk: strings.Join(currentLines, "\n"),
                        meta:  deepCopyMap(recordedMetaMap),
                    })
                    currentLines = currentLines[:0]
                }

                if !h.trimHeaders {
                    currentLines = append(currentLines, line)
                }

                newLevel := len(header)
                for i := len(recordedMetaList) - 1; i >= 0; i-- {
                    if recordedMetaList[i].level >= newLevel {
                        delete(recordedMetaMap, recordedMetaList[i].name)
                        recordedMetaList = recordedMetaList[:i]
                    } else {
                        break
                    }
                }

                data := strings.TrimSpace(line[len(header):])
                recordedMetaList = append(recordedMetaList, metaRecord{
                    name:  name,
                    level: newLevel,
                    data:  data,
                })
                recordedMetaMap[name] = data

                bNewHeader = true
                break
            }
        }
        if !bNewHeader {
            currentLines = append(currentLines, line)
        }
    }
    ret = append(ret, splitResult{
        chunk: strings.Join(currentLines, "\n"),
        meta:  deepCopyMap(recordedMetaMap),
    })
    return ret
}

ChatModel

ChatModel 组件是一个用于与大模型交互的组件。主要作用是将用户的输入消息发送给语言模型,并获取模型的响应。例如自然语言对话、文本生成、工具调用、多模态交互。

示例,用的是 ark 火山引擎:

func Run() {
err := godotenv.Load()
if err != nil {
panic(err)
}
ctx := context.Background()
model, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
input := []*schema.Message{
schema.SystemMessage("你是一个乐于助人的助手"),
schema.UserMessage("你好"),
}

response, err := model.Generate(ctx, input)
if err != nil {
panic(err)
}
print(response.Content)
}

ChatTemplate

type ChatTemplate interface {
Format(ctx context.Context, vs map[string]any, opts ...Option) ([]*schema.Message, error)
}

​ - prompt.FromMessages():用于把多个 message 变成一个 chat template。 - schema.Message{}:实现了 Format 接口的结构体,因此可直接构建 schema.Message{} 作为 template - schema.SystemMessage():此方法是构建 role 为 “system” 的 message 快捷方法 - schema.AssistantMessage():此方法是构建 role 为 “assistant” 的 message 快捷方法 - schema.UserMessage():此方法是构建 role 为 “user” 的 message 快捷方法 - schema.ToolMessage():此方法是构建 role 为 “tool” 的 message 快捷方法 - schema.MessagesPlaceholder():可用于把一个 []*schema.Message 插入到 message 列表中,常用于插入历史对话

示例:

func Run() {
err := godotenv.Load()
if err != nil {
panic(err)
}
ctx := context.Background()
model, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
if err != nil {
panic(err)
}
template := prompt.FromMessages(schema.FString,
schema.SystemMessage("你是一个{role}"),
&schema.Message{
Role: schema.User,
Content: "请帮帮我,史瓦罗先生,{task}",
},
)
params := map[string]any{
"role": "机器人史瓦罗先生",
"task": "写一首诗",
}
messages, err := template.Format(ctx, params)
if err != nil {
panic(err)
}

response, err := model.Generate(ctx, messages)
if err != nil {
panic(err)
}
print(response.Content)
}

RAG 搭建

搭建 milvus

services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.18
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
healthcheck:
test: ["CMD", "etcdctl", "endpoint", "health"]
interval: 30s
timeout: 20s
retries: 3

minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3

standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.5.10
command: ["milvus", "run", "standalone"]
security_opt:
- seccomp:unconfined
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
interval: 30s
start_period: 90s
timeout: 20s
retries: 3
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- "etcd"
- "minio"
# Attu 服务
attu:
container_name: milvus-attu
image: zilliz/attu:v2.5
ports:
- "8000:3000" # 把本地的 8000 端口映射到容器的 3000 端口 (Attu 默认端口)
environment:
# MILVUS_URL 指向 Docker 网络里的 Milvus standalone 服务
MILVUS_URL: standalone:19530
depends_on:
- standalone # 确保 Milvus 启动后再启动 Attu
networks:
default:
name: milvus
var MilvusClient cli.Client
var collection = "myEino"
var indexer *milvus.Indexer

var fields = []*entity.Field{
	{
		Name:     "id",
		DataType: entity.FieldTypeVarChar,
		TypeParams: map[string]string{
			"max_length": "255",
		},
		PrimaryKey: true,
	},
	{
		Name:     "vector", // 确保字段名匹配
		DataType: entity.FieldTypeBinaryVector,
		TypeParams: map[string]string{
			"dim": "81920",
		},
	},
	{
		Name:     "content",
		DataType: entity.FieldTypeVarChar,
		TypeParams: map[string]string{
			"max_length": "8192",
		},
	},
	{
		Name:     "metadata",
		DataType: entity.FieldTypeJSON,
	},
}

func InitMilvusClient() {
	ctx := context.Background()
	client, err := cli.NewClient(ctx, cli.Config{
		Address: "localhost:19530",
	})
	if err != nil {
		log.Fatal("failed to create milvus client:", err)
	}
	MilvusClient = client
}

func NewMilvusIndexer() *milvus.Indexer {
	if indexer != nil {
		return indexer
	}
	ctx := context.Background()
	if MilvusClient == nil {
		log.Fatal("Milvus client is not initialized")
	}
	embedder := NewEmbedder()
	indexer_, err := milvus.NewIndexer(ctx, &milvus.IndexerConfig{
		Client:            MilvusClient,
		Collection:        collection,
		Fields:            fields,
		Embedding:         embedder,
	})
	if err != nil {
		log.Fatal("failed to create milvus indexer:", err)
	}
	indexer = indexer_
	return indexer
}

func TransDoc() []*schema.Document {
	ctx := context.Background()

	// 初始化分割器
	splitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderConfig{
		Headers: map[string]string{
			"#":   "h1",
			"##":  "h2",
			"###": "h3",
		},
		TrimHeaders: false,
	})
	if err != nil {
		panic(err)
	}

	// 准备要分割的文档
	content, err := os.OpenFile("./Raft.md", os.O_CREATE|os.O_RDWR, 0755)
	if err != nil {
		panic(err)
	}
	defer content.Close()
	bs, err := os.ReadFile("./Raft.md")
	if err != nil {
		panic(err)
	}
	docs := []*schema.Document{
		{
			ID:      uuid.New().String(),
			Content: string(bs),
		},
	}

	// 执行分割
	results, err := splitter.Transform(ctx, docs)
	if err != nil {
		panic(err)
	}

	for i, doc := range results {
		doc.ID = docs[0].ID + "_" + strconv.Itoa(i)
		println(doc.ID)
	}

	return results
}
func NewRetriever() *milvus.Retriever {
	ctx := context.Background()
	embedder := NewEmbedder()
	retriever, err := milvus.NewRetriever(ctx, &milvus.RetrieverConfig{
		Client:      MilvusClient,
		Collection:  collection,
		Partition:   nil,
		VectorField: "vector",
		OutputFields: []string{
			"id",
			"content",
			"metadata",
		},
		TopK:      1,
		Embedding: embedder,
	})
	if err != nil {
		panic(err)
	}

	return retriever
}
func Run() {
	err := godotenv.Load(".env")
	if err != nil {
		panic(err)
	}
	InitMilvusClient()
	indexer := NewMilvusIndexer()
	docs := TransDoc()
	ids, err := indexer.Store(context.Background(), docs)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids)

	retriever := NewRetriever()
	query := "什么是Raft一致性算法?"
	results, err := retriever.Retrieve(context.Background(), query)
	if err != nil {
		panic(err)
	}
	fmt.Println(results)
}

ReAct

ReAct 编排是一种让大型语言模型(LLM)更智能地执行任务的策略,它的核心思想是让 LLM 像人一样思考(Reason)和行动(Act)。

ReAct 的名字来源于两个关键部分:

  • Reason(思考): LLM 会生成思维链(Chain-of-Thought),这是一种详细的推理过程,包括它对问题的理解、如何拆解问题、以及接下来打算做什么。这就像一个人在心里默念自己的思考过程。
  • Act(行动): LLM 会根据它的思考,调用外部工具或执行特定操作。这些“工具”可以是:
    • 搜索引擎:查找最新信息或特定事实。
    • 计算器:执行数学运算。
    • 代码解释器:运行代码来处理数据或验证逻辑。
    • API 接口:与外部系统交互,比如查询天气、发送邮件等。

ReAct 的工作流程可以概括为以下步骤:

  1. 接收任务:用户给 LLM 一个任务或问题。
  2. 思考(Reason):LLM 首先会思考这个问题,生成一个计划或下一步的行动方针。这个思考过程是可见的,比如它会说“我需要先查找XX信息”或者“这个问题可以分解为A和B”。
  3. 行动(Act):根据思考结果,LLM 会选择一个合适的工具并执行操作。例如,如果它需要查找信息,它会调用搜索引擎并生成搜索查询。
  4. 观察(Observe):LLM 会接收到工具执行后的结果(例如,搜索结果、计算结果等)。
  5. 循环:LLM 会根据观察到的结果,再次回到思考步骤。它会分析结果,决定下一步是继续使用工具、修改之前的计划,还是已经得到了最终答案并准备输出。
  6. 输出答案:当 LLM 认为任务已经完成时,它会生成最终的答案。