"""Server for mypy daemon mode.

This implements a daemon process which keeps useful state in memory
to enable fine-grained incremental reprocessing of changes.
"""

from __future__ import annotations

import argparse
import base64
import io
import json
import os
import pickle
import subprocess
import sys
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from typing import AbstractSet, Any, Callable, Final, List, Sequence, Tuple
from typing_extensions import TypeAlias as _TypeAlias

import mypy.build
import mypy.errors
import mypy.main
from mypy.dmypy_util import WriteToConn, receive, send
from mypy.find_sources import InvalidSourceList, create_source_list
from mypy.fscache import FileSystemCache
from mypy.fswatcher import FileData, FileSystemWatcher
from mypy.inspections import InspectionEngine
from mypy.ipc import IPCServer
from mypy.modulefinder import BuildSource, FindModuleCache, SearchPaths, compute_search_paths
from mypy.options import Options
from mypy.server.update import FineGrainedBuildManager, refresh_suppressed_submodules
from mypy.suggestions import SuggestionEngine, SuggestionFailure
from mypy.typestate import reset_global_state
from mypy.util import FancyFormatter, count_stats
from mypy.version import __version__

MEM_PROFILE: Final = False  # If True, dump memory profile after initialization

if sys.platform == "win32":
    from subprocess import STARTUPINFO

    def daemonize(
        options: Options, status_file: str, timeout: int | None = None, log_file: str | None = None
    ) -> int:
        """Create the daemon process via "dmypy daemon" and pass options via command line

        When creating the daemon grandchild, we create it in a new console, which is
        started hidden. We cannot use DETACHED_PROCESS since it will cause console windows
        to pop up when starting. See
        https://github.com/python/cpython/pull/4150#issuecomment-340215696
        for more on why we can't have nice things.

        It also pickles the options to be unpickled by mypy.
        """
        command = [sys.executable, "-m", "mypy.dmypy", "--status-file", status_file, "daemon"]
        pickled_options = pickle.dumps(options.snapshot())
        command.append(f'--options-data="{base64.b64encode(pickled_options).decode()}"')
        if timeout:
            command.append(f"--timeout={timeout}")
        if log_file:
            command.append(f"--log-file={log_file}")
        info = STARTUPINFO()
        info.dwFlags = 0x1  # STARTF_USESHOWWINDOW aka use wShowWindow's value
        info.wShowWindow = 0  # SW_HIDE aka make the window invisible
        try:
            subprocess.Popen(command, creationflags=0x10, startupinfo=info)  # CREATE_NEW_CONSOLE
            return 0
        except OSError as e:
            # Popen raises OSError, not CalledProcessError, if spawning fails.
            return e.errno or 1

else:

    def _daemonize_cb(func: Callable[[], None], log_file: str | None = None) -> int:
        """Arrange to call func() in a grandchild of the current process.

        Return 0 for success, exit status for failure, negative if
        subprocess killed by signal.
        """
        # See https://stackoverflow.com/questions/473620/how-do-you-create-a-daemon-in-python
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid:
            # Parent process: wait for child in case things go bad there.
            npid, sts = os.waitpid(pid, 0)
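            # os.waitpid() packs both values into one integer: the low byte is
            # the signal that killed the child (0 if none) and the next byte is
            # the exit status. For example (values hypothetical), a raw status
            # of 0x0200 decodes to exit status 2; 0x0009 means signal 9.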
            sig = sts & 0xFF
            if sig:
                print("Child killed by signal", sig)
                return -sig
            sts = sts >> 8
            if sts:
                print("Child exit status", sts)
            return sts
        # Child process: do a bunch of UNIX stuff and then fork a grandchild.
        try:
            os.setsid()  # Detach controlling terminal
            os.umask(0o27)
            devnull = os.open("/dev/null", os.O_RDWR)
            os.dup2(devnull, 0)
            os.dup2(devnull, 1)
            os.dup2(devnull, 2)
            os.close(devnull)
            pid = os.fork()
            if pid:
                # Child is done, exit to parent.
                os._exit(0)
            # Grandchild: run the server.
            if log_file:
                sys.stdout = sys.stderr = open(log_file, "a", buffering=1)
                fd = sys.stdout.fileno()
                os.dup2(fd, 2)
                os.dup2(fd, 1)
            func()
        finally:
            # Make sure we never get back into the caller.
            os._exit(1)

    def daemonize(
        options: Options, status_file: str, timeout: int | None = None, log_file: str | None = None
    ) -> int:
        """Run the mypy daemon in a grandchild of the current process

        Return 0 for success, exit status for failure, negative if
        subprocess killed by signal.
        """
        return _daemonize_cb(Server(options, status_file, timeout).serve, log_file)


# Server code.

CONNECTION_NAME: Final = "dmypy"


def process_start_options(flags: list[str], allow_sources: bool) -> Options:
    _, options = mypy.main.process_options(
        ["-i"] + flags, require_targets=False, server_options=True
    )
    if options.report_dirs:
        print("dmypy: Ignoring report generation settings. Start/restart cannot generate reports.")
    if options.junit_xml:
        print(
            "dmypy: Ignoring report generation settings. "
            "Start/restart does not support --junit-xml. Pass it to check/recheck instead"
        )
        options.junit_xml = None
    if not options.incremental:
        sys.exit("dmypy: start/restart should not disable incremental mode")
    if options.follow_imports not in ("skip", "error", "normal"):
        sys.exit("dmypy: follow-imports=silent not supported")
    return options


def ignore_suppressed_imports(module: str) -> bool:
    """Can we skip looking for newly unsuppressed imports to module?"""
    # Various submodules of 'encodings' can be suppressed, since it
    # uses module-level '__getattr__'. Skip them since there are many
    # of them, and following imports to them is kind of pointless.
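    # For example, ignore_suppressed_imports("encodings.utf_8") returns True.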
    return module.startswith("encodings.")


ModulePathPair: _TypeAlias = Tuple[str, str]
ModulePathPairs: _TypeAlias = List[ModulePathPair]
ChangesAndRemovals: _TypeAlias = Tuple[ModulePathPairs, ModulePathPairs]
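# Shape illustration (module names hypothetical): a ChangesAndRemovals value
# could be ([("pkg.mod", "pkg/mod.py")], [("old.mod", "old/mod.py")]), i.e.
# changed/added (module, path) pairs first, removed pairs second.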


class Server:
    # NOTE: the instance is constructed in the parent process but
    # serve() is called in the grandchild (by daemonize()).

    def __init__(self, options: Options, status_file: str, timeout: int | None = None) -> None:
        """Initialize the server with the desired mypy flags."""
        self.options = options
        # Snapshot the options info before we muck with it, to detect changes
        self.options_snapshot = options.snapshot()
        self.timeout = timeout
        self.fine_grained_manager: FineGrainedBuildManager | None = None

        if os.path.isfile(status_file):
            os.unlink(status_file)

        self.fscache = FileSystemCache()

        options.raise_exceptions = True
        options.incremental = True
        options.fine_grained_incremental = True
        options.show_traceback = True
        if options.use_fine_grained_cache:
            # Using the fine-grained cache implies generating and caring
            # about the fine-grained cache
            options.cache_fine_grained = True
        else:
            options.cache_dir = os.devnull
        # Fine-grained incremental doesn't support general partial types
        # (details in https://github.com/python/mypy/issues/4492)
        options.local_partial_types = True
        self.status_file = status_file

        # Since the object is created in the parent process we can check
        # the output terminal options here.
        self.formatter = FancyFormatter(sys.stdout, sys.stderr, options.hide_error_codes)

    def _response_metadata(self) -> dict[str, str]:
        py_version = f"{self.options.python_version[0]}_{self.options.python_version[1]}"
        return {"platform": self.options.platform, "python_version": py_version}

    def serve(self) -> None:
        """Serve requests, synchronously (no thread or fork)."""

        command = None
        server = IPCServer(CONNECTION_NAME, self.timeout)
        orig_stdout = sys.stdout
        orig_stderr = sys.stderr

        try:
            with open(self.status_file, "w") as f:
                json.dump({"pid": os.getpid(), "connection_name": server.connection_name}, f)
                f.write("\n")  # I like my JSON with a trailing newline
            while True:
                with server:
                    data = receive(server)
                    sys.stdout = WriteToConn(server, "stdout", sys.stdout.isatty())
                    sys.stderr = WriteToConn(server, "stderr", sys.stderr.isatty())
                    resp: dict[str, Any] = {}
                    if "command" not in data:
                        resp = {"error": "No command found in request"}
                    else:
                        command = data["command"]
                        if not isinstance(command, str):
                            resp = {"error": "Command is not a string"}
                        else:
                            command = data.pop("command")
                            try:
                                resp = self.run_command(command, data)
                            except Exception:
                                # If we are crashing, report the crash to the client
                                tb = traceback.format_exception(*sys.exc_info())
                                resp = {"error": "Daemon crashed!\n" + "".join(tb)}
                                resp.update(self._response_metadata())
                                resp["final"] = True
                                send(server, resp)
                                raise
                    resp["final"] = True
                    try:
                        resp.update(self._response_metadata())
                        send(server, resp)
                    except OSError:
                        pass  # Maybe the client hung up
                    if command == "stop":
                        reset_global_state()
                        sys.exit(0)
        finally:
            # Revert stdout/stderr so we can see any errors.
            sys.stdout = orig_stdout
            sys.stderr = orig_stderr

            # If the final command is something other than a clean
            # stop, remove the status file. (We can't just
            # simplify the logic and always remove the file, since
            # that could cause us to remove a future server's
            # status file.)
            if command != "stop":
                os.unlink(self.status_file)
            try:
                server.cleanup()  # try to remove the socket dir on Linux
            except OSError:
                pass
            exc_info = sys.exc_info()
            if exc_info[0] and exc_info[0] is not SystemExit:
                traceback.print_exception(*exc_info)

    def run_command(self, command: str, data: dict[str, object]) -> dict[str, object]:
        """Run a specific command from the registry."""
        key = "cmd_" + command
        method = getattr(self.__class__, key, None)
        if method is None:
            return {"error": f"Unrecognized command '{command}'"}
        else:
            if command not in {"check", "recheck", "run"}:
                # Only the above commands use some error formatting.
                del data["is_tty"]
                del data["terminal_width"]
            ret = method(self, **data)
            assert isinstance(ret, dict)
            return ret

    # Command functions (run in the server via RPC).

    def cmd_status(self, fswatcher_dump_file: str | None = None) -> dict[str, object]:
        """Return daemon status."""
        res: dict[str, object] = {}
        res.update(get_meminfo())
        if fswatcher_dump_file:
            data = self.fswatcher.dump_file_data() if hasattr(self, "fswatcher") else {}
            # Using .dumps and then writing was noticeably faster than using dump
            s = json.dumps(data)
            with open(fswatcher_dump_file, "w") as f:
                f.write(s)
        return res

    def cmd_stop(self) -> dict[str, object]:
        """Stop daemon."""
        # We need to remove the status file *before* we complete the
        # RPC. Otherwise a race condition exists where a subsequent
        # command can see a status file from a dying server and think
        # it is a live one.
        os.unlink(self.status_file)
        return {}

    def cmd_run(
        self,
        version: str,
        args: Sequence[str],
        export_types: bool,
        is_tty: bool,
        terminal_width: int,
    ) -> dict[str, object]:
        """Check a list of files, triggering a restart if needed."""
        stderr = io.StringIO()
        stdout = io.StringIO()
        try:
            # Process options can exit on improper arguments, so we need to catch that and
            # capture stderr so the client can report it
            with redirect_stderr(stderr):
                with redirect_stdout(stdout):
                    sources, options = mypy.main.process_options(
                        ["-i"] + list(args),
                        require_targets=True,
                        server_options=True,
                        fscache=self.fscache,
                        program="mypy-daemon",
                        header=argparse.SUPPRESS,
                    )
            # Signal that we need to restart if the options have changed
            if not options.compare_stable(self.options_snapshot):
                return {"restart": "configuration changed"}
            if __version__ != version:
                return {"restart": "mypy version changed"}
            if self.fine_grained_manager:
                manager = self.fine_grained_manager.manager
                start_plugins_snapshot = manager.plugins_snapshot
                _, current_plugins_snapshot = mypy.build.load_plugins(
                    options, manager.errors, sys.stdout, extra_plugins=()
                )
                if current_plugins_snapshot != start_plugins_snapshot:
                    return {"restart": "plugins changed"}
        except InvalidSourceList as err:
            return {"out": "", "err": str(err), "status": 2}
        except SystemExit as e:
            return {"out": stdout.getvalue(), "err": stderr.getvalue(), "status": e.code}
        return self.check(sources, export_types, is_tty, terminal_width)

    def cmd_check(
        self, files: Sequence[str], export_types: bool, is_tty: bool, terminal_width: int
    ) -> dict[str, object]:
        """Check a list of files."""
        try:
            sources = create_source_list(files, self.options, self.fscache)
        except InvalidSourceList as err:
            return {"out": "", "err": str(err), "status": 2}
        return self.check(sources, export_types, is_tty, terminal_width)

    def cmd_recheck(
        self,
        is_tty: bool,
        terminal_width: int,
        export_types: bool,
        remove: list[str] | None = None,
        update: list[str] | None = None,
    ) -> dict[str, object]:
        """Check the same list of files we checked most recently.

        If remove/update is given, they modify the previous list;
        if all are None, stat() is called for each file in the previous list.
        """
        t0 = time.time()
        if not self.fine_grained_manager:
            return {"error": "Command 'recheck' is only valid after a 'check' command"}
        sources = self.previous_sources
        if remove:
            removals = set(remove)
            sources = [s for s in sources if s.path and s.path not in removals]
        if update:
            # Sort list of file updates by extension, so *.pyi files are first.
            update.sort(key=lambda f: os.path.splitext(f)[1], reverse=True)

            known = {s.path for s in sources if s.path}
            added = [p for p in update if p not in known]
            try:
                added_sources = create_source_list(added, self.options, self.fscache)
            except InvalidSourceList as err:
                return {"out": "", "err": str(err), "status": 2}
            sources = sources + added_sources  # Make a copy!
        t1 = time.time()
        manager = self.fine_grained_manager.manager
        manager.log(f"fine-grained increment: cmd_recheck: {t1 - t0:.3f}s")
        old_export_types = self.options.export_types
        self.options.export_types = self.options.export_types or export_types
        if not self.following_imports():
            messages = self.fine_grained_increment(
                sources, remove, update, explicit_export_types=export_types
            )
        else:
            assert remove is None and update is None
            messages = self.fine_grained_increment_follow_imports(
                sources, explicit_export_types=export_types
            )
        res = self.increment_output(messages, sources, is_tty, terminal_width)
        self.flush_caches()
        self.update_stats(res)
        self.options.export_types = old_export_types
        return res

    def check(
        self, sources: list[BuildSource], export_types: bool, is_tty: bool, terminal_width: int
    ) -> dict[str, Any]:
        """Check using fine-grained incremental mode.

        If is_tty is True, format the output nicely with colors and a summary
        line (unless disabled in self.options), and pass terminal_width to the
        formatter.
        """
        old_export_types = self.options.export_types
        self.options.export_types = self.options.export_types or export_types
        if not self.fine_grained_manager:
            res = self.initialize_fine_grained(sources, is_tty, terminal_width)
        else:
            if not self.following_imports():
                messages = self.fine_grained_increment(sources, explicit_export_types=export_types)
            else:
                messages = self.fine_grained_increment_follow_imports(
                    sources, explicit_export_types=export_types
                )
            res = self.increment_output(messages, sources, is_tty, terminal_width)
        self.flush_caches()
        self.update_stats(res)
        self.options.export_types = old_export_types
        return res

    def flush_caches(self) -> None:
        self.fscache.flush()
        if self.fine_grained_manager:
            self.fine_grained_manager.flush_cache()

    def update_stats(self, res: dict[str, Any]) -> None:
        if self.fine_grained_manager:
            manager = self.fine_grained_manager.manager
            manager.dump_stats()
            res["stats"] = manager.stats
            manager.stats = {}

    def following_imports(self) -> bool:
        """Are we following imports?"""
        # TODO: What about silent?
        return self.options.follow_imports == "normal"

    def initialize_fine_grained(
        self, sources: list[BuildSource], is_tty: bool, terminal_width: int
    ) -> dict[str, Any]:
        self.fswatcher = FileSystemWatcher(self.fscache)
        t0 = time.time()
        self.update_sources(sources)
        t1 = time.time()
        try:
            result = mypy.build.build(sources=sources, options=self.options, fscache=self.fscache)
        except mypy.errors.CompileError as e:
            output = "".join(s + "\n" for s in e.messages)
            if e.use_stdout:
                out, err = output, ""
            else:
                out, err = "", output
            return {"out": out, "err": err, "status": 2}
        messages = result.errors
        self.fine_grained_manager = FineGrainedBuildManager(result)

        original_sources_len = len(sources)
        if self.following_imports():
            sources = find_all_sources_in_build(self.fine_grained_manager.graph, sources)
            self.update_sources(sources)

        self.previous_sources = sources

        # If we are using the fine-grained cache, build hasn't actually done
        # the typechecking on the updated files yet.
        # Run a fine-grained update starting from the cached data
        if result.used_cache:
            t2 = time.time()
            # Pull times and hashes out of the saved_cache and stick them into
            # the fswatcher, so we pick up the changes.
            for state in self.fine_grained_manager.graph.values():
                meta = state.meta
                if meta is None:
                    continue
                assert state.path is not None
                self.fswatcher.set_file_data(
                    state.path,
                    FileData(st_mtime=float(meta.mtime), st_size=meta.size, hash=meta.hash),
                )

            changed, removed = self.find_changed(sources)
            changed += self.find_added_suppressed(
                self.fine_grained_manager.graph,
                set(),
                self.fine_grained_manager.manager.search_paths,
            )

            # Find anything that has had its dependency list change
            for state in self.fine_grained_manager.graph.values():
                if not state.is_fresh():
                    assert state.path is not None
                    changed.append((state.id, state.path))

            t3 = time.time()
            # Run an update
            messages = self.fine_grained_manager.update(changed, removed)

            if self.following_imports():
                # We need to do another update to any new files found by following imports.
                messages = self.fine_grained_increment_follow_imports(sources)

            t4 = time.time()
            self.fine_grained_manager.manager.add_stats(
                update_sources_time=t1 - t0,
                build_time=t2 - t1,
                find_changes_time=t3 - t2,
                fg_update_time=t4 - t3,
                files_changed=len(removed) + len(changed),
            )

        else:
            # Stores the initial state of sources as a side effect.
            self.fswatcher.find_changed()

        if MEM_PROFILE:
            from mypy.memprofile import print_memory_profile

            print_memory_profile(run_gc=False)

        __, n_notes, __ = count_stats(messages)
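        # Report status 1 only when at least one message is not a note.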
        status = 1 if messages and n_notes < len(messages) else 0
        # We use explicit sources length to match the logic in non-incremental mode.
        messages = self.pretty_messages(messages, original_sources_len, is_tty, terminal_width)
        return {"out": "".join(s + "\n" for s in messages), "err": "", "status": status}

    def fine_grained_increment(
        self,
        sources: list[BuildSource],
        remove: list[str] | None = None,
        update: list[str] | None = None,
        explicit_export_types: bool = False,
    ) -> list[str]:
        """Perform a fine-grained type checking increment.

        If remove and update are None, determine changed paths by using
        fswatcher. Otherwise, assume that only these files have changes.

        Args:
            sources: sources passed on the command line
            remove: paths of files that have been removed
            update: paths of files that have been changed or created
            explicit_export_types: --export-types was passed in a check command
              (as opposed to being set in dmypy start)
        """
        assert self.fine_grained_manager is not None
        manager = self.fine_grained_manager.manager

        t0 = time.time()
        if remove is None and update is None:
            # Use the fswatcher to determine which files were changed
            # (updated or added) or removed.
            self.update_sources(sources)
            changed, removed = self.find_changed(sources)
        else:
            # Use the remove/update lists to update fswatcher.
            # This avoids calling stat() for unchanged files.
            changed, removed = self.update_changed(sources, remove or [], update or [])
        if explicit_export_types:
            # If --export-types is given, we need to force full re-checking of all
            # explicitly passed files, since we need to visit each expression.
            add_all_sources_to_changed(sources, changed)
        changed += self.find_added_suppressed(
            self.fine_grained_manager.graph, set(), manager.search_paths
        )
        manager.search_paths = compute_search_paths(sources, manager.options, manager.data_dir)
        t1 = time.time()
        manager.log(f"fine-grained increment: find_changed: {t1 - t0:.3f}s")
        messages = self.fine_grained_manager.update(changed, removed)
        t2 = time.time()
        manager.log(f"fine-grained increment: update: {t2 - t1:.3f}s")
        manager.add_stats(
            find_changes_time=t1 - t0,
            fg_update_time=t2 - t1,
            files_changed=len(removed) + len(changed),
        )

        self.previous_sources = sources
        return messages

    def fine_grained_increment_follow_imports(
        self, sources: list[BuildSource], explicit_export_types: bool = False
    ) -> list[str]:
        """Like fine_grained_increment, but follow imports."""
        t0 = time.time()

        # TODO: Support file events

        assert self.fine_grained_manager is not None
        fine_grained_manager = self.fine_grained_manager
        graph = fine_grained_manager.graph
        manager = fine_grained_manager.manager

        orig_modules = list(graph.keys())

        self.update_sources(sources)
        changed_paths = self.fswatcher.find_changed()
        manager.search_paths = compute_search_paths(sources, manager.options, manager.data_dir)

        t1 = time.time()
        manager.log(f"fine-grained increment: find_changed: {t1 - t0:.3f}s")

        seen = {source.module for source in sources}

        # Find changed modules reachable from roots (or in roots) already in graph.
        changed, new_files = self.find_reachable_changed_modules(
            sources, graph, seen, changed_paths
        )
        # Same as in fine_grained_increment().
        self.add_explicitly_new(sources, changed)
        if explicit_export_types:
            # Same as in fine_grained_increment().
            add_all_sources_to_changed(sources, changed)
        sources.extend(new_files)

        # Process changes directly reachable from roots.
        messages = fine_grained_manager.update(changed, [], followed=True)

        # Follow deps from changed modules (still within graph).
        worklist = changed.copy()
        while worklist:
            module = worklist.pop()
            if module[0] not in graph:
                continue
            sources2 = self.direct_imports(module, graph)
            # Filter anything already seen before. This prevents
            # infinite looping if there are any self edges. (Self
            # edges are maybe a bug, but...)
            sources2 = [source for source in sources2 if source.module not in seen]
            changed, new_files = self.find_reachable_changed_modules(
                sources2, graph, seen, changed_paths
            )
            self.update_sources(new_files)
            messages = fine_grained_manager.update(changed, [], followed=True)
            worklist.extend(changed)

        t2 = time.time()

        def refresh_file(module: str, path: str) -> list[str]:
            return fine_grained_manager.update([(module, path)], [], followed=True)

        for module_id, state in list(graph.items()):
            new_messages = refresh_suppressed_submodules(
                module_id, state.path, fine_grained_manager.deps, graph, self.fscache, refresh_file
            )
            if new_messages is not None:
                messages = new_messages

        t3 = time.time()

        # There may be new files that became available, currently treated as
        # suppressed imports. Process them.
        while True:
            new_unsuppressed = self.find_added_suppressed(graph, seen, manager.search_paths)
            if not new_unsuppressed:
                break
            new_files = [BuildSource(mod[1], mod[0], followed=True) for mod in new_unsuppressed]
            sources.extend(new_files)
            self.update_sources(new_files)
            messages = fine_grained_manager.update(new_unsuppressed, [], followed=True)

            for module_id, path in new_unsuppressed:
                new_messages = refresh_suppressed_submodules(
                    module_id, path, fine_grained_manager.deps, graph, self.fscache, refresh_file
                )
                if new_messages is not None:
                    messages = new_messages

        t4 = time.time()

        # Find all original modules in graph that were not reached -- they are deleted.
        to_delete = []
        for module_id in orig_modules:
            if module_id not in graph:
                continue
            if module_id not in seen:
                module_path = graph[module_id].path
                assert module_path is not None
                to_delete.append((module_id, module_path))
        if to_delete:
            messages = fine_grained_manager.update([], to_delete)

        fix_module_deps(graph)

        self.previous_sources = find_all_sources_in_build(graph)
        self.update_sources(self.previous_sources)

        # Store current file state as side effect
        self.fswatcher.find_changed()

        t5 = time.time()

        manager.log(f"fine-grained increment: update: {t5 - t1:.3f}s")
        manager.add_stats(
            find_changes_time=t1 - t0,
            fg_update_time=t2 - t1,
            refresh_suppressed_time=t3 - t2,
            find_added_suppressed_time=t4 - t3,
            cleanup_time=t5 - t4,
        )

        return messages

    def find_reachable_changed_modules(
        self,
        roots: list[BuildSource],
        graph: mypy.build.Graph,
        seen: set[str],
        changed_paths: AbstractSet[str],
    ) -> tuple[list[tuple[str, str]], list[BuildSource]]:
        """Follow imports within graph from given sources until hitting changed modules.

        If we find a changed module, we can't continue following imports as the imports
        may have changed.

        Args:
            roots: modules to start the search from
            graph: module graph to use for the search
            seen: modules we've seen before that won't be visited (mutated here!!)
            changed_paths: which paths have changed (stop the search there and return any found)

        Return (reachable changed modules encountered,
                unchanged files traversed that were not already in roots or seen).
        """
        changed = []
        new_files = []
        worklist = roots.copy()
        seen.update(source.module for source in worklist)
        while worklist:
            nxt = worklist.pop()
            if nxt.module not in seen:
                seen.add(nxt.module)
                new_files.append(nxt)
            if nxt.path in changed_paths:
                assert nxt.path is not None  # TODO
                changed.append((nxt.module, nxt.path))
            elif nxt.module in graph:
                state = graph[nxt.module]
                for dep in state.dependencies:
                    if dep not in seen:
                        # seen is updated when the module is popped off the
                        # worklist, so that new_files can record it there.
                        worklist.append(BuildSource(graph[dep].path, graph[dep].id, followed=True))
        return changed, new_files

    def direct_imports(
        self, module: tuple[str, str], graph: mypy.build.Graph
    ) -> list[BuildSource]:
        """Return the direct imports of module not included in seen."""
        state = graph[module[0]]
        return [BuildSource(graph[dep].path, dep, followed=True) for dep in state.dependencies]

    def find_added_suppressed(
        self, graph: mypy.build.Graph, seen: set[str], search_paths: SearchPaths
    ) -> list[tuple[str, str]]:
        """Find suppressed modules that have been added (and not included in seen).

        Args:
            seen: reachable modules we've seen before (mutated here!!)

        Return a list of (module, path) pairs for suppressed modules that are now available.
        """
        all_suppressed = set()
        for state in graph.values():
            all_suppressed |= state.suppressed_set

        # Filter out things that shouldn't actually be considered suppressed.
        #
        # TODO: Figure out why these are treated as suppressed
        all_suppressed = {
            module
            for module in all_suppressed
            if module not in graph and not ignore_suppressed_imports(module)
        }

        # Optimization: skip top-level packages that are obviously not
        # there, to avoid calling the relatively slow find_module()
        # below too many times.
        packages = {module.split(".", 1)[0] for module in all_suppressed}
        packages = filter_out_missing_top_level_packages(packages, search_paths, self.fscache)

        # TODO: Namespace packages

        finder = FindModuleCache(search_paths, self.fscache, self.options)

        found = []

        for module in all_suppressed:
            top_level_pkg = module.split(".", 1)[0]
            if top_level_pkg not in packages:
                # Fast path: non-existent top-level package
                continue
            result = finder.find_module(module, fast_path=True)
            if isinstance(result, str) and module not in seen:
                # When not following imports, we only follow imports to .pyi files.
                if not self.following_imports() and not result.endswith(".pyi"):
                    continue
                found.append((module, result))
                seen.add(module)

        return found

    def increment_output(
        self, messages: list[str], sources: list[BuildSource], is_tty: bool, terminal_width: int
    ) -> dict[str, Any]:
        status = 1 if messages else 0
        messages = self.pretty_messages(messages, len(sources), is_tty, terminal_width)
        return {"out": "".join(s + "\n" for s in messages), "err": "", "status": status}

    def pretty_messages(
        self,
        messages: list[str],
        n_sources: int,
        is_tty: bool = False,
        terminal_width: int | None = None,
    ) -> list[str]:
        use_color = self.options.color_output and is_tty
        fit_width = self.options.pretty and is_tty
        if fit_width:
            messages = self.formatter.fit_in_terminal(
                messages, fixed_terminal_width=terminal_width
            )
        if self.options.error_summary:
            summary: str | None = None
            n_errors, n_notes, n_files = count_stats(messages)
            if n_errors:
                summary = self.formatter.format_error(
                    n_errors, n_files, n_sources, use_color=use_color
                )
            elif not messages or n_notes == len(messages):
                summary = self.formatter.format_success(n_sources, use_color)
            if summary:
                # Create new list to avoid appending multiple summaries on successive runs.
                messages = messages + [summary]
        if use_color:
            messages = [self.formatter.colorize(m) for m in messages]
        return messages

    def update_sources(self, sources: list[BuildSource]) -> None:
        paths = [source.path for source in sources if source.path is not None]
        if self.following_imports():
            # Filter out directories (used for namespace packages).
            paths = [path for path in paths if self.fscache.isfile(path)]
        self.fswatcher.add_watched_paths(paths)

    def update_changed(
        self, sources: list[BuildSource], remove: list[str], update: list[str]
    ) -> ChangesAndRemovals:
        changed_paths = self.fswatcher.update_changed(remove, update)
        return self._find_changed(sources, changed_paths)

    def find_changed(self, sources: list[BuildSource]) -> ChangesAndRemovals:
        changed_paths = self.fswatcher.find_changed()
        return self._find_changed(sources, changed_paths)

    def _find_changed(
        self, sources: list[BuildSource], changed_paths: AbstractSet[str]
    ) -> ChangesAndRemovals:
        # Find anything that has been added or modified
        changed = [
            (source.module, source.path)
            for source in sources
            if source.path and source.path in changed_paths
        ]

        # Now find anything that has been removed from the build
        modules = {source.module for source in sources}
        omitted = [source for source in self.previous_sources if source.module not in modules]
        removed = []
        for source in omitted:
            path = source.path
            assert path
            removed.append((source.module, path))

        self.add_explicitly_new(sources, changed)

        # Find anything that has had its module path change because of added or removed __init__s
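        # For example (hypothetical): adding p/__init__.py can remap a file
        # previously built as module "m" to package module "p.m"; we report
        # the old module id as removed and the new one as changed.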
        last = {s.path: s.module for s in self.previous_sources}
        for s in sources:
            assert s.path
            if s.path in last and last[s.path] != s.module:
                # Mark it as removed from its old name and changed at its new name
                removed.append((last[s.path], s.path))
                changed.append((s.module, s.path))

        return changed, removed

    def add_explicitly_new(
        self, sources: list[BuildSource], changed: list[tuple[str, str]]
    ) -> None:
        # Always add modules that were (re-)added: fswatcher may report them as
        # unchanged (if their contents didn't actually change), but they may
        # still need to be re-checked in case they had errors before they were
        # deleted from sources on previous runs.
        previous_modules = {source.module for source in self.previous_sources}
        changed_set = set(changed)
        changed.extend(
            [
                (source.module, source.path)
                for source in sources
                if source.path
                and source.module not in previous_modules
                and (source.module, source.path) not in changed_set
            ]
        )

    def cmd_inspect(
        self,
        show: str,
        location: str,
        verbosity: int = 0,
        limit: int = 0,
        include_span: bool = False,
        include_kind: bool = False,
        include_object_attrs: bool = False,
        union_attrs: bool = False,
        force_reload: bool = False,
    ) -> dict[str, object]:
        """Locate and inspect expression(s)."""
        if not self.fine_grained_manager:
            return {
                "error": 'Command "inspect" is only valid after a "check" command'
                " (that produces no parse errors)"
            }
        engine = InspectionEngine(
            self.fine_grained_manager,
            verbosity=verbosity,
            limit=limit,
            include_span=include_span,
            include_kind=include_kind,
            include_object_attrs=include_object_attrs,
            union_attrs=union_attrs,
            force_reload=force_reload,
        )
        old_inspections = self.options.inspections
        self.options.inspections = True
        try:
            if show == "type":
                result = engine.get_type(location)
            elif show == "attrs":
                result = engine.get_attrs(location)
            elif show == "definition":
                result = engine.get_definition(location)
            else:
                assert False, "Unknown inspection kind"
        finally:
            self.options.inspections = old_inspections
        if "out" in result:
            assert isinstance(result["out"], str)
            result["out"] += "\n"
        return result

    def cmd_suggest(self, function: str, callsites: bool, **kwargs: Any) -> dict[str, object]:
        """Suggest a signature for a function."""
        if not self.fine_grained_manager:
            return {
                "error": "Command 'suggest' is only valid after a 'check' command"
                " (that produces no parse errors)"
            }
        engine = SuggestionEngine(self.fine_grained_manager, **kwargs)
        try:
            if callsites:
                out = engine.suggest_callsites(function)
            else:
                out = engine.suggest(function)
        except SuggestionFailure as err:
            return {"error": str(err)}
        else:
            if not out:
                out = "No suggestions\n"
            elif not out.endswith("\n"):
                out += "\n"
            return {"out": out, "err": "", "status": 0}
        finally:
            self.flush_caches()

    def cmd_hang(self) -> dict[str, object]:
        """Hang for 100 seconds, as a debug hack."""
        time.sleep(100)
        return {}


# Misc utilities.


MiB: Final = 2**20


def get_meminfo() -> dict[str, Any]:
    res: dict[str, Any] = {}
    try:
        import psutil
    except ImportError:
        res["memory_psutil_missing"] = (
            "psutil not found, run pip install mypy[dmypy] "
            "to install the needed components for dmypy"
        )
    else:
        process = psutil.Process()
        meminfo = process.memory_info()
        res["memory_rss_mib"] = meminfo.rss / MiB
        res["memory_vms_mib"] = meminfo.vms / MiB
        if sys.platform == "win32":
            res["memory_maxrss_mib"] = meminfo.peak_wset / MiB
        else:
            # See https://stackoverflow.com/questions/938733/total-memory-used-by-python-process
            import resource  # Since it doesn't exist on Windows.

            rusage = resource.getrusage(resource.RUSAGE_SELF)
            if sys.platform == "darwin":
                factor = 1
            else:
                factor = 1024  # Linux
            res["memory_maxrss_mib"] = rusage.ru_maxrss * factor / MiB
    return res


def find_all_sources_in_build(
    graph: mypy.build.Graph, extra: Sequence[BuildSource] = ()
) -> list[BuildSource]:
    result = list(extra)
    seen = {source.module for source in result}
    for module, state in graph.items():
        if module not in seen:
            result.append(BuildSource(state.path, module))
    return result


def add_all_sources_to_changed(sources: list[BuildSource], changed: list[tuple[str, str]]) -> None:
    """Add all (explicit) sources to the list changed files in place.

    Use this when re-processing of unchanged files is needed (e.g. for
    the purpose of exporting types for inspections).
    """
    changed_set = set(changed)
    changed.extend(
        [
            (bs.module, bs.path)
            for bs in sources
            if bs.path and (bs.module, bs.path) not in changed_set
        ]
    )


def fix_module_deps(graph: mypy.build.Graph) -> None:
    """After an incremental update, update module dependencies to reflect the new state.

    This can make some suppressed dependencies non-suppressed, and vice versa (if modules
    have been added to or removed from the build).
    """
    for state in graph.values():
        new_suppressed = []
        new_dependencies = []
        for dep in state.dependencies + state.suppressed:
            if dep in graph:
                new_dependencies.append(dep)
            else:
                new_suppressed.append(dep)
        state.dependencies = new_dependencies
        state.dependencies_set = set(new_dependencies)
        state.suppressed = new_suppressed
        state.suppressed_set = set(new_suppressed)


def filter_out_missing_top_level_packages(
    packages: set[str], search_paths: SearchPaths, fscache: FileSystemCache
) -> set[str]:
    """Quickly filter out obviously missing top-level packages.

    Return packages with entries that can't be found removed.

    This is approximate: some packages that aren't actually valid may be
    included. However, all potentially valid packages must be returned.
    """
    # Start with an empty set and add all potential top-level packages.
    found = set()
    paths = (
        search_paths.python_path
        + search_paths.mypy_path
        + search_paths.package_path
        + search_paths.typeshed_path
    )
    for p in paths:
        try:
            entries = fscache.listdir(p)
        except Exception:
            entries = []
        for entry in entries:
            # The code is hand-optimized for mypyc since this may be somewhat
            # performance-critical.
            if entry.endswith(".py"):
                entry = entry[:-3]
            elif entry.endswith(".pyi"):
                entry = entry[:-4]
            elif entry.endswith("-stubs"):
                # Possible PEP 561 stub package
                entry = entry[:-6]
            if entry in packages:
                found.add(entry)
    return found
