MetaGPT/draw_results.py
2023-10-11 20:46:55 +08:00

171 lines
No EOL
7 KiB
Python

import re
import argparse
import os
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import math
def extract_logs(filename, start_time=None, end_time=None):
with open(filename, 'r') as f:
lines = f.readlines()
if start_time is None :
# 如果没有提供时间参数,则返回所有日志
return lines
# 截取开始时间和停止时间之间的日志块
logs_block = []
capture = False
for line in lines:
if start_time in line:
capture = True
if capture:
logs_block.append(line)
if end_time and end_time in line:
capture = False
break
return logs_block
def extract_time_from_last_round_zero(log_filename):
with open(log_filename, 'r') as f:
lines = f.readlines()
# 倒序遍历文件的每一行
for line in reversed(lines):
if "Config loading done" in line:
# 正则表达式匹配年月日 小时:分钟的格式
match = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2})', line)
if match:
return match.group(1)
return None
def analyze_log_block(logs_block):
rounds: list[int] = []
items_collected :list[int] = []
total_items = 0
positions:list[(int, int, int)] = []
completed_tasks: list[int] = []
failed_tasks: list[int] = []
round_start = False
check_for_info = False # 用于检查是否应在下几行中查找Position Inventory 的标志
line_after_message = 0 # 从 "Curriculum Agent human message" 开始计数的行数
for line in logs_block:
if "round_id:" in line:
n = int(re.search(r'round_id:(\d+)', line).group(1))
if n not in rounds:
rounds.append(n)
round_start = True
check_for_info = True
if round_start:
if "Curriculum Agent human message" in line:
line_after_message = 0
continue
if check_for_info:
line_after_message += 1
if line_after_message <= 20:
if "Position: x=" in line:
match = re.search(r'Position: x=([\d.-]+), y=([\d.-]+), z=([\d.-]+)', line)
x, y, z = float(match.group(1)), float(match.group(2)), float(match.group(3))
positions.append((x, y, z))
if "Inventory (" in line:
if ": Empty" in line:
items_collected.append(0)
else:
items = re.search(r'Inventory \(\d+/36\): ({.*?})', line).group(1)
items_dict = eval(items)
total_items = sum(items_dict.values())
items_collected.append(total_items)
if "Completed tasks so far:" in line:
tasks = line.replace("Completed tasks so far:", "").strip().split(", ")
completed_tasks.append(0 if tasks == ['None'] else len(tasks))
if "Failed tasks that are too hard:" in line:
tasks = line.replace("Failed tasks that are too hard:", "").strip().split(", ")
failed_tasks.append(0 if tasks == ['None'] else len(tasks))
check_for_info = False
round_start = False
min_len: int = min(len(rounds), len(items_collected), len(positions), len(completed_tasks), len(failed_tasks))
return rounds[:min_len], items_collected[:min_len], positions[:min_len], completed_tasks[:min_len], failed_tasks[:min_len]
def save_item_results_png(rounds, items_collected, start_time, path_prefix):
# item png
plt.plot(rounds, items_collected, '-o', label="Items Collected")
plt.xlabel("Round")
plt.ylabel("Total Items Collected")
plt.title("Items Collected Over Rounds")
plt.grid(True)
plt.savefig(f'{path_prefix}/{start_time}_items_collected_over_rounds.png', dpi=300)
plt.close()
def save_path_results_png(positions, start_time, path_prefix):
x_coords = [pos[0] for pos in positions]
y_coords = [pos[1] for pos in positions]
z_coords = [pos[2] for pos in positions]
# 计算总里程数
total_distance = 0
for i in range(1, len(positions)):
dx = positions[i][0] - positions[i - 1][0]
dy = positions[i][1] - positions[i - 1][1]
dz = positions[i][2] - positions[i - 1][2]
distance = math.sqrt(dx ** 2 + dy ** 2 + dz ** 2)
total_distance += distance
# 3D path png
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111, projection='3d')
ax.plot(x_coords, y_coords, z_coords, '-o', color='blue', markersize=4)
ax.set_title("Bot Movement Path in 3D")
ax.set_xlabel("X Coordinate")
ax.set_ylabel("Y Coordinate")
ax.set_zlabel("Z Coordinate")
ax.text(min(x_coords), max(y_coords), max(z_coords), f"Total Distance: {total_distance:.2f} units", fontsize=15, color='red')
plt.savefig(f'{path_prefix}/{start_time}_bot_movement_3D_path.png', dpi=300)
plt.close()
def save_task_results_png(rounds , completed, failed, start_time, path_prefix):
plt.plot(rounds, completed, label='Completed Tasks', marker='o')
plt.plot(rounds, failed, label='Failed Tasks', marker='x')
plt.xlabel("Rounds")
plt.ylabel("Number of Tasks")
plt.title("Completed vs Failed Tasks per Round")
plt.grid(True)
plt.legend()
plt.savefig(f'{path_prefix}/{start_time}_task_results.png', dpi=300)
plt.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Analyze game log file between a start and end time.")
parser.add_argument('--start_time', type=str, default=None, nargs='?',
help="Start time for analysis in the log file.")
parser.add_argument('--end_time', type=str, default=None, nargs='?', help="End time for analysis in the log file.")
args = parser.parse_args()
current_script_path = os.path.dirname(os.path.abspath(__file__))
filename = f"{current_script_path}/logs/log.txt"
start_time = None
if args.start_time:
# 手动输入起止时间,注意要带上"xxx" 如"2023-10-08 03:14"
logs_block = extract_logs(filename, args.start_time, args.end_time)
else:
# 自动寻找最新的实验开始时间
start_time = extract_time_from_last_round_zero(filename)
logs_block = extract_logs(filename, start_time)
rounds, items_collected, positions, completed_tasks, failed_tasks= analyze_log_block(logs_block)
#save png
save_item_results_png(rounds, items_collected, args.start_time if args.start_time else start_time, current_script_path+"/results_pic")
save_path_results_png(positions, args.start_time if args.start_time else start_time, current_script_path+"/results_pic")
save_task_results_png(rounds, completed_tasks, failed_tasks, args.start_time if args.start_time else start_time, current_script_path+"/results_pic" )