使用从提示流部署的流式终结点#

在提示流中,你可以将流部署为 REST 终结点以进行实时推理。

通过发送请求使用终结点时,默认行为是联机终结点将一直等待,直到整个响应准备就绪,然后将其发送回客户端。这可能会导致客户端长时间延迟和糟糕的用户体验。

为了避免这种情况,在使用终结点时可以使用流式处理。启用流式处理后,无需等待整个响应准备就绪。相反,服务器将以生成时的块形式将响应发送回来。然后,客户端可以逐步显示响应,减少等待时间并提高交互性。

本文将介绍流式处理的范围、流式处理的工作原理以及如何使用流式处理终结点。

创建启用流式处理的流#

如果要使用流式处理模式,需要创建一个流,其中包含一个生成字符串生成器作为流输出的节点。字符串生成器是一个对象,可以根据请求一次返回一个字符串。可以使用以下类型的节点来创建字符串生成器

  • LLM 节点:此节点使用大型语言模型根据输入生成自然语言响应。

    {# Sample prompt template for LLM node #}
    
    # system:
    You are a helpful assistant.
    
    # user:
    {{question}}
    
  • Python 工具节点:此节点允许你编写可以生成字符串输出的自定义 Python 代码。可以使用此节点调用支持流式处理的外部 API 或库。例如,可以使用此代码逐字回显输入

    from promptflow.core import tool
    
    # Sample code echo input by yield in Python tool node
    
    @tool
    def my_python_tool(paragraph: str) -> str:
        yield "Echo: "
        for word in paragraph.split():
            yield word + " "
    

在本指南中,我们将使用“与维基百科聊天”示例流作为示例。此流处理用户的问题,在维基百科中搜索相关文章,并根据文章中的信息回答问题。它使用流式处理模式显示答案生成进度。

chat_wikipedia.png

将流部署为联机终结点#

要使用流式处理模式,需要将流部署为联机终结点。这允许你实时发送请求并接收流中的响应。

按照此指南将流部署为联机终结点。

[!NOTE]

你可以按照此文档部署联机终结点。请部署运行时环境版本晚于版本20230816.v10。你可以在运行时详细信息页中检查运行时版本并更新运行时。

了解流式处理过程#

当你有联机终结点时,客户端和服务器需要遵循特定的内容协商原则才能利用流式处理模式

内容协商就像客户端和服务器之间就他们想要发送和接收的数据的首选格式进行的对话。它确保有效沟通并就交换数据的格式达成一致。

要了解流式处理过程,请考虑以下步骤

  • 首先,客户端构造一个 HTTP 请求,其中包含在Accept标头中所需的媒体类型。媒体类型告诉服务器客户端期望什么样的数据格式。这就像客户端说:“嘿,我正在寻找你将发送给我的数据的特定格式。它可能是 JSON、文本或其他一些格式。”例如,application/json表示偏爱 JSON 数据,text/event-stream表示希望流式处理数据,而*/*表示客户端接受任何数据格式。

    [!NOTE]

    如果请求缺少Accept标头或Accept标头为空,则表示客户端将接受响应中的任何媒体类型。服务器将其视为*/*

  • 接下来,服务器根据Accept标头中指定的媒体类型进行响应。值得注意的是,客户端可以在Accept标头中请求多种媒体类型,服务器必须考虑其功能和格式优先级以确定适当的响应。

    • 首先,服务器检查Accept标头中是否明确指定了text/event-stream

      • 对于启用流的流,服务器返回一个Content-Typetext/event-stream的响应,表明数据正在流式传输。

      • 对于未启用流的流,服务器将继续检查标头中指定的其他媒体类型。

    • 如果未指定text/event-stream,则服务器会检查Accept标头中是否指定了application/json*/*

      • 在这种情况下,服务器返回一个Content-Typeapplication/json的响应,以 JSON 格式提供数据。

    • 如果Accept标头指定了其他媒体类型,例如text/html

      • 服务器返回一个424响应,其中包含 PromptFlow 运行时错误代码UserError和运行时 HTTP 状态406,表示服务器无法使用请求的数据格式满足请求。

      注意:有关详细信息,请参阅处理错误

  • 最后,客户端检查Content-Type响应标头。如果将其设置为text/event-stream,则表示数据正在流式传输。

让我们仔细看看流式处理过程是如何工作的。流式处理模式下的响应数据遵循服务器发送事件 (SSE) 的格式。

整个过程如下:

0. 客户端向服务器发送消息。#

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Hello",
    "chat_history": []
}

[!NOTE]

Accept 标头设置为 text/event-stream 以请求流式响应。

1. 服务器以流模式发回响应。#

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Hello"}

data: {"answer": "!"}

data: {"answer": " How"}

data: {"answer": " can"}

data: {"answer": " I"}

data: {"answer": " assist"}

data: {"answer": " you"}

data: {"answer": " today"}

data: {"answer": " ?"}

data: {"answer": ""}

请注意,Content-Type 设置为 text/event-stream; charset=utf-8,表示响应是事件流。

客户端应将响应数据解码为服务器发送事件并递增显示。发送完所有数据后,服务器将关闭 HTTP 连接。

每个响应事件都是相对于上一个事件的增量。建议客户端在内存中跟踪合并数据,并在下一个请求中将其作为聊天历史记录发送回服务器。

2. 客户端向服务器发送另一条聊天消息,以及完整的聊天历史记录。#

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Glad to know you!",
    "chat_history": [
        {
            "inputs": {
                "question": "Hello"
            },
            "outputs": {
                "answer": "Hello! How can I assist you today?"
            }
        }
    ]
}

3. 服务器以流模式发回答案。#

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Nice"}

data: {"answer": " to"}

data: {"answer": " know"}

data: {"answer": " you"}

data: {"answer": " too"}

data: {"answer": "!"}

data: {"answer": " Is"}

data: {"answer": " there"}

data: {"answer": " anything"}

data: {"answer": " I"}

data: {"answer": " can"}

data: {"answer": " help"}

data: {"answer": " you"}

data: {"answer": " with"}

data: {"answer": "?"}

data: {"answer": ""}

4. 聊天以类似方式继续。#

处理错误#

客户端应首先检查 HTTP 响应代码。有关联机终结点返回的常见错误代码,请参阅此表

如果响应代码为“424 模型错误”,则表示错误是由模型的代码引起的。PromptFlow 模型的错误响应始终遵循此格式

{
  "error": {
    "code": "UserError",
    "message": "Media type text/event-stream in Accept header is not acceptable. Supported media type(s) - application/json",
  }
}
  • 它始终是一个 JSON 字典,只定义一个键“error”。

  • “error”的值是一个字典,包含“code”、“message”。

  • “code”定义错误类别。目前,对于错误的客户端输入可能是“UserError”,对于服务内部的错误可能是“SystemError”。

  • “message”是错误的描述。它可以显示给最终用户。

如何使用服务器发送事件#

使用 Python 消费#

在此示例用法中,我们使用 SSEClient 类。此并非 Python 内置类,需要单独安装。你可以通过 pip 安装:

pip install sseclient-py

一个示例用法如下:

import requests
from sseclient import SSEClient
from requests.exceptions import HTTPError

try:
    response = requests.post(url, json=body, headers=headers, stream=stream)
    response.raise_for_status()

    content_type = response.headers.get('Content-Type')
    if "text/event-stream" in content_type:
        client = SSEClient(response)
        for event in client.events():
            # Handle event, i.e. print to stdout
    else:
        # Handle json response

except HTTPError:
    # Handle exceptions

使用 JavaScript 消费#

有几个库可以在 JavaScript 中使用服务器发送事件。这里是其中一个示例

使用 Python 的示例聊天应用程序#

这是一个用 Python 编写的示例聊天应用程序。(点击此处查看源代码。)

chat_app

高级用法 - 混合流和非流流输出#

有时,你可能希望从流输出中获取流和非流结果。例如,在“与维基百科聊天”流中,你可能不仅希望获取 LLM 的答案,还希望获取流搜索的 URL 列表。为此,你需要修改流以输出流 LLM 的答案和非流 URL 列表的组合。

在示例“与维基百科聊天”流中,输出连接到 LLM 节点augmented_chat。要将 URL 列表添加到输出中,你需要添加一个名为url且值为${get_wiki_url.output}的输出字段。

chat_wikipedia_dual_output_center.png

流的输出将是一个非流字段作为基准,一个流字段作为增量。下面是一个请求和响应的示例。

0. 客户端向服务器发送消息。#

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When was ChatGPT launched?",
    "chat_history": []
}

1. 服务器以流模式发回答案。#

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=ChatGPT", "https://en.wikipedia.org/w/index.php?search=GPT-4"]}

data: {"answer": ""}

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " \n\n"}

...

data: {"answer": "PT"}

data: {"answer": ""}

2. 客户端向服务器发送另一条聊天消息,以及完整的聊天历史记录。#

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When did OpenAI announce GPT-4? How long is it between these two milestones?",
    "chat_history": [
        {
            "inputs": {
                "question": "When was ChatGPT launched?"
            },
            "outputs": {
                "url": [
                    "https://en.wikipedia.org/w/index.php?search=ChatGPT",
                    "https://en.wikipedia.org/w/index.php?search=GPT-4"
                ],
                "answer": "ChatGPT was launched on November 30, 2022. \n\nSOURCES: https://en.wikipedia.org/w/index.php?search=ChatGPT"
            }
        }
    ]
}

3. 服务器以流模式发回答案。#

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=Generative pre-trained transformer ", "https://en.wikipedia.org/w/index.php?search=Microsoft "]}

data: {"answer": ""}

data: {"answer": "Open"}

data: {"answer": "AI"}

data: {"answer": " released"}

data: {"answer": " G"}

data: {"answer": "PT"}

data: {"answer": "-"}

data: {"answer": "4"}

data: {"answer": " in"}

data: {"answer": " March"}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "3"}

data: {"answer": "."}

data: {"answer": " Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " The"}

data: {"answer": " time"}

data: {"answer": " between"}

data: {"answer": " these"}

data: {"answer": " two"}

data: {"answer": " milestones"}

data: {"answer": " is"}

data: {"answer": " approximately"}

data: {"answer": " "}

data: {"answer": "3"}

data: {"answer": " months"}

data: {"answer": ".\n\n"}

...

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": ""}