Skip to content

Commit

Permalink
[SPARK-50516][SS][MINOR] Fix the init state related test to use Strea…
Browse files Browse the repository at this point in the history
…mManualClock

### What changes were proposed in this pull request?
Fix the init state related test to use StreamManualClock

### Why are the changes needed?
Fix flakiness on CI runs

```
[info] Run completed in 2 minutes, 43 seconds.
[info] Total number of tests run: 22
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 22, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test only change

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49102 from anishshri-db/task/SPARK-50516.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Dec 7, 2024
1 parent 2fea84e commit fff6793
Showing 1 changed file with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {

val clock = new StreamManualClock

val inputData = MemoryStream[InitInputRow]
val kvDataSet = inputData.toDS()
.groupByKey(x => x.key)
Expand All @@ -390,10 +392,12 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
TimeMode.None(), OutputMode.Append(), initStateDf)

testStream(query, OutputMode.Update())(
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
// non-exist key test
AddData(inputData, InitInputRow("k1", "update", 37.0)),
AddData(inputData, InitInputRow("k2", "update", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("non-exist", "getOption", -1.0)),
Execute { q =>
assert(q.lastProgress
Expand All @@ -402,59 +406,80 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(),

AddData(inputData, InitInputRow("k1", "incCount", 37.0)),
AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getCount", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("non-exist", "getCount", 0.0)),

AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
AddData(inputData, InitInputRow("k2", "getCount", 40.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("k2", "getCount", 2.0)),

// test every row in initial State is processed
AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getOption", 40.0)),

AddData(inputData, InitInputRow("init_2", "getOption", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_2", "getOption", 100.0)),

AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getList", 40.0)),

AddData(inputData, InitInputRow("init_2", "getList", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_2", "getList", 100.0)),

AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getCount", 1.0)),

AddData(inputData, InitInputRow("init_2", "getCount", 100.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_2", "getCount", 1.0)),

// Update row with key in initial row will work
AddData(inputData, InitInputRow("init_1", "update", 50.0)),
AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getOption", 50.0)),

AddData(inputData, InitInputRow("init_1", "remove", -1.0)),
AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getOption", -1.0)),

AddData(inputData, InitInputRow("init_1", "appendList", 50.0)),
AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getList", 50.0), ("init_1", "getList", 40.0)),

AddData(inputData, InitInputRow("init_1", "incCount", 40.0)),
AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getCount", 2.0)),

// test remove
AddData(inputData, InitInputRow("k1", "remove", -1.0)),
AddData(inputData, InitInputRow("k1", "getOption", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("k1", "getOption", -1.0)),

AddData(inputData, InitInputRow("init_1", "clearCount", -1.0)),
AddData(inputData, InitInputRow("init_1", "getCount", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("init_1", "getCount", 0.0)),

AddData(inputData, InitInputRow("init_1", "clearList", -1.0)),
AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
AdvanceManualClock(1 * 1000),
CheckNewAnswer()
)
}
Expand Down

0 comments on commit fff6793

Please sign in to comment.