from toolz import curried
import uuid
from weakref import WeakValueDictionary

from typing import Union, Dict, Set, MutableMapping

from typing import TypedDict, Final

from altair.utils._importers import import_vegafusion
from altair.utils.core import _DataFrameLike
from altair.utils.data import _DataType, _ToValuesReturnType, MaxRowsError
from altair.vegalite.data import default_data_transformer

# Temporary storage for dataframes that have been extracted
# from charts by the vegafusion data transformer, keyed by the generated
# table name. Use a WeakValueDictionary rather than a dict so that the
# Python interpreter is free to garbage collect the stored DataFrames
# once nothing else holds a reference to them.
extracted_inline_tables: MutableMapping[str, _DataFrameLike] = WeakValueDictionary()

# Special URL prefix that VegaFusion uses to denote that a
# dataset in a Vega spec corresponds to an entry in the `inline_datasets`
# kwarg of vf.runtime.pre_transform_spec(). The text that follows the
# prefix in such a URL is the name of the inline dataset.
VEGAFUSION_PREFIX: Final = "vegafusion+dataset://"


class _ToVegaFusionReturnUrlDict(TypedDict):
    """Shape of the dict returned by ``vegafusion_data_transformer`` when it
    replaces a dataframe with a reference to an extracted inline table."""

    # URL made of VEGAFUSION_PREFIX followed by the generated table name
    url: str


@curried.curry
def vegafusion_data_transformer(
    data: _DataType, max_rows: int = 100000
) -> Union[_ToVegaFusionReturnUrlDict, _ToValuesReturnType]:
    """Swap dataframe-like chart data for a VegaFusion dataset URL.

    Objects that expose ``__dataframe__`` (and do not expose
    ``__geo_interface__``) are stored in ``extracted_inline_tables`` under a
    freshly generated table name; the returned dict references that name
    through a ``VEGAFUSION_PREFIX`` URL. All other inputs are passed to
    ``default_data_transformer`` unchanged.

    Parameters
    ----------
    data
        The chart data to transform.
    max_rows: int
        Accepted as part of the signature but not referenced in this
        function's body.

    Returns
    -------
    dict
        ``{"url": VEGAFUSION_PREFIX + table_name}`` for extracted dataframes,
        otherwise whatever ``default_data_transformer`` returns.
    """
    # Geo interface objects (e.g. a geopandas GeoDataFrame) and any data type
    # we don't recognize as a dataframe both go through the default transformer.
    is_geo = hasattr(data, "__geo_interface__")
    if is_geo or not hasattr(data, "__dataframe__"):
        return default_data_transformer(data)

    # Generate a unique name (dashes replaced by underscores) and stash the
    # dataframe so it can be retrieved later by that name.
    unique_suffix = str(uuid.uuid4()).replace("-", "_")
    table_name = "table_" + unique_suffix
    extracted_inline_tables[table_name] = data
    return {"url": VEGAFUSION_PREFIX + table_name}


def get_inline_table_names(vega_spec: dict) -> Set[str]:
    """Get a set of the inline datasets names in the provided Vega spec

    Inline datasets are encoded as URLs that start with the
    ``vegafusion+dataset://`` prefix (``VEGAFUSION_PREFIX``). Datasets whose
    ``url`` is not a string (for example a signal reference object or
    ``None``) are ignored rather than raising.

    Parameters
    ----------
    vega_spec: dict
        A Vega specification dict

    Returns
    -------
    set of str
        Set of the names of the inline datasets that are referenced
        in the specification, including those declared on nested marks.

    Examples
    --------
    >>> spec = {
    ...     "data": [
    ...         {
    ...             "name": "foo",
    ...             "url": "https://path/to/file.csv"
    ...         },
    ...         {
    ...             "name": "bar",
    ...             "url": "vegafusion+dataset://inline_dataset_123"
    ...         }
    ...     ]
    ... }
    >>> get_inline_table_names(spec)
    {'inline_dataset_123'}
    """
    table_names: Set[str] = set()

    # Process datasets declared at this level of the spec
    for data in vega_spec.get("data", []):
        url = data.get("url", "")
        # Only string URLs can carry the prefix; a non-string ``url`` (such as
        # a signal reference dict) would otherwise fail on ``.startswith``.
        if isinstance(url, str) and url.startswith(VEGAFUSION_PREFIX):
            table_names.add(url[len(VEGAFUSION_PREFIX) :])

    # Recursively process child marks, which may have their own datasets
    for mark in vega_spec.get("marks", []):
        table_names.update(get_inline_table_names(mark))

    return table_names


