跟随模式

跟随者模式是 UFO 的一个功能,代理会遵循预定义的自然语言步骤列表,对应用程序执行操作。与普通模式不同,此模式会创建一个 FollowerAgent,它遵循用户提供的计划列表与应用程序交互,而不是自行生成计划。此模式对于调试、软件测试或验证非常有用。

快速入门

步骤 1:创建计划文件

在启动跟随者模式之前,您需要创建一个计划文件,其中包含代理要遵循的步骤列表。计划文件是一个 JSON 文件,包含以下字段:

字段 描述 类型
task 任务描述。 字符串
steps 代理要遵循的步骤列表。 字符串列表
object 要与之交互的应用程序或文件。 字符串

以下是计划文件的示例:

{
    "task": "Type in a text of 'Test For Fun' with heading 1 level",
    "steps": 
    [
        "1.type in 'Test For Fun'", 
        "2.Select the 'Test For Fun' text",
        "3.Click 'Home' tab to show the 'Styles' ribbon tab",
        "4.Click 'Styles' ribbon tab to show the style 'Heading 1'",
        "5.Click 'Heading 1' style to apply the style to the selected text"
    ],
    "object": "draft.docx"
}

注意

object 字段是代理将与之交互的应用程序或文件。启动跟随者模式时,该对象必须处于活动状态(可以最小化)。

步骤 2:启动跟随者模式

要启动跟随者模式,请运行以下命令:

# assume you are in the cloned UFO folder
python ufo.py --task_name {task_name} --mode follower --plan {plan_file}

提示

{task_name} 替换为任务名称,将 {plan_file} 替换为计划文件的路径。

步骤 3:批量运行(可选)

您还可以通过提供包含多个计划文件的文件夹,以批处理模式运行跟随者模式。代理将逐个遵循文件夹中的计划。要以批处理模式运行,请运行以下命令:

# assume you are in the cloned UFO folder
python ufo.py --task_name {task_name} --mode follower --plan {plan_folder}

UFO 将自动检测文件夹中的计划文件并逐个运行它们。

提示

{task_name} 替换为任务名称,将 {plan_folder} 替换为包含计划文件的文件夹路径。

评估

您可能希望通过遵循计划来评估 task 是否成功完成。如果 config_dev.yaml 文件中将 EVA_SESSION 设置为 True,UFO 将调用 EvaluationAgent 来评估任务。

您可以在 logs/{task_name}/evaluation.log 文件中查看评估日志。

参考资料

跟随者模式采用 PlanReader 来解析计划文件,并创建 FollowerSession 来遵循计划。

PlanReader

PlanReader 位于 ufo/module/sessions/plan_reader.py 文件中。

用于计划文件的读取器。

初始化计划读取器。

参数
  • plan_file (str) –

    计划文件的路径。

源代码在 module/sessions/plan_reader.py
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, plan_file: str):
    """
    Initialize a plan reader.
    :param plan_file: The path of the plan file.
    """

    self.plan_file = plan_file
    with open(plan_file, "r") as f:
        self.plan = json.load(f)
    self.remaining_steps = self.get_steps()
    self.support_apps = ["WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"]

get_close()

检查计划是否已关闭。

返回
  • 布尔值

    如果计划需要关闭,则为 True,否则为 False。

源代码在 module/sessions/plan_reader.py
30
31
32
33
34
35
36
def get_close(self) -> bool:
    """
    Check if the plan is closed.
    :return: True if the plan need closed, False otherwise.
    """

    return self.plan.get("close", False)

get_host_agent_request()

获取主机代理的请求。

返回
  • 字符串

    主机代理的请求。

源代码在 module/sessions/plan_reader.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_host_agent_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    object_name = self.get_operation_object()

    request = (
        f"Open and select the application of {object_name}, and output the FINISH status immediately. "
        "You must output the selected application with their control text and label even if it is already open."
    )

    return request

get_host_request()

获取主机代理的请求。

返回
  • 字符串

    主机代理的请求。

源代码在 module/sessions/plan_reader.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_host_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    task = self.get_task()
    object_name = self.get_operation_object()
    if object_name in self.support_apps:
        request = task
    else:
        request = (
            f"Your task is '{task}'. And open the application of {object_name}. "
            "You must output the selected application with their control text and label even if it is already open."
        )
    return request

get_initial_request()

获取计划中的初始请求。

返回
  • 字符串

    初始请求。

源代码在 module/sessions/plan_reader.py
62
63
64
65
66
67
68
69
70
71
72
73
def get_initial_request(self) -> str:
    """
    Get the initial request in the plan.
    :return: The initial request.
    """

    task = self.get_task()
    object_name = self.get_operation_object()

    request = f"{task} in {object_name}"

    return request

get_operation_object()

获取步骤中的操作对象。

返回
  • 字符串

    操作对象。

源代码在 module/sessions/plan_reader.py
54
55
56
57
58
59
60
def get_operation_object(self) -> str:
    """
    Get the operation object in the step.
    :return: The operation object.
    """

    return self.plan.get("object", None).lower()

