Skip to content

Commit

Permalink
MINOR: Cleanup websocket code and tests (#242)
Browse files Browse the repository at this point in the history
No change in functionality.
  • Loading branch information
rhauch authored Dec 19, 2024
1 parent 8ef6b06 commit c1841ed
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.confluent.idesidecar.restapi.credentials.ApiKeyAndSecret;
import io.confluent.idesidecar.restapi.credentials.BasicCredentials;
import io.quarkus.runtime.annotations.RegisterForReflection;


/**
* Base class for all websocket message bodies. As long as the message body types have distinct
* fields, the {@link JsonTypeInfo.Id#DEDUCTION} strategy should be sufficient to deserialize them.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.confluent.idesidecar.websocket.messages;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.quarkus.runtime.annotations.RegisterForReflection;
import java.util.UUID;
Expand All @@ -16,9 +17,19 @@ public record MessageHeaders(
@NotNull @JsonProperty("originator") String originator,
@NotNull @JsonProperty("message_id") String id
) {
public static final String SIDECAR_ORIGINATOR = "sidecar";

/** Constructor for outbound messages. */
public MessageHeaders(MessageType messageType) {
this(messageType, SIDECAR_ORIGINATOR);
}

public MessageHeaders(MessageType messageType, String originator) {
this(messageType, originator, UUID.randomUUID().toString());
}

@JsonIgnore
public boolean originatedBySidecar() {
return SIDECAR_ORIGINATOR.equals(originator);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.confluent.idesidecar.websocket.messages;


import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
Expand All @@ -25,16 +24,22 @@ public enum MessageType {
*/
WORKSPACE_HELLO,

/** Message sent by the sidecar to all workspaces when count of connected workspaces
* (that have sent proper WORKSPACE_HELLO messages) has increased or decreased. */
/**
* Message sent by the sidecar to all workspaces when count of connected workspaces
* (that have sent proper WORKSPACE_HELLO messages) has increased or decreased.
*/
WORKSPACE_COUNT_CHANGED,

/** Message sent by sidecar to a workspace when sidecar has noticed a websocket messaging
* error and is going to disconnect its end of the websocket. */
/**
* Message sent by sidecar to a workspace when sidecar has noticed a websocket messaging
* error and is going to disconnect its end of the websocket.
*/
PROTOCOL_ERROR,

/** Placeholder for unknown-to-sidecar message types for messages intended to be
* for extension -> extension messaging via sidecar. */
/**
* Placeholder for unknown-to-sidecar message types for messages intended to be
* for extension -> extension messaging via sidecar.
*/
UNKNOWN;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@
import io.smallrye.common.constraint.Nullable;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.CompositeFuture;
import jakarta.inject.Singleton;
import jakarta.validation.constraints.NotNull;
import java.awt.*;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -36,9 +31,7 @@
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Websocket endpoint for "control plane" variety async messaging between sidecar and workspaces,
Expand Down Expand Up @@ -442,7 +435,7 @@ private void broadcastWorkspacesChanged() {
}

var message = new Message(
new MessageHeaders(MessageType.WORKSPACE_COUNT_CHANGED, "sidecar"),
new MessageHeaders(MessageType.WORKSPACE_COUNT_CHANGED),
new WorkspacesChangedBody(this.sessions.size())
);

Expand Down Expand Up @@ -483,18 +476,18 @@ private void sendErrorAndCloseSession(
Log.error(msg);
try {
var errorMessage = new Message(
new MessageHeaders(MessageType.PROTOCOL_ERROR, "sidecar"),
new MessageHeaders(MessageType.PROTOCOL_ERROR),
new ProtocolErrorBody(msg, originalMessageId)
);
// Do not convert to getBasicRemote(), for some reason breaks tests.
session.getAsyncRemote()
.sendText(mapper.writeValueAsString(errorMessage)).get();
} catch (IOException | InterruptedException | ExecutionException e) {
Log.errorf(
e,
"Unable to send error message to session %s: %s",
session.getId(),
e.getMessage(),
e
e.getMessage()
);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand All @@ -505,10 +498,10 @@ private void sendErrorAndCloseSession(
session.close();
} catch (IOException e) {
Log.errorf(
e,
"Unable to close session %s: %s",
session.getId(),
e.getMessage(),
e
e.getMessage()
);
}
}
Expand All @@ -524,7 +517,7 @@ private void sendErrorAndCloseSession(
static MessageHeaders validateHeadersForSidecarBroadcast(Message outboundMessage) {
MessageHeaders headers = outboundMessage.headers();

if (!headers.originator().equals("sidecar")) {
if (!headers.originatedBySidecar()) {
Log.errorf(
"Message id %s is not originator=sidecar message, cannot broadcast.",
headers.id()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.confluent.idesidecar.websocket.resources;

import io.quarkus.runtime.annotations.RegisterForReflection;

/** Record describing authorized workspace sessions: processId long, ... **/
@RegisterForReflection
public record WorkspaceSession(long processId) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,85 @@

import static io.confluent.idesidecar.restapi.util.ResourceIOUtil.deserializeAndSerialize;
import static io.confluent.idesidecar.restapi.util.ResourceIOUtil.loadResourceAsObject;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class MessageSerializationTest {

/**
* Test deserializing sample messages from resource files.
* <p>
* Proves that the messages deserialize properly into the expected types and body classes.
*
* <p>Proves that the messages deserialize properly into the expected types and body classes.
* Also verifies that all MessageType enum members are covered by the test cases to
* force developers to add test cases for new message types.
*/
@Test
public void testSerdeResourceMessageFiles() throws IOException {
public void testSerdeResourceMessageFiles() {
// Define the test data as an array of Object arrays
Object[][] pathAndBodyTypeAndClassTuples = new Object[][]{
{"websocket-messages/workspace-hello.json", MessageType.WORKSPACE_HELLO, HelloBody.class},
{"websocket-messages/random-extension-message.json", MessageType.UNKNOWN,
DynamicMessageBody.class},
{"websocket-messages/workspaces-changed.json", MessageType.WORKSPACE_COUNT_CHANGED,
WorkspacesChangedBody.class},
{"websocket-messages/protocol-error.json", MessageType.PROTOCOL_ERROR,
ProtocolErrorBody.class}
};

// Set to keep track of which MessageType enums have been tested
Set<MessageType> testedMessageTypes = new HashSet<>();
record TestInput(
String resourceFilePath,
MessageType expectedMessageType,
Class<? extends MessageBody> expectedBodyType
) {
}

for (Object[] tuple : pathAndBodyTypeAndClassTuples) {
String resourceFilePath = (String) tuple[0];
MessageType expectedMessageType = (MessageType) tuple[1];
Class<? extends MessageBody> expectedBodyType = (Class<? extends MessageBody>) tuple[2];
var inputs = List.of(
new TestInput(
"websocket-messages/workspace-hello.json",
MessageType.WORKSPACE_HELLO,
HelloBody.class
),
new TestInput(
"websocket-messages/random-extension-message.json",
MessageType.UNKNOWN,
DynamicMessageBody.class
),
new TestInput(
"websocket-messages/workspaces-changed.json",
MessageType.WORKSPACE_COUNT_CHANGED,
WorkspacesChangedBody.class
),
new TestInput(
"websocket-messages/protocol-error.json",
MessageType.PROTOCOL_ERROR,
ProtocolErrorBody.class
)
);

deserializeAndSerialize(resourceFilePath, Message.class);
Message message = loadResourceAsObject(resourceFilePath, Message.class);
Assertions.assertEquals(expectedMessageType, message.messageType());
Assertions.assertEquals(expectedBodyType, message.body().getClass());
// Set to keep track of which MessageType enums have been tested
var testedMessageTypes = new HashSet<>();
inputs.forEach(input -> {
deserializeAndSerialize(input.resourceFilePath(), Message.class);
Message message = loadResourceAsObject(input.resourceFilePath(), Message.class);
assertNotNull(message);
assertEquals(
input.expectedMessageType(),
message.messageType())
;
assertEquals(
input.expectedBodyType(),
message.body().getClass()
);
// while here, prove that the MessageType -> bodyClass mapping is correct.
Assertions.assertEquals(expectedMessageType.bodyClass(), message.body().getClass());
assertEquals(
input.expectedMessageType().bodyClass(),
message.body().getClass()
);

testedMessageTypes.add(expectedMessageType);
}
testedMessageTypes.add(input.expectedMessageType());
});

// Prove that all MessageType enum members are covered by the test cases
for (MessageType messageType : MessageType.values()) {
Assertions.assertTrue(testedMessageTypes.contains(messageType),
"No test case for MessageType: " + messageType);
assertTrue(
testedMessageTypes.contains(messageType),
"No test case for MessageType: " + messageType
);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.confluent.idesidecar.websocket.messages;

import static org.junit.Assert.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.HashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class MessageTypeTest {
Expand All @@ -15,9 +15,15 @@ public class MessageTypeTest {
@Test
public void testBodyClass() {
var seenClasses = new HashSet<Class<? extends MessageBody>>();
for (MessageType messageType : MessageType.values()) {
Assertions.assertNotEquals(null, messageType.bodyClass());
Assertions.assertNotEquals(true, seenClasses.contains(messageType.bodyClass()));
for (var messageType : MessageType.values()) {
assertNotNull(
messageType.bodyClass(),
"Expected body class to be non-null for MessageType: " + messageType
);
assertFalse(
seenClasses.contains(messageType.bodyClass()),
"Expected body class to be unique for MessageType: " + messageType
);
seenClasses.add(messageType.bodyClass());
}
}
Expand Down
Loading

0 comments on commit c1841ed

Please sign in to comment.