from __future__ import annotations

import contextlib
import os
from collections.abc import Sequence
from io import BytesIO, StringIO
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, Callable, Literal

import polars._reexport as pl
import polars.functions as F
from polars._utils.deprecation import deprecate_renamed_parameter
from polars._utils.various import (
    _process_null_values,
    is_path_or_str_sequence,
    is_str_sequence,
    normalize_filepath,
    qualified_type_name,
)
from polars._utils.wrap import wrap_df, wrap_ldf
from polars.datatypes import N_INFER_DEFAULT, String, parse_into_dtype
from polars.io._utils import (
    is_glob_pattern,
    parse_columns_arg,
    parse_row_index_args,
    prepare_file_arg,
)
from polars.io.cloud.credential_provider._builder import (
    _init_credential_provider_builder,
)
from polars.io.csv._utils import _check_arg_is_1byte, _update_columns
from polars.io.csv.batched_reader import BatchedCsvReader

with contextlib.suppress(ImportError):  # Module not available when building docs
    from polars.polars import PyDataFrame, PyLazyFrame

if TYPE_CHECKING:
    from collections.abc import Mapping

    from polars import DataFrame, LazyFrame
    from polars._typing import CsvEncoding, PolarsDataType, SchemaDict
    from polars.io.cloud import CredentialProviderFunction
    from polars.io.cloud.credential_provider._builder import CredentialProviderBuilder


