PipelineDecorator
Creating Pipelines Using Function Decorators
Use the PipelineDecorator
class to create pipelines from your existing functions. Use @PipelineDecorator.component
to denote functions that comprise the steps of your pipeline, and @PipelineDecorator.pipeline
for your main pipeline execution logic function.
@PipelineDecorator.pipeline
Using the @PipelineDecorator.pipeline
decorator transforms the function that implements your pipeline's execution logic into a ClearML pipeline controller. Since
the function is transformed into an independently executed task, it needs to be self-contained. To facilitate this,
all package imports inside the function are automatically logged as required packages for the pipeline controller.
If your pipeline is implemented across multiple files, make sure the pipeline step implementations (files containing
functions decorated with @PipelineDecorator.component) are imported before @PipelineDecorator.pipeline is applied.
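For example, a minimal sketch of such a multi-file layout, assuming the component functions live in a hypothetical module named pipeline_steps.py:

from clearml.automation.controller import PipelineDecorator
# import the decorated step functions first, before the pipeline logic is defined
from pipeline_steps import step_one, step_two  # hypothetical module holding the @PipelineDecorator.component functions

@PipelineDecorator.pipeline(name='multi-file pipeline', project='examples', version='0.1')
def run_pipeline(pickle_url):
    # the controller logic can now reference the imported components
    return step_two(step_one(pickle_url))

The pipeline function below demonstrates the decorator's arguments, which are described in the list that follows.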
@PipelineDecorator.pipeline(
    name='pipeline', project='examples', version='0.1',
    args_map={'General':['pickle_url'], 'Mock':['mock_parameter']}
)
def main(pickle_url, mock_parameter='mock'):
    data_frame = step_one(pickle_url)
    X_train, X_test, y_train, y_test = step_two(data_frame)
    model = step_three(X_train, y_train)
    accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test)
    print(f"Accuracy={accuracy}%")
Arguments
- name - The name for the pipeline controller task
- project - The ClearML project where the pipeline controller task is stored
- version - Numbered version string (e.g. 1.2.3). If auto_version_bump is set to True, the version number is automatically bumped if the same version already exists and the pipeline code has changed
- default_queue - The default ClearML queue in which to enqueue all pipeline steps (unless otherwise specified in the pipeline step)
- args_map - Map arguments to their configuration section, in the following format: {'section_name': ['param_name']}. For example, the pipeline in the code above stores the pickle_url parameter in the General section and mock_parameter in the Mock section. By default, arguments are stored in the Args section
- pool_frequency - The polling frequency (in minutes) for monitoring experiments / states
- add_pipeline_tags - If True, add a pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline (useful for better visibility in projects with multiple pipelines, and for easy selection) (default: False)
- target_project - If provided, all pipeline steps are cloned into the target project. If not provided, pipeline steps are stored in the same project as the pipeline itself. A target sub-folder allows for easier separation of the pipeline execution logic (pipeline Task) from the step execution Tasks. Example: "pipeline/component_execution"
- abort_on_failure - If False (default), a failed pipeline step does not cause the pipeline to stop immediately. Instead, any step that is not connected (directly or indirectly) to the failed step is still executed. Nonetheless, the pipeline itself is marked failed (unless the failed step was specifically defined with continue_on_fail=True). If True, any failed step causes the pipeline to abort immediately, stopping all running steps and marking the pipeline as failed
- pipeline_execution_queue - The queue in which to enqueue the pipeline controller task. The default value is the services queue. To run the pipeline logic locally while the components are executed remotely, pass pipeline_execution_queue=None
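The following is a minimal sketch (the project, queue, and sub-folder names are illustrative) combining several of the arguments above:

@PipelineDecorator.pipeline(
    name='pipeline', project='examples', version='1.0.0',
    default_queue='default',                    # enqueue every step here unless overridden per step
    add_pipeline_tags=True,                     # tag each step task with pipe: <pipeline_task_id>
    abort_on_failure=True,                      # stop all running steps on the first failure
    target_project='examples/pipeline_steps',   # keep step tasks in a dedicated sub-project
)
def main(pickle_url, mock_parameter='mock'):
    data_frame = step_one(pickle_url)
    X_train, X_test, y_train, y_test = step_two(data_frame)
    model = step_three(X_train, y_train)
    accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test)
    print(f"Accuracy={accuracy}%")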
When the function is called, a corresponding ClearML Controller Task is created: its arguments are logged as the task’s parameters. When launching a new pipeline run from the UI, you can modify their values for the new run.
@PipelineDecorator.component
Using the @PipelineDecorator.component
decorator transforms a function into a ClearML pipeline step when called from a pipeline controller.
When the pipeline controller calls a pipeline step, a corresponding ClearML task is created. For this reason, each function which makes up a pipeline step needs to be self-contained. All package imports inside the function are automatically logged as required packages for the pipeline execution step.
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['data_frame'], cache=True)
def step_one(pickle_data_url: str, extra: int = 43):
    # make sure scikit-learn is available in this step; it is needed to unpickle the stored Iris object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame
Arguments
- return_values - The artifact names for the step's corresponding ClearML task to store the step's returned objects. In the example above, a single object is returned and stored as an artifact named data_frame
- name (optional) - The name for the pipeline step. If not provided, the function name is used
- cache - If True, the pipeline controller checks whether a step with the same code (including setup, see the task Execution section) and input arguments was already executed. If found, the cached step's outputs are used instead of rerunning the step
- packages - A list of required packages or a local requirements.txt file. Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt". If not provided, packages are automatically added based on the imports used inside the function
- execution_queue (optional) - Queue in which to enqueue the specific step. This overrides the queue set with the PipelineDecorator.set_default_execution_queue method
- continue_on_fail - If True, a failed step does not cause the pipeline to stop (or be marked as failed). Note that steps that are connected (directly or indirectly) to the failed step are skipped (default: False)
- docker - Specify the Docker image to be used when executing the pipeline step remotely
- docker_args - Add Docker execution arguments for the remote execution (use a single string for all Docker arguments)
- docker_bash_setup_script - Add a bash script to be executed inside the Docker container before setting up the Task's environment
- task_type (optional) - The task type to be created
- repo (optional) - Specify a repository to attach to the function when executing remotely. This allows the function to be executed inside the specified repository, enabling it to load modules / scripts from the repository. Note that the execution working directory will be the repository root folder. Supports both a git repository URL link and a local repository path (automatically converted into the remote git / commit as currently checked out). Examples:
  - remote url: "https://github.com/user/repo.git"
  - local repo copy: "./repo" -> will automatically store the remote repo url and commit ID based on the locally cloned copy
- repo_branch (optional) - Specify the remote repository branch (ignored if a local repo path is used)
- repo_commit (optional) - Specify the repository commit ID (ignored if a local repo path is used)
- helper_functions (optional) - A list of helper functions to make available for the standalone pipeline step. By default, the pipeline step function has no access to any of the other functions in the script; by specifying additional functions here, the remote pipeline step can call them. Example: assuming you have two functions, parse_data() and load_data(), pass [parse_data, load_data]
- parents (optional) - A list of parent steps in the pipeline. The current step is sent for execution only after all the parent steps have been executed successfully
- retry_on_failure - Number of times to retry a step in case of failure. You can also pass a callable function in the following format:

  def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries < 5

  The callback function takes the following parameters:
  - the PipelineController instance
  - the PipelineController.Node that failed
  - the number of times to retry the step if it fails

  The function must return a boolean value. If it returns True, the node is retried, and the number of retries is decremented by 1. If the function returns False, the node is not retried.

Callbacks - Control pipeline execution flow with callback functions

- pre_execute_callback & post_execute_callback - Control pipeline flow with callback functions that can be called before and/or after a step's execution. See here.
- status_change_callback - Callback function called when the status of a step changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look like this:

  def status_change_callback(
      pipeline,           # type: PipelineController,
      node,               # type: PipelineController.Node,
      previous_status     # type: str
  ):
      pass
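As an illustration, the following sketch wires several of the arguments above into one component; the queue name and parent step are assumptions for the example, and it reuses the callback sketches shown above:

@PipelineDecorator.component(
    return_values=['X_train', 'X_test', 'y_train', 'y_test'],
    cache=True,
    packages=["pandas", "scikit-learn"],
    execution_queue="cpu_queue",                          # hypothetical queue name
    parents=['step_one'],                                 # run only after step_one completed successfully
    retry_on_failure=example_retry_on_failure_callback,
    status_change_callback=status_change_callback,
)
def step_two(data_frame, test_size=0.2, random_state=42):
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame.drop(columns=['target'])
    return train_test_split(X, y, test_size=test_size, random_state=random_state)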
Additionally, you can enable automatic logging of a step’s metrics / artifacts / models to the pipeline task using the following arguments:
- monitor_metrics (optional) - Automatically log the step's reported metrics also on the pipeline Task. The expected format is one of the following:
  - A list of metric (title, series) pairs to log: [(step_metric_title, step_metric_series), ]. Example: [('test', 'accuracy'), ]
  - A list of tuple pairs, to specify a different target metric to use on the pipeline Task: [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]. Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
- monitor_artifacts (optional) - Automatically log the step's artifacts on the pipeline Task.
  - Provide a list of artifact names created by the step function, and these artifacts will be logged automatically also on the pipeline Task itself. Example: ['processed_data', ] (the target artifact name on the pipeline Task will have the same name as the original artifact)
  - Alternatively, provide a list of pairs (source_artifact_name, target_artifact_name), where the first string is the artifact name as it appears on the component Task, and the second is the target artifact name to put on the pipeline Task. Example: [('processed_data', 'final_processed_data'), ]
- monitor_models (optional) - Automatically log the step's output models on the pipeline Task.
  - Provide a list of model names created by the step's Task, and they will also appear on the pipeline Task itself. Example: ['model_weights', ]
  - To select the latest (lexicographic) model use model_*, or the last created model with just *. Example: ['model_weights_*', ]
  - Alternatively, provide a list of pairs (source_model_name, target_model_name), where the first string is the model name as it appears on the component Task, and the second is the target model name to put on the pipeline Task. Example: [('model_weights', 'final_model_weights'), ]
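For example, a sketch (the metric, artifact, and model names are illustrative) that mirrors selected step outputs onto the pipeline controller Task:

@PipelineDecorator.component(
    return_values=['model'],
    monitor_metrics=[('test', 'accuracy')],                            # mirror the step's reported ('test', 'accuracy') scalar
    monitor_artifacts=[('processed_data', 'final_processed_data')],    # rename the artifact on the pipeline Task
    monitor_models=['model_weights_*'],                                # take the latest matching output model
)
def step_three(X_train, y_train):
    from sklearn.linear_model import LogisticRegression
    model = LogisticRegression()
    model.fit(X_train, y_train)
    return model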
You can also control a pipeline component's automatic logging using the following parameters:
- auto_connect_frameworks - Control a component's framework logging. You can completely disable framework logging, or specify which frameworks to log. See Task.init's auto_connect_frameworks parameter
- auto_connect_arg_parser - Control automatic logging of argparse objects. See Task.init's auto_connect_arg_parser parameter
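For instance, a minimal sketch of a component that opts out of automatic framework and argparse logging:

@PipelineDecorator.component(
    return_values=['result'],
    auto_connect_frameworks=False,   # frameworks used inside the step are not auto-logged
    auto_connect_arg_parser=False,   # argparse objects are not auto-logged
)
def step_without_auto_logging(value):
    return value * 2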
You can also directly upload a model or an artifact from the step to the pipeline controller, using the
PipelineDecorator.upload_model
and PipelineDecorator.upload_artifact
methods respectively.
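As a sketch (the step, file, and artifact names are hypothetical), a component can push an artifact straight to the pipeline controller like this:

@PipelineDecorator.component(return_values=['report_path'])
def step_report(data_frame):
    import json
    from clearml.automation.controller import PipelineDecorator
    report_path = 'stats.json'
    with open(report_path, 'w') as f:
        json.dump({'rows': len(data_frame)}, f)
    # also store the report as an artifact on the pipeline controller task
    PipelineDecorator.upload_artifact(name='stats_report', artifact_object=report_path)
    return report_path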
Controlling Pipeline Execution
Default Execution Queue
The PipelineDecorator.set_default_execution_queue
method lets you set a default queue through which all pipeline steps
will be executed. Once set, step-specific overrides can be specified through the @PipelineDecorator.component
decorator.
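For example, a minimal sketch, assuming a queue named "default" exists on your ClearML server:

if __name__ == '__main__':
    # every step without an explicit execution_queue will be enqueued here
    PipelineDecorator.set_default_execution_queue('default')
    executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
    print('pipeline completed')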
Running the Pipeline
To run the pipeline, call the pipeline controller function.
ClearML pipelines can be run in one of the following modes:
Remote Mode
The remote mode is the pipeline controller's default mode. In this mode, the pipeline controller logic is executed on the services queue, and all the pipeline steps are launched remotely on their respective queues.
Example:
if __name__ == '__main__':
    executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
    print('pipeline completed')
It is possible to run the pipeline logic itself locally, while keeping the execution of the pipeline components remote
(enqueued and executed by the clearml-agent). To do so, pass pipeline_execution_queue=None to the @PipelineDecorator.pipeline decorator.
@PipelineDecorator.pipeline(
    name='custom pipeline logic', project='examples', version='0.0.5', pipeline_execution_queue=None
)
Debugging Mode
In debugging mode, the pipeline controller and all components are treated as regular Python functions, with components
called synchronously. This mode is useful for debugging the components and designing the pipeline, since the entire
pipeline is executed on the developer machine, with full ability to debug each function call. Call PipelineDecorator.debug_pipeline
before the main pipeline logic function call.
Example:
if __name__ == '__main__':
    PipelineDecorator.debug_pipeline()
    executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
    print('pipeline completed')
Local Mode
In local mode, the pipeline controller creates a Task for each component, and component function calls are translated
into sub-processes running on the same machine. Note that data is passed between the components and the pipeline logic
with the exact same mechanism as in remote mode (i.e. hyperparameters / artifacts); only the execution itself is local.
Each subprocess uses the exact same Python environment as the main pipeline logic. Call
PipelineDecorator.run_locally
before the main pipeline logic function.
Example:
if __name__ == '__main__':
    PipelineDecorator.run_locally()
    executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
    print('pipeline completed')