Add export tasks #2450

Draft · wants to merge 53 commits into base: master

Changes from all commits (53 commits)
753618b  Util classes for data loader (inv-jishnu, Dec 4, 2024)
8d39d02  Fix spotbug issue (inv-jishnu, Dec 4, 2024)
bf94c49  Removed error message and added core error (inv-jishnu, Dec 6, 2024)
47be388  Applied spotless (inv-jishnu, Dec 6, 2024)
913eb1c  Fixed unit test failures (inv-jishnu, Dec 6, 2024)
1f204b8  Merge branch 'master' into feat/data-loader/utils (ypeckstadt, Dec 11, 2024)
6cfa83a  Basic data import enum and exception (inv-jishnu, Dec 11, 2024)
d381b2b  Removed exception class for now (inv-jishnu, Dec 11, 2024)
67f2474  Added DECIMAL_FORMAT (inv-jishnu, Dec 12, 2024)
14e3593  Path util class updated (inv-jishnu, Dec 12, 2024)
a096d51  Feedback changes (inv-jishnu, Dec 13, 2024)
dbf1940  Merge branch 'master' into feat/data-loader/utils (ypeckstadt, Dec 13, 2024)
cd8add9  Merge branch 'master' into feat/data-loader/utils (ypeckstadt, Dec 16, 2024)
52890c8  Changes (inv-jishnu, Dec 16, 2024)
5114639  Merge branch 'master' into feat/data-loader/import-data-1 (inv-jishnu, Dec 17, 2024)
4f9cd75  Merge branch 'feat/data-loader/utils' into feat/data-loader/scaladb-dao (inv-jishnu, Dec 17, 2024)
1997eb8  Added ScalarDB Dao (inv-jishnu, Dec 17, 2024)
91e6310  Merge branch 'master' into feat/data-loader/scaladb-dao (inv-jishnu, Dec 17, 2024)
8a7338b  Remove unnecessary files (inv-jishnu, Dec 17, 2024)
2b52eeb  Initial commit [skip ci] (inv-jishnu, Dec 17, 2024)
e206073  Changes (inv-jishnu, Dec 17, 2024)
26d3144  Changes (inv-jishnu, Dec 18, 2024)
b86487d  spotbugs exclude (inv-jishnu, Dec 18, 2024)
818a2b4  spotbugs exclude -2 (inv-jishnu, Dec 18, 2024)
90c4105  Added a file [skip ci] (inv-jishnu, Dec 18, 2024)
3d5d3e0  Added unit test files [skip ci] (inv-jishnu, Dec 18, 2024)
6495202  Spotbug fixes (inv-jishnu, Dec 19, 2024)
90abd9e  Removed use of List.of to fix CI error (inv-jishnu, Dec 19, 2024)
ba2b3dd  Merged changes from master after resolving conflict (inv-jishnu, Dec 19, 2024)
b1b811b  Merge branch 'master' into feat/data-loader/metadata-service (inv-jishnu, Dec 19, 2024)
30db988  Applied spotless (inv-jishnu, Dec 19, 2024)
e9bb004  Added export options validator (inv-jishnu, Dec 19, 2024)
03324e1  Minor change in test (inv-jishnu, Dec 19, 2024)
d6aaf85  Applied spotless on CoreError (inv-jishnu, Dec 19, 2024)
4439dea  Make constructor private and improve javadocs (ypeckstadt, Dec 19, 2024)
ccb1ace  Improve javadocs (ypeckstadt, Dec 20, 2024)
a374f1a  Add private constructor to TableMetadataUtil (ypeckstadt, Dec 20, 2024)
a65c9b5  Apply spotless fix (ypeckstadt, Dec 20, 2024)
b3279ba  Fix the validation for partition and clustering keys (ypeckstadt, Dec 23, 2024)
78a8170  Fix spotless format (ypeckstadt, Dec 23, 2024)
acedabe  Partial feedback changes (inv-jishnu, Dec 24, 2024)
a95a858  Resolved conflicts and merged latest changes from main (inv-jishnu, Dec 26, 2024)
c05286d  Merge branch 'feat/data-loader/scaladb-dao' into feat/data-loader/exp… (inv-jishnu, Jan 2, 2025)
0d3f79e  Export tasks added (inv-jishnu, Jan 2, 2025)
2365460  Merge branch 'feat/data-loader/metadata-service' into feat/data-loade… (inv-jishnu, Jan 2, 2025)
95022a9  Initial commit [skip ci] (inv-jishnu, Jan 2, 2025)
89fea78  Added changes (inv-jishnu, Jan 6, 2025)
29a8c25  Fix spot less issue (inv-jishnu, Jan 6, 2025)
45adc95  Merge branch 'master' into feat/data-loader/export-tasks (inv-jishnu, Jan 6, 2025)
f6c54ec  Updated test code to remove warning (inv-jishnu, Jan 13, 2025)
b92758c  Merged latest changes from main after resolving conflicts (inv-jishnu, Jan 13, 2025)
9c4ae23  Resolved conflicts and merge latest changes from master (inv-jishnu, Jan 28, 2025)
c9d01cb  Added default case in switch to resolve sportbugs warning (inv-jishnu, Jan 28, 2025)
18 changes: 18 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -760,6 +760,12 @@ public enum CoreError implements ScalarDbError {
"The underlying-storage data type %s is not supported as the ScalarDB %s data type: %s",
"",
""),
DATA_LOADER_VALUE_TO_STRING_CONVERSION_FAILED(
Category.USER_ERROR,
"0168",
"Something went wrong while converting the ScalarDB values to strings. The table metadata and Value datatype probably do not match. Details: %s",
"",
""),

//
// Errors for the concurrency error category
@@ -1011,6 +1017,18 @@ public enum CoreError implements ScalarDbError {
"Handling the before-preparation snapshot hook failed. Details: %s",
"",
""),
DATA_LOADER_ERROR_CRUD_EXCEPTION(
Category.INTERNAL_ERROR,
"0047",
"Something went wrong while trying to save the data. Details %s",
"",
""),
DATA_LOADER_ERROR_SCAN(
Category.INTERNAL_ERROR,
"0048",
"Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details %s",
"",
""),

//
// Errors for the unknown transaction status error category
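For context, a minimal sketch of how one of the new data-loader error entries might be used. It assumes the buildMessage(Object...) helper that ScalarDbError entries expose; the surrounding class and the example argument are illustrative only and are not part of this change.

import com.scalar.db.common.error.CoreError;

public class DataLoaderErrorExample {
  public static void main(String[] args) {
    // buildMessage(...) fills the %s placeholder of the message defined above;
    // the full error code is derived from the category and the "0168" id.
    String message =
        CoreError.DATA_LOADER_VALUE_TO_STRING_CONVERSION_FAILED.buildMessage(
            "column 'price' holds an unexpected value type");
    System.out.println(message);
  }
}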
DataLoaderObjectMapper.java (new file)
@@ -0,0 +1,14 @@
package com.scalar.db.dataloader.core;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class DataLoaderObjectMapper extends ObjectMapper {

public DataLoaderObjectMapper() {
super();
this.setSerializationInclusion(JsonInclude.Include.NON_NULL);
this.registerModule(new JavaTimeModule());
}
}
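A minimal usage sketch of the mapper defined above, illustrating the two behaviors it configures: null fields are dropped (NON_NULL inclusion) and java.time values can be serialized via JavaTimeModule. The ExportedRow class here is hypothetical, for illustration only.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
import java.time.LocalDate;

public class DataLoaderObjectMapperExample {
  static class ExportedRow {
    public String id = "row-1";
    public LocalDate createdAt = LocalDate.of(2025, 1, 2);
    public String note = null; // omitted from the output because of NON_NULL inclusion
  }

  public static void main(String[] args) throws JsonProcessingException {
    DataLoaderObjectMapper mapper = new DataLoaderObjectMapper();
    // With Jackson defaults, the LocalDate is written as a numeric array unless
    // WRITE_DATES_AS_TIMESTAMPS is disabled; the null "note" field is not written at all.
    System.out.println(mapper.writeValueAsString(new ExportedRow()));
  }
}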
ExportManager.java (new file)
@@ -0,0 +1,316 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException;
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidator;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
import com.scalar.db.dataloader.core.util.CsvUtil;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.io.DataType;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class ExportManager {
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);

private final DistributedStorage storage;
private final ScalarDBDao dao;
private final ProducerTaskFactory producerTaskFactory;
private final Object lock = new Object();

/**
* Starts the export process
*
* @param exportOptions Export options
* @param tableMetadata Metadata for a single ScalarDB table
* @param writer Writer to write the exported data
* @return export report of the export process
*/
public ExportReport startExport(
ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) {
ExportReport exportReport = new ExportReport();
try {
validateExportOptions(exportOptions, tableMetadata);
Map<String, DataType> dataTypeByColumnName =
TableMetadataUtil.extractColumnDataTypes(tableMetadata);
handleTransactionMetadata(exportOptions, tableMetadata);

if (exportOptions.getOutputFileFormat() == FileFormat.CSV
&& !exportOptions.isExcludeHeaderRow()) {
writeCsvHeaderRow(exportOptions, tableMetadata, dataTypeByColumnName, writer);
}

int maxThreadCount =
exportOptions.getMaxThreadCount() == 0
? Runtime.getRuntime().availableProcessors()
: exportOptions.getMaxThreadCount();
ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);

BufferedWriter bufferedWriter = new BufferedWriter(writer);
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;

try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
if (isJson) {
bufferedWriter.write("[");
}

Iterator<Result> iterator = scanner.iterator();
AtomicBoolean isFirstBatch = new AtomicBoolean(true);

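// Fetch the scan results in chunks and hand each chunk to the thread pool; writer access is coordinated in processDataChunk.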
while (iterator.hasNext()) {
List<Result> dataChunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize());
executorService.submit(
() ->
processDataChunk(
exportOptions,
tableMetadata,
dataTypeByColumnName,
dataChunk,
bufferedWriter,
isJson,
isFirstBatch,
exportReport));
}
executorService.shutdown();
if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
logger.info("All tasks completed");
} else {
logger.error("Timeout occurred while waiting for tasks to complete");
// TODO: handle this
}
if (isJson) {
bufferedWriter.write("]");
}
bufferedWriter.flush();
} catch (InterruptedException | IOException e) {
logger.error("Error during export: {}", e.getMessage());
}
} catch (ExportOptionsValidationException | IOException | ScalarDBDaoException e) {
logger.error("Error during export: {}", e.getMessage());
}
return exportReport;
}

/**
* Processes a result data chunk and writes the produced output
*
* @param exportOptions export options
* @param tableMetadata metadata of the table
* @param dataTypeByColumnName map of columns and their data types
* @param dataChunk a list with result data
* @param bufferedWriter writer object
* @param isJson whether the output file format is JSON
* @param isFirstBatch whether the data chunk being processed is the first batch
* @param exportReport export report which will be updated once the data chunk is processed
*/
private void processDataChunk(
ExportOptions exportOptions,
TableMetadata tableMetadata,
Map<String, DataType> dataTypeByColumnName,
List<Result> dataChunk,
BufferedWriter bufferedWriter,
boolean isJson,
AtomicBoolean isFirstBatch,
ExportReport exportReport) {
ProducerTask producerTask =
producerTaskFactory.createProducerTask(
exportOptions.getOutputFileFormat(),
exportOptions.getProjectionColumns(),
tableMetadata,
dataTypeByColumnName);
String dataChunkContent = producerTask.process(dataChunk, exportReport);

try {
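// Serialize writer access across producer threads; for JSON output, prepend a comma before every chunk except the first.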
synchronized (lock) {
if (isJson && !isFirstBatch.getAndSet(false)) {
bufferedWriter.write(",");
}
bufferedWriter.write(dataChunkContent);
}
} catch (IOException e) {
logger.error("Error while writing data chunk: {}", e.getMessage());
}
}

/**
* Splits the scan results into batches
*
* @param iterator iterator over the scan results
* @param batchSize size of a batch
* @return a single batch of results containing at most batchSize entries
*/
private List<Result> fetchDataChunk(Iterator<Result> iterator, int batchSize) {
List<Result> batch = new ArrayList<>();
int count = 0;
while (iterator.hasNext() && count < batchSize) {
batch.add(iterator.next());
count++;
}
return batch;
}

/**
* Validates the export options
*
* @param exportOptions export options
* @param tableMetadata metadata of the table
* @throws ExportOptionsValidationException if any export option validation fails
*/
private void validateExportOptions(ExportOptions exportOptions, TableMetadata tableMetadata)
throws ExportOptionsValidationException {
ExportOptionsValidator.validate(exportOptions, tableMetadata);
}

/**
* Updates the projection columns of the export options when the include-transaction-metadata option is enabled
*
* @param exportOptions export options
* @param tableMetadata metadata of the table
*/
private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadata tableMetadata) {
if (exportOptions.isIncludeTransactionMetadata()
&& !exportOptions.getProjectionColumns().isEmpty()) {
List<String> projectionMetadata =
TableMetadataUtil.populateProjectionsWithMetadata(
tableMetadata, exportOptions.getProjectionColumns());
exportOptions.setProjectionColumns(projectionMetadata);
}
}

/**
* Creates and writes the header row to the CSV export file
*
* @param exportOptions export options
* @param tableMetadata metadata of the table
* @param dataTypeByColumnName map of columns and their data types
* @param writer writer object
* @throws IOException if an I/O error occurs while writing the header
*/
private void writeCsvHeaderRow(
ExportOptions exportOptions,
TableMetadata tableMetadata,
Map<String, DataType> dataTypeByColumnName,
Writer writer)
throws IOException {
String header =
createCsvHeaderRow(
exportOptions,
tableMetadata,
dataTypeByColumnName,
TableMetadataUtil.getMetadataColumns());
writer.append(header);
writer.flush();
}

/**
* Creates a scanner object
*
* @param exportOptions export options
* @param dao ScalarDB DAO object
* @param storage distributed storage object
* @return created scanner
* @throws ScalarDBDaoException if the scanner cannot be created
*/
private Scanner createScanner(
ExportOptions exportOptions, ScalarDBDao dao, DistributedStorage storage)
throws ScalarDBDaoException {
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
if (isScanAll) {
return dao.createScanner(
exportOptions.getNamespace(),
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
storage);
} else {
return dao.createScanner(
exportOptions.getNamespace(),
exportOptions.getTableName(),
exportOptions.getScanPartitionKey(),
exportOptions.getScanRange(),
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
storage);
}
}

/**
* Generates the header row of the CSV export file
*
* @param exportOptions export options
* @param tableMetadata metadata of the table
* @param dataTypeByColumnName map of columns and their data types
* @param columnsToIgnore set of columns to ignore
* @return generated CSV header row
*/
private String createCsvHeaderRow(
ExportOptions exportOptions,
TableMetadata tableMetadata,
Map<String, DataType> dataTypeByColumnName,
Set<String> columnsToIgnore) {
StringBuilder headerRow = new StringBuilder();
List<String> projections = exportOptions.getProjectionColumns();
Iterator<String> iterator = tableMetadata.getColumnNames().iterator();
while (iterator.hasNext()) {
String columnName = iterator.next();
if (shouldIgnoreColumn(
exportOptions.isIncludeTransactionMetadata(),
columnName,
columnsToIgnore,
dataTypeByColumnName.keySet(),
projections)) {
continue;
}
headerRow.append(columnName);
if (iterator.hasNext()) {
headerRow.append(exportOptions.getDelimiter());
}
}
CsvUtil.removeTrailingDelimiter(headerRow, exportOptions.getDelimiter());
headerRow.append("\n");
return headerRow.toString();
}

/**
* Determines whether a column should be ignored, for example because it is a metadata column or is
* not included in the selected projections
*
* @param isIncludeTransactionMetadata to include transaction metadata or not
* @param columnName column name
* @param columnsToIgnore set of columns to ignore
* @param dataTypeColumnNames data types of columns
* @param projections selected columns for projection
* @return true if the column should be ignored, false otherwise
*/
private boolean shouldIgnoreColumn(
boolean isIncludeTransactionMetadata,
String columnName,
Set<String> columnsToIgnore,
Set<String> dataTypeColumnNames,
List<String> projections) {
return (!isIncludeTransactionMetadata
&& TableMetadataUtil.isMetadataColumn(columnName, columnsToIgnore, dataTypeColumnNames))
|| (!projections.isEmpty() && !projections.contains(columnName));
}
}
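A minimal usage sketch of ExportManager, assuming the DistributedStorage, ScalarDBDao, ProducerTaskFactory, ExportOptions, and TableMetadata instances are created elsewhere (for example by a CLI layer); the exportToFile helper and the output file name are hypothetical.

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.dataloader.core.dataexport.ExportReport;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ExportManagerExample {
  // Hypothetical helper: all dependencies are supplied by the caller.
  public static ExportReport exportToFile(
      DistributedStorage storage,
      ScalarDBDao dao,
      ProducerTaskFactory producerTaskFactory,
      ExportOptions exportOptions,
      TableMetadata tableMetadata)
      throws IOException {
    ExportManager manager = new ExportManager(storage, dao, producerTaskFactory);
    // startExport validates the options, scans the table, and writes the data chunks
    // to the given writer; the returned report reflects what was exported.
    try (Writer writer = Files.newBufferedWriter(Paths.get("export_output.json"))) {
      return manager.startExport(exportOptions, tableMetadata, writer);
    }
  }
}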