From 1e3868df98f763c46dbdaff5bcff281e0b19b6d9 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 00:57:29 +0000 Subject: [PATCH 01/14] add trompt ddp --- benchmark/data_frame_benchmark.py | 4 +- benchmark/data_frame_text_benchmark.py | 6 +- examples/trompt.py | 4 +- examples/trompt_multi_gpu.py | 245 +++++++++++++++++++++++++ test/nn/models/test_trompt.py | 4 +- torch_frame/nn/models/trompt.py | 5 +- 6 files changed, 257 insertions(+), 11 deletions(-) create mode 100644 examples/trompt_multi_gpu.py diff --git a/benchmark/data_frame_benchmark.py b/benchmark/data_frame_benchmark.py index 3916e3486..5685403d4 100644 --- a/benchmark/data_frame_benchmark.py +++ b/benchmark/data_frame_benchmark.py @@ -263,7 +263,7 @@ def train( pred, y = model(tf, mixup_encoded=True) elif isinstance(model, Trompt): # Trompt uses the layer-wise loss - pred = model.forward_stacked(tf) + pred = model(tf) num_layers = pred.size(1) # [batch_size * num_layers, num_classes] pred = pred.view(-1, out_channels) @@ -294,6 +294,8 @@ def test( for tf in loader: tf = tf.to(device) pred = model(tf) + if isinstance(model, Trompt): + pred = pred.mean(dim=1) if dataset.task_type == TaskType.MULTICLASS_CLASSIFICATION: pred = pred.argmax(dim=-1) elif dataset.task_type == TaskType.REGRESSION: diff --git a/benchmark/data_frame_text_benchmark.py b/benchmark/data_frame_text_benchmark.py index 7a8cb05f2..c440024ed 100644 --- a/benchmark/data_frame_text_benchmark.py +++ b/benchmark/data_frame_text_benchmark.py @@ -307,7 +307,7 @@ def train( y = tf.y if isinstance(model, Trompt): # Trompt uses the layer-wise loss - pred = model.forward_stacked(tf) + pred = model(tf) num_layers = pred.size(1) # [batch_size * num_layers, num_classes] pred = pred.view(-1, out_channels) @@ -337,6 +337,10 @@ def test( for tf in loader: tf = tf.to(device) pred = model(tf) + if isinstance(model, Trompt): + # [batch_size, num_layers, out_channels] + # -> [batch_size, out_channels] + pred = pred.mean(dim=1) if dataset.task_type == TaskType.MULTICLASS_CLASSIFICATION: pred = pred.argmax(dim=-1) elif dataset.task_type == TaskType.REGRESSION: diff --git a/examples/trompt.py b/examples/trompt.py index 9d7d6e964..342ab653e 100644 --- a/examples/trompt.py +++ b/examples/trompt.py @@ -90,7 +90,7 @@ def train(epoch: int) -> float: for tf in tqdm(train_loader, desc=f"Epoch: {epoch}"): tf = tf.to(device) # [batch_size, num_layers, num_classes] - out = model.forward_stacked(tf) + out = model(tf) num_layers = out.size(1) # [batch_size * num_layers, num_classes] pred = out.view(-1, dataset.num_classes) @@ -112,7 +112,7 @@ def test(loader: DataLoader) -> float: for tf in loader: tf = tf.to(device) - pred = model(tf) + pred = model(tf).mean(dim=1) pred_class = pred.argmax(dim=-1) accum += float((tf.y == pred_class).sum()) total_count += len(tf.y) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py new file mode 100644 index 000000000..9a18ba435 --- /dev/null +++ b/examples/trompt_multi_gpu.py @@ -0,0 +1,245 @@ +import argparse +import logging +import os +import os.path as osp + +import torch +import torch.distributed as dist +import torch.multiprocessing as mp +import torch.nn.functional as F +import torchmetrics +from torch.nn.parallel import DistributedDataParallel +from torch.optim.lr_scheduler import ExponentialLR +from torch.utils.data.distributed import DistributedSampler +from tqdm import tqdm + +from torch_frame.data import DataLoader +from torch_frame.datasets import TabularBenchmark +from torch_frame.nn import Trompt + + +def 
prepare_dataset(dataset_str: str) -> TabularBenchmark: + path = osp.join( + osp.dirname(osp.realpath(__file__)), + "..", + "data", + dataset_str, + ) + materialized_path = osp.join(path, 'materialized_data.pt') + if dist.get_rank() == 0: + logging.info(f"Preparing dataset '{dataset_str}' from '{path}'") + dataset = TabularBenchmark(root=path, name=dataset_str) + logging.info("Materializing dataset") + dataset.materialize(path=materialized_path) + + dist.barrier() + if dist.get_rank() != 0: + logging.info(f"Preparing dataset '{dataset_str}' from '{path}'") + dataset = TabularBenchmark(root=path, name=dataset_str) + logging.info("Loading materialized dataset") + dataset.materialize(path=materialized_path) + + dist.barrier() + return dataset + + +def train( + model: DistributedDataParallel, + epoch: int, + loader: DataLoader, + optimizer: torch.optim.Optimizer, + num_classes: int, + metric: torchmetrics.Metric, + rank: int, +) -> float: + model.train() + loss_accum = torch.tensor(0.0, device=rank, dtype=torch.float32) + for tf in tqdm(loader, desc=f"Epoch {epoch:02d}", disable=rank != 0): + tf = tf.to(rank) + # [batch_size, num_layers, num_classes] + out = model(tf) + with torch.no_grad(): + metric.update(out.mean(dim=1).argmax(dim=-1), tf.y) + num_layers = out.size(1) + # [batch_size * num_layers, num_classes] + pred = out.view(-1, num_classes) + y = tf.y.repeat_interleave(num_layers) + # Layer-wise logit loss + loss = F.cross_entropy(pred, y) + loss.backward() + optimizer.step() + optimizer.zero_grad() + loss_accum += loss + + # The number of samples is guaranteed to be the same across all ranks + # because of DistributedSampler(drop_last=True). + dist.all_reduce(loss_accum, op=dist.ReduceOp.AVG) + metric_value = metric.compute() + metric.reset() + return loss_accum, metric_value + + +@torch.no_grad() +def test( + model: DistributedDataParallel, + epoch: int, + loader: DataLoader, + metric: torchmetrics.Metric, + rank: int, + desc: str, +) -> float: + model.eval() + for tf in tqdm( + loader, + desc=f"Epoch {epoch:02d} ({desc})", + disable=rank != 0, + ): + tf = tf.to(rank) + # [batch_size, num_layers, num_classes] -> [batch_size, num_classes] + pred = model(tf).mean(dim=1) + pred_class = pred.argmax(dim=-1) + metric.update(pred_class, tf.y) + + metric_value = metric.compute() + metric.reset() + return metric_value + + +def run(rank, world_size, args) -> None: + dist.init_process_group( + backend='nccl', + init_method='env://', + world_size=world_size, + rank=rank, + ) + logging.basicConfig( + format=f"[rank={rank}] [%(asctime)s] %(levelname)s: %(message)s", + level=logging.INFO, + ) + logger = logging.getLogger(__name__) + logger.info(f"Running on rank {rank} of {world_size}") + dataset = prepare_dataset(args.dataset) + assert dataset.task_type.is_classification + + # Ensure train, val and test splits are the same across all ranks by + # setting the seed before shuffling. 
+ torch.manual_seed(args.seed) + dataset = dataset.shuffle() + train_dataset, val_dataset, test_dataset = ( + dataset[:0.7], + dataset[0.7:0.79], + dataset[0.79:], + ) + train_loader = DataLoader( + train_dataset.tensor_frame, + batch_size=args.batch_size, + sampler=DistributedSampler( + train_dataset, + shuffle=True, + drop_last=True, + ), + ) + val_loader = DataLoader( + val_dataset.tensor_frame, + batch_size=args.batch_size, + sampler=DistributedSampler( + val_dataset, + shuffle=False, + drop_last=False, + ), + ) + test_loader = DataLoader( + test_dataset.tensor_frame, + batch_size=args.batch_size, + sampler=DistributedSampler( + test_dataset, + shuffle=False, + drop_last=False, + ), + ) + model = Trompt( + channels=args.channels, + out_channels=dataset.num_classes, + num_prompts=args.num_prompts, + num_layers=args.num_layers, + col_stats=dataset.col_stats, + col_names_dict=train_dataset.tensor_frame.col_names_dict, + ).to(rank) + model = DistributedDataParallel(model, device_ids=[rank]) + optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) + lr_scheduler = ExponentialLR(optimizer, gamma=0.95) + + metrics_kwargs = { + "task": "multiclass", + "num_classes": dataset.num_classes, + } + train_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) + val_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) + test_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) + + best_val_acc = 0.0 + best_test_acc = 0.0 + for epoch in range(1, args.epochs + 1): + train_loader.sampler.set_epoch(epoch) + train_loss, train_acc = train( + model, + epoch, + train_loader, + optimizer, + dataset.num_classes, + train_metric, + rank, + ) + val_acc = test( + model, + epoch, + val_loader, + val_metric, + rank, + 'val', + ) + test_acc = test( + model, + epoch, + test_loader, + test_metric, + rank, + 'test', + ) + if best_val_acc < val_acc: + best_val_acc = val_acc + best_test_acc = test_acc + if rank == 0: + print(f"Train Loss: {train_loss:.4f}, " + f"Train Acc: {train_acc:.4f}, " + f"Val Acc: {val_acc:.4f}, " + f"Test Acc: {test_acc:.4f}") + + lr_scheduler.step() + + if rank == 0: + print(f"Best Val Acc: {best_val_acc:.4f}, " + f"Best Test Acc: {best_test_acc:.4f}") + + dist.destroy_process_group() + logging.info("Process group destroyed") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--dataset", type=str, default="california") + parser.add_argument("--channels", type=int, default=128) + parser.add_argument("--num_prompts", type=int, default=128) + parser.add_argument("--num_layers", type=int, default=6) + parser.add_argument("--batch_size", type=int, default=256) + parser.add_argument("--lr", type=float, default=0.001) + parser.add_argument("--epochs", type=int, default=200) + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--compile", action="store_true") + args = parser.parse_args() + + os.environ['MASTER_ADDR'] = 'localhost' + os.environ['MASTER_PORT'] = '12355' + + world_size = torch.cuda.device_count() + mp.spawn(run, args=(world_size, args), nprocs=world_size, join=True) diff --git a/test/nn/models/test_trompt.py b/test/nn/models/test_trompt.py index 3ed78a966..b117c3383 100644 --- a/test/nn/models/test_trompt.py +++ b/test/nn/models/test_trompt.py @@ -47,7 +47,5 @@ def test_trompt(batch_size, use_stype_encoder_dicts): stype_encoder_dicts=stype_encoder_dicts, ) model.reset_parameters() - out = model.forward_stacked(tensor_frame) - assert out.shape == (batch_size, num_layers, out_channels) pred = model(tensor_frame) 
- assert pred.shape == (batch_size, out_channels) + assert pred.shape == (batch_size, num_layers, out_channels) diff --git a/torch_frame/nn/models/trompt.py b/torch_frame/nn/models/trompt.py index ccb39524e..a7cf36ca8 100644 --- a/torch_frame/nn/models/trompt.py +++ b/torch_frame/nn/models/trompt.py @@ -122,7 +122,7 @@ def reset_parameters(self) -> None: trompt_conv.reset_parameters() self.trompt_decoder.reset_parameters() - def forward_stacked(self, tf: TensorFrame) -> Tensor: + def forward(self, tf: TensorFrame) -> Tensor: r"""Transforming :class:`TensorFrame` object into a series of output predictions at each layer. Used during training to compute layer-wise loss. @@ -152,6 +152,3 @@ def forward_stacked(self, tf: TensorFrame) -> Tensor: # [batch_size, num_layers, out_channels] stacked_out = torch.cat(outs, dim=1) return stacked_out - - def forward(self, tf: TensorFrame) -> Tensor: - return self.forward_stacked(tf).mean(dim=1) From e3431d29c42e4e200a6f6db993f61380ea86dbaa Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 01:09:06 +0000 Subject: [PATCH 02/14] update --- examples/trompt_multi_gpu.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index 9a18ba435..c9cadcfdb 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -48,19 +48,24 @@ def train( epoch: int, loader: DataLoader, optimizer: torch.optim.Optimizer, - num_classes: int, metric: torchmetrics.Metric, rank: int, ) -> float: model.train() loss_accum = torch.tensor(0.0, device=rank, dtype=torch.float32) - for tf in tqdm(loader, desc=f"Epoch {epoch:02d}", disable=rank != 0): + for tf in tqdm( + loader, + desc=f"Epoch {epoch:02d} (train)", + disable=rank != 0, + ): tf = tf.to(rank) # [batch_size, num_layers, num_classes] out = model(tf) + with torch.no_grad(): metric.update(out.mean(dim=1).argmax(dim=-1), tf.y) - num_layers = out.size(1) + + _, num_layers, num_classes = out.size() # [batch_size * num_layers, num_classes] pred = out.view(-1, num_classes) y = tf.y.repeat_interleave(num_layers) @@ -105,7 +110,7 @@ def test( return metric_value -def run(rank, world_size, args) -> None: +def run(rank: int, world_size: int, args: argparse.Namespace) -> None: dist.init_process_group( backend='nccl', init_method='env://', @@ -113,7 +118,8 @@ def run(rank, world_size, args) -> None: rank=rank, ) logging.basicConfig( - format=f"[rank={rank}] [%(asctime)s] %(levelname)s: %(message)s", + format=(f"[rank={rank}/{world_size}] " + f"[%(asctime)s] %(levelname)s: %(message)s"), level=logging.INFO, ) logger = logging.getLogger(__name__) @@ -122,7 +128,7 @@ def run(rank, world_size, args) -> None: assert dataset.task_type.is_classification # Ensure train, val and test splits are the same across all ranks by - # setting the seed before shuffling. + # setting the seed on each rank. 
torch.manual_seed(args.seed) dataset = dataset.shuffle() train_dataset, val_dataset, test_dataset = ( @@ -186,7 +192,6 @@ def run(rank, world_size, args) -> None: epoch, train_loader, optimizer, - dataset.num_classes, train_metric, rank, ) From f566dfd964033fb128e8569db26d1957d6f27ded Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 01:11:07 +0000 Subject: [PATCH 03/14] update --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fae9694b9..4854ca4de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Added +- Added an example for multi-GPU training of `Trompt` ([#474](https://github.com/pyg-team/pytorch-frame/pull/474)) - Added support for PyTorch 2.5 ([#464](https://github.com/pyg-team/pytorch-frame/pull/464)) - Added a benchmark script to compare PyTorch Frame with PyTorch Tabular ([#398](https://github.com/pyg-team/pytorch-frame/pull/398), [#444](https://github.com/pyg-team/pytorch-frame/pull/444)) - Added `is_floating_point` method to `MultiNestedTensor` and `MultiEmbeddingTensor` ([#445](https://github.com/pyg-team/pytorch-frame/pull/445)) From c411f932b580f90e6b36078680d986bbab7c5252 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 01:26:24 +0000 Subject: [PATCH 04/14] update test --- test/nn/models/test_compile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/nn/models/test_compile.py b/test/nn/models/test_compile.py index dc22527cf..79db1da5e 100644 --- a/test/nn/models/test_compile.py +++ b/test/nn/models/test_compile.py @@ -54,7 +54,7 @@ Trompt, dict(channels=8, num_prompts=2), None, - 4, + 3, id="Trompt", ), pytest.param( From 024e8a584195e51b4148bad5393af9b373eda050 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 01:27:30 +0000 Subject: [PATCH 05/14] update --- examples/trompt_multi_gpu.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index c9cadcfdb..1dcd21182 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -240,7 +240,6 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: parser.add_argument("--lr", type=float, default=0.001) parser.add_argument("--epochs", type=int, default=200) parser.add_argument("--seed", type=int, default=0) - parser.add_argument("--compile", action="store_true") args = parser.parse_args() os.environ['MASTER_ADDR'] = 'localhost' From 30ac94332ef9c56a4e477976412ab4b6c1cc26b8 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 01:30:11 +0000 Subject: [PATCH 06/14] update --- examples/trompt_multi_gpu.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index 1dcd21182..d24673894 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -55,7 +55,7 @@ def train( loss_accum = torch.tensor(0.0, device=rank, dtype=torch.float32) for tf in tqdm( loader, - desc=f"Epoch {epoch:02d} (train)", + desc=f"Epoch {epoch:03d} (train)", disable=rank != 0, ): tf = tf.to(rank) @@ -96,7 +96,7 @@ def test( model.eval() for tf in tqdm( loader, - desc=f"Epoch {epoch:02d} ({desc})", + desc=f"Epoch {epoch:03d} ({desc})", disable=rank != 0, ): tf = tf.to(rank) From 4d07d343bdff04bd39c2bf2a139b97bc2f61679c Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 02:00:32 +0000 Subject: [PATCH 07/14] no stream sync --- 
examples/trompt_multi_gpu.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index d24673894..b4e1a12e7 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -65,10 +65,13 @@ def train( with torch.no_grad(): metric.update(out.mean(dim=1).argmax(dim=-1), tf.y) - _, num_layers, num_classes = out.size() + batch_size, num_layers, num_classes = out.size() # [batch_size * num_layers, num_classes] pred = out.view(-1, num_classes) - y = tf.y.repeat_interleave(num_layers) + y = tf.y.repeat_interleave( + num_layers, + output_size=num_layers * batch_size, + ) # Layer-wise logit loss loss = F.cross_entropy(pred, y) loss.backward() From a494e01b04dfbba4be9e03ad838d5648bc1744c3 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 02:01:44 +0000 Subject: [PATCH 08/14] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4854ca4de..239b27880 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Added -- Added an example for multi-GPU training of `Trompt` ([#474](https://github.com/pyg-team/pytorch-frame/pull/474)) +- Added an example for training `Trompt` on multiple GPUs ([#474](https://github.com/pyg-team/pytorch-frame/pull/474)) - Added support for PyTorch 2.5 ([#464](https://github.com/pyg-team/pytorch-frame/pull/464)) - Added a benchmark script to compare PyTorch Frame with PyTorch Tabular ([#398](https://github.com/pyg-team/pytorch-frame/pull/398), [#444](https://github.com/pyg-team/pytorch-frame/pull/444)) - Added `is_floating_point` method to `MultiNestedTensor` and `MultiEmbeddingTensor` ([#445](https://github.com/pyg-team/pytorch-frame/pull/445)) From 1941549c8cb0483f062836a62fd40f032c21c120 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 02:48:32 +0000 Subject: [PATCH 09/14] update --- examples/trompt_multi_gpu.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index b4e1a12e7..caaac158d 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -187,7 +187,7 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: test_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) best_val_acc = 0.0 - best_test_acc = 0.0 + test_acc = 0.0 for epoch in range(1, args.epochs + 1): train_loader.sampler.set_epoch(epoch) train_loss, train_acc = train( @@ -206,28 +206,26 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: rank, 'val', ) - test_acc = test( - model, - epoch, - test_loader, - test_metric, - rank, - 'test', - ) if best_val_acc < val_acc: best_val_acc = val_acc - best_test_acc = test_acc + test_acc = test( + model, + epoch, + test_loader, + test_metric, + rank, + 'test', + ) if rank == 0: print(f"Train Loss: {train_loss:.4f}, " f"Train Acc: {train_acc:.4f}, " - f"Val Acc: {val_acc:.4f}, " - f"Test Acc: {test_acc:.4f}") + f"Val Acc: {val_acc:.4f}") lr_scheduler.step() if rank == 0: print(f"Best Val Acc: {best_val_acc:.4f}, " - f"Best Test Acc: {best_test_acc:.4f}") + f"Test Acc: {test_acc:.4f}") dist.destroy_process_group() logging.info("Process group destroyed") @@ -241,7 +239,7 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: parser.add_argument("--num_layers", type=int, default=6) 
parser.add_argument("--batch_size", type=int, default=256) parser.add_argument("--lr", type=float, default=0.001) - parser.add_argument("--epochs", type=int, default=200) + parser.add_argument("--epochs", type=int, default=50) parser.add_argument("--seed", type=int, default=0) args = parser.parse_args() From 35d2fe57edfbe7885c1ec3288f799e171693a59d Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 03:30:05 +0000 Subject: [PATCH 10/14] update --- examples/trompt_multi_gpu.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index caaac158d..7ebd51950 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -125,8 +125,7 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: f"[%(asctime)s] %(levelname)s: %(message)s"), level=logging.INFO, ) - logger = logging.getLogger(__name__) - logger.info(f"Running on rank {rank} of {world_size}") + logging.info(f"Initialized rank {rank}/{world_size}") dataset = prepare_dataset(args.dataset) assert dataset.task_type.is_classification From b3123855c411ead583034d2dffff16f247751ef9 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 03:47:18 +0000 Subject: [PATCH 11/14] add --compile --- examples/trompt_multi_gpu.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index 7ebd51950..eab9ad1d0 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -174,6 +174,7 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: col_names_dict=train_dataset.tensor_frame.col_names_dict, ).to(rank) model = DistributedDataParallel(model, device_ids=[rank]) + model = torch.compile(model) if args.compile else model optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) lr_scheduler = ExponentialLR(optimizer, gamma=0.95) @@ -240,6 +241,7 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: parser.add_argument("--lr", type=float, default=0.001) parser.add_argument("--epochs", type=int, default=50) parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--compile", action="store_true") args = parser.parse_args() os.environ['MASTER_ADDR'] = 'localhost' From aabe3b03f59845c0e6a1e12d5db3692dae171623 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 03:52:16 +0000 Subject: [PATCH 12/14] update --- examples/trompt_multi_gpu.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index eab9ad1d0..07f2bb809 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -79,8 +79,6 @@ def train( optimizer.zero_grad() loss_accum += loss - # The number of samples is guaranteed to be the same across all ranks - # because of DistributedSampler(drop_last=True). 
dist.all_reduce(loss_accum, op=dist.ReduceOp.AVG) metric_value = metric.compute() metric.reset() From cd1ddc09480b72bd3dcf2f5482c40f8b97a3086e Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 04:10:19 +0000 Subject: [PATCH 13/14] update --- examples/trompt_multi_gpu.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index 07f2bb809..436c5c6d6 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -136,6 +136,10 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: dataset[0.7:0.79], dataset[0.79:], ) + # Note that the last batch of evaluation loops is dropped for now because + # drop_last=False will duplicate samples to fill the last batch, leading to + # the wrong evaluation metrics. + # https://github.com/pytorch/pytorch/issues/25162 train_loader = DataLoader( train_dataset.tensor_frame, batch_size=args.batch_size, @@ -150,8 +154,8 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: batch_size=args.batch_size, sampler=DistributedSampler( val_dataset, - shuffle=False, - drop_last=False, + shuffle=True, + drop_last=True, ), ) test_loader = DataLoader( @@ -159,8 +163,8 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: batch_size=args.batch_size, sampler=DistributedSampler( test_dataset, - shuffle=False, - drop_last=False, + shuffle=True, + drop_last=True, ), ) model = Trompt( From 7d090e7436bb078704f37ec8be5975787445ea12 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 27 Dec 2024 04:13:12 +0000 Subject: [PATCH 14/14] update --- examples/trompt_multi_gpu.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/trompt_multi_gpu.py b/examples/trompt_multi_gpu.py index 436c5c6d6..10ef4c205 100644 --- a/examples/trompt_multi_gpu.py +++ b/examples/trompt_multi_gpu.py @@ -179,7 +179,6 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: model = torch.compile(model) if args.compile else model optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) lr_scheduler = ExponentialLR(optimizer, gamma=0.95) - metrics_kwargs = { "task": "multiclass", "num_classes": dataset.num_classes, @@ -187,7 +186,6 @@ def run(rank: int, world_size: int, args: argparse.Namespace) -> None: train_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) val_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) test_metric = torchmetrics.Accuracy(**metrics_kwargs).to(rank) - best_val_acc = 0.0 test_acc = 0.0 for epoch in range(1, args.epochs + 1):
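
Note on the API change in this series: `Trompt.forward` now returns the stacked per-layer predictions of shape `[batch_size, num_layers, out_channels]` (the old `forward_stacked`) instead of their mean, so callers average over the layer dimension themselves at inference time. The sketch below restates how the updated examples consume that output; it is illustrative only, and `model` and `tf` stand in for an already-constructed `Trompt` instance and a `TensorFrame` mini-batch.

    import torch
    import torch.nn.functional as F

    # out: [batch_size, num_layers, num_classes] -- one prediction per Trompt layer.
    out = model(tf)
    batch_size, num_layers, num_classes = out.size()

    # Training: layer-wise loss. Every layer's prediction is scored against the
    # same target, so the labels are repeated once per layer to match
    # out.view(-1, num_classes), whose rows are ordered batch-major, then layer.
    pred = out.view(-1, num_classes)        # [batch_size * num_layers, num_classes]
    y = tf.y.repeat_interleave(num_layers)  # [batch_size * num_layers]
    loss = F.cross_entropy(pred, y)

    # Inference: average the per-layer logits, then take the argmax.
    with torch.no_grad():
        pred_class = out.mean(dim=1).argmax(dim=-1)  # [batch_size]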