回合

一个 Round 是用户与 UFO 之间的一次交互,用于处理一个用户请求。一个 Round 负责协调 HostAgentAppAgent 来完成用户的请求。

回合生命周期

在一个 Round 中,会执行以下步骤:

1. 回合初始化

Round 开始时,创建 Round 对象,并由 HostAgent 处理用户请求,以确定合适的应用程序来满足请求。

2. 动作执行

一旦创建,Round 会协调 HostAgentAppAgent 来执行必要的动作以完成用户的请求。Round 的核心逻辑如下所示:

def run(self) -> None:
    """
    Run the round.
    """

    while not self.is_finished():

        self.agent.handle(self.context)

        self.state = self.agent.state.next_state(self.agent)
        self.agent = self.agent.state.next_agent(self.agent)
        self.agent.set_state(self.state)

        # If the subtask ends, capture the last snapshot of the application.
        if self.state.is_subtask_end():
            time.sleep(configs["SLEEP_TIME"])
            self.capture_last_snapshot(sub_round_id=self.subtask_amount)
            self.subtask_amount += 1

    self.agent.blackboard.add_requests(
        {"request_{i}".format(i=self.id), self.request}
    )

    if self.application_window is not None:
        self.capture_last_snapshot()

    if self._should_evaluate:
        self.evaluation()

在每个步骤中,Round 根据当前状态调用 AppAgentHostAgenthandle 方法来处理用户请求。状态决定了下一个处理请求的代理以及下一个要转换到的状态。

3. 请求完成

AppAgent 完成应用程序内的动作。如果请求涉及多个应用程序,HostAgent 可能会切换到不同的应用程序以继续任务。

4. 回合终止

一旦用户请求完成,Round 将终止,结果返回给用户。如果配置了,EvaluationAgent 会评估 Round 的完整性。

参考

基类:ABC

UFO 会话中的一轮。一轮管理一个用户请求,并包含多个步骤。一个会话可能包含多轮交互。

初始化一轮。

参数
  • request (str) –

    本轮的请求。

  • agent (BasicAgent) –

    本轮的初始代理。

  • context (Context) –

    本轮的共享上下文。

  • should_evaluate (bool) –

    是否评估本轮。

  • id (int) –

    本轮的 ID。

源代码在 module/basic.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    request: str,
    agent: BasicAgent,
    context: Context,
    should_evaluate: bool,
    id: int,
) -> None:
    """
    Initialize a round.
    :param request: The request of the round.
    :param agent: The initial agent of the round.
    :param context: The shared context of the round.
    :param should_evaluate: Whether to evaluate the round.
    :param id: The id of the round.
    """

    self._request = request
    self._context = context
    self._agent = agent
    self._state = agent.state
    self._id = id
    self._should_evaluate = should_evaluate

    self._init_context()

agent property writable

获取本轮的代理。返回:本轮的代理。

application_window property writable

获取会话的应用程序。返回:会话的应用程序。

context property

获取本轮的上下文。返回:本轮的上下文。

cost property

获取本轮的成本。返回:本轮的成本。

id property

获取本轮的 ID。返回:本轮的 ID。

log_path property

获取本轮的日志路径。

返回:本轮的日志路径。

request property

获取本轮的请求。返回:本轮的请求。

state property writable

获取本轮的状态。返回:本轮的状态。

step property

获取本轮的本地步骤。返回:本轮的步骤。

subtask_amount property writable

获取本轮的子任务数量。返回:本轮的子任务数量。

capture_last_snapshot(sub_round_id=None)

捕获应用程序的最后快照,如果配置了,包括屏幕截图和 XML 文件。

参数
  • sub_round_id (Optional[int], default: None ) –

    子回合的 ID,默认为 None。

源代码在 module/basic.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def capture_last_snapshot(self, sub_round_id: Optional[int] = None) -> None:
    """
    Capture the last snapshot of the application, including the screenshot and the XML file if configured.
    :param sub_round_id: The id of the sub-round, default is None.
    """

    # Capture the final screenshot
    if sub_round_id is None:
        screenshot_save_path = self.log_path + f"action_round_{self.id}_final.png"
    else:
        screenshot_save_path = (
            self.log_path
            + f"action_round_{self.id}_sub_round_{sub_round_id}_final.png"
        )

    if self.application_window is not None:

        try:
            PhotographerFacade().capture_app_window_screenshot(
                self.application_window, save_path=screenshot_save_path
            )

        except Exception as e:
            utils.print_with_color(
                f"Warning: The last snapshot capture failed, due to the error: {e}",
                "yellow",
            )

        if configs.get("SAVE_UI_TREE", False):
            step_ui_tree = ui_tree.UITree(self.application_window)

            ui_tree_path = os.path.join(self.log_path, "ui_trees")

            ui_tree_file_name = (
                f"ui_tree_round_{self.id}_final.json"
                if sub_round_id is None
                else f"ui_tree_round_{self.id}_sub_round_{sub_round_id}_final.json"
            )

            step_ui_tree.save_ui_tree_to_json(
                os.path.join(
                    ui_tree_path,
                    ui_tree_file_name,
                )
            )

        if configs.get("SAVE_FULL_SCREEN", False):

            desktop_save_path = (
                self.log_path
                + f"desktop_round_{self.id}_sub_round_{sub_round_id}_final.png"
            )

            # Capture the desktop screenshot for all screens.
            PhotographerFacade().capture_desktop_screen_screenshot(
                all_screens=True, save_path=desktop_save_path
            )

        # Save the final XML file
        if configs["LOG_XML"]:
            log_abs_path = os.path.abspath(self.log_path)
            xml_save_path = os.path.join(
                log_abs_path,
                (
                    f"xml/action_round_{self.id}_final.xml"
                    if sub_round_id is None
                    else f"xml/action_round_{self.id}_sub_round_{sub_round_id}_final.xml"
                ),
            )

            if issubclass(type(self.agent), HostAgent):

                app_agent: AppAgent = self.agent.get_active_appagent()
                app_agent.Puppeteer.save_to_xml(xml_save_path)
            elif issubclass(type(self.agent), AppAgent):
                app_agent: AppAgent = self.agent
                app_agent.Puppeteer.save_to_xml(xml_save_path)

evaluation()

TODO: 评估本轮。

源代码在 module/basic.py
326
327
328
329
330
def evaluation(self) -> None:
    """
    TODO: Evaluate the round.
    """
    pass

is_finished()

检查本轮是否完成。返回:如果本轮完成,则为 True,否则为 False。

源代码在 module/basic.py
129
130
131
132
133
134
135
136
137
def is_finished(self) -> bool:
    """
    Check if the round is finished.
    return: True if the round is finished, otherwise False.
    """
    return (
        self.state.is_round_end()
        or self.context.get(ContextNames.SESSION_STEP) >= configs["MAX_STEP"]
    )

print_cost()

打印本轮的总成本。

源代码在 module/basic.py
227
228
229
230
231
232
233
234
235
236
237
def print_cost(self) -> None:
    """
    Print the total cost of the round.
    """

    total_cost = self.cost
    if isinstance(total_cost, float):
        formatted_cost = "${:.2f}".format(total_cost)
        utils.print_with_color(
            f"Request total cost for current round is {formatted_cost}", "yellow"
        )

run()

运行本轮。

源代码在 module/basic.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def run(self) -> None:
    """
    Run the round.
    """

    while not self.is_finished():

        self.agent.handle(self.context)

        self.state = self.agent.state.next_state(self.agent)
        self.agent = self.agent.state.next_agent(self.agent)

        self.agent.set_state(self.state)

        # If the subtask ends, capture the last snapshot of the application.
        if self.state.is_subtask_end():
            time.sleep(configs["SLEEP_TIME"])
            self.capture_last_snapshot(sub_round_id=self.subtask_amount)
            self.subtask_amount += 1

    self.agent.blackboard.add_requests(
        {"request_{i}".format(i=self.id): self.request}
    )

    if self.application_window is not None:
        self.capture_last_snapshot()

    if self._should_evaluate:
        self.evaluation()