Skip to content

Commit

Permalink
Merge pull request #2 from Mahad-10/add-basic-ci
Browse files Browse the repository at this point in the history
Add basic CI
  • Loading branch information
Mahad-10 authored Dec 13, 2024
2 parents 56a263a + 6f8437a commit 18c70ce
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 95 deletions.
4 changes: 4 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
root = true

[*.{kt,kts}]
max_line_length = 120
31 changes: 31 additions & 0 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: WAMP WebRTC CI

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- run: |
curl -sSLO https://github.com/pinterest/ktlint/releases/download/1.5.0/ktlint && chmod a+x ktlint &&
sudo mv ktlint /usr/local/bin/
- name: Run lint
run: make lint

- name: Set up JDK 17
uses: actions/setup-java@v4
with:
distribution: 'corretto'
java-version: 17

- name: Build with Gradle
run: make build
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
lint:
ktlint

format:
ktlint -F

build:
./gradlew build
6 changes: 5 additions & 1 deletion app/src/main/java/io/xconn/wampwebrtc/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ fun convertJsonToMap(jsonString: String): Map<String, Any> {
return map
}

suspend fun join(peer: Peer, realm: String, serializer: Serializer): PeerBaseSession {
suspend fun join(
peer: Peer,
realm: String,
serializer: Serializer,
): PeerBaseSession {
val joiner = Joiner(realm, serializer)
val hello = joiner.sendHello()

Expand Down
21 changes: 11 additions & 10 deletions app/src/main/java/io/xconn/wampwebrtc/MessageAssembler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ class MessageAssembler {
}
}

fun chunkMessage(message: ByteArray): Sequence<ByteArray> = sequence {
val chunkSize = 16 * 1024 - 1 // 16KB - 1 byte for metadata
val totalChunks = (message.size + chunkSize - 1) / chunkSize
fun chunkMessage(message: ByteArray): Sequence<ByteArray> =
sequence {
val chunkSize = 16 * 1024 - 1 // 16KB - 1 byte for metadata
val totalChunks = (message.size + chunkSize - 1) / chunkSize

for (i in 0 until totalChunks) {
val start = i * chunkSize
val end = if (i == totalChunks - 1) message.size else start + chunkSize
val chunk = message.copyOfRange(start, end)
for (i in 0 until totalChunks) {
val start = i * chunkSize
val end = if (i == totalChunks - 1) message.size else start + chunkSize
val chunk = message.copyOfRange(start, end)

val isFinal = if (i == totalChunks - 1) 1.toByte() else 0.toByte()
yield(byteArrayOf(isFinal) + chunk)
val isFinal = if (i == totalChunks - 1) 1.toByte() else 0.toByte()
yield(byteArrayOf(isFinal) + chunk)
}
}
}
}
192 changes: 117 additions & 75 deletions app/src/main/java/io/xconn/wampwebrtc/Offerer.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package io.xconn.wampwebrtc

import android.content.Context
import org.webrtc.*
import org.webrtc.DataChannel
import org.webrtc.IceCandidate
import org.webrtc.MediaConstraints
import org.webrtc.MediaStream
import org.webrtc.PeerConnection
import org.webrtc.PeerConnectionFactory
import org.webrtc.SdpObserver
import org.webrtc.SessionDescription
import java.util.concurrent.LinkedBlockingDeque
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
Expand All @@ -12,8 +19,10 @@ class Offerer(
private val signalIceCandidate: (IceCandidate) -> Unit,
) {
init {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
val options =
PeerConnectionFactory.InitializationOptions
.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)
}

Expand All @@ -25,94 +34,127 @@ class Offerer(
suspend fun createOffer(offerConfig: OfferConfig): SessionDescription? {
val configuration = PeerConnection.RTCConfiguration(offerConfig.iceServers)

peerConnection = peerConnectionFactory.createPeerConnection(
configuration,
object : PeerConnection.Observer {
override fun onIceCandidate(candidate: IceCandidate?) {
candidate?.let {
signalIceCandidate(it)
peerConnection?.addIceCandidate(it)
peerConnection =
peerConnectionFactory.createPeerConnection(
configuration,
object : PeerConnection.Observer {
override fun onIceCandidate(candidate: IceCandidate?) {
candidate?.let {
signalIceCandidate(it)
peerConnection?.addIceCandidate(it)
}
}

override fun onDataChannel(channel: DataChannel?) {
channel?.registerObserver(
object : DataChannel.Observer {
override fun onMessage(buffer: DataChannel.Buffer?) {}

override fun onBufferedAmountChange(p0: Long) {}

override fun onStateChange() {}
},
)
}
}

override fun onDataChannel(channel: DataChannel?) {
channel?.registerObserver(object : DataChannel.Observer {
override fun onMessage(buffer: DataChannel.Buffer?) {}

override fun onBufferedAmountChange(p0: Long) {}
override fun onStateChange() {}
})
}

override fun onSignalingChange(p0: PeerConnection.SignalingState?) {}
override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) {}
override fun onIceConnectionReceivingChange(p0: Boolean) {}
override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {}
override fun onAddStream(p0: MediaStream?) {}
override fun onRemoveStream(p0: MediaStream?) {}
override fun onRenegotiationNeeded() {}
override fun onIceCandidatesRemoved(p0: Array<out IceCandidate>?) {}
})

override fun onSignalingChange(p0: PeerConnection.SignalingState?) {}

override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) {}

override fun onIceConnectionReceivingChange(p0: Boolean) {}

override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {}

override fun onAddStream(p0: MediaStream?) {}

override fun onRemoveStream(p0: MediaStream?) {}

override fun onRenegotiationNeeded() {}

override fun onIceCandidatesRemoved(p0: Array<out IceCandidate>?) {}
},
)

// Create and set up the data channel
val conf = DataChannel.Init().apply {
id = offerConfig.id
ordered = offerConfig.ordered
protocol = offerConfig.protocol
}
val conf =
DataChannel.Init().apply {
id = offerConfig.id
ordered = offerConfig.ordered
protocol = offerConfig.protocol
}
dataChannel = peerConnection?.createDataChannel("wamp", conf)

return suspendCoroutine { continuation ->
peerConnection?.createOffer(object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {
peerConnection?.setLocalDescription(object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {}
override fun onSetSuccess() {
continuation.resume(description)
}
override fun onCreateFailure(p0: String?) {}
override fun onSetFailure(p0: String?) {}
peerConnection?.createOffer(
object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {
peerConnection?.setLocalDescription(
object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {}

override fun onSetSuccess() {
continuation.resume(description)
}

override fun onCreateFailure(p0: String?) {}

override fun onSetFailure(p0: String?) {}
},
description,
)
}

}, description)
}
override fun onSetSuccess() {}

override fun onSetSuccess() {}
override fun onCreateFailure(p0: String?) {}
override fun onSetFailure(p0: String?) {}
}, MediaConstraints())
override fun onCreateFailure(p0: String?) {}

override fun onSetFailure(p0: String?) {}
},
MediaConstraints(),
)
}
}

