Skip to main content

PipelineController

The PipelineController Class#

Create the PipelineController, where you will define the pipeline's execution logic:

from clearml import PipelineController
pipe = PipelineController(
name="Pipeline Controller", project="Pipeline example", version="1.0.0"
)
  • name - The name for the pipeline controller task
  • project - The ClearML project where the pipeline tasks will be created.
  • version - Numbered version string (e.g. 1.2.3). When auto_version_bump is set to True, the version number will be automatically bumped if the same version already exists and the code has changed

See PipelineController for all arguments.

Pipeline Parameters#

You can define parameters for controlling different pipeline runs:

pipe.add_parameter(
name='pickle_url',
description='url to pickle file',
default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
)
  • name - Parameter name
  • default - Parameter’s default value (this value can later be changed in the UI)
  • description - String description of the parameter and its usage in the pipeline

These parameters can be programmatically injected into a step’s configuration using the following format: "${pipeline.<parameter_name>}".

When launching a new pipeline run from the UI, you can modify their values for the new run.

Pipeline new run

Pipeline Steps#

Once you have a PipelineController object, add steps to it. These steps can be existing ClearML tasks or functions in your code. When the pipeline runs, the controller will launch the steps according to the specified structure.

Steps from Tasks#

Creating a pipeline step from an existing ClearML task means that when the step is run, the task will be cloned, and a new task will be launched through the configured execution queue (the original task is unmodified). The new task’s parameters can be specified.

Task steps are added using the PipelineController.add_step method:

pipe.add_step(
name='stage_process',
parents=['stage_data', ],
base_task_project='examples',
base_task_name='pipeline step 2 process dataset',
parameter_override={
'General/dataset_url': '${stage_data.artifacts.dataset.url}',
'General/test_size': 0.25},
pre_execute_callback=pre_execute_callback_example,
post_execute_callback=post_execute_callback_example
)
  • name - Unique name for the step. This step can be referenced by any proceeding steps in the pipeline using its name.
  • One of the following:
    • base_task_project and base_task_name - Project and name of the base task to clone
    • base_task_id - ID of the base task to clone
  • cache_executed_step – If True, the controller will check if an identical task with the same parameters was already executed. If it was found, its outputs will be used instead of launching a new task.
  • execution_queue (Optional) - the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class
  • parents – Optional list of parent steps in the pipeline. The current step in the pipeline will be sent for execution only after all the parent steps have been executed successfully.
  • parameter_override - Dictionary of parameters and values to override in the current step. See parameter_override.
  • configuration_overrides - Dictionary of configuration objects and values to override in the current step. See configuration_overrides
  • monitor_models, monitor_metrics, monitor_artifacts - see here.

See add_step for all arguments.

parameter_override#

Use the parameter_override argument to modify the step’s parameter values. The parameter_override dictionary key is the task parameter’s full path, which includes the parameter section's name and the parameter name separated by a slash (e.g. 'General/dataset_url'). Passing "${}" in the argument value allows you to reference input/output configurations from other pipeline steps. For example: "${<step_name>.id}" will be converted to the Task ID of the referenced pipeline step.

Examples:

  • Artifact URL access: '${<step_name>.artifacts.<artifact_name>.url}'
  • Model access URL access: '${<step_name>.models.output.-1.url}'
  • Different step parameter access: '${<step_name>.parameters.Args/input_file}'
  • Pipeline parameters (see adding pipeline parameters): '${pipeline.<pipeline_parameter>}'

configuration_overrides#

You can override a step’s configuration object by passing either a string representation of the content of the configuration object, or a configuration dictionary.

Examples:

  • Configuration dictionary: configuration_overrides={"my_config": {"key": "value"}}
  • Configuration file: configuration_overrides={"my_config": open("config.txt", "rt").read()}

Steps from Functions#

Creating a pipeline step from a function means that when the function is called, it will be transformed into a ClearML task, translating its arguments into parameters, and returning values into artifacts.

Function steps are added using the PipelineController.add_function_step method:

pipe.add_function_step(
name='step_one',
function=step_one,
function_kwargs=dict(pickle_data_url='${pipeline.url}'),
function_return=['data_frame'],
cache_executed_step=True,
)
pipe.add_function_step(
name='step_two',
# parents=['step_one'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_two,
function_kwargs=dict(data_frame='${step_one.data_frame}'),
function_return=['processed_data'],
cache_executed_step=True,
)
  • name - The pipeline step’s name. This name can be referenced in subsequent steps
  • function - A global function to be used as a pipeline step, which will be converted into a standalone task
  • function_kwargs (optional) - A dictionary of function arguments and default values which are translated into task hyperparameters. If not provided, all function arguments are translated into hyperparameters.
  • function_return - The names for storing the pipeline step’s returned objects as artifacts in its ClearML task.
  • cache_executed_step - If True, the controller checks if an identical task with the same parameters was already executed. If it was found, its outputs are used instead of launching a new task.
  • parents – Optional list of parent steps in the pipeline. The current step in the pipeline will be sent for execution only after all the parent steps have been executed successfully.
  • pre_execute_callback & post_execute_callback - Control pipeline flow with callback functions that can be called before and/or after a step’s execution. See here.
  • monitor_models, monitor_metrics, monitor_artifacts - see here.

See add_function_step for all arguments.

Important Arguments#

pre_execute_callback & post_execute_callback#

Callbacks can be utilized to control pipeline execution flow.

A pre_execute_callback function is called when the step is created and before it is sent for execution. This allows a user to modify the task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. Parameters are the configuration arguments passed to the ClearmlJob.

If the callback returned value is False, the step is skipped and so is any step in the pipeline that relies on this step.

Notice the parameters are already parsed (e.g. ${step1.parameters.Args/param} is replaced with relevant value).

def step_created_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
parameters, # type: dict
):
pass

A post_execute_callback function is called when a step is completed. It allows you to modify the step’s status after completion.

def step_completed_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
):
pass

Models, Artifacts, and Metrics#

You can enable automatic logging of a step’s metrics /artifacts / models to the pipeline task using the following arguments:

  • monitor_metrics (Optional) - Automatically log the step's reported metrics also on the pipeline Task. The expected format is one of the following:
    • List of pairs metric (title, series) to log: [(step_metric_title, step_metric_series), ]. Example: [('test', 'accuracy'), ]
    • List of tuple pairs, to specify a different target metric to use on the pipeline Task: [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]. Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
  • monitor_artifacts (Optional) - Automatically log the step's artifacts on the pipeline Task.
    • Provided a list of artifact names created by the step function, these artifacts will be logged automatically also on the Pipeline Task itself. Example: ['processed_data', ] (target artifact name on the Pipeline Task will have the same name as the original artifact).
    • Alternatively, provide a list of pairs (source_artifact_name, target_artifact_name), where the first string is the artifact name as it appears on the step Task, and the second is the target artifact name to put on the Pipeline Task. Example: [('processed_data', 'final_processed_data'), ]
  • monitor_models (Optional) - Automatically log the step's output models on the pipeline Task.
    • Provided a list of model names created by the step's Task, they will also appear on the Pipeline itself. Example: ['model_weights', ]
    • To select the latest (lexicographic) model, use model_*, or the last created model with just *. Example: ['model_weights_*', ]
    • Alternatively, provide a list of pairs (source_model_name, target_model_name), where the first string is the model name as it appears on the step Task, and the second is the target model name to put on the Pipeline Task. Example: [('model_weights', 'final_model_weights'), ]

You can also directly upload a model or an artifact from the step to the pipeline controller, using the PipelineController.upload_model and PipelineController.upload_artifact methods respectively.

Controlling Pipeline Execution#

Default Execution Queue#

The PipelineController.set_default_execution_queue method lets you set a default queue through which all pipeline steps will be executed. Once set, step-specific overrides can be specified through execution_queue of the PipelineController.add_step or PipelineController.add_function_step methods.

Running the Pipeline#

Run the pipeline by using one of the following methods:

  • PipelineController.start - launches the pipeline controller through the services queue, unless otherwise specified. The pipeline steps are enqueued to their respective queues or in the default execution queue.
  • PipelineController.start_locally - launches the pipeline controller locally. To run the pipeline steps locally as well, pass run_pipeline_steps_locally=True.