Skip to content

Commit

Permalink
Merge branch 'bugfix/CLDSRV-592' into tmp/octopus/w/9.0/bugfix/CLDSRV…
Browse files Browse the repository at this point in the history
  • Loading branch information
bert-e committed Jan 9, 2025
2 parents 827afb2 + 7e9954d commit 4ac4b30
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 28 deletions.
1 change: 1 addition & 0 deletions lib/api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ const api = {

const requestContexts = prepareRequestContexts(apiMethod, request,
sourceBucket, sourceObject, sourceVersionId);

// Extract all the _apiMethods and store them in an array
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
// Attach the names to the current request
Expand Down
9 changes: 9 additions & 0 deletions lib/api/apiUtils/authorization/prepareRequestContexts.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ function prepareRequestContexts(apiMethod, request, sourceBucket,
requestContexts.push(requestContext);
}

if (apiMethod === 'completeMultipartUpload' || apiMethod === 'multipartDelete') {
// Request account quotas explicitly for MPU requests, to consider parts cleanup
// NOTE: we need quota for these, but it will be evaluated at the end of the API,
// once the parts have actually been deleted (not via standardMetadataValidateBucketAndObj)
requestContexts.forEach(context => {
context._needQuota = true; // eslint-disable-line no-param-reassign
});
}

return requestContexts;
}

Expand Down
18 changes: 17 additions & 1 deletion lib/api/apiUtils/object/abortMultipartUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { data } = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { standardMetadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils');
const { validateQuotas } = require('../quotas/quotaUtils');
const services = require('../../../services');
const metadata = require('../../../metadata/wrapper');

Expand Down Expand Up @@ -138,7 +139,22 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
}
cb();
});
}, () => next(null, mpuBucket, storedParts, destBucket));
}, () => {
const length = storedParts.reduce((length, loc) => length + loc.value.Size, 0);
return validateQuotas(request, destBucket, request.accountQuotas,
['objectDelete'], 'objectDelete', -length, false, log, err => {
if (err) {
// Ignore error, as the data has been deleted already: only inflight count
// has not been updated, and will be eventually consistent anyway
log.warn('failed to update inflights', {
method: 'abortMultipartUpload',
locations,
error: err,
});
}
next(null, mpuBucket, storedParts, destBucket);
});
});
},
function deleteShadowObjectMetadata(mpuBucket, storedParts, destBucket, next) {
let splitter = constants.splitter;
Expand Down
72 changes: 45 additions & 27 deletions lib/api/completeMultipartUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const locationKeysHaveChanged
= require('./apiUtils/object/locationKeysHaveChanged');
const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
const { validatePutVersionId } = require('./apiUtils/object/coldStorage');
const { validateQuotas } = require('./apiUtils/quotas/quotaUtils');

const versionIdUtils = versioning.VersionID;

Expand Down Expand Up @@ -213,12 +214,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(err, destBucket);
}
const storedParts = result.Contents;
const totalMPUSize = storedParts.reduce((acc, part) => acc + part.value.Size, 0);
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey);
jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize);
});
},
function completeExternalMpu(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey, next) {
jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize, next) {
const mdInfo = { storedParts, mpuOverviewKey, splitter };
const mpuInfo =
{ objectKey, uploadId, jsonList, bucketName, destBucket };
Expand All @@ -236,16 +238,17 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
// if mpu not handled externally, completeObjData will be null
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey);
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
totalMPUSize);
});
},
function validateAndFilterParts(destBucket, objMD, mpuBucket,
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey,
next) {
totalMPUSize, next) {
if (completeObjData) {
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
completeObjData.filteredPartsObj);
completeObjData.filteredPartsObj, totalMPUSize);
}
const filteredPartsObj = validateAndFilterMpuParts(storedParts,
jsonList, mpuOverviewKey, splitter, log);
Expand All @@ -254,11 +257,11 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj);
filteredPartsObj, totalMPUSize);
},
function processParts(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj, next) {
filteredPartsObj, totalMPUSize, next) {
// if mpu was completed on backend that stored mpu MD externally,
// skip MD processing steps
if (completeObjData && skipMpuPartProcessing(completeObjData)) {
Expand All @@ -276,7 +279,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
const calculatedSize = completeObjData.contentLength;
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
completeObjData.eTag, calculatedSize, dataLocations,
[mpuOverviewKey], null, completeObjData);
[mpuOverviewKey], null, completeObjData, totalMPUSize);
}

const partsInfo =
Expand All @@ -300,15 +303,15 @@ function completeMultipartUpload(authInfo, request, log, callback) {
];
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData);
extraPartLocations, completeObjData, totalMPUSize);
}
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, null);
extraPartLocations, null, totalMPUSize);
},
function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData, next) {
extraPartLocations, completeObjData, totalMPUSize, next) {
const metaHeaders = {};
const keysNotNeeded =
['initiator', 'partLocations', 'key',
Expand All @@ -321,6 +324,8 @@ function completeMultipartUpload(authInfo, request, log, callback) {
metaHeaders[item] = storedMetadata[item];
});

const droppedMPUSize = totalMPUSize - calculatedSize;

const metaStoreParams = {
authInfo,
objectKey,
Expand Down Expand Up @@ -380,7 +385,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return process.nextTick(() => next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
completeObjData, options));
completeObjData, options, droppedMPUSize));
}

