Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request index not exist handling #169

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ public interface FlintClient {
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName);

/**
*
* Start a new optimistic transaction.
*
* @param indexName index name
* @param dataSourceName TODO: read from elsewhere in future
* @param forceInit forceInit create empty translog if not exist.
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName,
boolean forceInit);

/**
* Create a Flint index with the metadata given.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.flint.core.metadata.log

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
Expand Down Expand Up @@ -92,4 +93,80 @@ object FlintMetadataLogEntry {
.getOrElse(IndexState.UNKNOWN)
}
}

val QUERY_EXECUTION_REQUEST_MAPPING: String =
"""{
| "dynamic": false,
| "properties": {
| "version": {
| "type": "keyword"
| },
| "type": {
| "type": "keyword"
| },
| "state": {
| "type": "keyword"
| },
| "statementId": {
| "type": "keyword"
| },
| "applicationId": {
| "type": "keyword"
| },
| "sessionId": {
| "type": "keyword"
| },
| "sessionType": {
| "type": "keyword"
| },
| "error": {
| "type": "text"
| },
| "lang": {
| "type": "keyword"
| },
| "query": {
| "type": "text"
| },
| "dataSourceName": {
| "type": "keyword"
| },
| "submitTime": {
| "type": "date",
| "format": "strict_date_time||epoch_millis"
| },
| "jobId": {
| "type": "keyword"
| },
| "lastUpdateTime": {
| "type": "date",
| "format": "strict_date_time||epoch_millis"
| },
Comment on lines +141 to +144
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this was Long type? And should we add jobStartTime here too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

| "queryId": {
| "type": "keyword"
| },
| "excludeJobIds": {
| "type": "keyword"
| }
| }
|}""".stripMargin

val QUERY_EXECUTION_REQUEST_SETTINGS: String =
"""{
| "index": {
| "number_of_shards": "1",
| "auto_expand_replicas": "0-2",
| "number_of_replicas": "0"
| }
|}""".stripMargin

def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry =
FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
0L,
IndexState.FAILED,
dataSourceName,
error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import scala.Option;
import scala.Some;

/**
* Flint client implementation for OpenSearch storage.
Expand Down Expand Up @@ -80,34 +81,48 @@ public FlintOpenSearchClient(FlintOptions options) {
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName) {
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName,
boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;

try (RestHighLevelClient client = createClient()) {
if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
return new DefaultOptimisticTransaction<>(dataSourceName,
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
} else {
LOG.info("Metadata log index not found " + metaLogIndexName);
return new NoOptimisticTransaction<>();
if (forceInit) {
createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(),
Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS()));
} else {
String errorMsg = "Metadata log index not found " + metaLogIndexName;
LOG.warning(errorMsg);
throw new IllegalStateException(errorMsg);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do you emit metrics QueryExecutionRequestIndexNotFound ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add after metrics sink ready.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

track in here. #117

}
}
return new DefaultOptimisticTransaction<>(dataSourceName,
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName) {
return startTransaction(indexName, dataSourceName, false);
}

@Override
public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
createIndex(indexName, metadata.getContent(), metadata.indexSettings());
}

protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(metadata.getContent(), XContentType.JSON);

Option<String> settings = metadata.indexSettings();
request.mapping(mapping, XContentType.JSON);
if (settings.isDefined()) {
request.settings(settings.get(), XContentType.JSON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

package org.opensearch.flint.core.storage;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.get.GetRequest;
Expand All @@ -19,11 +13,20 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
* of metadata log.
Expand Down Expand Up @@ -57,6 +60,11 @@ public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName
public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
// TODO: use single doc for now. this will be always append in future.
FlintMetadataLogEntry latest;
if (!exists()) {
String errorMsg = "Flint Metadata Log index not found " + metaLogIndexName;
LOG.log(SEVERE, errorMsg);
throw new IllegalStateException(errorMsg);
}
if (logEntry.id().isEmpty()) {
latest = createLogEntry(logEntry);
} else {
Expand Down Expand Up @@ -148,6 +156,15 @@ private FlintMetadataLogEntry writeLogEntry(
}
}

private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
try (RestHighLevelClient client = flintClient.createClient()) {
return client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e);
}
}

@FunctionalInterface
public interface CheckedFunction<T, R> {
R apply(T t) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintClientBuilder;
import org.opensearch.flint.core.FlintOptions;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class OpenSearchUpdater {
private static final Logger LOG = Logger.getLogger(OpenSearchUpdater.class.getName());

private final String indexName;

private final FlintClient flintClient;
Expand All @@ -28,6 +31,7 @@ public void upsert(String id, String doc) {
// also, failure to close the client causes the job to be stuck in the running state as the client resource
// is not released.
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaituo any other test cases we need to cover when write to query_execution_request index?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may also need to check request indexing mapping as we did in the result index mapping. This can be done once during REPL lifetime.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Track in #172

UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
Expand All @@ -44,6 +48,7 @@ public void upsert(String id, String doc) {

public void update(String id, String doc) {
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this check is not required for Flint metadata log entry update?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found I didn't set refresh policy in create/update doc in FlintOpenSearchMetadataLog. Could you help check if we should do .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) there as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
Expand All @@ -59,6 +64,7 @@ public void update(String id, String doc) {

public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
Expand All @@ -73,4 +79,13 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
id), e);
}
}

private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException {
LOG.info("Checking if index exists " + indexName);
if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
String errorMsg = "Index not found " + indexName;
LOG.log(Level.SEVERE, errorMsg);
throw new IllegalStateException(errorMsg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
val metadata = index.metadata()
try {
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName, dataSourceName, true)
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.update.UpdateRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest}
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.core.storage.FlintOpenSearchClient._
import org.opensearch.flint.spark.FlintSparkSuite
Expand All @@ -39,13 +40,15 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite {
super.beforeEach()
openSearchClient
.indices()
.create(new CreateIndexRequest(testMetaLogIndex), RequestOptions.DEFAULT)
.create(
new CreateIndexRequest(testMetaLogIndex)
.mapping(QUERY_EXECUTION_REQUEST_MAPPING, XContentType.JSON)
.settings(QUERY_EXECUTION_REQUEST_SETTINGS, XContentType.JSON),
RequestOptions.DEFAULT)
}

override def afterEach(): Unit = {
openSearchClient
.indices()
.delete(new DeleteIndexRequest(testMetaLogIndex), RequestOptions.DEFAULT)
deleteIndex(testMetaLogIndex)
super.afterEach()
}

Expand All @@ -71,4 +74,21 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite {
.doc(latest.copy(state = newState).toJson, XContentType.JSON),
RequestOptions.DEFAULT)
}

def deleteIndex(indexName: String): Unit = {
if (openSearchClient
.indices()
.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
openSearchClient
.indices()
.delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT)
}
}

def indexMapping(): String = {
val response =
openSearchClient.indices.get(new GetIndexRequest(testMetaLogIndex), RequestOptions.DEFAULT)

response.getMappings.get(testMetaLogIndex).source().toString
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.rest_client.RestClientTransport
import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchScrollReader}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -31,9 +30,10 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M

behavior of "Flint OpenSearch client"

it should "start no optimistic transaction if metadata log index doesn't exists" in {
val transaction = flintClient.startTransaction("test", "non-exist-index")
transaction shouldBe a[NoOptimisticTransaction[AnyRef]]
it should "throw IllegalStateException if metadata log index doesn't exists" in {
the[IllegalStateException] thrownBy {
flintClient.startTransaction("test", "non-exist-index")
}
}

it should "create index successfully" in {
Expand Down
Loading
Loading