Skip to content

Commit

Permalink
Merge branch 'master' into 314-update-to-micronaut-41
Browse files Browse the repository at this point in the history
  • Loading branch information
munishchouhan authored Sep 13, 2024
2 parents a6d40bb + 7a9046e commit fe8121f
Show file tree
Hide file tree
Showing 23 changed files with 637 additions and 66 deletions.
2 changes: 1 addition & 1 deletion configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ Wave offers a feature to provide a cache for Docker blobs, which improves the pe

- **`wave.blobCache.enabled`**: whether to enable the blob cache. It is `false` by default. *Optional*.

- **`wave.blobCache.s5cmdImage`**: the Docker image that supplies the [s5cmd tool](https://github.com/peak/s5cmd). This tool is used to upload blob binaries to the S3 bucket. The default image used by Wave is `cr.seqera.io/public/wave/s5cmd:v2.2.2`. *Optional*.
- **`wave.blobCache.s5cmdImage`**: the Docker image that supplies the [s5cmd tool](https://github.com/peak/s5cmd). This tool is used to upload blob binaries to the S3 bucket. The default image used by Wave is `public.cr.seqera.io/wave/s5cmd:v2.2.2`. *Optional*.

- **`wave.blobCache.status.delay`**: the time delay in checking the status of the transfer of the blob binary from the repository to the cache. Its default value is `5s`. *Optional*.

Expand Down
2 changes: 1 addition & 1 deletion s5cmd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ build:
--push \
--platform linux/amd64,linux/arm64 \
--build-arg version=${version} \
--tag cr.seqera.io/public/wave/s5cmd:v${version} \
--tag public.cr.seqera.io/wave/s5cmd:v${version} \
.
33 changes: 33 additions & 0 deletions s5cmd/dist.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/bash

#
# Wave, containers provisioning service
# Copyright (c) 2023-2024, Seqera Labs
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

# Print the download URL of the s5cmd release tarball matching the host CPU
# architecture.
#
# Usage: dist.sh <version>     (for example: dist.sh 2.2.2)
#
# On success the URL is written to stdout. On error (missing version or
# unsupported architecture) a message is written to stderr and the script
# exits with a non-zero status, so that a caller capturing stdout never
# mistakes an error message for a URL.
version="$1"
if [ -z "$version" ]; then
  echo "Usage: $0 <version>" >&2
  exit 1
fi

arch=$(uname -m)

case "$arch" in
  x86_64|amd64)
    echo "https://github.com/peak/s5cmd/releases/download/v${version}/s5cmd_${version}_Linux-64bit.tar.gz"
    ;;
  aarch64|arm64)
    echo "https://github.com/peak/s5cmd/releases/download/v${version}/s5cmd_${version}_Linux-arm64.tar.gz"
    ;;
  *)
    # keep diagnostics off stdout and signal the failure to the caller
    echo "Unknown architecture: $arch" >&2
    exit 1
    ;;
