-
Notifications
You must be signed in to change notification settings - Fork 120
/
Copy pathmanager.py
111 lines (91 loc) · 3.92 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from typing import AsyncGenerator, Union, Optional
import time
import json
from pathlib import Path
from .datamodel import TeamResult, TaskResult, TeamConfig
from autogen_agentchat.messages import AgentMessage, ChatMessage
from autogen_core.base import CancellationToken
from .provider import Provider
from .datamodel import TeamConfig
class TeamManager:
    """Load team configurations and run tasks against the resulting teams.

    A team is built from a ``TeamConfig`` by ``Provider.load_team``. The
    configuration may be passed directly, given as a path to a JSON file,
    or omitted, in which case ``DEFAULT_TEAM_CONFIG_PATH`` is loaded.
    """

    # Path used when a caller passes no team_config; subclasses or
    # instances may override it.
    DEFAULT_TEAM_CONFIG_PATH = "notebooks/default_team.json"

    def __init__(self) -> None:
        self.provider = Provider()

    async def load_team_config(self, config_path: Union[str, Path]) -> TeamConfig:
        """Load a team configuration from a JSON file.

        Args:
            config_path: Location of the JSON file.

        Returns:
            A ``TeamConfig`` built from the top-level JSON object.

        Raises:
            FileNotFoundError: If ``config_path`` does not exist.
            ValueError: If the file is not valid JSON, its top-level value
                is not an object, or ``TeamConfig`` rejects its contents.
        """
        config_path = Path(config_path)
        if not config_path.exists():
            raise FileNotFoundError(
                f"Team configuration file not found: {config_path}")

        # JSON text is UTF-8; do not depend on the platform default encoding.
        with config_path.open(encoding="utf-8") as f:
            team_json_spec = json.load(f)

        if not isinstance(team_json_spec, dict):
            raise ValueError("Team configuration must be a JSON object")

        try:
            return TeamConfig(**team_json_spec)
        except Exception as e:
            # Chain the cause so the underlying validation error stays visible.
            raise ValueError(f"Invalid team configuration: {e}") from e

    async def _create_team(
        self,
        team_config: Optional[Union[TeamConfig, str, Path]] = None
    ):
        """Resolve ``team_config`` to a ``TeamConfig`` and build the team."""
        if team_config is None:
            # Fall back to the default team config if none was provided.
            team_config = self.DEFAULT_TEAM_CONFIG_PATH
        if isinstance(team_config, (str, Path)):
            team_config = await self.load_team_config(team_config)
        return self.provider.load_team(team_config)

    async def run_stream(
        self,
        task: str,
        team_config: Optional[Union[TeamConfig, str, Path]] = None,
        cancellation_token: Optional[CancellationToken] = None
    ) -> AsyncGenerator[Union[AgentMessage, ChatMessage, TeamResult], None]:
        """Stream the team's execution results with optional JSON config loading.

        Args:
            task: Task passed through to the team's ``run_stream``.
            team_config: A ``TeamConfig``, a path to a JSON config file, or
                ``None`` to load ``DEFAULT_TEAM_CONFIG_PATH``.
            cancellation_token: Forwarded to the team; also checked before
                each message is yielded, and streaming stops once it
                reports cancellation.

        Yields:
            Each message from the team unchanged, except that a
            ``TaskResult`` is wrapped in a ``TeamResult`` carrying the
            elapsed time in seconds since this method started.

        Raises:
            NotImplementedError: If the team has no ``run_stream`` attribute.
        """
        # Monotonic clock: elapsed time is unaffected by system clock changes.
        start_time = time.monotonic()

        team = await self._create_team(team_config)
        if not hasattr(team, 'run_stream'):
            raise NotImplementedError("Team does not support streaming")

        stream = team.run_stream(
            task=task,
            cancellation_token=cancellation_token
        )
        async for message in stream:
            if cancellation_token is not None and cancellation_token.is_cancelled():
                break
            if isinstance(message, TaskResult):
                yield TeamResult(
                    task_result=message,
                    usage="",  # TODO: Implement token usage parsing
                    duration=time.monotonic() - start_time
                )
            else:
                yield message

    async def run(
        self,
        task: str,
        team_config: Optional[Union[TeamConfig, str, Path]] = None,
        cancellation_token: Optional[CancellationToken] = None
    ) -> TeamResult:
        """Non-streaming run method with optional JSON config loading.

        Args:
            task: Task passed through to the team's ``run``.
            team_config: A ``TeamConfig``, a path to a JSON config file, or
                ``None`` to load ``DEFAULT_TEAM_CONFIG_PATH``.
            cancellation_token: Forwarded to the team's ``run``.

        Returns:
            A ``TeamResult`` wrapping the team's result together with the
            elapsed time in seconds since this method started.
        """
        # Monotonic clock: elapsed time is unaffected by system clock changes.
        start_time = time.monotonic()

        team = await self._create_team(team_config)
        result = await team.run(
            task=task,
            cancellation_token=cancellation_token
        )
        return TeamResult(
            task_result=result,
            usage="",  # TODO: Implement token usage parsing
            duration=time.monotonic() - start_time
        )