Skip to content

Commit

Permalink
feat: queue message payloads (#20749)
Browse files Browse the repository at this point in the history
* feat: queue message payloads

Add sent payloads to message
queue and resend if no response
to message inside MaxMessageSuspendTimeout

fixes #20507

* Add test for re-request

Fix queued message send timing.

* fix test server response

format server

* Make custom service simpler

Fix concurrent issue with custom uidl handler.

* use the has method

clear queue for push messaging.

* Do not up clientId for message already containing clientId

* cleanup
  • Loading branch information
caalador authored Jan 13, 2025
1 parent 0440699 commit d2f9ee0
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) {
}

/**
* Should only prepare resync after the if (locked ||
* Should only prepare resync after the (locked ||
* !isNextExpectedMessage(serverId)) {...} since
* stateTree.repareForResync() will remove the nodes, and if locked is
* true, it will return without handling the message, thus won't adding
* true, it will return without handling the message, thus won't add
* nodes back.
*
* This is related to https://github.com/vaadin/flow/issues/8699 It
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/
package com.vaadin.client.communication;

import java.util.ArrayList;
import java.util.List;

import com.google.gwt.core.client.GWT;
import com.google.gwt.user.client.Timer;

import com.vaadin.client.ConnectionIndicator;
import com.vaadin.client.Console;
Expand Down Expand Up @@ -67,6 +71,10 @@ public enum ResynchronizationState {

private JsonObject pushPendingMessage;

private List<JsonObject> messageQueue = new ArrayList<>();

private Timer resendMessageTimer;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -119,7 +127,10 @@ private void doSendInvocationsToServer() {
JsonObject payload = pushPendingMessage;
pushPendingMessage = null;
registry.getRequestResponseTracker().startRequest();
send(payload);
sendPayload(payload);
return;
} else if (hasQueuedMessages() && resendMessageTimer == null) {
sendPayload(messageQueue.get(0));
return;
}

Expand All @@ -146,6 +157,8 @@ private void doSendInvocationsToServer() {
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.warn("Resynchronizing from server");
messageQueue.clear();
resetTimer();
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
Expand All @@ -166,7 +179,6 @@ protected void send(final JsonArray reqInvocations,
final JsonObject extraJson) {
registry.getRequestResponseTracker().startRequest();
send(preparePayload(reqInvocations, extraJson));

}

private JsonObject preparePayload(final JsonArray reqInvocations,
Expand All @@ -177,10 +189,6 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
payload.put(ApplicationConstants.CSRF_TOKEN, csrfToken);
}
payload.put(ApplicationConstants.RPC_INVOCATIONS, reqInvocations);
payload.put(ApplicationConstants.SERVER_SYNC_ID,
registry.getMessageHandler().getLastSeenServerSyncId());
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);
if (extraJson != null) {
for (String key : extraJson.keys()) {
JsonValue value = extraJson.get(key);
Expand All @@ -192,12 +200,44 @@ private JsonObject preparePayload(final JsonArray reqInvocations,

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
* given URI. Adds message to message queue and postpones sending if queue
* not empty.
*
* @param payload
* The contents of the request to send
*/
public void send(final JsonObject payload) {
if (hasQueuedMessages()) {
messageQueue.add(payload);
return;
}
messageQueue.add(payload);
sendPayload(payload);
}

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
*
* @param payload
* The contents of the request to send
*/
private void sendPayload(final JsonObject payload) {
payload.put(ApplicationConstants.SERVER_SYNC_ID,
registry.getMessageHandler().getLastSeenServerSyncId());
// clientID should only be set and updated if payload doesn't contain
// clientID. If one exists we are probably trying to resend.
if (!payload.hasKey(ApplicationConstants.CLIENT_TO_SERVER_ID)) {
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);
}

if (!registry.getRequestResponseTracker().hasActiveRequest()) {
// Direct calls to send from outside probably have not started
// request.
registry.getRequestResponseTracker().startRequest();
}

if (push != null && push.isBidirectional()) {
// When using bidirectional transport, the payload is not resent
// to the server during reconnection attempts.
Expand All @@ -211,6 +251,31 @@ public void send(final JsonObject payload) {
} else {
Console.debug("send XHR");
registry.getXhrConnection().send(payload);

resetTimer();
// resend last payload if response hasn't come in.
resendMessageTimer = new Timer() {
@Override
public void run() {
resendMessageTimer
.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
if (!registry.getRequestResponseTracker()
.hasActiveRequest()) {
registry.getRequestResponseTracker().startRequest();
}
registry.getXhrConnection().send(payload);
}
};
resendMessageTimer.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
}
}

private void resetTimer() {
if (resendMessageTimer != null) {
resendMessageTimer.cancel();
resendMessageTimer = null;
}
}

