Finished remote sync.

This commit is contained in:
Koen J 2025-04-11 16:31:07 +02:00
commit 5b2f8b8617
6 changed files with 203 additions and 144 deletions

View file

@ -100,7 +100,8 @@ class SyncHomeActivity : AppCompatActivity() {
private fun updateDeviceView(syncDeviceView: SyncDeviceView, publicKey: String, session: SyncSession?): SyncDeviceView { private fun updateDeviceView(syncDeviceView: SyncDeviceView, publicKey: String, session: SyncSession?): SyncDeviceView {
val connected = session?.connected ?: false val connected = session?.connected ?: false
syncDeviceView.setLinkType(if (connected) LinkType.Local else LinkType.None)
syncDeviceView.setLinkType(session?.linkType ?: LinkType.None)
.setName(session?.displayName ?: StateSync.instance.getCachedName(publicKey) ?: publicKey) .setName(session?.displayName ?: StateSync.instance.getCachedName(publicKey) ?: publicKey)
//TODO: also display public key? //TODO: also display public key?
.setStatus(if (connected) "Connected" else "Disconnected") .setStatus(if (connected) "Connected" else "Disconnected")

View file

@ -109,9 +109,9 @@ class SyncPairActivity : AppCompatActivity() {
lifecycleScope.launch(Dispatchers.IO) { lifecycleScope.launch(Dispatchers.IO) {
try { try {
StateSync.instance.connect(deviceInfo) { session, complete, message -> StateSync.instance.connect(deviceInfo) { complete, message ->
lifecycleScope.launch(Dispatchers.Main) { lifecycleScope.launch(Dispatchers.Main) {
if (complete) { if (complete != null && complete) {
_layoutPairingSuccess.visibility = View.VISIBLE _layoutPairingSuccess.visibility = View.VISIBLE
_layoutPairing.visibility = View.GONE _layoutPairing.visibility = View.GONE
} else { } else {

View file

@ -31,6 +31,7 @@ import com.futo.platformplayer.sync.SyncSessionData
import com.futo.platformplayer.sync.internal.ChannelSocket import com.futo.platformplayer.sync.internal.ChannelSocket
import com.futo.platformplayer.sync.internal.GJSyncOpcodes import com.futo.platformplayer.sync.internal.GJSyncOpcodes
import com.futo.platformplayer.sync.internal.IAuthorizable import com.futo.platformplayer.sync.internal.IAuthorizable
import com.futo.platformplayer.sync.internal.IChannel
import com.futo.platformplayer.sync.internal.Opcode import com.futo.platformplayer.sync.internal.Opcode
import com.futo.platformplayer.sync.internal.SyncDeviceInfo import com.futo.platformplayer.sync.internal.SyncDeviceInfo
import com.futo.platformplayer.sync.internal.SyncKeyPair import com.futo.platformplayer.sync.internal.SyncKeyPair
@ -51,6 +52,7 @@ import kotlinx.coroutines.withContext
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.lang.Thread.sleep
import java.net.InetAddress import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.ServerSocket import java.net.ServerSocket
@ -92,6 +94,8 @@ class StateSync {
val deviceRemoved: Event1<String> = Event1() val deviceRemoved: Event1<String> = Event1()
val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2() val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2()
//TODO: Should authorize acknowledge be implemented?
fun hasAuthorizedDevice(): Boolean { fun hasAuthorizedDevice(): Boolean {
synchronized(_sessions) { synchronized(_sessions) {
return _sessions.any{ it.value.connected && it.value.isAuthorized }; return _sessions.any{ it.value.connected && it.value.isAuthorized };
@ -220,6 +224,7 @@ class StateSync {
try { try {
Log.i(TAG, "Starting relay session...") Log.i(TAG, "Starting relay session...")
var socketClosed = false;
val socket = Socket(RELAY_SERVER, 9000) val socket = Socket(RELAY_SERVER, 9000)
_relaySession = SyncSocketSession( _relaySession = SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!, (socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
@ -271,9 +276,14 @@ class StateSync {
session?.removeChannel(channel) session?.removeChannel(channel)
} }
}, },
onChannelEstablished = { _, channel, isResponder ->
handleAuthorization(channel, isResponder)
},
onClose = { socketClosed = true },
onHandshakeComplete = { relaySession -> onHandshakeComplete = { relaySession ->
Thread {
try { try {
while (_started) { while (_started && !socketClosed) {
val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) { val unconnectedAuthorizedDevices = synchronized(_authorizedDevices) {
_authorizedDevices.values.filter { !isConnected(it) }.toTypedArray() _authorizedDevices.values.filter { !isConnected(it) }.toTypedArray()
} }
@ -288,14 +298,8 @@ class StateSync {
if (connectionInfo.allowLocalDirect) { if (connectionInfo.allowLocalDirect) {
Thread { Thread {
try { try {
val syncDeviceInfo = SyncDeviceInfo(
targetKey,
potentialLocalAddresses.map { it }.toTypedArray(),
PORT,
null
)
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.") Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
connect(syncDeviceInfo) connect(potentialLocalAddresses.map { it }.toTypedArray(), PORT, targetKey, null)
} catch (e: Throwable) { } catch (e: Throwable) {
Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e) Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e)
} }
@ -326,6 +330,7 @@ class StateSync {
Log.e(TAG, "Unhandled exception in relay session.", e) Log.e(TAG, "Unhandled exception in relay session.", e)
relaySession.stop() relaySession.stop()
} }
}.start()
} }
) )
@ -718,7 +723,6 @@ class StateSync {
} }
deviceRemoved.emit(it.remotePublicKey) deviceRemoved.emit(it.remotePublicKey)
}, },
dataHandler = { it, opcode, subOpcode, data -> dataHandler = { it, opcode, subOpcode, data ->
handleData(it, opcode, subOpcode, data) handleData(it, opcode, subOpcode, data)
@ -782,6 +786,18 @@ class StateSync {
session!!.addChannel(channelSocket!!) session!!.addChannel(channelSocket!!)
} }
handleAuthorization(channelSocket!!, isResponder)
},
onData = { s, opcode, subOpcode, data ->
session?.handlePacket(opcode, subOpcode, data)
}
)
}
private fun handleAuthorization(channel: IChannel, isResponder: Boolean) {
val syncSession = channel.syncSession!!
val remotePublicKey = channel.remotePublicKey!!
if (isResponder) { if (isResponder) {
val isAuthorized = synchronized(_authorizedDevices) { val isAuthorized = synchronized(_authorizedDevices) {
_authorizedDevices.values.contains(remotePublicKey) _authorizedDevices.values.contains(remotePublicKey)
@ -797,7 +813,7 @@ class StateSync {
action = { action = {
scope.launch(Dispatchers.IO) { scope.launch(Dispatchers.IO) {
try { try {
session!!.authorize() syncSession.authorize()
Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation") Logger.i(TAG, "Connection authorized for $remotePublicKey by confirmation")
} catch (e: Throwable) { } catch (e: Throwable) {
Logger.e(TAG, "Failed to send authorize", e) Logger.e(TAG, "Failed to send authorize", e)
@ -812,8 +828,8 @@ class StateSync {
Logger.w(TAG, "Failed to send unauthorize", e) Logger.w(TAG, "Failed to send unauthorize", e)
} }
syncSession.close()
synchronized(_sessions) { synchronized(_sessions) {
session?.close()
_sessions.remove(remotePublicKey) _sessions.remove(remotePublicKey)
} }
} }
@ -821,9 +837,9 @@ class StateSync {
) )
} }
} else { } else {
val publicKey = session!!.remotePublicKey val publicKey = syncSession.remotePublicKey
session!!.unauthorize() syncSession.unauthorize()
session!!.close() syncSession.close()
synchronized(_sessions) { synchronized(_sessions) {
_sessions.remove(publicKey) _sessions.remove(publicKey)
@ -833,19 +849,14 @@ class StateSync {
} }
} else { } else {
//Responder does not need to check because already approved //Responder does not need to check because already approved
session!!.authorize() syncSession.authorize()
Logger.i(TAG, "Connection authorized for $remotePublicKey because already authorized") Logger.i(TAG, "Connection authorized for $remotePublicKey because already authorized")
} }
} else { } else {
//Initiator does not need to check because the manual action of scanning the QR counts as approval //Initiator does not need to check because the manual action of scanning the QR counts as approval
session!!.authorize() syncSession.authorize()
Logger.i(TAG, "Connection authorized for $remotePublicKey because initiator") Logger.i(TAG, "Connection authorized for $remotePublicKey because initiator")
} }
},
onData = { s, opcode, subOpcode, data ->
session?.handlePacket(opcode, subOpcode, data)
}
)
} }
inline fun <reified T> broadcastJsonData(subOpcode: UByte, data: T) { inline fun <reified T> broadcastJsonData(subOpcode: UByte, data: T) {
@ -895,16 +906,35 @@ class StateSync {
_relaySession = null _relaySession = null
} }
fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((session: SyncSession?, complete: Boolean, message: String) -> Unit)? = null): SyncSocketSession { fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null) {
onStatusUpdate?.invoke(null, false, "Connecting...") try {
val socket = getConnectedSocket(deviceInfo.addresses.map { InetAddress.getByName(it) }, deviceInfo.port) ?: throw Exception("Failed to connect") connect(deviceInfo.addresses, deviceInfo.port, deviceInfo.publicKey, deviceInfo.pairingCode, onStatusUpdate)
onStatusUpdate?.invoke(null, false, "Handshaking...") } catch (e: Throwable) {
Logger.e(TAG, "Failed to connect directly", e)
val relaySession = _relaySession
if (relaySession != null) {
onStatusUpdate?.invoke(null, "Connecting via relay...")
val session = createSocketSession(socket, false) { s -> runBlocking {
onStatusUpdate?.invoke(s, true, "Handshake complete") relaySession.startRelayedChannel(deviceInfo.publicKey, deviceInfo.pairingCode)
onStatusUpdate?.invoke(true, "Connected")
}
} else {
throw Exception("Failed to connect.")
}
}
} }
session.startAsInitiator(deviceInfo.publicKey, deviceInfo.pairingCode) fun connect(addresses: Array<String>, port: Int, publicKey: String, pairingCode: String?, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null): SyncSocketSession {
onStatusUpdate?.invoke(null, "Connecting directly...")
val socket = getConnectedSocket(addresses.map { InetAddress.getByName(it) }, port) ?: throw Exception("Failed to connect")
onStatusUpdate?.invoke(null, "Handshaking...")
val session = createSocketSession(socket, false) { s ->
onStatusUpdate?.invoke(true, "Authorized")
}
session.startAsInitiator(publicKey, pairingCode)
return session return session
} }

View file

@ -13,6 +13,7 @@ interface IChannel : AutoCloseable {
val remotePublicKey: String? val remotePublicKey: String?
val remoteVersion: Int? val remoteVersion: Int?
var authorizable: IAuthorizable? var authorizable: IAuthorizable?
var syncSession: SyncSession?
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null) fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null)
fun setCloseHandler(onClose: ((IChannel) -> Unit)?) fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
@ -27,6 +28,7 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel {
override var authorizable: IAuthorizable? override var authorizable: IAuthorizable?
get() = session.authorizable get() = session.authorizable
set(value) { session.authorizable = value } set(value) { session.authorizable = value }
override var syncSession: SyncSession? = null
override fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) { override fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?) {
this.onData = onData this.onData = onData
@ -76,10 +78,11 @@ class ChannelRelayed(
override var authorizable: IAuthorizable? = null override var authorizable: IAuthorizable? = null
val isAuthorized: Boolean get() = authorizable?.isAuthorized ?: false val isAuthorized: Boolean get() = authorizable?.isAuthorized ?: false
var connectionId: Long = 0L var connectionId: Long = 0L
override var remotePublicKey: String? = null override var remotePublicKey: String? = publicKey
private set private set
override var remoteVersion: Int? = null override var remoteVersion: Int? = null
private set private set
override var syncSession: SyncSession? = null
private var onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)? = null private var onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)? = null
private var onClose: ((IChannel) -> Unit)? = null private var onClose: ((IChannel) -> Unit)? = null

View file

@ -33,6 +33,30 @@ class SyncSession : IAuthorizable {
private set private set
val displayName: String get() = remoteDeviceName ?: remotePublicKey val displayName: String get() = remoteDeviceName ?: remotePublicKey
val linkType: LinkType get()
{
var hasProxied = false
var hasDirect = false
synchronized(_channels)
{
for (channel in _channels)
{
if (channel is ChannelRelayed)
hasProxied = true
if (channel is ChannelSocket)
hasDirect = true
if (hasProxied && hasDirect)
return LinkType.Local
}
}
if (hasProxied)
return LinkType.Proxied
if (hasDirect)
return LinkType.Local
return LinkType.None
}
var connected: Boolean = false var connected: Boolean = false
private set(v) { private set(v) {
if (field != v) { if (field != v) {
@ -70,6 +94,7 @@ class SyncSession : IAuthorizable {
} }
channel.authorizable = this channel.authorizable = this
channel.syncSession = this
} }
fun authorize() { fun authorize() {

View file

@ -37,8 +37,8 @@ class SyncSocketSession {
private val _onClose: ((session: SyncSocketSession) -> Unit)? private val _onClose: ((session: SyncSocketSession) -> Unit)?
private val _onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? private val _onHandshakeComplete: ((session: SyncSocketSession) -> Unit)?
private val _onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? private val _onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)?
private val _onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)?
private val _isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? private val _isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)?
private var _thread: Thread? = null
private var _cipherStatePair: CipherStatePair? = null private var _cipherStatePair: CipherStatePair? = null
private var _remotePublicKey: String? = null private var _remotePublicKey: String? = null
val remotePublicKey: String? get() = _remotePublicKey val remotePublicKey: String? get() = _remotePublicKey
@ -86,6 +86,7 @@ class SyncSocketSession {
onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null, onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null,
onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null, onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null,
onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? = null, onNewChannel: ((session: SyncSocketSession, channel: ChannelRelayed) -> Unit)? = null,
onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null,
isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? = null isHandshakeAllowed: ((session: SyncSocketSession, remotePublicKey: String, pairingCode: String?) -> Boolean)? = null
) { ) {
_inputStream = inputStream _inputStream = inputStream
@ -95,6 +96,7 @@ class SyncSocketSession {
_localKeyPair = localKeyPair _localKeyPair = localKeyPair
_onData = onData _onData = onData
_onNewChannel = onNewChannel _onNewChannel = onNewChannel
_onChannelEstablished = onChannelEstablished
_isHandshakeAllowed = isHandshakeAllowed _isHandshakeAllowed = isHandshakeAllowed
this.remoteAddress = remoteAddress this.remoteAddress = remoteAddress
@ -105,7 +107,6 @@ class SyncSocketSession {
fun startAsInitiator(remotePublicKey: String, pairingCode: String? = null) { fun startAsInitiator(remotePublicKey: String, pairingCode: String? = null) {
_started = true _started = true
_thread = Thread {
try { try {
handshakeAsInitiator(remotePublicKey, pairingCode) handshakeAsInitiator(remotePublicKey, pairingCode)
_onHandshakeComplete?.invoke(this) _onHandshakeComplete?.invoke(this)
@ -115,12 +116,10 @@ class SyncSocketSession {
} finally { } finally {
stop() stop()
} }
}.apply { start() }
} }
fun startAsResponder() { fun startAsResponder() {
_started = true _started = true
_thread = Thread {
try { try {
if (handshakeAsResponder()) { if (handshakeAsResponder()) {
_onHandshakeComplete?.invoke(this) _onHandshakeComplete?.invoke(this)
@ -131,7 +130,6 @@ class SyncSocketSession {
} finally { } finally {
stop() stop()
} }
}.apply { start() }
} }
private fun receiveLoop() { private fun receiveLoop() {
@ -191,7 +189,6 @@ class SyncSocketSession {
_outputStream.close() _outputStream.close()
_cipherStatePair?.sender?.destroy() _cipherStatePair?.sender?.destroy()
_cipherStatePair?.receiver?.destroy() _cipherStatePair?.receiver?.destroy()
_thread = null
Logger.i(TAG, "Session closed") Logger.i(TAG, "Session closed")
} }
@ -434,10 +431,11 @@ class SyncSocketSession {
return return
} }
val channel = ChannelRelayed(this, _localKeyPair, publicKey, false) val channel = ChannelRelayed(this, _localKeyPair, publicKey, false)
_onNewChannel?.invoke(this, channel)
channel.connectionId = connectionId channel.connectionId = connectionId
_onNewChannel?.invoke(this, channel)
_channels[connectionId] = channel _channels[connectionId] = channel
channel.sendResponseTransport(remoteVersion, requestId, channelHandshakeMessage) channel.sendResponseTransport(remoteVersion, requestId, channelHandshakeMessage)
_onChannelEstablished?.invoke(this, channel, true)
} }
else -> Logger.w(TAG, "Unhandled request opcode: $subOpcode") else -> Logger.w(TAG, "Unhandled request opcode: $subOpcode")
} }
@ -483,6 +481,7 @@ class SyncSocketSession {
channel.handleTransportRelayed(remoteVersion, connectionId, handshakeMessage) channel.handleTransportRelayed(remoteVersion, connectionId, handshakeMessage)
_channels[connectionId] = channel _channels[connectionId] = channel
tcs.complete(channel) tcs.complete(channel)
_onChannelEstablished?.invoke(this, channel, false)
} ?: Logger.e(TAG, "No pending channel for requestId $requestId") } ?: Logger.e(TAG, "No pending channel for requestId $requestId")
} else { } else {
_pendingChannels.remove(requestId)?.let { (channel, tcs) -> _pendingChannels.remove(requestId)?.let { (channel, tcs) ->
@ -656,7 +655,12 @@ class SyncSocketSession {
private fun handleNotify(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) { private fun handleNotify(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
when (subOpcode) { when (subOpcode) {
NotifyOpcode.AUTHORIZED.value, NotifyOpcode.UNAUTHORIZED.value -> _onData?.invoke(this, Opcode.NOTIFY.value, subOpcode, data) NotifyOpcode.AUTHORIZED.value, NotifyOpcode.UNAUTHORIZED.value -> {
if (sourceChannel != null)
sourceChannel.invokeDataHandler(Opcode.NOTIFY.value, subOpcode, data)
else
_onData?.invoke(this, Opcode.NOTIFY.value, subOpcode, data)
}
NotifyOpcode.CONNECTION_INFO.value -> { /* Handle connection info if needed */ } NotifyOpcode.CONNECTION_INFO.value -> { /* Handle connection info if needed */ }
} }
} }
@ -829,10 +833,6 @@ class SyncSocketSession {
} }
} }
} }
if (authorizable?.isAuthorized != true) {
return
}
} }
suspend fun requestConnectionInfo(publicKey: String): ConnectionInfo? { suspend fun requestConnectionInfo(publicKey: String): ConnectionInfo? {