# Repository: https://gitlab.com/quantify-os/quantify-core
# Licensed according to the LICENCE file on the main branch
# pylint: disable=too-many-lines
"""Utilities for handling data."""
from __future__ import annotations
import datetime
import json
import os
import sys
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING, Any, List
from uuid import uuid4
import numpy as np
import xarray as xr
from dateutil.parser import parse
from qcodes.instrument import (
    ChannelTuple,
    Instrument,
    InstrumentBase,
    InstrumentModule,
)
import quantify_core.data.dataset_adapters as da
from quantify_core.data.types import TUID
from quantify_core.utilities.general import delete_keys_from_dict, get_subclasses
if TYPE_CHECKING:
    from collections.abc import Iterable
    from qcodes.parameters import ParameterBase
    from quantify_core.measurement.types import Gettable, Settable
# this is a pointer to the module object instance itself.
this = sys.modules[__name__]
# Module-level storage for the active data directory. It stays ``None`` until
# ``set_datadir`` assigns it; ``get_datadir`` raises ``NotADirectoryError`` while
# it is ``None`` or does not point to an existing directory.
this._datadir = None
# FIXME: This environment variable is needed to avoid locking when loading a dataset.
# Remove when dataset v2 gets implemented and merged!
# NOTE: this mutates the process environment as a side effect of importing this module.
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
# Default file name of the dataset stored inside an experiment container.
DATASET_NAME = "dataset.hdf5"
# File name read by ``load_quantities_of_interest`` inside an analysis directory.
QUANTITIES_OF_INTEREST_NAME = "quantities_of_interest.json"
# File name read by ``load_processed_dataset`` inside an analysis directory.
PROCESSED_DATASET_NAME = "dataset_processed.hdf5"
# pylint: disable=keyword-arg-before-vararg
[docs]
class DecodeToNumpy(json.JSONDecoder):
    """Decodes a JSON object to Python/Numpy objects."""

    def __init__(self, list_to_ndarray: bool = False, *args, **kwargs) -> None:
        """Decodes a JSON object to Python/Numpy objects.

        Example
        -------
        json.loads(json_string, cls=DecodeToNumpy, list_to_ndarray=True)

        Parameters
        ----------
        list_to_ndarray
            If True, will try to convert python lists to a numpy array.
        args
            Additional args to be passed to :class:`json.JSONDecoder`.
        kwargs
            Additional kwargs to be passed to :class:`json.JSONDecoder`.
            ``object_hook`` is reserved by this class and must not be supplied.
        """
        self.list_to_ndarray = list_to_ndarray
        # Positional arguments are forwarded first so that the keyword argument
        # does not precede ``*args`` in the call.
        super().__init__(*args, object_hook=self._object_hook, **kwargs)

    def _object_hook(self, obj: dict) -> dict:
        """Custom deserialization hook that converts lists to np.arrays.

        The hook is invoked by the decoder for every decoded JSON object (dict), so
        only lists that are direct values of a JSON object are converted; the
        dictionary is modified in place and returned.
        """
        if self.list_to_ndarray:
            for key, val in obj.items():
                # Only existing keys are reassigned, which is safe while iterating.
                if isinstance(val, list):
                    obj[key] = np.array(val)
        return obj
 
[docs]
def default_datadir(verbose: bool = True) -> Path:
    """Return the default data directory, optionally announcing it on stdout.

    Intended for fast prototyping, tutorials, examples, etc..

    Parameters
    ----------
    verbose
        If ``True`` prints the returned datadir.

    Returns
    -------
    :
        The resolved ``Path.home() / "quantify-data"`` path. The directory is not
        created by this function.
    """
    resolved_dir = Path.home().joinpath("quantify-data").resolve()
    if verbose:
        print(f"Data will be saved in:\n{resolved_dir}")
    return resolved_dir
[docs]
def gen_tuid(time_stamp: datetime.datetime | None = None) -> TUID:
    """Build a :class:`~quantify_core.data.types.TUID` from a timestamp.

    The identifier is ``YYYYmmDD-HHMMSS-sss-`` (``sss`` being milliseconds) followed
    by the first six characters of a freshly generated ``uuid4`` string.

    Parameters
    ----------
    time_stamp
        Optional, can be passed to ensure the tuid is based on a specific time.
        When ``None`` the current local time is used.

    Returns
    -------
    :
        Timestamp based uid.
    """
    stamp = datetime.datetime.now() if time_stamp is None else time_stamp
    # "%f" yields microseconds; split them off so they can be reduced to milliseconds
    seconds_part, microseconds = stamp.strftime("%Y%m%d-%H%M%S-.%f").split(".")
    # explicit zero padding because some systems return 0 for the microseconds
    milliseconds = int(microseconds) // 1000
    random_suffix = str(uuid4())[:6]
    return TUID(f"{seconds_part}{milliseconds:03d}-{random_suffix}")
[docs]
def get_datadir() -> str:
    """Return the data directory currently configured for this module.

    The data directory can be changed using
    :func:`~quantify_core.data.handling.set_datadir`.

    Returns
    -------
    :
        The current data directory, exactly as it was stored by
        :func:`~quantify_core.data.handling.set_datadir`.

    Raises
    ------
    NotADirectoryError
        The datadir has not been set yet or does not point to an existing directory.
    """
    # guard clause: a usable directory is returned straight away
    if this._datadir is not None and os.path.isdir(this._datadir):
        return this._datadir

    set_datadir_import = f"from {this.__name__} import set_datadir"
    raise NotADirectoryError(
        "The datadir is not valid. Please set the datadir after importing Quantify."
        "\nWe recommend to settle for a single common data directory for all \n"
        "notebooks/experiments within your measurement setup/PC.\n"
        "E.g. '~/quantify-data' (unix), or 'D:\\Data\\quantify-data' (Windows).\n"
        "The datadir can be changed as follows:\n\n"
        f"    {set_datadir_import}\n"
        "    set_datadir('path_to_datadir')",
    )
[docs]
def set_datadir(datadir: Path | str | None = None) -> None:
    """Sets the data directory.

    Parameters
    ----------
    datadir
        Path of the data directory. If set to ``None``, resets the datadir to the
        default datadir returned by
        :func:`~quantify_core.data.handling.default_datadir`
        (``~/quantify-data``). If the directory does not exist yet it is created,
        including any missing parent directories.

    Raises
    ------
    FileExistsError
        ``datadir`` exists but is not a directory.
    """
    if datadir is None:
        datadir = default_datadir()
    # ``exist_ok=True`` avoids the check-then-create race of ``isdir`` + ``mkdir``
    # and ``makedirs`` also handles nested paths whose parents are missing.
    os.makedirs(datadir, exist_ok=True)
    this._datadir = datadir
[docs]
def locate_experiment_container(tuid: TUID, datadir: Path | str | None = None) -> str:
    """Find the experiment container directory that belongs to a tuid.

    The container is searched for in the day directory ``<datadir>/<tuid[:8]>``; the
    first entry whose name contains ``tuid`` is returned.

    Parameters
    ----------
    tuid
        A :class:`~quantify_core.data.types.TUID` string. It is also possible to specify
        only the first part of a tuid.
    datadir
        Path of the data directory. If ``None``, uses :meth:`~get_datadir` to determine
        the data directory.

    Returns
    -------
    :
        The path to the experiment container

    Raises
    ------
    FileNotFoundError
        Experiment container not found.
    """
    root_dir = get_datadir() if datadir is None else datadir
    day_folder = os.path.join(root_dir, tuid[:8])
    # ``os.listdir`` itself raises FileNotFoundError when there is no data for that day
    candidates = [entry for entry in os.listdir(day_folder) if tuid in entry]
    if not candidates:
        raise FileNotFoundError(f"File with tuid: {tuid} was not found.")
    # A tuid is assumed to be unique, hence only the first match is considered
    return os.path.join(day_folder, candidates[0])
def _locate_experiment_file(
    tuid: TUID,
    datadir: Path | str | None = None,
    name: str = DATASET_NAME,
) -> str:
    """Build the path of the file ``name`` inside the experiment container of ``tuid``.

    The existence of the file itself is not checked here.
    """
    container_path = locate_experiment_container(tuid=tuid, datadir=datadir)
    file_path = os.path.join(container_path, name)
    return file_path
[docs]
def load_dataset(
    tuid: TUID,
    datadir: Path | str | None = None,
    name: str = DATASET_NAME,
) -> xr.Dataset:
    """Load the dataset stored in the experiment container of a tuid.

    .. tip::

        This method also works when specifying only the first part of a
        :class:`~quantify_core.data.types.TUID`.

    .. note::

        The file is located inside the experiment container and then read through
        :func:`~quantify_core.data.handling.load_dataset_from_path`, which relies on
        :func:`xarray.load_dataset`.

    Parameters
    ----------
    tuid
        A :class:`~quantify_core.data.types.TUID` string. It is also possible to specify
        only the first part of a tuid.
    datadir
        Path of the data directory. If ``None``, uses :meth:`~get_datadir` to determine
        the data directory.
    name
        Name of the dataset.

    Returns
    -------
    :
        The dataset.

    Raises
    ------
    FileNotFoundError
        No data found for specified date.
    """
    dataset_path = _locate_experiment_file(tuid, datadir, name)
    return load_dataset_from_path(dataset_path)
[docs]
def load_dataset_from_path(path: Path | str) -> xr.Dataset:
    """Loads a :class:`~xarray.Dataset` with a specific engine preference.

    Before returning the dataset :meth:`AdapterH5NetCDF.recover()
    <quantify_core.data.dataset_adapters.AdapterH5NetCDF.recover>` is applied
    (only when the dataset carries a ``quantify_dataset_version`` attribute).

    This function tries to load the dataset until success with the following engine
    preference:

    - ``"h5netcdf"``
    - ``"netcdf4"``
    - No engine specified (:func:`~xarray.load_dataset` default)

    Parameters
    ----------
    path
        Path to the dataset.

    Returns
    -------
    :
        The loaded dataset.

    Raises
    ------
    Exception
        If every engine fails, all failures are printed and the exception raised by
        the last attempted engine is re-raised.
    """  # pylint: disable=line-too-long
    engines = ("h5netcdf", "netcdf4", None)
    # (engine, exception) pairs; kept together so the report can never get out of
    # sync with the engines that were actually attempted.
    failures = []
    for engine in engines:
        try:
            dataset = xr.load_dataset(path, engine=engine)
        except Exception as exception:  # pylint: disable=broad-except
            # The name bound by ``except ... as`` is unbound once the clause ends,
            # so the exception object has to be stored explicitly.
            failures.append((engine, exception))
            continue
        # Only quantify_dataset_version=>2.0.0 requires the adapter
        if "quantify_dataset_version" in dataset.attrs:
            dataset = da.AdapterH5NetCDF.recover(dataset)
        return dataset

    # Do not let exceptions pass silently: report the failure of every engine
    for engine, exception in failures:
        print(
            f"Failed loading dataset with '{engine}' engine. "
            f"Raised '{exception.__class__.__name__}':\n    {exception}",
        )
    # raise the last exception
    raise failures[-1][1]
[docs]
def load_quantities_of_interest(tuid: TUID, analysis_name: str) -> dict:
    """Read the "quantities of interest" saved by an analysis of an experiment.

    The data is read from ``analysis_<analysis_name>/quantities_of_interest.json``
    inside the experiment container located through the current datadir.

    Parameters
    ----------
    tuid
        TUID of the experiment.
    analysis_name
        Name of the Analysis from which to load the data.

    Returns
    -------
    :
        A dictionary containing the loaded quantities of interest.

    Raises
    ------
    FileNotFoundError
        The analysis directory does not exist in the experiment container.
    """
    container = Path(locate_experiment_container(tuid, get_datadir()))
    analysis_dir = container / f"analysis_{analysis_name}"
    if not analysis_dir.is_dir():
        raise FileNotFoundError("Analysis not found in current experiment.")

    qoi_path = os.path.join(analysis_dir, QUANTITIES_OF_INTEREST_NAME)
    with open(qoi_path, encoding="utf-8") as file:
        return json.load(file)
[docs]
def load_processed_dataset(tuid: TUID, analysis_name: str) -> xr.Dataset:
    """Read the processed dataset saved by an analysis of an experiment.

    The dataset is read from ``analysis_<analysis_name>/dataset_processed.hdf5``
    inside the experiment container located through the current datadir.

    Parameters
    ----------
    tuid
        TUID of the experiment from which to load the data.
    analysis_name
        Name of the Analysis from which to load the data.

    Returns
    -------
    :
        A dataset containing the results of the analysis.

    Raises
    ------
    FileNotFoundError
        The analysis directory does not exist in the experiment container.
    """
    container = Path(locate_experiment_container(tuid, get_datadir()))
    analysis_dir = container / f"analysis_{analysis_name}"
    if analysis_dir.is_dir():
        processed_path = analysis_dir / PROCESSED_DATASET_NAME
        return load_dataset_from_path(processed_path)
    raise FileNotFoundError("Analysis not found in current experiment.")
[docs]
def _xarray_numpy_bool_patch(dataset: xr.Dataset) -> None:
    """Replace every :obj:`~numpy.bool_` attribute value by a builtin :obj:`~bool`.

    Both the attributes of each variable of the dataset and the attributes of the
    dataset itself are processed. This is a patch to a bug in xarray 0.17.0.

    .. seealso::

        See issue #161 in quantify-core.
        Our (accepted) pull request https://github.com/pydata/xarray/pull/4986
        Version >0.17.0 will fix the problem but will have breaking changes,
        for now we use this patch.

    Parameters
    ----------
    dataset
        The dataset to be patched in-place.
    """

    def _to_builtin_bools(attributes: dict) -> None:
        # Collect the offending keys first, then overwrite them with builtin bools
        # to avoid the xarray 0.17.0 type exception for engine="h5netcdf"
        numpy_bool_keys = [
            key for key, value in attributes.items() if isinstance(value, np.bool_)
        ]
        for key in numpy_bool_keys:
            attributes[key] = bool(attributes[key])

    for variable in dataset.variables.values():
        _to_builtin_bools(variable.attrs)
    _to_builtin_bools(dataset.attrs)
[docs]
def write_dataset(path: Path | str, dataset: xr.Dataset) -> None:
    """Store a :class:`~xarray.Dataset` on disk using the `h5netcdf` engine.

    Before writing, numpy boolean attributes are cast to builtin booleans in-place
    and, for datasets carrying a ``quantify_dataset_version`` attribute, the
    :meth:`~quantify_core.data.dataset_adapters.AdapterH5NetCDF.adapt`
    is applied.

    To accommodate for complex-type numbers and arrays ``invalid_netcdf=True`` is used.

    Parameters
    ----------
    path
        Path to the file including filename and extension
    dataset
        The :class:`~xarray.Dataset` to be written to file.
    """  # pylint: disable=line-too-long
    _xarray_numpy_bool_patch(dataset)  # See issue #161 in quantify-core
    # Only quantify_dataset_version=>2.0.0 requires the adapter
    needs_adapter = "quantify_dataset_version" in dataset.attrs
    writable = da.AdapterH5NetCDF.adapt(dataset) if needs_adapter else dataset
    writable.to_netcdf(path, engine="h5netcdf", invalid_netcdf=True)