@deprecate_renamed_parameter("dtypes", "schema_overrides", version="0.20.31")
@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
def read_csv(
    source: str | Path | IO[str] | IO[bytes] | bytes,
    *,
    has_header: bool = True,
    columns: Sequence[int] | Sequence[str] | None = None,
    new_columns: Sequence[str] | None = None,
    separator: str = ",",
    comment_prefix: str | None = None,
    quote_char: str | None = '"',
    skip_rows: int = 0,
    skip_lines: int = 0,
    schema: SchemaDict | None = None,
    schema_overrides: (
        Mapping[str, PolarsDataType] | Sequence[PolarsDataType] | None
    ) = None,
    null_values: str | Sequence[str] | dict[str, str] | None = None,
    missing_utf8_is_empty_string: bool = False,
    ignore_errors: bool = False,
    try_parse_dates: bool = False,
    n_threads: int | None = None,
    infer_schema: bool = True,
    infer_schema_length: int | None = N_INFER_DEFAULT,
    batch_size: int = 8192,
    n_rows: int | None = None,
    encoding: CsvEncoding | str = "utf8",
    low_memory: bool = False,
    rechunk: bool = False,
    use_pyarrow: bool = False,
    storage_options: dict[str, Any] | None = None,
    skip_rows_after_header: int = 0,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    sample_size: int = 1024,
    eol_char: str = "\n",
    raise_if_empty: bool = True,
    truncate_ragged_lines: bool = False,
    decimal_comma: bool = False,
    glob: bool = True,
) -> DataFrame:
    r"""
    Read a CSV file into a DataFrame.

    .. versionchanged:: 0.20.31
        The `dtypes` parameter was renamed `schema_overrides`.
    .. versionchanged:: 0.20.4
        * The `row_count_name` parameter was renamed `row_index_name`.
        * The `row_count_offset` parameter was renamed `row_index_offset`.

    Parameters
    ----------
    source
        Path to a file or a file-like object (by "file-like object" we refer to objects
        that have a `read()` method, such as a file handler like the builtin `open`
        function, or a `BytesIO` instance). If `fsspec` is installed, it will be used
        to open remote files. For file-like objects, the stream position may not be
        updated accordingly after reading.
    has_header
        Indicate if the first row of the dataset is a header or not. If set to False,
        column names will be autogenerated in the following format: `column_x`, with
        `x` being an enumeration over every column in the dataset, starting at 1.
    columns
        Columns to select. Accepts a list of column indices (starting
        at zero) or a list of column names.
    new_columns
        Rename columns right after parsing the CSV file. If the given
        list is shorter than the width of the DataFrame the remaining
        columns will have their original name.
    separator
        Single byte character to use as separator in the file.
    comment_prefix
        A string used to indicate the start of a comment line. Comment lines are skipped
        during parsing. Common examples of comment prefixes are `#` and `//`.
    quote_char
        Single byte character used for csv quoting, default = `"`.
        Set to None to turn off special handling and escaping of quotes.
    skip_rows
        Start reading after ``skip_rows`` rows. The header will be parsed at this
        offset. Note that we respect CSV escaping/comments when skipping rows.
        If you want to skip by newline char only, use `skip_lines`.
    skip_lines
        Start reading after `skip_lines` lines. The header will be parsed at this
        offset. Note that CSV escaping will not be respected when skipping lines.
        If you want to skip valid CSV rows, use ``skip_rows``.
    schema
        Provide the schema. This means that polars doesn't do schema inference.
        This argument expects the complete schema, whereas `schema_overrides` can be
        used to partially overwrite a schema. Note that the order of the columns in
        the provided `schema` must match the order of the columns in the CSV being read.
    schema_overrides
        Overwrite dtypes for specific or all columns during schema inference.
    null_values
        Values to interpret as null values. You can provide a:

        - `str`: All values equal to this string will be null.
        - `List[str]`: All values equal to any string in this list will be null.
        - `Dict[str, str]`: A dictionary that maps column name to a
          null value string.

    missing_utf8_is_empty_string
        By default a missing value is considered to be null; if you would prefer missing
        utf8 values to be treated as the empty string you can set this param True.
    ignore_errors
        Try to keep reading lines if some lines yield errors.
        Before using this option, try to increase the number of lines used for schema
        inference with e.g `infer_schema_length=10000` or override automatic dtype
        inference for specific columns with the `schema_overrides` option or use
        `infer_schema=False` to read all columns as `pl.String` to check which
        values might cause an issue.
    try_parse_dates
        Try to automatically parse dates. Most ISO8601-like formats can
        be inferred, as well as a handful of others. If this does not succeed,
        the column remains of data type `pl.String`.
        If `use_pyarrow=True`, dates will always be parsed.
    n_threads
        Number of threads to use in csv parsing.
        Defaults to the number of physical cpu's of your system.
    infer_schema
        When `True`, the schema is inferred from the data using the first
        `infer_schema_length` rows.
        When `False`, the schema is not inferred and will be `pl.String` if not
        specified in `schema` or `schema_overrides`.
    infer_schema_length
        The maximum number of rows to scan for schema inference.
        If set to `None`, the full data may be scanned *(this is slow)*.
        Set `infer_schema=False` to read all columns as `pl.String`.
    batch_size
        Number of lines to read into the buffer at once.
        Modify this to change performance.
    n_rows
        Stop reading from CSV file after reading `n_rows`.
        During multi-threaded parsing, an upper bound of `n_rows`
        rows cannot be guaranteed.
    encoding : {'utf8', 'utf8-lossy', 'windows-1252', 'windows-1252-lossy', ...}
        Lossy means that invalid utf8 values are replaced with `�`
        characters. When using other encodings than `utf8` or
        `utf8-lossy`, the input is first decoded in memory with
        python. Defaults to `utf8`.
    low_memory
        Reduce memory pressure at the expense of performance.
    rechunk
        Make sure that all columns are contiguous in memory by
        aggregating the chunks into a single array.
    use_pyarrow
        Try to use pyarrow's native CSV parser. This will always
        parse dates, even if `try_parse_dates=False`.
        This is not always possible. The set of arguments given to
        this function determines if it is possible to use pyarrow's
        native parser. Note that pyarrow and polars may have a
        different strategy regarding type inference.
    storage_options
        Extra options that make sense for `fsspec.open()` or a
        particular storage connection.
        e.g. host, port, username, password, etc.
    skip_rows_after_header
        Skip this number of rows when the header is parsed.
    row_index_name
        Insert a row index column with the given name into the DataFrame as the first
        column. If set to `None` (default), no row index column is created.
    row_index_offset
        Start the row index at this offset. Cannot be negative.
        Only used if `row_index_name` is set.
    sample_size
        Set the sample size. This is used to sample statistics to estimate the
        allocation needed.

        .. deprecated:: 1.10.0
            This parameter is now a no-op.
    eol_char
        Single byte end of line character (default: `\n`). When encountering a file
        with windows line endings (`\r\n`), one can go with the default `\n`. The extra
        `\r` will be removed when processed.
    raise_if_empty
        When there is no data in the source, `NoDataError` is raised. If this parameter
        is set to False, an empty DataFrame (with no columns) is returned instead.
    truncate_ragged_lines
        Truncate lines that are longer than the schema.
    decimal_comma
        Parse floats using a comma as the decimal separator instead of a period.
    glob
        Expand path given via globbing rules.

    Returns
    -------
    DataFrame

    See Also
    --------
    scan_csv : Lazily read from a CSV file or multiple files via glob patterns.

    Warnings
    --------
    Calling `read_csv().lazy()` is an antipattern as this forces Polars to materialize
    a full csv file and therefore cannot push any optimizations into the reader.
    Therefore always prefer `scan_csv` if you want to work with `LazyFrame` s.

    Notes
    -----
    If the schema is inferred incorrectly (e.g. as `pl.Int64` instead of `pl.Float64`),
    try to increase the number of lines used to infer the schema with
    `infer_schema_length` or override the inferred dtype for those columns with
    `schema_overrides`.

    Examples
    --------
    >>> pl.read_csv("data.csv", separator="|")  # doctest: +SKIP

    Demonstrate use against a BytesIO object, parsing string dates.

    >>> from io import BytesIO
    >>> data = BytesIO(
    ...     b"ID,Name,Birthday\n"
    ...     b"1,Alice,1995-07-12\n"
    ...     b"2,Bob,1990-09-20\n"
    ...     b"3,Charlie,2002-03-08\n"
    ... )
    >>> pl.read_csv(data, try_parse_dates=True)
    shape: (3, 3)
    ┌─────┬─────────┬────────────┐
    │ ID  ┆ Name    ┆ Birthday   │
    │ --- ┆ ---     ┆ ---        │
    │ i64 ┆ str     ┆ date       │
    ╞═════╪═════════╪════════════╡
    │ 1   ┆ Alice   ┆ 1995-07-12 │
    │ 2   ┆ Bob     ┆ 1990-09-20 │
    │ 3   ┆ Charlie ┆ 2002-03-08 │
    └─────┴─────────┴────────────┘
    """
    _check_arg_is_1byte("separator", separator, can_be_empty=False)
    _check_arg_is_1byte("quote_char", quote_char, can_be_empty=True)
    _check_arg_is_1byte("eol_char", eol_char, can_be_empty=False)

    projection, columns = parse_columns_arg(columns)
    storage_options = storage_options or {}

    if columns and not has_header:
        for column in columns:
            if not column.startswith("column_"):
                msg = (
                    "specified column names do not start with 'column_',"
                    " but autogenerated header names were requested"
                )
                raise ValueError(msg)

    if schema_overrides is not None and not isinstance(
        schema_overrides, (dict, Sequence)
    ):
        msg = "`schema_overrides` should be of type list or dict"
        raise TypeError(msg)

    if (
        use_pyarrow
        and schema_overrides is None
        and n_rows is None
        and n_threads is None
        and not low_memory
        and null_values is None
    ):
        include_columns: Sequence[str] | None = None
        if columns:
            if not has_header:
                # Convert 'column_1', 'column_2', ... column names to 'f0', 'f1', ...
                # column names for pyarrow, if CSV file does not contain a header.
                include_columns = [f"f{int(column[7:]) - 1}" for column in columns]
            else:
                include_columns = columns

        if not columns and projection:
            # Convert column indices from projection to 'f0', 'f1', ... column names
            # for pyarrow.
            include_columns = [f"f{column_idx}" for column_idx in projection]

        with prepare_file_arg(
            source,
            encoding=None,
            use_pyarrow=True,
            raise_if_empty=raise_if_empty,
            storage_options=storage_options,
        ) as data:
            import pyarrow as pa
            import pyarrow.csv

            try:
                tbl = pa.csv.read_csv(
                    data,
                    pa.csv.ReadOptions(
                        skip_rows=skip_rows,
                        skip_rows_after_names=skip_rows_after_header,
                        autogenerate_column_names=not has_header,
                        encoding=encoding,
                    ),
                    pa.csv.ParseOptions(
                        delimiter=separator,
                        quote_char=quote_char if quote_char else False,
                        double_quote=quote_char is not None and quote_char == '"',
                    ),
                    pa.csv.ConvertOptions(
                        column_types=None,
                        include_columns=include_columns,
                        include_missing_columns=ignore_errors,
                    ),
                )
            except pa.ArrowInvalid as err:
                if raise_if_empty or "Empty CSV" not in str(err):
                    raise
                return pl.DataFrame()

        if not has_header:
            # Rename 'f0', 'f1', ... columns names autogenerated by pyarrow
            # to 'column_1', 'column_2', ...
            tbl = tbl.rename_columns(
                [f"column_{int(column[1:]) + 1}" for column in tbl.column_names]
            )

        df = pl.DataFrame._from_arrow(tbl, rechunk=rechunk)
        if new_columns:
            return _update_columns(df, new_columns)
        return df

    if projection and schema_overrides and isinstance(schema_overrides, list):
        if len(projection) < len(schema_overrides):
            msg = "more schema overrides are specified than there are selected columns"
            raise ValueError(msg)

        # Fix list of dtypes when used together with projection as polars CSV reader
        # wants a list of dtypes for the x first columns before it does the projection.
        dtypes_list: list[PolarsDataType] = [String] * (max(projection) + 1)

        for idx, column_idx in enumerate(projection):
            if idx < len(schema_overrides):
                dtypes_list[column_idx] = schema_overrides[idx]

        schema_overrides = dtypes_list

    if columns and schema_overrides and isinstance(schema_overrides, list):
        if len(columns) < len(schema_overrides):
            msg = "more dtypes overrides are specified than there are selected columns"
            raise ValueError(msg)

        # Map list of dtypes when used together with selected columns as a dtypes dict
        # so the dtypes are applied to the correct column instead of the first x
        # columns.
        schema_overrides = dict(zip(columns, schema_overrides))

    if new_columns and schema_overrides and isinstance(schema_overrides, dict):
        current_columns = None

        # As new column names are not available yet while parsing the CSV file, rename
        # column names in dtypes to old names (if possible) so they can be used during
        # CSV parsing.
        if columns:
            if len(columns) < len(new_columns):
                msg = (
                    "more new column names are specified than there are selected"
                    " columns"
                )
                raise ValueError(msg)

            # Get column names of requested columns.
            current_columns = columns[0 : len(new_columns)]
        elif not has_header:
            # When there are no header, column names are autogenerated (and known).

            if projection:
                if columns and len(columns) < len(new_columns):
                    msg = (
                        "more new column names are specified than there are selected"
                        " columns"
                    )
                    raise ValueError(msg)
                # Convert column indices from projection to 'column_1', 'column_2', ...
                # column names.
                current_columns = [
                    f"column_{column_idx + 1}" for column_idx in projection
                ]
            else:
                # Generate autogenerated 'column_1', 'column_2', ... column names for
                # new column names.
                current_columns = [
                    f"column_{column_idx}"
                    for column_idx in range(1, len(new_columns) + 1)
                ]
        else:
            # When a header is present, column names are not known yet.

            if len(schema_overrides) <= len(new_columns):
                # If dtypes dictionary contains less or same amount of values than new
                # column names a list of dtypes can be created if all listed column
                # names in dtypes dictionary appear in the first consecutive new column
                # names.
                dtype_list = [
                    schema_overrides[new_column_name]
                    for new_column_name in new_columns[0 : len(schema_overrides)]
                    if new_column_name in schema_overrides
                ]

                if len(dtype_list) == len(schema_overrides):
                    schema_overrides = dtype_list

        if current_columns and isinstance(schema_overrides, dict):
            new_to_current = dict(zip(new_columns, current_columns))
            # Change new column names to current column names in dtype.
            schema_overrides = {
                new_to_current.get(column_name, column_name): column_dtype
                for column_name, column_dtype in schema_overrides.items()
            }

    if not infer_schema:
        infer_schema_length = 0

    # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType])
    schema_overrides_is_list = isinstance(schema_overrides, Sequence)
    encoding_supported_in_lazy = encoding in {"utf8", "utf8-lossy"}

    new_streaming = (
        os.getenv("POLARS_FORCE_NEW_STREAMING") == "1"
        or os.getenv("POLARS_AUTO_NEW_STREAMING") == "1"
    )

    if new_streaming or (
        # Check that it is not a BytesIO object
        isinstance(v := source, (str, Path))
        and (
            # HuggingFace only for now ⊂( ◜◒◝ )⊃
            str(v).startswith("hf://")
            # Also dispatch on FORCE_ASYNC, so that this codepath gets run
            # through by our test suite during CI.
            or (
                os.getenv("POLARS_FORCE_ASYNC") == "1"
                and not schema_overrides_is_list
                and encoding_supported_in_lazy
            )
            # TODO: We can't dispatch this for all paths due to a few reasons:
            # * `scan_csv` does not support compressed files
            # * The `storage_options` configuration keys are different between
            #   fsspec and object_store (would require a breaking change)
        )
    ):
        if isinstance(source, (str, Path)):
            source = normalize_filepath(source, check_not_directory=False)
        elif is_path_or_str_sequence(source, allow_str=False):
            source = [  # type: ignore[assignment]
                normalize_filepath(source, check_not_directory=False)
                for source in source
            ]

        if not new_streaming:
            if schema_overrides_is_list:
                msg = "passing a list to `schema_overrides` is unsupported for hf:// paths"
                raise ValueError(msg)
            if not encoding_supported_in_lazy:
                msg = f"unsupported encoding {encoding} for hf:// paths"
                raise ValueError(msg)

        lf = _scan_csv_impl(
            source,
            has_header=has_header,
            separator=separator,
            comment_prefix=comment_prefix,
            quote_char=quote_char,
            skip_rows=skip_rows,
            skip_lines=skip_lines,
            schema_overrides=schema_overrides,  # type: ignore[arg-type]
            schema=schema,
            null_values=null_values,
            missing_utf8_is_empty_string=missing_utf8_is_empty_string,
            ignore_errors=ignore_errors,
            try_parse_dates=try_parse_dates,
            infer_schema_length=infer_schema_length,
            n_rows=n_rows,
            encoding=encoding,  # type: ignore[arg-type]
            low_memory=low_memory,
            rechunk=rechunk,
            skip_rows_after_header=skip_rows_after_header,
            row_index_name=row_index_name,
            row_index_offset=row_index_offset,
            eol_char=eol_char,
            raise_if_empty=raise_if_empty,
            truncate_ragged_lines=truncate_ragged_lines,
            decimal_comma=decimal_comma,
            glob=glob,
        )

        if columns:
            lf = lf.select(columns)
        elif projection:
            lf = lf.select(F.nth(projection))

        df = lf.collect()

    else:
        with prepare_file_arg(
            source,
            encoding=encoding,
            use_pyarrow=False,
            raise_if_empty=raise_if_empty,
            storage_options=storage_options,
        ) as data:
            df = _read_csv_impl(
                data,
                has_header=has_header,
                columns=columns if columns else projection,
                separator=separator,
                comment_prefix=comment_prefix,
                quote_char=quote_char,
                skip_rows=skip_rows,
                skip_lines=skip_lines,
                schema_overrides=schema_overrides,
                schema=schema,
                null_values=null_values,
                missing_utf8_is_empty_string=missing_utf8_is_empty_string,
                ignore_errors=ignore_errors,
                try_parse_dates=try_parse_dates,
                n_threads=n_threads,
                infer_schema_length=infer_schema_length,
                batch_size=batch_size,
                n_rows=n_rows,
                encoding=encoding if encoding == "utf8-lossy" else "utf8",
                low_memory=low_memory,
                rechunk=rechunk,
                skip_rows_after_header=skip_rows_after_header,
                row_index_name=row_index_name,
                row_index_offset=row_index_offset,
                eol_char=eol_char,
                raise_if_empty=raise_if_empty,
                truncate_ragged_lines=truncate_ragged_lines,
                decimal_comma=decimal_comma,
                glob=glob,
            )

    if new_columns:
        return _update_columns(df, new_columns)
    return df


