- Joined
- Nov 5, 2023

- Messages
- 1

- Reaction score
- 0

This is the code so far:

import pandas as pd

import numpy as np

from datetime import datetime

import matplotlib.pyplot as plt

import seaborn as sns

import itertools

import yfinance as yf

import statsmodels.api as sm

# %matplotlib inline

import warnings

warnings.filterwarnings("ignore")

from statsmodels.tsa.arima.model import ARIMA

from statsmodels.tsa.seasonal import seasonal_decompose

from statsmodels.tsa.stattools import adfuller as ADF

ticker = "SPY"

sp500 = yf.download("SPY", start="2018-10-01", end="2023-10-01") # this is to retrieve data for the S&P500 ETF (richer dataset with more data points compared to the daily closing values of the S&P500)

sp500['DATE'] = sp500.index.date

sp500.dropna(inplace=True)

sp500.set_index('DATE', inplace=True)

print(sp500.head(10))

print(sp500.dtypes)

df_sample = sp500[(sp500.index > datetime(2018, 10, 1).date())]

sp500['Daily Return'] = sp500['Close'].diff()

plt.figure(figsize=(10, 5))

plt.plot(sp500.index, sp500['Daily Return'], label='Daily Returns', color='darkblue')

plt.xlabel('Date')

plt.ylabel('Daily Returns')

plt.title('S&P500 Daily Returns')

plt.legend()

plt.grid(True)

plt.show()

sp500_reset = sp500.reset_index(drop=True)

fig, axes = plt.subplots(1, 2, figsize=(16, 8))

fig = sm.graphics.tsa.plot_acf(sp500.iloc[1:]['Close'], lags=30, ax=axes[0])

fig = sm.graphics.tsa.plot_pacf(sp500.iloc[1:]['Close'], lags=30, ax=axes[1])

from statsmodels.tsa.stattools import adfuller

def check_stationarity(timeseries):

result = adfuller(timeseries,autolag='AIC')

dfoutput = pd.Series(result[0:4], index=['Test Statistic', 'p-value', '#Lags Used','Number of Observations Used'])

print('The test statistic: %f' % result[0])

print('p-value: %f' % result[1])

print('Critical Values:')

for key, value in result[4].items():

print('%s: %.3f' % (key,value))

sp500.dropna(inplace=True)

check_stationarity(sp500.Close)

# Negative test statistic upon performing ADF test means data is STATIONARY

df_diff = sp500_reset['Close'].diff()

df_diff.plot(figsize=(16,9))

# Second differencing performed to improve data stationarity

df_diff2 = sp500_reset['Close'].diff().diff()

df_diff2.index = sp500_reset.index

df_diff2.dropna(inplace=True)

check_stationarity(df_diff2)

# Better test statistic after second differencing

# p-value = 0, null hypothesis of non-stationarity can be rejected

# The decomposition is performed using an additive model

# Meaning: observed data is assumed to be a combination of trend, seasonality, and residuals

from statsmodels.tsa.seasonal import seasonal_decompose

decompose = seasonal_decompose(sp500['Close'], model='additive', period = 30)

plt.rcParams["figure.figsize"] = [13, 7]

fig = decompose.plot()

p = d = q = range(0, 2)

pdq = list(itertools.product(p,d,q))

seasonal_pdq = [(x[0], x[1], x[2], 12) for x in list(itertools.product(p,d,q))]

from statsmodels.tsa.statespace.sarimax import SARIMAX

best_aic = float("inf")

best_params = None

for param in pdq:

for param_seasonal in seasonal_pdq:

try:

mod = SARIMAX(df_diff2,

order=param,

seasonal_order=param_seasonal,

enforce_stationarity=False,

enforce_invertibility=False)

results = mod.fit()

if results.aic < best_aic:

best_aic = results.aic

best_params = (param, param_seasonal)

print('SARIMAX{} x {}12 - AIC:{}'.format(param, param_seasonal, results.aic))

except:

continue

print("Best parameters")

print("AIC:", best_aic)

print("Order:", best_params[0])

print("Seasonal order:", best_params[1])

# Now we take the suggested parameters and fit them to SARIMAX

sarimax = SARIMAX(df_diff2,

order=(1,0,1),

seasonal_order=(0,1,1,12),

enforce_stationarity=False,

enforce_invertibility=False)

fit_sarimax = sarimax.fit()

print(fit_sarimax.summary().tables[1])

fit_sarimax.plot_diagnostics(figsize=(10,10))

plt.plot()

from sklearn.model_selection import TimeSeriesSplit

from sklearn.metrics import mean_squared_error

# Time Series Cross-Validation (TSCV)

numsplits = 5

tcsv = TimeSeriesSplit(n_splits=numsplits)

# Range for potential training percentages

training_percent = [0.6, 0.7, 0.8, 0.9]

best_rmse = float("inf")

best_split = None

for train_percentage in training_percent:

for train_index, test_index in tcsv.split(df_diff2):

train_size = int(train_percentage * len(df_diff2))

train, test = df_diff2.iloc[train_index[:train_size]], df_diff2.iloc[test_index]

sarimax = SARIMAX(train,

order=best_params[0],

seasonal_order=best_params[1],

enforce_stationarity=False,

enforce_invertibility=False)

fit_sarimax = sarimax.fit(disp=-1)

forecast_steps = len(test)

forecast = fit_sarimax.get_forecast(steps=forecast_steps)

forecast_mean = forecast.predicted_mean

rmse = mean_squared_error(test, forecast_mean, squared=False)

if rmse < best_rmse:

best_rmse = rmse

best_split = (train_percentage, rmse)

print("Best Training %:", best_split[0])

print("Best RMSE:", best_split[1])

training_percentage = 0.6

data_size = len(df_diff2)

train_size = int(training_percentage * data_size)

train = df_diff2[:train_size]

test = df_diff2[train_size:]

train_size = int(len(sp500_reset) * 0.8)

train_orig = sp500_reset[:train_size]

test_orig = sp500_reset[train_size:]

forecast_start = pd.to_datetime('2023-10-01')

forecast_end = pd.to_datetime('2023-10-31')

forecast_index = pd.date_range(start=forecast_start, end=forecast_end, freq='D')

sarimax = SARIMAX(train,

order=(1, 0, 1),

seasonal_order=(0, 1, 1, 12),

enforce_stationarity=False,

enforce_invertibility=False)

fit_sarimax = sarimax.fit(disp=-1)

# Calculate the initial value (the last available value in the original data)

initial_value = sp500_reset['Close'].iloc[train_size - 1]

# Undifference the forecasted values

undifferenced_forecast = initial_value + np.cumsum(forecast.predicted_mean)

# Create a time index for the forecasted values (adjust it according to your time range)

forecast_index = test_orig.index

# Create a DataFrame for the undifferenced forecast

undifferenced_forecast_df = pd.DataFrame({'Close': undifferenced_forecast}, index=forecast_index)

# Plot the undifferenced forecast

plt.figure(figsize=(16, 9))

plt.plot(sp500_reset['Close'][:train_size], label='training')

plt.plot(sp500_reset['Close'][train_size:], label='actual')

plt.plot(undifferenced_forecast_df['Close'], label='forecast')

plt.title('Undifferenced Forecast vs Actuals')

plt.legend(loc='upper left', fontsize=8)