Possibel performance improvements to sync under high lat conditions.

This commit is contained in:
Koen J 2025-05-03 11:21:57 +02:00
commit f8edd6cf3d
5 changed files with 136 additions and 53 deletions

View file

@ -945,6 +945,9 @@ class Settings : FragmentedStorageFileJson() {
@FormField(R.string.connect_through_relay, FieldForm.TOGGLE, R.string.connect_through_relay_description, 3)
var connectThroughRelay: Boolean = true;
@FormField(R.string.connect_local_direct_through_relay, FieldForm.TOGGLE, R.string.connect_local_direct_through_relay_description, 3)
var connectLocalDirectThroughRelay: Boolean = true;
}
@FormField(R.string.info, FieldForm.GROUP, -1, 21)

View file

@ -361,8 +361,7 @@ class StateSync {
_relaySession = SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
socket,
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
onNewChannel = { _, c ->
val remotePublicKey = c.remotePublicKey
@ -407,12 +406,14 @@ class StateSync {
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, Settings.instance.synchronization.discoverThroughRelay, false, false, Settings.instance.synchronization.discoverThroughRelay && Settings.instance.synchronization.connectThroughRelay)
Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information")
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
Logger.v(TAG, "Received ${connectionInfos.size} devices connection information")
for ((targetKey, connectionInfo) in connectionInfos) {
val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses)
.filter { it != connectionInfo.remoteIp }
if (connectionInfo.allowLocalDirect) {
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
Thread {
try {
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
@ -433,10 +434,10 @@ class StateSync {
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
try {
Log.v(TAG, "Attempting relayed connection with '$targetKey'.")
Logger.v(TAG, "Attempting relayed connection with '$targetKey'.")
runBlocking { relaySession.startRelayedChannel(targetKey, APP_ID, null) }
} catch (e: Throwable) {
Log.e(TAG, "Failed to start relayed channel with $targetKey.", e)
Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e)
}
}
}
@ -444,7 +445,7 @@ class StateSync {
Thread.sleep(15000)
}
} catch (e: Throwable) {
Log.e(TAG, "Unhandled exception in relay session.", e)
Logger.e(TAG, "Unhandled exception in relay session.", e)
relaySession.stop()
}
}.start()
@ -585,16 +586,33 @@ class StateSync {
Logger.i(TAG, "Received SyncSessionData from $remotePublicKey");
val subscriptionPackageString = StateSubscriptions.instance.getSyncSubscriptionsPackageString()
Logger.i(TAG, "syncStateExchange syncSubscriptions b (size: ${subscriptionPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptions, subscriptionPackageString);
Logger.i(TAG, "syncStateExchange syncSubscriptions (size: ${subscriptionPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
session.sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
val subscriptionGroupPackageString = StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups b (size: ${subscriptionGroupPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, subscriptionGroupPackageString);
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups (size: ${subscriptionGroupPackageString.length})")
session.sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)));
val syncPlaylistPackageString = StatePlaylists.instance.getSyncPlaylistsPackageString()
Logger.i(TAG, "syncStateExchange syncPlaylists b (size: ${syncPlaylistPackageString.length})")
session.sendData(GJSyncOpcodes.syncPlaylists, syncPlaylistPackageString)
Logger.i(TAG, "syncStateExchange syncPlaylists (size: ${syncPlaylistPackageString.length})")
val watchLaterPackageString = Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false))
Logger.i(TAG, "syncStateExchange syncWatchLater b (size: ${watchLaterPackageString.length})")
session.sendData(GJSyncOpcodes.syncWatchLater, watchLaterPackageString);
Logger.i(TAG, "syncStateExchange syncWatchLater (size: ${watchLaterPackageString.length})")
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
Logger.i(TAG, "syncStateExchange syncHistory b (size: ${recentHistory.size})")
if(recentHistory.isNotEmpty())
session.sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
Logger.i(TAG, "syncStateExchange syncHistory (size: ${recentHistory.size})")
}
GJSyncOpcodes.syncExport -> {
@ -825,7 +843,17 @@ class StateSync {
}
},
dataHandler = { it, opcode, subOpcode, data ->
handleData(it, opcode, subOpcode, data)
val dataCopy = ByteArray(data.remaining())
data.get(dataCopy)
StateApp.instance.scopeOrNull?.launch {
try {
handleData(it, opcode, subOpcode, ByteBuffer.wrap(dataCopy))
} catch (e: Throwable) {
Logger.e(TAG, "Exception occurred while handling data, closing session", e)
it.close()
}
}
},
remoteDeviceName
)
@ -860,8 +888,7 @@ class StateSync {
return SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
socket,
onClose = { s ->
if (channelSocket != null)
session?.removeChannel(channelSocket!!)

View file

@ -192,6 +192,8 @@ class ChannelRelayed(
val HEADER_SIZE = 6
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data?.remaining()})")
if (actualCount > MAX_DATA_PER_PACKET && data != null) {
val streamId = session.generateStreamId()
val totalSize = actualCount
@ -333,4 +335,8 @@ class ChannelRelayed(
completeHandshake(remoteVersion, transport)
}
}
companion object {
private val TAG = "Channel"
}
}

