批处理模式

批处理模式是 UFO 的一个功能,该代理允许对任务进行批量自动化。

快速入门

步骤 1:创建计划文件

在启动批处理模式之前,您需要创建一个计划文件,其中包含代理要遵循的步骤列表。计划文件是一个 JSON 文件,包含以下字段:

字段 描述 类型
task 任务描述。 字符串
object 要交互的应用程序或文件。 字符串
close 确定在完成任务后是否关闭相应的应用程序或文件。 布尔值

下面是一个计划文件的示例:

{
    "task": "Type in a text of 'Test For Fun' with heading 1 level",
    "object": "draft.docx",
    "close": False
}

注意

object 字段是代理将与之交互的应用程序或文件。在启动批处理模式时,该对象**必须处于活动状态**(可以最小化)。您的文件结构应如下所示,其中 tasks 是任务目录,files 是存储对象文件的位置:

  • Parent
  • tasks
  • files

步骤 2:启动批处理模式

要启动批处理模式,请运行以下命令:

# assume you are in the cloned UFO folder
python ufo.py --task_name {task_name} --mode batch_normal --plan {plan_file}

提示

{task_name} 替换为任务的名称,将 {plan_file} 替换为 Path_to_Parent/Plan_file

评估

您可能希望通过遵循计划来评估 task 是否成功完成。如果 config_dev.yaml 文件中的 EVA_SESSION 设置为 True,UFO 将调用 EvaluationAgent 来评估任务。

您可以在 logs/{task_name}/evaluation.log 文件中查看评估日志。

参考资料

批处理模式使用 PlanReader 解析计划文件并创建 FromFileSession 来遵循计划。

PlanReader

PlanReader 位于 ufo/module/sessions/plan_reader.py 文件中。

计划文件的读取器。

初始化计划读取器。

参数
  • plan_file (str) –

    计划文件的路径。

源代码位于 module/sessions/plan_reader.py
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, plan_file: str):
    """
    Initialize a plan reader.
    :param plan_file: The path of the plan file.
    """

    self.plan_file = plan_file
    with open(plan_file, "r") as f:
        self.plan = json.load(f)
    self.remaining_steps = self.get_steps()
    self.support_apps = ["WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"]

get_close()

检查计划是否已关闭。

返回
  • 布尔值

    如果计划需要关闭则为 True,否则为 False。

源代码位于 module/sessions/plan_reader.py
30
31
32
33
34
35
36
def get_close(self) -> bool:
    """
    Check if the plan is closed.
    :return: True if the plan need closed, False otherwise.
    """

    return self.plan.get("close", False)

get_host_agent_request()

获取主机代理的请求。

返回
  • 字符串

    主机代理的请求。

源代码位于 module/sessions/plan_reader.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_host_agent_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    object_name = self.get_operation_object()

    request = (
        f"Open and select the application of {object_name}, and output the FINISH status immediately. "
        "You must output the selected application with their control text and label even if it is already open."
    )

    return request

get_host_request()

获取主机代理的请求。

返回
  • 字符串

    主机代理的请求。

源代码位于 module/sessions/plan_reader.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_host_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    task = self.get_task()
    object_name = self.get_operation_object()
    if object_name in self.support_apps:
        request = task
    else:
        request = (
            f"Your task is '{task}'. And open the application of {object_name}. "
            "You must output the selected application with their control text and label even if it is already open."
        )
    return request

get_initial_request()

获取计划中的初始请求。

返回
  • 字符串

    初始请求。

源代码位于 module/sessions/plan_reader.py
62
63
64
65
66
67
68
69
70
71
72
73
def get_initial_request(self) -> str:
    """
    Get the initial request in the plan.
    :return: The initial request.
    """

    task = self.get_task()
    object_name = self.get_operation_object()

    request = f"{task} in {object_name}"

    return request

get_operation_object()

获取步骤中的操作对象。

返回
  • 字符串

    操作对象。

源代码位于 module/sessions/plan_reader.py
54
55
56
57
58
59
60
def get_operation_object(self) -> str:
    """
    Get the operation object in the step.
    :return: The operation object.
    """

    return self.plan.get("object", None).lower()

get_root_path()

获取计划的根路径。

返回
  • 字符串

    计划的根路径。

