from __future__ import annotations

from itertools import chain, product
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    Iterator,
    Literal,
    Mapping,
    Sequence,
    cast,
    overload,
)

import numpy as np

from narwhals._compliant import EagerDataFrame
from narwhals._pandas_like.series import PANDAS_TO_NUMPY_DTYPE_MISSING, PandasLikeSeries
from narwhals._pandas_like.utils import (
    align_and_extract_native,
    align_series_full_broadcast,
    check_column_names_are_unique,
    get_dtype_backend,
    native_to_narwhals_dtype,
    object_native_to_narwhals_dtype,
    rename,
    select_columns_by_name,
    set_index,
)
from narwhals._utils import (
    Implementation,
    _into_arrow_table,
    _remap_full_join_keys,
    exclude_column_names,
    generate_temporary_column_name,
    parse_columns_to_drop,
    parse_version,
    scale_bytes,
    validate_backend_version,
)
from narwhals.dependencies import is_pandas_like_dataframe
from narwhals.exceptions import InvalidOperationError, ShapeError

if TYPE_CHECKING:
    from io import BytesIO
    from pathlib import Path
    from types import ModuleType

    import pandas as pd
    import polars as pl
    from typing_extensions import Self, TypeAlias, TypeIs

    from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny
    from narwhals._pandas_like.expr import PandasLikeExpr
    from narwhals._pandas_like.group_by import PandasLikeGroupBy
    from narwhals._pandas_like.namespace import PandasLikeNamespace
    from narwhals._translate import IntoArrowTable
    from narwhals._utils import Version, _FullContext
    from narwhals.dtypes import DType
    from narwhals.schema import Schema
    from narwhals.typing import (
        AsofJoinStrategy,
        DTypeBackend,
        JoinStrategy,
        PivotAgg,
        SizedMultiIndexSelector,
        SizedMultiNameSelector,
        SizeUnit,
        UniqueKeepStrategy,
        _2DArray,
        _SliceIndex,
        _SliceName,
    )

    Constructor: TypeAlias = Callable[..., pd.DataFrame]


CLASSICAL_NUMPY_DTYPES: frozenset[np.dtype[Any]] = frozenset(
    [
        np.dtype("float64"),
        np.dtype("float32"),
        np.dtype("int64"),
        np.dtype("int32"),
        np.dtype("int16"),
        np.dtype("int8"),
        np.dtype("uint64"),
        np.dtype("uint32"),
        np.dtype("uint16"),
        np.dtype("uint8"),
        np.dtype("bool"),
        np.dtype("datetime64[s]"),
        np.dtype("datetime64[ms]"),
        np.dtype("datetime64[us]"),
        np.dtype("datetime64[ns]"),
        np.dtype("timedelta64[s]"),
        np.dtype("timedelta64[ms]"),
        np.dtype("timedelta64[us]"),
        np.dtype("timedelta64[ns]"),
        np.dtype("object"),
    ]
)


