mirror of
https://gitlab.futo.org/videostreaming/grayjay.git
synced 2025-09-27 11:49:03 +00:00
Added separate error status code for transport rejection. Added unhandled exception handler for relay loop. Added additional booleans to keep track of the server/relay connections being up/down. Added additional messaging to let the user know when something is wrong.
This commit is contained in:
parent
5d44f0f2b6
commit
94ab3da0e4
5 changed files with 198 additions and 134 deletions
|
@ -9,6 +9,7 @@ import android.widget.LinearLayout
|
|||
import androidx.appcompat.app.AppCompatActivity
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import com.futo.platformplayer.R
|
||||
import com.futo.platformplayer.UIDialogs
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.setNavigationBarColorAndIcons
|
||||
import com.futo.platformplayer.states.StateApp
|
||||
|
@ -100,12 +101,18 @@ class SyncHomeActivity : AppCompatActivity() {
|
|||
}
|
||||
}
|
||||
|
||||
StateSync.instance.confirmStarted(this, {
|
||||
StateSync.instance.showFailedToBindDialogIfNecessary(this@SyncHomeActivity)
|
||||
}, {
|
||||
StateSync.instance.confirmStarted(this, onStarted = {
|
||||
if (StateSync.instance.syncService?.serverSocketFailedToStart == true) {
|
||||
UIDialogs.toast(this, "Server socket failed to start, is the port in use?", true)
|
||||
}
|
||||
if (StateSync.instance.syncService?.relayConnected == false) {
|
||||
UIDialogs.toast(this, "Not connected to relay, remote connections will work.", false)
|
||||
}
|
||||
if (StateSync.instance.syncService?.serverSocketStarted == false) {
|
||||
UIDialogs.toast(this, "Listener not started, local connections will not work.", false)
|
||||
}
|
||||
}, onNotStarted = {
|
||||
finish()
|
||||
}, {
|
||||
StateSync.instance.showFailedToBindDialogIfNecessary(this@SyncHomeActivity)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -412,24 +412,12 @@ class StateApp {
|
|||
}
|
||||
|
||||
if (Settings.instance.synchronization.enabled) {
|
||||
StateSync.instance.start(context, {
|
||||
try {
|
||||
UIDialogs.toast("Failed to start sync, port in use")
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
})
|
||||
StateSync.instance.start(context)
|
||||
}
|
||||
|
||||
settingsActivityClosed.subscribe {
|
||||
if (Settings.instance.synchronization.enabled) {
|
||||
StateSync.instance.start(context, {
|
||||
try {
|
||||
UIDialogs.toast("Failed to start sync, port in use")
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
})
|
||||
StateSync.instance.start(context)
|
||||
} else {
|
||||
StateSync.instance.stop()
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ class StateSync {
|
|||
val deviceRemoved: Event1<String> = Event1()
|
||||
val deviceUpdatedOrAdded: Event2<String, SyncSession> = Event2()
|
||||
|
||||
fun start(context: Context, onServerBindFail: () -> Unit) {
|
||||
fun start(context: Context) {
|
||||
if (syncService != null) {
|
||||
Logger.i(TAG, "Already started.")
|
||||
return
|
||||
|
@ -150,24 +150,14 @@ class StateSync {
|
|||
}
|
||||
}
|
||||
|
||||
syncService?.start(context, onServerBindFail)
|
||||
syncService?.start(context)
|
||||
}
|
||||
|
||||
fun showFailedToBindDialogIfNecessary(context: Context) {
|
||||
if (syncService?.serverSocketFailedToStart == true && Settings.instance.synchronization.localConnections) {
|
||||
try {
|
||||
UIDialogs.showDialogOk(context, R.drawable.ic_warning, "Local discovery unavailable, port was in use")
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun confirmStarted(context: Context, onStarted: () -> Unit, onNotStarted: () -> Unit, onServerBindFail: () -> Unit) {
|
||||
fun confirmStarted(context: Context, onStarted: () -> Unit, onNotStarted: () -> Unit) {
|
||||
if (syncService == null) {
|
||||
UIDialogs.showConfirmationDialog(context, "Sync has not been enabled yet, would you like to enable sync?", {
|
||||
Settings.instance.synchronization.enabled = true
|
||||
start(context, onServerBindFail)
|
||||
start(context)
|
||||
Settings.instance.save()
|
||||
onStarted.invoke()
|
||||
}, {
|
||||
|
|
|
@ -72,6 +72,8 @@ class SyncService(
|
|||
private val _lastConnectTimesMdns: MutableMap<String, Long> = mutableMapOf()
|
||||
private val _lastConnectTimesIp: MutableMap<String, Long> = mutableMapOf()
|
||||
var serverSocketFailedToStart = false
|
||||
var serverSocketStarted = false
|
||||
var relayConnected = false
|
||||
//TODO: Should sync mdns and casting mdns be merged?
|
||||
//TODO: Decrease interval that devices are updated
|
||||
//TODO: Send less data
|
||||
|
@ -212,7 +214,7 @@ class SyncService(
|
|||
var onData: ((SyncSession, UByte, UByte, ByteBuffer) -> Unit)? = null
|
||||
var authorizePrompt: ((String, (Boolean) -> Unit) -> Unit)? = null
|
||||
|
||||
fun start(context: Context, onServerBindFail: (() -> Unit)? = null) {
|
||||
fun start(context: Context) {
|
||||
if (_started) {
|
||||
Logger.i(TAG, "Already started.")
|
||||
return
|
||||
|
@ -273,10 +275,12 @@ class SyncService(
|
|||
|
||||
Logger.i(TAG, "Sync key pair initialized (public key = $publicKey)")
|
||||
|
||||
serverSocketStarted = false
|
||||
if (settings.bindListener) {
|
||||
startListener(onServerBindFail)
|
||||
startListener()
|
||||
}
|
||||
|
||||
relayConnected = false
|
||||
if (settings.relayEnabled) {
|
||||
startRelayLoop()
|
||||
}
|
||||
|
@ -286,13 +290,15 @@ class SyncService(
|
|||
}
|
||||
}
|
||||
|
||||
private fun startListener(onServerBindFail: (() -> Unit)? = null) {
|
||||
private fun startListener() {
|
||||
serverSocketFailedToStart = false
|
||||
serverSocketStarted = false
|
||||
_thread = Thread {
|
||||
try {
|
||||
val serverSocket = ServerSocket(settings.listenerPort)
|
||||
_serverSocket = serverSocket
|
||||
|
||||
serverSocketStarted = true
|
||||
Log.i(TAG, "Running on port ${settings.listenerPort} (TCP)")
|
||||
|
||||
while (_started) {
|
||||
|
@ -300,10 +306,12 @@ class SyncService(
|
|||
val session = createSocketSession(socket, true)
|
||||
session.startAsResponder()
|
||||
}
|
||||
|
||||
serverSocketStarted = false
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to bind server socket to port ${settings.listenerPort}", e)
|
||||
serverSocketFailedToStart = true
|
||||
onServerBindFail?.invoke()
|
||||
serverSocketStarted = false
|
||||
}
|
||||
}.apply { start() }
|
||||
}
|
||||
|
@ -386,121 +394,192 @@ class SyncService(
|
|||
}
|
||||
|
||||
private fun startRelayLoop() {
|
||||
relayConnected = false
|
||||
_threadRelay = Thread {
|
||||
var backoffs: Array<Long> = arrayOf(1000, 5000, 10000, 20000)
|
||||
var backoffIndex = 0;
|
||||
try {
|
||||
var backoffs: Array<Long> = arrayOf(1000, 5000, 10000, 20000)
|
||||
var backoffIndex = 0;
|
||||
|
||||
while (_started) {
|
||||
try {
|
||||
Log.i(TAG, "Starting relay session...")
|
||||
while (_started) {
|
||||
try {
|
||||
Log.i(TAG, "Starting relay session...")
|
||||
relayConnected = false
|
||||
|
||||
var socketClosed = false;
|
||||
val socket = Socket(relayServer, 9000)
|
||||
_relaySession = SyncSocketSession(
|
||||
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
|
||||
keyPair!!,
|
||||
socket,
|
||||
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
|
||||
onNewChannel = { _, c ->
|
||||
val remotePublicKey = c.remotePublicKey
|
||||
if (remotePublicKey == null) {
|
||||
Log.e(TAG, "Remote public key should never be null in onNewChannel.")
|
||||
return@SyncSocketSession
|
||||
}
|
||||
|
||||
Log.i(TAG, "New channel established from relay (pk: '$remotePublicKey').")
|
||||
|
||||
var session: SyncSession?
|
||||
synchronized(_sessions) {
|
||||
session = _sessions[remotePublicKey]
|
||||
if (session == null) {
|
||||
val remoteDeviceName = database.getDeviceName(remotePublicKey)
|
||||
session = createNewSyncSession(remotePublicKey, remoteDeviceName)
|
||||
_sessions[remotePublicKey] = session!!
|
||||
var socketClosed = false;
|
||||
val socket = Socket(relayServer, 9000)
|
||||
_relaySession = SyncSocketSession(
|
||||
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
|
||||
keyPair!!,
|
||||
socket,
|
||||
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId ->
|
||||
isHandshakeAllowed(
|
||||
linkType,
|
||||
syncSocketSession,
|
||||
publicKey,
|
||||
pairingCode,
|
||||
appId
|
||||
)
|
||||
},
|
||||
onNewChannel = { _, c ->
|
||||
val remotePublicKey = c.remotePublicKey
|
||||
if (remotePublicKey == null) {
|
||||
Log.e(
|
||||
TAG,
|
||||
"Remote public key should never be null in onNewChannel."
|
||||
)
|
||||
return@SyncSocketSession
|
||||
}
|
||||
session!!.addChannel(c)
|
||||
}
|
||||
|
||||
c.setDataHandler { _, channel, opcode, subOpcode, data ->
|
||||
session?.handlePacket(opcode, subOpcode, data)
|
||||
}
|
||||
c.setCloseHandler { channel ->
|
||||
session?.removeChannel(channel)
|
||||
}
|
||||
},
|
||||
onChannelEstablished = { _, channel, isResponder ->
|
||||
handleAuthorization(channel, isResponder)
|
||||
},
|
||||
onClose = { socketClosed = true },
|
||||
onHandshakeComplete = { relaySession ->
|
||||
backoffIndex = 0
|
||||
Log.i(
|
||||
TAG,
|
||||
"New channel established from relay (pk: '$remotePublicKey')."
|
||||
)
|
||||
|
||||
Thread {
|
||||
try {
|
||||
while (_started && !socketClosed) {
|
||||
val unconnectedAuthorizedDevices = database.getAllAuthorizedDevices()?.filter { !isConnected(it) }?.toTypedArray() ?: arrayOf()
|
||||
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, settings.listenerPort, settings.relayConnectDirect, false, false, settings.relayConnectRelayed)
|
||||
var session: SyncSession?
|
||||
synchronized(_sessions) {
|
||||
session = _sessions[remotePublicKey]
|
||||
if (session == null) {
|
||||
val remoteDeviceName =
|
||||
database.getDeviceName(remotePublicKey)
|
||||
session =
|
||||
createNewSyncSession(remotePublicKey, remoteDeviceName)
|
||||
_sessions[remotePublicKey] = session!!
|
||||
}
|
||||
session!!.addChannel(c)
|
||||
}
|
||||
|
||||
Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information")
|
||||
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
|
||||
Logger.v(TAG, "Received ${connectionInfos.size} devices connection information")
|
||||
c.setDataHandler { _, channel, opcode, subOpcode, data ->
|
||||
session?.handlePacket(opcode, subOpcode, data)
|
||||
}
|
||||
c.setCloseHandler { channel ->
|
||||
session?.removeChannel(channel)
|
||||
}
|
||||
},
|
||||
onChannelEstablished = { _, channel, isResponder ->
|
||||
handleAuthorization(channel, isResponder)
|
||||
},
|
||||
onClose = { socketClosed = true },
|
||||
onHandshakeComplete = { relaySession ->
|
||||
backoffIndex = 0
|
||||
|
||||
for ((targetKey, connectionInfo) in connectionInfos) {
|
||||
val potentialLocalAddresses = connectionInfo.ipv4Addresses
|
||||
.filter { it != connectionInfo.remoteIp }
|
||||
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
|
||||
Thread {
|
||||
Thread {
|
||||
try {
|
||||
while (_started && !socketClosed) {
|
||||
val unconnectedAuthorizedDevices =
|
||||
database.getAllAuthorizedDevices()
|
||||
?.filter { !isConnected(it) }?.toTypedArray()
|
||||
?: arrayOf()
|
||||
relaySession.publishConnectionInformation(
|
||||
unconnectedAuthorizedDevices,
|
||||
settings.listenerPort,
|
||||
settings.relayConnectDirect,
|
||||
false,
|
||||
false,
|
||||
settings.relayConnectRelayed
|
||||
)
|
||||
|
||||
Logger.v(
|
||||
TAG,
|
||||
"Requesting ${unconnectedAuthorizedDevices.size} devices connection information"
|
||||
)
|
||||
val connectionInfos = runBlocking {
|
||||
relaySession.requestBulkConnectionInfo(
|
||||
unconnectedAuthorizedDevices
|
||||
)
|
||||
}
|
||||
Logger.v(
|
||||
TAG,
|
||||
"Received ${connectionInfos.size} devices connection information"
|
||||
)
|
||||
|
||||
for ((targetKey, connectionInfo) in connectionInfos) {
|
||||
val potentialLocalAddresses =
|
||||
connectionInfo.ipv4Addresses
|
||||
.filter { it != connectionInfo.remoteIp }
|
||||
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
|
||||
Thread {
|
||||
try {
|
||||
Log.v(
|
||||
TAG,
|
||||
"Attempting to connect directly, locally to '$targetKey'."
|
||||
)
|
||||
connect(
|
||||
potentialLocalAddresses.map { it }
|
||||
.toTypedArray(),
|
||||
settings.listenerPort,
|
||||
targetKey,
|
||||
null
|
||||
)
|
||||
} catch (e: Throwable) {
|
||||
Log.e(
|
||||
TAG,
|
||||
"Failed to start direct connection using connection info with $targetKey.",
|
||||
e
|
||||
)
|
||||
}
|
||||
}.start()
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteDirect) {
|
||||
// TODO: Implement direct remote connection if needed
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteHolePunched) {
|
||||
// TODO: Implement hole punching if needed
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
|
||||
try {
|
||||
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
|
||||
connect(potentialLocalAddresses.map { it }.toTypedArray(), settings.listenerPort, targetKey, null)
|
||||
Logger.v(
|
||||
TAG,
|
||||
"Attempting relayed connection with '$targetKey'."
|
||||
)
|
||||
runBlocking {
|
||||
relaySession.startRelayedChannel(
|
||||
targetKey,
|
||||
appId,
|
||||
null
|
||||
)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e)
|
||||
Logger.e(
|
||||
TAG,
|
||||
"Failed to start relayed channel with $targetKey.",
|
||||
e
|
||||
)
|
||||
}
|
||||
}.start()
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteDirect) {
|
||||
// TODO: Implement direct remote connection if needed
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteHolePunched) {
|
||||
// TODO: Implement hole punching if needed
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
|
||||
try {
|
||||
Logger.v(TAG, "Attempting relayed connection with '$targetKey'.")
|
||||
runBlocking { relaySession.startRelayedChannel(targetKey, appId, null) }
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e)
|
||||
}
|
||||
}
|
||||
|
||||
Thread.sleep(15000)
|
||||
}
|
||||
|
||||
Thread.sleep(15000)
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Unhandled exception in relay session.", e)
|
||||
relaySession.stop()
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Unhandled exception in relay session.", e)
|
||||
relaySession.stop()
|
||||
}
|
||||
}.start()
|
||||
}.start()
|
||||
}
|
||||
)
|
||||
|
||||
_relaySession!!.authorizable = object : IAuthorizable {
|
||||
override val isAuthorized: Boolean get() = true
|
||||
}
|
||||
)
|
||||
|
||||
_relaySession!!.authorizable = object : IAuthorizable {
|
||||
override val isAuthorized: Boolean get() = true
|
||||
relayConnected = true
|
||||
_relaySession!!.runAsInitiator(relayPublicKey, appId, null)
|
||||
|
||||
Log.i(TAG, "Started relay session.")
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Relay session failed.", e)
|
||||
} finally {
|
||||
relayConnected = false
|
||||
_relaySession?.stop()
|
||||
_relaySession = null
|
||||
Thread.sleep(backoffs[min(backoffs.size - 1, backoffIndex++)])
|
||||
}
|
||||
|
||||
_relaySession!!.runAsInitiator(relayPublicKey, appId, null)
|
||||
|
||||
Log.i(TAG, "Started relay session.")
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Relay session failed.", e)
|
||||
} finally {
|
||||
_relaySession?.stop()
|
||||
_relaySession = null
|
||||
Thread.sleep(backoffs[min(backoffs.size - 1, backoffIndex++)])
|
||||
}
|
||||
} catch (ex: Throwable) {
|
||||
Log.i(TAG, "Unhandled exception in relay loop.", ex)
|
||||
}
|
||||
}.apply { start() }
|
||||
}
|
||||
|
|
|
@ -529,7 +529,7 @@ class SyncSocketSession {
|
|||
val isAllowed = publicKey != _localPublicKey && (_isHandshakeAllowed?.invoke(LinkType.Relayed, this, publicKey, pairingCode, appId) ?: true)
|
||||
if (!isAllowed) {
|
||||
val rp = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
|
||||
rp.putInt(2) // Status code for not allowed
|
||||
rp.putInt(7) // Status code for not allowed
|
||||
rp.putLong(connectionId)
|
||||
rp.putInt(requestId)
|
||||
rp.rewind()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue