# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Helper functions for debugging experiments."""
import json
import os
import re
import sys
import traceback
import zipfile
from datetime import datetime, timezone
from importlib.metadata import distribution
from importlib.metadata import version as get_version
from types import TracebackType
from typing import Any, Dict, Optional, Tuple, Type, Union
from uuid import uuid4
import numpy as np
from qcodes.utils.json_utils import NumpyJSONEncoder
from quantify_core._version import __version__ as __core_version__
from quantify_core.data.handling import get_datadir, snapshot
from quantify_scheduler import CompiledSchedule, Schedule
from quantify_scheduler._version import __version__ as __scheduler_version__
from quantify_scheduler.device_under_test.quantum_device import QuantumDevice
from quantify_scheduler.instrument_coordinator.instrument_coordinator import (
InstrumentCoordinator,
)
[docs]
def _generate_diagnostics_report( # noqa: PLR0912, PLR0915
quantum_device: QuantumDevice,
gettable_config: Dict[str, Any],
schedule: Schedule,
instrument_coordinator: InstrumentCoordinator,
initialized: bool,
compiled_schedule: Union[CompiledSchedule, None],
acquisition_data: Union[Tuple[np.ndarray, ...], None],
experiment_exception: Union[
Union[
Tuple[Type[BaseException], BaseException, TracebackType],
Tuple[None, None, None],
],
None,
],
) -> str:
"""
Generate a report with the current state of an experiment for debugging.
Returns
-------
:
A path to the generated zipfile report.
"""
def _flatten_hardware_logs_dict(
hw_logs: dict, extracted_hw_logs: dict, prefix: Optional[str] = None
) -> dict:
for key, value in hw_logs.items():
if isinstance(value, dict):
_flatten_hardware_logs_dict(
hw_logs=value, extracted_hw_logs=extracted_hw_logs, prefix=key
)
else:
extracted_hw_logs[
f"{prefix}_{key}" if prefix is not None else f"{key}"
] = value
return extracted_hw_logs
def _get_dependency_versions() -> list:
dependencies = [
dependency
for dependency in distribution(
"quantify-scheduler"
).requires # pyright: ignore[reportOptionalIterable]
if "extra" not in dependency
]
all_dependency_versions = []
_operator_regex_str = r"(~=|==|!=|<=|>=|<|>|===|\[|\s)"
for line in dependencies:
dependency = re.split(_operator_regex_str, line)[0]
if dependency == "quantify_core":
dependency = dependency.replace("_", "-")
version = (
__core_version__
if dependency == "quantify-core"
else get_version(dependency)
)
all_dependency_versions.append(f"{dependency}: {version}")
all_dependency_versions.append(f"quantify-scheduler: {__scheduler_version__}")
all_dependency_versions.append(f"python: {sys.version.split(' ')[0]}")
return sorted(all_dependency_versions)
report_type = "failed_initialization"
connection_exception = None
hardware_logs = {}
if initialized:
try:
hardware_logs = instrument_coordinator.retrieve_hardware_logs()
except Exception:
connection_exception = sys.exc_info()
if experiment_exception is not None:
report_type = (
"failed_connection_to_hw"
if connection_exception is not None
else "failed_exp"
)
else:
report_type = (
"failed_hw_log_retrieval"
if connection_exception is not None
else "completed_exp"
)
report_zipfile = os.path.join(
get_datadir(), f"diagnostics_report_{report_type}_{uuid4()}.zip"
)
with zipfile.ZipFile(
report_zipfile, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=9
) as zip_file:
zip_file.writestr(
"timestamp.txt",
datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S_%Z"),
)
zip_file.writestr(
"dependency_versions.json", json.dumps(_get_dependency_versions())
)
zip_file.writestr(
"gettable.json", json.dumps(gettable_config, cls=NumpyJSONEncoder, indent=4)
)
zip_file.writestr("quantum_device.json", quantum_device.to_json())
zip_file.writestr(
"hardware_cfg.json",
json.dumps(
quantum_device.generate_hardware_config(),
cls=NumpyJSONEncoder,
indent=4,
),
)
zip_file.writestr("schedule.json", schedule.to_json())
if compiled_schedule is not None:
zip_file.writestr("compiled_schedule.json", compiled_schedule.to_json())
if initialized:
zip_file.writestr(
"snapshot.json",
json.dumps(snapshot(), cls=NumpyJSONEncoder, indent=4),
)
if acquisition_data is not None:
data_str = ""
for line in acquisition_data:
data_str += f"{np.array2string(line)},\n"
data_str.rstrip(",\n")
zip_file.writestr("acquisition_data.txt", data_str)
if hardware_logs:
for (
component_name,
component_hardware_logs,
) in hardware_logs.items():
for log_name, hardware_log in _flatten_hardware_logs_dict(
hw_logs=component_hardware_logs, extracted_hw_logs={}
).items():
zip_file.writestr(
os.path.join(component_name, f"{log_name}.txt"),
hardware_log,
)
if experiment_exception is not None:
exception_str = " ".join(traceback.format_exception(*experiment_exception))
zip_file.writestr("error_trace.txt", exception_str)
elif connection_exception is not None:
# No need to print this if there is already an experiment_exception
exception_str = " ".join(traceback.format_exception(*connection_exception))
zip_file.writestr("connection_error_trace.txt", exception_str)
return report_zipfile