公司网站建设工作重点,网站更换名称需要重新备案吗,伊春网络运营推广,网站 掌握使用LangChain进行流式处理
流式处理对于基于 LLM 的应用程序对最终用户的响应至关重要#xff0c;重要的 LangChain 原语#xff0c;如 LLMs、解析器、提示、检索器和代理实现了 LangChain Runnable 接口。
该接口提供了两种常见的流式内容的方法#xff1a;
sync strea…使用LangChain进行流式处理
流式处理对于基于 LLM 的应用程序对最终用户的响应至关重要重要的 LangChain 原语如 LLMs、解析器、提示、检索器和代理实现了 LangChain Runnable 接口。
该接口提供了两种常见的流式内容的方法
sync stream 和 async astream流式处理的默认实现从链中流式传输最终输出。async astream_events 和 async astream_log这些方法提供了一种从链中流式传输中间步骤和最终输出的方式。
Using Stream
所有Runnable对象都实现了一个名为stream的同步方法和一个名为astream的异步实体这些方法旨在以块的形式流式传输最终输出只要可用就会产生每个块。
只有在程序中的所有步骤都知道如何处理输入流时即一次处理一个输入块并生成相应的输出块时才能进行流式处理。
LLMs和Chat Models
大型语言模型及其聊天变体是基于 LLM 的应用程序的主要瓶颈。
大型语言模型生成对查询的完整响应可能需要几秒钟。这远远慢于应用程序对最终用户响应感觉灵敏的**~200-300 ms**的阈值。
让应用程序感觉响应更快的关键策略是显示中间进度即通过 token 流式传输模型令牌的输出。
from langchain_openai import ChatOpenAImodel ChatOpenAI()
chunks []
async for chunk in model.astream(你好!):chunks.append(chunk)print(chunk.content, end|, flushTrue)|你|好||有|什|么|可以|帮|助|你|的|吗|||chunks[0]AIMessageChunk(content)我们得到了一个叫做 AIMessageChunk 的东西。这个块代表了一个 AIMessage 的一部分。
消息块是可以添加的 – 可以简单地将它们相加以获得到目前为止响应的状态
Chains
事实上所有LLM申请都涉及更多步骤而不仅仅是调用语言模型
使用 LangChain 表达式语言LCEL构建一个简单的链它结合了提示、模型和解析器并验证流式传输是否有效。
我们使用 StrOutputParser 来解析模型的输出。这是一个简单的解析器它从 AIMessageChunk 中提取内容字段为我们提供模型返回的token。 LCEL 是一种通过将不同的 LangChain 原语链接在一起来指定“程序”的声明方式。使用 LCEL 创建的链受益于stream和astream 的自动实现允许最终输出的流式传输。事实上用 LCEL 创建的链实现了整个标准 Runnable 接口。 from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParserprompt ChatPromptTemplate.from_template(告诉我一个关于 {topic} 的笑话)
model ChatOpenAI()
parser StrOutputParser()
chain prompt | model | parserasync for chunk in chain.astream({topic: 鹦鹉}):print(chunk, end|, flushTrue)|为|什|么|鹦|鹉|会|成|为|最|佳|演|说|家||因|为|它|们|总|是|有|话|可|说||而|且|还|可以|模|仿|人|类|的|语|言||不必使用 LangChain 表达语言来使用 LangChain您可以依赖于标准的 命令式 编程方法通过在每个组件上分别调用 invoke、batch 或 stream将结果分配给变量然后根据需要在下游使用它们。 Working with Input Streams
如果想要在生成时从输出中流式传输 JSON 怎么办
如果您依赖于 json.loads 来解析部分 json那么解析将失败因为部分 json 不会是有效的 json。
有一种方法解析器操作输入流尝试将部分json“自动完成”为有效状态。
from langchain_core.output_parsers import JsonOutputParserchain (model | JsonOutputParser()
) # 由于 Langchain 旧版本中的一个错误JsonOutputParser 未能从某些模型中流式传输结果
async for text in chain.astream(以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典其中包含一个国家列表。每个国家应该有“name”和“population”关键字。
):print(text, flushTrue){}
{countries: []}
{countries: [{}]}
{countries: [{name: }]}
{countries: [{name: 法}]}
{countries: [{name: 法国}]}
{countries: [{name: 法国, population: 670}]}
{countries: [{name: 法国, population: 670128}]}
{countries: [{name: 法国, population: 67012883}]}
{countries: [{name: 法国, population: 67012883}, {}]}
{countries: [{name: 法国, population: 67012883}, {name: }]}
{countries: [{name: 法国, population: 67012883}, {name: 西}]}
{countries: [{name: 法国, population: 67012883}, {name: 西班}]}
{countries: [{name: 法国, population: 67012883}, {name: 西班牙}]}使用前面的示例并在末尾附加一个提取函数用于从最终的JSON中提取国家/地区名称。 链中的任何步骤如果操作的是最终输入而不是对输入流都可能通过stream或 破坏流功能astream。 稍后我们将讨论astream_events流式传输中间步骤结果的 API。 即使该链包含仅对最终输入进行操作的步骤该 API 也会流式传输中间步骤的结果。 from langchain_core.output_parsers import (JsonOutputParser,
)# 一个操作最终输入而不是输入流的函数
def _extract_country_names(inputs):A function that does not operates on input streams and breaks streaming.if not isinstance(inputs, dict):return if countries not in inputs:return countries inputs[countries]if not isinstance(countries, list):return country_names [country.get(name) for country in countries if isinstance(country, dict)]return country_nameschain model | JsonOutputParser() | _extract_country_namesasync for text in chain.astream(以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典其中包含一个国家列表。每个国家应该有“name”和“population”关键字。
):print(text, end|, flushTrue)Generator Functions生成器函数
使用可以操作输入流的生成器函数来修复流式处理 生成器函数使用yield的函数允许编写对能够操作输入流的代码 from langchain_core.output_parsers import JsonOutputParserasync def _extract_country_names_streaming(input_stream):A function that operates on input streams.country_names_so_far set()async for input in input_stream:if not isinstance(input, dict):continueif countries not in input:continuecountries input[countries]if not isinstance(countries, list):continuefor country in countries:name country.get(name)if not name:continueif name not in country_names_so_far:yield namecountry_names_so_far.add(name)chain model | JsonOutputParser() | _extract_country_names_streamingasync for text in chain.astream(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population
):print(text, end|, flushTrue)因为上面的代码依赖于 JSON 自动完成所以您可能会看到国家/地区的部分名称例如Sp和Spain这不是提取结果所想要的 我们关注的是流媒体概念而不一定是链条的结果。 Non-streaming components
非流式组件
一些内置组件如检索器不提供任何 streaming。如果我们尝试对它们进行stream会发生什么
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddingstemplate Answer the question based only on the following context:
{context}Question: {question}prompt ChatPromptTemplate.from_template(template)vectorstore FAISS.from_texts([harrison worked at kensho, harrison likes spicy food],embeddingOpenAIEmbeddings(),
)
retriever vectorstore.as_retriever()chunks [chunk for chunk in retriever.stream(where did harrison work?)]
chunks只有从该组件产生的最终结果被流式传输了。
并非所有组件都必须实现流式传输——在某些情况下流式传输要么是不必要的、困难的要么就是没有意义。 使用非流式组件构建的 LCEL 链在很多情况下仍然能够进行流式传输部分输出的流式传输在链中最后一个非流式步骤之后开始。 retrieval_chain ({context: retriever.with_config(run_nameDocs),question: RunnablePassthrough(),}| prompt| model| StrOutputParser()
)for chunk in retrieval_chain.stream(Where did harrison work? Write 3 made up sentences about this place.
):print(chunk, end|, flushTrue)Based| on| the| given| context|,| the| only| information| provided| about| where| Harrison| worked| is| that| he| worked| at| Ken|sh|o|.| Since| there| are| no| other| details| provided| about| Ken|sh|o|,| I| do| not| have| enough| information| to| write| 3| additional| made| up| sentences| about| this| place|.| I| can| only| state| that| Harrison| worked| at| Ken|sh|o|.||Using Stream Events
Event Streaming is a beta API.
为了让 astream_events API 正常工作
尽可能在代码中使用 async例如异步工具等如果定义自定义函数/可运行对象请传播回调每当使用非 LCEL 的可运行对象时请确保在 LLM 上调用 .astream() 而不是 .ainvoke以强制 LLM 流式传输令牌。
Event Reference
下表是参考表显示了各种 Runnable 对象可能发出的一些事件。 当流正确实现时直到输入流完全被消耗之后才知道可运行的输入。这意味着inputs通常仅包含end事件而不是start事件。 事件名称块输入输出on_chat_model_start[model name]{“messages”: [[SystemMessage, HumanMessage]]}on_chat_model_stream[model name]AIMessageChunk(content“hello”)on_chat_model_end[model name]{“messages”: [[SystemMessage, HumanMessage]]}{“generations”: […], “llm_output”: None, …}on_llm_start[model name]{‘input’: ‘hello’}on_llm_stream[model name]‘Hello’on_llm_end[model name]‘Hello human!’
ChatModel
首先查看聊天模型产生的事件。
events []
async for event in model.astream_events(hello, versionv1):events.append(event)由于这是一个测试版的API使用version“v1”参数将使我们能够最大程度地减少对代码的此类破坏性更改。 看一下一些开始事件和一些结束事件
events[:3][{event: on_chat_model_start,run_id: 555843ed-3d24-4774-af25-fbf030d5e8c4,name: ChatAnthropic,tags: [],metadata: {},data: {input: hello}},{event: on_chat_model_stream,run_id: 555843ed-3d24-4774-af25-fbf030d5e8c4,tags: [],metadata: {},name: ChatAnthropic,data: {chunk: AIMessageChunk(content Hello)}},{event: on_chat_model_stream,run_id: 555843ed-3d24-4774-af25-fbf030d5e8c4,tags: [],metadata: {},name: ChatAnthropic,data: {chunk: AIMessageChunk(content!)}}]events[-2:][{event: on_chat_model_stream,run_id: 555843ed-3d24-4774-af25-fbf030d5e8c4,tags: [],metadata: {},name: ChatAnthropic,data: {chunk: AIMessageChunk(content)}},{event: on_chat_model_end,name: ChatAnthropic,run_id: 555843ed-3d24-4774-af25-fbf030d5e8c4,tags: [],metadata: {},data: {output: AIMessageChunk(content Hello!)}}]Chain
解析流式JSON的标准链
chain (model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some modelsevents [eventasync for event in chain.astream_events(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population,versionv1,)
]如果您检查前几个事件您会发现有 3 个不同的开始事件而不是2 个开始事件。
三个启动事件对应
链模型解析器该模型解析器
events[:3][{event: on_chain_start,run_id: b1074bff-2a17-458b-9e7b-625211710df4,name: RunnableSequence,tags: [],metadata: {},data: {input: output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population}},{event: on_chat_model_start,name: ChatAnthropic,run_id: 6072be59-1f43-4f1c-9470-3b92e8406a99,tags: [seq:step:1],metadata: {},data: {input: {messages: [[HumanMessage(contentoutput a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population)]]}}},{event: on_parser_start,name: JsonOutputParser,run_id: bf978194-0eda-4494-ad15-3a5bfe69cd59,tags: [seq:step:2],metadata: {},data: {}}]使用此 API 从模型和解析器获取流事件的输出忽略开始事件、结束事件和链中的事件。
num_events 0async for event in chain.astream_events(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population,versionv1,
):kind event[event]if kind on_chat_model_stream:print(fChat model chunk: {repr(event[data][chunk].content)},flushTrue,)if kind on_parser_stream:print(fParser chunk: {event[data][chunk]}, flushTrue)num_events 1if num_events 30:# Truncate the outputprint(...)breakChat model chunk: Here
Chat model chunk: is
Chat model chunk: the
Chat model chunk: JSON
Chat model chunk: with
Chat model chunk: the
Chat model chunk: requested
Chat model chunk: countries
Chat model chunk: and
Chat model chunk: their
Chat model chunk: populations
Chat model chunk: :
Chat model chunk: \n\n
Chat model chunk: json
Parser chunk: {}
Chat model chunk: \n{
Chat model chunk: \n
Chat model chunk:
Chat model chunk: countries
Chat model chunk: :
Parser chunk: {countries: []}
Chat model chunk: [
Chat model chunk: \n
Parser chunk: {countries: [{}]}
Chat model chunk: {
...由于模型和解析器都支持流式传输因此我们可以实时看到来自两个组件的流式传输事件
Filtering Events
过滤事件
由于此 API 产生如此多的事件因此能够过滤事件非常有用。
可以按组件名称、组件标签或组件类型进行过滤。 By Name chain model.with_config({run_name: model}) | JsonOutputParser().with_config({run_name: my_parser}
)max_events 0
async for event in chain.astream_events(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population,versionv1,include_names[my_parser], # 只输出名为my_parser的事件
):print(event)max_events 1if max_events 10:# Truncate outputprint(...)breakBy Type chain model.with_config({run_name: model}) | JsonOutputParser().with_config({run_name: my_parser}
)max_events 0
async for event in chain.astream_events(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population,versionv1,include_types[chat_model], # 只输出类型为chat_model的事件
):print(event)max_events 1if max_events 10:# Truncate outputprint(...)breakBy Tags 标签由给定可运行对象的子组件继承。 如果您使用标签进行过滤请确保这是您想要的。 chain (model | JsonOutputParser()).with_config({tags: [my_chain]})max_events 0
async for event in chain.astream_events(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population,versionv1,include_tags[my_chain], # 只输出tags为my_chain的事件
):print(event)max_events 1if max_events 10:# Truncate outputprint(...)breakNon-streaming components
非流式组件
某些组件由于不对输入流进行操作而无法很好地进行流传输虽然这些组件在使用 astream 时可能会中断最终输出的流式传输但 astream_events 仍然会从支持流式传输的中间步骤中产生流式传输事件
通过 astream_events 我们仍然可以看到来自模型和解析器的流输出
num_events 0async for event in chain.astream_events(output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of countries which contains a list of countries. Each country should have the key name and population,versionv1,
):kind event[event]if kind on_chat_model_stream:print(fChat model chunk: {repr(event[data][chunk].content)},flushTrue,)if kind on_parser_stream:print(fParser chunk: {event[data][chunk]}, flushTrue)num_events 1if num_events 30:# Truncate the outputprint(...)breakChat model chunk: Here
Chat model chunk: is
Chat model chunk: the
Chat model chunk: JSON
Chat model chunk: with
Chat model chunk: the
Chat model chunk: requested
Chat model chunk: countries
Chat model chunk: and
Chat model chunk: their
Chat model chunk: populations
Chat model chunk: :
Chat model chunk: \n\n
Chat model chunk: json
Parser chunk: {}
Chat model chunk: \n{
Chat model chunk: \n
Chat model chunk:
Chat model chunk: countries
Chat model chunk: :
Parser chunk: {countries: []}
Chat model chunk: [
Chat model chunk: \n
Parser chunk: {countries: [{}]}
Chat model chunk: {
Chat model chunk: \n
Chat model chunk:
...Propagating Callbacks
传播回调 如果在工具中使用调用可运行对象则需要将回调传播到可运行对象否则不会生成任何流事件。 当使用 RunnableLambdas 或 chain 装饰器时回调会在幕后自动传播。