I need to build a SARIMAX model. I tried to fix some of the errors I was running into with ChatGPT's help, so I'm not sure everything I'm doing makes sense. In the end I need the forecast on the scale of the original data: a graph running from 2018 to 2023 that includes the training data and keeps the original trend (NOT the stationary, differenced series). Whenever I do manage to get a trend line instead of the stationary data, the x-axis makes no sense and doesn't display the appropriate time values (2018 to 2023). I want the forecast to cover October (even though it has already passed) and November 2023. After the code I've added a minimal sketch of the direction I think the fix should take.
This is the code so far:
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import yfinance as yf
import statsmodels.api as sm
# %matplotlib inline
import warnings
warnings.filterwarnings("ignore")
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller as ADF
ticker = "SPY"
sp500 = yf.download(ticker, start="2018-10-01", end="2023-10-01")  # daily data for SPY, the S&P 500 ETF (a richer dataset than the index's daily closing values alone)
sp500['DATE'] = sp500.index.date
sp500.dropna(inplace=True)
sp500.set_index('DATE', inplace=True)
print(sp500.head(10))
print(sp500.dtypes)
df_sample = sp500[(sp500.index > datetime(2018, 10, 1).date())]
sp500['Daily Return'] = sp500['Close'].diff()  # note: .diff() gives the daily price change in dollars; .pct_change() would give the actual return
plt.figure(figsize=(10, 5))
plt.plot(sp500.index, sp500['Daily Return'], label='Daily Returns', color='darkblue')
plt.xlabel('Date')
plt.ylabel('Daily Returns')
plt.title('S&P500 Daily Returns')
plt.legend()
plt.grid(True)
plt.show()
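# NOTE: reset_index(drop=True) below replaces the dates with a plain 0..N RangeIndex,
# so every plot built from sp500_reset loses the 2018-2023 dates on the x-axis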
sp500_reset = sp500.reset_index(drop=True)
fig, axes = plt.subplots(1, 2, figsize=(16, 8))
fig = sm.graphics.tsa.plot_acf(sp500.iloc[1:]['Close'], lags=30, ax=axes[0])
fig = sm.graphics.tsa.plot_pacf(sp500.iloc[1:]['Close'], lags=30, ax=axes[1])
from statsmodels.tsa.stattools import adfuller
def check_stationarity(timeseries):
    result = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(result[0:4], index=['Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used'])
    print('The test statistic: %f' % result[0])
    print('p-value: %f' % result[1])
    print('Critical Values:')
    for key, value in result[4].items():
        print('%s: %.3f' % (key, value))
sp500.dropna(inplace=True)
check_stationarity(sp500.Close)
# The ADF statistic has to be below (more negative than) the critical values, with p-value < 0.05, for the series to count as stationary; a merely negative statistic is not enough
df_diff = sp500_reset['Close'].diff()
df_diff.plot(figsize=(16,9))
# Second differencing performed to improve data stationarity
df_diff2 = sp500_reset['Close'].diff().diff()
df_diff2.index = sp500_reset.index
df_diff2.dropna(inplace=True)
check_stationarity(df_diff2)
# Better test statistic after second differencing
# p-value = 0, null hypothesis of non-stationarity can be rejected
# The decomposition is performed using an additive model
# Meaning: observed data is assumed to be a combination of trend, seasonality, and residuals
from statsmodels.tsa.seasonal import seasonal_decompose
decompose = seasonal_decompose(sp500['Close'], model='additive', period = 30)
plt.rcParams["figure.figsize"] = [13, 7]
fig = decompose.plot()
p = d = q = range(0, 2)
pdq = list(itertools.product(p,d,q))
seasonal_pdq = [(x[0], x[1], x[2], 12) for x in list(itertools.product(p,d,q))]
from statsmodels.tsa.statespace.sarimax import SARIMAX
best_aic = float("inf")
best_params = None
for param in pdq:
    for param_seasonal in seasonal_pdq:
        try:
            mod = SARIMAX(df_diff2,
                          order=param,
                          seasonal_order=param_seasonal,
                          enforce_stationarity=False,
                          enforce_invertibility=False)
            results = mod.fit(disp=False)
            if results.aic < best_aic:
                best_aic = results.aic
                best_params = (param, param_seasonal)
            print('SARIMAX{} x {}12 - AIC:{}'.format(param, param_seasonal, results.aic))
        except Exception:
            continue
print("Best parameters")
print("AIC:", best_aic)
print("Order:", best_params[0])
print("Seasonal order:", best_params[1])
# Fit SARIMAX with the best parameters suggested by the grid search
sarimax = SARIMAX(df_diff2,
                  order=best_params[0],
                  seasonal_order=best_params[1],
                  enforce_stationarity=False,
                  enforce_invertibility=False)
fit_sarimax = sarimax.fit()
print(fit_sarimax.summary().tables[1])
fit_sarimax.plot_diagnostics(figsize=(10,10))
plt.show()
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error
# Time Series Cross-Validation (TSCV)
numsplits = 5
tcsv = TimeSeriesSplit(n_splits=numsplits)
# Range for potential training percentages
training_percent = [0.6, 0.7, 0.8, 0.9]
best_rmse = float("inf")
best_split = None
for train_percentage in training_percent:
    for train_index, test_index in tcsv.split(df_diff2):
        # Cap each CV training fold at the candidate training percentage
        train_size = int(train_percentage * len(df_diff2))
        train, test = df_diff2.iloc[train_index[:train_size]], df_diff2.iloc[test_index]
        sarimax = SARIMAX(train,
                          order=best_params[0],
                          seasonal_order=best_params[1],
                          enforce_stationarity=False,
                          enforce_invertibility=False)
        fit_sarimax = sarimax.fit(disp=False)
        forecast_steps = len(test)
        forecast = fit_sarimax.get_forecast(steps=forecast_steps)
        forecast_mean = forecast.predicted_mean
        rmse = np.sqrt(mean_squared_error(test, forecast_mean))  # RMSE (the squared=False argument was removed in newer scikit-learn)
        if rmse < best_rmse:
            best_rmse = rmse
            best_split = (train_percentage, rmse)
print("Best Training %:", best_split[0])
print("Best RMSE:", best_split[1])
# Final train/test split on the twice-differenced series, using the best fraction from the CV step
training_percentage = best_split[0]
data_size = len(df_diff2)
train_size = int(training_percentage * data_size)
train = df_diff2[:train_size]
test = df_diff2[train_size:]
# Split the original (undifferenced) prices at the matching row for plotting;
# df_diff2 is two rows shorter than sp500_reset because of the double differencing
split_orig = train_size + 2
train_orig = sp500_reset[:split_orig]
test_orig = sp500_reset[split_orig:]
# Intended out-of-sample window (October 2023); superseded by test_orig.index below,
# see the sketch at the end for a dated out-of-sample forecast
forecast_start = pd.to_datetime('2023-10-01')
forecast_end = pd.to_datetime('2023-10-31')
forecast_index = pd.date_range(start=forecast_start, end=forecast_end, freq='D')
sarimax = SARIMAX(train,
                  order=best_params[0],
                  seasonal_order=best_params[1],
                  enforce_stationarity=False,
                  enforce_invertibility=False)
fit_sarimax = sarimax.fit(disp=False)
# Forecast the hold-out period with the final model
forecast = fit_sarimax.get_forecast(steps=len(test))
# Undifference the forecast: the model was fit on the twice-differenced series, so the
# forecast has to be cumulatively summed twice to get back to the price scale
last_price = sp500_reset['Close'].iloc[split_orig - 1]        # last price in the training window
last_diff = sp500_reset['Close'].diff().iloc[split_orig - 1]  # last first difference in the training window
first_diff_forecast = last_diff + np.cumsum(forecast.predicted_mean)   # undo the second difference
undifferenced_forecast = last_price + np.cumsum(first_diff_forecast)   # undo the first difference
# Time index for the forecasted values (the rows of the hold-out set)
forecast_index = test_orig.index
# DataFrame for the undifferenced forecast
undifferenced_forecast_df = pd.DataFrame({'Close': undifferenced_forecast.values}, index=forecast_index)
# Plot the undifferenced forecast against the actuals
plt.figure(figsize=(16, 9))
plt.plot(sp500_reset['Close'][:split_orig], label='training')
plt.plot(sp500_reset['Close'][split_orig:], label='actual')
plt.plot(undifferenced_forecast_df['Close'], label='forecast')
plt.title('Undifferenced Forecast vs Actuals')
plt.legend(loc='upper left', fontsize=8)
plt.show()
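What I think I actually want is something like the sketch below: let SARIMAX do both rounds of differencing internally (the (1,0,1) order on the twice-differenced series becomes (1,2,1) on the raw prices, and get_forecast then returns values on the original price scale), keep the DatetimeIndex so the x-axis shows the real dates, and forecast October and November 2023 directly. This is just a minimal sketch under my assumptions: the asfreq('B').ffill() business-day resampling and the .squeeze() call (newer yfinance versions return multi-level columns) are my guesses, and I kept the (0,1,1,12) seasonal term from the grid search even though m=12 may not mean much on daily data.
# A minimal sketch, assuming business-day frequency and the grid-search parameters above
close = yf.download("SPY", start="2018-10-01", end="2023-10-01")["Close"].squeeze()
close = close.asfreq("B").ffill()  # regular business-day index so the forecast gets real dates (holidays forward-filled; an assumption)
model = SARIMAX(close,
                order=(1, 2, 1),               # d=2 inside the model replaces the manual double differencing
                seasonal_order=(0, 1, 1, 12),
                enforce_stationarity=False,
                enforce_invertibility=False)
fit = model.fit(disp=False)
# Forecast the business days of October and November 2023
steps = len(pd.date_range("2023-10-02", "2023-11-30", freq="B"))
fc = fit.get_forecast(steps=steps)
mean = fc.predicted_mean  # already on the price scale, with a dated index
ci = fc.conf_int()
plt.figure(figsize=(16, 9))
plt.plot(close, label='SPY close (2018-2023)')
plt.plot(mean, label='forecast (Oct-Nov 2023)')
plt.fill_between(ci.index, ci.iloc[:, 0], ci.iloc[:, 1], alpha=0.2, label='confidence interval')
plt.xlabel('Date')
plt.ylabel('Close')
plt.title('SPY forecast on the original scale')
plt.legend()
plt.show()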