From bd704beabcade6587f9681ed0e664590b0e649e0 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 11 Sep 2024 11:21:43 +0200 Subject: [PATCH 01/10] Fix failing type checks Signed-off-by: Paolo Di Tommaso --- src/main/groovy/io/seqera/wave/WaveDefault.groovy | 2 +- .../seqera/wave/memstore/range/impl/RedisRangeProvider.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/WaveDefault.groovy b/src/main/groovy/io/seqera/wave/WaveDefault.groovy index c42a69187..ec8748a5a 100644 --- a/src/main/groovy/io/seqera/wave/WaveDefault.groovy +++ b/src/main/groovy/io/seqera/wave/WaveDefault.groovy @@ -41,7 +41,7 @@ interface WaveDefault { 'application/vnd.docker.distribution.manifest.list.v2+json' ) ) - final public static int[] HTTP_REDIRECT_CODES = List.of(301, 302, 303, 307, 308) + final public static List HTTP_REDIRECT_CODES = List.of(301, 302, 303, 307, 308) final public static List HTTP_SERVER_ERRORS = List.of(500, 502, 503, 504) diff --git a/src/main/groovy/io/seqera/wave/memstore/range/impl/RedisRangeProvider.groovy b/src/main/groovy/io/seqera/wave/memstore/range/impl/RedisRangeProvider.groovy index 070b12b93..22a5ede1a 100644 --- a/src/main/groovy/io/seqera/wave/memstore/range/impl/RedisRangeProvider.groovy +++ b/src/main/groovy/io/seqera/wave/memstore/range/impl/RedisRangeProvider.groovy @@ -62,7 +62,7 @@ class RedisRangeProvider implements RangeProvider { if( remove ) { final entries = conn.eval(SCRIPT, 1, key, min.toString(), max.toString(), '0', count.toString()) if( entries instanceof List ) - result.addAll(entries) + result.addAll((List) entries) } else { List found = conn.zrangeByScoreWithScores(key, min, max, 0, count) From 7c1eb03cca29ede01caeb58280800be1bd807f5a Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 11 Sep 2024 14:12:36 +0200 Subject: [PATCH 02/10] Add s5cmd/dist.sh script [ci skip] Signed-off-by: Paolo Di Tommaso --- s5cmd/dist.sh | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 s5cmd/dist.sh diff --git a/s5cmd/dist.sh b/s5cmd/dist.sh new file mode 100644 index 000000000..af66e14cb --- /dev/null +++ b/s5cmd/dist.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# +# Wave, containers provisioning service +# Copyright (c) 2023-2024, Seqera Labs +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# + +arch=$(uname -m) + +case $arch in + x86_64|amd64) + echo "https://github.com/peak/s5cmd/releases/download/v$1/s5cmd_$1_Linux-64bit.tar.gz" + ;; + aarch64|arm64) + echo "https://github.com/peak/s5cmd/releases/download/v$1/s5cmd_$1_Linux-arm64.tar.gz" + ;; + *) + echo "Unknown architecture: $arch" + ;; +esac From c16c09597931c9870186b5a3c718ecdbf74cfb0e Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 11 Sep 2024 14:30:25 +0200 Subject: [PATCH 03/10] Use public repo for s5cmd (#639) Signed-off-by: Paolo Di Tommaso --- configuration.md | 2 +- s5cmd/Makefile | 2 +- src/main/resources/application.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/configuration.md b/configuration.md index 9c8daf16b..b7523b17c 100644 --- a/configuration.md +++ b/configuration.md @@ -216,7 +216,7 @@ Wave offers a feature to provide a cache for Docker blobs, which improves the pe - **`wave.blobCache.enabled`**: whether to enable the blob cache. It is `false` by default. *Optional*. -- **`wave.blobCache.s5cmdImage`**: the Docker image that supplies the [s5cmd tool](https://github.com/peak/s5cmd). This tool is used to upload blob binaries to the S3 bucket. The default image used by Wave is `cr.seqera.io/public/wave/s5cmd:v2.2.2`. *Optional*. +- **`wave.blobCache.s5cmdImage`**: the Docker image that supplies the [s5cmd tool](https://github.com/peak/s5cmd). This tool is used to upload blob binaries to the S3 bucket. The default image used by Wave is `public.cr.seqera.io/wave/s5cmd:v2.2.2`. *Optional*. - **`wave.blobCache.status.delay`**: the time delay in checking the status of the transfer of the blob binary from the repository to the cache. Its default value is `5s`. *Optional*. diff --git a/s5cmd/Makefile b/s5cmd/Makefile index 2de4f13e6..45ea7ee29 100644 --- a/s5cmd/Makefile +++ b/s5cmd/Makefile @@ -5,5 +5,5 @@ build: --push \ --platform linux/amd64,linux/arm64 \ --build-arg version=${version} \ - --tag cr.seqera.io/public/wave/s5cmd:v${version} \ + --tag public.cr.seqera.io/wave/s5cmd:v${version} \ . diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 15dd5c8f1..609117754 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -85,7 +85,7 @@ wave: image: name: aquasec/trivy:0.53.0 blobCache: - s5cmdImage: cr.seqera.io/public/wave/s5cmd:v2.2.2 + s5cmdImage: public.cr.seqera.io/wave/s5cmd:v2.2.2 --- jackson: serialization: From b69d34c41f15b3bc4ab9d0df03fe8044b7bb0727 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 11 Sep 2024 14:30:39 +0200 Subject: [PATCH 04/10] Bump trivy 0.55 (#638) Signed-off-by: Paolo Di Tommaso --- src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 609117754..0be2a3603 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -83,7 +83,7 @@ wave: multiplier: '1.75' scan: image: - name: aquasec/trivy:0.53.0 + name: aquasec/trivy:0.55.0 blobCache: s5cmdImage: public.cr.seqera.io/wave/s5cmd:v2.2.2 --- From 94d37637d9fd7b0c83028cd664d3af35f9e9f825 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 11 Sep 2024 15:03:05 +0200 Subject: [PATCH 05/10] Update scan model (#637) Signed-off-by: Paolo Di Tommaso Co-authored-by: Munish Chouhan --- typespec/models/WaveScanRecord.tsp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/typespec/models/WaveScanRecord.tsp b/typespec/models/WaveScanRecord.tsp index e74e56cf9..f5f4d9158 100644 --- a/typespec/models/WaveScanRecord.tsp +++ b/typespec/models/WaveScanRecord.tsp @@ -4,8 +4,9 @@ import "./Vulnerability.tsp"; model WaveScanRecord { buildId: string; duration: int64; + containerImage: string; id: string; startTime: string; status: string; vulnerabilities: Vulnerability[]; - } \ No newline at end of file + } From a6d1d884db40d188fefda0be9f81b539e1276ae7 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 12 Sep 2024 08:04:04 +0200 Subject: [PATCH 06/10] Add entropy to cron services (#640) Signed-off-by: Paolo Di Tommaso --- .../wave/service/cleanup/CleanupConfig.groovy | 4 + .../service/cleanup/CleanupServiceImpl.groovy | 6 +- .../seqera/wave/tower/auth/JwtConfig.groovy | 8 +- .../seqera/wave/tower/auth/JwtMonitor.groovy | 6 +- .../io/seqera/wave/util/DurationUtils.groovy | 61 ++++++++ .../service/cleanup/CleanupConfigTest.groovy | 42 ++++++ .../wave/tower/auth/JwtConfigTest.groovy | 41 ++++++ .../seqera/wave/util/DurationUtilsTest.groovy | 139 ++++++++++++++++++ 8 files changed, 304 insertions(+), 3 deletions(-) create mode 100644 src/main/groovy/io/seqera/wave/util/DurationUtils.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/cleanup/CleanupConfigTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/tower/auth/JwtConfigTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/util/DurationUtilsTest.groovy diff --git a/src/main/groovy/io/seqera/wave/service/cleanup/CleanupConfig.groovy b/src/main/groovy/io/seqera/wave/service/cleanup/CleanupConfig.groovy index bcf559051..1dddc77d4 100644 --- a/src/main/groovy/io/seqera/wave/service/cleanup/CleanupConfig.groovy +++ b/src/main/groovy/io/seqera/wave/service/cleanup/CleanupConfig.groovy @@ -24,6 +24,7 @@ import javax.annotation.Nullable import groovy.transform.CompileStatic import groovy.transform.ToString import io.micronaut.context.annotation.Value +import io.seqera.wave.util.DurationUtils import jakarta.inject.Singleton /** @@ -55,4 +56,7 @@ class CleanupConfig { @Value('${wave.cleanup.run-interval:30s}') Duration cleanupRunInterval + Duration getCleanupStartupDelayRandomized() { + DurationUtils.randomDuration(cleanupStartupDelay, 0.4f) + } } diff --git a/src/main/groovy/io/seqera/wave/service/cleanup/CleanupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/cleanup/CleanupServiceImpl.groovy index 514a0c5f4..1b8baba53 100644 --- a/src/main/groovy/io/seqera/wave/service/cleanup/CleanupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/cleanup/CleanupServiceImpl.groovy @@ -61,7 +61,11 @@ class CleanupServiceImpl implements Runnable, CleanupService { @PostConstruct private init() { log.info "Creating cleanup service - config=$config" - scheduler.scheduleAtFixedRate(config.cleanupStartupDelay, config.cleanupRunInterval, this) + // use randomize initial delay to prevent multiple replicas running at the same time + scheduler.scheduleWithFixedDelay( + config.cleanupStartupDelayRandomized, + config.cleanupRunInterval, + this ) } @Override diff --git a/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy b/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy index e6fd0b7a6..9e040da47 100644 --- a/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy +++ b/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy @@ -23,10 +23,12 @@ import java.time.Duration import groovy.transform.CompileStatic import groovy.transform.ToString import io.micronaut.context.annotation.Value +import io.seqera.wave.util.DurationUtils import jakarta.inject.Singleton /** - * + * Model JWT refreshing service config + * * @author Paolo Di Tommaso */ @Singleton @@ -59,4 +61,8 @@ class JwtConfig { @Value('${wave.jwt.monitor.count:10}') int monitorCount + Duration getMonitorDelayRandomized() { + DurationUtils.randomDuration(monitorDelay, 0.4f) + } + } diff --git a/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy b/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy index 41262b847..0fee156f7 100644 --- a/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy +++ b/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy @@ -63,7 +63,11 @@ class JwtMonitor implements Runnable { @PostConstruct private init() { log.info "Creating JWT heartbeat - $jwtConfig" - scheduler.scheduleAtFixedRate(jwtConfig.monitorDelay, jwtConfig.monitorInterval, this) + // use randomize initial delay to prevent multiple replicas running at the same time + scheduler.scheduleAtFixedRate( + jwtConfig.monitorDelayRandomized, + jwtConfig.monitorInterval, + this ) } void run() { diff --git a/src/main/groovy/io/seqera/wave/util/DurationUtils.groovy b/src/main/groovy/io/seqera/wave/util/DurationUtils.groovy new file mode 100644 index 000000000..1d5f13034 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/util/DurationUtils.groovy @@ -0,0 +1,61 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.util + +import java.time.Duration +import java.util.concurrent.ThreadLocalRandom + +import groovy.transform.CompileStatic +/** + * Utility functions for handling duration + * + * @author Paolo Di Tommaso + */ +@CompileStatic +class DurationUtils { + + static Duration randomDuration(Duration min, Duration max) { + if (min > max) { + throw new IllegalArgumentException("Min duration must be less than or equal to max duration") + } + + long minNanos = min.toNanos() + long maxNanos = max.toNanos() + long randomNanos = ThreadLocalRandom.current().nextLong(minNanos, maxNanos + 1) + + return Duration.ofNanos(randomNanos) + } + + static Duration randomDuration(Duration reference, float intervalPercentage) { + if (intervalPercentage < 0 || intervalPercentage > 1) { + throw new IllegalArgumentException("Interval percentage must be between 0 and 1") + } + + long refNanos = reference.toNanos() + long intervalNanos = (long)(refNanos * intervalPercentage) + + long minNanos = Math.max(0, refNanos - intervalNanos) + long maxNanos = refNanos + intervalNanos + + long randomNanos = ThreadLocalRandom.current().nextLong(minNanos, maxNanos + 1) + + return Duration.ofNanos(randomNanos) + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/cleanup/CleanupConfigTest.groovy b/src/test/groovy/io/seqera/wave/service/cleanup/CleanupConfigTest.groovy new file mode 100644 index 000000000..e34e114c4 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/cleanup/CleanupConfigTest.groovy @@ -0,0 +1,42 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.cleanup + +import spock.lang.Specification + +import java.time.Duration + +/** + * + * @author Paolo Di Tommaso + */ +class CleanupConfigTest extends Specification { + + def 'should get random delay' () { + when: + def d = Duration.ofSeconds(10) + def config = new CleanupConfig(cleanupStartupDelay: d) + then: + config.cleanupStartupDelay == d + and: + config.cleanupStartupDelayRandomized >= d.dividedBy(2) + config.cleanupStartupDelayRandomized < (d + d.dividedBy(2)) + } + +} diff --git a/src/test/groovy/io/seqera/wave/tower/auth/JwtConfigTest.groovy b/src/test/groovy/io/seqera/wave/tower/auth/JwtConfigTest.groovy new file mode 100644 index 000000000..d74646444 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/tower/auth/JwtConfigTest.groovy @@ -0,0 +1,41 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.tower.auth + +import spock.lang.Specification + +import java.time.Duration +/** + * + * @author Paolo Di Tommaso + */ +class JwtConfigTest extends Specification { + + def 'should get random delay' () { + when: + def d = Duration.ofSeconds(10) + def config = new JwtConfig(monitorDelay: d) + then: + config.monitorDelay == d + and: + config.monitorDelayRandomized >= d.dividedBy(2) + config.monitorDelayRandomized < (d + d.dividedBy(2)) + } + +} diff --git a/src/test/groovy/io/seqera/wave/util/DurationUtilsTest.groovy b/src/test/groovy/io/seqera/wave/util/DurationUtilsTest.groovy new file mode 100644 index 000000000..596252923 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/util/DurationUtilsTest.groovy @@ -0,0 +1,139 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.util + +import spock.lang.Specification +import spock.lang.Unroll + +import java.time.Duration + +class DurationUtilsTest extends Specification { + + @Unroll + def "randomDuration returns a duration between #min and #max"() { + given: + def minDuration = Duration.ofSeconds(min) + def maxDuration = Duration.ofSeconds(max) + + when: + def result = DurationUtils.randomDuration(minDuration, maxDuration) + + then: + result >= minDuration + result <= maxDuration + + where: + min | max + 1 | 10 + 0 | 100 + 60 | 3600 + 3600| 7200 + } + + def "randomDuration returns min or max when they are equal"() { + given: + def duration = Duration.ofSeconds(10) + + when: + def result = DurationUtils.randomDuration(duration, duration) + + then: + result == duration + } + + + def "randomDuration generates different values over multiple calls"() { + given: + def minDuration = Duration.ofSeconds(1) + def maxDuration = Duration.ofSeconds(1000) + def iterations = 100 + def results = [] + + when: + iterations.times { + results << DurationUtils.randomDuration(minDuration, maxDuration) + } + + then: + results.unique().size() > 1 + } + + @Unroll + def "randomDuration returns a duration within #intervalPercentage of #reference"() { + given: + def referenceDuration = Duration.ofSeconds(reference) + + when: + def result = DurationUtils.randomDuration(referenceDuration, intervalPercentage) + + then: + def minAllowed = referenceDuration.multipliedBy((long)((1 - intervalPercentage) * 100)).dividedBy(100) + def maxAllowed = referenceDuration.multipliedBy((long)((1 + intervalPercentage) * 100)).dividedBy(100) + result >= minAllowed + result <= maxAllowed + + where: + reference | intervalPercentage + 100 | 0.2f + 60 | 0.5f + 3600 | 0.1f + 10 | 0.0f + } + + def "randomDuration throws IllegalArgumentException for invalid interval percentage"() { + given: + def referenceDuration = Duration.ofSeconds(100) + + when: + DurationUtils.randomDuration(referenceDuration, invalidPercentage) + + then: + thrown(IllegalArgumentException) + + where: + invalidPercentage << [-0.1f, 1.1f] + } + + def "randomDuration handles zero reference duration"() { + given: + def referenceDuration = Duration.ZERO + + when: + def result = DurationUtils.randomDuration(referenceDuration, 0.2f) + + then: + result == Duration.ZERO + } + + def "randomDuration generates different values over multiple calls"() { + given: + def referenceDuration = Duration.ofSeconds(100) + def intervalPercentage = 0.2f + def iterations = 100 + def results = [] + + when: + iterations.times { + results << DurationUtils.randomDuration(referenceDuration, intervalPercentage) + } + + then: + results.unique().size() > 1 + } +} From df32b3057e86688163e9982c53554db4310dcc5d Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 12 Sep 2024 16:55:03 +0200 Subject: [PATCH 07/10] Increase cache-tower-client to 1min (#641) [ci skip] Signed-off-by: Paolo Di Tommaso --- src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0be2a3603..e5a3b0832 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,7 +7,7 @@ micronaut: # Using `expire-after-read` can cause an entry to be retained in the cache more than expected if it is hit continuously # with a frequency shorter than the declared cache period. cache-tower-client: - expire-after-write: 20s + expire-after-write: 60s record-stats: true cache-registry-proxy: expire-after-write: 20s From ebf65adcf343728ce25ec24596dddf974cf184a8 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 12 Sep 2024 20:45:59 +0200 Subject: [PATCH 08/10] Fix Blob cache failure duration (#643) Signed-off-by: Paolo Di Tommaso --- .../wave/configuration/BlobCacheConfig.groovy | 14 +++++++++++--- .../service/blob/impl/BlobCacheServiceImpl.groovy | 2 +- .../blob/impl/BlobCacheServiceImplTest2.groovy | 6 +++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy b/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy index 329904a9e..c9b65c714 100644 --- a/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy +++ b/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy @@ -40,11 +40,19 @@ class BlobCacheConfig { @Value('${wave.blobCache.enabled:false}') boolean enabled - @Value('${wave.blobCache.status.delay:5s}') + /** + * The time interval every when the status of the blob transfer is checked + */ + @Value('${wave.blobCache.status.delay:2s}') Duration statusDelay - @Value('${wave.blobCache.failure.duration:4s}') - Duration failureDuration + /** + * How long a failed blob should survive in the cache. Note: this should be longer than + * {@link #statusDelay} otherwise the state can be lost. + */ + Duration getFailureDuration() { + return statusDelay.multipliedBy(3) + } @Value('${wave.blobCache.timeout:10m}') Duration transferTimeout diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy index 0322a724b..683d27b11 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy @@ -305,7 +305,7 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { ? blob.completed(state.exitCode, state.stdout) : blob.failed(state.stdout) blobStore.storeBlob(blob.id(), result, ttl) - log.debug "== Blob cache completed for object '${blob.objectUri}'; id=${blob.objectUri}; status=${result.exitStatus}; duration=${result.duration()}" + log.debug "== Blob cache completed for object '${blob.objectUri}'; operation=${job.operationName}; status=${result.exitStatus}; duration=${result.duration()}" } protected void handleJobException(JobSpec job, BlobCacheInfo blob, Throwable error) { diff --git a/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy b/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy index 1a7ddead3..c955df2a7 100644 --- a/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy +++ b/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy @@ -163,7 +163,7 @@ class BlobCacheServiceImplTest2 extends Specification implements AwsS3TestContai def blobStore = Mock(BlobStore) def blob = Mock(BlobCacheInfo) blob.completed(_,_) >> BlobCacheInfo.create('location', 'object', null, null) - def config = new BlobCacheConfig(failureDuration: Duration.ofMinutes(1)) + def config = new BlobCacheConfig(statusDelay: Duration.ofSeconds(2)) def service = new BlobCacheServiceImpl(blobStore: blobStore, blobConfig: config) def event = Mock(JobEvent) event.job >> JobSpec.transfer('job-id', 'foo', Instant.now(), Duration.ofMinutes(1)) @@ -184,7 +184,7 @@ class BlobCacheServiceImplTest2 extends Specification implements AwsS3TestContai given: def blobStore = Mock(BlobStore) def blob = Mock(BlobCacheInfo) - def config = new BlobCacheConfig(failureDuration: Duration.ofMinutes(1)) + def config = new BlobCacheConfig(statusDelay: Duration.ofSeconds(2)) def service = new BlobCacheServiceImpl(blobStore: blobStore, blobConfig: config) def event = Mock(JobEvent) event.job >> JobSpec.transfer('job-id', 'foo', Instant.now(), Duration.ofMinutes(1)) @@ -202,7 +202,7 @@ class BlobCacheServiceImplTest2 extends Specification implements AwsS3TestContai given: def blobStore = Mock(BlobStore) def blob = Mock(BlobCacheInfo) - def config = new BlobCacheConfig(failureDuration: Duration.ofMinutes(1)) + def config = new BlobCacheConfig(statusDelay: Duration.ofSeconds(2)) def service = new BlobCacheServiceImpl(blobStore: blobStore, blobConfig: config) def event = Mock(JobEvent) event.job >> JobSpec.transfer('job-id', 'foo', Instant.now(), Duration.ofMinutes(1)) From 8b96173a47630811e982f7217c6b218b73b54796 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 12 Sep 2024 22:23:26 +0200 Subject: [PATCH 09/10] Improve blob cache info (#644) * Improve blob cache info Signed-off-by: Paolo Di Tommaso * Improve failure ttl handling Signed-off-by: Paolo Di Tommaso --------- Signed-off-by: Paolo Di Tommaso --- .../wave/service/blob/BlobCacheInfo.groovy | 39 +++++- .../seqera/wave/service/blob/BlobStore.groovy | 10 -- .../blob/impl/BlobCacheServiceImpl.groovy | 21 ++- .../service/blob/impl/BlobCacheStore.groovy | 9 +- .../service/blob/BlobCacheInfoTest.groovy | 65 +++++++++- .../impl/BlobCacheServiceImplTest2.groovy | 4 +- .../blob/impl/BlobCacheStoreImplTest.groovy | 120 ++++++++++++++++++ 7 files changed, 232 insertions(+), 36 deletions(-) create mode 100644 src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheStoreImplTest.groovy diff --git a/src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy b/src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy index 8511cc4ce..e278a9e20 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy @@ -22,7 +22,6 @@ import java.time.Instant import groovy.transform.Canonical import groovy.transform.CompileStatic -import groovy.transform.ToString import groovy.util.logging.Slf4j /** * Model a blob cache metadata entry @@ -30,11 +29,17 @@ import groovy.util.logging.Slf4j * @author Paolo Di Tommaso */ @Slf4j -@ToString(includePackage = false, includeNames = true, excludes = ['headers','logs']) @Canonical @CompileStatic class BlobCacheInfo { + enum State { CREATED, CACHED, COMPLETED, ERRORED, UNKNOWN } + + /** + * The Blob state + */ + final State state + /** * The HTTP location from the where the cached container blob can be retrieved */ @@ -113,7 +118,7 @@ class BlobCacheInfo { final type = headerString0(response, 'Content-Type') final cache = headerString0(response, 'Cache-Control') final creationTime = Instant.now() - return new BlobCacheInfo(locationUri, objectUri, headers0, length, type, cache, creationTime, null, null, null) + return new BlobCacheInfo(State.CREATED, locationUri, objectUri, headers0, length, type, cache, creationTime, null, null, null) } static String headerString0(Map> headers, String name) { @@ -130,8 +135,28 @@ class BlobCacheInfo { } } + @Override + String toString() { + if( state==State.UNKNOWN ) { + return "BlobCacheInfo(UNKNOWN)" + } + + return "BlobCacheInfo(" + + "state=" + state + + ", locationUri='" + locationUri + "'" + + ", objectUri='" + objectUri + "'" + + ", contentLength=" + contentLength + + ", contentType='" + contentType + "'" + + ", cacheControl='" + cacheControl + "'" + + ", creationTime=" + creationTime + + ", completionTime=" + completionTime + + ", exitStatus=" + exitStatus + + ')' + } + BlobCacheInfo cached() { new BlobCacheInfo( + State.CACHED, locationUri, objectUri, headers, @@ -147,6 +172,7 @@ class BlobCacheInfo { BlobCacheInfo completed(int status, String logs) { new BlobCacheInfo( + State.COMPLETED, locationUri, objectUri, headers, @@ -160,8 +186,9 @@ class BlobCacheInfo { ) } - BlobCacheInfo failed(String logs) { + BlobCacheInfo errored(String logs) { new BlobCacheInfo( + State.ERRORED, locationUri, objectUri, headers, @@ -177,6 +204,7 @@ class BlobCacheInfo { BlobCacheInfo withLocation(String location) { new BlobCacheInfo( + state, location, objectUri, headers, @@ -191,7 +219,7 @@ class BlobCacheInfo { } static BlobCacheInfo unknown(String logs) { - new BlobCacheInfo(null, null, null, null, null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null, logs) { + new BlobCacheInfo(State.UNKNOWN, null, null, null, null, null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null, logs) { @Override BlobCacheInfo withLocation(String uri) { // prevent the change of location for unknown status @@ -200,4 +228,5 @@ class BlobCacheInfo { } } + } diff --git a/src/main/groovy/io/seqera/wave/service/blob/BlobStore.groovy b/src/main/groovy/io/seqera/wave/service/blob/BlobStore.groovy index ce431312b..b5ed02c8e 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/BlobStore.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/BlobStore.groovy @@ -55,16 +55,6 @@ interface BlobStore { */ void storeBlob(String key, BlobCacheInfo info) - /** - * Store the blob cache info object with the specified key. The object is evicted after the specified - * duration is reached - * - * @param key The unique to be used to store the blob cache info - * @param info The {@link BlobCacheInfo} object modelling the container blob information - * @param ttl How long the object is allowed to stay in the cache - */ - void storeBlob(String key, BlobCacheInfo info, Duration ttl) - /** * Store a blob cache location only if the specified key does not exit * diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy index 683d27b11..13375d83d 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy @@ -189,9 +189,9 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { } catch (Throwable t) { log.warn "== Blob cache failed for object '${blob.objectUri}' - cause: ${t.message}", t - final result = blob.failed(t.message) + final result = blob.errored(t.message) // update the blob status - blobStore.storeBlob(blob.id(), result, blobConfig.failureDuration) + blobStore.storeBlob(blob.id(), result) } } @@ -295,28 +295,23 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { } protected void handleJobCompletion(JobSpec job, BlobCacheInfo blob, JobState state) { - // use a short time-to-live for failed downloads - // this is needed to allow re-try caching of failure transfers - final ttl = state.succeeded() - ? blobConfig.statusDuration - : blobConfig.failureDuration // update the blob status final result = state.succeeded() ? blob.completed(state.exitCode, state.stdout) - : blob.failed(state.stdout) - blobStore.storeBlob(blob.id(), result, ttl) + : blob.errored(state.stdout) + blobStore.storeBlob(blob.id(), result) log.debug "== Blob cache completed for object '${blob.objectUri}'; operation=${job.operationName}; status=${result.exitStatus}; duration=${result.duration()}" } protected void handleJobException(JobSpec job, BlobCacheInfo blob, Throwable error) { - final result = blob.failed("Unexpected error caching blob '${blob.locationUri}' - operation '${job.operationName}'") + final result = blob.errored("Unexpected error caching blob '${blob.locationUri}' - operation '${job.operationName}'") log.error("== Blob cache exception for object '${blob.objectUri}'; operation=${job.operationName}; cause=${error.message}", error) - blobStore.storeBlob(blob.id(), result, blobConfig.failureDuration) + blobStore.storeBlob(blob.id(), result) } protected void handleJobTimeout(JobSpec job, BlobCacheInfo blob) { - final result = blob.failed("Blob cache transfer timed out ${blob.objectUri}") + final result = blob.errored("Blob cache transfer timed out ${blob.objectUri}") log.warn "== Blob cache completed for object '${blob.objectUri}'; operation=${job.operationName}; duration=${result.duration()}" - blobStore.storeBlob(blob.id(), result, blobConfig.failureDuration) + blobStore.storeBlob(blob.id(), result) } } diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheStore.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheStore.groovy index 835dc0288..a75d148c6 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheStore.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheStore.groovy @@ -80,11 +80,10 @@ class BlobCacheStore extends AbstractCacheStore implements BlobSt @Override void storeBlob(String key, BlobCacheInfo info) { - put(key, info) - } - - @Override - void storeBlob(String key, BlobCacheInfo info, Duration ttl) { + final ttl = info.state == BlobCacheInfo.State.ERRORED + ? blobConfig.failureDuration + : blobConfig.statusDuration put(key, info, ttl) } + } diff --git a/src/test/groovy/io/seqera/wave/service/blob/BlobCacheInfoTest.groovy b/src/test/groovy/io/seqera/wave/service/blob/BlobCacheInfoTest.groovy index 845f15fec..36acfb08a 100644 --- a/src/test/groovy/io/seqera/wave/service/blob/BlobCacheInfoTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/blob/BlobCacheInfoTest.groovy @@ -38,6 +38,7 @@ class BlobCacheInfoTest extends Specification { blob.objectUri == 's3://foo/com' blob.headers == [:] blob.id() == 's3://foo/com' + blob.state == BlobCacheInfo.State.CREATED expect: BlobCacheInfo.create('http://foo.com', 's3://foo/com', [Foo:['alpha'], Bar:['delta', 'gamma', 'omega']], [:]) @@ -137,7 +138,7 @@ class BlobCacheInfoTest extends Specification { def cache = BlobCacheInfo.create(location, object, headers, response) when: - def result = cache.failed('Oops') + def result = cache.errored('Oops') then: result.headers == [Foo:'something'] result.locationUri == 'http://foo.com' @@ -243,6 +244,7 @@ class BlobCacheInfoTest extends Specification { def 'should validate duration' () { given: def info = new BlobCacheInfo( + BlobCacheInfo.State.CREATED, null, null, null, @@ -265,4 +267,65 @@ class BlobCacheInfoTest extends Specification { now | now.plusSeconds(10) | Duration.ofSeconds(10) now | now.plusSeconds(60) | Duration.ofSeconds(60) } + + def 'should create blob cached' () { + given: + def blob = BlobCacheInfo.create('http://foo.com', 's3://foo/com', [:], [:]) + + when: + def info = blob.cached() + then: + info.state == BlobCacheInfo.State.CACHED + info.locationUri == blob.locationUri + info.objectUri == blob.objectUri + info.headers == blob.headers + info.contentLength == blob.contentLength + info.contentType == blob.contentType + info.cacheControl == blob.cacheControl + info.creationTime == blob.creationTime + info.completionTime == blob.creationTime + info.exitStatus == 0 + info.logs == null + } + + def 'should create blob completed' () { + given: + def blob = BlobCacheInfo.create('http://foo.com', 's3://foo/com', [:], [:]) + + when: + def info = blob.completed(1, 'this is the log') + then: + info.state == BlobCacheInfo.State.COMPLETED + info.locationUri == blob.locationUri + info.objectUri == blob.objectUri + info.headers == blob.headers + info.contentLength == blob.contentLength + info.contentType == blob.contentType + info.cacheControl == blob.cacheControl + info.creationTime == blob.creationTime + info.completionTime <= Instant.now() + info.exitStatus == 1 + info.logs == 'this is the log' + } + + def 'should create blob failed' () { + given: + def blob = BlobCacheInfo.create('http://foo.com', 's3://foo/com', [:], [:]) + + when: + def info = blob.errored('this is the log') + then: + info.state == BlobCacheInfo.State.ERRORED + info.locationUri == blob.locationUri + info.objectUri == blob.objectUri + info.headers == blob.headers + info.contentLength == blob.contentLength + info.contentType == blob.contentType + info.cacheControl == blob.cacheControl + info.creationTime == blob.creationTime + info.completionTime <= Instant.now() + info.exitStatus == null + info.logs == 'this is the log' + } + } diff --git a/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy b/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy index c955df2a7..215dfd6f3 100644 --- a/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy +++ b/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImplTest2.groovy @@ -176,7 +176,7 @@ class BlobCacheServiceImplTest2 extends Specification implements AwsS3TestContai then: 1 * blobStore.getBlob('job-id') >> blob 1 * blob.done() >> false - 1 * blobStore.storeBlob(_, _, _) + 1 * blobStore.storeBlob(_, _) 1 * blob.id() } @@ -195,7 +195,7 @@ class BlobCacheServiceImplTest2 extends Specification implements AwsS3TestContai then: 1 * blobStore.getBlob('job-id') >> blob - 1 * blob.failed(_) >> blob + 1 * blob.errored(_) >> blob } def 'handle job event when job encounters an error'() { diff --git a/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheStoreImplTest.groovy b/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheStoreImplTest.groovy new file mode 100644 index 000000000..8b06d2b03 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/blob/impl/BlobCacheStoreImplTest.groovy @@ -0,0 +1,120 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.blob.impl + +import spock.lang.Specification + +import java.time.Duration + +import io.micronaut.context.annotation.Property +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.seqera.wave.configuration.BlobCacheConfig +import io.seqera.wave.service.blob.BlobCacheInfo +import io.seqera.wave.service.cache.impl.CacheProvider +import jakarta.inject.Inject + +/** + * + * @author Paolo Di Tommaso + */ +@Property(name = 'wave.blobCache.enabled', value = 'true') +@Property(name = 'wave.blobCache.storage.bucket', value='s3://foo') +@Property(name = 'wave.blobCache.storage.region', value='eu-west-1') +@MicronautTest +class BlobCacheStoreImplTest extends Specification { + + @Inject + BlobCacheStore store + + @Inject + CacheProvider provider + + def 'should get and store an entry' () { + given: + def key = UUID.randomUUID().toString() + def info1 = new BlobCacheInfo(BlobCacheInfo.State.CREATED, 'foo') + def info2 = new BlobCacheInfo(BlobCacheInfo.State.CREATED, 'bar') + + expect: + store.get(key) == null + + when: + store.put(key, info1) + then: + store.get(key) == info1 + + when: + store.put(key, info2) + then: + store.get(key) == info2 + and: + info1 != info2 + } + + def 'should put an item only if absent' () { + given: + def key = UUID.randomUUID().toString() + def info1 = new BlobCacheInfo(BlobCacheInfo.State.CREATED, 'foo') + def info2 = new BlobCacheInfo(BlobCacheInfo.State.CREATED, 'bar') + + expect: + store.putIfAbsent(key, info1) + and: + store.get(key) == info1 + + and: + !store.putIfAbsent(key, info2) + and: + store.get(key) == info1 // <-- didn't change + } + + def 'should put an entry with conditional ttl' () { + given: + def key = UUID.randomUUID().toString() + def info_ok = new BlobCacheInfo(BlobCacheInfo.State.CREATED, 'foo') + def info_err = new BlobCacheInfo(BlobCacheInfo.State.ERRORED, 'foo') + and: + def DELAY_ONE = Duration.ofMinutes(1) + def DELAY_TWO = Duration.ofSeconds(1) + and: + def config = Mock(BlobCacheConfig) + def cache = Spy(new BlobCacheStore(provider)) + cache.@blobConfig = config + + when: + cache.storeBlob(key, info_ok) + then: + // should use 'status duration' for TTL + 1 * config.getStatusDuration() >> DELAY_ONE + 0 * config.getFailureDuration() >> null + and: + 1 * cache.put(key, info_ok, DELAY_ONE) >> null + + when: + cache.storeBlob(key, info_err) + then: + // should use 'failure duration' for TTL + 0 * config.getStatusDuration() >> null + 1 * config.getFailureDuration() >> DELAY_TWO + and: + 1 * cache.put(key, info_err, DELAY_TWO) >> null + + } + +} From 7a9046ed6605018cdaebc5fb22c17148868ae3dc Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Fri, 13 Sep 2024 00:17:00 +0200 Subject: [PATCH 10/10] Fix K8s job status detection Signed-off-by: Paolo Di Tommaso --- .../wave/service/k8s/K8sServiceImpl.groovy | 17 +++++--- .../service/k8s/K8sServiceImplTest.groovy | 43 +++++++++++++++---- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy index 7753c54b0..26686162a 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy @@ -205,14 +205,19 @@ class K8sServiceImpl implements K8sService { .readNamespacedJob(name, namespace, null) if( !job ) return null - if( job.status.active ) - return JobStatus.Pending if( job.status.succeeded ) return JobStatus.Succeeded - if( job.status.failed ) - return JobStatus.Failed - else - return null + if( job.status.active ) + return JobStatus.Pending + if( job.status.failed ) { + if( job.status.completionTime!=null ) + return JobStatus.Failed + if( job.status.failed > job.spec.backoffLimit ) + return JobStatus.Failed + else + return JobStatus.Pending + } + return null } /** diff --git a/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy b/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy index 8aee4945d..4684432c5 100644 --- a/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy @@ -31,6 +31,7 @@ import io.kubernetes.client.openapi.apis.BatchV1Api import io.kubernetes.client.openapi.apis.CoreV1Api import io.kubernetes.client.openapi.models.V1EnvVar import io.kubernetes.client.openapi.models.V1Job +import io.kubernetes.client.openapi.models.V1JobSpec import io.kubernetes.client.openapi.models.V1JobStatus import io.kubernetes.client.openapi.models.V1ObjectMeta import io.kubernetes.client.openapi.models.V1Pod @@ -943,7 +944,7 @@ class K8sServiceImplTest extends Specification { } - private V1Job jobPending() { + private V1Job jobActive() { def status = new V1JobStatus(); status.setActive(1) status.setFailed(2) @@ -964,7 +965,30 @@ class K8sServiceImplTest extends Specification { private V1Job jobFailed() { def status = new V1JobStatus(); - status.setFailed(1) + status.setFailed(3) // <-- failed 3 times + def spec = new V1JobSpec() + spec.setBackoffLimit(2) // <-- max 2 retries + def result = new V1Job() + result.setStatus(status) + result.setSpec(spec) + return result + } + + private V1Job jobFailedWitMoreRetries() { + def status = new V1JobStatus(); + status.setFailed(1) // <-- failed 1 time + def spec = new V1JobSpec() + spec.setBackoffLimit(2) // <-- max 2 retries + def result = new V1Job() + result.setStatus(status) + result.setSpec(spec) + return result + } + + private V1Job jobCompleted() { + def status = new V1JobStatus(); + status.setFailed(1) // <-- failed 1 time + status.setCompletionTime(OffsetDateTime.now()) def result = new V1Job() result.setStatus(status) return result @@ -981,6 +1005,7 @@ class K8sServiceImplTest extends Specification { def 'should validate get status' () { given: def NS = 'foo' + def NAME = 'bar' def api = Mock(BatchV1Api) def client = Mock(K8sClient) { batchV1Api()>>api } def service = Spy(new K8sServiceImpl(namespace:NS, k8sClient: client)) @@ -993,11 +1018,13 @@ class K8sServiceImplTest extends Specification { status == EXPECTED where: - NAME | JOB | EXPECTED - 'foo' | null | null - 'foo' | jobPending() | K8sService.JobStatus.Pending - 'foo' | jobSucceeded() | K8sService.JobStatus.Succeeded - 'foo' | jobFailed() | K8sService.JobStatus.Failed - 'foo' | jobUnknown() | null + JOB | EXPECTED + null | null + jobActive() | K8sService.JobStatus.Pending + jobSucceeded() | K8sService.JobStatus.Succeeded + jobFailed() | K8sService.JobStatus.Failed + jobFailedWitMoreRetries() | K8sService.JobStatus.Pending + jobCompleted() | K8sService.JobStatus.Failed + jobUnknown() | null } }