源代码位于 module/sessions/plan_reader.py
148
149
150
151
152
153
154
def get_root_path(self) -> str:
    """
    Get the root path of the plan.
    :return: The root path of the plan.
    """

    return os.path.dirname(os.path.abspath(self.plan_file))

get_steps()

获取计划中的步骤。

返回
  • List[str]

    计划中的步骤。

源代码位于 module/sessions/plan_reader.py
46
47
48
49
50
51
52
def get_steps(self) -> List[str]:
    """
    Get the steps in the plan.
    :return: The steps in the plan.
    """

    return self.plan.get("steps", [])

get_support_apps()

获取计划中的支持应用程序。

返回
  • List[str]

    计划中的支持应用程序。

源代码位于 module/sessions/plan_reader.py
103
104
105
106
107
108
109
def get_support_apps(self) -> List[str]:
    """
    Get the support apps in the plan.
    :return: The support apps in the plan.
    """

    return self.support_apps

get_task()

获取任务名称。

返回
  • 字符串

    任务名称。

源代码位于 module/sessions/plan_reader.py
38
39
40
41
42
43
44
def get_task(self) -> str:
    """
    Get the task name.
    :return: The task name.
    """

    return self.plan.get("task", "")

next_step()

获取计划中的下一步。

返回
  • Optional[str]

    下一步。

源代码位于 module/sessions/plan_reader.py
128
129
130
131
132
133
134
135
136
137
138
def next_step(self) -> Optional[str]:
    """
    Get the next step in the plan.
    :return: The next step.
    """

    if self.remaining_steps:
        step = self.remaining_steps.pop(0)
        return step

    return None

task_finished()

检查任务是否完成。

返回
  • 布尔值

    如果任务完成则为 True,否则为 False。

源代码位于 module/sessions/plan_reader.py
140
141
142
143
144
145
146
def task_finished(self) -> bool:
    """
    Check if the task is finished.
    :return: True if the task is finished, False otherwise.
    """

    return not self.remaining_steps


FollowerSession

FromFileSession 也位于 ufo/module/sessions/session.py 文件中。

基类:BaseSession

一个用于 UFO 的文件会话。

初始化会话。

参数
  • task (str) –

    当前任务的名称。

  • plan_file (str) –

    要遵循的计划文件的路径。

  • should_evaluate (bool) –

    是否评估会话。

  • id (int) –

    会话的 ID。

源代码位于 module/sessions/session.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def __init__(
    self, task: str, plan_file: str, should_evaluate: bool, id: int
) -> None:
    """
    Initialize a session.
    :param task: The name of current task.
    :param plan_file: The path of the plan file to follow.
    :param should_evaluate: Whether to evaluate the session.
    :param id: The id of the session.
    """

    super().__init__(task, should_evaluate, id)
    self.plan_file = plan_file
    self.plan_reader = PlanReader(plan_file)
    self.support_apps = self.plan_reader.get_support_apps()
    self.close = self.plan_reader.get_close()
    self.task_name = task.split("/")[1]
    self.object_name = ""

create_new_round()

创建一个新回合。

源代码位于 module/sessions/session.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
def create_new_round(self) -> None:
    """
    Create a new round.
    """

    # Get a request for the new round.
    request = self.next_request()

    # Create a new round and return None if the session is finished.
    if self.is_finished():
        return None

    self._host_agent.set_state(ContinueHostAgentState())

    round = BaseRound(
        request=request,
        agent=self._host_agent,
        context=self.context,
        should_evaluate=configs.get("EVA_ROUND", False),
        id=self.total_rounds,
    )

    self.add_round(round.id, round)

    return round

get_app_com(object_name)

根据对象名称获取 COM 对象名称。

参数
  • object_name (str) –

    对象的名称。

返回
  • 字符串

    COM 对象名称。

源代码位于 module/sessions/session.py
467
468
469
470
471
472
473
474
475
476
477
478
479
def get_app_com(self, object_name: str) -> str:
    """
    Get the COM object name based on the object name.
    :param object_name: The name of the object.
    :return: The COM object name.
    """
    application_mapping = {
        ".docx": "Word.Application",
        ".xlsx": "Excel.Application",
        ".pptx": "PowerPoint.Application",
    }
    self.app_name = application_mapping.get(object_name)
    return self.app_name

get_app_name(object_name)

根据对象名称获取应用程序名称。

