Skip to content

Commit

Permalink
Various changes
Browse files Browse the repository at this point in the history
  • Loading branch information
koendeschacht committed Dec 17, 2016
1 parent d578a91 commit e2784a4
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 105 deletions.
44 changes: 0 additions & 44 deletions src/main/java/be/bagofwords/application/BaseServer.java

This file was deleted.

57 changes: 54 additions & 3 deletions src/main/java/be/bagofwords/application/SocketRequestHandler.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,63 @@
package be.bagofwords.application;

import be.bagofwords.util.SafeThread;
import be.bagofwords.util.SocketConnection;
import org.apache.commons.io.IOUtils;

/**
* Created by koen on 01.11.16.
*/
public interface SocketRequestHandler {
public abstract class SocketRequestHandler extends SafeThread {

protected SocketConnection connection;
private SocketServer socketServer;
private long startTime;

public SocketRequestHandler(String name, SocketConnection connection) {
super(name, true);
this.connection = connection;
}

public void setSocketServer(SocketServer socketServer) {
this.socketServer = socketServer;
}

public long getStartTime() {
return startTime;
}

@Override
protected void runInt() throws Exception {
try {
startTime = System.currentTimeMillis();
handleRequests();
} catch (Exception ex) {
if (isUnexpectedError(ex)) {
reportUnexpectedError(ex);
}
}
IOUtils.closeQuietly(connection);
socketServer.removeHandler(this);
}

protected boolean isUnexpectedError(Exception ex) {
if (ex.getMessage() != null && ex.getMessage().contains("Connection reset")) {
return false;
}
for (StackTraceElement el : ex.getStackTrace()) {
if (el.getMethodName().equals("readNextAction")) {
return false;
}
}
return true;
}

public abstract void handleRequests() throws Exception;

void handleRequests() throws Exception;
public abstract void reportUnexpectedError(Exception ex);

void reportUnexpectedError(Exception ex);
public long getTotalNumberOfRequests() {
return -1; //Should be overridden in subclasses
}

}
96 changes: 45 additions & 51 deletions src/main/java/be/bagofwords/application/SocketServer.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
package be.bagofwords.application;

import be.bagofwords.application.status.StatusViewable;
import be.bagofwords.ui.UI;
import be.bagofwords.util.SafeThread;
import be.bagofwords.util.SocketConnection;
import be.bagofwords.util.UnbufferedSocketConnection;
import be.bagofwords.util.Utils;
import be.bagofwords.util.*;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* Created by koen on 01.11.16.
*/
public class SocketServer extends SafeThread {
public class SocketServer extends SafeThread implements StatusViewable {

public static final String ENCODING = "UTF-8";
public static final long LONG_ERROR = Long.MAX_VALUE;
Expand All @@ -28,7 +23,7 @@ public class SocketServer extends SafeThread {

private ServerSocket serverSocket;
private Map<String, SocketRequestHandlerFactory> socketRequestHandlerFactories;
private final List<SocketRequestHandlerThread> runningRequestHandlers;
private final List<SocketRequestHandler> runningRequestHandlers;
private final int scpPort;
private int totalNumberOfConnections;

Expand All @@ -41,6 +36,7 @@ public SocketServer(int port) {
try {
this.serverSocket = new ServerSocket(scpPort);
} catch (IOException exp) {
this.serverSocket = null;
throw new RuntimeException("Failed to initialize server " + getName() + " on port " + scpPort, exp);
}
}
Expand All @@ -55,25 +51,30 @@ public synchronized void registerSocketRequestHandlerFactory(SocketRequestHandle
@Override
protected void runInt() throws Exception {
UI.write("Started server " + getName() + " on port " + scpPort);
Utils.threadSleep(500); //Make sure socket has had time to bind successfully (this does not yet work very well))
while (!serverSocket.isClosed() && !isTerminateRequested()) {
try {
Socket acceptedSocket = serverSocket.accept();
SocketConnection connection = new UnbufferedSocketConnection(acceptedSocket);
String factoryName = connection.readString();
if(factoryName==null || StringUtils.isEmpty(factoryName.trim())) {
connection.writeLong(SocketServer.LONG_ERROR);
connection.writeString("No name specified for the requested SocketRequestHandlerFactory");
continue;
}
SocketRequestHandlerFactory factory = socketRequestHandlerFactories.get(factoryName);
if (factory == null) {
UI.writeWarning("No SocketRequestHandlerFactory registered for name " + factoryName);
connection.writeLong(SocketServer.LONG_ERROR);
connection.writeString("No SocketRequestHandlerFactory registered for name " + factoryName);
continue;
}
SocketRequestHandler handler = factory.createSocketRequestHandler(acceptedSocket);
SocketRequestHandlerThread thread = new SocketRequestHandlerThread(factoryName, handler, acceptedSocket);
handler.setSocketServer(this);
if (handler != null) {
synchronized (runningRequestHandlers) {
runningRequestHandlers.add(thread);
runningRequestHandlers.add(handler);
}
thread.start();
handler.start();
totalNumberOfConnections++;
} else {
UI.writeWarning("Factory " + factoryName + " failed to create a socket handler. Closing socket...");
Expand All @@ -93,7 +94,7 @@ public void doTerminate() {
//once a request handler is finished, it removes itself from the list of requestHandlers, so we just wait until this list is empty
while (!runningRequestHandlers.isEmpty()) {
synchronized (runningRequestHandlers) {
for (SocketRequestHandlerThread requestHandler : runningRequestHandlers) {
for (SocketRequestHandler requestHandler : runningRequestHandlers) {
if (!requestHandler.isTerminateRequested()) {
requestHandler.terminate(); //we can not call terminateAndWaitForFinish() here since to finish the request handler needs access to the runningRequestHandlers list
}
Expand All @@ -108,50 +109,43 @@ public int getTotalNumberOfConnections() {
return totalNumberOfConnections;
}

public List<SocketRequestHandlerThread> getRunningRequestHandlers() {
public List<SocketRequestHandler> getRunningRequestHandlers() {
return runningRequestHandlers;
}


public class SocketRequestHandlerThread extends SafeThread {

private final SocketRequestHandler socketRequestHandler;
private Socket socket;

public SocketRequestHandlerThread(String factoryName, SocketRequestHandler socketRequestHandler, Socket socket) {
super(factoryName + "_request_handler", true);
this.socketRequestHandler = socketRequestHandler;
this.socket = socket;
public void removeHandler(SocketRequestHandler handler) {
synchronized (runningRequestHandlers) {
runningRequestHandlers.remove(handler);
}
}

@Override
protected void runInt() throws Exception {
try {
socketRequestHandler.handleRequests();
} catch (Exception ex) {
if (isUnexpectedError(ex)) {
socketRequestHandler.reportUnexpectedError(ex);
}
}
IOUtils.closeQuietly(socket);
synchronized (runningRequestHandlers) {
runningRequestHandlers.remove(this);
}
@Override
public void printHtmlStatus(StringBuilder sb) {
sb.append("<h1>Printing database server statistics</h1>");
ln(sb, "<table>");
ln(sb, "<tr><td>Used memory is </td><td>" + UI.getMemoryUsage() + "</td></tr>");
ln(sb, "<tr><td>Total number of connections </td><td>" + getTotalNumberOfConnections() + "</td></tr>");
List<SocketRequestHandler> runningRequestHandlers = getRunningRequestHandlers();
ln(sb, "<tr><td>Current number of handlers </td><td>" + runningRequestHandlers.size() + "</td></tr>");
List<SocketRequestHandler> sortedRequestHandlers;
synchronized (runningRequestHandlers) {
sortedRequestHandlers = new ArrayList<>(runningRequestHandlers);
}

protected boolean isUnexpectedError(Exception ex) {
if (ex.getMessage() != null && ex.getMessage().contains("Connection reset")) {
return false;
}
for (StackTraceElement el : ex.getStackTrace()) {
if (el.getMethodName().equals("readNextAction")) {
return false;
}
}
return true;
Collections.sort(sortedRequestHandlers, (o1, o2) -> -Double.compare(o1.getTotalNumberOfRequests(), o2.getTotalNumberOfRequests()));
for (int i = 0; i < sortedRequestHandlers.size(); i++) {
SocketRequestHandler handler = sortedRequestHandlers.get(i);
ln(sb, "<tr><td>" + i + " Name </td><td>" + handler.getName() + "</td></tr>");
ln(sb, "<tr><td>" + i + " Started at </td><td>" + new Date(handler.getStartTime()) + "</td></tr>");
ln(sb, "<tr><td>" + i + " Total number of requests </td><td>" + handler.getTotalNumberOfRequests() + "</td></tr>");
double requestsPerSec = handler.getTotalNumberOfRequests() * 1000.0 / (System.currentTimeMillis() - handler.getStartTime());
ln(sb, "<tr><td>" + i + " Average requests/s</td><td>" + NumUtils.fmt(requestsPerSec) + "</td></tr>");
}


ln(sb, "</table>");
}

private void ln(StringBuilder sb, String s) {
sb.append(s);
sb.append("\n");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package be.bagofwords.application;

public class SocketServerContextFactory extends MinimalApplicationContextFactory {

@Override
public void wireApplicationContext(ApplicationContext context) {
super.wireApplicationContext(context);
SocketServer server = new SocketServer(Integer.parseInt(context.getConfig("socket_port")));
context.registerBean(server);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ public class ListUrlsController extends BaseController {
public ListUrlsController(ApplicationContext applicationContext) {
super("/paths");
this.urls = new ArrayList<>();
RegisterUrlsServer registerUrlsServer = new RegisterUrlsServer(this);
SocketServer socketServer = applicationContext.getBean(SocketServer.class);
socketServer.registerSocketRequestHandlerFactory(registerUrlsServer);
new RegisterUrlsServer(this, applicationContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package be.bagofwords.application.status;

import be.bagofwords.application.ApplicationContext;
import be.bagofwords.application.BaseServer;
import be.bagofwords.application.SocketServer;
import be.bagofwords.util.SocketConnection;
import be.bagofwords.web.BaseController;
import be.bagofwords.web.WebContainer;
Expand Down Expand Up @@ -41,7 +41,7 @@ public void registerPath(String path) {
connection.writeString(path);
connection.flush();
long result = connection.readLong();
if (result != BaseServer.LONG_OK) {
if (result != SocketServer.LONG_OK) {
throw new RuntimeException("Unexpected response " + result);
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package be.bagofwords.application.status;

import be.bagofwords.application.ApplicationContext;
import be.bagofwords.application.SocketRequestHandler;
import be.bagofwords.application.SocketRequestHandlerFactory;
import be.bagofwords.application.SocketServer;
Expand All @@ -19,8 +20,10 @@ public class RegisterUrlsServer implements SocketRequestHandlerFactory {

private ListUrlsController listUrlsController;

public RegisterUrlsServer(ListUrlsController listUrlsController) {
public RegisterUrlsServer(ListUrlsController listUrlsController, ApplicationContext context) {
this.listUrlsController = listUrlsController;
SocketServer socketServer = context.getBean(SocketServer.class);
socketServer.registerSocketRequestHandlerFactory(this);
}

@Override
Expand All @@ -31,7 +34,7 @@ public String getName() {
@Override
public SocketRequestHandler createSocketRequestHandler(Socket socket) throws IOException {
SocketConnection connection = new BufferedSocketConnection(socket);
return new SocketRequestHandler() {
return new SocketRequestHandler("register_url_handler", connection) {
@Override
public void reportUnexpectedError(Exception ex) {
UI.writeError("Unexpected error in RegisterPathServer", ex);
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/be/bagofwords/util/ExecutionResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public String getErrorOut() {
public void setErrorOut(String errorOut) {
this.errorOut = errorOut;
}

public boolean isSuccess() {
return this.returnCode == 0 ;
}
}
14 changes: 14 additions & 0 deletions src/main/java/be/bagofwords/util/SocketConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ public SocketConnection(String host, int port) throws IOException {
this.os = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 32 * 1024));
}

public SocketConnection(Socket socket) throws IOException {
this(socket, false, false);
}

public SocketConnection(Socket socket, boolean useLargeOutputBuffer, boolean useLargeInputBuffer) throws IOException {
this.socket = socket;
this.is = new DataInputStream(new BufferedInputStream(socket.getInputStream(), useLargeInputBuffer ? 1024 * 1024 : 32 * 1024));
this.os = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), useLargeOutputBuffer ? 1024 * 1024 : 32 * 1024));
}

public SocketConnection(Socket socket, DataInputStream is, DataOutputStream os) throws IOException {
this.socket = socket;
this.is = is;
Expand Down Expand Up @@ -236,4 +246,8 @@ public void writeBoolean(boolean value) throws IOException {
public InetAddress getInetAddress() {
return socket.getInetAddress();
}

public int getRemotePort() {
return socket.getPort();
}
}

0 comments on commit e2784a4

Please sign in to comment.