Megatron-LM源码系列(一):模型并行初始化
在本系列中,我们将探讨Megatron-LM的源代码。Megatron-LM是由Nvidia开发的一个大规模语言模型训练框架,它采用模型并行的方式实现分布式训练。在本篇文章中,我们将关注模型并行初始化的过程。
1. pretrain
在Megatron中pretrain
函数是框架执行的入口,定义在megatron/training.py
文件中。
1 | def pretrain(train_valid_test_dataset_provider, |
以gpt训练为例,在pretrain_gpt.py
中使用如下:
1 | if __name__ == "__main__": |
Pretrain训练的流程分为四步:
- 初始化Megatron
1 | initialize_megatron(extra_args_provider=extra_args_provider, |
- 使用
model_provider
设置model、optimizert和lr schedule
1 | model, optimizer, opt_param_scheduler = setup_model_and_optimizer( |
- 使用
train_val_test_data_provider
获取train/val/test
数据集
1 | train_data_iterator, valid_data_iterator, test_data_iterator \ |
- 使用
forward_step_func
去训练模型
1 | iteration = train(forward_step_func, |
2. initialize_megatron
initialize_megatron
初始化定义在文件
megatron/initialize.py
中。初始化分为3步:mpu初始化、autoresume自动恢复初始化、依赖编译。
mpu
全称是model parallelism unit
,定义在megatron/core/__init__.py
中,mpu = parallel_state
。
1 | # Megatron's MPU is the master. Complete initialization right away. |
3. _initialize_distributed
- 先设置当前cuda device,并调用init_process_group
1 | if args.rank == 0: |
- 进行模型并行的初始化
1 | args.pipeline_model_parallel_size, |
4. initialize_model_parallel
4.1 基本概念
- Tensor并行,也称为层内并行(Tensor/Intra-Layer Parallelism)。如下图,没有使用pipeline并行的情况下,tensor并行度设为2,表示每层layer都会被横向切成两部分,每部分会分别被不同的device处理。例如以单机8卡为例,tensor并行度设为2,总共会分为4组tensor并行组,每一组都会保存一份完整的网络,每组会各有两卡GPU,每张GPU卡处理所有网络层的一半。对应可以进行的数据并行度也就是4。
- Pipeline并行,也称为层间并行(Pipeline/Inter-Layer Parallelism)。如下图,没有使用tensor并行的情况下,pipeline并行度设为2,表示网络的所有layer会被分为两组(纵向切),每组layer会和由一个device进行处理。例如以单机8卡为例,pipeline并行度设为2,总共会分为4组pipeline并行组,每一组都会保存一份完整的网络,每组会各有两卡GPU,每张GPU卡处理一半的网络。对应可以进行的数据并行度也就是4。
在
Megatron-LM
中Pipeline并行支持阶段穿插调度(interleaved stage schedule
), 流水线分为多个stage的时候,每个device会处理多个流水线中的stage。例如每连续的4个layer是做为pipeline的一个stage,每个device处理4个layer的话,之前流水线做法是device顺序划分处理pipeline stage,device1
支持stage1(layer[1/2/3/4]
),device2
支持stage2([layer5/6/7/8]
),device3
支持stage3([layer9/10/11/12]
), 以此类推; 现在流水线处理是间隔处理,每个device处理涉及两个stage,device1
支持[layer1/2/9/10]
,device2
支持[layer3/4/11/12]
, 以此类推。Tensor并行+Pipeline并行混合并行。如下图,当同时设置了pipeline并行度为2,tensor并行度为2,表示一个网络先被纵向为左右两部分(也就是pipeline并行),然后每一部分被横向切开,被应用上2块gpu卡的tensor并行,总体上来看一个网络总共会被( \(pipeline并行度 \times tensor并行度 = 4\) )块gpu卡处理。例如以单机8卡为例,在pipeline并行度为2且tensor并行度为2的情况下,可以同时保存2份相同的网络,对应可以进行的数据并行度等同于2( $ total_cards_num = 2$ )。
4.2 initialize_model_parallel函数定义
initialize_model_parallel
是模型初始化的重点,定义在megatron/core/parallel_state.py
1 | def initialize_model_parallel( |
先看下传入参数定义: * tensor_model_parallel_size(必选,默认为1):
每个tensor并行组中的GPU卡数,默认为1 *
pipeline_model_parallel_size(必选,默认为1):
表示一个pipeline模型并行通信组中的GPU卡数,pipeline并行相当于把layer纵向切为了N个stage阶段,每个阶段对应一个卡,所以这里也就等于stage阶段数。例如pipeline_model_parallel_size
为2,tensor_model_parallel_size
为4,表示一个模型会被纵向分为2个stage进行pipeline并行,每个组(stage)内会对应有一个tensor并行组进行4卡gpu的tensor并行。如下图分为2个阶段,每个阶段按列[g0, g1, g2, g3]
和[g4, g5, g6, g7]
分别对应两个tensor并行通信组;
按行分别有4个pipeline并行通信组,通信方向是从阶段一到阶段二,分别是[g0->g4], [g1->g5], [g2->g6], [g3->g7]
阶段一 | 阶段二 |
---|---|
g0 | g4 |
g1 | g5 |
g2 | g6 |
g3 | g7 |
- virtual_pipeline_model_parallel_size(可选,默认为None):
为了支持
interleaved schedule
,例如在一个模型网络中有16层transformer, 设置【tensor_model_parallel_size=1, pipeline_model_parallel_size=4, virtual_pipeline_model_parallel_size=2
】,一个完整的模型会由【pipeline_model_parallel_size * tensor_model_parallel_size = 4
】张gpu卡进行处理,模型运行会被分为4*2=8
个stage,每个tensor并行组(这里一个组里只有一张卡)对应处理两个stage。virtual_pipeline_model_parallel_size
也就是对应一张卡上处理的stage个数。 - pipeline_model_parallel_split_rank(可选,默认为None):针对encoder-decoder模型结构,用于区分,例如
tensor_model_parallel_size=1, pipeline_model_parallel_size=8
对有8个stage的pipeline, 每个stage对应有一张卡,pipeline_model_parallel_split_rank=3
表示rank0-2
的4张卡属于encoder,rank3-7
的4张卡属于decoder。 - use_fp8(默认为False): 使用FP8训练
在initialize_model_parallel
中会进行5种类型的分组: *
data parallel,对应变量_DATA_PARALLEL_GROUP
* model
parallel,对应变量_MODEL_PARALLEL_GROUP
* tensor model
parallel,对应变量_TENSOR_MODEL_PARALLEL_GROUP
* pipeline
model parallel,对应变量_PIPELINE_MODEL_PARALLEL_GROUP
*
embedding groups,对应变量_EMBEDDING_GROUP
4.3 data parallel数据并行通信组初始化
数据并行度data_parallel_size
是自动算出来的,
表示总共有data_parallel_size
个模型可以同时训练。
1 | data_parallel_size: int = world_size // (tensor_model_parallel_size * pipeline_model_parallel_size) |
通信初始化先以pipeline_model_parallel_size
(表示pipeline并行的stage个数/tensor并行组的个数)为外循环,以tensor_model_parallel_size
为内循环(表示一个tensor并行组内gpu卡的个数),
并按tensor_model_parallel_size
大小为间隔来创建通信组。
1 | data_parallel_size: int = world_size // (tensor_model_parallel_size * |
以两机共16卡(2台gpu机器,每个机器8卡,以g0 ... g15
表示16块gpu卡)为例,当tensor_model_parallel_size=2, pipeline_model_parallel_size=4
时,
data_parallel_size
等于16/(2*4)=2
。all_data_parallel_group_ranks
数据并行通信组对应为
[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
从机器视角来看如下图,\(g_n\) 表示一张卡,卡与卡之间的连线表示在同一个通信组中:
4.4 model parallel模型并行通信组初始化
这里的模型并行通信组相当于同时包含了tensor模型并行组
和pipeline模型并行组
(不区分是tensor还是pipeline),也就是对应包含训练整个模型的所有gpu卡的通信组,组的个数对应数据并行的data_parallel_size
,组内卡的个数对应tensor_model_parallel_size * pipeline_model_parallel_size
。代码如下:
1 | # Build the model-parallel groups. |
以两机共16卡(2台gpu机器,每个机器8卡,以g0 ... g15
表示16块gpu卡)为例,当tensor_model_parallel_size=2, pipeline_model_parallel_size=4
时,
data_parallel_size
等于16/(2*4)=2
,模型并行通信组也是2个。基于all_data_parallel_group_ranks
分组中来得到模型并行通信组,结果为[g0, g1, g4, g5, g8, g9, g12, g13]
和[g2, g3, g6, g7, g10, g11, g14, g15]
。
从机器视角来看数据并行如下图,黄色和绿色的块表示整体模型并行的通信组,每个通信组中有8张卡,每组各负责训练一个完整的模型,数据同时分发给两块:
4.5 tensor model parallel通信组初始化
tensor模型并行组内gpu卡数由tensor_model_parallel_size
来指定。以两机共16卡(2台gpu机器,每个机器8卡)为例,tensor_model_parallel_size=2, pipeline_model_parallel_size=4
情况下,tensor并行通信组创建按顺序来取,即为
[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
。代码如下:
1 | # Build the tensor model-parallel groups. |
从机器视角来看数据并行如下图, 每一个红色框相当于一个tensor并行通信组:
4.6 pipeline model parallel通信组初始化
pipeline模型并行pipeline_model_parallel_size
来指定,
有。以两机共16卡(2台gpu机器,每个机器8卡)为例,tensor_model_parallel_size=2, pipeline_model_parallel_size=4
情况下,表示一个模型训练过程会被分为4个stage(每个stage对应一个tensor并行组)。流水线并行分组按parallel_groups的个数
为间隔来取,即为
[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
。
1 | for i in range(num_pipeline_model_parallel_groups): |
从机器视角来看pipeline并行如下图, 每一大列都是pipeline中的一个stage,stage的处理是从左往右依次进行,为了解决不同stage的通信,所以需要创建四个通信组(每个色块表示一个通信组):
通信时上下游用到的rank号获取通过如下方法获取:
1 | def get_pipeline_model_parallel_next_rank(): |
4.7 embedding通信组初始化
_EMBEDDING_GROUP
构建embedding通信组,用于交换pipeline并行中第一个stage和最后一个stage中的gradient信息,
默认情况下只包含两个rank。在设置了pipeline_model_parallel_split_rank
情况下,会把split设置的中间的那个rank也加入通信组中。
_POSITION_EMBEDDING_GROUP
跟_EMBEDDING_GROUP
不同点在于只保留第一个stage的
rank。同样在设置了pipeline_model_parallel_split_rank
情况下,会把split设置的中间的那个rank也加入通信组中。
代码如下:
1 | # Setup embedding group (to exchange gradients between |
4.8 汇总说明如下
以两机共16卡(2台gpu机器,每个机器8卡)为例,tensor_model_parallel_size=2, pipeline_model_parallel_size=4
情况下。如下图,
蓝色表示8个tensor并行组,每个组内有2张卡(tensor_model_parallel_size=2);绿色表示4个pipeline并行组,每个组内有4张卡(pipeline_model_parallel_size=4);红色表示两个数据并行组,每个组内有8张卡;从模型切分的粒度从小到大来说,tensor并行(tensor分组)
< pipeline并行(layer分组) < 数据并行(整个模型复制)