From d2f9ee0ff4e2726f98cc142fe11c56b59b6cbec0 Mon Sep 17 00:00:00 2001 From: caalador Date: Mon, 13 Jan 2025 14:36:15 +0200 Subject: [PATCH] feat: queue message payloads (#20749) * feat: queue message payloads Add sent payloads to message queue and resend if no response to message inside MaxMessageSuspendTimeout fixes #20507 * Add test for re-request Fix queued message send timing. * fix test server response format server * Make custom service simpler Fix concurrent issue with custom uidl handler. * use the has method clear queue for push messaging. * Do not up clientId for message already containing clientId * cleanup --- .../client/communication/MessageHandler.java | 4 +- .../client/communication/MessageSender.java | 97 ++++++++++++++-- .../communication/RequestResponseTracker.java | 3 +- flow-client/src/test/frontend/FlowTests.ts | 2 +- .../communication/ServerRpcHandler.java | 7 +- flow-tests/pom.xml | 1 + flow-tests/test-client-queue/pom.xml | 63 +++++++++++ .../vaadin/flow/misc/ui/CustomService.java | 46 ++++++++ .../vaadin/flow/misc/ui/CustomServlet.java | 37 ++++++ .../misc/ui/CustomUidlRequestHandler.java | 61 ++++++++++ .../flow/misc/ui/TestNoResponseView.java | 47 ++++++++ .../com/vaadin/flow/misc/ui/NoResponseIT.java | 106 ++++++++++++++++++ 12 files changed, 461 insertions(+), 13 deletions(-) create mode 100644 flow-tests/test-client-queue/pom.xml create mode 100644 flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomService.java create mode 100644 flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomServlet.java create mode 100644 flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomUidlRequestHandler.java create mode 100644 flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/TestNoResponseView.java create mode 100644 flow-tests/test-client-queue/src/test/java/com/vaadin/flow/misc/ui/NoResponseIT.java diff --git a/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java b/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java index 98533e45c1e..9963c03cbf0 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java @@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) { } /** - * Should only prepare resync after the if (locked || + * Should only prepare resync after the (locked || * !isNextExpectedMessage(serverId)) {...} since * stateTree.repareForResync() will remove the nodes, and if locked is - * true, it will return without handling the message, thus won't adding + * true, it will return without handling the message, thus won't add * nodes back. * * This is related to https://github.com/vaadin/flow/issues/8699 It diff --git a/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java b/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java index 6182fde8123..4c6e2c0c215 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java @@ -15,7 +15,11 @@ */ package com.vaadin.client.communication; +import java.util.ArrayList; +import java.util.List; + import com.google.gwt.core.client.GWT; +import com.google.gwt.user.client.Timer; import com.vaadin.client.ConnectionIndicator; import com.vaadin.client.Console; @@ -67,6 +71,10 @@ public enum ResynchronizationState { private JsonObject pushPendingMessage; + private List messageQueue = new ArrayList<>(); + + private Timer resendMessageTimer; + /** * Creates a new instance connected to the given registry. * @@ -119,7 +127,10 @@ private void doSendInvocationsToServer() { JsonObject payload = pushPendingMessage; pushPendingMessage = null; registry.getRequestResponseTracker().startRequest(); - send(payload); + sendPayload(payload); + return; + } else if (hasQueuedMessages() && resendMessageTimer == null) { + sendPayload(messageQueue.get(0)); return; } @@ -146,6 +157,8 @@ private void doSendInvocationsToServer() { if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) { resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE; Console.warn("Resynchronizing from server"); + messageQueue.clear(); + resetTimer(); extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true); } if (showLoadingIndicator) { @@ -166,7 +179,6 @@ protected void send(final JsonArray reqInvocations, final JsonObject extraJson) { registry.getRequestResponseTracker().startRequest(); send(preparePayload(reqInvocations, extraJson)); - } private JsonObject preparePayload(final JsonArray reqInvocations, @@ -177,10 +189,6 @@ private JsonObject preparePayload(final JsonArray reqInvocations, payload.put(ApplicationConstants.CSRF_TOKEN, csrfToken); } payload.put(ApplicationConstants.RPC_INVOCATIONS, reqInvocations); - payload.put(ApplicationConstants.SERVER_SYNC_ID, - registry.getMessageHandler().getLastSeenServerSyncId()); - payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID, - clientToServerMessageId++); if (extraJson != null) { for (String key : extraJson.keys()) { JsonValue value = extraJson.get(key); @@ -192,12 +200,44 @@ private JsonObject preparePayload(final JsonArray reqInvocations, /** * Sends an asynchronous or synchronous UIDL request to the server using the - * given URI. + * given URI. Adds message to message queue and postpones sending if queue + * not empty. * * @param payload * The contents of the request to send */ public void send(final JsonObject payload) { + if (hasQueuedMessages()) { + messageQueue.add(payload); + return; + } + messageQueue.add(payload); + sendPayload(payload); + } + + /** + * Sends an asynchronous or synchronous UIDL request to the server using the + * given URI. + * + * @param payload + * The contents of the request to send + */ + private void sendPayload(final JsonObject payload) { + payload.put(ApplicationConstants.SERVER_SYNC_ID, + registry.getMessageHandler().getLastSeenServerSyncId()); + // clientID should only be set and updated if payload doesn't contain + // clientID. If one exists we are probably trying to resend. + if (!payload.hasKey(ApplicationConstants.CLIENT_TO_SERVER_ID)) { + payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID, + clientToServerMessageId++); + } + + if (!registry.getRequestResponseTracker().hasActiveRequest()) { + // Direct calls to send from outside probably have not started + // request. + registry.getRequestResponseTracker().startRequest(); + } + if (push != null && push.isBidirectional()) { // When using bidirectional transport, the payload is not resent // to the server during reconnection attempts. @@ -211,6 +251,31 @@ public void send(final JsonObject payload) { } else { Console.debug("send XHR"); registry.getXhrConnection().send(payload); + + resetTimer(); + // resend last payload if response hasn't come in. + resendMessageTimer = new Timer() { + @Override + public void run() { + resendMessageTimer + .schedule(registry.getApplicationConfiguration() + .getMaxMessageSuspendTimeout() + 500); + if (!registry.getRequestResponseTracker() + .hasActiveRequest()) { + registry.getRequestResponseTracker().startRequest(); + } + registry.getXhrConnection().send(payload); + } + }; + resendMessageTimer.schedule(registry.getApplicationConfiguration() + .getMaxMessageSuspendTimeout() + 500); + } + } + + private void resetTimer() { + if (resendMessageTimer != null) { + resendMessageTimer.cancel(); + resendMessageTimer = null; } } @@ -289,6 +354,8 @@ public String getCommunicationMethodName() { */ public void resynchronize() { if (requestResynchronize()) { + messageQueue.clear(); + resetTimer(); sendInvocationsToServer(); } } @@ -311,12 +378,24 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) { ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) { pushPendingMessage = null; } + if (hasQueuedMessages()) { + // If queued message is the expected one. remove from queue + // and send next message if any. + if (messageQueue.get(0) + .getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID) + + 1 == nextExpectedId) { + resetTimer(); + messageQueue.remove(0); + } + } return; } if (force) { Console.debug( "Forced update of clientId to " + clientToServerMessageId); clientToServerMessageId = nextExpectedId; + messageQueue.clear(); + resetTimer(); return; } @@ -372,4 +451,8 @@ void clearResynchronizationState() { ResynchronizationState getResynchronizationState() { return resynchronizationState; } + + public boolean hasQueuedMessages() { + return !messageQueue.isEmpty(); + } } diff --git a/flow-client/src/main/java/com/vaadin/client/communication/RequestResponseTracker.java b/flow-client/src/main/java/com/vaadin/client/communication/RequestResponseTracker.java index 0d08be7644a..bb5e263e011 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/RequestResponseTracker.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/RequestResponseTracker.java @@ -112,7 +112,8 @@ public void endRequest() { if ((registry.getUILifecycle().isRunning() && registry.getServerRpcQueue().isFlushPending()) || registry.getMessageSender() - .getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) { + .getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER + || registry.getMessageSender().hasQueuedMessages()) { // Send the pending RPCs immediately. // This might be an unnecessary optimization as ServerRpcQueue has a // finally scheduled command which trigger the send if we do not do diff --git a/flow-client/src/test/frontend/FlowTests.ts b/flow-client/src/test/frontend/FlowTests.ts index 778c4137131..24b4edcf0ec 100644 --- a/flow-client/src/test/frontend/FlowTests.ts +++ b/flow-client/src/test/frontend/FlowTests.ts @@ -748,7 +748,7 @@ function stubServerRemoteFunction( handlers.leaveNavigation(); } } - req.respond(200, { 'content-type': 'application/json' }, 'for(;;);[{}]'); + req.respond(200, {'content-type': 'application/json'}, 'for(;;);[{"syncId":' + (payload["syncId"] + 1) + ',"clientId":' + (payload["clientId"] + 1) + '}]'); }); } diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/ServerRpcHandler.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/ServerRpcHandler.java index 802329461af..1b07e9466d7 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/communication/ServerRpcHandler.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/ServerRpcHandler.java @@ -113,7 +113,7 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) { this.csrfToken = csrfToken; } - if (isSyncIdCheckEnabled) { + if (isSyncIdCheckEnabled && !isUnloadBeaconRequest()) { syncId = (int) json .getNumber(ApplicationConstants.SERVER_SYNC_ID); } else { @@ -131,7 +131,10 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) { clientToServerMessageId = (int) json .getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID); } else { - getLogger().warn("Server message without client id received"); + if (!isUnloadBeaconRequest()) { + getLogger() + .warn("Server message without client id received"); + } clientToServerMessageId = -1; } invocations = json.getArray(ApplicationConstants.RPC_INVOCATIONS); diff --git a/flow-tests/pom.xml b/flow-tests/pom.xml index aa7c2e83a08..d7946ba9863 100644 --- a/flow-tests/pom.xml +++ b/flow-tests/pom.xml @@ -335,6 +335,7 @@ test-react-adapter test-react-adapter/pom-production.xml test-legacy-frontend + test-client-queue diff --git a/flow-tests/test-client-queue/pom.xml b/flow-tests/test-client-queue/pom.xml new file mode 100644 index 00000000000..9e0908c433c --- /dev/null +++ b/flow-tests/test-client-queue/pom.xml @@ -0,0 +1,63 @@ + + + 4.0.0 + + flow-tests + com.vaadin + 24.7-SNAPSHOT + + flow-client-queue-test + Test Flow client queue + + war + + true + + true + + + + + com.vaadin + flow-test-resources + ${project.version} + + + com.vaadin + vaadin-dev-server + ${project.version} + + + com.vaadin + flow-html-components-testbench + ${project.version} + test + + + + + + + + com.vaadin + flow-maven-plugin + + + + prepare-frontend + + + + + + + + + + org.eclipse.jetty.ee10 + jetty-ee10-maven-plugin + + + + + diff --git a/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomService.java b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomService.java new file mode 100644 index 00000000000..cdf340709c5 --- /dev/null +++ b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomService.java @@ -0,0 +1,46 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.flow.misc.ui; + +import java.util.List; + +import com.vaadin.flow.function.DeploymentConfiguration; +import com.vaadin.flow.server.RequestHandler; +import com.vaadin.flow.server.ServiceException; +import com.vaadin.flow.server.VaadinServlet; +import com.vaadin.flow.server.VaadinServletService; +import com.vaadin.flow.server.communication.UidlRequestHandler; + +public class CustomService extends VaadinServletService { + + public CustomService(VaadinServlet servlet, + DeploymentConfiguration deploymentConfiguration) { + super(servlet, deploymentConfiguration); + } + + @Override + protected List createRequestHandlers() + throws ServiceException { + List requestHandlers = super.createRequestHandlers(); + requestHandlers.replaceAll(handler -> { + if (handler instanceof UidlRequestHandler) { + return new CustomUidlRequestHandler(); + } + return handler; + }); + return requestHandlers; + } +} diff --git a/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomServlet.java b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomServlet.java new file mode 100644 index 00000000000..7aba1806e05 --- /dev/null +++ b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomServlet.java @@ -0,0 +1,37 @@ +/* + * Copyright 2000-2024 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.flow.misc.ui; + +import jakarta.servlet.annotation.WebServlet; + +import com.vaadin.flow.function.DeploymentConfiguration; +import com.vaadin.flow.server.ServiceException; +import com.vaadin.flow.server.VaadinServlet; +import com.vaadin.flow.server.VaadinServletService; + +@WebServlet(urlPatterns = "/*", asyncSupported = true) +public class CustomServlet extends VaadinServlet { + + @Override + protected VaadinServletService createServletService( + DeploymentConfiguration deploymentConfiguration) + throws ServiceException { + CustomService service = new CustomService(this, + deploymentConfiguration); + service.init(); + return service; + } +} diff --git a/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomUidlRequestHandler.java b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomUidlRequestHandler.java new file mode 100644 index 00000000000..976e04555d5 --- /dev/null +++ b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/CustomUidlRequestHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.flow.misc.ui; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import com.vaadin.flow.server.VaadinRequest; +import com.vaadin.flow.server.VaadinResponse; +import com.vaadin.flow.server.VaadinSession; +import com.vaadin.flow.server.communication.UidlRequestHandler; + +public class CustomUidlRequestHandler extends UidlRequestHandler { + + public static Set emptyResponse = new HashSet(); + + @Override + public boolean synchronizedHandleRequest(VaadinSession session, + VaadinRequest request, VaadinResponse response) throws IOException { + if (emptyResponse.contains(session)) { + emptyResponse.remove(session); + commitEmptyResponse(response); + return true; + } + return super.synchronizedHandleRequest(session, request, response); + } + + @Override + public Optional synchronizedHandleRequest( + VaadinSession session, VaadinRequest request, + VaadinResponse response, String requestBody) + throws IOException, UnsupportedOperationException { + + if (emptyResponse.contains(session)) { + emptyResponse.remove(session); + return Optional.of(() -> commitEmptyResponse(response)); + } + return super.synchronizedHandleRequest(session, request, response, + requestBody); + } + + private void commitEmptyResponse(VaadinResponse response) + throws IOException { + commitJsonResponse(response, "for(;;);[{}]"); + } +} diff --git a/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/TestNoResponseView.java b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/TestNoResponseView.java new file mode 100644 index 00000000000..f6c97bc8373 --- /dev/null +++ b/flow-tests/test-client-queue/src/main/java/com/vaadin/flow/misc/ui/TestNoResponseView.java @@ -0,0 +1,47 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.flow.misc.ui; + +import com.vaadin.flow.component.html.Div; +import com.vaadin.flow.component.html.NativeButton; +import com.vaadin.flow.router.Route; +import com.vaadin.flow.server.VaadinSession; + +@Route("no-response") +public class TestNoResponseView extends Div { + + public static final String DELAY_NEXT_RESPONSE = "delay-next"; + public static final String ADD = "add"; + public static final String ADDED_PREDICATE = "added_"; + + private int elements = 0; + + public TestNoResponseView() { + NativeButton delayNext = new NativeButton("\"Delay\" next response", + event -> CustomUidlRequestHandler.emptyResponse + .add(VaadinSession.getCurrent())); + delayNext.setId(DELAY_NEXT_RESPONSE); + + NativeButton addElement = new NativeButton("Add element", event -> { + Div addedElement = new Div("Added element"); + addedElement.setId(ADDED_PREDICATE + elements++); + add(addedElement); + }); + addElement.setId(ADD); + + add(delayNext, addElement); + } +} diff --git a/flow-tests/test-client-queue/src/test/java/com/vaadin/flow/misc/ui/NoResponseIT.java b/flow-tests/test-client-queue/src/test/java/com/vaadin/flow/misc/ui/NoResponseIT.java new file mode 100644 index 00000000000..30eb3a621cf --- /dev/null +++ b/flow-tests/test-client-queue/src/test/java/com/vaadin/flow/misc/ui/NoResponseIT.java @@ -0,0 +1,106 @@ +package com.vaadin.flow.misc.ui; + +import java.util.logging.Level; + +import org.junit.Assert; +import org.junit.Test; +import org.openqa.selenium.TimeoutException; + +import com.vaadin.flow.component.html.testbench.DivElement; +import com.vaadin.flow.component.html.testbench.NativeButtonElement; +import com.vaadin.flow.testutil.ChromeBrowserTest; + +import static com.vaadin.flow.misc.ui.TestNoResponseView.ADD; +import static com.vaadin.flow.misc.ui.TestNoResponseView.ADDED_PREDICATE; +import static com.vaadin.flow.misc.ui.TestNoResponseView.DELAY_NEXT_RESPONSE; + +public class NoResponseIT extends ChromeBrowserTest { + + @Override + protected String getTestPath() { + return "/no-response"; + } + + @Test + public void noResponseForRequest_clientResendsRequest_serverAnswersCorrectly() { + open(); + + try { + waitUntil(driver -> $(NativeButtonElement.class) + .withId(DELAY_NEXT_RESPONSE).exists()); + } catch (TimeoutException te) { + Assert.fail("Expected 'delay next' button wasn't found"); + } + + // Add element normally + $(NativeButtonElement.class).id(ADD).click(); + Assert.assertTrue( + $(DivElement.class).id(ADDED_PREDICATE + 0).isDisplayed()); + + // Request null response for next add + $(NativeButtonElement.class).id(DELAY_NEXT_RESPONSE).click(); + + $(NativeButtonElement.class).id(ADD).click(); + + Assert.assertEquals("No expected empty response found", 1, + getLogEntries(Level.WARNING).stream() + .filter(logEntry -> logEntry.getMessage().contains( + "Response didn't contain a server id.")) + .count()); + + try { + waitUntil(driver -> $(DivElement.class).withId(ADDED_PREDICATE + 1) + .exists()); + } catch (TimeoutException te) { + Assert.fail( + "New element was not added though client should re-send request."); + } + + } + + @Test + public void clickWhileRequestPending_clientQueuesRequests_messagesSentCorrectly() { + open(); + + try { + waitUntil(driver -> $(NativeButtonElement.class) + .withId(DELAY_NEXT_RESPONSE).exists()); + } catch (TimeoutException te) { + Assert.fail("Expected 'delay next' button wasn't found"); + } + + // Add element normally + $(NativeButtonElement.class).id(ADD).click(); + Assert.assertTrue( + $(DivElement.class).id(ADDED_PREDICATE + 0).isDisplayed()); + + // Request null response for next add + $(NativeButtonElement.class).id(DELAY_NEXT_RESPONSE).click(); + + $(NativeButtonElement.class).id(ADD).click(); + $(NativeButtonElement.class).id(ADD).click(); + + Assert.assertEquals("No expected empty response found", 1, + getLogEntries(Level.WARNING).stream() + .filter(logEntry -> logEntry.getMessage().contains( + "Response didn't contain a server id.")) + .count()); + + try { + waitUntil(driver -> $(DivElement.class).withId(ADDED_PREDICATE + 1) + .exists()); + } catch (TimeoutException te) { + Assert.fail( + "New element was not added though client should re-send request."); + } + + try { + waitUntil(driver -> $(DivElement.class).withId(ADDED_PREDICATE + 2) + .exists()); + } catch (TimeoutException te) { + Assert.fail( + "Second new element was not added though client should queue request."); + } + + } +}