Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repro #12667

Closed
Closed

repro #12667

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,6 @@ public void clean() throws IOException {
cleanupResources();
}

private static final List<Class> LOCK_PROVIDER_CLASSES = Arrays.asList(
InProcessLockProvider.class,
FileSystemBasedLockProvider.class);

private static final List<ConflictResolutionStrategy> CONFLICT_RESOLUTION_STRATEGY_CLASSES = Arrays.asList(
new SimpleConcurrentFileWritesConflictResolutionStrategy(),
new PreferWriterConflictResolutionStrategy());

private static Iterable<Object[]> providerClassResolutionStrategyAndTableType() {
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy : CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, resolutionStrategy});
opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, resolutionStrategy});
}
}
return opts;
}

@ParameterizedTest
@MethodSource("configParamsDirectBased")
public void testHoodieClientBasicMultiWriterWithEarlyConflictDetectionDirect(String tableType, String earlyConflictDetectionStrategy) throws Exception {
Expand Down Expand Up @@ -499,6 +480,26 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
}
}

private static final List<Class> LOCK_PROVIDER_CLASSES = Arrays.asList(
// InProcessLockProvider.class,
FileSystemBasedLockProvider.class);

private static final List<ConflictResolutionStrategy> CONFLICT_RESOLUTION_STRATEGY_CLASSES = Arrays.asList(
new SimpleConcurrentFileWritesConflictResolutionStrategy()
// new PreferWriterConflictResolutionStrategy()
);

private static Iterable<Object[]> providerClassResolutionStrategyAndTableType() {
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy : CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
// opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, resolutionStrategy});
opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, resolutionStrategy});
}
}
return opts;
}

@ParameterizedTest
@MethodSource("providerClassResolutionStrategyAndTableType")
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class<? extends LockProvider<?>> providerClass,
Expand Down Expand Up @@ -586,6 +587,9 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta

// Since the concurrent modifications went in, this upsert has
// to fail
// ----- This line error out, go to the assertThrows API and set breakpoint there, keep running the test for 20 times in debug mode should repro the issue
// to keep running a test, modify Run configuration of the test (right click the run button next to the test, choose the last option) -> modify options -> repeat -> run until stopped.
// the run test in debug mode. when there is a repro it will stop at the breakpoint
assertThrows(HoodieWriteConflictException.class, () -> {
createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
Expand Down
Loading