Expand Down Expand Up @@ -289,6 +354,8 @@ public String getCommunicationMethodName() {
*/
public void resynchronize() {
if (requestResynchronize()) {
messageQueue.clear();
resetTimer();
sendInvocationsToServer();
}
}
Expand All @@ -311,12 +378,24 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
pushPendingMessage = null;
}
if (hasQueuedMessages()) {
// If queued message is the expected one. remove from queue
// and send next message if any.
if (messageQueue.get(0)
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID)
+ 1 == nextExpectedId) {
resetTimer();
messageQueue.remove(0);
}
}
return;
}
if (force) {
Console.debug(
"Forced update of clientId to " + clientToServerMessageId);
clientToServerMessageId = nextExpectedId;
messageQueue.clear();
resetTimer();
return;
}

Expand Down Expand Up @@ -372,4 +451,8 @@ void clearResynchronizationState() {
ResynchronizationState getResynchronizationState() {
return resynchronizationState;
}

public boolean hasQueuedMessages() {
return !messageQueue.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public void endRequest() {
if ((registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending())
|| registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER
|| registry.getMessageSender().hasQueuedMessages()) {
// Send the pending RPCs immediately.
// This might be an unnecessary optimization as ServerRpcQueue has a
// finally scheduled command which trigger the send if we do not do
Expand Down
2 changes: 1 addition & 1 deletion flow-client/src/test/frontend/FlowTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ function stubServerRemoteFunction(
handlers.leaveNavigation();
}
}
req.respond(200, { 'content-type': 'application/json' }, 'for(;;);[{}]');
req.respond(200, {'content-type': 'application/json'}, 'for(;;);[{"syncId":' + (payload["syncId"] + 1) + ',"clientId":' + (payload["clientId"] + 1) + '}]');
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) {
this.csrfToken = csrfToken;
}

if (isSyncIdCheckEnabled) {
if (isSyncIdCheckEnabled && !isUnloadBeaconRequest()) {
syncId = (int) json
.getNumber(ApplicationConstants.SERVER_SYNC_ID);
} else {
Expand All @@ -131,7 +131,10 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) {
clientToServerMessageId = (int) json
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID);
} else {
getLogger().warn("Server message without client id received");
if (!isUnloadBeaconRequest()) {
getLogger()
.warn("Server message without client id received");
}
clientToServerMessageId = -1;
}
invocations = json.getArray(ApplicationConstants.RPC_INVOCATIONS);
Expand Down
1 change: 1 addition & 0 deletions flow-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@
<module>test-react-adapter</module>
<module>test-react-adapter/pom-production.xml</module>
<module>test-legacy-frontend</module>
<module>test-client-queue</module>
</modules>
</profile>
<profile>
Expand Down
63 changes: 63 additions & 0 deletions flow-tests/test-client-queue/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flow-tests</artifactId>
<groupId>com.vaadin</groupId>
<version>24.7-SNAPSHOT</version>
</parent>
<artifactId>flow-client-queue-test</artifactId>
<name>Test Flow client queue</name>

<packaging>war</packaging>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
<!-- Test checks client log so java.util.logging.Level import is needed -->
<enforcer.skip>true</enforcer.skip>
</properties>

<dependencies>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>flow-test-resources</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>vaadin-dev-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>flow-html-components-testbench</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Run flow plugin to build frontend -->
<plugin>
<groupId>com.vaadin</groupId>
<artifactId>flow-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>prepare-frontend</goal>
</goals>
</execution>
</executions>
<!-- <configuration>-->
<!-- <frontendHotdeploy>true</frontendHotdeploy>-->
<!-- </configuration>-->
</plugin>
<!-- Run jetty before integration tests, and stop after -->
<plugin>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2000-2025 Vaadin Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.vaadin.flow.misc.ui;

import java.util.List;

import com.vaadin.flow.function.DeploymentConfiguration;
import com.vaadin.flow.server.RequestHandler;
import com.vaadin.flow.server.ServiceException;
import com.vaadin.flow.server.VaadinServlet;
import com.vaadin.flow.server.VaadinServletService;
import com.vaadin.flow.server.communication.UidlRequestHandler;

public class CustomService extends VaadinServletService {

public CustomService(VaadinServlet servlet,
DeploymentConfiguration deploymentConfiguration) {
super(servlet, deploymentConfiguration);
}

@Override
protected List<RequestHandler> createRequestHandlers()
throws ServiceException {
List<RequestHandler> requestHandlers = super.createRequestHandlers();
requestHandlers.replaceAll(handler -> {
if (handler instanceof UidlRequestHandler) {
return new CustomUidlRequestHandler();
}
return handler;
});
return requestHandlers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2000-2024 Vaadin Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.vaadin.flow.misc.ui;

import jakarta.servlet.annotation.WebServlet;

import com.vaadin.flow.function.DeploymentConfiguration;
import com.vaadin.flow.server.ServiceException;
import com.vaadin.flow.server.VaadinServlet;
import com.vaadin.flow.server.VaadinServletService;

@WebServlet(urlPatterns = "/*", asyncSupported = true)
public class CustomServlet extends VaadinServlet {

@Override
protected VaadinServletService createServletService(
DeploymentConfiguration deploymentConfiguration)
throws ServiceException {
CustomService service = new CustomService(this,
deploymentConfiguration);
service.init();
return service;
}
}
Loading

0 comments on commit d2f9ee0

Please sign in to comment.