View file

@ -10,25 +10,31 @@ import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync
import kotlinx.coroutines.CompletableDeferred
import java.io.InputStream
import java.io.OutputStream
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
import java.net.NetworkInterface
import java.net.Socket
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.Base64
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.min
import kotlin.system.measureTimeMillis
import kotlin.time.measureTime
class SyncSocketSession {
private val _inputStream: LittleEndianDataInputStream
private val _outputStream: LittleEndianDataOutputStream
private val _socket: Socket
private val _inputStream: InputStream
private val _outputStream: OutputStream
private val _sendLockObject = Object()
private val _buffer = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _bufferDecrypted = ByteArray(MAXIMUM_PACKET_SIZE)
private val _sendBuffer = ByteArray(MAXIMUM_PACKET_SIZE)
private val _sendBufferEncrypted = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _sendBufferEncrypted = ByteArray(4 + MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _syncStreams = hashMapOf<Int, SyncStream>()
private var _streamIdGenerator = 0
private val _streamIdGeneratorLock = Object()
@ -81,8 +87,7 @@ class SyncSocketSession {
constructor(
remoteAddress: String,
localKeyPair: DHState,
inputStream: LittleEndianDataInputStream,
outputStream: LittleEndianDataOutputStream,
socket: Socket,
onClose: ((session: SyncSocketSession) -> Unit)? = null,
onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null,
onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null,
@ -90,8 +95,12 @@ class SyncSocketSession {
onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null,
isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?, appId: UInt) -> Boolean)? = null
) {
_inputStream = inputStream
_outputStream = outputStream
_socket = socket
_socket.receiveBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
_socket.sendBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
_socket.tcpNoDelay = true
_inputStream = _socket.getInputStream()
_outputStream = _socket.getOutputStream()
_onClose = onClose
_onHandshakeComplete = onHandshakeComplete
_localKeyPair = localKeyPair
@ -150,30 +159,45 @@ class SyncSocketSession {
}.apply { start() }
}
private fun readExact(buffer: ByteArray, offset: Int, size: Int) {
var totalBytesReceived: Int = 0
while (totalBytesReceived < size) {
val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived)
if (bytesReceived == 0)
throw Exception("Socket disconnected")
totalBytesReceived += bytesReceived
}
}
private fun receiveLoop() {
while (_started) {
try {
val messageSize = _inputStream.readInt()
//Logger.v(TAG, "Waiting for message size...")
readExact(_buffer, 0, 4)
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
//Logger.v(TAG, "Read message size ${messageSize}.")
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
//Logger.i(TAG, "Receiving message (size = ${messageSize})")
var bytesRead = 0
while (bytesRead < messageSize) {
val read = _inputStream.read(_buffer, bytesRead, messageSize - bytesRead)
if (read == -1)
throw Exception("Stream closed")
bytesRead += read
}
readExact(_buffer, 0, messageSize)
//Logger.v(TAG, "Read ${messageSize}.")
//Logger.v(TAG, "Decrypting ${messageSize} bytes.")
val plen: Int = _cipherStatePair!!.receiver.decryptWithAd(null, _buffer, 0, _bufferDecrypted, 0, messageSize)
//Logger.i(TAG, "Decrypted message (size = ${plen})")
//Logger.v(TAG, "Decrypted ${messageSize} bytes.")
handleData(_bufferDecrypted, plen, null)
//Logger.v(TAG, "Handled data ${messageSize} bytes.")
} catch (e: Throwable) {
Logger.e(TAG, "Exception while receiving data", e)
Logger.e(TAG, "Exception while receiving data, closing socket session", e)
stop()
break
}
}
@ -203,8 +227,7 @@ class SyncSocketSession {
_channels.values.forEach { it.close() }
_channels.clear()
_onClose?.invoke(this)
_inputStream.close()
_outputStream.close()
_socket.close()
_thread = null
_cipherStatePair?.sender?.destroy()
_cipherStatePair?.receiver?.destroy()
@ -237,18 +260,25 @@ class SyncSocketSession {
val mainBuffer = ByteArray(512)
val mainLength = initiator.writeMessage(mainBuffer, 0, null, 0, 0)
val messageData = ByteBuffer.allocate(4 + 4 + pairingMessageLength + mainLength).order(ByteOrder.LITTLE_ENDIAN)
val messageSize = 4 + 4 + pairingMessageLength + mainLength
val messageData = ByteBuffer.allocate(4 + messageSize).order(ByteOrder.LITTLE_ENDIAN)
messageData.putInt(messageSize)
messageData.putInt(appId.toInt())
messageData.putInt(pairingMessageLength)
if (pairingMessageLength > 0) messageData.put(pairingMessage)
messageData.put(mainBuffer, 0, mainLength)
val messageDataArray = messageData.array()
_outputStream.writeInt(messageDataArray.size)
_outputStream.write(messageDataArray)
_outputStream.write(messageDataArray, 0, 4 + messageSize)
readExact(_buffer, 0, 4)
val responseSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (responseSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
val responseSize = _inputStream.readInt()
val responseMessage = ByteArray(responseSize)
_inputStream.readFully(responseMessage)
readExact(responseMessage, 0, responseSize)
val plaintext = ByteArray(512) // Buffer for any payload (none expected here)
initiator.readMessage(responseMessage, 0, responseSize, plaintext, 0)
@ -265,11 +295,16 @@ class SyncSocketSession {
responder.localKeyPair.copyFrom(_localKeyPair)
responder.start()
val messageSize = _inputStream.readInt()
val message = ByteArray(messageSize)
_inputStream.readFully(message)
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
readExact(_buffer, 0, 4)
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
val message = ByteArray(messageSize)
readExact(message, 0, messageSize)
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
val appId = messageBuffer.int.toUInt()
val pairingMessageLength = messageBuffer.int
val pairingMessage = if (pairingMessageLength > 0) ByteArray(pairingMessageLength).also { messageBuffer.get(it) } else byteArrayOf()
@ -298,10 +333,10 @@ class SyncSocketSession {
return false
}
val responseBuffer = ByteArray(512)
val responseLength = responder.writeMessage(responseBuffer, 0, null, 0, 0)
_outputStream.writeInt(responseLength)
_outputStream.write(responseBuffer, 0, responseLength)
val responseBuffer = ByteArray(4 + 512)
val responseLength = responder.writeMessage(responseBuffer, 4, null, 0, 0)
ByteBuffer.wrap(responseBuffer).order(ByteOrder.LITTLE_ENDIAN).putInt(responseLength)
_outputStream.write(responseBuffer, 0, 4 + responseLength)
_cipherStatePair = responder.split()
_remotePublicKey = remotePublicKey
@ -311,8 +346,13 @@ class SyncSocketSession {
private fun performVersionCheck() {
val CURRENT_VERSION = 4
val MINIMUM_VERSION = 4
_outputStream.writeInt(CURRENT_VERSION)
remoteVersion = _inputStream.readInt()
val versionBytes = ByteArray(4)
ByteBuffer.wrap(versionBytes).order(ByteOrder.LITTLE_ENDIAN).putInt(CURRENT_VERSION)
_outputStream.write(versionBytes, 0, 4)
readExact(versionBytes, 0, 4)
remoteVersion = ByteBuffer.wrap(versionBytes, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
Logger.i(TAG, "performVersionCheck (version = $remoteVersion)")
if (remoteVersion < MINIMUM_VERSION)
throw Exception("Invalid version")
@ -324,6 +364,8 @@ class SyncSocketSession {
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
ensureNotMainThread()
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
val segmentData = ByteArray(segmentSize)
@ -368,11 +410,12 @@ class SyncSocketSession {
put(data.array(), data.position(), data.remaining())
}
//Logger.i(TAG, "Encrypting message (size = ${data.size + HEADER_SIZE})")
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, data.remaining() + HEADER_SIZE)
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
_outputStream.writeInt(len)
_outputStream.write(_sendBufferEncrypted, 0, len)
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, data.remaining() + HEADER_SIZE)
val sendDuration = measureTimeMillis {
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
}
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()}, sendDuration: ${sendDuration})")
}
}
}
@ -391,8 +434,8 @@ class SyncSocketSession {
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, HEADER_SIZE)
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
_outputStream.writeInt(len)
_outputStream.write(_sendBufferEncrypted, 0, len)
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
}
}
@ -411,6 +454,8 @@ class SyncSocketSession {
val opcode = data.get().toUByte()
val subOpcode = data.get().toUByte()
Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
handlePacket(opcode, subOpcode, data, sourceChannel)
}

View file

@ -382,6 +382,8 @@
<string name="pair_through_relay_description">Allow devices to be paired through the relay</string>
<string name="connect_through_relay">Connection through relay</string>
<string name="connect_through_relay_description">Allow devices to be connected to through the relay</string>
<string name="connect_local_direct_through_relay">Connect direct through relay</string>
<string name="connect_local_direct_through_relay_description">Allow devices to be directly locally connected to through information discovered from the relay</string>
<string name="gesture_controls">Gesture controls</string>
<string name="volume_slider">Volume slider</string>
<string name="volume_slider_descr">Enable slide gesture to change volume</string>