Skip to content

Commit

Permalink
Implements v2 of the spec
Browse files Browse the repository at this point in the history
* Custom data pre-processing (currently includes No-Op and AES e2e encryption)
* LZ4 frame compression
  • Loading branch information
austinv11 committed Jun 9, 2017
1 parent fbaa03e commit 053513f
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 41 deletions.
11 changes: 6 additions & 5 deletions .idea/modules/persistence.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Persisting data across multiple clients made easy.

## The protocol
This API implements the peer-to-peer-persistence-protocol v1 (available
This API implements the peer-to-peer-persistence-protocol v2 (available
[here](https://gist.github.com/austinv11/b91ada1d9f85e9ef3fdeb08952916c47)). This means the nodes this communicates with
aren't bound to this particular implementation!

Expand Down Expand Up @@ -76,6 +76,5 @@ whereby they can only intercept method calls to *interfaces*.

## The future
* Provide an annotation processing api in addition to proxies.
* Implement authentication and input validation.
* Migrate from TCP to a more robust protocol.
* Compress sent data
* Implement more robust input validation.

6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group 'com.austinv11'
version '1.0.0'
version '1.1.0'

buildscript {
ext.kotlin_version = '1.1.2-2'
Expand All @@ -8,6 +8,7 @@ buildscript {
ext.msgpack_version = '0.8.13'
ext.dokka_version = '0.9.14'
ext.jsr305_version = '3.0.0'
ext.commons_compression_version = '1.14'

repositories {
jcenter()
Expand Down Expand Up @@ -35,9 +36,10 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$kotlin_coroutines_version"
compile "com.google.code.findbugs:jsr305:$jsr305_version"
compileOnly "com.google.code.findbugs:jsr305:$jsr305_version"
compile "org.slf4j:slf4j-api:$slf4j_version"
compile "org.msgpack:msgpack-core:$msgpack_version"
compile "org.apache.commons:commons-compress:$commons_compression_version"//TODO replace with https://github.com/lz4/lz4-java when its updated

testCompile "org.slf4j:slf4j-simple:$slf4j_version"
testCompile group: 'junit', name: 'junit', version: '4.12'
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/com/austinv11/persistence/PreProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.austinv11.persistence;

import javax.annotation.Nonnull;

/**
* This represents a custom preprocessor which mutates data before consumption and mutates data before packaging.
*/
public interface PreProcessor {

/**
* A unique key for this pre processor type.
*
* @return The key for the pre-processor.
*/
byte getKey();

/**
* Called to pack data to be sent.
*
* @param host The host this data belongs to.
* @param port The port from the provided host this data belongs to.
* @param input The data to pack.
* @return The packed data.
*/
@Nonnull
byte[] pack(@Nonnull String host, int port, @Nonnull byte[] input);

/**
* Called to consume data received.
*
* @param host The host this data belongs to.
* @param port The port from the provided host this data belongs to.
* @param input The data to consume.
* @return The processed data.
*/
@Nonnull
byte[] consume(@Nonnull String host, int port, @Nonnull byte[] input);
}
13 changes: 13 additions & 0 deletions src/main/kotlin/com/austinv11/persistence/PersistenceManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.austinv11.persistence
import com.austinv11.persistence.impl.DefaultFactory
import com.austinv11.persistence.impl.NetworkStore
import com.austinv11.persistence.impl.NoOpConnectionSpy
import com.austinv11.persistence.impl.NoOpPreProcessor
import com.austinv11.persistence.internal.SourceAwareProxy
import com.austinv11.persistence.internal.TwoWaySocket
import kotlinx.coroutines.experimental.CommonPool
Expand All @@ -29,6 +30,7 @@ class PersistenceManager {
@Volatile internal var port = 6000
@Volatile internal var allowedConnections = 2
@Volatile internal var spy: ConnectionSpy = NoOpConnectionSpy()
@Volatile internal var processor: PreProcessor = NoOpPreProcessor()
internal val socket: TwoWaySocket by lazy {
TwoWaySocket(this@PersistenceManager, port, allowedConnections, spy)
}
Expand Down Expand Up @@ -197,6 +199,17 @@ class PersistenceManager {
return this
}

/**
* This sets the pre processor for data. By default this uses a No-Op implementation.
*
* @see NoOpPreProcessor
* @see EncryptedPreProcessor
*/
fun setPreProcessor(processor: PreProcessor) : PersistenceManager {
this.processor = processor
return this
}

/**
* Invalidates persisted caches. If this node is disconnected with no other peers, it is recommended that this node
* saves the persisted objects somehow locally.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.experimental.and
/**
* This is the version of PPPP this version of the api is using.
*/
const val PPPP_VERSION = 1
const val PPPP_VERSION = 2

internal val logger = LoggerFactory.getLogger("Persistence")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.austinv11.persistence.impl

import com.austinv11.persistence.PreProcessor
import java.security.MessageDigest
import java.security.SecureRandom
import java.util.concurrent.ConcurrentHashMap
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec

internal const val KEY_LENGTH: Int = 16
internal const val SALT_LENGTH: Int = 32

/**
* This encrypts data via a AES with the provided key.
*/
class EncryptedPreProcessor(val key: String) : PreProcessor {

private val secretKey = SecretKeySpec(MessageDigest.getInstance("SHA-1").digest(key.toByteArray()).copyOf(KEY_LENGTH), "AES")
private val encryptionCipher
get() = Cipher.getInstance("AES/ECB/PKCS5Padding").apply { this.init(Cipher.ENCRYPT_MODE, secretKey) }
private val decryptionCipher
get() = Cipher.getInstance("AES/ECB/PKCS5Padding").apply { this.init(Cipher.DECRYPT_MODE, secretKey) }
private val connectionMetadata = ConcurrentHashMap<String, ConnectionMetadata>()

override fun getKey(): Byte = 1

override fun pack(host: String, port: Int, input: ByteArray): ByteArray {
if (!connectionMetadata.containsKey("host:$port")) { //If there isn't metadata on this end, this is a client since its the one to pack first
connectionMetadata["host:$port"] = ConnectionMetadata(false)
}

val metadata = connectionMetadata["host:$port"]!!
synchronized(metadata) {
if (!metadata.didCompleteHandshake) { //No need to do special handling if the handshake is finished
if (metadata.isServerSide) { //When this is server side, we are likely packing the OK payload (if not, who cares? its a failed connection)
metadata.didCompleteHandshake = true //Mark completed handshake server side
return ByteArray(SALT_LENGTH).apply { SecureRandom().nextBytes(this); metadata.salt = this } + input //Create a random salt and prepend it to the (raw) payload since there's no sensitive info yet
} else {
return input //This is the client side, so we don't have the salt yet, so no processing yet
}
}


return encryptionCipher.doFinal(metadata.salt!! + input) //Normal operation, encrypt the input with the prepended salt
}
}

override fun consume(host: String, port: Int, input: ByteArray): ByteArray {
if (!connectionMetadata.containsKey("host:$port")) { //If there isn't metadata on this end, this is a server since its the one to consume first
connectionMetadata["host:$port"] = ConnectionMetadata(true)
}

val metadata = connectionMetadata["host:$port"]!!
synchronized(metadata) {
if (!metadata.didCompleteHandshake) { //No need to do special handling if the handshake is finished
if (!metadata.isServerSide) { //When client side, we must take the salt
metadata.salt = input.copyOfRange(0, SALT_LENGTH) //Strip salt and store it
metadata.didCompleteHandshake = true //Mark completed handshake client side
return input.copyOfRange(SALT_LENGTH, input.size) //Strip the salt from the input and pass it forward (it still isn't encrypted at this point)
} else {
return input //When server side and we did not complete handshake, no need for processing since its unencrypted and has no salt yet
}
}

return decryptionCipher.doFinal(input).copyOfRange(SALT_LENGTH, input.size) //Normal operation, decrypt the input and remove the salt
}
}

data class ConnectionMetadata(@Volatile var isServerSide: Boolean,
@Volatile var didCompleteHandshake: Boolean = false,
@Volatile var salt: ByteArray? = null)
}
15 changes: 15 additions & 0 deletions src/main/kotlin/com/austinv11/persistence/impl/NoOpPreProcessor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.austinv11.persistence.impl

import com.austinv11.persistence.PreProcessor

/**
* This does no pre processing to provided data.
*/
class NoOpPreProcessor : PreProcessor {

override fun getKey(): Byte = 0

override fun pack(host: String, port: Int, input: ByteArray): ByteArray = input

override fun consume(host: String, port: Int, input: ByteArray): ByteArray = input
}
23 changes: 20 additions & 3 deletions src/main/kotlin/com/austinv11/persistence/internal/Packer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,36 @@ import com.austinv11.persistence.OpCode
import com.austinv11.persistence.PersistenceManager
import com.austinv11.persistence.logger
import com.austinv11.persistence.map
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream
import org.msgpack.core.MessageBufferPacker
import org.msgpack.core.MessagePack
import org.msgpack.core.MessageUnpacker
import org.msgpack.core.buffer.MessageBuffer
import org.msgpack.value.ValueType
import java.io.ByteArrayOutputStream
import java.util.*

private typealias RefArray = java.lang.reflect.Array
internal val ops = OpCode.values()
internal const val WRAPPER_KEY = "p"
internal const val RESPOND_KEY = "r"

internal fun PersistenceManager.pack(payload: Payload): MessageBuffer {
internal fun compress(bytes: ByteArray): ByteArray {
val byteStream = ByteArrayOutputStream()
val outStream = FramedLZ4CompressorOutputStream(byteStream)
outStream.write(bytes)
outStream.close()
return byteStream.toByteArray()
}

internal fun decompress(bytes: ByteArray): ByteArray {
val inStream = FramedLZ4CompressorInputStream(bytes.inputStream())
val readBytes = inStream.readBytes()
inStream.close()
return readBytes
}

internal fun PersistenceManager.pack(payload: Payload): ByteArray {
val packer = MessagePack.newDefaultBufferPacker()

val payloadMap = payload.toMap()
Expand Down Expand Up @@ -45,7 +62,7 @@ internal fun PersistenceManager.pack(payload: Payload): MessageBuffer {
}

packer.flush()
return packer.toMessageBuffer().also { packer.close() }
return packer.toMessageBuffer().also { packer.close() }.array()
}

internal fun PersistenceManager.unpack(bytes: ByteArray): Payload {
Expand Down
Loading

0 comments on commit 053513f

Please sign in to comment.