引言

本仓库包含用于训练论文《大型动作模型:从概念到实现》中大型动作模型(LAMs)的数据收集过程的实现。数据收集过程旨在简化任务处理,确保所有必要步骤从初始化到执行无缝集成。该模块是UFO项目的一部分。

数据流

Dataflow 使用 UFO 为给定任务实现实例化执行数据流,并提供批量处理和单一处理选项。

  1. 实例化:实例化指设置和准备任务以供执行的过程。此步骤通常涉及选择模板预填充筛选
  2. 执行:执行是实际运行任务的过程。此步骤涉及执行由实例化指定的动作或操作。执行后,评估代理将评估整个执行过程的质量。
  3. 数据流:数据流是将实例化执行组合成单一管道的总体过程。它提供了一个端到端的任务处理解决方案,确保所有必要步骤(从初始化到执行)无缝集成。

如果您只需要执行流程中的某个特定部分,可以独立使用实例化执行。当任务需要这两个步骤时,数据流过程会将它们整合起来,让您可以在一个管道中从头到尾执行任务。

数据流的总体处理过程如下。给定任务-计划数据,LLM 将实例化任务-动作数据,包括选择模板、预填充、过滤。

如何使用

1. 安装包

您应该在 UFO 根文件夹中安装必要的包

pip install -r requirements.txt

2. 配置 LLM

在运行数据流之前,您需要分别为 PrefillAgent 和 FilterAgent 提供 LLM 配置。您可以通过复制dataflow/config/config.yaml.template并编辑 PREFILL_AGENTFILTER_AGENT 的配置来创建自己的配置文件dataflow/config/config.yaml,如下所示

OpenAI

VISUAL_MODE: True, # Whether to use the visual mode
API_TYPE: "openai" , # The API type, "openai" for the OpenAI API.  
API_BASE: "https://api.openai.com/v1/chat/completions", # The the OpenAI API endpoint.
API_KEY: "sk-",  # The OpenAI API key, begin with sk-
API_VERSION: "2024-02-15-preview", # "2024-02-15-preview" by default
API_MODEL: "gpt-4-vision-preview",  # The only OpenAI model

Azure OpenAI (AOAI)

VISUAL_MODE: True, # Whether to use the visual mode
API_TYPE: "aoai" , # The API type, "aoai" for the Azure OpenAI.  
API_BASE: "YOUR_ENDPOINT", #  The AOAI API address. Format: https://{your-resource-name}.openai.azure.com
API_KEY: "YOUR_KEY",  # The aoai API key
API_VERSION: "2024-02-15-preview", # "2024-02-15-preview" by default
API_MODEL: "gpt-4-vision-preview",  # The only OpenAI model
API_DEPLOYMENT_ID: "YOUR_AOAI_DEPLOYMENT", # The deployment id for the AOAI API

您还可以为每个代理使用非可视化模型(例如 GPT-4),方法是设置VISUAL_MODE: False以及适当的API_MODEL(openai)和API_DEPLOYMENT_ID(aoai)。

非可视化模型配置

您可以通过在config.yaml文件中配置以下设置来为每个代理利用非可视化模型(例如 GPT-4)

  • VISUAL_MODE: False # 启用非可视化模式。
  • 为每个代理指定适当的API_MODEL(OpenAI)和API_DEPLOYMENT_ID(AOAI)。

请确保准确配置这些设置,以有效利用非可视化模型。

其他配置

config_dev.yaml指定了相关文件的路径并包含默认设置。窗口匹配和控件过滤的匹配策略支持选项:'contains''fuzzy''regex',允许用户灵活的匹配策略。MAX_STEPS是 execute_flow 的最大步数,可以由用户设置。

注意

匹配策略的具体实现和调用方法可参考windows_app_env

注意

请注意!如果您正在使用 GitHub 或其他开源工具,请不要在线公开您的config.yaml,因为它包含您的私钥。

3. 准备文件

在运行任务之前,需要准备某些文件。

3.1. JSON 格式的任务

需要实例化的任务应组织在 JSON 文件文件夹中,默认文件夹路径设置为 dataflow /tasks。此路径可以在dataflow/config/config.yaml文件中更改,或者您可以在终端中指定它,如4. 开始运行中所述。例如,存储在dataflow/tasks/prefill/中的任务可能如下所示

