diff --git a/Qora/.classpath b/Qora/.classpath index 76a38f40..e197e4ac 100644 --- a/Qora/.classpath +++ b/Qora/.classpath @@ -2,113 +2,109 @@ - + - - - + + + - + - + - - - - - + + + + + - + - - - + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - + + diff --git a/Qora/libs/mapdb-1.0.7.jar b/Qora/libs/mapdb-1.0.7.jar deleted file mode 100644 index 2bc659e7..00000000 Binary files a/Qora/libs/mapdb-1.0.7.jar and /dev/null differ diff --git a/Qora/libs/mapdb-1.0.8.jar b/Qora/libs/mapdb-1.0.8.jar deleted file mode 100644 index 14ab2306..00000000 Binary files a/Qora/libs/mapdb-1.0.8.jar and /dev/null differ diff --git a/Qora/libs/sourceandjavadoc/mapdb-1.0.7-javadoc.jar b/Qora/libs/sourceandjavadoc/mapdb-1.0.7-javadoc.jar deleted file mode 100644 index 94c612b4..00000000 Binary files a/Qora/libs/sourceandjavadoc/mapdb-1.0.7-javadoc.jar and /dev/null differ diff --git a/Qora/libs/sourceandjavadoc/mapdb-1.0.7-sources.jar b/Qora/libs/sourceandjavadoc/mapdb-1.0.7-sources.jar deleted file mode 100644 index b18f87e8..00000000 Binary files a/Qora/libs/sourceandjavadoc/mapdb-1.0.7-sources.jar and /dev/null differ diff --git a/Qora/log4j.properties b/Qora/log4j.properties index 2e2750f5..2aff324b 100644 --- a/Qora/log4j.properties +++ b/Qora/log4j.properties @@ -8,7 +8,11 @@ log4j.logger.network.ConnectionCreator=info log4j.logger.network.Network=info log4j.logger.network.Peer=info log4j.logger.settings.Settings=info +log4j.logger.qora.block.Block=info +log4j.logger.qora.transaction.Transaction=info log4j.logger.qora.BlockChain=info +log4j.logger.qora.Synchronizer=info +log4j.logger.at.AT_Controller=info log4j.logger.ntp.NTP=info log4j.logger.wallet.Wallet=info log4j.logger.Start=info @@ -44,4 +48,4 @@ log4j.appender.FILE.layout.conversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}: log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/Qora/src/api/PeersResource.java b/Qora/src/api/PeersResource.java index b81701dd..1b2dbe60 100644 --- a/Qora/src/api/PeersResource.java +++ b/Qora/src/api/PeersResource.java @@ -71,7 +71,16 @@ public String addPeer(String address) { throw ApiErrorFactory.getInstance().createError( ApiErrorFactory.ERROR_INVALID_NETWORK_ADDRESS); } - peer.addPingCounter(); + /* + * XXX what is the purpose of this call? + * PeerMap.addPeer() tests for peer.pingCounter > 1 but after this call + * peer.pingCounter will only be 1 so there's no difference code-path-wise + * between pingCounter being 0 or 1. + * Also creating a new Peer starts a Pinger thread which will increment pingCounter + * and call PeerMap.addPeer() + */ + // was: peer.addPingCounter(); + // now: peer.onPingSuccess(); which also performs next line: DBSet.getInstance().getPeerMap().addPeer(peer); return "OK"; @@ -173,7 +182,7 @@ else if(DBSet.getInstance().getPeerMap().contains(peer.getAddress().getAddress() o.put("version", Controller.getInstance().getVersionOfPeer(peer).getA()); o.put("buildTime", DateTimeFormat.timestamptoString(Controller.getInstance().getVersionOfPeer(peer).getB(), "yyyy-MM-dd HH:mm:ss z", "UTC")); } - if(peer.isPinger()) { + if(peer.hasPinger()) { o.put("ping", peer.getPing()); } if(peer.getConnectionTime()>0) { diff --git a/Qora/src/at/AT_Controller.java b/Qora/src/at/AT_Controller.java index 8094b78d..d082378f 100644 --- a/Qora/src/at/AT_Controller.java +++ b/Qora/src/at/AT_Controller.java @@ -20,6 +20,7 @@ import qora.account.Account; import qora.crypto.Base58; import qora.crypto.Crypto; +import utils.Converter; import com.google.common.primitives.Bytes; import com.google.common.primitives.Longs; @@ -426,7 +427,7 @@ public static AT_Block validateATs( byte[] blockATs , int blockHeight , DBSet db md5 = digest.digest( at.getBytes() ); if ( !Arrays.equals( md5 , ats.get( atIdBuffer ) ) ) { - throw new AT_Exception( "Calculated md5 and recieved md5 are not matching" ); + throw new AT_Exception( "Calculated MD5 " + Converter.toHex(md5) + " and received MD5 " + Converter.toHex(ats.get(atIdBuffer)) + " do not match" ); } tempAtStates.put( new String(at.getId() , "UTF-8") , state ); } diff --git a/Qora/src/controller/Controller.java b/Qora/src/controller/Controller.java index 9d04ed4c..406a3df6 100644 --- a/Qora/src/controller/Controller.java +++ b/Qora/src/controller/Controller.java @@ -93,11 +93,11 @@ public class Controller extends Observable { private static final Logger LOGGER = Logger.getLogger(Controller.class); - private String version = "0.26.0"; - private String buildTime = "2016-05-24 00:00:00 UTC"; + private String version = "0.26.2.1"; + private String buildTime = "2017-11-06 11:33:00 UTC"; private long buildTimestamp; - public static final String releaseVersion = "0.26.0"; + public static final String releaseVersion = "0.26.2.1"; // TODO ENUM would be better here public static final int STATUS_NO_CONNECTIONS = 0; @@ -403,6 +403,10 @@ public void start() throws Exception { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { + Thread.currentThread().setName("Controller Shutdown"); + + LOGGER.info("Controller shutdown hook"); + stopAll(); } }); @@ -604,6 +608,9 @@ public void stopAll() { if (!this.isStopping) { this.isStopping = true; + // STOP SENDING OUR HEIGHT TO PEERS + this.timerPeerHeightUpdate.cancel(); + // STOP MESSAGE PROCESSOR LOGGER.info(Lang.getInstance().translate("Stopping message processor")); this.network.stop(); @@ -612,7 +619,11 @@ public void stopAll() { LOGGER.info(Lang.getInstance().translate("Stopping block processor")); this.synchronizer.stop(); - // CLOSE DATABABASE + // STOP BLOCK GENERATOR + LOGGER.info(Lang.getInstance().translate("Stopping block generator")); + this.blockGenerator.shutdown(); + + // CLOSE DATABASE LOGGER.info(Lang.getInstance().translate("Closing database")); DBSet.getInstance().close(); @@ -623,8 +634,6 @@ public void stopAll() { createDataCheckpoint(); LOGGER.info(Lang.getInstance().translate("Closed.")); - // FORCE CLOSE - System.exit(0); } } @@ -1015,8 +1024,9 @@ public void update() { Peer peer = null; try { - // WHILE NOT UPTODATE - while (!this.isUpToDate()) { + // Synchronize while we're not up-to-date + // (but bail out if we're shutdown while updating blockchain) + while (!this.isStopping && !this.isUpToDate()) { // START UPDATE FROM HIGHEST HEIGHT PEER peer = this.getMaxHeightPeer(); diff --git a/Qora/src/database/BlockMap.java b/Qora/src/database/BlockMap.java index d88c8a48..53b13775 100644 --- a/Qora/src/database/BlockMap.java +++ b/Qora/src/database/BlockMap.java @@ -46,10 +46,14 @@ public BlockMap(DBSet databaseSet, DB database) this.observableData.put(DBMap.NOTIFY_LIST, ObserverMessage.LIST_BLOCK_TYPE); //LAST BLOCK + if ( !database.exists("lastBlock") ) + database.createAtomicVar("lastBlock", new byte[0], null); this.lastBlockVar = database.getAtomicVar("lastBlock"); this.lastBlockSignature = this.lastBlockVar.get(); //PROCESSING + if ( !database.exists("processingBlock") ) + database.createAtomicVar("processingBlock", false, null); this.processingVar = database.getAtomicVar("processingBlock"); this.processing = this.processingVar.get(); } diff --git a/Qora/src/database/DBSet.java b/Qora/src/database/DBSet.java index f854073b..308ec3a4 100644 --- a/Qora/src/database/DBSet.java +++ b/Qora/src/database/DBSet.java @@ -74,7 +74,6 @@ public static void reCreateDatabase() { //CREATE DATABASE DB database = DBMaker.newFileDB(dbFile) - .closeOnJvmShutdown() .cacheSize(2048) .checksumEnable() .mmapFileEnableIfSupported() diff --git a/Qora/src/database/wallet/SecureWalletDatabase.java b/Qora/src/database/wallet/SecureWalletDatabase.java index 77994f50..aa26cab4 100644 --- a/Qora/src/database/wallet/SecureWalletDatabase.java +++ b/Qora/src/database/wallet/SecureWalletDatabase.java @@ -36,7 +36,6 @@ public SecureWalletDatabase(String password) this.database = DBMaker.newFileDB(SECURE_WALLET_FILE) .encryptionEnable(password) - .closeOnJvmShutdown() .cacheSize(2048) .checksumEnable() .mmapFileEnableIfSupported() diff --git a/Qora/src/database/wallet/WalletDatabase.java b/Qora/src/database/wallet/WalletDatabase.java index 2214a680..ef9d6795 100644 --- a/Qora/src/database/wallet/WalletDatabase.java +++ b/Qora/src/database/wallet/WalletDatabase.java @@ -3,6 +3,7 @@ import java.io.File; import org.mapdb.Atomic.Var; +import org.apache.log4j.Logger; import org.mapdb.DB; import org.mapdb.DBMaker; @@ -12,6 +13,7 @@ public class WalletDatabase implements IDB { + private static final Logger LOGGER = Logger.getLogger(WalletDatabase.class); private static final File WALLET_FILE = new File(Settings.getInstance().getWalletDir(), "wallet.dat"); private static final String VERSION = "version"; @@ -43,7 +45,6 @@ public WalletDatabase() //transactionFile.delete(); this.database = DBMaker.newFileDB(WALLET_FILE) - .closeOnJvmShutdown() .cacheSize(2048) .checksumEnable() .mmapFileEnableIfSupported() @@ -58,6 +59,18 @@ public WalletDatabase() this.assetMap = new AssetMap(this, this.database); this.orderMap = new OrderMap(this, this.database); this.assetFavoritesSet = new AssetFavoritesSet(this, this.database); + + if ( !this.database.exists(LAST_BLOCK) ) + this.database.createAtomicVar(LAST_BLOCK, new byte[0], null); + + // If LAST_BLOCK has previously been incorrectly initialized + // then reset back to empty byte array + try { + getLastBlockSignature(); + } catch (ClassCastException e) { + LOGGER.info("Resetting wallet's last block signature"); + this.setLastBlockSignature(new byte[0]); + } } public void setVersion(int version) @@ -78,7 +91,7 @@ public void setLastBlockSignature(byte[] signature) public byte[] getLastBlockSignature() { - Var atomic = this.database.getAtomicVar(LAST_BLOCK); + Var atomic = this.database.getAtomicVar(LAST_BLOCK); return atomic.get(); } diff --git a/Qora/src/network/ConnectionAcceptor.java b/Qora/src/network/ConnectionAcceptor.java index 6272b3c7..470d8f2c 100644 --- a/Qora/src/network/ConnectionAcceptor.java +++ b/Qora/src/network/ConnectionAcceptor.java @@ -2,6 +2,7 @@ import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; import lang.Lang; import ntp.NTP; @@ -87,6 +88,14 @@ public void run() } } } + catch(SocketException e) + { + if (this.isRun) + { + LOGGER.error(e.getMessage(),e); + LOGGER.warn(Lang.getInstance().translate("Error accepting new connection")); + } + } catch(Exception e) { LOGGER.error(e.getMessage(),e); @@ -98,5 +107,13 @@ public void run() public void halt() { this.isRun = false; + + if (socket != null && !socket.isClosed()) { + try { + socket.close(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } } } diff --git a/Qora/src/network/Network.java b/Qora/src/network/Network.java index 0fe184f8..c9c27ec1 100644 --- a/Qora/src/network/Network.java +++ b/Qora/src/network/Network.java @@ -63,7 +63,7 @@ private void start() @Override public void onConnect(Peer peer) { - LOGGER.info(Lang.getInstance().translate("Connection successfull : ") + peer.getAddress()); + LOGGER.info(Lang.getInstance().translate("Connection successful : ") + peer.getAddress()); //ADD TO CONNECTED PEERS synchronized(this.connectedPeers) @@ -101,7 +101,7 @@ public void onDisconnect(Peer peer) { //CLOSE CONNECTION IF STILL ACTIVE peer.close(); - peer.interrupt(); + peer.interrupt(); // might be handled inside peer.close() //NOTIFY OBSERVERS this.setChanged(); diff --git a/Qora/src/network/Peer.java b/Qora/src/network/Peer.java index ede096cc..816984f4 100644 --- a/Qora/src/network/Peer.java +++ b/Qora/src/network/Peer.java @@ -1,9 +1,13 @@ package network; import java.io.DataInputStream; +import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -15,6 +19,7 @@ import lang.Lang; import network.message.Message; import network.message.MessageFactory; +import network.message.MessageException; import ntp.NTP; import org.apache.log4j.Logger; @@ -23,7 +28,7 @@ import controller.Controller; import database.DBSet; -public class Peer extends Thread{ +public class Peer extends Thread { private InetAddress address; private ConnectionCallback callback; @@ -33,280 +38,381 @@ public class Peer extends Thread{ private boolean white; private long pingCounter; private long connectionTime; - + + private static final int INACTIVITY_TIMEOUT = 60 * 60 * 1000; // one hour private static final Logger LOGGER = Logger.getLogger(Peer.class); private Map> messages; - - public Peer(InetAddress address) - { + + /** + * Construct simple, non-connected Peer + * + * @param address + */ + public Peer(InetAddress address) { this.address = address; - this.messages = Collections.synchronizedMap(new HashMap>()); } - - public Peer(ConnectionCallback callback, Socket socket) - { - try - { - this.callback = callback; - this.socket = socket; - this.address = socket.getInetAddress(); + + /** + * Construct Peer based on existing connected socket + *

+ * + * @param callback + * @param socket + */ + public Peer(ConnectionCallback callback, Socket socket) { + this.callback = callback; + this.socket = socket; + this.address = socket.getInetAddress(); + + this.setup(false); + } + + /** + * Set up initial peer values + *

+ * Set up initial peer settings, e.g. socket timeout, ping thread & counter, + * etc.
+ * Will close peer if setup fails. On success, will call + * ConnectionCallback.onConnect + * + * @param white + * @see Pinger + * @see ConnectionCallback#onConnect(Peer) + */ + private void setup(boolean white) { + try { this.messages = Collections.synchronizedMap(new HashMap>()); - this.white = false; + this.white = white; this.pingCounter = 0; this.connectionTime = NTP.getTime(); - - //ENABLE KEEPALIVE - //this.socket.setKeepAlive(true); - - //TIMEOUT - this.socket.setSoTimeout(1000*60*60); - - //CREATE STRINGWRITER + + // Enable TCP keep-alive packets + // this.socket.setKeepAlive(true); + + // Inactivity timeout - see run() + this.socket.setSoTimeout(INACTIVITY_TIMEOUT); + + // Grab reference to output stream this.out = socket.getOutputStream(); - - //START COMMUNICATON THREAD + + // Start main communication thread this.start(); - - //START PINGER + + // Start Pinger (requires main communication thread) this.pinger = new Pinger(this); - - //ON SOCKET CONNECT - this.callback.onConnect(this); - } - catch(Exception e) - { - LOGGER.debug(e.getMessage(),e); - //FAILED TO CONNECT NO NEED TO BLACKLIST - LOGGER.info("Failed to connect to : " + address); + + // Notify peer is connected + this.callback.onConnect(this); + } catch (Exception e) { + // Connection setup failure NO NEED TO BLACKLIST + LOGGER.info("Failed to connect to " + address + ": " + e.getMessage()); + LOGGER.debug(e.getMessage(), e); + + // peer no longer usable + this.close(); } } - - public InetAddress getAddress() - { + + public InetAddress getAddress() { return address; } - - public long getPingCounter() - { + + /* + * Ping-related + */ + + /** + * Get number of times we've successfully pinged peer. + * + * @return number of pings, 0+ + */ + public long getPingCounter() { return this.pingCounter; } - - public void addPingCounter() - { - this.pingCounter ++; + + /** + * Callback, used by Pinger, on successful ping. + *

+ * Updates ping counter and peer info in PeerMap database. + * + * @see Pinger#run() + * @see PingMap + */ + public void onPingSuccess() { + this.pingCounter++; + + if (!DBSet.getInstance().isStoped()) + DBSet.getInstance().getPeerMap().addPeer(this); + } + + /** + * Callback, used by Pinger, on ping failure. + *

+ * Disconnects peer using ConnectionCallback.onDisconnect(Peer) + * which is typically a Network object. + * + * @see Pinger#run() + * @see ConnectionCallback#onDisconnect(Peer) + * @see Network#onDisconnect(Peer) + */ + public void onPingFailure() { + // Disconnect + this.callback.onDisconnect(this); } - - public long getPing() - { + + /** + * Get most recent ping round-trip time. + * + * @return ping RTT time in milliseconds or Long.MAX_VALUE if no ping yet. + * @see Pinger#getPing() + */ + public long getPing() { return this.pinger.getPing(); } - - public boolean isPinger() - { + + /** + * Do we have a Pinger object for this peer? + *

+ * NB: Pinger may not necessarily be running. + * + * @return true if we have a Pinger object + * @see Pinger + */ + public boolean hasPinger() { return this.pinger != null; } - - public void connect(ConnectionCallback callback) - { - if(DBSet.getInstance().isStoped()){ + + /** + * Connect to address using timeout from settings. + *

+ * On success, ConnectionCallback.onConnect() is called.
+ * On failure, we simply return. + * + * @param callback + * @see ConnectionCallback#onConnect(Peer) + */ + public void connect(ConnectionCallback callback) { + // XXX we don't actually use DB so replace with cleaner "are we shutting + // down?" test + if (DBSet.getInstance().isStoped()) { return; } - + this.callback = callback; - this.white = true; - this.pingCounter = 0; - this.connectionTime = NTP.getTime(); - - try - { - //OPEN SOCKET - this.socket = new Socket(address, Controller.getInstance().getNetworkPort()); - - //ENABLE KEEPALIVE - //this.socket.setKeepAlive(true); - - //TIMEOUT - this.socket.setSoTimeout(1000*60*60); - - //CREATE STRINGWRITER - this.out = socket.getOutputStream(); - - //START COMMUNICATON THREAD - this.start(); - - //START PINGER - this.pinger = new Pinger(this); - - //ON SOCKET CONNECT - this.callback.onConnect(this); - } - catch(Exception e) - { - LOGGER.debug(e.getMessage(),e); - LOGGER.info(Lang.getInstance().translate("Failed to connect to : ") + address); + + // Create new socket for connection to peer + this.socket = new Socket(); + + // Collate this.address and destination port from controller + InetSocketAddress socketAddress = new InetSocketAddress(address, Controller.getInstance().getNetworkPort()); + + // Attempt to connect, with timeout from settings + try { + this.socket.connect(socketAddress, Settings.getInstance().getConnectionTimeout()); + } catch (Exception e) { + LOGGER.info(Lang.getInstance().translate("Failed to connect to ") + address + ": " + e.getMessage()); + return; } + + this.setup(true); } - - public void run() - { - try - { + + /** + * Main communication thread + *

+ * Waits for incoming messages from peer, unless inactivity timeout reached. + *

+ * If something is waiting for a message with a specific ID then they are + * notified so it can be processed. Otherwise the message is added to our + * queue, keyed by message ID. + * + * @see #getResponse(Message) + * @see MessageFactory#parse(Peer, DataInputStream) + * @see ConnectionCallback#onMessage(Message) + * @see ConnectionCallback#onDisconnect(Message) + */ + public void run() { + Thread.currentThread().setName("Peer " + this.address.toString()); + + try { DataInputStream in = new DataInputStream(socket.getInputStream()); - - while(true) - { - //READ FIRST 4 BYTES + + while (true) { + // Read only enough bytes to cover Message "magic" preamble byte[] messageMagic = new byte[Message.MAGIC_LENGTH]; in.readFully(messageMagic); - - if(Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic())) - { - //PROCESS NEW MESSAGE - Message message = MessageFactory.getInstance().parse(this, in); - - //Logger.getGlobal().info("received message " + message.getType() + " from " + this.address.toString()); - - //CHECK IF WE ARE WAITING FOR A MESSAGE WITH THAT ID - if(message.hasId() && this.messages.containsKey(message.getId())) - { - //ADD TO OUR OWN LIST - this.messages.get(message.getId()).add(message); - } - else - { - //CALLBACK - this.callback.onMessage(message); - } - } - else - { - //ERROR - callback.onError(this, Lang.getInstance().translate("received message with wrong magic")); + + if (!Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic())) { + // Didn't receive valid Message "magic" + this.callback.onError(this, + Lang.getInstance().translate("received message with wrong magic") + " " + address); return; } + + // Attempt to parse incoming message - throws on failure + Message message = MessageFactory.getInstance().parse(this, in); + + // LOGGER.debug("Received message (type " + message.getType() + + // ") from " + this.address); + + // If there's a queue for this message ID then add message to + // queue + if (message.hasId() && this.messages.containsKey(message.getId())) { + // Adding message to queue will unblock waiting caller (if + // any) + this.messages.get(message.getId()).add(message); + } else { + // Generic message callback + this.callback.onMessage(message); + } } - } - catch (Exception e) - { - LOGGER.debug(e.getMessage(),e); - - //DISCONNECT - callback.onDisconnect(this); + } catch (InterruptedException e) { + // peer connection being closed - simply exit + return; + } catch (SocketTimeoutException e) { + LOGGER.info(Lang.getInstance().translate("Inactivity timeout with peer") + " " + address); + + // Disconnect peer + this.callback.onDisconnect(this); + return; + } catch (SocketException e) { + LOGGER.info(Lang.getInstance().translate("Socket issue with peer") + " " + address); + + // Disconnect peer + this.callback.onDisconnect(this); + return; + } catch (MessageException e) { + // Suspect peer + this.callback.onError(this, e.getMessage()); + return; + } catch (Exception e) { + // not expected as above + LOGGER.debug(e.getMessage(), e); + + // Disconnect peer + this.callback.onDisconnect(this); return; } } - - public boolean sendMessage(Message message) - { - try - { - //CHECK IF SOCKET IS STILL ALIVE - if(!this.socket.isConnected()) - { - //ERROR + + /** + * Attempt to send Message to peer + * + * @param message + * @return true if message successfully sent; + * false otherwise + */ + public boolean sendMessage(Message message) { + try { + // CHECK IF SOCKET IS STILL ALIVE + if (!this.socket.isConnected()) { + // ERROR callback.onError(this, Lang.getInstance().translate("socket not still alive")); - + return false; } - - //SEND MESSAGE - synchronized(this.out) - { + + // SEND MESSAGE + synchronized (this.out) { this.out.write(message.toBytes()); this.out.flush(); } - - //RETURN + + // RETURN return true; - } - catch (Exception e) - { - LOGGER.debug(e.getMessage(),e); - //ERROR + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + // ERROR callback.onError(this, e.getMessage()); - - //RETURN + + // RETURN return false; } } - - public Message getResponse(Message message) - { - //GENERATE ID + + /** + * Send message to peer and await response. + *

+ * Message is assigned a random ID and sent. If a response with matching ID + * is received then it is returned to caller. + *

+ * If no response with matching ID within timeout, or some other + * error/exception occurs, then return null. (Assume peer will + * be rapidly disconnected after this). + * + * @param message + * @return Message if valid response received; + * null if not or error/exception occurs + */ + public Message getResponse(Message message) { + // Assign random ID to this message int id = (int) ((Math.random() * 1000000) + 1); - - //SET ID message.setId(id); - - //PUT QUEUE INTO MAP SO WE KNOW WE ARE WAITING FOR A RESPONSE + + // Put queue into map (keyed by message ID) so we can poll for a + // response BlockingQueue blockingQueue = new ArrayBlockingQueue(1); this.messages.put(id, blockingQueue); - - //WHEN FAILED TO SEND MESSAGE - if(!this.sendMessage(message)) - { + + // Try to send message + if (!this.sendMessage(message)) { + this.messages.remove(id); return null; } - - try - { + + try { Message response = blockingQueue.poll(Settings.getInstance().getConnectionTimeout(), TimeUnit.MILLISECONDS); this.messages.remove(id); - + + if (response == null && this.socket.isConnected()) + LOGGER.info("Timed out while waiting for response from peer " + address); + return response; - } - catch (InterruptedException e) - { - //NO MESSAGE RECEIVED WITHIN TIME; + } catch (InterruptedException e) { + // Our thread was interrupted. Probably in shutdown scenario. + LOGGER.info("Interrupted while waiting for response from peer " + address); + this.messages.remove(id); return null; } } - - public void onPingFail() - { - //DISCONNECTED - this.callback.onDisconnect(this); + + public boolean isWhite() { + return this.white; } - public boolean isWhite() - { - return this.white; + public long getConnectionTime() { + return this.connectionTime; } - - public long getConnectionTime() - { - return this.connectionTime; - } - - public boolean isBad() - { - return DBSet.getInstance().getPeerMap().isBad(this.getAddress()); + + public boolean isBad() { + return DBSet.getInstance().getPeerMap().isBad(this.getAddress()); } - - public void close() - { - try - { - //STOP PINGER - if(this.pinger != null) - { - this.pinger.stopPing(); - } - - //CHECK IS SOCKET EXISTS - if(socket != null) - { - //CHECK IF SOCKET IS CONNECTED - if(socket.isConnected()) - { - //CLOSE SOCKET - socket.close(); - } - } + + /** + * Close connection to peer + *

+ * Can be called during normal operation or also in case of error, shutdown, + * etc. + * + * @see Pinger#stopPing() + */ + public void close() { + // Stop Pinger if applicable + if (this.pinger != null) + this.pinger.stopPing(); + + try { + // maybe interrupt() run() thread to differentiate from peer closing + // connection? + /* + * if (this.isAlive()) { this.interrupt(); this.join(); } + */ + + // Close socket if applicable + if (socket != null && socket.isConnected()) + socket.close(); + } catch (IOException e) { + LOGGER.debug("Error closing socket connection to peer " + address + ": " + e.getMessage(), e); } - catch(Exception e) - { - LOGGER.debug(e.getMessage(),e); - } } } diff --git a/Qora/src/network/Pinger.java b/Qora/src/network/Pinger.java index 58dd0225..d56d2867 100644 --- a/Qora/src/network/Pinger.java +++ b/Qora/src/network/Pinger.java @@ -2,92 +2,110 @@ import org.apache.log4j.Logger; -import database.DBSet; import network.message.Message; import network.message.MessageFactory; import settings.Settings; -public class Pinger extends Thread -{ - +/** + * Pinger is a Thread that periodically pings a Peer to maintain/check + * connectivity. + */ +public class Pinger extends Thread { + private static final Logger LOGGER = Logger.getLogger(Pinger.class); private Peer peer; - private boolean run; + /** + * Most recent ping round-trip time in milliseconds, or Long.MAX_VALUE if no + * ping yet. + */ private long ping; - - public Pinger(Peer peer) - { + + /** + * Simple Pinger constructor + *

+ * Will start Pinger thread. + * @param peer + * @see #run() + */ + public Pinger(Peer peer) { this.peer = peer; - this.run = true; this.ping = Long.MAX_VALUE; - + this.start(); } - - public long getPing() - { + + /** + * Get last ping's round-trip time. + * + * @return ping's RTT in milliseconds or Long.MAX_VALUE if no ping yet. + */ + public long getPing() { return this.ping; } - - public void run() - { - while(this.run) - { - //CREATE PING - Message pingMessage = MessageFactory.getInstance().createPingMessage(); - - if(!this.run) - break; - - //GET RESPONSE + + /** + * Repeatedly ping peer using interval from settings. + *

+ * Will exit if interrupted, typically by stopPing() + * + * @see #stopPing() + * @see Peer#onPingSuccess() + * @see Peer#onPingFailure() + */ + @Override + public void run() { + Thread.currentThread().setName("Pinger " + this.peer.getAddress()); + + while (true) { + // Send ping message to peer long start = System.currentTimeMillis(); + Message pingMessage = MessageFactory.getInstance().createPingMessage(); + // NB: Peer.getResponse returns null if no response within timeout + // or interrupt occurs Message response = this.peer.getResponse(pingMessage); - if(!this.run) - break; - - //CHECK IF VALID PING - if(response == null || response.getType() != Message.PING_TYPE) - { - //PING FAILES - this.peer.onPingFail(); - - //STOP PINGER - this.run = false; + // Check for valid ping response + if (response == null || response.getType() != Message.PING_TYPE) { + // Notify Peer that ping has failed. + // NB: currently Peer.onPingFailure() may call Pinger.stopPing() + // (see below) + LOGGER.debug("Ping failure with " + this.peer.getAddress()); + this.peer.onPingFailure(); return; } - - //UPDATE PING + + // Calculate ping's round-trip time and notify peer this.ping = System.currentTimeMillis() - start; - this.peer.addPingCounter(); - - if(!DBSet.getInstance().isStoped()){ - DBSet.getInstance().getPeerMap().addPeer(this.peer); - } - - //SLEEP - try - { + this.peer.onPingSuccess(); + + // Sleep until we need to send next ping + try { Thread.sleep(Settings.getInstance().getPingInterval()); - } - catch (InterruptedException e) - { - //FAILED TO SLEEP + } catch (InterruptedException e) { + // If interrupted, usually by stopPing(), we need to exit thread + return; } } } - public void stopPing() - { - try - { - this.run = false; + /** + * Stop pinging peer. + *

+ * Usually called by Peer.close() + * + * @see Peer#close() + */ + public void stopPing() { + if (this.isAlive()) { this.interrupt(); - this.join(); - } - catch(Exception e) - { - LOGGER.debug(e.getMessage(),e); + + try { + this.join(); + } catch (InterruptedException e) { + // We've probably reached here from run() above calling + // Peer.onPingFailure() so when we return run() above will + // terminate + } } } } diff --git a/Qora/src/network/message/MessageException.java b/Qora/src/network/message/MessageException.java new file mode 100644 index 00000000..c1166235 --- /dev/null +++ b/Qora/src/network/message/MessageException.java @@ -0,0 +1,9 @@ +package network.message; + +public class MessageException extends Exception { + private static final long serialVersionUID = 1L; + + public MessageException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/Qora/src/network/message/MessageFactory.java b/Qora/src/network/message/MessageFactory.java index af71fafa..d8f9d774 100644 --- a/Qora/src/network/message/MessageFactory.java +++ b/Qora/src/network/message/MessageFactory.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.logging.Logger; +import network.message.MessageException; import qora.block.Block; import qora.crypto.Crypto; import qora.transaction.Transaction; @@ -147,7 +148,7 @@ public Message parse(Peer sender, DataInputStream inputStream) throws Exception //CHECK IF CHECKSUM MATCHES if(!Arrays.equals(checksum, digest)) { - throw new Exception(Lang.getInstance().translate("Invalid data checksum length=")+length); + throw new MessageException(Lang.getInstance().translate("Invalid data checksum length=")+length); } } diff --git a/Qora/src/qora/BlockBuffer.java b/Qora/src/qora/BlockBuffer.java index d5e02ca3..750b0006 100644 --- a/Qora/src/qora/BlockBuffer.java +++ b/Qora/src/qora/BlockBuffer.java @@ -39,6 +39,8 @@ public BlockBuffer(List signatures, Peer peer) public void run() { + Thread.currentThread().setName("BlockBuffer"); + while(this.run) { for(int i=0; i(); } @@ -104,8 +107,12 @@ public void run() { try { Thread.sleep(250); } catch (InterruptedException e) { -// does not matter + // does not matter } + + // If we're shutting down - exit thread + if (DBSet.getInstance().isStoped()) + return; } Controller.getInstance().addWalletListener(BlockGenerator.this); @@ -167,17 +174,16 @@ private void setForgingStatus(ForgingStatus status) public void run() { - while(true) + Thread.currentThread().setName("BlockGenerator"); + + while(!this.stopping && !DBSet.getInstance().isStoped()) { - if(DBSet.getInstance().isStoped()) - continue; - //CHECK IF WE ARE UPTODATE if(!Controller.getInstance().isUpToDate() && !Controller.getInstance().isProcessingWalletSynchronize()) { Controller.getInstance().update(); } - + //CHECK IF WE HAVE CONNECTIONS if(forgingStatus == ForgingStatus.FORGING) { @@ -274,6 +280,15 @@ public void run() } } + public void shutdown() { + this.stopping = true; + try { + this.join(); + } catch (InterruptedException e) { + // ... + } + } + public Block generateNextBlock(DBSet db, PrivateKeyAccount account, Block block) { //CHECK IF ACCOUNT HAS BALANCE @@ -419,8 +434,11 @@ public void addUnconfirmedTransactions(DBSet db, Block block) //PROCESS IN NEWBLOCKDB transaction.process(newBlockDb); - //TRANSACTION PROCESSES + //TRANSACTION PROCESSED transactionProcessed = true; + + //INCREASE TRANSACTIONS SIZE TOTAL + totalBytes += transaction.getDataLength(); break; } } diff --git a/Qora/src/qora/Synchronizer.java b/Qora/src/qora/Synchronizer.java index 8f01d54f..45cdc263 100644 --- a/Qora/src/qora/Synchronizer.java +++ b/Qora/src/qora/Synchronizer.java @@ -191,6 +191,16 @@ public void synchronize(Peer peer) throws Exception //GET BLOCK Block block = blockBuffer.getBlock(signature); + if (block == null) + { + LOGGER.info("Failed to receive block from peer"); + break; + } + + // We're shutting down - bail out + if (!this.run) + break; + //PROCESS BLOCK if(!this.process(block)) { @@ -353,6 +363,7 @@ public synchronized boolean process(Block block) public void stop() { this.run = false; + // XXX surely does nothing? this.process(null); } } diff --git a/Qora/src/qora/block/Block.java b/Qora/src/qora/block/Block.java index 2f0880e0..8fac4b61 100644 --- a/Qora/src/qora/block/Block.java +++ b/Qora/src/qora/block/Block.java @@ -774,7 +774,7 @@ public void process(DBSet db) //UPDATE LAST BLOCK db.getBlockMap().setLastBlock(this); - System.out.println("Process Block " + height); + LOGGER.info("Processed block " + height); } public void orphan() diff --git a/Qora/src/qora/transaction/DeployATTransaction.java b/Qora/src/qora/transaction/DeployATTransaction.java index 09f47f41..c33f6155 100644 --- a/Qora/src/qora/transaction/DeployATTransaction.java +++ b/Qora/src/qora/transaction/DeployATTransaction.java @@ -560,7 +560,6 @@ public HashSet getInvolvedAccounts() { HashSet accounts = new HashSet<>(); accounts.add(this.creator); - accounts.addAll(this.getRecipientAccounts()); return accounts; } @@ -568,7 +567,6 @@ public HashSet getInvolvedAccounts() public HashSet getRecipientAccounts() { HashSet accounts = new HashSet<>(); - accounts.add(this.getATaccount()); return accounts; } diff --git a/Qora/src/qora/transaction/Transaction.java b/Qora/src/qora/transaction/Transaction.java index a0502e60..e36ef792 100644 --- a/Qora/src/qora/transaction/Transaction.java +++ b/Qora/src/qora/transaction/Transaction.java @@ -21,7 +21,7 @@ public abstract class Transaction { - private static final Logger LOGGER = Logger.getLogger(Transaction.class); + protected static final Logger LOGGER = Logger.getLogger(Transaction.class); //VALIDATION CODE public static final int VALIDATE_OK = 1; public static final int INVALID_ADDRESS = 2;