diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt index 70b758ad..0b3b710d 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt @@ -185,17 +185,24 @@ class ChannelRelayed( } } - override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) { + override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, ce: ContentEncoding?) { throwIfDisposed() + var contentEncoding: ContentEncoding? = ce var processedData = data if (data != null && contentEncoding == ContentEncoding.Gzip) { - val compressedStream = ByteArrayOutputStream() - GZIPOutputStream(compressedStream).use { gzipStream -> - gzipStream.write(data.array(), data.position(), data.remaining()) - gzipStream.finish() + val isGzipSupported = opcode == Opcode.DATA.value + if (isGzipSupported) { + val compressedStream = ByteArrayOutputStream() + GZIPOutputStream(compressedStream).use { gzipStream -> + gzipStream.write(data.array(), data.position(), data.remaining()) + gzipStream.finish() + } + processedData = ByteBuffer.wrap(compressedStream.toByteArray()) + } else { + Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.") + contentEncoding = ContentEncoding.Raw } - processedData = ByteBuffer.wrap(compressedStream.toByteArray()) } val ENCRYPTION_OVERHEAD = 16 diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index 8492faf3..5d7db083 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -9,6 +9,7 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.HandshakeState import com.futo.platformplayer.states.StateSync +import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion import kotlinx.coroutines.CompletableDeferred import java.io.ByteArrayOutputStream import java.io.InputStream @@ -363,19 +364,26 @@ class SyncSocketSession { fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ } private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ } - fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, contentEncoding: ContentEncoding? = null) { + fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, ce: ContentEncoding? = null) { ensureNotMainThread() Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})") + var contentEncoding: ContentEncoding? = ce var processedData = data if (contentEncoding == ContentEncoding.Gzip) { - val compressedStream = ByteArrayOutputStream() - GZIPOutputStream(compressedStream).use { gzipStream -> - gzipStream.write(data.array(), data.position(), data.remaining()) - gzipStream.finish() + val isGzipSupported = opcode == Opcode.DATA.value + if (isGzipSupported) { + val compressedStream = ByteArrayOutputStream() + GZIPOutputStream(compressedStream).use { gzipStream -> + gzipStream.write(data.array(), data.position(), data.remaining()) + gzipStream.finish() + } + processedData = ByteBuffer.wrap(compressedStream.toByteArray()) + } else { + Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.") + contentEncoding = ContentEncoding.Raw } - processedData = ByteBuffer.wrap(compressedStream.toByteArray()) } if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) { @@ -429,7 +437,7 @@ class SyncSocketSession { ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len) _outputStream.write(_sendBufferEncrypted, 0, 4 + len) } - Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})") + //Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})") } } } @@ -471,7 +479,7 @@ class SyncSocketSession { val subOpcode = data.get().toUByte() val contentEncoding = data.get().toUByte() - Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})") + //Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})") handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel) } @@ -825,6 +833,10 @@ class SyncSocketSession { var data = d if (contentEncoding == ContentEncoding.Gzip.value) { + val isGzipSupported = opcode == Opcode.DATA.value + if (!isGzipSupported) + throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).") + val compressedStream = ByteArrayOutputStream() GZIPOutputStream(compressedStream).use { gzipStream -> gzipStream.write(data.array(), data.position(), data.remaining()) @@ -1152,7 +1164,7 @@ class SyncSocketSession { } } packet.rewind() - send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, contentEncoding = contentEncoding) + send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, ce = contentEncoding) } catch (e: Exception) { _pendingPublishRequests.remove(requestId)?.completeExceptionally(e) throw e