[docs]
def load_snapshot(
    tuid: TUID,
    datadir: Path | str | None = None,
    list_to_ndarray: bool = False,
    file: str = "snapshot.json",
) -> dict:
    """Loads a snapshot specified by a tuid.

    Parameters
    ----------
    tuid
        A :class:`~quantify_core.data.types.TUID` string. It is also possible to specify
        only the first part of a tuid.
    datadir
        Path of the data directory. If ``None``, uses :meth:`~get_datadir` to determine
        the data directory.
    list_to_ndarray
        Uses an internal DecodeToNumpy decoder which allows a user to automatically
        convert a list to numpy array during deserialization of the snapshot.
    file
        Filename to load.

    Returns
    -------
    :
        The snapshot.

    Raises
    ------
    FileNotFoundError
        No data found for specified date, or the snapshot file is missing from the
        experiment container.
    """
    snapshot_path = _locate_experiment_file(tuid, datadir, file)
    # Decode explicitly as UTF-8 (as done for the quantities of interest) instead of
    # relying on the platform dependent locale encoding.
    with open(snapshot_path, encoding="utf-8") as snap:
        return json.load(snap, cls=DecodeToNumpy, list_to_ndarray=list_to_ndarray)
[docs]
def create_exp_folder(
    tuid: TUID,
    name: str | None = None,
    datadir: Path | str | None = None,
) -> str:
    """Make sure the experiment container folder of a tuid exists and return it.

    If the folder already exists, simply returns the experiment folder corresponding to
    the :class:`~quantify_core.data.types.TUID`.

    Parameters
    ----------
    tuid
        A timestamp based human-readable unique identifier.
    name
        Optional name to identify the folder.
    datadir
        path of the data directory.
        If ``None``, uses :meth:`~get_datadir` to determine the data directory.

    Returns
    -------
    :
        Full path of the experiment folder following format:
        ``/datadir/YYYYmmDD/YYYYmmDD-HHMMSS-sss-******-name/``.
    """
    # The return value is not used; presumably this raises for a malformed tuid
    # (confirm against ``TUID.is_valid``).
    TUID.is_valid(tuid)

    base_dir = get_datadir() if datadir is None else datadir
    exp_folder = os.path.join(base_dir, tuid[:8], tuid)
    if name:
        exp_folder = "-".join((exp_folder, name))
    os.makedirs(exp_folder, exist_ok=True)
    return exp_folder
# pylint: disable=too-many-locals
[docs]
def initialize_dataset(
    settable_pars: Iterable,
    setpoints: list[np.ndarray],
    gettable_pars: Iterable,
) -> xr.Dataset:
    """Create a dataset holding the setpoints and NaN-filled result variables.

    Every settable becomes a coordinate ``x<i>`` filled with ``setpoints[i]``; every
    axis returned by a gettable becomes a variable ``y<j>`` filled with NaN and with
    the same length as ``setpoints[0]``. The dataset receives a freshly generated
    ``tuid`` attribute.

    Parameters
    ----------
    settable_pars
        A list of M settables.
    setpoints
        An (M*N) array.
    gettable_pars
        A list of gettables.

    Returns
    -------
    :
        The dataset.
    """
    data_arrays = []
    coord_names = []
    for index, settable in enumerate(settable_pars):
        settable_attrs = {
            "name": _generate_name(settable),
            "long_name": _generate_long_name(settable),
            "units": settable.unit,
            "batched": _is_batched(settable),
        }
        if settable_attrs["batched"] and hasattr(settable, "batch_size"):
            settable_attrs["batch_size"] = settable.batch_size
        coord_name = f"x{index}"
        coord_names.append(coord_name)
        data_arrays.append(
            xr.DataArray(data=setpoints[index], name=coord_name, attrs=settable_attrs),
        )

    numpoints = len(setpoints[0])
    # running index of the y variables, shared by all gettables
    y_index = 0
    for gettable in gettable_pars:
        # A single Gettable may return several axes; normalise both cases to an
        # iterable of (name, label, unit) triples.
        if isinstance(gettable.name, list):
            axes = zip(gettable.name, gettable.label, gettable.unit)
        else:
            axes = zip([gettable.name], [gettable.label], [gettable.unit])

        for axis_name, axis_label, axis_unit in axes:
            gettable_attrs = {
                "name": axis_name,
                "long_name": axis_label,
                "units": axis_unit,
                "batched": _is_batched(gettable),
            }
            if gettable_attrs["batched"] and hasattr(gettable, "batch_size"):
                gettable_attrs["batch_size"] = gettable.batch_size
            data_arrays.append(
                xr.DataArray(
                    data=np.full(numpoints, np.nan),
                    name=f"y{y_index}",
                    attrs=gettable_attrs,
                ),
            )
            y_index += 1

    dataset = xr.merge(data_arrays).set_coords(coord_names)
    # xarray>=0.18.0 tries to combine attrs which we do not want at all
    dataset.attrs = {"tuid": gen_tuid()}
    return dataset
