Commit bf8b40e
fix and update readme
SolenoidWGT committed Sep 12, 2024
1 parent b536cd3 commit bf8b40e
Showing 11 changed files with 140 additions and 306 deletions.
3 changes: 1 addition & 2 deletions gen_profiler_data.py
@@ -1,5 +1,4 @@

from internlm.simulator.profiler.perf_comm import gen_perf

if __name__ == "__main__":
gen_perf()
gen_perf()
112 changes: 15 additions & 97 deletions internlm/core/context/parallel_context.py
@@ -18,15 +18,13 @@
import torch.distributed as dist

from internlm.accelerator import get_accelerator
from internlm.core.context.process_group_initializer_simplified import Initializer, ParallelMeta
from internlm.utils.common import SingletonMeta
from internlm.utils.common import SingletonMeta, get_args
from internlm.utils.logger import get_logger
from internlm.utils.timeout import LLM_NCCL_TIMEOUT

from . import process_group_initializer as pgroup_initializer
from .process_group_initializer_simplified import ParallelMode
from .process_group_initializer import ParallelMode
from .random import add_seed, get_seeds, set_mode
from internlm.utils.common import get_args

IS_REPLICA_ZERO_PARALLEL = "is_replica_zero_parallel"
# for isp, with optimizer split in dp group
@@ -422,20 +420,6 @@ def init_global_dist(
use_cpu (bool): whether to set up cpu process group.
"""

# find cluster info
if "clusters" not in self.config:
nv_info = {
"rank_range": [0, 8],
"peak_tflops": 320,
"capacity": 80 * 1024**3,
"intra_bw": 150,
"inter_bw": 100,
}
self.set_cluster_info("nv_cluster", nv_info)
else:
for cluster in self.config.clusters:
self.clusters.append(ClusterInfo(**cluster))

# initialize the default process group
if not fake_mode:
init_method = f"tcp://[{host}]:{port}"
@@ -576,8 +560,7 @@ def init_parallel_groups(self, fake_mode: bool = False):
self._set_parallel_size_from_config(parallel_config, "tensor", "tensor_parallel_size")
self._set_parallel_size_from_config(parallel_config, "pipeline", "pipeline_parallel_size")
self._set_parallel_size_from_config(parallel_config, "zero1", "zero1_parallel_size")



if get_args().use_simplified_gp_init:
self._init_use_simplified_pg(rank, world_size, parallel_config)
else:
@@ -592,10 +575,7 @@ def _init_pg(self, rank, world_size, parallel_config):
1, self.world_size // self.pipeline_parallel_size // self.weight_parallel_size
)

if (
isinstance(parallel_config["tensor"], dict)
and parallel_config["tensor"]["mode"] == "isp"
):
if isinstance(parallel_config["tensor"], dict) and parallel_config["tensor"]["mode"] == "isp":
if self.zero1_parallel_size == -1:
self.zero1_parallel_size = self.weight_data_parallel_size
self.zero1_parallel_size = max(1, self.zero1_parallel_size)
@@ -622,8 +602,7 @@ def _init_pg(self, rank, world_size, parallel_config):
if "sequence_parallel" not in parallel_config:
parallel_config._add_item("sequence_parallel", True)
if isinstance(parallel_config["tensor"], int) or (
isinstance(parallel_config["tensor"], dict)
and parallel_config["tensor"]["mode"] == "mtp"
isinstance(parallel_config["tensor"], dict) and parallel_config["tensor"]["mode"] == "mtp"
):
parallel_config["sequence_parallel"] = False

@@ -665,10 +644,7 @@ def _init_pg(self, rank, world_size, parallel_config):
initializers.append(pgroup_initializer.Initializer_Tensor(*initializer_args))
initializers.append(pgroup_initializer.Initializer_Data(*initializer_args))
initializers.append(pgroup_initializer.Initializer_ISP_Data(*initializer_args))
if (
isinstance(parallel_config["tensor"], dict)
and parallel_config["tensor"]["mode"] == TensorParallelMode.isp.name
):
if isinstance(parallel_config["tensor"], dict) and parallel_config["tensor"]["mode"] == "isp":
initializers.append(pgroup_initializer.Initializer_Zero1_ISP(*initializer_args))
else:
initializers.append(pgroup_initializer.Initializer_Zero1(*initializer_args))
@@ -686,7 +662,7 @@ def _init_pg(self, rank, world_size, parallel_config):
self._register_dist(*args)
else:
self._register_dist(*parallel_setting)

def _init_use_simplified_pg(self, rank, world_size, parallel_config):
try:
self.tensor_mode = parallel_config["tensor"]["mode"]
@@ -723,6 +699,11 @@ def _init_use_simplified_pg(self, rank, world_size, parallel_config):

self.check_sanity()

from internlm.core.context.process_group_initializer_simplified import (
Initializer,
ParallelMeta,
)

parallel_info = {
"tp": ParallelMeta(self.tensor_parallel_size, ParallelMode.TENSOR),
"wp": ParallelMeta(self.weight_parallel_size, ParallelMode.WEIGHT),
@@ -861,14 +842,14 @@ def check_pg_is_intra(self, parallel_mode: ParallelMode):
return (max_rank - min_rank) <= 7

def same_group_in_one_node(self, parallel_mode: ParallelMode):
"""获得一个节点内有多少个相同类型的PG, 在跨节点通信时会存在带宽竞争
这里返回的相同PG的数量会乘上每个rank的通信数据量大小
"""Get the number of the same type of PG within a node. There will be bandwidth competition during cross-node communication.
The number of the same PG returned here will be multiplied by the communication data size of each rank.
Args:
parallel_mode (ParallelMode):
Returns:
int: The number of PGs of the same type within one node.
int: The number of the same type of PG within a node.
"""
pg_group_ranks = self.get_ranks_in_group(parallel_mode)
pg_group_ranks = sorted(pg_group_ranks)
@@ -881,68 +862,5 @@ def same_group_in_one_node(self, parallel_mode: ParallelMode):
else:
return stride

# def set_cluster_info(self, name: str, info: dict):
# self.clusters[name] = ClusterInfo(**info)

def get_cluster_info(self, name: str):
return self.clusters[name]

def get_cluster_name_from_ip(self):
"""
node_ip_list = [
'metax-c500-1',
'metax-c500-2',
'nvidia-node-1',
'nvidia-node-2',
]
"""
hostname = socket.gethostname()
cluster_name = hostname.split("-")[0]
return cluster_name

def sort_rank_based_on_ip_and_capacity(self):
Capacity = []

def sort_rank(x, y):
x_name = self.get_cluster_name_from_ip(x)
y_name = self.get_cluster_name_from_ip(y)
if x_name == y_name:
return x_name > y_name
else:
x_c = self.clusters[x_name]["capacity"]
y_c = self.clusters[y_name]["capacity"]
return x_c > y_c

for cluster_name, cluster_info in self.clusters.items():
peak_tflops.append(cluster_info["peak_tflops"])
# Alpha.append(cluster_info.rank_range[-1] - cluster_info.rank_range[-1] + 1)
Capacity.append(cluster_info["capacity"])

def switch_topology_aware_rank_scheduling():
"""
Switch topology-aware rank scheduling can optimize the performance of small-scale
collective communications. Currently only supported in Alibaba Cloud.
"""

local_rank = int(os.environ["LOCAL_RANK"])
cluster_name = get_cluster_name_from_ip()

try:
if cluster_name == "Ali":
pass
else:
rank = int(os.environ["MLP_WORKER_RACK_RANK_INDEX"]) * 8 + local_rank
except Exception as e:
logger.error(
f"The switch topology awareness error is reported, the reason is: {e}",
"but don’t worry, this error will not affect normal training.",
"If you train on Alibaba or Volcano Cloud, please contact wangguoteng or lijiaxing",
)
else:
# If there is no any error, hack torch rank.
os.environ["RANK"] = str(rank)
if local_rank == 0:
logger.info("Successfully bound node switch affinity!")


global_context = ParallelContext()
4 changes: 4 additions & 0 deletions internlm/core/context/process_group_initializer.py
@@ -62,6 +62,10 @@ class ParallelMode(Enum):

# grouped query attention
GQA = "gqa"

INTRA_DP_SZIE = "intra_dp"

INTER_DP_SZIE = "inter_dp"


class ProcessGroupInitializer(ABC):
2 changes: 0 additions & 2 deletions internlm/core/context/process_group_initializer_simplified.py
@@ -2,13 +2,11 @@
# -*- encoding: utf-8 -*-

from copy import deepcopy
from enum import Enum

import torch
import torch.distributed as dist

from internlm.utils.timeout import LLM_NCCL_TIMEOUT
from internlm.core.context.process_group_initializer import ParallelMode

class ParallelMeta:
def __init__(self, parallel_size, mode) -> None:
5 changes: 3 additions & 2 deletions internlm/initialize/launch.py
@@ -12,7 +12,7 @@
from internlm.accelerator import AcceleratorType, get_accelerator
from internlm.core.context import Config
from internlm.core.context import global_context as gpc
from internlm.core.context.process_group_initializer_simplified import ParallelMode
from internlm.core.context.process_group_initializer import ParallelMode
from internlm.utils.common import get_master_node
from internlm.utils.gputest import warmup_process_group
from internlm.utils.logger import get_logger
@@ -86,7 +86,8 @@ def add_simulator_arguments(parser):
group.add_argument(
"--pre_profiling_data_path", type=str, help="The path to pre-profiled performance data on the target cluster."
)
group.add_argument("--use_simplified_gp_init", action="store_true", default=False)
group.add_argument("--use_simplified_gp_init", action="store_true", default=True)

return parser


4 changes: 0 additions & 4 deletions internlm/model/ops/linear.py
@@ -14,10 +14,6 @@

from internlm.accelerator import AcceleratorType, get_accelerator
from internlm.core.context import global_context as gpc
from internlm.simulator.ops.linear import (
_fake_linear_bwdward_op,
_fake_linear_forward_op,
)

try:
from fused_dense_lib import linear_bias_wgrad as _flash_linear_backward_op
61 changes: 61 additions & 0 deletions internlm/simulator/README.md
@@ -0,0 +1,61 @@
# InternLM Simulator


## 1. Introduction
The simulator consists of two main components:
1. `profiling`: collects the time spent in each stage of model training ahead of time and saves it as data files and image files.
2. `simulation`: simulates the model training process from the collected data files and reports the time spent in each stage of training.

## 2. Usage

### 2.1 Generate profiling data

There are two types of profiling data:
1. `linear` profiling data, which includes: [`LINEAR`]
2. `communication` profiling data, which includes: [`ALL2ALL`, `ALLREDUCE`, `REDUCESCATTER`, `ALLGATHER`, `BROADCAST`] (a rough sketch of this kind of measurement is shown after the note below)


Note:
1. It is recommended to use more than 64 GPUs for data collection so that the communication measurements are more accurate.
2. `Flash Attention` timings are not collected in advance; they are measured on the fly during the simulation and stored in a cache, because flash attention performance depends on too many variables for an ahead-of-time sweep to cover them all.
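
For intuition, here is a rough sketch of what this kind of profiling measurement can look like: timing `torch.nn.functional.linear` for a couple of shapes and `dist.all_reduce` for a few message sizes, averaged over several iterations with CUDA events. This is an illustrative example only, not the simulator's actual profiler code; the script name, shapes, and message sizes are hypothetical.

```python
# Illustrative sketch only -- not the simulator's actual profiler implementation.
# Launch with: torchrun --nproc-per-node=8 profile_sketch.py   (script name is hypothetical)
import torch
import torch.distributed as dist
import torch.nn.functional as F


def time_cuda(fn, warmup=5, iters=20):
    """Return the average CUDA time of fn() in seconds, measured with CUDA events."""
    for _ in range(warmup):
        fn()
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    for _ in range(iters):
        fn()
    end.record()
    torch.cuda.synchronize()
    return start.elapsed_time(end) / 1000.0 / iters


def main():
    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(dist.get_rank() % torch.cuda.device_count())

    # Compute profiling: time a linear layer for a few (made-up) shapes.
    for m, k, n in [(4096, 4096, 4096), (8192, 4096, 11008)]:
        x = torch.randn(m, k, device="cuda", dtype=torch.bfloat16)
        w = torch.randn(n, k, device="cuda", dtype=torch.bfloat16)
        t = time_cuda(lambda: F.linear(x, w))
        if dist.get_rank() == 0:
            print(f"linear {m}x{k}x{n}: {t * 1e3:.3f} ms")

    # Communication profiling: time all_reduce for a few message sizes.
    for numel in [2**20, 2**24, 2**26]:
        buf = torch.randn(numel, device="cuda", dtype=torch.bfloat16)
        t = time_cuda(lambda: dist.all_reduce(buf))
        if dist.get_rank() == 0:
            print(f"all_reduce {numel * 2} bytes: {t * 1e3:.3f} ms")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```

The real profiler presumably repeats this idea for the other collectives listed above and for different intra-/inter-node group layouts (as the generated file names such as `all_reduce_intra_4_inter_1.jpg` below suggest), which is why collecting on a larger number of GPUs gives more representative communication numbers.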

```bash
# generate profiling data
torchrun --nproc-per-node=8 gen_profiler_data.py

# the profiling data will be saved in the following path
./prof_data
├── data.pt
└── pics
├── cal
│ └── linear.jpg
└── comm
├── all2all_intra_2_inter_1.jpg
├── all2all_intra_4_inter_1.jpg
├── all_gather_intra_2_inter_1.jpg
├── all_gather_intra_4_inter_1.jpg
├── all_reduce_intra_2_inter_1.jpg
├── all_reduce_intra_4_inter_1.jpg
├── broadcast_intra_2_inter_1.jpg
├── broadcast_intra_4_inter_1.jpg
├── reduce_scatter_intra_2_inter_1.jpg
└── reduce_scatter_intra_4_inter_1.jpg

```
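
The exact schema of `data.pt` is not documented here. Since it is a standard PyTorch save file, one quick way to see what was collected is to load it on CPU and walk its top-level structure (a minimal sketch, assuming a dict-like layout):

```python
# Minimal sketch: inspect the collected profiling data.
# Assumes data.pt is a picklable (typically dict-like) object saved with torch.save.
import torch

data = torch.load("./prof_data/data.pt", map_location="cpu")
print(type(data))
if isinstance(data, dict):
    for key, value in data.items():
        print(f"{key}: {type(value).__name__}")
```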

### 2.2 Run simulation
Running the solver does not require a GPU (although some of its dependencies may expect a GPU environment; if you run into problems, please open an issue). Currently, the solver only supports the formulaic solving method via `simulation_train_formulaic.py`, which takes a config file and a profiling data file, as shown below:

```bash

python simulation_train_formulaic.py --pre_profiling_data_path ./prof_data/data.pt --config configs/7B_internlm2.py --run_all_solu --model_size 7 --world_size 128 --global_batch_size 4194304

# explanation:
python simulation_train_formulaic.py
--pre_profiling_data_path ./prof_data/data.pt # profiling data file
--config configs/7B_internlm2.py # model configuration file
--run_all_solu # iterate over and solve all possible solutions
--model_size 7 # 7B model; set to 70 to solve for a 70B model
--world_size 128 # the solving range is 128 GPUs
--global_batch_size 4194304 # global batch size in tokens, 4M (4 * 1024 * 1024)
```