Skip to content

Commit

Permalink
Fix seeking past cache
Browse files Browse the repository at this point in the history
  • Loading branch information
LuftVerbot committed Dec 10, 2024
1 parent f378927 commit e812866
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 140 deletions.
2 changes: 2 additions & 0 deletions ext/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
val libVersion: String by project
compileOnly("com.github.brahmkshatriya:echo:$libVersion")
compileOnly("com.squareup.okhttp3:okhttp:5.0.0-alpha.14")

implementation("org.nanohttpd:nanohttpd:2.3.1")
}

val extType: String by project
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package dev.brahmkshatriya.echo.extension

import dev.brahmkshatriya.echo.common.models.Streamable
import fi.iki.elonen.NanoHTTPD
import fi.iki.elonen.NanoHTTPD.MIME_PLAINTEXT
import fi.iki.elonen.NanoHTTPD.newFixedLengthResponse
import kotlinx.io.IOException
import java.io.BufferedInputStream
import java.io.FilterInputStream
import java.net.URL
import javax.net.ssl.HttpsURLConnection

class LocalAudioWebServer(
hostname: String,
port: Int,
private val streamable: Streamable,
private val key: String
) : NanoHTTPD(hostname, port) {

override fun serve(session: IHTTPSession): Response {
if (session.method != Method.GET) {
return newFixedLengthResponse(Response.Status.METHOD_NOT_ALLOWED, MIME_PLAINTEXT, "Only GET allowed")
}

if (session.uri != "/stream") {
return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_PLAINTEXT, "Not Found")
}

val rangeHeader = session.headers["range"]
var startBytes = 0L
var endBytes = -1L
var isRanged = false

if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
isRanged = true
val ranges = rangeHeader.removePrefix("bytes=").split("-")
startBytes = ranges[0].toLongOrNull() ?: 0
if (ranges.size > 1 && ranges[1].isNotEmpty()) {
endBytes = ranges[1].toLongOrNull() ?: -1
}
}


val outResponse = deezerStream(streamable.id, startBytes, endBytes, isRanged, key)

return outResponse
}
}

private fun deezerStream(
sURL: String,
startBytes: Long,
end: Long,
isRanged: Boolean,
key: String
): NanoHTTPD.Response {

var deezerStart = startBytes
deezerStart -= startBytes % 2048
val dropBytes = startBytes % 2048

return try {
val url = URL(sURL)
val connection = url.openConnection() as HttpsURLConnection

connection.connectTimeout = 10_000
connection.requestMethod = "GET"
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " +
"Chrome/79.0.3945.130 Safari/537.36"
)
connection.setRequestProperty("Accept-Language", "*")
connection.setRequestProperty("Accept", "*/*")
connection.setRequestProperty(
"Range",
"bytes=$deezerStart-${if (end == -1L) "" else end}"
)
connection.connect()

val outResponse: NanoHTTPD.Response =
newFixedLengthResponse(
if (isRanged) NanoHTTPD.Response.Status.PARTIAL_CONTENT else NanoHTTPD.Response.Status.OK,
"audio/mpeg",
BufferedInputStream(object : FilterInputStream(connection.inputStream) {
var counter = deezerStart / 2048
var drop = dropBytes

@Throws(IOException::class)
override fun read(b: ByteArray, off: Int, len: Int): Int {
val buffer = ByteArray(2048)
var readBytes: Int
var totalRead = 0

while (totalRead < 2048) {
readBytes = `in`.read(buffer, totalRead, 2048 - totalRead)
if (readBytes == -1) break
totalRead += readBytes
}
if (totalRead == 0) return -1

if (totalRead != 2048) {
System.arraycopy(buffer, 0, b, off, totalRead)
return totalRead
}

var processedBuffer = buffer
if (counter % 3 == 0L) {
processedBuffer = Utils.decryptBlowfish(buffer, key)
}

if (drop > 0) {
val output = 2048 - drop.toInt()
System.arraycopy(processedBuffer, drop.toInt(), b, off, output)
drop = 0
counter++
return output
}

System.arraycopy(processedBuffer, 0, b, off, 2048)
counter++
return 2048
}
}, 2048),
connection.contentLengthLong - dropBytes
)

if (isRanged) {
val contentLength = connection.contentLengthLong + deezerStart
val rangeEnd = if (end == -1L) contentLength - 1 else end
val range = "bytes $startBytes-$rangeEnd/$contentLength"
outResponse.addHeader("Content-Range", range)
}
outResponse.addHeader("Accept-Ranges", "bytes")

outResponse
} catch (e: Exception) {
e.printStackTrace()
newFixedLengthResponse(
NanoHTTPD.Response.Status.INTERNAL_ERROR,
MIME_PLAINTEXT,
"Failed getting data!"
)
}
}

