# Repository: https://gitlab.com/quantify-os/quantify-scheduler
# Licensed according to the LICENCE file on the main branch
"""Compiler for the quantify_scheduler."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Literal, overload
import networkx as nx
from quantify_scheduler.backends.qblox.operations.inline_q1asm import InlineQ1ASM
from quantify_scheduler.enums import SchedulingStrategy
from quantify_scheduler.json_utils import load_json_schema, validate_json
from quantify_scheduler.operations.control_flow_library import (
ControlFlowOperation,
)
from quantify_scheduler.schedules.schedule import (
Schedulable,
Schedule,
ScheduleBase,
TimingConstraint,
)
if TYPE_CHECKING:
from quantify_scheduler.backends.graph_compilation import (
CompilationConfig,
)
from quantify_scheduler.operations.operation import Operation
[docs]
logger = logging.getLogger(__name__)
@overload
[docs]
def _determine_absolute_timing(
schedule: Schedule,
time_unit: Literal["physical", "ideal", None] = "physical",
config: CompilationConfig | None = None,
) -> Schedule: ...
@overload
def _determine_absolute_timing(
schedule: Operation,
time_unit: Literal["physical", "ideal", None] = "physical",
config: CompilationConfig | None = None,
) -> Operation | Schedule: ...
def _determine_absolute_timing(
schedule: Operation | Schedule,
time_unit: Literal[
"physical", "ideal", None
] = "physical", # should be included in CompilationConfig
config: CompilationConfig | None = None,
):
"""
Determine the absolute timing of a schedule based on the timing constraints.
This function determines absolute timings for every operation in the
:attr:`~.ScheduleBase.schedulables`. It does this by:
1. iterating over all and elements in the :attr:`~.ScheduleBase.schedulables`.
2. determining the absolute time of the reference operation
- reference point :code:`"ref_pt"` of the reference operation defaults to
:code:`"end"` in case it is not set (i.e., is :code:`None`).
3. determining the start of the operation based on the :code:`rel_time` and
:code:`duration` of operations
- reference point :code:`"ref_pt_new"` of the added operation defaults to
:code:`"start"` in case it is not set.
Parameters
----------
schedule
The schedule for which to determine timings.
config
Compilation config for
:class:`~quantify_scheduler.backends.graph_compilation.QuantifyCompiler`.
time_unit
Whether to use physical units to determine the absolute time or ideal time.
When :code:`time_unit == "physical"` the duration attribute is used.
When :code:`time_unit == "ideal"` the duration attribute is ignored and treated
as if it is :code:`1`.
When :code:`time_unit == None` it will revert to :code:`"physical"`.
Returns
-------
:
The modified ``schedule`` where the absolute time for each operation has been
determined.
Raises
------
NotImplementedError
If the scheduling strategy is not SchedulingStrategy.ASAP
"""
time_unit = time_unit or "physical"
if time_unit not in (valid_time_units := ("physical", "ideal")):
raise ValueError(f"Undefined time_unit '{time_unit}'! Must be one of {valid_time_units}")
if isinstance(schedule, ScheduleBase):
return _determine_absolute_timing_schedule(schedule, time_unit, config)
elif isinstance(schedule, ControlFlowOperation):
schedule.body = _determine_absolute_timing(schedule.body, time_unit, config)
return schedule
elif schedule.duration is None:
raise RuntimeError(
f"Cannot determine timing for operation {schedule.name}. Operation data: {schedule!r}"
)
else:
return schedule
[docs]
def _determine_absolute_timing_schedule(
schedule: Schedule,
time_unit: Literal["physical", "ideal", None],
config: CompilationConfig | None,
) -> Schedule:
scheduling_strategy = _determine_scheduling_strategy(config)
if not schedule.schedulables:
raise ValueError(f"schedule '{schedule.name}' contains no schedulables.")
for op_key in schedule.operations:
if isinstance(schedule.operations[op_key], Schedule):
if schedule.operations[op_key].get("duration", None) is None:
schedule.operations[op_key] = _determine_absolute_timing(
schedule=schedule.operations[op_key],
time_unit=time_unit,
config=config,
)
elif isinstance(schedule.operations[op_key], ControlFlowOperation):
schedule.operations[op_key] = _determine_absolute_timing(
schedule=schedule.operations[op_key],
time_unit=time_unit,
config=config,
)
# Note: type checker cannot reason that schedule.operations[op_key] can only be
# an Operation after the `or`.
elif isinstance(schedule.operations[op_key], Schedule) or (
time_unit == "physical"
and not schedule.operations[op_key].valid_pulse # type: ignore
and not schedule.operations[op_key].valid_acquisition # type: ignore
# TODO (SE-650): move to qblox backend.
and not isinstance(schedule.operations[op_key], InlineQ1ASM)
):
# Gates do not have a defined duration, so only ideal timing is defined
raise RuntimeError(
f"Operation {schedule.operations[op_key].name} is not a valid pulse or acquisition."
f" Please check whether the device compilation has been performed."
f" Operation data: {schedule.operations[op_key]!r}"
)
_make_timing_constraints_explicit(schedule, scheduling_strategy)
references_graph = _populate_references_graph(schedule)
_validate_schedulable_references(schedule, references_graph)
schedulables_sorted_by_reference = nx.topological_sort(references_graph)
for i, schedulable_name in enumerate(schedulables_sorted_by_reference):
i: int
schedulable_name: str
schedulable: Schedulable = schedule.schedulables[schedulable_name]
timing_constraints: list[TimingConstraint] = schedulable.data["timing_constraints"]
operation: Operation | Schedule = schedule.operations[schedulable.data["operation_id"]]
if i == 0:
schedulable.data["abs_time"] = 0.0
else:
schedulable.data["abs_time"] = _get_start_time(
schedule, timing_constraints[0], operation, time_unit
)
schedule = _normalize_absolute_timing(schedule)
schedule["duration"] = schedule.get_schedule_duration()
if time_unit == "ideal":
schedule["depth"] = schedule["duration"] + 1
return schedule
[docs]
def _determine_scheduling_strategy(config: CompilationConfig | None = None) -> SchedulingStrategy:
if config is not None and config.device_compilation_config is not None:
return config.device_compilation_config.scheduling_strategy
return SchedulingStrategy.ASAP
[docs]
def _validate_schedulable_references(schedule: Schedule, references_graph: nx.DiGraph) -> None:
"""Check the schedulable references for circular references."""
for node in references_graph.nodes:
if node not in schedule.schedulables:
raise ValueError(f"Node {node} not found in schedulables.")
if not nx.is_directed_acyclic_graph(references_graph):
raise TypeError(
"`schedulable_references` is not a Directed Acyclic Graph. This is most likely "
"caused by a circular reference in the Timing Constraints."
)
[docs]
def _populate_references_graph(schedule: Schedule) -> nx.DiGraph:
"""Add nodes and edges to the graph containing schedulable references."""
graph = nx.DiGraph()
# Add nodes
graph.add_nodes_from(schedule.schedulables.keys())
# Add edges
for schedulable_name, schedulable in schedule.schedulables.items():
schedulable_name: str
schedulable: Schedulable
graph.add_edges_from(
(timing_constraint.ref_schedulable, schedulable_name)
for timing_constraint in schedulable.data["timing_constraints"]
if timing_constraint.ref_schedulable is not None
)
return graph
[docs]
def _make_timing_constraints_explicit(schedule: Schedule, strategy: SchedulingStrategy) -> None:
default_schedulable_by_schedulable: list[tuple[str, str | None]] = (
_determine_default_ref_schedulables_by_schedulable(schedule, strategy)
)
for (
schedulable_name,
default_reference_schedulable_name,
) in default_schedulable_by_schedulable:
schedulable_name: str
default_reference_schedulable_name: str | None
_make_timing_constraints_explicit_for_schedulable(
schedule=schedule,
schedulable_name=schedulable_name,
default_reference_schedulable_name=default_reference_schedulable_name,
strategy=strategy,
)
[docs]
def _make_timing_constraints_explicit_for_schedulable(
schedule: Schedule,
schedulable_name: str,
default_reference_schedulable_name: str | None,
strategy: SchedulingStrategy,
) -> None:
schedulable: Schedulable = schedule.schedulables[schedulable_name]
given_timing_constraints: list[TimingConstraint] = schedulable.data["timing_constraints"]
# Support only one timing constraint for now
if len(given_timing_constraints) != 1:
raise NotImplementedError("Only exactly one timing constraint per Schedulable supported.")
timing_constraint: TimingConstraint = given_timing_constraints[0]
if timing_constraint.ref_schedulable is None:
timing_constraint.ref_schedulable = default_reference_schedulable_name
if timing_constraint.ref_pt is None:
timing_constraint.ref_pt = _determine_default_ref_pt(strategy)
if timing_constraint.ref_pt_new is None:
timing_constraint.ref_pt_new = _determine_default_ref_pt_new(strategy)
if timing_constraint.rel_time is None:
timing_constraint.rel_time = 0.0
[docs]
def _determine_default_ref_pt(strategy: SchedulingStrategy) -> Literal["start", "end"]:
if strategy == SchedulingStrategy.ASAP:
return "end"
if strategy == SchedulingStrategy.ALAP:
return "start"
raise ValueError(f"Cannot determine default `ref_pt`. Unknown scheduling strategy: {strategy}")
[docs]
def _determine_default_ref_pt_new(strategy: SchedulingStrategy) -> Literal["start", "end"]:
if strategy == SchedulingStrategy.ASAP:
return "start"
if strategy == SchedulingStrategy.ALAP:
return "end"
raise ValueError(
f"Cannot determine default `ref_pt_new`. Unknown scheduling strategy: {strategy}"
)
[docs]
def _determine_default_ref_schedulables_by_schedulable(
schedule: Schedule, strategy: SchedulingStrategy
) -> list[tuple[str, str | None]]:
schedulable_names: list[str] = list(schedule.schedulables)
if strategy == SchedulingStrategy.ASAP:
default_schedulable_names: list[str | None] = [None] + list(schedule.schedulables)[:-1]
elif strategy == SchedulingStrategy.ALAP:
default_schedulable_names: list[str | None] = list(schedule.schedulables)[1:] + [None]
else:
raise ValueError(f"Scheduling strategy {strategy} not one of `ASAP` or `ALAP`.")
return [
(schedulable_name, default_schedulable_name)
for schedulable_name, default_schedulable_name in zip(
schedulable_names, default_schedulable_names
)
]
[docs]
def _get_start_time(
schedule: Schedule,
t_constr: TimingConstraint,
curr_op: Operation | Schedule,
time_unit: Literal["physical", "ideal", None],
) -> float:
assert t_constr.ref_schedulable is not None
ref_schedulable: Schedulable = schedule.schedulables[t_constr.ref_schedulable]
ref_op: Operation | Schedule = schedule.operations[ref_schedulable["operation_id"]]
# duration = 1 is useful when e.g., drawing a circuit diagram.
if time_unit == "physical":
duration_ref_op = ref_op.duration
else:
duration_ref_op = (
ref_op.body.get("depth", 1)
if isinstance(ref_op, ControlFlowOperation)
else ref_op.get("depth", 1)
)
# Type checker does not know that ref_op.duration is not None if time_unit ==
# "physical"
assert duration_ref_op is not None
ref_pt = t_constr.ref_pt or "end"
if ref_pt == "start":
t0 = ref_schedulable["abs_time"]
elif ref_pt == "center":
t0 = ref_schedulable["abs_time"] + duration_ref_op / 2
elif ref_pt == "end":
t0 = ref_schedulable["abs_time"] + duration_ref_op
else:
raise NotImplementedError(f'Timing "{ref_pt=}" not supported by backend.')
if time_unit == "physical":
duration_new_op = curr_op.duration
else:
duration_new_op = (
curr_op.body.get("depth", 1)
if isinstance(curr_op, ControlFlowOperation)
else curr_op.get("depth", 1)
)
assert duration_new_op is not None
ref_pt_new = t_constr.ref_pt_new or "start"
if ref_pt_new == "start":
abs_time = t0 + t_constr.rel_time
elif ref_pt_new == "center":
abs_time = t0 + t_constr.rel_time - duration_new_op / 2
elif ref_pt_new == "end":
abs_time = t0 + t_constr.rel_time - duration_new_op
else:
raise NotImplementedError(f'Timing "{ref_pt_new=}" not supported by backend.')
return abs_time
[docs]
def _normalize_absolute_timing(
schedule: Schedule,
config: CompilationConfig | None = None, # noqa: ARG001
) -> Schedule:
# TODO: Support normalization of absolute timing in subschedules
# See test_negative_absolute_timing_is_normalized_with_subschedule in test_compilation.py
# and https://gitlab.com/quantify-os/quantify-scheduler/-/issues/489
min_time = min(schedulable["abs_time"] for schedulable in schedule.schedulables.values())
if min_time < 0:
for schedulable in schedule.schedulables.values():
schedulable["abs_time"] -= min_time
return schedule
[docs]
def validate_config(config: dict, scheme_fn: str) -> bool:
"""
Validate a configuration using a schema.
Parameters
----------
config
The configuration to validate
scheme_fn
The name of a json schema in the quantify_scheduler.schemas folder.
Returns
-------
:
True if valid
"""
scheme = load_json_schema(__file__, scheme_fn)
validate_json(config, scheme)
return True
[docs]
def plot_schedulable_references_graph(schedule: Schedule) -> None:
"""
Show the schedulable reference graph.
Can be used as a debugging tool to spot any circular references.
"""
graph = _populate_references_graph(schedule)
nx.draw(graph, with_labels=True)