From eece4f689a3e28469633d5d37ba7dbc898305ab5 Mon Sep 17 00:00:00 2001 From: Mahad Date: Tue, 17 Dec 2024 15:34:09 +0500 Subject: [PATCH] handle remote candidates --- .../main/java/io/xconn/wampwebrtc/Offerer.kt | 73 +++++++++---------- .../main/java/io/xconn/wampwebrtc/Types.kt | 1 + .../main/java/io/xconn/wampwebrtc/WebRTC.kt | 57 +++++++++++++-- 3 files changed, 88 insertions(+), 43 deletions(-) diff --git a/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt b/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt index 06db0cf..e161ec8 100644 --- a/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt +++ b/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt @@ -1,6 +1,8 @@ package io.xconn.wampwebrtc import android.content.Context +import kotlinx.coroutines.delay +import kotlinx.coroutines.withTimeoutOrNull import org.webrtc.DataChannel import org.webrtc.IceCandidate import org.webrtc.MediaConstraints @@ -30,6 +32,8 @@ class Offerer( var dataChannel: DataChannel? = null val peerConnectionFactory = PeerConnectionFactory.builder().createPeerConnectionFactory() private var assembler = MessageAssembler() + private var onDataChannelOpen: (() -> Unit)? = null + private var dataChannelTimeoutMillis: Long = 20000 suspend fun createOffer(offerConfig: OfferConfig): SessionDescription? { val configuration = PeerConnection.RTCConfiguration(offerConfig.iceServers) @@ -45,17 +49,7 @@ class Offerer( } } - override fun onDataChannel(channel: DataChannel?) { - channel?.registerObserver( - object : DataChannel.Observer { - override fun onMessage(buffer: DataChannel.Buffer?) {} - - override fun onBufferedAmountChange(p0: Long) {} - - override fun onStateChange() {} - }, - ) - } + override fun onDataChannel(channel: DataChannel?) {} override fun onSignalingChange(p0: PeerConnection.SignalingState?) {} @@ -83,6 +77,29 @@ class Offerer( protocol = offerConfig.protocol } dataChannel = peerConnection?.createDataChannel("wamp", conf) + dataChannel?.registerObserver( + object : DataChannel.Observer { + override fun onStateChange() { + if (dataChannel?.state() == DataChannel.State.OPEN) { + onDataChannelOpen?.invoke() + } + } + + override fun onBufferedAmountChange(p0: Long) {} + + override fun onMessage(buffer: DataChannel.Buffer?) { + buffer?.data?.let { + val data = ByteArray(it.remaining()) + it.get(data) + + val message = assembler.feed(data) + if (message != null) { + queue.put(message) + } + } + } + }, + ) return suspendCoroutine { continuation -> peerConnection?.createOffer( @@ -115,32 +132,14 @@ class Offerer( } } - suspend fun waitForDataChannelOpen(): Unit = - suspendCoroutine { continuation -> - dataChannel?.registerObserver( - object : DataChannel.Observer { - override fun onStateChange() { - if (dataChannel?.state() == DataChannel.State.OPEN) { - continuation.resume(Unit) - } - } - - override fun onBufferedAmountChange(p0: Long) {} - - override fun onMessage(buffer: DataChannel.Buffer?) { - buffer?.data?.let { - val data = ByteArray(it.remaining()) - it.get(data) - - val message = assembler.feed(data) - if (message != null) { - queue.put(message) - } - } - } - }, - ) - } + suspend fun waitForDataChannelToOpen() { + withTimeoutOrNull(dataChannelTimeoutMillis) { + while (true) { + if (dataChannel?.state() == DataChannel.State.OPEN) return@withTimeoutOrNull + delay(100) + } + } ?: throw IllegalStateException("Data channel failed to open within $dataChannelTimeoutMillis milliseconds") + } fun setRemoteDescription(sessionDescription: SessionDescription) { peerConnection?.setRemoteDescription( diff --git a/app/src/main/java/io/xconn/wampwebrtc/Types.kt b/app/src/main/java/io/xconn/wampwebrtc/Types.kt index ee87440..b39dc30 100644 --- a/app/src/main/java/io/xconn/wampwebrtc/Types.kt +++ b/app/src/main/java/io/xconn/wampwebrtc/Types.kt @@ -55,6 +55,7 @@ data class ClientConfig( val realm: String, val procedureWebRTCOffer: String, val topicAnswererOnCandidate: String, + val topicOffererOnCandidate: String, val serializer: Serializer, val subProtocol: String, val iceServers: List, diff --git a/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt b/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt index 26a6170..7caae17 100644 --- a/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt +++ b/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt @@ -1,9 +1,12 @@ package io.xconn.wampwebrtc import android.content.Context -import io.xconn.wampproto.serializers.CBORSerializer import io.xconn.xconn.Client +import io.xconn.xconn.Event +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job import kotlinx.coroutines.launch import org.json.JSONArray import org.json.JSONObject @@ -16,8 +19,11 @@ class WebRTC( private val context: Context, private val queue: LinkedBlockingDeque, ) { + private lateinit var offerer: Offerer + private val couroutineScope = CoroutineScope(Dispatchers.Default + Job()) + suspend fun connect(config: ClientConfig): WebRTCSession { - val client = Client(serializer = CBORSerializer()) + val client = Client(serializer = config.serializer) val session = client.connect(config.url, config.realm) val requestID = UUID.randomUUID().toString() @@ -31,7 +37,7 @@ class WebRTC( ) val candidates: MutableList = mutableListOf() - val offerer = + offerer = Offerer( context, queue, @@ -55,6 +61,10 @@ class WebRTC( }, ) + couroutineScope.launch { + session.subscribe(config.topicOffererOnCandidate, ::candidateHandler).await() + } + val offer = offerer.createOffer(offerConfig) Thread.sleep(200) @@ -95,13 +105,48 @@ class WebRTC( val sdpString = descriptionMap.getString("sdp") val sdpType = descriptionMap.getString("type") - val remoteDescription = - SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString) + val remoteDescription = SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString) offerer.setRemoteDescription(remoteDescription) - offerer.waitForDataChannelOpen() + offerer.waitForDataChannelToOpen() return WebRTCSession(offerer.peerConnection!!, offerer.dataChannel!!) } + + private fun candidateHandler(event: Event) { + if (event.args == null || event.args!!.size < 2) { + throw Exception("invalid arguments length") + } + + val jsonString = + event.args?.get(1) as? String + ?: throw Exception("Invalid argument type: Second argument must be a JSON string") + + val result = + try { + convertJsonToMap(jsonString) + } catch (e: Exception) { + throw Exception("Invalid JSON: Unable to parse JSON string") + } + + val candidate = + result["candidate"] as? String + ?: throw Exception("Invalid candidate: 'candidate' field is missing or not a string") + + val sdpMLineIndex = + result["sdpMLineIndex"] as? Int + ?: throw Exception("Invalid sdpMLineIndex: 'sdpMLineIndex' field is missing or not an integer") + + val sdpMid = + result["sdpMid"] as? String + ?: throw Exception("Invalid sdpMid: 'sdpMid' field is missing or not a string") + + try { + val iceCandidate = IceCandidate(sdpMid, sdpMLineIndex, candidate) + offerer.addIceCandidate(iceCandidate) + } catch (e: Exception) { + throw Exception("Failed to add ICE candidate: ${e.message}") + } + } }