{
    // The app you want to use
    "app": "word",
    // A unique ID to distinguish different tasks 
    "unique_id": "1",
    // The task and steps to be instantiated
    "task": "Type 'hello' and set the font type to Arial",
    "refined_steps": [
        "Type 'hello'",
        "Set the font to Arial"
    ]
}

3.2. 模板和描述

您应该将应用程序文件作为实例化参考,放置在以应用程序命名的文件夹中。

例如,如果您有 Word 的template1.docx,它应该位于dataflow/templates/word/template1.docx

此外,对于每个应用程序文件夹,应该有一个description.json文件位于dataflow/templates/word/description.json,其中详细描述了每个模板文件。它可能如下所示

{
    "template1.docx": "A document with a rectangle shape",
    "template2.docx": "A document with a line of text"
}

如果不存在description.json文件,将随机选择一个模板文件。

3.3. 最终结构

确保以下文件已就位

  • 要实例化的 JSON 文件
  • 作为实例化参考的模板
  • JSON 格式的描述文件

文件结构可以是

dataflow/
|
├── tasks
│   └── prefill
│       ├── bulleted.json
│       ├── delete.json
│       ├── draw.json
│       ├── macro.json
│       └── rotate.json
├── templates
│   └── word
│       ├── description.json
│       ├── template1.docx
│       ├── template2.docx
│       ├── template3.docx
│       ├── template4.docx
│       ├── template5.docx
│       ├── template6.docx
│       └── template7.docx
└── ...

4. 开始运行

完成以上步骤后,您可以在命令行中使用以下命令。我们提供单一/批量处理,您需要提供单一文件路径/文件夹路径。根据用户提供的路径类型,自动决定是处理单个任务还是批量任务。

此外,您可以选择单独使用实例化/执行部分,或者将它们作为一个整体部分使用,即数据流

默认的任务中心在dataflow/config_dev.yaml中设置为"TASKS_HUB"

  • 数据流任务
python -m dataflow -dataflow --task_path path_to_task_file
  • 实例化任务
python -m dataflow -instantiation --task_path path_to_task_file
  • 执行任务
python -m dataflow -execution --task_path path_to_task_file

注意

  1. 用户在使用本项目时应小心保存原始文件;否则,当应用程序关闭时,文件将被关闭。
  2. 启动项目后,在程序截屏时,用户不应关闭应用程序窗口。

工作流

实例化

实例化过程有三个关键步骤

  1. 根据指定的应用程序和指令选择模板文件。
  2. 使用当前屏幕截图预填充任务。
  3. 筛选已建立的任务。

