diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py index daaacda15870..862b15e07f12 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py @@ -15,7 +15,7 @@ UserMessage, ) from autogen_core.components.tools import FunctionTool, Tool -from pydantic import BaseModel, ConfigDict, Field, model_validator +from pydantic import BaseModel, Field, model_validator from .. import EVENT_LOGGER_NAME from ..base import Response @@ -33,30 +33,6 @@ event_logger = logging.getLogger(EVENT_LOGGER_NAME) -class ToolCallEvent(BaseModel): - """A tool call event.""" - - source: str - """The source of the event.""" - - tool_calls: List[FunctionCall] - """The tool call message.""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ToolCallResultEvent(BaseModel): - """A tool call result event.""" - - source: str - """The source of the event.""" - - tool_call_results: List[FunctionExecutionResult] - """The tool call result message.""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - - class Handoff(BaseModel): """Handoff configuration for :class:`AssistantAgent`.""" @@ -264,19 +240,21 @@ async def on_messages_stream( # Run tool calls until the model produces a string response. while isinstance(result.content, list) and all(isinstance(item, FunctionCall) for item in result.content): - event_logger.debug(ToolCallEvent(tool_calls=result.content, source=self.name)) + tool_call_msg = ToolCallMessage(content=result.content, source=self.name, models_usage=result.usage) + event_logger.debug(tool_call_msg) # Add the tool call message to the output. - inner_messages.append(ToolCallMessage(content=result.content, source=self.name, models_usage=result.usage)) - yield ToolCallMessage(content=result.content, source=self.name, models_usage=result.usage) + inner_messages.append(tool_call_msg) + yield tool_call_msg # Execute the tool calls. results = await asyncio.gather( *[self._execute_tool_call(call, cancellation_token) for call in result.content] ) - event_logger.debug(ToolCallResultEvent(tool_call_results=results, source=self.name)) + tool_call_result_msg = ToolCallResultMessage(content=results, source=self.name) + event_logger.debug(tool_call_result_msg) self._model_context.append(FunctionExecutionResultMessage(content=results)) - inner_messages.append(ToolCallResultMessage(content=results, source=self.name)) - yield ToolCallResultMessage(content=results, source=self.name) + inner_messages.append(tool_call_result_msg) + yield tool_call_result_msg # Detect handoff requests. handoffs: List[Handoff] = [] diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_base_chat_agent.py b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_base_chat_agent.py index cf146b0c10fb..bbfc1ce1e0ab 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_base_chat_agent.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_base_chat_agent.py @@ -4,7 +4,7 @@ from autogen_core.base import CancellationToken from ..base import ChatAgent, Response, TaskResult -from ..messages import ChatMessage, InnerMessage, TextMessage +from ..messages import AgentMessage, ChatMessage, InnerMessage, TextMessage class BaseChatAgent(ChatAgent, ABC): @@ -62,7 +62,7 @@ async def run( cancellation_token = CancellationToken() first_message = TextMessage(content=task, source="user") response = await self.on_messages([first_message], cancellation_token) - messages: List[InnerMessage | ChatMessage] = [first_message] + messages: List[AgentMessage] = [first_message] if response.inner_messages is not None: messages += response.inner_messages messages.append(response.chat_message) @@ -73,14 +73,14 @@ async def run_stream( task: str, *, cancellation_token: CancellationToken | None = None, - ) -> AsyncGenerator[InnerMessage | ChatMessage | TaskResult, None]: + ) -> AsyncGenerator[AgentMessage | TaskResult, None]: """Run the agent with the given task and return a stream of messages and the final task result as the last item in the stream.""" if cancellation_token is None: cancellation_token = CancellationToken() first_message = TextMessage(content=task, source="user") yield first_message - messages: List[InnerMessage | ChatMessage] = [first_message] + messages: List[AgentMessage] = [first_message] async for message in self.on_messages_stream([first_message], cancellation_token): if isinstance(message, Response): yield message.chat_message diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/base/_task.py b/python/packages/autogen-agentchat/src/autogen_agentchat/base/_task.py index a642120799f2..1c33f7ecc185 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/base/_task.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/base/_task.py @@ -3,16 +3,19 @@ from autogen_core.base import CancellationToken -from ..messages import ChatMessage, InnerMessage +from ..messages import AgentMessage @dataclass class TaskResult: """Result of running a task.""" - messages: Sequence[InnerMessage | ChatMessage] + messages: Sequence[AgentMessage] """Messages produced by the task.""" + stop_reason: str | None = None + """The reason the task stopped.""" + class TaskRunner(Protocol): """A task runner.""" @@ -31,7 +34,7 @@ def run_stream( task: str, *, cancellation_token: CancellationToken | None = None, - ) -> AsyncGenerator[InnerMessage | ChatMessage | TaskResult, None]: + ) -> AsyncGenerator[AgentMessage | TaskResult, None]: """Run the task and produces a stream of messages and the final result :class:`TaskResult` as the last item in the stream.""" ... diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/base/_termination.py b/python/packages/autogen-agentchat/src/autogen_agentchat/base/_termination.py index 1442dd51358a..859740fa093e 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/base/_termination.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/base/_termination.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from typing import List, Sequence -from ..messages import ChatMessage, StopMessage +from ..messages import AgentMessage, StopMessage class TerminatedException(BaseException): ... @@ -50,7 +50,7 @@ def terminated(self) -> bool: ... @abstractmethod - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: """Check if the conversation should be terminated based on the messages received since the last time the condition was called. Return a StopMessage if the conversation should be terminated, or None otherwise. @@ -88,7 +88,7 @@ def __init__(self, *conditions: TerminationCondition) -> None: def terminated(self) -> bool: return all(condition.terminated for condition in self._conditions) - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: if self.terminated: raise TerminatedException("Termination condition has already been reached.") # Check all remaining conditions. @@ -120,7 +120,7 @@ def __init__(self, *conditions: TerminationCondition) -> None: def terminated(self) -> bool: return any(condition.terminated for condition in self._conditions) - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: if self.terminated: raise RuntimeError("Termination condition has already been reached") stop_messages = await asyncio.gather(*[condition(messages) for condition in self._conditions]) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_console_log_handler.py b/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_console_log_handler.py index 571cc875cec3..cc292e76c7c4 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_console_log_handler.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_console_log_handler.py @@ -3,62 +3,18 @@ import sys from datetime import datetime -from ..agents._assistant_agent import ToolCallEvent, ToolCallResultEvent -from ..messages import ChatMessage, StopMessage, TextMessage -from ..teams._events import ( - GroupChatPublishEvent, - GroupChatSelectSpeakerEvent, - TerminationEvent, -) +from pydantic import BaseModel class ConsoleLogHandler(logging.Handler): - @staticmethod - def serialize_chat_message(message: ChatMessage) -> str: - if isinstance(message, TextMessage | StopMessage): - return message.content - else: - d = message.model_dump() - assert "content" in d - return json.dumps(d["content"], indent=2) - def emit(self, record: logging.LogRecord) -> None: ts = datetime.fromtimestamp(record.created).isoformat() - if isinstance(record.msg, GroupChatPublishEvent): - if record.msg.source is None: - sys.stdout.write( - f"\n{'-'*75} \n" - f"\033[91m[{ts}]:\033[0m\n" - f"\n{self.serialize_chat_message(record.msg.agent_message)}" - ) - else: - sys.stdout.write( - f"\n{'-'*75} \n" - f"\033[91m[{ts}], {record.msg.source.type}:\033[0m\n" - f"\n{self.serialize_chat_message(record.msg.agent_message)}" - ) - sys.stdout.flush() - elif isinstance(record.msg, ToolCallEvent): - sys.stdout.write( - f"\n{'-'*75} \n" f"\033[91m[{ts}], Tool Call:\033[0m\n" f"\n{str(record.msg.model_dump())}" - ) - sys.stdout.flush() - elif isinstance(record.msg, ToolCallResultEvent): - sys.stdout.write( - f"\n{'-'*75} \n" f"\033[91m[{ts}], Tool Call Result:\033[0m\n" f"\n{str(record.msg.model_dump())}" - ) - sys.stdout.flush() - elif isinstance(record.msg, GroupChatSelectSpeakerEvent): - sys.stdout.write( - f"\n{'-'*75} \n" f"\033[91m[{ts}], Selected Next Speaker:\033[0m\n" f"\n{record.msg.selected_speaker}" - ) - sys.stdout.flush() - elif isinstance(record.msg, TerminationEvent): - sys.stdout.write( - f"\n{'-'*75} \n" - f"\033[91m[{ts}], Termination:\033[0m\n" - f"\n{self.serialize_chat_message(record.msg.agent_message)}" + if isinstance(record.msg, BaseModel): + record.msg = json.dumps( + { + "timestamp": ts, + "message": record.msg.model_dump_json(indent=2), + "type": record.msg.__class__.__name__, + }, ) - sys.stdout.flush() - else: - raise ValueError(f"Unexpected log record: {record.msg}") + sys.stdout.write(f"{record.msg}\n") diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_file_log_handler.py b/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_file_log_handler.py index 9923f313d9e5..1e4b402da35a 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_file_log_handler.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/logging/_file_log_handler.py @@ -1,15 +1,8 @@ import json import logging -from dataclasses import asdict, is_dataclass from datetime import datetime -from typing import Any -from ..agents._assistant_agent import ToolCallEvent, ToolCallResultEvent -from ..teams._events import ( - GroupChatPublishEvent, - GroupChatSelectSpeakerEvent, - TerminationEvent, -) +from pydantic import BaseModel class FileLogHandler(logging.Handler): @@ -20,65 +13,12 @@ def __init__(self, filename: str) -> None: def emit(self, record: logging.LogRecord) -> None: ts = datetime.fromtimestamp(record.created).isoformat() - if isinstance(record.msg, GroupChatPublishEvent | TerminationEvent): - log_entry = json.dumps( + if isinstance(record.msg, BaseModel): + record.msg = json.dumps( { "timestamp": ts, - "source": record.msg.source, - "agent_message": record.msg.agent_message.model_dump(), + "message": record.msg.model_dump(), "type": record.msg.__class__.__name__, }, - default=self.json_serializer, ) - elif isinstance(record.msg, GroupChatSelectSpeakerEvent): - log_entry = json.dumps( - { - "timestamp": ts, - "source": record.msg.source, - "selected_speaker": record.msg.selected_speaker, - "type": "SelectSpeakerEvent", - }, - default=self.json_serializer, - ) - elif isinstance(record.msg, ToolCallEvent): - log_entry = json.dumps( - { - "timestamp": ts, - "tool_calls": record.msg.model_dump(), - "type": "ToolCallEvent", - }, - default=self.json_serializer, - ) - elif isinstance(record.msg, ToolCallResultEvent): - log_entry = json.dumps( - { - "timestamp": ts, - "tool_call_results": record.msg.model_dump(), - "type": "ToolCallResultEvent", - }, - default=self.json_serializer, - ) - else: - raise ValueError(f"Unexpected log record: {record.msg}") - file_record = logging.LogRecord( - name=record.name, - level=record.levelno, - pathname=record.pathname, - lineno=record.lineno, - msg=log_entry, - args=(), - exc_info=record.exc_info, - ) - self.file_handler.emit(file_record) - - def close(self) -> None: - self.file_handler.close() - super().close() - - @staticmethod - def json_serializer(obj: Any) -> Any: - if is_dataclass(obj) and not isinstance(obj, type): - return asdict(obj) - elif isinstance(obj, type): - return str(obj) - return str(obj) + self.file_handler.emit(record) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/messages.py b/python/packages/autogen-agentchat/src/autogen_agentchat/messages.py index 1ac85edf1bd1..c2d2b7abf5cc 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/messages.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/messages.py @@ -2,7 +2,7 @@ from autogen_core.components import FunctionCall, Image from autogen_core.components.models import FunctionExecutionResult, RequestUsage -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class BaseMessage(BaseModel): @@ -14,6 +14,8 @@ class BaseMessage(BaseModel): models_usage: RequestUsage | None = None """The model client usage incurred when producing this message.""" + model_config = ConfigDict(arbitrary_types_allowed=True) + class TextMessage(BaseMessage): """A text message.""" @@ -75,6 +77,10 @@ class ToolCallResultMessage(BaseMessage): """Messages for agent-to-agent communication.""" +AgentMessage = InnerMessage | ChatMessage +"""All message types.""" + + __all__ = [ "BaseMessage", "TextMessage", @@ -85,4 +91,6 @@ class ToolCallResultMessage(BaseMessage): "ToolCallMessage", "ToolCallResultMessage", "ChatMessage", + "InnerMessage", + "AgentMessage", ] diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py b/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py index 825d5bea28e6..68662326c9dd 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py @@ -1,7 +1,7 @@ from typing import Sequence from ..base import TerminatedException, TerminationCondition -from ..messages import ChatMessage, MultiModalMessage, StopMessage, TextMessage +from ..messages import AgentMessage, MultiModalMessage, StopMessage, TextMessage class StopMessageTermination(TerminationCondition): @@ -14,7 +14,7 @@ def __init__(self) -> None: def terminated(self) -> bool: return self._terminated - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: @@ -42,7 +42,7 @@ def __init__(self, max_messages: int) -> None: def terminated(self) -> bool: return self._message_count >= self._max_messages - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: if self.terminated: raise TerminatedException("Termination condition has already been reached") self._message_count += len(messages) @@ -72,7 +72,7 @@ def __init__(self, text: str) -> None: def terminated(self) -> bool: return self._terminated - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: @@ -127,7 +127,7 @@ def terminated(self) -> bool: or (self._max_completion_token is not None and self._completion_token_count >= self._max_completion_token) ) - async def __call__(self, messages: Sequence[ChatMessage]) -> StopMessage | None: + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: if self.terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_events.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_events.py deleted file mode 100644 index 3442b35ce87a..000000000000 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_events.py +++ /dev/null @@ -1,51 +0,0 @@ -from autogen_core.base import AgentId -from pydantic import BaseModel, ConfigDict - -from ..messages import ChatMessage, StopMessage - - -class GroupChatPublishEvent(BaseModel): - """An group chat event for sharing some data. Agents receive this event should - update their internal state (e.g., append to message history) with the - content of the event. - """ - - agent_message: ChatMessage - """The message published by the agent.""" - - source: AgentId | None = None - """The agent ID that published the message.""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class GroupChatRequestPublishEvent(BaseModel): - """An event for requesting to publish a group chat publish event. - Upon receiving this event, the agent should publish a group chat publish event. - """ - - ... - - -class GroupChatSelectSpeakerEvent(BaseModel): - """An event for selecting the next speaker in a group chat.""" - - selected_speaker: str - """The name of the selected speaker.""" - - source: AgentId - """The agent ID that selected the speaker.""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class TerminationEvent(BaseModel): - """An event for terminating a conversation.""" - - agent_message: StopMessage - """The stop message that terminates the conversation.""" - - source: AgentId - """The agent ID that triggered the termination.""" - - model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py index cd4efe8f350e..cc5c0e58138a 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py @@ -1,4 +1,5 @@ import asyncio +import logging import uuid from abc import ABC, abstractmethod from typing import AsyncGenerator, Callable, List @@ -15,11 +16,14 @@ ) from autogen_core.components import ClosureAgent, TypeSubscription +from ... import EVENT_LOGGER_NAME from ...base import ChatAgent, TaskResult, Team, TerminationCondition -from ...messages import ChatMessage, InnerMessage, TextMessage -from .._events import GroupChatPublishEvent, GroupChatRequestPublishEvent +from ...messages import AgentMessage, TextMessage from ._base_group_chat_manager import BaseGroupChatManager from ._chat_agent_container import ChatAgentContainer +from ._events import GroupChatMessage, GroupChatStart, GroupChatTermination + +event_logger = logging.getLogger(EVENT_LOGGER_NAME) class BaseGroupChat(Team, ABC): @@ -43,14 +47,16 @@ def __init__( self._team_id = str(uuid.uuid4()) self._base_group_chat_manager_class = group_chat_manager_class self._termination_condition = termination_condition + self._message_thread: List[AgentMessage] = [] @abstractmethod def _create_group_chat_manager_factory( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, ) -> Callable[[], BaseGroupChatManager]: ... @@ -90,9 +96,14 @@ async def run_stream( task: str, *, cancellation_token: CancellationToken | None = None, - ) -> AsyncGenerator[InnerMessage | ChatMessage | TaskResult, None]: + ) -> AsyncGenerator[AgentMessage | TaskResult, None]: """Run the team and produces a stream of messages and the final result of the type :class:`TaskResult` as the last item in the stream.""" + + # TODO: runtime is currently a local variable, but it should be stored in + # a managed context so it can be accessed by all nested teams. Also, the runtime + # should be not be started or stopped by the team, but by the context. + # Create the runtime. runtime = SingleThreadedAgentRuntime() @@ -100,7 +111,6 @@ async def run_stream( group_chat_manager_agent_type = AgentType("group_chat_manager") group_chat_manager_topic_type = group_chat_manager_agent_type.type group_topic_type = "round_robin_group_topic" - team_topic_type = "team_topic" output_topic_type = "output_topic" # Register participants. @@ -128,10 +138,11 @@ async def run_stream( runtime, type=group_chat_manager_agent_type.type, factory=self._create_group_chat_manager_factory( - parent_topic_type=team_topic_type, group_topic_type=group_topic_type, + output_topic_type=output_topic_type, participant_topic_types=participant_topic_types, participant_descriptions=participant_descriptions, + message_thread=self._message_thread, termination_condition=self._termination_condition, ), ) @@ -142,21 +153,23 @@ async def run_stream( await runtime.add_subscription( TypeSubscription(topic_type=group_topic_type, agent_type=group_chat_manager_agent_type.type) ) - await runtime.add_subscription( - TypeSubscription(topic_type=team_topic_type, agent_type=group_chat_manager_agent_type.type) - ) - output_messages: List[InnerMessage | ChatMessage] = [] - output_message_queue: asyncio.Queue[InnerMessage | ChatMessage | None] = asyncio.Queue() + # Create a closure agent to collect the output messages. + stop_reason: str | None = None + output_message_queue: asyncio.Queue[AgentMessage | None] = asyncio.Queue() async def collect_output_messages( _runtime: AgentRuntime, id: AgentId, - message: InnerMessage | ChatMessage, + message: GroupChatStart | GroupChatMessage | GroupChatTermination, ctx: MessageContext, ) -> None: - output_messages.append(message) - await output_message_queue.put(message) + event_logger.info(message.message) + if isinstance(message, GroupChatTermination): + nonlocal stop_reason + stop_reason = message.message.content + return + await output_message_queue.put(message.message) await ClosureAgent.register( runtime, @@ -170,17 +183,12 @@ async def collect_output_messages( # Start the runtime. runtime.start() - # Run the team by publishing the task to the team topic and then requesting the result. - team_topic_id = TopicId(type=team_topic_type, source=self._team_id) - group_chat_manager_topic_id = TopicId(type=group_chat_manager_topic_type, source=self._team_id) + # Run the team by publishing the task to the group chat manager. first_chat_message = TextMessage(content=task, source="user") - output_messages.append(first_chat_message) - await output_message_queue.put(first_chat_message) await runtime.publish_message( - GroupChatPublishEvent(agent_message=first_chat_message), - topic_id=team_topic_id, + GroupChatStart(message=first_chat_message), + topic_id=TopicId(type=group_topic_type, source=self._team_id), ) - await runtime.publish_message(GroupChatRequestPublishEvent(), topic_id=group_chat_manager_topic_id) # Start a coroutine to stop the runtime and signal the output message queue is complete. async def stop_runtime() -> None: @@ -189,15 +197,18 @@ async def stop_runtime() -> None: shutdown_task = asyncio.create_task(stop_runtime()) + # Collect the output messages in order. + output_messages: List[AgentMessage] = [] # Yield the messsages until the queue is empty. while True: message = await output_message_queue.get() if message is None: break yield message + output_messages.append(message) # Wait for the shutdown task to finish. await shutdown_task # Yield the final result. - yield TaskResult(messages=output_messages) + yield TaskResult(messages=output_messages, stop_reason=stop_reason) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py index 68eb76c06e81..ab10c2b864c5 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py @@ -1,139 +1,111 @@ -import logging from abc import ABC, abstractmethod from typing import Any, List from autogen_core.base import MessageContext from autogen_core.components import DefaultTopicId, event -from ... import EVENT_LOGGER_NAME from ...base import TerminationCondition -from .._events import ( - GroupChatPublishEvent, - GroupChatRequestPublishEvent, - TerminationEvent, -) +from ...messages import AgentMessage, StopMessage +from ._events import GroupChatAgentResponse, GroupChatRequestPublish, GroupChatStart, GroupChatTermination from ._sequential_routed_agent import SequentialRoutedAgent -event_logger = logging.getLogger(EVENT_LOGGER_NAME) - class BaseGroupChatManager(SequentialRoutedAgent, ABC): """Base class for a group chat manager that manages a group chat with multiple participants. It is the responsibility of the caller to ensure: - All participants must subscribe to the group chat topic and each of their own topics. - - The group chat manager must subscribe to the parent topic and the group chat topic. + - The group chat manager must subscribe to the group chat topic. - The agent types of the participants must be unique. - For each participant, the agent type must be the same as the topic type. Without the above conditions, the group chat will not function correctly. - - Args: - parent_topic_type (str): The topic type of the parent orchestrator. - group_topic_type (str): The topic type of the group chat. - participant_topic_types (List[str]): The topic types of the participants. - participant_descriptions (List[str]): The descriptions of the participants - termination_condition (TerminationCondition, optional): The termination condition for the group chat. Defaults to None. - - Raises: - ValueError: If the number of participant topic types, agent types, and descriptions are not the same. - ValueError: If the participant topic types are not unique. - ValueError: If the group topic type is in the participant topic types. - ValueError: If the parent topic type is in the participant topic types. - ValueError: If the group topic type is the same as the parent topic type. """ def __init__( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None = None, ): super().__init__(description="Group chat manager") - self._parent_topic_type = parent_topic_type self._group_topic_type = group_topic_type + self._output_topic_type = output_topic_type if len(participant_topic_types) != len(participant_descriptions): raise ValueError("The number of participant topic types, agent types, and descriptions must be the same.") if len(set(participant_topic_types)) != len(participant_topic_types): raise ValueError("The participant topic types must be unique.") if group_topic_type in participant_topic_types: raise ValueError("The group topic type must not be in the participant topic types.") - if parent_topic_type in participant_topic_types: - raise ValueError("The parent topic type must not be in the participant topic types.") - if group_topic_type == parent_topic_type: - raise ValueError("The group topic type must not be the same as the parent topic type.") self._participant_topic_types = participant_topic_types self._participant_descriptions = participant_descriptions - self._message_thread: List[GroupChatPublishEvent] = [] + self._message_thread = message_thread self._termination_condition = termination_condition @event - async def handle_content_publish(self, message: GroupChatPublishEvent, ctx: MessageContext) -> None: - """Handle a content publish event. - - If the event is from the parent topic, add the message to the thread. + async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None: + """Handle the start of a group chat by selecting a speaker to start the conversation.""" - If the event is from the group chat topic, add the message to the thread and select a speaker to continue the conversation. - If the event from the group chat session requests a pause, publish the last message to the parent topic.""" - assert ctx.topic_id is not None - - event_logger.info(message) + # Log the start message. + await self.publish_message(message, topic_id=DefaultTopicId(type=self._output_topic_type)) + # Check if the conversation has already terminated. if self._termination_condition is not None and self._termination_condition.terminated: - # The group chat has been terminated. - return - - # Process event from parent. - if ctx.topic_id.type == self._parent_topic_type: - self._message_thread.append(message) + early_stop_message = StopMessage( + content="The group chat has already terminated.", source="Group chat manager" + ) await self.publish_message( - GroupChatPublishEvent(agent_message=message.agent_message, source=self.id), - topic_id=DefaultTopicId(type=self._group_topic_type), + GroupChatTermination(message=early_stop_message), topic_id=DefaultTopicId(type=self._output_topic_type) ) - if self._termination_condition is not None: - stop_message = await self._termination_condition([message.agent_message]) - if stop_message is not None: - event_logger.info(TerminationEvent(agent_message=stop_message, source=self.id)) - # Stop the group chat. + # Stop the group chat. return - # Process event from the group chat this agent manages. - assert ctx.topic_id.type == self._group_topic_type - self._message_thread.append(message) + # Append the user message to the message thread. + self._message_thread.append(message.message) # Check if the conversation should be terminated. if self._termination_condition is not None: - stop_message = await self._termination_condition([message.agent_message]) + stop_message = await self._termination_condition([message.message]) if stop_message is not None: - event_logger.info(TerminationEvent(agent_message=stop_message, source=self.id)) + await self.publish_message( + GroupChatTermination(message=stop_message), topic_id=DefaultTopicId(type=self._output_topic_type) + ) # Stop the group chat. - # TODO: this should be different if the group chat is nested. return - # Select a speaker to continue the conversation. speaker_topic_type = await self.select_speaker(self._message_thread) - - await self.publish_message(GroupChatRequestPublishEvent(), topic_id=DefaultTopicId(type=speaker_topic_type)) + await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=speaker_topic_type)) @event - async def handle_content_request(self, message: GroupChatRequestPublishEvent, ctx: MessageContext) -> None: - """Handle a content request by selecting a speaker to start the conversation.""" - assert ctx.topic_id is not None - if ctx.topic_id.type == self._group_topic_type: - raise RuntimeError("Content request event from the group chat topic is not allowed.") + async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None: + # Append the message to the message thread and construct the delta. + delta: List[AgentMessage] = [] + if message.agent_response.inner_messages is not None: + for inner_message in message.agent_response.inner_messages: + self._message_thread.append(inner_message) + delta.append(inner_message) + self._message_thread.append(message.agent_response.chat_message) + delta.append(message.agent_response.chat_message) - if self._termination_condition is not None and self._termination_condition.terminated: - # The group chat has been terminated. - return + # Check if the conversation should be terminated. + if self._termination_condition is not None: + stop_message = await self._termination_condition(delta) + if stop_message is not None: + await self.publish_message( + GroupChatTermination(message=stop_message), topic_id=DefaultTopicId(type=self._output_topic_type) + ) + # Stop the group chat. + return + # Select a speaker to continue the conversation. speaker_topic_type = await self.select_speaker(self._message_thread) - - await self.publish_message(GroupChatRequestPublishEvent(), topic_id=DefaultTopicId(type=speaker_topic_type)) + await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=speaker_topic_type)) @abstractmethod - async def select_speaker(self, thread: List[GroupChatPublishEvent]) -> str: + async def select_speaker(self, thread: List[AgentMessage]) -> str: """Select a speaker from the participants and return the topic type of the selected speaker.""" ... diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py index 3fde3f6864b9..49325c39f044 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py @@ -5,7 +5,7 @@ from ...base import ChatAgent, Response from ...messages import ChatMessage -from .._events import GroupChatPublishEvent, GroupChatRequestPublishEvent +from ._events import GroupChatAgentResponse, GroupChatMessage, GroupChatRequestPublish, GroupChatStart from ._sequential_routed_agent import SequentialRoutedAgent @@ -28,33 +28,41 @@ def __init__(self, parent_topic_type: str, output_topic_type: str, agent: ChatAg self._message_buffer: List[ChatMessage] = [] @event - async def handle_message(self, message: GroupChatPublishEvent, ctx: MessageContext) -> None: - """Handle an event by appending the content to the buffer.""" - self._message_buffer.append(message.agent_message) + async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None: + """Handle a start event by appending the content to the buffer.""" + self._message_buffer.append(message.message) @event - async def handle_content_request(self, message: GroupChatRequestPublishEvent, ctx: MessageContext) -> None: + async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None: + """Handle an agent response event by appending the content to the buffer.""" + self._message_buffer.append(message.agent_response.chat_message) + + @event + async def handle_request(self, message: GroupChatRequestPublish, ctx: MessageContext) -> None: """Handle a content request event by passing the messages in the buffer to the delegate agent and publish the response.""" # Pass the messages in the buffer to the delegate agent. response: Response | None = None async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token): if isinstance(msg, Response): + # Log the response. await self.publish_message( - msg.chat_message, + GroupChatMessage(message=msg.chat_message), topic_id=DefaultTopicId(type=self._output_topic_type), ) response = msg else: - # Publish the message to the output topic. - await self.publish_message(msg, topic_id=DefaultTopicId(type=self._output_topic_type)) + # Log the message. + await self.publish_message( + GroupChatMessage(message=msg), topic_id=DefaultTopicId(type=self._output_topic_type) + ) if response is None: raise ValueError("The agent did not produce a final response. Check the agent's on_messages_stream method.") # Publish the response to the group chat. self._message_buffer.clear() await self.publish_message( - GroupChatPublishEvent(agent_message=response.chat_message, source=self.id), + GroupChatAgentResponse(agent_response=response), topic_id=DefaultTopicId(type=self._parent_topic_type), ) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py new file mode 100644 index 000000000000..0b146c88667e --- /dev/null +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py @@ -0,0 +1,38 @@ +from pydantic import BaseModel + +from ...base import Response +from ...messages import AgentMessage, ChatMessage, StopMessage + + +class GroupChatStart(BaseModel): + """A request to start a group chat.""" + + message: ChatMessage + """The user message that started the group chat.""" + + +class GroupChatAgentResponse(BaseModel): + """A response published to a group chat.""" + + agent_response: Response + """The response from an agent.""" + + +class GroupChatRequestPublish(BaseModel): + """A request to publish a message to a group chat.""" + + ... + + +class GroupChatMessage(BaseModel): + """A message from a group chat.""" + + message: AgentMessage + """The message that was published.""" + + +class GroupChatTermination(BaseModel): + """A message indicating that a group chat has terminated.""" + + message: StopMessage + """The stop message that indicates the reason of termination.""" diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py index 3ef4a2e07ad0..4f355b58e66c 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py @@ -1,44 +1,38 @@ -import logging from typing import Callable, List -from ... import EVENT_LOGGER_NAME from ...base import ChatAgent, TerminationCondition -from .._events import ( - GroupChatPublishEvent, - GroupChatSelectSpeakerEvent, -) +from ...messages import AgentMessage from ._base_group_chat import BaseGroupChat from ._base_group_chat_manager import BaseGroupChatManager -event_logger = logging.getLogger(EVENT_LOGGER_NAME) - class RoundRobinGroupChatManager(BaseGroupChatManager): """A group chat manager that selects the next speaker in a round-robin fashion.""" def __init__( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, ) -> None: super().__init__( - parent_topic_type, group_topic_type, + output_topic_type, participant_topic_types, participant_descriptions, + message_thread, termination_condition, ) self._next_speaker_index = 0 - async def select_speaker(self, thread: List[GroupChatPublishEvent]) -> str: + async def select_speaker(self, thread: List[AgentMessage]) -> str: """Select a speaker from the participants in a round-robin fashion.""" current_speaker_index = self._next_speaker_index self._next_speaker_index = (current_speaker_index + 1) % len(self._participant_topic_types) current_speaker = self._participant_topic_types[current_speaker_index] - event_logger.debug(GroupChatSelectSpeakerEvent(selected_speaker=current_speaker, source=self.id)) return current_speaker @@ -126,18 +120,20 @@ def __init__( def _create_group_chat_manager_factory( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, ) -> Callable[[], RoundRobinGroupChatManager]: def _factory() -> RoundRobinGroupChatManager: return RoundRobinGroupChatManager( - parent_topic_type, group_topic_type, + output_topic_type, participant_topic_types, participant_descriptions, + message_thread, termination_condition, ) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py index 63b73cb88c2b..a70f8494624f 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py @@ -6,10 +6,15 @@ from ... import EVENT_LOGGER_NAME, TRACE_LOGGER_NAME from ...base import ChatAgent, TerminationCondition -from ...messages import ChatMessage, MultiModalMessage, StopMessage, TextMessage -from .._events import ( - GroupChatPublishEvent, - GroupChatSelectSpeakerEvent, +from ...messages import ( + AgentMessage, + HandoffMessage, + MultiModalMessage, + ResetMessage, + StopMessage, + TextMessage, + ToolCallMessage, + ToolCallResultMessage, ) from ._base_group_chat import BaseGroupChat from ._base_group_chat_manager import BaseGroupChatManager @@ -24,21 +29,23 @@ class SelectorGroupChatManager(BaseGroupChatManager): def __init__( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, model_client: ChatCompletionClient, selector_prompt: str, allow_repeated_speaker: bool, - selector_func: Callable[[Sequence[ChatMessage]], str | None] | None, + selector_func: Callable[[Sequence[AgentMessage]], str | None] | None, ) -> None: super().__init__( - parent_topic_type, group_topic_type, + output_topic_type, participant_topic_types, participant_descriptions, + message_thread, termination_condition, ) self._model_client = model_client @@ -47,7 +54,7 @@ def __init__( self._allow_repeated_speaker = allow_repeated_speaker self._selector_func = selector_func - async def select_speaker(self, thread: List[GroupChatPublishEvent]) -> str: + async def select_speaker(self, thread: List[AgentMessage]) -> str: """Selects the next speaker in a group chat using a ChatCompletion client, with the selector function as override if it returns a speaker name. @@ -56,23 +63,20 @@ async def select_speaker(self, thread: List[GroupChatPublishEvent]) -> str: # Use the selector function if provided. if self._selector_func is not None: - speaker = self._selector_func([msg.agent_message for msg in thread]) + speaker = self._selector_func(thread) if speaker is not None: # Skip the model based selection. - event_logger.debug(GroupChatSelectSpeakerEvent(selected_speaker=speaker, source=self.id)) return speaker # Construct the history of the conversation. history_messages: List[str] = [] - for event in thread: - msg = event.agent_message - source = event.source - if source is None: - message = "" - else: - # The agent type must be the same as the topic type, which we use as the agent name. - message = f"{source.type}:" - if isinstance(msg, TextMessage | StopMessage): + for msg in thread: + if isinstance(msg, ToolCallMessage | ToolCallResultMessage | ResetMessage): + # Ignore tool call messages and reset messages. + continue + # The agent type must be the same as the topic type, which we use as the agent name. + message = f"{msg.source}:" + if isinstance(msg, TextMessage | StopMessage | HandoffMessage): message += f" {msg.content}" elif isinstance(msg, MultiModalMessage): for item in msg.content: @@ -123,7 +127,7 @@ async def select_speaker(self, thread: List[GroupChatPublishEvent]) -> str: else: agent_name = participants[0] self._previous_speaker = agent_name - event_logger.debug(GroupChatSelectSpeakerEvent(selected_speaker=agent_name, source=self.id)) + trace_logger.debug(f"Selected speaker: {agent_name}") return agent_name def _mentioned_agents(self, message_content: str, agent_names: List[str]) -> Dict[str, int]: @@ -175,7 +179,7 @@ class SelectorGroupChat(BaseGroupChat): Must contain '{roles}', '{participants}', and '{history}' to be filled in. allow_repeated_speaker (bool, optional): Whether to allow the same speaker to be selected consecutively. Defaults to False. - selector_func (Callable[[Sequence[ChatMessage]], str | None], optional): A custom selector + selector_func (Callable[[Sequence[AgentMessage]], str | None], optional): A custom selector function that takes the conversation history and returns the name of the next speaker. If provided, this function will be used to override the model to select the next speaker. If the function returns None, the model will be used to select the next speaker. @@ -311,7 +315,7 @@ def __init__( Read the above conversation. Then select the next role from {participants} to play. Only return the role. """, allow_repeated_speaker: bool = False, - selector_func: Callable[[Sequence[ChatMessage]], str | None] | None = None, + selector_func: Callable[[Sequence[AgentMessage]], str | None] | None = None, ): super().__init__( participants, group_chat_manager_class=SelectorGroupChatManager, termination_condition=termination_condition @@ -333,17 +337,19 @@ def __init__( def _create_group_chat_manager_factory( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, ) -> Callable[[], BaseGroupChatManager]: return lambda: SelectorGroupChatManager( - parent_topic_type, group_topic_type, + output_topic_type, participant_topic_types, participant_descriptions, + message_thread, termination_condition, self._model_client, self._selector_prompt, diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py index 0a6bf9ee73fa..bf464b526a7d 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py @@ -3,11 +3,7 @@ from ... import EVENT_LOGGER_NAME from ...base import ChatAgent, TerminationCondition -from ...messages import HandoffMessage -from .._events import ( - GroupChatPublishEvent, - GroupChatSelectSpeakerEvent, -) +from ...messages import AgentMessage, HandoffMessage from ._base_group_chat import BaseGroupChat from ._base_group_chat_manager import BaseGroupChatManager @@ -19,28 +15,29 @@ class SwarmGroupChatManager(BaseGroupChatManager): def __init__( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, ) -> None: super().__init__( - parent_topic_type, group_topic_type, + output_topic_type, participant_topic_types, participant_descriptions, + message_thread, termination_condition, ) self._current_speaker = participant_topic_types[0] - async def select_speaker(self, thread: List[GroupChatPublishEvent]) -> str: + async def select_speaker(self, thread: List[AgentMessage]) -> str: """Select a speaker from the participants based on handoff message.""" - if len(thread) > 0 and isinstance(thread[-1].agent_message, HandoffMessage): - self._current_speaker = thread[-1].agent_message.target + if len(thread) > 0 and isinstance(thread[-1], HandoffMessage): + self._current_speaker = thread[-1].target if self._current_speaker not in self._participant_topic_types: raise ValueError("The selected speaker in the handoff message is not a participant.") - event_logger.debug(GroupChatSelectSpeakerEvent(selected_speaker=self._current_speaker, source=self.id)) return self._current_speaker else: return self._current_speaker @@ -107,18 +104,20 @@ def __init__( def _create_group_chat_manager_factory( self, - parent_topic_type: str, group_topic_type: str, + output_topic_type: str, participant_topic_types: List[str], participant_descriptions: List[str], + message_thread: List[AgentMessage], termination_condition: TerminationCondition | None, ) -> Callable[[], SwarmGroupChatManager]: def _factory() -> SwarmGroupChatManager: return SwarmGroupChatManager( - parent_topic_type, group_topic_type, + output_topic_type, participant_topic_types, participant_descriptions, + message_thread, termination_condition, ) diff --git a/python/packages/autogen-agentchat/tests/test_group_chat.py b/python/packages/autogen-agentchat/tests/test_group_chat.py index 4922ceed9143..8f05c3f4977e 100644 --- a/python/packages/autogen-agentchat/tests/test_group_chat.py +++ b/python/packages/autogen-agentchat/tests/test_group_chat.py @@ -15,6 +15,7 @@ from autogen_agentchat.base import Response, TaskResult from autogen_agentchat.logging import FileLogHandler from autogen_agentchat.messages import ( + AgentMessage, ChatMessage, HandoffMessage, StopMessage, @@ -170,6 +171,8 @@ async def test_round_robin_group_chat(monkeypatch: pytest.MonkeyPatch) -> None: # Assert that all expected messages are in the collected messages assert normalized_messages == expected_messages + assert result.stop_reason is not None and result.stop_reason == "Text 'TERMINATE' mentioned" + # Test streaming. mock.reset() index = 0 @@ -260,6 +263,7 @@ async def test_round_robin_group_chat_with_tools(monkeypatch: pytest.MonkeyPatch assert isinstance(result.messages[3], TextMessage) # tool use agent response assert isinstance(result.messages[4], TextMessage) # echo agent response assert isinstance(result.messages[5], TextMessage) # tool use agent response + assert result.stop_reason is not None and result.stop_reason == "Text 'TERMINATE' mentioned" context = tool_use_agent._model_context # pyright: ignore assert context[0].content == "Write a program that prints 'Hello, world!'" @@ -365,6 +369,7 @@ async def test_selector_group_chat(monkeypatch: pytest.MonkeyPatch) -> None: assert result.messages[3].source == "agent1" assert result.messages[4].source == "agent2" assert result.messages[5].source == "agent1" + assert result.stop_reason is not None and result.stop_reason == "Text 'TERMINATE' mentioned" # Test streaming. mock.reset() @@ -418,6 +423,7 @@ async def test_selector_group_chat_two_speakers(monkeypatch: pytest.MonkeyPatch) assert result.messages[4].source == "agent1" # only one chat completion was called assert mock._curr_index == 1 # pyright: ignore + assert result.stop_reason is not None and result.stop_reason == "Text 'TERMINATE' mentioned" # Test streaming. mock.reset() @@ -485,6 +491,7 @@ async def test_selector_group_chat_two_speakers_allow_repeated(monkeypatch: pyte assert result.messages[1].source == "agent2" assert result.messages[2].source == "agent2" assert result.messages[3].source == "agent1" + assert result.stop_reason is not None and result.stop_reason == "Text 'TERMINATE' mentioned" # Test streaming. mock.reset() @@ -520,7 +527,7 @@ async def test_selector_group_chat_custom_selector(monkeypatch: pytest.MonkeyPat agent3 = _EchoAgent("agent3", description="echo agent 3") agent4 = _EchoAgent("agent4", description="echo agent 4") - def _select_agent(messages: Sequence[ChatMessage]) -> str | None: + def _select_agent(messages: Sequence[AgentMessage]) -> str | None: if len(messages) == 0: return "agent1" elif messages[-1].source == "agent1": @@ -546,6 +553,10 @@ def _select_agent(messages: Sequence[ChatMessage]) -> str | None: assert result.messages[3].source == "agent3" assert result.messages[4].source == "agent4" assert result.messages[5].source == "agent1" + assert ( + result.stop_reason is not None + and result.stop_reason == "Maximum number of messages 6 reached, current message count: 6" + ) class _HandOffAgent(BaseChatAgent): @@ -581,6 +592,10 @@ async def test_swarm_handoff() -> None: assert result.messages[3].content == "Transferred to second_agent." assert result.messages[4].content == "Transferred to third_agent." assert result.messages[5].content == "Transferred to first_agent." + assert ( + result.stop_reason is not None + and result.stop_reason == "Maximum number of messages 6 reached, current message count: 6" + ) # Test streaming. index = 0 @@ -668,6 +683,7 @@ async def test_swarm_handoff_using_tool_calls(monkeypatch: pytest.MonkeyPatch) - assert result.messages[4].content == "Transferred to agent1." assert result.messages[5].content == "Hello" assert result.messages[6].content == "TERMINATE" + assert result.stop_reason is not None and result.stop_reason == "Text 'TERMINATE' mentioned" # Test streaming. agent1._model_context.clear() # pyright: ignore diff --git a/python/packages/autogen-core/docs/src/user-guide/agentchat-user-guide/quickstart.ipynb b/python/packages/autogen-core/docs/src/user-guide/agentchat-user-guide/quickstart.ipynb index 684dcb838a7c..1a5875325de3 100644 --- a/python/packages/autogen-core/docs/src/user-guide/agentchat-user-guide/quickstart.ipynb +++ b/python/packages/autogen-core/docs/src/user-guide/agentchat-user-guide/quickstart.ipynb @@ -28,64 +28,56 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "\n", - "--------------------------------------------------------------------------- \n", - "\u001b[91m[2024-10-29T15:48:06.329810]:\u001b[0m\n", - "\n", - "What is the weather in New York?\n", - "--------------------------------------------------------------------------- \n", - "\u001b[91m[2024-10-29T15:48:08.085839], weather_agent:\u001b[0m\n", - "\n", - "The weather in New York is 73 degrees and sunny.\n", - "--------------------------------------------------------------------------- \n", - "\u001b[91m[2024-10-29T15:48:08.086180], Termination:\u001b[0m\n", - "\n", - "Maximum number of messages 2 reached, current message count: 2\n", - " TaskResult(messages=[TextMessage(source='user', content='What is the weather in New York?'), TextMessage(source='weather_agent', content='The weather in New York is 73 degrees and sunny.')])\n" + "source='user' models_usage=None content='What is the weather in New York?'\n", + "source='weather_agent' models_usage=RequestUsage(prompt_tokens=79, completion_tokens=15) content=[FunctionCall(id='call_CntvzLVL7iYJwPP2WWeBKNHc', arguments='{\"city\":\"New York\"}', name='get_weather')]\n", + "source='weather_agent' models_usage=None content=[FunctionExecutionResult(content='The weather in New York is 73 degrees and Sunny.', call_id='call_CntvzLVL7iYJwPP2WWeBKNHc')]\n", + "source='weather_agent' models_usage=RequestUsage(prompt_tokens=90, completion_tokens=14) content='The weather in New York is currently 73 degrees and sunny.'\n", + "source='weather_agent' models_usage=RequestUsage(prompt_tokens=137, completion_tokens=4) content='TERMINATE'\n", + "TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the weather in New York?'), ToolCallMessage(source='weather_agent', models_usage=RequestUsage(prompt_tokens=79, completion_tokens=15), content=[FunctionCall(id='call_CntvzLVL7iYJwPP2WWeBKNHc', arguments='{\"city\":\"New York\"}', name='get_weather')]), ToolCallResultMessage(source='weather_agent', models_usage=None, content=[FunctionExecutionResult(content='The weather in New York is 73 degrees and Sunny.', call_id='call_CntvzLVL7iYJwPP2WWeBKNHc')]), TextMessage(source='weather_agent', models_usage=RequestUsage(prompt_tokens=90, completion_tokens=14), content='The weather in New York is currently 73 degrees and sunny.'), TextMessage(source='weather_agent', models_usage=RequestUsage(prompt_tokens=137, completion_tokens=4), content='TERMINATE')], stop_reason=\"Text 'TERMINATE' mentioned\")\n" ] } ], "source": [ - "import logging\n", - "\n", - "from autogen_agentchat import EVENT_LOGGER_NAME\n", "from autogen_agentchat.agents import AssistantAgent\n", - "from autogen_agentchat.logging import ConsoleLogHandler\n", - "from autogen_agentchat.task import MaxMessageTermination\n", + "from autogen_agentchat.task import TextMentionTermination\n", "from autogen_agentchat.teams import RoundRobinGroupChat\n", "from autogen_ext.models import OpenAIChatCompletionClient\n", "\n", - "# set up logging. You can define your own logger\n", - "logger = logging.getLogger(EVENT_LOGGER_NAME)\n", - "logger.addHandler(ConsoleLogHandler())\n", - "logger.setLevel(logging.INFO)\n", - "\n", "\n", - "# define a tool\n", + "# Define a tool\n", "async def get_weather(city: str) -> str:\n", " return f\"The weather in {city} is 73 degrees and Sunny.\"\n", "\n", "\n", - "# define an agent\n", - "weather_agent = AssistantAgent(\n", - " name=\"weather_agent\",\n", - " model_client=OpenAIChatCompletionClient(model=\"gpt-4o-2024-08-06\"),\n", - " tools=[get_weather],\n", - ")\n", + "async def main() -> None:\n", + " # Define an agent\n", + " weather_agent = AssistantAgent(\n", + " name=\"weather_agent\",\n", + " model_client=OpenAIChatCompletionClient(model=\"gpt-4o-2024-08-06\"),\n", + " tools=[get_weather],\n", + " )\n", + "\n", + " # Define termination condition\n", + " termination = TextMentionTermination(\"TERMINATE\")\n", + "\n", + " # Define a team\n", + " agent_team = RoundRobinGroupChat([weather_agent], termination_condition=termination)\n", + "\n", + " # Run the team and stream messages\n", + " stream = agent_team.run_stream(\"What is the weather in New York?\")\n", + " async for response in stream:\n", + " print(response)\n", + "\n", "\n", - "# add the agent to a team\n", - "termination = MaxMessageTermination(max_messages=2)\n", - "agent_team = RoundRobinGroupChat([weather_agent], termination_condition=termination)\n", - "# Note: if running in a Python file directly you'll need to use asyncio.run(agent_team.run(...)) instead of await agent_team.run(...)\n", - "result = await agent_team.run(task=\"What is the weather in New York?\")\n", - "print(\"\\n\", result)" + "# NOTE: if running this inside a Python script you'll need to use asyncio.run(main()).\n", + "await main()" ] }, {