Gzip only for data packets.

This commit is contained in:
Koen J 2025-05-03 17:09:34 +02:00
commit a100785ad7
2 changed files with 34 additions and 15 deletions

View file

@ -185,17 +185,24 @@ class ChannelRelayed(
} }
} }
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) { override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, ce: ContentEncoding?) {
throwIfDisposed() throwIfDisposed()
var contentEncoding: ContentEncoding? = ce
var processedData = data var processedData = data
if (data != null && contentEncoding == ContentEncoding.Gzip) { if (data != null && contentEncoding == ContentEncoding.Gzip) {
val compressedStream = ByteArrayOutputStream() val isGzipSupported = opcode == Opcode.DATA.value
GZIPOutputStream(compressedStream).use { gzipStream -> if (isGzipSupported) {
gzipStream.write(data.array(), data.position(), data.remaining()) val compressedStream = ByteArrayOutputStream()
gzipStream.finish() GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} else {
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
contentEncoding = ContentEncoding.Raw
} }
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} }
val ENCRYPTION_OVERHEAD = 16 val ENCRYPTION_OVERHEAD = 16

View file

@ -9,6 +9,7 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync import com.futo.platformplayer.states.StateSync
import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.InputStream import java.io.InputStream
@ -363,19 +364,26 @@ class SyncSocketSession {
fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ } fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ }
private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ } private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ }
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, contentEncoding: ContentEncoding? = null) { fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, ce: ContentEncoding? = null) {
ensureNotMainThread() ensureNotMainThread()
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})") Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
var contentEncoding: ContentEncoding? = ce
var processedData = data var processedData = data
if (contentEncoding == ContentEncoding.Gzip) { if (contentEncoding == ContentEncoding.Gzip) {
val compressedStream = ByteArrayOutputStream() val isGzipSupported = opcode == Opcode.DATA.value
GZIPOutputStream(compressedStream).use { gzipStream -> if (isGzipSupported) {
gzipStream.write(data.array(), data.position(), data.remaining()) val compressedStream = ByteArrayOutputStream()
gzipStream.finish() GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} else {
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
contentEncoding = ContentEncoding.Raw
} }
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} }
if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) { if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
@ -429,7 +437,7 @@ class SyncSocketSession {
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len) ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len) _outputStream.write(_sendBufferEncrypted, 0, 4 + len)
} }
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})") //Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
} }
} }
} }
@ -471,7 +479,7 @@ class SyncSocketSession {
val subOpcode = data.get().toUByte() val subOpcode = data.get().toUByte()
val contentEncoding = data.get().toUByte() val contentEncoding = data.get().toUByte()
Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})") //Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel) handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel)
} }
@ -825,6 +833,10 @@ class SyncSocketSession {
var data = d var data = d
if (contentEncoding == ContentEncoding.Gzip.value) { if (contentEncoding == ContentEncoding.Gzip.value) {
val isGzipSupported = opcode == Opcode.DATA.value
if (!isGzipSupported)
throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).")
val compressedStream = ByteArrayOutputStream() val compressedStream = ByteArrayOutputStream()
GZIPOutputStream(compressedStream).use { gzipStream -> GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining()) gzipStream.write(data.array(), data.position(), data.remaining())
@ -1152,7 +1164,7 @@ class SyncSocketSession {
} }
} }
packet.rewind() packet.rewind()
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, contentEncoding = contentEncoding) send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, ce = contentEncoding)
} catch (e: Exception) { } catch (e: Exception) {
_pendingPublishRequests.remove(requestId)?.completeExceptionally(e) _pendingPublishRequests.remove(requestId)?.completeExceptionally(e)
throw e throw e