esac
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,19 @@ class BlobCacheConfig {
@Value('${wave.blobCache.enabled:false}')
boolean enabled

@Value('${wave.blobCache.status.delay:5s}')
/**
* The time interval at which the status of the blob transfer is checked
*/
@Value('${wave.blobCache.status.delay:2s}')
Duration statusDelay

@Value('${wave.blobCache.failure.duration:4s}')
Duration failureDuration
/**
 * Time-to-live of a failed blob entry in the cache.
 *
 * The value is derived as three times {@link #statusDelay}: it has to be longer
 * than the status check interval, otherwise the failure state can be lost.
 *
 * @return the duration a failed blob entry is kept in the cache
 */
Duration getFailureDuration() {
final long multiplier = 3
return statusDelay.multipliedBy(multiplier)
}

@Value('${wave.blobCache.timeout:10m}')
Duration transferTimeout
Expand Down
39 changes: 34 additions & 5 deletions src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@ import java.time.Instant

import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
/**
* Model a blob cache metadata entry
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@ToString(includePackage = false, includeNames = true, excludes = ['headers','logs'])
@Canonical
@CompileStatic
class BlobCacheInfo {

/**
 * The state of a blob cache entry. {@code CREATED} is assigned to a newly built entry;
 * {@code CACHED}, {@code COMPLETED}, {@code ERRORED} and {@code UNKNOWN} are assigned by
 * the {@code cached()}, {@code completed()}, {@code errored()} and {@code unknown()}
 * methods of this class respectively.
 */
enum State { CREATED, CACHED, COMPLETED, ERRORED, UNKNOWN }

/**
* The Blob state
*/
final State state

/**
* The HTTP location from where the cached container blob can be retrieved
*/
Expand Down Expand Up @@ -113,7 +118,7 @@ class BlobCacheInfo {
final type = headerString0(response, 'Content-Type')
final cache = headerString0(response, 'Cache-Control')
final creationTime = Instant.now()
return new BlobCacheInfo(locationUri, objectUri, headers0, length, type, cache, creationTime, null, null, null)
return new BlobCacheInfo(State.CREATED, locationUri, objectUri, headers0, length, type, cache, creationTime, null, null, null)
}

static String headerString0(Map<String,List<String>> headers, String name) {
Expand All @@ -130,8 +135,28 @@ class BlobCacheInfo {
}
}

/**
 * Render a compact textual description of this entry.
 *
 * An entry in the {@code UNKNOWN} state is rendered as a fixed marker string.
 * Otherwise the state, the location and object URIs, the content metadata,
 * the creation and completion times and the exit status are listed.
 * Headers and logs are not included.
 */
@Override
String toString() {
if( state==State.UNKNOWN )
return "BlobCacheInfo(UNKNOWN)"

// one "name=value" item per reported attribute, joined below with ", "
final attributes = [
"state=" + state,
"locationUri='" + locationUri + "'",
"objectUri='" + objectUri + "'",
"contentLength=" + contentLength,
"contentType='" + contentType + "'",
"cacheControl='" + cacheControl + "'",
"creationTime=" + creationTime,
"completionTime=" + completionTime,
"exitStatus=" + exitStatus
]
return "BlobCacheInfo(" + attributes.join(', ') + ')'
}

BlobCacheInfo cached() {
new BlobCacheInfo(
State.CACHED,
locationUri,
objectUri,
headers,
Expand All @@ -147,6 +172,7 @@ class BlobCacheInfo {

BlobCacheInfo completed(int status, String logs) {
new BlobCacheInfo(
State.COMPLETED,
locationUri,
objectUri,
headers,
Expand All @@ -160,8 +186,9 @@ class BlobCacheInfo {
)
}

BlobCacheInfo failed(String logs) {
BlobCacheInfo errored(String logs) {
new BlobCacheInfo(
State.ERRORED,
locationUri,
objectUri,
headers,
Expand All @@ -177,6 +204,7 @@ class BlobCacheInfo {

BlobCacheInfo withLocation(String location) {
new BlobCacheInfo(
state,
location,
objectUri,
headers,
Expand All @@ -191,7 +219,7 @@ class BlobCacheInfo {
}

static BlobCacheInfo unknown(String logs) {
new BlobCacheInfo(null, null, null, null, null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null, logs) {
new BlobCacheInfo(State.UNKNOWN, null, null, null, null, null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null, logs) {
@Override
BlobCacheInfo withLocation(String uri) {
// prevent the change of location for unknown status
Expand All @@ -200,4 +228,5 @@ class BlobCacheInfo {
}
}


}
10 changes: 0 additions & 10 deletions src/main/groovy/io/seqera/wave/service/blob/BlobStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ interface BlobStore {
*/
void storeBlob(String key, BlobCacheInfo info)

/**
* Store the blob cache info object with the specified key. The object is evicted after the specified
* duration is reached
*
* @param key The unique to be used to store the blob cache info
* @param info The {@link BlobCacheInfo} object modelling the container blob information
* @param ttl How long the object is allowed to stay in the cache
*/
void storeBlob(String key, BlobCacheInfo info, Duration ttl)

/**
* Store a blob cache location only if the specified key does not exist
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler {
}
catch (Throwable t) {
log.warn "== Blob cache failed for object '${blob.objectUri}' - cause: ${t.message}", t
final result = blob.failed(t.message)
final result = blob.errored(t.message)
// update the blob status
blobStore.storeBlob(blob.id(), result, blobConfig.failureDuration)
blobStore.storeBlob(blob.id(), result)
}
}

Expand Down Expand Up @@ -295,28 +295,23 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler {
}

protected void handleJobCompletion(JobSpec job, BlobCacheInfo blob, JobState state) {
// use a short time-to-live for failed downloads
// this is needed to allow re-try caching of failure transfers
final ttl = state.succeeded()
? blobConfig.statusDuration
: blobConfig.failureDuration
// update the blob status
final result = state.succeeded()
? blob.completed(state.exitCode, state.stdout)
: blob.failed(state.stdout)
blobStore.storeBlob(blob.id(), result, ttl)
log.debug "== Blob cache completed for object '${blob.objectUri}'; id=${blob.objectUri}; status=${result.exitStatus}; duration=${result.duration()}"
: blob.errored(state.stdout)
blobStore.storeBlob(blob.id(), result)
log.debug "== Blob cache completed for object '${blob.objectUri}'; operation=${job.operationName}; status=${result.exitStatus}; duration=${result.duration()}"
}

protected void handleJobException(JobSpec job, BlobCacheInfo blob, Throwable error) {
final result = blob.failed("Unexpected error caching blob '${blob.locationUri}' - operation '${job.operationName}'")
final result = blob.errored("Unexpected error caching blob '${blob.locationUri}' - operation '${job.operationName}'")
log.error("== Blob cache exception for object '${blob.objectUri}'; operation=${job.operationName}; cause=${error.message}", error)
blobStore.storeBlob(blob.id(), result, blobConfig.failureDuration)
blobStore.storeBlob(blob.id(), result)
}

protected void handleJobTimeout(JobSpec job, BlobCacheInfo blob) {
final result = blob.failed("Blob cache transfer timed out ${blob.objectUri}")
final result = blob.errored("Blob cache transfer timed out ${blob.objectUri}")
log.warn "== Blob cache completed for object '${blob.objectUri}'; operation=${job.operationName}; duration=${result.duration()}"
blobStore.storeBlob(blob.id(), result, blobConfig.failureDuration)
blobStore.storeBlob(blob.id(), result)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ class BlobCacheStore extends AbstractCacheStore<BlobCacheInfo> implements BlobSt

@Override
void storeBlob(String key, BlobCacheInfo info) {
put(key, info)
}

@Override
void storeBlob(String key, BlobCacheInfo info, Duration ttl) {
final ttl = info.state == BlobCacheInfo.State.ERRORED
? blobConfig.failureDuration
: blobConfig.statusDuration
put(key, info, ttl)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import javax.annotation.Nullable
import groovy.transform.CompileStatic
import groovy.transform.ToString
import io.micronaut.context.annotation.Value
import io.seqera.wave.util.DurationUtils
import jakarta.inject.Singleton

/**
Expand Down Expand Up @@ -55,4 +56,7 @@ class CleanupConfig {
@Value('${wave.cleanup.run-interval:30s}')
Duration cleanupRunInterval

/**
 * The cleanup startup delay as returned by {@code DurationUtils.randomDuration}
 * for {@code cleanupStartupDelay} and a factor of {@code 0.4}.
 * NOTE(review): presumably the factor bounds the random variation applied to
 * the configured delay - confirm against DurationUtils.
 */
Duration getCleanupStartupDelayRandomized() {
final float factor = 0.4f
return DurationUtils.randomDuration(cleanupStartupDelay, factor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ class CleanupServiceImpl implements Runnable, CleanupService {
@PostConstruct
private init() {
log.info "Creating cleanup service - config=$config"
scheduler.scheduleAtFixedRate(config.cleanupStartupDelay, config.cleanupRunInterval, this)
// use randomize initial delay to prevent multiple replicas running at the same time
scheduler.scheduleWithFixedDelay(
config.cleanupStartupDelayRandomized,
config.cleanupRunInterval,
this )
}

@Override
Expand Down
17 changes: 11 additions & 6 deletions src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,19 @@ class K8sServiceImpl implements K8sService {
.readNamespacedJob(name, namespace, null)
if( !job )
return null
if( job.status.active )
return JobStatus.Pending
if( job.status.succeeded )
return JobStatus.Succeeded
if( job.status.failed )
return JobStatus.Failed
else
return null
if( job.status.active )
return JobStatus.Pending
if( job.status.failed ) {
if( job.status.completionTime!=null )
return JobStatus.Failed
if( job.status.failed > job.spec.backoffLimit )
return JobStatus.Failed
else
return JobStatus.Pending
}
return null
}

/**
Expand Down
8 changes: 7 additions & 1 deletion src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import java.time.Duration
import groovy.transform.CompileStatic
import groovy.transform.ToString
import io.micronaut.context.annotation.Value
import io.seqera.wave.util.DurationUtils
import jakarta.inject.Singleton

/**
*
* Model JWT refreshing service config
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Singleton
Expand Down Expand Up @@ -59,4 +61,8 @@ class JwtConfig {
@Value('${wave.jwt.monitor.count:10}')
int monitorCount

/**
 * The JWT monitor initial delay as returned by {@code DurationUtils.randomDuration}
 * for {@code monitorDelay} and a factor of {@code 0.4}.
 * NOTE(review): presumably the factor bounds the random variation applied to
 * the configured delay - confirm against DurationUtils.
 */
Duration getMonitorDelayRandomized() {
final float factor = 0.4f
return DurationUtils.randomDuration(monitorDelay, factor)
}

}
6 changes: 5 additions & 1 deletion src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ class JwtMonitor implements Runnable {
@PostConstruct
private init() {
log.info "Creating JWT heartbeat - $jwtConfig"
scheduler.scheduleAtFixedRate(jwtConfig.monitorDelay, jwtConfig.monitorInterval, this)
// use randomize initial delay to prevent multiple replicas running at the same time
scheduler.scheduleAtFixedRate(
jwtConfig.monitorDelayRandomized,
jwtConfig.monitorInterval,
this )
}

void run() {
Expand Down
Loading

0 comments on commit fe8121f

Please sign in to comment.