AutoML programming guide

Introduction

As described in Clara AutoML high-level design, Clara AutoML is a platform for researchers to perform AutoML experimentation. As a researcher on this platform, you will be able to:

  1. Configure your search space in the training config of Clara Train, pick an existing Controller implementation, and let the platform do everything for you

  2. Develop and use your own Controller algorithm and let the platform take care of the rest

  3. Expand the functionalities of the AutoML workflow by creating your own handlers

  4. Write your own Executor to make the AutoML workflow engine work with your own way of model training, while still leveraging the engine’s powerful job management system

You should be able to do use case 1 after reading Clara AutoML high-level design. This programming guide will walk you through how to do use cases 2, 3, and 4.

You should first read Clara AutoML high-level design and be familiar with the following concepts:

  • Purpose of AutoML

  • Parameter Range Locator (PRL) and search range definition

  • Search Space

  • Controller

  • Executor

  • Handler

  • Workflow Engine and workflow logic

AutoML Context

Before discussing the use cases in detail, we introduce the AutoML Context, a concept crucial to all components.

The AutoML process is a data-based collaboration among components (controller, executor, handlers, workflow engine, etc.). On the one hand, these components interact directly through programmed interaction patterns (e.g. the outcome generated by the executor is fed to the controller to produce refined recommendations). On the other hand, components can also interact indirectly via data sharing (e.g. a piece of data generated by one component may be needed by another component, which has no direct programmed interaction with it, to compile statistics). The AutoML Context makes this data sharing possible: components can set and get properties on the AutoML Context.

API specification

Class path

automl.defs.Context

Property access

With set_prop(), you can set any object to the context by providing a key and the value of the object.

def set_prop(self, key: str, value)

With get_prop(), you can get the object with the key from the context. If the specified property does not exist, the specified default value is returned.

def get_prop(self, key: str, default=None)
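To make the property-access semantics concrete, here is a minimal sketch. The Context class below is a hypothetical stand-in that only mirrors the documented set_prop()/get_prop() signatures; in real code you receive an automl.defs.Context instance from the engine, and the "myHandler.jobCount" key is a made-up example.

```python
# Hypothetical stand-in mirroring the documented Context property API;
# real components receive an automl.defs.Context from the engine.
class Context:
    def __init__(self):
        self._props = {}

    def set_prop(self, key: str, value):
        # store any object under the given key
        self._props[key] = value

    def get_prop(self, key: str, default=None):
        # return the stored object, or the default if the key is absent
        return self._props.get(key, default)


ctx = Context()
ctx.set_prop("myHandler.jobCount", 3)          # one component writes...
count = ctx.get_prop("myHandler.jobCount", 0)  # ...another reads it later
missing = ctx.get_prop("never.set", "n/a")     # default for absent keys
print(count, missing)                          # prints: 3 n/a
```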

Report exception

With exception_occurred(), you can tell the workflow engine that an exception has occurred in your component’s processing. It is up to the workflow engine how to react to this report.

def exception_occurred(self, msg=None)

Note

Currently the engine will stop the workflow upon any exception report from any component, but this rule may change if necessary.

Ask to stop work

You can call the stop_work() method to request the engine to stop the workflow. You should provide a string for “msg” to specify the reason for the request.

def stop_work(self, msg: str)

Note

Currently the engine will stop the workflow when the stop_work request is received from any component, but this rule may change if necessary.

With stop_work_asked(), you can check whether a stop_work has been asked.

def stop_work_asked(self) -> bool

With get_stop_message(), you can get the stop message from the context.

def get_stop_message(self)
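The stop-work protocol can be sketched the same way. Again, the Context here is a hypothetical stand-in that follows only the documented signatures, and the stop message is a made-up example; the real engine class lives in automl.defs.

```python
# Hypothetical stand-in for the documented stop-work methods.
class Context:
    def __init__(self):
        self._stop_asked = False
        self._stop_msg = None

    def stop_work(self, msg: str):
        # record the request; the real engine decides how to react
        self._stop_asked = True
        self._stop_msg = msg

    def stop_work_asked(self) -> bool:
        return self._stop_asked

    def get_stop_message(self):
        return self._stop_msg


ctx = Context()
# a component detects a fatal condition and asks the engine to stop
ctx.stop_work("validation score is NaN")
if ctx.stop_work_asked():
    print(ctx.get_stop_message())   # prints: validation score is NaN
```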

Predefined properties

The class automl.defs.ContextKey defines some property keys reserved for the workflow engine. You can use these keys in your code to get the properties you are interested in.

class ContextKey(object):
    """
    Defines a set of standard property keys.
    """

    RECOMMENDATIONS = "_recommendations"
    SCORE = "_score"
    JOB_STATUS = "_jobStatus"
    JOB_NAME = "_jobName"
    WORKER_NAME = "_workerName"
    SEARCH_SPACE = "_searchSpace"
    GPUS = "_gpus"
    NUM_WORKERS = "_numWorkers"

Attention

If you want to store your own property into the context, make sure that your property key does not conflict with these predefined keys!

As a general rule, property keys beginning with the underscore character (_) are reserved. Name your properties without the underscore prefix to avoid name conflicts.

SearchSpace

If you write your own controller or executor, you have to deal with SearchSpace.

The following classes are defined in the module automl.defs:

class SearchRange(object):
    """
    A SearchRange is the basic building block of search space.

    Args:
        minv: min value of the range.
        maxv: max value of the range. If not specified, it is set to the same value as minv

    Returns:
        A SearchRange object
    """

    def __init__(self, minv, maxv=None):
        pass


class SearchSpace(object):
    """
    SearchSpace defines the space of candidates for searchable parameters.
    It is a dictionary of PRL (Parameter Range Locator) to a list of SearchRanges.
    Each PRL represents a unique search parameter.

    Args:
        name: name of the search space.
        targets: the dict of PRL => [SearchRange]

    Returns:
        A SearchSpace object
    """

    def __init__(self, name: str, targets: dict):
        pass

Create SearchSpace

As shown above, to create a SearchSpace, you specify a name and a dictionary that maps PRLs to lists of SearchRanges.

The name is an arbitrary string. Currently it is not used by the system.

Each PRL maps to a list that contains one or more SearchRanges, depending on the data type of the PRL (enum or float).

For float type, the list must contain a single SearchRange, defining the min and max values of the range;

For enum type, the list can contain any number of SearchRanges, each defining one choice. Each choice is a SearchRange whose min and max are the same integer, numbered from 0. For example, to define an enum with 4 choices, the SearchRange list should look like this:

[SearchRange(0,0), SearchRange(1,1), SearchRange(2,2), SearchRange(3,3)]
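Putting the two cases together, a search space with one float parameter and one 4-choice enum parameter could be built as below. The SearchRange/SearchSpace classes here are stand-ins filled in from the docstrings above (with the platform installed you would import them from automl.defs), and the "demo" name and PRLs are illustration values.

```python
# Stand-in classes filled in from the automl.defs docstrings above.
class SearchRange:
    def __init__(self, minv, maxv=None):
        self.minv = minv
        self.maxv = minv if maxv is None else maxv


class SearchSpace:
    def __init__(self, name: str, targets: dict):
        self.name = name
        self.targets = targets


space = SearchSpace(
    name="demo",
    targets={
        # float PRL: exactly one range giving min and max
        "lr.float": [SearchRange(0.0001, 0.001)],
        # enum PRL: one degenerate range per choice, numbered from 0
        "net.enum": [SearchRange(0, 0), SearchRange(1, 1),
                     SearchRange(2, 2), SearchRange(3, 3)],
    },
)
print(len(space.targets["net.enum"]))   # prints: 4
```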

Parameter Range Locator (PRL)

A Parameter Range Locator (PRL) is a formatted string that uniquely represents a search parameter in the search space. The module automl.prl provides definitions and convenience functions that you may find useful for PRL processing:

"""
Defines the format and convenience functions of PRL.

PRL (Parameter Range Locator - modeled after URL)
Format: domain.type[.extra]
"""


class Domain(object):
    """
    Defines support domains.
    """

    NET = 'net'
    LEARNING_RATE = 'lr'
    TRANSFORM = 'transform'


class PType(object):
    """
    Defines supported parameter data types
    """

    FLOAT = 'float'
    ENUM = 'enum'


SUPPORTED_PARAM_DOMAIN = [Domain.NET, Domain.LEARNING_RATE, Domain.TRANSFORM]
SUPPORTED_PARAM_TYPE = [PType.ENUM, PType.FLOAT]


def make_prl(domain: str, ptype: str, rest: str="") -> str:
    """
    Make a PRL from specified PRL elements

    Args:
        domain: domain of the PRL
        ptype: parameter data type
        rest: extra items

    Returns: a PRL

    """
    pass


def validate_prl(prl: str):
    """
    Validate a specified prl.

    Args:
        prl: the PRL to be validated

    Returns: error message if not valid; None if valid.

    """
    pass


def split_prl(prl: str) -> [str]:
    """
    Split the PRL into list of elements

    Args:
        prl: the PRL to be split

    Returns: list of elements

    """
    pass
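Since make_prl() and split_prl() are shown above as stubs, here is a hypothetical sketch of their behavior, based only on the documented format domain.type[.extra]; the real implementations live in automl.prl, and the "depth" and "flip" extra elements are made up for illustration.

```python
# Hypothetical sketch of the PRL helpers, based on the documented
# format domain.type[.extra]; the real functions live in automl.prl.
def make_prl(domain: str, ptype: str, rest: str = "") -> str:
    prl = "{}.{}".format(domain, ptype)
    # append the optional extra element only when provided
    return "{}.{}".format(prl, rest) if rest else prl


def split_prl(prl: str) -> list:
    # break the PRL back into its dot-separated elements
    return prl.split(".")


print(make_prl("lr", "float"))            # prints: lr.float
print(make_prl("net", "enum", "depth"))   # prints: net.enum.depth
print(split_prl("transform.enum.flip"))   # prints: ['transform', 'enum', 'flip']
```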

Search result and recommendations

The Controller generates recommendations. The Recommendation class defines their structure: a recommendation id (rec_id) and a SearchResult.

Note

Recommendation id is generated by the Controller to uniquely identify the recommendation. It can be of any object type.

The classes are defined in the module automl.defs:

class SearchResult(object):
    """
    SearchResult is a set of values from the search space.

    Args:
        space: the SearchSpace that the values are from.
        values: the dict of PRL => value

    Returns:
        A SearchResult object

    """

    def __init__(self, space: SearchSpace, values: dict):
        assert isinstance(space, SearchSpace), "space must be SearchSpace"
        assert isinstance(values, dict), "values must be dict"

        for k in values.keys():
            assert (k in space.targets), "value key {} not in space".format(k)

        self.space = space
        self.values = values


class Recommendation(object):
    """
    Recommendation is a recommendation for AutoML execution.

    Args:
        rec_id: the unique ID of the recommendation.
        result: the search result
    """

    def __init__(self, rec_id, result: SearchResult):
        self.rec_id = rec_id
        self.result = result

Outcome - result of recommendation execution

Once a recommendation is executed, the result is represented as an Outcome object, in the module automl.defs:

class Status(object):

    OK = 0
    ERROR = -1
    EXCEPTION = -2
    UNFINISHED = -3


class Outcome(object):
    """
    Outcome represents the execution result of a recommendation

    Args:
        rec_id: the ID of the recommendation executed
        status: job completion state
        score: the score produced by the recommendation execution
        job_ctx: the context used for the job execution

    Note: score is conceptual. It could be a simple number or an object of any type.
    """

    def __init__(self, rec_id, status: Status, score, job_ctx: Context=None):
        self.recommendation_id = rec_id
        self.status = status
        self.score = score
        self.job_ctx = job_ctx

Attention

As described above, the ctx enables data sharing among AutoML components. In the multithreaded workflow environment (see below), each job is executed in a separate thread and is given a separate context! This means that the job context is only accessible from a single job execution thread.
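To make the Outcome structure concrete, here is a small sketch using stand-in classes filled in from the automl.defs listings above; the rec_id 7 and score 0.83 are made-up illustration values.

```python
# Stand-ins filled in from the automl.defs listings above.
class Status(object):
    OK = 0
    ERROR = -1
    EXCEPTION = -2
    UNFINISHED = -3


class Outcome(object):
    def __init__(self, rec_id, status, score, job_ctx=None):
        self.recommendation_id = rec_id
        self.status = status
        self.score = score
        self.job_ctx = job_ctx


# a job that executed recommendation 7 finished normally with score 0.83
outcome = Outcome(rec_id=7, status=Status.OK, score=0.83)
print(outcome.recommendation_id, outcome.status, outcome.score)  # prints: 7 0 0.83
```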

Understanding AutoML workflow

Before starting to write your AutoML components, you must understand the running environment that the components live in - the AutoML workflow.

As described in Clara AutoML high-level design, AutoML workflow manages the interactions between the Controller and the Executor. In addition, it implements a multithreaded job management system that runs the jobs in parallel. To work properly in this multithreaded environment, you should take care to ensure the thread safety of your implementation because all components (Executor, Controller, Handlers) are singleton objects.

There are three kinds of threads during the workflow execution:

  • The main thread - the Engine’s execution thread. The Controller only runs in this thread.

  • The scheduler thread - monitors and schedules job executions.

  • The job execution thread - executes the job. There can be many such threads. The Executor’s execute() method runs in this thread.

Note

Some methods of the executor and handler objects run in multiple job threads. Be careful when storing state data in these objects. In general, if the state data is specific to a single job execution, store it in the job context. If the data relates to all job executions (e.g. you are collecting cross-job stats), you must ensure the integrity of the data when it is updated from multiple threads in parallel.

Write your own controller

The controller is the “brain” of AutoML - it decides how to produce recommendations to find the best model possible.

You can write your own controller and use it on the AutoML platform. Your controller class must extend the Controller class, defined in the module automl.components.controllers.controller:

Attention

Controller methods are only run in the Main Thread.

from abc import abstractmethod, ABC
import logging
from automl.defs import Context, SearchSpace, Recommendation, Outcome


class Controller(ABC):
    """
    This class defines the abstract behavior required of an AutoML Controller.

    Controller implements the AutoML strategy that decides how the training is to be conducted.
    Controller produces recommendations by finding values from a search space with some algorithm.
    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def set_search_space(self, space: SearchSpace, ctx: Context):
        """
        Set the search space.
        This is the search space that the controller will search against to produce recommendations.
        The controller must keep it for later use.

        Args:
            space: the search space
            ctx: the context that enables across-component data sharing and communication

        Returns:

        NOTE: the controller should validate the search space and make sure it is acceptable.
        In case the search space is not acceptable, the controller should either raise an exception
        or ask to stop the workflow by calling ctx.stop_work().

        """
        pass

    @abstractmethod
    def initial_recommendation(self, ctx: Context) -> [Recommendation]:
        """
        This method is called by the AutoML workflow engine to produce the initial set of recommendations.
        The controller must produce 1 or more recommendations.
        If no recommendation is produced, the AutoML workflow will stop immediately.

        This method is called only once at the beginning of the AutoML process.

        Args:
            ctx: the context that enables across-component data sharing and communication

        Returns: a list of recommendations

        """
        pass

    @abstractmethod
    def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]:
        """
        This method is called by the AutoML workflow engine to produce a set of recommendations based
        on the result from a previous job.

        The controller can produce 0 or more recommendations.

        This method is called every time a job finishes executing a previous recommendation.

        Args:
            outcome: the result of executing the previous recommendation
            ctx: the context that enables across-component data sharing and communication

        Returns: a list of recommendations, could be empty

        """
        pass

    def shutdown(self, ctx: Context):
        """
        Called at the end of the AutoML workflow. This provides the opportunity for the controller
        to clean up if needed.

        Args:
            ctx: the context that enables across-component data sharing and communication
        """
        pass

AutoML controller example

The following example implements a dummy controller that simply returns randomly generated recommendations for a single PRL “lr.float” (which is for learning rate). In reality, your controller should produce recommendations for all PRLs in the search space.

When creating this controller, you specify:

  • total_recs - the total number of recommendations that the controller will generate. Once this number is reached, the controller stops generating new recommendations.

  • max_recs_each_time - the max number of recommendations the controller will generate each time. This controller generates a random number of recommendations between 1 and the specified max.

from automl.components.controllers.controller import Controller
from automl.defs import Recommendation, Context, SearchSpace, SearchResult, Outcome
import random


class DummyController(Controller):

    def __init__(self, total_recs, max_recs_each_time):
        Controller.__init__(self)
        self.max_num_recs = total_recs
        self.max_recs_each_time = max_recs_each_time
        self.num_recs_done = 0
        self.space = None

    def set_search_space(self, space: SearchSpace, ctx: Context):
        self.space = space

    def initial_recommendation(self, ctx: Context) -> [Recommendation]:
        num_recs = random.randint(1, self.max_recs_each_time)
        recs = []
        for i in range(num_recs):
            result = SearchResult(space=self.space,
                                  values={
                                      'lr.float': random.uniform(0.0001, 0.001)
                                  })
            recs.append(Recommendation(self.num_recs_done, result))
            self.num_recs_done += 1
        print("num_recs_done: {}".format(self.num_recs_done))
        return recs

    def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]:
        # must return a list of Recommendation
        if self.num_recs_done >= self.max_num_recs:
            return []
        return self.initial_recommendation(ctx)

Write handlers

You write handlers to extend the functionalities of AutoML. For example, you may want to log the produced search space or recommendations to a file, perform analysis on the execution results, write them to a separate system, etc.

To fulfill these needs, you can write handlers to handle the events you are interested in.

Your handler class must extend the Handler class defined in module automl.components.handlers.handler, shown below:

import logging
import traceback
from automl.defs import Context, EventType


class Handler(object):
    """
    This class defines the abstract behavior required of a Handler.

    A handler is an object that listens to certain AutoML workflow events and takes actions on such events.

    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    def startup(self, ctx: Context):
        """
        The handler is being started up.
        Use this method to initialize the handler based on info in the context.

        NOTE: this method is called in the order of handler chain. If your handler depends on some
        info provided by previous handlers, you can get such info from the ctx.

        Args:
            ctx: main context
        """
        pass

    def shutdown(self, ctx: Context):
        """
        The handler is being shutdown.
        Use this method to clean up the handler.

        NOTE: this method is called in the reverse order of handler chain.
        If your handler depends on other handlers, your handler will be shutdown before the handlers
        you depend on.

        Args:
            ctx:
        """
        pass

    def start_automl(self, ctx: Context):
        """
        AUTOML process is about to start. Called from the engine thread.

        Args:
            ctx: the main context
        """
        pass

    def end_automl(self, ctx: Context):
        """
        AUTOML process has ended. Called from the engine thread.
        Args:
            ctx: the main context
        """
        pass

    def start_job(self, ctx: Context):
        """
        The job execution is about to start. Called from the job thread.

        NOTE: this method could be called from multiple job threads at the same time. If you want
        to store across-job state data in the handler, you should ensure thread safety when updating
        such state data.

        NOTE: the ctx is a per-job context, hence it is not subject to multi-thread access.
        Consider using the ctx to store per-job state data.

        Args:
            ctx: the job context
        """
        pass

    def end_job(self, ctx: Context):
        """
        The job execution has ended. Called from the job thread.
        See notes in start_job method.
        You can check the job status from prop ContextKey.JOB_STATUS in the ctx.

        NOTE: if the start_job is called, it is guaranteed that end_job will be called.

        Args:
            ctx: the job context
        """
        pass

    def search_space_available(self, ctx: Context):
        """
        The search space is available. Called from the main thread.
        You can get the search space from prop ContextKey.SEARCH_SPACE in the ctx.

        Args:
            ctx: main context
        """
        pass

    def recommendations_available(self, ctx: Context):
        """
        The recommendations are available. Called from the main thread.
        You can get recommendations from prop ContextKey.RECOMMENDATIONS in the ctx.

        Args:
            ctx: main context
        """
        pass

    def handle_event(self, event: str, ctx: Context):
        """
        The default event handler that calls a predefined method for each event type.

        Args:
            event: event to be handled
            ctx: context for cross-component data sharing
        """
        if event == EventType.START_AUTOML:
            self.start_automl(ctx)
        elif event == EventType.START_JOB:
            self.start_job(ctx)
        elif event == EventType.END_JOB:
            self.end_job(ctx)
        elif event == EventType.END_AUTOML:
            self.end_automl(ctx)
        elif event == EventType.RECOMMENDATIONS_AVAILABLE:
            self.recommendations_available(ctx)
        elif event == EventType.SEARCH_SPACE_AVAILABLE:
            self.search_space_available(ctx)


def fire_event(event: str, handlers: list, ctx: Context):
    """
    Fires the specified event and invokes the list of handlers.

    Args:
        event: the event to be fired
        handlers: handlers to be invoked
        ctx: context for cross-component data sharing
    """
    if handlers:
        for h in handlers:
            try:
                h.handle_event(event, ctx)
            except Exception as ex:
                ctx.trace(h, "Exception from handler {}: {}".format(h.__class__.__name__, ex))
                traceback.print_exc()
                ctx.exception_occurred(h)

Attention

Make sure you understand which thread each of these methods is called from. If called from the job thread, you should ensure the thread safety of your implementation.

AutoML handler example

The following is a simple stats handler that collects and computes the stats of each worker:

from .handler import Handler
from automl.defs import Context, ContextKey
import time
import threading


KEY_JOB_START_TIME = 'StatsHandler.jobStartTime'


class WorkerStats(object):

    def __init__(self):
        self.job_count = 0
        self.total_time = 0


class StatsHandler(Handler):
    """
    Defines a simple stats handler that collects job execution stats.

    For each worker, it computes things like the number of jobs started, number of jobs finished and unfinished,
    and total amount of time the worker worked.
    It also shows the total amount of time used by the whole workflow.

    """

    def __init__(self):
        Handler.__init__(self)
        self.automl_start_time = None
        self.worker_stats = {}  # worker name => stats
        self.total_jobs_started = 0
        self.total_jobs_failed = 0
        self.total_jobs_completed = 0
        self.worker_update_lock = threading.Lock()

    def start_automl(self, ctx: Context):
        self.automl_start_time = time.time()

    def recommendations_available(self, ctx: Context):
        recs = ctx.get_prop(ContextKey.RECOMMENDATIONS)
        if recs:
            print('Number of recs: {}'.format(len(recs)))
        else:
            print('No recs')

    def start_job(self, ctx: Context):
        ctx.set_prop(KEY_JOB_START_TIME, time.time())
        self.worker_update_lock.acquire()
        self.total_jobs_started += 1
        self.worker_update_lock.release()

    def end_job(self, ctx: Context):
        job_start = ctx.get_prop(KEY_JOB_START_TIME)
        worker_name = ctx.get_prop(ContextKey.WORKER_NAME)
        self.worker_update_lock.acquire()
        try:
            if ctx.stop_work_asked():
                self.total_jobs_failed += 1
            else:
                self.total_jobs_completed += 1

            if worker_name in self.worker_stats:
                stats = self.worker_stats[worker_name]
            else:
                stats = WorkerStats()
            stats.job_count += 1
            stats.total_time += time.time() - job_start
            self.worker_stats[worker_name] = stats
        except Exception:
            pass

        self.worker_update_lock.release()

    def end_automl(self, ctx: Context):
        for worker, stats in self.worker_stats.items():
            print("Worker {}: processed {} jobs in {} secs".format(
                worker, stats.job_count, stats.total_time))
        print("Total jobs started   : {}".format(self.total_jobs_started))
        print("Total jobs unfinished: {}".format(self.total_jobs_failed))
        print("Total jobs finished  : {}".format(self.total_jobs_completed))
        print('Total automl time: {} secs'.format(time.time() - self.automl_start_time))

This example shows:

  • How to use the job context to store job specific data (KEY_JOB_START_TIME)

  • How to ensure thread safety of common state data stored in the handler with lock

Write your own executor

Clara AutoML provides executors for MMAR-based model training on local machines or NGC. However, if you want to do it in a different way, you can write your own executor, while still leveraging the workflow engine’s job management capability.

To create your own executor, you extend the Executor class defined in the module automl.components.executors.executor:

import logging
from abc import abstractmethod, ABC
from automl.defs import Context, Recommendation, SearchSpace


class Executor(ABC):
    """
    This class defines the abstract behavior required of an Executor.

    Executor executes a recommendation, and produces a score.

    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def execute(self, recommendation: Recommendation, ctx: Context) -> object:
        """
        Execute the specified recommendation and return a score.

        This method is called from the Job Thread.

        Args:
            recommendation: the recommendation to be executed
            ctx: the context that enables cross-component data sharing

        Returns: an object that represents the score

        """
        pass

    @abstractmethod
    def determine_search_space(self, ctx: Context) -> SearchSpace:
        """
        This method is called by the AutoML engine to determine the AutoML search space.
        This method is called from the Main Thread.

        Args:
            ctx: the context that enables cross-component data sharing

        Returns: a search space

        """
        pass

    def abort(self, ctx: Context):
        """
        This method is called to abort execution immediately.
        This method is called from the Main Thread.

        Args:
            ctx: the context that enables cross-component data sharing
        """

        pass

    def shutdown(self, ctx: Context):
        """
        This method is called to shutdown the executor. This is called at the end of the AutoML
        workflow.

        This method is called from the Main Thread.

        Args:
            ctx: the context that enables cross-component data sharing
        """
        pass

Note

The ctx passed to all these methods is the job context.

Attention

Make sure you understand which thread each method runs in and ensure thread safety of your implementation!

AutoML executor example

The following example shows a dummy executor, which simply returns a randomly generated score for a recommendation after sleeping for a few seconds:

import random
import time
from automl.components.executors.executor import Executor
from automl.defs import Recommendation, Context, SearchSpace, SearchRange, SearchResult, Outcome, ContextKey


class TestExecutor(Executor):

    def __init__(self):
        Executor.__init__(self)

    def execute(self, recommendation: Recommendation, ctx: Context) -> object:
        job_name = ctx.get_prop(ContextKey.JOB_NAME)
        worker_name = ctx.get_prop(ContextKey.WORKER_NAME)
        print("Worker {}: executing job {} ...".format(worker_name, job_name))
        secs_to_sleep = random.randrange(5, 10)
        print("Worker {}: job {} sleep {} secs".format(worker_name, job_name, secs_to_sleep))
        time.sleep(secs_to_sleep)
        score = random.uniform(0.0, 1.0)
        print("Worker {}: finished job {} with score {}".format(worker_name, job_name, score))
        return score

    def determine_search_space(self, ctx: Context) -> SearchSpace:
        return SearchSpace("test",
                           {
                               "lr.float": [SearchRange(0.001, 0.002)]
                           })

Create your own AutoML system

If you have a completely different way of doing AutoML, you can create your own AutoML system while still leveraging the workflow engine’s job management system.

The following is an end-to-end example:

import argparse
import random
import time
from automl.components.executors.executor import Executor
from automl.components.controllers.controller import Controller
from automl.components.handlers.handler import Handler
from automl.defs import Recommendation, Context, SearchSpace, SearchRange, SearchResult, Outcome, ContextKey
from automl.workflows.engine import WorkerConfig, Engine


class TestExecutor(Executor):

    def __init__(self):
        Executor.__init__(self)

    def execute(self, recommendation: Recommendation, ctx: Context) -> object:
        job_name = ctx.get_prop(ContextKey.JOB_NAME)
        worker_name = ctx.get_prop(ContextKey.WORKER_NAME)
        print("Worker {}: executing job {} ...".format(worker_name, job_name))
        secs_to_sleep = random.randrange(5, 10)
        print("Worker {}: job {} sleep {} secs".format(worker_name, job_name, secs_to_sleep))
        time.sleep(secs_to_sleep)
        # excep = random.randrange(1, 10)
        # if excep == 1:
        #     print("Worker {}: crash job {}".format(worker_name, job_name))
        #     y = 1 / 0  # simulate exception
        score = random.uniform(0.0, 1.0)
        print("Worker {}: finished job {} with score {}".format(worker_name, job_name, score))
        return score

    def determine_search_space(self, ctx: Context) -> SearchSpace:
        return SearchSpace("test",
                           {
                               "lr.float": [SearchRange(0.001, 0.002)]
                           })


class TestController(Controller):

    def __init__(self, total_recs, max_recs_each_time):
        Controller.__init__(self)
        self.max_num_recs = total_recs
        self.max_recs_each_time = max_recs_each_time
        self.num_recs_done = 0
        self.space = None

    def set_search_space(self, space: SearchSpace, ctx: Context):
        self.space = space

    def initial_recommendation(self, ctx: Context) -> [Recommendation]:
        num_recs = random.randint(1, self.max_recs_each_time)
        recs = []
        for i in range(num_recs):
            result = SearchResult(space=self.space,
                                  values={
                                      'lr.float': random.uniform(0.001, 0.002)
                                  })
            recs.append(Recommendation(self.num_recs_done, result))
            self.num_recs_done += 1
        print("num_recs_done: {}".format(self.num_recs_done))
        return recs

    def refine_recommendation(self, outcome: Outcome, ctx: Context) -> [Recommendation]:
        # must return a list of Recommendation
        if self.num_recs_done >= self.max_num_recs:
            return []
        return self.initial_recommendation(ctx)


class TestHandler(Handler):

    def __init__(self):
        Handler.__init__(self)

    def handle_event(self, event: str, ctx: Context):
        print("TestHandler: got event {}".format(event))


def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('--total_recs', '-t', type=int, default=20,
                        help='total number of recommendations to produce')

    parser.add_argument('--max_recs_each_time', '-e', type=int, default=5,
                        help='max number of recommendations to produce each time')

    parser.add_argument('--num_workers', '-w', type=int, default=2,
                        help='max number of workers')

    args = parser.parse_args()

    ctx = Context()
    executor = TestExecutor()
    controller = TestController(args.total_recs, args.max_recs_each_time)
    handler = TestHandler()
    worker_configs = []

    for i in range(args.num_workers):
        worker_configs.append(WorkerConfig('W{}'.format(i+1), [i]))

    engine = Engine(worker_configs, controller, executor, [handler])
    engine.run(ctx)
    engine.shutdown(ctx)


if __name__ == '__main__':
    main()