diff --git a/ext/build.gradle.kts b/ext/build.gradle.kts index cd43613..1dc19ac 100644 --- a/ext/build.gradle.kts +++ b/ext/build.gradle.kts @@ -20,8 +20,6 @@ kotlin { dependencies { val libVersion: String by project compileOnly("com.github.brahmkshatriya:echo:$libVersion") - - implementation("org.nanohttpd:nanohttpd:2.3.1") } val extType: String by project diff --git a/ext/src/main/java/dev/brahmkshatriya/echo/extension/LocalAudioWebServer.kt b/ext/src/main/java/dev/brahmkshatriya/echo/extension/LocalAudioWebServer.kt index 86720c1..ef2d0c2 100644 --- a/ext/src/main/java/dev/brahmkshatriya/echo/extension/LocalAudioWebServer.kt +++ b/ext/src/main/java/dev/brahmkshatriya/echo/extension/LocalAudioWebServer.kt @@ -1,212 +1,305 @@ package dev.brahmkshatriya.echo.extension import dev.brahmkshatriya.echo.common.models.Streamable -import fi.iki.elonen.NanoHTTPD -import fi.iki.elonen.NanoHTTPD.MIME_PLAINTEXT -import fi.iki.elonen.NanoHTTPD.SOCKET_READ_TIMEOUT -import fi.iki.elonen.NanoHTTPD.newFixedLengthResponse +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlinx.io.IOException +import okhttp3.OkHttpClient +import okhttp3.Request import java.io.BufferedInputStream +import java.io.BufferedOutputStream +import java.io.BufferedReader import java.io.FilterInputStream -import java.net.URL -import javax.net.ssl.HttpsURLConnection +import java.io.InputStreamReader +import java.io.OutputStream +import java.net.ServerSocket +import java.net.Socket +import java.net.SocketException +import java.net.URLDecoder -class LocalAudioWebServer( - hostname: String, - port: Int -) : NanoHTTPD(hostname, port) { +object LocalAudioServer { private val trackMap = mutableMapOf>() + private var serverSocket: ServerSocket? = null + private var serverJob: Job? = null + private val lock = Any() - fun addOrUpdateTrack(streamable: Streamable) { - val key = streamable.extras["key"] ?: "" - trackMap[streamable.id] = streamable to key - } + @Volatile + private var usedPort: Int = -1 + private const val HOSTNAME = "127.0.0.1" - override fun serve(session: IHTTPSession): Response { - if (session.method != Method.GET) { - return newFixedLengthResponse(Response.Status.METHOD_NOT_ALLOWED, MIME_PLAINTEXT, "Only GET allowed") - } + private val okHttpClient = OkHttpClient() - if (session.uri != "/stream") { - return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_PLAINTEXT, "Not Found") + private fun startServerIfNeeded(scope: CoroutineScope) { + synchronized(lock) { + if (serverSocket == null) { + serverSocket = ServerSocket(0, 50) // backlog=50 + usedPort = serverSocket!!.localPort + + serverJob = scope.launch(Dispatchers.IO) { + println("LocalAudioServer started on port: $usedPort") + while (isActive) { + try { + val clientSocket = serverSocket!!.accept() + launch(Dispatchers.IO) { + handleClient(clientSocket) + } + } catch (e: IOException) { + if (isActive) e.printStackTrace() + } + } + } + } } + } - val trackId = session.parms["trackId"] - if (trackId.isNullOrBlank()) { - return newFixedLengthResponse( - Response.Status.BAD_REQUEST, - MIME_PLAINTEXT, - "Missing trackId parameter!" - ) + fun stopServer() { + synchronized(lock) { + serverJob?.cancel() + serverJob = null + serverSocket?.close() + serverSocket = null } + } - val (streamable, key) = trackMap[trackId] - ?: return newFixedLengthResponse( - Response.Status.NOT_FOUND, - MIME_PLAINTEXT, - "Track not found!" - ) - - val rangeHeader = session.headers["range"] - var startBytes = 0L - var endBytes = -1L - var isRanged = false - - if (rangeHeader != null && rangeHeader.startsWith("bytes=")) { - isRanged = true - val ranges = rangeHeader.removePrefix("bytes=").split("-") - startBytes = ranges[0].toLongOrNull() ?: 0 - if (ranges.size > 1 && ranges[1].isNotEmpty()) { - endBytes = ranges[1].toLongOrNull() ?: -1 - } + fun addTrack(streamable: Streamable, scope: CoroutineScope) { + synchronized(lock) { + startServerIfNeeded(scope) + val key = streamable.extras["key"] ?: "" + trackMap[streamable.id] = streamable to key } + } - return deezerStream(streamable.id, startBytes, endBytes, isRanged, key) + fun getStreamUrlForTrack(trackId: String, scope: CoroutineScope): String { + startServerIfNeeded(scope) + return "http://$HOSTNAME:$usedPort/stream?trackId=$trackId" } -} - -private fun deezerStream( - sURL: String, - startBytes: Long, - end: Long, - isRanged: Boolean, - key: String -): NanoHTTPD.Response { - - var deezerStart = startBytes - deezerStart -= startBytes % 2048 - val dropBytes = startBytes % 2048 - - return try { - val url = URL(sURL) - val connection = url.openConnection() as HttpsURLConnection - - connection.connectTimeout = 10_000 - connection.requestMethod = "GET" - connection.setRequestProperty( - "User-Agent", - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " + - "Chrome/79.0.3945.130 Safari/537.36" - ) - connection.setRequestProperty("Accept-Language", "*") - connection.setRequestProperty("Accept", "*/*") - connection.setRequestProperty( - "Range", - "bytes=$deezerStart-${if (end == -1L) "" else end}" - ) - connection.connect() - - val outResponse: NanoHTTPD.Response = - newFixedLengthResponse( - if (isRanged) NanoHTTPD.Response.Status.PARTIAL_CONTENT else NanoHTTPD.Response.Status.OK, - "audio/mpeg", - BufferedInputStream(object : FilterInputStream(connection.inputStream) { - var counter = deezerStart / 2048 - var drop = dropBytes - - @Throws(IOException::class) - override fun read(b: ByteArray, off: Int, len: Int): Int { - val buffer = ByteArray(2048) - var readBytes: Int - var totalRead = 0 - - while (totalRead < 2048) { - readBytes = `in`.read(buffer, totalRead, 2048 - totalRead) - if (readBytes == -1) break - totalRead += readBytes - } - if (totalRead == 0) return -1 - if (totalRead != 2048) { - System.arraycopy(buffer, 0, b, off, totalRead) - return totalRead - } + private fun handleClient(socket: Socket) { + socket.use { s -> + s.soTimeout = 10_000 + val input = BufferedReader(InputStreamReader(s.getInputStream())) + val output = BufferedOutputStream(s.getOutputStream()) - var processedBuffer = buffer - if (counter % 3 == 0L) { - processedBuffer = Utils.decryptBlowfish(buffer, key) - } + val requestLine = input.readLine() ?: return + val (method, path) = parseRequestLine(requestLine) ?: run { + sendResponse(output, 400, "Bad Request", "Invalid request line.") + return + } - if (drop > 0) { - val output = 2048 - drop.toInt() - System.arraycopy(processedBuffer, drop.toInt(), b, off, output) - drop = 0 - counter++ - return output - } + if (method != "GET") { + sendResponse(output, 405, "Method Not Allowed", "Only GET supported.") + return + } - System.arraycopy(processedBuffer, 0, b, off, 2048) - counter++ - return 2048 - } - }, 2048), - connection.contentLengthLong - dropBytes - ) - - if (isRanged) { - val contentLength = connection.contentLengthLong + deezerStart - val rangeEnd = if (end == -1L) contentLength - 1 else end - val range = "bytes $startBytes-$rangeEnd/$contentLength" - outResponse.addHeader("Content-Range", range) + val headers = mutableMapOf() + while (true) { + val line = input.readLine() ?: break + if (line.isBlank()) break + val idx = line.indexOf(":") + if (idx != -1) { + val key = line.substring(0, idx).trim() + val value = line.substring(idx + 1).trim() + headers[key.lowercase()] = value + } + } + + val (requestPath, queryString) = splitPathQuery(path) + if (requestPath != "/stream") { + sendResponse(output, 404, "Not Found", "Unknown path.") + return + } + val queryParams = parseQueryString(queryString) + val trackId = queryParams["trackId"] + if (trackId.isNullOrEmpty()) { + sendResponse(output, 400, "Bad Request", "Missing trackId parameter.") + return + } + + val (streamable, key) = trackMap[trackId] + ?: run { + sendResponse(output, 404, "Not Found", "Track not found.") + return + } + + val rangeHeader = headers["range"] + var startBytes = 0L + var endBytes = -1L + var isRanged = false + + if (!rangeHeader.isNullOrBlank() && rangeHeader.startsWith("bytes=")) { + isRanged = true + val ranges = rangeHeader.removePrefix("bytes=").split("-") + startBytes = ranges[0].toLongOrNull() ?: 0 + if (ranges.size > 1 && ranges[1].isNotEmpty()) { + endBytes = ranges[1].toLongOrNull() ?: -1 + } + } + + streamDeezer(output, isRanged, startBytes, endBytes, key, streamable.id) } - outResponse.addHeader("Accept-Ranges", "bytes") - - outResponse - } catch (e: Exception) { - e.printStackTrace() - newFixedLengthResponse( - NanoHTTPD.Response.Status.INTERNAL_ERROR, - MIME_PLAINTEXT, - "Failed getting data!" - ) } -} -object AudioStreamManager { - private var server: LocalAudioWebServer? = null - private val lock = Any() - private const val HOSTNAME = "127.0.0.1" + private fun streamDeezer( + output: BufferedOutputStream, + isRanged: Boolean, + startBytes: Long, + endBytes: Long, + key: String, + deezerUrl: String + ) { + var deezerStart = startBytes - (startBytes % 2048) + val dropBytes = startBytes % 2048 + + val rangeValue = if (endBytes == -1L) { + "bytes=$deezerStart-" + } else { + "bytes=$deezerStart-$endBytes" + } - @Volatile - private var usedPort = -1 + val request = Request.Builder() + .url(deezerUrl) + .header("User-Agent", "Mozilla/5.0 etc...") + .header("Range", rangeValue) + .build() + + try { + okHttpClient.newCall(request).execute().use { response -> + val code = if (isRanged) 206 else 200 + val reason = if (isRanged) "Partial Content" else "OK" + val contentType = response.header("Content-Type") ?: "audio/mpeg" + val contentLength = response.body.contentLength() + + val headers = mutableListOf( + "HTTP/1.1 $code $reason", + "Content-Type: $contentType", + "Accept-Ranges: bytes" + ) + if (isRanged) { + val totalLen = deezerStart + contentLength - 1 + val rangeEnd = if (endBytes == -1L) totalLen else endBytes + val contentRange = "bytes $startBytes-$rangeEnd/${deezerStart + contentLength}" + headers.add("Content-Range: $contentRange") + } - private fun startServerIfNeeded() { - synchronized(lock) { - if (server == null) { - try { - server = LocalAudioWebServer(HOSTNAME, 0).apply { - start(SOCKET_READ_TIMEOUT, false) - } - usedPort = server?.listeningPort ?: -1 + if (contentLength >= 0) { + val computedLength = (contentLength - dropBytes).coerceAtLeast(0) + headers.add("Content-Length: $computedLength") + } - println("LocalAudioWebServer started on port: $usedPort") - } catch (e: Exception) { - println("Failed to start LocalAudioWebServer: ${e.message}") - e.printStackTrace() - server = null + headers.add("") + val headerBytes = headers.joinToString("\r\n").toByteArray() + output.write(headerBytes) + output.write("\r\n".toByteArray()) + + response.body.byteStream().use { responseStream -> + val bufferedIn = BufferedInputStream( + object : FilterInputStream(responseStream) { + var counter = deezerStart / 2048 + var drop = dropBytes + + override fun read(b: ByteArray, off: Int, len: Int): Int { + val chunkSize = 2048 + val buffer = ByteArray(chunkSize) + var totalRead = 0 + while (totalRead < chunkSize) { + val r = `in`.read(buffer, totalRead, chunkSize - totalRead) + if (r == -1) break + totalRead += r + } + if (totalRead == 0) return -1 + + var processed = buffer + if (totalRead == chunkSize && counter % 3 == 0L) { + processed = Utils.decryptBlowfish(buffer, key) + } + + if (drop > 0 && totalRead == chunkSize) { + val toWrite = (chunkSize - drop).toInt() + System.arraycopy(processed, drop.toInt(), b, off, toWrite) + drop = 0 + counter++ + return toWrite + } + + System.arraycopy(processed, 0, b, off, totalRead) + if (totalRead == chunkSize) counter++ + return totalRead + } + }, + 2048 + ) + + val buf = ByteArray(16_384) + while (true) { + val read = bufferedIn.read(buf) + if (read < 0) break + output.write(buf, 0, read) + } } + output.flush() + } + } catch (e: IOException) { + if (e is SocketException && e.message?.contains("Broken pipe") == true) { + println("Client closed connection: ${e.message}") + } else { + e.printStackTrace() } + } finally { + output.close() } } - fun addTrack(streamable: Streamable) { - synchronized(lock) { - startServerIfNeeded() - server?.addOrUpdateTrack(streamable) - } + private fun parseRequestLine(line: String): Triple? { + val parts = line.split(" ") + if (parts.size < 3) return null + return Triple(parts[0], parts[1], parts[2]) } - fun stopServer() { - synchronized(lock) { - server?.stop() - server = null - println("LocalAudioWebServer stopped.") + + private fun splitPathQuery(path: String): Pair { + val idx = path.indexOf('?') + return if (idx != -1) { + val p = path.substring(0, idx) + val q = path.substring(idx + 1) + p to q + } else { + path to null } } - fun getStreamUrlForTrack(trackId: String): String { - return "http://$HOSTNAME:$usedPort/stream?trackId=$trackId" + private fun parseQueryString(qs: String?): Map { + if (qs.isNullOrEmpty()) return emptyMap() + val result = mutableMapOf() + val pairs = qs.split("&") + for (pair in pairs) { + val idx = pair.indexOf('=') + if (idx != -1) { + val key = URLDecoder.decode(pair.substring(0, idx), Charsets.UTF_8.name()) + val value = URLDecoder.decode(pair.substring(idx + 1), Charsets.UTF_8.name()) + result[key] = value + } + } + return result } -} + private fun sendResponse( + output: OutputStream, + code: Int, + reason: String, + message: String + ) { + val headers = """ + HTTP/1.1 $code $reason + Content-Type: text/plain + Content-Length: ${message.toByteArray().size} + + """.trimIndent() + "\r\n" + output.write(headers.toByteArray()) + output.write(message.toByteArray()) + output.flush() + } +} \ No newline at end of file diff --git a/ext/src/main/java/dev/brahmkshatriya/echo/extension/clients/DeezerTrackClient.kt b/ext/src/main/java/dev/brahmkshatriya/echo/extension/clients/DeezerTrackClient.kt index d54fc52..28e8c59 100644 --- a/ext/src/main/java/dev/brahmkshatriya/echo/extension/clients/DeezerTrackClient.kt +++ b/ext/src/main/java/dev/brahmkshatriya/echo/extension/clients/DeezerTrackClient.kt @@ -4,10 +4,10 @@ import dev.brahmkshatriya.echo.common.models.Streamable import dev.brahmkshatriya.echo.common.models.Streamable.Media.Companion.toMedia import dev.brahmkshatriya.echo.common.models.Streamable.Source.Companion.toSource import dev.brahmkshatriya.echo.common.models.Track -import dev.brahmkshatriya.echo.extension.AudioStreamManager import dev.brahmkshatriya.echo.extension.DeezerApi import dev.brahmkshatriya.echo.extension.DeezerExtension import dev.brahmkshatriya.echo.extension.DeezerParser +import dev.brahmkshatriya.echo.extension.LocalAudioServer import dev.brahmkshatriya.echo.extension.Utils import dev.brahmkshatriya.echo.extension.generateTrackUrl import dev.brahmkshatriya.echo.extension.getByteStreamAudio @@ -30,13 +30,13 @@ class DeezerTrackClient(private val api: DeezerApi, private val parser: DeezerPa return if (streamable.quality == 1) { streamable.id.toSource().toMedia() } else { + val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) if(isDownload) { - val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) getByteStreamAudio(scope, streamable, client) } else { - val audioStreamManger = AudioStreamManager - audioStreamManger.addTrack(streamable) - audioStreamManger.getStreamUrlForTrack(streamable.id).toSource().toMedia() + val localAudioServer = LocalAudioServer + localAudioServer.addTrack(streamable, scope) + localAudioServer.getStreamUrlForTrack(streamable.id, scope).toSource().toMedia() } } }