from __future__ import annotations

from urllib.parse import urljoin

from tlz import memoize
from tornado import web
from tornado.ioloop import IOLoop

from distributed.dashboard.components.nvml import (
    gpu_doc,
    gpu_memory_doc,
    gpu_utilization_doc,
)
from distributed.dashboard.components.rmm import rmm_memory_doc
from distributed.dashboard.components.scheduler import (
    AggregateAction,
    BandwidthTypes,
    BandwidthWorkers,
    ClusterMemory,
    ComputePerKey,
    Contention,
    CurrentLoad,
    ExceptionsTable,
    FinePerformanceMetrics,
    MemoryByKey,
    Occupancy,
    SystemMonitor,
    SystemTimeseries,
    TaskGraph,
    TaskGroupGraph,
    TaskGroupProgress,
    TaskProgress,
    TaskStream,
    WorkerNetworkBandwidth,
    WorkersMemory,
    WorkersTransferBytes,
    WorkerTable,
    events_doc,
    exceptions_doc,
    graph_doc,
    hardware_doc,
    individual_doc,
    individual_profile_doc,
    individual_profile_server_doc,
    profile_doc,
    profile_server_doc,
    shuffling_doc,
    status_doc,
    stealing_doc,
    systemmonitor_doc,
    tasks_doc,
    tg_graph_doc,
    workers_doc,
)
from distributed.dashboard.core import BokehApplication
from distributed.dashboard.worker import counters_doc

applications = {
    "/system": systemmonitor_doc,
    "/shuffle": shuffling_doc,
    "/stealing": stealing_doc,
    "/workers": workers_doc,
    "/exceptions": exceptions_doc,
    "/events": events_doc,
    "/counters": counters_doc,
    "/tasks": tasks_doc,
    "/status": status_doc,
    "/profile": profile_doc,
    "/profile-server": profile_server_doc,
    "/graph": graph_doc,
    "/hardware": hardware_doc,
    "/groups": tg_graph_doc,
    "/gpu": gpu_doc,
    "/individual-task-stream": individual_doc(
        TaskStream, 100, n_rectangles=1000, clear_interval="10s"
    ),
    "/individual-progress": individual_doc(TaskProgress, 100, height=160),
    "/individual-graph": individual_doc(TaskGraph, 200),
    "/individual-groups": individual_doc(TaskGroupGraph, 200),
    "/individual-group-progress": individual_doc(TaskGroupProgress, 200),
    "/individual-workers-memory": individual_doc(WorkersMemory, 100),
    "/individual-cluster-memory": individual_doc(ClusterMemory, 100),
    "/individual-workers-transfer-bytes": individual_doc(WorkersTransferBytes, 100),
    "/individual-cpu": individual_doc(CurrentLoad, 100, fig_attr="cpu_figure"),
    "/individual-nprocessing": individual_doc(
        CurrentLoad, 100, fig_attr="processing_figure"
    ),
    "/individual-occupancy": individual_doc(Occupancy, 100),
    "/individual-workers": individual_doc(WorkerTable, 500),
    "/individual-exceptions": individual_doc(ExceptionsTable, 1000),
    "/individual-bandwidth-types": individual_doc(BandwidthTypes, 500),
    "/individual-bandwidth-workers": individual_doc(BandwidthWorkers, 500),
    "/individual-workers-network": individual_doc(
        WorkerNetworkBandwidth, 500, fig_attr="bandwidth"
    ),
    "/individual-workers-disk": individual_doc(
        WorkerNetworkBandwidth, 500, fig_attr="disk"
    ),
    "/individual-workers-network-timeseries": individual_doc(
        SystemTimeseries, 500, fig_attr="bandwidth"
    ),
    "/individual-workers-cpu-timeseries": individual_doc(
        SystemTimeseries, 500, fig_attr="cpu"
    ),
    "/individual-workers-memory-timeseries": individual_doc(
        SystemTimeseries, 500, fig_attr="memory"
    ),
    "/individual-workers-disk-timeseries": individual_doc(
        SystemTimeseries, 500, fig_attr="disk"
    ),
    "/individual-memory-by-key": individual_doc(MemoryByKey, 500),
    "/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
    "/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
    "/individual-scheduler-system": individual_doc(SystemMonitor, 500),
    "/individual-contention": individual_doc(Contention, 500),
    "/individual-fine-performance-metrics": individual_doc(FinePerformanceMetrics, 500),
    "/individual-profile": individual_profile_doc,
    "/individual-profile-server": individual_profile_server_doc,
    "/individual-gpu-memory": gpu_memory_doc,
    "/individual-gpu-utilization": gpu_utilization_doc,
    "/individual-rmm-memory": rmm_memory_doc,
}


@memoize
def template_variables(scheduler):
    from distributed.diagnostics.nvml import device_get_count

    template_variables = {
        "pages": [
            "status",
            "workers",
            "tasks",
            "system",
            *(["gpu"] if device_get_count() > 0 else []),
            "profile",
            "graph",
            "groups",
            "info",
        ],
        "plots": [
            {
                "url": x.strip("/"),
                "name": " ".join(x.strip("/").split("-")[1:])
                .title()
                .replace("Cpu", "CPU")
                .replace("Gpu", "GPU")
                .replace("Rmm", "RMM"),
            }
            for x in applications
            if "individual" in x
        ]
        + [
            {"url": "hardware", "name": "Hardware"},
            {"url": "shuffle", "name": "Shuffle"},
        ],
        "jupyter": scheduler.jupyter,
    }
    template_variables["plots"] = sorted(
        template_variables["plots"], key=lambda d: d["name"]
    )
    return template_variables


def connect(application, http_server, scheduler, prefix=""):
    bokeh_app = BokehApplication(
        applications,
        scheduler,
        prefix=prefix,
        template_variables=template_variables(scheduler),
    )
    application.add_application(bokeh_app)
    bokeh_app.initialize(IOLoop.current())

    bokeh_app.add_handlers(
        r".*",
        [
            (
                r"/",
                web.RedirectHandler,
                {"url": urljoin((prefix or "").strip("/") + "/", r"status")},
            )
        ],
    )

    bokeh_app.start()
