From 532adecae4fea6096e79ed6b9ef76c7560bdf3f6 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Sun, 24 Nov 2024 01:04:04 +0100 Subject: [PATCH] PR comments, further refactoring --- .../common/counters/FileSystemCounter.java | 38 ++++++++++++++++--- .../runtime/metrics/TaskCounterUpdater.java | 10 ++--- .../TestFileSystemStatisticUpdater.java | 27 +++++-------- .../metrics/TestTaskCounterUpdater.java | 12 ++---- 4 files changed, 51 insertions(+), 36 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 6cf15b2a52..8fe077962d 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -20,7 +20,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +/** + * FileSystemCounter is an enum for defining which filesystem/storage statistics are exposed in Tez. + */ @Private public enum FileSystemCounter { BYTES_READ("bytesRead"), @@ -28,25 +32,49 @@ public enum FileSystemCounter { READ_OPS("readOps"), LARGE_READ_OPS("largeReadOps"), WRITE_OPS("writeOps"), - HDFS_BYTES_READ("hdfsBytesRead"), - HDFS_BYTES_WRITTEN("hdfsBytesWritten"), - FILE_BYTES_READ("fileBytesRead"), - FILE_BYTES_WRITTEN("fileBytesWritten"), + // Additional counters from HADOOP-13305 // Additional counters from HADOOP-13305 OP_APPEND(CommonStatisticNames.OP_APPEND), + OP_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE), OP_CREATE(CommonStatisticNames.OP_CREATE), + OP_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE), OP_DELETE(CommonStatisticNames.OP_DELETE), + OP_EXISTS(CommonStatisticNames.OP_EXISTS), + OP_GET_CONTENT_SUMMARY(CommonStatisticNames.OP_GET_CONTENT_SUMMARY), + OP_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN), + OP_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM), OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS), + OP_GET_STATUS(CommonStatisticNames.OP_GET_STATUS), + OP_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS), + OP_IS_FILE(CommonStatisticNames.OP_IS_FILE), + OP_IS_DIRECTORY(CommonStatisticNames.OP_IS_DIRECTORY), OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES), OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS), + OP_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS), OP_MKDIRS(CommonStatisticNames.OP_MKDIRS), + OP_MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES), OP_OPEN(CommonStatisticNames.OP_OPEN), + OP_REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL), + OP_REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES), + OP_REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL), OP_RENAME(CommonStatisticNames.OP_RENAME), OP_SET_ACL(CommonStatisticNames.OP_SET_ACL), OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER), OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION), - OP_GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"); + OP_SET_TIMES(CommonStatisticNames.OP_SET_TIMES), + OP_TRUNCATE(CommonStatisticNames.OP_TRUNCATE), + + // counters below are not needed in production, as the scheme_countername expansion is taken care of by the + // FileSystemCounterGroup, the only reason they are here is that some analyzers still depend on them + @Deprecated + HDFS_BYTES_READ("hdfsBytesRead"), + @Deprecated + HDFS_BYTES_WRITTEN("hdfsBytesWritten"), + @Deprecated + FILE_BYTES_READ("fileBytesRead"), + @Deprecated + FILE_BYTES_WRITTEN("fileBytesWritten"); private final String opName; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java index d0ab041b50..75e3e4c4ea 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java @@ -79,11 +79,11 @@ public void updateCounters() { Iterator iter = globalStorageStatistics.iterator(); while (iter.hasNext()) { StorageStatistics stats = iter.next(); - if (!statisticUpdaters.containsKey(stats.getScheme())) { - Map updaterSet = new TreeMap<>(); - statisticUpdaters.put(stats.getScheme(), updaterSet); - } - FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme()) + // Fetch or initialize the updater set for the scheme + Map updaterSet = statisticUpdaters + .computeIfAbsent(stats.getScheme(), k -> new TreeMap<>()); + // Fetch or create the updater for the specific statistic + FileSystemStatisticUpdater updater = updaterSet .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats)); updater.updateCounters(); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java index 040960907c..c3d321f13f 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounter; @@ -42,7 +43,7 @@ public class TestFileSystemStatisticUpdater { private static MiniDFSCluster dfsCluster; - private static Configuration conf = new Configuration(); + private static final Configuration conf = new Configuration(); private static FileSystem remoteFs; private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + @@ -52,8 +53,7 @@ public class TestFileSystemStatisticUpdater { public static void setup() throws IOException { try { conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); - dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) - .build(); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); remoteFs = dfsCluster.getFileSystem(); } catch (IOException io) { throw new RuntimeException("problem starting mini dfs cluster", io); @@ -73,14 +73,11 @@ public void basicTest() throws IOException { TezCounters counters = new TezCounters(); TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); - remoteFs.mkdirs(new Path("/tmp/foo/")); - FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt")); - out.writeBytes("xyz"); - out.close(); + DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc.txt"), "xyz"); updater.updateCounters(); - LOG.info("Counters: " + counters); + LOG.info("Counters: {}", counters); TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(), FileSystemCounter.OP_MKDIRS); TezCounter createCounter = counters.findCounter(remoteFs.getScheme(), @@ -90,26 +87,20 @@ public void basicTest() throws IOException { Assert.assertEquals(1, mkdirCounter.getValue()); Assert.assertEquals(1, createCounter.getValue()); - FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt")); - out1.writeBytes("xyz"); - out1.close(); + DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc1.txt"), "xyz"); long oldCreateVal = createCounter.getValue(); updater.updateCounters(); - LOG.info("Counters: " + counters); + LOG.info("Counters: {}", counters); Assert.assertTrue("Counter not updated, old=" + oldCreateVal + ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal); oldCreateVal = createCounter.getValue(); // Ensure all numbers are reset - remoteFs.clearStatistics(); + FileSystem.clearStatistics(); updater.updateCounters(); - LOG.info("Counters: " + counters); + LOG.info("Counters: {}", counters); Assert.assertEquals(oldCreateVal, createCounter.getValue()); - } - - - } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java index 88b0941fe7..d43f48b1c3 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java @@ -29,9 +29,8 @@ public class TestTaskCounterUpdater { - private static final Logger LOG = LoggerFactory.getLogger( - TestTaskCounterUpdater.class); - private static Configuration conf = new Configuration(); + private static final Logger LOG = LoggerFactory.getLogger(TestTaskCounterUpdater.class); + private static final Configuration conf = new Configuration(); @Test public void basicTest() { @@ -39,7 +38,7 @@ public void basicTest() { TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); updater.updateCounters(); - LOG.info("Counters: " + counters); + LOG.info("Counters: {}", counters); TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS); TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS); @@ -49,11 +48,8 @@ public void basicTest() { Assert.assertTrue(cpuCounter.getValue() > 0); updater.updateCounters(); - LOG.info("Counters: " + counters); + LOG.info("Counters: {}", counters); Assert.assertTrue("Counter not updated, old=" + oldVal + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal); - } - - }