[docs]
def grow_dataset(dataset: xr.Dataset) -> xr.Dataset:
    """Double the length of every array of the dataset, padding with NaN.

    Parameters
    ----------
    dataset
        The dataset to resize.

    Returns
    -------
    :
        The resized dataset.
    """
    coord_names = tuple(dataset.coords.keys())

    padded_arrays = []
    # ``variables`` includes the coordinates, so those are grown as well
    for var_name in dataset.variables:
        values = dataset[var_name].values
        grown_values = np.pad(
            values,
            (0, len(values)),
            "constant",
            constant_values=np.nan,
        )
        padded_arrays.append(
            xr.DataArray(
                name=dataset[var_name].name,
                data=grown_values,
                attrs=dataset[var_name].attrs,
            ),
        )

    grown = xr.merge(padded_arrays)
    grown.attrs = {}  # xarray>=0.18.0 tries to merge attrs
    # remove the old, shorter arrays before merging the padded ones back in
    resized = dataset.drop_dims(["dim_0"]).merge(grown)
    return resized.set_coords(coord_names)
[docs]
def trim_dataset(dataset: xr.Dataset) -> xr.Dataset:
    """Cut off the part of the dataset after the last non-NaN value of ``y0``.

    Useful in the case of a dynamically resized dataset (e.g. adaptive loops).

    Parameters
    ----------
    dataset
        The dataset to trim.

    Returns
    -------
    :
        The dataset, trimmed and resized if necessary or unchanged. The dataset is
        returned unchanged when ``y0`` holds no non-NaN value at all.
    """
    coord_names = tuple(dataset.coords.keys())

    # number of entries, counted from the end, that precede the last non-NaN value
    trailing = next(
        (
            offset
            for offset, value in enumerate(reversed(dataset["y0"].values))
            if not np.isnan(value)
        ),
        None,
    )
    if trailing is None:
        return dataset

    finish_idx = len(dataset["y0"].values) - trailing
    trimmed_arrays = []
    # ``variables`` includes the coordinates, so those are trimmed as well
    for var_name in dataset.variables:
        trimmed_arrays.append(
            xr.DataArray(
                name=dataset[var_name].name,
                data=dataset[var_name].values[:finish_idx],
                attrs=dataset[var_name].attrs,
            ),
        )

    trimmed = xr.merge(trimmed_arrays)
    trimmed.attrs = {}  # xarray>=0.18.0 tries to merge attrs
    result = dataset.drop_dims(["dim_0"]).merge(trimmed)
    return result.set_coords(coord_names)
[docs]
def concat_dataset(
    tuids: list[TUID],
    dim: str = "dim_0",
    name: str = None,
    analysis_name: str = None,
) -> xr.Dataset:
    """Concatenate the datasets belonging to a list of TUIDs into one dataset.

    The originating TUID of every point is added as the ``ref_tuids`` coordinate of
    the new dataset.

    By default, we will extract the unprocessed dataset from each directory, but if
    analysis_name is specified, we will extract the processed dataset for that
    analysis.

    Parameters
    ----------
    tuids:
        List of TUIDs.
    dim:
        Dimension along which to concatenate the datasets.
    analysis_name:
        In the case that we want to extract the processed dataset for a given
        analysis, this is the name of the analysis.
    name:
        The name of the concatenated dataset. If None, use the name of the
        first dataset in the list.

    Returns
    -------
    :
        Concatenated dataset with new TUID and references to the old TUIDs.

    Raises
    ------
    TypeError
        ``tuids`` is not a list.
    """
    if not isinstance(tuids, list):
        raise TypeError(f"type(tuids)={type(tuids)} should be a list of TUIDs")

    datasets = []
    ref_tuids = []
    # Load the datasets in the given order while collecting, per data point, the
    # tuid it originates from.
    for position, tuid in enumerate(tuids):
        if analysis_name:
            dataset = load_processed_dataset(tuid, analysis_name=analysis_name)
        else:
            dataset = load_dataset(tuid)

        # All datasets get the same name: the given one, else the first dataset's
        if position == 0 and not name:
            name = dataset.attrs.get("name")
        dataset.attrs["name"] = name
        # The individual tuids would conflict when combining the attributes
        dataset.attrs["tuid"] = None

        datasets.append(dataset)
        ref_tuids.extend([TUID.datetime(tuid)] * len(dataset[dim]))

    ref_coord_attrs = {
        "is_main_coord": True,
        "long_name": "reference_tuids",
        "is_dataset_ref": True,
        "uniformly_spaced": False,
    }
    combined = xr.concat(datasets, dim=dim, combine_attrs="no_conflicts")
    combined = combined.assign_coords(
        {"ref_tuids": (dim, ref_tuids, ref_coord_attrs)},
    )
    combined.attrs["tuid"] = gen_tuid()
    return combined
