"""Testing for the boost module (sklearn.ensemble.boost)."""

import re

import numpy as np
import pytest

from sklearn import datasets
from sklearn.base import BaseEstimator, clone
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
from sklearn.ensemble._weight_boosting import _samme_proba
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC, SVR
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.utils import shuffle
from sklearn.utils._mocking import NoSampleWeightWrapper
from sklearn.utils._testing import (
    assert_allclose,
    assert_array_almost_equal,
    assert_array_equal,
)
from sklearn.utils.fixes import (
    COO_CONTAINERS,
    CSC_CONTAINERS,
    CSR_CONTAINERS,
    DOK_CONTAINERS,
    LIL_CONTAINERS,
)

# Common random state
rng = np.random.RandomState(0)

# Toy sample
X = [[-2, -1], [-1, -1], [-1, -2], [1, 1], [1, 2], [2, 1]]
y_class = ["foo", "foo", "foo", 1, 1, 1]  # test string class labels
y_regr = [-1, -1, -1, 1, 1, 1]
T = [[-1, -1], [2, 2], [3, 2]]
y_t_class = ["foo", 1, 1]
y_t_regr = [-1, 1, 1]

# Load the iris dataset and randomly permute it
iris = datasets.load_iris()
perm = rng.permutation(iris.target.size)
iris.data, iris.target = shuffle(iris.data, iris.target, random_state=rng)

# Load the diabetes dataset and randomly permute it
diabetes = datasets.load_diabetes()
diabetes.data, diabetes.target = shuffle(
    diabetes.data, diabetes.target, random_state=rng
)


def test_samme_proba():
    # Test the `_samme_proba` helper function.

    # Define some example (bad) `predict_proba` output.
    probs = np.array(
        [[1, 1e-6, 0], [0.19, 0.6, 0.2], [-999, 0.51, 0.5], [1e-6, 1, 1e-9]]
    )
    probs /= np.abs(probs.sum(axis=1))[:, np.newaxis]

    # _samme_proba calls estimator.predict_proba.
    # Make a mock object so I can control what gets returned.
    class MockEstimator:
        def predict_proba(self, X):
            assert_array_equal(X.shape, probs.shape)
            return probs

    mock = MockEstimator()

    samme_proba = _samme_proba(mock, 3, np.ones_like(probs))

    assert_array_equal(samme_proba.shape, probs.shape)
    assert np.isfinite(samme_proba).all()

    # Make sure that the correct elements come out as smallest --
    # `_samme_proba` should preserve the ordering in each example.
    assert_array_equal(np.argmin(samme_proba, axis=1), [2, 0, 0, 2])
    assert_array_equal(np.argmax(samme_proba, axis=1), [0, 1, 1, 1])


def test_oneclass_adaboost_proba():
    # Test predict_proba robustness for one class label input.
    # In response to issue #7501
    # https://github.com/scikit-learn/scikit-learn/issues/7501
    y_t = np.ones(len(X))
    clf = AdaBoostClassifier().fit(X, y_t)
    assert_array_almost_equal(clf.predict_proba(X), np.ones((len(X), 1)))


def test_classification_toy():
    # Check classification on a toy dataset.
    clf = AdaBoostClassifier(random_state=0)
    clf.fit(X, y_class)
    assert_array_equal(clf.predict(T), y_t_class)
    assert_array_equal(np.unique(np.asarray(y_t_class)), clf.classes_)
    assert clf.predict_proba(T).shape == (len(T), 2)
    assert clf.decision_function(T).shape == (len(T),)


def test_regression_toy():
    # Check classification on a toy dataset.
    clf = AdaBoostRegressor(random_state=0)
    clf.fit(X, y_regr)
    assert_array_equal(clf.predict(T), y_t_regr)


