PipelineController
class PipelineController()
Pipeline controller. Pipeline is a DAG of base tasks, each task will be cloned (arguments changed as required), executed, and monitored. The pipeline process (task) itself can be executed manually or by the clearml-agent services queue. Notice: The pipeline controller lives as long as the pipeline itself is being executed.
Create a new pipeline controller. The newly created object will launch and monitor the new experiments.
Parameters
name (str ) – Provide pipeline name (if main Task exists it overrides its name)
project (str ) – Provide project storing the pipeline (if main Task exists it overrides its project)
version (Optional [ str ] ) – Pipeline version. This version allows to uniquely identify the pipeline template execution. Examples for semantic versions: version='1.0.1', version='23', version='1.2'. If not set, find the latest version of the pipeline and increment it. If no such version is found, default to '1.0.0'
pool_frequency (float ) – The polling frequency (in minutes) for monitoring experiments / states.
add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.
target_project (str ) – If provided, all pipeline steps are cloned into the target project. If True, pipeline steps are stored into the pipeline project
auto_version_bump (bool ) – (Deprecated) If True, if the same pipeline version already exists (with any difference from the current one), the current pipeline version will be bumped to a new version. Version bump examples: 1.0.0 -> 1.0.1, 1.2 -> 1.3, 10 -> 11, etc.
abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately, instead any step that is not connected (or indirectly connected) to the failed step, will still be executed. Nonetheless the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.
add_run_number (bool ) – If True (default), add the run number of the pipeline to the pipeline name. Example: the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] ) – Integer (number of retries) or callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed, and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session
docker_args (Optional [ str ] ) – Add docker arguments, pass a single string
docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task's environment
packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" If not provided, packages are automatically added.
repo (Optional [ str ] ) – Optional, specify a repository to attach to the pipeline controller, when remotely executing. Allow users to execute the controller inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout). Example remote url: 'https://github.com/user/repo.git' Example local repo copy: './repo' -> will automatically store the remote repo url and commit ID based on the locally cloned copy. Use empty string ("") to disable any repository auto-detection
repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)
repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)
artifact_serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. All parameter/return artifacts uploaded by the pipeline will be serialized using this function. All relevant imports must be done in this function. For example:

def serialize(obj):
    import dill
    return dill.dumps(obj)

artifact_deserialization_function (Optional [ Callable [ [ bytes ] , Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:

def deserialize(bytes_):
    import dill
    return dill.loads(bytes_)
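As a usage sketch (the project name, queue name, docker image, and package list below are placeholders, not values prescribed by this reference), a controller can be created and given a default execution queue like this:

from clearml import PipelineController

pipe = PipelineController(
    name='my pipeline',           # pipeline name shown in the UI
    project='examples',           # placeholder project name
    version='1.0.0',
    add_pipeline_tags=True,
    docker='python:3.10',         # optional docker image for remote execution
    packages=['pandas>=1.0'],     # or a path to a requirements.txt file
)
# steps that do not specify their own queue will use this one (see set_default_execution_queue below)
pipe.set_default_execution_queue('default')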
add_function_step
add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, retry_on_failure=None, status_change_callback=None, tags=None)
Create a Task from a function, including wrapping the function input arguments into the hyper-parameter section as kwargs, and storing function results as named artifacts
Example:
def mock_func(a=6, b=9):
    c = a*b
    print(a, b, c)
    return c, c**2

create_task_from_function(mock_func, function_return=['mul', 'square'])
Example arguments from other Tasks (artifact):
def mock_func(matrix_np):
    c = matrix_np*matrix_np
    print(matrix_np, c)
    return c

create_task_from_function(
    mock_func,
    function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
    function_return=['square_matrix']
)
Parameters
name (str ) – Unique name of the step. For example: stage1
function (Callable ) – A global function to convert into a standalone Task
function_kwargs (Optional [ Dict [ str , Any ] ] ) – Optional, provide subset of function arguments and default values to expose. If not provided automatically take all function arguments & defaults Optional, pass input arguments to the function from other Tasks’s output artifact. Example argument named numpy_matrix from Task ID aabbcc artifact name answer: {‘numpy_matrix’: ‘aabbcc.answer’}
function_return (Optional [ List [ str ] ] ) – Provide a list of names for all the results. If not provided, no results will be stored as artifacts.
project_name (Optional [ str ] ) – Set the project name for the task. Required if base_task_id is None.
task_name (Optional [ str ] ) – Set the name of the remote task, if not provided use name argument.
task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’
auto_connect_frameworks (Optional [ dict ] ) – Control the frameworks auto connect, see Task.init auto_connect_frameworks
auto_connect_arg_parser (Optional [ dict ] ) – Control the ArgParser auto connect, see Task.init auto_connect_arg_parser
packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.
repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling to load modules/script from a repository Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path. Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy
repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)
repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)
helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone function Task.
docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session
docker_args (Optional [ str ] ) – Add docker arguments, pass a single string
docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment
parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.
execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class
monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:
[(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]
Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
[((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step's artifacts on the pipeline Task. Provided a list of artifact names existing on the step's Task, they will also appear on the Pipeline itself. Example: [('processed_data', 'final_processed_data'), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: ['processed_data', ]
monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]
time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.
pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
    parameters,     # type: dict
):
    pass

post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

def step_completed_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
):
    pass

cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed, and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:
def status_change_callback(
    pipeline,          # type: PipelineController,
    node,              # type: PipelineController.Node,
    previous_status    # type: str
):
    pass

tags (Optional [ Union [ str , Sequence [ str ] ] ] ) – A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this argument has no effect.
Return type
bool
Returns
True if successful
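As a usage sketch, assuming pipe is a PipelineController instance (see the constructor example above). The step names, functions, and queue below are illustrative, and the '${prepare.dataset}' value assumes the ${step_name.artifact_name} reference syntax described for add_step:

def make_dataset(rows=100):
    # build a toy DataFrame that is returned as the 'dataset' artifact
    import pandas as pd
    return pd.DataFrame({'x': range(rows)})

def train(dataset):
    print('training on', len(dataset), 'rows')
    return {'accuracy': 0.9}

pipe.add_function_step(
    name='prepare',
    function=make_dataset,
    function_return=['dataset'],
    cache_executed_step=True,
)
pipe.add_function_step(
    name='train',
    function=train,
    function_kwargs={'dataset': '${prepare.dataset}'},   # consume the previous step's artifact
    function_return=['metrics'],
    parents=['prepare'],
    execution_queue='default',
)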
add_parameter
add_parameter(name, default=None, description=None, param_type=None)
Add a parameter to the pipeline Task. The parameter can be used as input parameter for any step in the pipeline. Notice all parameters will appear under the PipelineController Task’s Hyper-parameters -> Pipeline section Example: pipeline.add_parameter(name=’dataset’, description=’dataset ID to process the pipeline’) Then in one of the steps we can refer to the value of the parameter with ‘${pipeline.dataset}’
Parameters
name (str ) – String name of the parameter.
default (Optional [ Any ] ) – Default value for the parameter (can be changed later in the UI)
description (Optional [ str ] ) – String description of the parameter and its usage in the pipeline
param_type (Optional [ str ] ) – Optional, parameter type information (to be used as hint for casting and description)
Return type
None
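A short sketch of adding a pipeline-level parameter and referencing it from a step, assuming pipe is a PipelineController instance (the parameter name and URL are placeholders):

pipe.add_parameter(
    name='dataset_url',
    default='https://example.com/data.csv',
    description='URL of the raw dataset to process',
    param_type='str',
)
# any step can then consume the value, e.g. via a parameter override:
#   parameter_override={'Args/dataset_url': '${pipeline.dataset_url}'}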
add_step
add_step(name, base_task_id=None, parents=None, parameter_override=None, configuration_overrides=None, task_overrides=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, base_task_factory=None, retry_on_failure=None, status_change_callback=None)
Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step)
Parameters
name (str ) – Unique name of the step. For example: stage1
base_task_id (Optional [ str ] ) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.
parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.
parameter_override (Optional [ Mapping [ str , Any ] ] ) – Optional parameter overriding dictionary.
The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:
Artifact access
parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }
Model access (last model used)
parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }
Parameter access
parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }
Pipeline Task argument (see Pipeline.add_parameter)
parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }
Task ID
parameter_override={'Args/input_file': '${stage3.id}' }
configuration_overrides (Optional [ Mapping [ str , Union [ str , Mapping ] ] ] ) – Optional, override Task configuration objects. Expected dictionary of configuration object name and configuration object content. Examples:
{‘General’: dict(key=’value’)} {‘General’: ‘configuration file content’} {‘OmegaConf’: YAML.dumps(full_hydra_dict)}
task_overrides (Optional [ Mapping [ str , Any ] ] ) – Optional task section overriding dictionary.
The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:
get the latest commit from a specific branch
task_overrides={'script.version_num': '', 'script.branch': 'main'}
match git repository branch to a previous step
task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}
change container image
task_overrides={'container.image': 'nvidia/cuda:11.6.0-devel-ubuntu20.04', 'container.arguments': '--ipc=host'}
match container image to a previous step
task_overrides={'container.image': '${stage1.container.image}'}
reset requirements (the agent will use the “requirements.txt” inside the repo)
task_overrides={'script.requirements.pip': ""}
execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class
monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:
[(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]
Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
[((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step's artifacts on the pipeline Task. Provided a list of artifact names existing on the step's Task, they will also appear on the Pipeline itself. Example: [('processed_data', 'final_processed_data'), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: ['processed_data', ]
monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]
time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
base_task_project (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
base_task_name (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
clone_base_task (bool ) – If True (default), the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.
pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
    parameters,     # type: dict
):
    pass

post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

def step_completed_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
):
    pass

cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If clone_base_task is False there is no cloning, hence the base_task is used.
base_task_factory (Optional [ Callable [ [ PipelineController.Node ] , Task ] ] ) – Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return
True
if the node should be retried andFalse
otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# allow up to 5 retries (total of 6 runs)
return retries < 5
status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:
def status_change_callback(
    pipeline,          # type: PipelineController,
    node,              # type: PipelineController.Node,
    previous_status    # type: str
):
    pass
Return type
bool
Returns
True if successful
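As a usage sketch, assuming pipe is a PipelineController instance; the step names, base task project/name, and queue are placeholders, and the ${...} references follow the forms listed above:

pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='pipeline step 1 dataset artifact',
    parameter_override={'Args/dataset_url': '${pipeline.dataset_url}'},
)
pipe.add_step(
    name='stage_train',
    parents=['stage_data'],
    base_task_project='examples',
    base_task_name='pipeline step 2 train model',
    parameter_override={'Args/dataset_task_id': '${stage_data.id}'},
    execution_queue='default',
)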
add_tags
add_tags(tags)
Add tags to this pipeline. Old tags are not deleted. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
Parameters
tags (Union [ Sequence [ str ] , str ] ) – A list of tags for this pipeline.
Return type
None
connect_configuration
connect_configuration(configuration, name=None, description=None)
Connect a configuration dictionary or configuration file (pathlib.Path / str) to the PipelineController object. This method should be called before reading the configuration file.
For example, a local file:
config_file = pipe.connect_configuration(config_file)
my_params = json.load(open(config_file,'rt'))
A parameter dictionary/list:
my_params = pipe.connect_configuration(my_params)
Parameters
configuration (Union [ Mapping , list , Path , str ] ) – The configuration. This is usually the configuration used in the model training process. Specify one of the following:
A dictionary/list - A dictionary containing the configuration. ClearML stores the configuration in the ClearML Server (backend), in a HOCON format (JSON-like format) which is editable.
A pathlib2.Path string - A path to the configuration file. ClearML stores the content of the file. A local path must be a relative path. When executing a pipeline remotely in a worker, the contents brought from the ClearML Server (backend) overwrite the contents of the file.
name (str ) – Configuration section name (default: 'General'), allowing users to store multiple configuration dicts/files
description (str ) – Configuration section description (text). default: None
Return type
Union [ dict , Path , str ]
Returns
If a dictionary is specified, then a dictionary is returned. If a pathlib2.Path / string is specified, then a path to a local configuration file is returned. Configuration object.
create_draft
create_draft()
Optional, manually create & serialize the Pipeline Task (use with care for manual multi pipeline creation).
Notice: the recommended flow is to call pipeline.start(queue=None), which has a similar effect and allows you to clone/enqueue the pipeline later on.
After calling Pipeline.create(), users can edit the pipeline in the UI and enqueue it for execution.
Notice: this function should be used to programmatically create pipeline for later usage. To automatically create and launch pipelines, call the start() method.
Return type
None
elapsed
elapsed()
Return the minutes elapsed since the controller start timestamp.
Return type
float
Returns
The minutes from controller start time. A negative value means the process has not started yet.
PipelineController.get_logger
classmethod get_logger()
Return a logger connected to the Pipeline Task. The logger can be used by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.
Raise ValueError if main Pipeline task could not be located.
Return type
Returns
Logger object for reporting metrics (scalars, plots, debug samples etc.)
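A minimal sketch of reporting from inside a pipeline component, assuming the standard clearml Logger reporting methods (report_scalar, report_text):

from clearml import PipelineController

logger = PipelineController.get_logger()
logger.report_scalar(title='test', series='accuracy', value=0.92, iteration=0)
logger.report_text('step finished successfully')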
get_parameters
get_parameters()
Return the pipeline parameters dictionary
Return type
dict
Returns
Dictionary str -> str
get_pipeline_dag
get_pipeline_dag()
Return the pipeline execution graph, each node in the DAG is PipelineController.Node object. Graph itself is a dictionary of Nodes (key based on the Node name), each node holds links to its parent Nodes (identified by their unique names)
Return type
Mapping [ str , Node ]
Returns
Execution tree, as a nested dictionary. Example:

{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}
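A short sketch of walking the returned DAG, assuming pipe is a PipelineController instance; the node.parents attribute (holding the unique names of the parent nodes) is an assumption based on the description above:

for node_name, node in pipe.get_pipeline_dag().items():
    # print each node together with the names of its parent nodes
    print(node_name, '<-', node.parents)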
get_processed_nodes
get_processed_nodes()
Return a list of the processed pipeline nodes, each entry in the list is PipelineController.Node object.
Return type
Sequence [ Node ]
Returns
Executed (excluding currently executing) nodes list
get_running_nodes
get_running_nodes()
Return a list of the currently running pipeline nodes, each entry in the list is PipelineController.Node object.
Return type
Sequence [ Node ]
Returns
Currently running nodes list
is_running
is_running()
Return True if the pipeline controller is running.
Return type
bool
Returns
A boolean indicating whether the pipeline controller is active (still running) or stopped.
is_successful
is_successful(fail_on_step_fail=True, fail_condition='all')
Evaluate whether the pipeline is successful.
Parameters
fail_on_step_fail (bool ) – If True (default), evaluate the pipeline steps' status to assess if the pipeline is successful. If False, only evaluate the controller
fail_condition (str ) – Must be one of the following: 'all' (default), 'failed' or 'aborted'. If 'failed', this function will return False if the pipeline failed and True if the pipeline was aborted. If 'aborted', this function will return False if the pipeline was aborted and True if the pipeline failed. If 'all', this function will return False in both cases.
Return type
bool
Returns
A boolean indicating whether the pipeline was successful or not. Note that if the pipeline is in a running/pending state, this function will return False
set_default_execution_queue
set_default_execution_queue(default_execution_queue)
Set the default execution queue if pipeline step does not specify an execution queue
Parameters
default_execution_queue (Optional [ str ] ) – The execution queue to use if no execution queue is provided
Return type
None
set_pipeline_execution_time_limit
set_pipeline_execution_time_limit(max_execution_minutes)
Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.
Parameters
max_execution_minutes (float ) – The maximum time (minutes) for the entire pipeline process. The default is None, indicating no time limit.
Return type
None
start
start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)
Start the current pipeline remotely (on the selected services queue). The current process will be stopped and launched remotely.
Parameters
queue – queue name to launch the pipeline on
step_task_created_callback (Callable ) – Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
    parameters,     # type: dict
):
    pass

step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

def step_completed_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
):
    pass

wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)
Return type
bool
Returns
True, if the controller started. False, if the controller did not start.
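A minimal sketch of launching the pipeline remotely, assuming pipe is a PipelineController instance (the queue name is deployment specific):

# enqueue the pipeline controller on the 'services' queue and block until the pipeline is done
started = pipe.start(queue='services', wait=True)
if not started:
    print('pipeline controller did not start')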
start_locally
start_locally(run_pipeline_steps_locally=False)
Start the current pipeline locally, meaning the pipeline logic is running on the current machine, instead of on the services queue.
Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes. Notice: when running pipeline steps locally, it assumes local code execution (i.e. it is running the local code as is, regardless of the git commit/diff on the pipeline steps Task)
Parameters
run_pipeline_steps_locally (bool ) – (default False) If True, run the pipeline steps themselves locally as a subprocess (use for debugging the pipeline locally; notice the pipeline code is expected to be available on the local machine)
Return type
None
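For example, a local debug run that keeps both the pipeline logic and its steps on the current machine might look like this sketch (assuming pipe is a PipelineController instance):

# run the pipeline logic in this process, and each step as a local subprocess
pipe.start_locally(run_pipeline_steps_locally=True)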
stop
stop(timeout=None, mark_failed=False, mark_aborted=False)
Stop the pipeline controller and the optimization thread. If mark_failed and mark_aborted are False (default) mark the pipeline as completed, unless one of the steps failed, then mark the pipeline as failed.
Parameters
timeout (Optional [ float ] ) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait and terminate immediately.
mark_failed (bool ) – If True, mark the pipeline task as failed. (default False)
mark_aborted (bool ) – If True, mark the pipeline task as aborted. (default False)
Return type
()
update_execution_plot
update_execution_plot()
Update sankey diagram of the current pipeline
Return type
()
PipelineController.upload_artifact
classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=True, preview=None, wait_on_upload=False, serialization_function=None)
Upload (add) an artifact to the main Pipeline Task object. This function can be called from any pipeline component to directly add artifacts into the main pipeline Task.
The artifact can be uploaded by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.
Raise ValueError if main Pipeline task could not be located.
The currently supported upload artifact types include:
string / Path - A path to artifact file. If a wildcard or a folder is specified, then ClearML creates and uploads a ZIP file.
dict - ClearML stores a dictionary as .json file and uploads it.
pandas.DataFrame - ClearML stores a pandas.DataFrame as .csv.gz (compressed CSV) file and uploads it.
numpy.ndarray - ClearML stores a numpy.ndarray as .npz file and uploads it.
PIL.Image - ClearML stores a PIL.Image as .png file and uploads it.
Any - If called with auto_pickle=True, the object will be pickled and uploaded.
Parameters
name (str ) – The artifact name.
Danger: If an artifact with the same name was previously uploaded, then it is overwritten.
artifact_object (object ) – The artifact object.
metadata (dict ) – A dictionary of key-value pairs for any metadata. This dictionary appears with the experiment in the ClearML Web-App (UI), ARTIFACTS tab.
delete_after_upload (bool ) – After the upload, delete the local copy of the artifact
True - Delete the local copy of the artifact.
False - Do not delete. (default)
auto_pickle (bool ) – If True (default), and the artifact_object is not one of the following types: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) the artifact_object will be pickled and uploaded as pickle file artifact (with file extension .pkl)
preview (Any ) – The artifact preview
wait_on_upload (bool ) – Whether the upload should be synchronous, forcing the upload to complete before continuing.
serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. Note that the object will be immediately serialized using this function, thus other serialization methods will not be used (e.g. pandas.DataFrame.to_csv), even if possible. To deserialize this artifact when getting it using the Artifact.get method, use its deserialization_function argument.
Return type
bool
Returns
The status of the upload.
True - Upload succeeded.
False - Upload failed.
Raise
If the artifact object type is not supported, raise a ValueError.
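A short sketch of adding an artifact to the main pipeline Task from within a component (the DataFrame and artifact name are illustrative):

import pandas as pd
from clearml import PipelineController

df = pd.DataFrame({'a': [1, 2, 3]})
PipelineController.upload_artifact(
    name='summary_table',
    artifact_object=df,          # a pandas.DataFrame is stored as a compressed CSV (see types above)
    metadata={'rows': len(df)},
    wait_on_upload=True,         # block until the upload completes
)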
PipelineController.upload_model
classmethod upload_model(model_name, model_local_path)
Upload (add) a model to the main Pipeline Task object. This function can be called from any pipeline component to directly add models into the main pipeline Task
The model file/path will be uploaded to the Pipeline Task and registered on the model repository.
Raise ValueError if main Pipeline task could not be located.
Parameters
model_name (str ) – Model name as will appear in the model registry (in the pipeline's project)
model_local_path (str ) – Path to the local model file or directory to be uploaded. If a local directory is provided, the content of the folder (recursively) will be packaged into a zip file and uploaded
Return type
wait
wait(timeout=None)
Wait for the pipeline to finish.
This method does not stop the pipeline. Call stop to terminate the pipeline.
Parameters
timeout (float ) – The timeout to wait for the pipeline to complete (minutes). If None, wait until the pipeline is completed.
Return type
bool
Returns
True, if the pipeline finished. False, if the pipeline timed out.
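For example, a controller script might combine wait, is_successful, and stop as in this sketch (the timeout value is arbitrary, assuming pipe is a PipelineController instance):

if pipe.wait(timeout=60.0):
    print('pipeline finished, successful:', pipe.is_successful())
else:
    # timed out: abort the run and mark the pipeline task accordingly
    pipe.stop(mark_aborted=True)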
class automation.controller.PipelineDecorator()
Create a new pipeline controller. The newly created object will launch and monitor the new experiments.
Parameters
name (str ) – Provide pipeline name (if main Task exists it overrides its name)
project (str ) – Provide project storing the pipeline (if main Task exists it overrides its project)
version (Optional [ str ] ) – Pipeline version. This version allows to uniquely identify the pipeline template execution. Examples for semantic versions: version=’1.0.1’ , version=’23’, version=’1.2’. If not set, find the latest version of the pipeline and increment it. If no such version is found, default to ‘1.0.0’
pool_frequency (float ) – The polling frequency (in minutes) for monitoring experiments / states.
add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.
target_project (str ) – If provided, all pipeline steps are cloned into the target project
abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately, instead any step that is not connected (or indirectly connected) to the failed step, will still be executed. Nonetheless the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.
add_run_number (bool ) – If True (default), add the run number of the pipeline to the pipeline name. Example, the second time we launch the pipeline “best pipeline”, we rename it to “best pipeline #2”
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed, and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session
docker_args (Optional [ str ] ) – Add docker arguments, pass a single string
docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment
packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added.
repo (Optional [ str ] ) – Optional, specify a repository to attach to the pipeline controller, when remotely executing. Allow users to execute the controller inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout). Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy Use empty string (“”) to disable any repository auto-detection
repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)
repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)
artifact_serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. All parameter/return artifacts uploaded by the pipeline will be serialized using this function. All relevant imports must be done in this function. For example:
def serialize(obj):
    import dill
    return dill.dumps(obj)

artifact_deserialization_function (Optional [ Callable [ [ bytes ] , Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:

def deserialize(bytes_):
    import dill
    return dill.loads(bytes_)
add_function_step
add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, retry_on_failure=None, status_change_callback=None, tags=None)
Create a Task from a function, including wrapping the function input arguments into the hyper-parameter section as kwargs, and storing function results as named artifacts
Example:
def mock_func(a=6, b=9):
    c = a*b
    print(a, b, c)
    return c, c**2

create_task_from_function(mock_func, function_return=['mul', 'square'])
Example arguments from other Tasks (artifact):
def mock_func(matrix_np):
    c = matrix_np*matrix_np
    print(matrix_np, c)
    return c

create_task_from_function(
    mock_func,
    function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
    function_return=['square_matrix']
)
Parameters
name (str ) – Unique name of the step. For example: stage1
function (Callable ) – A global function to convert into a standalone Task
function_kwargs (Optional [ Dict [ str , Any ] ] ) – Optional, provide subset of function arguments and default values to expose. If not provided automatically take all function arguments & defaults Optional, pass input arguments to the function from other Tasks’s output artifact. Example argument named numpy_matrix from Task ID aabbcc artifact name answer: {‘numpy_matrix’: ‘aabbcc.answer’}
function_return (Optional [ List [ str ] ] ) – Provide a list of names for all the results. If not provided, no results will be stored as artifacts.
project_name (Optional [ str ] ) – Set the project name for the task. Required if base_task_id is None.
task_name (Optional [ str ] ) – Set the name of the remote task, if not provided use name argument.
task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’
auto_connect_frameworks (Optional [ dict ] ) – Control the frameworks auto connect, see Task.init auto_connect_frameworks
auto_connect_arg_parser (Optional [ dict ] ) – Control the ArgParser auto connect, see Task.init auto_connect_arg_parser
packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.
repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling to load modules/script from a repository Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path. Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy
repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)
repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)
helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone function Task.
docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session
docker_args (Optional [ str ] ) – Add docker arguments, pass a single string
docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment
parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.
execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class
monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:
[(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]
Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
[((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step's artifacts on the pipeline Task. Provided a list of artifact names existing on the step's Task, they will also appear on the Pipeline itself. Example: [('processed_data', 'final_processed_data'), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: ['processed_data', ]
monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]
time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.
pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
    parameters,     # type: dict
):
    pass

post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

def step_completed_callback(
    pipeline,       # type: PipelineController,
    node,           # type: PipelineController.Node,
):
    pass

cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed, and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:
def status_change_callback(
    pipeline,          # type: PipelineController,
    node,              # type: PipelineController.Node,
    previous_status    # type: str
):
    pass

tags (Optional [ Union [ str , Sequence [ str ] ] ] ) – A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this argument has no effect.
Return type
bool
Returns
True if successful
add_parameter
add_parameter(name, default=None, description=None, param_type=None)
Add a parameter to the pipeline Task. The parameter can be used as input parameter for any step in the pipeline. Notice all parameters will appear under the PipelineController Task’s Hyper-parameters -> Pipeline section Example: pipeline.add_parameter(name=’dataset’, description=’dataset ID to process the pipeline’) Then in one of the steps we can refer to the value of the parameter with ‘${pipeline.dataset}’
Parameters
name (str ) – String name of the parameter.
default (Optional [ Any ] ) – Default value for the parameter (can be changed later in the UI)
description (Optional [ str ] ) – String description of the parameter and its usage in the pipeline
param_type (Optional [ str ] ) – Optional, parameter type information (to be used as hint for casting and description)
Return type
None
add_step
add_step(name, base_task_id=None, parents=None, parameter_override=None, configuration_overrides=None, task_overrides=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, base_task_factory=None, retry_on_failure=None, status_change_callback=None)
Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step)
Parameters
name (str ) – Unique name of the step. For example: stage1
base_task_id (Optional [ str ] ) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.
parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.
parameter_override (Optional [ Mapping [ str , Any ] ] ) – Optional parameter overriding dictionary.
The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:
Artifact access
parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }
Model access (last model used)
parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }
Parameter access
parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }
Pipeline Task argument (see Pipeline.add_parameter)
parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }
Task ID
parameter_override={'Args/input_file': '${stage3.id}' }
configuration_overrides (Optional [ Mapping [ str , Union [ str , Mapping ] ] ] ) – Optional, override Task configuration objects. Expected dictionary of configuration object name and configuration object content. Examples:
{‘General’: dict(key=’value’)} {‘General’: ‘configuration file content’} {‘OmegaConf’: YAML.dumps(full_hydra_dict)}
task_overrides (Optional [ Mapping [ str , Any ] ] ) – Optional task section overriding dictionary.
The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:
get the latest commit from a specific branch
task_overrides={'script.version_num': '', 'script.branch': 'main'}
match git repository branch to a previous step
task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}
change container image
task_overrides={'container.image': 'nvidia/cuda:11.6.0-devel-ubuntu20.04', 'container.arguments': '--ipc=host'}
match container image to a previous step
task_overrides={'container.image': '${stage1.container.image}'}
reset requirements (the agent will use the “requirements.txt” inside the repo)
task_overrides={'script.requirements.pip': ""}
execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class
monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:
[(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]
Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
[((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step's artifacts on the pipeline Task. Provided a list of artifact names existing on the step's Task, they will also appear on the Pipeline itself. Example: [('processed_data', 'final_processed_data'), ] Alternatively, the user can provide a list of artifacts to monitor (the target artifact name will be the same as the original artifact name). Example: ['processed_data', ]
monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step's output models on the pipeline Task. Provided a list of model names existing on the step's Task, they will also appear on the Pipeline itself. Example: [('model_weights', 'final_model_weights'), ] Alternatively, the user can provide a list of models to monitor (the target model name will be the same as the original model name). Example: ['model_weights', ] To select the latest (lexicographic) model use "model*", or the last created model with just "*". Example: ['model_weights*', ]
time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
base_task_project (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
base_task_name (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
clone_base_task (bool ) – If True (default), the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.
pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,     # type: PipelineController,
    node,         # type: PipelineController.Node,
    parameters,   # type: dict
):
    pass
post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
def step_completed_callback(
    pipeline,     # type: PipelineController,
    node,         # type: PipelineController.Node,
):
    pass
cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If clone_base_task is False there is no cloning, hence the base_task is used.
base_task_factory (Optional [ Callable [ [ PipelineController.Node ] , Task ] ] ) – Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed, and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.
def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:
def status_change_callback(
    pipeline,          # type: PipelineController,
    node,              # type: PipelineController.Node,
    previous_status,   # type: str
):
    pass
Return type
bool
Returns
True if successful
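A short sketch of wiring two steps into a DAG with add_step and launching the controller. The project, task, queue, and artifact names are placeholders and assume the base Tasks already exist on the server:
from clearml import PipelineController

pipe = PipelineController(name='pipeline demo', project='examples', version='1.0.0')
pipe.set_default_execution_queue('default')
pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='step 1 dataset artifact',
)
pipe.add_step(
    name='stage_train',
    parents=['stage_data'],
    base_task_project='examples',
    base_task_name='step 2 train model',
    # reference an artifact produced by the parent step
    parameter_override={'Args/dataset_url': '${stage_data.artifacts.dataset.url}'},
)
# launch the pipeline logic on the services queue
pipe.start(queue='services')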
add_tags
add_tags(tags)
Add tags to this pipeline. Old tags are not deleted. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
Parameters
tags (Union [ Sequence [ str ] , str ] ) – A list of tags for this pipeline.
Return type
None
PipelineDecorator.component
classmethod component(_func=None, *, return_values=('return_object'), name=None, cache=False, packages=None, parents=None, execution_queue=None, continue_on_fail=False, docker=None, docker_args=None, docker_bash_setup_script=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, retry_on_failure=None, pre_execute_callback=None, post_execute_callback=None, status_change_callback=None, tags=None)
Pipeline component function to be executed remotely.
Parameters
_func – wrapper function
return_values (Union [ str , Sequence [ str ] ] ) – Provide a list of names for all the results. Notice! If not provided no results will be stored as artifacts.
name (Optional [ str ] ) – Optional, set the name of the pipeline component task. If not provided, the wrapped function name is used as the pipeline component name
cache (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False
packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used inside the wrapped function.
parents (Optional [ List [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.
execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the pipeline’s default execution queue
continue_on_fail (bool ) – (default False). If True, a failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.
docker (Optional [ str ] ) – Specify the docker image to be used when executing the pipeline step remotely
docker_args (Optional [ str ] ) – Add docker execution arguments for the remote execution (use single string for all docker arguments).
docker_bash_setup_script (Optional [ str ] ) – Add a bash script to be executed inside the docker before setting up the Task’s environment
task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’
auto_connect_frameworks (Optional [ dict ] ) – Control the frameworks auto connect, see Task.init auto_connect_frameworks
auto_connect_arg_parser (Optional [ dict ] ) – Control the ArgParser auto connect, see Task.init auto_connect_arg_parser
repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allows users to execute the function inside the specified repository, enabling them to load modules/scripts from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as it is currently checked out). Example remote url: 'https://github.com/user/repo.git' Example local repo copy: './repo' -> will automatically store the remote repo url and commit ID based on the locally cloned copy
repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)
repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)
helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone pipeline step function Task. By default the pipeline step function has no access to any of the other functions, by specifying additional functions here, the remote pipeline step could call the additional functions. Example, assuming we have two functions parse_data(), and load_data(): [parse_data, load_data]
monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, Automatically log the step’s reported metrics also on the pipeline Task. The expected format is a list of pairs metric (title, series) to log:
[(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]
Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
[((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, Automatically log the step's artifacts on the pipeline Task. Provided a list of artifact names created by the step function, these artifacts will be logged automatically also on the Pipeline Task itself. Example: ['processed_data', ] (the target artifact name on the Pipeline Task will have the same name as the original artifact) Alternatively, provide a list of pairs (source_artifact_name, target_artifact_name): where the first string is the artifact name as it appears on the component Task, and the second is the target artifact name to put on the Pipeline Task. Example: [('processed_data', 'final_processed_data'), ]
monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, Automatically log the step’s output models on the pipeline Task. Provided a list of model names created by the step’s Task, they will also appear on the Pipeline itself. Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ] Alternatively, provide a list of pairs (source_model_name, target_model_name): where the first string is the model name as it appears on the component Task, and the second is the target model name to put on the Pipeline Task Example: [(‘model_weights’, ‘final_model_weights’), ]
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
> the PipelineController instance, the PipelineController.Node that failed and an int
> representing the number of previous retries for the node that failed
> The function must return a bool: True if the node should be retried and False otherwise.
> If True, the node will be re-queued and the number of retries left will be decremented by 1.
> By default, if this callback is not specified, the function will be retried the number of
> times indicated by retry_on_failure.
def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,     # type: PipelineController,
    node,         # type: PipelineController.Node,
    parameters,   # type: dict
):
    pass
post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
def step_completed_callback(
    pipeline,     # type: PipelineController,
    node,         # type: PipelineController.Node,
):
    pass
status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:
def status_change_callback(
    pipeline,          # type: PipelineController,
    node,              # type: PipelineController.Node,
    previous_status,   # type: str
):
    pass
tags (Optional [ Union [ str , Sequence [ str ] ] ] ) – A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
Return type
Callable
Returns
function wrapper
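A minimal sketch of a decorated component; the queue name and package list are illustrative, not required values:
from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=['data_frame'], cache=True, execution_queue='default', packages=['pandas'])
def load_data(csv_url):
    # imports are done inside the function, since it runs as a standalone Task
    import pandas as pd
    return pd.read_csv(csv_url)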
connect_configuration
connect_configuration(configuration, name=None, description=None)
Connect a configuration dictionary or configuration file (pathlib.Path / str) to the PipelineController object. This method should be called before reading the configuration file.
For example, a local file:
config_file = pipe.connect_configuration(config_file)
my_params = json.load(open(config_file,'rt'))
A parameter dictionary/list:
my_params = pipe.connect_configuration(my_params)
Parameters
configuration (Union [ Mapping , list , Path , str ] ) – The configuration. This is usually the configuration used in the model training process. Specify one of the following:
A dictionary/list - A dictionary containing the configuration. ClearML stores the configuration in the ClearML Server (backend), in a HOCON format (JSON-like format) which is editable.
A pathlib2.Path string - A path to the configuration file. ClearML stores the content of the file. A local path must be a relative path. When executing a pipeline remotely in a worker, the contents brought from the ClearML Server (backend) overwrite the contents of the file.
name (str ) – Configuration section name. default: ‘General’ Allowing users to store multiple configuration dicts/files
description (str ) – Configuration section description (text). default: None
Return type
Union [ dict , Path , str ]
Returns
If a dictionary is specified, then a dictionary is returned. If pathlib2.Path / string is specified, then a path to a local configuration file is returned. Configuration object.
create_draft
create_draft()
Optional, manually create & serialize the Pipeline Task (use with care for manual multi pipeline creation).
Notice: the recommended flow would be to call pipeline.start(queue=None), which has a similar effect and allows you to clone/enqueue later on.
After calling Pipeline.create(), users can edit the pipeline in the UI and enqueue it for execution.
Notice: this function should be used to programmatically create a pipeline for later usage. To automatically create and launch pipelines, call the start() method.
Return type
None
PipelineDecorator.debug_pipeline
classmethod debug_pipeline()
Set debugging mode: run all functions locally as functions (serially). Run the full pipeline DAG locally, where steps are executed as functions. Notice:
running the DAG locally assumes local code execution (i.e. it will not clone & apply git diff). Pipeline steps are executed as functions (no Task will be created), for ease of debugging.
Return type
()
elapsed
elapsed()
Return minutes elapsed from the controller's starting timestamp.
Return type
float
Returns
The minutes from controller start time. A negative value means the process has not started yet.
PipelineDecorator.get_current_pipeline
classmethod get_current_pipeline()
Return the currently running pipeline instance
Return type
ForwardRef
PipelineDecorator.get_logger
classmethod get_logger()
Return a logger connected to the Pipeline Task. The logger can be used by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.
Raise ValueError if main Pipeline task could not be located.
Return type
Logger
Returns
Logger object for reporting metrics (scalars, plots, debug samples etc.)
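A short sketch of reporting directly to the pipeline Task from inside a component; the metric names and values are illustrative:
from clearml import PipelineDecorator

logger = PipelineDecorator.get_logger()
logger.report_scalar(title='data', series='rows', value=12345, iteration=0)
logger.report_text('finished loading the dataset')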
get_parameters
get_parameters()
Return the pipeline parameters dictionary
Return type
dict
Returns
Dictionary str -> str
get_pipeline_dag
get_pipeline_dag()
Return the pipeline execution graph, each node in the DAG is PipelineController.Node object. Graph itself is a dictionary of Nodes (key based on the Node name), each node holds links to its parent Nodes (identified by their unique names)
Return type
Mapping [ str , Node ]
Returns
execution tree, as a nested dictionary. Example:
{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}
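A sketch of walking the DAG, assuming pipe is an existing PipelineController instance:
for name, node in pipe.get_pipeline_dag().items():
    # node.parents holds the unique names of the parent nodes,
    # node.executed holds the executed Task ID (or None if not executed yet)
    print(name, 'parents:', node.parents, 'executed task:', node.executed)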
get_processed_nodes
get_processed_nodes()
Return a list of the processed pipeline nodes, each entry in the list is PipelineController.Node object.
Return type
Sequence [ Node ]
Returns
executed (excluding currently executing) nodes list
get_running_nodes
get_running_nodes()
Return a list of the currently running pipeline nodes, each entry in the list is PipelineController.Node object.
Return type
Sequence [ Node ]
Returns
Currently running nodes list
is_running
is_running()
Return True if the pipeline controller is running.
Return type
bool
Returns
A boolean indicating whether the pipeline controller is active (still running) or stopped.
is_successful
is_successful(fail_on_step_fail=True, fail_condition='all')
Evaluate whether the pipeline is successful.
Parameters
fail_on_step_fail (bool ) – If True (default), evaluate the pipeline steps' status to assess if the pipeline is successful. If False, only evaluate the controller.
fail_condition (str ) – Must be one of the following: 'all' (default), 'failed' or 'aborted'. If 'failed', this function will return False if the pipeline failed and True if the pipeline was aborted. If 'aborted', this function will return False if the pipeline was aborted and True if the pipeline failed. If 'all', this function will return False in both cases.
Return type
bool
Returns
A boolean indicating whether the pipeline was successful or not. Note that if the pipeline is in a running/pending state, this function will return False
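A sketch of checking the outcome once a run has finished, assuming pipe is a PipelineController whose execution has already completed (for example after pipe.wait() returned):
if pipe.is_successful(fail_on_step_fail=True, fail_condition='all'):
    print('pipeline completed successfully')
else:
    print('pipeline failed or was aborted after {:.1f} minutes'.format(pipe.elapsed()))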
PipelineDecorator.pipeline
classmethod pipeline(_func=None, *, name, project, version=None, return_value=None, default_queue=None, pool_frequency=0.2, add_pipeline_tags=False, target_project=None, abort_on_failure=False, pipeline_execution_queue='services', multi_instance_support=False, add_run_number=True, args_map=None, start_controller_locally=False, retry_on_failure=None, docker=None, docker_args=None, docker_bash_setup_script=None, packages=None, repo=None, repo_branch=None, repo_commit=None, artifact_serialization_function=None, artifact_deserialization_function=None)
Decorate pipeline logic function.
Parameters
name (str ) – Provide pipeline name (if main Task exists it overrides its name)
project (str ) – Provide project storing the pipeline (if main Task exists it overrides its project)
version (Optional [ str ] ) – Pipeline version. This version allows to uniquely identify the pipeline template execution. Examples for semantic versions: version=’1.0.1’ , version=’23’, version=’1.2’. If not set, find the latest version of the pipeline and increment it. If no such version is found, default to ‘1.0.0’
return_value (Optional [ str ] ) – Optional, provide an artifact name to store the pipeline function's returned object. Notice: if not provided, the pipeline will not store the pipeline function's return value.
default_queue (Optional [ str ] ) – default pipeline step queue
pool_frequency (float ) – The pooling frequency (in minutes) for monitoring experiments / states.
add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.
target_project (str ) – If provided, all pipeline steps are cloned into the target project
abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately, instead any step that is not connected (or indirectly connected) to the failed step, will still be executed. Nonetheless the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.
pipeline_execution_queue (Optional [ str ] ) – remote pipeline execution queue (default ‘services’ queue). If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)
multi_instance_support (bool ) – If True, allow multiple calls to the same pipeline function, each call creating a new Pipeline Task. Notice it is recommended to create an additional Task on the “main process” acting as a master pipeline, automatically collecting the execution plots. If multi_instance_support==’parallel’ then the pipeline calls are executed in parallel, in the parallel case the function calls return None, to collect all pipeline results call PipelineDecorator.wait_for_multi_pipelines(). Default False, no multi instance pipeline support.
add_run_number (bool ) – If True (default), add the run number of the pipeline to the pipeline name. Example, the second time we launch the pipeline “best pipeline”, we rename it to “best pipeline #2”
args_map (dict [ str , List [ str ] ] ) – Map arguments to their specific configuration section. Arguments not included in this map
will default to Args section. For example, for the following code:
@PipelineDecorator.pipeline(args_map={'sectionA': ['paramA'], 'sectionB': ['paramB', 'paramC']})
def executing_pipeline(paramA, paramB, paramC, paramD):
    pass
Parameters would be stored as:
paramA: sectionA/paramA
paramB: sectionB/paramB
paramC: sectionB/paramC
paramD: Args/paramD
start_controller_locally (bool ) – If True, start the controller on the local machine. The steps will run remotely if PipelineDecorator.run_locally or PipelineDecorator.debug_pipeline are not called. Default: False
retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry
Integer: In case of node failure, retry the node the number of times indicated by this parameter.
Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed, and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.
def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries < 5
docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session
docker_args (Optional [ str ] ) – Add docker arguments, pass a single string
docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment
packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.
repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allows users to execute the function inside the specified repository, enabling them to load modules/scripts from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as it is currently checked out). Example remote url: 'https://github.com/user/repo.git' Example local repo copy: './repo' -> will automatically store the remote repo url and commit ID based on the locally cloned copy. Use an empty string ("") to disable any repository auto-detection
repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)
repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)
artifact_serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. All parameter/return artifacts uploaded by the pipeline will be serialized using this function. All relevant imports must be done in this function. For example:
def serialize(obj):
    import dill
    return dill.dumps(obj)
artifact_deserialization_function (Optional [ Callable [ [ bytes ] , Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:
def deserialize(bytes_):
    import dill
    return dill.loads(bytes_)
Return type
Callable
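A minimal end-to-end sketch of the decorator flow; the project name and function bodies are placeholders:
from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=['total'], cache=True)
def add(a, b):
    return a + b

@PipelineDecorator.pipeline(name='decorator demo', project='examples', version='1.0.0')
def run_pipeline(x=1, y=2):
    total = add(x, y)
    print('result:', total)

if __name__ == '__main__':
    # run everything locally as sub-processes for quick verification
    PipelineDecorator.run_locally()
    run_pipeline(x=3, y=4)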
PipelineDecorator.run_locally
classmethod run_locally()
Set local mode: run all functions locally as subprocesses.
Run the full pipeline DAG locally, where steps are executed as sub-process Tasks. Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply git diff)
Return type
()
PipelineDecorator.set_default_execution_queue
classmethod set_default_execution_queue(default_execution_queue)
Set the default execution queue if pipeline step does not specify an execution queue
Parameters
default_execution_queue (Optional [ str ] ) – The execution queue to use if no execution queue is provided
Return type
None
set_pipeline_execution_time_limit
set_pipeline_execution_time_limit(max_execution_minutes)
Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.
Parameters
max_execution_minutes (float ) – The maximum time (minutes) for the entire pipeline process. The default is None, indicating no time limit.
Return type
None
start
start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)
Start the current pipeline remotely (on the selected services queue). The current process will be stopped and launched remotely.
Parameters
queue – queue name to launch the pipeline on
step_task_created_callback (Callable ) – Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.
If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.
Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.
def step_created_callback(
    pipeline,     # type: PipelineController,
    node,         # type: PipelineController.Node,
    parameters,   # type: dict
):
    pass
step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
def step_completed_callback(
    pipeline,     # type: PipelineController,
    node,         # type: PipelineController.Node,
):
    pass
wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)
Return type
bool
Returns
True, if the controller started. False, if the controller did not start.
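A sketch of launching the controller with a step-creation callback, assuming pipe is an already-constructed PipelineController; the queue name is a placeholder and the callback simply logs each step before it is enqueued:
def on_step_created(pipeline, node, parameters):
    print('launching step:', node.name, 'with parameters:', parameters)
    return True  # returning False would skip this node (and any node that depends on it)

pipe.start(queue='services', step_task_created_callback=on_step_created)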
start_locally
start_locally(run_pipeline_steps_locally=False)
Start the current pipeline locally, meaning the pipeline logic is running on the current machine, instead of on the services queue.
Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes. Notice: when running pipeline steps locally, it assumes local code execution (i.e. it is running the local code as is, regardless of the git commit/diff on the pipeline steps Task)
Parameters
run_pipeline_steps_locally (bool ) – (default False) If True, run the pipeline steps themselves locally as a subprocess (use for debugging the pipeline locally, notice the pipeline code is expected to be available on the local machine)
Return type
None
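A sketch for debugging a task-based pipeline end to end on the current machine, assuming pipe is an already-constructed PipelineController:
# run the controller logic and every step on this machine (steps run as sub-processes)
pipe.start_locally(run_pipeline_steps_locally=True)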
stop
stop(timeout=None, mark_failed=False, mark_aborted=False)
Stop the pipeline controller and the optimization thread. If mark_failed and mark_aborted are False (default) mark the pipeline as completed, unless one of the steps failed, then mark the pipeline as failed.
Parameters
timeout (Optional [ float ] ) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait (terminate immediately).
mark_failed (bool ) – If True, mark the pipeline task as failed. (default False)
mark_aborted (bool ) – If True, mark the pipeline task as aborted. (default False)
Return type
()
update_execution_plot
update_execution_plot()
Update the Sankey diagram of the current pipeline.
Return type
()
PipelineDecorator.upload_artifact
classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=True, preview=None, wait_on_upload=False, serialization_function=None)
Upload (add) an artifact to the main Pipeline Task object. This function can be called from any pipeline component to directly add artifacts into the main pipeline Task.
The artifact can be uploaded by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.
Raise ValueError if main Pipeline task could not be located.
The currently supported upload artifact types include:
string / Path - A path to an artifact file. If a wildcard or a folder is specified, then ClearML creates and uploads a ZIP file.
dict - ClearML stores a dictionary as a .json file and uploads it.
pandas.DataFrame - ClearML stores a pandas.DataFrame as a .csv.gz (compressed CSV) file and uploads it.
numpy.ndarray - ClearML stores a numpy.ndarray as an .npz file and uploads it.
PIL.Image - ClearML stores a PIL.Image as a .png file and uploads it.
Any - If called with auto_pickle=True, the object will be pickled and uploaded.
Parameters
name (str ) – The artifact name.
Danger: If an artifact with the same name was previously uploaded, then it is overwritten.
artifact_object (object ) – The artifact object.
metadata (dict ) – A dictionary of key-value pairs for any metadata. This dictionary appears with the experiment in the ClearML Web-App (UI), ARTIFACTS tab.
delete_after_upload (bool ) – After the upload, delete the local copy of the artifact
True - Delete the local copy of the artifact.
False - Do not delete. (default)
auto_pickle (bool ) – If True (default), and the artifact_object is not one of the following types: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) the artifact_object will be pickled and uploaded as pickle file artifact (with file extension .pkl)
preview (Any ) – The artifact preview
wait_on_upload (bool ) – Whether the upload should be synchronous, forcing the upload to complete before continuing.
serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. Note that the object will be immediately serialized using this function, thus other serialization methods will not be used (e.g. pandas.DataFrame.to_csv), even if possible. To deserialize this artifact when getting it using the Artifact.get method, use its deserialization_function argument.
Return type
bool
Returns
The status of the upload.
True - Upload succeeded.
False - Upload failed.
Raise
If the artifact object type is not supported, raise a ValueError.
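A short sketch of adding an artifact to the pipeline Task from inside a component; the artifact name and content are illustrative:
from clearml import PipelineDecorator

PipelineDecorator.upload_artifact(
    name='stats',
    artifact_object={'rows': 12345, 'columns': 17},
    metadata={'stage': 'preprocessing'},
)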
PipelineDecorator.upload_model
classmethod upload_model(model_name, model_local_path)
Upload (add) a model to the main Pipeline Task object. This function can be called from any pipeline component to directly add models into the main pipeline Task
The model file/path will be uploaded to the Pipeline Task and registered on the model repository.
Raise ValueError if main Pipeline task could not be located.
Parameters
model_name (str ) – Model name as it will appear in the model registry (in the pipeline's project)
model_local_path (str ) – Path to the local model file or directory to be uploaded. If a local directory is provided, the content of the folder (recursively) will be packaged into a zip file and uploaded
Return type
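A short sketch of registering a locally saved model on the pipeline Task from inside a component; the file path and model name are placeholders:
from clearml import PipelineDecorator

PipelineDecorator.upload_model(model_name='final model', model_local_path='./model.pkl')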
wait
wait(timeout=None)
Wait for the pipeline to finish.
This method does not stop the pipeline. Call stop to terminate the pipeline.
Parameters
timeout (float ) – The timeout to wait for the pipeline to complete (minutes). If None, wait until the pipeline is completed.
Return type
bool
Returns
True, if the pipeline finished. False, if the pipeline timed out.
PipelineDecorator.wait_for_multi_pipelines
classmethod wait_for_multi_pipelines()
Wait until all background multi pipeline execution is completed. Returns all the pipeline results in call order (first pipeline call at index 0)
Returns
List of return values from executed pipeline, based on call order.
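A sketch of collecting results from parallel pipeline runs, assuming run_pipeline is a hypothetical function decorated with @PipelineDecorator.pipeline(..., multi_instance_support='parallel'):
run_pipeline(x=1)   # returns None in parallel mode
run_pipeline(x=2)
results = PipelineDecorator.wait_for_multi_pipelines()
print('pipeline results in call order:', results)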