Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Connector API] Support hard deletes with new URL param in delete endpoint #120200

Merged
merged 9 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120200.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120200
summary: "[Connector API] Support hard deletes with new URL param in delete endpoint"
area: Extract&Transform
type: feature
issues: []
5 changes: 4 additions & 1 deletion docs/reference/connector/apis/delete-connector-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ beta::[]
For the most up-to-date API details, refer to {api-es}/group/endpoint-connector[Connector APIs].
--

Soft-deletes a connector and removes associated sync jobs.
Deletes a connector and optionally removes associated sync jobs.

Note: this action doesn't delete any API key, ingest pipeline or data index associated with the connector. These need to be removed manually.

Expand All @@ -37,6 +37,9 @@ To get started with Connector APIs, check out <<es-connectors-tutorial-api, our
`<connector_id>`::
(Required, string)

`<hard>`::
(Optional, boolean) If `true`, the connector doc is deleted. If `false`, connector doc is marked as deleted (soft deletion). Defaults to `false`.

`delete_sync_jobs`::
(Optional, boolean) A flag indicating if associated sync jobs should be also removed. Defaults to `false`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
]
},
"params": {
"hard": {
"type": "boolean",
"default": false,
"description": "If true, the connector doc is deleted. If false, connector doc is marked as deleted (soft-deleted)."
},
"delete_sync_jobs": {
"type": "boolean",
"default": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,94 @@ setup:

- match: { count: 3 }


---
"List Connectors - Single hard deleted connector":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release

- do:
connector.delete:
connector_id: connector-a
hard: true

- do:
connector.list: {}

- match: { count: 2 }

- do:
connector.list:
include_deleted: true

- match: { count: 2 }


---
"List Connectors - All hard deleted connectors":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release

- do:
connector.delete:
connector_id: connector-a
hard: true

- do:
connector.delete:
connector_id: connector-b
hard: true

- do:
connector.delete:
connector_id: connector-b
hard: true

- do:
connector.list: {}

- match: { count: 0 }

- do:
connector.list:
include_deleted: true

- match: { count: 0 }

---
"List Connectors - 2 hard deleted connectors, 1 soft deleted":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release

- do:
connector.delete:
connector_id: connector-a
hard: false

- do:
connector.delete:
connector_id: connector-b
hard: true

- do:
connector.delete:
connector_id: connector-b
hard: true

- do:
connector.list: {}

- match: { count: 0 }

- do:
connector.list:
include_deleted: true

- match: { count: 1 }

---
"List Connectors - Soft deleted connectors":
- requires:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,31 @@ setup:
connector_id: test-connector-to-delete


---
"Delete Connector - Hard Delete":
- do:
connector.put:
connector_id: test-connector-hard-delete
body:
index_name: search-2-test
name: my-hard-delete-connector
language: en
is_native: false
service_type: super-connector

- do:
connector.delete:
connector_id: test-connector-hard-delete
hard: true

- match: { acknowledged: true }

- do:
catch: "missing"
connector.get:
connector_id: test-connector-hard-delete
include_deleted: true

---
"Delete Connector - deletes associated sync jobs":

Expand Down Expand Up @@ -107,7 +132,6 @@ setup:
connector.delete:
connector_id: test-nonexistent-connector


---
"Delete Connector - Supports soft deletes":
- requires:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -232,40 +234,71 @@ public void getConnector(String connectorId, boolean includeDeleted, ActionListe
}

/**
* Soft deletes the {@link Connector} and optionally removes the related instances of {@link ConnectorSyncJob} in the underlying index.
* Deletes the {@link Connector} and optionally removes the related instances of {@link ConnectorSyncJob} in the underlying index.
*
* @param connectorId The id of the {@link Connector}.
* @param hardDelete If set to true, the {@link Connector} is permanently deleted; otherwise, it is soft-deleted.
* @param shouldDeleteSyncJobs The flag indicating if {@link ConnectorSyncJob} should also be deleted.
* @param listener The action listener to invoke on response/failure.
*/
public void deleteConnector(String connectorId, boolean shouldDeleteSyncJobs, ActionListener<UpdateResponse> listener) {
public void deleteConnector(
String connectorId,
boolean hardDelete,
boolean shouldDeleteSyncJobs,
ActionListener<DocWriteResponse> listener
) {

try {
// ensure that if connector is soft-deleted, deleting it again results in 404
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy(
WriteRequest.RefreshPolicy.IMMEDIATE
)
.doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.source(Map.of(Connector.IS_DELETED_FIELD.getPreferredName(), true))
);
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
if (shouldDeleteSyncJobs) {
new ConnectorSyncJobIndexService(clientWithOrigin).deleteAllSyncJobsByConnectorId(
connectorId,
ll.map(r -> updateResponse)
if (hardDelete) {
final DeleteRequest deleteRequest = new DeleteRequest(CONNECTOR_INDEX_NAME).id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

clientWithOrigin.delete(
deleteRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, deleteResponse) -> {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
if (shouldDeleteSyncJobs) {
new ConnectorSyncJobIndexService(clientWithOrigin).deleteAllSyncJobsByConnectorId(
connectorId,
l.map(r -> deleteResponse)
);
} else {
l.onResponse(deleteResponse);
}
})
);
} else {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy(
WriteRequest.RefreshPolicy.IMMEDIATE
)
.doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.source(Map.of(Connector.IS_DELETED_FIELD.getPreferredName(), true))
);
} else {
ll.onResponse(updateResponse);
}
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
if (shouldDeleteSyncJobs) {
new ConnectorSyncJobIndexService(clientWithOrigin).deleteAllSyncJobsByConnectorId(
connectorId,
ll.map(r -> updateResponse)
);
} else {
ll.onResponse(updateResponse);
}
})
);
}));
}));
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
Expand All @@ -35,19 +35,16 @@ private DeleteConnectorAction() {/* no instances */}
public static class Request extends ConnectorActionRequest implements ToXContentObject {

private final String connectorId;
private final boolean hardDelete;
private final boolean deleteSyncJobs;

private static final ParseField CONNECTOR_ID_FIELD = new ParseField("connector_id");
private static final ParseField HARD_DELETE_FIELD = new ParseField("hard");
private static final ParseField DELETE_SYNC_JOB_FIELD = new ParseField("delete_sync_jobs");

public Request(StreamInput in) throws IOException {
super(in);
this.connectorId = in.readString();
this.deleteSyncJobs = in.readBoolean();
}

public Request(String connectorId, boolean deleteSyncJobs) {
public Request(String connectorId, boolean hardDelete, boolean deleteSyncJobs) {
this.connectorId = connectorId;
this.hardDelete = hardDelete;
this.deleteSyncJobs = deleteSyncJobs;
}

Expand All @@ -66,34 +63,39 @@ public String getConnectorId() {
return connectorId;
}

public boolean isHardDelete() {
return hardDelete;
}

public boolean shouldDeleteSyncJobs() {
return deleteSyncJobs;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(connectorId);
out.writeBoolean(deleteSyncJobs);
TransportAction.localOnly();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return deleteSyncJobs == request.deleteSyncJobs && Objects.equals(connectorId, request.connectorId);
return hardDelete == request.hardDelete
&& deleteSyncJobs == request.deleteSyncJobs
&& Objects.equals(connectorId, request.connectorId);
}

@Override
public int hashCode() {
return Objects.hash(connectorId, deleteSyncJobs);
return Objects.hash(connectorId, hardDelete, deleteSyncJobs);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CONNECTOR_ID_FIELD.getPreferredName(), connectorId);
builder.field(HARD_DELETE_FIELD.getPreferredName(), hardDelete);
builder.field(DELETE_SYNC_JOB_FIELD.getPreferredName(), deleteSyncJobs);
builder.endObject();
return builder;
Expand All @@ -102,10 +104,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
private static final ConstructingObjectParser<DeleteConnectorAction.Request, Void> PARSER = new ConstructingObjectParser<>(
"delete_connector_request",
false,
(p) -> new Request((String) p[0], (boolean) p[1])
(p) -> new Request((String) p[0], (boolean) p[1], (boolean) p[2])
);
static {
PARSER.declareString(constructorArg(), CONNECTOR_ID_FIELD);
PARSER.declareBoolean(constructorArg(), HARD_DELETE_FIELD);
PARSER.declareBoolean(constructorArg(), DELETE_SYNC_JOB_FIELD);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

String connectorId = restRequest.param(CONNECTOR_ID_PARAM);
boolean shouldDeleteSyncJobs = restRequest.paramAsBoolean("delete_sync_jobs", false);
boolean hardDelete = restRequest.paramAsBoolean("hard", false);

DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(connectorId, shouldDeleteSyncJobs);
DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(connectorId, hardDelete, shouldDeleteSyncJobs);
return channel -> client.execute(DeleteConnectorAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
Loading