object AudioStreamManager {
private var server: LocalAudioWebServer? = null
private val lock = Any()
private const val HOSTNAME = "127.0.0.1"
private const val PORT = 36958

fun startServer(
streamable: Streamable,
) {
synchronized(lock) {
server?.stop()

val key = streamable.extra["key"] ?: ""

server = LocalAudioWebServer(HOSTNAME, PORT, streamable, key).apply {
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false)
}
}
}

fun stopServer() {
synchronized(lock) {
server?.stop()
server = null
println("LocalAudioWebServer stopped.")
}
}

fun getStreamUrl(): String {
return "http://$HOSTNAME:$PORT/stream"
}
}
140 changes: 0 additions & 140 deletions ext/src/main/java/dev/brahmkshatriya/echo/extension/Utils.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,8 @@
package dev.brahmkshatriya.echo.extension

import dev.brahmkshatriya.echo.common.models.Streamable
import dev.brahmkshatriya.echo.common.models.Streamable.Media.Companion.toMedia
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.writeFully
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import okhttp3.ConnectionPool
import okhttp3.OkHttpClient
import okhttp3.Protocol
import okhttp3.Request
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.security.MessageDigest
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import javax.crypto.Cipher
import javax.crypto.spec.IvParameterSpec
import javax.crypto.spec.SecretKeySpec
Expand Down Expand Up @@ -63,134 +51,6 @@ object Utils {
}
return cipher.doFinal(chunk)
}

fun getContentLength(url: String, client: OkHttpClient): Long {
val request = Request.Builder().url(url).head().build()
client.newCall(request).execute().use { response ->
return response.header("Content-Length")?.toLong() ?: 0L
}
}
}

fun getByteChannel(
scope: CoroutineScope,
streamable: Streamable,
client: OkHttpClient,
contentLength: Long
): ByteChannel {
val url = streamable.id
val key = streamable.extra["key"] ?: ""

val byteChannel = ByteChannel(true)
var lastActivityTime = System.currentTimeMillis()

scope.launch {
val clientWithTimeouts = client.newBuilder()
.readTimeout(0, TimeUnit.SECONDS)
.connectTimeout(0, TimeUnit.SECONDS)
.writeTimeout(0, TimeUnit.SECONDS)
.connectionPool(ConnectionPool(5, 5, TimeUnit.MINUTES))
.protocols(listOf(Protocol.HTTP_1_1))
.retryOnConnectionFailure(true)
.build()

var totalBytesRead = 0L
var counter = 0

while (totalBytesRead < contentLength && !byteChannel.isClosedForWrite) {
val requestBuilder = Request.Builder().url(url)

if (totalBytesRead > 0) {
requestBuilder.header("Range", "bytes=$totalBytesRead-")
}

val request = requestBuilder.build()

val response = clientWithTimeouts.newCall(request).execute()

response.body.byteStream().use { byteStream ->
var shouldReopen = false

while (totalBytesRead < contentLength && !shouldReopen) {
if (System.currentTimeMillis() - lastActivityTime > 300_000L) {
shouldReopen = true
break
}

val buffer = ByteArray(2048)
var bytesRead: Int
var totalRead = 0

try {
while (totalRead < buffer.size) {
bytesRead = byteStream.read(buffer, totalRead, buffer.size - totalRead)
if (bytesRead == -1) {
shouldReopen = true
break
}
totalRead += bytesRead

lastActivityTime = System.currentTimeMillis()
}
} catch (e: Exception) {
e.printStackTrace()
shouldReopen = true
break
}

if (totalRead == 0) {
shouldReopen = true
break
}

try {
if (System.currentTimeMillis() - lastActivityTime > 300_000L) {
shouldReopen = true
break
}

if (totalRead != 2048) {
byteChannel.writeFully(buffer, 0, totalRead)
} else {
if ((counter % 3) == 0) {
val decryptedChunk = Utils.decryptBlowfish(buffer, key)
byteChannel.writeFully(decryptedChunk, 0, totalRead)
} else {
byteChannel.writeFully(buffer, 0, totalRead)
}
}

lastActivityTime = System.currentTimeMillis()
} catch (e: IOException) {
e.printStackTrace()
println("Exception occurred while writing to channel: ${e.message}")
shouldReopen = true
break
}
totalBytesRead += totalRead
counter++
}

if (!shouldReopen) {
response.close()
}
}
}
}

return byteChannel
}

fun getByteStreamAudio(
scope: CoroutineScope,
streamable: Streamable,
client: OkHttpClient
): Streamable.Media {
val contentLength = Utils.getContentLength(streamable.id, client)
return Streamable.Source.Channel(
channel = getByteChannel(scope, streamable, client, contentLength),
totalBytes = contentLength
).toMedia()
}

@Suppress("NewApi", "GetInstance")
Expand Down

0 comments on commit e812866

Please sign in to comment.