[docs]
def get_varying_parameter_values(
    tuids: list[TUID],
    parameter: str,
) -> np.ndarray:
    """A function that gets a parameter which varies over multiple experiments and puts
    it in a ndarray.

    Parameters
    ----------
    tuids:
        The list of TUIDs from which to get the varying parameter.
    parameter:
        The name and address of the QCoDeS parameter from which to get the
        value, including the instrument name and all submodules. For example
        :code:`"current_source.module0.dac0.current"`.

    Returns
    -------
    :
        The values of the varying parameter.

    Raises
    ------
    TypeError
        ``tuids`` is not a list.
    FileNotFoundError
        Propagated unchanged when the snapshot of a tuid cannot be found.
    ValueError
        Propagated unchanged from building the TUID or loading the snapshot.
    KeyError
        A lookup failed while extracting the parameter from a snapshot.
    """
    # The exception used to be constructed without ``raise``, which made this check
    # a silent no-op; it now behaves like the same check in ``concat_dataset``.
    if not isinstance(tuids, list):
        raise TypeError(f"type(tuids)={type(tuids)} should be a list of TUIDs")

    collected = []
    for tuid in tuids:
        try:
            snapshot_data = load_snapshot(TUID(tuid))
            collected.append(
                extract_parameter_from_snapshot(snapshot_data, parameter)["value"],
            )
        except KeyError as key_error:
            # Only KeyError gets a more helpful message; other exceptions propagate
            # as they are, keeping their original type and details.
            raise KeyError(
                f"Check the varying parameter you put in.\n {key_error}",
            ) from key_error
    return np.array(collected)
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
[docs]
def to_gridded_dataset(
    quantify_dataset: xr.Dataset,
    dimension: str = "dim_0",
    coords_names: Iterable | None = None,
) -> xr.Dataset:
    """Converts a flattened (a.k.a. "stacked") dataset as the one generated by the
    :func:`~initialize_dataset` to a dataset in which the measured values are mapped
    onto a grid in the `xarray` format.

    This will be meaningful only if the data itself corresponds to a gridded
    measurement.

    .. note::

        Each individual :code:`(x0[i], x1[i], x2[i], ...)` setpoint must be unique.

    Conversions applied:

    - The names :code:`"x0", "x1", ...` will correspond to the names of the Dimensions.
    - The unique values for each of the :code:`x0, x1, ...` Variables are converted to
        Coordinates.
    - The :code:`y0, y1, ...` Variables are reshaped into a (multi-)dimensional grid
        and associated to the Coordinates.

    If the resulting dataset has a ``grid_2d`` attribute, it is set to ``False``.

    .. seealso::

        - :ref:`howto-data-handling-to-gridded`
        - :meth:`.MeasurementControl.setpoints_grid`

    Parameters
    ----------
    quantify_dataset
        Input dataset in the format generated by the :func:`~initialize_dataset`.
    dimension
        The flattened xarray Dimension.
    coords_names
        Optionally specify explicitly which Variables correspond to orthogonal
        coordinates, e.g. datasets holds values for :code:`("x0", "x1")` but only "x0"
        is independent: :code:`to_gridded_dataset(dset, coords_names=["x0"])`.
        Although annotated as an iterable, the value is iterated several times and
        is passed to ``len()`` and indexed, so a list or tuple should be used.

    Returns
    -------
    :
        The new dataset.

    Raises
    ------
    ValueError
        ``dimension`` is not a dimension of ``quantify_dataset``, or one of the
        given ``coords_names`` is not a variable of ``quantify_dataset``.
    """
    if dimension not in (dims := tuple(quantify_dataset.dims)):
        raise ValueError(f"Dimension {dimension} not in dims {dims}.")
    if coords_names is None:
        # for compatibility with older datasets we use `variables` instead of `coords`
        coords_names = sorted(
            v for v in quantify_dataset.variables if v.startswith("x")
        )
    else:
        # Validate the user supplied names against all variables of the dataset
        for coord in coords_names:
            vars_ = tuple(quantify_dataset.variables.keys())
            if coord not in vars_:
                raise ValueError(f"Coordinate {coord} not in coordinates {vars_}.")
    # Because xarray in general creates new objects and
    # due to https://github.com/pydata/xarray/issues/2245
    # the attributes need to be saved and restored in the new object
    attrs_coords = tuple(quantify_dataset[name].attrs for name in coords_names)
    # Convert "xi" variables to Coordinates
    dataset = quantify_dataset.set_coords(coords_names)
    # Convert to a gridded xarray dataset format
    if len(coords_names) == 1:
        # No unstacking needed just swap the dimension
        for var in quantify_dataset.data_vars:
            # Variables that do not live on `dimension` are left untouched
            if dimension in dataset[var].dims:
                dataset = dataset.update(
                    {var: dataset[var].swap_dims({dimension: coords_names[0]})},
                )
    else:
        # Make the Dimension `dimension` a MultiIndex(x0, x1, ...)
        dataset = dataset.set_index({dimension: coords_names})
        # See also: https://docs.xarray.dev/en/stable/reshaping.html#stack-and-unstack
        dataset = dataset.unstack(dim=dimension)
    # Restore the coordinate attributes saved before the conversion
    for name, attrs in zip(coords_names, attrs_coords):
        dataset[name].attrs = attrs
    # An existing "grid_2d" attribute is always reset to False on the returned dataset
    if "grid_2d" in dataset.attrs:
        dataset.attrs["grid_2d"] = False
    return dataset
# ######################################################################
[docs]
def get_latest_tuid(contains: str = "") -> TUID:
    """Return the tuid of the most recent experiment.

    .. tip::

        This function is similar to :func:`~get_tuids_containing` but is preferred if
        one is only interested in the most recent
        :class:`~quantify_core.data.types.TUID` for performance reasons.

    Parameters
    ----------
    contains
        An optional string contained in the experiment name.

    Returns
    -------
    :
        The latest TUID.

    Raises
    ------
    FileNotFoundError
        No data found.
    """
    # Searching newest-first and stopping after one hit finds the tuid asap
    newest_first = get_tuids_containing(contains, max_results=1, reverse=True)
    return newest_first[0]
