diff --git a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java index 3b9aa4a..afbeb53 100644 --- a/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java +++ b/src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java @@ -19,15 +19,18 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.jmdns.ServiceEvent; import javax.jmdns.ServiceInfo; +import javax.jmdns.impl.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.Disposable; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; /** * This class represents the state of the application. All modifications to state occur in the same @@ -53,7 +56,7 @@ public class GoogolplexService implements Closeable { private final Map nameToDeviceInfo; private final Map nameToAddress; private final Map nameToChannel; - private final Scheduler executor; + private final ExecutorService executor; @Autowired public GoogolplexService(GoogolplexClient client) { @@ -62,7 +65,7 @@ public GoogolplexService(GoogolplexClient client) { this.nameToDeviceInfo = new ConcurrentHashMap<>(); this.nameToAddress = new ConcurrentHashMap<>(); this.nameToChannel = new ConcurrentHashMap<>(); - this.executor = Schedulers.newSingle("controller"); + this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("controller")); } private record Channel(AtomicReference birth, Disposable disposable) {} @@ -70,32 +73,30 @@ private record Channel(AtomicReference birth, Disposable disposable) {} /** * Load the config and propagate the changes to the any currently connected devices. * - * @param deviceInfos the devices loaded from the config file + * @param deviceInfos the device settings loaded from the file */ - public Disposable processDeviceConfig(List deviceInfos) { - return executor.schedule(() -> processDeviceConfig0(deviceInfos)); - } - - void processDeviceConfig0(List deviceInfos) { - Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); - for (DeviceInfo deviceInfo : deviceInfos) { - String name = deviceInfo.getName(); - // mark that we should not remove this device - namesToRemove.remove(name); - DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name); - // ignore unchanged devices - if (!deviceInfo.equals(oldDeviceInfo)) { - log.info("CONFIG_UPDATED '{}'", name); - nameToDeviceInfo.put(name, deviceInfo); + public Future processDeviceConfig(List deviceInfos) { + return executor.submit(() -> { + Set namesToRemove = new HashSet<>(nameToDeviceInfo.keySet()); + for (DeviceInfo deviceInfo : deviceInfos) { + String name = deviceInfo.getName(); + // mark that we should not remove this device + namesToRemove.remove(name); + DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name); + // ignore unchanged devices + if (!deviceInfo.equals(oldDeviceInfo)) { + log.info("CONFIG_UPDATED '{}'", name); + nameToDeviceInfo.put(name, deviceInfo); + apply(name); + } + } + // remove devices that were missing in the new config + for (String name : namesToRemove) { + log.info("CONFIG_REMOVED '{}'", name); + nameToDeviceInfo.remove(name); apply(name); } - } - // remove devices that were missing in the new config - for (String name : namesToRemove) { - log.info("CONFIG_REMOVED '{}'", name); - nameToDeviceInfo.remove(name); - apply(name); - } + }); } /** @@ -104,36 +105,34 @@ void processDeviceConfig0(List deviceInfos) { * * @param event mdns info */ - public Disposable register(ServiceEvent event) { - return executor.schedule(() -> register0(event)); - } - - void register0(ServiceEvent event) { - // the device information may not be full - ServiceInfo info = event.getInfo(); - String name = info.getPropertyString("fn"); - if (name == null) { - log.debug("Found unnamed cast:\n{}", info); - return; - } - InetAddress[] addresses = info.getInetAddresses(); - if (addresses == null || addresses.length == 0) { - log.debug("Found unaddressable cast:\n{}", info); - return; - } - /* - * we choose the first address. there should usually be just one. the mdns library returns ipv4 - * addresses before ipv6. - */ - InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort()); - InetSocketAddress oldAddress = nameToAddress.put(name, address); - if (!address.equals(oldAddress)) { + public Future register(ServiceEvent event) { + return executor.submit(() -> { + // the device information may not be full + ServiceInfo info = event.getInfo(); + String name = info.getPropertyString("fn"); + if (name == null) { + log.debug("Found unnamed cast:\n{}", info); + return; + } + InetAddress[] addresses = info.getInetAddresses(); + if (addresses == null || addresses.length == 0) { + log.debug("Found unaddressable cast:\n{}", info); + return; + } /* - * this is a newly discovered device, or an existing device whose address was updated. + * we choose the first address. there should usually be just one. the mdns library returns ipv4 + * addresses before ipv6. */ - log.info("REGISTER '{}' {}", name, address); - apply(name); - } + InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort()); + InetSocketAddress oldAddress = nameToAddress.put(name, address); + if (!address.equals(oldAddress)) { + /* + * this is a newly discovered device, or an existing device whose address was updated. + */ + log.info("REGISTER '{}' {}", name, address); + apply(name); + } + }); } /** @@ -169,8 +168,8 @@ private void apply(String name) { * * @param name the device to refresh */ - public Disposable refresh(String name) { - return executor.schedule(() -> { + public Future refresh(String name) { + return executor.submit(() -> { // closing channels will cause them to reconnect if (name == null) { // close all channels @@ -218,7 +217,12 @@ public List getDeviceInfo() { public void close() { nameToDeviceInfo.clear(); refresh(null); - executor.disposeGracefully().block(Duration.ofSeconds(10)); + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + // pass + } } /** diff --git a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java index 5855a41..c0ff692 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/DeviceConfigLoaderTest.java @@ -52,15 +52,17 @@ void loaderTest() throws IOException, InterruptedException { } BlockingQueue> queue = new ArrayBlockingQueue<>(10); GoogolplexService controller = Mockito.mock(GoogolplexService.class); - Mockito.when(controller.processDeviceConfig(Mockito.any())).then(new Answer() { + Mockito.doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - List newDevices = invocation.getArgument(0); - queue.add(newDevices); - return null; - } - }); + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + List newDevices = invocation.getArgument(0); + queue.add(newDevices); + return null; + } + }) + .when(controller) + .processDeviceConfig(Mockito.any()); ServiceDiscovery serviceDiscovery = Mockito.mock(ServiceDiscovery.class); String ipAddress = "192.168.1.239"; InetAddress address = InetAddress.getByName(ipAddress); diff --git a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java index 9561b19..85938b5 100644 --- a/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java +++ b/src/test/java/com/jyuzawa/googolplex_theater/GoogolplexServiceTest.java @@ -65,31 +65,31 @@ void test() throws Exception { devices.add(cast2.device()); devices.add(cast3.device()); devices.add(cast4.device()); - service.register0(cast1.event()); - service.register0(cast2.event()); + service.register(cast1.event()).get(); + service.register(cast2.event()).get(); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.any(), Mockito.any()); - service.processDeviceConfig0(devices); + service.processDeviceConfig(devices).get(); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast1.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast2.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any()); - service.register(cast3.event()); - service.register(cast4.event()); + service.register(cast3.event()).get(); + service.register(cast4.event()).get(); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any()); Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any()); - service.register(FakeCast.event(9005, "UnknownCast")); + service.register(FakeCast.event(9005, "UnknownCast")).get(); ServiceEvent noName = Mockito.mock(ServiceEvent.class); ServiceInfo noNameInfo = Mockito.mock(ServiceInfo.class); Mockito.when(noName.getInfo()).thenReturn(noNameInfo); Mockito.when(noNameInfo.getPropertyString(Mockito.anyString())).thenReturn(null); - service.register(noName); + service.register(noName).get(); ServiceEvent noAddr = Mockito.mock(ServiceEvent.class); Mockito.when(noAddr.getName()).thenReturn("Chromecast-NOIP.local"); ServiceInfo noAddrInfo = Mockito.mock(ServiceInfo.class); Mockito.when(noAddr.getInfo()).thenReturn(noAddrInfo); Mockito.when(noAddrInfo.getPropertyString(Mockito.anyString())).thenReturn("NOIP"); Mockito.when(noAddrInfo.getInetAddresses()).thenReturn(new InetAddress[] {}); - service.register(noAddr); + service.register(noAddr).get(); List deviceInfos = service.getDeviceInfo(); Set configureds = getConfigureds(deviceInfos);