From 6055e1a5be9933e1faaa8acf8d904f832fdee1cf Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 20 Nov 2023 16:30:25 -0800 Subject: [PATCH 1/2] handle index not found and not avaiable exception Signed-off-by: Peng Huo --- .../opensearch/flint/core/FlintClient.java | 12 +++ .../metadata/log/FlintMetadataLogEntry.scala | 77 +++++++++++++ .../core/storage/FlintOpenSearchClient.java | 35 ++++-- .../storage/FlintOpenSearchMetadataLog.java | 29 +++-- .../flint/core/storage/OpenSearchUpdater.java | 19 +++- .../opensearch/flint/spark/FlintSpark.scala | 2 +- .../flint/OpenSearchTransactionSuite.scala | 30 +++++- .../core/FlintOpenSearchClientSuite.scala | 8 +- .../flint/core/FlintTransactionITSuite.scala | 67 +++++++++++- .../flint/core/OpenSearchUpdaterSuite.scala | 102 ++++++++++++++++++ .../spark/FlintSparkIndexJobITSuite.scala | 14 ++- .../spark/FlintSparkTransactionITSuite.scala | 12 ++- .../org/apache/spark/sql/FlintREPL.scala | 8 +- 13 files changed, 377 insertions(+), 38 deletions(-) create mode 100644 integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index c1f5d78c1..6cdf5187d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -28,6 +28,18 @@ public interface FlintClient { */ OptimisticTransaction startTransaction(String indexName, String dataSourceName); + /** + * + * Start a new optimistic transaction. + * + * @param indexName index name + * @param dataSourceName TODO: read from elsewhere in future + * @param forceInit forceInit create empty translog if not exist. + * @return transaction handle + */ + OptimisticTransaction startTransaction(String indexName, String dataSourceName, + boolean forceInit); + /** * Create a Flint index with the metadata given. * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index fea9974c6..eb93c7fde 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -7,6 +7,7 @@ package org.opensearch.flint.core.metadata.log import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} /** * Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move @@ -92,4 +93,80 @@ object FlintMetadataLogEntry { .getOrElse(IndexState.UNKNOWN) } } + + val QUERY_EXECUTION_REQUEST_MAPPING: String = + """{ + | "dynamic": false, + | "properties": { + | "version": { + | "type": "keyword" + | }, + | "type": { + | "type": "keyword" + | }, + | "state": { + | "type": "keyword" + | }, + | "statementId": { + | "type": "keyword" + | }, + | "applicationId": { + | "type": "keyword" + | }, + | "sessionId": { + | "type": "keyword" + | }, + | "sessionType": { + | "type": "keyword" + | }, + | "error": { + | "type": "text" + | }, + | "lang": { + | "type": "keyword" + | }, + | "query": { + | "type": "text" + | }, + | "dataSourceName": { + | "type": "keyword" + | }, + | "submitTime": { + | "type": "date", + | "format": "strict_date_time||epoch_millis" + | }, + | "jobId": { + | "type": "keyword" + | }, + | "lastUpdateTime": { + | "type": "date", + | "format": "strict_date_time||epoch_millis" + | }, + | "queryId": { + | "type": "keyword" + | }, + | "excludeJobIds": { + | "type": "keyword" + | } + | } + |}""".stripMargin + + val QUERY_EXECUTION_REQUEST_SETTINGS: String = + """{ + | "index": { + | "number_of_shards": "1", + | "auto_expand_replicas": "0-2", + | "number_of_replicas": "0" + | } + |}""".stripMargin + + def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry = + FlintMetadataLogEntry( + "", + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + 0L, + IndexState.FAILED, + dataSourceName, + error) } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 8652f8092..92a749d86 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -44,14 +44,15 @@ import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor; import org.opensearch.flint.core.metadata.FlintMetadata; import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.SearchModule; import org.opensearch.search.builder.SearchSourceBuilder; import scala.Option; +import scala.Some; /** * Flint client implementation for OpenSearch storage. @@ -80,34 +81,48 @@ public FlintOpenSearchClient(FlintOptions options) { } @Override - public OptimisticTransaction startTransaction(String indexName, String dataSourceName) { + public OptimisticTransaction startTransaction(String indexName, String dataSourceName, + boolean forceInit) { LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; - try (RestHighLevelClient client = createClient()) { if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); - return new DefaultOptimisticTransaction<>(dataSourceName, - new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName)); } else { - LOG.info("Metadata log index not found " + metaLogIndexName); - return new NoOptimisticTransaction<>(); + if (forceInit) { + createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(), + Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS())); + } else { + String errorMsg = "Metadata log index not found " + metaLogIndexName; + LOG.warning(errorMsg); + throw new IllegalStateException(errorMsg); + } } + return new DefaultOptimisticTransaction<>(dataSourceName, + new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName)); } catch (IOException e) { throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e); } } + @Override + public OptimisticTransaction startTransaction(String indexName, String dataSourceName) { + return startTransaction(indexName, dataSourceName, false); + } + @Override public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); + createIndex(indexName, metadata.getContent(), metadata.indexSettings()); + } + + protected void createIndex(String indexName, String mapping, Option settings) { + LOG.info("Creating Flint index " + indexName); String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); - request.mapping(metadata.getContent(), XContentType.JSON); - - Option settings = metadata.indexSettings(); + request.mapping(mapping, XContentType.JSON); if (settings.isDefined()) { request.settings(settings.get(), XContentType.JSON); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 07029d608..2cb054f48 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -5,12 +5,6 @@ package org.opensearch.flint.core.storage; -import static org.opensearch.action.support.WriteRequest.RefreshPolicy; - -import java.io.IOException; -import java.util.Base64; -import java.util.Optional; -import java.util.logging.Logger; import org.opensearch.OpenSearchException; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.get.GetRequest; @@ -19,11 +13,20 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.logging.Logger; + +import static java.util.logging.Level.SEVERE; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy; + /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history * of metadata log. @@ -57,6 +60,11 @@ public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { // TODO: use single doc for now. this will be always append in future. FlintMetadataLogEntry latest; + if (!exists()) { + String errorMsg = "Flint Metadata Log index not found " + metaLogIndexName; + LOG.log(SEVERE, errorMsg); + throw new IllegalStateException(errorMsg); + } if (logEntry.id().isEmpty()) { latest = createLogEntry(logEntry); } else { @@ -148,6 +156,15 @@ private FlintMetadataLogEntry writeLogEntry( } } + private boolean exists() { + LOG.info("Checking if Flint index exists " + metaLogIndexName); + try (RestHighLevelClient client = flintClient.createClient()) { + return client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); + } catch (IOException e) { + throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); + } + } + @FunctionalInterface public interface CheckedFunction { R apply(T t) throws IOException; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 58963ab74..4a6424512 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -4,14 +4,17 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; -import org.opensearch.flint.core.FlintClientBuilder; -import org.opensearch.flint.core.FlintOptions; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; public class OpenSearchUpdater { + private static final Logger LOG = Logger.getLogger(OpenSearchUpdater.class.getName()); + private final String indexName; private final FlintClient flintClient; @@ -28,6 +31,7 @@ public void upsert(String id, String doc) { // also, failure to close the client causes the job to be stuck in the running state as the client resource // is not released. try (RestHighLevelClient client = flintClient.createClient()) { + assertIndexExist(client, indexName); UpdateRequest updateRequest = new UpdateRequest(indexName, id).doc(doc, XContentType.JSON) @@ -44,6 +48,7 @@ public void upsert(String id, String doc) { public void update(String id, String doc) { try (RestHighLevelClient client = flintClient.createClient()) { + assertIndexExist(client, indexName); UpdateRequest updateRequest = new UpdateRequest(indexName, id).doc(doc, XContentType.JSON) @@ -59,6 +64,7 @@ public void update(String id, String doc) { public void updateIf(String id, String doc, long seqNo, long primaryTerm) { try (RestHighLevelClient client = flintClient.createClient()) { + assertIndexExist(client, indexName); UpdateRequest updateRequest = new UpdateRequest(indexName, id).doc(doc, XContentType.JSON) @@ -73,4 +79,13 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) { id), e); } } + + private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException { + LOG.info("Checking if index exists " + indexName); + if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { + String errorMsg = "Index not found " + indexName; + LOG.log(Level.SEVERE, errorMsg); + throw new IllegalStateException(errorMsg); + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index e9331113a..47ade0f87 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -104,7 +104,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val metadata = index.metadata() try { flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName, dataSourceName, true) .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index 1e7077799..ba9acffd1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -14,9 +14,10 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.update.UpdateRequest import org.opensearch.client.RequestOptions -import org.opensearch.client.indices.CreateIndexRequest +import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest} import org.opensearch.common.xcontent.XContentType import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState import org.opensearch.flint.core.storage.FlintOpenSearchClient._ import org.opensearch.flint.spark.FlintSparkSuite @@ -39,13 +40,15 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { super.beforeEach() openSearchClient .indices() - .create(new CreateIndexRequest(testMetaLogIndex), RequestOptions.DEFAULT) + .create( + new CreateIndexRequest(testMetaLogIndex) + .mapping(QUERY_EXECUTION_REQUEST_MAPPING, XContentType.JSON) + .settings(QUERY_EXECUTION_REQUEST_SETTINGS, XContentType.JSON), + RequestOptions.DEFAULT) } override def afterEach(): Unit = { - openSearchClient - .indices() - .delete(new DeleteIndexRequest(testMetaLogIndex), RequestOptions.DEFAULT) + deleteIndex(testMetaLogIndex) super.afterEach() } @@ -71,4 +74,21 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { .doc(latest.copy(state = newState).toJson, XContentType.JSON), RequestOptions.DEFAULT) } + + def deleteIndex(indexName: String): Unit = { + if (openSearchClient + .indices() + .exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { + openSearchClient + .indices() + .delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT) + } + } + + def indexMapping(): String = { + val response = + openSearchClient.indices.get(new GetIndexRequest(testMetaLogIndex), RequestOptions.DEFAULT) + + response.getMappings.get(testMetaLogIndex).source().toString + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 9a762d9d6..7da67051d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -16,7 +16,6 @@ import org.opensearch.client.opensearch.OpenSearchClient import org.opensearch.client.transport.rest_client.RestClientTransport import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchScrollReader} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -31,9 +30,10 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M behavior of "Flint OpenSearch client" - it should "start no optimistic transaction if metadata log index doesn't exists" in { - val transaction = flintClient.startTransaction("test", "non-exist-index") - transaction shouldBe a[NoOptimisticTransaction[AnyRef]] + it should "throw IllegalStateException if metadata log index doesn't exists" in { + the[IllegalStateException] thrownBy { + flintClient.startTransaction("test", "non-exist-index") + } } it should "create index successfully" in { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index a8b5a1fa2..fa072898b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -9,6 +9,8 @@ import java.util.Base64 import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.{JsonMethods, Serialization} import org.opensearch.flint.OpenSearchTransactionSuite import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ @@ -214,7 +216,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latestLogEntry(testLatestId) should contain("state" -> "active") } - test("should not necessarily rollback if transaction operation failed but no transient action") { + test( + "should not necessarily rollback if transaction operation failed but no transient action") { // Use create index scenario in this test case the[IllegalStateException] thrownBy { flintClient @@ -227,4 +230,66 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { // Should rollback to initial empty log latestLogEntry(testLatestId) should contain("state" -> "empty") } + + test("forceInit translog, even index is deleted before startTransaction") { + deleteIndex(testMetaLogIndex) + flintClient + .startTransaction(testFlintIndex, testDataSourceName, true) + .initialLog(latest => { + latest.id shouldBe testLatestId + latest.state shouldBe EMPTY + latest.createTime shouldBe 0L + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + true + }) + .finalLog(latest => latest) + .commit(_ => {}) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + (JsonMethods.parse(indexMapping()) \ "properties" \ "sessionId" \ "type") + .extract[String] should equal("keyword") + } + + test("should fail if index is deleted before initial operation") { + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => { + deleteIndex(testMetaLogIndex) + true + }) + .transientLog(latest => latest.copy(state = CREATING)) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => {}) + } + } + + test("should fail if index is deleted before transient operation") { + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => true) + .transientLog(latest => { + deleteIndex(testMetaLogIndex) + latest.copy(state = CREATING) + }) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => {}) + } + } + + test("should fail if index is deleted before final operation") { + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => true) + .transientLog(latest => { latest.copy(state = CREATING) }) + .finalLog(latest => { + deleteIndex(testMetaLogIndex) + latest.copy(state = ACTIVE) + }) + .commit(_ => {}) + } + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala new file mode 100644 index 000000000..3b317a0fe --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.action.get.{GetRequest, GetResponse} +import org.opensearch.client.RequestOptions +import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.app.FlintInstance +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchUpdater} +import org.scalatest.matchers.should.Matchers + +class OpenSearchUpdaterSuite extends OpenSearchTransactionSuite with Matchers { + val sessionId = "sessionId" + val timestamp = 1700090926955L + val flintJob = + new FlintInstance( + "applicationId", + "jobId", + sessionId, + "running", + timestamp, + timestamp, + Seq("")) + var flintClient: FlintClient = _ + var updater: OpenSearchUpdater = _ + + override def beforeAll(): Unit = { + super.beforeAll() + flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)); + updater = new OpenSearchUpdater( + testMetaLogIndex, + new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))) + } + + test("upsert flintJob should success") { + updater.upsert(sessionId, FlintInstance.serialize(flintJob, timestamp)) + getFlintInstance(sessionId)._2.lastUpdateTime shouldBe timestamp + } + + test("index is deleted when upsert flintJob should throw IllegalStateException") { + deleteIndex(testMetaLogIndex) + + the[IllegalStateException] thrownBy { + updater.upsert(sessionId, FlintInstance.serialize(flintJob, timestamp)) + } + } + + test("update flintJob should success") { + updater.upsert(sessionId, FlintInstance.serialize(flintJob, timestamp)) + + val newTimestamp = 1700090926956L + updater.update(sessionId, FlintInstance.serialize(flintJob, newTimestamp)) + getFlintInstance(sessionId)._2.lastUpdateTime shouldBe newTimestamp + } + + test("index is deleted when update flintJob should throw IllegalStateException") { + deleteIndex(testMetaLogIndex) + + the[IllegalStateException] thrownBy { + updater.update(sessionId, FlintInstance.serialize(flintJob, timestamp)) + } + } + + test("updateIf flintJob should success") { + updater.upsert(sessionId, FlintInstance.serialize(flintJob, timestamp)) + val (resp, latest) = getFlintInstance(sessionId) + + val newTimestamp = 1700090926956L + updater.updateIf( + sessionId, + FlintInstance.serialize(latest, newTimestamp), + resp.getSeqNo, + resp.getPrimaryTerm) + getFlintInstance(sessionId)._2.lastUpdateTime shouldBe newTimestamp + } + + test("index is deleted when updateIf flintJob should throw IllegalStateException") { + updater.upsert(sessionId, FlintInstance.serialize(flintJob, timestamp)) + val (resp, latest) = getFlintInstance(sessionId) + + deleteIndex(testMetaLogIndex) + + the[IllegalStateException] thrownBy { + updater.updateIf( + sessionId, + FlintInstance.serialize(latest, timestamp), + resp.getSeqNo, + resp.getPrimaryTerm) + } + } + + def getFlintInstance(docId: String): (GetResponse, FlintInstance) = { + val response = + openSearchClient.get(new GetRequest(testMetaLogIndex, docId), RequestOptions.DEFAULT) + (response, FlintInstance.deserializeFromMap(response.getSourceAsMap)) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 365aab83d..8df2bc472 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -29,8 +29,18 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers } override def afterEach(): Unit = { - super.afterEach() // must clean up metadata log first and then delete - flint.deleteIndex(testIndex) + + /** + * Todo, if state is not valid, will throw IllegalStateException. Should check flint + * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if + * failed, delete index itself. + */ + try { + flint.deleteIndex(testIndex) + } catch { + case _: IllegalStateException => deleteIndex(testIndex) + } + super.afterEach() } test("recover should exit if index doesn't exist") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 294449a48..56227533a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -33,8 +33,18 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match } override def afterEach(): Unit = { + + /** + * Todo, if state is not valid, will throw IllegalStateException. Should check flint + * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if + * failed, delete index itself. + */ + try { + flint.deleteIndex(testFlintIndex) + } catch { + case _: IllegalStateException => deleteIndex(testFlintIndex) + } super.afterEach() - flint.deleteIndex(testFlintIndex) } test("create index") { diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 28ce90d62..0f6c21786 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -6,14 +6,10 @@ package org.apache.spark.sql import java.net.ConnectException -import java.time.Instant -import java.util.Map import java.util.concurrent.ScheduledExecutorService -import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, TimeoutException} -import scala.concurrent.duration._ -import scala.concurrent.duration.{Duration, MINUTES} +import scala.concurrent.duration.{Duration, MINUTES, _} import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal @@ -46,7 +42,7 @@ object FlintREPL extends Logging with FlintJobExecutor { private val HEARTBEAT_INTERVAL_MILLIS = 60000L private val DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000 private val MAPPING_CHECK_TIMEOUT = Duration(1, MINUTES) - private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration(10, MINUTES) + private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration(30, MINUTES) private val DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS = 10 * 60 * 1000 val INITIAL_DELAY_MILLIS = 3000L From a0a251d631ddcbdc24fb7e573e47bd6afcd58f3a Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 21 Nov 2023 13:09:22 -0800 Subject: [PATCH 2/2] add refresh=wait_until when create metalog Signed-off-by: Peng Huo --- .../flint/core/storage/FlintOpenSearchMetadataLog.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 2cb054f48..f51e8a628 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -116,6 +116,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { new IndexRequest() .index(metaLogIndexName) .id(logEntryWithId.id()) + .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .source(logEntryWithId.toJson(), XContentType.JSON), RequestOptions.DEFAULT)); }