promptflow.executor.flow_executor 模块#

class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#

基类:object

此类别用于为不同的输入执行单个流。

参数:
  • flow (Flow) – 要执行的流。

  • connections (dict) – 用于流的连接。

  • run_tracker (RunTracker) – 用于流的运行跟踪器。

  • cache_manager (AbstractCacheManager) – 用于流的缓存管理器。

  • loaded_tools (Mapping[str, Callable]) – 用于流的已加载工具。

  • worker_count (Optional[int]) – 用于流的工作器数量。默认值为 16。

  • raise_ex (Optional[bool]) – 是否引发异常。默认值为 False。

  • working_dir (Optional[str]) – 用于流的工作目录。默认值为 None。

  • line_timeout_sec (Optional[int]) – 用于流的行超时(秒)。默认值为 LINE_TIMEOUT_SEC。

  • flow_file (Optional[Path]) – 用于流的流文件。默认值为 None。

property aggregation_nodes#

获取流执行器的聚合节点。

返回:

聚合节点列表。

返回类型:

列表

static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) Dict[str, Any]#
convert_flow_input_types(inputs: dict) Mapping[str, Any]#

将给定输入字典的输入类型转换为与流的预期类型匹配。

参数:

inputs (dict) – 包含流输入的字典。

返回:

包含转换后的输入的字典。

返回类型:

Mapping[str, Any]

classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) FlowExecutor#

创建 FlowExecutor 的新实例。

参数:
  • flow_file (Path) – 流文件的路径。

  • connections (Union[dict, ConnectionProvider]) – 用于流的连接。

  • working_dir (Optional[str]) – 用于流的工作目录。默认值为 None。

  • func (Optional[str]) – 如果提供了 .py 文件,则用于流的函数。默认值为 None。

  • storage (Optional[AbstractRunStorage]) – 用于流的存储。默认值为 None。

  • raise_ex (Optional[bool]) – 是否引发异常。默认值为 True。

  • node_override (Optional[Dict[str, Dict[str, Any]]]) – 用于流的节点覆盖。默认值为 None。

  • line_timeout_sec (Optional[int]) – 用于流的行超时(秒)。默认值为 LINE_TIMEOUT_SEC。

  • init_kwargs (Optional[Dict[str, Any]]) – 可调用类的类初始化参数,仅支持灵活流。

返回:

FlowExecutor 的新实例。

返回类型:

FlowExecutor

enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#

启用连接到输出的 LLM 节点,以返回由 stream_required 控制的流式传输结果。

如果 stream_required 回调返回 True,则 LLM 节点将返回一个字符串生成器。否则,LLM 节点将返回一个字符串。

参数:

stream_required (Callable[[], bool]) – 一个不带参数并返回布尔值(指示是否应为 LLM 节点启用流式传输结果)的回调。

返回:

None (无)

ensure_flow_is_serializable()#

确保流是可序列化的。

某些节点可能会返回字符串生成器以创建流式输出。这在将流部署为 Web 服务时很有用。但是,在交互模式下,执行器假定节点结果是 JSON 可序列化的。

此方法为流中的每个节点添加一个包装器,以使用流式输出并将它们合并为字符串供执行器使用。

返回:

None (无)

exec(inputs: dict, node_concurrency=16) dict#

使用给定输入执行流并返回输出。

参数:
  • inputs (dict) – 包含流输入值的字典。

  • node_concurrency (int) – 可并发执行的最大节点数。

返回:

包含流输出值的字典。

返回类型:

dict

exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) AggregationResult#

执行流的聚合节点。

参数:
  • inputs (Mapping[str, Any]) – 输入名称与其值的映射。

  • aggregation_inputs (Mapping[str, Any]) – 聚合输入名称与其值的映射。

  • run_id (Optional[str]) – 当前运行的 ID(如果有)。

  • node_concurrency (int) – 可并发执行的最大节点数。

返回:

聚合节点的结果。

返回类型:

AggregationResult

抛出:

如果输入或聚合输入无效,则为 FlowError。

exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) LineResult#

执行流的单行。

参数:
  • inputs (Mapping[str, Any]) – 行的输入值。

  • index (Optional[int]) – 要执行的行的索引。

  • run_id (Optional[str]) – 流运行的 ID。

  • validate_inputs (bool) – 是否验证输入值。

  • node_concurrency (int) – 可并发执行的最大节点数。

  • allow_generator_output (bool) – 是否允许生成器输出。

  • line_timeout_sec (Optional[int]) – 等待一行输出的最长时间。

返回:

执行该行的结果。

返回类型:

LineResult

async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) LineResult#

执行流的单行。

参数:
  • inputs (Mapping[str, Any]) – 行的输入值。

  • index (Optional[int]) – 要执行的行的索引。

  • run_id (Optional[str]) – 流运行的 ID。

  • validate_inputs (bool) – 是否验证输入值。

  • node_concurrency (int) – 可并发执行的最大节点数。

  • allow_generator_output (bool) – 是否允许生成器输出。

  • sync_iterator_to_async (bool) – 是否将同步迭代器输出转换为异步迭代器。

返回:

执行该行的结果。

返回类型:

LineResult

get_inputs_definition()#
get_status_summary(run_id: str)#

获取给定运行状态的摘要。

参数:

run_id (str) – 要获取状态摘要的运行 ID。

返回:

给定运行状态的摘要。

返回类型:

str

property has_aggregation_node: bool#

检查流执行器是否包含任何聚合节点。

返回:

如果流执行器至少有一个聚合节点,则为 True,否则为 False。

返回类型:

bool

classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#

从流加载并执行单个节点。

参数:
  • flow_file (Path) – 流文件的路径。

  • node_name (str) – 要执行的节点的名称。

  • storage (Optional[AbstractRunStorage]) – 用于流的存储。

  • output_sub_dir (Optional[str]) – 用于流的图像持久化目录。仅为向后兼容保留。

  • flow_inputs (Optional[Mapping[str, Any]]) – 用于流的输入。默认值为 None。

  • dependency_nodes_outputs (Optional[Mapping[str, Any]) – 依赖节点的输出。默认值为 None。

  • connections (Optional[dict]) – 用于流的连接。默认值为 None。

  • working_dir (Optional[str]) – 用于流的工作目录。默认值为 None。

  • raise_ex (Optional[bool]) – 是否引发异常。默认值为 False。

static update_environment_variables_with_connections(connections: dict)#

使用连接更新环境变量。

参数:

connections (dict) – 包含连接信息的字典。

返回:

包含更新后环境变量的字典。

返回类型:

dict

promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#

为支持流模式的 LLM 工具启用流模式。

参数:

f (function) – 要包装的函数。

返回:

包装后的函数。

返回类型:

函数

AzureOpenAI.completion 和 AzureOpenAI.chat 工具同时支持流模式和非流模式。默认情况下关闭流模式。使用此包装器将其打开。

promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) LineResult#

执行流,包括聚合节点。

参数:
  • flow_file (Path) – 流文件的路径。

  • working_dir (Path) – 流的工作目录。

  • output_dir (Path) – 相对于 working_dir 的相对路径。

  • connections (dict) – 包含连接信息的字典。

  • inputs (Mapping[str, Any]) – 包含流输入值的字典。

  • enable_stream_output (Optional[bool]) – 是否允许流(生成器)输出作为流输出。默认值为 False。

  • run_id (Optional[str]) – 运行 ID 将设置在操作上下文中并用于会话。

  • init_kwargs (dict) – 灵活流的初始化参数,仅在流为可调用类时支持。

  • kwargs (Any) – 创建流执行器的其他关键字参数。

返回:

执行流的行结果。

返回类型:

LineResult

promptflow.executor.flow_executor.signal_handler(sig, frame)#

处理进程收到的终止信号。

目前,只有单节点运行使用此处理程序。我们打印日志并引发 KeyboardInterrupt,以便外部代码可以捕获此异常并取消正在运行的节点。