给定初始任务,数据流首先选择一个模板(阶段 1),然后根据 Word 环境预填充初始任务以获取任务-动作数据(阶段 2)。最后,它将筛选已建立的任务以评估任务-动作数据的质量。(阶段 3

注意

有关实例化的更详细代码设计文档可在实例化中找到。

执行

实例化后的计划将由执行任务执行。执行后,评估代理将评估整个执行过程的质量。

注意

有关执行的更详细代码设计文档可在执行中找到。

结果

结果将保存在instantiationexecutiondataflow下的results\目录中,并根据执行结果进一步存储在子目录中。

注意

有关结果的更详细信息可在结果中找到。

快速入门

我们准备了两个案例来展示数据流,可在dataflow\tasks\prefill中找到。因此,安装所需软件包后,您可以在命令行中输入以下命令

python -m dataflow -dataflow

您可以在终端中看到提示信息,这意味着数据流正在运行。

两个任务完成后,任务和输出文件将如下所示

UFO/
├── dataflow/
│   └── results/
│       ├── saved_document/         # Directory for saved documents
│       │   ├── bulleted.docx       # Result of the "bulleted" task
│       │   └── rotate.docx         # Result of the "rotate" task
│       ├── dataflow/                    # Dataflow results directory
│       │   ├── execution_pass/     # Successfully executed tasks
│       │   │   ├── bulleted.json   # Execution result for the "bulleted" task
│       │   │   ├── rotate.json      # Execution result for the "rotate" task
│       │   │   └── ...
└── ...

具体结果可在结果中以 JSON 格式和示例数据进行参考。

日志文件

相应的日志可在logs/bulletedlogs/rotate目录中找到,如下所示。每个工作流的详细日志都被记录下来,捕获了执行过程的每一步。

参考

AppEnum

基类:Enum

应用程序的枚举类。

初始化应用程序枚举。

参数
  • id (int) –

    应用程序的 ID。

  • description (str) –

    应用程序的描述。

  • file_extension (str) –

    应用程序的文件扩展名。

  • win_app (str) –

    Windows 应用程序名称。

源代码在dataflow/data_flow_controller.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(self, id: int, description: str, file_extension: str, win_app: str):
    """
    Initialize the application enum.
    :param id: The ID of the application.
    :param description: The description of the application.
    :param file_extension: The file extension of the application.
    :param win_app: The Windows application name.
    """

    self.id = id
    self.description = description
    self.file_extension = file_extension
    self.win_app = win_app
    self.app_root_name = win_app.upper() + ".EXE"

TaskObject

初始化任务对象。

参数
  • task_file_path (str) –

    任务文件的路径。

  • task_type (str) –

    任务对象的任务类型(数据流、实例化或执行)。

源代码在dataflow/data_flow_controller.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def __init__(self, task_file_path: str, task_type: str) -> None:
    """
    Initialize the task object.
    :param task_file_path: The path to the task file.
    :param task_type: The task_type of the task object (dataflow, instantiation, or execution).
    """

    self.task_file_path = task_file_path
    self.task_file_base_name = os.path.basename(task_file_path)
    self.task_file_name = self.task_file_base_name.split(".")[0]

    task_json_file = load_json_file(task_file_path)
    self.app_object = self._choose_app_from_json(task_json_file["app"])
    # Initialize the task attributes based on the task_type
    self._init_attr(task_type, task_json_file)

DataFlowController

流程控制器类,用于管理实例化和执行过程。

初始化流程控制器。

参数
  • task_path (str) –

    任务文件的路径。

  • task_type (str) –

    流程控制器的任务类型(实例化、执行或数据流)。

源代码在dataflow/data_flow_controller.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __init__(self, task_path: str, task_type: str) -> None:
    """
    Initialize the flow controller.
    :param task_path: The path to the task file.
    :param task_type: The task_type of the flow controller (instantiation, execution, or dataflow).
    """

    self.task_object = TaskObject(task_path, task_type)
    self.app_env = None
    self.app_name = self.task_object.app_object.description.lower()
    self.task_file_name = self.task_object.task_file_name

    self.schema = self._load_schema(task_type)

    self.task_type = task_type
    self.task_info = self.init_task_info()
    self.result_hub = _configs["RESULT_HUB"].format(task_type=task_type)

instantiated_plan 属性 可写

从任务信息中获取实例化计划。

返回
  • List[Dict[str, Any]]

    实例化计划。

template_copied_path 属性

从任务信息中获取复制的模板路径。

返回
  • 字符串

    复制的模板路径。

execute_execution(request, plan)

执行执行过程。

参数
  • request (str) –

    要执行的任务请求。

  • plan (Dict[str, any]) –

    包含详细步骤的执行计划。

源代码在dataflow/data_flow_controller.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def execute_execution(self, request: str, plan: Dict[str, any]) -> None:
    """
    Execute the execution process.
    :param request: The task request to be executed.
    :param plan: The execution plan containing detailed steps.
    """

    print_with_color("Executing the execution process...", "blue")
    execute_flow = None

    try:
        self.app_env.start(self.template_copied_path)
        # Initialize the execution context and flow
        context = Context()
        execute_flow = ExecuteFlow(self.task_file_name, context, self.app_env)

        # Execute the plan
        executed_plan, execute_result = execute_flow.execute(request, plan)

        # Update the instantiated plan
        self.instantiated_plan = executed_plan
        # Record execution results and time metrics
        self.task_info["execution_result"]["result"] = execute_result
        self.task_info["time_cost"]["execute"] = execute_flow.execution_time
        self.task_info["time_cost"]["execute_eval"] = execute_flow.eval_time

    except Exception as e:
        # Handle and log any exceptions that occur during execution
        self.task_info["execution_result"]["error"] = {
            "type": str(type(e).__name__),
            "message": str(e),
            "traceback": traceback.format_exc(),
        }
        print_with_color(f"Error in Execution: {e}", "red")
        raise e
    finally:
        # Record the total time cost of the execution process
        if execute_flow and hasattr(execute_flow, "execution_time"):
            self.task_info["time_cost"]["execute"] = execute_flow.execution_time
        else:
            self.task_info["time_cost"]["execute"] = None
        if execute_flow and hasattr(execute_flow, "eval_time"):
            self.task_info["time_cost"]["execute_eval"] = execute_flow.eval_time
        else:
            self.task_info["time_cost"]["execute_eval"] = None

execute_instantiation()

执行实例化过程。

返回
  • Optional[List[Dict[str, Any]]]

    如果成功,则为实例化计划。

源代码在dataflow/data_flow_controller.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def execute_instantiation(self) -> Optional[List[Dict[str, Any]]]:
    """
    Execute the instantiation process.
    :return: The instantiation plan if successful.
    """

    print_with_color(
        f"Instantiating task {self.task_object.task_file_name}...", "blue"
    )

    template_copied_path = self.instantiation_single_flow(
        ChooseTemplateFlow,
        "choose_template",
        init_params=[self.task_object.app_object.file_extension],
        execute_params=[],
    )

    if template_copied_path:
        self.app_env.start(template_copied_path)

        prefill_result = self.instantiation_single_flow(
            PrefillFlow,
            "prefill",
            init_params=[self.app_env],
            execute_params=[
                template_copied_path,
                self.task_object.task,
                self.task_object.refined_steps,
            ],
        )
        self.app_env.close()

        if prefill_result:
            self.instantiation_single_flow(
                FilterFlow,
                "instantiation_evaluation",
                init_params=[],
                execute_params=[prefill_result["instantiated_request"]],
            )
            return prefill_result["instantiated_plan"]

init_task_info()

初始化任务信息。

返回
  • Dict[str, Any]

    初始化后的任务信息。

源代码在dataflow/data_flow_controller.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def init_task_info(self) -> Dict[str, Any]:
    """
    Initialize the task information.
    :return: The initialized task information.
    """
    init_task_info = None
    if self.task_type == "execution":
        # read from the instantiated task file
        init_task_info = load_json_file(self.task_object.task_file_path)
    else:
        init_task_info = {
            "unique_id": self.task_object.unique_id,
            "app": self.app_name,
            "original": {
                "original_task": self.task_object.task,
                "original_steps": self.task_object.refined_steps,
            },
            "execution_result": {"result": None, "error": None},
            "instantiation_result": {
                "choose_template": {"result": None, "error": None},
                "prefill": {"result": None, "error": None},
                "instantiation_evaluation": {"result": None, "error": None},
            },
            "time_cost": {},
        }
    return init_task_info

instantiation_single_flow(flow_class, flow_type, init_params=None, execute_params=None)

在实例化阶段执行单个流程。

参数
  • flow_class (AppAgentProcessor) –

    要实例化的流程类。

  • flow_type (str) –

    流程的类型。

  • init_params

    流程的初始化参数。

  • execute_params

    流程的执行参数。

返回
  • Optional[Dict[str, Any]]

    流程的结果。

源代码在dataflow/data_flow_controller.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def instantiation_single_flow(
    self,
    flow_class: AppAgentProcessor,
    flow_type: str,
    init_params=None,
    execute_params=None,
) -> Optional[Dict[str, Any]]:
    """
    Execute a single flow process in the instantiation phase.
    :param flow_class: The flow class to instantiate.
    :param flow_type: The type of the flow.
    :param init_params: The initialization parameters for the flow.
    :param execute_params: The execution parameters for the flow.
    :return: The result of the flow process.
    """

    flow_instance = None
    try:
        flow_instance = flow_class(self.app_name, self.task_file_name, *init_params)
        result = flow_instance.execute(*execute_params)
        self.task_info["instantiation_result"][flow_type]["result"] = result
        return result
    except Exception as e:
        self.task_info["instantiation_result"][flow_type]["error"] = {
            "type": str(e.__class__),
            "error_message": str(e),
            "traceback": traceback.format_exc(),
        }
        print_with_color(f"Error in {flow_type}: {e} {traceback.format_exc()}")
    finally:
        if flow_instance and hasattr(flow_instance, "execution_time"):
            self.task_info["time_cost"][flow_type] = flow_instance.execution_time
        else:
            self.task_info["time_cost"][flow_type] = None

reformat_to_batch(path)

将结果传输到结果中心。

源代码在dataflow/data_flow_controller.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def reformat_to_batch(self, path) -> None:
    """
    Transfer the result to the result hub.
    """
    os.makedirs(path, exist_ok=True)
    source_files_path = os.path.join(
        self.result_hub,
        self.task_type + "_pass",
    )
    source_template_path = os.path.join(
        os.path.dirname(self.result_hub),
        "saved_document",
    )
    target_file_path = os.path.join(
        path,
        "tasks",
    )
    target_template_path = os.path.join(
        path,
        "files",
    )
    os.makedirs((target_file_path), exist_ok=True)
    os.makedirs((target_template_path), exist_ok=True)

    for file in os.listdir(source_files_path):
        if file.endswith(".json"):
            source_file = os.path.join(source_files_path, file)
            target_file = os.path.join(target_file_path, file)
            target_object = os.path.join(
                target_template_path, file.replace(".json", ".docx")
            )
            is_successed = reformat_json_file(
                target_file,
                target_object,
                load_json_file(source_file),
            )
            if is_successed:
                shutil.copy(
                    os.path.join(
                        source_template_path, file.replace(".json", ".docx")
                    ),
                    target_template_path,
                )

run()

运行实例化和执行过程。

源代码在dataflow/data_flow_controller.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def run(self) -> None:
    """
    Run the instantiation and execution process.
    """

    start_time = time.time()

    try:
        self.app_env = WindowsAppEnv(self.task_object.app_object)

        if self.task_type == "dataflow":
            plan = self.execute_instantiation()
            self.execute_execution(self.task_object.task, plan)
        elif self.task_type == "instantiation":
            self.execute_instantiation()
        elif self.task_type == "execution":
            plan = self.instantiated_plan
            self.execute_execution(self.task_object.task, plan)
        else:
            raise ValueError(f"Unsupported task_type: {self.task_type}")
    except Exception as e:
        raise e

    finally:
        # Update or record the total time cost of the process
        total_time = round(time.time() - start_time, 3)
        new_total_time = (
            self.task_info.get("time_cost", {}).get("total", 0) + total_time
        )
        self.task_info["time_cost"]["total"] = round(new_total_time, 3)

        self.save_result()

    if _configs["REFORMAT_TO_BATCH"]:
        self.reformat_to_batch(_configs["REFORMAT_TO_BATCH_HUB"])

save_result()

验证并保存实例化任务结果。

源代码在dataflow/data_flow_controller.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def save_result(self) -> None:
    """
    Validate and save the instantiated task result.
    """

    validation_error = None

    # Validate the result against the schema
    try:
        validate(instance=self.task_info, schema=self.schema)
    except ValidationError as e:
        # Record the validation error but allow the process to continue
        validation_error = str(e.message)
        print_with_color(f"Validation Error: {e.message}", "yellow")

    # Determine the target directory based on task_type and quality/completeness
    target_file = None

    if self.task_type == "instantiation":
        # Determine the quality of the instantiation
        if not self.task_info["instantiation_result"]["instantiation_evaluation"][
            "result"
        ]:
            target_file = INSTANTIATION_RESULT_MAP[False]
        else:
            is_quality_good = self.task_info["instantiation_result"][
                "instantiation_evaluation"
            ]["result"]["judge"]
            target_file = INSTANTIATION_RESULT_MAP.get(
                is_quality_good, INSTANTIATION_RESULT_MAP[False]
            )

    else:
        # Determine the completion status of the execution
        if not self.task_info["execution_result"]["result"]:
            target_file = EXECUTION_RESULT_MAP["no"]
        else:
            is_completed = self.task_info["execution_result"]["result"]["complete"]
            target_file = EXECUTION_RESULT_MAP.get(
                is_completed, EXECUTION_RESULT_MAP["no"]
            )

    # Construct the full path to save the result
    new_task_path = os.path.join(
        self.result_hub, target_file, self.task_object.task_file_base_name
    )
    os.makedirs(os.path.dirname(new_task_path), exist_ok=True)
    save_json_file(new_task_path, self.task_info)

    print(f"Task saved to {new_task_path}")

    # If validation failed, indicate that the saved result may need further inspection
    if validation_error:
        print(
            "The saved task result does not conform to the expected schema and may require review."
        )