diff --git a/docs/index.md b/docs/index.md index 2a573d18d..6f63ceb9e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -546,6 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '//' to isolate checkpoint data. - `spark.flint.index.checkpoint.mandatory`: default is true. - `spark.datasource.flint.socket_timeout_millis`: default value is 60000. +- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 2000 if using aoss service, otherwise 0. - `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15. - `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60. - `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 6ddc6ae9c..f9d181b70 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -88,7 +88,11 @@ public class FlintOptions implements Serializable { public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000; public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000; - + + public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis"; + public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0; + public static final int DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS = 2000; + public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name"; public static final String BATCH_BYTES = "write.batch_bytes"; @@ -178,6 +182,13 @@ public int getSocketTimeoutMillis() { return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS))); } + public int getRequestCompletionDelayMillis() { + int defaultValue = SERVICE_NAME_AOSS.equals(getServiceName()) + ? DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS + : DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS; + return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(defaultValue))); + } + public String getDataSourceName() { return options.getOrDefault(DATA_SOURCE_NAME, ""); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 2bc097bba..5861ccf22 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -44,6 +44,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); try { createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); + waitRequestComplete(); // Delay to ensure create is complete before making other requests for the index emitIndexCreationSuccessMetric(metadata.kind()); } catch (IllegalStateException ex) { emitIndexCreationFailureMetric(metadata.kind()); @@ -131,6 +132,14 @@ private String sanitizeIndexName(String indexName) { return OpenSearchClientUtils.sanitizeIndexName(indexName); } + private void waitRequestComplete() { + try { + Thread.sleep(options.getRequestCompletionDelayMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + private void emitIndexCreationSuccessMetric(String indexKind) { emitIndexCreationMetric(indexKind, "success"); } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index bdcc120c0..364a8a1de 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -201,6 +201,11 @@ object FlintSparkConf { .datasourceOption() .doc("socket duration in milliseconds") .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS)) + val REQUEST_COMPLETION_DELAY_MILLIS = + FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}") + .datasourceOption() + .doc("delay in milliseconds after index creation is completed") + .createOptional() val DATA_SOURCE_NAME = FlintConfig(s"spark.flint.datasource.name") .doc("data source name") @@ -356,7 +361,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable REQUEST_INDEX, METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, EXCLUDE_JOB_IDS, - SCROLL_SIZE) + SCROLL_SIZE, + REQUEST_COMPLETION_DELAY_MILLIS) .map(conf => (conf.optionKey, conf.readFrom(reader))) .flatMap { case (_, None) => None diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 0cde6ab0f..594322bae 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -114,6 +114,21 @@ class FlintSparkConfSuite extends FlintSuite { } } + test("test request completionDelayMillis default value") { + FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0 + } + + test("test request completionDelayMillis default value for aoss") { + val options = FlintSparkConf(Map("auth.servicename" -> "aoss").asJava).flintOptions() + options.getRequestCompletionDelayMillis shouldBe 2000 + } + + test("test specified request completionDelayMillis") { + val options = + FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions() + options.getRequestCompletionDelayMillis shouldBe 1000 + } + test("externalSchedulerIntervalThreshold should return default value when empty") { val options = FlintSparkConf(Map("spark.flint.job.externalScheduler.interval" -> "").asJava) assert(options diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index a2c2d26f6..fe3cefef8 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -65,6 +65,27 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + it should "create index with request completion delay config" in { + val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}") + // Create a dummy index to avoid timing the initial overhead + flintClient.createIndex("dummy", metadata) + + val indexName = "flint_test_without_request_completion_delay" + val elapsedTimeWithoutDelay = timer { + flintClient.createIndex(indexName, metadata) + } + + val delayIndexName = "flint_test_with_request_completion_delay" + val delayOptions = + openSearchOptions + (FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS -> "2000") + val delayFlintOptions = new FlintOptions(delayOptions.asJava) + val delayFlintClient = new FlintOpenSearchClient(delayFlintOptions) + val elapsedTimeWithDelay = timer { + delayFlintClient.createIndex(delayIndexName, metadata) + } + elapsedTimeWithDelay - elapsedTimeWithoutDelay should be >= 1800L // allowing 200ms of wiggle room + } + it should "get all index names with the given index name pattern" in { val metadata = FlintOpenSearchIndexMetadataService.deserialize( """{"properties": {"test": { "type": "integer" } } }""") @@ -220,4 +241,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M def createTable(indexName: String, options: FlintOptions): Table = { OpenSearchCluster.apply(indexName, options).asScala.head } + + def timer(block: => Unit): Long = { + val start = System.currentTimeMillis() + block + val end = System.currentTimeMillis() + end - start + } }