# pylint: disable=too-many-locals
[docs]
def get_tuids_containing(
    contains: str = "",
    t_start: datetime.datetime | str | None = None,
    t_stop: datetime.datetime | str | None = None,
    max_results: int = sys.maxsize,
    reverse: bool = False,
) -> list[TUID]:
    """Collect the tuids of the experiments whose folder name contains a label.

    .. tip::

        If one is only interested in the most recent
        :class:`~quantify_core.data.types.TUID`, :func:`~get_latest_tuid` is preferred
        for performance reasons.

    Parameters
    ----------
    contains
        A string contained in the experiment name.
    t_start
        datetime to search from, inclusive. If a string is specified, it will be
        converted to a datetime object using :obj:`~dateutil.parser.parse`.
        If no value is specified, will use the year 1 as a reference t_start.
    t_stop
        datetime to search until, exclusive. If a string is specified, it will be
        converted to a datetime object using :obj:`~dateutil.parser.parse`.
        If no value is specified, will use the current time as a reference t_stop.
    max_results
        Maximum number of results to return. Defaults to unlimited.
    reverse
        If False, sorts tuids chronologically, if True sorts by most recent.

    Returns
    -------
    list
        A list of :class:`~quantify_core.data.types.TUID`: objects.

    Raises
    ------
    FileNotFoundError
        No data found, or an experiment container is stored in a day directory that
        does not match its tuid.
    """
    datadir = get_datadir()

    if isinstance(t_start, str):
        t_start = parse(t_start)
    elif t_start is None:
        t_start = datetime.datetime(1, 1, 1)
    if isinstance(t_stop, str):
        t_stop = parse(t_stop)
    elif t_stop is None:
        t_stop = datetime.datetime.now()

    # day directories are named YYYYmmDD, so they are compared as strings
    d_start = t_start.strftime("%Y%m%d")
    d_stop = t_stop.strftime("%Y%m%d")

    def is_day_in_range(dir_name: str) -> bool:
        if not (dir_name.isdigit() and len(dir_name) == 8):
            return False
        if d_start and not dir_name >= d_start:
            return False
        if d_stop and not dir_name <= d_stop:
            return False
        return True

    def is_matching_experiment(dir_name: str) -> bool:
        return (
            len(dir_name) > 25
            and (contains in dir_name)  # label is part of exp_name
            and TUID.is_valid(dir_name[:26])  # tuid is valid
            and (t_start <= TUID.datetime_seconds(dir_name) < t_stop)
        )

    daydirs = sorted(
        (entry for entry in os.listdir(datadir) if is_day_in_range(entry)),
        reverse=reverse,
    )
    if not daydirs:
        err_msg = f"There are no valid day directories in the data folder '{datadir}'"
        if t_start or t_stop:
            err_msg += f", for the range {t_start or ''} to {t_stop or ''}"
        raise FileNotFoundError(err_msg)

    tuids = []
    for daydir in daydirs:
        day_entries = os.listdir(os.path.join(datadir, daydir))
        expdirs = sorted(
            (entry for entry in day_entries if is_matching_experiment(entry)),
            reverse=reverse,
        )
        for expname in expdirs:
            # Check for inconsistent folder structure for datasets portability
            if daydir != expname[:8]:
                raise FileNotFoundError(
                    f"Experiment container '{expname}' is in wrong day directory "
                    f"'{daydir}'",
                )
            tuids.append(TUID(expname[:26]))
            # stop scanning as soon as enough results have been collected
            if len(tuids) == max_results:
                return tuids

    if not tuids:
        raise FileNotFoundError(f"No experiment found containing '{contains}'")
    return tuids
[docs]
def snapshot(update: bool = False, clean: bool = True) -> dict:
    """Collect the state of all instruments as a JSON-compatible dictionary.

    JSON-compatible means everything that the custom JSON encoder class
    :class:`~qcodes.utils.NumpyJSONEncoder` supports.

    Parameters
    ----------
    update
        If True, first gets all values before filling the snapshot.
    clean
        If True, removes certain keys from the snapshot to create a more
        readable and compact snapshot.

    Returns
    -------
    :
        A dictionary with the keys ``"instruments"`` (instrument name mapped to
        its snapshot) and ``"parameters"`` (left empty by this function).
    """
    instrument_snapshots = {}
    # Instances of Instrument subclasses are recorded inside their subclasses
    for instrument_class in get_subclasses(Instrument, include_base=True):
        # qcodes.Instrument.instances() returns valid objects only
        for instrument in instrument_class.instances():
            instrument_snapshots[instrument.name] = instrument.snapshot(update=update)

    snap = {"instruments": instrument_snapshots, "parameters": {}}
    if not clean:
        return snap

    exclude_keys = {
        "inter_delay",
        "post_delay",
        "vals",
        "instrument",
        "functions",
        "__class__",
        "raw_value",
        "instrument_name",
        "full_name",
        "val_mapping",
    }
    return delete_keys_from_dict(snap, exclude_keys)