suspend fun waitForDataChannelOpen(): Unit = suspendCoroutine { continuation ->
dataChannel?.registerObserver(object : DataChannel.Observer {
override fun onStateChange() {
if (dataChannel?.state() == DataChannel.State.OPEN) {
continuation.resume(Unit)
}
}
suspend fun waitForDataChannelOpen(): Unit =
suspendCoroutine { continuation ->
dataChannel?.registerObserver(
object : DataChannel.Observer {
override fun onStateChange() {
if (dataChannel?.state() == DataChannel.State.OPEN) {
continuation.resume(Unit)
}
}

override fun onBufferedAmountChange(p0: Long) {}

override fun onBufferedAmountChange(p0: Long) {}
override fun onMessage(buffer: DataChannel.Buffer?) {
buffer?.data?.let {
val data = ByteArray(it.remaining())
it.get(data)
override fun onMessage(buffer: DataChannel.Buffer?) {
buffer?.data?.let {
val data = ByteArray(it.remaining())
it.get(data)

val message = assembler.feed(data)
if (message != null) {
queue.put(message)
val message = assembler.feed(data)
if (message != null) {
queue.put(message)
}
}
}
}
}
})
}
},
)
}

fun setRemoteDescription(sessionDescription: SessionDescription) {
peerConnection?.setRemoteDescription(object : SdpObserver {
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onSetSuccess() {}
override fun onCreateFailure(p0: String?) {}
override fun onSetFailure(p0: String?) {}
}, sessionDescription)
peerConnection?.setRemoteDescription(
object : SdpObserver {
override fun onCreateSuccess(p0: SessionDescription?) {}

override fun onSetSuccess() {}

override fun onCreateFailure(p0: String?) {}

override fun onSetFailure(p0: String?) {}
},
sessionDescription,
)
}

fun addIceCandidate(candidate: IceCandidate) {
Expand Down
5 changes: 4 additions & 1 deletion app/src/main/java/io/xconn/wampwebrtc/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@ data class OfferConfig(
val topicAnswererOnCandidate: String,
)

data class WebRTCSession(val connection: PeerConnection, val channel: DataChannel)
data class WebRTCSession(
val connection: PeerConnection,
val channel: DataChannel,
)
5 changes: 3 additions & 2 deletions app/src/main/java/io/xconn/wampwebrtc/WAMPSession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import android.content.Context
import io.xconn.xconn.Session
import java.util.concurrent.LinkedBlockingDeque

class WAMPSession(private val context: Context) {

class WAMPSession(
private val context: Context,
) {
suspend fun connect(config: ClientConfig): Session {
val queue = LinkedBlockingDeque<ByteArray>()
val webRTCConnection = WebRTC(context, queue)
Expand Down
17 changes: 12 additions & 5 deletions app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.json.JSONArray
import org.json.JSONObject
import org.webrtc.DataChannel
import org.webrtc.IceCandidate
import org.webrtc.SessionDescription
import java.util.UUID
import java.util.concurrent.LinkedBlockingDeque

class WebRTC(private val context: Context, private val queue: LinkedBlockingDeque<ByteArray>) {
class WebRTC(
private val context: Context,
private val queue: LinkedBlockingDeque<ByteArray>,
) {
suspend fun connect(config: ClientConfig): WebRTCSession {
val client = Client(serializer = CBORSerializer())
val session = client.connect(config.url, config.realm)

val requestID = UUID.randomUUID().toString()
val offerConfig = OfferConfig(
config.subProtocol, config.iceServers, true, 1, config.topicAnswererOnCandidate
)
val offerConfig =
OfferConfig(
config.subProtocol,
config.iceServers,
true,
1,
config.topicAnswererOnCandidate,
)

val candidates: MutableList<IceCandidate> = mutableListOf()
val offerer =
Expand Down
Binary file added gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
1 change: 0 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,3 @@ dependencyResolutionManagement {

rootProject.name = "WampWebRTC"
include(":app")

0 comments on commit 18c70ce

Please sign in to comment.