NVIDIA Clara Train 4.1
1.0

AutoML programming guide

As described in Clara AutoML high-level design, Clara AutoML is a platform for researchers to perform AutoML experimentation. As a researcher on this platform, you will be able to:

  1. Configure your search space in the training config of Clara Train, pick an existing Controller implementation, and let the platform do everything for you

  2. Develop and use your own Controller algorithm and let the platform take care of the rest

  3. Expand the functionalities of the AutoML workflow by creating your own handlers

  4. Write your own Executor to make the AutoML workflow engine work with your own way of model training, while still leveraging the engine’s powerful job management system

You should be able to do use case 1 after reading Clara AutoML high-level design. This programming guide will walk you through how to do use cases 2, 3, and 4.

You should first read Clara AutoML high-level design and be familiar with the following concepts:

  • Purpose of AutoML

  • Parameter Range Locator (PRL) and search range definition

  • Search Space

  • Controller

  • Executor

  • Handler

  • Workflow Engine and workflow logic

Before detailed discussion about the use cases, the first concept that is crucial to all components is the AutoML Context.

The AutoML process is a data-based collaboration among components (controller, executor, handlers, workflow engine, etc.). On the one hand, these components directly interact with each other through programmed interaction patterns (e.g. outcome generated by the executor will be fed to the controller for refined recommendations). On the other hand, the components can also interact with each other via indirect data sharing (e.g. a piece of data generated by a component can be desired by another component that has no direct programmed interaction to compile statistics). The AutoML Context makes this data sharing possible - different components can set and get properties from the AutoML Context.

API specification

Class path

automl.defs.Context

Property access

With set_prop(), you can set any object to the context by providing a key and the value of the object.

Copy
Copied!
            

def set_prop(self, key: str, value)

With get_prop(), you can get the object with the key from the context. If the specified property does not exist, the specified default value is returned.

Copy
Copied!
            

def get_prop(self, key: str, default=None)


Report exception

With exception_occurred(), you can tell the workflow engine that an exception has occurred in your component’s processing. It is up to the workflow engine how to react to this report.

Copy
Copied!
            

def exception_occurred(self, msg=None)

Note

Currently the engine will stop the workflow upon any exception report from any component, but this rule may change if necessary.


Ask to stop work

You can call the stop_work() method to request the engine to stop the workflow. You should provide a string for “msg” to specify the reason for the request.

Copy
Copied!
            

def stop_work(self, msg: str)

Note

Currently the engine will stop the workflow when the stop_work request is received from any component, but this rule may change if necessary.

With stop_work_asked(), you can check whether a stop_work has been asked.

Copy
Copied!
            

def stop_work_asked(self) -> bool

With get_stop_message(), you can get the stop message from the context.

Copy
Copied!
            

def get_stop_message(self)

Predefined properties

The class automl.defs.ContextKey defines some property keys reserved for the workflow engine. You can use these keys in your code to get the properties you are interested in.

Copy
Copied!
            

class ContextKey(object): """ Defines a set of standard property keys. """ RECOMMENDATIONS = "_recommendations" RECOMMENDATION = "_recommendation" SCORE = "_score" JOB_STATUS = "_jobStatus" CONCRETE_SEARCH_VALUE = "_concreteSearchValue" JOB_NAME = "_jobName" WORKER_NAME = "_workerName" SEARCH_SPACE = "_searchSpace" GPUS = "_gpus" NUM_WORKERS = "_numWorkers" PROCESS_EXIT_STATUS = "_processExitStatus"

Attention

If you want to store your own property into the context, make sure that your property key does not conflict with these predefined keys!

As a general rule, property keys beginning with the underscore character (_) are reserved. Name your properties without the underscore prefix to avoid name conflicts.

If you write your own controller or executor, you have to deal with SearchSpace.

The following classes are defined in the module automl.defs:

Copy
Copied!
            

class SearchRange(object): """ A SearchRange is the basic building block of search space. Args: minv: min value of the range. maxv: max value of the range. If not specified, it is set to the same value as minv Returns: A SearchRange object """ def __init__(self, minv, maxv=None): pass class SearchSpace(object): """ SearchSpace defines the space of candidates for searchable parameters. It is a dictionary of PRL (Parameter Range Locator) to a list of SearchRanges. Each PRL represents a unique search parameter. Args: name: name of the search space. targets: the dict of PRL => [SearchRange] Returns: A SearchSpace object """ def __init__(self, name: str, targets: dict): pass

