"""
Tests for VARMAX models

Author: Chad Fulton
License: Simplified-BSD
"""
import os
import platform
import re
import sys
import warnings

import numpy as np
from numpy.testing import assert_equal, assert_raises, assert_allclose
import pandas as pd
import pytest

from statsmodels.tsa.statespace import dynamic_factor
from .results import results_varmax, results_dynamic_factor
from statsmodels.iolib.summary import forg

current_path = os.path.dirname(os.path.abspath(__file__))

output_path = os.path.join('results', 'results_dynamic_factor_stata.csv')
output_results = pd.read_csv(os.path.join(current_path, output_path))

OSX_ARM = sys.platform == "darwin" and platform.processor() == "arm"


class CheckDynamicFactor:
    @classmethod
    def setup_class(cls, true, k_factors, factor_order, cov_type='approx',
                    included_vars=['dln_inv', 'dln_inc', 'dln_consump'],
                    demean=False, filter=True, **kwargs):
        cls.true = true
        # 1960:Q1 - 1982:Q4
        dta = pd.DataFrame(
            results_varmax.lutkepohl_data, columns=['inv', 'inc', 'consump'],
            index=pd.date_range('1960-01-01', '1982-10-01', freq='QS'))

        dta['dln_inv'] = np.log(dta['inv']).diff()
        dta['dln_inc'] = np.log(dta['inc']).diff()
        dta['dln_consump'] = np.log(dta['consump']).diff()

        endog = dta.loc['1960-04-01':'1978-10-01', included_vars]

        if demean:
            endog -= dta.iloc[1:][included_vars].mean()

        cls.model = dynamic_factor.DynamicFactor(endog, k_factors=k_factors,
                                                 factor_order=factor_order,
                                                 **kwargs)

        if filter:
            cls.results = cls.model.smooth(true['params'], cov_type=cov_type)

    def test_params(self):
        # Smoke test to make sure the start_params are well-defined and
        # lead to a well-defined model
        self.model.filter(self.model.start_params)
        # Similarly a smoke test for param_names
        assert_equal(len(self.model.start_params), len(self.model.param_names))
        # Finally make sure the transform and untransform do their job
        actual = self.model.transform_params(
            self.model.untransform_params(self.model.start_params))
        assert_allclose(actual, self.model.start_params)
        # Also in the case of enforce stationarity = False
        self.model.enforce_stationarity = False
        actual = self.model.transform_params(
            self.model.untransform_params(self.model.start_params))
        self.model.enforce_stationarity = True
        assert_allclose(actual, self.model.start_params)

    def test_results(self, close_figures):
        # Smoke test for creating the summary
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.results.summary()

        # Test cofficient matrix creation
        #  (via a different, more direct, method)
        if self.model.factor_order > 0:
            model = self.model
            k_factors = model.k_factors
            pft_params = self.results.params[model._params_factor_transition]
            coefficients = np.array(pft_params).reshape(
                k_factors, k_factors * model.factor_order)
            coefficient_matrices = np.array([
                coefficients[:self.model.k_factors,
                             i*self.model.k_factors:(i+1)*self.model.k_factors]
                for i in range(self.model.factor_order)
            ])
            assert_equal(
                self.results.coefficient_matrices_var,
                coefficient_matrices)
        else:
            assert_equal(self.results.coefficient_matrices_var, None)

    @pytest.mark.matplotlib
    def test_plot_coefficients_of_determination(self, close_figures):
        # Smoke test for plot_coefficients_of_determination
        self.results.plot_coefficients_of_determination()

    def test_no_enforce(self):
        return
        # Test that nothing goes wrong when we do not enforce stationarity
        params = self.model.untransform_params(self.true['params'])
        params[self.model._params_transition] = (
            self.true['params'][self.model._params_transition])
        self.model.enforce_stationarity = False
        results = self.model.filter(params, transformed=False)
        self.model.enforce_stationarity = True
        assert_allclose(results.llf, self.results.llf, rtol=1e-5)

    def test_mle(self, init_powell=True):
        with warnings.catch_warnings(record=True):
            warnings.simplefilter('always')
            start_params = self.model.start_params
            if init_powell:
                results = self.model.fit(method='powell',
                                         maxiter=100, disp=False)
                start_params = results.params
            results = self.model.fit(start_params, maxiter=1000, disp=False)
            results = self.model.fit(results.params, method='nm', maxiter=1000,
                                     disp=False)
            if not results.llf > self.results.llf:
                assert_allclose(results.llf, self.results.llf, rtol=1e-5)

    def test_loglike(self):
        assert_allclose(self.results.llf, self.true['loglike'], rtol=1e-6)

    def test_aic(self):
        # We only get 3 digits from Stata
        assert_allclose(self.results.aic, self.true['aic'], atol=3)

    def test_bic(self):
        # We only get 3 digits from Stata
        assert_allclose(self.results.bic, self.true['bic'], atol=3)

    def test_predict(self, **kwargs):
        # Tests predict + forecast
        self.results.predict(end='1982-10-01', **kwargs)
        assert_allclose(
            self.results.predict(end='1982-10-01', **kwargs),
            self.true['predict'],
            atol=1e-6)

    def test_dynamic_predict(self, **kwargs):
        # Tests predict + dynamic predict + forecast
        assert_allclose(
            self.results.predict(end='1982-10-01', dynamic='1961-01-01',
                                 **kwargs),
            self.true['dynamic_predict'],
            atol=1e-6)