def test_iris():
    # Check consistency on dataset iris.
    classes = np.unique(iris.target)

    clf = AdaBoostClassifier()
    clf.fit(iris.data, iris.target)

    assert_array_equal(classes, clf.classes_)
    proba = clf.predict_proba(iris.data)

    assert proba.shape[1] == len(classes)
    assert clf.decision_function(iris.data).shape[1] == len(classes)

    score = clf.score(iris.data, iris.target)
    assert score > 0.9, f"Failed with {score = }"

    # Check we used multiple estimators
    assert len(clf.estimators_) > 1
    # Check for distinct random states (see issue #7408)
    assert len(set(est.random_state for est in clf.estimators_)) == len(clf.estimators_)


@pytest.mark.parametrize("loss", ["linear", "square", "exponential"])
def test_diabetes(loss):
    # Check consistency on dataset diabetes.
    reg = AdaBoostRegressor(loss=loss, random_state=0)
    reg.fit(diabetes.data, diabetes.target)
    score = reg.score(diabetes.data, diabetes.target)
    assert score > 0.55

    # Check we used multiple estimators
    assert len(reg.estimators_) > 1
    # Check for distinct random states (see issue #7408)
    assert len(set(est.random_state for est in reg.estimators_)) == len(reg.estimators_)


def test_staged_predict():
    # Check staged predictions.
    rng = np.random.RandomState(0)
    iris_weights = rng.randint(10, size=iris.target.shape)
    diabetes_weights = rng.randint(10, size=diabetes.target.shape)

    clf = AdaBoostClassifier(n_estimators=10)
    clf.fit(iris.data, iris.target, sample_weight=iris_weights)

    predictions = clf.predict(iris.data)
    staged_predictions = [p for p in clf.staged_predict(iris.data)]
    proba = clf.predict_proba(iris.data)
    staged_probas = [p for p in clf.staged_predict_proba(iris.data)]
    score = clf.score(iris.data, iris.target, sample_weight=iris_weights)
    staged_scores = [
        s for s in clf.staged_score(iris.data, iris.target, sample_weight=iris_weights)
    ]

    assert len(staged_predictions) == 10
    assert_array_almost_equal(predictions, staged_predictions[-1])
    assert len(staged_probas) == 10
    assert_array_almost_equal(proba, staged_probas[-1])
    assert len(staged_scores) == 10
    assert_array_almost_equal(score, staged_scores[-1])

    # AdaBoost regression
    clf = AdaBoostRegressor(n_estimators=10, random_state=0)
    clf.fit(diabetes.data, diabetes.target, sample_weight=diabetes_weights)

    predictions = clf.predict(diabetes.data)
    staged_predictions = [p for p in clf.staged_predict(diabetes.data)]
    score = clf.score(diabetes.data, diabetes.target, sample_weight=diabetes_weights)
    staged_scores = [
        s
        for s in clf.staged_score(
            diabetes.data, diabetes.target, sample_weight=diabetes_weights
        )
    ]

    assert len(staged_predictions) == 10
    assert_array_almost_equal(predictions, staged_predictions[-1])
    assert len(staged_scores) == 10
    assert_array_almost_equal(score, staged_scores[-1])


def test_gridsearch():
    # Check that base trees can be grid-searched.
    # AdaBoost classification
    boost = AdaBoostClassifier(estimator=DecisionTreeClassifier())
    parameters = {
        "n_estimators": (1, 2),
        "estimator__max_depth": (1, 2),
    }
    clf = GridSearchCV(boost, parameters)
    clf.fit(iris.data, iris.target)

    # AdaBoost regression
    boost = AdaBoostRegressor(estimator=DecisionTreeRegressor(), random_state=0)
    parameters = {"n_estimators": (1, 2), "estimator__max_depth": (1, 2)}
    clf = GridSearchCV(boost, parameters)
    clf.fit(diabetes.data, diabetes.target)


def test_pickle():
    # Check pickability.
    import pickle

    # Adaboost classifier
    obj = AdaBoostClassifier()
    obj.fit(iris.data, iris.target)
    score = obj.score(iris.data, iris.target)
    s = pickle.dumps(obj)

    obj2 = pickle.loads(s)
    assert type(obj2) == obj.__class__
    score2 = obj2.score(iris.data, iris.target)
    assert score == score2

    # Adaboost regressor
    obj = AdaBoostRegressor(random_state=0)
    obj.fit(diabetes.data, diabetes.target)
    score = obj.score(diabetes.data, diabetes.target)
    s = pickle.dumps(obj)

    obj2 = pickle.loads(s)
    assert type(obj2) == obj.__class__
    score2 = obj2.score(diabetes.data, diabetes.target)
    assert score == score2


def test_importances():
    # Check variable importances.
    X, y = datasets.make_classification(
        n_samples=2000,
        n_features=10,
        n_informative=3,
        n_redundant=0,
        n_repeated=0,
        shuffle=False,
        random_state=1,
    )

    clf = AdaBoostClassifier()

    clf.fit(X, y)
    importances = clf.feature_importances_

    assert importances.shape[0] == 10
    assert (importances[:3, np.newaxis] >= importances[3:]).all()


def test_adaboost_classifier_sample_weight_error():
    # Test that it gives proper exception on incorrect sample weight.
    clf = AdaBoostClassifier()
    msg = re.escape("sample_weight.shape == (1,), expected (6,)")
    with pytest.raises(ValueError, match=msg):
        clf.fit(X, y_class, sample_weight=np.asarray([-1]))


def test_estimator():
    # Test different estimators.
    from sklearn.ensemble import RandomForestClassifier

    # XXX doesn't work with y_class because RF doesn't support classes_
    # Shouldn't AdaBoost run a LabelBinarizer?
    clf = AdaBoostClassifier(RandomForestClassifier())
    clf.fit(X, y_regr)

    clf = AdaBoostClassifier(SVC())
    clf.fit(X, y_class)

    from sklearn.ensemble import RandomForestRegressor

    clf = AdaBoostRegressor(RandomForestRegressor(), random_state=0)
    clf.fit(X, y_regr)

    clf = AdaBoostRegressor(SVR(), random_state=0)
    clf.fit(X, y_regr)

    # Check that an empty discrete ensemble fails in fit, not predict.
    X_fail = [[1, 1], [1, 1], [1, 1], [1, 1]]
    y_fail = ["foo", "bar", 1, 2]
    clf = AdaBoostClassifier(SVC())
    with pytest.raises(ValueError, match="worse than random"):
        clf.fit(X_fail, y_fail)


def test_sample_weights_infinite():
    msg = "Sample weights have reached infinite values"
    clf = AdaBoostClassifier(n_estimators=30, learning_rate=23.0)
    with pytest.warns(UserWarning, match=msg):
        clf.fit(iris.data, iris.target)


@pytest.mark.parametrize(
    "sparse_container, expected_internal_type",
    zip(
        [
            *CSC_CONTAINERS,
            *CSR_CONTAINERS,
            *LIL_CONTAINERS,
            *COO_CONTAINERS,
            *DOK_CONTAINERS,
        ],
        CSC_CONTAINERS + 4 * CSR_CONTAINERS,
    ),
)
def test_sparse_classification(sparse_container, expected_internal_type):
    # Check classification with sparse input.

    class CustomSVC(SVC):
        """SVC variant that records the nature of the training set."""

        def fit(self, X, y, sample_weight=None):
            """Modification on fit caries data type for later verification."""
            super().fit(X, y, sample_weight=sample_weight)
            self.data_type_ = type(X)
            return self

    X, y = datasets.make_multilabel_classification(
        n_classes=1, n_samples=15, n_features=5, random_state=42
    )
    # Flatten y to a 1d array
    y = np.ravel(y)

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    X_train_sparse = sparse_container(X_train)
    X_test_sparse = sparse_container(X_test)

    # Trained on sparse format
    sparse_classifier = AdaBoostClassifier(
        estimator=CustomSVC(probability=True),
        random_state=1,
    ).fit(X_train_sparse, y_train)

    # Trained on dense format
    dense_classifier = AdaBoostClassifier(
        estimator=CustomSVC(probability=True),
        random_state=1,
    ).fit(X_train, y_train)

    # predict
    sparse_clf_results = sparse_classifier.predict(X_test_sparse)
    dense_clf_results = dense_classifier.predict(X_test)
    assert_array_equal(sparse_clf_results, dense_clf_results)

    # decision_function
    sparse_clf_results = sparse_classifier.decision_function(X_test_sparse)
    dense_clf_results = dense_classifier.decision_function(X_test)
    assert_array_almost_equal(sparse_clf_results, dense_clf_results)

    # predict_log_proba
    sparse_clf_results = sparse_classifier.predict_log_proba(X_test_sparse)
    dense_clf_results = dense_classifier.predict_log_proba(X_test)
    assert_array_almost_equal(sparse_clf_results, dense_clf_results)

    # predict_proba
    sparse_clf_results = sparse_classifier.predict_proba(X_test_sparse)
    dense_clf_results = dense_classifier.predict_proba(X_test)
    assert_array_almost_equal(sparse_clf_results, dense_clf_results)

    # score
    sparse_clf_results = sparse_classifier.score(X_test_sparse, y_test)
    dense_clf_results = dense_classifier.score(X_test, y_test)
    assert_array_almost_equal(sparse_clf_results, dense_clf_results)

    # staged_decision_function
    sparse_clf_results = sparse_classifier.staged_decision_function(X_test_sparse)
    dense_clf_results = dense_classifier.staged_decision_function(X_test)
    for sparse_clf_res, dense_clf_res in zip(sparse_clf_results, dense_clf_results):
        assert_array_almost_equal(sparse_clf_res, dense_clf_res)

    # staged_predict
    sparse_clf_results = sparse_classifier.staged_predict(X_test_sparse)
    dense_clf_results = dense_classifier.staged_predict(X_test)
    for sparse_clf_res, dense_clf_res in zip(sparse_clf_results, dense_clf_results):
        assert_array_equal(sparse_clf_res, dense_clf_res)

    # staged_predict_proba
    sparse_clf_results = sparse_classifier.staged_predict_proba(X_test_sparse)
    dense_clf_results = dense_classifier.staged_predict_proba(X_test)
    for sparse_clf_res, dense_clf_res in zip(sparse_clf_results, dense_clf_results):
        assert_array_almost_equal(sparse_clf_res, dense_clf_res)

    # staged_score
    sparse_clf_results = sparse_classifier.staged_score(X_test_sparse, y_test)
    dense_clf_results = dense_classifier.staged_score(X_test, y_test)
    for sparse_clf_res, dense_clf_res in zip(sparse_clf_results, dense_clf_results):
        assert_array_equal(sparse_clf_res, dense_clf_res)

    # Verify sparsity of data is maintained during training
    types = [i.data_type_ for i in sparse_classifier.estimators_]

    assert all([t == expected_internal_type for t in types])


