Parallelize Training
Alexandre Strube // Sabrina Benassou // José Ignacio Robledo
December 5th, 2024
Move to the correct folder
Again, this is not a deep learning course.
If you are not familiar with the model and the dataset, just imagine it as a black box: you provide it with text, and it returns a summary.
Let's have a look at the files train.py
and run_train.sbatch
in the repo.
There are TODOs in these two files. Leave them alone for now; the code is already working, so you don't need to make any changes at this point.
Now run:
Spoiler alert 🚨
The code won't work.
Check the output and error files
Comment out lines 90 to 140.
Activate your environment:
Run:
Uncomment back lines 90-140.
Finally, run your job again:
You can see that we are in fact using only 1 GPU.
That is a waste of resources.
The training also takes a while (13 minutes according to llview).
So, can we run our model on multiple GPUs?
In the file run_train.sbatch, we increase the number of GPUs at line 3 to 4:
And run our job again
We are still using only 1 GPU.
Without the correct setup, the additional GPUs are simply not utilized.
Furthermore, we don't have any communication established between the GPUs.
Now that we have understood how the devices communicate and the terminology used in parallel computing, we can move on to distributed training (training on multiple GPUs).
Distributed Data Parallel (DDP) is a method in parallel computing used to train deep learning models efficiently across multiple GPUs or nodes.
If youโre scaling DDP to use multiple nodes, the underlying principle remains the same as single-node multi-GPU training.
Whenever you see TODOs 💻, follow the instructions to either copy-paste the code at the specified line numbers or type it yourself.
Depending on how you copy and paste, the line numbers may vary, but always refer to the TODO numbers in the code and slides.
The helper functions we need are provided in the file distributed_utils.py.
Import distributed_utils at line 13:
Then remove lines 77 and 78:
and add at line 77 a call to the setup() method defined in distributed_utils.py:
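A minimal sketch of that call, assuming we keep the three values that setup() returns (see its definition below):

# Initialize distributed communication and select the GPU for this process.
local_rank, rank, device = setup()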
What is in the setup() method?
import os
import torch

def setup():
# Initializes a communication group using 'nccl' as the backend for GPU communication.
torch.distributed.init_process_group(backend='nccl')
# Get the identifier of each process within a node
local_rank = int(os.getenv('LOCAL_RANK'))
# Get the global identifier of each process within the distributed system
rank = int(os.environ['RANK'])
# Creates a torch.device object that represents the GPU to be used by this process.
device = torch.device('cuda', local_rank)
# Sets the default CUDA device for the current process,
# ensuring all subsequent CUDA operations are performed on the specified GPU device.
torch.cuda.set_device(device)
# Different random seed for each process.
torch.random.manual_seed(1000 + torch.distributed.get_rank())
return local_rank, rank, device
TODO 4 💻:
At line 83, wrap the model in a DistributedDataParallel (DDP) module to parallelize the training across multiple GPUs.
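A rough sketch of what this could look like (the names model, device, and local_rank are assumptions based on the setup() call above):

# Move the model to this process's GPU and wrap it in DDP, which synchronizes
# (averages) the gradients across all processes during the backward pass.
model = model.to(device)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])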
TODO 5 💻:
At line 94, instantiate a DistributedSampler object for each set to ensure that each process gets a different subset of the data.
# DistributedSampler object for each set to ensure that each process gets a different subset of the data.
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
shuffle=True,
seed=args.seed)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)
test_sampler = torch.utils.data.distributed.DistributedSampler(test_dataset)
TODO 6 💻:
At line 103, REMOVE shuffle=True from the DataLoader of train_loader and REPLACE it with sampler=train_sampler.
TODO 7 💻:
At line 108, pass val_sampler to the sampler argument of the validation DataLoader.
TODO 8 💻:
At line 112, pass test_sampler to the sampler argument of the test DataLoader.
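A hedged sketch of what the three DataLoaders might look like after TODOs 6, 7, and 8 (the remaining arguments, such as args.batch_size, are assumptions about the code in train.py):

# The samplers already shuffle and split the data across processes,
# so shuffle=True must no longer be passed to the DataLoader.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.batch_size, sampler=val_sampler)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.batch_size, sampler=test_sampler)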
TODO 9 💻:
At line 125, set the current epoch for the dataset sampler to ensure proper data shuffling in each epoch.
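This is a single call at the top of the epoch loop; a sketch, assuming the loop variable is called epoch:

# Tell the sampler which epoch we are in, so that the shuffling order changes every epoch.
train_sampler.set_epoch(epoch)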
TODO 10 💻:
At lines 49 and 72, obtain the global average loss across the GPUs.
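One way to do this (a sketch, assuming the local loss is a Python float and device is the GPU selected in setup(); the repo may implement it differently) is an all-reduce followed by a division by the number of processes:

# Sum the local losses over all processes, then divide by the world size
# to obtain the global average loss.
loss_tensor = torch.tensor(loss, device=device)
torch.distributed.all_reduce(loss_tensor, op=torch.distributed.ReduceOp.SUM)
loss = loss_tensor.item() / torch.distributed.get_world_size()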
TODO 11 💻:
At lines 133, 144, and 148, replace all the print calls with the print0 method defined in distributed_utils.py, so that only rank 0 prints to the output file.
# We use the utility function print0 to print messages only from rank 0.
print0(f'[{epoch+1}/{args.epochs}] Train loss: {train_loss:.5f}, validation loss: {val_loss:.5f}')
The definition of the function print0 is in
distributed_utils.py
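It is likely along these lines (a sketch built on the is_root_process() helper shown below, not the exact code from the repo):

def print0(*args, **kwargs):
    """Pass the given arguments to print, but only on the root process."""
    if is_root_process():
        print(*args, **kwargs)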
TODO 12 💻:
At lines 138 and 151, replace the torch.save method with the utility function save0 to allow only the process with rank 0 to save the model.
The method save0 is defined in distributed_utils.py:
import functools

@functools.lru_cache(maxsize=None)
def is_root_process():
    """Return whether this process is the root process."""
    return torch.distributed.get_rank() == 0


def save0(*args, **kwargs):
    """Pass the given arguments to `torch.save`, but only on the root
    process.
    """
    # We do *not* want to write to the same location with multiple
    # processes at the same time.
    if is_root_process():
        torch.save(*args, **kwargs)
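For example, the calls at lines 138 and 151 could then look roughly like this (the checkpoint name and the exact object being saved are assumptions):

# Only the process with rank 0 writes the checkpoint; the other ranks do nothing.
save0(model.state_dict(), 'model.pt')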
In the run_train.sbatch file:
TODO 13 💻:
At line 3, increase the number of GPUs to 4 if it is not already done.
At line 22, pass the correct number of devices.
Stay in the run_train.sbatch file:
TODO 14 💻: we need to set up MASTER_ADDR and MASTER_PORT to allow communication across the system.
At line 24, add the following:
# Extracts the first hostname from the list of allocated nodes to use as the master address.
MASTER_ADDR="$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)"
# Modifies the master address to allow communication over InfiniBand cells.
MASTER_ADDR="${MASTER_ADDR}i"
# Get IP for hostname.
export MASTER_ADDR="$(nslookup "$MASTER_ADDR" | grep -oP '(?<=Address: ).*')"
export MASTER_PORT=7010
We are not done yet with the run_train.sbatch file:
TODO 15 💻:
At line 35, we change the launching script to use torchrun_jsc and pass the following arguments:
# Launch a distributed training job across multiple nodes and GPUs
srun --cpu_bind=none bash -c "torchrun_jsc \
--nnodes=$SLURM_NNODES \
--rdzv_backend c10d \
--nproc_per_node=gpu \
--rdzv_id $RANDOM \
--rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \
--rdzv_conf=is_host=\$(if ((SLURM_NODEID)); then echo 0; else echo 1; fi) \
train.py "
The arguments that we pass are:
- nnodes=$SLURM_NNODES: the number of nodes.
- rdzv_backend c10d: the c10d method for coordinating the setup of communication among distributed processes.
- nproc_per_node=gpu: the number of processes per node; gpu means one process per GPU.
- rdzv_id $RANDOM: a random id that acts as a central point for initializing and coordinating the communication among the different nodes participating in the distributed training.
- rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT: the IP that we set up in the previous slide, so that all nodes know where to connect to start the training session.
- rdzv_conf=is_host=\$(if ((SLURM_NODEID)); then echo 0; else echo 1; fi): marks the rendezvous host, which is responsible for coordinating the initial setup of communication among the nodes.

You can finally run:
Let's have a look at our job using llview again.
You can see that now we are using all the GPUs of the node,
that our job took less time to finish training (4 minutes vs 13 minutes with one GPU),
and that even the test loss is lower (0.538 vs 0.636 with one GPU).
But what about using more nodes?
In run_train.sbatch, at line 2, you can increase the number of nodes to 2:
Hence, you will use 8 GPUs for training.
Run again:
Open llview again.
You can see that now, we are using 2 nodes and 8 GPUs.
Here are some useful resources: