diff --git a/src/main/java/be/bagofwords/application/BaseServer.java b/src/main/java/be/bagofwords/application/BaseServer.java deleted file mode 100644 index c9aa0f5..0000000 --- a/src/main/java/be/bagofwords/application/BaseServer.java +++ /dev/null @@ -1,44 +0,0 @@ -package be.bagofwords.application; - - -import be.bagofwords.ui.UI; -import be.bagofwords.util.SafeThread; -import be.bagofwords.util.Utils; -import org.apache.commons.io.IOUtils; - -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.util.ArrayList; -import java.util.List; - -public abstract class BaseServer extends SafeThread { - - public static final String ENCODING = "UTF-8"; - public static final long LONG_ERROR = Long.MAX_VALUE; - public static final long LONG_OK = Long.MAX_VALUE - 1; - public static final long LONG_END = Long.MAX_VALUE - 2; - - private ServerSocket serverSocket; - private final List runningRequestHandlers; - private final int scpPort; - private int totalNumberOfConnections; - - public BaseServer(String name, int port) { - super(name, false); - this.runningRequestHandlers = new ArrayList<>(); - this.scpPort = port; - this.totalNumberOfConnections = 0; - try { - this.serverSocket = new ServerSocket(scpPort); - } catch (IOException exp) { - throw new RuntimeException("Failed to initialize server " + getName() + " on port " + scpPort, exp); - } - } - - - - - -} diff --git a/src/main/java/be/bagofwords/application/SocketRequestHandler.java b/src/main/java/be/bagofwords/application/SocketRequestHandler.java index 61fbec0..7353ba2 100644 --- a/src/main/java/be/bagofwords/application/SocketRequestHandler.java +++ b/src/main/java/be/bagofwords/application/SocketRequestHandler.java @@ -1,12 +1,63 @@ package be.bagofwords.application; +import be.bagofwords.util.SafeThread; +import be.bagofwords.util.SocketConnection; +import org.apache.commons.io.IOUtils; + /** * Created by koen on 01.11.16. */ -public interface SocketRequestHandler { +public abstract class SocketRequestHandler extends SafeThread { + + protected SocketConnection connection; + private SocketServer socketServer; + private long startTime; + + public SocketRequestHandler(String name, SocketConnection connection) { + super(name, true); + this.connection = connection; + } + + public void setSocketServer(SocketServer socketServer) { + this.socketServer = socketServer; + } + + public long getStartTime() { + return startTime; + } + + @Override + protected void runInt() throws Exception { + try { + startTime = System.currentTimeMillis(); + handleRequests(); + } catch (Exception ex) { + if (isUnexpectedError(ex)) { + reportUnexpectedError(ex); + } + } + IOUtils.closeQuietly(connection); + socketServer.removeHandler(this); + } + + protected boolean isUnexpectedError(Exception ex) { + if (ex.getMessage() != null && ex.getMessage().contains("Connection reset")) { + return false; + } + for (StackTraceElement el : ex.getStackTrace()) { + if (el.getMethodName().equals("readNextAction")) { + return false; + } + } + return true; + } + + public abstract void handleRequests() throws Exception; - void handleRequests() throws Exception; + public abstract void reportUnexpectedError(Exception ex); - void reportUnexpectedError(Exception ex); + public long getTotalNumberOfRequests() { + return -1; //Should be overridden in subclasses + } } diff --git a/src/main/java/be/bagofwords/application/SocketServer.java b/src/main/java/be/bagofwords/application/SocketServer.java index cd9430e..e00c293 100644 --- a/src/main/java/be/bagofwords/application/SocketServer.java +++ b/src/main/java/be/bagofwords/application/SocketServer.java @@ -1,25 +1,20 @@ package be.bagofwords.application; +import be.bagofwords.application.status.StatusViewable; import be.bagofwords.ui.UI; -import be.bagofwords.util.SafeThread; -import be.bagofwords.util.SocketConnection; -import be.bagofwords.util.UnbufferedSocketConnection; -import be.bagofwords.util.Utils; +import be.bagofwords.util.*; import org.apache.commons.io.IOUtils; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * Created by koen on 01.11.16. */ -public class SocketServer extends SafeThread { +public class SocketServer extends SafeThread implements StatusViewable { public static final String ENCODING = "UTF-8"; public static final long LONG_ERROR = Long.MAX_VALUE; @@ -28,7 +23,7 @@ public class SocketServer extends SafeThread { private ServerSocket serverSocket; private Map socketRequestHandlerFactories; - private final List runningRequestHandlers; + private final List runningRequestHandlers; private final int scpPort; private int totalNumberOfConnections; @@ -41,6 +36,7 @@ public SocketServer(int port) { try { this.serverSocket = new ServerSocket(scpPort); } catch (IOException exp) { + this.serverSocket = null; throw new RuntimeException("Failed to initialize server " + getName() + " on port " + scpPort, exp); } } @@ -55,25 +51,30 @@ public synchronized void registerSocketRequestHandlerFactory(SocketRequestHandle @Override protected void runInt() throws Exception { UI.write("Started server " + getName() + " on port " + scpPort); - Utils.threadSleep(500); //Make sure socket has had time to bind successfully (this does not yet work very well)) while (!serverSocket.isClosed() && !isTerminateRequested()) { try { Socket acceptedSocket = serverSocket.accept(); SocketConnection connection = new UnbufferedSocketConnection(acceptedSocket); String factoryName = connection.readString(); + if(factoryName==null || StringUtils.isEmpty(factoryName.trim())) { + connection.writeLong(SocketServer.LONG_ERROR); + connection.writeString("No name specified for the requested SocketRequestHandlerFactory"); + continue; + } SocketRequestHandlerFactory factory = socketRequestHandlerFactories.get(factoryName); if (factory == null) { + UI.writeWarning("No SocketRequestHandlerFactory registered for name " + factoryName); connection.writeLong(SocketServer.LONG_ERROR); connection.writeString("No SocketRequestHandlerFactory registered for name " + factoryName); continue; } SocketRequestHandler handler = factory.createSocketRequestHandler(acceptedSocket); - SocketRequestHandlerThread thread = new SocketRequestHandlerThread(factoryName, handler, acceptedSocket); + handler.setSocketServer(this); if (handler != null) { synchronized (runningRequestHandlers) { - runningRequestHandlers.add(thread); + runningRequestHandlers.add(handler); } - thread.start(); + handler.start(); totalNumberOfConnections++; } else { UI.writeWarning("Factory " + factoryName + " failed to create a socket handler. Closing socket..."); @@ -93,7 +94,7 @@ public void doTerminate() { //once a request handler is finished, it removes itself from the list of requestHandlers, so we just wait until this list is empty while (!runningRequestHandlers.isEmpty()) { synchronized (runningRequestHandlers) { - for (SocketRequestHandlerThread requestHandler : runningRequestHandlers) { + for (SocketRequestHandler requestHandler : runningRequestHandlers) { if (!requestHandler.isTerminateRequested()) { requestHandler.terminate(); //we can not call terminateAndWaitForFinish() here since to finish the request handler needs access to the runningRequestHandlers list } @@ -108,50 +109,43 @@ public int getTotalNumberOfConnections() { return totalNumberOfConnections; } - public List getRunningRequestHandlers() { + public List getRunningRequestHandlers() { return runningRequestHandlers; } - public class SocketRequestHandlerThread extends SafeThread { - - private final SocketRequestHandler socketRequestHandler; - private Socket socket; - - public SocketRequestHandlerThread(String factoryName, SocketRequestHandler socketRequestHandler, Socket socket) { - super(factoryName + "_request_handler", true); - this.socketRequestHandler = socketRequestHandler; - this.socket = socket; + public void removeHandler(SocketRequestHandler handler) { + synchronized (runningRequestHandlers) { + runningRequestHandlers.remove(handler); } + } - @Override - protected void runInt() throws Exception { - try { - socketRequestHandler.handleRequests(); - } catch (Exception ex) { - if (isUnexpectedError(ex)) { - socketRequestHandler.reportUnexpectedError(ex); - } - } - IOUtils.closeQuietly(socket); - synchronized (runningRequestHandlers) { - runningRequestHandlers.remove(this); - } + @Override + public void printHtmlStatus(StringBuilder sb) { + sb.append("

Printing database server statistics

"); + ln(sb, ""); + ln(sb, ""); + ln(sb, ""); + List runningRequestHandlers = getRunningRequestHandlers(); + ln(sb, ""); + List sortedRequestHandlers; + synchronized (runningRequestHandlers) { + sortedRequestHandlers = new ArrayList<>(runningRequestHandlers); } - - protected boolean isUnexpectedError(Exception ex) { - if (ex.getMessage() != null && ex.getMessage().contains("Connection reset")) { - return false; - } - for (StackTraceElement el : ex.getStackTrace()) { - if (el.getMethodName().equals("readNextAction")) { - return false; - } - } - return true; + Collections.sort(sortedRequestHandlers, (o1, o2) -> -Double.compare(o1.getTotalNumberOfRequests(), o2.getTotalNumberOfRequests())); + for (int i = 0; i < sortedRequestHandlers.size(); i++) { + SocketRequestHandler handler = sortedRequestHandlers.get(i); + ln(sb, ""); + ln(sb, ""); + ln(sb, ""); + double requestsPerSec = handler.getTotalNumberOfRequests() * 1000.0 / (System.currentTimeMillis() - handler.getStartTime()); + ln(sb, ""); } - - + ln(sb, "
Used memory is " + UI.getMemoryUsage() + "
Total number of connections " + getTotalNumberOfConnections() + "
Current number of handlers " + runningRequestHandlers.size() + "
" + i + " Name " + handler.getName() + "
" + i + " Started at " + new Date(handler.getStartTime()) + "
" + i + " Total number of requests " + handler.getTotalNumberOfRequests() + "
" + i + " Average requests/s" + NumUtils.fmt(requestsPerSec) + "
"); } + private void ln(StringBuilder sb, String s) { + sb.append(s); + sb.append("\n"); + } } diff --git a/src/main/java/be/bagofwords/application/SocketServerContextFactory.java b/src/main/java/be/bagofwords/application/SocketServerContextFactory.java new file mode 100644 index 0000000..24256f2 --- /dev/null +++ b/src/main/java/be/bagofwords/application/SocketServerContextFactory.java @@ -0,0 +1,11 @@ +package be.bagofwords.application; + +public class SocketServerContextFactory extends MinimalApplicationContextFactory { + + @Override + public void wireApplicationContext(ApplicationContext context) { + super.wireApplicationContext(context); + SocketServer server = new SocketServer(Integer.parseInt(context.getConfig("socket_port"))); + context.registerBean(server); + } +} diff --git a/src/main/java/be/bagofwords/application/status/ListUrlsController.java b/src/main/java/be/bagofwords/application/status/ListUrlsController.java index 683a1e5..6f84a08 100644 --- a/src/main/java/be/bagofwords/application/status/ListUrlsController.java +++ b/src/main/java/be/bagofwords/application/status/ListUrlsController.java @@ -22,9 +22,7 @@ public class ListUrlsController extends BaseController { public ListUrlsController(ApplicationContext applicationContext) { super("/paths"); this.urls = new ArrayList<>(); - RegisterUrlsServer registerUrlsServer = new RegisterUrlsServer(this); - SocketServer socketServer = applicationContext.getBean(SocketServer.class); - socketServer.registerSocketRequestHandlerFactory(registerUrlsServer); + new RegisterUrlsServer(this, applicationContext); } @Override diff --git a/src/main/java/be/bagofwords/application/status/RegisterUrlsClient.java b/src/main/java/be/bagofwords/application/status/RegisterUrlsClient.java index 7a297da..475c9a9 100644 --- a/src/main/java/be/bagofwords/application/status/RegisterUrlsClient.java +++ b/src/main/java/be/bagofwords/application/status/RegisterUrlsClient.java @@ -1,7 +1,7 @@ package be.bagofwords.application.status; import be.bagofwords.application.ApplicationContext; -import be.bagofwords.application.BaseServer; +import be.bagofwords.application.SocketServer; import be.bagofwords.util.SocketConnection; import be.bagofwords.web.BaseController; import be.bagofwords.web.WebContainer; @@ -41,7 +41,7 @@ public void registerPath(String path) { connection.writeString(path); connection.flush(); long result = connection.readLong(); - if (result != BaseServer.LONG_OK) { + if (result != SocketServer.LONG_OK) { throw new RuntimeException("Unexpected response " + result); } } catch (IOException e) { diff --git a/src/main/java/be/bagofwords/application/status/RegisterUrlsServer.java b/src/main/java/be/bagofwords/application/status/RegisterUrlsServer.java index 534143c..c31317a 100644 --- a/src/main/java/be/bagofwords/application/status/RegisterUrlsServer.java +++ b/src/main/java/be/bagofwords/application/status/RegisterUrlsServer.java @@ -1,5 +1,6 @@ package be.bagofwords.application.status; +import be.bagofwords.application.ApplicationContext; import be.bagofwords.application.SocketRequestHandler; import be.bagofwords.application.SocketRequestHandlerFactory; import be.bagofwords.application.SocketServer; @@ -19,8 +20,10 @@ public class RegisterUrlsServer implements SocketRequestHandlerFactory { private ListUrlsController listUrlsController; - public RegisterUrlsServer(ListUrlsController listUrlsController) { + public RegisterUrlsServer(ListUrlsController listUrlsController, ApplicationContext context) { this.listUrlsController = listUrlsController; + SocketServer socketServer = context.getBean(SocketServer.class); + socketServer.registerSocketRequestHandlerFactory(this); } @Override @@ -31,7 +34,7 @@ public String getName() { @Override public SocketRequestHandler createSocketRequestHandler(Socket socket) throws IOException { SocketConnection connection = new BufferedSocketConnection(socket); - return new SocketRequestHandler() { + return new SocketRequestHandler("register_url_handler", connection) { @Override public void reportUnexpectedError(Exception ex) { UI.writeError("Unexpected error in RegisterPathServer", ex); diff --git a/src/main/java/be/bagofwords/util/ExecutionResult.java b/src/main/java/be/bagofwords/util/ExecutionResult.java index a76932e..04bc07b 100644 --- a/src/main/java/be/bagofwords/util/ExecutionResult.java +++ b/src/main/java/be/bagofwords/util/ExecutionResult.java @@ -35,4 +35,8 @@ public String getErrorOut() { public void setErrorOut(String errorOut) { this.errorOut = errorOut; } + + public boolean isSuccess() { + return this.returnCode == 0 ; + } } diff --git a/src/main/java/be/bagofwords/util/SocketConnection.java b/src/main/java/be/bagofwords/util/SocketConnection.java index 6563219..58f21e5 100644 --- a/src/main/java/be/bagofwords/util/SocketConnection.java +++ b/src/main/java/be/bagofwords/util/SocketConnection.java @@ -21,6 +21,16 @@ public SocketConnection(String host, int port) throws IOException { this.os = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 32 * 1024)); } + public SocketConnection(Socket socket) throws IOException { + this(socket, false, false); + } + + public SocketConnection(Socket socket, boolean useLargeOutputBuffer, boolean useLargeInputBuffer) throws IOException { + this.socket = socket; + this.is = new DataInputStream(new BufferedInputStream(socket.getInputStream(), useLargeInputBuffer ? 1024 * 1024 : 32 * 1024)); + this.os = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), useLargeOutputBuffer ? 1024 * 1024 : 32 * 1024)); + } + public SocketConnection(Socket socket, DataInputStream is, DataOutputStream os) throws IOException { this.socket = socket; this.is = is; @@ -236,4 +246,8 @@ public void writeBoolean(boolean value) throws IOException { public InetAddress getInetAddress() { return socket.getInetAddress(); } + + public int getRemotePort() { + return socket.getPort(); + } }