@pytest.mark.parametrize(
    "sparse_container, expected_internal_type",
    zip(
        [
            *CSC_CONTAINERS,
            *CSR_CONTAINERS,
            *LIL_CONTAINERS,
            *COO_CONTAINERS,
            *DOK_CONTAINERS,
        ],
        CSC_CONTAINERS + 4 * CSR_CONTAINERS,
    ),
)
def test_sparse_regression(sparse_container, expected_internal_type):
    # Check regression with sparse input.

    class CustomSVR(SVR):
        """SVR variant that records the nature of the training set."""

        def fit(self, X, y, sample_weight=None):
            """Modification on fit caries data type for later verification."""
            super().fit(X, y, sample_weight=sample_weight)
            self.data_type_ = type(X)
            return self

    X, y = datasets.make_regression(
        n_samples=15, n_features=50, n_targets=1, random_state=42
    )

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    X_train_sparse = sparse_container(X_train)
    X_test_sparse = sparse_container(X_test)

    # Trained on sparse format
    sparse_regressor = AdaBoostRegressor(estimator=CustomSVR(), random_state=1).fit(
        X_train_sparse, y_train
    )

    # Trained on dense format
    dense_regressor = AdaBoostRegressor(estimator=CustomSVR(), random_state=1).fit(
        X_train, y_train
    )

    # predict
    sparse_regr_results = sparse_regressor.predict(X_test_sparse)
    dense_regr_results = dense_regressor.predict(X_test)
    assert_array_almost_equal(sparse_regr_results, dense_regr_results)

    # staged_predict
    sparse_regr_results = sparse_regressor.staged_predict(X_test_sparse)
    dense_regr_results = dense_regressor.staged_predict(X_test)
    for sparse_regr_res, dense_regr_res in zip(sparse_regr_results, dense_regr_results):
        assert_array_almost_equal(sparse_regr_res, dense_regr_res)

    types = [i.data_type_ for i in sparse_regressor.estimators_]

    assert all([t == expected_internal_type for t in types])


def test_sample_weight_adaboost_regressor():
    """
    AdaBoostRegressor should work without sample_weights in the base estimator
    The random weighted sampling is done internally in the _boost method in
    AdaBoostRegressor.
    """

    class DummyEstimator(BaseEstimator):
        def fit(self, X, y):
            pass

        def predict(self, X):
            return np.zeros(X.shape[0])

    boost = AdaBoostRegressor(DummyEstimator(), n_estimators=3)
    boost.fit(X, y_regr)
    assert len(boost.estimator_weights_) == len(boost.estimator_errors_)


def test_multidimensional_X():
    """
    Check that the AdaBoost estimators can work with n-dimensional
    data matrix
    """
    rng = np.random.RandomState(0)

    X = rng.randn(51, 3, 3)
    yc = rng.choice([0, 1], 51)
    yr = rng.randn(51)

    boost = AdaBoostClassifier(DummyClassifier(strategy="most_frequent"))
    boost.fit(X, yc)
    boost.predict(X)
    boost.predict_proba(X)

    boost = AdaBoostRegressor(DummyRegressor())
    boost.fit(X, yr)
    boost.predict(X)


def test_adaboostclassifier_without_sample_weight():
    X, y = iris.data, iris.target
    estimator = NoSampleWeightWrapper(DummyClassifier())
    clf = AdaBoostClassifier(estimator=estimator)
    err_msg = "{} doesn't support sample_weight".format(estimator.__class__.__name__)
    with pytest.raises(ValueError, match=err_msg):
        clf.fit(X, y)


def test_adaboostregressor_sample_weight():
    # check that giving weight will have an influence on the error computed
    # for a weak learner
    rng = np.random.RandomState(42)
    X = np.linspace(0, 100, num=1000)
    y = (0.8 * X + 0.2) + (rng.rand(X.shape[0]) * 0.0001)
    X = X.reshape(-1, 1)

    # add an arbitrary outlier
    X[-1] *= 10
    y[-1] = 10000

    # random_state=0 ensure that the underlying bootstrap will use the outlier
    regr_no_outlier = AdaBoostRegressor(
        estimator=LinearRegression(), n_estimators=1, random_state=0
    )
    regr_with_weight = clone(regr_no_outlier)
    regr_with_outlier = clone(regr_no_outlier)

    # fit 3 models:
    # - a model containing the outlier
    # - a model without the outlier
    # - a model containing the outlier but with a null sample-weight
    regr_with_outlier.fit(X, y)
    regr_no_outlier.fit(X[:-1], y[:-1])
    sample_weight = np.ones_like(y)
    sample_weight[-1] = 0
    regr_with_weight.fit(X, y, sample_weight=sample_weight)

    score_with_outlier = regr_with_outlier.score(X[:-1], y[:-1])
    score_no_outlier = regr_no_outlier.score(X[:-1], y[:-1])
    score_with_weight = regr_with_weight.score(X[:-1], y[:-1])

    assert score_with_outlier < score_no_outlier
    assert score_with_outlier < score_with_weight
    assert score_no_outlier == pytest.approx(score_with_weight)


def test_adaboost_consistent_predict():
    # check that predict_proba and predict give consistent results
    # regression test for:
    # https://github.com/scikit-learn/scikit-learn/issues/14084
    X_train, X_test, y_train, y_test = train_test_split(
        *datasets.load_digits(return_X_y=True), random_state=42
    )
    model = AdaBoostClassifier(random_state=42)
    model.fit(X_train, y_train)

    assert_array_equal(
        np.argmax(model.predict_proba(X_test), axis=1), model.predict(X_test)
    )


@pytest.mark.parametrize(
    "model, X, y",
    [
        (AdaBoostClassifier(), iris.data, iris.target),
        (AdaBoostRegressor(), diabetes.data, diabetes.target),
    ],
)
def test_adaboost_negative_weight_error(model, X, y):
    sample_weight = np.ones_like(y)
    sample_weight[-1] = -10

    err_msg = "Negative values in data passed to `sample_weight`"
    with pytest.raises(ValueError, match=err_msg):
        model.fit(X, y, sample_weight=sample_weight)


def test_adaboost_numerically_stable_feature_importance_with_small_weights():
    """Check that we don't create NaN feature importance with numerically
    instable inputs.

    Non-regression test for:
    https://github.com/scikit-learn/scikit-learn/issues/20320
    """
    rng = np.random.RandomState(42)
    X = rng.normal(size=(1000, 10))
    y = rng.choice([0, 1], size=1000)
    sample_weight = np.ones_like(y) * 1e-263
    tree = DecisionTreeClassifier(max_depth=10, random_state=12)
    ada_model = AdaBoostClassifier(estimator=tree, n_estimators=20, random_state=12)
    ada_model.fit(X, y, sample_weight=sample_weight)
    assert np.isnan(ada_model.feature_importances_).sum() == 0


def test_adaboost_decision_function(global_random_seed):
    """Check that the decision function respects the symmetric constraint for weak
    learners.

    Non-regression test for:
    https://github.com/scikit-learn/scikit-learn/issues/26520
    """
    n_classes = 3
    X, y = datasets.make_classification(
        n_classes=n_classes, n_clusters_per_class=1, random_state=global_random_seed
    )
    clf = AdaBoostClassifier(n_estimators=1, random_state=global_random_seed).fit(X, y)

    y_score = clf.decision_function(X)
    assert_allclose(y_score.sum(axis=1), 0, atol=1e-8)

    # With a single learner, we expect to have a decision function in
    # {1, - 1 / (n_classes - 1)}.
    assert set(np.unique(y_score)) == {1, -1 / (n_classes - 1)}

    # We can assert the same for staged_decision_function since we have a single learner
    for y_score in clf.staged_decision_function(X):
        assert_allclose(y_score.sum(axis=1), 0, atol=1e-8)

        # With a single learner, we expect to have a decision function in
        # {1, - 1 / (n_classes - 1)}.
        assert set(np.unique(y_score)) == {1, -1 / (n_classes - 1)}

    clf.set_params(n_estimators=5).fit(X, y)

    y_score = clf.decision_function(X)
    assert_allclose(y_score.sum(axis=1), 0, atol=1e-8)

    for y_score in clf.staged_decision_function(X):
        assert_allclose(y_score.sum(axis=1), 0, atol=1e-8)


# TODO(1.8): remove
def test_deprecated_algorithm():
    adaboost_clf = AdaBoostClassifier(n_estimators=1, algorithm="SAMME")
    with pytest.warns(FutureWarning, match="The parameter 'algorithm' is deprecated"):
        adaboost_clf.fit(X, y_class)
