mlflow

class fkat.pytorch.schedule.mlflow.HasTag(tag: str, schedule: Schedule)

A schedule that activates only when a specific MLflow tag is present AND the trigger schedule is satisfied.

This schedule combines another schedule (the trigger schedule) with MLflow tag validation. It allows callbacks to be dynamically enabled or disabled through experiment configuration rather than code changes, which is particularly useful for performance-intensive callbacks like FLOP measurement or detailed logging that should only run conditionally.

The schedule checks two conditions:

  1. The trigger schedule is satisfied.

  2. The specified MLflow tag exists in the current MLflow run.

Both conditions must be true for the schedule to activate. If the trigger schedule is not satisfied, or no trainer is passed to check, the schedule does not activate.

Note

  • The trainer argument of the check method is optional in the signature but required in practice: MLflow tag validation needs it, so if trainer is None the schedule never activates.

  • Tag checking occurs only when the trigger schedule condition is already satisfied, minimizing MLflow API calls.

  • If an exception occurs during tag checking, it is logged as a warning and the schedule does not activate.
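The evaluation order described in the notes above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not fkat's implementation: TagGatedSchedule, EveryNSteps, and the lookup_tags callable are hypothetical names introduced here, and the real class reads tags from the MLflow run associated with the trainer rather than from an injected function.

```python
import logging
from typing import Any, Callable, Mapping, Optional

logger = logging.getLogger(__name__)


class EveryNSteps:
    """Hypothetical trigger schedule: satisfied on every n-th global step."""

    def __init__(self, n: int) -> None:
        self.n = n

    def check(self, *, stage: Optional[str] = None, batch_idx: Optional[int] = None,
              step: Optional[int] = None, trainer: Any = None) -> bool:
        return step is not None and step % self.n == 0


class TagGatedSchedule:
    """Illustrative stand-in for the documented behavior (not fkat's code)."""

    def __init__(self, tag: str, schedule: Any,
                 lookup_tags: Callable[[Any], Mapping[str, str]]) -> None:
        self.tag = tag
        self.schedule = schedule        # any object exposing check(...) -> bool
        self.lookup_tags = lookup_tags  # assumption: returns the run's tags for a trainer

    def check(self, *, stage: Optional[str] = None, batch_idx: Optional[int] = None,
              step: Optional[int] = None, trainer: Any = None) -> bool:
        # Cheap condition first: if the trigger is not satisfied, skip the tag lookup.
        if not self.schedule.check(stage=stage, batch_idx=batch_idx, step=step):
            return False
        # Without a trainer there is no run to inspect.
        if trainer is None:
            return False
        # Expensive condition last; any failure is logged and treated as "not active".
        try:
            return self.tag in self.lookup_tags(trainer)
        except Exception as exc:
            logger.warning("Tag check for %r failed: %s", self.tag, exc)
            return False
```

Ordering the checks this way means the tag lookup, which may involve a remote call, runs only on steps where the trigger would have fired anyway.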

Example

Python code example:

# Create a schedule that checks every 5 batches whether the 'enable_flops' tag exists
import lightning as L  # assumes Lightning 2.x

from fkat.pytorch.callbacks.profiling import Flops
from fkat.pytorch.schedule import Every
from fkat.pytorch.schedule.mlflow import HasTag

trigger = Every(n_batches=5)
flops_schedule = HasTag(tag="enable_flops", schedule=trigger)
flops_callback = Flops(schedule=flops_schedule)
trainer = L.Trainer(callbacks=[flops_callback])

Hydra configuration example:

# In your config.yaml file
callbacks:
  - _target_: fkat.pytorch.callbacks.profiling.Flops
    schedule:
      _target_: fkat.pytorch.schedule.mlflow.HasTag
      tag: ENABLE_FLOPS
      schedule:
        _target_: fkat.pytorch.schedule.Every
        n_steps: 20

# Another example using Fixed schedule
callbacks:
  - _target_: fkat.pytorch.callbacks.heartbeat.Heartbeat
    schedule:
      _target_: fkat.pytorch.schedule.mlflow.HasTag
      tag: ENABLE_HEARTBEAT
      schedule:
        _target_: fkat.pytorch.schedule.Fixed
        warmup_steps: 100
        active_steps: 1000

# Example with Elapsed time-based schedule
callbacks:
  - _target_: fkat.pytorch.callbacks.custom_logging.DetailedMetrics
    schedule:
      _target_: fkat.pytorch.schedule.mlflow.HasTag
      tag: DETAILED_LOGGING
      schedule:
        _target_: fkat.pytorch.schedule.Elapsed
        interval: ${timedelta:minutes=15}

check(*, stage: Optional[str] = None, batch_idx: Optional[int] = None, step: Optional[int] = None, trainer: Optional[Trainer] = None) → bool

Check if the schedule should activate based on the trigger schedule and MLflow tag presence.

This method first checks if the trigger schedule is satisfied. If this condition is met, it then checks if the specified MLflow tag exists in the current run. Both conditions must be true for the method to return True.

Parameters:
  • stage (str, optional) – Current training stage (e.g., “train”, “validate”, “test”). Passed to the trigger schedule.

  • batch_idx (int, optional) – Current batch index within the epoch. Passed to the trigger schedule.

  • step (int, optional) – Current global step (cumulative across epochs). Passed to the trigger schedule.

  • trainer (L.Trainer, optional) – The Lightning Trainer instance. Required for MLflow tag validation.

Returns:

True if the trigger schedule is satisfied AND the specified tag is present; False otherwise.

Return type:

bool

Note

  • The trainer must be provided for MLflow tag validation.

  • Tag checking occurs only when the trigger schedule is already satisfied.

  • If an exception occurs during tag checking, it will be logged as a warning and the method will return False.
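From the caller's side, the keyword-only parameters of check map naturally onto the arguments of a Lightning batch hook. The sketch below shows one way a callback might forward them. It is an assumption about usage, not a description of how fkat callbacks are written: ScheduledCallback, EveryOtherStep, and simulate are hypothetical, and a SimpleNamespace stands in for the trainer so the example runs without Lightning or MLflow installed.

```python
from types import SimpleNamespace
from typing import Any, List, Optional


class EveryOtherStep:
    """Hypothetical schedule exposing the same keyword-only check signature."""

    def check(self, *, stage: Optional[str] = None, batch_idx: Optional[int] = None,
              step: Optional[int] = None, trainer: Any = None) -> bool:
        return trainer is not None and step is not None and step % 2 == 0


class ScheduledCallback:
    """Hypothetical callback skeleton; the hook mirrors Lightning's on_train_batch_end."""

    def __init__(self, schedule: Any) -> None:
        self.schedule = schedule
        self.fired_at = []  # global steps at which the guarded work ran

    def on_train_batch_end(self, trainer: Any, pl_module: Any, outputs: Any,
                           batch: Any, batch_idx: int) -> None:
        # Forward stage, batch index, global step, and the trainer itself.
        # Passing the trainer matters: without it a tag-based schedule cannot activate.
        if self.schedule.check(stage="train", batch_idx=batch_idx,
                               step=trainer.global_step, trainer=trainer):
            self.fired_at.append(trainer.global_step)


def simulate(callback: ScheduledCallback, n_batches: int) -> List[int]:
    """Drive the hook with a stand-in trainer and return the steps that fired."""
    trainer = SimpleNamespace(global_step=0)
    for batch_idx in range(n_batches):
        trainer.global_step = batch_idx + 1
        callback.on_train_batch_end(trainer, None, None, None, batch_idx)
    return callback.fired_at
```

Calling simulate(ScheduledCallback(EveryOtherStep()), 4) exercises the hook for four batches with global steps 1 through 4.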