willsh1997's picture
:wrench: remove codecarbon install, add slightly modified codecarbon pkg to repo
deb7c43
import csv
import os
from typing import List
import pandas as pd
from codecarbon.core.util import backup
from codecarbon.external.logger import logger
from codecarbon.output_methods.base_output import BaseOutput
from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData
class FileOutput(BaseOutput):
"""
Saves experiment artifacts to a file
"""
def __init__(
self, output_file_name: str, output_dir: str, on_csv_write: str = "append"
):
if on_csv_write not in {"append", "update"}:
raise ValueError(
f"Unknown `on_csv_write` value: {on_csv_write}"
+ " (should be one of 'append' or 'update'"
)
self.output_file_name: str = output_file_name
if not os.path.exists(output_dir):
raise OSError(f"Folder '{output_dir}' doesn't exist !")
self.output_dir: str = output_dir
self.on_csv_write: str = on_csv_write
self.save_file_path = os.path.join(self.output_dir, self.output_file_name)
logger.info(
f"Emissions data (if any) will be saved to file {os.path.abspath(self.save_file_path)}"
)
def has_valid_headers(self, data: EmissionsData):
with open(self.save_file_path) as csv_file:
csv_reader = csv.DictReader(csv_file)
dict_from_csv = dict(list(csv_reader)[0])
list_of_column_names = list(dict_from_csv.keys())
return list(data.values.keys()) == list_of_column_names
def out(self, total: EmissionsData, delta: EmissionsData):
"""
Save the emissions data to a CSV file.
If the file already exists, append the new data to it.
param `delta` is not used in this method.
"""
file_exists: bool = os.path.isfile(self.save_file_path)
if file_exists and not self.has_valid_headers(total):
logger.warning("The CSV format have changed, backing up old emission file.")
backup(self.save_file_path)
file_exists = False
new_df = pd.DataFrame.from_records([dict(total.values)])
if not file_exists:
df = new_df
elif self.on_csv_write == "append":
df = pd.read_csv(self.save_file_path)
df = pd.concat([df, new_df])
else:
df = pd.read_csv(self.save_file_path)
df_run = df.loc[df.run_id == total.run_id]
if len(df_run) < 1:
df = pd.concat([df, new_df])
elif len(df_run) > 1:
logger.warning(
f"CSV contains more than 1 ({len(df_run)})"
+ f" rows with current run ID ({total.run_id})."
+ "Appending instead of updating."
)
df = pd.concat([df, new_df])
else:
df.at[df.run_id == total.run_id, total.values.keys()] = (
total.values.values()
)
df.to_csv(self.save_file_path, index=False)
def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
run_id = data[0].run_id
save_task_file_path = os.path.join(
self.output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
)
df = pd.DataFrame(columns=data[0].values.keys())
new_df = pd.DataFrame.from_records(
[dict(data_point.values) for data_point in data]
)
# Filter out empty or all-NA columns, to avoid warnings from Pandas
new_df = new_df.dropna(axis=1, how="all")
df = pd.concat([df, new_df], ignore_index=True)
df.to_csv(save_task_file_path, index=False)