From 156eb4d15ecc2f7d49194041bbfa88d877244120 Mon Sep 17 00:00:00 2001 From: Koen J Date: Sat, 3 May 2025 15:00:17 +0200 Subject: [PATCH] Implemented sync protocol gzip. --- .../platformplayer/sync/internal/Channel.kt | 53 +++++++++------ .../sync/internal/ContentEncoding.kt | 6 ++ .../sync/internal/SyncSession.kt | 8 +-- .../sync/internal/SyncSocketSession.kt | 67 +++++++++++++------ .../sync/internal/SyncStream.kt | 2 +- 5 files changed, 91 insertions(+), 45 deletions(-) create mode 100644 app/src/main/java/com/futo/platformplayer/sync/internal/ContentEncoding.kt diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt index aa9036f1..70b758ad 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt @@ -5,9 +5,11 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.HandshakeState import com.futo.platformplayer.states.StateSync +import java.io.ByteArrayOutputStream import java.nio.ByteBuffer import java.nio.ByteOrder import java.util.Base64 +import java.util.zip.GZIPOutputStream interface IChannel : AutoCloseable { val remotePublicKey: String? @@ -15,7 +17,7 @@ interface IChannel : AutoCloseable { var authorizable: IAuthorizable? var syncSession: SyncSession? fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) - fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null) + fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null) fun setCloseHandler(onClose: ((IChannel) -> Unit)?) val linkType: LinkType } @@ -49,9 +51,9 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel { onData?.invoke(session, this, opcode, subOpcode, data) } - override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) { + override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) { if (data != null) { - session.send(opcode, subOpcode, data) + session.send(opcode, subOpcode, data, contentEncoding) } else { session.send(opcode, subOpcode) } @@ -183,53 +185,63 @@ class ChannelRelayed( } } - override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) { + override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) { throwIfDisposed() - val actualCount = data?.remaining() ?: 0 + var processedData = data + if (data != null && contentEncoding == ContentEncoding.Gzip) { + val compressedStream = ByteArrayOutputStream() + GZIPOutputStream(compressedStream).use { gzipStream -> + gzipStream.write(data.array(), data.position(), data.remaining()) + gzipStream.finish() + } + processedData = ByteBuffer.wrap(compressedStream.toByteArray()) + } + val ENCRYPTION_OVERHEAD = 16 val CONNECTION_ID_SIZE = 8 - val HEADER_SIZE = 6 + val HEADER_SIZE = 7 val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16 - Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data?.remaining()})") + Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.size: ${processedData?.remaining()})") - if (actualCount > MAX_DATA_PER_PACKET && data != null) { + if (processedData != null && processedData.remaining() > MAX_DATA_PER_PACKET) { val streamId = session.generateStreamId() - val totalSize = actualCount var sendOffset = 0 - while (sendOffset < totalSize) { - val bytesRemaining = totalSize - sendOffset - val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - 2, bytesRemaining) + while (sendOffset < processedData.remaining()) { + val bytesRemaining = processedData.remaining() - sendOffset + val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - HEADER_SIZE + 4, bytesRemaining) val streamData: ByteArray val streamOpcode: StreamOpcode if (sendOffset == 0) { streamOpcode = StreamOpcode.START - streamData = ByteArray(4 + 4 + 1 + 1 + bytesToSend) + streamData = ByteArray(4 + HEADER_SIZE + bytesToSend) ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply { putInt(streamId) - putInt(totalSize) + putInt(processedData.remaining()) put(opcode.toByte()) put(subOpcode.toByte()) - put(data.array(), data.position() + sendOffset, bytesToSend) + put(contentEncoding?.value?.toByte() ?: 0.toByte()) + put(processedData.array(), processedData.position() + sendOffset, bytesToSend) } } else { streamData = ByteArray(4 + 4 + bytesToSend) ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply { putInt(streamId) putInt(sendOffset) - put(data.array(), data.position() + sendOffset, bytesToSend) + put(processedData.array(), processedData.position() + sendOffset, bytesToSend) } streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END } val fullPacket = ByteArray(HEADER_SIZE + streamData.size) ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply { - putInt(streamData.size + 2) + putInt(streamData.size + HEADER_SIZE - 4) put(Opcode.STREAM.value.toByte()) put(streamOpcode.value.toByte()) + put(ContentEncoding.Raw.value.toByte()) put(streamData) } @@ -237,12 +249,13 @@ class ChannelRelayed( sendOffset += bytesToSend } } else { - val packet = ByteArray(HEADER_SIZE + actualCount) + val packet = ByteArray(HEADER_SIZE + (processedData?.remaining() ?: 0)) ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply { - putInt(actualCount + 2) + putInt((processedData?.remaining() ?: 0) + HEADER_SIZE - 4) put(opcode.toByte()) put(subOpcode.toByte()) - if (actualCount > 0 && data != null) put(data.array(), data.position(), actualCount) + put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte()) + if (processedData != null && processedData.remaining() > 0) put(processedData.array(), processedData.position(), processedData.remaining()) } sendPacket(packet) } diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/ContentEncoding.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/ContentEncoding.kt new file mode 100644 index 00000000..ab9ed6a9 --- /dev/null +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/ContentEncoding.kt @@ -0,0 +1,6 @@ +package com.futo.platformplayer.sync.internal + +enum class ContentEncoding(val value: UByte) { + Raw(0u), + Gzip(1u) +} \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt index 10b24d89..da9c321d 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt @@ -196,14 +196,14 @@ class SyncSession : IAuthorizable { } fun sendData(subOpcode: UByte, data: String) { - send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8))) + send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip) } fun send(opcode: UByte, subOpcode: UByte, data: String) { - send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8))) + send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip) } - fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null) { + fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null) { val channels = synchronized(_channels) { _channels.sortedBy { it.linkType.ordinal }.toList() } if (channels.isEmpty()) { //TODO: Should this throw? @@ -214,7 +214,7 @@ class SyncSession : IAuthorizable { var sent = false for (channel in channels) { try { - channel.send(opcode, subOpcode, data) + channel.send(opcode, subOpcode, data, contentEncoding) sent = true break } catch (e: Throwable) { diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index 4f74c5ff..8492faf3 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -10,6 +10,7 @@ import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.HandshakeState import com.futo.platformplayer.states.StateSync import kotlinx.coroutines.CompletableDeferred +import java.io.ByteArrayOutputStream import java.io.InputStream import java.io.OutputStream import java.net.Inet4Address @@ -22,6 +23,7 @@ import java.nio.ByteOrder import java.util.Base64 import java.util.Locale import java.util.concurrent.ConcurrentHashMap +import java.util.zip.GZIPOutputStream import kotlin.math.min import kotlin.system.measureTimeMillis import kotlin.time.measureTime @@ -361,27 +363,37 @@ class SyncSocketSession { fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ } private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ } - fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) { + fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, contentEncoding: ContentEncoding? = null) { ensureNotMainThread() Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})") - if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) { + var processedData = data + if (contentEncoding == ContentEncoding.Gzip) { + val compressedStream = ByteArrayOutputStream() + GZIPOutputStream(compressedStream).use { gzipStream -> + gzipStream.write(data.array(), data.position(), data.remaining()) + gzipStream.finish() + } + processedData = ByteBuffer.wrap(compressedStream.toByteArray()) + } + + if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) { val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE val segmentData = ByteArray(segmentSize) var sendOffset = 0 val id = generateStreamId() - while (sendOffset < data.remaining()) { - val bytesRemaining = data.remaining() - sendOffset + while (sendOffset < processedData.remaining()) { + val bytesRemaining = processedData.remaining() - sendOffset var bytesToSend: Int var segmentPacketSize: Int val streamOp: StreamOpcode if (sendOffset == 0) { streamOp = StreamOpcode.START - bytesToSend = segmentSize - 4 - 4 - 1 - 1 - segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1 + bytesToSend = segmentSize - 4 - HEADER_SIZE + segmentPacketSize = bytesToSend + 4 + HEADER_SIZE } else { bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining) streamOp = if (bytesToSend >= bytesRemaining) StreamOpcode.END else StreamOpcode.DATA @@ -390,12 +402,13 @@ class SyncSocketSession { ByteBuffer.wrap(segmentData).order(ByteOrder.LITTLE_ENDIAN).apply { putInt(id) - putInt(if (streamOp == StreamOpcode.START) data.remaining() else sendOffset) + putInt(if (streamOp == StreamOpcode.START) processedData.remaining() else sendOffset) if (streamOp == StreamOpcode.START) { put(opcode.toByte()) put(subOpcode.toByte()) + put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte()) } - put(data.array(), data.position() + sendOffset, bytesToSend) + put(processedData.array(), processedData.position() + sendOffset, bytesToSend) } send(Opcode.STREAM.value, streamOp.value, ByteBuffer.wrap(segmentData, 0, segmentPacketSize)) @@ -404,18 +417,19 @@ class SyncSocketSession { } else { synchronized(_sendLockObject) { ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply { - putInt(data.remaining() + 2) + putInt(processedData.remaining() + HEADER_SIZE - 4) put(opcode.toByte()) put(subOpcode.toByte()) - put(data.array(), data.position(), data.remaining()) + put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte()) + put(processedData.array(), processedData.position(), processedData.remaining()) } - val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, data.remaining() + HEADER_SIZE) + val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, processedData.remaining() + HEADER_SIZE) val sendDuration = measureTimeMillis { ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len) _outputStream.write(_sendBufferEncrypted, 0, 4 + len) } - Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()}, sendDuration: ${sendDuration})") + Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})") } } } @@ -428,6 +442,7 @@ class SyncSocketSession { ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2) _sendBuffer.asUByteArray()[4] = opcode _sendBuffer.asUByteArray()[5] = subOpcode + _sendBuffer.asUByteArray()[6] = ContentEncoding.Raw.value //Logger.i(TAG, "Encrypting message (opcode = ${opcode}, subOpcode = ${subOpcode}, size = ${HEADER_SIZE})") @@ -446,7 +461,7 @@ class SyncSocketSession { private fun handleData(data: ByteBuffer, sourceChannel: ChannelRelayed?) { val length = data.remaining() if (length < HEADER_SIZE) - throw Exception("Packet must be at least 6 bytes (header size)") + throw Exception("Packet must be at least ${HEADER_SIZE} bytes (header size)") val size = data.int if (size != length - 4) @@ -454,9 +469,10 @@ class SyncSocketSession { val opcode = data.get().toUByte() val subOpcode = data.get().toUByte() + val contentEncoding = data.get().toUByte() Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})") - handlePacket(opcode, subOpcode, data, sourceChannel) + handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel) } private fun handleRequest(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) { @@ -804,9 +820,19 @@ class SyncSocketSession { } } - private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) { + private fun handlePacket(opcode: UByte, subOpcode: UByte, d: ByteBuffer, contentEncoding: UByte, sourceChannel: ChannelRelayed?) { Logger.i(TAG, "Handle packet (opcode = ${opcode}, subOpcode = ${subOpcode})") + var data = d + if (contentEncoding == ContentEncoding.Gzip.value) { + val compressedStream = ByteArrayOutputStream() + GZIPOutputStream(compressedStream).use { gzipStream -> + gzipStream.write(data.array(), data.position(), data.remaining()) + gzipStream.finish() + } + data = ByteBuffer.wrap(compressedStream.toByteArray()) + } + when (opcode) { Opcode.PING.value -> { if (sourceChannel != null) @@ -844,8 +870,9 @@ class SyncSocketSession { val expectedSize = data.int val op = data.get().toUByte() val subOp = data.get().toUByte() + val ce = data.get().toUByte() - val syncStream = SyncStream(expectedSize, op, subOp) + val syncStream = SyncStream(expectedSize, op, subOp, ce) if (data.remaining() > 0) { syncStream.add(data.array(), data.position(), data.remaining()) } @@ -890,7 +917,7 @@ class SyncSocketSession { throw Exception("After sync stream end, the stream must be complete") } - handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, sourceChannel) + handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, contentEncoding, sourceChannel) } } Opcode.DATA.value -> { @@ -1070,7 +1097,7 @@ class SyncSocketSession { send(Opcode.NOTIFY.value, NotifyOpcode.CONNECTION_INFO.value, publishBytes) } - suspend fun publishRecords(consumerPublicKeys: List, key: String, data: ByteArray): Boolean { + suspend fun publishRecords(consumerPublicKeys: List, key: String, data: ByteArray, contentEncoding: ContentEncoding? = null): Boolean { val keyBytes = key.toByteArray(Charsets.UTF_8) if (key.isEmpty() || keyBytes.size > 32) throw IllegalArgumentException("Key must be 1-32 bytes") if (consumerPublicKeys.isEmpty()) throw IllegalArgumentException("At least one consumer required") @@ -1125,7 +1152,7 @@ class SyncSocketSession { } } packet.rewind() - send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet) + send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, contentEncoding = contentEncoding) } catch (e: Exception) { _pendingPublishRequests.remove(requestId)?.completeExceptionally(e) throw e @@ -1245,6 +1272,6 @@ class SyncSocketSession { private const val TAG = "SyncSocketSession" const val MAXIMUM_PACKET_SIZE = 65535 - 16 const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16 - const val HEADER_SIZE = 6 + const val HEADER_SIZE = 7 } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt index d558feef..b7ed0626 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt @@ -1,6 +1,6 @@ package com.futo.platformplayer.sync.internal -class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) { +class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte, val contentEncoding: UByte) { companion object { const val MAXIMUM_SIZE = 10_000_000 }