import numpy as np

from skimage._shared.testing import assert_array_almost_equal, assert_equal
from skimage import color, data, img_as_float
from skimage.filters import threshold_local, gaussian
from skimage.util.apply_parallel import apply_parallel

import pytest

da = pytest.importorskip('dask.array')


def test_apply_parallel():
    """Compare chunked filtering with the same filter applied directly.

    Covers a NumPy input with extra positional/keyword arguments, a NumPy
    input with a wrapped filter, and a Dask input with ``compute=True``.
    """
    image = np.arange(144, dtype=float).reshape(12, 12)

    # Case 1: local threshold; the block size (3) travels through
    # ``extra_arguments`` and the boundary mode through ``extra_keywords``.
    threshold_reference = threshold_local(image, 3)
    threshold_chunked = apply_parallel(
        threshold_local,
        image,
        chunks=(6, 6),
        depth=5,
        extra_arguments=(3,),
        extra_keywords=dict(mode='reflect'),
    )
    assert_array_almost_equal(threshold_chunked, threshold_reference)

    def smooth(block):
        return gaussian(block, sigma=1, mode='reflect')

    # Case 2: Gaussian smoothing of a NumPy array split into 6x6 chunks.
    smooth_reference = gaussian(image, sigma=1, mode='reflect')
    smooth_chunked = apply_parallel(smooth, image, chunks=(6, 6), depth=5)
    assert_array_almost_equal(smooth_chunked, smooth_reference)

    # Case 3: the same filter on a Dask array; ``compute=True`` must hand
    # back a plain NumPy array.
    dask_reference = gaussian(image, sigma=1, mode='reflect')
    dask_image = da.from_array(image, chunks=(6, 6))
    dask_chunked = apply_parallel(smooth, dask_image, depth=5, compute=True)

    assert isinstance(dask_chunked, np.ndarray)
    assert_array_almost_equal(dask_chunked, dask_reference)


def test_apply_parallel_lazy():
    """Check that lazy evaluation returns Dask arrays with correct values.

    A NumPy input is processed with ``compute=False``; a Dask input is
    processed without passing ``compute`` at all. Both results must be
    Dask arrays that evaluate to the directly computed threshold image.
    """
    image = np.arange(144, dtype=float).reshape(12, 12)
    dask_image = da.from_array(image, chunks=(6, 6))
    filter_options = dict(
        depth=5,
        extra_arguments=(3,),
        extra_keywords={'mode': 'reflect'},
    )

    reference = threshold_local(image, 3)

    # NumPy input, laziness requested explicitly.
    lazy_from_numpy = apply_parallel(
        threshold_local, image, chunks=(6, 6), compute=False, **filter_options
    )

    # Dask input, ``compute`` left at its default.
    lazy_from_dask = apply_parallel(
        threshold_local, dask_image, **filter_options
    )

    assert isinstance(lazy_from_numpy, da.Array)
    assert_array_almost_equal(lazy_from_numpy.compute(), reference)

    assert isinstance(lazy_from_dask, da.Array)
    assert_array_almost_equal(lazy_from_dask.compute(), reference)


def test_no_chunks():
    """Check ``apply_parallel`` on a 4-D array when ``chunks`` is omitted."""
    volume = np.ones((1, 4, 8, 9))

    def add_42(arr):
        return arr + 42

    # Neither ``chunks`` nor ``depth`` is given, so the call runs entirely
    # on the function's defaults.
    assert_array_almost_equal(apply_parallel(add_42, volume), add_42(volume))


def test_apply_parallel_wrap():
    """Check chunked Gaussian filtering with ``mode='wrap'`` boundaries."""
    image = np.arange(144, dtype=float).reshape(12, 12)

    def smooth_wrap(block):
        return gaussian(block, sigma=1, mode='wrap')

    reference = gaussian(image, sigma=1, mode='wrap')
    chunked = apply_parallel(
        smooth_wrap, image, chunks=(6, 6), depth=5, mode='wrap'
    )

    assert_array_almost_equal(chunked, reference)


def test_apply_parallel_nearest():
    """Check ``mode='nearest'`` with ``depth`` given as a per-axis dict."""
    image = np.arange(144, dtype=float).reshape(12, 12)

    def smooth_nearest(block):
        return gaussian(block, sigma=1, mode='nearest')

    reference = gaussian(image, sigma=1, mode='nearest')

    # Same overlap of 5 on both axes, spelled out as {axis: depth}.
    per_axis_depth = {axis: 5 for axis in range(2)}
    chunked = apply_parallel(
        smooth_nearest,
        image,
        chunks=(6, 6),
        depth=per_axis_depth,
        mode='nearest',
    )

    assert_array_almost_equal(chunked, reference)


@pytest.mark.parametrize('dtype', (np.float32, np.float64))
@pytest.mark.parametrize('chunks', (None, (128, 128, 3)))
@pytest.mark.parametrize('depth', (0, 8, (8, 8, 0)))
def test_apply_parallel_rgb(depth, chunks, dtype):
    """Check chunked RGB -> YCbCr conversion with channels on the last axis.

    The output must keep the input's floating-point dtype and match the
    conversion applied to the whole image at once.
    """
    rgb = data.chelsea().astype(dtype) / 255.0

    reference = color.rgb2ycbcr(rgb)
    converted = apply_parallel(
        color.rgb2ycbcr,
        rgb,
        chunks=chunks,
        depth=depth,
        dtype=dtype,
        channel_axis=-1,
    )

    assert_equal(converted.dtype, rgb.dtype)
    assert_array_almost_equal(reference, converted)


@pytest.mark.parametrize('chunks', (None, (128, 256), 'ndim'))
@pytest.mark.parametrize('depth', (0, 8, (8, 16), 'ndim'))
@pytest.mark.parametrize('channel_axis', (0, 1, 2, -1, -2, -3))
def test_apply_parallel_rgb_channel_axis(depth, chunks, channel_axis):
    """Test channel_axis combinations.

    ``chunks`` and ``depth`` are each exercised in several forms:

    - ``None`` (chunks only) or a scalar (depth only)
    - a tuple of length ``image.ndim - 1`` covering the spatial axes
    - ``'ndim'``, a placeholder that is expanded below into a list of
      length ``image.ndim`` with an explicit entry for the channel axis
    """
    image = img_as_float(data.chelsea())

    # Reference result, computed while channels are still on the last axis.
    reference = color.rgb2ycbcr(image, channel_axis=-1)

    # Relocate the channels and work out their non-negative axis index.
    image = np.moveaxis(image, -1, channel_axis)
    axis = channel_axis % image.ndim

    if chunks == 'ndim':
        # One chunk spanning the whole channel axis, 128 elsewhere.
        spatial_chunks = [128, 128]
        chunks = spatial_chunks[:axis] + [image.shape[axis]] + spatial_chunks[axis:]
    if depth == 'ndim':
        # No overlap along the channel axis, 8 elsewhere.
        spatial_depth = [8, 8]
        depth = spatial_depth[:axis] + [0] + spatial_depth[axis:]

    converted = apply_parallel(
        color.rgb2ycbcr,
        image,
        chunks=chunks,
        depth=depth,
        dtype=image.dtype,
        channel_axis=channel_axis,
        extra_keywords={'channel_axis': channel_axis},
    )

    # Put the channels back on the last axis before comparing.
    converted = np.moveaxis(converted, channel_axis, -1)

    assert_array_almost_equal(reference, converted)
