import asyncio
import logging
from concurrent.futures import Executor, ProcessPoolExecutor
from datetime import datetime, timezone
from functools import partial
from multiprocessing import freeze_support
from typing import Set, Tuple

try:
    from aiohttp import web

    from .middlewares import cors
except ImportError as ie:
    raise ImportError(
        f"aiohttp dependency is not installed: {ie}. "
        + "Please re-install black with the '[d]' extra install "
        + "to obtain aiohttp_cors: `pip install black[d]`"
    ) from None

import click

import black
from _black_version import version as __version__
from black.concurrency import maybe_install_uvloop

# This is used internally by tests to shut down the server prematurely
_stop_signal = asyncio.Event()

# Request headers
PROTOCOL_VERSION_HEADER = "X-Protocol-Version"
LINE_LENGTH_HEADER = "X-Line-Length"
PYTHON_VARIANT_HEADER = "X-Python-Variant"
SKIP_SOURCE_FIRST_LINE = "X-Skip-Source-First-Line"
SKIP_STRING_NORMALIZATION_HEADER = "X-Skip-String-Normalization"
SKIP_MAGIC_TRAILING_COMMA = "X-Skip-Magic-Trailing-Comma"
PREVIEW = "X-Preview"
FAST_OR_SAFE_HEADER = "X-Fast-Or-Safe"
DIFF_HEADER = "X-Diff"

BLACK_HEADERS = [
    PROTOCOL_VERSION_HEADER,
    LINE_LENGTH_HEADER,
    PYTHON_VARIANT_HEADER,
    SKIP_SOURCE_FIRST_LINE,
    SKIP_STRING_NORMALIZATION_HEADER,
    SKIP_MAGIC_TRAILING_COMMA,
    PREVIEW,
    FAST_OR_SAFE_HEADER,
    DIFF_HEADER,
]

# Response headers
BLACK_VERSION_HEADER = "X-Black-Version"


class InvalidVariantHeader(Exception):
    pass


@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.option(
    "--bind-host",
    type=str,
    help="Address to bind the server to.",
    default="localhost",
    show_default=True,
)
@click.option(
    "--bind-port", type=int, help="Port to listen on", default=45484, show_default=True
)
@click.version_option(version=black.__version__)
def main(bind_host: str, bind_port: int) -> None:
    logging.basicConfig(level=logging.INFO)
    app = make_app()
    ver = black.__version__
    black.out(f"blackd version {ver} listening on {bind_host} port {bind_port}")
    # TODO: aiohttp had an incorrect annotation for `print` argument,
    #  It'll be fixed once aiohttp releases that code
    web.run_app(app, host=bind_host, port=bind_port, handle_signals=True, print=None)  # type: ignore[arg-type]


def make_app() -> web.Application:
    app = web.Application(
        middlewares=[cors(allow_headers=(*BLACK_HEADERS, "Content-Type"))]
    )
    executor = ProcessPoolExecutor()
    app.add_routes([web.post("/", partial(handle, executor=executor))])
    return app


