mirror of
https://gitlab.futo.org/videostreaming/grayjay.git
synced 2025-09-12 12:32:27 +00:00
Implemented sync protocol gzip.
This commit is contained in:
parent
dabcfd965f
commit
156eb4d15e
5 changed files with 91 additions and 45 deletions
|
@ -5,9 +5,11 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair
|
||||||
import com.futo.platformplayer.noise.protocol.DHState
|
import com.futo.platformplayer.noise.protocol.DHState
|
||||||
import com.futo.platformplayer.noise.protocol.HandshakeState
|
import com.futo.platformplayer.noise.protocol.HandshakeState
|
||||||
import com.futo.platformplayer.states.StateSync
|
import com.futo.platformplayer.states.StateSync
|
||||||
|
import java.io.ByteArrayOutputStream
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.ByteOrder
|
import java.nio.ByteOrder
|
||||||
import java.util.Base64
|
import java.util.Base64
|
||||||
|
import java.util.zip.GZIPOutputStream
|
||||||
|
|
||||||
interface IChannel : AutoCloseable {
|
interface IChannel : AutoCloseable {
|
||||||
val remotePublicKey: String?
|
val remotePublicKey: String?
|
||||||
|
@ -15,7 +17,7 @@ interface IChannel : AutoCloseable {
|
||||||
var authorizable: IAuthorizable?
|
var authorizable: IAuthorizable?
|
||||||
var syncSession: SyncSession?
|
var syncSession: SyncSession?
|
||||||
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
|
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
|
||||||
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null)
|
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null)
|
||||||
fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
|
fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
|
||||||
val linkType: LinkType
|
val linkType: LinkType
|
||||||
}
|
}
|
||||||
|
@ -49,9 +51,9 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel {
|
||||||
onData?.invoke(session, this, opcode, subOpcode, data)
|
onData?.invoke(session, this, opcode, subOpcode, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
|
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) {
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
session.send(opcode, subOpcode, data)
|
session.send(opcode, subOpcode, data, contentEncoding)
|
||||||
} else {
|
} else {
|
||||||
session.send(opcode, subOpcode)
|
session.send(opcode, subOpcode)
|
||||||
}
|
}
|
||||||
|
@ -183,53 +185,63 @@ class ChannelRelayed(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
|
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) {
|
||||||
throwIfDisposed()
|
throwIfDisposed()
|
||||||
|
|
||||||
val actualCount = data?.remaining() ?: 0
|
var processedData = data
|
||||||
|
if (data != null && contentEncoding == ContentEncoding.Gzip) {
|
||||||
|
val compressedStream = ByteArrayOutputStream()
|
||||||
|
GZIPOutputStream(compressedStream).use { gzipStream ->
|
||||||
|
gzipStream.write(data.array(), data.position(), data.remaining())
|
||||||
|
gzipStream.finish()
|
||||||
|
}
|
||||||
|
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
|
||||||
|
}
|
||||||
|
|
||||||
val ENCRYPTION_OVERHEAD = 16
|
val ENCRYPTION_OVERHEAD = 16
|
||||||
val CONNECTION_ID_SIZE = 8
|
val CONNECTION_ID_SIZE = 8
|
||||||
val HEADER_SIZE = 6
|
val HEADER_SIZE = 7
|
||||||
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
|
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
|
||||||
|
|
||||||
Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data?.remaining()})")
|
Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.size: ${processedData?.remaining()})")
|
||||||
|
|
||||||
if (actualCount > MAX_DATA_PER_PACKET && data != null) {
|
if (processedData != null && processedData.remaining() > MAX_DATA_PER_PACKET) {
|
||||||
val streamId = session.generateStreamId()
|
val streamId = session.generateStreamId()
|
||||||
val totalSize = actualCount
|
|
||||||
var sendOffset = 0
|
var sendOffset = 0
|
||||||
|
|
||||||
while (sendOffset < totalSize) {
|
while (sendOffset < processedData.remaining()) {
|
||||||
val bytesRemaining = totalSize - sendOffset
|
val bytesRemaining = processedData.remaining() - sendOffset
|
||||||
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - 2, bytesRemaining)
|
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - HEADER_SIZE + 4, bytesRemaining)
|
||||||
|
|
||||||
val streamData: ByteArray
|
val streamData: ByteArray
|
||||||
val streamOpcode: StreamOpcode
|
val streamOpcode: StreamOpcode
|
||||||
if (sendOffset == 0) {
|
if (sendOffset == 0) {
|
||||||
streamOpcode = StreamOpcode.START
|
streamOpcode = StreamOpcode.START
|
||||||
streamData = ByteArray(4 + 4 + 1 + 1 + bytesToSend)
|
streamData = ByteArray(4 + HEADER_SIZE + bytesToSend)
|
||||||
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||||
putInt(streamId)
|
putInt(streamId)
|
||||||
putInt(totalSize)
|
putInt(processedData.remaining())
|
||||||
put(opcode.toByte())
|
put(opcode.toByte())
|
||||||
put(subOpcode.toByte())
|
put(subOpcode.toByte())
|
||||||
put(data.array(), data.position() + sendOffset, bytesToSend)
|
put(contentEncoding?.value?.toByte() ?: 0.toByte())
|
||||||
|
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
streamData = ByteArray(4 + 4 + bytesToSend)
|
streamData = ByteArray(4 + 4 + bytesToSend)
|
||||||
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||||
putInt(streamId)
|
putInt(streamId)
|
||||||
putInt(sendOffset)
|
putInt(sendOffset)
|
||||||
put(data.array(), data.position() + sendOffset, bytesToSend)
|
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
|
||||||
}
|
}
|
||||||
streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END
|
streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END
|
||||||
}
|
}
|
||||||
|
|
||||||
val fullPacket = ByteArray(HEADER_SIZE + streamData.size)
|
val fullPacket = ByteArray(HEADER_SIZE + streamData.size)
|
||||||
ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
|
ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||||
putInt(streamData.size + 2)
|
putInt(streamData.size + HEADER_SIZE - 4)
|
||||||
put(Opcode.STREAM.value.toByte())
|
put(Opcode.STREAM.value.toByte())
|
||||||
put(streamOpcode.value.toByte())
|
put(streamOpcode.value.toByte())
|
||||||
|
put(ContentEncoding.Raw.value.toByte())
|
||||||
put(streamData)
|
put(streamData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,12 +249,13 @@ class ChannelRelayed(
|
||||||
sendOffset += bytesToSend
|
sendOffset += bytesToSend
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
val packet = ByteArray(HEADER_SIZE + actualCount)
|
val packet = ByteArray(HEADER_SIZE + (processedData?.remaining() ?: 0))
|
||||||
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
|
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||||
putInt(actualCount + 2)
|
putInt((processedData?.remaining() ?: 0) + HEADER_SIZE - 4)
|
||||||
put(opcode.toByte())
|
put(opcode.toByte())
|
||||||
put(subOpcode.toByte())
|
put(subOpcode.toByte())
|
||||||
if (actualCount > 0 && data != null) put(data.array(), data.position(), actualCount)
|
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
|
||||||
|
if (processedData != null && processedData.remaining() > 0) put(processedData.array(), processedData.position(), processedData.remaining())
|
||||||
}
|
}
|
||||||
sendPacket(packet)
|
sendPacket(packet)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
package com.futo.platformplayer.sync.internal
|
||||||
|
|
||||||
|
enum class ContentEncoding(val value: UByte) {
|
||||||
|
Raw(0u),
|
||||||
|
Gzip(1u)
|
||||||
|
}
|
|
@ -196,14 +196,14 @@ class SyncSession : IAuthorizable {
|
||||||
}
|
}
|
||||||
|
|
||||||
fun sendData(subOpcode: UByte, data: String) {
|
fun sendData(subOpcode: UByte, data: String) {
|
||||||
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
|
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun send(opcode: UByte, subOpcode: UByte, data: String) {
|
fun send(opcode: UByte, subOpcode: UByte, data: String) {
|
||||||
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
|
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null) {
|
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null) {
|
||||||
val channels = synchronized(_channels) { _channels.sortedBy { it.linkType.ordinal }.toList() }
|
val channels = synchronized(_channels) { _channels.sortedBy { it.linkType.ordinal }.toList() }
|
||||||
if (channels.isEmpty()) {
|
if (channels.isEmpty()) {
|
||||||
//TODO: Should this throw?
|
//TODO: Should this throw?
|
||||||
|
@ -214,7 +214,7 @@ class SyncSession : IAuthorizable {
|
||||||
var sent = false
|
var sent = false
|
||||||
for (channel in channels) {
|
for (channel in channels) {
|
||||||
try {
|
try {
|
||||||
channel.send(opcode, subOpcode, data)
|
channel.send(opcode, subOpcode, data, contentEncoding)
|
||||||
sent = true
|
sent = true
|
||||||
break
|
break
|
||||||
} catch (e: Throwable) {
|
} catch (e: Throwable) {
|
||||||
|
|
|
@ -10,6 +10,7 @@ import com.futo.platformplayer.noise.protocol.DHState
|
||||||
import com.futo.platformplayer.noise.protocol.HandshakeState
|
import com.futo.platformplayer.noise.protocol.HandshakeState
|
||||||
import com.futo.platformplayer.states.StateSync
|
import com.futo.platformplayer.states.StateSync
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import java.io.ByteArrayOutputStream
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.io.OutputStream
|
import java.io.OutputStream
|
||||||
import java.net.Inet4Address
|
import java.net.Inet4Address
|
||||||
|
@ -22,6 +23,7 @@ import java.nio.ByteOrder
|
||||||
import java.util.Base64
|
import java.util.Base64
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.zip.GZIPOutputStream
|
||||||
import kotlin.math.min
|
import kotlin.math.min
|
||||||
import kotlin.system.measureTimeMillis
|
import kotlin.system.measureTimeMillis
|
||||||
import kotlin.time.measureTime
|
import kotlin.time.measureTime
|
||||||
|
@ -361,27 +363,37 @@ class SyncSocketSession {
|
||||||
fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ }
|
fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ }
|
||||||
private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ }
|
private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ }
|
||||||
|
|
||||||
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
|
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, contentEncoding: ContentEncoding? = null) {
|
||||||
ensureNotMainThread()
|
ensureNotMainThread()
|
||||||
|
|
||||||
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
|
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
|
||||||
|
|
||||||
if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
|
var processedData = data
|
||||||
|
if (contentEncoding == ContentEncoding.Gzip) {
|
||||||
|
val compressedStream = ByteArrayOutputStream()
|
||||||
|
GZIPOutputStream(compressedStream).use { gzipStream ->
|
||||||
|
gzipStream.write(data.array(), data.position(), data.remaining())
|
||||||
|
gzipStream.finish()
|
||||||
|
}
|
||||||
|
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
|
||||||
|
}
|
||||||
|
|
||||||
|
if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
|
||||||
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
|
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
|
||||||
val segmentData = ByteArray(segmentSize)
|
val segmentData = ByteArray(segmentSize)
|
||||||
var sendOffset = 0
|
var sendOffset = 0
|
||||||
val id = generateStreamId()
|
val id = generateStreamId()
|
||||||
|
|
||||||
while (sendOffset < data.remaining()) {
|
while (sendOffset < processedData.remaining()) {
|
||||||
val bytesRemaining = data.remaining() - sendOffset
|
val bytesRemaining = processedData.remaining() - sendOffset
|
||||||
var bytesToSend: Int
|
var bytesToSend: Int
|
||||||
var segmentPacketSize: Int
|
var segmentPacketSize: Int
|
||||||
val streamOp: StreamOpcode
|
val streamOp: StreamOpcode
|
||||||
|
|
||||||
if (sendOffset == 0) {
|
if (sendOffset == 0) {
|
||||||
streamOp = StreamOpcode.START
|
streamOp = StreamOpcode.START
|
||||||
bytesToSend = segmentSize - 4 - 4 - 1 - 1
|
bytesToSend = segmentSize - 4 - HEADER_SIZE
|
||||||
segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1
|
segmentPacketSize = bytesToSend + 4 + HEADER_SIZE
|
||||||
} else {
|
} else {
|
||||||
bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining)
|
bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining)
|
||||||
streamOp = if (bytesToSend >= bytesRemaining) StreamOpcode.END else StreamOpcode.DATA
|
streamOp = if (bytesToSend >= bytesRemaining) StreamOpcode.END else StreamOpcode.DATA
|
||||||
|
@ -390,12 +402,13 @@ class SyncSocketSession {
|
||||||
|
|
||||||
ByteBuffer.wrap(segmentData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
ByteBuffer.wrap(segmentData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||||
putInt(id)
|
putInt(id)
|
||||||
putInt(if (streamOp == StreamOpcode.START) data.remaining() else sendOffset)
|
putInt(if (streamOp == StreamOpcode.START) processedData.remaining() else sendOffset)
|
||||||
if (streamOp == StreamOpcode.START) {
|
if (streamOp == StreamOpcode.START) {
|
||||||
put(opcode.toByte())
|
put(opcode.toByte())
|
||||||
put(subOpcode.toByte())
|
put(subOpcode.toByte())
|
||||||
|
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
|
||||||
}
|
}
|
||||||
put(data.array(), data.position() + sendOffset, bytesToSend)
|
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
|
||||||
}
|
}
|
||||||
|
|
||||||
send(Opcode.STREAM.value, streamOp.value, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
|
send(Opcode.STREAM.value, streamOp.value, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
|
||||||
|
@ -404,18 +417,19 @@ class SyncSocketSession {
|
||||||
} else {
|
} else {
|
||||||
synchronized(_sendLockObject) {
|
synchronized(_sendLockObject) {
|
||||||
ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply {
|
ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||||
putInt(data.remaining() + 2)
|
putInt(processedData.remaining() + HEADER_SIZE - 4)
|
||||||
put(opcode.toByte())
|
put(opcode.toByte())
|
||||||
put(subOpcode.toByte())
|
put(subOpcode.toByte())
|
||||||
put(data.array(), data.position(), data.remaining())
|
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
|
||||||
|
put(processedData.array(), processedData.position(), processedData.remaining())
|
||||||
}
|
}
|
||||||
|
|
||||||
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, data.remaining() + HEADER_SIZE)
|
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, processedData.remaining() + HEADER_SIZE)
|
||||||
val sendDuration = measureTimeMillis {
|
val sendDuration = measureTimeMillis {
|
||||||
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
|
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
|
||||||
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
|
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
|
||||||
}
|
}
|
||||||
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()}, sendDuration: ${sendDuration})")
|
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -428,6 +442,7 @@ class SyncSocketSession {
|
||||||
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2)
|
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2)
|
||||||
_sendBuffer.asUByteArray()[4] = opcode
|
_sendBuffer.asUByteArray()[4] = opcode
|
||||||
_sendBuffer.asUByteArray()[5] = subOpcode
|
_sendBuffer.asUByteArray()[5] = subOpcode
|
||||||
|
_sendBuffer.asUByteArray()[6] = ContentEncoding.Raw.value
|
||||||
|
|
||||||
//Logger.i(TAG, "Encrypting message (opcode = ${opcode}, subOpcode = ${subOpcode}, size = ${HEADER_SIZE})")
|
//Logger.i(TAG, "Encrypting message (opcode = ${opcode}, subOpcode = ${subOpcode}, size = ${HEADER_SIZE})")
|
||||||
|
|
||||||
|
@ -446,7 +461,7 @@ class SyncSocketSession {
|
||||||
private fun handleData(data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
private fun handleData(data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
||||||
val length = data.remaining()
|
val length = data.remaining()
|
||||||
if (length < HEADER_SIZE)
|
if (length < HEADER_SIZE)
|
||||||
throw Exception("Packet must be at least 6 bytes (header size)")
|
throw Exception("Packet must be at least ${HEADER_SIZE} bytes (header size)")
|
||||||
|
|
||||||
val size = data.int
|
val size = data.int
|
||||||
if (size != length - 4)
|
if (size != length - 4)
|
||||||
|
@ -454,9 +469,10 @@ class SyncSocketSession {
|
||||||
|
|
||||||
val opcode = data.get().toUByte()
|
val opcode = data.get().toUByte()
|
||||||
val subOpcode = data.get().toUByte()
|
val subOpcode = data.get().toUByte()
|
||||||
|
val contentEncoding = data.get().toUByte()
|
||||||
|
|
||||||
Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
|
Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
|
||||||
handlePacket(opcode, subOpcode, data, sourceChannel)
|
handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleRequest(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
private fun handleRequest(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
||||||
|
@ -804,9 +820,19 @@ class SyncSocketSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
private fun handlePacket(opcode: UByte, subOpcode: UByte, d: ByteBuffer, contentEncoding: UByte, sourceChannel: ChannelRelayed?) {
|
||||||
Logger.i(TAG, "Handle packet (opcode = ${opcode}, subOpcode = ${subOpcode})")
|
Logger.i(TAG, "Handle packet (opcode = ${opcode}, subOpcode = ${subOpcode})")
|
||||||
|
|
||||||
|
var data = d
|
||||||
|
if (contentEncoding == ContentEncoding.Gzip.value) {
|
||||||
|
val compressedStream = ByteArrayOutputStream()
|
||||||
|
GZIPOutputStream(compressedStream).use { gzipStream ->
|
||||||
|
gzipStream.write(data.array(), data.position(), data.remaining())
|
||||||
|
gzipStream.finish()
|
||||||
|
}
|
||||||
|
data = ByteBuffer.wrap(compressedStream.toByteArray())
|
||||||
|
}
|
||||||
|
|
||||||
when (opcode) {
|
when (opcode) {
|
||||||
Opcode.PING.value -> {
|
Opcode.PING.value -> {
|
||||||
if (sourceChannel != null)
|
if (sourceChannel != null)
|
||||||
|
@ -844,8 +870,9 @@ class SyncSocketSession {
|
||||||
val expectedSize = data.int
|
val expectedSize = data.int
|
||||||
val op = data.get().toUByte()
|
val op = data.get().toUByte()
|
||||||
val subOp = data.get().toUByte()
|
val subOp = data.get().toUByte()
|
||||||
|
val ce = data.get().toUByte()
|
||||||
|
|
||||||
val syncStream = SyncStream(expectedSize, op, subOp)
|
val syncStream = SyncStream(expectedSize, op, subOp, ce)
|
||||||
if (data.remaining() > 0) {
|
if (data.remaining() > 0) {
|
||||||
syncStream.add(data.array(), data.position(), data.remaining())
|
syncStream.add(data.array(), data.position(), data.remaining())
|
||||||
}
|
}
|
||||||
|
@ -890,7 +917,7 @@ class SyncSocketSession {
|
||||||
throw Exception("After sync stream end, the stream must be complete")
|
throw Exception("After sync stream end, the stream must be complete")
|
||||||
}
|
}
|
||||||
|
|
||||||
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, sourceChannel)
|
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, contentEncoding, sourceChannel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Opcode.DATA.value -> {
|
Opcode.DATA.value -> {
|
||||||
|
@ -1070,7 +1097,7 @@ class SyncSocketSession {
|
||||||
send(Opcode.NOTIFY.value, NotifyOpcode.CONNECTION_INFO.value, publishBytes)
|
send(Opcode.NOTIFY.value, NotifyOpcode.CONNECTION_INFO.value, publishBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray): Boolean {
|
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray, contentEncoding: ContentEncoding? = null): Boolean {
|
||||||
val keyBytes = key.toByteArray(Charsets.UTF_8)
|
val keyBytes = key.toByteArray(Charsets.UTF_8)
|
||||||
if (key.isEmpty() || keyBytes.size > 32) throw IllegalArgumentException("Key must be 1-32 bytes")
|
if (key.isEmpty() || keyBytes.size > 32) throw IllegalArgumentException("Key must be 1-32 bytes")
|
||||||
if (consumerPublicKeys.isEmpty()) throw IllegalArgumentException("At least one consumer required")
|
if (consumerPublicKeys.isEmpty()) throw IllegalArgumentException("At least one consumer required")
|
||||||
|
@ -1125,7 +1152,7 @@ class SyncSocketSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
packet.rewind()
|
packet.rewind()
|
||||||
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet)
|
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, contentEncoding = contentEncoding)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
_pendingPublishRequests.remove(requestId)?.completeExceptionally(e)
|
_pendingPublishRequests.remove(requestId)?.completeExceptionally(e)
|
||||||
throw e
|
throw e
|
||||||
|
@ -1245,6 +1272,6 @@ class SyncSocketSession {
|
||||||
private const val TAG = "SyncSocketSession"
|
private const val TAG = "SyncSocketSession"
|
||||||
const val MAXIMUM_PACKET_SIZE = 65535 - 16
|
const val MAXIMUM_PACKET_SIZE = 65535 - 16
|
||||||
const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16
|
const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16
|
||||||
const val HEADER_SIZE = 6
|
const val HEADER_SIZE = 7
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
package com.futo.platformplayer.sync.internal
|
package com.futo.platformplayer.sync.internal
|
||||||
|
|
||||||
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) {
|
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte, val contentEncoding: UByte) {
|
||||||
companion object {
|
companion object {
|
||||||
const val MAXIMUM_SIZE = 10_000_000
|
const val MAXIMUM_SIZE = 10_000_000
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue