基于类的流#

实验性功能

这是一个实验性功能,随时可能更改。了解更多

当用户需要在多次流运行中将对象(如连接)持久化到内存中时,他们可以将可调用类编写为流入口,并将持久化参数放入 __init__ 方法中。

如果用户需要在批处理运行输出上记录指标,他们可以添加一个 __aggregate__ 方法,该方法将在批处理运行结束后调度。 __aggregate__ 方法应仅包含 1 个参数,即批处理运行结果列表。

有关更多详细信息,请参阅连接支持聚合支持

将类作为流#

假设我们有一个文件 flow_entry.py

class Reply(TypedDict):
    output: str

class MyFlow:
    def __init__(self, model_config: AzureOpenAIModelConfiguration, flow_config: dict):
      """Flow initialization logic goes here."""
      self.model_config = model_config
      self.flow_config = flow_config

    def __call__(question: str) -> Reply:
      """Flow execution logic goes here."""
      return Reply(output=output)

    def __aggregate__(self, line_results: List[str]) -> dict:
      """Aggregation logic goes here. Return key-value pair as metrics."""
      return {"key": val}

流测试#

使用原始代码测试#

由于流的定义是函数/可调用类。我们建议用户像运行其他脚本一样直接运行它

class MyFlow:
    pass
if __name__ == "__main__":
    flow = MyFlow(model_config, flow_config)
    output = flow(question)
    metrics = flow.__aggregate__([output])
    # check metrics here

通过函数调用测试#

它还支持将您的类入口转换为流,并使用 prompt flow 的能力进行测试。

您可以使用以下 CLI 进行测试

# flow entry syntax: path.to.module:ClassName
pf flow test --flow flow_entry:MyFlow --inputs question="What's the capital of France?" --init init.json

注意:目前此命令将在您的工作目录中生成一个 flow.flex.yaml。它将成为流的入口。

在此处查看完整示例:basic-chat

与流聊天#

支持在 CLI 中与流聊天

pf flow test --flow flow_entry:MyFlow --inputs inputs.json --init init.json --ui

有关更多信息,请查看此处

批处理运行#

用户还可以批处理运行流。

pf run create --flow "path.to.module:ClassName" --data "./data.jsonl"
# user can also directly use entry in `flow` param for batch run
pf.run(flow="path.to.module:ClassName", init="./init.jsonl", data="./data.jsonl")

或者直接运行导入的流类或流实例。

from promptflow.core import AzureOpenAIModelConfiguration


class MyFlow:
    pass

config = AzureOpenAIModelConfiguration(
  azure_deployment="my_deployment",
  # connection and api_key configs are exclusive
  connection="my_aoai_connection",
  api_key="actual_key",
)
pf.run(flow=MyFlow, init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
# or
flow_obj = MyFlow(model_config=config, flow_config={})
pf.run(flow=flow_obj, data="./data.jsonl")

有关此主题的更多信息,请参阅运行和评估流

定义流 YAML#

用户可以手动编写名为 flow.flex.yaml 的 YAML 文件,或将函数/可调用入口保存到 YAML 文件中。这是部署或在云中运行等高级场景所必需的。流 YAML 可能如下所示

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
entry: path.to.module:ClassName

使用 YAML 批处理运行#

用户可以批处理运行流。流 init 函数的参数由 init 参数支持。

用户需要编写一个 JSON 文件作为 init 的值,因为在命令行中编写模型配置很困难。

{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}
pf run create --flow "./flow.flex.yaml" --data "./data.jsonl" --init init.json
pf = PFClient()

config = AzureOpenAIModelConfiguration(
  azure_deployment="my_deployment",
  api_key="actual_key"
)
# if init's value is not json serializable, raise user error
pf.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")

# when submit to cloud, user can only use connection
# in runtime executor will resolve connection in AzureOpenAIModelConfiguration and set connection's fields to ModelConfig: equal to original ModelConfiguration.from_connection()
config = AzureOpenAIModelConfiguration(
  azure_deployment="my_embedding_deployment",
  connection="my-aoai-connection",
) 
pfazure.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")

部署流#

用户可以服务流。流 init 函数的参数由 init 参数支持。流应在 YAML 中具有完整的 init/inputs/outputs 规范,以确保可以生成服务 swagger。

用户需要编写一个 JSON 文件作为 init 的值,因为在命令行中编写模型配置很困难。

{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}
# user can only pass model config by file 
pf flow serve --source "./"  --port 8088 --host localhost --init path/to/init.json

了解更多:部署流

聚合支持#

引入聚合支持是为了帮助用户计算指标。

class MyFlow:
    def __call__(text: str) -> str:
      """Flow execution logic goes here."""
      pass

    # will only execute once after batch run finished.
    # the processed_results will be list of __call__'s output and we will log the return value as metrics automatically.
    def __aggregate__(self, processed_results: List[str]) -> dict:
        for element in processed_results:
            # If __call__'s output is primitive type, element will be primitive type.
            # If __call__'s output is dataclass, element will be a dictionary, but can access it's attribute with `element.attribute_name`
            # For other cases, it's recommended to access by key `element["attribute_name"]`

注意:

聚合支持有一些限制

  • 聚合函数只会在批处理运行中执行。

  • 仅支持 1 个硬编码的 __aggregate__ 函数。

  • 在执行时,__aggregate__ 将只传递 1 个位置参数。

  • 聚合函数的输入将是流运行的输出列表。

    • 传递给 __aggregate__ 函数的 processed_results 中的每个元素与每行的 __call__ 返回的对象不相同。

    • 重构的元素是一个字典,它支持 1 层属性访问。但建议通过键访问它们。有关用法,请参阅上面的示例。

  • 如果聚合函数接受多个参数,则在提交阶段引发错误。

后续步骤#