From 5b2f8b86175fdc03c96a6a70e316d3eb276d4218 Mon Sep 17 00:00:00 2001 From: Koen J Date: Fri, 11 Apr 2025 16:31:07 +0200 Subject: [PATCH] Finished remote sync. --- .../activities/SyncHomeActivity.kt | 3 +- .../activities/SyncPairActivity.kt | 4 +- .../futo/platformplayer/states/StateSync.kt | 250 ++++++++++-------- .../platformplayer/sync/internal/Channel.kt | 5 +- .../sync/internal/SyncSession.kt | 25 ++ .../sync/internal/SyncSocketSession.kt | 60 ++--- 6 files changed, 203 insertions(+), 144 deletions(-) diff --git a/app/src/main/java/com/futo/platformplayer/activities/SyncHomeActivity.kt b/app/src/main/java/com/futo/platformplayer/activities/SyncHomeActivity.kt index d1cd7706..2b7e3a72 100644 --- a/app/src/main/java/com/futo/platformplayer/activities/SyncHomeActivity.kt +++ b/app/src/main/java/com/futo/platformplayer/activities/SyncHomeActivity.kt @@ -100,7 +100,8 @@ class SyncHomeActivity : AppCompatActivity() { private fun updateDeviceView(syncDeviceView: SyncDeviceView, publicKey: String, session: SyncSession?): SyncDeviceView { val connected = session?.connected ?: false - syncDeviceView.setLinkType(if (connected) LinkType.Local else LinkType.None) + + syncDeviceView.setLinkType(session?.linkType ?: LinkType.None) .setName(session?.displayName ?: StateSync.instance.getCachedName(publicKey) ?: publicKey) //TODO: also display public key? .setStatus(if (connected) "Connected" else "Disconnected") diff --git a/app/src/main/java/com/futo/platformplayer/activities/SyncPairActivity.kt b/app/src/main/java/com/futo/platformplayer/activities/SyncPairActivity.kt index a7030b97..5e808977 100644 --- a/app/src/main/java/com/futo/platformplayer/activities/SyncPairActivity.kt +++ b/app/src/main/java/com/futo/platformplayer/activities/SyncPairActivity.kt @@ -109,9 +109,9 @@ class SyncPairActivity : AppCompatActivity() { lifecycleScope.launch(Dispatchers.IO) { try { - StateSync.instance.connect(deviceInfo) { session, complete, message -> + StateSync.instance.connect(deviceInfo) { complete, message -> lifecycleScope.launch(Dispatchers.Main) { - if (complete) { + if (complete != null && complete) { _layoutPairingSuccess.visibility = View.VISIBLE _layoutPairing.visibility = View.GONE } else { diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt index 3e4a9891..57bcde5f 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt @@ -31,6 +31,7 @@ import com.futo.platformplayer.sync.SyncSessionData import com.futo.platformplayer.sync.internal.ChannelSocket import com.futo.platformplayer.sync.internal.GJSyncOpcodes import com.futo.platformplayer.sync.internal.IAuthorizable +import com.futo.platformplayer.sync.internal.IChannel import com.futo.platformplayer.sync.internal.Opcode import com.futo.platformplayer.sync.internal.SyncDeviceInfo import com.futo.platformplayer.sync.internal.SyncKeyPair @@ -51,6 +52,7 @@ import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import java.io.ByteArrayInputStream +import java.lang.Thread.sleep import java.net.InetAddress import java.net.InetSocketAddress import java.net.ServerSocket @@ -92,6 +94,8 @@ class StateSync { val deviceRemoved: Event1 = Event1() val deviceUpdatedOrAdded: Event2 = Event2() + //TODO: Should authorize acknowledge be implemented? + fun hasAuthorizedDevice(): Boolean { synchronized(_sessions) { return _sessions.any{ it.value.connected && it.value.isAuthorized }; @@ -220,6 +224,7 @@ class StateSync { try { Log.i(TAG, "Starting relay session...") + var socketClosed = false; val socket = Socket(RELAY_SERVER, 9000) _relaySession = SyncSocketSession( (socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, @@ -271,61 +276,61 @@ class StateSync { session?.removeChannel(channel) } }, + onChannelEstablished = { _, channel, isResponder -> + handleAuthorization(channel, isResponder) + }, + onClose = { socketClosed = true }, onHandshakeComplete = { relaySession -> - try { - while (_started) { - val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) { - _authorizedDevices.values.filter { !isConnected(it) }.toTypedArray() - } + Thread { + try { + while (_started && !socketClosed) { + val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) { + _authorizedDevices.values.filter { !isConnected(it) }.toTypedArray() + } - relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, true, false, false, true) + relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, true, false, false, true) - val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) } + val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) } - for ((targetKey, connectionInfo) in connectionInfos) { - val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses) - .filter { it != connectionInfo.remoteIp } - if (connectionInfo.allowLocalDirect) { - Thread { + for ((targetKey, connectionInfo) in connectionInfos) { + val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses) + .filter { it != connectionInfo.remoteIp } + if (connectionInfo.allowLocalDirect) { + Thread { + try { + Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.") + connect(potentialLocalAddresses.map { it }.toTypedArray(), PORT, targetKey, null) + } catch (e: Throwable) { + Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e) + } + }.start() + } + + if (connectionInfo.allowRemoteDirect) { + // TODO: Implement direct remote connection if needed + } + + if (connectionInfo.allowRemoteHolePunched) { + // TODO: Implement hole punching if needed + } + + if (connectionInfo.allowRemoteProxied) { try { - val syncDeviceInfo = SyncDeviceInfo( - targetKey, - potentialLocalAddresses.map { it }.toTypedArray(), - PORT, - null - ) - Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.") - connect(syncDeviceInfo) + Log.v(TAG, "Attempting relayed connection with '$targetKey'.") + runBlocking { relaySession.startRelayedChannel(targetKey, null) } } catch (e: Throwable) { - Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e) + Log.e(TAG, "Failed to start relayed channel with $targetKey.", e) } - }.start() - } - - if (connectionInfo.allowRemoteDirect) { - // TODO: Implement direct remote connection if needed - } - - if (connectionInfo.allowRemoteHolePunched) { - // TODO: Implement hole punching if needed - } - - if (connectionInfo.allowRemoteProxied) { - try { - Log.v(TAG, "Attempting relayed connection with '$targetKey'.") - runBlocking { relaySession.startRelayedChannel(targetKey, null) } - } catch (e: Throwable) { - Log.e(TAG, "Failed to start relayed channel with $targetKey.", e) } } - } - Thread.sleep(15000) + Thread.sleep(15000) + } + } catch (e: Throwable) { + Log.e(TAG, "Unhandled exception in relay session.", e) + relaySession.stop() } - } catch (e: Throwable) { - Log.e(TAG, "Unhandled exception in relay session.", e) - relaySession.stop() - } + }.start() } ) @@ -718,7 +723,6 @@ class StateSync { } deviceRemoved.emit(it.remotePublicKey) - }, dataHandler = { it, opcode, subOpcode, data -> handleData(it, opcode, subOpcode, data) @@ -782,65 +786,7 @@ class StateSync { session!!.addChannel(channelSocket!!) } - if (isResponder) { - val isAuthorized = synchronized(_authorizedDevices) { - _authorizedDevices.values.contains(remotePublicKey) - } - - if (!isAuthorized) { - val scope = StateApp.instance.scopeOrNull - val activity = SyncShowPairingCodeActivity.activity - - if (scope != null && activity != null) { - scope.launch(Dispatchers.Main) { - UIDialogs.showConfirmationDialog(activity, "Allow connection from ${remotePublicKey}?", - action = { - scope.launch(Dispatchers.IO) { - try { - session!!.authorize() - Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation") - } catch (e: Throwable) { - Logger.e(TAG, "Failed to send authorize", e) - } - } - }, - cancelAction = { - scope.launch(Dispatchers.IO) { - try { - unauthorize(remotePublicKey) - } catch (e: Throwable) { - Logger.w(TAG, "Failed to send unauthorize", e) - } - - synchronized(_sessions) { - session?.close() - _sessions.remove(remotePublicKey) - } - } - } - ) - } - } else { - val publicKey = session!!.remotePublicKey - session!!.unauthorize() - session!!.close() - - synchronized(_sessions) { - _sessions.remove(publicKey) - } - - Logger.i(TAG, "Connection unauthorized for $remotePublicKey because not authorized and not on pairing activity to ask") - } - } else { - //Responder does not need to check because already approved - session!!.authorize() - Logger.i(TAG, "Connection authorized for $remotePublicKey because already authorized") - } - } else { - //Initiator does not need to check because the manual action of scanning the QR counts as approval - session!!.authorize() - Logger.i(TAG, "Connection authorized for $remotePublicKey because initiator") - } + handleAuthorization(channelSocket!!, isResponder) }, onData = { s, opcode, subOpcode, data -> session?.handlePacket(opcode, subOpcode, data) @@ -848,6 +794,71 @@ class StateSync { ) } + private fun handleAuthorization(channel: IChannel, isResponder: Boolean) { + val syncSession = channel.syncSession!! + val remotePublicKey = channel.remotePublicKey!! + + if (isResponder) { + val isAuthorized = synchronized(_authorizedDevices) { + _authorizedDevices.values.contains(remotePublicKey) + } + + if (!isAuthorized) { + val scope = StateApp.instance.scopeOrNull + val activity = SyncShowPairingCodeActivity.activity + + if (scope != null && activity != null) { + scope.launch(Dispatchers.Main) { + UIDialogs.showConfirmationDialog(activity, "Allow connection from ${remotePublicKey}?", + action = { + scope.launch(Dispatchers.IO) { + try { + syncSession.authorize() + Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation") + } catch (e: Throwable) { + Logger.e(TAG, "Failed to send authorize", e) + } + } + }, + cancelAction = { + scope.launch(Dispatchers.IO) { + try { + unauthorize(remotePublicKey) + } catch (e: Throwable) { + Logger.w(TAG, "Failed to send unauthorize", e) + } + + syncSession.close() + synchronized(_sessions) { + _sessions.remove(remotePublicKey) + } + } + } + ) + } + } else { + val publicKey = syncSession.remotePublicKey + syncSession.unauthorize() + syncSession.close() + + synchronized(_sessions) { + _sessions.remove(publicKey) + } + + Logger.i(TAG, "Connection unauthorized for $remotePublicKey because not authorized and not on pairing activity to ask") + } + } else { + //Responder does not need to check because already approved + syncSession.authorize() + Logger.i(TAG, "Connection authorized for $remotePublicKey because already authorized") + } + } else { + //Initiator does not need to check because the manual action of scanning the QR counts as approval + syncSession.authorize() + Logger.i(TAG, "Connection authorized for $remotePublicKey because initiator") + } + } + inline fun broadcastJsonData(subOpcode: UByte, data: T) { broadcast(Opcode.DATA.value, subOpcode, Json.encodeToString(data)); } @@ -895,16 +906,35 @@ class StateSync { _relaySession = null } - fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((session: SyncSession?, complete: Boolean, message: String) -> Unit)? = null): SyncSocketSession { - onStatusUpdate?.invoke(null, false, "Connecting...") - val socket = getConnectedSocket(deviceInfo.addresses.map { InetAddress.getByName(it) }, deviceInfo.port) ?: throw Exception("Failed to connect") - onStatusUpdate?.invoke(null, false, "Handshaking...") + fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null) { + try { + connect(deviceInfo.addresses, deviceInfo.port, deviceInfo.publicKey, deviceInfo.pairingCode, onStatusUpdate) + } catch (e: Throwable) { + Logger.e(TAG, "Failed to connect directly", e) + val relaySession = _relaySession + if (relaySession != null) { + onStatusUpdate?.invoke(null, "Connecting via relay...") + + runBlocking { + relaySession.startRelayedChannel(deviceInfo.publicKey, deviceInfo.pairingCode) + onStatusUpdate?.invoke(true, "Connected") + } + } else { + throw Exception("Failed to connect.") + } + } + } + + fun connect(addresses: Array, port: Int, publicKey: String, pairingCode: String?, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null): SyncSocketSession { + onStatusUpdate?.invoke(null, "Connecting directly...") + val socket = getConnectedSocket(addresses.map { InetAddress.getByName(it) }, port) ?: throw Exception("Failed to connect") + onStatusUpdate?.invoke(null, "Handshaking...") val session = createSocketSession(socket, false) { s -> - onStatusUpdate?.invoke(s, true, "Handshake complete") + onStatusUpdate?.invoke(true, "Authorized") } - session.startAsInitiator(deviceInfo.publicKey, deviceInfo.pairingCode) + session.startAsInitiator(publicKey, pairingCode) return session } diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt index 1c7e1b7e..2d3f1580 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/Channel.kt @@ -13,6 +13,7 @@ interface IChannel : AutoCloseable { val remotePublicKey: String? val remoteVersion: Int? var authorizable: IAuthorizable? + var syncSession: SyncSession? fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null) fun setCloseHandler(onClose: ((IChannel) -> Unit)?) @@ -27,6 +28,7 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel { override var authorizable: IAuthorizable? get() = session.authorizable set(value) { session.authorizable = value } + override var syncSession: SyncSession? = null override fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) { this.onData = onData @@ -76,10 +78,11 @@ class ChannelRelayed( override var authorizable: IAuthorizable? = null val isAuthorized: Boolean get() = authorizable?.isAuthorized ?: false var connectionId: Long = 0L - override var remotePublicKey: String? = null + override var remotePublicKey: String? = publicKey private set override var remoteVersion: Int? = null private set + override var syncSession: SyncSession? = null private var onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)? = null private var onClose: ((IChannel) -> Unit)? = null diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt index 2b4f112d..76af1edb 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt @@ -33,6 +33,30 @@ class SyncSession : IAuthorizable { private set val displayName: String get() = remoteDeviceName ?: remotePublicKey + val linkType: LinkType get() + { + var hasProxied = false + var hasDirect = false + synchronized(_channels) + { + for (channel in _channels) + { + if (channel is ChannelRelayed) + hasProxied = true + if (channel is ChannelSocket) + hasDirect = true + if (hasProxied && hasDirect) + return LinkType.Local + } + } + + if (hasProxied) + return LinkType.Proxied + if (hasDirect) + return LinkType.Local + return LinkType.None + } + var connected: Boolean = false private set(v) { if (field != v) { @@ -70,6 +94,7 @@ class SyncSession : IAuthorizable { } channel.authorizable = this + channel.syncSession = this } fun authorize() { diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index 24d64f87..c8f4f683 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -37,8 +37,8 @@ class SyncSocketSession { private val _onClose: ((session: SyncSocketSession) -> Unit)? private val _onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? private val _onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? + private val _onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? private val _isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? - private var _thread: Thread? = null private var _cipherStatePair: CipherStatePair? = null private var _remotePublicKey: String? = null val remotePublicKey: String? get() = _remotePublicKey @@ -86,6 +86,7 @@ class SyncSocketSession { onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null, onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null, onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? = null, + onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null, isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? = null ) { _inputStream = inputStream @@ -95,6 +96,7 @@ class SyncSocketSession { _localKeyPair = localKeyPair _onData = onData _onNewChannel = onNewChannel + _onChannelEstablished = onChannelEstablished _isHandshakeAllowed = isHandshakeAllowed this.remoteAddress = remoteAddress @@ -105,33 +107,29 @@ class SyncSocketSession { fun startAsInitiator(remotePublicKey: String, pairingCode: String? = null) { _started = true - _thread = Thread { - try { - handshakeAsInitiator(remotePublicKey, pairingCode) - _onHandshakeComplete?.invoke(this) - receiveLoop() - } catch (e: Throwable) { - Logger.e(TAG, "Failed to run as initiator", e) - } finally { - stop() - } - }.apply { start() } + try { + handshakeAsInitiator(remotePublicKey, pairingCode) + _onHandshakeComplete?.invoke(this) + receiveLoop() + } catch (e: Throwable) { + Logger.e(TAG, "Failed to run as initiator", e) + } finally { + stop() + } } fun startAsResponder() { _started = true - _thread = Thread { - try { - if (handshakeAsResponder()) { - _onHandshakeComplete?.invoke(this) - receiveLoop() - } - } catch (e: Throwable) { - Logger.e(TAG, "Failed to run as responder", e) - } finally { - stop() + try { + if (handshakeAsResponder()) { + _onHandshakeComplete?.invoke(this) + receiveLoop() } - }.apply { start() } + } catch (e: Throwable) { + Logger.e(TAG, "Failed to run as responder", e) + } finally { + stop() + } } private fun receiveLoop() { @@ -191,7 +189,6 @@ class SyncSocketSession { _outputStream.close() _cipherStatePair?.sender?.destroy() _cipherStatePair?.receiver?.destroy() - _thread = null Logger.i(TAG, "Session closed") } @@ -434,10 +431,11 @@ class SyncSocketSession { return } val channel = ChannelRelayed(this, _localKeyPair, publicKey, false) - _onNewChannel?.invoke(this, channel) channel.connectionId = connectionId + _onNewChannel?.invoke(this, channel) _channels[connectionId] = channel channel.sendResponseTransport(remoteVersion, requestId, channelHandshakeMessage) + _onChannelEstablished?.invoke(this, channel, true) } else -> Logger.w(TAG, "Unhandled request opcode: $subOpcode") } @@ -483,6 +481,7 @@ class SyncSocketSession { channel.handleTransportRelayed(remoteVersion, connectionId, handshakeMessage) _channels[connectionId] = channel tcs.complete(channel) + _onChannelEstablished?.invoke(this, channel, false) } ?: Logger.e(TAG, "No pending channel for requestId $requestId") } else { _pendingChannels.remove(requestId)?.let { (channel, tcs) -> @@ -656,7 +655,12 @@ class SyncSocketSession { private fun handleNotify(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) { when (subOpcode) { - NotifyOpcode.AUTHORIZED.value, NotifyOpcode.UNAUTHORIZED.value -> _onData?.invoke(this, Opcode.NOTIFY.value, subOpcode, data) + NotifyOpcode.AUTHORIZED.value, NotifyOpcode.UNAUTHORIZED.value -> { + if (sourceChannel != null) + sourceChannel.invokeDataHandler(Opcode.NOTIFY.value, subOpcode, data) + else + _onData?.invoke(this, Opcode.NOTIFY.value, subOpcode, data) + } NotifyOpcode.CONNECTION_INFO.value -> { /* Handle connection info if needed */ } } } @@ -829,10 +833,6 @@ class SyncSocketSession { } } } - - if (authorizable?.isAuthorized != true) { - return - } } suspend fun requestConnectionInfo(publicKey: String): ConnectionInfo? {