def get_inline_tables(vega_spec: dict) -> Dict[str, _DataFrameLike]:
    """Collect the extracted dataframes that a Vega specification refers to

    The names are discovered with ``get_inline_table_names`` and each one is
    popped out of ``extracted_inline_tables``.

    Note: Only call this on a Vega spec that corresponds to a chart that was
    processed by the vegafusion_data_transformer. Because every returned
    dataframe is removed from internal storage, a second call for the same
    spec will not find those dataframes again.

    Parameters
    ----------
    vega_spec: dict
        A Vega specification dict

    Returns
    -------
    dict from str to dataframe
        Mapping of inline dataset name to dataframe object. Names that have
        no entry in internal storage are left out.
    """
    # Unique marker so a missing entry can be told apart from any stored value
    not_stored = object()

    found = {}
    for name in get_inline_table_names(vega_spec):
        frame = extracted_inline_tables.pop(name, not_stored)
        if frame is not_stored:
            # named dataset that was provided by the user
            continue
        found[name] = frame
    return found


def compile_with_vegafusion(vegalite_spec: dict) -> dict:
    """Compile a Vega-Lite spec to Vega, then pre-transform it with VegaFusion

    Note: This function should only be called on a Vega-Lite spec
    that was generated with the "vegafusion" data transformer enabled.
    Such a spec may reference extracted datasets through URLs that start
    with ``VEGAFUSION_PREFIX``.

    Parameters
    ----------
    vegalite_spec: dict
        A Vega-Lite spec that was generated from an Altair chart with
        the "vegafusion" data transformer enabled

    Returns
    -------
    dict
        A Vega spec that has been pre-transformed by VegaFusion

    Raises
    ------
    ValueError
        If no Vega-Lite compiler plugin is active.
    MaxRowsError
        If VegaFusion reports a warning of type "RowLimitExceeded".
    """
    # Imported here rather than at module level to avoid a circular ImportError
    from altair import vegalite_compilers, data_transformers

    vf = import_vegafusion()

    # Step 1: Vega-Lite -> Vega using the active compiler plugin
    compiler = vegalite_compilers.get()
    if compiler is None:
        raise ValueError("No active vega-lite compiler plugin found")
    vega_spec = compiler(vegalite_spec)

    # Step 2: gather the dataframes that the compiled spec references
    inline_tables = get_inline_tables(vega_spec)

    # Step 3: let VegaFusion pre-evaluate the transforms in the Vega spec
    row_limit = data_transformers.options.get("max_rows", None)
    pre_transform_result = vf.runtime.pre_transform_spec(
        vega_spec,
        vf.get_local_tz(),
        inline_datasets=inline_tables,
        row_limit=row_limit,
    )
    transformed_vega_spec, vf_warnings = pre_transform_result

    # Step 4: surface a row limit warning as a MaxRowsError
    if any(entry.get("type") == "RowLimitExceeded" for entry in vf_warnings):
        raise MaxRowsError(
            "The number of dataset rows after filtering and aggregation exceeds\n"
            f"the current limit of {row_limit}. Try adding an aggregation to reduce\n"
            "the size of the dataset that must be loaded into the browser. Or, disable\n"
            "the limit by calling alt.data_transformers.disable_max_rows(). Note that\n"
            "disabling this limit may cause the browser to freeze or crash."
        )

    return transformed_vega_spec


def using_vegafusion() -> bool:
    """Report whether "vegafusion" is the active data transformer"""
    # Imported here rather than at module level to avoid a circular ImportError
    from altair import data_transformers

    active_name = data_transformers.active
    return active_name == "vegafusion"