Create SearchSpace

As shown above, to create a SearchSpace, you specify a name and a dictionary that maps PRLs to lists of SearchRanges.

The name is an arbitrary string. Currently it is not used by the system.

Each PRL maps to a list that contains one or more SearchRanges, depending on the date type of the PRL (enum or float).

For float type, the list must contain a single SearchRange, defining the min and max values of the range;

For enum type, the list can contain any number of SearchRanges, each defining a choice. Each SearchRange is defined with the same min and max integer number, starting from 0. For example, to define an enum with 4 choices, the SearchRange list should be like this:

Copy
Copied!
            

[SearchRange(0,0), SearchRange(1,1), SearchRange(2,2), SearchRange(3,3)]


A Parameter Range Locator (PRL) is a formatted string that uniquely represents a search parameter in the search space. The module automl.prl provides definitions and convenience functions that you may find useful for PRL processing:

Copy
Copied!
            

""" Defines the format and convenience functions of PRL. PRL (Parameter Range Locator - modeled after URL) Format: domain.type[.extra] """ class Domain(object): """ Defines support domains. """ NET = 'net' LEARNING_RATE = 'lr' TRANSFORM = 'transform' class PType(object): """ Defines supported parameter data types """ FLOAT = 'float' ENUM = 'enum' SUPPORTED_PARAM_DOMAIN = [Domain.NET, Domain.LEARNING_RATE, Domain.TRANSFORM] SUPPORTED_PARAM_TYPE = [PType.ENUM, PType.FLOAT] def make_prl(domain: str, ptype: str, rest: str="") -> str: """ Make a PRL from specified PRL elements Args: domain: domain of the PRL ptype: parameter data type rest: extra items Returns: a PRL """ pass def validate_prl(prl: str): """ Validate a specified prl. Args: prl: the PRL to be validated Returns: error message if not valid; None if valid. """ pass def split_prl(prl: str) -> [str]: """ Split the PRL into list of elements Args: prl: the PRL to be split Returns: list of elements """ pass

Controller generates recommendations. The Recommendation class defines the structure of recommendations, which includes a recommendation id (rec_id) and a SearchResult.

Note

Recommendation id is generated by the Controller to uniquely identify the recommendation. It can be of any object type.

The classes are defined in the module automl.defs:

Copy
Copied!
            

class SearchResult(object): """ SearchResult is a set of values from the search space. Args: space: the SearchSpace that the values are from. values: the dict of PRL => value Returns: A SearchResult object """ def __init__(self, space: SearchSpace, values: dict): assert isinstance(space, SearchSpace), "space must be SearchSpace" assert isinstance(values, dict), "values must be dict" for k in values.keys(): assert (k in space.targets), "value key{}not in space".format(k) self.space = space self.values = values class Recommendation(object): """ Recommendation is a recommendation for AutoML execution. Args: rec_id: the unique ID of the recommendation. result: the search result """ def __init__(self, rec_id, result: SearchResult): self.rec_id = rec_id self.result = result

Once a recommendation is executed, the result is represented as an Outcome object, in the module automl.defs:

Copy
Copied!
            

class Status(object): OK = 0 ERROR = -1 EXCEPTION = -2 UNFINISHED = -3 class Outcome(object): """ Outcome represents the execution result of a recommendation Args: rec_id: the ID of the recommendation executed status: job completion state score: the score produced by the recommendation execution job_ctx: the context used for the job execution Note: score is conceptual. It could be a simple number or an object of any type. """ def __init__(self, rec_id, status: Status, score, job_ctx: Context=None): self.recommendation_id = rec_id self.status = status self.score = score self.job_ctx = job_ctx

Attention

As described above, the ctx enables data sharing among AutoML components. In the multithreaded workflow environment (see below), each job is executed in a separate thread and is given a separate context! This means that the job context is only accessible from a single job execution thread.

