mlflow
- class fkat.pytorch.schedule.mlflow.HasTag(tag: str, schedule: Schedule)
A schedule that activates only when a specific MLflow tag is present AND the trigger schedule is satisfied.
This schedule combines another schedule (the trigger schedule) with MLflow tag validation. It allows callbacks to be dynamically enabled or disabled through experiment configuration rather than code changes, which is particularly useful for performance-intensive callbacks like FLOP measurement or detailed logging that should only run conditionally.
The schedule checks two conditions:
1. The trigger schedule is satisfied.
2. The specified MLflow tag exists in the current MLflow run.
Both conditions must be true for the schedule to activate. If the trigger schedule is not satisfied, or no trainer is provided, the schedule does not activate.
Note
The trainer can optionally be passed to the check method for MLflow tag validation. If trainer is None, the schedule will never activate.
Tag checking occurs only when the trigger schedule condition is already satisfied, minimizing MLflow API calls.
If an exception occurs during tag checking, it will be logged and the schedule will not activate.
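The behaviour described above can be sketched with stand-in classes. This is an illustrative re-creation under stated assumptions, not fkat's implementation: `EveryNSteps`, `HasTagSketch`, and the `fetch_tags` hook are hypothetical names, and the real class looks tags up in MLflow rather than through an injected function.

```python
import logging
from typing import Any, Callable, Mapping, Optional

logger = logging.getLogger(__name__)


class EveryNSteps:
    """Hypothetical stand-in for a trigger schedule (not fkat's Every)."""

    def __init__(self, n_steps: int) -> None:
        self.n_steps = n_steps

    def check(self, *, stage: Optional[str] = None, batch_idx: Optional[int] = None,
              step: Optional[int] = None, trainer: Optional[Any] = None) -> bool:
        return step is not None and step % self.n_steps == 0


class HasTagSketch:
    """Illustrative sketch of the documented behaviour, not the real HasTag."""

    def __init__(self, tag: str, schedule: Any,
                 fetch_tags: Callable[[Any], Mapping[str, str]]) -> None:
        self.tag = tag
        self.schedule = schedule
        # fetch_tags is a hypothetical hook standing in for the MLflow lookup.
        self.fetch_tags = fetch_tags

    def check(self, *, stage: Optional[str] = None, batch_idx: Optional[int] = None,
              step: Optional[int] = None, trainer: Optional[Any] = None) -> bool:
        # Condition 1: the cheap trigger check runs first.
        if not self.schedule.check(stage=stage, batch_idx=batch_idx,
                                   step=step, trainer=trainer):
            return False
        # Without a trainer there is no run to inspect.
        if trainer is None:
            return False
        # Condition 2: the tag lookup happens only after the trigger fired.
        try:
            return self.tag in self.fetch_tags(trainer)
        except Exception as exc:
            # Failures are logged and treated as "do not activate".
            logger.warning("Tag check failed: %s", exc)
            return False


lookups = []


def fake_fetch(trainer: Any) -> Mapping[str, str]:
    """Record each lookup so the number of tag checks can be inspected."""
    lookups.append(trainer)
    return {"enable_flops": "true"}


sched = HasTagSketch("enable_flops", EveryNSteps(5), fake_fetch)
off_trigger = sched.check(step=3, trainer=object())   # trigger not satisfied
on_trigger = sched.check(step=5, trainer=object())    # trigger satisfied, tag present
no_trainer = sched.check(step=10, trainer=None)       # trainer missing
```

Only the second call reaches the tag lookup, so `lookups` holds a single entry. That ordering is what keeps the number of tag queries low when the trigger fires rarely.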
Example
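Python code example:

    import lightning as L

    from fkat.pytorch.callbacks.profiling import Flops
    from fkat.pytorch.schedule import Every
    from fkat.pytorch.schedule.mlflow import HasTag

    # Create a schedule that checks every 5 batches if the 'enable_flops' tag exists
    trigger = Every(n_batches=5)
    flops_schedule = HasTag(tag="enable_flops", schedule=trigger)

    # Attach the schedule to the callback and register it with the trainer
    flops_callback = Flops(schedule=flops_schedule)
    trainer = L.Trainer(callbacks=[flops_callback])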
Hydra configuration example:

    # In your config.yaml file
    callbacks:
      - _target_: fkat.pytorch.callbacks.profiling.Flops
        schedule:
          _target_: fkat.pytorch.schedule.mlflow.HasTag
          tag: ENABLE_FLOPS
          schedule:
            _target_: fkat.pytorch.schedule.Every
            n_steps: 20

    # Another example using Fixed schedule
    callbacks:
      - _target_: fkat.pytorch.callbacks.heartbeat.Heartbeat
        schedule:
          _target_: fkat.pytorch.schedule.mlflow.HasTag
          tag: ENABLE_HEARTBEAT
          schedule:
            _target_: fkat.pytorch.schedule.Fixed
            warmup_steps: 100
            active_steps: 1000

    # Example with Elapsed time-based schedule
    callbacks:
      - _target_: fkat.pytorch.callbacks.custom_logging.DetailedMetrics
        schedule:
          _target_: fkat.pytorch.schedule.mlflow.HasTag
          tag: DETAILED_LOGGING
          schedule:
            _target_: fkat.pytorch.schedule.Elapsed
            interval: ${timedelta:minutes=15}
- check(*, stage: Optional[str] = None, batch_idx: Optional[int] = None, step: Optional[int] = None, trainer: Optional[Trainer] = None) → bool
Check if the schedule should activate based on the trigger schedule and MLflow tag presence.
This method first checks if the trigger schedule is satisfied. If this condition is met, it then checks if the specified MLflow tag exists in the current run. Both conditions must be true for the method to return True.
- Parameters:
  - stage (str, optional) – Current training stage (e.g., “train”, “validate”, “test”). Passed to the trigger schedule.
  - batch_idx (int, optional) – Current batch index within the epoch. Passed to the trigger schedule.
  - step (int, optional) – Current global step (cumulative across epochs). Passed to the trigger schedule.
  - trainer (L.Trainer, optional) – The Lightning Trainer instance. Required for MLflow tag validation.
- Returns:
  True if both the trigger schedule is satisfied AND the specified tag is present, False otherwise.
- Return type:
  bool
Note
The trainer must be provided for MLflow tag validation.
Tag checking occurs only when the trigger schedule is already satisfied.
If an exception occurs during tag checking, it will be logged as a warning and the method will return False.
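Because the signature starts with a bare `*`, every argument to check must be passed by keyword. The sketch below illustrates that calling convention with a hypothetical `AlwaysOn` schedule and a hypothetical `maybe_run` helper. Neither is part of fkat, and a real callback would pass the Lightning Trainer rather than a placeholder object.

```python
from typing import Any, Optional


class AlwaysOn:
    """Hypothetical schedule exposing the same keyword-only check signature."""

    def check(self, *, stage: Optional[str] = None, batch_idx: Optional[int] = None,
              step: Optional[int] = None, trainer: Optional[Any] = None) -> bool:
        return True


def maybe_run(schedule: Any, trainer: Any, batch_idx: int, step: int) -> bool:
    """Hypothetical helper a callback hook might use to gate expensive work."""
    return schedule.check(stage="train", batch_idx=batch_idx, step=step, trainer=trainer)


schedule = AlwaysOn()
ran = maybe_run(schedule, trainer=object(), batch_idx=0, step=0)

# Positional arguments are rejected because of the bare `*` in the signature.
try:
    schedule.check("train", 0, 0, None)
    positional_ok = True
except TypeError:
    positional_ok = False
```

Passing the trainer explicitly matters for HasTag in particular, since omitting it means the tag is never validated and the method returns False.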