From f8edd6cf3d93c2c13ab5c117c5f47b9db2a10da8 Mon Sep 17 00:00:00 2001 From: Koen J Date: Sat, 3 May 2025 11:21:57 +0200 Subject: [PATCH] Possibel performance improvements to sync under high lat conditions. --- .../java/com/futo/platformplayer/Settings.kt | 3 + .../futo/platformplayer/states/StateSync.kt | 53 ++++++-- .../platformplayer/sync/internal/Channel.kt | 6 + .../sync/internal/SyncSocketSession.kt | 125 ++++++++++++------ app/src/main/res/values/strings.xml | 2 + 5 files changed, 136 insertions(+), 53 deletions(-) diff --git a/app/src/main/java/com/futo/platformplayer/Settings.kt b/app/src/main/java/com/futo/platformplayer/Settings.kt index b9b81ec6..eb593c5b 100644 --- a/app/src/main/java/com/futo/platformplayer/Settings.kt +++ b/app/src/main/java/com/futo/platformplayer/Settings.kt @@ -945,6 +945,9 @@ class Settings : FragmentedStorageFileJson() { @FormField(R.string.connect_through_relay, FieldForm.TOGGLE, R.string.connect_through_relay_description, 3) var connectThroughRelay: Boolean = true; + + @FormField(R.string.connect_local_direct_through_relay, FieldForm.TOGGLE, R.string.connect_local_direct_through_relay_description, 3) + var connectLocalDirectThroughRelay: Boolean = true; } @FormField(R.string.info, FieldForm.GROUP, -1, 21) diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt index 6ab62f11..14e3f510 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt @@ -361,8 +361,7 @@ class StateSync { _relaySession = SyncSocketSession( (socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, keyPair!!, - LittleEndianDataInputStream(socket.getInputStream()), - LittleEndianDataOutputStream(socket.getOutputStream()), + socket, isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) }, onNewChannel = { _, c -> val remotePublicKey = c.remotePublicKey @@ -407,12 +406,14 @@ class StateSync { relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, Settings.instance.synchronization.discoverThroughRelay, false, false, Settings.instance.synchronization.discoverThroughRelay && Settings.instance.synchronization.connectThroughRelay) + Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information") val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) } + Logger.v(TAG, "Received ${connectionInfos.size} devices connection information") for ((targetKey, connectionInfo) in connectionInfos) { val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses) .filter { it != connectionInfo.remoteIp } - if (connectionInfo.allowLocalDirect) { + if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) { Thread { try { Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.") @@ -433,10 +434,10 @@ class StateSync { if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) { try { - Log.v(TAG, "Attempting relayed connection with '$targetKey'.") + Logger.v(TAG, "Attempting relayed connection with '$targetKey'.") runBlocking { relaySession.startRelayedChannel(targetKey, APP_ID, null) } } catch (e: Throwable) { - Log.e(TAG, "Failed to start relayed channel with $targetKey.", e) + Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e) } } } @@ -444,7 +445,7 @@ class StateSync { Thread.sleep(15000) } } catch (e: Throwable) { - Log.e(TAG, "Unhandled exception in relay session.", e) + Logger.e(TAG, "Unhandled exception in relay session.", e) relaySession.stop() } }.start() @@ -585,16 +586,33 @@ class StateSync { Logger.i(TAG, "Received SyncSessionData from $remotePublicKey"); + val subscriptionPackageString = StateSubscriptions.instance.getSyncSubscriptionsPackageString() + Logger.i(TAG, "syncStateExchange syncSubscriptions b (size: ${subscriptionPackageString.length})") + session.sendData(GJSyncOpcodes.syncSubscriptions, subscriptionPackageString); + Logger.i(TAG, "syncStateExchange syncSubscriptions (size: ${subscriptionPackageString.length})") - session.sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString()); - session.sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()); - session.sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString()) + val subscriptionGroupPackageString = StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString() + Logger.i(TAG, "syncStateExchange syncSubscriptionGroups b (size: ${subscriptionGroupPackageString.length})") + session.sendData(GJSyncOpcodes.syncSubscriptionGroups, subscriptionGroupPackageString); + Logger.i(TAG, "syncStateExchange syncSubscriptionGroups (size: ${subscriptionGroupPackageString.length})") - session.sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false))); + val syncPlaylistPackageString = StatePlaylists.instance.getSyncPlaylistsPackageString() + Logger.i(TAG, "syncStateExchange syncPlaylists b (size: ${syncPlaylistPackageString.length})") + session.sendData(GJSyncOpcodes.syncPlaylists, syncPlaylistPackageString) + Logger.i(TAG, "syncStateExchange syncPlaylists (size: ${syncPlaylistPackageString.length})") + + val watchLaterPackageString = Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)) + Logger.i(TAG, "syncStateExchange syncWatchLater b (size: ${watchLaterPackageString.length})") + session.sendData(GJSyncOpcodes.syncWatchLater, watchLaterPackageString); + Logger.i(TAG, "syncStateExchange syncWatchLater (size: ${watchLaterPackageString.length})") val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory); + + Logger.i(TAG, "syncStateExchange syncHistory b (size: ${recentHistory.size})") if(recentHistory.isNotEmpty()) session.sendJsonData(GJSyncOpcodes.syncHistory, recentHistory); + + Logger.i(TAG, "syncStateExchange syncHistory (size: ${recentHistory.size})") } GJSyncOpcodes.syncExport -> { @@ -825,7 +843,17 @@ class StateSync { } }, dataHandler = { it, opcode, subOpcode, data -> - handleData(it, opcode, subOpcode, data) + val dataCopy = ByteArray(data.remaining()) + data.get(dataCopy) + + StateApp.instance.scopeOrNull?.launch { + try { + handleData(it, opcode, subOpcode, ByteBuffer.wrap(dataCopy)) + } catch (e: Throwable) { + Logger.e(TAG, "Exception occurred while handling data, closing session", e) + it.close() + } + } }, remoteDeviceName ) @@ -860,8 +888,7 @@ class StateSync { return SyncSocketSession( (socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, keyPair!!, - LittleEndianDataInputStream(socket.getInputStream()), - LittleEndianDataOutputStream(socket.getOutputStream()), + socket, onClose = { s -> if (channelSocket != null) session?.removeChannel(channelSocket!!) diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt index 84c1445c..aa9036f1 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt @@ -192,6 +192,8 @@ class ChannelRelayed( val HEADER_SIZE = 6 val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16 + Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data?.remaining()})") + if (actualCount > MAX_DATA_PER_PACKET && data != null) { val streamId = session.generateStreamId() val totalSize = actualCount @@ -333,4 +335,8 @@ class ChannelRelayed( completeHandshake(remoteVersion, transport) } } + + companion object { + private val TAG = "Channel" + } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index e66da337..4f74c5ff 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -10,25 +10,31 @@ import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.HandshakeState import com.futo.platformplayer.states.StateSync import kotlinx.coroutines.CompletableDeferred +import java.io.InputStream +import java.io.OutputStream import java.net.Inet4Address import java.net.Inet6Address import java.net.InetAddress import java.net.NetworkInterface +import java.net.Socket import java.nio.ByteBuffer import java.nio.ByteOrder import java.util.Base64 import java.util.Locale import java.util.concurrent.ConcurrentHashMap import kotlin.math.min +import kotlin.system.measureTimeMillis +import kotlin.time.measureTime class SyncSocketSession { - private val _inputStream: LittleEndianDataInputStream - private val _outputStream: LittleEndianDataOutputStream + private val _socket: Socket + private val _inputStream: InputStream + private val _outputStream: OutputStream private val _sendLockObject = Object() private val _buffer = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED) private val _bufferDecrypted = ByteArray(MAXIMUM_PACKET_SIZE) private val _sendBuffer = ByteArray(MAXIMUM_PACKET_SIZE) - private val _sendBufferEncrypted = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED) + private val _sendBufferEncrypted = ByteArray(4 + MAXIMUM_PACKET_SIZE_ENCRYPTED) private val _syncStreams = hashMapOf() private var _streamIdGenerator = 0 private val _streamIdGeneratorLock = Object() @@ -81,8 +87,7 @@ class SyncSocketSession { constructor( remoteAddress: String, localKeyPair: DHState, - inputStream: LittleEndianDataInputStream, - outputStream: LittleEndianDataOutputStream, + socket: Socket, onClose: ((session: SyncSocketSession) -> Unit)? = null, onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null, onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null, @@ -90,8 +95,12 @@ class SyncSocketSession { onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null, isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?, appId: UInt) -> Boolean)? = null ) { - _inputStream = inputStream - _outputStream = outputStream + _socket = socket + _socket.receiveBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED + _socket.sendBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED + _socket.tcpNoDelay = true + _inputStream = _socket.getInputStream() + _outputStream = _socket.getOutputStream() _onClose = onClose _onHandshakeComplete = onHandshakeComplete _localKeyPair = localKeyPair @@ -150,30 +159,45 @@ class SyncSocketSession { }.apply { start() } } + private fun readExact(buffer: ByteArray, offset: Int, size: Int) { + var totalBytesReceived: Int = 0 + while (totalBytesReceived < size) { + val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived) + if (bytesReceived == 0) + throw Exception("Socket disconnected") + totalBytesReceived += bytesReceived + } + } + private fun receiveLoop() { while (_started) { try { - val messageSize = _inputStream.readInt() + //Logger.v(TAG, "Waiting for message size...") + + readExact(_buffer, 0, 4) + val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int + + //Logger.v(TAG, "Read message size ${messageSize}.") + if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) { throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)") } //Logger.i(TAG, "Receiving message (size = ${messageSize})") - var bytesRead = 0 - while (bytesRead < messageSize) { - val read = _inputStream.read(_buffer, bytesRead, messageSize - bytesRead) - if (read == -1) - throw Exception("Stream closed") - bytesRead += read - } + readExact(_buffer, 0, messageSize) + //Logger.v(TAG, "Read ${messageSize}.") + //Logger.v(TAG, "Decrypting ${messageSize} bytes.") val plen: Int = _cipherStatePair!!.receiver.decryptWithAd(null, _buffer, 0, _bufferDecrypted, 0, messageSize) //Logger.i(TAG, "Decrypted message (size = ${plen})") + //Logger.v(TAG, "Decrypted ${messageSize} bytes.") handleData(_bufferDecrypted, plen, null) + //Logger.v(TAG, "Handled data ${messageSize} bytes.") } catch (e: Throwable) { - Logger.e(TAG, "Exception while receiving data", e) + Logger.e(TAG, "Exception while receiving data, closing socket session", e) + stop() break } } @@ -203,8 +227,7 @@ class SyncSocketSession { _channels.values.forEach { it.close() } _channels.clear() _onClose?.invoke(this) - _inputStream.close() - _outputStream.close() + _socket.close() _thread = null _cipherStatePair?.sender?.destroy() _cipherStatePair?.receiver?.destroy() @@ -237,18 +260,25 @@ class SyncSocketSession { val mainBuffer = ByteArray(512) val mainLength = initiator.writeMessage(mainBuffer, 0, null, 0, 0) - val messageData = ByteBuffer.allocate(4 + 4 + pairingMessageLength + mainLength).order(ByteOrder.LITTLE_ENDIAN) + val messageSize = 4 + 4 + pairingMessageLength + mainLength + val messageData = ByteBuffer.allocate(4 + messageSize).order(ByteOrder.LITTLE_ENDIAN) + messageData.putInt(messageSize) messageData.putInt(appId.toInt()) messageData.putInt(pairingMessageLength) if (pairingMessageLength > 0) messageData.put(pairingMessage) messageData.put(mainBuffer, 0, mainLength) val messageDataArray = messageData.array() - _outputStream.writeInt(messageDataArray.size) - _outputStream.write(messageDataArray) + _outputStream.write(messageDataArray, 0, 4 + messageSize) + + readExact(_buffer, 0, 4) + val responseSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int + if (responseSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) { + throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)") + } - val responseSize = _inputStream.readInt() val responseMessage = ByteArray(responseSize) - _inputStream.readFully(responseMessage) + readExact(responseMessage, 0, responseSize) + val plaintext = ByteArray(512) // Buffer for any payload (none expected here) initiator.readMessage(responseMessage, 0, responseSize, plaintext, 0) @@ -265,11 +295,16 @@ class SyncSocketSession { responder.localKeyPair.copyFrom(_localKeyPair) responder.start() - val messageSize = _inputStream.readInt() - val message = ByteArray(messageSize) - _inputStream.readFully(message) - val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN) + readExact(_buffer, 0, 4) + val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int + if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) { + throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)") + } + val message = ByteArray(messageSize) + readExact(message, 0, messageSize) + + val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN) val appId = messageBuffer.int.toUInt() val pairingMessageLength = messageBuffer.int val pairingMessage = if (pairingMessageLength > 0) ByteArray(pairingMessageLength).also { messageBuffer.get(it) } else byteArrayOf() @@ -298,10 +333,10 @@ class SyncSocketSession { return false } - val responseBuffer = ByteArray(512) - val responseLength = responder.writeMessage(responseBuffer, 0, null, 0, 0) - _outputStream.writeInt(responseLength) - _outputStream.write(responseBuffer, 0, responseLength) + val responseBuffer = ByteArray(4 + 512) + val responseLength = responder.writeMessage(responseBuffer, 4, null, 0, 0) + ByteBuffer.wrap(responseBuffer).order(ByteOrder.LITTLE_ENDIAN).putInt(responseLength) + _outputStream.write(responseBuffer, 0, 4 + responseLength) _cipherStatePair = responder.split() _remotePublicKey = remotePublicKey @@ -311,8 +346,13 @@ class SyncSocketSession { private fun performVersionCheck() { val CURRENT_VERSION = 4 val MINIMUM_VERSION = 4 - _outputStream.writeInt(CURRENT_VERSION) - remoteVersion = _inputStream.readInt() + + val versionBytes = ByteArray(4) + ByteBuffer.wrap(versionBytes).order(ByteOrder.LITTLE_ENDIAN).putInt(CURRENT_VERSION) + _outputStream.write(versionBytes, 0, 4) + + readExact(versionBytes, 0, 4) + remoteVersion = ByteBuffer.wrap(versionBytes, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int Logger.i(TAG, "performVersionCheck (version = $remoteVersion)") if (remoteVersion < MINIMUM_VERSION) throw Exception("Invalid version") @@ -324,6 +364,8 @@ class SyncSocketSession { fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) { ensureNotMainThread() + Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})") + if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) { val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE val segmentData = ByteArray(segmentSize) @@ -368,11 +410,12 @@ class SyncSocketSession { put(data.array(), data.position(), data.remaining()) } - //Logger.i(TAG, "Encrypting message (size = ${data.size + HEADER_SIZE})") - val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, data.remaining() + HEADER_SIZE) - //Logger.i(TAG, "Sending encrypted message (size = ${len})") - _outputStream.writeInt(len) - _outputStream.write(_sendBufferEncrypted, 0, len) + val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, data.remaining() + HEADER_SIZE) + val sendDuration = measureTimeMillis { + ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len) + _outputStream.write(_sendBufferEncrypted, 0, 4 + len) + } + Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()}, sendDuration: ${sendDuration})") } } } @@ -391,8 +434,8 @@ class SyncSocketSession { val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, HEADER_SIZE) //Logger.i(TAG, "Sending encrypted message (size = ${len})") - _outputStream.writeInt(len) - _outputStream.write(_sendBufferEncrypted, 0, len) + ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len) + _outputStream.write(_sendBufferEncrypted, 0, 4 + len) } } @@ -411,6 +454,8 @@ class SyncSocketSession { val opcode = data.get().toUByte() val subOpcode = data.get().toUByte() + + Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})") handlePacket(opcode, subOpcode, data, sourceChannel) } diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index 3edff39b..328f1be1 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -382,6 +382,8 @@ Allow devices to be paired through the relay Connection through relay Allow devices to be connected to through the relay + Connect direct through relay + Allow devices to be directly locally connected to through information discovered from the relay Gesture controls Volume slider Enable slide gesture to change volume