diff --git a/kernel/global-clock/core/pom.xml b/kernel/global-clock/core/pom.xml index 9c38d93d620c4..008170ee423e6 100644 --- a/kernel/global-clock/core/pom.xml +++ b/kernel/global-clock/core/pom.xml @@ -43,6 +43,12 @@ ${project.version} + + org.apache.shardingsphere + shardingsphere-test-util + ${project.version} + test + org.apache.shardingsphere shardingsphere-test-it-yaml diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java index 49a040437b76d..87a0ba3f06752 100644 --- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java +++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.globalclock.executor; +import com.google.common.base.Preconditions; import org.apache.shardingsphere.globalclock.provider.GlobalClockProvider; import org.apache.shardingsphere.globalclock.rule.GlobalClockRule; import org.apache.shardingsphere.globalclock.rule.constant.GlobalClockOrder; @@ -66,17 +67,16 @@ public void afterCreateConnections(final GlobalClockRule rule, final DatabaseTyp @Override public void beforeExecuteSQL(final GlobalClockRule rule, final DatabaseType databaseType, final Collection connections, final TransactionConnectionContext connectionContext, final TransactionIsolationLevel isolationLevel) throws SQLException { - if (!rule.getConfiguration().isEnabled()) { + if (!rule.getConfiguration().isEnabled() || null != isolationLevel && TransactionIsolationLevel.READ_COMMITTED != isolationLevel) { return; } - if (null != isolationLevel && TransactionIsolationLevel.READ_COMMITTED != isolationLevel) { + Optional globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType); + if (!globalClockTransactionExecutor.isPresent()) { return; } - Optional globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType); Optional globalClockProvider = rule.getGlobalClockProvider(); - if (globalClockTransactionExecutor.isPresent() && globalClockProvider.isPresent()) { - globalClockTransactionExecutor.get().sendSnapshotTimestamp(connections, globalClockProvider.get().getCurrentTimestamp()); - } + Preconditions.checkState(globalClockProvider.isPresent()); + globalClockTransactionExecutor.get().sendSnapshotTimestamp(connections, globalClockProvider.get().getCurrentTimestamp()); } @Override @@ -88,10 +88,12 @@ public void beforeCommit(final GlobalClockRule rule, final DatabaseType database } if (lockContext.tryLock(lockDefinition, 200L)) { Optional globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType); - Optional globalClockProvider = rule.getGlobalClockProvider(); - if (globalClockTransactionExecutor.isPresent() && globalClockProvider.isPresent()) { - globalClockTransactionExecutor.get().sendCommitTimestamp(connections, globalClockProvider.get().getCurrentTimestamp()); + if (!globalClockTransactionExecutor.isPresent()) { + return; } + Optional globalClockProvider = rule.getGlobalClockProvider(); + Preconditions.checkState(globalClockProvider.isPresent()); + globalClockTransactionExecutor.get().sendCommitTimestamp(connections, globalClockProvider.get().getCurrentTimestamp()); } } diff --git a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java index 910050cc44091..1a5a16c0486f1 100644 --- a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java +++ b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.globalclock.rule.GlobalClockRule; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.lock.LockContext; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader; import org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel; @@ -40,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -127,26 +129,83 @@ void assertBeforeExecuteSQLWhenNotReadCommittedIsolationLevel() throws SQLExcept @Test void assertBeforeExecuteSQLWhenGlobalClockTransactionExecutorAbsent() throws SQLException { when(rule.getConfiguration().isEnabled()).thenReturn(true); + when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider)); transactionHook.beforeExecuteSQL(rule, databaseType, Collections.emptyList(), transactionContext, TransactionIsolationLevel.READ_COMMITTED); when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.empty()); verify(globalClockTransactionExecutor, times(0)).sendSnapshotTimestamp(any(), anyLong()); } @Test - void assertBeforeExecuteSQLWhenGlobalClockProviderAbsent() throws SQLException { + void assertBeforeExecuteSQLWhenNullTransactionIsolationLevel() throws SQLException { when(rule.getConfiguration().isEnabled()).thenReturn(true); - transactionHook.beforeExecuteSQL(rule, databaseType, Collections.emptyList(), transactionContext, TransactionIsolationLevel.READ_COMMITTED); + when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider)); when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.of(globalClockTransactionExecutor)); - verify(globalClockTransactionExecutor, times(0)).sendSnapshotTimestamp(any(), anyLong()); + when(globalClockProvider.getCurrentTimestamp()).thenReturn(10L); + transactionHook.beforeExecuteSQL(rule, databaseType, Collections.emptyList(), transactionContext, null); + verify(globalClockTransactionExecutor).sendSnapshotTimestamp(Collections.emptyList(), 10L); } + @SuppressWarnings({"rawtypes", "unchecked"}) @Test - void assertBeforeExecuteSQLWhenNullTransactionIsolationLevel() throws SQLException { + void assertBeforeCommitWhenDisabledGlobalClockRule() throws SQLException { + LockContext lockContext = mock(LockContext.class); + transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); + verify(lockContext, times(0)).tryLock(any(), anyLong()); + } + + @SuppressWarnings("rawtypes") + @Test + void assertBeforeCommitWhenTryLockFailed() throws SQLException { + when(rule.getConfiguration().isEnabled()).thenReturn(true); + LockContext lockContext = mock(LockContext.class); + transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); + verify(globalClockTransactionExecutor, times(0)).sendCommitTimestamp(any(), anyLong()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + void assertBeforeCommitWhenGlobalClockTransactionExecutorAbsent() throws SQLException { + when(rule.getConfiguration().isEnabled()).thenReturn(true); + LockContext lockContext = mock(LockContext.class); + when(lockContext.tryLock(any(), anyLong())).thenReturn(true); + when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.empty()); + transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); + verify(globalClockTransactionExecutor, times(0)).sendCommitTimestamp(any(), anyLong()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + void assertBeforeCommit() throws SQLException { when(rule.getConfiguration().isEnabled()).thenReturn(true); when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider)); - when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.of(globalClockTransactionExecutor)); when(globalClockProvider.getCurrentTimestamp()).thenReturn(10L); - transactionHook.beforeExecuteSQL(rule, databaseType, Collections.emptyList(), transactionContext, null); - verify(globalClockTransactionExecutor).sendSnapshotTimestamp(Collections.emptyList(), 10L); + LockContext lockContext = mock(LockContext.class); + when(lockContext.tryLock(any(), anyLong())).thenReturn(true); + when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.of(globalClockTransactionExecutor)); + transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); + verify(globalClockTransactionExecutor).sendCommitTimestamp(Collections.emptyList(), 10L); + } + + @Test + void assertAfterCommitWhenGlobalClockProviderAbsent() { + transactionHook.afterCommit(rule, databaseType, Collections.emptyList(), transactionContext, mock(LockContext.class)); + verify(globalClockProvider, times(0)).getNextTimestamp(); + } + + @Test + void assertAfterCommitWhenGlobalClockProviderPresent() { + when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider)); + transactionHook.afterCommit(rule, databaseType, Collections.emptyList(), transactionContext, mock(LockContext.class)); + verify(globalClockProvider).getNextTimestamp(); + } + + @Test + void assertBeforeRollback() { + assertDoesNotThrow(() -> transactionHook.beforeRollback(rule, databaseType, Collections.emptyList(), transactionContext)); + } + + @Test + void assertAfterRollback() { + assertDoesNotThrow(() -> transactionHook.afterRollback(rule, databaseType, Collections.emptyList(), transactionContext)); } }