Before starting to write your AutoML components, you must understand the running environment that the components live in - the AutoML workflow.

As described in Clara AutoML high-level design, AutoML workflow manages the interactions between the Controller and the Executor. In addition, it implements a multithreaded job management system that runs the jobs in parallel. To work properly in this multithreaded environment, you should take care to ensure the thread safety of your implementation because all components (Executor, Controller, Handlers) are singleton objects.

There are three kinds of threads during the workflow execution:

  • The main thread - the Engine’s execution thread. The Controller only runs in this thread.

  • The scheduler thread - monitors and schedules job executions.

  • The job execution thread - executes the job. There can be many such threads. The Executor’s execute() method runs in this thread.

Note

Some methods of the executor and handlers objects are run in multiple job threads. You should be careful when storing state data in these objects. In general, if the state data is specific to a single job execution, you should store it in the job context. But if the data is related to all job execution (e.g. you are collecting cross-job stats), you should ensure the integrity of the data when updated from multiple threads in parallel.

The controller is the “brain” of AutoML - it decides how to produce recommendations to find the best model possible.

You can write your own controller and use it on the AutoML platform. Your controller class must extend the Controller class, defined in the module automl.components.controllers.controller:

Attention

Controller methods are only run in the Main Thread.

Copy
Copied!
            

from abc import abstractmethod, ABC import logging from automl.defs import Context, SearchSpace, Recommendation, Outcome class Controller(ABC): """ This class defines the abstract behavior required of an AutoML Controller. Controller implements the AutoML strategy that decides how the training is to be conducted. Controller produces recommendations by finding values from a search space with some algorithm. """ def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) @abstractmethod def set_search_space(self, space: SearchSpace, ctx: Context): """ Set the search space. This is the search space that the controller will search against to produce recommendations. The controller must keep it for later use. Args: space: the search space ctx: the context that enables across-component data sharing and communication Returns: NOTE: the controller should validate the search space and makes sure it is acceptable. In case the search space is not acceptable, the controller should either raise an exception or ask to stop the workflow by calling: ctx.ask_to_stop(). """ pass @abstractmethod def initial_recommendation(self, ctx: Context) -> [Recommendation]: """ This method is called by the AutoML workflow engine to produce the initial set of recommendations. The controller must produce 1 or more recommendations. If no recommendation is produced, the AutoML workflow will stop immediately. This method is called only once at the beginning of the AutoML process. Args: ctx: the context that enables across-component data sharing and communication Returns: a list of recommendations """ pass @abstractmethod def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]: """ This method is called by the AutoML workflow engine to produce a set of recommendations based on the result from a previous job. The controller can produce 0 or more recommendations. This method is called every time a job finishes executing a previous recommendation. Args: outcome: the result of executing the previous recommendation ctx: the context that enables across-component data sharing and communication Returns: a list of recommendations, could be empty """ pass def shutdown(self, ctx: Context): """ Called at the end of the AutoML workflow. This provides the opportunity for the controller to clean up if needed. Args: ctx: the context that enables across-component data sharing and communication """ pass

AutoML controller example

The following example implements a dummy controller that simply returns randomly generated recommendations for a single PRL “lr.float” (which is for learning rate). In reality, your controller should produce recommendations for all PRLs in the search space.

When creating this controller, you specify:

  • total_recs - the total number of recommendations that controller will generate. If this number is reached, the controller will stop generating new recommendations.

  • max_recs_each_time - the max number of recommendations the controller will generate each time. This controller will generate a random number of recs that is less than the specified max.

Copy
Copied!
            

from automl.components.controllers.controller import Controller from automl.defs import Recommendation, Context, SearchSpace, SearchResult, Outcome import random class DummyController(Controller): def __init__(self, total_recs, max_recs_each_time): Controller.__init__(self) self.max_num_recs = total_recs self.max_recs_each_time = max_recs_each_time self.num_recs_done = 0 self.space = None def set_search_space(self, space: SearchSpace, ctx: Context): self.space = space def initial_recommendation(self, ctx: Context) -> [Recommendation]: num_recs = random.randint(1, self.max_recs_each_time) recs = [] for i in range(num_recs): result = SearchResult(space=self.space, values={ 'lr.float': random.uniform(0.0001, 0.001) }) recs.append(Recommendation(self.num_recs_done, result)) self.num_recs_done += 1 print("num_recs_done:{}".format(self.num_recs_done)) return recs def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]: # must return a list of Recommendation if self.num_recs_done >= self.max_num_recs: return [] return self.initial_recommendation(ctx)