class PandasLikeDataFrame(EagerDataFrame["PandasLikeSeries", "PandasLikeExpr", "Any"]):
    def __init__(
        self,
        native_dataframe: Any,
        *,
        implementation: Implementation,
        backend_version: tuple[int, ...],
        version: Version,
        validate_column_names: bool,
    ) -> None:
        self._native_frame = native_dataframe
        self._implementation = implementation
        self._backend_version = backend_version
        self._version = version
        validate_backend_version(self._implementation, self._backend_version)
        if validate_column_names:
            check_column_names_are_unique(native_dataframe.columns)

    @classmethod
    def from_arrow(cls, data: IntoArrowTable, /, *, context: _FullContext) -> Self:
        implementation = context._implementation
        tbl = _into_arrow_table(data, context)
        if implementation.is_pandas():
            native = tbl.to_pandas()
        elif implementation.is_modin():  # pragma: no cover
            from modin.pandas.utils import (
                from_arrow as mpd_from_arrow,  # pyright: ignore[reportAttributeAccessIssue]
            )

            native = mpd_from_arrow(tbl)
        elif implementation.is_cudf():  # pragma: no cover
            native = implementation.to_native_namespace().DataFrame.from_arrow(tbl)
        else:  # pragma: no cover
            msg = "congratulations, you entered unreachable code - please report a bug"
            raise AssertionError(msg)
        return cls.from_native(native, context=context)

    @classmethod
    def from_dict(
        cls,
        data: Mapping[str, Any],
        /,
        *,
        context: _FullContext,
        schema: Mapping[str, DType] | Schema | None,
    ) -> Self:
        from narwhals.schema import Schema

        implementation = context._implementation
        ns = implementation.to_native_namespace()
        Series = cast("type[pd.Series[Any]]", ns.Series)  # noqa: N806
        DataFrame = cast("type[pd.DataFrame]", ns.DataFrame)  # noqa: N806
        aligned_data: dict[str, pd.Series[Any] | Any] = {}
        left_most: PandasLikeSeries | None = None
        for name, series in data.items():
            if isinstance(series, Series):
                compliant = PandasLikeSeries.from_native(series, context=context)
                if left_most is None:
                    left_most = compliant
                    aligned_data[name] = series
                else:
                    aligned_data[name] = align_and_extract_native(left_most, compliant)[1]
            else:
                aligned_data[name] = series

        native = DataFrame.from_dict(aligned_data)
        if schema:
            it: Iterable[DTypeBackend] = (
                get_dtype_backend(dtype, implementation) for dtype in native.dtypes
            )
            native = native.astype(Schema(schema).to_pandas(it))
        return cls.from_native(native, context=context)

    @staticmethod
    def _is_native(obj: Any) -> TypeIs[Any]:
        return is_pandas_like_dataframe(obj)  # pragma: no cover

    @classmethod
    def from_native(cls, data: Any, /, *, context: _FullContext) -> Self:
        return cls(
            data,
            implementation=context._implementation,
            backend_version=context._backend_version,
            version=context._version,
            validate_column_names=True,
        )

    @classmethod
    def from_numpy(
        cls,
        data: _2DArray,
        /,
        *,
        context: _FullContext,
        schema: Mapping[str, DType] | Schema | Sequence[str] | None,
    ) -> Self:
        from narwhals.schema import Schema

        implementation = context._implementation
        DataFrame: Constructor = implementation.to_native_namespace().DataFrame  # noqa: N806
        if isinstance(schema, (Mapping, Schema)):
            it: Iterable[DTypeBackend] = (
                get_dtype_backend(native_type, implementation)
                for native_type in schema.values()
            )
            native = DataFrame(data, columns=schema.keys()).astype(
                Schema(schema).to_pandas(it)
            )
        else:
            native = DataFrame(data, columns=cls._numpy_column_names(data, schema))
        return cls.from_native(native, context=context)

    def __narwhals_dataframe__(self) -> Self:
        return self

    def __narwhals_lazyframe__(self) -> Self:
        return self

    def __narwhals_namespace__(self) -> PandasLikeNamespace:
        from narwhals._pandas_like.namespace import PandasLikeNamespace

        return PandasLikeNamespace(
            self._implementation, self._backend_version, version=self._version
        )

    def __native_namespace__(self) -> ModuleType:
        if self._implementation in {
            Implementation.PANDAS,
            Implementation.MODIN,
            Implementation.CUDF,
        }:
            return self._implementation.to_native_namespace()

        msg = f"Expected pandas/modin/cudf, got: {type(self._implementation)}"  # pragma: no cover
        raise AssertionError(msg)

    def __len__(self) -> int:
        return len(self.native)

    def _with_version(self, version: Version) -> Self:
        return self.__class__(
            self.native,
            implementation=self._implementation,
            backend_version=self._backend_version,
            version=version,
            validate_column_names=False,
        )

    def _with_native(self, df: Any, *, validate_column_names: bool = True) -> Self:
        return self.__class__(
            df,
            implementation=self._implementation,
            backend_version=self._backend_version,
            version=self._version,
            validate_column_names=validate_column_names,
        )

    def _extract_comparand(self, other: PandasLikeSeries) -> pd.Series[Any]:
        index = self.native.index
        if other._broadcast:
            s = other.native
            return type(s)(s.iloc[0], index=index, dtype=s.dtype, name=s.name)
        if (len_other := len(other)) != (len_idx := len(index)):
            msg = f"Expected object of length {len_idx}, got: {len_other}."
            raise ShapeError(msg)
        if other.native.index is not index:
            return set_index(
                other.native,
                index,
                implementation=other._implementation,
                backend_version=other._backend_version,
            )
        return other.native

    def get_column(self, name: str) -> PandasLikeSeries:
        return PandasLikeSeries.from_native(self.native[name], context=self)

    def __array__(self, dtype: Any = None, *, copy: bool | None = None) -> _2DArray:
        return self.to_numpy(dtype=dtype, copy=copy)

    def _gather(self, rows: SizedMultiIndexSelector[pd.Series[Any]]) -> Self:
        items = list(rows) if isinstance(rows, tuple) else rows
        return self._with_native(self.native.iloc[items, :])

    def _gather_slice(self, rows: _SliceIndex | range) -> Self:
        return self._with_native(
            self.native.iloc[slice(rows.start, rows.stop, rows.step), :],
            validate_column_names=False,
        )

    def _select_slice_name(self, columns: _SliceName) -> Self:
        start = (
            self.native.columns.get_loc(columns.start)
            if columns.start is not None
            else None
        )
        stop = (
            self.native.columns.get_loc(columns.stop) + 1
            if columns.stop is not None
            else None
        )
        selector = slice(start, stop, columns.step)
        return self._with_native(
            self.native.iloc[:, selector], validate_column_names=False
        )

    def _select_slice_index(self, columns: _SliceIndex | range) -> Self:
        return self._with_native(
            self.native.iloc[:, columns], validate_column_names=False
        )

    def _select_multi_index(
        self, columns: SizedMultiIndexSelector[pd.Series[Any]]
    ) -> Self:
        columns = list(columns) if isinstance(columns, tuple) else columns
        return self._with_native(
            self.native.iloc[:, columns], validate_column_names=False
        )

    def _select_multi_name(
        self, columns: SizedMultiNameSelector[pd.Series[Any]]
    ) -> PandasLikeDataFrame:
        return self._with_native(self.native.loc[:, columns])

    # --- properties ---
    @property
    def columns(self) -> list[str]:
        return self.native.columns.tolist()

    @overload
    def rows(self, *, named: Literal[True]) -> list[dict[str, Any]]: ...

    @overload
    def rows(self, *, named: Literal[False]) -> list[tuple[Any, ...]]: ...

    @overload
    def rows(self, *, named: bool) -> list[tuple[Any, ...]] | list[dict[str, Any]]: ...

    def rows(self, *, named: bool) -> list[tuple[Any, ...]] | list[dict[str, Any]]:
        if not named:
            # cuDF does not support itertuples. But it does support to_dict!
            if self._implementation is Implementation.CUDF:
                # Extract the row values from the named rows
                return [tuple(row.values()) for row in self.rows(named=True)]

            return list(self.native.itertuples(index=False, name=None))

        return self.native.to_dict(orient="records")

    def iter_columns(self) -> Iterator[PandasLikeSeries]:
        for _name, series in self.native.items():  # noqa: PERF102
            yield PandasLikeSeries.from_native(series, context=self)

    _iter_columns = iter_columns

    def iter_rows(
        self, *, named: bool, buffer_size: int
    ) -> Iterator[tuple[Any, ...]] | Iterator[dict[str, Any]]:
        # The param ``buffer_size`` is only here for compatibility with the Polars API
        # and has no effect on the output.
        if not named:
            yield from self.native.itertuples(index=False, name=None)
        else:
            col_names = self.native.columns
            for row in self.native.itertuples(index=False):
                yield dict(zip(col_names, row))

    @property
    def schema(self) -> dict[str, DType]:
        native_dtypes = self.native.dtypes
        return {
            col: native_to_narwhals_dtype(
                native_dtypes[col], self._version, self._implementation
            )
            if native_dtypes[col] != "object"
            else object_native_to_narwhals_dtype(
                self.native[col], self._version, self._implementation
            )
            for col in self.native.columns
        }

    def collect_schema(self) -> dict[str, DType]:
        return self.schema

    # --- reshape ---
    def simple_select(self, *column_names: str) -> Self:
        return self._with_native(
            select_columns_by_name(
                self.native,
                list(column_names),
                self._backend_version,
                self._implementation,
            ),
            validate_column_names=False,
        )

    def select(self: PandasLikeDataFrame, *exprs: PandasLikeExpr) -> PandasLikeDataFrame:
        new_series = self._evaluate_into_exprs(*exprs)
        if not new_series:
            # return empty dataframe, like Polars does
            return self._with_native(self.native.__class__(), validate_column_names=False)
        new_series = align_series_full_broadcast(*new_series)
        namespace = self.__narwhals_namespace__()
        df = namespace._concat_horizontal([s.native for s in new_series])
        # `concat` creates a new object, so fine to modify `.columns.name` inplace.
        df.columns.name = self.native.columns.name
        return self._with_native(df, validate_column_names=True)

    def drop_nulls(
        self: PandasLikeDataFrame, subset: Sequence[str] | None
    ) -> PandasLikeDataFrame:
        if subset is None:
            return self._with_native(
                self.native.dropna(axis=0), validate_column_names=False
            )
        plx = self.__narwhals_namespace__()
        return self.filter(~plx.any_horizontal(plx.col(*subset).is_null()))

    def estimated_size(self, unit: SizeUnit) -> int | float:
        sz = self.native.memory_usage(deep=True).sum()
        return scale_bytes(sz, unit=unit)

    def with_row_index(self, name: str) -> Self:
        frame = self.native
        namespace = self.__narwhals_namespace__()
        row_index = namespace._series.from_iterable(
            range(len(frame)), context=self, index=frame.index
        ).alias(name)
        return self._with_native(namespace._concat_horizontal([row_index.native, frame]))

    def row(self, index: int) -> tuple[Any, ...]:
        return tuple(x for x in self.native.iloc[index])

    def filter(
        self: PandasLikeDataFrame, predicate: PandasLikeExpr | list[bool]
    ) -> PandasLikeDataFrame:
        if isinstance(predicate, list):
            mask_native: pd.Series[Any] | list[bool] = predicate
        else:
            # `[0]` is safe as the predicate's expression only returns a single column
            mask = self._evaluate_into_exprs(predicate)[0]
            mask_native = self._extract_comparand(mask)
        return self._with_native(
            self.native.loc[mask_native], validate_column_names=False
        )

    def with_columns(
        self: PandasLikeDataFrame, *exprs: PandasLikeExpr
    ) -> PandasLikeDataFrame:
        columns = self._evaluate_into_exprs(*exprs)
        if not columns and len(self) == 0:
            return self
        name_columns: dict[str, PandasLikeSeries] = {s.name: s for s in columns}
        to_concat = []
        # Make sure to preserve column order
        for name in self.native.columns:
            if name in name_columns:
                series = self._extract_comparand(name_columns.pop(name))
            else:
                series = self.native[name]
            to_concat.append(series)
        to_concat.extend(self._extract_comparand(s) for s in name_columns.values())
        namespace = self.__narwhals_namespace__()
        df = namespace._concat_horizontal(to_concat)
        # `concat` creates a new object, so fine to modify `.columns.name` inplace.
        df.columns.name = self.native.columns.name
        return self._with_native(df, validate_column_names=False)

    def rename(self, mapping: Mapping[str, str]) -> Self:
        return self._with_native(
            rename(
                self.native,
                columns=mapping,
                implementation=self._implementation,
                backend_version=self._backend_version,
            )
        )

    def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
        to_drop = parse_columns_to_drop(self, columns, strict=strict)
        return self._with_native(
            self.native.drop(columns=to_drop), validate_column_names=False
        )

    # --- transform ---
    def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self:
        df = self.native
        if isinstance(descending, bool):
            ascending: bool | list[bool] = not descending
        else:
            ascending = [not d for d in descending]
        na_position = "last" if nulls_last else "first"
        return self._with_native(
            df.sort_values(list(by), ascending=ascending, na_position=na_position),
            validate_column_names=False,
        )

    # --- convert ---
    def collect(
        self, backend: Implementation | None, **kwargs: Any
    ) -> CompliantDataFrameAny:
        if backend is None:
            return PandasLikeDataFrame(
                self.native,
                implementation=self._implementation,
                backend_version=self._backend_version,
                version=self._version,
                validate_column_names=False,
            )

        if backend is Implementation.PANDAS:
            import pandas as pd  # ignore-banned-import

            return PandasLikeDataFrame(
                self.to_pandas(),
                implementation=Implementation.PANDAS,
                backend_version=parse_version(pd),
                version=self._version,
                validate_column_names=False,
            )

        if backend is Implementation.PYARROW:
            import pyarrow as pa  # ignore-banned-import

            from narwhals._arrow.dataframe import ArrowDataFrame

            return ArrowDataFrame(
                native_dataframe=self.to_arrow(),
                backend_version=parse_version(pa),
                version=self._version,
                validate_column_names=False,
            )

        if backend is Implementation.POLARS:
            import polars as pl  # ignore-banned-import

            from narwhals._polars.dataframe import PolarsDataFrame

            return PolarsDataFrame(
                df=self.to_polars(),
                backend_version=parse_version(pl),
                version=self._version,
            )

        msg = f"Unsupported `backend` value: {backend}"  # pragma: no cover
        raise ValueError(msg)  # pragma: no cover

    # --- actions ---
    def group_by(
        self, keys: Sequence[str] | Sequence[PandasLikeExpr], *, drop_null_keys: bool
    ) -> PandasLikeGroupBy:
        from narwhals._pandas_like.group_by import PandasLikeGroupBy

        return PandasLikeGroupBy(self, keys, drop_null_keys=drop_null_keys)

    def join(  # noqa: C901, PLR0911, PLR0912
        self,
        other: Self,
        *,
        how: JoinStrategy,
        left_on: Sequence[str] | None,
        right_on: Sequence[str] | None,
        suffix: str,
    ) -> Self:
        if how == "cross":
            if (
                self._implementation is Implementation.MODIN
                or self._implementation is Implementation.CUDF
            ) or (
                self._implementation is Implementation.PANDAS
                and self._backend_version < (1, 4)
            ):
                key_token = generate_temporary_column_name(
                    n_bytes=8, columns=[*self.columns, *other.columns]
                )

                return self._with_native(
                    self.native.assign(**{key_token: 0})
                    .merge(
                        other.native.assign(**{key_token: 0}),
                        how="inner",
                        left_on=key_token,
                        right_on=key_token,
                        suffixes=("", suffix),
                    )
                    .drop(columns=key_token)
                )
            else:
                return self._with_native(
                    self.native.merge(other.native, how="cross", suffixes=("", suffix))
                )

        if how == "anti":
            if self._implementation is Implementation.CUDF:
                return self._with_native(
                    self.native.merge(
                        other.native, how="leftanti", left_on=left_on, right_on=right_on
                    )
                )
            else:
                indicator_token = generate_temporary_column_name(
                    n_bytes=8, columns=[*self.columns, *other.columns]
                )
                if right_on is None:  # pragma: no cover
                    msg = "`right_on` cannot be `None` in anti-join"
                    raise TypeError(msg)

                # rename to avoid creating extra columns in join
                other_native = rename(
                    select_columns_by_name(
                        other.native,
                        list(right_on),
                        self._backend_version,
                        self._implementation,
                    ),
                    columns=dict(zip(right_on, left_on)),  # type: ignore[arg-type]
                    implementation=self._implementation,
                    backend_version=self._backend_version,
                ).drop_duplicates()
                return self._with_native(
                    self.native.merge(
                        other_native,
                        how="outer",
                        indicator=indicator_token,
                        left_on=left_on,
                        right_on=left_on,
                    )
                    .loc[lambda t: t[indicator_token] == "left_only"]
                    .drop(columns=indicator_token)
                )

        if how == "semi":
            if right_on is None:  # pragma: no cover
                msg = "`right_on` cannot be `None` in semi-join"
                raise TypeError(msg)
            # rename to avoid creating extra columns in join
            other_native = (
                rename(
                    select_columns_by_name(
                        other.native,
                        list(right_on),
                        self._backend_version,
                        self._implementation,
                    ),
                    columns=dict(zip(right_on, left_on)),  # type: ignore[arg-type]
                    implementation=self._implementation,
                    backend_version=self._backend_version,
                ).drop_duplicates()  # avoids potential rows duplication from inner join
            )
            return self._with_native(
                self.native.merge(
                    other_native, how="inner", left_on=left_on, right_on=left_on
                )
            )

        if how == "left":
            result_native = self.native.merge(
                other.native,
                how="left",
                left_on=left_on,
                right_on=right_on,
                suffixes=("", suffix),
            )
            extra = []
            for left_key, right_key in zip(left_on, right_on):  # type: ignore[arg-type]
                if right_key != left_key and right_key not in self.columns:
                    extra.append(right_key)
                elif right_key != left_key:
                    extra.append(f"{right_key}{suffix}")
            return self._with_native(result_native.drop(columns=extra))

        if how == "full":
            # Pandas coalesces keys in full joins unless there's no collision

            # help mypy
            assert left_on is not None  # noqa: S101
            assert right_on is not None  # noqa: S101

            right_on_mapper = _remap_full_join_keys(left_on, right_on, suffix)
            other_native = other.native.rename(columns=right_on_mapper)
            check_column_names_are_unique(other_native.columns)
            right_on = list(right_on_mapper.values())  # we now have the suffixed keys
            return self._with_native(
                self.native.merge(
                    other_native,
                    left_on=left_on,
                    right_on=right_on,
                    how="outer",
                    suffixes=("", suffix),
                )
            )

        return self._with_native(
            self.native.merge(
                other.native,
                left_on=left_on,
                right_on=right_on,
                how=how,
                suffixes=("", suffix),
            )
        )

    def join_asof(
        self,
        other: Self,
        *,
        left_on: str,
        right_on: str,
        by_left: Sequence[str] | None,
        by_right: Sequence[str] | None,
        strategy: AsofJoinStrategy,
        suffix: str,
    ) -> Self:
        plx = self.__native_namespace__()
        return self._with_native(
            plx.merge_asof(
                self.native,
                other.native,
                left_on=left_on,
                right_on=right_on,
                left_by=by_left,
                right_by=by_right,
                direction=strategy,
                suffixes=("", suffix),
            )
        )

    # --- partial reduction ---

    def head(self, n: int) -> Self:
        return self._with_native(self.native.head(n), validate_column_names=False)

    def tail(self, n: int) -> Self:
        return self._with_native(self.native.tail(n), validate_column_names=False)

    def unique(
        self,
        subset: Sequence[str] | None,
        *,
        keep: UniqueKeepStrategy,
        maintain_order: bool | None = None,
    ) -> Self:
        # The param `maintain_order` is only here for compatibility with the Polars API
        # and has no effect on the output.
        mapped_keep = {"none": False, "any": "first"}.get(keep, keep)
        if subset and (error := self._check_columns_exist(subset)):
            raise error
        return self._with_native(
            self.native.drop_duplicates(subset=subset, keep=mapped_keep),
            validate_column_names=False,
        )

    # --- lazy-only ---
    def lazy(self, *, backend: Implementation | None = None) -> CompliantLazyFrameAny:
        from narwhals.utils import parse_version

        pandas_df = self.to_pandas()
        if backend is None:
            return self
        elif backend is Implementation.DUCKDB:
            import duckdb  # ignore-banned-import

            from narwhals._duckdb.dataframe import DuckDBLazyFrame

            return DuckDBLazyFrame(
                df=duckdb.table("pandas_df"),
                backend_version=parse_version(duckdb),
                version=self._version,
            )
        elif backend is Implementation.POLARS:
            import polars as pl  # ignore-banned-import

            from narwhals._polars.dataframe import PolarsLazyFrame

            return PolarsLazyFrame(
                df=pl.from_pandas(pandas_df).lazy(),
                backend_version=parse_version(pl),
                version=self._version,
            )
        elif backend is Implementation.DASK:
            import dask  # ignore-banned-import
            import dask.dataframe as dd  # ignore-banned-import

            from narwhals._dask.dataframe import DaskLazyFrame

            return DaskLazyFrame(
                native_dataframe=dd.from_pandas(pandas_df),
                backend_version=parse_version(dask),
                version=self._version,
            )
        raise AssertionError  # pragma: no cover

    @property
    def shape(self) -> tuple[int, int]:
        return self.native.shape

    def to_dict(self, *, as_series: bool) -> dict[str, Any]:
        if as_series:
            return {
                col: PandasLikeSeries.from_native(self.native[col], context=self)
                for col in self.columns
            }
        return self.native.to_dict(orient="list")

    def to_numpy(self, dtype: Any = None, *, copy: bool | None = None) -> _2DArray:
        native_dtypes = self.native.dtypes

        if copy is None:
            # pandas default differs from Polars, but cuDF default is True
            copy = self._implementation is Implementation.CUDF

        if native_dtypes.isin(CLASSICAL_NUMPY_DTYPES).all():
            # Fast path, no conversions necessary.
            if dtype is not None:
                return self.native.to_numpy(dtype=dtype, copy=copy)
            return self.native.to_numpy(copy=copy)

        dtype_datetime = self._version.dtypes.Datetime
        to_convert = [
            key
            for key, val in self.schema.items()
            if isinstance(val, dtype_datetime) and val.time_zone is not None
        ]
        if to_convert:
            df = self.with_columns(
                self.__narwhals_namespace__()
                .col(*to_convert)
                .dt.convert_time_zone("UTC")
                .dt.replace_time_zone(None)
            ).native
        else:
            df = self.native

        if dtype is not None:
            return df.to_numpy(dtype=dtype, copy=copy)

        # pandas return `object` dtype for nullable dtypes if dtype=None,
        # so we cast each Series to numpy and let numpy find a common dtype.
        # If there aren't any dtypes where `to_numpy()` is "broken" (i.e. it
        # returns Object) then we just call `to_numpy()` on the DataFrame.
        for col_dtype in native_dtypes:
            if str(col_dtype) in PANDAS_TO_NUMPY_DTYPE_MISSING:
                import numpy as np

                arr: Any = np.hstack(
                    [
                        self.get_column(col).to_numpy(copy=copy, dtype=None)[:, None]
                        for col in self.columns
                    ]
                )
                return arr
        return df.to_numpy(copy=copy)

    def to_pandas(self) -> pd.DataFrame:
        if self._implementation is Implementation.PANDAS:
            return self.native
        elif self._implementation is Implementation.CUDF:
            return self.native.to_pandas()
        elif self._implementation is Implementation.MODIN:
            return self.native._to_pandas()
        msg = f"Unknown implementation: {self._implementation}"  # pragma: no cover
        raise AssertionError(msg)

    def to_polars(self) -> pl.DataFrame:
        import polars as pl  # ignore-banned-import

        return pl.from_pandas(self.to_pandas())

    def write_parquet(self, file: str | Path | BytesIO) -> None:
        self.native.to_parquet(file)

    @overload
    def write_csv(self, file: None) -> str: ...

    @overload
    def write_csv(self, file: str | Path | BytesIO) -> None: ...

    def write_csv(self, file: str | Path | BytesIO | None) -> str | None:
        return self.native.to_csv(file, index=False)

    # --- descriptive ---
    def is_unique(self) -> PandasLikeSeries:
        return PandasLikeSeries.from_native(
            ~self.native.duplicated(keep=False), context=self
        )

    def item(self, row: int | None, column: int | str | None) -> Any:
        if row is None and column is None:
            if self.shape != (1, 1):
                msg = (
                    "can only call `.item()` if the dataframe is of shape (1, 1),"
                    " or if explicit row/col values are provided;"
                    f" frame has shape {self.shape!r}"
                )
                raise ValueError(msg)
            return self.native.iloc[0, 0]

        elif row is None or column is None:
            msg = "cannot call `.item()` with only one of `row` or `column`"
            raise ValueError(msg)

        _col = self.columns.index(column) if isinstance(column, str) else column
        return self.native.iloc[row, _col]

    def clone(self) -> Self:
        return self._with_native(self.native.copy(), validate_column_names=False)

    def gather_every(self, n: int, offset: int) -> Self:
        return self._with_native(self.native.iloc[offset::n], validate_column_names=False)

    def _pivot_into_index_values(
        self,
        on: Sequence[str],
        index: Sequence[str] | None,
        values: Sequence[str] | None,
        /,
    ) -> tuple[Sequence[str], Sequence[str]]:
        index = index or (
            exclude_column_names(self, {*on, *values})
            if values
            else exclude_column_names(self, on)
        )
        values = values or exclude_column_names(self, {*on, *index})
        return index, values

    @staticmethod
    def _pivot_multi_on_name(unique_values: tuple[str, ...], /) -> str:
        LB, RB, Q = "{", "}", '"'  # noqa: N806
        body = '","'.join(unique_values)
        return f"{LB}{Q}{body}{Q}{RB}"

    @staticmethod
    def _pivot_single_on_names(
        column_names: Iterable[str], n_values: int, separator: str, /
    ) -> list[str]:
        if n_values > 1:
            return [separator.join(col).strip() for col in column_names]
        return [col[-1] for col in column_names]

    def _pivot_multi_on_names(
        self,
        column_names: Iterable[tuple[str, ...]],
        n_on: int,
        n_values: int,
        separator: str,
        /,
    ) -> Iterator[str]:
        if n_values > 1:
            for col in column_names:
                names = col[-n_on:]
                prefix = col[0]
                yield separator.join((prefix, self._pivot_multi_on_name(names)))
        else:
            for col in column_names:
                yield self._pivot_multi_on_name(col[-n_on:])

    def _pivot_remap_column_names(
        self, column_names: Iterable[Any], *, n_on: int, n_values: int, separator: str
    ) -> list[str]:
        """Reformat output column names from a native pivot operation, to match `polars`.

        Note:
            `column_names` is a `pd.MultiIndex`, but not in the stubs.
        """
        if n_on == 1:
            return self._pivot_single_on_names(column_names, n_values, separator)
        return list(self._pivot_multi_on_names(column_names, n_on, n_values, separator))

    def _pivot_table(
        self,
        on: Sequence[str],
        index: Sequence[str],
        values: Sequence[str],
        aggregate_function: Literal[
            "min", "max", "first", "last", "sum", "mean", "median"
        ],
        /,
    ) -> Any:
        categorical = self._version.dtypes.Categorical
        kwds: dict[Any, Any] = {"observed": True}
        if self._implementation is Implementation.CUDF:
            kwds.pop("observed")
            cols = set(chain(values, index, on))
            schema = self.schema.items()
            if any(
                tp for name, tp in schema if name in cols and isinstance(tp, categorical)
            ):
                msg = "`pivot` with Categoricals is not implemented for cuDF backend"
                raise NotImplementedError(msg)
        return self.native.pivot_table(
            values=values,
            index=index,
            columns=on,
            aggfunc=aggregate_function,
            margins=False,
            **kwds,
        )

    def _pivot(
        self,
        on: Sequence[str],
        index: Sequence[str],
        values: Sequence[str],
        aggregate_function: PivotAgg | None,
        /,
    ) -> pd.DataFrame:
        if aggregate_function is None:
            return self.native.pivot(columns=on, index=index, values=values)
        elif aggregate_function == "len":
            return (
                self.native.groupby([*on, *index], as_index=False)
                .agg(dict.fromkeys(values, "size"))
                .pivot(columns=on, index=index, values=values)
            )
        return self._pivot_table(on, index, values, aggregate_function)

    def pivot(
        self,
        on: Sequence[str],
        *,
        index: Sequence[str] | None,
        values: Sequence[str] | None,
        aggregate_function: PivotAgg | None,
        sort_columns: bool,
        separator: str,
    ) -> Self:
        implementation = self._implementation
        backend_version = self._backend_version
        if implementation.is_pandas() and backend_version < (1, 1):  # pragma: no cover
            msg = "pivot is only supported for 'pandas>=1.1'"
            raise NotImplementedError(msg)
        if implementation.is_modin():
            msg = "pivot is not supported for Modin backend due to https://github.com/modin-project/modin/issues/7409."
            raise NotImplementedError(msg)

        index, values = self._pivot_into_index_values(on, index, values)
        result = self._pivot(on, index, values, aggregate_function)

        # Select the columns in the right order
        uniques = (
            (
                self.get_column(col)
                .unique()
                .sort(descending=False, nulls_last=False)
                .to_list()
                for col in on
            )
            if sort_columns
            else (self.get_column(col).unique().to_list() for col in on)
        )
        ordered_cols = list(product(values, *chain(uniques)))
        result = result.loc[:, ordered_cols]
        columns = result.columns
        remapped = self._pivot_remap_column_names(
            columns, n_on=len(on), n_values=len(values), separator=separator
        )
        result.columns = remapped  # type: ignore[assignment]
        result.columns.names = [""]
        return self._with_native(result.reset_index())

    def to_arrow(self) -> Any:
        if self._implementation is Implementation.CUDF:
            return self.native.to_arrow(preserve_index=False)

        import pyarrow as pa  # ignore-banned-import()

        return pa.Table.from_pandas(self.native)

    def sample(
        self,
        n: int | None,
        *,
        fraction: float | None,
        with_replacement: bool,
        seed: int | None,
    ) -> Self:
        return self._with_native(
            self.native.sample(
                n=n, frac=fraction, replace=with_replacement, random_state=seed
            ),
            validate_column_names=False,
        )

    def unpivot(
        self,
        on: Sequence[str] | None,
        index: Sequence[str] | None,
        variable_name: str,
        value_name: str,
    ) -> Self:
        return self._with_native(
            self.native.melt(
                id_vars=index,
                value_vars=on,
                var_name=variable_name,
                value_name=value_name,
            )
        )

    def explode(self, columns: Sequence[str]) -> Self:
        dtypes = self._version.dtypes

        schema = self.collect_schema()
        for col_to_explode in columns:
            dtype = schema[col_to_explode]

            if dtype != dtypes.List:
                msg = (
                    f"`explode` operation not supported for dtype `{dtype}`, "
                    "expected List type"
                )
                raise InvalidOperationError(msg)

        if len(columns) == 1:
            return self._with_native(
                self.native.explode(columns[0]), validate_column_names=False
            )
        else:
            native_frame = self.native
            anchor_series = native_frame[columns[0]].list.len()

            if not all(
                (native_frame[col_name].list.len() == anchor_series).all()
                for col_name in columns[1:]
            ):
                msg = "exploded columns must have matching element counts"
                raise ShapeError(msg)

            original_columns = self.columns
            other_columns = [c for c in original_columns if c not in columns]

            exploded_frame = native_frame[[*other_columns, columns[0]]].explode(
                columns[0]
            )
            exploded_series = [
                native_frame[col_name].explode().to_frame() for col_name in columns[1:]
            ]

            plx = self.__native_namespace__()
            return self._with_native(
                plx.concat([exploded_frame, *exploded_series], axis=1)[original_columns],
                validate_column_names=False,
            )