def _read_csv_impl(
    source: str | Path | IO[bytes] | bytes,
    *,
    has_header: bool = True,
    columns: Sequence[int] | Sequence[str] | None = None,
    separator: str = ",",
    comment_prefix: str | None = None,
    quote_char: str | None = '"',
    skip_rows: int = 0,
    skip_lines: int = 0,
    schema: None | SchemaDict = None,
    schema_overrides: None | (SchemaDict | Sequence[PolarsDataType]) = None,
    null_values: str | Sequence[str] | dict[str, str] | None = None,
    missing_utf8_is_empty_string: bool = False,
    ignore_errors: bool = False,
    try_parse_dates: bool = False,
    n_threads: int | None = None,
    infer_schema_length: int | None = N_INFER_DEFAULT,
    batch_size: int = 8192,
    n_rows: int | None = None,
    encoding: CsvEncoding = "utf8",
    low_memory: bool = False,
    rechunk: bool = False,
    skip_rows_after_header: int = 0,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    sample_size: int = 1024,
    eol_char: str = "\n",
    raise_if_empty: bool = True,
    truncate_ragged_lines: bool = False,
    decimal_comma: bool = False,
    glob: bool = True,
) -> DataFrame:
    path: str | None
    if isinstance(source, (str, Path)):
        path = normalize_filepath(source, check_not_directory=False)
    else:
        path = None
        if isinstance(source, BytesIO):
            source = source.getvalue()
        if isinstance(source, StringIO):
            source = source.getvalue().encode()

    dtype_list: Sequence[tuple[str, PolarsDataType]] | None = None
    dtype_slice: Sequence[PolarsDataType] | None = None
    if schema_overrides is not None:
        if isinstance(schema_overrides, dict):
            dtype_list = []
            for k, v in schema_overrides.items():
                dtype_list.append((k, parse_into_dtype(v)))
        elif isinstance(schema_overrides, Sequence):
            dtype_slice = schema_overrides
        else:
            msg = f"`schema_overrides` should be of type list or dict, got {qualified_type_name(schema_overrides)!r}"
            raise TypeError(msg)

    processed_null_values = _process_null_values(null_values)

    if isinstance(columns, str):
        columns = [columns]
    if isinstance(source, str) and is_glob_pattern(source):
        dtypes_dict = None
        if dtype_list is not None:
            dtypes_dict = dict(dtype_list)
        if dtype_slice is not None:
            msg = (
                "cannot use glob patterns and unnamed dtypes as `schema_overrides` argument"
                "\n\nUse `schema_overrides`: Mapping[str, Type[DataType]]"
            )
            raise ValueError(msg)
        from polars import scan_csv

        scan = scan_csv(
            source,
            has_header=has_header,
            separator=separator,
            comment_prefix=comment_prefix,
            quote_char=quote_char,
            skip_rows=skip_rows,
            skip_lines=skip_lines,
            schema=schema,
            schema_overrides=dtypes_dict,
            null_values=null_values,
            missing_utf8_is_empty_string=missing_utf8_is_empty_string,
            ignore_errors=ignore_errors,
            infer_schema_length=infer_schema_length,
            n_rows=n_rows,
            low_memory=low_memory,
            rechunk=rechunk,
            skip_rows_after_header=skip_rows_after_header,
            row_index_name=row_index_name,
            row_index_offset=row_index_offset,
            eol_char=eol_char,
            raise_if_empty=raise_if_empty,
            truncate_ragged_lines=truncate_ragged_lines,
            decimal_comma=decimal_comma,
            glob=glob,
        )
        if columns is None:
            return scan.collect()
        elif is_str_sequence(columns, allow_str=False):
            return scan.select(columns).collect()
        else:
            msg = (
                "cannot use glob patterns and integer based projection as `columns` argument"
                "\n\nUse columns: List[str]"
            )
            raise ValueError(msg)

    projection, columns = parse_columns_arg(columns)

    pydf = PyDataFrame.read_csv(
        source,
        infer_schema_length,
        batch_size,
        has_header,
        ignore_errors,
        n_rows,
        skip_rows,
        skip_lines,
        projection,
        separator,
        rechunk,
        columns,
        encoding,
        n_threads,
        path,
        dtype_list,
        dtype_slice,
        low_memory,
        comment_prefix,
        quote_char,
        processed_null_values,
        missing_utf8_is_empty_string,
        try_parse_dates,
        skip_rows_after_header,
        parse_row_index_args(row_index_name, row_index_offset),
        eol_char=eol_char,
        raise_if_empty=raise_if_empty,
        truncate_ragged_lines=truncate_ragged_lines,
        decimal_comma=decimal_comma,
        schema=schema,
    )
    return wrap_df(pydf)