You write handlers to extend the functionalities of AutoML. For example, you may want to log the produced search space or recommendations to a file, perform analysis on the execution results, write them to a separate system, etc.

To fulfill these needs, you can write handlers to handle the events you are interested in.

Your handler class must extend the Handler class defined in module automl.components.handlers.handler, shown below:

Copy
Copied!
            

import logging import traceback from automl.defs import Context, EventType class Handler(object): """ This class defines the abstract behavior required of a Handler. A handler is an object that listens to certain AutoML workflow events and takes actions on such events. """ def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) def startup(self, ctx: Context): """ The handler is being started up. Use this method to initialize the handler based on info in the context. NOTE: this method is called in the order of handler chain. If your handler depends on some info provided by previous handlers, you can get such info from the ctx. Args: ctx: main context """ pass def shutdown(self, ctx: Context): """ The handler is being shutdown. Use this method to clean up the handler. NOTE: this method is called in the reverse order of handler chain. If your handler depends on other handlers, your handler will be shutdown before the handlers you depend on. Args: ctx: """ pass def start_automl(self, ctx: Context): """ AUTOML process is about to start. Called from the engine thread. Args: ctx: the main context """ pass def end_automl(self, ctx: Context): """ AUTOML process has ended. Called from the engine thread. Args: ctx: the main context """ pass def start_job(self, ctx: Context): """ The job execution is about to start. Called from the job thread. NOTE: this method could be called from multiple job threads at the same time. If you want to store across-job state data in the handler, you should ensure thread safety when updating such state data. NOTE: the ctx is a per-job context, hence it is not subject to multi-thread access. Consider using the ctx to store per-job state data. Args: ctx: the job context """ pass def end_job(self, ctx: Context): """ The job execution has ended. Called from the job thread. See notes in start_job method. You can check the job status from prop ContextKey.JOB_STATUS in the ctx. NOTE: if the start_job is called, it is guaranteed that end_job will be called. Args: ctx: the job context """ pass def search_space_available(self, ctx: Context): """ The search space is available. Called from the main thread. You can get the search space from prop ContextKey.SEARCH_SPACE in the ctx. Args: ctx: main context """ pass def recommendations_available(self, ctx: Context): """ The recommendations are available. Called from the main thread. You can get recommendations from prop ContextKey.RECOMMENDATIONS in the ctx. Args: ctx: main context """ pass def handle_event(self, event: str, ctx: Context): """ The default event handler that calls a predefined method for each event type. Args: event: event to be handled ctx: context for cross-component data sharing """ if event == EventType.START_AUTOML: self.start_automl(ctx) elif event == EventType.START_JOB: self.start_job(ctx) elif event == EventType.END_JOB: self.end_job(ctx) elif event == EventType.END_AUTOML: self.end_automl(ctx) elif event == EventType.RECOMMENDATIONS_AVAILABLE: self.recommendations_available(ctx) elif event == EventType.SEARCH_SPACE_AVAILABLE: self.search_space_available(ctx) def fire_event(event: str, handlers: list, ctx: Context): """ Fires the specified event and invokes the list of handlers. Args: event: the event to be fired handlers: handlers to be invoked ctx: context for cross-component data sharing """ if handlers: for h in handlers: try: h.handle_event(event, ctx) except Exception as ex: ctx.trace(h, "Exception form handler{}:{}".format(h.__class__.__name__, ex)) traceback.print_exc() ctx.exception_occurred(h)

Attention

Make sure you understand which thread each of these methods is called from. If called from the job thread, you should ensure the thread safety of your implementation.

AutoML handler example

The following is a simple stats handler that collects and computes the stats of each worker:

Copy
Copied!
            

