Added subopcode byte.

This commit is contained in:
Koen J 2024-11-18 11:27:55 +01:00
parent df1661d75a
commit fde6148ece
10 changed files with 90 additions and 63 deletions

View file

@ -927,7 +927,7 @@ class VideoDetailView : ConstraintLayout {
val device = devices.first();
UIDialogs.showConfirmationDialog(context, "Would you like to open\n[${videoToSend.name}]\non ${device.remotePublicKey}" , {
fragment.lifecycleScope.launch(Dispatchers.IO) {
device.sendJson(GJSyncOpcodes.sendToDevices, SendToDevicePackage(videoToSend.url, (lastPositionMilliseconds/1000).toInt()));
device.sendJsonData(GJSyncOpcodes.sendToDevices, SendToDevicePackage(videoToSend.url, (lastPositionMilliseconds/1000).toInt()));
}
})
}

View file

@ -89,7 +89,7 @@ class StateHistory {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) {
Logger.i(TAG, "SyncHistory playback broadcasted (${liveObj.name}: ${position})");
StateSync.instance.broadcastJson(
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncHistory,
listOf(historyVideo)
);

View file

@ -198,7 +198,7 @@ class StatePlaylists {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) {
Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})");
StateSync.instance.broadcastJson(
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncPlaylists,
SyncPlaylistsPackage(listOf(playlist), mapOf())
);
@ -217,7 +217,7 @@ class StatePlaylists {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) {
Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})");
StateSync.instance.broadcastJson(
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncPlaylists,
SyncPlaylistsPackage(listOf(), mapOf(Pair(playlist.id, OffsetDateTime.now().toEpochSecond())))
);

View file

