Source code for quantify_scheduler.helpers.diagnostics_report

# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch

"""Helper functions for debugging experiments."""

import json
import os
import sys
import traceback
import zipfile
from datetime import datetime
from os.path import dirname
from types import TracebackType
from typing import Any, Dict, Optional, Tuple, Type, Union
from uuid import uuid4

import numpy as np
import pytz
from qcodes.utils.json_utils import NumpyJSONEncoder

try:
    import tomllib  # type: ignore[reportMissingImports]
except ModuleNotFoundError:
    import tomli as tomllib

import pkg_resources
from packaging.specifiers import SpecifierSet

from quantify_core._version import __version__ as __core_version__
from quantify_core.data.handling import get_datadir, snapshot
from quantify_scheduler import CompiledSchedule, Schedule
from quantify_scheduler._version import __version__ as __scheduler_version__
from quantify_scheduler.device_under_test.quantum_device import QuantumDevice
from quantify_scheduler.instrument_coordinator.instrument_coordinator import (
    InstrumentCoordinator,
)


def _generate_diagnostics_report(  # noqa: PLR0915
    quantum_device: QuantumDevice,
    gettable_config: Dict[str, Any],
    schedule: Schedule,
    instrument_coordinator: InstrumentCoordinator,
    initialized: bool,
    compiled_schedule: Union[CompiledSchedule, None],
    acquisition_data: Union[Tuple[np.ndarray, ...], None],
    experiment_exception: Union[
        Union[
            Tuple[Type[BaseException], BaseException, TracebackType],
            Tuple[None, None, None],
        ],
        None,
    ],
) -> str:
    """
    Generate a report with the current state of an experiment for debugging.

    Returns
    -------
    :
        A path to the generated zipfile report.
    """

    def _flatten_hardware_logs_dict(
        hw_logs: dict, extracted_hw_logs: dict, prefix: Optional[str] = None
    ) -> dict:
        for key, value in hw_logs.items():
            if isinstance(value, dict):
                _flatten_hardware_logs_dict(
                    hw_logs=value, extracted_hw_logs=extracted_hw_logs, prefix=key
                )
            else:
                extracted_hw_logs[
                    f"{prefix}_{key}" if prefix is not None else f"{key}"
                ] = value
        return extracted_hw_logs

    def _get_dependency_versions() -> list:
        package_root = dirname(dirname(os.path.realpath(__file__)))
        pyproject_path = os.path.join(dirname(package_root), "pyproject.toml")
        with open(pyproject_path, "rb") as pyproject_file:
            pyproject_data = tomllib.load(pyproject_file)

        all_dependency_versions = []
        for dependency in pyproject_data["project"]["dependencies"]:
            version = (
                __core_version__
                if dependency == "quantify-core"
                else pkg_resources.get_distribution(dependency).version
            )
            all_dependency_versions.append(f"{dependency}: {version}")

        python_specs = SpecifierSet(pyproject_data["project"]["requires-python"])
        python_version = sys.version.split(" ")[0]
        all_dependency_versions.append(f"python{python_specs}: {python_version}")
        all_dependency_versions.append(f"quantify-scheduler: {__scheduler_version__}")

        return sorted(all_dependency_versions)

    report_type = "failed_initialization"
    connection_exception = None
    hardware_logs = {}

    if initialized:
        try:
            hardware_logs = instrument_coordinator.retrieve_hardware_logs()
        except:  # noqa: E722 pylint: disable=bare-except
            connection_exception = sys.exc_info()

        # Classify the report based on whether the experiment itself failed and
        # whether the hardware logs could be retrieved.
        if experiment_exception is not None:
            report_type = (
                "failed_connection_to_hw"
                if connection_exception is not None
                else "failed_exp"
            )
        else:
            report_type = (
                "failed_hw_log_retrieval"
                if connection_exception is not None
                else "completed_exp"
            )

    report_zipfile = os.path.join(
        get_datadir(), f"diagnostics_report_{report_type}_{uuid4()}.zip"
    )

    with zipfile.ZipFile(
        report_zipfile, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=9
    ) as zip_file:
        zip_file.writestr(
            "timestamp.txt",
            datetime.now(pytz.utc).strftime("%Y-%m-%d_%H-%M-%S_%Z"),
        )
        zip_file.writestr(
            "dependency_versions.json", json.dumps(_get_dependency_versions())
        )
        zip_file.writestr(
            "gettable.json", json.dumps(gettable_config, cls=NumpyJSONEncoder, indent=4)
        )
        zip_file.writestr("quantum_device.json", quantum_device.to_json())
        zip_file.writestr(
            "hardware_cfg.json",
            json.dumps(
                quantum_device.generate_hardware_config(),
                cls=NumpyJSONEncoder,
                indent=4,
            ),
        )
        zip_file.writestr("schedule.json", schedule.to_json())

        if compiled_schedule is not None:
            zip_file.writestr("compiled_schedule.json", compiled_schedule.to_json())

        if initialized:
            zip_file.writestr(
                "snapshot.json",
                json.dumps(snapshot(), cls=NumpyJSONEncoder, indent=4),
            )

        if acquisition_data is not None:
            data_str = ""
            for line in acquisition_data:
                data_str += f"{np.array2string(line)},\n"
            data_str = data_str.rstrip(",\n")
            zip_file.writestr("acquisition_data.txt", data_str)

        if hardware_logs:
            for (
                component_name,
                component_hardware_logs,
            ) in hardware_logs.items():
                for log_name, hardware_log in _flatten_hardware_logs_dict(
                    hw_logs=component_hardware_logs, extracted_hw_logs={}
                ).items():
                    zip_file.writestr(
                        os.path.join(component_name, f"{log_name}.txt"),
                        hardware_log,
                    )

        if experiment_exception is not None:
            exception_str = " ".join(traceback.format_exception(*experiment_exception))
            zip_file.writestr("error_trace.txt", exception_str)
        elif connection_exception is not None:
            # No need to print this if there is already an experiment_exception
            exception_str = " ".join(traceback.format_exception(*connection_exception))
            zip_file.writestr("connection_error_trace.txt", exception_str)

    return report_zipfile
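

# A minimal usage sketch, assuming a ``quantum_device``, ``schedule`` and
# ``instrument_coordinator`` that the caller has already configured. The wrapper
# below, its name, and the gettable configuration it passes are illustrative
# assumptions only; a real caller would typically capture the exception raised
# by its acquisition step instead of simulating one.
def _example_run_with_diagnostics(
    quantum_device: QuantumDevice,
    schedule: Schedule,
    instrument_coordinator: InstrumentCoordinator,
) -> str:
    """Sketch: run a (simulated) failing experiment and produce a diagnostics report."""
    experiment_exception = None
    try:
        # A real caller would compile and execute the schedule here; a failure
        # is simulated so that an error trace ends up in the report.
        raise RuntimeError("simulated experiment failure")
    except RuntimeError:
        experiment_exception = sys.exc_info()
    return _generate_diagnostics_report(
        quantum_device=quantum_device,
        gettable_config={"real_imag": True, "batched": True},  # illustrative values
        schedule=schedule,
        instrument_coordinator=instrument_coordinator,
        initialized=True,
        compiled_schedule=None,
        acquisition_data=None,
        experiment_exception=experiment_exception,
    )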