@deprecate_renamed_parameter("dtypes", "schema_overrides", version="0.20.31")
@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
def read_csv_batched(
    source: str | Path,
    *,
    has_header: bool = True,
    columns: Sequence[int] | Sequence[str] | None = None,
    new_columns: Sequence[str] | None = None,
    separator: str = ",",
    comment_prefix: str | None = None,
    quote_char: str | None = '"',
    skip_rows: int = 0,
    schema_overrides: (
        Mapping[str, PolarsDataType] | Sequence[PolarsDataType] | None
    ) = None,
    null_values: str | Sequence[str] | dict[str, str] | None = None,
    missing_utf8_is_empty_string: bool = False,
    ignore_errors: bool = False,
    try_parse_dates: bool = False,
    n_threads: int | None = None,
    infer_schema_length: int | None = N_INFER_DEFAULT,
    batch_size: int = 50_000,
    n_rows: int | None = None,
    encoding: CsvEncoding | str = "utf8",
    low_memory: bool = False,
    rechunk: bool = False,
    skip_rows_after_header: int = 0,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    sample_size: int = 1024,
    eol_char: str = "\n",
    raise_if_empty: bool = True,
    truncate_ragged_lines: bool = False,
    decimal_comma: bool = False,
) -> BatchedCsvReader:
    r"""
    Read a CSV file in batches.

    Upon creation of the `BatchedCsvReader`, Polars will gather statistics and
    determine the file chunks. After that, work will only be done if `next_batches`
    is called, which will return a list of `n` frames of the given batch size.

    .. versionchanged:: 0.20.31
        The `dtypes` parameter was renamed `schema_overrides`.
    .. versionchanged:: 0.20.4
        * The `row_count_name` parameter was renamed `row_index_name`.
        * The `row_count_offset` parameter was renamed `row_index_offset`.

    Parameters
    ----------
    source
        Path to a file or a file-like object (by "file-like object" we refer to objects
        that have a `read()` method, such as a file handler like the builtin `open`
        function, or a `BytesIO` instance). If `fsspec` is installed, it will be used
        to open remote files. For file-like objects, the stream position may not be
        updated accordingly after reading.
    has_header
        Indicate if the first row of the dataset is a header or not. If set to False,
        column names will be autogenerated in the following format: `column_x`, with
        `x` being an enumeration over every column in the dataset, starting at 1.
    columns
        Columns to select. Accepts a list of column indices (starting
        at zero) or a list of column names.
    new_columns
        Rename columns right after parsing the CSV file. If the given
        list is shorter than the width of the DataFrame the remaining
        columns will have their original name.
    separator
        Single byte character to use as separator in the file.
    comment_prefix
        A string used to indicate the start of a comment line. Comment lines are skipped
        during parsing. Common examples of comment prefixes are `#` and `//`.
    quote_char
        Single byte character used for csv quoting, default = `"`.
        Set to None to turn off special handling and escaping of quotes.
    skip_rows
        Start reading after `skip_rows` lines.
    schema_overrides
        Overwrite dtypes during inference.
    null_values
        Values to interpret as null values. You can provide a:

        - `str`: All values equal to this string will be null.
        - `List[str]`: All values equal to any string in this list will be null.
        - `Dict[str, str]`: A dictionary that maps column name to a
          null value string.

    missing_utf8_is_empty_string
        By default a missing value is considered to be null; if you would prefer missing
        utf8 values to be treated as the empty string you can set this param True.
    ignore_errors
        Try to keep reading lines if some lines yield errors.
        First try `infer_schema_length=0` to read all columns as
        `pl.String` to check which values might cause an issue.
    try_parse_dates
        Try to automatically parse dates. Most ISO8601-like formats can
        be inferred, as well as a handful of others. If this does not succeed,
        the column remains of data type `pl.String`.
    n_threads
        Number of threads to use in csv parsing.
        Defaults to the number of physical cpu's of your system.
    infer_schema_length
        The maximum number of rows to scan for schema inference.
        If set to `0`, all columns will be read as `pl.String`.
        If set to `None`, the full data may be scanned *(this is slow)*.
    batch_size
        Number of lines to read into the buffer at once.

        Modify this to change performance.
    n_rows
        Stop reading from CSV file after reading `n_rows`.
        During multi-threaded parsing, an upper bound of `n_rows`
        rows cannot be guaranteed.
    encoding : {'utf8', 'utf8-lossy', ...}
        Lossy means that invalid utf8 values are replaced with `�`
        characters. When using other encodings than `utf8` or
        `utf8-lossy`, the input is first decoded in memory with
        python. Defaults to `utf8`.
    low_memory
        Reduce memory pressure at the expense of performance.
    rechunk
        Make sure that all columns are contiguous in memory by
        aggregating the chunks into a single array.
    skip_rows_after_header
        Skip this number of rows when the header is parsed.
    row_index_name
        Insert a row index column with the given name into the DataFrame as the first
        column. If set to `None` (default), no row index column is created.
    row_index_offset
        Start the row index at this offset. Cannot be negative.
        Only used if `row_index_name` is set.
    sample_size
        Set the sample size. This is used to sample statistics to estimate the
        allocation needed.

        .. deprecated:: 1.10.0
            Is a no-op.
    eol_char
        Single byte end of line character (default: `\n`). When encountering a file
        with windows line endings (`\r\n`), one can go with the default `\n`. The extra
        `\r` will be removed when processed.
    raise_if_empty
        When there is no data in the source,`NoDataError` is raised. If this parameter
        is set to False, `None` will be returned from `next_batches(n)` instead.
    truncate_ragged_lines
        Truncate lines that are longer than the schema.
    decimal_comma
        Parse floats using a comma as the decimal separator instead of a period.

    Returns
    -------
    BatchedCsvReader

    See Also
    --------
    scan_csv : Lazily read from a CSV file or multiple files via glob patterns.

    Examples
    --------
    >>> reader = pl.read_csv_batched(
    ...     "./pdsh/tables_scale_100/lineitem.tbl",
    ...     separator="|",
    ...     try_parse_dates=True,
    ... )  # doctest: +SKIP
    >>> batches = reader.next_batches(5)  # doctest: +SKIP
    >>> for df in batches:  # doctest: +SKIP
    ...     print(df)

    Read big CSV file in batches and write a CSV file for each "group" of interest.

    >>> seen_groups = set()
    >>> reader = pl.read_csv_batched("big_file.csv")  # doctest: +SKIP
    >>> batches = reader.next_batches(100)  # doctest: +SKIP

    >>> while batches:  # doctest: +SKIP
    ...     df_current_batches = pl.concat(batches)
    ...     partition_dfs = df_current_batches.partition_by("group", as_dict=True)
    ...
    ...     for group, df in partition_dfs.items():
    ...         if group in seen_groups:
    ...             with open(f"./data/{group}.csv", "a") as fh:
    ...                 fh.write(df.write_csv(file=None, include_header=False))
    ...         else:
    ...             df.write_csv(file=f"./data/{group}.csv", include_header=True)
    ...         seen_groups.add(group)
    ...
    ...     batches = reader.next_batches(100)
    """
    projection, columns = parse_columns_arg(columns)

    if columns and not has_header:
        for column in columns:
            if not column.startswith("column_"):
                msg = (
                    "specified column names do not start with 'column_',"
                    " but autogenerated header names were requested"
                )
                raise ValueError(msg)

    if projection and schema_overrides and isinstance(schema_overrides, list):
        if len(projection) < len(schema_overrides):
            msg = "more schema overrides are specified than there are selected columns"
            raise ValueError(msg)

        # Fix list of dtypes when used together with projection as polars CSV reader
        # wants a list of dtypes for the x first columns before it does the projection.
        dtypes_list: list[PolarsDataType] = [String] * (max(projection) + 1)

        for idx, column_idx in enumerate(projection):
            if idx < len(schema_overrides):
                dtypes_list[column_idx] = schema_overrides[idx]

        schema_overrides = dtypes_list

    if columns and schema_overrides and isinstance(schema_overrides, list):
        if len(columns) < len(schema_overrides):
            msg = "more schema overrides are specified than there are selected columns"
            raise ValueError(msg)

        # Map list of dtypes when used together with selected columns as a dtypes dict
        # so the dtypes are applied to the correct column instead of the first x
        # columns.
        schema_overrides = dict(zip(columns, schema_overrides))

    if new_columns and schema_overrides and isinstance(schema_overrides, dict):
        current_columns = None

        # As new column names are not available yet while parsing the CSV file, rename
        # column names in dtypes to old names (if possible) so they can be used during
        # CSV parsing.
        if columns:
            if len(columns) < len(new_columns):
                msg = "more new column names are specified than there are selected columns"
                raise ValueError(msg)

            # Get column names of requested columns.
            current_columns = columns[0 : len(new_columns)]
        elif not has_header:
            # When there are no header, column names are autogenerated (and known).

            if projection:
                if columns and len(columns) < len(new_columns):
                    msg = "more new column names are specified than there are selected columns"
                    raise ValueError(msg)
                # Convert column indices from projection to 'column_1', 'column_2', ...
                # column names.
                current_columns = [
                    f"column_{column_idx + 1}" for column_idx in projection
                ]
            else:
                # Generate autogenerated 'column_1', 'column_2', ... column names for
                # new column names.
                current_columns = [
                    f"column_{column_idx}"
                    for column_idx in range(1, len(new_columns) + 1)
                ]
        else:
            # When a header is present, column names are not known yet.

            if len(schema_overrides) <= len(new_columns):
                # If dtypes dictionary contains less or same amount of values than new
                # column names a list of dtypes can be created if all listed column
                # names in dtypes dictionary appear in the first consecutive new column
                # names.
                dtype_list = [
                    schema_overrides[new_column_name]
                    for new_column_name in new_columns[0 : len(schema_overrides)]
                    if new_column_name in schema_overrides
                ]

                if len(dtype_list) == len(schema_overrides):
                    schema_overrides = dtype_list

        if current_columns and isinstance(schema_overrides, dict):
            new_to_current = dict(zip(new_columns, current_columns))
            # Change new column names to current column names in dtype.
            schema_overrides = {
                new_to_current.get(column_name, column_name): column_dtype
                for column_name, column_dtype in schema_overrides.items()
            }

    return BatchedCsvReader(
        source,
        has_header=has_header,
        columns=columns if columns else projection,
        separator=separator,
        comment_prefix=comment_prefix,
        quote_char=quote_char,
        skip_rows=skip_rows,
        schema_overrides=schema_overrides,
        null_values=null_values,
        missing_utf8_is_empty_string=missing_utf8_is_empty_string,
        ignore_errors=ignore_errors,
        try_parse_dates=try_parse_dates,
        n_threads=n_threads,
        infer_schema_length=infer_schema_length,
        batch_size=batch_size,
        n_rows=n_rows,
        encoding=encoding if encoding == "utf8-lossy" else "utf8",
        low_memory=low_memory,
        rechunk=rechunk,
        skip_rows_after_header=skip_rows_after_header,
        row_index_name=row_index_name,
        row_index_offset=row_index_offset,
        eol_char=eol_char,
        new_columns=new_columns,
        raise_if_empty=raise_if_empty,
        truncate_ragged_lines=truncate_ragged_lines,
        decimal_comma=decimal_comma,
    )


