From 265925275306abe6d72849fab572e331579e2572 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 11:03:13 -0400 Subject: [PATCH 01/20] Create ProxyController.java spotless spotless --- .../googolplex_theater/ProxyController.java | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java diff --git a/src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java b/src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java new file mode 100644 index 0000000..006bde0 --- /dev/null +++ b/src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2024 James Yuzawa (https://www.jyuzawa.com/) + * SPDX-License-Identifier: MIT + */ +package com.jyuzawa.googolplex_theater; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.server.ResponseStatusException; +import reactor.core.publisher.Mono; + +@Slf4j +@RestController +public class ProxyController { + + private static final List BANNED_REQUEST_HEADERS = List.of( + HttpHeaders.HOST, + HttpHeaders.ACCEPT_ENCODING, + HttpHeaders.CONNECTION, + HttpHeaders.REFERER, + HttpHeaders.ORIGIN, + "Upgrade-Insecure-Requests", + "CAST-DEVICE-CAPABILITIES"); + private static final Set BANNED_RESPONSE_HEADERS = Set.of( + HttpHeaders.CONTENT_LENGTH, + HttpHeaders.CONNECTION, + HttpHeaders.TRANSFER_ENCODING, + "Content-Security-Policy", + "X-Frame-Options", + "X-Xss-Protection", + "X-Content-Type-Options", + "Strict-Transport-Security"); + + private final WebClient httpClient; + + @Autowired + public ProxyController() { + this.httpClient = WebClient.builder() + .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) + .build(); + } + + private static final Pattern PATTERN = Pattern.compile("^/(.*)$"); + + private static final String requestUri(ServerHttpRequest request) { + return request.getURI().getRawPath() + "?" + + defaultIfEmpty(request.getURI().getRawQuery(), ""); + } + + public static T defaultIfEmpty(final T str, final T defaultStr) { + return isEmpty(str) ? defaultStr : str; + } + + public static boolean isEmpty(final CharSequence cs) { + return cs == null || cs.length() == 0; + } + + @RequestMapping("/**") + public Mono> viewer(ServerHttpRequest request, @RequestBody(required = false) byte[] body) { + String uri = requestUri(request); + Matcher matcher = PATTERN.matcher(uri); + if (!matcher.find()) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, uri); + } + return run("", matcher.group(1), body, request); + } + + public Mono> run(String ref, String path, byte[] body, ServerHttpRequest request) { + URI uri = URI.create(String.format("https://example.com/%s", path)); + HttpHeaders originalHeaders = request.getHeaders(); + if (originalHeaders.containsKey(HttpHeaders.UPGRADE)) { + return Mono.just(ResponseEntity.badRequest() + .contentType(MediaType.TEXT_PLAIN) + .body(Unpooled.copiedBuffer("upgrade not supported", StandardCharsets.UTF_8))); + } + HttpHeaders requestHeaders = new org.springframework.http.HttpHeaders(); + originalHeaders.forEach((k, v) -> { + if (!BANNED_REQUEST_HEADERS.contains(k)) { + requestHeaders.put(k, v); + } + }); + requestHeaders.set("Authorization", "Bearer blah"); + if (body == null) { + body = Unpooled.EMPTY_BUFFER.array(); + } + log.info("REQ " + uri + " " + requestHeaders); + return httpClient + .method(HttpMethod.valueOf(request.getMethod().name())) + .uri(uri) + .headers(h -> h.putAll(requestHeaders)) + .bodyValue(body) + .retrieve() + .toEntity(ByteBuf.class) + .map(response -> ResponseEntity.status(response.getStatusCode()) + .headers(httpHeaders -> { + HttpHeaders responseHeaders = response.getHeaders(); + responseHeaders.forEach((k, v) -> { + if (!BANNED_RESPONSE_HEADERS.contains(k)) { + httpHeaders.put(k, v); + } + }); + log.info("RESP " + response.getStatusCode() + " " + uri + " " + responseHeaders); + }) + .body(response.getBody())) + .onErrorResume(e -> { + log.error(e.toString(), e); + return Mono.just(ResponseEntity.badRequest() + .contentType(MediaType.TEXT_PLAIN) + .body(Unpooled.copiedBuffer(e.toString(), StandardCharsets.UTF_8))); + }); + } +} From fccc6ab034c43238a921adf7cbd9129b0a15cabe Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 18:11:17 -0400 Subject: [PATCH 02/20] add proxy --- .../DeviceConfigLoader.java | 25 +++- .../googolplex_theater/GoogolplexService.java | 6 +- .../com/jyuzawa/googolplex_theater/Proxy.java | 115 ++++++++++++++++ .../googolplex_theater/ProxyController.java | 130 ------------------ .../googolplex_theater/ProxyProperties.java | 21 +++ .../googolplex_theater/ServiceDiscovery.java | 7 +- .../DeviceConfigLoaderTest.java | 3 +- .../GoogolplexServiceTest.java | 3 +- 8 files changed, 171 insertions(+), 139 deletions(-) create mode 100644 src/main/java/com/jyuzawa/googolplex_theater/Proxy.java delete mode 100644 src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java create mode 100644 src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java diff --git a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java index 66e540d..d4c378c 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java @@ -4,6 +4,10 @@ */ package com.jyuzawa.googolplex_theater; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.jyuzawa.googolplex_theater.DeviceConfig.DeviceInfo; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -14,6 +18,8 @@ import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -39,12 +45,15 @@ public final class DeviceConfigLoader implements Closeable { private final Path directoryPath; private WatchService watchService; private final GoogolplexService service; + private final String proxyUrl; @Autowired public DeviceConfigLoader( GoogolplexService service, Path appHome, - @Value("${googolplex-theater.devices-path}") String deviceConfigPath) + @Value("${googolplex-theater.devices-path}") String deviceConfigPath, + ProxyProperties proxyProperties, + ServiceDiscovery serviceDiscovery) throws IOException { this.service = service; this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("deviceConfigLoader")); @@ -57,6 +66,7 @@ public DeviceConfigLoader( if (directoryPath == null) { throw new IllegalArgumentException("Path has missing parent"); } + this.proxyUrl = "http://" + serviceDiscovery.getInetAddress().getHostAddress() + ":" + proxyProperties.port; } @PostConstruct @@ -104,7 +114,18 @@ public void start() throws IOException { private void load() throws IOException { log.info("Reloading device config"); try (InputStream stream = Files.newInputStream(path)) { - DeviceConfig out = MapperUtil.YAML_MAPPER.readValue(stream, DeviceConfig.class); + DeviceConfig deviceConfig = MapperUtil.YAML_MAPPER.readValue(stream, DeviceConfig.class); + List out = new ArrayList<>(); + for (DeviceInfo deviceInfo : deviceConfig.getDevices()) { + ObjectNode newSettings = new ObjectNode(MapperUtil.YAML_MAPPER.getNodeFactory()); + newSettings.setAll(deviceInfo.getSettings()); + JsonNode urlNode = newSettings.get("url"); + if (urlNode != null) { + String url = urlNode.asText().replace("${PROXY}", proxyUrl); + newSettings.set("url", new TextNode(url)); + } + out.add(new DeviceInfo(deviceInfo.getName(), newSettings)); + } service.processDeviceConfig(out); } } diff --git a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java index 93b9c8b..d1503a7 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java @@ -73,12 +73,12 @@ private record Channel(AtomicReference birth, Disposable disposable) {} /** * Load the config and propagate the changes to the any currently connected devices. * - * @param config the settings loaded from the file + * @param deviceInfos the devices loaded from the config file */ - public Future processDeviceConfig(DeviceConfig config) { + public Future processDeviceConfig(List deviceInfos) { return executor.submit(() -> { Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); - for (DeviceInfo deviceInfo : config.getDevices()) { + for (DeviceInfo deviceInfo : deviceInfos) { String name = deviceInfo.getName(); // mark that we should not remove this device namesToRemove.remove(name); diff --git a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java new file mode 100644 index 0000000..c60b8d7 --- /dev/null +++ b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024 James Yuzawa (https://www.jyuzawa.com/) + * SPDX-License-Identifier: MIT + */ +package com.jyuzawa.googolplex_theater; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.util.ReferenceCountUtil; +import java.time.Duration; +import java.util.List; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.netty.DisposableServer; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.http.server.HttpServer; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +@Slf4j +@Component +@ConditionalOnProperty("googolplex-theater.proxy.url") +public class Proxy { + + private static final List BLOCKED_REQUEST_HEADERS = List.of( + HttpHeaderNames.CONNECTION, + HttpHeaderNames.HOST, + HttpHeaderNames.UPGRADE, + HttpHeaderNames.ORIGIN, + HttpHeaderNames.ACCEPT_ENCODING, + "Sec-WebSocket-Version", + "Sec-WebSocket-Key", + "Sec-WebSocket-Extensions"); + + private static final List BLOCKED_RESPONSE_HEADERS = List.of("X-Frame-Options"); + + private final ProxyProperties properties; + private final HttpClient httpClient; + private final HttpServer httpServer; + private DisposableServer disposableServer; + + public Proxy(ProxyProperties properties) { + this.properties = properties; + this.httpClient = HttpClient.create().baseUrl(properties.url); + this.httpServer = HttpServer.create().port(properties.port).handle(this::handle); + } + + @PostConstruct + public void start() { + log.info("Starting proxy on port {}", properties.port); + disposableServer = httpServer.bindNow(); + } + + private Publisher handle(HttpServerRequest req, HttpServerResponse res) { + if (req.requestHeaders().containsValue(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE, true)) { + return handleWebsocket(req, res); + } + return handleHttp(req, res); + } + + private HttpClient configureHttpClient(HttpServerRequest req) { + return httpClient.headers(h -> { + h.set(req.requestHeaders()); + for (CharSequence name : BLOCKED_REQUEST_HEADERS) { + h.remove(name); + } + properties.addRequestHeaders.forEach((k, v) -> h.set(k, v)); + }); + } + + private Publisher handleWebsocket(HttpServerRequest req, HttpServerResponse res) { + return res.sendWebsocket((fromBrowser, toBrowser) -> configureHttpClient(req) + .websocket() + .uri(req.uri()) + .handle((fromServer, toServer) -> Flux.zip( + toBrowser.sendObject(fromServer.receiveFrames().doOnNext(ReferenceCountUtil::retain)), + toServer.sendObject(fromBrowser.receiveFrames().doOnNext(ReferenceCountUtil::retain)))) + .then()); + } + + private Publisher handleHttp(HttpServerRequest req, HttpServerResponse res) { + return configureHttpClient(req) + .request(req.method()) + .uri(req.uri()) + .send(req.receive().doOnNext(ByteBuf::retain)) + .response((r, body) -> + res.status(r.status()).headers(stripResponseHeaders(r)).send(body.doOnNext(ByteBuf::retain))); + } + + private HttpHeaders stripResponseHeaders(HttpClientResponse res) { + HttpHeaders headers = res.responseHeaders(); + for (CharSequence name : properties.removeResponseHeaders) { + headers.remove(name); + } + for (CharSequence name : BLOCKED_RESPONSE_HEADERS) { + headers.remove(name); + } + return headers; + } + + @PreDestroy + public void stop() { + if (disposableServer != null) { + disposableServer.disposeNow(Duration.ofSeconds(10)); + } + } +} diff --git a/src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java b/src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java deleted file mode 100644 index 006bde0..0000000 --- a/src/main/java/com/jyuzawa/googolplex_theater/ProxyController.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (c) 2024 James Yuzawa (https://www.jyuzawa.com/) - * SPDX-License-Identifier: MIT - */ -package com.jyuzawa.googolplex_theater; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.server.ResponseStatusException; -import reactor.core.publisher.Mono; - -@Slf4j -@RestController -public class ProxyController { - - private static final List BANNED_REQUEST_HEADERS = List.of( - HttpHeaders.HOST, - HttpHeaders.ACCEPT_ENCODING, - HttpHeaders.CONNECTION, - HttpHeaders.REFERER, - HttpHeaders.ORIGIN, - "Upgrade-Insecure-Requests", - "CAST-DEVICE-CAPABILITIES"); - private static final Set BANNED_RESPONSE_HEADERS = Set.of( - HttpHeaders.CONTENT_LENGTH, - HttpHeaders.CONNECTION, - HttpHeaders.TRANSFER_ENCODING, - "Content-Security-Policy", - "X-Frame-Options", - "X-Xss-Protection", - "X-Content-Type-Options", - "Strict-Transport-Security"); - - private final WebClient httpClient; - - @Autowired - public ProxyController() { - this.httpClient = WebClient.builder() - .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) - .build(); - } - - private static final Pattern PATTERN = Pattern.compile("^/(.*)$"); - - private static final String requestUri(ServerHttpRequest request) { - return request.getURI().getRawPath() + "?" - + defaultIfEmpty(request.getURI().getRawQuery(), ""); - } - - public static T defaultIfEmpty(final T str, final T defaultStr) { - return isEmpty(str) ? defaultStr : str; - } - - public static boolean isEmpty(final CharSequence cs) { - return cs == null || cs.length() == 0; - } - - @RequestMapping("/**") - public Mono> viewer(ServerHttpRequest request, @RequestBody(required = false) byte[] body) { - String uri = requestUri(request); - Matcher matcher = PATTERN.matcher(uri); - if (!matcher.find()) { - throw new ResponseStatusException(HttpStatus.NOT_FOUND, uri); - } - return run("", matcher.group(1), body, request); - } - - public Mono> run(String ref, String path, byte[] body, ServerHttpRequest request) { - URI uri = URI.create(String.format("https://example.com/%s", path)); - HttpHeaders originalHeaders = request.getHeaders(); - if (originalHeaders.containsKey(HttpHeaders.UPGRADE)) { - return Mono.just(ResponseEntity.badRequest() - .contentType(MediaType.TEXT_PLAIN) - .body(Unpooled.copiedBuffer("upgrade not supported", StandardCharsets.UTF_8))); - } - HttpHeaders requestHeaders = new org.springframework.http.HttpHeaders(); - originalHeaders.forEach((k, v) -> { - if (!BANNED_REQUEST_HEADERS.contains(k)) { - requestHeaders.put(k, v); - } - }); - requestHeaders.set("Authorization", "Bearer blah"); - if (body == null) { - body = Unpooled.EMPTY_BUFFER.array(); - } - log.info("REQ " + uri + " " + requestHeaders); - return httpClient - .method(HttpMethod.valueOf(request.getMethod().name())) - .uri(uri) - .headers(h -> h.putAll(requestHeaders)) - .bodyValue(body) - .retrieve() - .toEntity(ByteBuf.class) - .map(response -> ResponseEntity.status(response.getStatusCode()) - .headers(httpHeaders -> { - HttpHeaders responseHeaders = response.getHeaders(); - responseHeaders.forEach((k, v) -> { - if (!BANNED_RESPONSE_HEADERS.contains(k)) { - httpHeaders.put(k, v); - } - }); - log.info("RESP " + response.getStatusCode() + " " + uri + " " + responseHeaders); - }) - .body(response.getBody())) - .onErrorResume(e -> { - log.error(e.toString(), e); - return Mono.just(ResponseEntity.badRequest() - .contentType(MediaType.TEXT_PLAIN) - .body(Unpooled.copiedBuffer(e.toString(), StandardCharsets.UTF_8))); - }); - } -} diff --git a/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java b/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java new file mode 100644 index 0000000..ff9a3bf --- /dev/null +++ b/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2024 James Yuzawa (https://www.jyuzawa.com/) + * SPDX-License-Identifier: MIT + */ +package com.jyuzawa.googolplex_theater; + +import java.util.List; +import java.util.Map; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Setter +@Configuration +@ConfigurationProperties(prefix = "googolplex-theater.proxy") +public class ProxyProperties { + public String url; + public int port = 8081; + public Map addRequestHeaders = Map.of(); + public List removeResponseHeaders = List.of(); +} diff --git a/src/main/java/com/jyuzawa/googolplex_theater/ServiceDiscovery.java b/src/main/java/com/jyuzawa/googolplex_theater/ServiceDiscovery.java index 995cf76..98dad73 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/ServiceDiscovery.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/ServiceDiscovery.java @@ -17,6 +17,7 @@ import javax.jmdns.ServiceEvent; import javax.jmdns.ServiceInfo; import javax.jmdns.ServiceListener; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -40,6 +41,9 @@ public final class ServiceDiscovery implements Closeable { private final JmDNS mdns; private final boolean advertise; + @Getter + private final InetAddress inetAddress; + @Autowired public ServiceDiscovery( GoogolplexService service, @@ -53,7 +57,8 @@ public ServiceDiscovery( log.warn("No IP address for service discovery found. Falling back to JmDNS library default."); } this.mdns = JmDNS.create(inetAddress); - log.info("Search for casts using {}", mdns.getInetAddress()); + this.inetAddress = mdns.getInetAddress(); + log.info("Search for casts using {}", inetAddress); } @PostConstruct diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index b6fa24a..d150128 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -58,7 +58,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }); - DeviceConfigLoader loader = new DeviceConfigLoader(controller, conf, devicePath.toString()); + DeviceConfigLoader loader = new DeviceConfigLoader( + controller, conf, devicePath.toString(), new ProxyProperties(), Mockito.mock(ServiceDiscovery.class)); loader.start(); try { DeviceConfig config = queue.take(); diff --git a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java index ef22234..85938b5 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java @@ -65,11 +65,10 @@ void test() throws Exception { devices.add(cast2.device()); devices.add(cast3.device()); devices.add(cast4.device()); - DeviceConfig config = new DeviceConfig(devices, null); service.register(cast1.event()).get(); service.register(cast2.event()).get(); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.any(), Mockito.any()); - service.processDeviceConfig(config).get(); + service.processDeviceConfig(devices).get(); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast1.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast2.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); From b0697a20b68336d74d07620358b1dd4ea3612697 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 19:23:32 -0400 Subject: [PATCH 03/20] some docs --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f7b0eb8..ea27add 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ by [@yuzawa-san](https://github.com/yuzawa-san/) [![codecov](https://codecov.io/gh/yuzawa-san/googolplex-theater/branch/develop/graph/badge.svg)](https://codecov.io/gh/yuzawa-san/googolplex-theater) Persistently maintain multiple Chromecast devices on you local network without using your browser. -Ideal for digital signage applications. -Originally developed to display statistics dashboards. +Ideal for digital signage applications: restaurant menus, notice boards. +Originally developed to display statistics dashboards (e.g. Grafana). ![Example](docs/example.jpg) @@ -26,6 +26,7 @@ There is no backing database or database dependencies, rather there is a YAML fi The YAML configuration is conveyed to the receiver application, which by default accepts a URL to display in an IFRAME. The receiver application can be customized easily to suit your needs. The application will try to reconnect if a session is ended for whatever reason. +(Optional) The application has a local HTTP proxy for advanced use cases (adding/removing headers for auth or frame breaking). See [feature files](src/test/resources/features/) for more details. ## Requirements @@ -43,7 +44,7 @@ There are certain requirements for networking which are beyond the realm of this * The [Raspberry Pi](https://en.wikipedia.org/wiki/Raspberry_Pi) is a good, small, and cost-effective computer to use. * The newer models with ARMv8 processors are most desirable. See the [models list](https://en.wikipedia.org/wiki/Raspberry_Pi#Specifications) for more details. Most models introduced after 2016 fulfill these recommendations. * It is not advisable to use older models which use older processor architectures (ARMv6 or ARMv7), specifically the _original_ Raspberry Pi Zero or Zero W. See the linked specifications table in previous item for more details. -* IMPORTANT: URLs must be HTTPS and must not [deny framing](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Frame-Options) This is a limit of using an IFRAME to display content. +* IMPORTANT: URLs must be HTTPS (unless you use an unpublished receiver app) and must not [deny framing](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Frame-Options) This is a limit of using an IFRAME to display content. NOTE: the built-in proxy provides a way to attempt to circumvent this. Development requirements: @@ -195,7 +196,6 @@ This is intended to be minimalist and easy to set up, so advanced features are n ### TODO * Split screen layouts -* Framing proxy (may not be feasible or allowed under HTTPS) ## Related Projects From 286dabd1a11b7af49b0cc59fb31bb8323eb2ba14 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 19:46:38 -0400 Subject: [PATCH 04/20] update docs --- README.md | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index ea27add..362284e 100644 --- a/README.md +++ b/README.md @@ -151,21 +151,6 @@ The configuration is defined in `./conf/config.yml` and `./conf/devices.yml`. The file is automatically watched for changes. Some example use cases involve using cron and putting your config under version control and pulling from origin periodically, or downloading from S3/web, or updating using rsync/scp. -### Case Study: Grafana Dashboards - -The maintainer has used this to show statistics dashboards in a software engineering context. - -* Buy a new Raspberry Pi and install the default Raspberry Pi OS (Raspbian). -* Configure and name your Chromecasts. -* Install application Debian package and Java runtime. -* Create one Grafana playlist per device. -* Figure out how to use proper Grafana auth (proxy, token, etc). -* Make your devices.yml file with each playlist url per device. -* Place the devices.yml file under version control (git) or store it someplace accessible (http/s3/gcs). -* Add a cron job to pull the devices.yml file from wherever you stored it (alternatively configure something to push the file to the Raspberry Pi). -* devices.yml is updated periodically as our dashboard needs change. The updates are automatically picked up. -* If a screen needs to be refreshed, one can do so by accessing the web UI exposed port 8080 and hitting a few buttons. - ### Using a Custom Receiver If you wish to customize the behavior of the receiver from just displaying a single URL in an IFRAME, see the example custom receiver in `receiver/custom.html`. @@ -178,6 +163,27 @@ Host your modified file via HTTPS on your hosting provider of choice. Then point There is a property in the `config.yml` to override the receiver application. +### Case Study: Grafana Dashboards + +The maintainer has used this to show statistics dashboards in a software engineering context. + +- Buy a new Raspberry Pi and install the default Raspberry Pi OS (Raspbian). +- Configure and name your Chromecast(s). +- Install application via download or via Debian package (which will likely install Java runtime if it is not already installed). +- Create one Grafana playlist per device. +- Make your devices.yml file with each playlist url per device. Set the rotation and refresh parameters and make sure the kiosk mode is in the query string parameters. +- Figure out how to connect. + - Less secure: Use HTTPS with an IP address allowlist to your location (which must have a static IP) on whatever proxy you may have in front of your Grafana deployment. + - More Secure: Create a Grafana API token with viewer permission and use this application's proxy feature. + - [Sign up as a Chromecast developer](https://developers.google.com/cast/docs/registration#RegisterApp) so you can use the local proxy over HTTP. + - Register your [devices](https://cast.google.com/publish) for development. + - Register a new custom reciever, but do not publish it (that would force it to use HTTPS). Configure this app with the "appId" alphanumeric string. Point the url to `http://my-device.local:8001/receiver.html` or `http://192.168.1.XXX:8001/receiver.html` (assuming you have a static/sticky IP). + - Configure this app's proxy settings pointing at your Grafana root url. + - Add the Grafana token to the proxy settings + - Make your devices.yml have urls like `${PROXY}/path/to/playlist?....` +- If you want to update the devices periodically, place the devices.yml file under version control (git) or store it someplace accessible (http/s3/gcs). Add a cron job to pull the devices.yml file from wherever you stored it (alternatively configure something to push the file to the Raspberry Pi). The updates are automatically picked up. +- If a screen needs to be refreshed, one can do so by accessing the web UI exposed port 8080 and hitting a few buttons. + ### Troubleshooting There may be some issues related to discovering the Chromecast devices on your network. From 65bb23a4eacb12d11abde5475f03c5adc97c8392 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 19:50:28 -0400 Subject: [PATCH 05/20] update configs --- src/dist/conf/config.yml | 4 ++++ src/dist/conf/devices.yml | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/dist/conf/config.yml b/src/dist/conf/config.yml index 58e91f9..2d5f54f 100644 --- a/src/dist/conf/config.yml +++ b/src/dist/conf/config.yml @@ -13,3 +13,7 @@ # devices-path: conf/devices.yml # preferred-interface: eth0 # advertise: true +# proxy: +# url: https://grafana.mysite.com/ +# add-request-headers: +# Authorization: Bearer my_grafana_token diff --git a/src/dist/conf/devices.yml b/src/dist/conf/devices.yml index 04a107d..ddfc06f 100644 --- a/src/dist/conf/devices.yml +++ b/src/dist/conf/devices.yml @@ -1,5 +1,8 @@ -settings: - refreshSeconds: 180 +# NOTE: these top level settings can be applied to all devices + +#settings: +# refreshSeconds: 180 + devices: - name: device1 settings: From 4450d9c529634a7f62c20a3e651478ea05b8b74e Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 20:37:34 -0400 Subject: [PATCH 06/20] update device config test --- .../DeviceConfigLoader.java | 6 +++- .../DeviceConfigLoaderTest.java | 36 ++++++++++++------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java index d4c378c..b8d0a18 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java @@ -8,9 +8,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.jyuzawa.googolplex_theater.DeviceConfig.DeviceInfo; +import io.netty.util.NetUtil; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; import java.nio.file.ClosedWatchServiceException; import java.nio.file.Files; import java.nio.file.Path; @@ -66,7 +68,9 @@ public DeviceConfigLoader( if (directoryPath == null) { throw new IllegalArgumentException("Path has missing parent"); } - this.proxyUrl = "http://" + serviceDiscovery.getInetAddress().getHostAddress() + ":" + proxyProperties.port; + this.proxyUrl = "http://" + + NetUtil.toSocketAddressString( + new InetSocketAddress(serviceDiscovery.getInetAddress(), proxyProperties.port)); } @PostConstruct diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index d150128..98bb43f 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -12,12 +12,15 @@ import com.google.common.jimfs.WatchServiceConfiguration; import com.jyuzawa.googolplex_theater.DeviceConfig.DeviceInfo; import io.netty.util.CharsetUtil; +import io.netty.util.NetUtil; import java.io.BufferedWriter; import java.io.IOException; +import java.net.InetAddress; import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -29,10 +32,10 @@ class DeviceConfigLoaderTest { private static final String VALUE1 = - "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/\n refreshSeconds: 9600"; + "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/\n refreshSeconds: 9600\n - name: ProxiedDevice\n settings:\n url: ${PROXY}/foo/bar"; private static final String VALUE2 = - "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/updated\n refreshSeconds: 600"; + "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/updated\n refreshSeconds: 600\n - name: ProxiedDevice\n settings:\n url: ${PROXY}/foo/bar"; @Test void loaderTest() throws IOException, InterruptedException { @@ -48,37 +51,46 @@ void loaderTest() throws IOException, InterruptedException { devicePath, CharsetUtil.UTF_8, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { bufferedWriter.write(VALUE1); } - BlockingQueue queue = new ArrayBlockingQueue<>(10); + BlockingQueue> queue = new ArrayBlockingQueue<>(10); GoogolplexService controller = Mockito.mock(GoogolplexService.class); Mockito.when(controller.processDeviceConfig(Mockito.any())).then(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - queue.add(invocation.getArgument(0, DeviceConfig.class)); + List newDevices = invocation.getArgument(0); + queue.add(newDevices); return null; } }); + ServiceDiscovery serviceDiscovery = Mockito.mock(ServiceDiscovery.class); + InetAddress address = InetAddress.getByName("192.168.1.239"); + Mockito.when(serviceDiscovery.getInetAddress()).thenReturn(address); DeviceConfigLoader loader = new DeviceConfigLoader( - controller, conf, devicePath.toString(), new ProxyProperties(), Mockito.mock(ServiceDiscovery.class)); + controller, conf, devicePath.toString(), new ProxyProperties(), serviceDiscovery); loader.start(); try { - DeviceConfig config = queue.take(); - assertEquals(1, config.getDevices().size()); - DeviceInfo device = config.getDevices().get(0); + List devices = queue.take(); + assertEquals(2, devices.size()); + DeviceInfo device = devices.get(0); assertEquals("NameOfYourDevice2", device.getName()); assertEquals( "https://example2.com/", device.getSettings().get("url").asText()); assertEquals(9600, device.getSettings().get("refreshSeconds").asInt()); + device = devices.get(1); + assertEquals("ProxiedDevice", device.getName()); + assertEquals( + "http://" + NetUtil.toAddressString(address) + ":8081/foo/bar", + device.getSettings().get("url").asText()); // see if an update is detected try (BufferedWriter bufferedWriter = Files.newBufferedWriter(devicePath, CharsetUtil.UTF_8, StandardOpenOption.WRITE)) { bufferedWriter.write(VALUE2); } - config = queue.poll(1, TimeUnit.MINUTES); - assertNotNull(config); - assertEquals(1, config.getDevices().size()); - device = config.getDevices().get(0); + devices = queue.poll(1, TimeUnit.MINUTES); + assertNotNull(devices); + assertEquals(2, devices.size()); + device = devices.get(0); assertEquals("NameOfYourDevice2", device.getName()); assertEquals( "https://example2.com/updated", From 9feea539a098b2cd13273c02b3f22ef8dbf1be35 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 20:43:08 -0400 Subject: [PATCH 07/20] clean up test --- .../googolplex_theater/DeviceConfigLoaderTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index 98bb43f..22fd1ca 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -63,10 +63,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } }); ServiceDiscovery serviceDiscovery = Mockito.mock(ServiceDiscovery.class); - InetAddress address = InetAddress.getByName("192.168.1.239"); + String ipAddress = "192.168.1.239"; + InetAddress address = InetAddress.getByName(ipAddress); Mockito.when(serviceDiscovery.getInetAddress()).thenReturn(address); + ProxyProperties proxyProperties = new ProxyProperties(); DeviceConfigLoader loader = new DeviceConfigLoader( - controller, conf, devicePath.toString(), new ProxyProperties(), serviceDiscovery); + controller, conf, devicePath.toString(), proxyProperties, serviceDiscovery); loader.start(); try { List devices = queue.take(); @@ -79,7 +81,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { device = devices.get(1); assertEquals("ProxiedDevice", device.getName()); assertEquals( - "http://" + NetUtil.toAddressString(address) + ":8081/foo/bar", + "http://" + ipAddress + ":"+proxyProperties.port+"/foo/bar", device.getSettings().get("url").asText()); // see if an update is detected From fe92329c8fdd7f59e20433f37bc3bdebd5e7933e Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 20:46:42 -0400 Subject: [PATCH 08/20] spotless --- .../jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index 22fd1ca..d018ad6 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -12,7 +12,6 @@ import com.google.common.jimfs.WatchServiceConfiguration; import com.jyuzawa.googolplex_theater.DeviceConfig.DeviceInfo; import io.netty.util.CharsetUtil; -import io.netty.util.NetUtil; import java.io.BufferedWriter; import java.io.IOException; import java.net.InetAddress; @@ -67,8 +66,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { InetAddress address = InetAddress.getByName(ipAddress); Mockito.when(serviceDiscovery.getInetAddress()).thenReturn(address); ProxyProperties proxyProperties = new ProxyProperties(); - DeviceConfigLoader loader = new DeviceConfigLoader( - controller, conf, devicePath.toString(), proxyProperties, serviceDiscovery); + DeviceConfigLoader loader = + new DeviceConfigLoader(controller, conf, devicePath.toString(), proxyProperties, serviceDiscovery); loader.start(); try { List devices = queue.take(); @@ -81,7 +80,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { device = devices.get(1); assertEquals("ProxiedDevice", device.getName()); assertEquals( - "http://" + ipAddress + ":"+proxyProperties.port+"/foo/bar", + "http://" + ipAddress + ":" + proxyProperties.port + "/foo/bar", device.getSettings().get("url").asText()); // see if an update is detected From 58d75a45379df8bc5b712a8baaf07324facc9535 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 20:50:06 -0400 Subject: [PATCH 09/20] add more example --- src/dist/conf/config.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dist/conf/config.yml b/src/dist/conf/config.yml index 2d5f54f..f34a156 100644 --- a/src/dist/conf/config.yml +++ b/src/dist/conf/config.yml @@ -17,3 +17,5 @@ # url: https://grafana.mysite.com/ # add-request-headers: # Authorization: Bearer my_grafana_token +# remove-response-headers: +# - My-Bad-Header From 9da5ae52c9b6d6952fed28494f988432f4866dbf Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 22:17:47 -0400 Subject: [PATCH 10/20] proxy test --- .../com/jyuzawa/googolplex_theater/Proxy.java | 12 ++- .../jyuzawa/googolplex_theater/ProxyTest.java | 91 +++++++++++++++++++ 2 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java diff --git a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java index c60b8d7..5bd29d7 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java @@ -17,7 +17,7 @@ import org.reactivestreams.Publisher; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientResponse; @@ -80,10 +80,12 @@ private Publisher handleWebsocket(HttpServerRequest req, HttpServerRespons return res.sendWebsocket((fromBrowser, toBrowser) -> configureHttpClient(req) .websocket() .uri(req.uri()) - .handle((fromServer, toServer) -> Flux.zip( - toBrowser.sendObject(fromServer.receiveFrames().doOnNext(ReferenceCountUtil::retain)), - toServer.sendObject(fromBrowser.receiveFrames().doOnNext(ReferenceCountUtil::retain)))) - .then()); + .handle((fromServer, toServer) -> Mono.when( + toBrowser + .sendObject(fromServer.receiveFrames().doOnNext(ReferenceCountUtil::retain)) + .then(), + toServer.sendObject(fromBrowser.receiveFrames().doOnNext(ReferenceCountUtil::retain))) + .then())); } private Publisher handleHttp(HttpServerRequest req, HttpServerResponse res) { diff --git a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java new file mode 100644 index 0000000..d810b1b --- /dev/null +++ b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2024 James Yuzawa (https://www.jyuzawa.com/) + * SPDX-License-Identifier: MIT + */ +package com.jyuzawa.googolplex_theater; + +import static org.junit.jupiter.api.Assertions.*; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.util.ReferenceCountUtil; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.netty.DisposableServer; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; + +class ProxyTest { + + private static DisposableServer server; + private static Proxy proxy; + + @BeforeAll + static void start() { + server = HttpServer.create() + .port(8082) + .route(routes -> routes.get( + "/hello", (request, response) -> response.sendString(Mono.just("Hello World!"))) + .post( + "/echo", + (request, response) -> + response.send(request.receive().retain())) + .ws( + "/ws", + (wsInbound, wsOutbound) -> wsOutbound.sendObject( + wsInbound.receiveFrames().doOnNext(ReferenceCountUtil::retain)))) + .bindNow(); + ProxyProperties proxyProperties = new ProxyProperties(); + proxyProperties.url = "http://localhost:8082/"; + proxyProperties.port = 8081; + proxy = new Proxy(proxyProperties); + proxy.start(); + } + + @AfterAll + static void stop() { + proxy.stop(); + server.disposeNow(Duration.ofSeconds(10)); + } + + @Test + void test() { + HttpClient httpClient = HttpClient.create().baseUrl("http://localhost:8081/"); + assertEquals( + "Hello World!", + httpClient + .get() + .uri("/hello") + .responseContent() + .aggregate() + .asString() + .block(Duration.ofSeconds(10))); + assertEquals( + "payload", + httpClient + .post() + .uri("/echo") + .send(Mono.just(Unpooled.copiedBuffer("payload", StandardCharsets.UTF_8))) + .responseContent() + .aggregate() + .asString() + .block(Duration.ofSeconds(10))); + assertEquals( + "wspayload", + httpClient + .websocket() + .uri("/ws") + .handle((in, out) -> out.sendObject(new TextWebSocketFrame( + Unpooled.copiedBuffer("wspayload", StandardCharsets.UTF_8))) + .then() + .thenMany(in.receiveFrames() + .cast(TextWebSocketFrame.class) + .map(TextWebSocketFrame::text) + .doOnNext(System.out::println))) + .blockFirst(Duration.ofSeconds(10))); + } +} From 7fc540237b530c34afc7b4e1b6644370acc40cac Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 22:36:20 -0400 Subject: [PATCH 11/20] Update ProxyTest.java --- .../jyuzawa/googolplex_theater/ProxyTest.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java index d810b1b..57c6e77 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java @@ -11,9 +11,11 @@ import io.netty.util.ReferenceCountUtil; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.client.HttpClient; @@ -74,18 +76,27 @@ void test() { .aggregate() .asString() .block(Duration.ofSeconds(10))); + List payloads = List.of("foo", "bar", "baz"); assertEquals( - "wspayload", + payloads, httpClient .websocket() .uri("/ws") - .handle((in, out) -> out.sendObject(new TextWebSocketFrame( - Unpooled.copiedBuffer("wspayload", StandardCharsets.UTF_8))) + .handle((in, out) -> out.sendObject(Flux.fromIterable(payloads) + .map(payload -> new TextWebSocketFrame( + Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8)))) .then() .thenMany(in.receiveFrames() .cast(TextWebSocketFrame.class) - .map(TextWebSocketFrame::text) + .flatMap(f -> { + String value = f.text(); + if (value.equals(payloads.get(payloads.size() - 1))) { + return out.sendClose().thenReturn(value); + } + return Mono.just(value); + }) .doOnNext(System.out::println))) - .blockFirst(Duration.ofSeconds(10))); + .collectList() + .block(Duration.ofSeconds(10))); } } From 00f725ef6904886044465e6d475ce47d90b05c8d Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 22:39:00 -0400 Subject: [PATCH 12/20] use take --- .../java/com/jyuzawa/googolplex_theater/ProxyTest.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java index 57c6e77..c18b54b 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java @@ -87,15 +87,9 @@ void test() { Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8)))) .then() .thenMany(in.receiveFrames() + .take(payloads.size()) .cast(TextWebSocketFrame.class) - .flatMap(f -> { - String value = f.text(); - if (value.equals(payloads.get(payloads.size() - 1))) { - return out.sendClose().thenReturn(value); - } - return Mono.just(value); - }) - .doOnNext(System.out::println))) + .map(TextWebSocketFrame::text))) .collectList() .block(Duration.ofSeconds(10))); } From c22869b07dbdb5d7651ca2682c91f119d2cfefff Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 9 Jun 2024 22:46:20 -0400 Subject: [PATCH 13/20] add start message --- src/main/java/com/jyuzawa/googolplex_theater/Proxy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java index 5bd29d7..3c45593 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java @@ -55,7 +55,7 @@ public Proxy(ProxyProperties properties) { @PostConstruct public void start() { - log.info("Starting proxy on port {}", properties.port); + log.info("Starting proxy for {} on port {}", properties.url, properties.port); disposableServer = httpServer.bindNow(); } From d791cd0a5fb114b8f1f5dbcaee823e7371c38282 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Mon, 10 Jun 2024 08:29:45 -0400 Subject: [PATCH 14/20] add nginx to docs --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 362284e..40c60da 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,7 @@ The maintainer has used this to show statistics dashboards in a software enginee - Make your devices.yml file with each playlist url per device. Set the rotation and refresh parameters and make sure the kiosk mode is in the query string parameters. - Figure out how to connect. - Less secure: Use HTTPS with an IP address allowlist to your location (which must have a static IP) on whatever proxy you may have in front of your Grafana deployment. + - Medium Secure: Create a Grafana API token with viewer permission and run nginx or other proxy locally with a real SSL certificate, add Authorization header. - More Secure: Create a Grafana API token with viewer permission and use this application's proxy feature. - [Sign up as a Chromecast developer](https://developers.google.com/cast/docs/registration#RegisterApp) so you can use the local proxy over HTTP. - Register your [devices](https://cast.google.com/publish) for development. From 5a89f88d32ed9d8d18f2c9f69d4602944cc6cbbd Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Mon, 10 Jun 2024 08:33:12 -0400 Subject: [PATCH 15/20] use schedulers --- .../DeviceConfigLoader.java | 21 ++-- .../googolplex_theater/GoogolplexService.java | 118 +++++++++--------- .../GoogolplexServiceTest.java | 16 +-- 3 files changed, 73 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java index b8d0a18..5bad8d8 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java @@ -20,17 +20,16 @@ import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import javax.jmdns.impl.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; /** * This class loads the device config at start and watches the files for subsequent changes. The @@ -42,7 +41,7 @@ @Component public final class DeviceConfigLoader implements Closeable { - private final ExecutorService executor; + private final Scheduler executor; private final Path path; private final Path directoryPath; private WatchService watchService; @@ -58,7 +57,8 @@ public DeviceConfigLoader( ServiceDiscovery serviceDiscovery) throws IOException { this.service = service; - this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("deviceConfigLoader")); + this.executor = Schedulers.newSingle("deviceConfigLoader"); + this.path = appHome.resolve(deviceConfigPath).toAbsolutePath(); log.info("Using device config: {}", path); if (!Files.isRegularFile(path)) { @@ -78,7 +78,7 @@ public void start() throws IOException { load(); this.watchService = path.getFileSystem().newWatchService(); directoryPath.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); - executor.submit(() -> { + executor.schedule(() -> { try { WatchKey key; // this blocks until the system notifies us of any changes. @@ -139,11 +139,6 @@ public void close() throws IOException { if (watchService != null) { watchService.close(); } - executor.shutdown(); - try { - executor.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - // pass - } + executor.disposeGracefully().block(Duration.ofSeconds(10)); } } diff --git a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java index d1503a7..3b9aa4a 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java @@ -19,18 +19,15 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.jmdns.ServiceEvent; import javax.jmdns.ServiceInfo; -import javax.jmdns.impl.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.Disposable; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; /** * This class represents the state of the application. All modifications to state occur in the same @@ -56,7 +53,7 @@ public class GoogolplexService implements Closeable { private final Map nameToDeviceInfo; private final Map nameToAddress; private final Map nameToChannel; - private final ExecutorService executor; + private final Scheduler executor; @Autowired public GoogolplexService(GoogolplexClient client) { @@ -65,7 +62,7 @@ public GoogolplexService(GoogolplexClient client) { this.nameToDeviceInfo = new ConcurrentHashMap<>(); this.nameToAddress = new ConcurrentHashMap<>(); this.nameToChannel = new ConcurrentHashMap<>(); - this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("controller")); + this.executor = Schedulers.newSingle("controller"); } private record Channel(AtomicReference birth, Disposable disposable) {} @@ -75,28 +72,30 @@ private record Channel(AtomicReference birth, Disposable disposable) {} * * @param deviceInfos the devices loaded from the config file */ - public Future processDeviceConfig(List deviceInfos) { - return executor.submit(() -> { - Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); - for (DeviceInfo deviceInfo : deviceInfos) { - String name = deviceInfo.getName(); - // mark that we should not remove this device - namesToRemove.remove(name); - DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name); - // ignore unchanged devices - if (!deviceInfo.equals(oldDeviceInfo)) { - log.info("CONFIG_UPDATED '{}'", name); - nameToDeviceInfo.put(name, deviceInfo); - apply(name); - } - } - // remove devices that were missing in the new config - for (String name : namesToRemove) { - log.info("CONFIG_REMOVED '{}'", name); - nameToDeviceInfo.remove(name); + public Disposable processDeviceConfig(List deviceInfos) { + return executor.schedule(() -> processDeviceConfig0(deviceInfos)); + } + + void processDeviceConfig0(List deviceInfos) { + Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); + for (DeviceInfo deviceInfo : deviceInfos) { + String name = deviceInfo.getName(); + // mark that we should not remove this device + namesToRemove.remove(name); + DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name); + // ignore unchanged devices + if (!deviceInfo.equals(oldDeviceInfo)) { + log.info("CONFIG_UPDATED '{}'", name); + nameToDeviceInfo.put(name, deviceInfo); apply(name); } - }); + } + // remove devices that were missing in the new config + for (String name : namesToRemove) { + log.info("CONFIG_REMOVED '{}'", name); + nameToDeviceInfo.remove(name); + apply(name); + } } /** @@ -105,34 +104,36 @@ public Future processDeviceConfig(List deviceInfos) { * * @param event mdns info */ - public Future register(ServiceEvent event) { - return executor.submit(() -> { - // the device information may not be full - ServiceInfo info = event.getInfo(); - String name = info.getPropertyString("fn"); - if (name == null) { - log.debug("Found unnamed cast:\n{}", info); - return; - } - InetAddress[] addresses = info.getInetAddresses(); - if (addresses == null || addresses.length == 0) { - log.debug("Found unaddressable cast:\n{}", info); - return; - } + public Disposable register(ServiceEvent event) { + return executor.schedule(() -> register0(event)); + } + + void register0(ServiceEvent event) { + // the device information may not be full + ServiceInfo info = event.getInfo(); + String name = info.getPropertyString("fn"); + if (name == null) { + log.debug("Found unnamed cast:\n{}", info); + return; + } + InetAddress[] addresses = info.getInetAddresses(); + if (addresses == null || addresses.length == 0) { + log.debug("Found unaddressable cast:\n{}", info); + return; + } + /* + * we choose the first address. there should usually be just one. the mdns library returns ipv4 + * addresses before ipv6. + */ + InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort()); + InetSocketAddress oldAddress = nameToAddress.put(name, address); + if (!address.equals(oldAddress)) { /* - * we choose the first address. there should usually be just one. the mdns library returns ipv4 - * addresses before ipv6. + * this is a newly discovered device, or an existing device whose address was updated. */ - InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort()); - InetSocketAddress oldAddress = nameToAddress.put(name, address); - if (!address.equals(oldAddress)) { - /* - * this is a newly discovered device, or an existing device whose address was updated. - */ - log.info("REGISTER '{}' {}", name, address); - apply(name); - } - }); + log.info("REGISTER '{}' {}", name, address); + apply(name); + } } /** @@ -168,8 +169,8 @@ private void apply(String name) { * * @param name the device to refresh */ - public Future refresh(String name) { - return executor.submit(() -> { + public Disposable refresh(String name) { + return executor.schedule(() -> { // closing channels will cause them to reconnect if (name == null) { // close all channels @@ -217,12 +218,7 @@ public List getDeviceInfo() { public void close() { nameToDeviceInfo.clear(); refresh(null); - executor.shutdown(); - try { - executor.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - // pass - } + executor.disposeGracefully().block(Duration.ofSeconds(10)); } /** diff --git a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java index 85938b5..9561b19 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java @@ -65,31 +65,31 @@ void test() throws Exception { devices.add(cast2.device()); devices.add(cast3.device()); devices.add(cast4.device()); - service.register(cast1.event()).get(); - service.register(cast2.event()).get(); + service.register0(cast1.event()); + service.register0(cast2.event()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.any(), Mockito.any()); - service.processDeviceConfig(devices).get(); + service.processDeviceConfig0(devices); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast1.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast2.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any()); - service.register(cast3.event()).get(); - service.register(cast4.event()).get(); + service.register(cast3.event()); + service.register(cast4.event()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any()); - service.register(FakeCast.event(9005, "UnknownCast")).get(); + service.register(FakeCast.event(9005, "UnknownCast")); ServiceEvent noName = Mockito.mock(ServiceEvent.class); ServiceInfo noNameInfo = Mockito.mock(ServiceInfo.class); Mockito.when(noName.getInfo()).thenReturn(noNameInfo); Mockito.when(noNameInfo.getPropertyString(Mockito.anyString())).thenReturn(null); - service.register(noName).get(); + service.register(noName); ServiceEvent noAddr = Mockito.mock(ServiceEvent.class); Mockito.when(noAddr.getName()).thenReturn("Chromecast-NOIP.local"); ServiceInfo noAddrInfo = Mockito.mock(ServiceInfo.class); Mockito.when(noAddr.getInfo()).thenReturn(noAddrInfo); Mockito.when(noAddrInfo.getPropertyString(Mockito.anyString())).thenReturn("NOIP"); Mockito.when(noAddrInfo.getInetAddresses()).thenReturn(new InetAddress[] {}); - service.register(noAddr).get(); + service.register(noAddr); List deviceInfos = service.getDeviceInfo(); Set configureds = getConfigureds(deviceInfos); From 39ea8cf25ca22f5a08524f363d631f958f3e08c7 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Tue, 11 Jun 2024 18:19:51 -0400 Subject: [PATCH 16/20] enable logs --- src/dist/conf/config.yml | 4 +++- .../com/jyuzawa/googolplex_theater/GoogolplexTheater.java | 6 ++++++ src/main/java/com/jyuzawa/googolplex_theater/Proxy.java | 3 ++- .../com/jyuzawa/googolplex_theater/ProxyProperties.java | 1 + src/main/resources/application.yml | 1 + 5 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/dist/conf/config.yml b/src/dist/conf/config.yml index f34a156..1a68c8b 100644 --- a/src/dist/conf/config.yml +++ b/src/dist/conf/config.yml @@ -12,9 +12,11 @@ # retry-interval: 15s # devices-path: conf/devices.yml # preferred-interface: eth0 -# advertise: true +# advertise: false +# server-log: true # proxy: # url: https://grafana.mysite.com/ +# log: false # add-request-headers: # Authorization: Bearer my_grafana_token # remove-response-headers: diff --git a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexTheater.java b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexTheater.java index 7c7a809..d33b3c4 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexTheater.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexTheater.java @@ -11,6 +11,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.system.ApplicationHome; +import org.springframework.boot.web.embedded.netty.NettyServerCustomizer; import org.springframework.context.annotation.Bean; /** @@ -26,6 +27,11 @@ public Path appHome(@Value("${googolplex-theater.app-home}") Path appHome) { return appHome; } + @Bean + public NettyServerCustomizer nettyServerCustomizer(@Value("${googolplex-theater.server-log}") boolean enabled) { + return s -> s.accessLog(enabled); + } + public static void main(String[] args) throws Exception { Path appHome = Paths.get("src/dist").toAbsolutePath(); ApplicationHome home = new ApplicationHome(GoogolplexTheater.class); diff --git a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java index 3c45593..fde8171 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java @@ -50,7 +50,8 @@ public class Proxy { public Proxy(ProxyProperties properties) { this.properties = properties; this.httpClient = HttpClient.create().baseUrl(properties.url); - this.httpServer = HttpServer.create().port(properties.port).handle(this::handle); + this.httpServer = + HttpServer.create().accessLog(properties.log).port(properties.port).handle(this::handle); } @PostConstruct diff --git a/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java b/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java index ff9a3bf..499f9bc 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/ProxyProperties.java @@ -16,6 +16,7 @@ public class ProxyProperties { public String url; public int port = 8081; + public boolean log; public Map addRequestHeaders = Map.of(); public List removeResponseHeaders = List.of(); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 393e87c..b9b8046 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,6 +7,7 @@ googolplex-theater: heartbeat-timeout: 30s retry-interval: 15s devices-path: conf/devices.yml + server-log: true --- spring: config: From 7000c690d00067a2dec04031a17427beb4dfc7d0 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Tue, 11 Jun 2024 18:36:09 -0400 Subject: [PATCH 17/20] update proxy config --- README.md | 6 ++--- src/dist/conf/devices.yml | 5 +++- .../DeviceConfigLoader.java | 24 ++++++++++++------- .../com/jyuzawa/googolplex_theater/Proxy.java | 6 +++-- .../DeviceConfigLoaderTest.java | 4 ++-- 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 40c60da..49a7ecc 100644 --- a/README.md +++ b/README.md @@ -178,10 +178,10 @@ The maintainer has used this to show statistics dashboards in a software enginee - More Secure: Create a Grafana API token with viewer permission and use this application's proxy feature. - [Sign up as a Chromecast developer](https://developers.google.com/cast/docs/registration#RegisterApp) so you can use the local proxy over HTTP. - Register your [devices](https://cast.google.com/publish) for development. - - Register a new custom reciever, but do not publish it (that would force it to use HTTPS). Configure this app with the "appId" alphanumeric string. Point the url to `http://my-device.local:8001/receiver.html` or `http://192.168.1.XXX:8001/receiver.html` (assuming you have a static/sticky IP). + - Register a new custom reciever, but do not publish it (that would force it to use HTTPS). Configure this app with the "appId" alphanumeric string. Point the url to your _static_ private IP `http://192.168.1.XXX:8001/receiver.html`. - Configure this app's proxy settings pointing at your Grafana root url. - - Add the Grafana token to the proxy settings - - Make your devices.yml have urls like `${PROXY}/path/to/playlist?....` + - Add the Grafana token to the proxy "add header" settings + - Make your devices.yml have `proxyPath: /path/to/playlist?....` - If you want to update the devices periodically, place the devices.yml file under version control (git) or store it someplace accessible (http/s3/gcs). Add a cron job to pull the devices.yml file from wherever you stored it (alternatively configure something to push the file to the Raspberry Pi). The updates are automatically picked up. - If a screen needs to be refreshed, one can do so by accessing the web UI exposed port 8080 and hitting a few buttons. diff --git a/src/dist/conf/devices.yml b/src/dist/conf/devices.yml index ddfc06f..9b1fc04 100644 --- a/src/dist/conf/devices.yml +++ b/src/dist/conf/devices.yml @@ -10,4 +10,7 @@ devices: - name: device2 settings: url: https://example.com/ - refreshSeconds: 1800 \ No newline at end of file + refreshSeconds: 1800 +# - name: device3 +# settings: +# proxyPath: /path/to/be/proxied diff --git a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java index 5bad8d8..31fe5d6 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/DeviceConfigLoader.java @@ -13,6 +13,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.net.URI; import java.nio.file.ClosedWatchServiceException; import java.nio.file.Files; import java.nio.file.Path; @@ -46,7 +47,7 @@ public final class DeviceConfigLoader implements Closeable { private final Path directoryPath; private WatchService watchService; private final GoogolplexService service; - private final String proxyUrl; + private final URI proxyUri; @Autowired public DeviceConfigLoader( @@ -68,9 +69,9 @@ public DeviceConfigLoader( if (directoryPath == null) { throw new IllegalArgumentException("Path has missing parent"); } - this.proxyUrl = "http://" + this.proxyUri = URI.create("http://" + NetUtil.toSocketAddressString( - new InetSocketAddress(serviceDiscovery.getInetAddress(), proxyProperties.port)); + new InetSocketAddress(serviceDiscovery.getInetAddress(), proxyProperties.port))); } @PostConstruct @@ -121,14 +122,19 @@ private void load() throws IOException { DeviceConfig deviceConfig = MapperUtil.YAML_MAPPER.readValue(stream, DeviceConfig.class); List out = new ArrayList<>(); for (DeviceInfo deviceInfo : deviceConfig.getDevices()) { - ObjectNode newSettings = new ObjectNode(MapperUtil.YAML_MAPPER.getNodeFactory()); - newSettings.setAll(deviceInfo.getSettings()); - JsonNode urlNode = newSettings.get("url"); - if (urlNode != null) { - String url = urlNode.asText().replace("${PROXY}", proxyUrl); + ObjectNode settings = deviceInfo.getSettings(); + JsonNode proxyPathNode = settings.get("proxyPath"); + if (proxyPathNode != null) { + ObjectNode newSettings = new ObjectNode(MapperUtil.YAML_MAPPER.getNodeFactory()); + newSettings.setAll(settings); + String url = + proxyUri.resolve(URI.create(proxyPathNode.asText())).toString(); newSettings.set("url", new TextNode(url)); + newSettings.remove("proxyPath"); + out.add(new DeviceInfo(deviceInfo.getName(), newSettings)); + } else { + out.add(deviceInfo); } - out.add(new DeviceInfo(deviceInfo.getName(), newSettings)); } service.processDeviceConfig(out); } diff --git a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java index fde8171..d24c71f 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java @@ -50,8 +50,10 @@ public class Proxy { public Proxy(ProxyProperties properties) { this.properties = properties; this.httpClient = HttpClient.create().baseUrl(properties.url); - this.httpServer = - HttpServer.create().accessLog(properties.log).port(properties.port).handle(this::handle); + this.httpServer = HttpServer.create() + .accessLog(properties.log) + .port(properties.port) + .handle(this::handle); } @PostConstruct diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index d018ad6..5855a41 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -31,10 +31,10 @@ class DeviceConfigLoaderTest { private static final String VALUE1 = - "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/\n refreshSeconds: 9600\n - name: ProxiedDevice\n settings:\n url: ${PROXY}/foo/bar"; + "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/\n refreshSeconds: 9600\n - name: ProxiedDevice\n settings:\n proxyPath: /foo/bar"; private static final String VALUE2 = - "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/updated\n refreshSeconds: 600\n - name: ProxiedDevice\n settings:\n url: ${PROXY}/foo/bar"; + "devices:\n - name: NameOfYourDevice2\n settings:\n url: https://example2.com/updated\n refreshSeconds: 600\n - name: ProxiedDevice\n settings:\n proxyPath: /foo/bar"; @Test void loaderTest() throws IOException, InterruptedException { From c0f867876b58aa00e9da109b77aed5d17ec31b92 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Tue, 11 Jun 2024 19:02:17 -0400 Subject: [PATCH 18/20] reuse loop resources --- .../jyuzawa/googolplex_theater/GoogolplexClient.java | 5 ++++- .../java/com/jyuzawa/googolplex_theater/Proxy.java | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexClient.java b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexClient.java index 992b004..a1d6587 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexClient.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexClient.java @@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.client.ReactorResourceFactory; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -80,7 +81,8 @@ public GoogolplexClient( @Value("${googolplex-theater.app-id}") String appId, @Value("${googolplex-theater.heartbeat-interval}") Duration heartbeatInterval, @Value("${googolplex-theater.heartbeat-timeout}") Duration heartbeatTimeout, - @Value("${googolplex-theater.retry-interval}") Duration retryInterval) + @Value("${googolplex-theater.retry-interval}") Duration retryInterval, + ReactorResourceFactory reactorResourceFactory) throws SSLException { this.appId = appId; if (!APP_ID_PATTERN.matcher(appId).find()) { @@ -96,6 +98,7 @@ public GoogolplexClient( log.info("Using cast application id: {}", appId); // configure the socket client this.bootstrap = TcpClient.create() + .runOn(reactorResourceFactory.getLoopResources()) .secure(spec -> spec.sslContext(sslContext)) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000); } diff --git a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java index d24c71f..2acf967 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/Proxy.java @@ -15,7 +15,9 @@ import javax.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.client.ReactorResourceFactory; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; @@ -24,6 +26,7 @@ import reactor.netty.http.server.HttpServer; import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; +import reactor.netty.resources.LoopResources; @Slf4j @Component @@ -47,10 +50,13 @@ public class Proxy { private final HttpServer httpServer; private DisposableServer disposableServer; - public Proxy(ProxyProperties properties) { + @Autowired + public Proxy(ProxyProperties properties, ReactorResourceFactory reactorResourceFactory) { this.properties = properties; - this.httpClient = HttpClient.create().baseUrl(properties.url); + LoopResources loopResources = reactorResourceFactory.getLoopResources(); + this.httpClient = HttpClient.create().runOn(loopResources).baseUrl(properties.url); this.httpServer = HttpServer.create() + .runOn(loopResources) .accessLog(properties.log) .port(properties.port) .handle(this::handle); From 34a859d56a2cd57572d4c8ae2eb8bba94430514e Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Tue, 11 Jun 2024 19:30:06 -0400 Subject: [PATCH 19/20] fix test --- .../java/com/jyuzawa/googolplex_theater/ProxyTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java index c18b54b..4f02aa3 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/ProxyTest.java @@ -15,11 +15,14 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.http.client.ReactorResourceFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; +import reactor.netty.resources.LoopResources; class ProxyTest { @@ -44,7 +47,9 @@ static void start() { ProxyProperties proxyProperties = new ProxyProperties(); proxyProperties.url = "http://localhost:8082/"; proxyProperties.port = 8081; - proxy = new Proxy(proxyProperties); + ReactorResourceFactory factory = Mockito.mock(ReactorResourceFactory.class); + Mockito.when(factory.getLoopResources()).thenReturn(LoopResources.create("test")); + proxy = new Proxy(proxyProperties, factory); proxy.start(); } From 32120e4ecc0f19c89b88f5a0547bae667b81f032 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Tue, 11 Jun 2024 20:11:20 -0400 Subject: [PATCH 20/20] revert just to annoy dan --- .../googolplex_theater/GoogolplexService.java | 120 +++++++++--------- .../DeviceConfigLoaderTest.java | 18 +-- .../GoogolplexServiceTest.java | 16 +-- 3 files changed, 80 insertions(+), 74 deletions(-) diff --git a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java index 3b9aa4a..afbeb53 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java @@ -19,15 +19,18 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.jmdns.ServiceEvent; import javax.jmdns.ServiceInfo; +import javax.jmdns.impl.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.Disposable; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; /** * This class represents the state of the application. All modifications to state occur in the same @@ -53,7 +56,7 @@ public class GoogolplexService implements Closeable { private final Map nameToDeviceInfo; private final Map nameToAddress; private final Map nameToChannel; - private final Scheduler executor; + private final ExecutorService executor; @Autowired public GoogolplexService(GoogolplexClient client) { @@ -62,7 +65,7 @@ public GoogolplexService(GoogolplexClient client) { this.nameToDeviceInfo = new ConcurrentHashMap<>(); this.nameToAddress = new ConcurrentHashMap<>(); this.nameToChannel = new ConcurrentHashMap<>(); - this.executor = Schedulers.newSingle("controller"); + this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("controller")); } private record Channel(AtomicReference birth, Disposable disposable) {} @@ -70,32 +73,30 @@ private record Channel(AtomicReference birth, Disposable disposable) {} /** * Load the config and propagate the changes to the any currently connected devices. * - * @param deviceInfos the devices loaded from the config file + * @param deviceInfos the device settings loaded from the file */ - public Disposable processDeviceConfig(List deviceInfos) { - return executor.schedule(() -> processDeviceConfig0(deviceInfos)); - } - - void processDeviceConfig0(List deviceInfos) { - Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); - for (DeviceInfo deviceInfo : deviceInfos) { - String name = deviceInfo.getName(); - // mark that we should not remove this device - namesToRemove.remove(name); - DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name); - // ignore unchanged devices - if (!deviceInfo.equals(oldDeviceInfo)) { - log.info("CONFIG_UPDATED '{}'", name); - nameToDeviceInfo.put(name, deviceInfo); + public Future processDeviceConfig(List deviceInfos) { + return executor.submit(() -> { + Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); + for (DeviceInfo deviceInfo : deviceInfos) { + String name = deviceInfo.getName(); + // mark that we should not remove this device + namesToRemove.remove(name); + DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name); + // ignore unchanged devices + if (!deviceInfo.equals(oldDeviceInfo)) { + log.info("CONFIG_UPDATED '{}'", name); + nameToDeviceInfo.put(name, deviceInfo); + apply(name); + } + } + // remove devices that were missing in the new config + for (String name : namesToRemove) { + log.info("CONFIG_REMOVED '{}'", name); + nameToDeviceInfo.remove(name); apply(name); } - } - // remove devices that were missing in the new config - for (String name : namesToRemove) { - log.info("CONFIG_REMOVED '{}'", name); - nameToDeviceInfo.remove(name); - apply(name); - } + }); } /** @@ -104,36 +105,34 @@ void processDeviceConfig0(List deviceInfos) { * * @param event mdns info */ - public Disposable register(ServiceEvent event) { - return executor.schedule(() -> register0(event)); - } - - void register0(ServiceEvent event) { - // the device information may not be full - ServiceInfo info = event.getInfo(); - String name = info.getPropertyString("fn"); - if (name == null) { - log.debug("Found unnamed cast:\n{}", info); - return; - } - InetAddress[] addresses = info.getInetAddresses(); - if (addresses == null || addresses.length == 0) { - log.debug("Found unaddressable cast:\n{}", info); - return; - } - /* - * we choose the first address. there should usually be just one. the mdns library returns ipv4 - * addresses before ipv6. - */ - InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort()); - InetSocketAddress oldAddress = nameToAddress.put(name, address); - if (!address.equals(oldAddress)) { + public Future register(ServiceEvent event) { + return executor.submit(() -> { + // the device information may not be full + ServiceInfo info = event.getInfo(); + String name = info.getPropertyString("fn"); + if (name == null) { + log.debug("Found unnamed cast:\n{}", info); + return; + } + InetAddress[] addresses = info.getInetAddresses(); + if (addresses == null || addresses.length == 0) { + log.debug("Found unaddressable cast:\n{}", info); + return; + } /* - * this is a newly discovered device, or an existing device whose address was updated. + * we choose the first address. there should usually be just one. the mdns library returns ipv4 + * addresses before ipv6. */ - log.info("REGISTER '{}' {}", name, address); - apply(name); - } + InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort()); + InetSocketAddress oldAddress = nameToAddress.put(name, address); + if (!address.equals(oldAddress)) { + /* + * this is a newly discovered device, or an existing device whose address was updated. + */ + log.info("REGISTER '{}' {}", name, address); + apply(name); + } + }); } /** @@ -169,8 +168,8 @@ private void apply(String name) { * * @param name the device to refresh */ - public Disposable refresh(String name) { - return executor.schedule(() -> { + public Future refresh(String name) { + return executor.submit(() -> { // closing channels will cause them to reconnect if (name == null) { // close all channels @@ -218,7 +217,12 @@ public List getDeviceInfo() { public void close() { nameToDeviceInfo.clear(); refresh(null); - executor.disposeGracefully().block(Duration.ofSeconds(10)); + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + // pass + } } /** diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index 5855a41..c0ff692 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -52,15 +52,17 @@ void loaderTest() throws IOException, InterruptedException { } BlockingQueue> queue = new ArrayBlockingQueue<>(10); GoogolplexService controller = Mockito.mock(GoogolplexService.class); - Mockito.when(controller.processDeviceConfig(Mockito.any())).then(new Answer() { + Mockito.doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - List newDevices = invocation.getArgument(0); - queue.add(newDevices); - return null; - } - }); + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + List newDevices = invocation.getArgument(0); + queue.add(newDevices); + return null; + } + }) + .when(controller) + .processDeviceConfig(Mockito.any()); ServiceDiscovery serviceDiscovery = Mockito.mock(ServiceDiscovery.class); String ipAddress = "192.168.1.239"; InetAddress address = InetAddress.getByName(ipAddress); diff --git a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java index 9561b19..85938b5 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java @@ -65,31 +65,31 @@ void test() throws Exception { devices.add(cast2.device()); devices.add(cast3.device()); devices.add(cast4.device()); - service.register0(cast1.event()); - service.register0(cast2.event()); + service.register(cast1.event()).get(); + service.register(cast2.event()).get(); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.any(), Mockito.any()); - service.processDeviceConfig0(devices); + service.processDeviceConfig(devices).get(); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast1.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast2.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any()); - service.register(cast3.event()); - service.register(cast4.event()); + service.register(cast3.event()).get(); + service.register(cast4.event()).get(); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any()); - service.register(FakeCast.event(9005, "UnknownCast")); + service.register(FakeCast.event(9005, "UnknownCast")).get(); ServiceEvent noName = Mockito.mock(ServiceEvent.class); ServiceInfo noNameInfo = Mockito.mock(ServiceInfo.class); Mockito.when(noName.getInfo()).thenReturn(noNameInfo); Mockito.when(noNameInfo.getPropertyString(Mockito.anyString())).thenReturn(null); - service.register(noName); + service.register(noName).get(); ServiceEvent noAddr = Mockito.mock(ServiceEvent.class); Mockito.when(noAddr.getName()).thenReturn("Chromecast-NOIP.local"); ServiceInfo noAddrInfo = Mockito.mock(ServiceInfo.class); Mockito.when(noAddr.getInfo()).thenReturn(noAddrInfo); Mockito.when(noAddrInfo.getPropertyString(Mockito.anyString())).thenReturn("NOIP"); Mockito.when(noAddrInfo.getInetAddresses()).thenReturn(new InetAddress[] {}); - service.register(noAddr); + service.register(noAddr).get(); List deviceInfos = service.getDeviceInfo(); Set configureds = getConfigureds(deviceInfos);