from .handler import Handler from automl.defs import Context, ContextKey import time import threading KEY_JOB_START_TIME = 'StatsHandler.jobStartTime' class WorkerStats(object): def __init__(self): self.job_count = 0 self.total_time = 0 class StatsHandler(Handler): """ Defines a simple stats handler that collects job execution stats. For each worker, it computes things like the number of jobs started, number of jobs finished and unfinished, and total amount of time the worker worked. It also shows the total amount of time used by the whole workflow. """ def __init__(self): Handler.__init__(self) self.automl_start_time = None self.worker_stats = {} # worker name => stats self.total_jobs_started = 0 self.total_jobs_failed = 0 self.total_jobs_completed = 0 self.worker_update_lock = threading.Lock() def start_automl(self, ctx: Context): self.automl_start_time = time.time() def recommendations_available(self, ctx: Context): recs = ctx.get_prop(ContextKey.RECOMMENDATIONS) if recs: print('Number of recs:{}'.format(len(recs))) else: print('No recs') def start_job(self, ctx: Context): ctx.set_prop(KEY_JOB_START_TIME, time.time()) self.worker_update_lock.acquire() self.total_jobs_started += 1 self.worker_update_lock.release() def end_job(self, ctx: Context): job_start = ctx.get_prop(KEY_JOB_START_TIME) worker_name = ctx.get_prop(ContextKey.WORKER_NAME) self.worker_update_lock.acquire() try: if ctx.stop_work_asked(): self.total_jobs_failed += 1 else: self.total_jobs_completed += 1 if worker_name in self.worker_stats: stats = self.worker_stats[worker_name] else: stats = WorkerStats() stats.job_count += 1 stats.total_time += time.time() - job_start self.worker_stats[worker_name] = stats except Exception: pass self.worker_update_lock.release() def end_automl(self, ctx: Context): for worker, stats in self.worker_stats.items(): print("Worker{}: processed{}jobs in{}secs".format( worker, stats.job_count, stats.total_time)) print("Total jobs started :{}".format(self.total_jobs_started)) print("Total jobs unfinished:{}".format(self.total_jobs_failed)) print("Total jobs finished :{}".format(self.total_jobs_completed)) print('Total automl time:{}secs'.format(time.time() - self.automl_start_time))

This example shows:

  • How to use the job context to store job specific data (KEY_JOB_START_TIME)

  • How to ensure thread safety of common state data stored in the handler with lock

Clara AutoML provides executors for MMAR-based model training on local machines or NGC. However, if you want to do it in a different way, you can write your own executor, while still leveraging the workflow engine’s job management capability.

To create your own executor, you extend the Executor class defined in the module automl.components.executors.executor:

Copy
Copied!
            

import logging from abc import abstractmethod, ABC from automl.defs import Context, Recommendation, SearchSpace class Executor(ABC): """ This class defines the abstract behavior required of an Executor. Executor executes a recommendation, and produces a score. """ def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) @abstractmethod def execute(self, recommendation: Recommendation, ctx: Context) -> object: """ Execute the specified recommendation and return a score. This method is called from the Job Thread. Args: recommendation: the recommendation to be executed ctx: the context that enables cross-component data sharing Returns: an object that represents the score """ pass @abstractmethod def determine_search_space(self, ctx: Context) -> SearchSpace: """ This method is called by the AutoML engine to determine the AutoML search space. This method is called from the Main Thread. Args: ctx: the context that enables cross-component data sharing Returns: a search space """ pass def abort(self, ctx: Context): """ This method is called to abort execution immediately. This method is called from the Main Thread. Args: ctx: the context that enables cross-component data sharing """ pass def shutdown(self, ctx: Context): """ This method is called to shutdown the executor. This is called at the end of the AutoML workflow. This method is called from the Main Thread. Args: ctx: the context that enables cross-component data sharing """ pass

Note

The ctx to all these methods is job context.

Attention

Make sure you understand which thread each method runs in and ensure thread safety of your implementation!

AutoML executor example

The following example shows a dummy executor, which simply returns a randomly generated score for a recommendation after sleeping for a few seconds:

Copy
Copied!
            

