import numpy as np
import pandas as pd


def create_lag_features_for_single_day(data, random_index, lag_days):
    """Build a one-row DataFrame of current plus lagged feature values for the day at random_index."""
    lag_features = [
        column
        for column in data.columns
        if column
        in [
            "O3",
            "NO2",
            "wind_speed",
            "mean_temp",
            "global_radiation",
            "percipitation",  # sic; matches the column name used throughout this module
            "pressure",
            "minimum_visibility",
            "humidity",
        ]
    ]
    lagged_data = {}
    for feature in lag_features:
        for lag in range(1, lag_days + 1):
            try:
                lagged_value = data.loc[random_index - lag, feature]
                lagged_data[f"{feature}_lag_{lag}"] = lagged_value
            except KeyError:
                # .loc raises KeyError (not IndexError) when the label is out of range
                print(
                    f"Value not found for feature {feature} lagged by {lag} from day {random_index}"
                )
                continue

    current_data = data.iloc[random_index].to_dict()
    current_data.update(lagged_data)
    return pd.DataFrame([current_data])
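

# A minimal usage sketch (df and the index value 100 are illustrative, not part
# of this module). Both single-day helpers assume a default integer RangeIndex,
# since they mix label-based .loc lookups with positional .iloc access:
#
#   row = create_lag_features_for_single_day(df, random_index=100, lag_days=7)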


def create_targets_for_single_day(data, random_index, target_column, days_ahead):
    """Build a one-row DataFrame of future target values for the day at random_index."""
    targets = {}
    for day in range(1, days_ahead + 1):
        future_index = random_index + day
        try:
            targets[f"{target_column}_{day}_days_ahead"] = data.loc[
                future_index, target_column
            ]
        except KeyError:
            # .loc raises KeyError (not IndexError) when the label is out of range
            print(
                f"Value not found for particle {target_column} forwarded by {day} day(s)"
            )

    return pd.DataFrame([targets])
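

# Usage sketch, under the same assumptions as above:
#
#   y = create_targets_for_single_day(df, random_index=100, target_column="O3", days_ahead=3)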


def load_data_batch(data, target_particle, lag_days):
    """Sample one random day and return its lagged features and future targets."""
    data = data.copy()  # avoid mutating the caller's DataFrame
    data["date"] = pd.to_datetime(data["date"])

    # Exclude 2022-01-01 through 2022-04-27, widened by 3 days before and
    # lag_days after, so a sampled day never draws lags or targets from
    # inside the excluded window.
    start_exclusion = pd.to_datetime("2022-01-01") - pd.Timedelta(days=3)
    end_exclusion = pd.to_datetime("2022-04-27") + pd.Timedelta(days=lag_days)
    valid_data = data[
        ~((data["date"] >= start_exclusion) & (data["date"] <= end_exclusion))
    ]
    # Trim the edges so every sampled day has a full lag window behind it
    # and three future days ahead of it.
    valid_data = valid_data[lag_days:-3]

    random_index = np.random.choice(valid_data.index, 1)[0]

    train_data = create_lag_features_for_single_day(data, random_index, lag_days)
    targets = create_targets_for_single_day(
        data, random_index, target_particle, days_ahead=3
    )

    return train_data, targets
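

# Sketch of drawing one random (features, targets) pair; df is an assumed
# DataFrame with the columns listed in create_lag_features_for_single_day:
#
#   X_row, y_row = load_data_batch(df, target_particle="O3", lag_days=7)
#   print(X_row.shape, y_row.shape)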


def create_features_and_targets(
    data,
    target_particle,
    lag_days=7,
    sma_days=7,
    days_ahead=3,
):
    """
    Creates lagged features, SMA features, last year's particle data (NO2 and O3) for
    specific days, sine and cosine transformations for 'weekday' and 'month', and target
    variables for the specified particle ('O3' or 'NO2') for the next 'days_ahead' days.
    Scales features and targets with StandardScaler (outliers are kept, not removed)
    and saves the fitted scalers for later inverse scaling. Splits the data into train,
    validation, and test sets, reserving the most recent dates for validation and test.
    Prints the number of rows with missing values dropped from the dataset.

    Parameters:
    - data (pd.DataFrame): The input time-series dataset.
    - target_particle (str): The target particle ('O3' or 'NO2') for which targets are created.
    - lag_days (int): Number of lag days to create features for (default 7).
    - sma_days (int): Window size for the Simple Moving Average (default 7).
    - days_ahead (int): Number of days ahead to create target variables for (default 3).

    Returns:
    - X_train_scaled (pd.DataFrame): Scaled training features.
    - y_train_scaled (pd.DataFrame): Scaled training targets.
    - X_val_scaled (pd.DataFrame): Scaled validation features (365 days).
    - y_val_scaled (pd.DataFrame): Scaled validation targets (365 days).
    - X_test_scaled (pd.DataFrame): Scaled test features (365 days).
    - y_test_scaled (pd.DataFrame): Scaled test targets (365 days).
    """
    import warnings

    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    warnings.filterwarnings("ignore")

    # Validate the target before doing any work.
    if target_particle not in ["O3", "NO2"]:
        raise ValueError("target_particle must be 'O3' or 'NO2'")

    lag_features = [
        "NO2",
        "O3",
        "wind_speed",
        "mean_temp",
        "global_radiation",
        "minimum_visibility",
        "humidity",
    ]
    if target_particle == "NO2":
        # NO2 additionally uses precipitation and pressure; "percipitation"
        # (sic) matches the column name used throughout this module.
        lag_features = lag_features + ["percipitation", "pressure"]

    data = data.copy()
    data["date"] = pd.to_datetime(data["date"])
    data = data.sort_values("date").reset_index(drop=True)

    # Derive numeric weekday/month columns if they are missing or stored as strings.
    if "weekday" not in data.columns or data["weekday"].dtype == object:
        data["weekday"] = data["date"].dt.weekday
    if "month" not in data.columns:
        data["month"] = data["date"].dt.month

    # Cyclical encoding so Sunday/Monday and December/January end up adjacent.
    data["weekday_sin"] = np.sin(2 * np.pi * data["weekday"] / 7)
    data["weekday_cos"] = np.cos(2 * np.pi * data["weekday"] / 7)
    data["month_sin"] = np.sin(2 * np.pi * (data["month"] - 1) / 12)
    data["month_cos"] = np.cos(2 * np.pi * (data["month"] - 1) / 12)

    # Lagged copies of each feature for the previous lag_days days.
    for feature in lag_features:
        for lag in range(1, lag_days + 1):
            data[f"{feature}_lag_{lag}"] = data[feature].shift(lag)

    # Simple moving average over the trailing sma_days window.
    for feature in lag_features:
        data[f"{feature}_sma_{sma_days}"] = (
            data[feature].rolling(window=sma_days).mean()
        )

    # Same-date particle values from one year earlier (365 days; leap years
    # are not adjusted for).
    data["O3_last_year"] = data["O3"].shift(365)
    data["NO2_last_year"] = data["NO2"].shift(365)

    # The lag_days days leading up to the same date last year...
    for i in range(1, lag_days + 1):
        data[f"O3_last_year_{i}_days_before"] = data["O3"].shift(365 + i)
        data[f"NO2_last_year_{i}_days_before"] = data["NO2"].shift(365 + i)

    # ...and the value three days after it (a shift of 362 days).
    data["O3_last_year_3_days_after"] = data["O3"].shift(365 - 3)
    data["NO2_last_year_3_days_after"] = data["NO2"].shift(365 - 3)

    # Targets: the particle's value 1..days_ahead days into the future.
    for day in range(1, days_ahead + 1):
        data[f"{target_particle}_plus_{day}_day"] = data[target_particle].shift(-day)

    # Shifting introduces NaNs at the edges; drop them and report how many rows go.
    rows_before = data.shape[0]
    data = data.dropna().reset_index(drop=True)
    rows_after = data.shape[0]
    rows_dropped = rows_before - rows_after
    print(f"Number of rows with missing values dropped: {rows_dropped}")

    total_days = data.shape[0]
    test_size = 365
    val_size = 365

    if total_days < test_size + val_size:
        raise ValueError(
            "Not enough data to create validation and test sets of 365 days each."
        )

    # Chronological split: oldest rows for training, then one year each for
    # validation and test.
    data = data.sort_values("date").reset_index(drop=True)
    train_data = data.iloc[: -(val_size + test_size)]
    val_data = data.iloc[-(val_size + test_size) : -test_size]
    test_data = data.iloc[-test_size:]

    target_cols = [
        f"{target_particle}_plus_{day}_day" for day in range(1, days_ahead + 1)
    ]

    # Everything except the raw calendar columns and the targets is a feature.
    exclude_cols = ["date", "weekday", "month"] + target_cols
    feature_cols = [col for col in data.columns if col not in exclude_cols]

    X_train = train_data[feature_cols]
    y_train = train_data[target_cols]
    X_val = val_data[feature_cols]
    y_val = val_data[target_cols]
    X_test = test_data[feature_cols]
    y_test = test_data[target_cols]

    # Fit the scalers on the training split only, then apply them to the
    # validation and test splits to avoid leakage.
    feature_scaler = StandardScaler()
    target_scaler = StandardScaler()
    X_train_scaled = feature_scaler.fit_transform(X_train)
    y_train_scaled = target_scaler.fit_transform(y_train)
    X_val_scaled = feature_scaler.transform(X_val)
    y_val_scaled = target_scaler.transform(y_val)
    X_test_scaled = feature_scaler.transform(X_test)
    y_test_scaled = target_scaler.transform(y_test)

    # Wrap the scaled arrays back into DataFrames with the original columns and index.
    X_train_scaled = pd.DataFrame(
        X_train_scaled, columns=feature_cols, index=X_train.index
    )
    y_train_scaled = pd.DataFrame(
        y_train_scaled, columns=target_cols, index=y_train.index
    )
    X_val_scaled = pd.DataFrame(X_val_scaled, columns=feature_cols, index=X_val.index)
    y_val_scaled = pd.DataFrame(y_val_scaled, columns=target_cols, index=y_val.index)
    X_test_scaled = pd.DataFrame(
        X_test_scaled, columns=feature_cols, index=X_test.index
    )
    y_test_scaled = pd.DataFrame(y_test_scaled, columns=target_cols, index=y_test.index)

    # Persist the scalers so predictions can be inverse-transformed later.
    joblib.dump(feature_scaler, "feature_scaler.joblib")
    target_scaler_filename = f"target_scaler_{target_particle}.joblib"
    joblib.dump(target_scaler, target_scaler_filename)

    return (
        X_train_scaled,
        y_train_scaled,
        X_val_scaled,
        y_val_scaled,
        X_test_scaled,
        y_test_scaled,
    )
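

# A minimal end-to-end sketch, assuming a daily CSV with the columns used above
# ("date", "O3", "NO2", "wind_speed", ...); the file name is a placeholder, not
# part of the original module.
if __name__ == "__main__":
    df = pd.read_csv("air_quality_daily.csv")  # hypothetical input path
    (
        X_train_s,
        y_train_s,
        X_val_s,
        y_val_s,
        X_test_s,
        y_test_s,
    ) = create_features_and_targets(df, target_particle="O3")
    print(X_train_s.shape, y_train_s.shape)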