AutoML programming guide
As described in Clara AutoML high-level design, Clara AutoML is a platform for researchers to perform AutoML experimentation. As a researcher on this platform, you will be able to:
Configure your search space in the training config of Clara Train, pick an existing Controller implementation, and let the platform do everything for you
Develop and use your own Controller algorithm and let the platform take care of the rest
Expand the functionalities of the AutoML workflow by creating your own handlers
Write your own Executor to make the AutoML workflow engine work with your own way of model training, while still leveraging the engine’s powerful job management system
You should be able to handle use case 1 after reading Clara AutoML high-level design. This programming guide walks you through use cases 2, 3, and 4.
You should first read Clara AutoML high-level design and be familiar with the following concepts:
Purpose of AutoML
Parameter Range Locator (PRL) and search range definition
Search Space
Controller
Executor
Handler
Workflow Engine and workflow logic
Before discussing the use cases in detail, we start with the one concept that is crucial to all components: the AutoML Context.
The AutoML process is a data-driven collaboration among components (controller, executor, handlers, workflow engine, etc.). On the one hand, these components interact directly through programmed interaction patterns (e.g. the outcome generated by the executor is fed back to the controller so it can refine its recommendations). On the other hand, components can also interact through indirect data sharing (e.g. data generated by one component may be needed by another component that has no direct programmed interaction with it, say to compile statistics). The AutoML Context makes this data sharing possible: components can set and get properties on the AutoML Context.
API specification
Class path
automl.defs.Context
Property access
With set_prop(), you can set any object to the context by providing a key and the value of the object.
def set_prop(self, key: str, value)
With get_prop(), you can get the object with the key from the context. If the specified property does not exist, the specified default value is returned.
def get_prop(self, key: str, default=None)
Report exception
With exception_occurred(), you can tell the workflow engine that an exception has occurred in your component’s processing. It is up to the workflow engine how to react to this report.
def exception_occurred(self, msg=None)
Currently the engine will stop the workflow upon any exception report from any component, but this rule may change if necessary.
Ask to stop work
You can call the stop_work() method to request the engine to stop the workflow. You should provide a string for “msg” to specify the reason for the request.
def stop_work(self, msg: str)
Currently the engine will stop the workflow when the stop_work request is received from any component, but this rule may change if necessary.
With stop_work_asked(), you can check whether a stop_work has been asked.
def stop_work_asked(self) -> bool
With get_stop_message(), you can get the stop message from the context.
def get_stop_message(self)
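To illustrate the contract of these methods, here is a hypothetical usage sketch. Since the Context is essentially keyed property storage plus a stop flag, the snippet uses a minimal stand-in class that mirrors the documented signatures (the real automl.defs.Context may do more internally):

```python
class Context:
    """Minimal stand-in mirroring the documented automl.defs.Context API."""

    def __init__(self):
        self._props = {}
        self._stop_asked = False
        self._stop_msg = None

    def set_prop(self, key: str, value):
        self._props[key] = value

    def get_prop(self, key: str, default=None):
        return self._props.get(key, default)

    def stop_work(self, msg: str):
        self._stop_asked = True
        self._stop_msg = msg

    def stop_work_asked(self) -> bool:
        return self._stop_asked

    def get_stop_message(self):
        return self._stop_msg


ctx = Context()
ctx.set_prop("myHandler.jobCount", 3)       # no "_" prefix: underscore keys are reserved
print(ctx.get_prop("myHandler.jobCount"))   # -> 3
print(ctx.get_prop("missing", default=0))   # -> 0

ctx.stop_work("search space exhausted")
print(ctx.stop_work_asked())                # -> True
print(ctx.get_stop_message())               # -> search space exhausted
```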
Predefined properties
The class automl.defs.ContextKey defines some property keys reserved for the workflow engine. You can use these keys in your code to get the properties you are interested in.
class ContextKey(object):
"""
Defines a set of standard property keys.
"""
RECOMMENDATIONS = "_recommendations"
SCORE = "_score"
JOB_STATUS = "_jobStatus"
JOB_NAME = "_jobName"
WORKER_NAME = "_workerName"
SEARCH_SPACE = "_searchSpace"
GPUS = "_gpus"
NUM_WORKERS = "_numWorkers"
If you want to store your own property into the context, make sure that your property key does not conflict with these predefined keys!
As a general rule, property keys beginning with the underscore character (_) are reserved. Name your properties without the underscore prefix to avoid name conflicts.
If you write your own controller or executor, you have to deal with SearchSpace.
The following classes are defined in the module automl.defs:
class SearchRange(object):
"""
A SearchRange is the basic building block of search space.
Args:
minv: min value of the range.
maxv: max value of the range. If not specified, it is set to the same value as minv
Returns:
A SearchRange object
"""
def __init__(self, minv, maxv=None):
pass
class SearchSpace(object):
"""
SearchSpace defines the space of candidates for searchable parameters.
It is a dictionary of PRL (Parameter Range Locator) to a list of SearchRanges.
Each PRL represents a unique search parameter.
Args:
name: name of the search space.
targets: the dict of PRL => [SearchRange]
Returns:
A SearchSpace object
"""
def __init__(self, name: str, targets: dict):
pass
Create SearchSpace
As shown above, to create a SearchSpace, you specify a name and a dictionary that maps PRLs to lists of SearchRanges.
The name is an arbitrary string. Currently it is not used by the system.
Each PRL maps to a list that contains one or more SearchRanges, depending on the data type of the PRL (enum or float).
For float type, the list must contain a single SearchRange, defining the min and max values of the range;
For enum type, the list can contain any number of SearchRanges, each defining a choice. Each SearchRange is defined with the same min and max integer number, starting from 0. For example, to define an enum with 4 choices, the SearchRange list should be like this:
[SearchRange(0,0), SearchRange(1,1), SearchRange(2,2), SearchRange(3,3)]
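Putting the float and enum conventions together, the following sketch builds the targets dict for a SearchSpace. The SearchRange class here is a stand-in mirroring the documented constructor (maxv defaults to minv), and enum_ranges is a hypothetical helper, not part of the automl API:

```python
class SearchRange:
    """Stand-in mirroring the documented constructor: maxv defaults to minv."""

    def __init__(self, minv, maxv=None):
        self.minv = minv
        self.maxv = maxv if maxv is not None else minv


def enum_ranges(num_choices):
    """Hypothetical helper: one SearchRange per enum choice, starting from 0."""
    return [SearchRange(i) for i in range(num_choices)]


targets = {
    "lr.float": [SearchRange(0.0001, 0.001)],  # float: exactly one range (min, max)
    "net.enum": enum_ranges(4),                # enum: [SearchRange(0,0) .. SearchRange(3,3)]
}
# A SearchSpace would then be created as SearchSpace("my_space", targets).
```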
A Parameter Range Locator (PRL) is a formatted string that uniquely represents a search parameter in the search space. The module automl.prl provides definitions and convenience functions that you may find useful for PRL processing:
"""
Defines the format and convenience functions of PRL.
PRL (Parameter Range Locator - modeled after URL)
Format: domain.type[.extra]
"""
class Domain(object):
"""
Defines support domains.
"""
NET = 'net'
LEARNING_RATE = 'lr'
TRANSFORM = 'transform'
class PType(object):
"""
Defines supported parameter data types
"""
FLOAT = 'float'
ENUM = 'enum'
SUPPORTED_PARAM_DOMAIN = [Domain.NET, Domain.LEARNING_RATE, Domain.TRANSFORM]
SUPPORTED_PARAM_TYPE = [PType.ENUM, PType.FLOAT]
def make_prl(domain: str, ptype: str, rest: str="") -> str:
"""
Make a PRL from specified PRL elements
Args:
domain: domain of the PRL
ptype: parameter data type
rest: extra items
Returns: a PRL
"""
pass
def validate_prl(prl: str):
"""
Validate a specified prl.
Args:
prl: the PRL to be validated
Returns: error message if not valid; None if valid.
"""
pass
def split_prl(prl: str) -> [str]:
"""
Split the PRL into list of elements
Args:
prl: the PRL to be split
Returns: list of elements
"""
pass
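As a concrete illustration of the domain.type[.extra] format, here is a plausible sketch of what these helpers do. This is a stand-in written from the docstrings above; the actual automl.prl implementation may differ in details such as error messages:

```python
SUPPORTED_PARAM_DOMAIN = ['net', 'lr', 'transform']
SUPPORTED_PARAM_TYPE = ['enum', 'float']


def make_prl(domain: str, ptype: str, rest: str = "") -> str:
    # Assemble "domain.type" plus optional ".extra" items.
    prl = "{}.{}".format(domain, ptype)
    if rest:
        prl = "{}.{}".format(prl, rest)
    return prl


def split_prl(prl: str):
    # Split the PRL into its dot-separated elements.
    return prl.split(".")


def validate_prl(prl: str):
    # Return an error message if invalid; None if valid.
    parts = split_prl(prl)
    if len(parts) < 2:
        return "PRL must have at least domain and type"
    if parts[0] not in SUPPORTED_PARAM_DOMAIN:
        return "unsupported domain: {}".format(parts[0])
    if parts[1] not in SUPPORTED_PARAM_TYPE:
        return "unsupported type: {}".format(parts[1])
    return None


print(make_prl("lr", "float"))        # -> lr.float
print(split_prl("net.enum.block1"))   # -> ['net', 'enum', 'block1']
print(validate_prl("lr.float"))       # -> None
```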
The Controller generates recommendations. The Recommendation class defines the structure of a recommendation, which includes a recommendation id (rec_id) and a SearchResult.
The recommendation id is generated by the Controller to uniquely identify the recommendation. It can be an object of any type.
The classes are defined in the module automl.defs:
class SearchResult(object):
"""
SearchResult is a set of values from the search space.
Args:
space: the SearchSpace that the values are from.
values: the dict of PRL => value
Returns:
A SearchResult object
"""
def __init__(self, space: SearchSpace, values: dict):
assert isinstance(space, SearchSpace), "space must be SearchSpace"
assert isinstance(values, dict), "values must be dict"
for k in values.keys():
            assert (k in space.targets), "value key {} not in space".format(k)
self.space = space
self.values = values
class Recommendation(object):
"""
Recommendation is a recommendation for AutoML execution.
Args:
rec_id: the unique ID of the recommendation.
result: the search result
"""
def __init__(self, rec_id, result: SearchResult):
self.rec_id = rec_id
self.result = result
Once a recommendation is executed, the result is represented as an Outcome object, in the module automl.defs:
class Status(object):
OK = 0
ERROR = -1
EXCEPTION = -2
UNFINISHED = -3
class Outcome(object):
"""
Outcome represents the execution result of a recommendation
Args:
rec_id: the ID of the recommendation executed
status: job completion state
score: the score produced by the recommendation execution
job_ctx: the context used for the job execution
Note: score is conceptual. It could be a simple number or an object of any type.
"""
def __init__(self, rec_id, status: Status, score, job_ctx: Context=None):
self.recommendation_id = rec_id
self.status = status
self.score = score
self.job_ctx = job_ctx
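A controller's refine_recommendation will typically check the outcome's status before trusting its score. Below is a hedged sketch of that pattern, reusing a stand-in for the Status constants above; the BestScoreTracker helper is purely illustrative and not part of the automl API:

```python
class Status:
    """Stand-in mirroring the documented status constants."""
    OK = 0
    ERROR = -1
    EXCEPTION = -2
    UNFINISHED = -3


class BestScoreTracker:
    """Hypothetical helper a controller might keep: remembers the best (score, rec_id)."""

    def __init__(self):
        self.best_score = None
        self.best_rec_id = None

    def update(self, rec_id, status, score):
        # Ignore jobs that did not complete successfully.
        if status != Status.OK:
            return
        if self.best_score is None or score > self.best_score:
            self.best_score = score
            self.best_rec_id = rec_id


tracker = BestScoreTracker()
tracker.update("rec-1", Status.OK, 0.82)
tracker.update("rec-2", Status.UNFINISHED, 0.99)   # skipped: job did not finish
tracker.update("rec-3", Status.OK, 0.91)
print(tracker.best_rec_id, tracker.best_score)     # -> rec-3 0.91
```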
As described above, the ctx enables data sharing among AutoML components. In the multithreaded workflow environment (see below), each job is executed in a separate thread and is given a separate context! This means that the job context is only accessible from a single job execution thread.
Before starting to write your AutoML components, you must understand the running environment that the components live in - the AutoML workflow.
As described in Clara AutoML high-level design, AutoML workflow manages the interactions between the Controller and the Executor. In addition, it implements a multithreaded job management system that runs the jobs in parallel. To work properly in this multithreaded environment, you should take care to ensure the thread safety of your implementation because all components (Executor, Controller, Handlers) are singleton objects.
There are three kinds of threads during the workflow execution:
The main thread - the Engine’s execution thread. The Controller only runs in this thread.
The scheduler thread - monitors and schedules job executions.
The job execution thread - executes the job. There can be many such threads. The Executor’s execute() method runs in this thread.
Some methods of the executor and handler objects run in multiple job threads. Be careful when storing state data in these objects. In general, if the state data is specific to a single job execution, store it in the job context. But if the data spans all job executions (e.g. you are collecting cross-job stats), you must ensure its integrity when it is updated from multiple threads in parallel.
The controller is the “brain” of AutoML - it decides how to produce recommendations to find the best model possible.
You can write your own controller and use it on the AutoML platform. Your controller class must extend the Controller class, defined in the module automl.components.controllers.controller:
Controller methods are only run in the Main Thread.
from abc import abstractmethod, ABC
import logging
from automl.defs import Context, SearchSpace, Recommendation, Outcome
class Controller(ABC):
"""
This class defines the abstract behavior required of an AutoML Controller.
Controller implements the AutoML strategy that decides how the training is to be conducted.
Controller produces recommendations by finding values from a search space with some algorithm.
"""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
@abstractmethod
def set_search_space(self, space: SearchSpace, ctx: Context):
"""
Set the search space.
This is the search space that the controller will search against to produce recommendations.
The controller must keep it for later use.
Args:
space: the search space
ctx: the context that enables across-component data sharing and communication
Returns:
        NOTE: the controller should validate the search space and make sure it is acceptable.
        In case the search space is not acceptable, the controller should either raise an exception
        or ask to stop the workflow by calling ctx.stop_work().
"""
pass
@abstractmethod
def initial_recommendation(self, ctx: Context) -> [Recommendation]:
"""
This method is called by the AutoML workflow engine to produce the initial set of recommendations.
The controller must produce 1 or more recommendations.
If no recommendation is produced, the AutoML workflow will stop immediately.
This method is called only once at the beginning of the AutoML process.
Args:
ctx: the context that enables across-component data sharing and communication
Returns: a list of recommendations
"""
pass
@abstractmethod
def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]:
"""
This method is called by the AutoML workflow engine to produce a set of recommendations based
on the result from a previous job.
The controller can produce 0 or more recommendations.
This method is called every time a job finishes executing a previous recommendation.
Args:
outcome: the result of executing the previous recommendation
ctx: the context that enables across-component data sharing and communication
Returns: a list of recommendations, could be empty
"""
pass
def shutdown(self, ctx: Context):
"""
Called at the end of the AutoML workflow. This provides the opportunity for the controller
to clean up if needed.
Args:
ctx: the context that enables across-component data sharing and communication
"""
pass
AutoML controller example
The following example implements a dummy controller that simply returns randomly generated recommendations for a single PRL “lr.float” (which is for learning rate). In reality, your controller should produce recommendations for all PRLs in the search space.
When creating this controller, you specify:
total_recs - the total number of recommendations that the controller will generate. Once this number is reached, the controller stops generating new recommendations.
max_recs_each_time - the maximum number of recommendations the controller will generate each time. This controller generates a random number of recommendations up to the specified max.
from automl.components.controllers.controller import Controller
from automl.defs import Recommendation, Context, SearchSpace, SearchResult, Outcome
import random
class DummyController(Controller):
def __init__(self, total_recs, max_recs_each_time):
Controller.__init__(self)
self.max_num_recs = total_recs
self.max_recs_each_time = max_recs_each_time
self.num_recs_done = 0
self.space = None
def set_search_space(self, space: SearchSpace, ctx: Context):
self.space = space
def initial_recommendation(self, ctx: Context) -> [Recommendation]:
num_recs = random.randint(1, self.max_recs_each_time)
recs = []
for i in range(num_recs):
result = SearchResult(space=self.space,
values={
'lr.float': random.uniform(0.0001, 0.001)
})
recs.append(Recommendation(self.num_recs_done, result))
self.num_recs_done += 1
            print("num_recs_done: {}".format(self.num_recs_done))
return recs
def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]:
# must return a list of Recommendation
if self.num_recs_done >= self.max_num_recs:
return []
return self.initial_recommendation(ctx)
You write handlers to extend the functionalities of AutoML. For example, you may want to log the produced search space or recommendations to a file, perform analysis on the execution results, write them to a separate system, etc.
To fulfill these needs, you can write handlers to handle the events you are interested in.
Your handler class must extend the Handler class defined in module automl.components.handlers.handler, shown below:
import logging
import traceback
from automl.defs import Context, EventType
class Handler(object):
"""
This class defines the abstract behavior required of a Handler.
A handler is an object that listens to certain AutoML workflow events and takes actions on such events.
"""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
def startup(self, ctx: Context):
"""
The handler is being started up.
Use this method to initialize the handler based on info in the context.
NOTE: this method is called in the order of handler chain. If your handler depends on some
info provided by previous handlers, you can get such info from the ctx.
Args:
ctx: main context
"""
pass
def shutdown(self, ctx: Context):
"""
The handler is being shutdown.
Use this method to clean up the handler.
NOTE: this method is called in the reverse order of handler chain.
If your handler depends on other handlers, your handler will be shutdown before the handlers
you depend on.
Args:
ctx:
"""
pass
def start_automl(self, ctx: Context):
"""
AUTOML process is about to start. Called from the engine thread.
Args:
ctx: the main context
"""
pass
def end_automl(self, ctx: Context):
"""
AUTOML process has ended. Called from the engine thread.
Args:
ctx: the main context
"""
pass
def start_job(self, ctx: Context):
"""
The job execution is about to start. Called from the job thread.
NOTE: this method could be called from multiple job threads at the same time. If you want
to store across-job state data in the handler, you should ensure thread safety when updating
such state data.
NOTE: the ctx is a per-job context, hence it is not subject to multi-thread access.
Consider using the ctx to store per-job state data.
Args:
ctx: the job context
"""
pass
def end_job(self, ctx: Context):
"""
The job execution has ended. Called from the job thread.
See notes in start_job method.
You can check the job status from prop ContextKey.JOB_STATUS in the ctx.
NOTE: if the start_job is called, it is guaranteed that end_job will be called.
Args:
ctx: the job context
"""
pass
def search_space_available(self, ctx: Context):
"""
The search space is available. Called from the main thread.
You can get the search space from prop ContextKey.SEARCH_SPACE in the ctx.
Args:
ctx: main context
"""
pass
def recommendations_available(self, ctx: Context):
"""
The recommendations are available. Called from the main thread.
You can get recommendations from prop ContextKey.RECOMMENDATIONS in the ctx.
Args:
ctx: main context
"""
pass
def handle_event(self, event: str, ctx: Context):
"""
The default event handler that calls a predefined method for each event type.
Args:
event: event to be handled
ctx: context for cross-component data sharing
"""
if event == EventType.START_AUTOML:
self.start_automl(ctx)
elif event == EventType.START_JOB:
self.start_job(ctx)
elif event == EventType.END_JOB:
self.end_job(ctx)
elif event == EventType.END_AUTOML:
self.end_automl(ctx)
elif event == EventType.RECOMMENDATIONS_AVAILABLE:
self.recommendations_available(ctx)
elif event == EventType.SEARCH_SPACE_AVAILABLE:
self.search_space_available(ctx)
def fire_event(event: str, handlers: list, ctx: Context):
"""
Fires the specified event and invokes the list of handlers.
Args:
event: the event to be fired
handlers: handlers to be invoked
ctx: context for cross-component data sharing
"""
if handlers:
for h in handlers:
try:
h.handle_event(event, ctx)
except Exception as ex:
                ctx.trace(h, "Exception from handler {}: {}".format(h.__class__.__name__, ex))
traceback.print_exc()
ctx.exception_occurred(h)
Make sure you understand which thread each of these methods is called from. If called from the job thread, you should ensure the thread safety of your implementation.
AutoML handler example
The following is a simple stats handler that collects and computes the stats of each worker:
from .handler import Handler
from automl.defs import Context, ContextKey
import time
import threading
KEY_JOB_START_TIME = 'StatsHandler.jobStartTime'
class WorkerStats(object):
def __init__(self):
self.job_count = 0
self.total_time = 0
class StatsHandler(Handler):
"""
Defines a simple stats handler that collects job execution stats.
For each worker, it computes things like the number of jobs started, number of jobs finished and unfinished,
and total amount of time the worker worked.
It also shows the total amount of time used by the whole workflow.
"""
def __init__(self):
Handler.__init__(self)
self.automl_start_time = None
self.worker_stats = {} # worker name => stats
self.total_jobs_started = 0
self.total_jobs_failed = 0
self.total_jobs_completed = 0
self.worker_update_lock = threading.Lock()
def start_automl(self, ctx: Context):
self.automl_start_time = time.time()
def recommendations_available(self, ctx: Context):
recs = ctx.get_prop(ContextKey.RECOMMENDATIONS)
if recs:
            print('Number of recs: {}'.format(len(recs)))
else:
print('No recs')
def start_job(self, ctx: Context):
ctx.set_prop(KEY_JOB_START_TIME, time.time())
self.worker_update_lock.acquire()
self.total_jobs_started += 1
self.worker_update_lock.release()
def end_job(self, ctx: Context):
job_start = ctx.get_prop(KEY_JOB_START_TIME)
worker_name = ctx.get_prop(ContextKey.WORKER_NAME)
self.worker_update_lock.acquire()
try:
if ctx.stop_work_asked():
self.total_jobs_failed += 1
else:
self.total_jobs_completed += 1
if worker_name in self.worker_stats:
stats = self.worker_stats[worker_name]
else:
stats = WorkerStats()
stats.job_count += 1
stats.total_time += time.time() - job_start
self.worker_stats[worker_name] = stats
except Exception:
pass
self.worker_update_lock.release()
def end_automl(self, ctx: Context):
for worker, stats in self.worker_stats.items():
            print("Worker {}: processed {} jobs in {} secs".format(
                worker, stats.job_count, stats.total_time))
        print("Total jobs started  : {}".format(self.total_jobs_started))
        print("Total jobs unfinished: {}".format(self.total_jobs_failed))
        print("Total jobs finished : {}".format(self.total_jobs_completed))
        print('Total automl time: {} secs'.format(time.time() - self.automl_start_time))
This example shows:
How to use the job context to store job specific data (KEY_JOB_START_TIME)
How to ensure thread safety of common state data stored in the handler with lock
Clara AutoML provides executors for MMAR-based model training on local machines or NGC. However, if you want to do it in a different way, you can write your own executor, while still leveraging the workflow engine’s job management capability.
To create your own executor, you extend the Executor class defined in the module automl.components.executors.executor:
import logging
from abc import abstractmethod, ABC
from automl.defs import Context, Recommendation, SearchSpace
class Executor(ABC):
"""
This class defines the abstract behavior required of an Executor.
Executor executes a recommendation, and produces a score.
"""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
@abstractmethod
def execute(self, recommendation: Recommendation, ctx: Context) -> object:
"""
Execute the specified recommendation and return a score.
This method is called from the Job Thread.
Args:
recommendation: the recommendation to be executed
ctx: the context that enables cross-component data sharing
Returns: an object that represents the score
"""
pass
@abstractmethod
def determine_search_space(self, ctx: Context) -> SearchSpace:
"""
This method is called by the AutoML engine to determine the AutoML search space.
This method is called from the Main Thread.
Args:
ctx: the context that enables cross-component data sharing
Returns: a search space
"""
pass
def abort(self, ctx: Context):
"""
This method is called to abort execution immediately.
This method is called from the Main Thread.
Args:
ctx: the context that enables cross-component data sharing
"""
pass
def shutdown(self, ctx: Context):
"""
This method is called to shutdown the executor. This is called at the end of the AutoML
workflow.
This method is called from the Main Thread.
Args:
ctx: the context that enables cross-component data sharing
"""
pass
The ctx passed to all of these methods is the job context.
Make sure you understand which thread each method runs in and ensure thread safety of your implementation!
AutoML executor example
The following example shows a dummy executor, which simply returns a randomly generated score for a recommendation after sleeping for a few seconds:
import random
import time
from automl.components.executors.executor import Executor
from automl.defs import Recommendation, Context, SearchSpace, SearchRange, SearchResult, Outcome, ContextKey
class TestExecutor(Executor):
def __init__(self):
Executor.__init__(self)
def execute(self, recommendation: Recommendation, ctx: Context) -> object:
job_name = ctx.get_prop(ContextKey.JOB_NAME)
worker_name = ctx.get_prop(ContextKey.WORKER_NAME)
        print("Worker {}: executing job {} ...".format(worker_name, job_name))
        secs_to_sleep = random.randrange(5, 10)
        print("Worker {}: job {} sleeping {} secs".format(worker_name, job_name, secs_to_sleep))
        time.sleep(secs_to_sleep)
        score = random.uniform(0.0, 1.0)
        print("Worker {}: finished job {} with score {}".format(worker_name, job_name, score))
return score
def determine_search_space(self, ctx: Context) -> SearchSpace:
return SearchSpace("test",
{
"lr.float": [SearchRange(0.001, 0.002)]
})
If you have a completely different way of doing AutoML, you can create your own AutoML system while still leveraging the workflow engine’s job management system.
The following is an end-to-end example:
import argparse
import random
import time
from automl.components.executors.executor import Executor
from automl.components.controllers.controller import Controller
from automl.components.handlers.handler import Handler
from automl.defs import Recommendation, Context, SearchSpace, SearchRange, SearchResult, Outcome, ContextKey
from automl.workflows.engine import WorkerConfig, Engine
class TestExecutor(Executor):
def __init__(self):
Executor.__init__(self)
def execute(self, recommendation: Recommendation, ctx: Context) -> object:
job_name = ctx.get_prop(ContextKey.JOB_NAME)
worker_name = ctx.get_prop(ContextKey.WORKER_NAME)
        print("Worker {}: executing job {} ...".format(worker_name, job_name))
        secs_to_sleep = random.randrange(5, 10)
        print("Worker {}: job {} sleeping {} secs".format(worker_name, job_name, secs_to_sleep))
        time.sleep(secs_to_sleep)
        # excep = random.randrange(1, 10)
        # if excep == 1:
        #     print("Worker {}: crash job {}".format(worker_name, job_name))
        #     y = 1 / 0  # simulate exception
        score = random.uniform(0.0, 1.0)
        print("Worker {}: finished job {} with score {}".format(worker_name, job_name, score))
return score
def determine_search_space(self, ctx: Context) -> SearchSpace:
return SearchSpace("test",
{
"lr.float": [SearchRange(0.001, 0.002)]
})
class TestController(Controller):
def __init__(self, total_recs, max_recs_each_time):
Controller.__init__(self)
self.max_num_recs = total_recs
self.max_recs_each_time = max_recs_each_time
self.num_recs_done = 0
self.space = None
def set_search_space(self, space: SearchSpace, ctx: Context):
self.space = space
def initial_recommendation(self, ctx: Context) -> [Recommendation]:
num_recs = random.randint(1, self.max_recs_each_time)
recs = []
for i in range(num_recs):
result = SearchResult(space=self.space,
values={
'lr.float': random.uniform(0.001, 0.002)
})
recs.append(Recommendation(self.num_recs_done, result))
self.num_recs_done += 1
            print("num_recs_done: {}".format(self.num_recs_done))
return recs
def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]:
        # must return a list of Recommendation
if self.num_recs_done >= self.max_num_recs:
return []
return self.initial_recommendation(ctx)
class TestHandler(Handler):
def __init__(self):
Handler.__init__(self)
def handle_event(self, event: str, ctx: Context):
        print("TestHandler: got event {}".format(event))
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--total_recs', '-t', type=int, default=20,
help='total number of recommendations to produce')
parser.add_argument('--max_recs_each_time', '-e', type=int, default=5,
help='max number of recommendations to produce each time')
parser.add_argument('--num_workers', '-w', type=int, default=2,
help='max number of workers')
args = parser.parse_args()
ctx = Context()
executor = TestExecutor()
controller = TestController(args.total_recs, args.max_recs_each_time)
handler = TestHandler()
worker_configs = []
for i in range(args.num_workers):
worker_configs.append(WorkerConfig('W{}'.format(i+1), [i]))
engine = Engine(worker_configs, controller, executor, [handler])
engine.run(ctx)
engine.shutdown(ctx)
if __name__ == '__main__':
main()