diff --git a/app/src/main/java/com/futo/platformplayer/Extensions_Syntax.kt b/app/src/main/java/com/futo/platformplayer/Extensions_Syntax.kt index 63f4bd31..f1e63366 100644 --- a/app/src/main/java/com/futo/platformplayer/Extensions_Syntax.kt +++ b/app/src/main/java/com/futo/platformplayer/Extensions_Syntax.kt @@ -7,6 +7,9 @@ import java.net.InetAddress import java.net.URI import java.net.URISyntaxException import java.net.URLEncoder +import java.time.Instant +import java.time.OffsetDateTime +import java.time.ZoneOffset //Syntax sugaring inline fun Any.assume(): T?{ @@ -50,4 +53,20 @@ fun InetAddress?.toUrlAddress(): String { throw Exception("Invalid address type") } } +} + +fun Long?.sToOffsetDateTimeUTC(): OffsetDateTime { + if (this == null || this < 0) + return OffsetDateTime.MIN + if(this > 4070912400) + return OffsetDateTime.MAX; + return OffsetDateTime.ofInstant(Instant.ofEpochSecond(this), ZoneOffset.UTC) +} + +fun Long?.msToOffsetDateTimeUTC(): OffsetDateTime { + if (this == null || this < 0) + return OffsetDateTime.MIN + if(this > 4070912400) + return OffsetDateTime.MAX; + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(this), ZoneOffset.UTC) } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt b/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt index e2054c90..fe194b2d 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt @@ -19,6 +19,7 @@ import com.futo.platformplayer.exceptions.ReconstructionException import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.models.ImportCache import com.futo.platformplayer.models.Playlist +import com.futo.platformplayer.sToOffsetDateTimeUTC import com.futo.platformplayer.smartMerge import com.futo.platformplayer.states.StateSubscriptionGroups.Companion import com.futo.platformplayer.stores.FragmentedStorage @@ -85,7 +86,7 @@ class StatePlaylists { if(value.isEmpty()) return OffsetDateTime.MIN; val tryParse = value.toLongOrNull() ?: 0; - return OffsetDateTime.ofInstant(Instant.ofEpochSecond(tryParse), ZoneOffset.UTC); + return tryParse.sToOffsetDateTimeUTC(); } private fun setWatchLaterReorderTime() { val now = OffsetDateTime.now(ZoneOffset.UTC); diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt index 70bdc8bc..dbd3660a 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt @@ -25,6 +25,7 @@ import com.futo.platformplayer.models.HistoryVideo import com.futo.platformplayer.models.Subscription import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.Noise +import com.futo.platformplayer.sToOffsetDateTimeUTC import com.futo.platformplayer.smartMerge import com.futo.platformplayer.stores.FragmentedStorage import com.futo.platformplayer.stores.StringStringMapStorage @@ -731,7 +732,7 @@ class StateSync { } for(removal in pack.groupRemovals) { val creation = StateSubscriptionGroups.instance.getSubscriptionGroup(removal.key); - val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC); + val removalTime = removal.value.sToOffsetDateTimeUTC(); if(creation != null && creation.creationTime < removalTime) StateSubscriptionGroups.instance.deleteSubscriptionGroup(removal.key, false); } @@ -759,7 +760,7 @@ class StateSync { } for(removal in pack.playlistRemovals) { val creation = StatePlaylists.instance.getPlaylist(removal.key); - val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC); + val removalTime = removal.value.sToOffsetDateTimeUTC(); if(creation != null && creation.dateCreation < removalTime) StatePlaylists.instance.removePlaylist(creation, false); @@ -777,7 +778,7 @@ class StateSync { val allExisting = StatePlaylists.instance.getWatchLater(); for(video in pack.videos) { val existing = allExisting.firstOrNull { it.url == video.url }; - val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.videoAdds[video.url] ?: 0), ZoneOffset.UTC) else OffsetDateTime.MIN; + val time = if(pack.videoAdds.containsKey(video.url)) (pack.videoAdds[video.url] ?: 0).sToOffsetDateTimeUTC() else OffsetDateTime.MIN; if(existing == null) { StatePlaylists.instance.addToWatchLater(video, false); @@ -788,12 +789,12 @@ class StateSync { for(removal in pack.videoRemovals) { val watchLater = allExisting.firstOrNull { it.url == removal.key } ?: continue; val creation = StatePlaylists.instance.getWatchLaterRemovalTime(watchLater.url) ?: OffsetDateTime.MIN; - val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value), ZoneOffset.UTC); + val removalTime = removal.value.sToOffsetDateTimeUTC() if(creation < removalTime) StatePlaylists.instance.removeFromWatchLater(watchLater, false, removalTime); } - val packReorderTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.reorderTime), ZoneOffset.UTC); + val packReorderTime = pack.reorderTime.sToOffsetDateTimeUTC() val localReorderTime = StatePlaylists.instance.getWatchLaterLastReorderTime(); if(localReorderTime < packReorderTime && pack.ordering != null) { StatePlaylists.instance.updateWatchLaterOrdering(smartMerge(pack.ordering!!, StatePlaylists.instance.getWatchLaterOrdering()), true); @@ -830,22 +831,15 @@ class StateSync { } } - private fun onAuthorized(remotePublicKey: String) { - synchronized(_remotePendingStatusUpdate) { - _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized") - } - } - - private fun onUnuthorized(remotePublicKey: String) { - synchronized(_remotePendingStatusUpdate) { - _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized") - } - } - - private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?): SyncSession { + private fun createNewSyncSession(rpk: String, remoteDeviceName: String?): SyncSession { + val remotePublicKey = rpk.base64ToByteArray().toBase64() return SyncSession( remotePublicKey, onAuthorized = { it, isNewlyAuthorized, isNewSession -> + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized") + } + if (!isNewSession) { return@SyncSession } @@ -857,7 +851,6 @@ class StateSync { } Logger.i(TAG, "$remotePublicKey authorized (name: ${it.displayName})") - onAuthorized(remotePublicKey) _authorizedDevices.addDistinct(remotePublicKey) _authorizedDevices.save() deviceUpdatedOrAdded.emit(it.remotePublicKey, it) @@ -865,10 +858,12 @@ class StateSync { checkForSync(it); }, onUnauthorized = { - unauthorize(remotePublicKey) + synchronized(_remotePendingStatusUpdate) { + _remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized") + } + unauthorize(remotePublicKey) Logger.i(TAG, "$remotePublicKey unauthorized (name: ${it.displayName})") - onUnuthorized(remotePublicKey) synchronized(_sessions) { it.close() @@ -1117,7 +1112,7 @@ class StateSync { runBlocking { if (onStatusUpdate != null) { synchronized(_remotePendingStatusUpdate) { - _remotePendingStatusUpdate[deviceInfo.publicKey] = onStatusUpdate + _remotePendingStatusUpdate[deviceInfo.publicKey.base64ToByteArray().toBase64()] = onStatusUpdate } } relaySession.startRelayedChannel(deviceInfo.publicKey, APP_ID, deviceInfo.pairingCode) @@ -1136,7 +1131,7 @@ class StateSync { val session = createSocketSession(socket, false) if (onStatusUpdate != null) { synchronized(_remotePendingStatusUpdate) { - _remotePendingStatusUpdate[publicKey] = onStatusUpdate + _remotePendingStatusUpdate[publicKey.base64ToByteArray().toBase64()] = onStatusUpdate } } diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index ab0a0da6..1f3f9fcf 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -11,6 +11,8 @@ import com.futo.platformplayer.noise.protocol.DHState import com.futo.platformplayer.noise.protocol.HandshakeState import com.futo.platformplayer.states.StateSync import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion +import com.futo.polycentric.core.base64ToByteArray +import com.futo.polycentric.core.toBase64 import kotlinx.coroutines.CompletableDeferred import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream @@ -169,7 +171,7 @@ class SyncSocketSession { var totalBytesReceived: Int = 0 while (totalBytesReceived < size) { val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived) - if (bytesReceived == 0) + if (bytesReceived <= 0) throw Exception("Socket disconnected") totalBytesReceived += bytesReceived } @@ -291,7 +293,7 @@ class SyncSocketSession { _cipherStatePair = initiator.split() val remoteKeyBytes = ByteArray(initiator.remotePublicKey.publicKeyLength) initiator.remotePublicKey.getPublicKey(remoteKeyBytes, 0) - _remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes) + _remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes).base64ToByteArray().toBase64() } private fun handshakeAsResponder(): Boolean { @@ -345,7 +347,7 @@ class SyncSocketSession { _outputStream.write(responseBuffer, 0, 4 + responseLength) _cipherStatePair = responder.split() - _remotePublicKey = remotePublicKey + _remotePublicKey = remotePublicKey.base64ToByteArray().toBase64() return true } @@ -440,7 +442,7 @@ class SyncSocketSession { ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len) _outputStream.write(_sendBufferEncrypted, 0, 4 + len) } - //Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})") + Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})") } } } @@ -840,11 +842,14 @@ class SyncSocketSession { if (!isGzipSupported) throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).") - val compressedStream = ByteArrayInputStream(data.array(), data.position(), data.remaining()); - var outputStream = ByteArrayOutputStream(); + val compressedStream = ByteArrayInputStream(data.array(), data.position(), data.remaining()) + val outputStream = ByteArrayOutputStream() GZIPInputStream(compressedStream).use { gzipStream -> - gzipStream.copyToOutputStream(outputStream); - gzipStream.close(); + val buffer = ByteArray(8192) // 8KB buffer + var bytesRead: Int + while (gzipStream.read(buffer).also { bytesRead = it } != -1) { + outputStream.write(buffer, 0, bytesRead) + } } data = ByteBuffer.wrap(outputStream.toByteArray()) } @@ -933,7 +938,7 @@ class SyncSocketSession { throw Exception("After sync stream end, the stream must be complete") } - handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, contentEncoding, sourceChannel) + handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, syncStream.contentEncoding, sourceChannel) } } Opcode.DATA.value -> {