P2P Sync
Documentation of the p2p_sync submodule's interface.
Index
Public Interface
ACiD.sync_process — Function
Process run by every worker in the background.
This process allows each worker to communicate with the "orchestrating" master process (hosted by worker 0). Its goal is to signal to the master process when this worker is available for communication, and to gather from the master process the rank of the peer with which we are supposed to communicate. Once received, this information rank_other is sent to the p2p_averaging process run in parallel on this worker, so that the p2p averaging process knows with which worker to communicate next. This process communicates with one of the world_size "listen_given_rank" processes hosted at worker 0, which runs world_size + 1 processes in parallel:
- one to listen to the sync_process run by each worker,
- one "orchestrating" process, dedicated to making pairs of workers.
So, in total, there are `2*world_size + 1` processes that need to communicate with each other (only sending ints), which is why a process_group using the gloo backend is initialized here.
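As an illustration, below is a minimal sketch of how such a gloo group could be set up. The helper name init_sync_group, the rank layout (sync_processes at ranks 0 to world_size - 1, the "listen_given_rank" processes at ranks world_size to 2*world_size - 1, the orchestrator at rank 2*world_size), and the rendezvous address are assumptions made for the example, not part of the ACiD code.

```python
import torch.distributed as dist

def init_sync_group(group_rank: int, world_size: int):
    # Hypothetical helper: join the gloo group shared by the 2*world_size + 1
    # coordination processes (world_size sync_processes, world_size
    # listen_given_rank listeners, one orchestrator). Only small int tensors
    # transit through this group.
    group_size = 2 * world_size + 1
    dist.init_process_group(
        backend="gloo",
        rank=group_rank,
        world_size=group_size,
        init_method="tcp://127.0.0.1:29501",  # placeholder rendezvous address
    )
```

The same rank layout is assumed by the sketches given further below.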
Parameters:
- rank (int): our rank id in the distributed setting.
- world_size (int): the total number of workers.
- rank_other (mp.Value): a multiprocessing Value storing the rank of the peer for the next communication. It is updated here, based on the information given by the master process, to signal to the p2p_averaging process run in parallel on this worker which peer to communicate with next. If rank_other.value == -2, this is the signal from the orchestrating process that enough gradients have been computed in total, and the communication process stops.
- new_grads (mp.Value): a multiprocessing Value updated by this process and the main one, counting how many new grad steps have been performed by this worker since the last communication. It is used by the master process to count the total number of grads taken, and to initiate the "kill" of all processes when the target number of grad steps has been performed in total.
- barrier_sync_averaging (mp.Barrier): a barrier used to communicate with the p2p_averaging process. When the averaging process reaches this barrier, it signals that the worker is available for the next communication, so this process can start looking for another available peer by sending our rank to the master process, which performs the pairing.
- log (logger): to print messages in the logs if needed.
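For illustration, here is a minimal sketch of the control loop described above, under the rank layout assumed earlier. The message format, the use of new_grads.get_lock(), and the fact that the peer rank is received directly from the orchestrator (rather than relayed through the listener) are simplifying assumptions, not ACiD's actual implementation.

```python
import torch
import torch.distributed as dist

def sync_process_sketch(rank, world_size, rank_other, new_grads,
                        barrier_sync_averaging, log):
    listener = world_size + rank   # assumed rank of this worker's listener on worker 0
    master = 2 * world_size        # assumed rank of the orchestrating process
    out = torch.zeros(2, dtype=torch.int64)   # (our rank, fresh grad count)
    inp = torch.zeros(1, dtype=torch.int64)   # peer rank, or -2 to stop
    while True:
        # Wait until the p2p_averaging process signals it is ready for a new peer.
        barrier_sync_averaging.wait()
        # Declare availability to worker 0, together with the new grad count.
        with new_grads.get_lock():
            out[0], out[1] = rank, new_grads.value
            new_grads.value = 0
        dist.send(out, dst=listener)
        # Receive the peer chosen by the orchestrator (or the stop signal).
        dist.recv(inp, src=master)
        rank_other.value = int(inp[0])
        if rank_other.value == -2:
            break   # enough gradients computed globally: stop communicating
        # (hand-off of rank_other back to the averaging process, e.g. a second
        # barrier, is omitted from this sketch)
```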
ACiD.listen_given_rank — Function
Process run in the background of worker 0.
Its goal is to listen to one specific worker (specifically, its "sync_process" process) and to forward to it information coming from the orchestrating process, also hosted by worker 0. The main job of this function is to put into the mp.Queue the rank of the worker it listens to when that worker signals, through its "sync_process" function, that it is available for a communication. As this mp.Queue is shared with the orchestrating process, the orchestrating process can then receive this information and pair the worker with another one.
Parameters:
- rank (int): our rank id in the distributed setting.
- world_size (int): the total number of workers.
- queue (mp.Queue): queue containing the ranks of all workers available for communication. The orchestrating process then only needs to dequeue the ranks to make pairs, ensuring that the communications are performed in FIFO order, minimizing latency.
- nb_grad_tot_so_far (mp.Value): int storing the global count of grads (total number of gradients taken by all workers). This value is updated by adding to it the "new_grads" (see "sync_process" doc) from every worker. This mp.Value is thus updated by world_size "listen_given_rank" processes, and used by the orchestrating process to kill all processes when the target number of grads is reached.
- lock (mp.Lock): multiprocessing lock making sure that nb_grad_tot_so_far is edited by only one process at a time, so that no "new gradients" are lost to a race condition.
- log (logger): to print messages in the logs if needed.
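A minimal sketch of such a listener is given below, assuming that rank identifies the worker this listener is dedicated to and that the availability message carries (worker rank, new grad count). In this sketch the orchestrator replies to workers directly, so the listener only accumulates the grad count and enqueues the rank; these choices are illustrative, not the library's.

```python
import torch
import torch.distributed as dist

def listen_given_rank_sketch(rank, world_size, queue, nb_grad_tot_so_far, lock, log):
    msg = torch.zeros(2, dtype=torch.int64)   # (worker rank, new grad count)
    while True:
        # Block until the dedicated worker's sync_process declares itself available.
        dist.recv(msg, src=rank)
        # Fold its freshly computed grads into the global counter, under the lock
        # so that concurrent listeners do not lose updates.
        with lock:
            nb_grad_tot_so_far.value += int(msg[1])
        # Hand the now-available worker over to the orchestrating process.
        queue.put(rank)
        # (termination handling is omitted from this sketch)
```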
ACiD.master_process — Function
Orchestrating process hosted on worker 0.
This process accomplishes 2 things:
- Group available workers into pairs for p2p communication, according to the given graph topology, trying to minimize latency by pairing together the workers that were the first to become available.
- Signal to all processes when the target number of grads has been reached, so that computations & communications can stop.
Parameters:
- world_size (int): the total number of workers.
- max_steps (int): the target total number of grads to be performed by all workers. When it is reached, this process sends the signal to all others to stop all computations & communications.
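To make the pairing loop concrete, here is a minimal sketch under the same assumed rank layout, using plain FIFO pairing; the graph-topology constraint, the relay through the listeners, and termination corner cases are left out, and the parameter names other than world_size are illustrative.

```python
import torch
import torch.distributed as dist

def master_process_sketch(world_size, max_steps, queue, nb_grad_tot_so_far):
    msg = torch.zeros(1, dtype=torch.int64)
    while nb_grad_tot_so_far.value < max_steps:
        # FIFO pairing: the two longest-waiting available workers communicate next.
        a = queue.get()
        b = queue.get()
        msg[0] = b
        dist.send(msg, dst=a)   # tell a that its peer is b
        msg[0] = a
        dist.send(msg, dst=b)   # tell b that its peer is a
    # Global grad budget reached: signal every worker's sync_process to stop.
    msg[0] = -2
    for r in range(world_size):
        dist.send(msg, dst=r)
```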