Skip to content

Commit

Permalink
fix and update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
SolenoidWGT committed Sep 12, 2024
1 parent b536cd3 commit 788c2fe
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 289 deletions.
3 changes: 1 addition & 2 deletions gen_profiler_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

from internlm.simulator.profiler.perf_comm import gen_perf

if __name__ == "__main__":
gen_perf()
gen_perf()
88 changes: 7 additions & 81 deletions internlm/core/context/parallel_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import torch.distributed as dist

from internlm.accelerator import get_accelerator
from internlm.core.context.process_group_initializer_simplified import Initializer, ParallelMeta
from internlm.utils.common import SingletonMeta
from internlm.utils.logger import get_logger
from internlm.utils.timeout import LLM_NCCL_TIMEOUT

from . import process_group_initializer as pgroup_initializer
from .process_group_initializer_simplified import ParallelMode
from .process_group_initializer import ParallelMode
from .random import add_seed, get_seeds, set_mode
from internlm.utils.common import get_args

Expand Down Expand Up @@ -422,20 +421,6 @@ def init_global_dist(
use_cpu (bool): whether to set up cpu process group.
"""

# find cluster info
if "clusters" not in self.config:
nv_info = {
"rank_range": [0, 8],
"peak_tflops": 320,
"capacity": 80 * 1024**3,
"intra_bw": 150,
"inter_bw": 100,
}
self.set_cluster_info("nv_cluster", nv_info)
else:
for cluster in self.config.clusters:
self.clusters.append(ClusterInfo(**cluster))

# initialize the default process group
if not fake_mode:
init_method = f"tcp://[{host}]:{port}"
Expand Down Expand Up @@ -667,7 +652,7 @@ def _init_pg(self, rank, world_size, parallel_config):
initializers.append(pgroup_initializer.Initializer_ISP_Data(*initializer_args))
if (
isinstance(parallel_config["tensor"], dict)
and parallel_config["tensor"]["mode"] == TensorParallelMode.isp.name
and parallel_config["tensor"]["mode"] == "isp"
):
initializers.append(pgroup_initializer.Initializer_Zero1_ISP(*initializer_args))
else:
Expand All @@ -688,6 +673,8 @@ def _init_pg(self, rank, world_size, parallel_config):
self._register_dist(*parallel_setting)

def _init_use_simplified_pg(self, rank, world_size, parallel_config):
from internlm.core.context.process_group_initializer_simplified import InitializerParallelMeta

try:
self.tensor_mode = parallel_config["tensor"]["mode"]
except AttributeError:
Expand Down Expand Up @@ -861,14 +848,14 @@ def check_pg_is_intra(self, parallel_mode: ParallelMode):
return (max_rank - min_rank) <= 7

def same_group_in_one_node(self, parallel_mode: ParallelMode):
"""获得一个节点内有多少个相同类型的PG, 在跨节点通信时会存在带宽竞争
这里返回的相同PG的数量会乘上每个rank的通信数据量大小
"""Get the number of the same type of PG within a node. There will be bandwidth competition during cross-node communication.
The number of the same PG returned here will be multiplied by the communication data size of each rank.
Args:
parallel_mode (ParallelMode):
Returns:
int: 一个节点内相同类型的PG的数量
int: The number of the same type of PG within a node.
"""
pg_group_ranks = self.get_ranks_in_group(parallel_mode)
pg_group_ranks = sorted(pg_group_ranks)
Expand All @@ -881,68 +868,7 @@ def same_group_in_one_node(self, parallel_mode: ParallelMode):
else:
return stride

# def set_cluster_info(self, name: str, info: dict):
# self.clusters[name] = ClusterInfo(**info)

def get_cluster_info(self, name: str):
return self.clusters[name]

def get_cluster_name_from_ip(self):
"""
node_ip_list = [
'metax-c500-1',
'metax-c500-2',
'nvidia-node-1',
'nvidia-node-2',
]
"""
hostname = socket.gethostname()
cluster_name = hostname.split("-")[0]
return cluster_name

def sort_rank_based_on_ip_and_capacity(self):
Capacity = []

def sort_rank(x, y):
x_name = self.get_cluster_name_from_ip(x)
y_name = self.get_cluster_name_from_ip(y)
if x_name == y_name:
return x_name > y_name
else:
x_c = self.clusters[x_name]["capacity"]
y_c = self.clusters[y_name]["capacity"]
return x_c > y_c

for cluster_name, cluster_info in self.clusters.items():
peak_tflops.append(cluster_info["peak_tflops"])
# Alpha.append(cluster_info.rank_range[-1] - cluster_info.rank_range[-1] + 1)
Capacity.append(cluster_info["capacity"])

def switch_topology_aware_rank_scheduling():
"""
Switch topology-aware rank scheduling can optimize the performance of small-scale
collective communications. Currently only supported in Alibaba Cloud.
"""

local_rank = int(os.environ["LOCAL_RANK"])
cluster_name = get_cluster_name_from_ip()

try:
if cluster_name == "Ali":
pass
else:
rank = int(os.environ["MLP_WORKER_RACK_RANK_INDEX"]) * 8 + local_rank
except Exception as e:
logger.error(
f"The switch topology awareness error is reported, the reason is: {e}",
"but don’t worry, this error will not affect normal training.",
"If you train on Alibaba or Volcano Cloud, please contact wangguoteng or lijiaxing",
)
else:
# If there is no any error, hack torch rank.
os.environ["RANK"] = str(rank)
if local_rank == 0:
logger.info("Successfully bound node switch affinity!")


global_context = ParallelContext()
4 changes: 4 additions & 0 deletions internlm/core/context/process_group_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class ParallelMode(Enum):

# grouped query attention
GQA = "gqa"

INTRA_DP_SZIE = "intra_dp"

INTER_DP_SZIE = "inter_dp"


class ProcessGroupInitializer(ABC):
Expand Down
2 changes: 0 additions & 2 deletions internlm/core/context/process_group_initializer_simplified.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
# -*- encoding: utf-8 -*-

from copy import deepcopy
from enum import Enum

import torch
import torch.distributed as dist

from internlm.utils.timeout import LLM_NCCL_TIMEOUT
from internlm.core.context.process_group_initializer import ParallelMode

class ParallelMeta:
def __init__(self, parallel_size, mode) -> None:
Expand Down
4 changes: 3 additions & 1 deletion internlm/initialize/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from internlm.accelerator import AcceleratorType, get_accelerator
from internlm.core.context import Config
from internlm.core.context import global_context as gpc
from internlm.core.context.process_group_initializer_simplified import ParallelMode
from internlm.core.context.process_group_initializer import ParallelMode
from internlm.utils.common import get_master_node
from internlm.utils.gputest import warmup_process_group
from internlm.utils.logger import get_logger
Expand Down Expand Up @@ -86,7 +86,9 @@ def add_simulator_arguments(parser):
group.add_argument(
"--pre_profiling_data_path", type=str, help="The path to pre-profiled performance data on the target cluster."
)
group.add_argument("--use_simplified_gp_init", action="store_true", default=True)
group.add_argument("--use_simplified_gp_init", action="store_true", default=False)

return parser


Expand Down
4 changes: 0 additions & 4 deletions internlm/model/ops/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@

from internlm.accelerator import AcceleratorType, get_accelerator
from internlm.core.context import global_context as gpc
from internlm.simulator.ops.linear import (
_fake_linear_bwdward_op,
_fake_linear_forward_op,
)

try:
from fused_dense_lib import linear_bias_wgrad as _flash_linear_backward_op
Expand Down
54 changes: 54 additions & 0 deletions internlm/simulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# InternLM Simulator


## 1. Introduction
The solver mainly consists of two components:
1. `profiling`: Collects the time consumption of each stage during the model training process in advance and saves it as data files and image files.
2. `simulation`: Simulates the model training process based on the collected data files and outputs the time consumption of each stage during the training process.

## 2. Usage

### 2.1 Generate profiling data

There are two types of profiling data:
1. 'linear' profiling data, include: [LINEAR]
2. 'Communication' profiling data, include: [ALL2ALL, ALLREDUCE, REDUCESCATTER, ALLGATHER, BROADCAST]


Note:
1. It is recommended to use more than 64 GPUs for data collection to ensure more accurate communication data.
2. Flash attention information is not collected in advance but is collected on the fly during the simulation and stored in the cache. This is because there are many variables that affect the performance of flash attention, and collecting in advance cannot cover all variables.

```python
# generate profiling data
torchrun --nproc-per-node=8 gen_profiler_data.py

# the profiling data will be saved in the following path
./prof_data
├── data.pt
└── pics
├── cal
│ └── linear.jpg
└── comm
├── all2all_intra_2_inter_1.jpg
├── all2all_intra_4_inter_1.jpg
├── all_gather_intra_2_inter_1.jpg
├── all_gather_intra_4_inter_1.jpg
├── all_reduce_intra_2_inter_1.jpg
├── all_reduce_intra_4_inter_1.jpg
├── broadcast_intra_2_inter_1.jpg
├── broadcast_intra_4_inter_1.jpg
├── reduce_scatter_intra_2_inter_1.jpg
└── reduce_scatter_intra_4_inter_1.jpg

```

### 2.2 Run simulation
```python
python simulation_train_formulaic.py --pre_profiling_data_path ./data/profiling_data.json --config configs/exp_simluator.py

```



## 4. 贡献
Loading

0 comments on commit 788c2fe

Please sign in to comment.