Commit
Merge branch 'refs/heads/main' into rohitsanj/int-test-cp
# Conflicts:
#	src/main/java/io/confluent/idesidecar/restapi/messageviewer/MessageViewerContext.java
#	src/main/java/io/confluent/idesidecar/restapi/proxy/ProxyContext.java
rohitsanj committed Dec 10, 2024
2 parents 2640bd4 + bdaeb9d commit 14624a9
Showing 42 changed files with 992 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .versions/ide-sidecar-version.txt
@@ -1 +1 @@
-v0.110.0
+v0.113.0
2 changes: 1 addition & 1 deletion pom.xml
@@ -11,7 +11,7 @@

   <groupId>io.confluent</groupId>
   <artifactId>ide-sidecar</artifactId>
-  <version>0.110.0</version>
+  <version>0.113.0</version>
   <packaging>jar</packaging>
   <name>IDE Sidecar</name>
   <description>Sidecar used by Confluent for VS Code.</description>
2 changes: 1 addition & 1 deletion release.svg
@@ -9,54 +9,122 @@
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.eclipse.microprofile.config.ConfigProvider;

+/**
+ * The SchemaErrors class manages schema error caching for different connections.
+ * It uses a Caffeine cache to store and retrieve schema errors keyed by connection and schema ID.
+ *
+ * <p>This class is application-scoped and observes connection lifecycle events to manage the cache.
+ * It provides methods to read, write, and clear schema errors for specific connections.
+ *
+ * <p>Configuration properties:
+ * <ul>
+ *   <li>`ide-sidecar.schema-fetch-error-ttl`: The time-to-live duration for schema fetch errors in the cache.</li>
+ * </ul>
+ */
 @ApplicationScoped
 public class SchemaErrors {

-  public record ConnectionId(String id) {
+  /**
+   * Represents a unique connection identifier.
+   */
+  private record ConnectionId(String id) {

   }

-  public record SchemaId(String clusterId, int schemaId) {
+  /**
+   * Represents a unique schema identifier within a cluster.
+   */
+  private record SchemaId(String clusterId, int schemaId) {

   }

+  /**
+   * Represents an error message associated with a schema.
+   */
   public record Error(String message) {

   }

-  private static final Duration SCHEMA_FETCH_ERROR_TTL = Duration.ofSeconds(30);
+  private static final Duration SCHEMA_FETCH_ERROR_TTL = Duration.ofSeconds(
+      ConfigProvider
+          .getConfig()
+          .getValue(
+              "ide-sidecar.schema-fetch-error-ttl",
+              Long.class));

-  public static final Map<ConnectionId, Cache<SchemaId, SchemaErrors.Error>> cacheOfCaches = new ConcurrentHashMap<>();
+  private static final Map<ConnectionId, Cache<SchemaId, Error>> cacheOfCaches = new ConcurrentHashMap<>();

-  public static Cache<SchemaId, Error> getSubCache(ConnectionId key) {
+  /**
+   * Retrieves or creates a sub-cache for a specific connection.
+   *
+   * @param key The connection identifier.
+   * @return The sub-cache for the connection.
+   */
+  private Cache<SchemaId, Error> getSubCache(ConnectionId key) {
     return cacheOfCaches.computeIfAbsent(
         key,
         k -> Caffeine.newBuilder().expireAfterAccess(SCHEMA_FETCH_ERROR_TTL).build());
   }

-  public Error readSchemaIdByConnectionId(
-      ConnectionId connectionId,
-      SchemaId schemaId
-  ) {
-    return getSubCache(connectionId).getIfPresent(schemaId);
+  /**
+   * Reads a schema error from the cache.
+   *
+   * @param connectionId The connection identifier.
+   * @param clusterId The cluster identifier.
+   * @param schemaId The schema identifier.
+   * @return The schema error, or null if not found.
+   */
+  public Error readSchemaIdByConnectionId(String connectionId, String clusterId, int schemaId) {
+    var cId = new ConnectionId(connectionId);
+    var sId = new SchemaId(
+        clusterId,
+        schemaId
+    );
+    return getSubCache(cId).getIfPresent(sId);
   }

+  /**
+   * Writes a schema error to the cache.
+   *
+   * @param connectionId The connection identifier.
+   * @param schemaId The schema identifier.
+   * @param clusterId The cluster identifier.
+   * @param error The schema error.
+   */
   public void writeSchemaIdByConnectionId(
-      ConnectionId connectionId,
-      SchemaId schemaId,
+      String connectionId,
+      int schemaId,
+      String clusterId,
       Error error
   ) {
-    getSubCache(connectionId).put(schemaId, error);
+    var cId = new ConnectionId(connectionId);
+    var sId = new SchemaId(
+        clusterId,
+        schemaId
+    );
+    getSubCache(cId).put(sId, error);
   }

+  /**
+   * Clears the cache for a specific connection.
+   *
+   * @param connectionId The connection identifier.
+   */
   public void clearByConnectionId(
-      ConnectionId connectionId
+      String connectionId
   ) {
-    cacheOfCaches.remove(connectionId);
+    var cId = new ConnectionId(connectionId);
+    cacheOfCaches.remove(cId);
   }

+  /**
+   * Removes a connection's cache in response to lifecycle events.
+   *
+   * @param connection The connection state.
+   */
   public void onConnectionChange(@ObservesAsync @Lifecycle.Deleted @Lifecycle.Created @Lifecycle.Updated ConnectionState connection) {
-    cacheOfCaches.remove(new ConnectionId(connection.getId()));
+    var cId = new ConnectionId(connection.getId());
+    cacheOfCaches.remove(cId);
   }
 }
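The ConnectionId and SchemaId records are now private implementation details; callers pass plain strings and ints. For reference, here is a minimal standalone sketch of the cache-of-caches pattern the class is built on, assuming Caffeine and a MicroProfile Config implementation (such as SmallRye Config) are on the classpath; the class name, the example IDs, and the 30-second fallback are illustrative, not the sidecar's actual values:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.eclipse.microprofile.config.ConfigProvider;

    public class SchemaErrorCacheSketch {

      record ConnectionId(String id) { }

      record SchemaId(String clusterId, int schemaId) { }

      record Error(String message) { }

      // The diff uses getValue(); getOptionalValue() plus a fallback keeps this
      // sketch runnable without the sidecar's own configuration present.
      static final Duration TTL = Duration.ofSeconds(
          ConfigProvider.getConfig()
              .getOptionalValue("ide-sidecar.schema-fetch-error-ttl", Long.class)
              .orElse(30L));

      static final Map<ConnectionId, Cache<SchemaId, Error>> cacheOfCaches =
          new ConcurrentHashMap<>();

      // One Caffeine cache per connection; entries expire TTL after last access.
      static Cache<SchemaId, Error> subCache(ConnectionId key) {
        return cacheOfCaches.computeIfAbsent(
            key,
            k -> Caffeine.newBuilder().expireAfterAccess(TTL).build());
      }

      public static void main(String[] args) {
        var conn = new ConnectionId("conn-1");    // illustrative IDs
        var schema = new SchemaId("lkc-123", 42);
        subCache(conn).put(schema, new Error("schema fetch failed"));
        System.out.println(subCache(conn).getIfPresent(schema).message());
        cacheOfCaches.remove(conn);               // clearByConnectionId equivalent
      }
    }

Keying the outer map by connection means a whole connection's errors can be dropped with one remove() call, while Caffeine's expireAfterAccess handles the per-entry TTL.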
@@ -23,8 +23,6 @@
 public class MessageViewerClusterInfoProcessor extends
     Processor<MessageViewerContext, Future<MessageViewerContext>> {

-  @Inject
-  SchemaErrors schemaErrors;
-
   private final ClusterCache clusterCache;

@@ -79,25 +77,4 @@ public Future<MessageViewerContext> process(MessageViewerContext context) {
       );
     }
   }
-
-  /**
-   * Respond to the connection being disconnected by clearing cached information.
-   *
-   * @param connection the connection that was disconnected
-   */
-  void onConnectionDisconnected(
-      @ObservesAsync @Lifecycle.Disconnected ConnectionState connection
-  ) {
-    schemaErrors.clearByConnectionId(new SchemaErrors.ConnectionId(connection.getId()));
-  }
-
-  /**
-   * Respond to the connection being deleted by clearing cached information.
-   *
-   * @param connection the connection that was deleted
-   */
-  void onConnectionDeleted(@ObservesAsync @Lifecycle.Deleted ConnectionState connection) {
-    schemaErrors.clearByConnectionId(new SchemaErrors.ConnectionId(connection.getId()));
-  }
 }
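These per-processor observers are superseded by the single onConnectionChange observer that now lives in SchemaErrors itself. For reference, a sketch of the underlying CDI async-observer pattern in isolation; the qualifier and event payload below are hypothetical stand-ins for the sidecar's Lifecycle annotations and ConnectionState:

    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.enterprise.event.ObservesAsync;
    import jakarta.inject.Qualifier;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for the sidecar's Lifecycle.Deleted qualifier.
    @Qualifier
    @Retention(RetentionPolicy.RUNTIME)
    @interface Deleted { }

    // Hypothetical stand-in for the sidecar's ConnectionState.
    record ConnectionEvent(String id) { }

    @ApplicationScoped
    class ConnectionCacheJanitor {

      private final Map<String, Object> cachesByConnection = new ConcurrentHashMap<>();

      // Fired with event.fireAsync(...); delivered off the firing thread,
      // so eviction must be thread-safe (hence ConcurrentHashMap).
      void onConnectionDeleted(@ObservesAsync @Deleted ConnectionEvent event) {
        cachesByConnection.remove(event.id());
      }
    }

Centralizing the observer removes the duplicated clearByConnectionId calls and the risk that an individual processor forgets to subscribe to a lifecycle event.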

@@ -3,10 +3,9 @@

 import io.confluent.idesidecar.restapi.clients.SchemaErrors;
 import io.confluent.idesidecar.restapi.messageviewer.data.SimpleConsumeMultiPartitionRequest;
 import io.confluent.idesidecar.restapi.messageviewer.data.SimpleConsumeMultiPartitionResponse;
-import io.confluent.idesidecar.restapi.models.ClusterType;
 import io.confluent.idesidecar.restapi.models.graph.KafkaCluster;
 import io.confluent.idesidecar.restapi.models.graph.SchemaRegistry;
-import io.confluent.idesidecar.restapi.proxy.clusters.ClusterProxyContext;
+import io.confluent.idesidecar.restapi.proxy.ProxyContext;
 import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;

@@ -16,7 +15,7 @@
 /**
  * Stores the context of a request of the message viewer API.
  */
-public class MessageViewerContext extends ClusterProxyContext {
+public class MessageViewerContext extends ProxyContext {
   private final String clusterId;
   private final String topicName;
   private KafkaCluster kafkaClusterInfo;

@@ -31,7 +30,7 @@ public MessageViewerContext(
       HttpMethod requestMethod,
       SimpleConsumeMultiPartitionRequest requestBody,
       Map<String, String> requestPathParams,
-      SchemaErrors.ConnectionId connectionId,
+      String connectionId,
       String clusterId,
       String topicName
   ) {

@@ -44,11 +43,8 @@ public MessageViewerContext(
         .map(body -> Buffer.buffer(body.toJsonString()))
         .orElse(null),
         requestPathParams,
-        connectionId.id(),
-        clusterId,
-        ClusterType.KAFKA
+        connectionId
     );
     // TODO: Read from context instead of storing
     this.clusterId = clusterId;
     this.topicName = topicName;
     this.consumeRequest = requestBody;
@@ -61,13 +61,13 @@ public class RecordDeserializer {

   private static final Map<String, String> SERDE_CONFIGS = ConfigUtil
       .asMap("ide-sidecar.serde-configs");
-  private final int schemaFetchMaxRetries;

-  SchemaErrors schemaErrors;
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final ObjectMapper AVRO_OBJECT_MAPPER = new AvroMapper(new AvroFactory());
   public static final byte MAGIC_BYTE = 0x0;
   private static final Duration CACHE_FAILED_SCHEMA_ID_FETCH_DURATION = Duration.ofSeconds(30);

+  private final int schemaFetchMaxRetries;
+  private final SchemaErrors schemaErrors;
   private final Duration schemaFetchRetryInitialBackoff;
   private final Duration schemaFetchRetryMaxBackoff;
   private final Duration schemaFetchTimeout;

@@ -235,15 +235,20 @@ public DecodedResult deserialize(
     if (result.isPresent()) {
       return result.get();
     }
-    SchemaErrors.SchemaId schemaId = new SchemaErrors.SchemaId(context.getClusterId(), getSchemaIdFromRawBytes(bytes));
+    int schemaId = getSchemaIdFromRawBytes(bytes);
+    String connectionId = context.getConnectionId();
     // Check if schema retrieval has failed recently
-    if (schemaErrors.readSchemaIdByConnectionId(new SchemaErrors.ConnectionId(context.getConnectionId()), schemaId) != null) {
+    var error = schemaErrors.readSchemaIdByConnectionId(
+        connectionId,
+        context.getClusterId(),
+        schemaId
+    );
+    if (error != null) {
       return new DecodedResult(
           // If the schema fetch failed, we can't decode the data, so we just return the raw bytes.
           // We apply the encoderOnFailure function to the bytes before returning them.
           onFailure(encoderOnFailure, bytes),
-          String.valueOf(
-              schemaErrors.readSchemaIdByConnectionId(new SchemaErrors.ConnectionId(context.getConnectionId()), schemaId))
+          error.message()
       );
     }

@@ -252,7 +257,7 @@
     // and retry if the operation fails due to a retryable exception.
     var parsedSchema = Uni
         .createFrom()
-        .item(Unchecked.supplier(() -> schemaRegistryClient.getSchemaById(schemaId.schemaId())))
+        .item(Unchecked.supplier(() -> schemaRegistryClient.getSchemaById(schemaId)))
         .onFailure(this::isRetryableException)
         .retry()
         .withBackOff(schemaFetchRetryInitialBackoff, schemaFetchRetryMaxBackoff)

@@ -340,25 +345,30 @@ public DecodedResult deserialize(
   }

   private void cacheSchemaFetchError(
-      Throwable e, SchemaErrors.SchemaId schemaId, MessageViewerContext context
+      Throwable e, int schemaId, MessageViewerContext context
   ) {
     var retryTime = Instant.now().plus(CACHE_FAILED_SCHEMA_ID_FETCH_DURATION);
     var timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss")
         .withZone(ZoneId.systemDefault());
     Log.errorf(
         "Failed to retrieve schema with ID %d. Will try again in %d seconds at %s. Error: %s",
-        schemaId.schemaId(),
+        schemaId,
         CACHE_FAILED_SCHEMA_ID_FETCH_DURATION.getSeconds(),
         timeFormatter.format(retryTime),
         e.getMessage(),
         e
     );
     SchemaErrors.Error errorMessage = new SchemaErrors.Error(String.format(
         "Failed to retrieve schema with ID %d: %s",
-        schemaId.schemaId(),
+        schemaId,
         e.getMessage()
     ));
-    schemaErrors.writeSchemaIdByConnectionId(new SchemaErrors.ConnectionId(context.getConnectionId()), schemaId, errorMessage);
+    schemaErrors.writeSchemaIdByConnectionId(
+        context.getConnectionId(),
+        schemaId,
+        context.getClusterId(),
+        errorMessage
+    );
   }

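The Uni pipeline in the deserialize hunk above is SmallRye Mutiny's retry-with-backoff idiom. A self-contained sketch of the same shape follows; the fetcher, the retryability check, and the backoff values are illustrative, not the sidecar's configured settings:

    import io.smallrye.mutiny.Uni;
    import io.smallrye.mutiny.unchecked.Unchecked;
    import java.net.SocketTimeoutException;
    import java.time.Duration;

    public class SchemaFetchRetrySketch {

      // Hypothetical stand-in for schemaRegistryClient.getSchemaById(schemaId).
      static String fetchSchema(int schemaId) throws Exception {
        throw new SocketTimeoutException("simulated transient failure");
      }

      // Stand-in for isRetryableException: only retry transient network errors.
      static boolean isRetryable(Throwable t) {
        return t instanceof SocketTimeoutException;
      }

      public static void main(String[] args) {
        String schema = Uni
            .createFrom()
            // Unchecked.supplier lets a throwing supplier surface its checked
            // exception as a failure event on the Uni.
            .item(Unchecked.supplier(() -> fetchSchema(42)))
            .onFailure(SchemaFetchRetrySketch::isRetryable)
            .retry()
            // Exponential backoff between attempts, capped at the maximum.
            .withBackOff(Duration.ofMillis(100), Duration.ofSeconds(2))
            .atMost(3)
            // After retries are exhausted, fall back instead of propagating.
            .onFailure().recoverWithItem("<schema unavailable>")
            .await().atMost(Duration.ofSeconds(10));
        System.out.println(schema);
      }
    }

Caching the failure afterward (cacheSchemaFetchError) keeps repeated consumption of the same undecodable records from hammering the registry until the TTL lapses.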
@@ -41,11 +41,11 @@ public MessageViewerContext consumeMessages(MessageViewerContext context) {
     var schemaRegistryClient = Optional
         .ofNullable(context.getSchemaRegistryInfo())
         .map(info -> schemaRegistryClients.getClient(
-            String.valueOf(context.getConnectionId()), info.id()))
+            context.getConnectionId(), info.id()))
         .orElse(null);

     var consumer = consumerFactory.getClient(
-        String.valueOf(context.getConnectionId()),
+        context.getConnectionId(),
         context.getClusterId(),
         request.consumerConfigOverrides()
     );