PipelineController

class PipelineController()#

Pipeline controller. A pipeline is a DAG of base tasks; each task is cloned (with arguments changed as required), executed, and monitored. The pipeline process (Task) itself can be executed manually or by the clearml-agent services queue. Notice: the pipeline controller lives only as long as the pipeline itself is being executed.

Create a new pipeline controller. The newly created object will launch and monitor the new experiments.

  • Parameters

    • name (str) – Provide pipeline name (if main Task exists it overrides its name)

    • project (str) – Provide project storing the pipeline (if main Task exists it overrides its project)

    • version (str) – Must provide pipeline version. This version allows uniquely identifying the pipeline template execution. Examples of semantic versions: version='1.0.1', version='23', version='1.2'

    • pool_frequency (float ) – The polling frequency (in minutes) for monitoring experiments / states.

    • add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.

    • target_project (str ) – If provided, all pipeline steps are cloned into the target project

    • auto_version_bump (bool ) – If True (default) and the same pipeline version already exists (with any difference from the current one), the current pipeline version will be bumped to a new version. Version bump examples: 1.0.0 -> 1.0.1, 1.2 -> 1.3, 10 -> 11, etc.

    • abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.
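
A minimal usage sketch, assuming a project named 'examples' and a queue named 'default' exist (both names are placeholders):

from clearml import PipelineController

pipe = PipelineController(
    name='my pipeline',
    project='examples',     # placeholder project name
    version='1.0.0',
    add_pipeline_tags=True,
)
pipe.set_default_execution_queue('default')   # placeholder queue name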


add_function_step#

add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False)

Create a Task from a function, including wrapping the function input arguments into the hyper-parameter section as kwargs, and storing function results as named artifacts

Example:

def mock_func(a=6, b=9):
    c = a * b
    print(a, b, c)
    return c, c ** 2

create_task_from_function(mock_func, function_return=['mul', 'square'])

Example arguments from other Tasks (artifact):

def mock_func(matrix_np):
    c = matrix_np * matrix_np
    print(matrix_np, c)
    return c

create_task_from_function(
    mock_func,
    function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
    function_return=['square_matrix']
)
  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • function (Callable ) – A global function to convert into a standalone Task

    • function_kwargs (Optional [ Dict [ str , Any ] ] ) – Optional, provide a subset of the function arguments and default values to expose. If not provided, all function arguments and defaults are automatically taken. Values can also pass input arguments to the function from another Task's output artifact. Example, an argument named numpy_matrix taken from Task ID aabbcc, artifact name answer: {'numpy_matrix': 'aabbcc.answer'}

    • function_return (Optional [ List [ str ] ] ) – Provide a list of names for all the results. If not provided no results will be stored as artifacts.

    • project_name (Optional [ str ] ) – Set the project name for the task. Required if base_task_id is None.

    • task_name (Optional [ str ] ) – Set the name of the remote task. Required if base_task_id is None.

    • task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function when executing remotely. Allows users to execute the function inside the specified repository, enabling loading modules/scripts from the repository. Notice the execution working directory will be the repository root folder. Supports both git repository URLs and local repository paths. Example remote URL: 'https://github.com/user/repo.git' Example local repo copy: './repo' -> will automatically store the remote repo URL and commit ID based on the locally cloned copy

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit id (Ignored, if local repo path is used)

    • helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone function Task.

    • docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session

    • docker_args (Optional [ str ] ) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • parents (Optional [ Sequence [ str ] ] , ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
          parameters,  # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed, before other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,  # type: PipelineController
          node,      # type: PipelineController.Node
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.

  • Return type

    bool

  • Returns

    True if successful
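
For example, assuming the controller pipe created above and a pipeline parameter named dataset added via add_parameter (see below), a function step could be registered like this (the step name, queue, and function body are illustrative):

def preprocess(dataset_id='none'):
    # placeholder step body
    print('processing', dataset_id)
    return dataset_id

pipe.add_function_step(
    name='preprocess',
    function=preprocess,
    function_kwargs=dict(dataset_id='${pipeline.dataset}'),
    function_return=['processed_id'],
    execution_queue='default',
    cache_executed_step=True,
)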


add_parameter#

add_parameter(name, default=None, description=None)

Add a parameter to the pipeline Task. The parameter can be used as an input parameter for any step in the pipeline. Notice: all parameters will appear under the PipelineController Task's Hyper-parameters -> Pipeline section. Example: pipeline.add_parameter(name='dataset', description='dataset ID to process the pipeline') Then, in one of the steps, we can refer to the value of the parameter with '${pipeline.dataset}'

  • Parameters

    • name (str) – String name of the parameter.

    • default (Optional[Any]) – Default value to be put as the default value (can be later changed in the UI)

    • description (Optional[str]) – String description of the parameter and its usage in the pipeline

  • Return type

    None
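
Continuing the sketch above, a hypothetical 'dataset' parameter could be added like this:

pipe.add_parameter(name='dataset', default='none', description='dataset ID to process')
# any step can now reference the value with '${pipeline.dataset}',
# e.g. in parameter_override or in function_kwargs of a function step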


add_step#

add_step(name, base_task_id=None, parents=None, parameter_override=None, configuration_overrides=None, task_overrides=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, base_task_factory=None)

Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step)

  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • base_task_id (Optional [ str ] ) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.

    • parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • parameter_override (Optional [ Mapping [ str , Any ] ] ) – Optional parameter overriding dictionary. The dict values can reference a previously executed step using the following form: '${step_name}'. Examples:

      - Artifact access:
        parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }
      - Model access (last model used):
        parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }
      - Parameter access:
        parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }
      - Pipeline Task argument (see Pipeline.add_parameter):
        parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }
      - Task ID:
        parameter_override={'Args/input_file': '${stage3.id}' }
    • configuration_overrides (Optional [ Mapping [ str , Union [ str , Mapping ] ] ] ) – Optional, override Task configuration objects. Expected dictionary of configuration object name and configuration object content. Examples:

      {'General': dict(key='value')}
      {'General': 'configuration file content'}
      {'OmegaConf': YAML.dumps(full_hydra_dict)}

    • task_overrides (Optional [ Mapping [ str , Any ] ] ) – Optional task section overriding dictionary. The dict values can reference a previously executed step using the following form: '${step_name}'. Examples:

      - Get the latest commit from a specific branch:
        task_overrides={'script.version_num': '', 'script.branch': 'main'}
      - Match the git repository branch to a previous step:
        task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}
      - Change the container image:
        task_overrides={'container.image': '${stage1.container.image}'}
      - Match the container image to a previous step:
        task_overrides={'container.image': '${stage1.container.image}'}
    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • base_task_project (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • base_task_name (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • clone_base_task (bool ) – If True (default) the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).

    • continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
          parameters,  # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed, before other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,  # type: PipelineController
          node,      # type: PipelineController.Node
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If clone_base_task is False there is no cloning, hence the base_task is used.

    • base_task_factory (Optional [ Callable [ [ PipelineController.Node ] , Task ] ] ) – Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)

  • Return type

    bool

  • Returns

    True if successful
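
A hedged sketch chaining two cloned-Task steps; the base task names, project, and parameter key below are placeholders:

pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='step 1 dataset artifact',
)
pipe.add_step(
    name='stage_process',
    parents=['stage_data'],
    base_task_project='examples',
    base_task_name='step 2 process dataset',
    parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}'},
)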


create_draft#

create_draft()

Optional, manually create & serialize the Pipeline Task. After calling Pipeline.create(), users can edit the pipeline in the UI and enqueue it for execution.

Notice: this function should be used to programmatically create a pipeline for later use. To automatically create and launch a pipeline, call the start() method.

  • Return type

    None


elapsed#

elapsed()

Return minutes elapsed since the controller start time stamp.

  • Return type

    float

  • Returns

    The minutes from controller start time. A negative value means the process has not started yet.


PipelineController.get_logger#

classmethod get_logger()

Return a logger connected to the Pipeline Task. The logger can be used by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

  • Return type

    Logger

  • Returns

    Logger object for reporting metrics (scalars, plots, debug samples etc.)
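
For instance, a pipeline component could report a scalar directly onto the pipeline Task (the metric title/series are illustrative):

from clearml import PipelineController

# inside a function step / component executed by the pipeline
logger = PipelineController.get_logger()
logger.report_scalar(title='test', series='accuracy', value=0.95, iteration=0)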


get_parameters#

get_parameters()

Return the pipeline parameters dictionary (str -> str).

  • Return type

    dict


get_pipeline_dag#

get_pipeline_dag()

Return the pipeline execution graph, each node in the DAG is PipelineController.Node object. Graph itself is a dictionary of Nodes (key based on the Node name), each node holds links to its parent Nodes (identified by their unique names)

  • Return type

    Mapping[str, Node]

  • Returns

    execution tree, as a nested dictionary. Example:

{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}

get_processed_nodes#

get_processed_nodes()

Return a list of the processed pipeline nodes; each entry in the list is a PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    executed (excluding currently executing) nodes list


get_running_nodes#

get_running_nodes()

Return a list of the currently running pipeline nodes; each entry in the list is a PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    Currently running nodes list


is_running#

is_running()

Return True if the pipeline controller is running.

  • Return type

    bool

  • Returns

    A boolean indicating whether the pipeline controller is active (still running) or stopped.


is_successful#

is_successful()

Return True if the pipeline controller was fully executed and none of the steps / Tasks failed.

  • Return type

    bool

  • Returns

    A boolean indicating whether all steps did not fail


set_default_execution_queue#

set_default_execution_queue(default_execution_queue)

Set the default execution queue to be used if a pipeline step does not specify an execution queue.

  • Parameters

    default_execution_queue (Optional[str]) – The execution queue to use if no execution queue is provided

  • Return type

    None


set_pipeline_execution_time_limit#

set_pipeline_execution_time_limit(max_execution_minutes)

Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.

  • Parameters

    max_execution_minutes (float ) – The maximum time (minutes) for the entire pipeline process. The default is None, indicating no time limit.

  • Return type

    None


start#

start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)

Start the current pipeline remotely (on the selected services queue). The current process will be stopped if exit_process is True.

  • Parameters

    • queue – queue name to launch the pipeline on

    • step_task_created_callback (Callable ) – Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
          parameters,  # type: dict
      ):
          pass
    • step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed, before other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,  # type: PipelineController
          node,      # type: PipelineController.Node
      ):
          pass
    • wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)

  • Return type

    bool

  • Returns

    True, if the controller started. False, if the controller did not start.
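
A minimal sketch, assuming the controller pipe defined earlier:

# enqueue the pipeline logic itself on the 'services' queue and
# block until the pipeline is done (completed/aborted/failed)
pipe.start(queue='services', wait=True)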


start_locally#

start_locally(run_pipeline_steps_locally=False)

Start the current pipeline locally, meaning the pipeline logic is running on the current machine, instead of on the services queue.

Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes. Notice: when running pipeline steps locally, it assumes local code execution (i.e. it is running the local code as is, regardless of the git commit/diff on the pipeline steps Task)

  • Parameters

    run_pipeline_steps_locally (bool) – (default False) If True, run the pipeline steps themselves locally as sub-processes (use for debugging the pipeline locally; notice the pipeline code is expected to be available on the local machine)

  • Return type

    None
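
For local debugging, for example:

# run the pipeline logic on this machine, and each step as a local sub-process
pipe.start_locally(run_pipeline_steps_locally=True)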


stop#

stop(timeout=None, mark_failed=False, mark_aborted=False)

Stop the pipeline controller and the optimization thread. If mark_failed and mark_aborted are False (default), mark the pipeline as completed, unless one of the steps failed, in which case mark the pipeline as failed.

  • Parameters

    • timeout (Optional [ float ] ) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait, terminate immediately.

    • mark_failed (bool ) – If True, mark the pipeline task as failed. (default False)

    • mark_aborted (bool ) – If True, mark the pipeline task as aborted. (default False)

  • Return type

    ()


update_execution_plot#

update_execution_plot()

Update the Sankey diagram of the current pipeline.

  • Return type

    ()


PipelineController.upload_artifact#

classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=True, preview=None, wait_on_upload=False)

Upload (add) an artifact to the main Pipeline Task object. This function can be called from any pipeline component to directly add artifacts into the main pipeline Task

The artifact can be uploaded by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

The currently supported upload artifact types include:

  • string / Path - A path to an artifact file. If a wildcard or a folder is specified, then ClearML creates and uploads a ZIP file.

  • dict - ClearML stores a dictionary as .json file and uploads it.

  • pandas.DataFrame - ClearML stores a pandas.DataFrame as .csv.gz (compressed CSV) file and uploads it.

  • numpy.ndarray - ClearML stores a numpy.ndarray as .npz file and uploads it.

  • PIL.Image - ClearML stores a PIL.Image as .png file and uploads it.

  • Any - If called with auto_pickle=True, the object will be pickled and uploaded.

  • Parameters

    • name (str ) – The artifact name.

      Warning: if an artifact with the same name was previously uploaded, then it is overwritten.

    • artifact_object (object ) – The artifact object.

    • metadata (dict ) – A dictionary of key-value pairs for any metadata. This dictionary appears with the experiment in the ClearML Web-App (UI), ARTIFACTS tab.

    • delete_after_upload (bool ) – After the upload, delete the local copy of the artifact

      • True - Delete the local copy of the artifact.

      • False - Do not delete. (default)

    • auto_pickle (bool ) – If True (default) and the artifact_object is not one of the following types: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) the artifact_object will be pickled and uploaded as pickle file artifact (with file extension .pkl)

    • preview (Any ) – The artifact preview

    • wait_on_upload (bool ) – Whether or not the upload should be synchronous, forcing the upload to complete before continuing.

  • Return type

    bool

  • Returns

    The status of the upload.

  • True - Upload succeeded.

  • False - Upload failed.

  • Raise

    If the artifact object type is not supported, raise a ValueError.
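
For example, a component might push a dictionary onto the main pipeline Task (the artifact name is illustrative):

from clearml import PipelineController

# inside any pipeline step / component
PipelineController.upload_artifact(
    name='stage_statistics',                       # illustrative artifact name
    artifact_object={'rows': 1000, 'valid': True},
)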


PipelineController.upload_model#

classmethod upload_model(model_name, model_local_path)

Upload (add) a model to the main Pipeline Task object. This function can be called from any pipeline component to directly add models into the main pipeline Task

The model file/path will be uploaded to the Pipeline Task and registered on the model repository.

Raise ValueError if main Pipeline task could not be located.

  • Parameters

    • model_name (str) – Model name as will appear in the model registry (in the pipeline’s project)

    • model_local_path (str) – Path to the local model file or directory to be uploaded. If a local directory is provided the content of the folder (recursively) will be packaged into a zip file and uploaded

  • Return type

    OutputModel
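
A short sketch, assuming a locally saved model file (the model name and path are placeholders):

from clearml import PipelineController

# inside any pipeline step / component
PipelineController.upload_model(
    model_name='stage_model',       # illustrative model name
    model_local_path='model.pkl',   # placeholder path to a local model file
)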


wait#

wait(timeout=None)

Wait for the pipeline to finish.

info

This method does not stop the pipeline. Call stop to terminate the pipeline.

  • Parameters

    timeout (float ) – The timeout to wait for the pipeline to complete (minutes). If None, wait until the pipeline is completed.

  • Return type

    bool

  • Returns

    True, if the pipeline finished. False, if the pipeline timed out.
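
For example (the timeout value is arbitrary):

# block for up to two hours; wait() returns False if the pipeline timed out
if not pipe.wait(timeout=120.0):
    pipe.stop(mark_aborted=True)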

class automation.controller.PipelineDecorator()#

Create a new pipeline controller. The newly created object will launch and monitor the new experiments.

  • Parameters

    • name (str ) – Provide pipeline name (if main Task exists it overrides its name)

    • project (str ) – Provide project storing the pipeline (if main Task exists it overrides its project)

    • version (str ) – Must provide pipeline version. This version allows uniquely identifying the pipeline template execution. Examples of semantic versions: version='1.0.1', version='23', version='1.2'

    • pool_frequency (float ) – The polling frequency (in minutes) for monitoring experiments / states.

    • add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.

    • target_project (str ) – If provided, all pipeline steps are cloned into the target project

    • abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.


add_function_step#

add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False)

Create a Task from a function, including wrapping the function input arguments into the hyper-parameter section as kwargs, and storing function results as named artifacts

Example:

def mock_func(a=6, b=9):
    c = a * b
    print(a, b, c)
    return c, c ** 2

create_task_from_function(mock_func, function_return=['mul', 'square'])

Example arguments from other Tasks (artifact):

def mock_func(matrix_np):
    c = matrix_np * matrix_np
    print(matrix_np, c)
    return c

create_task_from_function(
    mock_func,
    function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
    function_return=['square_matrix']
)
  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • function (Callable ) – A global function to convert into a standalone Task

    • function_kwargs (Optional [ Dict [ str , Any ] ] ) – Optional, provide a subset of the function arguments and default values to expose. If not provided, all function arguments and defaults are automatically taken. Values can also pass input arguments to the function from another Task's output artifact. Example, an argument named numpy_matrix taken from Task ID aabbcc, artifact name answer: {'numpy_matrix': 'aabbcc.answer'}

    • function_return (Optional [ List [ str ] ] ) – Provide a list of names for all the results. If not provided no results will be stored as artifacts.

    • project_name (Optional [ str ] ) – Set the project name for the task. Required if base_task_id is None.

    • task_name (Optional [ str ] ) – Set the name of the remote task. Required if base_task_id is None.

    • task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used in the function.

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function when executing remotely. Allows users to execute the function inside the specified repository, enabling loading modules/scripts from the repository. Notice the execution working directory will be the repository root folder. Supports both git repository URLs and local repository paths. Example remote URL: 'https://github.com/user/repo.git' Example local repo copy: './repo' -> will automatically store the remote repo URL and commit ID based on the locally cloned copy

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit id (Ignored, if local repo path is used)

    • helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone function Task.

    • docker (Optional [ str ] ) – Select the docker image to be executed in by the remote session

    • docker_args (Optional [ str ] ) – Add docker arguments, pass a single string

    • docker_bash_setup_script (Optional [ str ] ) – Add bash script to be executed inside the docker before setting up the Task’s environment

    • parents (Optional [ Sequence [ str ] ] , ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
          parameters,  # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed, before other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,  # type: PipelineController
          node,      # type: PipelineController.Node
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.

  • Return type

    bool

  • Returns

    True if successful


add_parameter#

add_parameter(name, default=None, description=None)

Add a parameter to the pipeline Task. The parameter can be used as an input parameter for any step in the pipeline. Notice: all parameters will appear under the PipelineController Task's Hyper-parameters -> Pipeline section. Example: pipeline.add_parameter(name='dataset', description='dataset ID to process the pipeline') Then, in one of the steps, we can refer to the value of the parameter with '${pipeline.dataset}'

  • Parameters

    • name (str) – String name of the parameter.

    • default (Optional[Any]) – Default value to be put as the default value (can be later changed in the UI)

    • description (Optional[str]) – String description of the parameter and its usage in the pipeline

  • Return type

    None


add_step#

add_step(name, base_task_id=None, parents=None, parameter_override=None, configuration_overrides=None, task_overrides=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, base_task_factory=None)

Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step)

  • Parameters

    • name (str ) – Unique name of the step. For example: stage1

    • base_task_id (Optional [ str ] ) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.

    • parents (Optional [ Sequence [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • parameter_override (Optional [ Mapping [ str , Any ] ] ) – Optional parameter overriding dictionary. The dict values can reference a previously executed step using the following form: '${step_name}'. Examples:

      - Artifact access:
        parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }
      - Model access (last model used):
        parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }
      - Parameter access:
        parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }
      - Pipeline Task argument (see Pipeline.add_parameter):
        parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }
      - Task ID:
        parameter_override={'Args/input_file': '${stage3.id}' }
    • configuration_overrides (Optional [ Mapping [ str , Union [ str , Mapping ] ] ] ) – Optional, override Task configuration objects. Expected dictionary of configuration object name and configuration object content. Examples:

      {'General': dict(key='value')}
      {'General': 'configuration file content'}
      {'OmegaConf': YAML.dumps(full_hydra_dict)}

    • task_overrides (Optional [ Mapping [ str , Any ] ] ) – Optional task section overriding dictionary. The dict values can reference a previously executed step using the following form: '${step_name}'. Examples:

      - Get the latest commit from a specific branch:
        task_overrides={'script.version_num': '', 'script.branch': 'main'}
      - Match the git repository branch to a previous step:
        task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}
      - Change the container image:
        task_overrides={'container.image': '${stage1.container.image}'}
      - Match the container image to a previous step:
        task_overrides={'container.image': '${stage1.container.image}'}
    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, log the step’s metrics on the pipeline Task. Format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s artifacts on the pipeline Task. Provided a list of artifact names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘processed_data’, ‘final_processed_data’), ] Alternatively user can also provide a list of artifacts to monitor (target artifact name will be the same as original artifact name) Example: [‘processed_data’, ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, log the step’s output models on the pipeline Task. Provided a list of model names existing on the step’s Task, they will also appear on the Pipeline itself. Example: [(‘model_weights’, ‘final_model_weights’), ] Alternatively user can also provide a list of models to monitor (target models name will be the same as original model) Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ]

    • time_limit (Optional [ float ] ) – Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.

    • base_task_project (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • base_task_name (Optional [ str ] ) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

    • clone_base_task (bool ) – If True (default) the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).

    • continue_on_fail (bool ) – (default False). If True, failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • pre_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node , dict ] , bool ] ] # noqa ) – Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
          parameters,  # type: dict
      ):
          pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController , PipelineController.Node ] , None ] ] # noqa ) – Callback function, called when a step (Task) is completed, before other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,  # type: PipelineController
          node,      # type: PipelineController.Node
      ):
          pass
    • cache_executed_step (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If clone_base_task is False there is no cloning, hence the base_task is used.

    • base_task_factory (Optional [ Callable [ [ PipelineController.Node ] , Task ] ] ) – Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)

  • Return type

    bool

  • Returns

    True if successful


PipelineDecorator.component#

classmethod component(_func=None, *, return_values=('return_object'), name=None, cache=False, packages=None, parents=None, execution_queue=None, continue_on_fail=False, docker=None, docker_args=None, docker_bash_setup_script=None, task_type=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None)

Pipeline component function to be executed remotely.

  • Parameters

    • _func – wrapper function

    • return_values (Union [ str , List [ str ] ] ) – Provide a list of names for all the results. Notice! If not provided no results will be stored as artifacts.

    • name (Optional [ str ] ) – Optional, set the name of the pipeline component task. If not provided, the wrapped function name is used as the pipeline component name

    • cache (bool ) – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameter/code was already executed. If it was found, use it instead of launching a new Task. Default: False

    • packages (Optional [ Union [ str , Sequence [ str ] ] ] ) – Manually specify a list of required packages or a local requirements.txt file. Example: [“tqdm>=2.1”, “scikit-learn”] or “./requirements.txt” If not provided, packages are automatically added based on the imports used inside the wrapped function.

    • parents (Optional [ List [ str ] ] ) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

    • execution_queue (Optional [ str ] ) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the pipeline’s default execution queue

    • continue_on_fail (bool ) – (default False). If True, a failed step will not cause the pipeline to stop (or marked as failed). Notice, that steps that are connected (or indirectly connected) to the failed step will be skipped.

    • docker (Optional [ str ] ) – Specify the docker image to be used when executing the pipeline step remotely

    • docker_args (Optional [ str ] ) – Add docker execution arguments for the remote execution (use single string for all docker arguments).

    • docker_bash_setup_script (Optional [ str ] ) – Add a bash script to be executed inside the docker before setting up the Task’s environment

    • task_type (Optional [ str ] ) – Optional, The task type to be created. Supported values: ‘training’, ‘testing’, ‘inference’, ‘data_processing’, ‘application’, ‘monitor’, ‘controller’, ‘optimizer’, ‘service’, ‘qc’, ‘custom’

    • repo (Optional [ str ] ) – Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout). Example remote url: ‘https://github.com/user/repo.git’ Example local repo copy: ‘./repo’ -> will automatically store the remote repo url and commit ID based on the locally cloned copy

    • repo_branch (Optional [ str ] ) – Optional, specify the remote repository branch (Ignored, if local repo path is used)

    • repo_commit (Optional [ str ] ) – Optional, specify the repository commit id (Ignored, if local repo path is used)

    • helper_functions (Optional [ Sequence [ Callable ] ] ) – Optional, a list of helper functions to make available for the standalone pipeline step function Task. By default the pipeline step function has no access to any of the other functions, by specifying additional functions here, the remote pipeline step could call the additional functions. Example, assuming we have two functions parse_data(), and load_data(): [parse_data, load_data]

    • monitor_metrics (Optional [ List [ Union [ Tuple [ str , str ] , Tuple [ ( str , str ) , ( str , str ) ] ] ] ] ) – Optional, Automatically log the step’s reported metrics also on the pipeline Task. The expected format is a list of pairs metric (title, series) to log:

      [(step_metric_title, step_metric_series), ] Example: [(‘test’, ‘accuracy’), ]

      Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:

      [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
      Example: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]
    • monitor_artifacts (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, Automatically log the step's artifacts on the pipeline Task. Provided a list of artifact names created by the step function, these artifacts will be logged automatically also on the Pipeline Task itself. Example: ['processed_data', ] (the target artifact name on the Pipeline Task will have the same name as the original artifact) Alternatively, provide a list of pairs (source_artifact_name, target_artifact_name), where the first string is the artifact name as it appears on the component Task, and the second is the target artifact name to put on the Pipeline Task. Example: [('processed_data', 'final_processed_data'), ]

    • monitor_models (Optional [ List [ Union [ str , Tuple [ str , str ] ] ] ] ) – Optional, Automatically log the step’s output models on the pipeline Task. Provided a list of model names created by the step’s Task, they will also appear on the Pipeline itself. Example: [‘model_weights’, ] To select the latest (lexicographic) model use “model*”, or the last created model with just “*” Example: [‘model_weights*’, ] Alternatively, provide a list of pairs (source_model_name, target_model_name): where the first string is the model name as it appears on the component Task, and the second is the target model name to put on the Pipeline Task Example: [(‘model_weights’, ‘final_model_weights’), ]

  • Return type

    Callable

  • Returns

    function wrapper
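
A hedged sketch of declaring a pipeline component (the function name, return value name, and packages are illustrative):

from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['data'], cache=True, packages=['pandas'])
def load_data(url):
    # executed as a standalone Task when the pipeline runs remotely
    import pandas as pd
    return pd.read_csv(url)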


create_draft#

create_draft()

Optional, manually create & serialize the Pipeline Task. After calling Pipeline.create(), users can edit the pipeline in the UI and enqueue it for execution.

Notice: this function should be used to programmatically create pipeline for later usage. To automatically create and launch pipelines, call the start() method.

  • Return type

    None


PipelineDecorator.debug_pipeline#

classmethod debug_pipeline()

Set debugging mode, run all functions locally as functions. Run the full pipeline DAG locally, where steps are executed as functions.

Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply git diff). Pipeline steps are executed as functions (no Task will be created), for ease of debugging.

  • Return type

    ()


elapsed#

elapsed()

Return minutes elapsed since the controller start time stamp.

  • Return type

    float

  • Returns

    The minutes from controller start time. A negative value means the process has not started yet.


PipelineDecorator.get_current_pipeline#

classmethod get_current_pipeline()

Return the currently running pipeline instance

  • Return type

    ForwardRef


PipelineDecorator.get_logger#

classmethod get_logger()

Return a logger connected to the Pipeline Task. The logger can be used by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

  • Return type

    Logger

  • Returns

    Logger object for reporting metrics (scalars, plots, debug samples etc.)


get_parameters#

get_parameters()

Return the pipeline parameters dictionary (str -> str).

  • Return type

    dict


get_pipeline_dag#

get_pipeline_dag()

Return the pipeline execution graph, each node in the DAG is PipelineController.Node object. Graph itself is a dictionary of Nodes (key based on the Node name), each node holds links to its parent Nodes (identified by their unique names)

  • Return type

    Mapping[str, Node]

  • Returns

    execution tree, as a nested dictionary. Example:

{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}

get_processed_nodes#

get_processed_nodes()

Return a list of the processed pipeline nodes; each entry in the list is a PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    executed (excluding currently executing) nodes list


get_running_nodes#

get_running_nodes()

Return a list of the currently running pipeline nodes; each entry in the list is a PipelineController.Node object.

  • Return type

    Sequence[Node]

  • Returns

    Currently running nodes list


is_running#

is_running()

Return True if the pipeline controller is running.

  • Return type

    bool

  • Returns

    A boolean indicating whether the pipeline controller is active (still running) or stopped.


is_successful#

is_successful()

Return True if the pipeline controller was fully executed and none of the steps / Tasks failed.

  • Return type

    bool

  • Returns

    A boolean indicating whether all steps did not fail


PipelineDecorator.pipeline#

classmethod pipeline(_func=None, *, name, project, version, return_value=None, default_queue=None, pool_frequency=0.2, add_pipeline_tags=False, target_project=None, abort_on_failure=False, pipeline_execution_queue='services', multi_instance_support=False)

Decorate pipeline logic function.

  • Parameters

    • name (str) – Provide pipeline name (if main Task exists it overrides its name)

    • project (str) – Provide project storing the pipeline (if main Task exists it overrides its project)

    • version (str) – Must provide pipeline version. This version allows to uniquely identify the pipeline template execution. Examples for semantic versions: version=’1.0.1’ , version=’23’, version=’1.2’

    • return_value (Optional[str]) – Optional. Provide an artifact name in which to store the pipeline function's return object. Notice: if not provided, the pipeline will not store the pipeline function's return value.

    • default_queue (Optional[str]) – default pipeline step queue

    • pool_frequency (float ) – The pooling frequency (in minutes) for monitoring experiments / states.

    • add_pipeline_tags (bool ) – (default: False) if True, add pipe: <pipeline_task_id> tag to all steps (Tasks) created by this pipeline.

    • target_project (str ) – If provided, all pipeline steps are cloned into the target project

    • abort_on_failure (bool ) – If False (default), failed pipeline steps will not cause the pipeline to stop immediately, instead any step that is not connected (or indirectly connected) to the failed step, will still be executed. Nonetheless the pipeline itself will be marked failed, unless the failed step was specifically defined with “continue_on_fail=True”. If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed.

    • pipeline_execution_queue (Optional[str]) – remote pipeline execution queue (default ‘services’ queue). If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)

    • multi_instance_support – If True, allow multiple calls to the same pipeline function, each call creating a new Pipeline Task. Notice: it is recommended to create an additional Task on the “main process” acting as a master pipeline that automatically collects the execution plots. If multi_instance_support=='parallel', the pipeline calls are executed in parallel; in the parallel case the function calls return None, and all pipeline results can be collected by calling PipelineDecorator.wait_for_multi_pipelines(). Default False (no multi-instance pipeline support).

  • Return type

    Callable
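
A minimal decorator sketch (the project, queue, and step names are placeholders; PipelineDecorator.component, used here to declare the step, is documented separately):

from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=["doubled"])
def double(x):
    # hypothetical pipeline step
    return 2 * x

@PipelineDecorator.pipeline(
    name="decorator-example", project="examples", version="1.0.0",
    default_queue="default", return_value="final_result",
)
def pipeline_logic(start=1):
    # pipeline logic: orchestrate the steps and return the final value
    return double(start)

if __name__ == "__main__":
    pipeline_logic(start=3)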


PipelineDecorator.run_locally#

classmethod run_locally()

Set local mode, running all functions locally as sub-processes or serially as functions.

Run the full pipeline DAG locally, where steps are executed as sub-process Tasks. Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply a git diff).

  • Return type

    ()
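
A usage sketch, analogous to debug_pipeline() above (the decorated pipeline function is a placeholder assumption):

from clearml import PipelineDecorator

# Switch to local mode before calling the decorated pipeline function; the DAG
# runs on this machine and the steps execute as local sub-process Tasks.
PipelineDecorator.run_locally()
# pipeline_logic()  # hypothetical decorated pipeline function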


PipelineDecorator.set_default_execution_queue#

classmethod set_default_execution_queue(default_execution_queue)

Set the default execution queue, used when a pipeline step does not specify an execution queue

  • Parameters

    default_execution_queue (Optional[str]) – The execution queue to use if no execution queue is provided

  • Return type

    None


set_pipeline_execution_time_limit#

set_pipeline_execution_time_limit(max_execution_minutes)

Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.

  • Parameters

    max_execution_minutes (float ) – The maximum time (minutes) for the entire pipeline process. The default is None, indicating no time limit.

  • Return type

    None
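
For example, assuming pipe is a configured PipelineController:

# Cap the whole pipeline at two hours; pass None or 0 to remove the limit.
pipe.set_pipeline_execution_time_limit(max_execution_minutes=120)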


start#

start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)

Start the current pipeline remotely (on the selected services queue). The current process will be stopped if exit_process is True.

  • Parameters

    • queue – queue name to launch the pipeline on

    • step_task_created_callback (Callable ) – Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

      If the callback returned value is False, the Node is skipped and so is any node in the DAG that relies on this node.

      Notice the parameters are already parsed, e.g. ${step1.parameters.Args/param} is replaced with relevant value.

      def step_created_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
          parameters,  # type: dict
      ):
          pass
    • step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed, before other jobs are executed. Allows a user to modify the Task status after completion.

      def step_completed_callback(
          pipeline,    # type: PipelineController
          node,        # type: PipelineController.Node
      ):
          pass
    • wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)

  • Return type

    bool

  • Returns

    True, if the controller started. False, if the controller did not start.
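
A sketch of launching the pipeline with a creation callback (pipe is assumed to be a configured PipelineController; the queue name is a placeholder):

def step_created_callback(pipeline, node, parameters):
    # Inspect or tweak the step before it is sent for execution.
    print("about to launch step:", node.name, "with parameters:", parameters)
    return True  # returning False would skip this node and its dependants

started = pipe.start(queue="services", step_task_created_callback=step_created_callback)
print("controller started:", started)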


start_locally#

start_locally(run_pipeline_steps_locally=False)

Start the current pipeline locally, meaning the pipeline logic is running on the current machine, instead of on the services queue.

Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes. Notice: when running pipeline steps locally, it assumes local code execution (i.e. it is running the local code as is, regardless of the git commit/diff on the pipeline steps Task)

  • Parameters

    run_pipeline_steps_locally (bool) – (default False) If True, run the pipeline steps themselves locally as a subprocess (use for debugging the pipeline locally; notice the pipeline code is expected to be available on the local machine)

  • Return type

    None
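
For example, assuming pipe is a configured PipelineController:

# Run the pipeline logic on this machine; with the flag set, each step also
# runs locally as a sub-process instead of being enqueued for an agent.
pipe.start_locally(run_pipeline_steps_locally=True)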


stop#

stop(timeout=None, mark_failed=False, mark_aborted=False)

Stop the pipeline controller and the optimization thread. If mark_failed and mark_aborted are False (default), mark the pipeline as completed, unless one of the steps failed, in which case mark the pipeline as failed.

  • Parameters

    • timeout (Optional [ float ] ) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait; terminate immediately.

    • mark_failed (bool ) – If True, mark the pipeline task as failed. (default False)

    • mark_aborted (bool ) – If True, mark the pipeline task as aborted. (default False)

  • Return type

    ()


update_execution_plot#

update_execution_plot()

Update the Sankey diagram of the current pipeline

  • Return type

    ()


PipelineDecorator.upload_artifact#

classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=True, preview=None, wait_on_upload=False)

Upload (add) an artifact to the main Pipeline Task object. This function can be called from any pipeline component to directly add artifacts into the main pipeline Task

The artifact can be uploaded by any function/tasks executed by the pipeline, in order to report directly to the pipeline Task itself. It can also be called from the main pipeline control Task.

Raise ValueError if main Pipeline task could not be located.

The currently supported upload artifact types include:

  • string / Path - A path to an artifact file. If a wildcard or a folder is specified, then ClearML creates and uploads a ZIP file.

  • dict - ClearML stores a dictionary as .json file and uploads it.

  • pandas.DataFrame - ClearML stores a pandas.DataFrame as .csv.gz (compressed CSV) file and uploads it.

  • numpy.ndarray - ClearML stores a numpy.ndarray as .npz file and uploads it.

  • PIL.Image - ClearML stores a PIL.Image as .png file and uploads it.

  • Any - If called with auto_pickle=True, the object will be pickled and uploaded.

  • Parameters

    • name (str ) – The artifact name.

      warning

      If an artifact with the same name was previously uploaded, then it is overwritten.

    • artifact_object (object ) – The artifact object.

    • metadata (dict ) – A dictionary of key-value pairs for any metadata. This dictionary appears with the experiment in the ClearML Web-App (UI), ARTIFACTS tab.

    • delete_after_upload (bool ) – After the upload, delete the local copy of the artifact

      • True - Delete the local copy of the artifact.

      • False - Do not delete. (default)

    • auto_pickle (bool ) – If True (default) and the artifact_object is not one of the following types: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) the artifact_object will be pickled and uploaded as pickle file artifact (with file extension .pkl)

    • preview (Any ) – The artifact preview

    • wait_on_upload (bool ) – Whether or not the upload should be synchronous, forcing the upload to complete before continuing.

  • Return type

    bool

  • Returns

    The status of the upload.

  • True - Upload succeeded.

  • False - Upload failed.

  • Raise

    If the artifact object type is not supported, raise a ValueError.
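
A sketch of attaching a DataFrame from inside a pipeline component (the artifact name is illustrative):

import pandas as pd

from clearml import PipelineDecorator

# Stored as a compressed CSV on the main pipeline Task, per the type list above.
df = pd.DataFrame({"epoch": [1, 2, 3], "loss": [0.9, 0.5, 0.3]})
PipelineDecorator.upload_artifact(name="training_table", artifact_object=df)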



PipelineDecorator.upload_model#

classmethod upload_model(model_name, model_local_path)

Upload (add) a model to the main Pipeline Task object. This function can be called from any pipeline component to directly add models into the main pipeline Task

The model file/path will be uploaded to the Pipeline Task and registered on the model repository.

Raise ValueError if main Pipeline task could not be located.

  • Parameters

    • model_name (str) – Model name as will appear in the model registry (in the pipeline’s project)

    • model_local_path (str) – Path to the local model file or directory to be uploaded. If a local directory is provided the content of the folder (recursively) will be packaged into a zip file and uploaded

  • Return type

    OutputModel
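
A usage sketch (the local path is a placeholder for wherever the component saved its weights):

from clearml import PipelineDecorator

# Register the saved model file on the model repository, under the pipeline's project.
output_model = PipelineDecorator.upload_model(
    model_name="decorator-example-model",
    model_local_path="model.pkl",
)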


wait#

wait(timeout=None)

Wait for the pipeline to finish.

info

This method does not stop the pipeline. Call stop to terminate the pipeline.

  • Parameters

    timeout (float ) – The timeout (minutes) to wait for the pipeline to complete. If None, wait indefinitely until the pipeline completes.

  • Return type

    bool

  • Returns

    True, if the pipeline finished. False, if the pipeline timed out.
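
For example, assuming pipe is a running PipelineController:

# Block for up to 30 minutes; wait() never stops the pipeline by itself,
# so abort explicitly if it is still running after the timeout.
finished = pipe.wait(timeout=30)
if not finished:
    pipe.stop(mark_aborted=True)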


PipelineDecorator.wait_for_multi_pipelines#

classmethod wait_for_multi_pipelines()

Wait until all background multi-pipeline executions are completed. Returns all the pipeline results in call order (first pipeline call at index 0).

  • Returns

    List of return values from executed pipeline, based on call order.
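
For example, after launching several pipelines with multi_instance_support='parallel':

from clearml import PipelineDecorator

# Each parallel pipeline call returned None; collect the actual results here,
# in the same order the pipelines were called.
results = PipelineDecorator.wait_for_multi_pipelines()
for index, result in enumerate(results):
    print(f"pipeline #{index} returned:", result)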