import random import time from automl.components.executors.executor import Executor from automl.defs import Recommendation, Context, SearchSpace, SearchRange, SearchResult, Outcome, ContextKey class TestExecutor(Executor): def __init__(self): Executor.__init__(self) def execute(self, recommendation: Recommendation, ctx: Context) -> object: job_name = ctx.get_prop(ContextKey.JOB_NAME) worker_name = ctx.get_prop(ContextKey.WORKER_NAME) print("Worker{}: executing job{}...".format(worker_name, job_name)) secs_to_sleep = random.randrange(5, 10) print("Worker{}: job{}sleep{}secs".format(worker_name, job_name, secs_to_sleep)) time.sleep(secs_to_sleep) score = random.uniform(0.0, 1.0) print("Worker{}: finished job{}with score{}".format(worker_name, job_name, score)) return score def determine_search_space(self, ctx: Context) -> SearchSpace: return SearchSpace("test", { "lr.float": [SearchRange(0.001, 0.002)] })


If you have a completely different way of doing AutoML, you can create your own AutoML system while still leveraging the workflow engine’s job management system.

The following is an end-to-end example:

Copy
Copied!
            

import argparse import random import time from automl.components.executors.executor import Executor from automl.components.controllers.controller import Controller from automl.components.handlers.handler import Handler from automl.defs import Recommendation, Context, SearchSpace, SearchRange, SearchResult, Outcome, ContextKey from automl.workflows.engine import WorkerConfig, Engine class TestExecutor(Executor): def __init__(self): Executor.__init__(self) def execute(self, recommendation: Recommendation, ctx: Context) -> object: job_name = ctx.get_prop(ContextKey.JOB_NAME) worker_name = ctx.get_prop(ContextKey.WORKER_NAME) print("Worker{}: executing job{}...".format(worker_name, job_name)) secs_to_sleep = random.randrange(5, 10) print("Worker{}: job{}sleep{}secs".format(worker_name, job_name, secs_to_sleep)) time.sleep(secs_to_sleep) # excep = random.randrange(1, 10) # if excep == 1: # print("Worker {}: crash job {}".format(worker_name, job_name)) # y = 1 / 0 # simulate exception score = random.uniform(0.0, 1.0) print("Worker{}: finished job{}with score{}".format(worker_name, job_name, score)) return score def determine_search_space(self, ctx: Context) -> SearchSpace: return SearchSpace("test", { "lr.float": [SearchRange(0.001, 0.002)] }) class TestController(Controller): def __init__(self, total_recs, max_recs_each_time): Controller.__init__(self) self.max_num_recs = total_recs self.max_recs_each_time = max_recs_each_time self.num_recs_done = 0 self.space = None def set_search_space(self, space: SearchSpace, ctx: Context): self.space = space def initial_recommendation(self, ctx: Context) -> [Recommendation]: num_recs = random.randint(1, self.max_recs_each_time) recs = [] for i in range(num_recs): result = SearchResult(space=self.space, values={ 'lr.float': random.uniform(0.001, 0.002) }) recs.append(Recommendation(self.num_recs_done, result)) self.num_recs_done += 1 print("num_recs_done:{}".format(self.num_recs_done)) return recs def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]: # must return a list of Recommendation and whether to stop if self.num_recs_done >= self.max_num_recs: return [] return self.initial_recommendation(ctx) class TestHandler(Handler): def __init__(self): Handler.__init__(self) def handle_event(self, event: str, ctx: Context): print("TestHandler: got event{}".format(event)) def main(): parser = argparse.ArgumentParser() parser.add_argument('--total_recs', '-t', type=int, default=20, help='total number of recommendations to produce') parser.add_argument('--max_recs_each_time', '-e', type=int, default=5, help='max number of recommendations to produce each time') parser.add_argument('--num_workers', '-w', type=int, default=2, help='max number of workers') args = parser.parse_args() ctx = Context() executor = TestExecutor() controller = TestController(args.total_recs, args.max_recs_each_time) handler = TestHandler() worker_configs = [] for i in range(args.num_workers): worker_configs.append(WorkerConfig('W{}'.format(i+1), [i])) engine = Engine(worker_configs, controller, executor, [handler]) engine.run(ctx) engine.shutdown(ctx) if __name__ == '__main__': main()

© Copyright 2021, NVIDIA. Last updated on Feb 2, 2023.