mirror of
https://gitlab.futo.org/videostreaming/grayjay.git
synced 2025-09-26 11:19:05 +00:00
Migrated service to SyncService.
This commit is contained in:
parent
1eb62b31d2
commit
705eb6a3fa
9 changed files with 869 additions and 788 deletions
|
@ -110,7 +110,7 @@ class SyncPairActivity : AppCompatActivity() {
|
|||
|
||||
lifecycleScope.launch(Dispatchers.IO) {
|
||||
try {
|
||||
StateSync.instance.connect(deviceInfo) { complete, message ->
|
||||
StateSync.instance.syncService?.connect(deviceInfo) { complete, message ->
|
||||
lifecycleScope.launch(Dispatchers.Main) {
|
||||
if (complete != null) {
|
||||
if (complete) {
|
||||
|
|
|
@ -67,11 +67,18 @@ class SyncShowPairingCodeActivity : AppCompatActivity() {
|
|||
}
|
||||
|
||||
val ips = getIPs()
|
||||
val selfDeviceInfo = SyncDeviceInfo(StateSync.instance.publicKey!!, ips.toTypedArray(), StateSync.PORT, StateSync.instance.pairingCode)
|
||||
val json = Json.encodeToString(selfDeviceInfo)
|
||||
val base64 = Base64.encodeToString(json.toByteArray(), Base64.URL_SAFE or Base64.NO_PADDING or Base64.NO_WRAP)
|
||||
val url = "grayjay://sync/${base64}"
|
||||
setCode(url)
|
||||
val publicKey = StateSync.instance.syncService?.publicKey
|
||||
val pairingCode = StateSync.instance.syncService?.pairingCode
|
||||
if (publicKey == null || pairingCode == null) {
|
||||
setCode("Public key or pairing code was not known, is sync enabled?")
|
||||
} else {
|
||||
val selfDeviceInfo = SyncDeviceInfo(publicKey, ips.toTypedArray(), StateSync.PORT, pairingCode)
|
||||
val json = Json.encodeToString(selfDeviceInfo)
|
||||
val base64 = Base64.encodeToString(json.toByteArray(), Base64.URL_SAFE or Base64.NO_PADDING or Base64.NO_WRAP)
|
||||
val url = "grayjay://sync/${base64}"
|
||||
setCode(url)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun setCode(code: String?) {
|
||||
|
|
|
@ -45,8 +45,6 @@ import com.futo.platformplayer.logging.Logger
|
|||
import com.futo.platformplayer.models.CastingDeviceInfo
|
||||
import com.futo.platformplayer.parsers.HLS
|
||||
import com.futo.platformplayer.states.StateApp
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import com.futo.platformplayer.states.StateSync.Companion
|
||||
import com.futo.platformplayer.stores.CastingDeviceInfoStorage
|
||||
import com.futo.platformplayer.stores.FragmentedStorage
|
||||
import com.futo.platformplayer.toUrlAddress
|
||||
|
|
|
@ -58,7 +58,6 @@ import kotlinx.coroutines.coroutineScope
|
|||
import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.serialization.Contextual
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
import kotlinx.serialization.Transient
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
|
@ -73,7 +72,6 @@ import java.util.concurrent.ThreadLocalRandom
|
|||
import kotlin.coroutines.resumeWithException
|
||||
import kotlin.time.times
|
||||
|
||||
@InternalSerializationApi
|
||||
@kotlinx.serialization.Serializable
|
||||
class VideoDownload {
|
||||
var state: State = State.QUEUED;
|
||||
|
|
|
@ -47,7 +47,6 @@ import com.futo.platformplayer.Settings
|
|||
import com.futo.platformplayer.UIDialogs
|
||||
import com.futo.platformplayer.UISlideOverlays
|
||||
import com.futo.platformplayer.activities.MainActivity
|
||||
import com.futo.platformplayer.activities.SettingsActivity
|
||||
import com.futo.platformplayer.api.media.IPluginSourced
|
||||
import com.futo.platformplayer.api.media.LiveChatManager
|
||||
import com.futo.platformplayer.api.media.PlatformID
|
||||
|
@ -150,7 +149,6 @@ import com.futo.platformplayer.views.pills.PillRatingLikesDislikes
|
|||
import com.futo.platformplayer.views.pills.RoundButton
|
||||
import com.futo.platformplayer.views.pills.RoundButtonGroup
|
||||
import com.futo.platformplayer.views.platform.PlatformIndicator
|
||||
import com.futo.platformplayer.views.segments.ChaptersList
|
||||
import com.futo.platformplayer.views.segments.CommentsList
|
||||
import com.futo.platformplayer.views.subscriptions.SubscribeButton
|
||||
import com.futo.platformplayer.views.video.FutoVideoPlayer
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -73,12 +73,12 @@ class ChannelRelayed(
|
|||
private val sendLock = Object()
|
||||
private val decryptLock = Object()
|
||||
private var handshakeState: HandshakeState? = if (initiator) {
|
||||
HandshakeState(StateSync.protocolName, HandshakeState.INITIATOR).apply {
|
||||
HandshakeState(SyncService.protocolName, HandshakeState.INITIATOR).apply {
|
||||
localKeyPair.copyFrom(this@ChannelRelayed.localKeyPair)
|
||||
remotePublicKey.setPublicKey(Base64.getDecoder().decode(publicKey), 0)
|
||||
}
|
||||
} else {
|
||||
HandshakeState(StateSync.protocolName, HandshakeState.RESPONDER).apply {
|
||||
HandshakeState(SyncService.protocolName, HandshakeState.RESPONDER).apply {
|
||||
localKeyPair.copyFrom(this@ChannelRelayed.localKeyPair)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,721 @@
|
|||
package com.futo.platformplayer.sync.internal
|
||||
|
||||
import android.content.Context
|
||||
import android.net.nsd.NsdManager
|
||||
import android.net.nsd.NsdServiceInfo
|
||||
import android.os.Build
|
||||
import android.util.Log
|
||||
import com.futo.platformplayer.Settings
|
||||
import com.futo.platformplayer.generateReadablePassword
|
||||
import com.futo.platformplayer.getConnectedSocket
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.noise.protocol.DHState
|
||||
import com.futo.platformplayer.noise.protocol.Noise
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import com.futo.polycentric.core.base64ToByteArray
|
||||
import com.futo.polycentric.core.toBase64
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.net.InetAddress
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.ServerSocket
|
||||
import java.net.Socket
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.Base64
|
||||
import java.util.Locale
|
||||
import kotlin.math.min
|
||||
|
||||
public data class SyncServiceSettings(
|
||||
val listenerPort: Int = 12315,
|
||||
val mdnsBroadcast: Boolean = true,
|
||||
val mdnsConnectDiscovered: Boolean = true,
|
||||
val bindListener: Boolean = true,
|
||||
val connectLastKnown: Boolean = true,
|
||||
val relayHandshakeAllowed: Boolean = true,
|
||||
val relayPairAllowed: Boolean = true,
|
||||
val relayEnabled: Boolean = true,
|
||||
val relayConnectDirect: Boolean = true,
|
||||
val relayConnectRelayed: Boolean = true
|
||||
)
|
||||
|
||||
interface ISyncDatabaseProvider {
|
||||
fun isAuthorized(publicKey: String): Boolean
|
||||
fun addAuthorizedDevice(publicKey: String)
|
||||
fun removeAuthorizedDevice(publicKey: String)
|
||||
fun getAllAuthorizedDevices(): Array<String>?
|
||||
fun getAuthorizedDeviceCount(): Int
|
||||
fun getSyncKeyPair(): SyncKeyPair?
|
||||
fun setSyncKeyPair(value: SyncKeyPair)
|
||||
fun getLastAddress(publicKey: String): String?
|
||||
fun setLastAddress(publicKey: String, address: String)
|
||||
fun getDeviceName(publicKey: String): String?
|
||||
fun setDeviceName(publicKey: String, name: String)
|
||||
}
|
||||
|
||||
class SyncService(
|
||||
private val serviceName: String,
|
||||
private val relayServer: String,
|
||||
private val relayPublicKey: String,
|
||||
private val appId: UInt,
|
||||
private val database: ISyncDatabaseProvider,
|
||||
private val settings: SyncServiceSettings = SyncServiceSettings()
|
||||
) {
|
||||
private var _serverSocket: ServerSocket? = null
|
||||
private var _thread: Thread? = null
|
||||
private var _connectThread: Thread? = null
|
||||
@Volatile private var _started = false
|
||||
private val _sessions: MutableMap<String, SyncSession> = mutableMapOf()
|
||||
private val _lastConnectTimesMdns: MutableMap<String, Long> = mutableMapOf()
|
||||
private val _lastConnectTimesIp: MutableMap<String, Long> = mutableMapOf()
|
||||
var serverSocketFailedToStart = false
|
||||
//TODO: Should sync mdns and casting mdns be merged?
|
||||
//TODO: Decrease interval that devices are updated
|
||||
//TODO: Send less data
|
||||
|
||||
private val _pairingCode: String? = generateReadablePassword(8)
|
||||
val pairingCode: String? get() = _pairingCode
|
||||
private var _relaySession: SyncSocketSession? = null
|
||||
private var _threadRelay: Thread? = null
|
||||
private val _remotePendingStatusUpdate = mutableMapOf<String, (complete: Boolean?, message: String) -> Unit>()
|
||||
private var _nsdManager: NsdManager? = null
|
||||
private var _scope: CoroutineScope? = null
|
||||
private var _discoveryListener: NsdManager.DiscoveryListener = object : NsdManager.DiscoveryListener {
|
||||
override fun onDiscoveryStarted(regType: String) {
|
||||
Log.d(TAG, "Service discovery started for $regType")
|
||||
}
|
||||
|
||||
override fun onDiscoveryStopped(serviceType: String) {
|
||||
Log.i(TAG, "Discovery stopped: $serviceType")
|
||||
}
|
||||
|
||||
override fun onServiceLost(service: NsdServiceInfo) {
|
||||
Log.e(TAG, "service lost: $service")
|
||||
// TODO: Handle service lost, e.g., remove device
|
||||
}
|
||||
|
||||
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
|
||||
fun addOrUpdate(name: String, adrs: Array<InetAddress>, port: Int, attributes: Map<String, ByteArray>) {
|
||||
if (!Settings.instance.synchronization.connectDiscovered) {
|
||||
return
|
||||
}
|
||||
|
||||
val urlSafePkey = attributes.get("pk")?.decodeToString() ?: return
|
||||
val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/')))
|
||||
val syncDeviceInfo = SyncDeviceInfo(pkey, adrs.map { it.hostAddress }.toTypedArray(), port, null)
|
||||
val authorized = isAuthorized(pkey)
|
||||
|
||||
if (authorized && !isConnected(pkey)) {
|
||||
val now = System.currentTimeMillis()
|
||||
val lastConnectTime = synchronized(_lastConnectTimesMdns) {
|
||||
_lastConnectTimesMdns[pkey] ?: 0
|
||||
}
|
||||
|
||||
//Connect once every 30 seconds, max
|
||||
if (now - lastConnectTime > 30000) {
|
||||
synchronized(_lastConnectTimesMdns) {
|
||||
_lastConnectTimesMdns[pkey] = now
|
||||
}
|
||||
|
||||
Logger.i(TAG, "Found device authorized device '${name}' with pkey=$pkey, attempting to connect")
|
||||
|
||||
try {
|
||||
connect(syncDeviceInfo)
|
||||
} catch (e: Throwable) {
|
||||
Logger.i(TAG, "Failed to connect to $pkey", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onServiceFound(service: NsdServiceInfo) {
|
||||
Log.v(TAG, "Service discovery success for ${service.serviceType}: $service")
|
||||
addOrUpdate(service.serviceName, if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
|
||||
service.hostAddresses.toTypedArray()
|
||||
} else {
|
||||
if(service.host != null)
|
||||
arrayOf(service.host);
|
||||
else
|
||||
arrayOf();
|
||||
}, service.port, service.attributes)
|
||||
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
|
||||
_nsdManager?.registerServiceInfoCallback(service, { it.run() }, object : NsdManager.ServiceInfoCallback {
|
||||
override fun onServiceUpdated(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "onServiceUpdated: $serviceInfo")
|
||||
addOrUpdate(serviceInfo.serviceName, serviceInfo.hostAddresses.toTypedArray(), serviceInfo.port, serviceInfo.attributes)
|
||||
}
|
||||
|
||||
override fun onServiceLost() {
|
||||
Log.v(TAG, "onServiceLost: $service")
|
||||
// TODO: Handle service lost
|
||||
}
|
||||
|
||||
override fun onServiceInfoCallbackRegistrationFailed(errorCode: Int) {
|
||||
Log.v(TAG, "onServiceInfoCallbackRegistrationFailed: $errorCode")
|
||||
}
|
||||
|
||||
override fun onServiceInfoCallbackUnregistered() {
|
||||
Log.v(TAG, "onServiceInfoCallbackUnregistered")
|
||||
}
|
||||
})
|
||||
} else {
|
||||
_nsdManager?.resolveService(service, object : NsdManager.ResolveListener {
|
||||
override fun onResolveFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
|
||||
Log.v(TAG, "Resolve failed: $errorCode")
|
||||
}
|
||||
|
||||
override fun onServiceResolved(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "Resolve Succeeded: $serviceInfo")
|
||||
addOrUpdate(serviceInfo.serviceName, arrayOf(serviceInfo.host), serviceInfo.port, serviceInfo.attributes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val _registrationListener = object : NsdManager.RegistrationListener {
|
||||
override fun onServiceRegistered(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "onServiceRegistered: ${serviceInfo.serviceName}")
|
||||
}
|
||||
|
||||
override fun onRegistrationFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
|
||||
Log.v(TAG, "onRegistrationFailed: ${serviceInfo.serviceName} (error code: $errorCode)")
|
||||
}
|
||||
|
||||
override fun onServiceUnregistered(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "onServiceUnregistered: ${serviceInfo.serviceName}")
|
||||
}
|
||||
|
||||
override fun onUnregistrationFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
|
||||
Log.v(TAG, "onUnregistrationFailed: ${serviceInfo.serviceName} (error code: $errorCode)")
|
||||
}
|
||||
}
|
||||
|
||||
var keyPair: DHState? = null
|
||||
var publicKey: String? = null
|
||||
|
||||
var onAuthorized: ((SyncSession, Boolean, Boolean) -> Unit)? = null
|
||||
var onUnauthorized: ((SyncSession) -> Unit)? = null
|
||||
var onConnectedChanged: ((SyncSession, Boolean) -> Unit)? = null
|
||||
var onClose: ((SyncSession) -> Unit)? = null
|
||||
var onData: ((SyncSession, UByte, UByte, ByteBuffer) -> Unit)? = null
|
||||
var authorizePrompt: ((String, (Boolean) -> Unit) -> Unit)? = null
|
||||
|
||||
fun start(context: Context, onServerBindFail: (() -> Unit)? = null) {
|
||||
if (_started) {
|
||||
Logger.i(TAG, "Already started.")
|
||||
return
|
||||
}
|
||||
_started = true
|
||||
_scope = CoroutineScope(Dispatchers.IO)
|
||||
|
||||
try {
|
||||
val syncKeyPair = database.getSyncKeyPair() ?: throw Exception("SyncKeyPair not found")
|
||||
val p = Noise.createDH(dh)
|
||||
p.setPublicKey(syncKeyPair.publicKey.base64ToByteArray(), 0)
|
||||
p.setPrivateKey(syncKeyPair.privateKey.base64ToByteArray(), 0)
|
||||
keyPair = p
|
||||
} catch (e: Throwable) {
|
||||
//Sync key pair non-existing, invalid or lost
|
||||
val p = Noise.createDH(dh)
|
||||
p.generateKeyPair()
|
||||
|
||||
val publicKey = ByteArray(p.publicKeyLength)
|
||||
p.getPublicKey(publicKey, 0)
|
||||
val privateKey = ByteArray(p.privateKeyLength)
|
||||
p.getPrivateKey(privateKey, 0)
|
||||
|
||||
val syncKeyPair = SyncKeyPair(1, publicKey.toBase64(), privateKey.toBase64())
|
||||
database.setSyncKeyPair(syncKeyPair)
|
||||
|
||||
Logger.e(TAG, "Failed to load existing key pair", e)
|
||||
keyPair = p
|
||||
}
|
||||
|
||||
publicKey = keyPair?.let {
|
||||
val pkey = ByteArray(it.publicKeyLength)
|
||||
it.getPublicKey(pkey, 0)
|
||||
return@let pkey.toBase64()
|
||||
}
|
||||
|
||||
_nsdManager = context.getSystemService(Context.NSD_SERVICE) as NsdManager
|
||||
if (settings.mdnsConnectDiscovered) {
|
||||
_nsdManager?.apply {
|
||||
discoverServices(serviceName, NsdManager.PROTOCOL_DNS_SD, _discoveryListener)
|
||||
}
|
||||
}
|
||||
|
||||
if (settings.mdnsBroadcast) {
|
||||
val pk = publicKey
|
||||
val nsdManager = _nsdManager
|
||||
|
||||
if (pk != null && nsdManager != null) {
|
||||
val sn = serviceName
|
||||
val serviceInfo = NsdServiceInfo().apply {
|
||||
serviceName = getDeviceName()
|
||||
serviceType = sn
|
||||
port = settings.listenerPort
|
||||
setAttribute("pk", pk.replace('+', '-').replace('/', '_').replace("=", ""))
|
||||
}
|
||||
|
||||
nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, _registrationListener)
|
||||
}
|
||||
}
|
||||
|
||||
Logger.i(TAG, "Sync key pair initialized (public key = $publicKey)")
|
||||
|
||||
if (settings.bindListener) {
|
||||
startListener(onServerBindFail)
|
||||
}
|
||||
|
||||
if (settings.relayEnabled) {
|
||||
startRelayLoop()
|
||||
}
|
||||
|
||||
if (settings.connectLastKnown) {
|
||||
startConnectLastLoop()
|
||||
}
|
||||
}
|
||||
|
||||
private fun startListener(onServerBindFail: (() -> Unit)? = null) {
|
||||
serverSocketFailedToStart = false
|
||||
_thread = Thread {
|
||||
try {
|
||||
val serverSocket = ServerSocket(settings.listenerPort)
|
||||
_serverSocket = serverSocket
|
||||
|
||||
Log.i(TAG, "Running on port ${settings.listenerPort} (TCP)")
|
||||
|
||||
while (_started) {
|
||||
val socket = serverSocket.accept()
|
||||
val session = createSocketSession(socket, true)
|
||||
session.startAsResponder()
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to bind server socket to port ${settings.listenerPort}", e)
|
||||
serverSocketFailedToStart = true
|
||||
onServerBindFail?.invoke()
|
||||
}
|
||||
}.apply { start() }
|
||||
}
|
||||
|
||||
private fun startConnectLastLoop() {
|
||||
_connectThread = Thread {
|
||||
Log.i(TAG, "Running auto reconnector")
|
||||
|
||||
while (_started) {
|
||||
val authorizedDevices = database.getAllAuthorizedDevices() ?: arrayOf()
|
||||
val addressesToConnect = authorizedDevices.mapNotNull {
|
||||
val connected = isConnected(it)
|
||||
if (connected) {
|
||||
return@mapNotNull null
|
||||
}
|
||||
|
||||
val lastKnownAddress = database.getLastAddress(it) ?: return@mapNotNull null
|
||||
return@mapNotNull Pair(it, lastKnownAddress)
|
||||
}
|
||||
|
||||
for (connectPair in addressesToConnect) {
|
||||
try {
|
||||
val now = System.currentTimeMillis()
|
||||
val lastConnectTime = synchronized(_lastConnectTimesIp) {
|
||||
_lastConnectTimesIp[connectPair.first] ?: 0
|
||||
}
|
||||
|
||||
//Connect once every 30 seconds, max
|
||||
if (now - lastConnectTime > 30000) {
|
||||
synchronized(_lastConnectTimesIp) {
|
||||
_lastConnectTimesIp[connectPair.first] = now
|
||||
}
|
||||
|
||||
Logger.i(TAG, "Attempting to connect to authorized device by last known IP '${connectPair.first}' with pkey=${connectPair.first}")
|
||||
connect(arrayOf(connectPair.second), settings.listenerPort, connectPair.first, null)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Logger.i(TAG, "Failed to connect to " + connectPair.first, e)
|
||||
}
|
||||
}
|
||||
Thread.sleep(5000)
|
||||
}
|
||||
}.apply { start() }
|
||||
}
|
||||
|
||||
private fun startRelayLoop() {
|
||||
_threadRelay = Thread {
|
||||
var backoffs: Array<Long> = arrayOf(1000, 5000, 10000, 20000)
|
||||
var backoffIndex = 0;
|
||||
|
||||
while (_started) {
|
||||
try {
|
||||
Log.i(TAG, "Starting relay session...")
|
||||
|
||||
var socketClosed = false;
|
||||
val socket = Socket(relayServer, 9000)
|
||||
_relaySession = SyncSocketSession(
|
||||
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
|
||||
keyPair!!,
|
||||
socket,
|
||||
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
|
||||
onNewChannel = { _, c ->
|
||||
val remotePublicKey = c.remotePublicKey
|
||||
if (remotePublicKey == null) {
|
||||
Log.e(TAG, "Remote public key should never be null in onNewChannel.")
|
||||
return@SyncSocketSession
|
||||
}
|
||||
|
||||
Log.i(TAG, "New channel established from relay (pk: '$remotePublicKey').")
|
||||
|
||||
var session: SyncSession?
|
||||
synchronized(_sessions) {
|
||||
session = _sessions[remotePublicKey]
|
||||
if (session == null) {
|
||||
val remoteDeviceName = database.getDeviceName(remotePublicKey)
|
||||
session = createNewSyncSession(remotePublicKey, remoteDeviceName)
|
||||
_sessions[remotePublicKey] = session!!
|
||||
}
|
||||
session!!.addChannel(c)
|
||||
}
|
||||
|
||||
c.setDataHandler { _, channel, opcode, subOpcode, data ->
|
||||
session?.handlePacket(opcode, subOpcode, data)
|
||||
}
|
||||
c.setCloseHandler { channel ->
|
||||
session?.removeChannel(channel)
|
||||
}
|
||||
},
|
||||
onChannelEstablished = { _, channel, isResponder ->
|
||||
handleAuthorization(channel, isResponder)
|
||||
},
|
||||
onClose = { socketClosed = true },
|
||||
onHandshakeComplete = { relaySession ->
|
||||
backoffIndex = 0
|
||||
|
||||
Thread {
|
||||
try {
|
||||
while (_started && !socketClosed) {
|
||||
val unconnectedAuthorizedDevices = database.getAllAuthorizedDevices()?.filter { !isConnected(it) }?.toTypedArray() ?: arrayOf()
|
||||
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, settings.listenerPort, settings.relayConnectDirect, false, false, settings.relayConnectRelayed)
|
||||
|
||||
Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information")
|
||||
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
|
||||
Logger.v(TAG, "Received ${connectionInfos.size} devices connection information")
|
||||
|
||||
for ((targetKey, connectionInfo) in connectionInfos) {
|
||||
val potentialLocalAddresses = connectionInfo.ipv4Addresses
|
||||
.filter { it != connectionInfo.remoteIp }
|
||||
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
|
||||
Thread {
|
||||
try {
|
||||
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
|
||||
connect(potentialLocalAddresses.map { it }.toTypedArray(), settings.listenerPort, targetKey, null)
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Failed to start direct connection using connection info with $targetKey.", e)
|
||||
}
|
||||
}.start()
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteDirect) {
|
||||
// TODO: Implement direct remote connection if needed
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteHolePunched) {
|
||||
// TODO: Implement hole punching if needed
|
||||
}
|
||||
|
||||
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
|
||||
try {
|
||||
Logger.v(TAG, "Attempting relayed connection with '$targetKey'.")
|
||||
runBlocking { relaySession.startRelayedChannel(targetKey, appId, null) }
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Thread.sleep(15000)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Unhandled exception in relay session.", e)
|
||||
relaySession.stop()
|
||||
}
|
||||
}.start()
|
||||
}
|
||||
)
|
||||
|
||||
_relaySession!!.authorizable = object : IAuthorizable {
|
||||
override val isAuthorized: Boolean get() = true
|
||||
}
|
||||
|
||||
_relaySession!!.runAsInitiator(relayPublicKey, appId, null)
|
||||
|
||||
Log.i(TAG, "Started relay session.")
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Relay session failed.", e)
|
||||
} finally {
|
||||
_relaySession?.stop()
|
||||
_relaySession = null
|
||||
Thread.sleep(backoffs[min(backoffs.size - 1, backoffIndex++)])
|
||||
}
|
||||
}
|
||||
}.apply { start() }
|
||||
}
|
||||
|
||||
private fun createSocketSession(socket: Socket, isResponder: Boolean): SyncSocketSession {
|
||||
var session: SyncSession? = null
|
||||
var channelSocket: ChannelSocket? = null
|
||||
return SyncSocketSession(
|
||||
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
|
||||
keyPair!!,
|
||||
socket,
|
||||
onClose = { s ->
|
||||
if (channelSocket != null)
|
||||
session?.removeChannel(channelSocket!!)
|
||||
},
|
||||
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
|
||||
onHandshakeComplete = { s ->
|
||||
val remotePublicKey = s.remotePublicKey
|
||||
if (remotePublicKey == null) {
|
||||
s.stop()
|
||||
return@SyncSocketSession
|
||||
}
|
||||
|
||||
Logger.i(TAG, "Handshake complete with (LocalPublicKey = ${s.localPublicKey}, RemotePublicKey = ${s.remotePublicKey})")
|
||||
|
||||
channelSocket = ChannelSocket(s)
|
||||
|
||||
synchronized(_sessions) {
|
||||
session = _sessions[s.remotePublicKey]
|
||||
if (session == null) {
|
||||
val remoteDeviceName = database.getDeviceName(remotePublicKey)
|
||||
database.setLastAddress(remotePublicKey, s.remoteAddress)
|
||||
session = createNewSyncSession(remotePublicKey, remoteDeviceName)
|
||||
_sessions[remotePublicKey] = session!!
|
||||
}
|
||||
session!!.addChannel(channelSocket!!)
|
||||
}
|
||||
|
||||
handleAuthorization(channelSocket!!, isResponder)
|
||||
},
|
||||
onData = { s, opcode, subOpcode, data ->
|
||||
session?.handlePacket(opcode, subOpcode, data)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
private fun handleAuthorization(channel: IChannel, isResponder: Boolean) {
|
||||
val syncSession = channel.syncSession!!
|
||||
val remotePublicKey = channel.remotePublicKey!!
|
||||
|
||||
if (isResponder) {
|
||||
val isAuthorized = database.isAuthorized(remotePublicKey)
|
||||
if (!isAuthorized) {
|
||||
val ap = this.authorizePrompt
|
||||
if (ap == null) {
|
||||
try {
|
||||
Logger.i(TAG, "$remotePublicKey unauthorized because AuthorizePrompt is null")
|
||||
syncSession.unauthorize()
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to send authorize result.", e)
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ap.invoke(remotePublicKey) {
|
||||
try {
|
||||
_scope?.launch(Dispatchers.IO) {
|
||||
if (it) {
|
||||
Logger.i(TAG, "$remotePublicKey manually authorized")
|
||||
syncSession.authorize()
|
||||
} else {
|
||||
Logger.i(TAG, "$remotePublicKey manually unauthorized")
|
||||
syncSession.unauthorize()
|
||||
syncSession.close()
|
||||
}
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to send authorize result.")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//Responder does not need to check because already approved
|
||||
syncSession.authorize()
|
||||
Logger.i(TAG, "Connection authorized for $remotePublicKey because already authorized")
|
||||
}
|
||||
} else {
|
||||
//Initiator does not need to check because the manual action of scanning the QR counts as approval
|
||||
syncSession.authorize()
|
||||
Logger.i(TAG, "Connection authorized for $remotePublicKey because initiator")
|
||||
}
|
||||
}
|
||||
|
||||
private fun isHandshakeAllowed(linkType: LinkType, syncSocketSession: SyncSocketSession, publicKey: String, pairingCode: String?, appId: UInt): Boolean {
|
||||
Log.v(TAG, "Check if handshake allowed from '$publicKey' (app id: $appId).")
|
||||
if (publicKey == StateSync.RELAY_PUBLIC_KEY)
|
||||
return true
|
||||
|
||||
if (database.isAuthorized(publicKey)) {
|
||||
if (linkType == LinkType.Relayed && !settings.relayHandshakeAllowed)
|
||||
return false
|
||||
return true
|
||||
}
|
||||
|
||||
Log.v(TAG, "Check if handshake allowed with pairing code '$pairingCode' with active pairing code '$_pairingCode' (app id: $appId).")
|
||||
if (_pairingCode == null || pairingCode.isNullOrEmpty())
|
||||
return false
|
||||
|
||||
if (linkType == LinkType.Relayed && !settings.relayPairAllowed)
|
||||
return false
|
||||
|
||||
return _pairingCode == pairingCode
|
||||
}
|
||||
|
||||
private fun createNewSyncSession(rpk: String, remoteDeviceName: String?): SyncSession {
|
||||
val remotePublicKey = rpk.base64ToByteArray().toBase64()
|
||||
return SyncSession(
|
||||
remotePublicKey,
|
||||
onAuthorized = { it, isNewlyAuthorized, isNewSession ->
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
|
||||
}
|
||||
|
||||
if (isNewSession) {
|
||||
it.remoteDeviceName?.let { remoteDeviceName ->
|
||||
database.setDeviceName(remotePublicKey, remoteDeviceName)
|
||||
}
|
||||
|
||||
database.addAuthorizedDevice(remotePublicKey)
|
||||
}
|
||||
|
||||
onAuthorized?.invoke(it, isNewlyAuthorized, isNewSession)
|
||||
},
|
||||
onUnauthorized = {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
|
||||
}
|
||||
|
||||
onUnauthorized?.invoke(it)
|
||||
},
|
||||
onConnectedChanged = { it, connected ->
|
||||
Logger.i(TAG, "$remotePublicKey connected: $connected")
|
||||
onConnectedChanged?.invoke(it, connected)
|
||||
},
|
||||
onClose = {
|
||||
Logger.i(TAG, "$remotePublicKey closed")
|
||||
|
||||
removeSession(it.remotePublicKey)
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Connection closed")
|
||||
}
|
||||
|
||||
onClose?.invoke(it)
|
||||
},
|
||||
dataHandler = { it, opcode, subOpcode, data ->
|
||||
onData?.invoke(it, opcode, subOpcode, data)
|
||||
},
|
||||
remoteDeviceName
|
||||
)
|
||||
}
|
||||
|
||||
fun isConnected(publicKey: String): Boolean = synchronized(_sessions) { _sessions[publicKey]?.connected ?: false }
|
||||
fun isAuthorized(publicKey: String): Boolean = database.isAuthorized(publicKey)
|
||||
fun getSession(publicKey: String): SyncSession? = synchronized(_sessions) { _sessions[publicKey] }
|
||||
fun getSessions(): List<SyncSession> = synchronized(_sessions) { _sessions.values.toList() }
|
||||
fun removeSession(publicKey: String) = synchronized(_sessions) { _sessions.remove(publicKey) }
|
||||
fun getCachedName(publicKey: String): String? = database.getDeviceName(publicKey)
|
||||
fun getAuthorizedDeviceCount(): Int = database.getAuthorizedDeviceCount()
|
||||
fun getAllAuthorizedDevices(): Array<String>? = database.getAllAuthorizedDevices()
|
||||
fun removeAuthorizedDevice(publicKey: String) = database.removeAuthorizedDevice(publicKey)
|
||||
|
||||
fun connect(deviceInfo: SyncDeviceInfo, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null) {
|
||||
try {
|
||||
connect(deviceInfo.addresses, deviceInfo.port, deviceInfo.publicKey, deviceInfo.pairingCode, onStatusUpdate)
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to connect directly", e)
|
||||
val relaySession = _relaySession
|
||||
if (relaySession != null && Settings.instance.synchronization.pairThroughRelay) {
|
||||
onStatusUpdate?.invoke(null, "Connecting via relay...")
|
||||
|
||||
runBlocking {
|
||||
if (onStatusUpdate != null) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate[deviceInfo.publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
|
||||
}
|
||||
}
|
||||
relaySession.startRelayedChannel(deviceInfo.publicKey.base64ToByteArray().toBase64(), appId, deviceInfo.pairingCode)
|
||||
}
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun connect(addresses: Array<String>, port: Int, publicKey: String, pairingCode: String?, onStatusUpdate: ((complete: Boolean?, message: String) -> Unit)? = null): SyncSocketSession {
|
||||
onStatusUpdate?.invoke(null, "Connecting directly...")
|
||||
val socket = getConnectedSocket(addresses.map { InetAddress.getByName(it) }, port) ?: throw Exception("Failed to connect")
|
||||
onStatusUpdate?.invoke(null, "Handshaking...")
|
||||
|
||||
val session = createSocketSession(socket, false)
|
||||
if (onStatusUpdate != null) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate[publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
|
||||
}
|
||||
}
|
||||
|
||||
session.startAsInitiator(publicKey, appId, pairingCode)
|
||||
return session
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
_scope?.cancel()
|
||||
_scope = null
|
||||
_relaySession?.stop()
|
||||
_relaySession = null
|
||||
_serverSocket?.close()
|
||||
_serverSocket = null
|
||||
synchronized(_sessions) {
|
||||
_sessions.values.forEach { it.close() }
|
||||
_sessions.clear()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getDeviceName(): String {
|
||||
val manufacturer = Build.MANUFACTURER.replaceFirstChar { if (it.isLowerCase()) it.titlecase(Locale.getDefault()) else it.toString() }
|
||||
val model = Build.MODEL
|
||||
|
||||
return if (model.startsWith(manufacturer, ignoreCase = true)) {
|
||||
model.replaceFirstChar { if (it.isLowerCase()) it.titlecase(Locale.getDefault()) else it.toString() }
|
||||
} else {
|
||||
"$manufacturer $model".replaceFirstChar { if (it.isLowerCase()) it.titlecase(Locale.getDefault()) else it.toString() }
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
val dh = "25519"
|
||||
val pattern = "IK"
|
||||
val cipher = "ChaChaPoly"
|
||||
val hash = "BLAKE2b"
|
||||
var protocolName = "Noise_${pattern}_${dh}_${cipher}_${hash}"
|
||||
|
||||
private const val TAG = "SyncService"
|
||||
}
|
||||
}
|
|
@ -1,9 +1,6 @@
|
|||
package com.futo.platformplayer.sync.internal
|
||||
|
||||
import android.os.Build
|
||||
import com.futo.platformplayer.LittleEndianDataInputStream
|
||||
import com.futo.platformplayer.LittleEndianDataOutputStream
|
||||
import com.futo.platformplayer.copyToOutputStream
|
||||
import com.futo.platformplayer.ensureNotMainThread
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.noise.protocol.CipherStatePair
|
||||
|
@ -11,7 +8,6 @@ import com.futo.platformplayer.noise.protocol.DHState
|
|||
import com.futo.platformplayer.noise.protocol.HandshakeState
|
||||
import com.futo.platformplayer.states.StateApp
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion
|
||||
import com.futo.polycentric.core.base64ToByteArray
|
||||
import com.futo.polycentric.core.toBase64
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
|
@ -34,9 +30,7 @@ import java.util.Locale
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.zip.GZIPInputStream
|
||||
import java.util.zip.GZIPOutputStream
|
||||
import kotlin.math.min
|
||||
import kotlin.system.measureTimeMillis
|
||||
import kotlin.time.measureTime
|
||||
|
||||
class SyncSocketSession {
|
||||
private val _socket: Socket
|
||||
|
@ -257,7 +251,7 @@ class SyncSocketSession {
|
|||
private fun handshakeAsInitiator(remotePublicKey: String, appId: UInt, pairingCode: String?) {
|
||||
performVersionCheck()
|
||||
|
||||
val initiator = HandshakeState(StateSync.protocolName, HandshakeState.INITIATOR)
|
||||
val initiator = HandshakeState(SyncService.protocolName, HandshakeState.INITIATOR)
|
||||
initiator.localKeyPair.copyFrom(_localKeyPair)
|
||||
initiator.remotePublicKey.setPublicKey(Base64.getDecoder().decode(remotePublicKey), 0)
|
||||
initiator.start()
|
||||
|
@ -311,7 +305,7 @@ class SyncSocketSession {
|
|||
private fun handshakeAsResponder(): Boolean {
|
||||
performVersionCheck()
|
||||
|
||||
val responder = HandshakeState(StateSync.protocolName, HandshakeState.RESPONDER)
|
||||
val responder = HandshakeState(SyncService.protocolName, HandshakeState.RESPONDER)
|
||||
responder.localKeyPair.copyFrom(_localKeyPair)
|
||||
responder.start()
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue