Create Voyager_log_MetaGPT_statistical_caliber.py

This commit is contained in:
向劲宇 2023-10-23 16:55:08 +08:00 committed by GitHub
parent 0efa07c4c3
commit 4c0f7287d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -0,0 +1,382 @@
import re
import argparse
import os
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import math
import numpy as np
import random
import seaborn as sns
import matplotlib as mpl
import pandas as pd
mpl.rcParams.update(mpl.rcParamsDefault)
def extract_logs(filename, start_time=None, end_time=None):
with open(filename, 'r',encoding='utf-8') as f:
lines = f.readlines()
if start_time is None :
# 如果没有提供时间参数,则返回所有日志
return lines
# 截取开始时间和停止时间之间的日志块
logs_block = []
capture = False
for line in lines:
if start_time in line:
capture = True
if capture:
logs_block.append(line)
if end_time and end_time in line:
capture = False
break
return logs_block
def extract_time_from_first_round_zero(log_filename):
with open(log_filename, 'r',encoding='utf-8') as f:
lines = f.readlines()
# 反向遍历文件的每一行
for line in reversed(lines):
if "****Recorder message:" in line:
if "iteration passed****" in line:
round_number = re.search(r'(\d+) iteration passed', line).group(1)
return round_number
return None
"""def analyze_log_block(logs_block):
rounds: list[int] = []
items_collected: list[int] = []
total_items = 0
positions:list[(int, int, int)] = []
completed_tasks: list[int] = []
failed_tasks: list[int] = []
items_variety_collected :list[int] = []
items_collected_dict :list[dict] = []
line_counter = 0 # 用于计数
round_counter = 0 # 用于计数轮数
for line in logs_block:
if "****Curriculum Agent human message****" in line:
line_counter = 0 # 当找到新的 "****Curriculum Agent human message****" 时重置计数器
round_counter += 1 # 每次找到 "****Curriculum Agent human message****"轮数加1
rounds.append(round_counter) # 将新的轮数添加到轮数列表中
continue
if line_counter < 50:
if "Position: x=" in line:
match = re.search(r'Position: x=([\d.-]+), y=([\d.-]+), z=([\d.-]+)', line)
x, y, z = float(match.group(1)), float(match.group(2)), float(match.group(3))
positions.append((x, y, z))
if "Inventory (" in line:
items = re.search(r'Inventory \(\d+/36\): ({.*?})', line).group(1)
items_dict = eval(items)
items_collected_dict.append(items_dict) # 将这一轮结束时的物品存储状态添加到列表中
total_items = sum(items_dict.values())
items_collected.append(total_items)
items_variety_collected.append(len(items_dict)) # 统计物品种类数量
if "Completed tasks so far:" in line:
tasks = line.replace("Completed tasks so far:", "").strip().split(", ")
completed_tasks.append(0 if tasks == ['None'] else len(tasks))
if "Failed tasks that are too hard:" in line:
tasks = line.replace("Failed tasks that are too hard:", "").strip().split(", ")
failed_tasks.append(0 if tasks == ['None'] else len(tasks))
line_counter += 1 # 每处理一行就将计数器加1"""
def analyze_log_block(logs_block):
rounds: list[int] = []
items_collected: list[int] = []
total_items = 0
positions:list[(int, int, int)] = []
completed_tasks: list[int] = []
failed_tasks: list[int] = []
items_variety_collected :list[int] = []
items_collected_dict :list[dict] = []
biomes: list[str] = [] # 存储所有出现过的生物群系
biomes_per_round: list[list[str]] = [] # 存储每轮结束时已经经历过的生物群系
new_biome_rounds: list[int] = [] # 存储添加新生物群系的轮次
processing_block = False # 用于标记是否正在处理一个块
round_counter = 0 # 用于计数轮数
for line in logs_block:
if "****Curriculum Agent human message****" in line:
processing_block = True # 当找到新的 "****Curriculum Agent human message****" 时开始处理块
round_counter += 1 # 每次找到 "****Curriculum Agent human message****"轮数加1
rounds.append(round_counter) # 将新的轮数添加到轮数列表中
continue
if "****Curriculum Agent ai message****" in line:
processing_block = False # 当找到 "****Curriculum Agent ai message****" 时停止处理块
continue
if processing_block: # 如果正在处理一个块,则执行以下操作
if "Position: x=" in line:
match = re.search(r'Position: x=([\d.-]+), y=([\d.-]+), z=([\d.-]+)', line)
x, y, z = float(match.group(1)), float(match.group(2)), float(match.group(3))
positions.append((x, y, z))
if "Inventory (" in line:
items = re.search(r'Inventory \(\d+/36\): ({.*?})', line).group(1)
items_dict = eval(items)
items_collected_dict.append(items_dict) # 将这一轮结束时的物品存储状态添加到列表中
total_items = sum(items_dict.values())
items_collected.append(total_items)
items_variety_collected.append(len(items_dict)) # 统计物品种类数量
if "Biome: " in line:
biome = line.replace("Biome: ", "").strip()
if biome not in biomes:
biomes.append(biome)
new_biome_rounds.append(rounds[-1]) # 记录添加新生物群系的轮次
biomes_per_round.append(biomes.copy()) # 记录当前已经经历过
if "Completed tasks so far:" in line:
tasks = line.replace("Completed tasks so far:", "").strip().split(", ")
completed_tasks.append(0 if tasks == ['None'] else len(tasks))
if "Failed tasks that are too hard:" in line:
tasks = line.replace("Failed tasks that are too hard:", "").strip().split(", ")
failed_tasks.append(0 if tasks == ['None'] else len(tasks))
min_len: int = min(len(rounds), len(items_collected), len(positions), len(completed_tasks), len(failed_tasks))
return rounds[:min_len], items_collected[:min_len], items_variety_collected[:min_len], items_collected_dict[:min_len], positions[:min_len], completed_tasks[:min_len], failed_tasks[:min_len], biomes, biomes_per_round[:min_len], new_biome_rounds
def save_item_results_png(rounds, items_collected, items_collected_dict, start_time, path_prefix):
items_collected_total = {}
items_collected_total_list = []
for round_index in range(len(rounds)):
for item, quantity in items_collected_dict[round_index].items():
if item in items_collected_total:
items_collected_total[item] = max(items_collected_total[item], quantity)
else:
items_collected_total[item] = quantity
items_collected_total_list.append(sum(items_collected_total.values()))
print("总数",items_collected_total_list)
plt.figure(figsize=(10, 5))
plt.plot(rounds, items_collected_total_list, label="Items Collected")
plt.xlabel("# of Rounds")
plt.ylabel("Total Items Collected")
plt.title("Items Collected Over Rounds")
plt.grid(True)
plt.legend()
ax = plt.gca()
ax.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
ax.yaxis.set_major_locator(plt.MaxNLocator(integer=True))
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.grid(color='gray', linestyle='-', linewidth=0.3, alpha=0.2)
# 设置x轴和y轴的箭头
ax.annotate('', xy=(1, 0), xycoords='axes fraction', xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color='black'))
ax.annotate('', xy=(0, 1), xycoords='axes fraction', xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color='black'))
plt.savefig(f'{path_prefix}/{start_time}_items_collected_over_rounds.png', dpi=300)
plt.close()
# 物品种类数量png
plt.figure(figsize=(10, 5))
plt.xlabel("# of Rounds")
plt.ylabel("Total Variety of Items Collected")
plt.title("Variety of Items Collected Over Rounds")
plt.grid(True)
special_items = ['wood', 'crafting_table', 'wooden_pickaxe', 'wooden_sword', 'wooden_axe', 'wooden_shovel', 'wooden_hoe',
'stone', 'furnace', 'stone_pickaxe', 'stone_sword', 'stone_axe', 'stone_shovel', 'stone_hoe',
'iron_ingot', 'iron_pickaxe', 'iron_sword', 'iron_axe', 'iron_shovel', 'iron_hoe',
'diamond', 'diamond_pickaxe', 'diamond_sword', 'diamond_axe', 'diamond_shovel', 'diamond_hoe']
collected_items_set = set()
items_variety_collected = []
for i in range(0, len(items_collected_dict)):
current_round_items_set = set(items_collected_dict[i])
diff_items = list(current_round_items_set - collected_items_set)
collected_items_set.update(current_round_items_set)
items_variety_collected.append(len(collected_items_set))
if diff_items:
for j, item in enumerate(diff_items):
# 判断物品类型,选择不同的背景颜色
if item in special_items:
color = 'black'
bgcolor = 'orange'
else:
color = 'black'
bgcolor = 'green'
# 避免文字重叠,通过调整 y 坐标的值
y = items_variety_collected[i]-1-1*j
plt.text(rounds[i], y, item, fontsize=4, color=color, ha='center', va='top',
bbox=dict(boxstyle='round,pad=0.5', fc=bgcolor, alpha=0.5))
"""for i in range(0, len(items_collected_dict)):
current_round_items_set = set(items_collected_dict[i])
diff_items = list(current_round_items_set - collected_items_set)
collected_items_set.update(current_round_items_set)
items_variety_collected.append(len(collected_items_set))
if diff_items:
for j, item in enumerate(diff_items):
color = 'red' if item in special_items else 'black'
plt.text(rounds[i], items_variety_collected[i] - 1 - j * 0.4, item, fontsize=8, color=color, ha='center', va='top',
bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.5))"""
plt.plot(rounds, items_variety_collected, label="Variety of Items Collected")
plt.legend()
ax = plt.gca()
# 设置坐标显示为整数
ax.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
ax.yaxis.set_major_locator(plt.MaxNLocator(integer=True))
# 移除所有边框
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
# 设置网格线颜色和透明度
ax.grid(color='gray', linestyle='-', linewidth=0.3, alpha=0.2)
# 设置x轴和y轴的箭头
ax.annotate('', xy=(1, 0), xycoords='axes fraction', xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color='black'))
ax.annotate('', xy=(0, 1), xycoords='axes fraction', xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color='black'))
plt.savefig(f'{path_prefix}/{start_time}_variety_items_collected_over_rounds.png', dpi=600)
plt.close()
def save_path_results_png(positions, start_time, path_prefix):
x_coords = [pos[0] for pos in positions]
y_coords = [pos[1] for pos in positions]
z_coords = [pos[2] for pos in positions]
# 计算总里程数
total_distance = 0
for i in range(1, len(positions)):
dx = positions[i][0] - positions[i - 1][0]
dy = positions[i][1] - positions[i - 1][1]
dz = positions[i][2] - positions[i - 1][2]
distance = math.sqrt(dx ** 2 + dy ** 2 + dz ** 2)
total_distance += distance
# 3D path png
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111, projection='3d')
ax.plot(x_coords, y_coords, z_coords, '-o', color='blue', markersize=4)
ax.set_title("Bot Movement Path in 3D")
ax.set_xlabel("X Coordinate")
ax.set_ylabel("Y Coordinate")
ax.set_zlabel("Z Coordinate")
ax.text(min(x_coords), max(y_coords), max(z_coords), f"Total Distance: {total_distance:.2f} units", fontsize=15, color='red')
ax.plot(x_coords[0], y_coords[0], z_coords[0], 'ro') # start point with red color
ax.plot(x_coords[-1], y_coords[-1], z_coords[-1], 'go') # end point with green color
plt.savefig(f'{path_prefix}/{start_time}_bot_movement_3D_path.png', dpi=300)
plt.close()
def save_task_results_png(rounds , completed, failed, start_time, path_prefix):
plt.plot(rounds, completed, label='Completed Tasks')
plt.plot(rounds, failed, label='Failed Tasks')
plt.xlabel("# of Rounds")
plt.ylabel("Number of Tasks")
plt.title("Completed vs Failed Tasks per Round")
plt.legend()
ax = plt.gca()
# 设置坐标显示为整数
ax.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
ax.yaxis.set_major_locator(plt.MaxNLocator(integer=True))
# 移除所有边框
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
# 设置网格线颜色和透明度
ax.grid(color='gray', linestyle='-', linewidth=0.3, alpha=0.2)
# 设置x轴和y轴的箭头
ax.annotate('', xy=(1, 0), xycoords='axes fraction', xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color='black'))
ax.annotate('', xy=(0, 1), xycoords='axes fraction', xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color='black'))
plt.savefig(f'{path_prefix}/{start_time}_task_results.png', dpi=300)
plt.close()
def main():
# parser = argparse.ArgumentParser(description="Analyze game log file between a start and end time.")
# parser.add_argument('--start_time', type=str, default=None, nargs='?',
# help="Start time for analysis in the log file.")
# parser.add_argument('--end_time', type=str, default=None, nargs='?', help="End time for analysis in the log file.")
# args = parser.parse_args()
filename = r"C:\Users\86151\Desktop\数模国赛\新建文件夹\input\VG-2.txt"
path_prefix = r"C:\Users\86151\Desktop\数模国赛\新建文件夹\input\results_pic"
# 自动寻找最新的实验开始时间
filename = r"D:\MG-MC\input\VG-1.txt"
path_prefix = r"D:\MG-MC\results_pic"
rounds, items_collected, items_variety_collected, items_collected_dict, positions, completed_tasks, failed_tasks, biomes, biomes_per_round, new_biome_rounds = analyze_log_block(logs_block)
""" print(positions)
print(items_collected_dict)
print(items_variety_collected)
print(completed_tasks)
print(failed_tasks)
print(rounds)
print(new_biome_rounds)"""
#print(biomes_per_round)
#print(biomes)
collected_items_set = set()
total_items_variety = []
for i in range(0, len(items_collected_dict)):
current_round_items_set = set(items_collected_dict[i])
collected_items_set.update(current_round_items_set)
total_items_variety.append(len(collected_items_set))
print(total_items_variety)
#save png
start_time = start_time.replace(":", "_")
save_item_results_png(rounds, items_collected, items_collected_dict, start_time, path_prefix)
save_path_results_png(positions, start_time, path_prefix)
save_task_results_png(rounds, completed_tasks, failed_tasks, start_time, path_prefix)
#save_item_results_png(rounds, items_collected, items_variety_collected, items_collected_dict, start_time, path_prefix)
print("种类",total_items_variety)
print("轮次", rounds)
print("总数",items_collected)
print("成功",completed_tasks)
print("失败",failed_tasks)
print("位置",positions)
data = {'rounds': rounds,
'items_collected': items_collected,
'total_items_variety':total_items_variety,
'items_variety_collected': items_collected_dict,
'positions': positions,
'completed_tasks': completed_tasks,
'failed_tasks': failed_tasks}
# 创建一个DataFrame
df = pd.DataFrame(data)
# 创建一个新的DataFrame存储new_biome_rounds和biomes
df_new_biomes = pd.DataFrame({
'new_biome_rounds': new_biome_rounds,
'biomes': biomes
})
# 将新的DataFrame和原来的df进行合并
df = pd.concat([df, df_new_biomes], axis=1)
# 写入到Excel文件中
df.to_excel(fr'D:\MG-MC\results_pic\{start_time}_results.xlsx', index=False)
if __name__ == "__main__":
main()