# ######################################################################
# Private utilities
# ######################################################################
[docs]
def _xi_and_yi_match(dsets: Iterable) -> bool:
    """Checks if all xi and yi data variables in `dsets` match.

    Returns `True` only when all these conditions are met:

    - Same number of xi's
    - Same number of yi's
    - Same attributes for xi's across `dsets`
    - Same attributes for yi's across `dsets`
    - Same order of the xi's across `dsets`
    - Same order of the yi's across `dsets`

    Otherwise returns `False`.

    Parameters
    ----------
    dsets
        The datasets to compare. Any iterable is accepted, including one-shot
        iterators such as generators.

    Returns
    -------
    :
        `True` if both the xi and the yi variables match across all datasets.
    """
    # The datasets are traversed twice (once for xi, once for yi). A one-shot
    # iterator would be exhausted after the first pass, which would make the yi
    # check trivially succeed, so the iterable is materialized once up front.
    datasets = tuple(dsets)
    return _vars_match(datasets, var_type="x") and _vars_match(datasets, var_type="y")
[docs]
def _vars_match(dsets: Iterable, var_type: str = "x") -> bool:
    """Checks if all the datasets have matching xi or yi."""
    def get_xi_attrs(dset: xr.Dataset) -> tuple[str, ...]:
        # Hash is used in order to ensure everything matches:
        # name, long_name, unit, number of xi
        return tuple(dset[xi].attrs for xi in _get_parnames(dset, var_type))
    iterator = map(get_xi_attrs, dsets)
    # We can compare to the first one always
    tup0 = next(iterator, None)
    return all(tup == tup0 for tup in iterator) 
def _get_parnames(dset: xr.Dataset, par_type: str) -> Iterable:
    attr = "coords" if par_type == "x" else "data_vars"
    return sorted(key for key in getattr(dset, attr) if key.startswith(par_type))
[docs]
def _is_batched(obj: Settable | Gettable) -> bool:
    """N.B. This function cannot be imported from quantify_core.measurement.type due to
    some circular dependencies that it would create in the
    quantify_core.measurement.__init__.
    Parameters
    ----------
    obj : Settable or Gettable
        settable or gettable to be checked.
    Returns
    -------
    :
        The `.batched` attribute of the settable/gettable `obj`, `False` if not present.
    """
    return getattr(obj, "batched", False) 
[docs]
def _instrument_submodules_settable(
    settable: Settable,
) -> list[ParameterBase | InstrumentBase]:
    """
    Returns a list containing the root instrument, submodules and settable (typically a
    :class:`Parameter`).
    For example, when passing :code:`current_source.module1.output3`, this function will
    return :code:`[current_source, module1, output3]`.
    Parameters
    ----------
    settable
        The settable for which to get the instrument and ancestors.
    Returns
    -------
    :
        A list with the root instrument, all submodules and the settable.
    """
    def _recursive_add_submodules(
        modules: list,
        root: InstrumentBase | InstrumentModule | ChannelTuple,
        parameter: ParameterBase,
    ) -> bool:
        # Special case for ChannelTuples
        if isinstance(root, ChannelTuple):
            for channel in root:
                in_channel = _recursive_add_submodules(
                    modules=modules, root=channel, parameter=parameter
                )
                if in_channel:
                    modules.append(root)
                    return True
            return False
        # InstrumentBase and InstrumentModule behave similarly
        if parameter in root.parameters.values():
            modules.append(root)
            return True
        if len(root.submodules) == 0:
            return False
        for submodule in root.submodules.values():
            in_submodule = _recursive_add_submodules(
                modules=modules, root=submodule, parameter=parameter
            )
            if in_submodule:
                modules.append(root)
                return True
        return False
    try:
        root = settable.root_instrument
    except AttributeError:
        root = None
    if root is None:
        return [settable]
    modules_list = []
    _recursive_add_submodules(modules_list, root, settable)
    # Since the recursive function adds the last submodule first to the list, we insert
    # the settable at place zero and return the reversed list.
    modules_list.insert(0, settable)
    return modules_list[::-1] 
[docs]
def _generate_long_name(settable: Settable) -> str:
    """
    Generate the :code:`long_name` entry for a dataset coordinate for a settable.

    The long name is built from the root instrument and all relevant submodules
    leading to the settable, including the settable itself. For each of them the
    :code:`label` is used if present, otherwise the :code:`short_name`, and
    otherwise the :code:`name`. The pieces are joined with single spaces.
    """

    def _display_name(node: Any) -> str:
        # First available attribute wins, in order of preference.
        for attribute in ("label", "short_name"):
            if hasattr(node, attribute):
                return getattr(node, attribute)
        return node.name

    return " ".join(
        [_display_name(node) for node in _instrument_submodules_settable(settable)]
    )
[docs]
def _generate_name(settable: Settable) -> str:
    """
    Generate the :code:`name` entry for a dataset coordinate for a settable.

    The name is based on the root instrument and all relevant submodules leading
    to the settable, including the settable. For each of them the
    :code:`short_name` is used if present; otherwise the :code:`name` is used
    after removing every occurrence of ``"<part>_"`` for each part collected so
    far. The parts are joined with dots.
    """
    parts = []
    for node in _instrument_submodules_settable(settable):
        if hasattr(node, "short_name"):
            parts.append(node.short_name)
            continue
        # No short name available: strip the names of the ancestors collected so
        # far from this node's name.
        trimmed = node.name
        for ancestor_part in parts:
            trimmed = trimmed.replace(ancestor_part + "_", "")
        parts.append(trimmed)
    return ".".join(parts)