Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Introduce endpoint discovery to detect new endpoint being started on …
Browse files Browse the repository at this point in the history
…the same network

(so sidecar container could be discovered automatically)

Also introduce automatic port if port is already in used (so for example if you run three or four endpoints it won't collide)
note: if port is specified then it doesn't take unassigned ports

It allows as well pure docker image of sidecar + plug-ins

Change-Id: If0c0b58814892b1dbbe6d8b04e8fb5e7691ef820
Signed-off-by: Florent Benoit <[email protected]>
  • Loading branch information
benoitf committed Mar 6, 2019
1 parent c028f3f commit f574c99
Show file tree
Hide file tree
Showing 5 changed files with 534 additions and 229 deletions.
65 changes: 42 additions & 23 deletions dockerfiles/theia-endpoint-runtime/src/node/hosted-plugin-remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { HostedPluginClient, PluginMetadata } from '@theia/plugin-ext';
import { HostedPluginMapping } from './plugin-remote-mapping';
import { Websocket } from './websocket';
import { getPluginId } from '@theia/plugin-ext/lib/common';
import { PluginDiscovery } from './plugin-discovery';

/**
* Class handling remote connection for executing plug-ins.
Expand Down Expand Up @@ -42,6 +43,7 @@ export class HostedPluginRemote {

@postConstruct()
protected postConstruct(): void {
this.setupDiscovery();
this.setupWebsocket();
}

Expand All @@ -52,34 +54,51 @@ export class HostedPluginRemote {
this.client = client;
}

/**
* Handle discovery of other endpoints on same network.
*/
protected setupDiscovery(): void {
const pluginDiscovery = new PluginDiscovery(this.logger);
pluginDiscovery.onNewEndpoint = announceRequest => {
const endpointAdress = announceRequest.websocketAddress;
// only accept new endpoint address
if (!this.endpointsSockets.has(endpointAdress)) {
this.logger.debug(`Adding a new remote endpoint from ${endpointAdress}`);
this.connect(endpointAdress);
}
};
pluginDiscovery.discover();
}

/**
* Handle the creation of connection to remote endpoints.
*/
setupWebsocket(): void {
this.hostedPluginMapping.getEndPoints().forEach(endpointAdress => {
if (endpointAdress) {
const websocket = new Websocket(this.logger, endpointAdress);
this.endpointsSockets.set(endpointAdress, websocket);
websocket.onMessage = (messageRaw: string) => {
const parsed = JSON.parse(messageRaw);
if (parsed.internal) {
this.handleLocalMessage(parsed.internal);
return;
}
this.sendToClient(messageRaw);
};

// when websocket is opened, send the order
websocket.onOpen = event => {
websocket.send(JSON.stringify({
'internal': {
'endpointName': endpointAdress,
'metadata': 'request'
}
}));
};
this.hostedPluginMapping.getEndPoints().forEach(endpointAdress => this.connect(endpointAdress));
}

connect(endpointAdress: string) {
this.logger.debug(`Establish websocket connection to ${endpointAdress}`);
const websocket = new Websocket(this.logger, endpointAdress);
this.endpointsSockets.set(endpointAdress, websocket);
websocket.onMessage = (messageRaw: string) => {
const parsed = JSON.parse(messageRaw);
if (parsed.internal) {
this.handleLocalMessage(parsed.internal);
return;
}
});
this.sendToClient(messageRaw);
};

// when websocket is opened, send the order
websocket.onOpen = event => {
websocket.send(JSON.stringify({
'internal': {
'endpointName': endpointAdress,
'metadata': 'request'
}
}));
};
}

/**
Expand Down
176 changes: 176 additions & 0 deletions dockerfiles/theia-endpoint-runtime/src/node/plugin-discovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*********************************************************************
* Copyright (c) 2018-2019 Red Hat, Inc.
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
**********************************************************************/

import * as dgram from 'dgram';
import { ILogger } from '@theia/core/lib/common';

/**
* Class handling discovery of nodes
* @author Florent Benoit
*/
export class PluginDiscovery {

/**
* Default Multicast PORT.
*/
private static readonly DEFAULT_MULTICAST_PORT = '2503';

/**
* Default Multicast address.
*/
private static readonly DEFAULT_MULTICAST_ADDRESS = '225.0.0.3';

/**
* Multicast socket.
*/
private socket: dgram.Socket;

/**
* port number for multicast discovery.
*/
private discoveryPort: number;

/**
* port number for multicast discovery.
*/
private discoveryAddress: string;

/**
* Unique identifier of ourself used to communicate with others
*/
private discoveryName: string;

constructor(private readonly logger: ILogger, private readonly endpointPort?: number) {
this.discoveryName = `discovery[${Math.random().toString(36).substring(7)}]`;
}

/**
* Enter in discovery mode.
* On start, it will :
* - request others to announce themselves
* - announce himself if endpoint.
*/
discover() {

// if discovery is disabled, do not proceed with discover.
const discoveryDisabled = process.env.THEIA_PLUGIN_DISCOVERY_DISABLE || 'false';
if (discoveryDisabled === 'true') {
this.logger.warn('Plugin discovery is disabled.');
return;
}

// configured port number
this.discoveryPort = parseInt(process.env.THEIA_PLUGIN_ENDPOINT_DISCOVERY_PORT || PluginDiscovery.DEFAULT_MULTICAST_PORT, 10);

// configured adress
this.discoveryAddress = process.env.THEIA_PLUGIN_ENDPOINT_DISCOVERY_ADDRESS || PluginDiscovery.DEFAULT_MULTICAST_ADDRESS;
this.socket = dgram.createSocket({ type: 'udp4', reuseAddr: true });
this.socket.bind(this.discoveryPort);

// start to listen
this.socket.on('listening', () => {
this.socket.addMembership(this.discoveryAddress);
// announce if we're an endpoint
if (this.endpointPort) {
this.announceMySelf();
}
this.requestEndpoints();
});

// handle messages
this.socket.on('message', (msg: Buffer) => {

// receive order
const jsonMessage: DiscoveryMessage = JSON.parse(msg.toString());

// ignore ourself
if (this.discoveryName === jsonMessage.id) {
return;
}

// handle message based on the type
switch (jsonMessage.type) {
case 'REQUEST_ENDPOINTS':
// need to announce ourself but only if we're an endpoint
if (this.endpointPort) {
this.announceMySelf();
}
break;

case 'ANNOUNCE_ENDPOINT':
this.onNewEndpoint(JSON.parse(jsonMessage.content));
break;
}

});

}

/**
* Sends the given message to the multicast discovery address.
*/
protected send(message: DiscoveryMessage) {
this.socket.send(JSON.stringify(message), this.discoveryPort, this.discoveryAddress);
}

/**
* Greeting message announcing ourself to the others.
*/
protected announceMySelf() {

const announceRequest: DiscoveryAnnounceRequest = {
websocketAddress: `ws://localhost:${this.endpointPort}`
};
const announceMessage: DiscoveryMessage = {
id: this.discoveryName,
type: 'ANNOUNCE_ENDPOINT',
content: JSON.stringify(announceRequest)
};
this.send(announceMessage);

}

/**
* Request message asking others to announce.
*/
protected requestEndpoints() {

const announceMessage: DiscoveryMessage = {
id: this.discoveryName,
type: 'REQUEST_ENDPOINTS',
content: ''
};
this.send(announceMessage);

}

// callback used when a new endpoint is registered
public onNewEndpoint(discoveryAnnounceRequest: DiscoveryAnnounceRequest) { }

}

/**
* Announce Request message format
*/
interface DiscoveryAnnounceRequest {
websocketAddress: string;
}

/**
* Discovery message format
*/
interface DiscoveryMessage {
id: string,

type: 'REQUEST_ENDPOINTS' | 'ANNOUNCE_ENDPOINT',

content: string

}
Loading

0 comments on commit f574c99

Please sign in to comment.