Skip to content

Commit

Permalink
update put flow when flattening result index
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Feb 3, 2025
1 parent 3e06458 commit eef1a57
Showing 1 changed file with 70 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,14 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener
private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun)) {
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();

timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener
.wrap(
initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse),
initResponse -> setupIngestPipeline(flattenedResultIndexAlias, listener, createConfigResponse),
listener::onFailure
)
);
Expand All @@ -491,12 +490,30 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) {
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
}

protected void setupIngestPipeline(
String flattenedResultIndexAlias,
String configId,
ActionListener<T> listener,
T createConfigResponse
) {
protected void setupIngestPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, String id, boolean indexingDryRun) {
String pipelineId = config.getFlattenResultIndexIngestPipelineName();

try {
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> {
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
bindIngestPipelineWithFlattenedResultIndex(pipelineId, flattenedResultIndexAlias, listener, id, indexingDryRun);

}, exception -> {
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
listener.onFailure(exception);
}));

} catch (IOException e) {
logger.error("Exception while building ingest pipeline definition for pipeline ID: {}", pipelineId, e);
listener.onFailure(e);
}
}

protected void setupIngestPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, T response) {
String pipelineId = config.getFlattenResultIndexIngestPipelineName();

try {
Expand All @@ -506,7 +523,7 @@ protected void setupIngestPipeline(

client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> {
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);
bindIngestPipelineWithFlattenedResultIndex(pipelineId, flattenedResultIndexAlias, listener, response);

}, exception -> {
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
Expand Down Expand Up @@ -544,11 +561,7 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
return BytesReference.bytes(pipelineBuilder);
}

private UpdateSettingsRequest buildUpdateSettingsRequest(
String flattenedResultIndexAlias,
String defaultPipelineName,
String configId
) {
private UpdateSettingsRequest buildUpdateSettingsRequest(String flattenedResultIndexAlias, String defaultPipelineName) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
updateSettingsRequest.indices(flattenedResultIndexAlias);

Expand All @@ -562,16 +575,33 @@ private UpdateSettingsRequest buildUpdateSettingsRequest(

protected void bindIngestPipelineWithFlattenedResultIndex(
String pipelineId,
String configId,
String flattenedResultIndexAlias,
ActionListener<T> listener,
T createConfigResponse
String id,
boolean indexingDryRun
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId);
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
listener.onResponse(createConfigResponse);
searchConfigInputIndices(id, indexingDryRun, listener);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
}));
}

protected void bindIngestPipelineWithFlattenedResultIndex(
String pipelineId,
String flattenedResultIndexAlias,
ActionListener<T> listener,
T response
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
listener.onResponse(response);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
Expand Down Expand Up @@ -619,18 +649,6 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
// customers can choose to use a flattened result index for newly created detectors and disable it for those detectors.
// however, since enabling the flattened result index creates additional resources and due to bwc concerns,
// we do not allow customers to enable this feature for existing running detectors.
listener
.onFailure(
new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST)
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
Expand All @@ -650,6 +668,27 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
listener::onFailure
);

} else if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
confirmBatchRunningListener = ActionListener
.wrap(
r -> timeSeriesIndices
.initFlattenedResultIndex(
config.getFlattenResultIndexAlias(),
ActionListener
.wrap(
initResponse -> setupIngestPipeline(
config.getFlattenResultIndexAlias(),
listener,
id,
indexingDryRun
),
listener::onFailure
)
),
listener::onFailure
);
} else {
confirmBatchRunningListener = ActionListener
.wrap(
Expand All @@ -673,11 +712,7 @@ private void unbindIngestPipelineWithFlattenedResultIndex(
boolean indexingDryRun
) {
// The pipeline name _none specifies that the index does not have an ingest pipeline.
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(
existingConfig.getFlattenResultIndexAlias(),
"_none",
existingConfig.getId()
);
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(existingConfig.getFlattenResultIndexAlias(), "_none");
client
.admin()
.indices()
Expand Down

0 comments on commit eef1a57

Please sign in to comment.