PipelineController

class PipelineController()

Pipeline controller. Pipeline is a DAG of base tasks, each task will be cloned (arguments changed as required), executed, and monitored. The pipeline process (task) itself can be executed manually or by the clearml-agent services queue. Notice: The pipeline controller lives as long as the pipeline itself is being executed.

Create a new pipeline controller. The newly created object will launch and monitor the new experiments.

  • Parameters

    • name (str) – Provide pipeline name (if main Task exists it overrides its name)

    • project (str) – Provide project storing the pipeline (if main Task exists it overrides its project)

    • version (Optional[str]) – Pipeline version. This version allows you to uniquely identify the pipeline template execution. Examples for semantic versions: version='1.0.1', version='23', version='1.2'. If not set, find the latest version of the pipeline and increment it. If no such version is found, default to '1.0.0'

    • pool_frequency (float ) – The polling frequency (in minutes) for monitoring experiments / states.

    • add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.

    • target_project (str ) – If provided, all pipeline steps are cloned into the target project. If True, the pipeline steps are stored in the pipeline's project

    • auto_version_bump (bool ) – (Deprecated) If True, and the same pipeline version already exists (with any difference from the current one), the current pipeline version will be bumped to a new version. Version bump examples: 1.0.0 -> 1.0.1, 1.2 -> 1.3, 10 -> 11, etc.

    • abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.

    • add_run_number (bool) – If True (default), add the run number of the pipeline to the pipeline name. Example, the second time we launch the pipeline “best pipeline”, we rename it to “best pipeline #2”

    • retry_on_failure (Union[int, Callable[[PipelineController, Node, int], bool], None]) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • docker (Optional[str]) – Select the docker image to be executed in by the remote session

    • docker_args (Optional[str]) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional[str]) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • packages (Union[str, Sequence[str], None]) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added.

    • repo (Optional[str]) – Optional, specify a repository to attach to the pipeline controller, when remotely executing. Allow users to execute the controller inside the specified repository, enabling them to load modules/scripts from the repository. Notice the execution work directory will be the repository root folder. Supports both a git repo url link and a local repository path (automatically converted into the remote git/commit as it is currently checked out). Example remote url: ‘https://github.com/user/repo.git’. Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy. Use an empty string (“”) to disable any repository auto-detection

    • repo_branch (Optional[str]) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional[str]) – Optional, specify the repository commit ID (Ignored, if local repo path is used)

    • always_create_from_code (bool) – If True (default), the pipeline is always constructed from code. If False, the pipeline is generated from the pipeline configuration section on the pipeline Task itself. This allows editing (also adding/removing) pipeline steps without changing the original codebase

    • artifact_serialization_function (Optional[Callable[[Any], Union[bytes, bytearray]]]) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. All parameter/return artifacts uploaded by the pipeline will be serialized using this function. All relevant imports must be done in this function. For example:

      def serialize(obj):
          import dill
          return dill.dumps(obj)
    • artifact_deserialization_function (Optional[Callable[[bytes], Any]]) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:

      def deserialize(bytes_):
          import dill
          return dill.loads(bytes_)
    • output_uri (Union[str, bool, None]) – The storage / output url for this pipeline. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). The output_uri of this pipeline’s steps will default to this value.

    • skip_global_imports (bool) – If True, global imports will not be included in the steps’ execution when creating the steps from functions; otherwise, all global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False

    • working_dir (Optional[str]) – Working directory to launch the pipeline from.
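
A minimal construction sketch based on the parameters above (project, queue, and pipeline names are illustrative placeholders, not values from this reference):

from clearml import PipelineController

# create the pipeline controller object
pipe = PipelineController(
    name='my pipeline',
    project='examples',
    version='1.0.0',
    add_pipeline_tags=True,
)

# steps that do not specify an execution queue will use this queue
pipe.set_default_execution_queue('default')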


add_function_step

add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, retry_on_failure=None, status_change_callback=None, tags=None, output_uri=None, draft=False, working_dir=None)

Create a Task from a function, including wrapping the function input arguments into the hyper-parameter section as kwargs, and storing function results as named artifacts

Example:

def mock_func(a=6, b=9):
    c = a*b
    print(a, b, c)
    return c, c**2

create_task_from_function(mock_func, function_return=['mul', 'square'])

Example arguments from other Tasks (artifact):

def mock_func(matrix_np):
    c = matrix_np*matrix_np
    print(matrix_np, c)
    return c

create_task_from_function(
    mock_func,
    function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
    function_return=['square_matrix']
)
  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • function (Callable ) – A global function to convert into a standalone Task

    • function_kwargs (Optional [ Dict [ str , Any ] ] ) – Optional, provide a subset of the function arguments and default values to expose. If not provided, all function arguments and their defaults are taken automatically. Optionally, pass input arguments to the function from other Tasks’ output artifacts. Example, for an argument named numpy_matrix taken from Task ID aabbcc, artifact name answer: {‘numpy_matrix’: ‘aabbcc.answer’}

    • function_return (Optional [ List [ str ] ] ) – Provide a list of names for all the results. If not provided, no results will be stored as artifacts.

    • project_name (Optional [ str ] ) – Set the project name for the task. Required if base_task_id is None.

    • task_name (Optional [ str ] ) – Set the name of the remote task, if not provided use name argument.

    • task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’

    • auto_connect_frameworks (Optional [ dict ] ) – Control the frameworks auto connect, see Task.init auto_connect_frameworks

    • auto_connect_arg_parser (Optional [ dict ] ) – Control the ArgParser auto connect, see Task.init auto_connect_arg_parser

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling to load modules/script from a repository Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path. Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)

    • helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone function Task.

    • docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session

    • docker_args (Optional [ str ] ) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • continue_on_fail (bool ) – (default False). If True, a failed step will not cause the pipeline to stop (or be marked as failed). Notice that steps that are connected (directly or indirectly) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,     # type: PipelineController,
          node,         # type: PipelineController.Node,
          parameters,   # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,     # type: PipelineController,
          node,         # type: PipelineController.Node,
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:

      def status_change_callback(
          pipeline,           # type: PipelineController,
          node,               # type: PipelineController.Node,
          previous_status     # type: str
      ):
          pass
    • tags (Optional [ Union [ str , Sequence [ str ] ] ] ) – A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.

    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this step. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

    • draft (Optional [ bool ] ) – (default False). If True, the Task will be created as a draft task.

    • working_dir (Optional [ str ] ) – Working directory to launch the script from.

  • Return type

    bool

  • Returns

    True if successful
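
As a sketch only (step, function, and artifact names are placeholders; pipe is assumed to be an existing PipelineController), two function steps could be chained like this:

def step_one(dataset_url='https://example.com/data.csv'):
    import pandas as pd
    return pd.read_csv(dataset_url)

def step_two(raw_data):
    return raw_data.dropna()

pipe.add_function_step(
    name='load_data',
    function=step_one,
    function_return=['raw_data'],
    cache_executed_step=True,
)
pipe.add_function_step(
    name='process_data',
    parents=['load_data'],
    function=step_two,
    # reference the previous step's stored return artifact
    function_kwargs={'raw_data': '${load_data.raw_data}'},
    function_return=['processed_data'],
)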


add_parameter

add_parameter(name, default=None, description=None, param_type=None)

Add a parameter to the pipeline Task. The parameter can be used as an input parameter for any step in the pipeline. Notice: all parameters will appear under the PipelineController Task’s Hyper-parameters -> Pipeline section. Example: pipeline.add_parameter(name='dataset', description='dataset ID to process the pipeline'). Then, in one of the steps, you can refer to the value of the parameter with '${pipeline.dataset}'

  • Parameters

    • name (str) – String name of the parameter.

    • default (Optional[Any]) – Default value to be put as the default value (can be later changed in the UI)

    • description (Optional[str]) – String description of the parameter and its usage in the pipeline

    • param_type (Optional[str]) – Optional, parameter type information (to be used as hint for casting and description)

  • Return type

    None
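
A brief sketch of the flow described above (pipe is assumed to be an existing PipelineController; process_dataset is a hypothetical step function):

pipe.add_parameter(
    name='dataset',
    description='dataset ID to process the pipeline',
    default='none',
)

# a step can then consume the value through the pipeline namespace
pipe.add_function_step(
    name='stage1',
    function=process_dataset,  # hypothetical step function
    function_kwargs={'dataset_id': '${pipeline.dataset}'},
)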


add_step

add_step(name, base_task_id=None, parents=None, parameter_override=None, configuration_overrides=None, task_overrides=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, base_task_factory=None, retry_on_failure=None, status_change_callback=None, recursively_parse_parameters=False, output_uri=None)

Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step)

  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • base_task_id (Optional [ str ] ) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.

    • parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • parameter_override (Optional [ Mapping [ str , Any ] ] ) – Optional parameter overriding dictionary.

      The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:

      • Artifact access parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }

      • Model access (last model used) parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }

      • Parameter access parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }

      • Pipeline Task argument (see Pipeline.add_parameter) parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }

      • Task ID parameter_override={'Args/input_file': '${stage3.id}' }

    • recursively_parse_parameters (bool ) – If True, recursively parse parameters from parameter_override in lists, dicts, or tuples. Example:

      • parameter_override={'Args/input_file': ['${<step_name>.artifacts.<artifact_name>.url}', 'file2.txt']} will be correctly parsed.
      • parameter_override={'Args/input_file': ('${<step_name_1>.parameters.Args/input_file}', '${<step_name_2>.parameters.Args/input_file}')} will be correctly parsed.

    • configuration_overrides (Optional [ Mapping [ str , Union [ str , Mapping ] ] ] ) – Optional, override Task configuration objects. Expected dictionary of configuration object name and configuration object content. Examples:

      {'General': dict(key='value')}
      {'General': 'configuration file content'}
      {'OmegaConf': YAML.dumps(full_hydra_dict)}

    • task_overrides (Optional [ Mapping [ str , Any ] ] ) – Optional task section overriding dictionary.

      The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:

      • get the latest commit from a specific branch task_overrides={'script.version_num': '', 'script.branch': 'main'}

      • match git repository branch to a previous step task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}

      • change container image task_overrides={'container.image': 'nvidia/cuda:11.6.0-devel-ubuntu20.04', 'container.arguments': '--ipc=host'}

      • match container image to a previous step task_overrides={'container.image': '${stage1.container.image}'}

      • reset requirements (the agent will use the “requirements.txt” inside the repo) task_overrides={'script.requirements.pip': ""}

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • base_task_project (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • base_task_name (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • clone_base_task (bool ) – If True (default), the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).

    • continue_on_fail (bool ) – (default False). If True, a failed step will not cause the pipeline to stop (or be marked as failed). Notice that steps that are connected (directly or indirectly) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,     # type: PipelineController,
          node,         # type: PipelineController.Node,
          parameters,   # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,     # type: PipelineController,
          node,         # type: PipelineController.Node,
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If clone_base_task is False there is no cloning, hence the base_task is used.

    • base_task_factory (Optional [ Callable [ [ PipelineController.Node ] , Task ] ] ) – Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:

      def status_change_callback(
          pipeline,           # type: PipelineController,
          node,               # type: PipelineController.Node,
          previous_status     # type: str
      ):
          pass
    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this step. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

  • Return type

    bool

  • Returns

    True if successful
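
A sketch of the Task-cloning flow (the base task project/name and parameter keys are placeholders; pipe is an existing PipelineController):

pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='pipeline step 1 dataset artifact',
)
pipe.add_step(
    name='stage_process',
    parents=['stage_data'],
    base_task_project='examples',
    base_task_name='pipeline step 2 process dataset',
    # feed the first step's artifact URL into the cloned Task's parameters
    parameter_override={
        'General/dataset_url': '${stage_data.artifacts.dataset.url}',
        'General/test_size': 0.25,
    },
    cache_executed_step=True,
)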


add_tags

add_tags(tags)

Add tags to this pipeline. Old tags are not deleted. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.

  • Parameters

    tags (Union[Sequence[str], str]) – A list of tags for this pipeline.

  • Return type

    None


connect_configuration

connect_configuration(configuration, name=None, description=None)

Connect a configuration dictionary or configuration file (pathlib.Path / str) to the PipelineController object. This method should be called before reading the configuration file.

For example, a local file:

config_file = pipe.connect_configuration(config_file)
my_params = json.load(open(config_file,'rt'))

A parameter dictionary/list:

my_params = pipe.connect_configuration(my_params)
  • Parameters

    • configuration (Union[Mapping, list, Path, str]) – The configuration. This is usually the configuration used in the model training process.

      Specify one of the following:

      • A dictionary/list - A dictionary containing the configuration. ClearML stores the configuration in the ClearML Server (backend), in a HOCON format (JSON-like format), which is editable.

      • A pathlib2.Path string - A path to the configuration file. ClearML stores the content of the file. A local path must be a relative path. When executing a pipeline remotely in a worker, the contents brought from the ClearML Server (backend) overwrite the contents of the file.

    • name (str ) – Configuration section name. Default: ‘General’. Allows users to store multiple configuration dicts/files

    • description (str ) – Configuration section description (text). default: None

  • Return type

    Union[dict, Path, str]

  • Returns

    If a dictionary is specified, then a dictionary (configuration object) is returned. If a pathlib2.Path / string is specified, then a path to a local configuration file is returned.


create_draft

create_draft()

Optional, manually create & serialize the Pipeline Task (use with care for manual multi pipeline creation).

Notice: the recommended flow would be to call pipeline.start(queue=None), which has a similar effect and allows you to clone/enqueue later on.

After calling Pipeline.create(), users can edit the pipeline in the UI and enqueue it for execution.

Notice: this function should be used to programmatically create a pipeline for later use. To automatically create and launch pipelines, call the start() method.

  • Return type

    None


elapsed

elapsed()

Return the minutes elapsed from the controller's starting time stamp.

  • Return type

    float

  • Returns

    The minutes from controller start time. A negative value means the process has not started yet.


PipelineController.enqueue

classmethod enqueue(pipeline_controller, queue_name=None, queue_id=None, force=False)

Enqueue a PipelineController for execution, by adding it to an execution queue.

info

A worker daemon must be listening at the queue for the worker to fetch the Task and execute it, see ClearML Agent in the ClearML Documentation.

  • Parameters

    • pipeline_controller (Union[PipelineController, str]) – The PipelineController to enqueue. Specify a PipelineController object or PipelineController ID

    • queue_name (Optional[str]) – The name of the queue. If not specified, then queue_id must be specified.

    • queue_id (Optional[str]) – The ID of the queue. If not specified, then queue_name must be specified.

    • force (bool ) – If True, reset the PipelineController if necessary before enqueuing it

  • Return type

    Any

  • Returns

    An enqueue JSON response.

    {
        "queued": 1,
        "updated": 1,
        "fields": {
            "status": "queued",
            "status_reason": "",
            "status_message": "",
            "status_changed": "2020-02-24T15:05:35.426770+00:00",
            "last_update": "2020-02-24T15:05:35.426770+00:00",
            "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf"
        }
    }
    • queued - The number of Tasks enqueued (an integer or null).

    • updated - The number of Tasks updated (an integer or null).

    • fields

      • status - The status of the experiment.

      • status_reason - The reason for the last status change.

      • status_message - Information about the status.

      • status_changed - The last status change date and time (ISO 8601 format).

      • last_update - The last Task update time, including Task creation, update, change, or events for this task (ISO 8601 format).

      • execution.queue - The ID of the queue where the Task is enqueued. null indicates not enqueued.


PipelineController.get

classmethod get(pipeline_id=None, pipeline_project=None, pipeline_name=None, pipeline_version=None, pipeline_tags=None, shallow_search=False)

Get a specific PipelineController. If multiple pipeline controllers are found, the pipeline controller with the highest semantic version is returned. If no semantic version is found, the most recently updated pipeline controller is returned. This function raises an Exception if no pipeline controller was found.

Note: In order to run the pipeline controller returned by this function, use PipelineController.enqueue

  • Parameters

    • pipeline_id (Optional[str]) – Requested PipelineController ID

    • pipeline_project (Optional[str]) – Requested PipelineController project

    • pipeline_name (Optional[str]) – Requested PipelineController name

    • pipeline_tags (Optional[Sequence[str]]) – Requested PipelineController tags (list of tag strings)

    • shallow_search (bool) – If True, search only the first 500 results (first page)

    • pipeline_version (Optional [ str ] ) –

  • Return type

    ForwardRef
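
A sketch combining get with enqueue, as suggested in the note above (project, name, and queue values are placeholders):

pipeline = PipelineController.get(
    pipeline_project='examples',
    pipeline_name='my pipeline',
)
# re-launch the retrieved controller on a worker listening to the queue
PipelineController.enqueue(pipeline, queue_name='services')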


PipelineController.get_logger

classmethod get_logger()

Return a logger connected to the Pipeline Task. The logger can be used by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

  • Return type

    Logger

  • Returns

    Logger object for reporting metrics (scalars, plots, debug samples etc.)


get_parameters

get_parameters()

Return the pipeline parameters dictionary (a mapping of str -> str).

  • Return type

    dict


get_pipeline_dag

get_pipeline_dag()

Return the pipeline execution graph. Each node in the DAG is a PipelineController.Node object. The graph itself is a dictionary of Nodes (keyed by the Node name); each node holds links to its parent Nodes (identified by their unique names)

  • Return type

    Mapping[str, Node]

  • Returns

    execution tree, as a nested dictionary. Example:

{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}
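
For example, a small sketch that walks the returned mapping and prints each node with its parents (assuming pipe is an existing PipelineController whose DAG has already been built):

dag = pipe.get_pipeline_dag()
for step_name, node in dag.items():
    # node.parents holds the unique names of the parent steps
    print(step_name, '<-', node.parents)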

get_processed_nodes

get_processed_nodes()

Return a list of the processed pipeline nodes; each entry in the list is a PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    executed (excluding currently executing) nodes list


get_running_nodes

get_running_nodes()

Return a list of the currently running pipeline nodes; each entry in the list is a PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    Currently running nodes list


is_running

is_running()

Return True if the pipeline controller is running.

  • Return type

    bool

  • Returns

    A boolean indicating whether the pipeline controller is active (still running) or stopped.


is_successful

is_successful(fail_on_step_fail=True, fail_condition='all')

Evaluate whether the pipeline is successful.

  • Parameters

    • fail_on_step_fail (bool) – If True (default), evaluate the pipeline steps’ status to assess if the pipeline is successful. If False, only evaluate the controller

    • fail_condition (str) – Must be one of the following: ‘all’ (default), ‘failed’ or ‘aborted’. If ‘failed’, this function will return False if the pipeline failed and True if the pipeline was aborted. If ‘aborted’, this function will return False if the pipeline was aborted and True if the pipeline failed. If ‘all’, this function will return False in both cases.

  • Return type

    bool

  • Returns

    A boolean indicating whether the pipeline was successful or not. Note that if the pipeline is in a running/pending state, this function will return False


set_default_execution_queue

set_default_execution_queue(default_execution_queue)

Set the default execution queue, used when a pipeline step does not specify an execution queue

  • Parameters

    default_execution_queue (Optional[str]) – The execution queue to use if no execution queue is provided

  • Return type

    None


set_pipeline_execution_time_limit

set_pipeline_execution_time_limit(max_execution_minutes)

Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.

  • Parameters

    max_execution_minutes (float ) – The maximum time (minutes) for the entire pipeline process. The default is None, indicating no time limit.

  • Return type

    None


start

start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)

Start the current pipeline remotely (on the selected services queue). The current process will be stopped and launched remotely.

  • Parameters

    • queue – queue name to launch the pipeline on

    • step_task_created_callback (Callable ) – Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,     # type: PipelineController,
          node,         # type: PipelineController.Node,
          parameters,   # type: dict
      ):
          pass
    • step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,     # type: PipelineController,
          node,         # type: PipelineController.Node,
      ):
          pass
    • wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)

  • Return type

    bool

  • Returns

    True, if the controller started. False, if the controller did not start.
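
A short sketch of launching the controller remotely once the steps have been added (the queue name is a placeholder):

# enqueue the controller itself on the services queue and
# return only after the pipeline is done (wait=True is the default)
pipe.start(queue='services', wait=True)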


start_locally

start_locally(run_pipeline_steps_locally=False)

Start the current pipeline locally, meaning the pipeline logic is running on the current machine, instead of on the services queue.

Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes. Notice: when running pipeline steps locally, it assumes local code execution (i.e. it is running the local code as is, regardless of the git commit/diff on the pipeline steps Task)

  • Parameters

    run_pipeline_steps_locally (bool) – (default False) If True, run the pipeline steps themselves locally as a subprocess (use for debugging the pipeline locally, notice the pipeline code is expected to be available on the local machine)

  • Return type

    None
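
For local debugging, the equivalent call described above could look like this:

# run the controller logic in this process and every step as a local sub-process
pipe.start_locally(run_pipeline_steps_locally=True)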


stop

stop(timeout=None, mark_failed=False, mark_aborted=False)

Stop the pipeline controller and the optimization thread. If mark_failed and mark_aborted are False (default) mark the pipeline as completed, unless one of the steps failed, then mark the pipeline as failed.

  • Parameters

    • timeout (Optional [ float ] ) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait (terminate immediately).

    • mark_failed (bool ) – If True, mark the pipeline task as failed. (default False)

    • mark_aborted (bool ) – If True, mark the pipeline task as aborted. (default False)

  • Return type

    ()


update_execution_plot

update_execution_plot()

Update the Sankey diagram of the current pipeline

  • Return type

    ()


PipelineController.upload_artifact

classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=True, preview=None, wait_on_upload=False, serialization_function=None)

Upload (add) an artifact to the main Pipeline Task object. This function can be called from any pipeline component to directly add artifacts into the main pipeline Task.

The artifact can be uploaded by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

The currently supported upload artifact types include:

  • string / Path - A path to artifact file. If a wildcard or a folder is specified, then ClearML creates and uploads a ZIP file.

  • dict - ClearML stores a dictionary as .json file and uploads it.

  • pandas.DataFrame - ClearML stores a pandas.DataFrame as .csv.gz (compressed CSV) file and uploads it.

  • numpy.ndarray - ClearML stores a numpy.ndarray as .npz file and uploads it.

  • PIL.Image - ClearML stores a PIL.Image as .png file and uploads it.

  • Any - If called with auto_pickle=True, the object will be pickled and uploaded.

  • Parameters

    • name (str ) – The artifact name.

      danger

      If an artifact with the same name was previously uploaded, then it is overwritten.

    • artifact_object (object ) – The artifact object.

    • metadata (dict ) – A dictionary of key-value pairs for any metadata. This dictionary appears with the experiment in the ClearML Web-App (UI), ARTIFACTS tab.

    • delete_after_upload (bool ) – After the upload, delete the local copy of the artifact

      • True - Delete the local copy of the artifact.

      • False - Do not delete. (default)

    • auto_pickle (bool ) – If True (default), and the artifact_object is not one of the following types: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) the artifact_object will be pickled and uploaded as pickle file artifact (with file extension .pkl)

    • preview (Any ) – The artifact preview

    • wait_on_upload (bool ) – Whether the upload should be synchronous, forcing the upload to complete before continuing.

    • serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. Note that the object will be immediately serialized using this function, thus other serialization methods will not be used (e.g. pandas.DataFrame.to_csv), even if possible. To deserialize this artifact when getting it using the Artifact.get method, use its deserialization_function argument.

  • Return type

    bool

  • Returns

    The status of the upload.

  • True - Upload succeeded.

  • False - Upload failed.

  • Raise

    If the artifact object type is not supported, raise a ValueError.
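
A minimal sketch of reporting an artifact from inside a pipeline component (the artifact name and object are illustrative):

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})
# stored on the main pipeline Task, not on the component's own Task
PipelineController.upload_artifact(
    name='intermediate_table',
    artifact_object=df,
)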



PipelineController.upload_model

classmethod upload_model(model_name, model_local_path, upload_uri=None)

Upload (add) a model to the main Pipeline Task object. This function can be called from any pipeline component to directly add models into the main pipeline Task

The model file/path will be uploaded to the Pipeline Task and registered on the model repository.

Raise ValueError if main Pipeline task could not be located.

  • Parameters

    • model_name (str) – Model name as will appear in the model registry (in the pipeline’s project)

    • model_local_path (str) – Path to the local model file or directory to be uploaded. If a local directory is provided the content of the folder (recursively) will be packaged into a zip file and uploaded

    • upload_uri (Optional[str]) – The URI of the storage destination for model weights upload. The default value is the previously used URI.

  • Return type

    OutputModel

  • Returns

    The uploaded OutputModel
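
And a corresponding sketch for registering a model file from a component (the model name and local path are placeholders):

PipelineController.upload_model(
    model_name='trained-model',
    model_local_path='./model.pkl',
)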


wait

wait(timeout=None)

Wait for the pipeline to finish.

info

This method does not stop the pipeline. Call stop to terminate the pipeline.

  • Parameters

    timeout (float ) – The timeout to wait for the pipeline to complete (minutes). If None, wait until the pipeline completes.

  • Return type

    bool

  • Returns

    True, if the pipeline finished. False, if the pipeline timed out.

class automation.controller.PipelineDecorator()

Create a new pipeline controller. The newly created object will launch and monitor the new experiments.

  • Parameters

    • name (str ) – Provide pipeline name (if main Task exists it overrides its name)

    • project (str ) – Provide project storing the pipeline (if main Task exists it overrides its project)

    • version (Optional [ str ] ) – Pipeline version. This version allows you to uniquely identify the pipeline template execution. Examples for semantic versions: version='1.0.1', version='23', version='1.2'. If not set, find the latest version of the pipeline and increment it. If no such version is found, default to '1.0.0'

    • pool_frequency (float ) – The polling frequency (in minutes) for monitoring experiments / states.

    • add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.

    • target_project (str ) – If provided, all pipeline steps are cloned into the target project

    • abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.

    • add_run_number (bool ) – If True (default), add the run number of the pipeline to the pipeline name. Example, the second time we launch the pipeline “best pipeline”, we rename it to “best pipeline #2”

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the node will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session

    • docker_args (Optional [ str ] ) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added.

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the pipeline controller, when remotely executing. Allow users to execute the controller inside the specified repository, enabling them to load modules/scripts from the repository. Notice the execution work directory will be the repository root folder. Supports both a git repo url link and a local repository path (automatically converted into the remote git/commit as it is currently checked out). Example remote url: ‘https://github.com/user/repo.git’. Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy. Use an empty string (“”) to disable any repository auto-detection

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)

    • artifact_serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. All parameter/return artifacts uploaded by the pipeline will be serialized using this function. All relevant imports must be done in this function. For example:

      def serialize(obj):
          import dill
          return dill.dumps(obj)
    • artifact_deserialization_function (Optional [ Callable [ [ bytes ] , Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:

      def deserialize(bytes_):
          import dill
          return dill.loads(bytes_)
    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this pipeline. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). The output_uri of this pipeline’s steps will default to this value.

    • skip_global_imports (bool ) – If True, global imports will not be included in the steps’ execution; otherwise, all global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False

    • working_dir (Optional [ str ] ) – Working directory to launch the pipeline from.
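
In typical use, PipelineDecorator is driven through its component and pipeline decorators rather than instantiated directly; the following is a hedged sketch (names, URL, and decorator arguments are illustrative, see the PipelineDecorator method reference for the exact signatures):

from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['data'], cache=True)
def load_data(url):
    import pandas as pd
    return pd.read_csv(url)

@PipelineDecorator.pipeline(name='my pipeline', project='examples', version='1.0.0')
def run_pipeline(url='https://example.com/data.csv'):
    data = load_data(url)
    print('rows loaded:', len(data))

if __name__ == '__main__':
    run_pipeline()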


add_function_step

add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, retry_on_failure=None, status_change_callback=None, tags=None, output_uri=None, draft=False, working_dir=None)

Create a Task from a function, including wrapping the function input arguments into the hyper-parameter section as kwargs, and storing function results as named artifacts

Example:

def mock_func(a=6, b=9):
    c = a*b
    print(a, b, c)
    return c, c**2

create_task_from_function(mock_func, function_return=['mul', 'square'])

Example arguments from other Tasks (artifact):

def mock_func(matrix_np):
    c = matrix_np*matrix_np
    print(matrix_np, c)
    return c

create_task_from_function(
    mock_func,
    function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
    function_return=['square_matrix']
)
  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • function (Callable ) – A global function to convert into a standalone Task

    • function_kwargs (Optional [ Dict [ str , Any ] ] ) – Optional, provide a subset of the function arguments and default values to expose. If not provided, all function arguments and their defaults are taken automatically. Optionally, pass input arguments to the function from other Tasks’ output artifacts. Example, for an argument named numpy_matrix taken from Task ID aabbcc, artifact name answer: {‘numpy_matrix’: ‘aabbcc.answer’}

    • function_return (Optional [ List [ str ] ] ) – Provide a list of names for all the results. If not provided, no results will be stored as artifacts.

    • project_name (Optional [ str ] ) – Set the project name for the task. Required if base_task_id is None.

    • task_name (Optional [ str ] ) – Set the name of the remote task, if not provided use name argument.

    • task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’

    • auto_connect_frameworks (Optional [ dict ] ) – Control the frameworks auto connect, see Task.init auto_connect_frameworks

    • auto_connect_arg_parser (Optional [ dict ] ) – Control the ArgParser auto connect, see Task.init auto_connect_arg_parser

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling to load modules/script from a repository Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path. Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)

    • helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone function Task.

    • docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session

    • docker_args (Optional [ str ] ) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          parameters, # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:

      def status_change_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          previous_status # type: str
      ):
          pass
    • tags (Optional [ Union [ str , Sequence [ str ] ] ] ) – A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.

    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this step. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

    • draft (Optional [ bool ] ) – (default False). If True, the Task will be created as a draft task.

    • working_dir (Optional [ str ] ) – Working directory to launch the script from.

  • Return type

    bool

  • Returns

    True if successful
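
For reference, here is a minimal usage sketch of wiring a function step into a controller using the parameters above (assuming this section documents PipelineController.add_function_step; the project, queue, and step names are illustrative):

from clearml import PipelineController

def preprocess(dataset_id='none'):
    # placeholder step body; a real step would load and transform the dataset
    rows = [1, 2, 3]
    return rows

pipe = PipelineController(name='demo pipeline', project='examples', version='1.0.0')
pipe.add_function_step(
    name='stage_preprocess',
    function=preprocess,
    function_kwargs={'dataset_id': '${pipeline.dataset}'},  # assumes a 'dataset' pipeline parameter
    function_return=['rows'],
    execution_queue='default',
    cache_executed_step=True,
)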


add_parameter

add_parameter(name, default=None, description=None, param_type=None)

Add a parameter to the pipeline Task. The parameter can be used as input parameter for any step in the pipeline. Notice all parameters will appear under the PipelineController Task’s Hyper-parameters -> Pipeline section Example: pipeline.add_parameter(name=’dataset’, description=’dataset ID to process the pipeline’) Then in one of the steps we can refer to the value of the parameter with ‘${pipeline.dataset}’

  • Parameters

    • name (str) – String name of the parameter.

    • default (Optional[Any]) – Default value to be put as the default value (can be later changed in the UI)

    • description (Optional[str]) – String description of the parameter and its usage in the pipeline

    • param_type (Optional[str]) – Optional, parameter type information (to be used as hint for casting and description)

  • Return type

    None
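
A short sketch of defining a pipeline-level parameter and how a step would later reference it (pipe is assumed to be an existing PipelineController instance; the values are illustrative):

pipe.add_parameter(
    name='dataset',
    default='aabbcc11',   # illustrative dataset ID
    description='dataset ID to process the pipeline',
    param_type='str',
)
# a step can now reference the value with '${pipeline.dataset}',
# e.g. in parameter_override or function_kwargs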


add_step

add_step(name, base_task_id=None, parents=None, parameter_override=None, configuration_overrides=None, task_overrides=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, base_task_factory=None, retry_on_failure=None, status_change_callback=None, recursively_parse_parameters=False, output_uri=None)

Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step)

  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • base_task_id (Optional [ str ] ) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.

    • parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • parameter_override (Optional [ Mapping [ str , Any ] ] ) – Optional parameter overriding dictionary.

      The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:

      • Artifact access parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }

      • Model access (last model used) parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }

      • Parameter access parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }

      • Pipeline Task argument (see Pipeline.add_parameter) parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }

      • Task ID parameter_override={'Args/input_file': '${stage3.id}' }

    • recursively_parse_parameters (bool ) – If True, recursively parse parameters from parameter_override in lists, dicts, or tuples. Example:

      - `parameter_override={'Args/input_file': ['${<step_name>.artifacts.<artifact_name>.url}', 'file2.txt']}` will be correctly parsed.
      - `parameter_override={'Args/input_file': ('${<step_name_1>.parameters.Args/input_file}', '${<step_name_2>.parameters.Args/input_file}')}` will be correctly parsed.

    • configuration_overrides (Optional [ Mapping [ str , Union [ str , Mapping ] ] ] ) – Optional, override Task configuration objects. Expected dictionary of configuration object name and configuration object content. Examples:

      {‘General’: dict(key=’value’)} {‘General’: ‘configuration file content’} {‘OmegaConf’: YAML.dumps(full_hydra_dict)}

    • task_overrides (Optional [ Mapping [ str , Any ] ] ) – Optional task section overriding dictionary.

      The dict values can reference a previously executed step using the following form ‘${step_name}’. Examples:

      • get the latest commit from a specific branch task_overrides={'script.version_num': '', 'script.branch': 'main'}

      • match git repository branch to a previous step task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}

      • change container image task_overrides={'container.image': 'nvidia/cuda:11.6.0-devel-ubuntu20.04', 'container.arguments': '--ipc=host'}

      • match container image to a previous step task_overrides={'container.image': '${stage1.container.image}'}

      • reset requirements (the agent will use the “requirements.txt” inside the repo) task_overrides={'script.requirements.pip': ""}

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • base_task_project (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • base_task_name (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • clone_base_task (bool ) – If True (default), the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).

    • continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          parameters, # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If clone_base_task is False there is no cloning, hence the base_task is used.

    • base_task_factory (Optional [ Callable [ [ PipelineController.Node ] , Task ] ] ) – Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:

      def status_change_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          previous_status # type: str
      ):
          pass
    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this step. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

  • Return type

    bool

  • Returns

    True if successful
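
A hedged sketch of chaining two steps with add_step; the project, task, and queue names are illustrative, and pipe is assumed to be an existing PipelineController:

pipe.add_step(
    name='stage_train',
    parents=['stage_preprocess'],
    base_task_project='examples',      # resolved together with base_task_name
    base_task_name='train model',
    parameter_override={
        'Args/dataset_url': '${stage_preprocess.artifacts.rows.url}',
        'Args/epochs': '${pipeline.epochs}',   # assumes an 'epochs' pipeline parameter
    },
    execution_queue='default',
)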


add_tags

add_tags(tags)

Add tags to this pipeline. Old tags are not deleted. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.

  • Parameters

    tags (Union[Sequence[str], str]) – A list of tags for this pipeline.

  • Return type

    None


PipelineDecorator.component

classmethod component(_func=None, *, return_values=('return_object'), name=None, cache=False, packages=None, parents=None, execution_queue=None, continue_on_fail=False, docker=None, docker_args=None, docker_bash_setup_script=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, retry_on_failure=None, pre_execute_callback=None, post_execute_callback=None, status_change_callback=None, tags=None, output_uri=None, draft=False, working_dir=None)

pipeline component function to be executed remotely

  • Parameters

    • _func – wrapper function

    • return_values (Union [ str , Sequence [ str ] ] ) – Provide a list of names for all the results. Notice! If not provided, no results will be stored as artifacts.

    • name (Optional [ str ] ) – Optional, set the name of the pipeline component task. If not provided, the wrapped function name is used as the pipeline component name

    • cache (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used inside the wrapped function.

    • parents (Optional [ List [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the pipeline’s default execution queue

    • continue_on_fail (bool ) – (default False). If True, a failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • docker (Optional [ str ] ) – Specify the docker image to be used when executing the pipeline step remotely

    • docker_args (Optional [ str ] ) – Add docker execution arguments for the remote execution (use single string for all docker arguments).

    • docker_bash_setup_script (Optional [ str ] ) – Add a bash script to be executed inside the docker before setting up the Task’s environment

    • task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’

    • auto_connect_frameworks (Optional [ dict ] ) – Control the frameworks auto connect, see Task.init auto_connect_frameworks

    • auto_connect_arg_parser (Optional [ dict ] ) – Control the ArgParser auto connect, see Task.init auto_connect_arg_parser

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout). Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)

    • helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone pipeline step function Task. By default the pipeline step function has no access to any of the other functions, by specifying additional functions here, the remote pipeline step could call the additional functions. Example, assuming we have two functions parse_data(), and load_data(): [parse_data, load_data]

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, Automatically log the step’s reported metrics also on the pipeline Task. The expected format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, Automatically log the step’s artifacts on the pipeline Task. Provided a list of artifact names created by the step function, these artifacts will be logged automatically also on the Pipeline Task itself. Example: [‘processed_data’, ] (target artifact name on the Pipeline Task will have the same name as the original artifact) Alternatively, provide a list of pairs (source_artifact_name, target_artifact_name): where the first string is the artifact name as it appears on the component Task, and the second is the target artifact name to put on the Pipeline Task Example: [(‘processed_data’, ‘final_processed_data’), ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, Automatically log the step’s output models on the pipeline Task. Provided a list of model names created by the step’s Task, they will also appear on the Pipeline itself. Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ] Alternatively, provide a list of pairs (source_model_name, target_model_name): where the first string is the model name as it appears on the component Task, and the second is the target model name to put on the Pipeline Task Example: [(‘model_weights’, ‘final_model_weights’), ]

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return a bool: True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created,

      and before it is sent for execution. Allows a user to modify the Task before launch.

      Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          parameters, # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed and other jobs are going to be executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
      ):
          pass
    • status_change_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , str ] , None ] ] # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:

      def status_change_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          previous_status # type: str
      ):
          pass
    • tags (Optional [ Union [ str , Sequence [ str ] ] ] ) – A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.

    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this step. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

    • draft (Optional [ bool ] ) – (default False). If True, the Task will be created as a draft task.

    • working_dir (Optional [ str ] ) – Working directory to launch the step from.

  • Return type

    Callable

  • Returns

    function wrapper
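
A minimal sketch of a decorated component; the queue name and return value name are illustrative:

from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=['processed'], cache=True, execution_queue='default')
def preprocess_component(raw):
    # executed as its own Task when the pipeline runs remotely
    processed = [x * 2 for x in raw]
    return processed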


connect_configuration

connect_configuration(configuration, name=None, description=None)

Connect a configuration dictionary or configuration file (pathlib.Path / str) to the PipelineController object. This method should be called before reading the configuration file.

For example, a local file:

config_file = pipe.connect_configuration(config_file)
my_params = json.load(open(config_file,'rt'))

A parameter dictionary/list:

my_params = pipe.connect_configuration(my_params)
  • Parameters

    • configuration (Union[Mapping, list, Path, str]) – The configuration. This is usually the configuration used in the model training process.

      Specify one of the following:

      • A dictionary/list - A dictionary containing the configuration. ClearML stores the configuration in

        the ClearML Server (backend), in a HOCON format (JSON-like format) which is editable.

      • A pathlib2.Path string - A path to the configuration file. ClearML stores the content of the file.

        A local path must be a relative path. When executing a pipeline remotely in a worker, the contents brought from the ClearML Server (backend) overwrite the contents of the file.

    • name (str ) – Configuration section name. default: ‘General’ Allowing users to store multiple configuration dicts/files

    • description (str ) – Configuration section description (text). default: None

  • Return type

    Union[dict, Path, str]

  • Returns

    If a dictionary is specified, then a dictionary is returned. If a pathlib2.Path / string is specified, then a path to a local configuration file is returned.


create_draft

create_draft()

Optional, manually create & serialize the Pipeline Task (use with care for manual multi pipeline creation).

Notice: the recommended flow would be to call pipeline.start(queue=None), which has a similar effect and allows you to clone/enqueue later on.

After calling create_draft(), users can edit the pipeline in the UI and enqueue it for execution.

Notice: this function should be used to programmatically create a pipeline for later use. To automatically create and launch pipelines, call the start() method instead.

  • Return type

    None
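
A short sketch of programmatically creating a pipeline draft for later use (project and task names are illustrative):

pipe = PipelineController(name='draft pipeline', project='examples', version='1.0.0')
pipe.add_step(name='stage_one', base_task_project='examples', base_task_name='step task')
# serialize the pipeline without launching it; it can later be cloned/enqueued from the UI
pipe.create_draft()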


PipelineDecorator.debug_pipeline

classmethod debug_pipeline()

Set debugging mode: run all functions locally as functions (serially). The full pipeline DAG runs locally, where steps are executed as plain functions.

Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply the git diff). Pipeline steps are executed as functions (no Task will be created), for ease of debugging.

  • Return type

    ()


elapsed

elapsed()

Return the minutes elapsed since the controller start timestamp.

  • Return type

    float

  • Returns

    The minutes from controller start time. A negative value means the process has not started yet.


PipelineDecorator.enqueue

classmethod enqueue(pipeline_controller, queue_name=None, queue_id=None, force=False)

Enqueue a PipelineController for execution, by adding it to an execution queue.

info

A worker daemon must be listening at the queue for the worker to fetch the Task and execute it, see ClearML Agent in the ClearML Documentation.

  • Parameters

    • pipeline_controller (Union[PipelineController, str]) – The PipelineController to enqueue. Specify a PipelineController object or PipelineController ID

    • queue_name (Optional[str]) – The name of the queue. If not specified, then queue_id must be specified.

    • queue_id (Optional[str]) – The ID of the queue. If not specified, then queue_name must be specified.

    • force (bool ) – If True, reset the PipelineController if necessary before enqueuing it

  • Return type

    Any

  • Returns

    An enqueue JSON response.

    {
        "queued": 1,
        "updated": 1,
        "fields": {
            "status": "queued",
            "status_reason": "",
            "status_message": "",
            "status_changed": "2020-02-24T15:05:35.426770+00:00",
            "last_update": "2020-02-24T15:05:35.426770+00:00",
            "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf"
        }
    }
    • queued - The number of Tasks enqueued (an integer or null).

    • updated - The number of Tasks updated (an integer or null).

    • fields

      • status - The status of the experiment.

      • status_reason - The reason for the last status change.

      • status_message - Information about the status.

      • status_changed - The last status change date and time (ISO 8601 format).

      • last_update - The last Task update time, including Task creation, update, change, or events for this task (ISO 8601 format).

      • execution.queue - The ID of the queue where the Task is enqueued. null indicates not enqueued.
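
A hedged usage sketch; the pipeline ID and queue name are illustrative:

from clearml import PipelineController

# enqueue an existing pipeline controller by ID onto the 'services' queue
response = PipelineController.enqueue('aabbccdd11223344eeff', queue_name='services')
print(response)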


PipelineDecorator.get

classmethod get(pipeline_id=None, pipeline_project=None, pipeline_name=None, pipeline_version=None, pipeline_tags=None, shallow_search=False)

Get a specific PipelineController. If multiple pipeline controllers are found, the pipeline controller with the highest semantic version is returned. If no semantic version is found, the most recently updated pipeline controller is returned. This function raises an Exception if no pipeline controller was found.

Note: In order to run the pipeline controller returned by this function, use PipelineController.enqueue

  • Parameters

    • pipeline_id (Optional[str]) – Requested PipelineController ID

    • pipeline_project (Optional[str]) – Requested PipelineController project

    • pipeline_name (Optional[str]) – Requested PipelineController name

    • pipeline_tags (Optional[Sequence[str]]) – Requested PipelineController tags (list of tag strings)

    • shallow_search (bool) – If True, search only the first 500 results (first page)

    • pipeline_version (Optional [ str ] ) –

  • Return type

    ForwardRef
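
A short sketch of fetching and re-running an existing pipeline (the project and name are illustrative):

pipeline = PipelineController.get(pipeline_project='examples', pipeline_name='demo pipeline')
PipelineController.enqueue(pipeline, queue_name='services')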


PipelineDecorator.get_current_pipeline

classmethod get_current_pipeline()

Return the currently running pipeline instance

  • Return type

    ForwardRef


PipelineDecorator.get_logger

classmethod get_logger()

Return a logger connected to the Pipeline Task. The logger can be used by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

  • Return type

    Logger

  • Returns

    Logger object for reporting metrics (scalars, plots, debug samples etc.)
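
For example, a component can report a scalar directly onto the pipeline Task (the title/series values are illustrative):

logger = PipelineController.get_logger()
logger.report_scalar(title='quality', series='accuracy', value=0.93, iteration=0)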


get_parameters

get_parameters()

Return the pipeline parameters dictionary.

  • Return type

    dict

  • Returns

    Dictionary str -> str


get_pipeline_dag

get_pipeline_dag()

Return the pipeline execution graph, each node in the DAG is PipelineController.Node object. Graph itself is a dictionary of Nodes (key based on the Node name), each node holds links to its parent Nodes (identified by their unique names)

  • Return type

    Mapping[str, Node]

  • Returns

    execution tree, as a nested dictionary. Example:

{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}
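
A minimal sketch of walking the returned DAG (pipe is assumed to be a PipelineController instance; only the node name and parent links described above are used):

dag = pipe.get_pipeline_dag()
for step_name, node in dag.items():
    print(step_name, '-> parents:', node.parents)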

get_processed_nodes

get_processed_nodes()

Return a list of the processed pipeline nodes, each entry in the list is PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    executed (excluding currently executing) nodes list


get_running_nodes

get_running_nodes()

Return a list of the currently running pipeline nodes, each entry in the list is PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    Currently running nodes list


is_running

is_running()

Return True if the pipeline controller is running.

  • Return type

    bool

  • Returns

    A boolean indicating whether the pipeline controller is active (still running) or stopped.


is_successful

is_successful(fail_on_step_fail=True, fail_condition='all')

Evaluate whether the pipeline is successful.

  • Parameters

    • fail_on_step_fail (bool) – If True (default), evaluate the pipeline steps’ status to assess if the pipeline is successful. If False, only evaluate the controller

    • fail_condition (str) – Must be one of the following: ‘all’ (default), ‘failed’ or ‘aborted’. If ‘failed’, this function will return False if the pipeline failed and True if the pipeline was aborted. If ‘aborted’, this function will return False if the pipeline was aborted and True if the pipeline failed. If ‘all’, this function will return False in both cases.

  • Return type

    bool

  • Returns

    A boolean indicating whether the pipeline was successful or not. Note that if the pipeline is in a running/pending state, this function will return False
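
A short sketch combining wait and is_successful (pipe is assumed to be a running PipelineController):

pipe.wait()
if pipe.is_successful(fail_on_step_fail=True, fail_condition='all'):
    print('pipeline completed successfully')
else:
    print('pipeline failed or was aborted')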


PipelineDecorator.pipeline

classmethod pipeline(_func=None, *, name, project, version=None, return_value=None, default_queue=None, pool_frequency=0.2, add_pipeline_tags=False, target_project=None, abort_on_failure=False, pipeline_execution_queue='services', multi_instance_support=False, add_run_number=True, args_map=None, start_controller_locally=False, retry_on_failure=None, docker=None, docker_args=None, docker_bash_setup_script=None, packages=None, repo=None, repo_branch=None, repo_commit=None, artifact_serialization_function=None, artifact_deserialization_function=None, output_uri=None, skip_global_imports=False, working_dir=None)

Decorate pipeline logic function.

  • Parameters

    • name (str ) – Provide pipeline name (if main Task exists it overrides its name)

    • project (str ) – Provide project storing the pipeline (if main Task exists it overrides its project)

    • version (Optional [ str ] ) – Pipeline version. This version allows to uniquely identify the pipeline template execution. Examples for semantic versions: version=’1.0.1’ , version=’23’, version=’1.2’. If not set, find the latest version of the pipeline and increment it. If no such version is found, default to ‘1.0.0’

    • return_value (Optional [ str ] ) – Optional, Provide an artifact name to store the pipeline function return object Notice, If not provided the pipeline will not store the pipeline function return value.

    • default_queue (Optional [ str ] ) – default pipeline step queue

    • pool_frequency (float ) – The pooling frequency (in minutes) for monitoring experiments / states.

    • add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.

    • target_project (str ) – If provided, all pipeline steps are cloned into the target project

    • abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately, instead any step that is not connected (or indirectly connected) to the failed step, will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.

    • pipeline_execution_queue (Optional [ str ] ) – remote pipeline execution queue (default ‘services’ queue). If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)

    • multi_instance_support (bool ) – If True, allow multiple calls to the same pipeline function, each call creating a new Pipeline Task. Notice it is recommended to create an additional Task on the “main process” acting as a master pipeline, automatically collecting the execution plots. If multi_instance_support==’parallel’ then the pipeline calls are executed in parallel, in the parallel case the function calls return None, to collect all pipeline results call PipelineDecorator.wait_for_multi_pipelines(). Default False, no multi instance pipeline support.

    • add_run_number (bool ) – If True (default), add the run number of the pipeline to the pipeline name. Example, the second time we launch the pipeline “best pipeline”, we rename it to “best pipeline #2”

    • args_map (dict [ str , List [ str ] ] ) – Map arguments to their specific configuration section. Arguments not included in this map

      will default to Args section. For example, for the following code:

      @PipelineDecorator.pipeline(args_map={'sectionA': ['paramA'], 'sectionB': ['paramB', 'paramC']})
      def executing_pipeline(paramA, paramB, paramC, paramD):
          pass

      Parameters would be stored as:

      • paramA: sectionA/paramA

      • paramB: sectionB/paramB

      • paramC: sectionB/paramC

      • paramD: Args/paramD

    • start_controller_locally (bool ) – If True, start the controller on the local machine. The steps will run remotely if PipelineDecorator.run_locally or PipelineDecorator.debug_pipeline are not called. Default: False

    • retry_on_failure (Optional [ Union [ int , Callable [ [ PipelineController , PipelineController.Node , int ] , bool ] ] ] # noqa ) – Integer (number of retries) or Callback function that returns True to allow a retry

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.

      • Callable: A function called on node failure. Takes as parameters:

        the PipelineController instance, the PipelineController.Node that failed and an int representing the number of previous retries for the node that failed. The function must return True if the node should be retried and False otherwise. If True, the node will be re-queued and the number of retries left will be decremented by 1. By default, if this callback is not specified, the function will be retried the number of times indicated by retry_on_failure.

        def example_retry_on_failure_callback(pipeline, node, retries):
            print(node.name, ' failed')
            # allow up to 5 retries (total of 6 runs)
            return retries < 5
    • docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session

    • docker_args (Optional [ str ] ) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout). Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy Use empty string (“”) to disable any repository auto-detection

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit ID (Ignored, if local repo path is used)

    • artifact_serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. All parameter/return artifacts uploaded by the pipeline will be serialized using this function. All relevant imports must be done in this function. For example:

      def serialize(obj):
          import dill
          return dill.dumps(obj)
    • artifact_deserialization_function (Optional [ Callable [ [ bytes ] , Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:

      def deserialize(bytes_):
          import dill
          return dill.loads(bytes_)
    • output_uri (Optional [ Union [ str , bool ] ] ) – The storage / output url for this pipeline. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). The output_uri of this pipeline’s steps will default to this value.

    • skip_global_imports (bool ) – If True, global imports will not be included in the steps’ execution, otherwise all global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False

    • working_dir (Optional [ str ] ) – Working directory to launch the pipeline from.

  • Return type

    Callable
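
A minimal end-to-end sketch of the decorator flow; the names, project, and placeholder data are illustrative:

from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=['data'])
def load_data(source_url):
    data = [1, 2, 3]   # placeholder for a real download/parse
    return data

@PipelineDecorator.pipeline(name='decorated pipeline', project='examples', version='1.0.0')
def run_pipeline(source_url='https://example.com/data.csv'):
    data = load_data(source_url)
    print('loaded', len(data), 'rows')

if __name__ == '__main__':
    PipelineDecorator.run_locally()   # for this sketch, run the steps locally
    run_pipeline()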


PipelineDecorator.run_locally

classmethod run_locally()

Set local mode: run all functions locally as subprocesses.

Run the full pipeline DAG locally, where steps are executed as sub-process Tasks. Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply the git diff).

  • Return type

    ()


PipelineDecorator.set_default_execution_queue

classmethod set_default_execution_queue(default_execution_queue)

Set the default execution queue if pipeline step does not specify an execution queue

  • Parameters

    default_execution_queue (Optional[str]) – The execution queue to use if no execution queue is provided

  • Return type

    None


set_pipeline_execution_time_limit

set_pipeline_execution_time_limit(max_execution_minutes)

Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.

  • Parameters

    max_execution_minutes (float ) – The maximum time (minutes) for the entire pipeline process. The default is None, indicating no time limit.

  • Return type

    None


start

start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)

Start the current pipeline remotely (on the selected services queue). The current process will be stopped and launched remotely.

  • Parameters

    • queue – queue name to launch the pipeline on

    • step_task_created_callback (Callable ) – Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
          parameters, # type: dict
      ):
          pass
    • step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline, # type: PipelineController,
          node, # type: PipelineController.Node,
      ):
          pass
    • wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)

  • Return type

    bool

  • Returns

    True, if the controller started. False, if the controller did not start.
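
A short sketch of launching a controller remotely on the (illustrative) 'services' queue and blocking until it finishes:

pipe = PipelineController(name='demo pipeline', project='examples', version='1.0.0')
pipe.add_step(name='stage_one', base_task_project='examples', base_task_name='step task')
pipe.start(queue='services', wait=True)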


start_locally

start_locally(run_pipeline_steps_locally=False)

Start the current pipeline locally, meaning the pipeline logic is running on the current machine, instead of on the services queue.

Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes. Notice: when running pipeline steps locally, it assumes local code execution (i.e. it is running the local code as is, regardless of the git commit/diff on the pipeline steps Task)

  • Parameters

    run_pipeline_steps_locally (bool) – (default False) If True, run the pipeline steps themselves locally as a subprocess (use for debugging the pipeline locally, notice the pipeline code is expected to be available on the local machine)

  • Return type

    None


stop

stop(timeout=None, mark_failed=False, mark_aborted=False)

Stop the pipeline controller and the optimization thread. If mark_failed and mark_aborted are False (default) mark the pipeline as completed, unless one of the steps failed, then mark the pipeline as failed.

  • Parameters

    • timeout (Optional [ float ] ) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait and terminate immediately.

    • mark_failed (bool ) – If True, mark the pipeline task as failed. (default False)

    • mark_aborted (bool ) – If True, mark the pipeline task as aborted. (default False)

  • Return type

    ()


update_execution_plot

update_execution_plot()

Update the Sankey diagram of the current pipeline.

  • Return type

    ()


PipelineDecorator.upload_artifact

classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=True, preview=None, wait_on_upload=False, serialization_function=None)

Upload (add) an artifact to the main Pipeline Task object. This function can be called from any pipeline component to directly add artifacts into the main pipeline Task.

The artifact can be uploaded by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

The currently supported upload artifact types include:

  • string / Path - A path to an artifact file. If a wildcard or a folder is specified, then ClearML creates and uploads a ZIP file.

  • dict - ClearML stores a dictionary as .json file and uploads it.

  • pandas.DataFrame - ClearML stores a pandas.DataFrame as .csv.gz (compressed CSV) file and uploads it.

  • numpy.ndarray - ClearML stores a numpy.ndarray as .npz file and uploads it.

  • PIL.Image - ClearML stores a PIL.Image as .png file and uploads it.

  • Any - If called with auto_pickle=True, the object will be pickled and uploaded.

  • Parameters

    • name (str ) – The artifact name.

      danger

      If an artifact with the same name was previously uploaded, then it is overwritten.

    • artifact_object (object ) – The artifact object.

    • metadata (dict ) – A dictionary of key-value pairs for any metadata. This dictionary appears with the experiment in the ClearML Web-App (UI), ARTIFACTS tab.

    • delete_after_upload (bool ) – After the upload, delete the local copy of the artifact

      • True - Delete the local copy of the artifact.

      • False - Do not delete. (default)

    • auto_pickle (bool ) – If True (default), and the artifact_object is not one of the following types: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) the artifact_object will be pickled and uploaded as pickle file artifact (with file extension .pkl)

    • preview (Any ) – The artifact preview

    • wait_on_upload (bool ) – Whether the upload should be synchronous, forcing the upload to complete before continuing.

    • serialization_function (Optional [ Callable [ [ Any ] , Union [ bytes , bytearray ] ] ] ) – A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a bytes or bytearray object, which represents the serialized object. Note that the object will be immediately serialized using this function, thus other serialization methods will not be used (e.g. pandas.DataFrame.to_csv), even if possible. To deserialize this artifact when getting it using the Artifact.get method, use its deserialization_function argument.

  • Return type

    bool

  • Returns

    The status of the upload.

  • True - Upload succeeded.

  • False - Upload failed.

  • Raise

    If the artifact object type is not supported, raise a ValueError.
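
A minimal sketch of pushing an artifact onto the main pipeline Task from inside a component (the artifact name and metadata are illustrative):

stats = {'rows': 1000, 'columns': 12}
PipelineDecorator.upload_artifact(
    name='dataset_stats',
    artifact_object=stats,
    metadata={'source': 'preprocess step'},
)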


PipelineDecorator.upload_model

classmethod upload_model(model_name, model_local_path, upload_uri=None)

Upload (add) a model to the main Pipeline Task object. This function can be called from any pipeline component to directly add models into the main pipeline Task

The model file/path will be uploaded to the Pipeline Task and registered on the model repository.

Raise ValueError if main Pipeline task could not be located.

  • Parameters

    • model_name (str) – Model name as will appear in the model registry (in the pipeline’s project)

    • model_local_path (str) – Path to the local model file or directory to be uploaded. If a local directory is provided the content of the folder (recursively) will be packaged into a zip file and uploaded

    • upload_uri (Optional[str]) – The URI of the storage destination for model weights upload. The default value is the previously used URI.

  • Return type

    OutputModel

  • Returns

    The uploaded OutputModel
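
A short sketch of registering a locally saved model on the pipeline Task (the model name and local path are illustrative):

output_model = PipelineDecorator.upload_model(
    model_name='final_model',
    model_local_path='./model.pkl',
)
print(output_model.id)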


wait

wait(timeout=None)

Wait for the pipeline to finish.

info

This method does not stop the pipeline. Call stop to terminate the pipeline.

  • Parameters

    timeout (float ) – The timeout to wait for the pipeline to complete (minutes). If None, wait until the pipeline is completed.

  • Return type

    bool

  • Returns

    True, if the pipeline finished. False, if the pipeline timed out.


PipelineDecorator.wait_for_multi_pipelines

classmethod wait_for_multi_pipelines()

Wait until all background multi-pipeline executions are completed. Returns all the pipeline results in call order (the first pipeline call is at index 0).

  • Returns

    List of return values from executed pipeline, based on call order.