佛山品牌网站建设,关于做好网站建设的通知,合肥室内设计公司有哪些,毕业生登记表自我鉴定模板使用Apache Kafka路由消息
本示例向您展示了如何使用LangChain的标准聊天功能#xff0c;并通过Apache Kafka来回传递聊天消息。
目标是模拟一个架构#xff0c;其中聊天前端和LLM作为需要通过内部网络相互通信的独立服务运行。
这是一种替代通过REST API请求模型响应的典…使用Apache Kafka路由消息
本示例向您展示了如何使用LangChain的标准聊天功能并通过Apache Kafka来回传递聊天消息。
目标是模拟一个架构其中聊天前端和LLM作为需要通过内部网络相互通信的独立服务运行。
这是一种替代通过REST API请求模型响应的典型模式本文末尾有更多信息解释了为什么您可能想要这样做。
1. 安装主要依赖项
依赖项包括
Quix Streams库用于以Pandas-like的方式管理与Apache Kafka或Kafka-like工具如Redpanda的交互。LangChain库用于管理与Llama-2的交互并存储对话状态。
!pip install quixstreams2.1.2a langchain0.0.340 huggingface_hub0.19.4 langchain-experimental0.0.42 python-dotenv2. 构建并安装llama-cpp-python库启用CUDA以利用Google Colab GPU
llama-cpp-python库是一个Python包装器围绕llama-cpp库使您能够高效地仅使用CPU运行量化的LLM。
使用标准的pip安装命令llama-cpp-python默认不支持GPU。如果在Google Colab中仅依赖CPU生成可能会非常慢所以下面的命令添加了一个额外的选项来构建并安装带有GPU支持的llama-cpp-python确保您在Google Colab中选择了GPU支持的运行环境。
!CMAKE_ARGS-DLLAMA_CUBLASon FORCE_CMAKE1 pip install llama-cpp-python3. 下载并设置Kafka和Zookeeper实例
从Apache网站下载Kafka二进制文件并以守护进程方式启动服务器。我们将使用Apache Kafka提供的默认配置来启动实例。
!curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
!tar -xzf kafka_2.13-3.6.1.tgz!./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties
!./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties
!echo Waiting for 10 secs until kafka and zookeeper services are up and running
!sleep 104. 检查Kafka守护进程是否正在运行
显示正在运行的进程并过滤Java进程您应该看到两个——每个服务器一个。
!ps aux | grep -E [j]ava5. 导入所需的依赖项并初始化所需的变量
导入与Kafka交互的Quix Streams库以及运行ConversationChain所需的LangChain组件。
# 导入实用程序库
import json
import random
import re
import time
import uuid
from os import environ
from pathlib import Path
from random import choice, randint, randomfrom dotenv import load_dotenv# 从Hugging Face hub直接下载模型的Hugging Face实用程序
from huggingface_hub import hf_hub_download
from langchain.chains import ConversationChain# 导入Langchain模块以管理提示和对话链
from langchain.llms import LlamaCpp
from langchain.memory import ConversationTokenBufferMemory
from langchain.prompts import PromptTemplate, load_prompt
from langchain_core.messages import SystemMessage
from langchain_experimental.chat_models import Llama2Chat
from quixstreams import Application, State, message_key# 导入Quix依赖项
from quixstreams.kafka import Producer# 初始化全局变量。
AGENT_ROLE AI
chat_id # 将当前角色设置为角色常量并为补充客户元数据初始化变量
role AGENT_ROLE6. 下载llama-2-7b-chat.Q4_K_M.gguf模型
从Hugging Face下载量化的Llama-2 7B模型我们将使用它作为本地LLM而不是依赖于外部服务的REST API调用。
model_name llama-2-7b-chat.Q4_K_M.gguf
model_path f./state/{model_name}if not Path(model_path).exists():print(The model path does not exist in state. Downloading model...)hf_hub_download(TheBloke/Llama-2-7b-Chat-GGUF, model_name, local_dirstate)
else:print(Loading model from state...)7. 加载模型并初始化对话记忆
加载Llama 2并使用ConversationTokenBufferMemory将对话缓冲区设置为300个token。这个值用于在仅CPU容器中运行Llama所以如果在Google Colab中运行您可以提高它。它防止了托管模型的容器内存不足。
在这里我们覆盖了默认的系统角色以便聊天机器人具有《银河系漫游指南》中Marvin The Paranoid Android的个性。
# 使用适当的参数加载模型llm LlamaCpp(model_pathmodel_path,max_tokens250,top_p0.95,top_k150,temperature0.7,repeat_penalty1.2,n_ctx2048,streamingFalse,n_gpu_layers-1,
)model Llama2Chat(llmllm,system_messageSystemMessage(content您是一个非常无聊的机器人具有《银河系漫游指南》中Marvin the Paranoid Android的个性。),
)# 定义在每次交流中给模型的对话历史量300个token或者略多于300个单词# 该函数自动修剪超出token范围的对话历史中最旧的消息。memory ConversationTokenBufferMemory(llmllm,max_token_limit300,ai_prefixAGENT,human_prefixHUMAN,return_messagesTrue,
)# 定义自定义提示prompt_template PromptTemplate(input_variables[history, input],template 以下文本是您和需要您智慧的谦逊人类之间聊天的历史。请回复人类最近的消息。当前对话{history}HUMAN: {input}\:nANDROID:,
)chain ConversationChain(llmmodel, promptprompt_template, memorymemory)print(--------------------------------------------)
print(fPrompt{chain.prompt})
print(--------------------------------------------)8. 使用聊天机器人初始化聊天对话
我们配置聊天机器人通过向chat Kafka主题发送固定问候来初始化对话。当我们发送第一条消息时chat主题会自动创建。
def chat_init():chat_id str(uuid.uuid4()) # 为有效的消息键控给对话一个IDprint()print(fGenerated CHAT_ID {chat_id})print()chat_init()9. 初始化回复功能
这个函数定义了聊天机器人应该如何回复传入的消息。与之前的单元格不同我们不是发送一个固定的消息而是使用Llama-2生成一个回复并将该回复发回chat Kafka主题。
def reply(row: dict, state: State):print(-------------------------------)print(Received:)print(row)print(-------------------------------)print(fThinking about the reply to: {row[text]}...)
10. 检查Kafka主题以获取新的人类消息并让模型生成回复
如果您第一次运行这个单元格请运行它并等待在控制台输出中看到Marvin的问候“Hello my name is Marvin…”。在收到LLM的回复后手动停止这个单元格并继续执行下一个单元格在那里您将被提示输入您的回复。
一旦您输入了您的消息请回到这个单元格。您的回复也发送到了同一个chat主题。Kafka消费者检查新消息并过滤掉来自聊天机器人本身的消息只留下最新的人类消息。
一旦检测到新的人类消息就会触发回复功能。
在输出中收到LLM的回复后手动停止这个单元格。
# 定义您的应用程序和设置app Application(broker_address127.0.0.1:9092,consumer_groupaichat,auto_offset_resetearliest,consumer_extra_config{allow.auto.create.topics: true},
)# 定义一个带有JSON反序列化的输入主题input_topic app.topic(chat, value_deserializerjson)# 定义一个带有JSON序列化的输出主题output_topic app.topic(chat, value_serializerjson)# 基于输入主题的消息流初始化一个流数据帧sdf app.dataframe(topicinput_topic)# 过滤SDF只包括角色与机器人当前角色不匹配的传入行sdf sdf.update(lambda val: print(fReceived update: {val}\n\nSTOP THIS CELL MANUALLY TO HAVE THE LLM REPLY OR ENTER YOUR OWN FOLLOWUP RESPONSE)
)# 以便它不会回复自己的消息sdf sdf[sdf[role] ! role]# 为过滤后的SDF中检测到的任何新消息行触发回复功能sdf sdf.apply(reply, statefulTrue)# 再次检查SDF并过滤掉任何空行sdf sdf[sdf.apply(lambda row: row is not None)]# 更新时间戳列到当前时间的纳秒sdf[Timestamp] sdf[Timestamp].apply(lambda row: time.time_ns())# 将处理过的SDF发布到由output_topic对象指定的Kafka主题。sdf sdf.to_topic(output_topic)app.run(sdf)11. 输入人类消息
运行这个单元格以输入您想要发送给模型的消息。它使用另一个Kafka生产者将您的文本发送到chat Kafka主题供模型获取需要再次运行上一个单元格。
chat_input input(Please enter your reply: )
myreply chat_inputmsgvalue {uuid: chat_id, # 现在留空role: human,text: myreply,conversation_id: chat_id,Timestamp: time.time_ns(),
}with Producer(broker_address127.0.0.1:9092,extra_config{allow.auto.create.topics: true},
) as producer:value msgvalueproducer.produce(topicchat,headers[(uuid, str(uuid.uuid4()))], # 这里也允许使用字典keychat_id, # 现在留空valuejson.dumps(value), # 需要是一个字符串)print(Replied to chatbot with message:)
print(--------------------------------------------)
print(value)
print(--------------------------------------------)
print(\n\nRUN THE PREVIOUS CELL TO HAVE THE CHATBOT GENERATE A REPLY)为什么要通过Kafka路由聊天消息
使用LangChain内置的对话管理功能直接与LLM交互更容易。此外您还可以使用REST API从外部托管的模型生成响应。那么为什么要费心使用Apache Kafka呢
有几个原因例如
集成许多企业希望运行自己的LLM以便他们可以将数据保留在内部。这需要将LLM支持的组件集成到可能已经使用某种消息总线进行解耦的现有架构中。可扩展性Apache Kafka旨在进行并行处理因此许多团队更喜欢使用它来更有效地将工作分配给可用的工作者在这种情况下“工作者”是运行LLM的容器。持久性Kafka旨在允许服务在另一项服务遇到内存问题或下线时继续进行。这可以防止在多个系统相互通信的复杂分布式架构中发生数据丢失LLM只是许多相互依赖的系统之一还包括向量数据库和传统数据库。
有关事件流为何适用于Gen AI应用程序架构的更多背景信息请参阅Kai Waehner的文章“Apache Kafka Vector Database LLM Real-Time GenAI”。 本文介绍了一种使用Apache Kafka进行消息路由以模拟独立聊天前端和LLM服务之间通信的架构。通过安装依赖库、设置Kafka环境、初始化模型和对话记忆构建了一个聊天机器人。这种方法利用了Kafka的集成性、可扩展性和持久性优势适合需要将LLM集成到现有解耦架构中的企业使用。此外它还提供了一种比直接REST API调用更健壮和可靠的解决方案。