mirror of
https://gitlab.futo.org/videostreaming/grayjay.git
synced 2025-08-12 19:19:45 +00:00
Various fixes for android to android pairing.
This commit is contained in:
parent
db8426779c
commit
ac3a8da002
4 changed files with 53 additions and 33 deletions
|
@ -7,6 +7,9 @@ import java.net.InetAddress
|
|||
import java.net.URI
|
||||
import java.net.URISyntaxException
|
||||
import java.net.URLEncoder
|
||||
import java.time.Instant
|
||||
import java.time.OffsetDateTime
|
||||
import java.time.ZoneOffset
|
||||
|
||||
//Syntax sugaring
|
||||
inline fun <reified T> Any.assume(): T?{
|
||||
|
@ -50,4 +53,20 @@ fun InetAddress?.toUrlAddress(): String {
|
|||
throw Exception("Invalid address type")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun Long?.sToOffsetDateTimeUTC(): OffsetDateTime {
|
||||
if (this == null || this < 0)
|
||||
return OffsetDateTime.MIN
|
||||
if(this > 4070912400)
|
||||
return OffsetDateTime.MAX;
|
||||
return OffsetDateTime.ofInstant(Instant.ofEpochSecond(this), ZoneOffset.UTC)
|
||||
}
|
||||
|
||||
fun Long?.msToOffsetDateTimeUTC(): OffsetDateTime {
|
||||
if (this == null || this < 0)
|
||||
return OffsetDateTime.MIN
|
||||
if(this > 4070912400)
|
||||
return OffsetDateTime.MAX;
|
||||
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(this), ZoneOffset.UTC)
|
||||
}
|
|
@ -19,6 +19,7 @@ import com.futo.platformplayer.exceptions.ReconstructionException
|
|||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.models.ImportCache
|
||||
import com.futo.platformplayer.models.Playlist
|
||||
import com.futo.platformplayer.sToOffsetDateTimeUTC
|
||||
import com.futo.platformplayer.smartMerge
|
||||
import com.futo.platformplayer.states.StateSubscriptionGroups.Companion
|
||||
import com.futo.platformplayer.stores.FragmentedStorage
|
||||
|
@ -85,7 +86,7 @@ class StatePlaylists {
|
|||
if(value.isEmpty())
|
||||
return OffsetDateTime.MIN;
|
||||
val tryParse = value.toLongOrNull() ?: 0;
|
||||
return OffsetDateTime.ofInstant(Instant.ofEpochSecond(tryParse), ZoneOffset.UTC);
|
||||
return tryParse.sToOffsetDateTimeUTC();
|
||||
}
|
||||
private fun setWatchLaterReorderTime() {
|
||||
val now = OffsetDateTime.now(ZoneOffset.UTC);
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.futo.platformplayer.models.HistoryVideo
|
|||
import com.futo.platformplayer.models.Subscription
|
||||
import com.futo.platformplayer.noise.protocol.DHState
|
||||
import com.futo.platformplayer.noise.protocol.Noise
|
||||
import com.futo.platformplayer.sToOffsetDateTimeUTC
|
||||
import com.futo.platformplayer.smartMerge
|
||||
import com.futo.platformplayer.stores.FragmentedStorage
|
||||
import com.futo.platformplayer.stores.StringStringMapStorage
|
||||
|
@ -731,7 +732,7 @@ class StateSync {
|
|||
}
|
||||
for(removal in pack.groupRemovals) {
|
||||
val creation = StateSubscriptionGroups.instance.getSubscriptionGroup(removal.key);
|
||||
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
|
||||
val removalTime = removal.value.sToOffsetDateTimeUTC();
|
||||
if(creation != null && creation.creationTime < removalTime)
|
||||
StateSubscriptionGroups.instance.deleteSubscriptionGroup(removal.key, false);
|
||||
}
|
||||
|
@ -759,7 +760,7 @@ class StateSync {
|
|||
}
|
||||
for(removal in pack.playlistRemovals) {
|
||||
val creation = StatePlaylists.instance.getPlaylist(removal.key);
|
||||
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
|
||||
val removalTime = removal.value.sToOffsetDateTimeUTC();
|
||||
if(creation != null && creation.dateCreation < removalTime)
|
||||
StatePlaylists.instance.removePlaylist(creation, false);
|
||||
|
||||
|
@ -777,7 +778,7 @@ class StateSync {
|
|||
val allExisting = StatePlaylists.instance.getWatchLater();
|
||||
for(video in pack.videos) {
|
||||
val existing = allExisting.firstOrNull { it.url == video.url };
|
||||
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.videoAdds[video.url] ?: 0), ZoneOffset.UTC) else OffsetDateTime.MIN;
|
||||
val time = if(pack.videoAdds.containsKey(video.url)) (pack.videoAdds[video.url] ?: 0).sToOffsetDateTimeUTC() else OffsetDateTime.MIN;
|
||||
|
||||
if(existing == null) {
|
||||
StatePlaylists.instance.addToWatchLater(video, false);
|
||||
|
@ -788,12 +789,12 @@ class StateSync {
|
|||
for(removal in pack.videoRemovals) {
|
||||
val watchLater = allExisting.firstOrNull { it.url == removal.key } ?: continue;
|
||||
val creation = StatePlaylists.instance.getWatchLaterRemovalTime(watchLater.url) ?: OffsetDateTime.MIN;
|
||||
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value), ZoneOffset.UTC);
|
||||
val removalTime = removal.value.sToOffsetDateTimeUTC()
|
||||
if(creation < removalTime)
|
||||
StatePlaylists.instance.removeFromWatchLater(watchLater, false, removalTime);
|
||||
}
|
||||
|
||||
val packReorderTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.reorderTime), ZoneOffset.UTC);
|
||||
val packReorderTime = pack.reorderTime.sToOffsetDateTimeUTC()
|
||||
val localReorderTime = StatePlaylists.instance.getWatchLaterLastReorderTime();
|
||||
if(localReorderTime < packReorderTime && pack.ordering != null) {
|
||||
StatePlaylists.instance.updateWatchLaterOrdering(smartMerge(pack.ordering!!, StatePlaylists.instance.getWatchLaterOrdering()), true);
|
||||
|
@ -830,22 +831,15 @@ class StateSync {
|
|||
}
|
||||
}
|
||||
|
||||
private fun onAuthorized(remotePublicKey: String) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
|
||||
}
|
||||
}
|
||||
|
||||
private fun onUnuthorized(remotePublicKey: String) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
|
||||
}
|
||||
}
|
||||
|
||||
private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?): SyncSession {
|
||||
private fun createNewSyncSession(rpk: String, remoteDeviceName: String?): SyncSession {
|
||||
val remotePublicKey = rpk.base64ToByteArray().toBase64()
|
||||
return SyncSession(
|
||||
remotePublicKey,
|
||||
onAuthorized = { it, isNewlyAuthorized, isNewSession ->
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
|
||||
}
|
||||
|
||||
if (!isNewSession) {
|
||||
return@SyncSession
|
||||
}
|
||||
|
@ -857,7 +851,6 @@ class StateSync {
|
|||
}
|
||||
|
||||
Logger.i(TAG, "$remotePublicKey authorized (name: ${it.displayName})")
|
||||
onAuthorized(remotePublicKey)
|
||||
_authorizedDevices.addDistinct(remotePublicKey)
|
||||
_authorizedDevices.save()
|
||||
deviceUpdatedOrAdded.emit(it.remotePublicKey, it)
|
||||
|
@ -865,10 +858,12 @@ class StateSync {
|
|||
checkForSync(it);
|
||||
},
|
||||
onUnauthorized = {
|
||||
unauthorize(remotePublicKey)
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
|
||||
}
|
||||
|
||||
unauthorize(remotePublicKey)
|
||||
Logger.i(TAG, "$remotePublicKey unauthorized (name: ${it.displayName})")
|
||||
onUnuthorized(remotePublicKey)
|
||||
|
||||
synchronized(_sessions) {
|
||||
it.close()
|
||||
|
@ -1117,7 +1112,7 @@ class StateSync {
|
|||
runBlocking {
|
||||
if (onStatusUpdate != null) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate[deviceInfo.publicKey] = onStatusUpdate
|
||||
_remotePendingStatusUpdate[deviceInfo.publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
|
||||
}
|
||||
}
|
||||
relaySession.startRelayedChannel(deviceInfo.publicKey, APP_ID, deviceInfo.pairingCode)
|
||||
|
@ -1136,7 +1131,7 @@ class StateSync {
|
|||
val session = createSocketSession(socket, false)
|
||||
if (onStatusUpdate != null) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate[publicKey] = onStatusUpdate
|
||||
_remotePendingStatusUpdate[publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ import com.futo.platformplayer.noise.protocol.DHState
|
|||
import com.futo.platformplayer.noise.protocol.HandshakeState
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion
|
||||
import com.futo.polycentric.core.base64ToByteArray
|
||||
import com.futo.polycentric.core.toBase64
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
|
@ -169,7 +171,7 @@ class SyncSocketSession {
|
|||
var totalBytesReceived: Int = 0
|
||||
while (totalBytesReceived < size) {
|
||||
val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived)
|
||||
if (bytesReceived == 0)
|
||||
if (bytesReceived <= 0)
|
||||
throw Exception("Socket disconnected")
|
||||
totalBytesReceived += bytesReceived
|
||||
}
|
||||
|
@ -291,7 +293,7 @@ class SyncSocketSession {
|
|||
_cipherStatePair = initiator.split()
|
||||
val remoteKeyBytes = ByteArray(initiator.remotePublicKey.publicKeyLength)
|
||||
initiator.remotePublicKey.getPublicKey(remoteKeyBytes, 0)
|
||||
_remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes)
|
||||
_remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes).base64ToByteArray().toBase64()
|
||||
}
|
||||
|
||||
private fun handshakeAsResponder(): Boolean {
|
||||
|
@ -345,7 +347,7 @@ class SyncSocketSession {
|
|||
_outputStream.write(responseBuffer, 0, 4 + responseLength)
|
||||
|
||||
_cipherStatePair = responder.split()
|
||||
_remotePublicKey = remotePublicKey
|
||||
_remotePublicKey = remotePublicKey.base64ToByteArray().toBase64()
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -440,7 +442,7 @@ class SyncSocketSession {
|
|||
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
|
||||
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
|
||||
}
|
||||
//Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
|
||||
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -840,11 +842,14 @@ class SyncSocketSession {
|
|||
if (!isGzipSupported)
|
||||
throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).")
|
||||
|
||||
val compressedStream = ByteArrayInputStream(data.array(), data.position(), data.remaining());
|
||||
var outputStream = ByteArrayOutputStream();
|
||||
val compressedStream = ByteArrayInputStream(data.array(), data.position(), data.remaining())
|
||||
val outputStream = ByteArrayOutputStream()
|
||||
GZIPInputStream(compressedStream).use { gzipStream ->
|
||||
gzipStream.copyToOutputStream(outputStream);
|
||||
gzipStream.close();
|
||||
val buffer = ByteArray(8192) // 8KB buffer
|
||||
var bytesRead: Int
|
||||
while (gzipStream.read(buffer).also { bytesRead = it } != -1) {
|
||||
outputStream.write(buffer, 0, bytesRead)
|
||||
}
|
||||
}
|
||||
data = ByteBuffer.wrap(outputStream.toByteArray())
|
||||
}
|
||||
|
@ -933,7 +938,7 @@ class SyncSocketSession {
|
|||
throw Exception("After sync stream end, the stream must be complete")
|
||||
}
|
||||
|
||||
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, contentEncoding, sourceChannel)
|
||||
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, syncStream.contentEncoding, sourceChannel)
|
||||
}
|
||||
}
|
||||
Opcode.DATA.value -> {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue