promptflow.executor.flow_executor 模块#
- class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#
基类:
object
此类别用于为不同的输入执行单个流。
- 参数:
flow (Flow) – 要执行的流。
connections (dict) – 用于流的连接。
run_tracker (RunTracker) – 用于流的运行跟踪器。
cache_manager (AbstractCacheManager) – 用于流的缓存管理器。
loaded_tools (Mapping[str, Callable]) – 用于流的已加载工具。
worker_count (Optional[int]) – 用于流的工作器数量。默认值为 16。
raise_ex (Optional[bool]) – 是否引发异常。默认值为 False。
working_dir (Optional[str]) – 用于流的工作目录。默认值为 None。
line_timeout_sec (Optional[int]) – 用于流的行超时(秒)。默认值为 LINE_TIMEOUT_SEC。
flow_file (Optional[Path]) – 用于流的流文件。默认值为 None。
- property aggregation_nodes#
获取流执行器的聚合节点。
- 返回:
聚合节点列表。
- 返回类型:
列表
- static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) Dict[str, Any] #
- convert_flow_input_types(inputs: dict) Mapping[str, Any] #
将给定输入字典的输入类型转换为与流的预期类型匹配。
- 参数:
inputs (dict) – 包含流输入的字典。
- 返回:
包含转换后的输入的字典。
- 返回类型:
Mapping[str, Any]
- classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) FlowExecutor #
创建 FlowExecutor 的新实例。
- 参数:
flow_file (Path) – 流文件的路径。
connections (Union[dict, ConnectionProvider]) – 用于流的连接。
working_dir (Optional[str]) – 用于流的工作目录。默认值为 None。
func (Optional[str]) – 如果提供了 .py 文件,则用于流的函数。默认值为 None。
storage (Optional[AbstractRunStorage]) – 用于流的存储。默认值为 None。
raise_ex (Optional[bool]) – 是否引发异常。默认值为 True。
node_override (Optional[Dict[str, Dict[str, Any]]]) – 用于流的节点覆盖。默认值为 None。
line_timeout_sec (Optional[int]) – 用于流的行超时(秒)。默认值为 LINE_TIMEOUT_SEC。
init_kwargs (Optional[Dict[str, Any]]) – 可调用类的类初始化参数,仅支持灵活流。
- 返回:
FlowExecutor 的新实例。
- 返回类型:
- enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#
启用连接到输出的 LLM 节点,以返回由 stream_required 控制的流式传输结果。
如果 stream_required 回调返回 True,则 LLM 节点将返回一个字符串生成器。否则,LLM 节点将返回一个字符串。
- 参数:
stream_required (Callable[[], bool]) – 一个不带参数并返回布尔值(指示是否应为 LLM 节点启用流式传输结果)的回调。
- 返回:
None (无)
- ensure_flow_is_serializable()#
确保流是可序列化的。
某些节点可能会返回字符串生成器以创建流式输出。这在将流部署为 Web 服务时很有用。但是,在交互模式下,执行器假定节点结果是 JSON 可序列化的。
此方法为流中的每个节点添加一个包装器,以使用流式输出并将它们合并为字符串供执行器使用。
- 返回:
None (无)
- exec(inputs: dict, node_concurrency=16) dict #
使用给定输入执行流并返回输出。
- 参数:
inputs (dict) – 包含流输入值的字典。
node_concurrency (int) – 可并发执行的最大节点数。
- 返回:
包含流输出值的字典。
- 返回类型:
dict
- exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) AggregationResult #
执行流的聚合节点。
- 参数:
inputs (Mapping[str, Any]) – 输入名称与其值的映射。
aggregation_inputs (Mapping[str, Any]) – 聚合输入名称与其值的映射。
run_id (Optional[str]) – 当前运行的 ID(如果有)。
node_concurrency (int) – 可并发执行的最大节点数。
- 返回:
聚合节点的结果。
- 返回类型:
AggregationResult
- 抛出:
如果输入或聚合输入无效,则为 FlowError。
- exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) LineResult #
执行流的单行。
- 参数:
inputs (Mapping[str, Any]) – 行的输入值。
index (Optional[int]) – 要执行的行的索引。
run_id (Optional[str]) – 流运行的 ID。
validate_inputs (bool) – 是否验证输入值。
node_concurrency (int) – 可并发执行的最大节点数。
allow_generator_output (bool) – 是否允许生成器输出。
line_timeout_sec (Optional[int]) – 等待一行输出的最长时间。
- 返回:
执行该行的结果。
- 返回类型:
LineResult
- async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) LineResult #
执行流的单行。
- 参数:
inputs (Mapping[str, Any]) – 行的输入值。
index (Optional[int]) – 要执行的行的索引。
run_id (Optional[str]) – 流运行的 ID。
validate_inputs (bool) – 是否验证输入值。
node_concurrency (int) – 可并发执行的最大节点数。
allow_generator_output (bool) – 是否允许生成器输出。
sync_iterator_to_async (bool) – 是否将同步迭代器输出转换为异步迭代器。
- 返回:
执行该行的结果。
- 返回类型:
LineResult
- get_inputs_definition()#
- get_status_summary(run_id: str)#
获取给定运行状态的摘要。
- 参数:
run_id (str) – 要获取状态摘要的运行 ID。
- 返回:
给定运行状态的摘要。
- 返回类型:
str
- property has_aggregation_node: bool#
检查流执行器是否包含任何聚合节点。
- 返回:
如果流执行器至少有一个聚合节点,则为 True,否则为 False。
- 返回类型:
bool
- classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#
从流加载并执行单个节点。
- 参数:
flow_file (Path) – 流文件的路径。
node_name (str) – 要执行的节点的名称。
storage (Optional[AbstractRunStorage]) – 用于流的存储。
output_sub_dir (Optional[str]) – 用于流的图像持久化目录。仅为向后兼容保留。
flow_inputs (Optional[Mapping[str, Any]]) – 用于流的输入。默认值为 None。
dependency_nodes_outputs (Optional[Mapping[str, Any]) – 依赖节点的输出。默认值为 None。
connections (Optional[dict]) – 用于流的连接。默认值为 None。
working_dir (Optional[str]) – 用于流的工作目录。默认值为 None。
raise_ex (Optional[bool]) – 是否引发异常。默认值为 False。
- static update_environment_variables_with_connections(connections: dict)#
使用连接更新环境变量。
- 参数:
connections (dict) – 包含连接信息的字典。
- 返回:
包含更新后环境变量的字典。
- 返回类型:
dict
- promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#
为支持流模式的 LLM 工具启用流模式。
- 参数:
f (function) – 要包装的函数。
- 返回:
包装后的函数。
- 返回类型:
函数
AzureOpenAI.completion 和 AzureOpenAI.chat 工具同时支持流模式和非流模式。默认情况下关闭流模式。使用此包装器将其打开。
- promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) LineResult #
执行流,包括聚合节点。
- 参数:
flow_file (Path) – 流文件的路径。
working_dir (Path) – 流的工作目录。
output_dir (Path) – 相对于 working_dir 的相对路径。
connections (dict) – 包含连接信息的字典。
inputs (Mapping[str, Any]) – 包含流输入值的字典。
enable_stream_output (Optional[bool]) – 是否允许流(生成器)输出作为流输出。默认值为 False。
run_id (Optional[str]) – 运行 ID 将设置在操作上下文中并用于会话。
init_kwargs (dict) – 灵活流的初始化参数,仅在流为可调用类时支持。
kwargs (Any) – 创建流执行器的其他关键字参数。
- 返回:
执行流的行结果。
- 返回类型:
LineResult
- promptflow.executor.flow_executor.signal_handler(sig, frame)#
处理进程收到的终止信号。
目前,只有单节点运行使用此处理程序。我们打印日志并引发 KeyboardInterrupt,以便外部代码可以捕获此异常并取消正在运行的节点。