@deprecate_renamed_parameter("dtypes", "schema_overrides", version="0.20.31")
@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
def scan_csv(
    source: (
        str
        | Path
        | IO[str]
        | IO[bytes]
        | bytes
        | list[str]
        | list[Path]
        | list[IO[str]]
        | list[IO[bytes]]
        | list[bytes]
    ),
    *,
    has_header: bool = True,
    separator: str = ",",
    comment_prefix: str | None = None,
    quote_char: str | None = '"',
    skip_rows: int = 0,
    skip_lines: int = 0,
    schema: SchemaDict | None = None,
    schema_overrides: SchemaDict | Sequence[PolarsDataType] | None = None,
    null_values: str | Sequence[str] | dict[str, str] | None = None,
    missing_utf8_is_empty_string: bool = False,
    ignore_errors: bool = False,
    cache: bool = True,
    with_column_names: Callable[[list[str]], list[str]] | None = None,
    infer_schema: bool = True,
    infer_schema_length: int | None = N_INFER_DEFAULT,
    n_rows: int | None = None,
    encoding: CsvEncoding = "utf8",
    low_memory: bool = False,
    rechunk: bool = False,
    skip_rows_after_header: int = 0,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    try_parse_dates: bool = False,
    eol_char: str = "\n",
    new_columns: Sequence[str] | None = None,
    raise_if_empty: bool = True,
    truncate_ragged_lines: bool = False,
    decimal_comma: bool = False,
    glob: bool = True,
    storage_options: dict[str, Any] | None = None,
    credential_provider: CredentialProviderFunction | Literal["auto"] | None = "auto",
    retries: int = 2,
    file_cache_ttl: int | None = None,
    include_file_paths: str | None = None,
) -> LazyFrame:
    r"""
    Lazily read from a CSV file or multiple files via glob patterns.

    This allows the query optimizer to push down predicates and
    projections to the scan level, thereby potentially reducing
    memory overhead.

    .. versionchanged:: 0.20.31
        The `dtypes` parameter was renamed `schema_overrides`.
    .. versionchanged:: 0.20.4
        * The `row_count_name` parameter was renamed `row_index_name`.
        * The `row_count_offset` parameter was renamed `row_index_offset`.

    Parameters
    ----------
    source
        Path(s) to a file or directory
        When needing to authenticate for scanning cloud locations, see the
        `storage_options` parameter.
    has_header
        Indicate if the first row of the dataset is a header or not. If set to False,
        column names will be autogenerated in the following format: `column_x`, with
        `x` being an enumeration over every column in the dataset, starting at 1.
    separator
        Single byte character to use as separator in the file.
    comment_prefix
        A string used to indicate the start of a comment line. Comment lines are skipped
        during parsing. Common examples of comment prefixes are `#` and `//`.
    quote_char
        Single byte character used for csv quoting, default = `"`.
        Set to None to turn off special handling and escaping of quotes.
    skip_rows
        Start reading after ``skip_rows`` rows. The header will be parsed at this
        offset. Note that we respect CSV escaping/comments when skipping rows.
        If you want to skip by newline char only, use `skip_lines`.
    skip_lines
        Start reading after `skip_lines` lines. The header will be parsed at this
        offset. Note that CSV escaping will not be respected when skipping lines.
        If you want to skip valid CSV rows, use ``skip_rows``.
    schema
        Provide the schema. This means that polars doesn't do schema inference.
        This argument expects the complete schema, whereas `schema_overrides` can be
        used to partially overwrite a schema. Note that the order of the columns in
        the provided `schema` must match the order of the columns in the CSV being read.
    schema_overrides
        Overwrite dtypes during inference; should be a {colname:dtype,} dict or,
        if providing a list of strings to `new_columns`, a list of dtypes of
        the same length.
    null_values
        Values to interpret as null values. You can provide a:

        - `str`: All values equal to this string will be null.
        - `List[str]`: All values equal to any string in this list will be null.
        - `Dict[str, str]`: A dictionary that maps column name to a
          null value string.

    missing_utf8_is_empty_string
        By default a missing value is considered to be null; if you would prefer missing
        utf8 values to be treated as the empty string you can set this param True.
    ignore_errors
        Try to keep reading lines if some lines yield errors.
        First try `infer_schema=False` to read all columns as
        `pl.String` to check which values might cause an issue.
    cache
        Cache the result after reading.
    with_column_names
        Apply a function over the column names just in time (when they are determined);
        this function will receive (and should return) a list of column names.
    infer_schema
        When `True`, the schema is inferred from the data using the first
        `infer_schema_length` rows.
        When `False`, the schema is not inferred and will be `pl.String` if not
        specified in `schema` or `schema_overrides`.
    infer_schema_length
        The maximum number of rows to scan for schema inference.
        If set to `None`, the full data may be scanned *(this is slow)*.
        Set `infer_schema=False` to read all columns as `pl.String`.
    n_rows
        Stop reading from CSV file after reading `n_rows`.
    encoding : {'utf8', 'utf8-lossy'}
        Lossy means that invalid utf8 values are replaced with `�`
        characters. Defaults to "utf8".
    low_memory
        Reduce memory pressure at the expense of performance.
    rechunk
        Reallocate to contiguous memory when all chunks/ files are parsed.
    skip_rows_after_header
        Skip this number of rows when the header is parsed.
    row_index_name
        If not None, this will insert a row index column with the given name into
        the DataFrame.
    row_index_offset
        Offset to start the row index column (only used if the name is set).
    try_parse_dates
        Try to automatically parse dates. Most ISO8601-like formats
        can be inferred, as well as a handful of others. If this does not succeed,
        the column remains of data type `pl.String`.
    eol_char
        Single byte end of line character (default: `\n`). When encountering a file
        with windows line endings (`\r\n`), one can go with the default `\n`. The extra
        `\r` will be removed when processed.
    new_columns
        Provide an explicit list of string column names to use (for example, when
        scanning a headerless CSV file). If the given list is shorter than the width of
        the DataFrame the remaining columns will have their original name.
    raise_if_empty
        When there is no data in the source, `NoDataError` is raised. If this parameter
        is set to False, an empty LazyFrame (with no columns) is returned instead.
    truncate_ragged_lines
        Truncate lines that are longer than the schema.
    decimal_comma
        Parse floats using a comma as the decimal separator instead of a period.
    glob
        Expand path given via globbing rules.
    storage_options
        Options that indicate how to connect to a cloud provider.

        The cloud providers currently supported are AWS, GCP, and Azure.
        See supported keys here:

        * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
        * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
        * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.

        If `storage_options` is not provided, Polars will try to infer the information
        from environment variables.
    credential_provider
        Provide a function that can be called to provide cloud storage
        credentials. The function is expected to return a dictionary of
        credential keys along with an optional credential expiry time.

        .. warning::
            This functionality is considered **unstable**. It may be changed
            at any point without it being considered a breaking change.
    retries
        Number of retries if accessing a cloud instance fails.
    file_cache_ttl
        Amount of time to keep downloaded cloud files since their last access time,
        in seconds. Uses the `POLARS_FILE_CACHE_TTL` environment variable
        (which defaults to 1 hour) if not given.
    include_file_paths
        Include the path of the source file(s) as a column with this name.

    Returns
    -------
    LazyFrame

    See Also
    --------
    read_csv : Read a CSV file into a DataFrame.

    Examples
    --------
    >>> import pathlib
    >>>
    >>> (
    ...     pl.scan_csv("my_long_file.csv")  # lazy, doesn't do a thing
    ...     .select(
    ...         ["a", "c"]
    ...     )  # select only 2 columns (other columns will not be read)
    ...     .filter(
    ...         pl.col("a") > 10
    ...     )  # the filter is pushed down the scan, so less data is read into memory
    ...     .head(100)  # constrain number of returned results to 100
    ... )  # doctest: +SKIP

    We can use `with_column_names` to modify the header before scanning:

    >>> df = pl.DataFrame(
    ...     {"BrEeZaH": [1, 2, 3, 4], "LaNgUaGe": ["is", "hard", "to", "read"]}
    ... )
    >>> path: pathlib.Path = dirpath / "mydf.csv"
    >>> df.write_csv(path)
    >>> pl.scan_csv(
    ...     path, with_column_names=lambda cols: [col.lower() for col in cols]
    ... ).collect()
    shape: (4, 2)
    ┌─────────┬──────────┐
    │ breezah ┆ language │
    │ ---     ┆ ---      │
    │ i64     ┆ str      │
    ╞═════════╪══════════╡
    │ 1       ┆ is       │
    │ 2       ┆ hard     │
    │ 3       ┆ to       │
    │ 4       ┆ read     │
    └─────────┴──────────┘

    You can also simply replace column names (or provide them if the file has none)
    by passing a list of new column names to the `new_columns` parameter:

    >>> df.write_csv(path)
    >>> pl.scan_csv(
    ...     path,
    ...     new_columns=["idx", "txt"],
    ...     schema_overrides=[pl.UInt16, pl.String],
    ... ).collect()
    shape: (4, 2)
    ┌─────┬──────┐
    │ idx ┆ txt  │
    │ --- ┆ ---  │
    │ u16 ┆ str  │
    ╞═════╪══════╡
    │ 1   ┆ is   │
    │ 2   ┆ hard │
    │ 3   ┆ to   │
    │ 4   ┆ read │
    └─────┴──────┘
    """
    if schema_overrides is not None and not isinstance(
        schema_overrides, (dict, Sequence)
    ):
        msg = "`schema_overrides` should be of type list or dict"
        raise TypeError(msg)

    if not new_columns and isinstance(schema_overrides, Sequence):
        msg = f"expected 'schema_overrides' dict, found {qualified_type_name(schema_overrides)!r}"
        raise TypeError(msg)
    elif new_columns:
        if with_column_names:
            msg = "cannot set both `with_column_names` and `new_columns`; mutually exclusive"
            raise ValueError(msg)
        if schema_overrides and isinstance(schema_overrides, Sequence):
            schema_overrides = dict(zip(new_columns, schema_overrides))

        # wrap new column names as a callable
        def with_column_names(cols: list[str]) -> list[str]:
            if len(cols) > len(new_columns):
                return new_columns + cols[len(new_columns) :]  # type: ignore[operator]
            else:
                return new_columns  # type: ignore[return-value]

    _check_arg_is_1byte("separator", separator, can_be_empty=False)
    _check_arg_is_1byte("quote_char", quote_char, can_be_empty=True)

    if isinstance(source, (str, Path)):
        source = normalize_filepath(source, check_not_directory=False)
    elif is_path_or_str_sequence(source, allow_str=False):
        source = [
            normalize_filepath(source, check_not_directory=False) for source in source
        ]

    if not infer_schema:
        infer_schema_length = 0

    credential_provider_builder = _init_credential_provider_builder(
        credential_provider, source, storage_options, "scan_csv"
    )
    del credential_provider

    return _scan_csv_impl(
        source,
        has_header=has_header,
        separator=separator,
        comment_prefix=comment_prefix,
        quote_char=quote_char,
        skip_rows=skip_rows,
        skip_lines=skip_lines,
        schema_overrides=schema_overrides,  # type: ignore[arg-type]
        schema=schema,
        null_values=null_values,
        missing_utf8_is_empty_string=missing_utf8_is_empty_string,
        ignore_errors=ignore_errors,
        cache=cache,
        with_column_names=with_column_names,
        infer_schema_length=infer_schema_length,
        n_rows=n_rows,
        low_memory=low_memory,
        rechunk=rechunk,
        skip_rows_after_header=skip_rows_after_header,
        encoding=encoding,
        row_index_name=row_index_name,
        row_index_offset=row_index_offset,
        try_parse_dates=try_parse_dates,
        eol_char=eol_char,
        raise_if_empty=raise_if_empty,
        truncate_ragged_lines=truncate_ragged_lines,
        decimal_comma=decimal_comma,
        glob=glob,
        retries=retries,
        storage_options=storage_options,
        credential_provider=credential_provider_builder,
        file_cache_ttl=file_cache_ttl,
        include_file_paths=include_file_paths,
    )