参数
  • object_name (str) –

    对象的名称。

返回
  • 字符串

    应用程序名称。

源代码位于 module/sessions/session.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_app_name(self, object_name: str) -> str:
    """
    Get the application name based on the object name.
    :param object_name: The name of the object.
    :return: The application name.
    """
    application_mapping = {
        ".docx": "WINWORD.EXE",
        ".xlsx": "EXCEL.EXE",
        ".pptx": "POWERPNT.EXE",
        # "outlook": "olk.exe",
        # "onenote": "ONENOTE.EXE",
    }
    self.app_name = application_mapping.get(object_name)
    return self.app_name

next_request()

获取主机代理的请求。

返回
  • 字符串

    主机代理的请求。

源代码位于 module/sessions/session.py
438
439
440
441
442
443
444
445
446
447
448
449
def next_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    if self.total_rounds == 0:
        utils.print_with_color(self.plan_reader.get_host_request(), "cyan")
        return self.plan_reader.get_host_request()
    else:
        self._finish = True
        return

record_task_done()

记录任务完成。

源代码位于 module/sessions/session.py
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def record_task_done(self) -> None:
    """
    Record the task done.
    """
    is_record = configs.get("TASK_STATUS", True)
    if is_record:
        file_path = configs.get(
            "TASK_STATUS_FILE",
            os.path.join(self.plan_file, "../..", "tasks_status.json"),
        )
        task_done = json.load(open(file_path, "r"))
        task_done[self.task_name] = True
        json.dump(
            task_done,
            open(file_path, "w"),
            indent=4,
        )

request_to_evaluate()

获取要评估的请求。返回:要评估的请求。

源代码位于 module/sessions/session.py
543
544
545
546
547
548
def request_to_evaluate(self) -> str:
    """
    Get the request to evaluate.
    return: The request(s) to evaluate.
    """
    return self.plan_reader.get_task()

run()

运行会话。

源代码位于 module/sessions/session.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def run(self) -> None:
    """
    Run the session.
    """
    self.setup_application_environment()
    try:
        super().run()
        self.record_task_done()
    except Exception as e:
        import traceback

        traceback.print_exc()
        print(f"An error occurred: {e}")
    # Close the APP if the user ask so.
    self.terminate_application_processes()

setup_application_environment()

通过根据操作对象确定应用程序名称和命令,然后启动应用程序来设置应用程序环境。

引发:Exception:如果在执行命令或通过 COM 与应用程序交互时发生错误。

源代码位于 module/sessions/session.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def setup_application_environment(self):
    """
    Sets up the application environment by determining the application name and
    command based on the operation object, and then launching the application.

    Raises:
        Exception: If an error occurs during the execution of the command or
                   while interacting with the application via COM.
    """
    self.object_name = self.plan_reader.get_operation_object()
    if self.object_name:
        suffix = os.path.splitext(self.object_name)[1]
        self.app_name = self.get_app_name(suffix)
        print("app_name:", self.app_name)
        if self.app_name not in self.support_apps:
            print(f"The app {self.app_name} is not supported.")
            return  # The app is not supported, so we don't need to setup the environment.
        file = self.plan_reader.get_file_path()
        code_snippet = f"import os\nos.system('start {self.app_name} \"{file}\"')"
        code_snippet = code_snippet.replace("\\", "\\\\")  # escape backslashes
        try:
            exec(code_snippet, globals())
            app_com = self.get_app_com(suffix)
            time.sleep(2)  # wait for the app to boot
            word_app = win32com.client.Dispatch(app_com)
            word_app.WindowState = 1  # wdWindowStateMaximize
        except Exception as e:
            print(f"An error occurred: {e}")

terminate_application_processes()

根据提供的条件终止特定的应用程序进程。

源代码位于 module/sessions/session.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def terminate_application_processes(self):
    """
    Terminates specific application processes based on the provided conditions.
    """
    if self.close:
        if self.object_name:
            for process in psutil.process_iter(["name"]):
                if process.info["name"] == self.app_name:
                    os.system(f"taskkill /f /im {self.app_name}")
                    time.sleep(1)
        else:
            app_names = ["WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"]
            for process in psutil.process_iter(["name"]):
                if process.info["name"] in app_names:
                    os.system(f"taskkill /f /im {process.info['name']}")
                    time.sleep(1)