@ -81,7 +81,7 @@ class StateSubscriptionGroups {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) {
Logger.i(TAG, "SyncSubscriptionGroup (${subGroup.name})");
StateSync.instance.broadcastJson(
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncSubscriptionGroups,
SyncSubscriptionGroupsPackage(listOf(subGroup), mapOf())
);
@ -100,7 +100,7 @@ class StateSubscriptionGroups {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
if(StateSync.instance.hasAtLeastOneOnlineDevice()) {
Logger.i(TAG, "SyncSubscriptionGroup delete (${group.name})");
StateSync.instance.broadcastJson(
StateSync.instance.broadcastJsonData(
GJSyncOpcodes.syncSubscriptionGroups,
SyncSubscriptionGroupsPackage(listOf(), mapOf(Pair(id, OffsetDateTime.now().toEpochSecond())))
);

View file

@ -250,7 +250,7 @@ class StateSubscriptions {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
try {
StateSync.instance.broadcast(
StateSync.instance.broadcastData(
GJSyncOpcodes.syncSubscriptions, Json.encodeToString(
SyncSubscriptionsPackage(
listOf(subObj),
@ -299,7 +299,7 @@ class StateSubscriptions {
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
try {
StateSync.instance.broadcast(
StateSync.instance.broadcastData(
GJSyncOpcodes.syncSubscriptions, Json.encodeToString(
SyncSubscriptionsPackage(
listOf(),

View file

@ -370,26 +370,29 @@ class StateSync {
Logger.i(TAG, "Connection authorized for ${remotePublicKey} because initiator")
}
},
onData = { s, opcode, data ->
session?.handlePacket(s, opcode, data)
onData = { s, opcode, subOpcode, data ->
session?.handlePacket(s, opcode, subOpcode, data)
})
}
inline fun <reified T> broadcastJson(opcode: UByte, data: T) {
broadcast(opcode, Json.encodeToString(data));
inline fun <reified T> broadcastJsonData(subOpcode: UByte, data: T) {
broadcast(SyncSocketSession.Opcode.DATA.value, subOpcode, Json.encodeToString(data));
}
fun broadcast(opcode: UByte, data: String) {
broadcast(opcode, data.toByteArray(Charsets.UTF_8));
fun broadcastData(subOpcode: UByte, data: String) {
broadcast(SyncSocketSession.Opcode.DATA.value, subOpcode, data.toByteArray(Charsets.UTF_8));
}
fun broadcast(opcode: UByte, data: ByteArray) {
fun broadcast(opcode: UByte, subOpcode: UByte, data: String) {
broadcast(opcode, subOpcode, data.toByteArray(Charsets.UTF_8));
}
fun broadcast(opcode: UByte, subOpcode: UByte, data: ByteArray) {
for(session in getSessions()) {
try {
if (session.isAuthorized && session.connected) {
session.send(opcode, data);
session.send(opcode, subOpcode, data);
}
}
catch(ex: Exception) {
Logger.w(TAG, "Failed to broadcast ${opcode} to ${session.remotePublicKey}: ${ex.message}}", ex);
Logger.w(TAG, "Failed to broadcast (opcode = ${opcode}, subOpcode = ${subOpcode}) to ${session.remotePublicKey}: ${ex.message}}", ex);
}
}
}
@ -398,7 +401,7 @@ class StateSync {
val time = measureTimeMillis {
//val export = StateBackup.export();
//session.send(GJSyncOpcodes.syncExport, export.asZip());
session.send(GJSyncOpcodes.syncStateExchange, getSyncSessionDataString(session.remotePublicKey));
session.sendData(GJSyncOpcodes.syncStateExchange, getSyncSessionDataString(session.remotePublicKey));
}
Logger.i(TAG, "Generated and sent sync export in ${time}ms");
}

View file

@ -45,6 +45,7 @@ class SyncSession : IAuthorizable {
private val _onConnectedChanged: (session: SyncSession, connected: Boolean) -> Unit
val remotePublicKey: String
override val isAuthorized get() = _authorized && _remoteAuthorized
private var _wasAuthorized = false
var connected: Boolean = false
private set(v) {
@ -94,8 +95,10 @@ class SyncSession : IAuthorizable {
}
private fun checkAuthorized() {
if (isAuthorized)
if (!_wasAuthorized && isAuthorized) {
_wasAuthorized = true
_onAuthorized.invoke(this)
}
}
fun removeSocketSession(socketSession: SyncSocketSession) {
@ -117,8 +120,8 @@ class SyncSession : IAuthorizable {
_onClose.invoke(this)
}
fun handlePacket(socketSession: SyncSocketSession, opcode: UByte, data: ByteBuffer) {
Logger.i(TAG, "Handle packet (opcode: ${opcode}, data.length: ${data.remaining()})")
fun handlePacket(socketSession: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
Logger.i(TAG, "Handle packet (opcode: ${opcode}, subOpcode: ${subOpcode}, data.length: ${data.remaining()})")
when (opcode) {
Opcode.NOTIFY_AUTHORIZED.value -> {
@ -136,10 +139,15 @@ class SyncSession : IAuthorizable {
return
}
Logger.i(TAG, "Received ${opcode} (${data.remaining()} bytes)")
if (opcode != Opcode.DATA.value) {
Logger.w(TAG, "Unknown opcode received: (opcode = ${opcode}, subOpcode = ${subOpcode})}")
return
}
Logger.i(TAG, "Received (opcode = ${opcode}, subOpcode = ${subOpcode}) (${data.remaining()} bytes)")
//TODO: Abstract this out
try {
when (opcode) {
when (subOpcode) {
GJSyncOpcodes.sendToDevices -> {
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
val context = StateApp.instance.contextOrNull;
@ -164,13 +172,13 @@ class SyncSession : IAuthorizable {
Logger.i(TAG, "Received SyncSessionData from " + remotePublicKey);
send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
send(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
send(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
if(recentHistory.size > 0)
sendJson(GJSyncOpcodes.syncHistory, recentHistory);
sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
}
GJSyncOpcodes.syncExport -> {
@ -338,16 +346,19 @@ class SyncSession : IAuthorizable {
}
inline fun <reified T> sendJson(opcode: UByte, data: T) {
send(opcode, Json.encodeToString<T>(data));
inline fun <reified T> sendJsonData(subOpcode: UByte, data: T) {
send(Opcode.DATA.value, subOpcode, Json.encodeToString<T>(data));
}
fun send(opcode: UByte, data: String) {
send(opcode, data.toByteArray(Charsets.UTF_8));
fun sendData(subOpcode: UByte, data: String) {
send(Opcode.DATA.value, subOpcode, data.toByteArray(Charsets.UTF_8));
}
fun send(opcode: UByte, data: ByteArray) {
fun send(opcode: UByte, subOpcode: UByte, data: String) {
send(opcode, subOpcode, data.toByteArray(Charsets.UTF_8));
}
fun send(opcode: UByte, subOpcode: UByte, data: ByteArray) {
val sock = _socketSessions.firstOrNull();
if(sock != null){
sock.send(opcode, ByteBuffer.wrap(data));
sock.send(opcode, subOpcode, ByteBuffer.wrap(data));
}
else
throw IllegalStateException("Session has no active sockets");

View file

@ -2,6 +2,7 @@ package com.futo.platformplayer.sync.internal
import com.futo.platformplayer.LittleEndianDataInputStream
import com.futo.platformplayer.LittleEndianDataOutputStream
import com.futo.platformplayer.ensureNotMainThread
import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState
@ -18,7 +19,8 @@ class SyncSocketSession {
NOTIFY_UNAUTHORIZED(3u),
STREAM_START(4u),
STREAM_DATA(5u),
STREAM_END(6u)
STREAM_END(6u),
DATA(7u)
}
private val _inputStream: LittleEndianDataInputStream
@ -41,12 +43,12 @@ class SyncSocketSession {
private val _localKeyPair: DHState
private var _localPublicKey: String
val localPublicKey: String get() = _localPublicKey
private val _onData: (session: SyncSocketSession, opcode: UByte, data: ByteBuffer) -> Unit
private val _onData: (session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit
var authorizable: IAuthorizable? = null
val remoteAddress: String
constructor(remoteAddress: String, localKeyPair: DHState, inputStream: LittleEndianDataInputStream, outputStream: LittleEndianDataOutputStream, onClose: (session: SyncSocketSession) -> Unit, onHandshakeComplete: (session: SyncSocketSession) -> Unit, onData: (session: SyncSocketSession, opcode: UByte, data: ByteBuffer) -> Unit) {
constructor(remoteAddress: String, localKeyPair: DHState, inputStream: LittleEndianDataInputStream, outputStream: LittleEndianDataOutputStream, onClose: (session: SyncSocketSession) -> Unit, onHandshakeComplete: (session: SyncSocketSession) -> Unit, onData: (session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit) {
_inputStream = inputStream
_outputStream = outputStream
_onClose = onClose
@ -159,10 +161,11 @@ class SyncSocketSession {
}
private fun performVersionCheck() {
_outputStream.writeInt(1)
val CURRENT_VERSION = 2
_outputStream.writeInt(CURRENT_VERSION)
val version = _inputStream.readInt()
Logger.i(TAG, "performVersionCheck (version = $version)")
if (version != 1)
if (version != CURRENT_VERSION)
throw Exception("Invalid version")
}
@ -205,8 +208,9 @@ class SyncSocketSession {
throw Exception("Handshake finished without completing")
}
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
ensureNotMainThread()
fun send(opcode: UByte, data: ByteBuffer) {
if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
val segmentData = ByteArray(segmentSize)
@ -223,8 +227,8 @@ class SyncSocketSession {
if (sendOffset == 0) {
segmentOpcode = Opcode.STREAM_START.value
bytesToSend = segmentSize - 4 - 4 - 1
segmentPacketSize = bytesToSend + 4 + 4 + 1
bytesToSend = segmentSize - 4 - 4 - 1 - 1
segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1
} else {
bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining)
segmentOpcode = if (bytesToSend >= bytesRemaining) Opcode.STREAM_END.value else Opcode.STREAM_DATA.value
@ -236,18 +240,20 @@ class SyncSocketSession {
putInt(if (segmentOpcode == Opcode.STREAM_START.value) data.remaining() else sendOffset)
if (segmentOpcode == Opcode.STREAM_START.value) {
put(opcode.toByte())
put(subOpcode.toByte())
}
put(data.array(), data.position() + sendOffset, bytesToSend)
}
send(segmentOpcode, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
send(segmentOpcode, 0u, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
sendOffset += bytesToSend
}
} else {
synchronized(_sendLockObject) {
ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(data.remaining() + 1)
putInt(data.remaining() + 2)
put(opcode.toByte())
put(subOpcode.toByte())
put(data.array(), data.position(), data.remaining())
}
@ -260,10 +266,13 @@ class SyncSocketSession {
}
}
fun send(opcode: UByte) {
fun send(opcode: UByte, subOpcode: UByte = 0u) {
ensureNotMainThread()
synchronized(_sendLockObject) {
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(1)
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2)
_sendBuffer.asUByteArray()[4] = opcode
_sendBuffer.asUByteArray()[5] = subOpcode
//Logger.i(TAG, "Encrypting message (size = ${HEADER_SIZE})")
@ -277,19 +286,19 @@ class SyncSocketSession {
private fun handleData(data: ByteArray, length: Int) {
if (length < HEADER_SIZE)
throw Exception("Packet must be at least 5 bytes (header size)")
throw Exception("Packet must be at least 6 bytes (header size)")
val size = ByteBuffer.wrap(data, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (size != length - 4)
throw Exception("Incomplete packet received")
val opcode = data.asUByteArray()[4]
val packetData = ByteBuffer.wrap(data, HEADER_SIZE, size - 1)
handlePacket(opcode, packetData.order(ByteOrder.LITTLE_ENDIAN))
val subOpcode = data.asUByteArray()[5]
val packetData = ByteBuffer.wrap(data, HEADER_SIZE, size - 2)
handlePacket(opcode, subOpcode, packetData.order(ByteOrder.LITTLE_ENDIAN))
}
private fun handlePacket(opcode: UByte, data: ByteBuffer) {
private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
when (opcode) {
Opcode.PING.value -> {
send(Opcode.PONG.value)
@ -302,7 +311,7 @@ class SyncSocketSession {
}
Opcode.NOTIFY_AUTHORIZED.value,
Opcode.NOTIFY_UNAUTHORIZED.value -> {
_onData.invoke(this, opcode, data)
_onData.invoke(this, opcode, subOpcode, data)
return
}
}
@ -316,8 +325,9 @@ class SyncSocketSession {
val id = data.int
val expectedSize = data.int
val op = data.get().toUByte()
val subOp = data.get().toUByte()
val syncStream = SyncStream(expectedSize, op)
val syncStream = SyncStream(expectedSize, op, subOp)
if (data.remaining() > 0) {
syncStream.add(data.array(), data.position(), data.remaining())
}
@ -362,10 +372,13 @@ class SyncSocketSession {
throw Exception("After sync stream end, the stream must be complete")
}
handlePacket(syncStream.opcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) })
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) })
}
Opcode.DATA.value -> {
_onData.invoke(this, opcode, subOpcode, data)
}
else -> {
_onData.invoke(this, opcode, data)
Logger.w(TAG, "Unknown opcode received (opcode = ${opcode}, subOpcode = ${subOpcode})")
}
}
}
@ -374,6 +387,6 @@ class SyncSocketSession {
private const val TAG = "SyncSocketSession"
const val MAXIMUM_PACKET_SIZE = 65535 - 16
const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16
const val HEADER_SIZE = 5
const val HEADER_SIZE = 6
}
}

View file

@ -1,6 +1,6 @@
package com.futo.platformplayer.sync.internal
class SyncStream(expectedSize: Int, val opcode: UByte) {
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) {
companion object {
const val MAXIMUM_SIZE = 10_000_000
}

View file

@ -524,8 +524,8 @@ class NoiseProtocolTest {
println("Initiator handshake complete")
handshakeLatch.countDown() // Handshake complete for initiator
},
onData = { session, opcode, data ->
println("Initiator received: Opcode $opcode, Data Length: ${data.remaining()}")
onData = { session, opcode, subOpcode, data ->
println("Initiator received: Opcode: $opcode, SubOpcode: $subOpcode, Data Length: ${data.remaining()}")
when (data.remaining()) {
randomBytesExactlyOnePacket.remaining() -> {
@ -556,8 +556,8 @@ class NoiseProtocolTest {
println("Responder handshake complete")
handshakeLatch.countDown() // Handshake complete for responder
},
onData = { session, opcode, data ->
println("Responder received: Opcode $opcode, Data Length: ${data.remaining()}")
onData = { session, opcode, subOpcode, data ->
println("Responder received: Opcode $opcode, SubOpcode $subOpcode, Data Length: ${data.remaining()}")
when (data.remaining()) {
randomBytesExactlyOnePacket.remaining() -> {
@ -590,12 +590,12 @@ class NoiseProtocolTest {
responderSession.send(SyncSocketSession.Opcode.PONG.value)
// Test data transfer
responderSession.send(SyncSocketSession.Opcode.NOTIFY_AUTHORIZED.value, randomBytesExactlyOnePacket)
initiatorSession.send(SyncSocketSession.Opcode.NOTIFY_AUTHORIZED.value, randomBytes)
responderSession.send(SyncSocketSession.Opcode.DATA.value, 0u, randomBytesExactlyOnePacket)
initiatorSession.send(SyncSocketSession.Opcode.DATA.value, 1u, randomBytes)
// Send large data to test stream handling
val start = System.currentTimeMillis()
responderSession.send(SyncSocketSession.Opcode.NOTIFY_AUTHORIZED.value, randomBytesBig)
responderSession.send(SyncSocketSession.Opcode.DATA.value, 0u, randomBytesBig)
println("Sent 10MB in ${System.currentTimeMillis() - start}ms")
// Wait for a brief period to simulate delay and allow communication