def _scan_csv_impl(
    source: str
    | IO[str]
    | IO[bytes]
    | bytes
    | list[str]
    | list[Path]
    | list[IO[str]]
    | list[IO[bytes]]
    | list[bytes],
    *,
    has_header: bool = True,
    separator: str = ",",
    comment_prefix: str | None = None,
    quote_char: str | None = '"',
    skip_rows: int = 0,
    skip_lines: int = 0,
    schema: SchemaDict | None = None,
    schema_overrides: SchemaDict | None = None,
    null_values: str | Sequence[str] | dict[str, str] | None = None,
    missing_utf8_is_empty_string: bool = False,
    ignore_errors: bool = False,
    cache: bool = True,
    with_column_names: Callable[[list[str]], list[str]] | None = None,
    infer_schema_length: int | None = N_INFER_DEFAULT,
    n_rows: int | None = None,
    encoding: CsvEncoding = "utf8",
    low_memory: bool = False,
    rechunk: bool = False,
    skip_rows_after_header: int = 0,
    row_index_name: str | None = None,
    row_index_offset: int = 0,
    try_parse_dates: bool = False,
    eol_char: str = "\n",
    raise_if_empty: bool = True,
    truncate_ragged_lines: bool = True,
    decimal_comma: bool = False,
    glob: bool = True,
    storage_options: dict[str, Any] | None = None,
    credential_provider: CredentialProviderBuilder | None = None,
    retries: int = 2,
    file_cache_ttl: int | None = None,
    include_file_paths: str | None = None,
) -> LazyFrame:
    dtype_list: list[tuple[str, PolarsDataType]] | None = None
    if schema_overrides is not None:
        if not isinstance(schema_overrides, dict):
            msg = "expected 'schema_overrides' dict, found 'list'"
            raise TypeError(msg)
        dtype_list = []
        for k, v in schema_overrides.items():
            dtype_list.append((k, parse_into_dtype(v)))
    processed_null_values = _process_null_values(null_values)

    if isinstance(source, list):
        sources = source
        source = None  # type: ignore[assignment]
    else:
        sources = []

    if storage_options:
        storage_options = list(storage_options.items())  # type: ignore[assignment]
    else:
        # Handle empty dict input
        storage_options = None

    pylf = PyLazyFrame.new_from_csv(
        source,
        sources,
        separator=separator,
        has_header=has_header,
        ignore_errors=ignore_errors,
        skip_rows=skip_rows,
        skip_lines=skip_lines,
        n_rows=n_rows,
        cache=cache,
        overwrite_dtype=dtype_list,
        low_memory=low_memory,
        comment_prefix=comment_prefix,
        quote_char=quote_char,
        null_values=processed_null_values,
        missing_utf8_is_empty_string=missing_utf8_is_empty_string,
        infer_schema_length=infer_schema_length,
        with_schema_modify=with_column_names,
        rechunk=rechunk,
        skip_rows_after_header=skip_rows_after_header,
        encoding=encoding,
        row_index=parse_row_index_args(row_index_name, row_index_offset),
        try_parse_dates=try_parse_dates,
        eol_char=eol_char,
        raise_if_empty=raise_if_empty,
        truncate_ragged_lines=truncate_ragged_lines,
        decimal_comma=decimal_comma,
        glob=glob,
        schema=schema,
        cloud_options=storage_options,
        credential_provider=credential_provider,
        retries=retries,
        file_cache_ttl=file_cache_ttl,
        include_file_paths=include_file_paths,
    )
    return wrap_ldf(pylf)
