Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Container mirror #646

Merged
merged 33 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
49b38c5
wip
pditommaso Sep 14, 2024
6f0d5c3
Merge branch 'master' into container-mirror-v2
pditommaso Sep 14, 2024
6d23ae1
wip
pditommaso Sep 15, 2024
c1e3ce6
Fix failing tests
pditommaso Sep 15, 2024
b17c33c
Add validation
pditommaso Sep 16, 2024
9acd7cd
Fix failing test
pditommaso Sep 16, 2024
bde22be
Merge branch 'master' into container-mirror-v2
pditommaso Sep 16, 2024
702525e
Merge branch 'master' into container-mirror-v2
pditommaso Sep 17, 2024
6e232e4
Merge branch 'master' into container-mirror-v2
pditommaso Sep 18, 2024
ee860ec
Merge branch 'master' into container-mirror-v2
pditommaso Sep 18, 2024
f21eabc
Merge branch 'master' into container-mirror-v2
pditommaso Sep 18, 2024
5f7f659
Merge branch 'master' into container-mirror-v2
pditommaso Sep 18, 2024
1be2b39
Minor change
pditommaso Sep 18, 2024
1afb2bd
Minor changes
pditommaso Sep 18, 2024
bb12769
Add tests
pditommaso Sep 18, 2024
f41430c
Merge branch 'master' into container-mirror-v2
pditommaso Sep 18, 2024
583d59c
Make cache store stateId aware
pditommaso Sep 19, 2024
083c921
Implements MirrorResult as StateRecord
pditommaso Sep 19, 2024
8e81201
Merge branch 'master' into container-mirror-v2
pditommaso Sep 19, 2024
2437fb1
Merge branch 'master' into container-mirror-v2
pditommaso Sep 19, 2024
55bc991
Add tests
pditommaso Sep 19, 2024
e288483
More docs & refactoring
pditommaso Sep 19, 2024
5d843e2
Refactoring and fixes
pditommaso Sep 19, 2024
483e63c
Minor changes
pditommaso Sep 19, 2024
c82f403
More validation
pditommaso Sep 19, 2024
b2f3e96
Add kube strategy
pditommaso Sep 19, 2024
e49c62e
[release] bump version 1.12.3
pditommaso Sep 22, 2024
423ff9b
Merge branch 'master' into container-mirror-v2
pditommaso Sep 22, 2024
0f6c49f
Remove unused method
pditommaso Sep 22, 2024
891468b
Minor changes
pditommaso Sep 22, 2024
b1deca6
Minor changes
pditommaso Sep 22, 2024
335291f
Update src/main/groovy/io/seqera/wave/tower/auth/JwtAuth.groovy [ci s…
pditommaso Sep 22, 2024
268054f
Update src/main/groovy/io/seqera/wave/service/builder/KubeBuildStrate…
pditommaso Sep 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.12.2
1.12.3
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
compileOnly("io.micronaut:micronaut-http-validation")
implementation("jakarta.persistence:jakarta.persistence-api:3.0.0")
api 'io.seqera:lib-mail:1.0.0'
api 'io.seqera:wave-api:0.10.0'
api 'io.seqera:wave-api:0.12.0'
api 'io.seqera:wave-utils:0.13.1'

implementation("io.micronaut:micronaut-http-client")
Expand Down
4 changes: 4 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Wave changelog
1.12.3 - 22 Sep 2024
- Fix build status completion of submit exception [3c3af360]
- Fix singularity build mounts [3b338b29]

1.12.2 - 18 Sep 2024
- Fix Remove entries permanently from stream once consumed [adfad9d6]
- Refactor container build service [1a858c12]
Expand Down
31 changes: 26 additions & 5 deletions src/main/groovy/io/seqera/wave/controller/BuildController.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import io.micronaut.http.server.types.files.StreamedFile
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.seqera.wave.api.BuildStatusResponse
import io.seqera.wave.exception.BadRequestException
import io.seqera.wave.service.builder.ContainerBuildService
import io.seqera.wave.service.logs.BuildLogService
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorRequest
import io.seqera.wave.service.persistence.WaveBuildRecord
import jakarta.inject.Inject
/**
Expand All @@ -48,12 +51,15 @@ class BuildController {
@Inject
private ContainerBuildService buildService

@Inject
private ContainerMirrorService mirrorService

@Inject
@Nullable
BuildLogService logService

@Get("/v1alpha1/builds/{buildId}")
HttpResponse<WaveBuildRecord> getBuildRecord(String buildId){
HttpResponse<WaveBuildRecord> getBuildRecord(String buildId) {
final record = buildService.getBuildRecord(buildId)
return record
? HttpResponse.ok(record)
Expand All @@ -72,11 +78,26 @@ class BuildController {
}

@Get("/v1alpha1/builds/{buildId}/status")
HttpResponse<BuildStatusResponse> getBuildStatus(String buildId){
final build = buildService.getBuildRecord(buildId)
build != null
? HttpResponse.ok(build.toStatusResponse())
HttpResponse<BuildStatusResponse> getBuildStatus(String buildId) {
final resp = buildResponse0(buildId)
resp != null
? HttpResponse.ok(resp)
: HttpResponse.<BuildStatusResponse>notFound()
}

protected BuildStatusResponse buildResponse0(String buildId) {
if( !buildId )
throw new BadRequestException("Missing 'buildId' parameter")
// build IDs starting with the `mr-` prefix are interpreted as mirror requests
if( buildId.startsWith(MirrorRequest.ID_PREFIX) ) {
return mirrorService
.getMirrorState(buildId)
?.toStatusResponse()
}
else {
return buildService
.getBuildRecord(buildId)
?.toStatusResponse()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import io.seqera.wave.service.builder.ContainerBuildService
import io.seqera.wave.service.builder.FreezeService
import io.seqera.wave.service.inclusion.ContainerInclusionService
import io.seqera.wave.service.inspect.ContainerInspectService
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorRequest
import io.seqera.wave.service.pairing.PairingService
import io.seqera.wave.service.pairing.socket.PairingChannel
import io.seqera.wave.service.persistence.PersistenceService
Expand Down Expand Up @@ -125,7 +127,7 @@ class ContainerController {
ContainerBuildService buildService

@Inject
ContainerInspectService dockerAuthService
ContainerInspectService inspectService

@Inject
RegistryProxyService registryProxyService
Expand All @@ -152,6 +154,9 @@ class ContainerController {
@Nullable
RateLimiterService rateLimiterService

@Inject
private ContainerMirrorService mirrorService

@PostConstruct
private void init() {
log.info "Wave server url: $serverUrl; allowAnonymous: $allowAnonymous; tower-endpoint-url: $towerEndpointUrl; default-build-repo: $buildConfig.defaultBuildRepository; default-cache-repo: $buildConfig.defaultCacheRepository; default-public-repo: $buildConfig.defaultPublicRepository"
Expand All @@ -176,6 +181,7 @@ class ContainerController {

// validate request
validateContainerRequest(req)
validateMirrorRequest(req, v2)

// this is needed for backward compatibility with old clients
if( !req.towerEndpoint ) {
Expand Down Expand Up @@ -325,7 +331,7 @@ class ContainerController {
? buildConfig.defaultPublicRepository
: buildConfig.defaultBuildRepository), req.nameStrategy)
final cacheRepository = req.cacheRepository ?: buildConfig.defaultCacheRepository
final configJson = dockerAuthService.credentialsConfigJson(containerSpec, buildRepository, cacheRepository, identity)
final configJson = inspectService.credentialsConfigJson(containerSpec, buildRepository, cacheRepository, identity)
final containerConfig = req.freeze ? req.containerConfig : null
final offset = DataTimeUtils.offsetId(req.timestamp)
final scanId = scanEnabled && format==DOCKER ? LongRndKey.rndHex() : null
Expand Down Expand Up @@ -414,6 +420,15 @@ class ContainerController {
buildId = track.id
buildNew = !track.cached
}
else if( req.mirrorRegistry ) {
final mirror = makeMirrorRequest(req, identity)
final track = checkMirror(mirror, identity, req.dryRun)
targetImage = track.targetImage
targetContent = null
condaContent = null
buildId = track.id
buildNew = !track.cached
}
else if( req.containerImage ) {
// normalize container image
final coords = ContainerCoordinates.parse(req.containerImage)
Expand All @@ -435,7 +450,51 @@ class ContainerController {
ContainerPlatform.of(req.containerPlatform),
buildId,
buildNew,
req.freeze )
req.freeze,
req.mirrorRegistry!=null
)
}

protected MirrorRequest makeMirrorRequest(SubmitContainerTokenRequest request, PlatformId identity) {
final coords = ContainerCoordinates.parse(request.containerImage)
if( coords.registry == request.mirrorRegistry )
throw new BadRequestException("Source and target mirror registry as the same - offending value '${request.mirrorRegistry}'")
final targetImage = request.mirrorRegistry + '/' + coords.imageAndTag
final configJson = inspectService.credentialsConfigJson(null, request.containerImage, targetImage, identity)
final platform = request.containerPlatform
? ContainerPlatform.of(request.containerPlatform)
: ContainerPlatform.DEFAULT
final digest = registryProxyService.getImageDigest(request.containerImage, identity)
if( !digest )
throw new BadRequestException("Container image '$request.containerImage' does not exist")
return MirrorRequest.create(
request.containerImage,
targetImage,
digest,
platform,
Path.of(buildConfig.buildWorkspace).toAbsolutePath(),
configJson )
}

protected BuildTrack checkMirror(MirrorRequest request, PlatformId identity, boolean dryRun) {
final targetDigest = registryProxyService.getImageDigest(request.targetImage, identity)
log.debug "== Mirror target digest: $targetDigest"
final cached = request.digest==targetDigest
// check for dry-run execution
if( dryRun ) {
log.debug "== Dry-run request request: $request"
final dryId = request.id + BuildRequest.SEP + '0'
return new BuildTrack(dryId, request.targetImage, cached)
}
// check for existing image
if( request.digest==targetDigest ) {
log.debug "== Found cached request for request: $request"
final cache = persistenceService.loadMirrorState(request.targetImage, targetDigest)
return new BuildTrack(cache?.mirrorId, request.targetImage, true)
}
else {
return mirrorService.mirrorImage(request)
}
}

protected String targetImage(String token, ContainerCoordinates container) {
Expand All @@ -461,7 +520,7 @@ class ContainerController {
return HttpResponse.ok()
}

void validateContainerRequest(SubmitContainerTokenRequest req) throws BadRequestException{
void validateContainerRequest(SubmitContainerTokenRequest req) throws BadRequestException {
String msg
// check valid image name
msg = validationService.checkContainerName(req.containerImage)
Expand All @@ -474,6 +533,32 @@ class ContainerController {
if( msg ) throw new BadRequestException(msg)
}

void validateMirrorRequest(SubmitContainerTokenRequest req, boolean v2) throws BadRequestException {
if( !req.mirrorRegistry )
return
// container mirror validation
if( !v2 )
throw new BadRequestException("Container mirroring requires the use of v2 API")
if( !req.containerImage )
throw new BadRequestException("Attribute `containerImage` is required when specifying `mirrorRegistry`")
if( !req.towerAccessToken )
throw new BadRequestException("Container mirroring requires an authenticated request - specify the tower token attribute")
if( req.freeze )
throw new BadRequestException("Attribute `mirrorRegistry` and `freeze` conflict each other")
if( req.containerFile )
throw new BadRequestException("Attribute `mirrorRegistry` and `containerFile` conflict each other")
if( req.containerIncludes )
throw new BadRequestException("Attribute `mirrorRegistry` and `containerIncludes` conflict each other")
if( req.containerConfig )
throw new BadRequestException("Attribute `mirrorRegistry` and `containerConfig` conflict each other")
final coords = ContainerCoordinates.parse(req.containerImage)
if( coords.registry == req.mirrorRegistry )
throw new BadRequestException("Source and target mirror registry as the same - offending value '${req.mirrorRegistry}'")
def msg = validationService.checkMirrorRegistry(req.mirrorRegistry)
if( msg )
throw new BadRequestException(msg)
}

@Error(exception = AuthorizationException.class)
HttpResponse<?> handleAuthorizationException() {
return HttpResponse.unauthorized()
Expand Down
53 changes: 53 additions & 0 deletions src/main/groovy/io/seqera/wave/controller/MirrorController.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.controller

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.http.HttpResponse
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorState
import jakarta.inject.Inject
/**
* Implements a controller for container mirror apis
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
class MirrorController {

@Inject
private ContainerMirrorService mirrorService

@Get("/v1alpha1/mirrors/{mirrorId}")
HttpResponse<MirrorState> getMirrorRecord(String mirrorId) {
final result = mirrorService.getMirrorState(mirrorId)
return result
? HttpResponse.ok(result)
: HttpResponse.<MirrorState>notFound()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,15 @@ class RegistryProxyService {
}

String getImageDigest(BuildRequest request, boolean retryOnNotFound=false) {
return getImageDigest(request.targetImage, request.identity, retryOnNotFound)
}

String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) {
try {
return getImageDigest0(request.targetImage, request.identity, retryOnNotFound)
return getImageDigest0(containerImage, identity, retryOnNotFound)
}
catch(Exception e) {
log.warn "Unable to retrieve digest for image '${request.targetImage}' -- cause: ${e.message}"
log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}"
return null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class ContainerRequestData {
final String buildId
final Boolean buildNew
final Boolean freeze
final Boolean mirror

boolean durable() {
return freeze || mirror
}

PlatformId getIdentity() {
return identity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.seqera.wave.service.cache

import java.time.Duration

import groovy.transform.CompileStatic
import io.seqera.wave.encoder.EncodingStrategy
import io.seqera.wave.service.cache.impl.CacheProvider

Expand All @@ -28,6 +29,7 @@ import io.seqera.wave.service.cache.impl.CacheProvider
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
abstract class AbstractCacheStore<V> implements CacheStore<String,V> {

private EncodingStrategy<V> encodingStrategy
Expand All @@ -45,6 +47,10 @@ abstract class AbstractCacheStore<V> implements CacheStore<String,V> {

protected String key0(String k) { return getPrefix() + k }

protected String recordId0(String recordId) {
return getPrefix() + 'state-id/' + recordId
}

protected V deserialize(String encoded) {
return encodingStrategy.decode(encoded)
}
Expand All @@ -63,32 +69,34 @@ abstract class AbstractCacheStore<V> implements CacheStore<String,V> {
return result ? deserialize(result) : null
}

V getByRecordId(String recordId) {
final key = delegate.get(recordId0(recordId))
return get(key)
}

void put(String key, V value) {
delegate.put(key0(key), serialize(value), getDuration())
put(key, value, getDuration())
}

@Override
void put(String key, V value, Duration ttl) {
delegate.put(key0(key), serialize(value), ttl)
if( value instanceof StateRecord ) {
delegate.put(recordId0(value.getRecordId()), key, ttl)
}
}

@Override
boolean putIfAbsent(String key, V value, Duration ttl) {
delegate.putIfAbsent(key0(key), serialize(value), ttl)
final result = delegate.putIfAbsent(key0(key), serialize(value), ttl)
if( result && value instanceof StateRecord ) {
delegate.put(recordId0(value.getRecordId()), key, ttl)
}
return result
}

boolean putIfAbsent(String key, V value) {
delegate.putIfAbsent(key0(key), serialize(value), getDuration())
}

@Override
V putIfAbsentAndGetCurrent(String key, V value, Duration ttl) {
final result = delegate.putIfAbsentAndGetCurrent(key0(key), serialize(value), ttl)
return result? deserialize(result) : null
}

V putIfAbsentAndGetCurrent(String key, V value) {
return putIfAbsentAndGetCurrent(key, value, getDuration())
return putIfAbsent(key, value, getDuration())
}

@Override
Expand Down
Loading