"""Basic introspection of modules."""

from __future__ import annotations

import importlib
import inspect
import os
import pkgutil
import queue
import sys
from multiprocessing import Queue, get_context
from types import ModuleType


class ModuleProperties:
    # Note that all __init__ args must have default values
    def __init__(
        self,
        name: str = "",
        file: str | None = None,
        path: list[str] | None = None,
        all: list[str] | None = None,
        is_c_module: bool = False,
        subpackages: list[str] | None = None,
    ) -> None:
        self.name = name  # __name__ attribute
        self.file = file  # __file__ attribute
        self.path = path  # __path__ attribute
        self.all = all  # __all__ attribute
        self.is_c_module = is_c_module
        self.subpackages = subpackages or []


def is_c_module(module: ModuleType) -> bool:
    if module.__dict__.get("__file__") is None:
        # Could be a namespace package. These must be handled through
        # introspection, since there is no source file.
        return True
    return os.path.splitext(module.__dict__["__file__"])[-1] in [".so", ".pyd", ".dll"]


def is_pyc_only(file: str | None) -> bool:
    return bool(file and file.endswith(".pyc") and not os.path.exists(file[:-1]))


class InspectError(Exception):
    pass


def get_package_properties(package_id: str) -> ModuleProperties:
    """Use runtime introspection to get information about a module/package."""
    try:
        package = importlib.import_module(package_id)
    except BaseException as e:
        raise InspectError(str(e)) from e
    name = getattr(package, "__name__", package_id)
    file = getattr(package, "__file__", None)
    path: list[str] | None = getattr(package, "__path__", None)
    if not isinstance(path, list):
        path = None
    pkg_all = getattr(package, "__all__", None)
    if pkg_all is not None:
        try:
            pkg_all = list(pkg_all)
        except Exception:
            pkg_all = None
    is_c = is_c_module(package)

    if path is None:
        # Object has no path; this means it's either a module inside a package
        # (and thus no sub-packages), or it could be a C extension package.
        if is_c:
            # This is a C extension module, now get the list of all sub-packages
            # using the inspect module
            subpackages = [
                package.__name__ + "." + name
                for name, val in inspect.getmembers(package)
                if inspect.ismodule(val) and val.__name__ == package.__name__ + "." + name
            ]
        else:
            # It's a module inside a package.  There's nothing else to walk/yield.
            subpackages = []
    else:
        all_packages = pkgutil.walk_packages(
            path, prefix=package.__name__ + ".", onerror=lambda r: None
        )
        subpackages = [qualified_name for importer, qualified_name, ispkg in all_packages]
    return ModuleProperties(
        name=name, file=file, path=path, all=pkg_all, is_c_module=is_c, subpackages=subpackages
    )


def worker(tasks: Queue[str], results: Queue[str | ModuleProperties], sys_path: list[str]) -> None:
    """The main loop of a worker introspection process."""
    sys.path = sys_path
    while True:
        mod = tasks.get()
        try:
            prop = get_package_properties(mod)
        except InspectError as e:
            results.put(str(e))
            continue
        results.put(prop)


class ModuleInspect:
    """Perform runtime introspection of modules in a separate process.

    Reuse the process for multiple modules for efficiency. However, if there is an
    error, retry using a fresh process to avoid cross-contamination of state between
    modules.

    We use a separate process to isolate us from many side effects. For example, the
    import of a module may kill the current process, and we want to recover from that.

    Always use in a with statement for proper clean-up:

      with ModuleInspect() as m:
          p = m.get_package_properties('urllib.parse')
    """

    def __init__(self) -> None:
        self._start()

    def _start(self) -> None:
        if sys.platform == "linux":
            ctx = get_context("forkserver")
        else:
            ctx = get_context("spawn")
        self.tasks: Queue[str] = ctx.Queue()
        self.results: Queue[ModuleProperties | str] = ctx.Queue()
        self.proc = ctx.Process(target=worker, args=(self.tasks, self.results, sys.path))
        self.proc.start()
        self.counter = 0  # Number of successful roundtrips

    def close(self) -> None:
        """Free any resources used."""
        self.proc.terminate()

    def get_package_properties(self, package_id: str) -> ModuleProperties:
        """Return some properties of a module/package using runtime introspection.

        Raise InspectError if the target couldn't be imported.
        """
        self.tasks.put(package_id)
        res = self._get_from_queue()
        if res is None:
            # The process died; recover and report error.
            self._start()
            raise InspectError(f"Process died when importing {package_id!r}")
        if isinstance(res, str):
            # Error importing module
            if self.counter > 0:
                # Also try with a fresh process. Maybe one of the previous imports has
                # corrupted some global state.
                self.close()
                self._start()
                return self.get_package_properties(package_id)
            raise InspectError(res)
        self.counter += 1
        return res

    def _get_from_queue(self) -> ModuleProperties | str | None:
        """Get value from the queue.

        Return the value read from the queue, or None if the process unexpectedly died.
        """
        max_iter = 600
        n = 0
        while True:
            if n == max_iter:
                raise RuntimeError("Timeout waiting for subprocess")
            try:
                return self.results.get(timeout=0.05)
            except queue.Empty:
                if not self.proc.is_alive():
                    return None
            n += 1

    def __enter__(self) -> ModuleInspect:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()
