fed_learn.client package
-
class FedAdminAgent(client_name, sender: fed_learn.client.admin.Sender, app_ctx, req_poll_interval=0.5, process_poll_interval=0.1)
Bases: object
-
register_processor(processor: fed_learn.client.admin.RequestProcessor)
-
shutdown() Called by the Client Engine to shut down the agent gracefully.
-
start()
-
-
class RequestProcessor
Bases: object
The RequestProcessor is responsible for processing a request.
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
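The two documented methods suggest a topic-based dispatch. The following is a minimal sketch of that idea, not the library's code: `Message`, `EchoProcessor`, and `dispatch` are hypothetical stand-ins, and the real `fed_learn.admin_defs.Message` constructor may look different.

```python
from typing import List, Optional


class Message:
    """Hypothetical stand-in for fed_learn.admin_defs.Message."""

    def __init__(self, topic: str, body: str):
        self.topic = topic
        self.body = body


class RequestProcessor:
    """Stand-in base class mirroring the two documented methods."""

    def get_topics(self) -> List[str]:
        raise NotImplementedError

    def process(self, req: Message, app_ctx) -> Message:
        raise NotImplementedError


class EchoProcessor(RequestProcessor):
    """Hypothetical processor that echoes the request body back."""

    def get_topics(self) -> List[str]:
        return ["echo"]

    def process(self, req: Message, app_ctx) -> Message:
        return Message(topic=req.topic, body="echo: " + req.body)


def dispatch(processors, req: Message, app_ctx=None) -> Optional[Message]:
    """Route a request to the first processor that lists its topic."""
    for processor in processors:
        if req.topic in processor.get_topics():
            return processor.process(req, app_ctx)
    return None  # no processor registered for this topic


reply = dispatch([EchoProcessor()], Message("echo", "hello"))
print(reply.body)
```

In the real package, registration presumably goes through `FedAdminAgent.register_processor`, which takes the place of the `dispatch` helper used here.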
-
-
class Sender
Bases: object
The Sender object integrates the agent with the underlying messaging system. Make sure its methods are exception-proof!
-
retrieve_requests() → [fed_learn.admin_defs.Message] Send the message to retrieve pending requests from the server.
-
send_reply(reply: fed_learn.admin_defs.Message) Send the reply to the requester.
-
send_result(message: fed_learn.admin_defs.Message) Send the processor results to the server.
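One way to read "exception-proof" is that no Sender method lets a transport error escape to the agent. The sketch below illustrates that reading under stated assumptions: `FlakyTransport` and `SafeSender` are hypothetical names, and returning an empty list on failure is a design choice made for this example, not documented behaviour.

```python
import logging

logger = logging.getLogger("sender_sketch")


class FlakyTransport:
    """Hypothetical messaging layer that fails on every call."""

    def fetch(self):
        raise ConnectionError("server unreachable")

    def push(self, message):
        raise ConnectionError("server unreachable")


class SafeSender:
    """Sender-like class whose methods never raise."""

    def __init__(self, transport):
        self.transport = transport

    def retrieve_requests(self):
        try:
            return self.transport.fetch()
        except Exception as exc:
            # Swallow the error and report "nothing pending" instead.
            logger.warning("retrieve_requests failed: %s", exc)
            return []

    def send_reply(self, reply):
        self._push(reply, "send_reply")

    def send_result(self, message):
        self._push(message, "send_result")

    def _push(self, message, action):
        try:
            self.transport.push(message)
        except Exception as exc:
            logger.warning("%s failed: %s", action, exc)


sender = SafeSender(FlakyTransport())
pending = sender.retrieve_requests()  # logs a warning, does not raise
sender.send_reply("reply payload")    # logs a warning, does not raise
```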
-
-
class AdminMessageSender(client_name, root_cert=None, ssl_cert=None, private_key=None, server_args=None, secure=False, is_multi_gpu=False, rank=0)
Bases: fed_learn.client.admin.Sender
-
retrieve_client_requests(taskname)
-
retrieve_requests() → [fed_learn.admin_defs.Message] Send the message to retrieve pending requests from the server.
-
send_client_reply(message, taskname)
-
send_reply(message: fed_learn.admin_defs.Message) Send the reply to the requester.
-
send_result(message: fed_learn.admin_defs.Message) Send the processor results to the server.
-
-
class ClientAdminInterface(client, client_name, sender, args, rank, workers=5)
Bases: object
-
abort_client()
-
client_status()
-
delete_run_number(num)
-
deploy_mmar(data)
-
do_validate(req: fed_learn.admin_defs.Message)
-
remove_custom_path()
-
restart_client()
-
set_agent(admin_agent)
-
shutdown_client()
-
start_client(run_number)
-
start_mgpu_client(run_number, gpu_number)
-
wait_process_complete()
-
wait_training_process_finish()
-
-
class ClientExecutor
Bases: object
-
abort_train(client) Abort the running client.
- Parameters
client – the FL client object.
-
check_status(client) Check the status of the running client.
- Parameters
client – the FL client object.
- Returns
running FL client status message.
-
start_mgpu_train(client, args, mmar_root, gpu_number) Start the FL client training using multiple GPUs.
- Parameters
client – the FL client object.
args – admin command arguments for starting the FL client training.
mmar_root – the root folder of the running MMAR.
gpu_number – number of GPUs to run FL training.
-
start_train(client, args, mmar_root) Start the FL client training.
- Parameters
client – the FL client object.
args – admin command arguments for starting the FL client training.
mmar_root – the root folder of the running MMAR.
-
-
class ProcessExecutor
Bases: fed_learn.client.client_executor.ClientExecutor
-
abort_train(client) Abort the running client.
- Parameters
client – the FL client object.
-
check_status(client) Check the status of the running client.
- Parameters
client – the FL client object.
- Returns
running FL client status message.
-
close()
-
start_mgpu_train(client, args, mmar_root, gpu_number) Start the FL client training using multiple GPUs.
- Parameters
client – the FL client object.
args – admin command arguments for starting the FL client training.
mmar_root – the root folder of the running MMAR.
gpu_number – number of GPUs to run FL training.
-
start_train(client, args, mmar_root) Start the FL client training.
- Parameters
client – the FL client object.
args – admin command arguments for starting the FL client training.
mmar_root – the root folder of the running MMAR.
-
wait_process_complete(client)
-
wait_training_process_finish(client)
-
-
class ThreadExecutor(client, executor)
Bases: fed_learn.client.client_executor.ClientExecutor
-
abort_train(client) Abort the running client.
- Parameters
client – the FL client object.
-
check_status(client) Check the status of the running client.
- Parameters
client – the FL client object.
- Returns
running FL client status message.
-
start_mgpu_train(client, args, mmar_root, gpu_number) Start the FL client training using multiple GPUs.
- Parameters
client – the FL client object.
args – admin command arguments for starting the FL client training.
mmar_root – the root folder of the running MMAR.
gpu_number – number of GPUs to run FL training.
-
start_train(client, args, mmar_root) Start the FL client training.
- Parameters
client – the FL client object.
args – admin command arguments for starting the FL client training.
mmar_root – the root folder of the running MMAR.
-
-
listen_command(client, conn)
-
update_client_properties(client, trainer, cross_site_val_conf=None)
Server and client side model handlers
-
class ClientModelManager(task_names=None, exclude_vars=None, privacy=None, model_reader_writer=None, model_validator=None, visualizer=None)
Bases: object
The client-side model manager lives on the client’s local machine.
Initialization of general params inside a client model manager.
- Parameters
task_names – a list of hashables, each uniquely identifying a remote server
-
assign_current_model(remote_models) Save model from remote model by model writer.
- Parameters
remote_models – a ModelData message
- Returns
True if the local model changed
-
close() Clear up everything.
-
get_best_model()
-
get_current_model_data(local_model_params=None) Save the local model values using the model writer.
- Parameters
local_model_params – should be a dict of numpy.ndarray holding the weight values.
-
get_fitter()
-
model_meta(task_name) Task metadata; should be consistent with the server’s.
-
model_vars(task_name) Task variables; should be a subset of the server’s.
-
num_local_iterations() Number of local training iterations per federated round
- Returns
1 if fitter not set else value extracted from fitter
-
read_current_model(task_name, type_str='delta_w') Read variables from the model reader and compute the model diff values.
- Returns
a ModelData message to be sent to server
-
set_fitter(fitter)
-
train() Proxy API to start fitter train.
-
update_current_model(local_model_params) Save the local model values using the model writer.
- Parameters
local_model_params – should be a dict of numpy.ndarray holding the weight values.
-
validate_model(cross_site_val_configer, checkpoint)
-
class ClientStatus
Bases: object
-
TRAINING_EXCEPTION = 4
-
TRAINING_NOT_STARTED = 0
-
TRAINING_STARTED = 2
-
TRAINING_STARTING = 1
-
TRAINING_STOPPED = 3
-
status_messages = {0: 'training not started', 1: 'training starting', 2: 'training started', 3: 'training stopped', 4: 'training exception'}
-
-
get_status_message(status)
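The constants and the `status_messages` table above can be mirrored in a few lines. This is a sketch rather than the package's implementation: the values are copied from this page, but the fallback text for an unknown code is an assumption, since the behaviour of the real `get_status_message` for unknown codes is not documented here.

```python
class ClientStatus:
    """Mirror of the documented status codes and messages."""

    TRAINING_NOT_STARTED = 0
    TRAINING_STARTING = 1
    TRAINING_STARTED = 2
    TRAINING_STOPPED = 3
    TRAINING_EXCEPTION = 4

    status_messages = {
        TRAINING_NOT_STARTED: "training not started",
        TRAINING_STARTING: "training starting",
        TRAINING_STARTED: "training started",
        TRAINING_STOPPED: "training stopped",
        TRAINING_EXCEPTION: "training exception",
    }


def get_status_message(status):
    """Look up the message for a status code."""
    # Assumption: unknown codes map to a generic message instead of raising.
    return ClientStatus.status_messages.get(status, "unknown status")


print(get_status_message(ClientStatus.TRAINING_STARTED))
```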
-
class Communicator(model_manager, ssl_args=None, secure_train=False, retry_timeout=30, pre_processors: [] = None, post_processors: [] = None, client_state_processors: [] = None, compression=None)
Bases: object
-
client_registration(uid, servers, task_name) Client’s meta data used to authenticate and communicate.
- Returns
a ClientLogin message.
-
getModel(servers, task_name, token, app_ctx) Get registered with the remote server via channel, and fetch the server’s model parameters.
- Parameters
task_name – server identifier string
- Returns
a CurrentModel message from server
-
get_client_state(task_name, token, app_ctx=None) Client’s meta data used to authenticate and communicate.
- Returns
a ClientState message.
-
get_validation_models(uid, servers, task_name, token) Send a request to server to get models for cross validation.
- Parameters
task_name – server identifier string
- Returns
a CurrentModel message from server
-
grpc_error_handler(service, grpc_error, action, start_time, retry, verbose=False) Handle gRPC exceptions.
-
quit_remote(servers, task_name, token) Sending the last message to the server before leaving.
- Parameters
task_name – server task identifier
- Returns
server’s reply to the last message
-
send_heartbeat(servers, task_name, token)
-
set_up_channel(channel_dict, token=None) Connect client to the server.
- Parameters
channel_dict – grpc channel parameters
token – client token
- Returns
an initialised grpc channel
-
submitUpdate(servers, task_name, token, fl_ctx: fed_learn.model_meta.FLContext, app_ctx, uid)
-
submit_best_local_model(uid, servers, task_name, token, disable=False) Submit the best local model to server.
- Parameters
task_name – server task identifier
disable – Send an empty message (signals not participating.)
- Returns
server’s reply to the last message.
-
submit_cross_site_validation_results(uid, servers, task_name, token, client_names, client_metrics) Submit results of cross validation to server.
- Parameters
client_names – List of client names
client_metrics – List of dicts. Each dict is set of metric names and values.
-
-
class DataAssembler
Bases: object
-
get_best_local_model(model_manager, task_name, client_state, disable=False) Creates a message containing the best model files.
-
get_contribution_data(model_manager, task_name, client_state, pre_processors: [], fl_ctx: fed_learn.model_meta.FLContext, app_ctx)
-
The client of the federated training process
-
class FederatedClient(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy=None, retry_timeout=30, cross_site_validate=False, model_reader_writer: fed_learn.components.model_reader_writer.ModelProcessor = None, model_validator: fed_learn.components.model_validator.ModelValidator = None, pre_processors: [] = None, post_processors: [] = None, client_state_processors: [] = None, req_processors=None, handlers: [] = None, compression=None)
Bases: fed_learn.client.fed_client_base.FederatedClientBase
Federated client-side implementation.
-
admin_run(fitter)
-
cross_validation(client_local_rank)
-
federated_step(fitter) Run a federated step.
-
run(fitter) Run the client-side as a thread
-
run_federated_steps(fitter) Keep running federated steps.
-
The client of the federated training process
-
class FederatedClientBase(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy=None, retry_timeout=30, cross_site_validate=False, model_reader_writer: fed_learn.components.model_reader_writer.ModelProcessor = None, model_validator: fed_learn.components.model_validator.ModelValidator = None, pre_processors: [] = None, post_processors: [] = None, client_state_processors: [] = None, req_processors=None, handlers: [] = None, compression=None)
Bases: object
Federated client-side base implementation. This class provides the utility functions used by both FedClient and FedClientLite.
-
property app_context
-
check_progress(remote_models)
-
client_register(task_name)
-
close() Quit the remote federated server, close the local session.
-
fetch_remote_model(task_name) Get registered with the remote server via channel, and fetch the server’s model parameters.
- Parameters
task_name – server identifier string
- Returns
a CurrentModel message from server
-
get_validation_models(task_name) Gets the best models of other clients from server to run cross-validation.
-
heartbeat() Sends a heartbeat from the client to the server.
-
pull_models() Fetch remote models and update the local client’s session.
-
push_best_model(task_name) Pushes the best local model to server. This function sends a LocalModel message.
- Parameters
task_name – Should be one of the keys to self.servers
-
push_best_models() Push the best local model to multiple servers.
-
push_models() Push the local model to multiple servers.
-
push_remote_model(task_name) Read local model and push to self.server[task_name] channel. This function makes and sends a Contribution Message.
- Parameters
task_name – should be one of the keys of self.server
-
quit_remote(task_name) Sending the last message to the server before leaving.
- Parameters
task_name – server task identifier
- Returns
server’s reply to the last message
-
register() Push the local model to multiple servers.
-
run_cross_site_validation() Run cross validation
-
run_heartbeat() Periodically runs the heartbeat.
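A periodic heartbeat is commonly run on a background thread that wakes at a fixed interval. The sketch below shows one such pattern, assuming a thread-plus-event design. `HeartbeatRunner`, its `interval` argument, and the callback are hypothetical and do not describe how `run_heartbeat` or `start_heartbeat` are actually implemented.

```python
import threading
import time


class HeartbeatRunner:
    """Calls send_heartbeat every `interval` seconds until stopped."""

    def __init__(self, send_heartbeat, interval):
        self._send = send_heartbeat
        self._interval = interval
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns True once stop() sets the event, ending the loop.
        while not self._stop.wait(self._interval):
            try:
                self._send()
            except Exception:
                pass  # a failed heartbeat should not kill the thread

    def stop(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()


beats = []
runner = HeartbeatRunner(lambda: beats.append(time.time()), interval=0.01)
runner.start()
time.sleep(0.2)  # let a few heartbeats fire
runner.stop()
print("heartbeats sent:", len(beats))
```

Waiting on the event instead of calling `time.sleep` inside the loop lets `stop()` interrupt the wait immediately.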
-
send_heartbeat(task_name)
-
start_heartbeat()
-
submit_cross_site_validation_results(params)
-
property
The client of the federated training process
-
class DirectFederatedClient(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy=None, retry_timeout=30, train_context=None, model_reader_writer: fed_learn.components.model_reader_writer.ModelProcessor = None, pre_processors: [] = None, post_processors: [] = None, client_state_processors: [] = None, req_processors=None, handlers: [] = None, train_options=None)
Bases: fed_learn.client.fed_client_base.FederatedClientBase
Federated client-side implementation.
-
property app_context
-
close_connect() End the connection to the server. Should only be called by the main thread in the multi-GPU case.
-
connect_to_server(options={'heart_beat': True}) Connect to the server. Should only be called by the main thread in the multi-GPU case.
-
in_training_progress()
-
pull_models_from_server(options=None) Pull models from the server. Should only be called by the main thread in the multi-GPU case.
- Parameters
options – options when pulling the model from the server
- Returns
model data in an intermediate format (dict)
-
push_models_to_server(model_data, options=None) Push models to the server. Should only be called by the main thread in the multi-GPU case.
- Parameters
model_data – dict-formatted model data to push to the server
-
property
privacy preserving modules
-
class LaplacianProtocol(fraction=0.1, epsilon=1000, gamma=0.001, tau=0.0001)
Bases: fed_learn.client.fed_privacy.PrivacyProtocol
Implementation of the “random with thresholding” privacy-preserving policy.
Shokri and Shmatikov, Privacy-preserving deep learning, CCS ’15
-
apply(model_diff, train_ctx=None) Differentially private DSSGD.
- Parameters
model_diff – delta w after a round of local training
train_ctx – TLT training context.
- Returns
diff model to be uploaded to the server
-
-
class PercentileProtocol(percentile=10, gamma=0.01)
Bases: fed_learn.client.fed_privacy.PrivacyProtocol
Implementation of the “largest percentile to share” privacy-preserving policy.
Shokri and Shmatikov, Privacy-preserving deep learning, CCS ’15
-
apply(model_diff, train_ctx=None) Compute the percentile on the absolute delta_W and share only the params whose absolute delta_W is greater than the percentile value.
- Parameters
model_diff – model after a round of local training
train_ctx – unused param.
- Returns
model data to be uploaded to the server
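The "largest percentile to share" idea can be sketched in plain Python. This is an illustration only, not the package's implementation. Lists stand in for numpy arrays so the example has no dependencies. It assumes the cutoff is computed over all variables at once and that withheld entries are zeroed, neither of which this page confirms. The `gamma` argument is not modelled, and `percentile_cutoff` and `share_largest` are hypothetical names.

```python
import math


def percentile_cutoff(values, percentile):
    """Nearest-rank percentile of a non-empty list of numbers."""
    ordered = sorted(values)
    rank = max(1, math.ceil(percentile / 100.0 * len(ordered)))
    return ordered[rank - 1]


def share_largest(model_diff, percentile=10):
    """Keep entries whose magnitude exceeds the cutoff; zero the rest."""
    magnitudes = [abs(v) for values in model_diff.values() for v in values]
    cutoff = percentile_cutoff(magnitudes, percentile)
    return {
        name: [v if abs(v) > cutoff else 0.0 for v in values]
        for name, values in model_diff.items()
    }


model_diff = {"w": [0.5, -0.01, 0.2, -0.9], "b": [0.05]}
shared = share_largest(model_diff, percentile=50)
print(shared)  # {'w': [0.5, 0.0, 0.0, -0.9], 'b': [0.0]}
```

With five magnitudes and `percentile=50`, the nearest-rank cutoff is the third smallest magnitude (0.2), so only 0.5 and -0.9 are shared.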
-
-
class PrivacyProtocol
Bases: abc.ABC
-
abstract apply(model_diff, train_ctx) Compute model data to be uploaded to the server.
- Returns
model dictionary {var_name: numpy_var_value}
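Because `PrivacyProtocol` derives from `abc.ABC` and declares `apply` as abstract, a custom policy only needs to subclass it and return a dictionary keyed by variable name. The sketch below uses a stand-in base class so that it runs without the package. `ClipProtocol` is a hypothetical example policy, not one shipped with `fed_learn`, and lists stand in for numpy arrays.

```python
from abc import ABC, abstractmethod


class PrivacyProtocol(ABC):
    """Stand-in for fed_learn.client.fed_privacy.PrivacyProtocol."""

    @abstractmethod
    def apply(self, model_diff, train_ctx):
        """Return the model data to upload, keyed by variable name."""


class ClipProtocol(PrivacyProtocol):
    """Hypothetical policy that clips each value to [-limit, limit]."""

    def __init__(self, limit=1.0):
        self.limit = limit

    def apply(self, model_diff, train_ctx=None):
        return {
            name: [max(-self.limit, min(self.limit, v)) for v in values]
            for name, values in model_diff.items()
        }


clipped = ClipProtocol(limit=0.5).apply({"w": [0.2, -3.0, 0.9]})
print(clipped)  # {'w': [0.2, -0.5, 0.5]}
```

Instantiating the abstract base directly raises `TypeError`, which is how `abc` enforces that subclasses provide `apply`.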
-
abstract
-
class SVTProtocol(fraction=0.1, epsilon=0.1, noise_var=0.1, gamma=1e-05, tau=1e-06)
Bases: fed_learn.client.fed_privacy.PrivacyProtocol
Implementation of the standard SVT differential privacy algorithm.
-
apply(model_diff, train_ctx=None) Differentially private DSSGD.
- Parameters
model_diff – delta w after a round of local training
train_ctx – TLT training context.
- Returns
diff model to be uploaded to the server
-
-
class TestFederatedClient(client_id, client_args, secure_train, server_args=None, exclude_vars=None, privacy_params=None, req_processors=None)
Bases: fed_learn.client.fed_client.FederatedClient
-
client_login(task_name)
-
-
class ShellCommandProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class SysInfoProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class AbortClientProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class ClientStatusProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class DeleteRunNumberProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class DeployProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class RestartClientProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class ShutdownClientProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class StartClientMGpuProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class StartClientProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-
-
class ValidateRequestProcessor
Bases: fed_learn.client.admin.RequestProcessor
-
get_topics() → [str] Get the topics that this processor handles.
- Returns
list of topics
-
process(req: fed_learn.admin_defs.Message, app_ctx) → fed_learn.admin_defs.Message Called to process the specified request.
- Returns
a reply message
-