Skip to content

Commit

Permalink
HDDS-12110. Optimize memory overhead for OM background tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
deveshsingh committed Jan 24, 2025
1 parent ce82d12 commit 453790f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,15 @@ public void handlePutEvent(OMDBUpdateEvent<String, Object> event,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap) {

String countKey = getTableCountKeyFromTable(tableName);
String unReplicatedSizeKey = getUnReplicatedSizeKeyFromTable(tableName);
String replicatedSizeKey = getReplicatedSizeKeyFromTable(tableName);

if (event.getValue() != null) {
RepeatedOmKeyInfo repeatedOmKeyInfo =
(RepeatedOmKeyInfo) event.getValue();
objectCountMap.computeIfPresent(countKey,
objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
(k, count) -> count + repeatedOmKeyInfo.getOmKeyInfoList().size());
Pair<Long, Long> result = repeatedOmKeyInfo.getTotalSize();
unReplicatedSizeMap.computeIfPresent(unReplicatedSizeKey,
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
(k, size) -> size + result.getLeft());
replicatedSizeMap.computeIfPresent(replicatedSizeKey,
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
(k, size) -> size + result.getRight());
} else {
LOG.warn("Put event does not have the Key Info for {}.",
Expand All @@ -81,19 +77,15 @@ public void handleDeleteEvent(OMDBUpdateEvent<String, Object> event,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap) {

String countKey = getTableCountKeyFromTable(tableName);
String unReplicatedSizeKey = getUnReplicatedSizeKeyFromTable(tableName);
String replicatedSizeKey = getReplicatedSizeKeyFromTable(tableName);

if (event.getValue() != null) {
RepeatedOmKeyInfo repeatedOmKeyInfo =
(RepeatedOmKeyInfo) event.getValue();
objectCountMap.computeIfPresent(countKey, (k, count) ->
objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName), (k, count) ->
count > 0 ? count - repeatedOmKeyInfo.getOmKeyInfoList().size() : 0L);
Pair<Long, Long> result = repeatedOmKeyInfo.getTotalSize();
unReplicatedSizeMap.computeIfPresent(unReplicatedSizeKey,
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
(k, size) -> size > result.getLeft() ? size - result.getLeft() : 0L);
replicatedSizeMap.computeIfPresent(replicatedSizeKey,
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
(k, size) -> size > result.getRight() ? size - result.getRight() :
0L);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public class OmTableInsightTask implements ReconOmTask {
private Configuration sqlConfiguration;
private ReconOMMetadataManager reconOMMetadataManager;
private Map<String, OmTableHandler> tableHandlers;
private Collection<String> tables;
private HashMap<String, Long> objectCountMap;
private HashMap<String, Long> unReplicatedSizeMap;
private HashMap<String, Long> replicatedSizeMap;

private List<GlobalStats> insertGlobalStats;
private List<GlobalStats> updateGlobalStats;

@Inject
public OmTableInsightTask(GlobalStatsDao globalStatsDao,
Expand All @@ -76,6 +83,9 @@ public OmTableInsightTask(GlobalStatsDao globalStatsDao,
tableHandlers.put(OPEN_KEY_TABLE, new OpenKeysInsightHandler());
tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());

insertGlobalStats = new ArrayList<>();
updateGlobalStats = new ArrayList<>();
}

/**
Expand All @@ -92,16 +102,15 @@ public OmTableInsightTask(GlobalStatsDao globalStatsDao,
*/
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
HashMap<String, Long> objectCountMap = initializeCountMap();
HashMap<String, Long> unReplicatedSizeMap = initializeSizeMap(false);
HashMap<String, Long> replicatedSizeMap = initializeSizeMap(true);
tables = getTaskTables();

// Initialize maps to store count and size information
objectCountMap = initializeCountMap();
unReplicatedSizeMap = initializeSizeMap(false);
replicatedSizeMap = initializeSizeMap(true);

for (String tableName : getTaskTables()) {
for (String tableName : tables) {
Table table = omMetadataManager.getTable(tableName);
if (table == null) {
LOG.error("Table " + tableName + " not found in OM Metadata.");
return new ImmutablePair<>(getTaskName(), false);
}

try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator
= table.iterator()) {
Expand Down Expand Up @@ -157,35 +166,32 @@ public Collection<String> getTaskTables() {
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
// Initialize maps to store count and size information
HashMap<String, Long> objectCountMap = initializeCountMap();
HashMap<String, Long> unReplicatedSizeMap = initializeSizeMap(false);
HashMap<String, Long> replicatedSizeMap = initializeSizeMap(true);
final Collection<String> taskTables = getTaskTables();

String tableName;
OMDBUpdateEvent<String, Object> omdbUpdateEvent;
// Process each update event
long startTime = System.currentTimeMillis();
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
String tableName = omdbUpdateEvent.getTable();
if (!taskTables.contains(tableName)) {
omdbUpdateEvent = eventIterator.next();
tableName = omdbUpdateEvent.getTable();
if (!tables.contains(tableName)) {
continue;
}
try {
switch (omdbUpdateEvent.getAction()) {
case PUT:
handlePutEvent(omdbUpdateEvent, tableName, objectCountMap,
unReplicatedSizeMap, replicatedSizeMap);
handlePutEvent(omdbUpdateEvent, tableName
);
break;

case DELETE:
handleDeleteEvent(omdbUpdateEvent, tableName, objectCountMap,
unReplicatedSizeMap, replicatedSizeMap);
handleDeleteEvent(omdbUpdateEvent, tableName
);
break;

case UPDATE:
handleUpdateEvent(omdbUpdateEvent, tableName, objectCountMap,
unReplicatedSizeMap, replicatedSizeMap);
handleUpdateEvent(omdbUpdateEvent, tableName
);
break;

default:
Expand Down Expand Up @@ -215,11 +221,7 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
}

private void handlePutEvent(OMDBUpdateEvent<String, Object> event,
String tableName,
HashMap<String, Long> objectCountMap,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap)
throws IOException {
String tableName) {
OmTableHandler tableHandler = tableHandlers.get(tableName);
if (event.getValue() != null) {
if (tableHandler != null) {
Expand All @@ -234,30 +236,22 @@ private void handlePutEvent(OMDBUpdateEvent<String, Object> event,


private void handleDeleteEvent(OMDBUpdateEvent<String, Object> event,
String tableName,
HashMap<String, Long> objectCountMap,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap)
throws IOException {
String tableName) {
OmTableHandler tableHandler = tableHandlers.get(tableName);
if (event.getValue() != null) {
if (tableHandler != null) {
tableHandler.handleDeleteEvent(event, tableName, objectCountMap,
unReplicatedSizeMap, replicatedSizeMap);
} else {
String countKey = getTableCountKeyFromTable(tableName);
objectCountMap.computeIfPresent(countKey,
objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
(k, count) -> count > 0 ? count - 1L : 0L);
}
}
}


private void handleUpdateEvent(OMDBUpdateEvent<String, Object> event,
String tableName,
HashMap<String, Long> objectCountMap,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap) {
String tableName) {

OmTableHandler tableHandler = tableHandlers.get(tableName);
if (event.getValue() != null) {
Expand All @@ -275,9 +269,6 @@ private void handleUpdateEvent(OMDBUpdateEvent<String, Object> event,
* @param dataMap Map containing the updated count and size information.
*/
private void writeDataToDB(Map<String, Long> dataMap) {
List<GlobalStats> insertGlobalStats = new ArrayList<>();
List<GlobalStats> updateGlobalStats = new ArrayList<>();

for (Entry<String, Long> entry : dataMap.entrySet()) {
Timestamp now =
using(sqlConfiguration).fetchValue(select(currentTimestamp()));
Expand All @@ -295,6 +286,9 @@ private void writeDataToDB(Map<String, Long> dataMap) {

globalStatsDao.insert(insertGlobalStats);
globalStatsDao.update(updateGlobalStats);

insertGlobalStats.clear();
updateGlobalStats.clear();
}

/**
Expand All @@ -303,13 +297,12 @@ private void writeDataToDB(Map<String, Long> dataMap) {
* @return The count map containing the counts for each table.
*/
private HashMap<String, Long> initializeCountMap() {
Collection<String> tables = getTaskTables();
HashMap<String, Long> objectCountMap = new HashMap<>(tables.size());
HashMap<String, Long> objCountMap = new HashMap<>(tables.size());
for (String tableName : tables) {
String key = getTableCountKeyFromTable(tableName);
objectCountMap.put(key, getValueForKey(key));
objCountMap.put(key, getValueForKey(key));
}
return objectCountMap;
return objCountMap;
}

/**
Expand All @@ -319,10 +312,12 @@ private HashMap<String, Long> initializeCountMap() {
* @return The size map containing the size counts for each table.
*/
private HashMap<String, Long> initializeSizeMap(boolean replicated) {
String tableName;
OmTableHandler tableHandler;
HashMap<String, Long> sizeCountMap = new HashMap<>();
for (Map.Entry<String, OmTableHandler> entry : tableHandlers.entrySet()) {
String tableName = entry.getKey();
OmTableHandler tableHandler = entry.getValue();
tableName = entry.getKey();
tableHandler = entry.getValue();
String key =
replicated ? tableHandler.getReplicatedSizeKeyFromTable(tableName) :
tableHandler.getUnReplicatedSizeKeyFromTable(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@ public void handlePutEvent(OMDBUpdateEvent<String, Object> event,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap) {

String countKey = getTableCountKeyFromTable(tableName);
String unReplicatedSizeKey = getUnReplicatedSizeKeyFromTable(tableName);
String replicatedSizeKey = getReplicatedSizeKeyFromTable(tableName);

if (event.getValue() != null) {
OmKeyInfo omKeyInfo = (OmKeyInfo) event.getValue();
objectCountMap.computeIfPresent(countKey, (k, count) -> count + 1L);
unReplicatedSizeMap.computeIfPresent(unReplicatedSizeKey,
objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName), (k, count) -> count + 1L);
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
(k, size) -> size + omKeyInfo.getDataSize());
replicatedSizeMap.computeIfPresent(replicatedSizeKey,
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
(k, size) -> size + omKeyInfo.getReplicatedSize());
} else {
LOG.warn("Put event does not have the Key Info for {}.",
Expand All @@ -76,18 +72,14 @@ public void handleDeleteEvent(OMDBUpdateEvent<String, Object> event,
HashMap<String, Long> unReplicatedSizeMap,
HashMap<String, Long> replicatedSizeMap) {

String countKey = getTableCountKeyFromTable(tableName);
String unReplicatedSizeKey = getUnReplicatedSizeKeyFromTable(tableName);
String replicatedSizeKey = getReplicatedSizeKeyFromTable(tableName);

if (event.getValue() != null) {
OmKeyInfo omKeyInfo = (OmKeyInfo) event.getValue();
objectCountMap.computeIfPresent(countKey,
objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
(k, count) -> count > 0 ? count - 1L : 0L);
unReplicatedSizeMap.computeIfPresent(unReplicatedSizeKey,
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
(k, size) -> size > omKeyInfo.getDataSize() ?
size - omKeyInfo.getDataSize() : 0L);
replicatedSizeMap.computeIfPresent(replicatedSizeKey,
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
(k, size) -> size > omKeyInfo.getReplicatedSize() ?
size - omKeyInfo.getReplicatedSize() : 0L);
} else {
Expand All @@ -113,17 +105,15 @@ public void handleUpdateEvent(OMDBUpdateEvent<String, Object> event,
event.getKey());
return;
}
String unReplicatedSizeKey = getUnReplicatedSizeKeyFromTable(tableName);
String replicatedSizeKey = getReplicatedSizeKeyFromTable(tableName);

// In Update event the count for the open table will not change. So we
// don't need to update the count.
OmKeyInfo oldKeyInfo = (OmKeyInfo) event.getOldValue();
OmKeyInfo newKeyInfo = (OmKeyInfo) event.getValue();
unReplicatedSizeMap.computeIfPresent(unReplicatedSizeKey,
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
(k, size) -> size - oldKeyInfo.getDataSize() +
newKeyInfo.getDataSize());
replicatedSizeMap.computeIfPresent(replicatedSizeKey,
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
(k, size) -> size - oldKeyInfo.getReplicatedSize() +
newKeyInfo.getReplicatedSize());
} else {
Expand Down

0 comments on commit 453790f

Please sign in to comment.