1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| # 示例2:任务分配与执行系统
import time
from concurrent.futures import ThreadPoolExecutor
class Task:
def __init__(self, task_id: int, description: str, required_role: str):
self.task_id = task_id
self.description = description
self.required_role = required_role
self.completed = False
class WorkerAgent:
def __init__(self, name: str, role: str):
self.name = name
self.role = role
self.completed_tasks = []
def can_handle(self, task: Task) -> bool:
"""检查代理是否能处理任务"""
return self.role == task.required_role
def execute_task(self, task: Task):
"""执行任务"""
print(f"{self.name} ({self.role}) 开始执行任务: {task.description}")
time.sleep(1) # 模拟任务执行时间
task.completed = True
self.completed_tasks.append(task)
print(f"{self.name} 完成了任务 {task.task_id}")
def task_distribution_system():
"""任务分配系统"""
# 创建代理团队
agents = [
WorkerAgent("Alice", "研究员"),
WorkerAgent("Bob", "分析师"),
WorkerAgent("Charlie", "工程师")
]
# 创建任务队列
tasks = [
Task(1, "研究新算法", "研究员"),
Task(2, "分析用户数据", "分析师"),
Task(3, "优化系统性能", "工程师"),
Task(4, "设计实验方案", "研究员")
]
# 使用线程池并行处理任务
with ThreadPoolExecutor(max_workers=3) as executor:
for task in tasks:
# 找到能处理该任务的代理
suitable_agent = next((a for a in agents if a.can_handle(task)), None)
if suitable_agent:
executor.submit(suitable_agent.execute_task, task)
# 检查任务完成情况
print("\n任务完成情况:")
for task in tasks:
print(f"任务 {task.task_id}: {'已完成' if task.completed else '未完成'}")
task_distribution_system()
|