if (!destBucket.isVersioningEnabled() && objMD?.archive?.archiveInfo) {
Expand All @@ -403,13 +408,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
completeObjData, options);
completeObjData, options, droppedMPUSize);
});
},
function storeAsNewObj(destinationBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD,
extraPartLocations, pseudoCipherBundle,
completeObjData, options, next) {
completeObjData, options, droppedMPUSize, next) {
const dataToDelete = options.dataToDelete;
/* eslint-disable no-param-reassign */
metaStoreParams.versionId = options.versionId;
Expand Down Expand Up @@ -461,7 +466,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(null, mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket,
// pass the original version ID as generatedVersionId
objMD.versionId);
objMD.versionId, droppedMPUSize);
}
}
return services.metadataStoreObject(destinationBucket.getName(),
Expand Down Expand Up @@ -499,31 +504,44 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
return next(null, mpuBucket, keysToDelete,
aggregateETag, extraPartLocations,
destinationBucket, generatedVersionId);
destinationBucket, generatedVersionId,
droppedMPUSize);
});
}
return next(null, mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket,
generatedVersionId);
generatedVersionId, droppedMPUSize);
});
},
function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket, generatedVersionId, next) {
extraPartLocations, destinationBucket, generatedVersionId, droppedMPUSize, next) {
services.batchDeleteObjectMetadata(mpuBucket.getName(),
keysToDelete, log, err => next(err, extraPartLocations,
destinationBucket, aggregateETag, generatedVersionId));
destinationBucket, aggregateETag, generatedVersionId, droppedMPUSize));
},
function batchDeleteExtraParts(extraPartLocations, destinationBucket,
aggregateETag, generatedVersionId, next) {
aggregateETag, generatedVersionId, droppedMPUSize, next) {
if (extraPartLocations && extraPartLocations.length > 0) {
return data.batchDelete(extraPartLocations, request.method,
null, log, err => {
if (err) {
return next(err);
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
return data.batchDelete(extraPartLocations, request.method, null, log, err => {
if (err) {
return next(err);
}

return validateQuotas(request, destinationBucket, request.accountQuotas,
['objectDelete'], 'objectDelete', -droppedMPUSize, false, log, err => {
if (err) {
// Ignore error, as the data has been deleted already: only inflight count
// has not been updated, and will be eventually consistent anyway
log.warn('failed to update inflights', {
method: 'completeMultipartUpload',
extraPartLocations,
error: err,
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
});
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
Expand Down
132 changes: 132 additions & 0 deletions tests/quota/awsNodeSdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -883,4 +883,136 @@ function multiObjectDelete(bucket, keys, size, callback) {
next => deleteBucket(bucket, next),
], done);
});

it('should reduce inflights when completing MPU with fewer parts than uploaded', done => {
const bucket = 'quota-test-bucket-mpu1';
const key = 'quota-test-object';
const parts = 3;
const partSize = 5 * 1024 * 1024;
const totalSize = parts * partSize;
const usedParts = 2;
let uploadId = null;
const ETags = [];

if (!s3Config.isQuotaInflightEnabled()) {
return done();
}

return async.series([
next => createBucket(bucket, false, next),
next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
JSON.stringify({ quota: totalSize * 2 }), config)
.then(() => next()).catch(err => next(err)),
next => s3Client.createMultipartUpload({
Bucket: bucket,
Key: key,
}, (err, data) => {
if (err) {
return next(err);
}
uploadId = data.UploadId;
return next();
}),
next => async.timesSeries(parts, (n, cb) => {
const uploadPartParams = {
Bucket: bucket,
Key: key,
PartNumber: n + 1,
UploadId: uploadId,
Body: Buffer.alloc(partSize),
};
return s3Client.uploadPart(uploadPartParams, (err, data) => {
if (err) {
return cb(err);
}
ETags[n] = data.ETag;
return cb();
});
}, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify all parts are counted in inflights
assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
return next();
},
next => {
// Complete with only first two parts
const params = {
Bucket: bucket,
Key: key,
MultipartUpload: {
Parts: Array.from({ length: usedParts }, (_, i) => ({
ETag: ETags[i],
PartNumber: i + 1,
})),
},
UploadId: uploadId,
};
return s3Client.completeMultipartUpload(params, next);
},
next => wait(inflightFlushFrequencyMS * 2, () => next()),
next => {
// Verify inflights reduced by dropped part
const expectedInflights = usedParts * partSize;
assert.strictEqual(scuba.getInflightsForBucket(bucket), expectedInflights);
return next();
},
next => deleteObject(bucket, key, usedParts * partSize, next),
next => deleteBucket(bucket, next),
], done);
});

it('should reduce inflights when aborting MPU', done => {
const bucket = 'quota-test-bucket-mpu2';
const key = 'quota-test-object';
const parts = 3;
const partSize = 5 * 1024 * 1024;
const totalSize = parts * partSize;
let uploadId = null;

if (!s3Config.isQuotaInflightEnabled()) {
return done();
}

return async.series([
next => createBucket(bucket, false, next),
next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
JSON.stringify({ quota: totalSize * 2 }), config)
.then(() => next()).catch(err => next(err)),
next => s3Client.createMultipartUpload({
Bucket: bucket,
Key: key,
}, (err, data) => {
if (err) {
return next(err);
}
uploadId = data.UploadId;
return next();
}),
next => async.timesSeries(parts, (n, cb) => {
const uploadPartParams = {
Bucket: bucket,
Key: key,
PartNumber: n + 1,
UploadId: uploadId,
Body: Buffer.alloc(partSize),
};
return s3Client.uploadPart(uploadPartParams, cb);
}, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify all parts are counted in inflights
assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
return next();
},
next => abortMPU(bucket, key, uploadId, totalSize, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify inflights reduced to zero after abort
assert.strictEqual(scuba.getInflightsForBucket(bucket), 0);
return next();
},
next => deleteBucket(bucket, next),
], done);
});
});
Loading

0 comments on commit 4ac4b30

Please sign in to comment.