class TestDynamicFactor(CheckDynamicFactor):
    """
    Test for a dynamic factor model with 1 AR(2) factor
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_1', 'predict_dfm_2', 'predict_dfm_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_1', 'dyn_predict_dfm_2', 'dyn_predict_dfm_3']]
        super().setup_class(
            true, k_factors=1, factor_order=2
        )

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()**0.5
        assert_allclose(bse, self.true['bse_oim'], atol=1e-5)


class TestDynamicFactor2(CheckDynamicFactor):
    """
    Test for a dynamic factor model with two VAR(1) factors
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm2.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm2_1', 'predict_dfm2_2', 'predict_dfm2_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm2_1', 'dyn_predict_dfm2_2', 'dyn_predict_dfm2_3']]
        super().setup_class(
            true, k_factors=2, factor_order=1
        )

    def test_mle(self):
        # Stata's MLE on this model does not converge, so no reason to check
        pass

    def test_bse(self):
        # Stata's MLE on this model does not converge, and four of their
        # params do not even have bse (possibly they are still at starting
        # values?), so no reason to check this
        pass

    def test_aic(self):
        # Stata uses 9 df (i.e. 9 params) here instead of 13, because since the
        # model did not coverge, 4 of the parameters are not fully estimated
        # (possibly they are still at starting values?) so the AIC is off
        pass

    def test_bic(self):
        # Stata uses 9 df (i.e. 9 params) here instead of 13, because since the
        # model did not coverge, 4 of the parameters are not fully estimated
        # (possibly they are still at starting values?) so the BIC is off
        pass

    def test_summary(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            summary = self.results.summary()
        tables = [str(table) for table in summary.tables]
        params = self.true['params']

        # Make sure we have the right number of tables
        assert_equal(
            len(tables),
            2 + self.model.k_endog + self.model.k_factors + 1)

        # Check the model overview table
        assert re.search(
            r'Model:.*DynamicFactor\(factors=2, order=1\)',
            tables[0])

        # For each endogenous variable, check the output
        for i in range(self.model.k_endog):
            offset_loading = self.model.k_factors * i
            table = tables[i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for equation %s' % name, table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 7)

            # -> Check that we have the right coefficients
            assert re.search(
                'loading.f1 +' + forg(params[offset_loading + 0], prec=4),
                table)
            assert re.search(
                'loading.f2 +' + forg(params[offset_loading + 1], prec=4),
                table)

        # For each factor, check the output
        for i in range(self.model.k_factors):
            offset = (self.model.k_endog * (self.model.k_factors + 1) +
                      i * self.model.k_factors)
            table = tables[self.model.k_endog + i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for factor equation f%d' % (i+1), table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 7)

            # -> Check that we have the right coefficients
            assert re.search('L1.f1 +' + forg(params[offset + 0], prec=4),
                             table)
            assert re.search('L1.f2 +' + forg(params[offset + 1], prec=4),
                             table)

        # Check the Error covariance matrix output
        table = tables[2 + self.model.k_endog + self.model.k_factors]

        # -> Make sure we have the right table / table name
        name = self.model.endog_names[i]
        assert re.search('Error covariance matrix', table)

        # -> Make sure it's the right size
        assert_equal(len(table.split('\n')), 8)

        # -> Check that we have the right coefficients
        offset = self.model.k_endog * self.model.k_factors
        for i in range(self.model.k_endog):
            iname = self.model.endog_names[i]
            iparam = forg(params[offset + i], prec=4)
            assert re.search(f'sigma2.{iname} +{iparam}', table)


class TestDynamicFactor_exog1(CheckDynamicFactor):
    """
    Test for a dynamic factor model with 1 exogenous regressor: a constant
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_exog1.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_exog1_1',
            'predict_dfm_exog1_2',
            'predict_dfm_exog1_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_exog1_1',
            'dyn_predict_dfm_exog1_2',
            'dyn_predict_dfm_exog1_3']]
        exog = np.ones((75, 1))
        super().setup_class(
            true, k_factors=1, factor_order=1, exog=exog
        )

    def test_predict(self):
        exog = np.ones((16, 1))
        super().test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.ones((16, 1))
        super().test_dynamic_predict(exog=exog)

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()**0.5
        assert_allclose(bse**2, self.true['var_oim'], atol=1e-5)


class TestDynamicFactor_exog2(CheckDynamicFactor):
    """
    Test for a dynamic factor model with 2 exogenous regressors: a constant
    and a time-trend
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_exog2.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_exog2_1',
            'predict_dfm_exog2_2',
            'predict_dfm_exog2_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_exog2_1',
            'dyn_predict_dfm_exog2_2',
            'dyn_predict_dfm_exog2_3']]
        exog = np.c_[np.ones((75, 1)), (np.arange(75) + 2)[:, np.newaxis]]
        super().setup_class(
            true, k_factors=1, factor_order=1, exog=exog
        )

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()**0.5
        assert_allclose(bse**2, self.true['var_oim'], atol=1e-5)

    def test_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super().test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super().test_dynamic_predict(exog=exog)

    def test_summary(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            summary = self.results.summary()
        tables = [str(table) for table in summary.tables]
        params = self.true['params']

        # Make sure we have the right number of tables
        assert_equal(
            len(tables),
            2 + self.model.k_endog + self.model.k_factors + 1)

        # Check the model overview table
        assert re.search(r'Model:.*DynamicFactor\(factors=1, order=1\)',
                         tables[0])
        assert_equal(re.search(r'.*2 regressors', tables[0]) is None, False)

        # For each endogenous variable, check the output
        for i in range(self.model.k_endog):
            offset_loading = self.model.k_factors * i
            offset_exog = self.model.k_factors * self.model.k_endog
            table = tables[i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for equation %s' % name, table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 8)

            # -> Check that we have the right coefficients
            assert re.search(
                'loading.f1 +' + forg(params[offset_loading + 0], prec=4),
                table)
            assert re.search(
                'beta.const +' + forg(params[offset_exog + i*2 + 0], prec=4),
                table)
            assert re.search(
                'beta.x1 +' + forg(params[offset_exog + i*2 + 1], prec=4),
                table)

        # For each factor, check the output
        for i in range(self.model.k_factors):
            offset = (self.model.k_endog * (self.model.k_factors + 3) +
                      i * self.model.k_factors)
            table = tables[self.model.k_endog + i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for factor equation f%d' % (i+1), table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 6)

            # -> Check that we have the right coefficients
            assert re.search('L1.f1 +' + forg(params[offset + 0], prec=4),
                             table)

        # Check the Error covariance matrix output
        table = tables[2 + self.model.k_endog + self.model.k_factors]

        # -> Make sure we have the right table / table name
        name = self.model.endog_names[i]
        assert re.search('Error covariance matrix', table)

        # -> Make sure it's the right size
        assert_equal(len(table.split('\n')), 8)

        # -> Check that we have the right coefficients
        offset = self.model.k_endog * (self.model.k_factors + 2)
        for i in range(self.model.k_endog):
            iname = self.model.endog_names[i]
            iparam = forg(params[offset + i], prec=4)
            assert re.search(f'sigma2.{iname} +{iparam}', table)


class TestDynamicFactor_general_errors(CheckDynamicFactor):
    """
    Test for a dynamic factor model where errors are as general as possible,
    meaning:

    - Errors are vector autocorrelated, VAR(1)
    - Innovations are correlated
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_gen.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_gen_1', 'predict_dfm_gen_2', 'predict_dfm_gen_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_gen_1',
            'dyn_predict_dfm_gen_2',
            'dyn_predict_dfm_gen_3']]
        super().setup_class(
            true, k_factors=1, factor_order=1, error_var=True,
            error_order=1, error_cov_type='unstructured'
        )

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()
        assert_allclose(bse[:3], self.true['var_oim'][:3], atol=1e-5)
        assert_allclose(bse[-10:], self.true['var_oim'][-10:], atol=3e-4)

    @pytest.mark.skip("Known failure, no sequence of optimizers has been "
                      "found which can achieve the maximum.")
    def test_mle(self):
        # The following gets us to llf=546.53, which is still not good enough
        # llf = 300.842477412
        # res = mod.fit(method='lbfgs', maxiter=10000)
        # llf = 460.26576722
        # res = mod.fit(res.params, method='nm', maxiter=10000, maxfev=10000)
        # llf = 542.245718508
        # res = mod.fit(res.params, method='lbfgs', maxiter=10000)
        # llf = 544.035160955
        # res = mod.fit(res.params, method='nm', maxiter=10000, maxfev=10000)
        # llf = 557.442240083
        # res = mod.fit(res.params, method='lbfgs', maxiter=10000)
        # llf = 558.199513262
        # res = mod.fit(res.params, method='nm', maxiter=10000, maxfev=10000)
        # llf = 559.049076604
        # res = mod.fit(res.params, method='nm', maxiter=10000, maxfev=10000)
        # llf = 559.049076604
        # ...
        pass

    def test_summary(self):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            summary = self.results.summary()
        tables = [str(table) for table in summary.tables]
        params = self.true['params']

        # Make sure we have the right number of tables
        assert_equal(
            len(tables),
            2 + self.model.k_endog + self.model.k_factors +
            self.model.k_endog + 1)

        # Check the model overview table
        assert re.search(r'Model:.*DynamicFactor\(factors=1, order=1\)',
                         tables[0])
        assert re.search(r'.*VAR\(1\) errors', tables[0])

        # For each endogenous variable, check the output
        for i in range(self.model.k_endog):
            offset_loading = self.model.k_factors * i
            table = tables[i + 2]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for equation %s' % name, table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 6)

            # -> Check that we have the right coefficients
            pattern = 'loading.f1 +' + forg(params[offset_loading + 0], prec=4)
            assert re.search(pattern, table)

        # For each factor, check the output
        for i in range(self.model.k_factors):
            offset = (self.model.k_endog * self.model.k_factors +
                      6 + i * self.model.k_factors)
            table = tables[2 + self.model.k_endog + i]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search('Results for factor equation f%d' % (i+1), table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 6)

            # -> Check that we have the right coefficients
            assert re.search('L1.f1 +' + forg(params[offset + 0], prec=4),
                             table)

        # For each error equation, check the output
        for i in range(self.model.k_endog):
            offset = (self.model.k_endog * (self.model.k_factors + i) +
                      6 + self.model.k_factors)
            table = tables[2 + self.model.k_endog + self.model.k_factors + i]

            # -> Make sure we have the right table / table name
            name = self.model.endog_names[i]
            assert re.search(r'Results for error equation e\(%s\)' % name,
                             table)

            # -> Make sure it's the right size
            assert_equal(len(table.split('\n')), 8)

            # -> Check that we have the right coefficients
            for j in range(self.model.k_endog):
                name = self.model.endog_names[j]
                pattern = r'L1.e\({}\) +{}'.format(
                    name,
                    forg(params[offset + j], prec=4)
                )
                assert re.search(pattern, table)

        # Check the Error covariance matrix output
        table = tables[2 + self.model.k_endog +
                       self.model.k_factors + self.model.k_endog]

        # -> Make sure we have the right table / table name
        name = self.model.endog_names[i]
        assert re.search('Error covariance matrix', table)

        # -> Make sure it's the right size
        assert_equal(len(table.split('\n')), 11)

        # -> Check that we have the right coefficients
        offset = self.model.k_endog * self.model.k_factors
        assert re.search(
            r'cov.chol\[1,1\] +' + forg(params[offset + 0], prec=4),
            table)
        assert re.search(
            r'cov.chol\[2,1\] +' + forg(params[offset + 1], prec=4),
            table)
        assert re.search(
            r'cov.chol\[2,2\] +' + forg(params[offset + 2], prec=4),
            table)
        assert re.search(
            r'cov.chol\[3,1\] +' + forg(params[offset+3], prec=4),
            table)
        assert re.search(
            r'cov.chol\[3,2\] +' + forg(params[offset+4], prec=4),
            table)
        assert re.search(
            r'cov.chol\[3,3\] +' + forg(params[offset + 5], prec=4),
            table)


class TestDynamicFactor_ar2_errors(CheckDynamicFactor):
    """
    Test for a dynamic factor model where errors are as general as possible,
    meaning:

    - Errors are vector autocorrelated, VAR(1)
    - Innovations are correlated
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_ar2.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_ar2_1', 'predict_dfm_ar2_2', 'predict_dfm_ar2_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_ar2_1',
            'dyn_predict_dfm_ar2_2',
            'dyn_predict_dfm_ar2_3']]
        super().setup_class(
            true, k_factors=1, factor_order=1, error_order=2)

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()
        assert_allclose(bse, self.true['var_oim'], atol=1e-5)

    @pytest.mark.xfail(OSX_ARM, reason="Known failure on OSX ARM", strict=False)
    def test_mle(self):
        with warnings.catch_warnings(record=True):
            # Depending on the system, this test can reach a greater precision,
            # but for cross-platform results keep it at 1e-2
            mod = self.model
            res1 = mod.fit(maxiter=100, optim_score='approx', disp=False)
            res = mod.fit(
                res1.params, method='nm', maxiter=10000,
                optim_score='approx', disp=False)
            # Added rtol to catch spurious failures on some platforms
            assert_allclose(res.llf, self.results.llf, atol=1e-2, rtol=1e-4)


class TestDynamicFactor_scalar_error(CheckDynamicFactor):
    """
    Test for a dynamic factor model where innovations are uncorrelated and
    are forced to have the same variance.
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_dfm_scalar.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_dfm_scalar_1', 'predict_dfm_scalar_2',
            'predict_dfm_scalar_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_dfm_scalar_1', 'dyn_predict_dfm_scalar_2',
            'dyn_predict_dfm_scalar_3']]
        exog = np.ones((75, 1))
        super().setup_class(
            true, k_factors=1, factor_order=1,
            exog=exog, error_cov_type='scalar')

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()
        assert_allclose(bse, self.true['var_oim'], atol=1e-5)

    def test_predict(self):
        exog = np.ones((16, 1))
        super().test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.ones((16, 1))
        super().test_dynamic_predict(exog=exog)


class TestStaticFactor(CheckDynamicFactor):
    """
    Test for a static factor model (i.e. factors are not autocorrelated).
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_sfm.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_sfm_1', 'predict_sfm_2', 'predict_sfm_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_sfm_1', 'dyn_predict_sfm_2', 'dyn_predict_sfm_3']]
        super().setup_class(
            true, k_factors=1, factor_order=0)

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()
        assert_allclose(bse, self.true['var_oim'], atol=1e-5)

    def test_bic(self):
        # Stata uses 5 df (i.e. 5 params) here instead of 6, because one param
        # is basically zero.
        pass


class TestSUR(CheckDynamicFactor):
    """
    Test for a seemingly unrelated regression model (i.e. no factors) with
    errors cross-sectionally, but not auto-, correlated
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_sur.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_sur_1', 'predict_sur_2', 'predict_sur_3']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_sur_1', 'dyn_predict_sur_2', 'dyn_predict_sur_3']]
        exog = np.c_[np.ones((75, 1)), (np.arange(75) + 2)[:, np.newaxis]]
        super().setup_class(
            true, k_factors=0, factor_order=0,
            exog=exog, error_cov_type='unstructured')

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()
        assert_allclose(bse[:6], self.true['var_oim'][:6], atol=1e-5)

    def test_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super().test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super().test_dynamic_predict(exog=exog)


class TestSUR_autocorrelated_errors(CheckDynamicFactor):
    """
    Test for a seemingly unrelated regression model (i.e. no factors) where
    the errors are vector autocorrelated, but innovations are uncorrelated.
    """
    @classmethod
    def setup_class(cls):
        true = results_dynamic_factor.lutkepohl_sur_auto.copy()
        true['predict'] = output_results.iloc[1:][[
            'predict_sur_auto_1', 'predict_sur_auto_2']]
        true['dynamic_predict'] = output_results.iloc[1:][[
            'dyn_predict_sur_auto_1', 'dyn_predict_sur_auto_2']]
        exog = np.c_[np.ones((75, 1)), (np.arange(75) + 2)[:, np.newaxis]]
        super().setup_class(
            true, k_factors=0, factor_order=0, exog=exog,
            error_order=1, error_var=True,
            error_cov_type='diagonal',
            included_vars=['dln_inv', 'dln_inc'])

    def test_bse_approx(self):
        bse = self.results._cov_params_approx().diagonal()
        assert_allclose(bse, self.true['var_oim'], atol=1e-5)

    def test_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super().test_predict(exog=exog)

    def test_dynamic_predict(self):
        exog = np.c_[np.ones((16, 1)),
                     (np.arange(75, 75+16) + 2)[:, np.newaxis]]
        super().test_dynamic_predict(exog=exog)

    def test_mle(self):
        super().test_mle(init_powell=False)


def test_misspecification():
    # Tests for model specification and misspecification exceptions
    endog = np.arange(20).reshape(10, 2)

    # Too few endog
    assert_raises(
        ValueError,
        dynamic_factor.DynamicFactor, endog[:, 0], k_factors=0, factor_order=0)

    # Too many factors
    assert_raises(
        ValueError,
        dynamic_factor.DynamicFactor, endog, k_factors=2, factor_order=1)

    # Bad error_cov_type specification
    assert_raises(
        ValueError,
        dynamic_factor.DynamicFactor,
        endog,
        k_factors=1, factor_order=1, order=(1, 0), error_cov_type='')


def test_miscellaneous():
    # Initialization with 1-dimensional exog array
    exog = np.arange(75)
    mod = CheckDynamicFactor()
    mod.setup_class(true=None, k_factors=1, factor_order=1,
                    exog=exog, filter=False)
    exog = pd.Series(np.arange(75),
                     index=pd.date_range(start='1960-04-01',
                                         end='1978-10-01', freq='QS'))
    mod = CheckDynamicFactor()
    mod.setup_class(
        true=None, k_factors=1, factor_order=1, exog=exog, filter=False)


def test_predict_custom_index():
    np.random.seed(328423)
    endog = pd.DataFrame(np.random.normal(size=(50, 2)))
    mod = dynamic_factor.DynamicFactor(endog, k_factors=1, factor_order=1)
    res = mod.smooth(mod.start_params)
    out = res.predict(start=1, end=1, index=['a'])
    assert_equal(out.index.equals(pd.Index(['a'])), True)


def test_forecast_exog():
    # Test forecasting with various shapes of `exog`
    nobs = 100
    endog = np.ones((nobs, 2)) * 2.0
    exog = np.ones(nobs)

    mod = dynamic_factor.DynamicFactor(endog, exog=exog, k_factors=1,
                                       factor_order=1)
    res = mod.smooth(np.r_[[0] * 2, 2.0, 2.0, 1, 1., 0.])

    # 1-step-ahead, valid
    exog_fcast_scalar = 1.
    exog_fcast_1dim = np.ones(1)
    exog_fcast_2dim = np.ones((1, 1))

    assert_allclose(res.forecast(1, exog=exog_fcast_scalar), 2.)
    assert_allclose(res.forecast(1, exog=exog_fcast_1dim), 2.)
    assert_allclose(res.forecast(1, exog=exog_fcast_2dim), 2.)

    # h-steps-ahead, valid
    h = 10
    exog_fcast_1dim = np.ones(h)
    exog_fcast_2dim = np.ones((h, 1))

    assert_allclose(res.forecast(h, exog=exog_fcast_1dim), 2.)
    assert_allclose(res.forecast(h, exog=exog_fcast_2dim), 2.)

    # h-steps-ahead, invalid
    assert_raises(ValueError, res.forecast, h, exog=1.)
    assert_raises(ValueError, res.forecast, h, exog=[1, 2])
    assert_raises(ValueError, res.forecast, h, exog=np.ones((h, 2)))


def check_equivalent_models(mod, mod2):
    attrs = [
        'k_factors', 'factor_order', 'error_order', 'error_var',
        'error_cov_type', 'enforce_stationarity', 'mle_regression', 'k_params']

    ssm_attrs = [
        'nobs', 'k_endog', 'k_states', 'k_posdef', 'obs_intercept', 'design',
        'obs_cov', 'state_intercept', 'transition', 'selection', 'state_cov']

    for attr in attrs:
        assert_equal(getattr(mod2, attr), getattr(mod, attr))

    for attr in ssm_attrs:
        assert_equal(getattr(mod2.ssm, attr), getattr(mod.ssm, attr))

    assert_equal(mod2._get_init_kwds(), mod._get_init_kwds())


def test_recreate_model():
    nobs = 100
    endog = np.ones((nobs, 3)) * 2.0
    exog = np.ones(nobs)

    k_factors = [0, 1, 2]
    factor_orders = [0, 1, 2]
    error_orders = [0, 1]
    error_vars = [False, True]
    error_cov_types = ['diagonal', 'scalar']

    import itertools
    names = ['k_factors', 'factor_order', 'error_order', 'error_var',
             'error_cov_type']
    for element in itertools.product(k_factors, factor_orders, error_orders,
                                     error_vars, error_cov_types):
        kwargs = dict(zip(names, element))

        mod = dynamic_factor.DynamicFactor(endog, exog=exog, **kwargs)
        mod2 = dynamic_factor.DynamicFactor(endog, exog=exog,
                                            **mod._get_init_kwds())
        check_equivalent_models(mod, mod2)


def test_append_results():
    endog = np.arange(200).reshape(100, 2)
    exog = np.ones(100)
    params = [0.1, -0.2, 1., 2., 1., 1., 0.5, 0.1]

    mod1 = dynamic_factor.DynamicFactor(
        endog, k_factors=1, factor_order=2, exog=exog)
    res1 = mod1.smooth(params)

    mod2 = dynamic_factor.DynamicFactor(
        endog[:50], k_factors=1, factor_order=2, exog=exog[:50])
    res2 = mod2.smooth(params)
    res3 = res2.append(endog[50:], exog=exog[50:])

    assert_equal(res1.specification, res3.specification)

    assert_allclose(res3.cov_params_default, res2.cov_params_default)
    for attr in ['nobs', 'llf', 'llf_obs', 'loglikelihood_burn']:
        assert_equal(getattr(res3, attr), getattr(res1, attr))

    for attr in [
            'filtered_state', 'filtered_state_cov', 'predicted_state',
            'predicted_state_cov', 'forecasts', 'forecasts_error',
            'forecasts_error_cov', 'standardized_forecasts_error',
            'forecasts_error_diffuse_cov', 'predicted_diffuse_state_cov',
            'scaled_smoothed_estimator',
            'scaled_smoothed_estimator_cov', 'smoothing_error',
            'smoothed_state',
            'smoothed_state_cov', 'smoothed_state_autocov',
            'smoothed_measurement_disturbance',
            'smoothed_state_disturbance',
            'smoothed_measurement_disturbance_cov',
            'smoothed_state_disturbance_cov']:
        assert_equal(getattr(res3, attr), getattr(res1, attr))

    assert_allclose(res3.forecast(10, exog=np.ones(10)),
                    res1.forecast(10, exog=np.ones(10)))


def test_extend_results():
    endog = np.arange(200).reshape(100, 2)
    exog = np.ones(100)
    params = [0.1, -0.2, 1., 2., 1., 1., 0.5, 0.1]

    mod1 = dynamic_factor.DynamicFactor(
        endog, k_factors=1, factor_order=2, exog=exog)
    res1 = mod1.smooth(params)

    mod2 = dynamic_factor.DynamicFactor(
        endog[:50], k_factors=1, factor_order=2, exog=exog[:50])
    res2 = mod2.smooth(params)
    res3 = res2.extend(endog[50:], exog=exog[50:])

    assert_allclose(res3.llf_obs, res1.llf_obs[50:])

    for attr in [
            'filtered_state', 'filtered_state_cov', 'predicted_state',
            'predicted_state_cov', 'forecasts', 'forecasts_error',
            'forecasts_error_cov', 'standardized_forecasts_error',
            'forecasts_error_diffuse_cov', 'predicted_diffuse_state_cov',
            'scaled_smoothed_estimator',
            'scaled_smoothed_estimator_cov', 'smoothing_error',
            'smoothed_state',
            'smoothed_state_cov', 'smoothed_state_autocov',
            'smoothed_measurement_disturbance',
            'smoothed_state_disturbance',
            'smoothed_measurement_disturbance_cov',
            'smoothed_state_disturbance_cov']:
        desired = getattr(res1, attr)
        if desired is not None:
            desired = desired[..., 50:]
        assert_equal(getattr(res3, attr), desired)

    assert_allclose(res3.forecast(10, exog=np.ones(10)),
                    res1.forecast(10, exog=np.ones(10)))


def test_apply_results():
    endog = np.arange(200).reshape(100, 2)
    exog = np.ones(100)
    params = [0.1, -0.2, 1., 2., 1., 1., 0.5, 0.1]

    mod1 = dynamic_factor.DynamicFactor(
        endog[:50], k_factors=1, factor_order=2, exog=exog[:50])
    res1 = mod1.smooth(params)

    mod2 = dynamic_factor.DynamicFactor(
        endog[50:], k_factors=1, factor_order=2, exog=exog[50:])
    res2 = mod2.smooth(params)

    res3 = res2.apply(endog[:50], exog=exog[:50])

    assert_equal(res1.specification, res3.specification)

    assert_allclose(res3.cov_params_default, res2.cov_params_default)
    for attr in ['nobs', 'llf', 'llf_obs', 'loglikelihood_burn']:
        assert_equal(getattr(res3, attr), getattr(res1, attr))

    for attr in [
            'filtered_state', 'filtered_state_cov', 'predicted_state',
            'predicted_state_cov', 'forecasts', 'forecasts_error',
            'forecasts_error_cov', 'standardized_forecasts_error',
            'forecasts_error_diffuse_cov', 'predicted_diffuse_state_cov',
            'scaled_smoothed_estimator',
            'scaled_smoothed_estimator_cov', 'smoothing_error',
            'smoothed_state',
            'smoothed_state_cov', 'smoothed_state_autocov',
            'smoothed_measurement_disturbance',
            'smoothed_state_disturbance',
            'smoothed_measurement_disturbance_cov',
            'smoothed_state_disturbance_cov']:
        assert_equal(getattr(res3, attr), getattr(res1, attr))

    assert_allclose(res3.forecast(10, exog=np.ones(10)),
                    res1.forecast(10, exog=np.ones(10)))


def test_start_params_nans():
    ix = pd.date_range('1960-01-01', '1982-10-01', freq='QS')
    dta = np.log(pd.DataFrame(
        results_varmax.lutkepohl_data, columns=['inv', 'inc', 'consump'],
        index=ix)).diff().iloc[1:]

    endog1 = dta.iloc[:-1]
    mod1 = dynamic_factor.DynamicFactor(endog1, k_factors=1, factor_order=1)
    endog2 = dta.copy()
    endog2.iloc[-1:] = np.nan
    mod2 = dynamic_factor.DynamicFactor(endog2, k_factors=1, factor_order=1)

    assert_allclose(mod2.start_params, mod1.start_params)