get_root_path()

获取计划的根路径。

返回
  • 字符串

    计划的根路径。

源代码在 module/sessions/plan_reader.py
148
149
150
151
152
153
154
def get_root_path(self) -> str:
    """
    Get the root path of the plan.
    :return: The root path of the plan.
    """

    return os.path.dirname(os.path.abspath(self.plan_file))

get_steps()

获取计划中的步骤。

返回
  • List[str]

    计划中的步骤。

源代码在 module/sessions/plan_reader.py
46
47
48
49
50
51
52
def get_steps(self) -> List[str]:
    """
    Get the steps in the plan.
    :return: The steps in the plan.
    """

    return self.plan.get("steps", [])

get_support_apps()

获取计划中的支持应用程序。

返回
  • List[str]

    计划中的支持应用程序。

源代码在 module/sessions/plan_reader.py
103
104
105
106
107
108
109
def get_support_apps(self) -> List[str]:
    """
    Get the support apps in the plan.
    :return: The support apps in the plan.
    """

    return self.support_apps

get_task()

获取任务名称。

返回
  • 字符串

    任务名称。

源代码在 module/sessions/plan_reader.py
38
39
40
41
42
43
44
def get_task(self) -> str:
    """
    Get the task name.
    :return: The task name.
    """

    return self.plan.get("task", "")

next_step()

获取计划中的下一步。

返回
  • Optional[str]

    下一步。

源代码在 module/sessions/plan_reader.py
128
129
130
131
132
133
134
135
136
137
138
def next_step(self) -> Optional[str]:
    """
    Get the next step in the plan.
    :return: The next step.
    """

    if self.remaining_steps:
        step = self.remaining_steps.pop(0)
        return step

    return None

task_finished()

检查任务是否已完成。

返回
  • 布尔值

    如果任务已完成,则为 True,否则为 False。

源代码在 module/sessions/plan_reader.py
140
141
142
143
144
145
146
def task_finished(self) -> bool:
    """
    Check if the task is finished.
    :return: True if the task is finished, False otherwise.
    """

    return not self.remaining_steps


FollowerSession

FollowerSession 也位于 ufo/module/sessions/session.py 文件中。

基类:BaseSession

一个用于遵循操作计划列表的会话。此会话用于跟随者代理,它接受计划文件以使用 PlanReader 进行遵循。

初始化会话。

参数
  • task (str) –

    当前任务的名称。

  • plan_file (str) –

    要遵循的计划文件的路径。

  • should_evaluate (bool) –

    是否评估会话。

  • id (int) –

    会话的 ID。

源代码在 module/sessions/session.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def __init__(
    self, task: str, plan_file: str, should_evaluate: bool, id: int
) -> None:
    """
    Initialize a session.
    :param task: The name of current task.
    :param plan_file: The path of the plan file to follow.
    :param should_evaluate: Whether to evaluate the session.
    :param id: The id of the session.
    """

    super().__init__(task, should_evaluate, id)

    self.plan_reader = PlanReader(plan_file)

create_new_round()

创建一个新回合。

源代码在 module/sessions/session.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def create_new_round(self) -> None:
    """
    Create a new round.
    """

    # Get a request for the new round.
    request = self.next_request()

    # Create a new round and return None if the session is finished.
    if self.is_finished():
        return None

    if self.total_rounds == 0:
        utils.print_with_color("Complete the following request:", "yellow")
        utils.print_with_color(self.plan_reader.get_initial_request(), "cyan")
        agent = self._host_agent
    else:
        self.context.set(ContextNames.SUBTASK, request)
        agent = self._host_agent.create_app_agent(
            application_window_name=self.context.get(
                ContextNames.APPLICATION_PROCESS_NAME
            ),
            application_root_name=self.context.get(
                ContextNames.APPLICATION_ROOT_NAME
            ),
            request=request,
            mode=self.context.get(ContextNames.MODE),
        )

        # Clear the memory and set the state to continue the app agent.
        agent.clear_memory()
        agent.blackboard.requests.clear()

        agent.set_state(ContinueAppAgentState())

    round = BaseRound(
        request=request,
        agent=agent,
        context=self.context,
        should_evaluate=configs.get("EVA_ROUND", False),
        id=self.total_rounds,
    )

    self.add_round(round.id, round)

    return round

next_request()

获取新一轮的请求。

源代码在 module/sessions/session.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def next_request(self) -> str:
    """
    Get the request for the new round.
    """

    # If the task is finished, return an empty string.
    if self.plan_reader.task_finished():
        self._finish = True
        return ""

    # Get the request from the plan reader.
    if self.total_rounds == 0:
        return self.plan_reader.get_host_agent_request()
    else:
        return self.plan_reader.next_step()

request_to_evaluate()

获取要评估的请求。返回:要评估的请求。

源代码在 module/sessions/session.py
371
372
373
374
375
376
377
def request_to_evaluate(self) -> str:
    """
    Get the request to evaluate.
    return: The request(s) to evaluate.
    """

    return self.plan_reader.get_task()