# -----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
# -----------------------------------------------------------------------------

import itertools

from .base import RemoteSource, Schema, get_partition


class RemoteArray(RemoteSource):
    """nd-array on an Intake server"""

    name = "remote_ndarray"
    container = "ndarray"

    def __init__(self, url, headers, **kwargs):
        super(RemoteArray, self).__init__(url, headers, **kwargs)
        self.npartitions = kwargs["npartitions"]
        self.shape = tuple(kwargs["shape"])
        self.metadata = kwargs["metadata"]
        self.dtype = kwargs["dtype"]
        # normalise chunks (nested lists from the server) to a tuple of tuples
        self.chunks = tuple(tuple(c) for c in kwargs.get("chunks", ((-1,),)))
        self.arr = None
        self._schema = Schema(
            npartitions=self.npartitions,
            extra_metadata=self.metadata,
            dtype=self.dtype,
            shape=self.shape,
            chunks=self.chunks,
        )

    def _load_metadata(self):
        import dask.array as da

        if self.arr is None:
            name = "remote-array-" + self._source_id
            nparts = (range(len(n)) for n in self.chunks)
            dask = {(name,) + part: (get_partition, self.url, self.headers, self._source_id, self.container, part) for part in itertools.product(*nparts)}
            self.arr = da.Array(dask, name=name, chunks=self.chunks, dtype=self.dtype, shape=self.shape)

        return self._schema

    def _get_partition(self, i):
        self._load_metadata()
        return self.arr.blocks[i].compute()

    def read_partition(self, i):
        self._load_metadata()
        return self._get_partition(i)

    def read(self):
        self._load_metadata()
        return self.arr.compute()

    def to_dask(self):
        self._load_metadata()
        return self.arr

    def _close(self):
        self.arr = None

    @staticmethod
    def _persist(source, path, component=None, storage_options=None, **kwargs):
        """Save array to local persistent store

        Makes a parquet dataset out of the data using zarr.
        This then becomes a data entry in the persisted datasets catalog.
        Only works locally for the moment.

        Parameters
        ----------
        source: a DataSource instance to save
        name: str or None
            Key to refer to this persisted dataset by. If not given, will
            attempt to get from the source's name
        kwargs: passed on to zarr array creation, see
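
        Examples
        --------
        A hedged sketch of the user-facing call that routes to this method
        (the catalog entry name ``myarray`` is illustrative)::

            source = cat.myarray          # any ndarray-container source
            persisted = source.persist()  # writes zarr, registers a new entry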
        """
        try:
            arr = source.to_dask()
        except NotImplementedError:
            arr = source.read()
        return RemoteArray._data_to_source(
            arr, path, component=component, storage_options=storage_options, **kwargs
        )

    @staticmethod
    def _data_to_source(arr, path, component=None, storage_options=None, **kwargs):
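        """Write an array-like to a zarr store and return a ZarrArraySource pointing at it"""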
        from dask.array import from_array, to_zarr
        from dask.utils import is_arraylike

        from ..source.zarr import ZarrArraySource

        if not is_arraylike(arr):
            raise NotImplementedError
        if not hasattr(arr, "npartitions"):
            arr = from_array(arr, chunks="auto")
        to_zarr(arr, path, component=component, storage_options=storage_options, **kwargs)
        source = ZarrArraySource(path, storage_options, component)
        return source
