From 76d7d783afe4939627784de2a575b2ffc8e79d61 Mon Sep 17 00:00:00 2001 From: 7hong Date: Mon, 16 Dec 2024 10:13:47 +0800 Subject: [PATCH 1/3] [AMORO-3365]The table is always in committing state --- .../amoro/server/optimizing/OptimizingQueue.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 64267b8c57..db9efebc3f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -23,6 +23,7 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.OptimizingTaskId; import org.apache.amoro.exception.OptimizingClosedException; +import org.apache.amoro.exception.PersistenceException; import org.apache.amoro.optimizing.MetricsSummary; import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.RewriteFilesInput; @@ -564,7 +565,15 @@ public void commit() { lock.lock(); try { if (hasCommitted) { - LOG.warn("{} has already committed, give up", tableRuntime.getTableIdentifier()); + LOG.warn( + "{} has already committed, give up, last error: {}", + tableRuntime.getTableIdentifier(), + failedReason); + try { + persistProcessCompleted(status == ProcessStatus.SUCCESS); + } catch (Exception ignore) { + // ignore + } throw new IllegalStateException("repeat commit, and last error " + failedReason); } try { @@ -573,6 +582,11 @@ public void commit() { status = ProcessStatus.SUCCESS; endTime = System.currentTimeMillis(); persistProcessCompleted(true); + } catch (PersistenceException e) { + LOG.warn( + "{} failed to persist process completed, will retry next commit", + tableRuntime.getTableIdentifier(), + e); } catch (Exception e) { LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e); status = ProcessStatus.FAILED; From 87080284b18bfdb36882ebe0fcf2bd80fde089e4 Mon Sep 17 00:00:00 2001 From: 7hong Date: Wed, 22 Jan 2025 14:50:32 +0800 Subject: [PATCH 2/3] Close the committing process on start --- .../server/optimizing/OptimizingQueue.java | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index db10d9a8b6..2c8640d059 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -23,6 +23,7 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.OptimizingTaskId; import org.apache.amoro.exception.OptimizingClosedException; +import org.apache.amoro.exception.PersistenceException; import org.apache.amoro.optimizing.MetricsSummary; import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.RewriteFilesInput; @@ -120,9 +121,20 @@ private void initTableRuntime(TableRuntime tableRuntime) { if (tableRuntime.isOptimizingEnabled()) { tableRuntime.resetTaskQuotas( System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME); + // Close the committing process to avoid duplicate commit on the table. + if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) { + OptimizingProcess process = tableRuntime.getOptimizingProcess(); + if (process != null) { + LOG.warn( + "Close the committing process {} on table {}", + process.getProcessId(), + tableRuntime.getTableIdentifier()); + process.close(); + } + } if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); - } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { + } else { tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { @@ -569,6 +581,10 @@ public void commit() { try { if (hasCommitted) { LOG.warn("{} has already committed, give up", tableRuntime.getTableIdentifier()); + try { + persistAndSetCompleted(status == ProcessStatus.SUCCESS); + } catch (Exception ignored) { + } throw new IllegalStateException("repeat commit, and last error " + failedReason); } try { @@ -577,10 +593,15 @@ public void commit() { status = ProcessStatus.SUCCESS; endTime = System.currentTimeMillis(); persistAndSetCompleted(true); - } catch (Exception e) { - LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e); + } catch (PersistenceException e) { + LOG.warn( + "{} failed to persist process completed, will retry next commit", + tableRuntime.getTableIdentifier(), + e); + } catch (Throwable t) { + LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), t); status = ProcessStatus.FAILED; - failedReason = ExceptionUtil.getErrorMessage(e, 4000); + failedReason = ExceptionUtil.getErrorMessage(t, 4000); endTime = System.currentTimeMillis(); persistAndSetCompleted(false); } From 008763a87d5631a981a00b7f7b8b181089330d7f Mon Sep 17 00:00:00 2001 From: 7hong Date: Thu, 23 Jan 2025 10:18:00 +0800 Subject: [PATCH 3/3] Fix unit tests Fix unit tests Fix unit tests Fix unit tests --- .../apache/amoro/server/TestDefaultOptimizingService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 9f17b8a4ce..b68660a9cb 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -325,8 +325,12 @@ public void testReloadCompletedTask() { optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); reload(); - assertTaskCompleted(null); - Assertions.assertNull(optimizingService().pollTask(token, THREAD_ID)); + // Committing process will be closed when reloading + Assertions.assertNull( + tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingProcess()); + Assertions.assertEquals( + OptimizingStatus.IDLE, + tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus()); } @Test