async def handle(request: web.Request, executor: Executor) -> web.Response:
    headers = {BLACK_VERSION_HEADER: __version__}
    try:
        if request.headers.get(PROTOCOL_VERSION_HEADER, "1") != "1":
            return web.Response(
                status=501, text="This server only supports protocol version 1"
            )
        try:
            line_length = int(
                request.headers.get(LINE_LENGTH_HEADER, black.DEFAULT_LINE_LENGTH)
            )
        except ValueError:
            return web.Response(status=400, text="Invalid line length header value")

        if PYTHON_VARIANT_HEADER in request.headers:
            value = request.headers[PYTHON_VARIANT_HEADER]
            try:
                pyi, versions = parse_python_variant_header(value)
            except InvalidVariantHeader as e:
                return web.Response(
                    status=400,
                    text=f"Invalid value for {PYTHON_VARIANT_HEADER}: {e.args[0]}",
                )
        else:
            pyi = False
            versions = set()

        skip_string_normalization = bool(
            request.headers.get(SKIP_STRING_NORMALIZATION_HEADER, False)
        )
        skip_magic_trailing_comma = bool(
            request.headers.get(SKIP_MAGIC_TRAILING_COMMA, False)
        )
        skip_source_first_line = bool(
            request.headers.get(SKIP_SOURCE_FIRST_LINE, False)
        )
        preview = bool(request.headers.get(PREVIEW, False))
        fast = False
        if request.headers.get(FAST_OR_SAFE_HEADER, "safe") == "fast":
            fast = True
        mode = black.FileMode(
            target_versions=versions,
            is_pyi=pyi,
            line_length=line_length,
            skip_source_first_line=skip_source_first_line,
            string_normalization=not skip_string_normalization,
            magic_trailing_comma=not skip_magic_trailing_comma,
            preview=preview,
        )
        req_bytes = await request.content.read()
        charset = request.charset if request.charset is not None else "utf8"
        req_str = req_bytes.decode(charset)
        then = datetime.now(timezone.utc)

        header = ""
        if skip_source_first_line:
            first_newline_position: int = req_str.find("\n") + 1
            header = req_str[:first_newline_position]
            req_str = req_str[first_newline_position:]

        loop = asyncio.get_event_loop()
        formatted_str = await loop.run_in_executor(
            executor, partial(black.format_file_contents, req_str, fast=fast, mode=mode)
        )

        # Preserve CRLF line endings
        nl = req_str.find("\n")
        if nl > 0 and req_str[nl - 1] == "\r":
            formatted_str = formatted_str.replace("\n", "\r\n")
            # If, after swapping line endings, nothing changed, then say so
            if formatted_str == req_str:
                raise black.NothingChanged

        # Put the source first line back
        req_str = header + req_str
        formatted_str = header + formatted_str

        # Only output the diff in the HTTP response
        only_diff = bool(request.headers.get(DIFF_HEADER, False))
        if only_diff:
            now = datetime.now(timezone.utc)
            src_name = f"In\t{then}"
            dst_name = f"Out\t{now}"
            loop = asyncio.get_event_loop()
            formatted_str = await loop.run_in_executor(
                executor,
                partial(black.diff, req_str, formatted_str, src_name, dst_name),
            )

        return web.Response(
            content_type=request.content_type,
            charset=charset,
            headers=headers,
            text=formatted_str,
        )
    except black.NothingChanged:
        return web.Response(status=204, headers=headers)
    except black.InvalidInput as e:
        return web.Response(status=400, headers=headers, text=str(e))
    except Exception as e:
        logging.exception("Exception during handling a request")
        return web.Response(status=500, headers=headers, text=str(e))


def parse_python_variant_header(value: str) -> Tuple[bool, Set[black.TargetVersion]]:
    if value == "pyi":
        return True, set()
    else:
        versions = set()
        for version in value.split(","):
            if version.startswith("py"):
                version = version[len("py") :]
            if "." in version:
                major_str, *rest = version.split(".")
            else:
                major_str = version[0]
                rest = [version[1:]] if len(version) > 1 else []
            try:
                major = int(major_str)
                if major not in (2, 3):
                    raise InvalidVariantHeader("major version must be 2 or 3")
                if len(rest) > 0:
                    minor = int(rest[0])
                    if major == 2:
                        raise InvalidVariantHeader("Python 2 is not supported")
                else:
                    # Default to lowest supported minor version.
                    minor = 7 if major == 2 else 3
                version_str = f"PY{major}{minor}"
                if major == 3 and not hasattr(black.TargetVersion, version_str):
                    raise InvalidVariantHeader(f"3.{minor} is not supported")
                versions.add(black.TargetVersion[version_str])
            except (KeyError, ValueError):
                raise InvalidVariantHeader("expected e.g. '3.7', 'py3.5'") from None
        return False, versions


def patched_main() -> None:
    maybe_install_uvloop()
    freeze_support()
    main()


if __name__ == "__main__":
    patched_main()
