fed_learn.client package

class FedAdminAgent(client_name, sender: fed_learn.client.admin.Sender, app_ctx, req_poll_interval=0.5, process_poll_interval=0.1)

Bases: object

register_processor(processor: fed_learn.client.admin.RequestProcessor)
shutdown()

To be called by the Client Engine to gracefully shut down the agent.

start()
class RequestProcessor

Bases: object

The RequestProcessor is responsible for processing a request.

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message
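The two-method contract above can be illustrated with a small sketch. This is not the library's code: the Message dataclass and its topic and body fields, the EchoProcessor class, and the build_topic_table helper are all hypothetical stand-ins, since the fields of fed_learn.admin_defs.Message and the agent's internal dispatch are not shown on this page.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Message:
    """Hypothetical stand-in for fed_learn.admin_defs.Message."""
    topic: str
    body: str


class RequestProcessor:
    """Stand-in exposing only the two documented methods."""

    def get_topics(self) -> List[str]:
        raise NotImplementedError

    def process(self, req: Message, app_ctx) -> Message:
        raise NotImplementedError


class EchoProcessor(RequestProcessor):
    """Hypothetical processor that echoes the request body."""

    def get_topics(self) -> List[str]:
        return ["echo"]

    def process(self, req: Message, app_ctx) -> Message:
        return Message(topic=req.topic, body="echo: " + req.body)


def build_topic_table(processors: List[RequestProcessor]) -> Dict[str, RequestProcessor]:
    """Map each declared topic to the processor that handles it (assumed dispatch scheme)."""
    table: Dict[str, RequestProcessor] = {}
    for proc in processors:
        for topic in proc.get_topics():
            table[topic] = proc
    return table


table = build_topic_table([EchoProcessor()])
request = Message(topic="echo", body="ping")
reply = table[request.topic].process(request, app_ctx=None)
print(reply.body)
```

In this sketch a processor only declares its topics and turns one request into one reply; routing by topic is left to whoever holds the table, which is presumably the role register_processor plays for the agent.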

class Sender

Bases: object

The Sender object integrates the agent with the underlying messaging system. Make sure its methods are exception-proof!

retrieve_requests() → [<class ‘fed_learn.admin_defs.Message’>]

Send the message to retrieve pending requests from the server.

send_reply(reply: fed_learn.admin_defs.Message)

Send the reply to the requester.

send_result(message: fed_learn.admin_defs.Message)

Send the processor results to the server.
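The "exception-proof" requirement on Sender methods could be met as in the following sketch. It assumes that a failed retrieval should yield an empty list and that a failed send should be logged and swallowed; the SafeSender and FlakyTransport classes and the transport's fetch and push methods are hypothetical and do not come from fed_learn.

```python
import logging
from typing import List

logger = logging.getLogger("sender_sketch")


class FlakyTransport:
    """Hypothetical messaging layer that always fails, to exercise the error paths."""

    def fetch(self):
        raise ConnectionError("server unreachable")

    def push(self, payload):
        raise ConnectionError("server unreachable")


class SafeSender:
    """Sketch of a sender whose public methods never raise."""

    def __init__(self, transport):
        self.transport = transport

    def retrieve_requests(self) -> List[object]:
        try:
            return list(self.transport.fetch())
        except Exception:
            # Assumption: an empty list means "nothing pending" to the caller.
            logger.exception("retrieve_requests failed")
            return []

    def send_reply(self, reply) -> None:
        try:
            self.transport.push(reply)
        except Exception:
            logger.exception("send_reply failed")

    def send_result(self, message) -> None:
        try:
            self.transport.push(message)
        except Exception:
            logger.exception("send_result failed")


sender = SafeSender(FlakyTransport())
pending = sender.retrieve_requests()  # failure is logged, not raised
sender.send_reply("reply")            # failure is logged, not raised
```

Catching at the Sender boundary keeps a transient network fault from terminating the loop that calls these methods.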

class AdminMessageSender(client_name, root_cert=None, ssl_cert=None, private_key=None, server_args=None, secure=False, is_multi_gpu=False, rank=0)

Bases: fed_learn.client.admin.Sender

retrieve_client_requests(taskname)
retrieve_requests() → [<class ‘fed_learn.admin_defs.Message’>]

Send the message to retrieve pending requests from the server.

send_client_reply(message, taskname)
send_reply(message: fed_learn.admin_defs.Message)

Send the reply to the requester.

send_result(message: fed_learn.admin_defs.Message)

Send the processor results to the server.

class ClientAdminInterface(client, client_name, sender, args, rank, workers=5)

Bases: object

abort_client()
client_status()
delete_run_number(num)
deploy_mmar(data)
do_validate(req: fed_learn.admin_defs.Message)
remove_custom_path()
restart_client()
set_agent(admin_agent)
shutdown_client()
start_client(run_number)
start_mgpu_client(run_number, gpu_number)
wait_process_complete()
wait_training_process_finish()
class ClientExecutor

Bases: object

abort_train(client)

Abort the running client.

Parameters

client – the FL client object.

Returns

N/A

check_status(client)

Check the status of the running client.

Parameters

client – the FL client object.

Returns

status message of the running FL client.

start_mgpu_train(client, args, mmar_root, gpu_number)

Start the FL client training using multiple GPUs.

Parameters
  • client – the FL client object.

  • args – admin command arguments for starting the FL client training.

  • mmar_root – the root folder of the running MMAR.

  • gpu_number – number of GPUs to run FL training

start_train(client, args, mmar_root)

Start the FL client training.

Parameters
  • client – the FL client object.

  • args – admin command arguments for starting the FL client training.

  • mmar_root – the root folder of the running MMAR.

class ProcessExecutor

Bases: fed_learn.client.client_executor.ClientExecutor

abort_train(client)

Abort the running client.

Parameters

client – the FL client object.

Returns

N/A

check_status(client)

Check the status of the running client.

Parameters

client – the FL client object.

Returns

status message of the running FL client.

close()
start_mgpu_train(client, args, mmar_root, gpu_number)

Start the FL client training using multiple GPUs.

Parameters
  • client – the FL client object.

  • args – admin command arguments for starting the FL client training.

  • mmar_root – the root folder of the running MMAR.

  • gpu_number – number of GPUs to run FL training

start_train(client, args, mmar_root)

Start the FL client training.

Parameters
  • client – the FL client object.

  • args – admin command arguments for starting the FL client training.

  • mmar_root – the root folder of the running MMAR.

wait_process_complete(client)
wait_training_process_finish(client)
class ThreadExecutor(client, executor)

Bases: fed_learn.client.client_executor.ClientExecutor

abort_train(client)

Abort the running client.

Parameters

client – the FL client object.

Returns

N/A

check_status(client)

Check the status of the running client.

Parameters

client – the FL client object.

Returns

status message of the running FL client.

start_mgpu_train(client, args, mmar_root, gpu_number)

Start the FL client training using multiple GPUs.

Parameters
  • client – the FL client object.

  • args – admin command arguments for starting the FL client training.

  • mmar_root – the root folder of the running MMAR.

  • gpu_number – number of GPUs to run FL training

start_train(client, args, mmar_root)

Start the FL client training.

Parameters
  • client – the FL client object.

  • args – admin command arguments for starting the FL client training.

  • mmar_root – the root folder of the running MMAR.

listen_command(client, conn)
update_client_properties(client, trainer, cross_site_val_conf=None)

Server and client side model handlers

class ClientModelManager(task_names=None, exclude_vars=None, privacy=None, model_reader_writer=None, model_validator=None, visualizer=None)

Bases: object

Client-side model manager lives on client’s local machine.

Initializes the general parameters of a client model manager.

Parameters

task_names – a list of hashable values, each of which uniquely identifies a remote server

assign_current_model(remote_models)

Save the model from the remote model using the model writer.

Parameters

remote_models – a ModelData message

Returns

True if the local model changed

close()

Clear up everything.

get_best_model()
get_current_model_data(local_model_params=None)

Save the local model values using the model writer.

Parameters

local_model_params – should be a dict of numpy.ndarray weight values.

get_fitter()
model_meta(task_name)

Task metadata; should be consistent with the server’s.

model_vars(task_name)

Task variables; should be a subset of the server’s.

num_local_iterations()

Number of local training iterations per federated round

Returns

1 if the fitter is not set, otherwise the value extracted from the fitter

read_current_model(task_name, type_str='delta_w')

Read variables from the model reader and compute the model diff values.

Returns

a ModelData message to be sent to server
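The "model diff" computation that the default type_str='delta_w' suggests can be sketched as a per-variable subtraction. This is an illustration only: the compute_model_diff helper is hypothetical, the direction of the subtraction (local minus remote) is an assumption, and plain lists stand in for numpy.ndarray so that the snippet has no dependencies.

```python
from typing import Dict, List


def compute_model_diff(
    local_weights: Dict[str, List[float]],
    remote_weights: Dict[str, List[float]],
) -> Dict[str, List[float]]:
    """Return local - remote for every variable, element by element."""
    diff = {}
    for name, local in local_weights.items():
        remote = remote_weights[name]
        diff[name] = [l - r for l, r in zip(local, remote)]
    return diff


# Weights received from the server at the start of the round.
remote = {"conv/kernel": [1.0, 2.5]}
# Weights after local training.
local = {"conv/kernel": [1.5, 2.0]}

delta_w = compute_model_diff(local, remote)
print(delta_w)
```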

set_fitter(fitter)
train()

Proxy API to start fitter train.

update_current_model(local_model_params)

Save the local_model values by model writer.

Parameters

local_model – Should be a dict of numpy.ndarray with weights values.

validate_model(cross_site_val_configer, checkpoint)
class ClientStatus

Bases: object

TRAINING_EXCEPTION = 4
TRAINING_NOT_STARTED = 0
TRAINING_STARTED = 2
TRAINING_STARTING = 1
TRAINING_STOPPED = 3
status_messages = {0: 'training not started', 1: 'training starting', 2: 'training started', 3: 'training stopped', 4: 'training exception'}
get_status_message(status)
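One plausible reading of how the constants, the status_messages table, and get_status_message fit together is sketched below. The constants and messages are copied from the listing above; the classmethod form and the None result for an unknown code are assumptions, not the library's confirmed behavior.

```python
class ClientStatus:
    TRAINING_NOT_STARTED = 0
    TRAINING_STARTING = 1
    TRAINING_STARTED = 2
    TRAINING_STOPPED = 3
    TRAINING_EXCEPTION = 4

    status_messages = {
        TRAINING_NOT_STARTED: "training not started",
        TRAINING_STARTING: "training starting",
        TRAINING_STARTED: "training started",
        TRAINING_STOPPED: "training stopped",
        TRAINING_EXCEPTION: "training exception",
    }

    @classmethod
    def get_status_message(cls, status):
        # Assumption: unknown codes yield None instead of raising.
        return cls.status_messages.get(status)


message = ClientStatus.get_status_message(ClientStatus.TRAINING_STARTED)
print(message)
```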
class Communicator(model_manager, ssl_args=None, secure_train=False, retry_timeout=30, pre_processors: [ ] = None , post_processors: [ ] = None , client_state_processors: [ ] = None , compression=None)

Bases: object

client_registration(uid, servers, task_name)

Client’s meta data used to authenticate and communicate.

Returns

a ClientLogin message.

getModel(servers, task_name, token, app_ctx)

Get registered with the remote server via channel, and fetch the server’s model parameters.

Parameters

task_name – server identifier string

Returns

a CurrentModel message from server

get_client_state(task_name, token, app_ctx=None)

Client’s meta data used to authenticate and communicate.

Returns

a ClientState message.

get_validation_models(uid, servers, task_name, token)

Send a request to server to get models for cross validation.

Parameters

task_name – server identifier string

Returns

a CurrentModel message from server

grpc_error_handler(service, grpc_error, action, start_time, retry, verbose=False)

Handle gRPC exceptions.
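The start_time and retry arguments here, together with the retry_timeout constructor argument, suggest a retry loop bounded by elapsed time. The sketch below shows that general pattern under stated assumptions: call_with_retry and its pause parameter are hypothetical, and the built-in ConnectionError stands in for a gRPC error so that the snippet runs without the grpc package.

```python
import time


def call_with_retry(action, retry_timeout, pause=0.01):
    """Call action() until it succeeds or retry_timeout seconds have elapsed.

    Returns (result, attempts). Re-raises the last error once the budget is spent.
    """
    start_time = time.monotonic()
    attempts = 0
    while True:
        attempts += 1
        try:
            return action(), attempts
        except ConnectionError:
            if time.monotonic() - start_time >= retry_timeout:
                raise
            time.sleep(pause)  # brief pause before the next attempt


calls = {"n": 0}


def flaky_fetch():
    """Hypothetical remote call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporarily unavailable")
    return "model"


result, attempts = call_with_retry(flaky_fetch, retry_timeout=5)
print(result, attempts)
```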

quit_remote(servers, task_name, token)

Sending the last message to the server before leaving.

Parameters

task_name – server task identifier

Returns

server’s reply to the last message

send_heartbeat(servers, task_name, token)
set_up_channel(channel_dict, token=None)

Connect client to the server.

Parameters
  • channel_dict – grpc channel parameters

  • token – client token

Returns

an initialised grpc channel

submitUpdate(servers, task_name, token, fl_ctx: fed_learn.model_meta.FLContext, app_ctx, uid)
submit_best_local_model(uid, servers, task_name, token, disable=False)

Submit the best local model to server.

Parameters
  • task_name – server task identifier

  • disable – Send an empty message (signals that the client is not participating).

Returns

server’s reply to the last message.

submit_cross_site_validation_results(uid, servers, task_name, token, client_names, client_metrics)

Submit results of cross validation to server.

Parameters
  • client_names – List of client names

  • client_metrics – List of dicts. Each dict maps metric names to values.

class DataAssembler

Bases: object

get_best_local_model(model_manager, task_name, client_state, disable=False)

Creates a message containing best model files.

get_contribution_data(model_manager, task_name, client_state, pre_processors: [ ], fl_ctx: fed_learn.model_meta.FLContext, app_ctx )

The client of the federated training process

class FederatedClient(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy=None, retry_timeout=30, cross_site_validate=False, model_reader_writer: fed_learn.components.model_reader_writer.ModelProcessor = None, model_validator: fed_learn.components.model_validator.ModelValidator = None, pre_processors: [ ] = None , post_processors: [ ] = None , client_state_processors: [ ] = None , req_processors=None, handlers: [ ] = None , compression=None)

Bases: fed_learn.client.fed_client_base.FederatedClientBase

Federated client-side implementation.

admin_run(fitter)
cross_validation(client_local_rank)
federated_step(fitter)

Run a federated step.

run(fitter)

Run the client-side as a thread

run_federated_steps(fitter)

Keep running federated steps.
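How federated_step and run_federated_steps might relate to the pull_models and push_models methods of the base class is sketched below. The pull, train, push ordering, the boolean return values, and the SketchClient class itself are assumptions made for illustration; the actual control flow is not documented on this page.

```python
class SketchClient:
    """Hypothetical client; only the method names follow the documented API."""

    def __init__(self, rounds):
        self.rounds_left = rounds
        self.log = []

    def pull_models(self):
        self.log.append("pull")
        # Assumption: a falsy result means the server has no more rounds.
        return self.rounds_left > 0

    def push_models(self):
        self.log.append("push")

    def federated_step(self, fitter):
        """One round: fetch the global model, train locally, upload the result."""
        if not self.pull_models():
            return False
        fitter()
        self.push_models()
        self.rounds_left -= 1
        return True

    def run_federated_steps(self, fitter):
        """Keep running steps until one of them reports that training is over."""
        while self.federated_step(fitter):
            pass


client = SketchClient(rounds=2)
client.run_federated_steps(lambda: client.log.append("train"))
print(client.log)
```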

The client of the federated training process

class FederatedClientBase(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy=None, retry_timeout=30, cross_site_validate=False, model_reader_writer: fed_learn.components.model_reader_writer.ModelProcessor = None, model_validator: fed_learn.components.model_validator.ModelValidator = None, pre_processors: [ ] = None , post_processors: [ ] = None , client_state_processors: [ ] = None , req_processors=None, handlers: [ ] = None , compression=None)

Bases: object

Federated client-side base implementation. This class provides the utility functions used by both FedClient and FedClientLite.

property app_context
check_progress(remote_models)
client_register(task_name)
close()

Quit the remote federated server, close the local session.

fetch_remote_model(task_name)

Get registered with the remote server via channel, and fetch the server’s model parameters.

Parameters

task_name – server identifier string

Returns

a CurrentModel message from server

get_validation_models(task_name)

Gets the best models of other clients from server to run cross-validation.

heartbeat()

Sends a heartbeat from the client to the server.

pull_models()

Fetch remote models and update the local client’s session.

push_best_model(task_name)

Pushes the best local model to server. This function sends a LocalModel message.

Parameters

task_name – Should be one of the keys to self.servers

push_best_models()

Push the best local model to multiple servers.

push_models()

Push the local model to multiple servers.

push_remote_model(task_name)

Read local model and push to self.server[task_name] channel. This function makes and sends a Contribution Message.

Parameters

task_name – should be one of the keys of self.server

quit_remote(task_name)

Sending the last message to the server before leaving.

Parameters

task_name – server task identifier

Returns

server’s reply to the last message

register()

Push the local model to multiple servers.

run_cross_site_validation()

Run cross validation

run_heartbeat()

Periodically runs the heartbeat.
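A periodic heartbeat of this kind is commonly run on a background thread that can be stopped cleanly. The sketch below shows that pattern with the standard threading module; the HeartbeatRunner class, its interval parameter, and the callback are hypothetical, and the interval the real client uses is not stated here.

```python
import threading
import time


class HeartbeatRunner:
    """Calls send_heartbeat every `interval` seconds until stopped."""

    def __init__(self, send_heartbeat, interval):
        self._send = send_heartbeat
        self._interval = interval
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns True as soon as stop() sets the event, ending the loop.
        while not self._stop.wait(self._interval):
            self._send()

    def stop(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()


beats = []
runner = HeartbeatRunner(lambda: beats.append(time.monotonic()), interval=0.01)
runner.start()
time.sleep(0.1)  # let a few heartbeats go out
runner.stop()
print(len(beats) >= 1)
```

Waiting on an Event instead of calling time.sleep lets stop() interrupt the wait immediately, so shutdown does not have to wait out a full interval.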

send_heartbeat(task_name)
start_heartbeat()
submit_cross_site_validation_results(params)

The client of the federated training process

class DirectFederatedClient(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy=None, retry_timeout=30, train_context=None, model_reader_writer: fed_learn.components.model_reader_writer.ModelProcessor = None, pre_processors: [ ] = None , post_processors: [ ] = None , client_state_processors: [ ] = None , req_processors=None, handlers: [ ] = None , train_options=None)

Bases: fed_learn.client.fed_client_base.FederatedClientBase

Federated client-side implementation

property app_context
close_connect()

End the connection to the server. Should only be called by the main thread in the multi-GPU case.

connect_to_server(options={'heart_beat': True})

Connect to the server. Should only be called by the main thread in the multi-GPU case.

in_training_progress()
pull_models_from_server(options=None)

Pull models from the server. Should only be called by the main thread in the multi-GPU case.

Parameters

options – options when pulling model from server

Returns

model data in an intermediate format (dict)

push_models_to_server(model_data, options=None)

Push models to the server. Should only be called by the main thread in the multi-GPU case.

Parameters

model_data – dict-formatted model data to push to the server

privacy preserving modules

class LaplacianProtocol(fraction=0.1, epsilon=1000, gamma=0.001, tau=0.0001)

Bases: fed_learn.client.fed_privacy.PrivacyProtocol

Implementation of the “random with thresholding” privacy-preserving policy.

Shokri and Shmatikov, Privacy-preserving deep learning, CCS ’15

apply(model_diff, train_ctx=None)

differentially private DSSGD.

Parameters
  • model_diff – delta w after a round of local training

  • train_ctx – TLT training context.

Returns

diff model to be uploaded to the server

class PercentileProtocol(percentile=10, gamma=0.01)

Bases: fed_learn.client.fed_privacy.PrivacyProtocol

Implementation of the “largest percentile to share” privacy-preserving policy.

Shokri and Shmatikov, Privacy-preserving deep learning, CCS ’15

apply(model_diff, train_ctx=None)

Compute the percentile of the absolute delta_W and share only the parameters whose absolute delta_W is greater than the percentile value.

TODO: dump the values for visualisation.

Parameters
  • model_diff – model after a round of local training

  • train_ctx – unused param.

Returns

model data to be uploaded to the server
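The percentile rule described for this method can be sketched in a few lines. This is a simplified illustration, not the library's implementation: percentile_share and percentile_value are hypothetical helpers, plain lists stand in for numpy arrays, "not shared" is modeled by zeroing the entry, and the gamma argument of the constructor is not modeled because its meaning is not documented here.

```python
from typing import Dict, List


def percentile_value(values: List[float], q: float) -> float:
    """q-th percentile (0-100) with linear interpolation between ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100.0
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def percentile_share(model_diff: Dict[str, List[float]], percentile: float) -> Dict[str, List[float]]:
    """Keep entries whose absolute value exceeds the percentile cutoff; zero the rest."""
    all_abs = [abs(v) for values in model_diff.values() for v in values]
    cutoff = percentile_value(all_abs, percentile)
    return {
        name: [v if abs(v) > cutoff else 0.0 for v in values]
        for name, values in model_diff.items()
    }


diff = {"w": [0.1, -0.4, 0.2, -0.3, 0.5]}
shared = percentile_share(diff, percentile=50)
print(shared)
```

With the cutoff at the 50th percentile of the absolute values (0.3 here), only the two largest-magnitude updates survive.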

class PrivacyProtocol

Bases: abc.ABC

abstract apply(model_diff, train_ctx)

compute model data to be uploaded to the server.

Returns

model dictionary {var_name: numpy_var_value}
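A custom protocol would subclass this abstract base and return a dictionary keyed by variable name. The sketch below mirrors the documented interface with a stand-in base class; the DropPrefixProtocol class and its prefix rule are invented for illustration and are not part of fed_learn.

```python
from abc import ABC, abstractmethod


class PrivacyProtocol(ABC):
    """Stand-in mirroring the documented abstract interface."""

    @abstractmethod
    def apply(self, model_diff, train_ctx):
        """Return the model dictionary {var_name: value} to upload."""


class DropPrefixProtocol(PrivacyProtocol):
    """Hypothetical protocol that withholds variables whose names start with a prefix."""

    def __init__(self, prefix):
        self.prefix = prefix

    def apply(self, model_diff, train_ctx=None):
        return {
            name: value
            for name, value in model_diff.items()
            if not name.startswith(self.prefix)
        }


protocol = DropPrefixProtocol(prefix="head/")
uploaded = protocol.apply({"body/w": [0.1, 0.2], "head/w": [0.3]})
print(uploaded)
```

Because apply is abstract, the base class itself cannot be instantiated; only subclasses that implement apply can be passed where a privacy protocol is expected.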

class SVTProtocol(fraction=0.1, epsilon=0.1, noise_var=0.1, gamma=1e-05, tau=1e-06)

Bases: fed_learn.client.fed_privacy.PrivacyProtocol

Implementation of the standard SVT differential privacy algorithm.

W. Li et al., “Privacy-preserving Federated Brain Tumor Segmentation”, arXiv preprint arXiv:1910.00962 (2019)

apply(model_diff, train_ctx=None)

differentially private DSSGD.

Parameters
  • model_diff – delta w after a round of local training

  • train_ctx – TLT training context.

Returns

diff model to be uploaded to the server

class TestFederatedClient(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy_params=None, req_processors=None)

Bases: fed_learn.client.fed_client.FederatedClient

client_login(task_name)
class ShellCommandProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class SysInfoProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class AbortClientProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class ClientStatusProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class DeleteRunNumberProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class DeployProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class RestartClientProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class ShutdownClientProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class StartClientMGpuProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class StartClientProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

class ValidateRequestProcessor

Bases: fed_learn.client.admin.RequestProcessor

get_topics() → [<class ‘str’>]

Get topics that this processor will handle.

Returns

list of topics

process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message

Called to process the specified request.

Returns

a reply message

© Copyright 2020, NVIDIA. Last updated on Feb 2, 2023.