Sync implementation for subscriptions, tracking subscription removals, add fcast link to cast tutorial description.

This commit is contained in:
Kelvin 2024-09-30 16:39:10 +02:00
parent c4061cc6ac
commit fcbab10434
12 changed files with 320 additions and 17 deletions

View file

@ -70,6 +70,7 @@ import com.futo.platformplayer.fragment.mainactivity.topbar.NavigationTopBarFrag
import com.futo.platformplayer.fragment.mainactivity.topbar.SearchTopBarFragment
import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.ImportCache
import com.futo.platformplayer.models.UrlVideoWithTime
import com.futo.platformplayer.setNavigationBarColorAndIcons
import com.futo.platformplayer.states.StateApp
import com.futo.platformplayer.states.StateBackup
@ -731,7 +732,7 @@ class MainActivity : AppCompatActivity, IWithResultLauncher {
}
}
suspend fun handleUrl(url: String): Boolean {
suspend fun handleUrl(url: String, position: Int = 0): Boolean {
Logger.i(TAG, "handleUrl(url=$url)")
return withContext(Dispatchers.IO) {
@ -739,7 +740,10 @@ class MainActivity : AppCompatActivity, IWithResultLauncher {
if (StatePlatform.instance.hasEnabledVideoClient(url)) {
Logger.i(TAG, "handleUrl(url=$url) found video client");
lifecycleScope.launch(Dispatchers.Main) {
navigate(_fragVideoDetail, url);
if(position > 0)
navigate(_fragVideoDetail, UrlVideoWithTime(url, position.toLong(), true));
else
navigate(_fragVideoDetail, url);
_fragVideoDetail.maximizeVideoDetail(true);
}

View file

@ -200,7 +200,7 @@ class TutorialFragment : MainFragment() {
TutorialVideo(
uuid = "94d36959-e3fc-4c24-a988-89147067a179",
name = "Casting",
description = "Learn about casting in Grayjay. How do I show video on my TV?",
description = "Learn about casting in Grayjay. How do I show video on my TV?\nhttps://fcast.org/",
thumbnailUrl = "https://releases.grayjay.app/tutorials/how-to-cast.jpg",
videoUrl = "https://releases.grayjay.app/tutorials/how-to-cast.mp4",
duration = 79

View file

@ -15,6 +15,9 @@ import java.time.OffsetDateTime
class Subscription {
var channel: SerializedChannel;
@kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class)
var creationTime: OffsetDateTime = OffsetDateTime.MIN;
//Settings
var doNotifications: Boolean = false;
var doFetchLive: Boolean = false;
@ -55,6 +58,8 @@ class Subscription {
constructor(channel : SerializedChannel) {
this.channel = channel;
if(this.creationTime == OffsetDateTime.MIN)
this.creationTime = OffsetDateTime.now();
}
fun isChannel(url: String): Boolean {

View file

@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.FileNotFoundException
import java.io.InputStream
import java.time.OffsetDateTime
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
@ -509,6 +510,16 @@ class StateBackup {
}
companion object {
fun fromZipBytes(str: ByteArrayInputStream): ExportStructure {
var zip: ZipInputStream? = null;
try {
zip = ZipInputStream(str);
return fromZip(zip);
}
finally {
zip?.close();
}
}
fun fromZip(zipStream: ZipInputStream): ExportStructure {
var entry: ZipEntry?

View file

@ -18,13 +18,22 @@ import com.futo.platformplayer.models.SubscriptionGroup
import com.futo.platformplayer.polycentric.PolycentricCache
import com.futo.platformplayer.resolveChannelUrl
import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.StringDateMapStorage
import com.futo.platformplayer.stores.StringStringMapStorage
import com.futo.platformplayer.stores.SubscriptionStorage
import com.futo.platformplayer.stores.v2.ReconstructStore
import com.futo.platformplayer.stores.v2.ManagedStore
import com.futo.platformplayer.subscription.SubscriptionFetchAlgorithm
import com.futo.platformplayer.subscription.SubscriptionFetchAlgorithms
import com.futo.platformplayer.sync.internal.GJSyncOpcodes
import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage
import com.google.gson.JsonSerializer
import kotlinx.coroutines.*
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.time.LocalDateTime
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.concurrent.ForkJoinPool
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@ -45,6 +54,9 @@ class StateSubscriptions {
private val _subscriptionOthers = FragmentedStorage.storeJson<Subscription>("subscriptions_others")
.withUnique { it.channel.url }
.load();
private val _subscriptionsRemoved = FragmentedStorage.get<StringDateMapStorage>("subscriptions_removed");
private val _subscriptionsPool = ForkJoinPool(Settings.instance.subscriptions.getSubscriptionsConcurrency());
private val _legacySubscriptions = FragmentedStorage.get<SubscriptionStorage>();
@ -222,18 +234,67 @@ class StateSubscriptions {
fun getSubscriptions(): List<Subscription> {
return _subscriptions.getItems();
}
fun getSubscriptionRemovals(): Map<String, Long> {
return _subscriptionsRemoved.all();
}
fun getSubscriptionRemovalTime(url: String): OffsetDateTime{
return _subscriptionsRemoved.get(url) ?: OffsetDateTime.MIN;
}
fun addSubscription(channel : IPlatformChannel) : Subscription {
fun addSubscription(channel : IPlatformChannel, creationDate: OffsetDateTime? = null) : Subscription {
val subObj = Subscription(SerializedChannel.fromChannel(channel));
if(creationDate != null)
subObj.creationTime = creationDate;
_subscriptions.save(subObj);
onSubscriptionsChanged.emit(getSubscriptions(), true);
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
try {
StateSync.instance.broadcast(
GJSyncOpcodes.syncSubscriptions, Json.encodeToString(
SyncSubscriptionsPackage(
listOf(subObj),
mapOf<String, Long>()
)
)
);
}
catch(ex: Exception) {
Logger.w(TAG, "Failed to send subs changes to sync clients", ex);
}
}
return subObj;
}
fun removeSubscription(url: String) : Subscription? {
fun applySubscriptionRemovals(removals: Map<String, Long>): List<Subscription> {
val removed = mutableListOf<Subscription>()
val subs = getSubscriptions().associate { Pair(it.channel.url.lowercase(), it) };
for(removal in removals) {
if(subs.containsKey(removal.key.lowercase())) {
val sub = subs[removal.key.lowercase()];
val datetime = OffsetDateTime.of(LocalDateTime.ofEpochSecond(removal.value, 0, ZoneOffset.UTC), ZoneOffset.UTC);
if(datetime > sub!!.creationTime)
{
removeSubscription(sub.channel.url);
removed.add(sub);
}
}
}
_subscriptionsRemoved.setAllAndSave(removals) { key, value, oldValue ->
return@setAllAndSave oldValue == null || value > oldValue;
}
return removed;
}
fun removeSubscription(url: String, isUserAction: Boolean = false) : Subscription? {
var sub : Subscription? = getSubscription(url);
if(sub != null) {
_subscriptions.delete(sub);
onSubscriptionsChanged.emit(getSubscriptions(), false);
if(isUserAction)
_subscriptionsRemoved.setAndSave(sub.channel.url, OffsetDateTime.now());
}
return sub;
}
@ -328,6 +389,9 @@ class StateSubscriptions {
fun toMigrateCheck(): List<ManagedStore<*>> {
return listOf(_subscriptions);
}
fun getUnderlyingSubscriptionsStore(): ManagedStore<Subscription> {
return _subscriptions;
}
//Old migrate
fun shouldMigrate(): Boolean {
@ -346,6 +410,16 @@ class StateSubscriptions {
_legacySubscriptions.delete();
}
fun getSyncSubscriptionsPackageString(): String{
return Json.encodeToString(
SyncSubscriptionsPackage(
getSubscriptions(),
getSubscriptionRemovals()
)
);
}
companion object {
const val TAG = "StateSubscriptions";
const val VERSION = 1;

View file

@ -20,6 +20,7 @@ import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.StringStringMapStorage
import com.futo.platformplayer.stores.StringArrayStorage
import com.futo.platformplayer.stores.StringStorage
import com.futo.platformplayer.sync.internal.GJSyncOpcodes
import com.futo.platformplayer.sync.internal.SyncDeviceInfo
import com.futo.platformplayer.sync.internal.SyncKeyPair
import com.futo.platformplayer.sync.internal.SyncSession
@ -37,6 +38,7 @@ import java.net.ServerSocket
import java.net.Socket
import java.util.Base64
import java.util.Locale
import kotlin.system.measureTimeMillis
class StateSync {
private val _authorizedDevices = FragmentedStorage.get<StringArrayStorage>("authorized_devices")
@ -182,6 +184,11 @@ class StateSync {
_sessions[publicKey]
}
}
fun getSessions(): List<SyncSession> {
return synchronized(_sessions) {
return _sessions.values.toList()
};
}
private fun handleServiceUpdated(services: List<DnsService>) {
if (!Settings.instance.synchronization.connectDiscovered) {
@ -260,6 +267,8 @@ class StateSync {
_authorizedDevices.addDistinct(remotePublicKey)
_authorizedDevices.save()
deviceUpdatedOrAdded.emit(it.remotePublicKey, session!!)
checkForSync(it);
}, onUnauthorized = {
unauthorize(remotePublicKey)
@ -334,6 +343,31 @@ class StateSync {
})
}
fun broadcast(opcode: UByte, data: String) {
broadcast(opcode, data.toByteArray(Charsets.UTF_8));
}
fun broadcast(opcode: UByte, data: ByteArray) {
for(session in getSessions()) {
try {
if (session.isAuthorized && session.connected) {
session.send(opcode, data);
}
}
catch(ex: Exception) {
Logger.w(TAG, "Failed to broadcast ${opcode} to ${session.remotePublicKey}: ${ex.message}}", ex);
}
}
}
fun checkForSync(session: SyncSession) {
val time = measureTimeMillis {
//val export = StateBackup.export();
//session.send(GJSyncOpcodes.syncExport, export.asZip());
session.send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
}
Logger.i(TAG, "Generated and sent sync export in ${time}ms");
}
fun stop() {
_started = false
_serviceDiscoverer.stop()

View file

@ -0,0 +1,65 @@
package com.futo.platformplayer.stores
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.time.LocalDateTime
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.Dictionary
import java.util.concurrent.ConcurrentHashMap
@kotlinx.serialization.Serializable
class StringDateMapStorage : FragmentedStorageFileJson() {
var map: HashMap<String, Long> = hashMapOf()
override fun encode(): String {
synchronized(map) {
return Json.encodeToString(this);
}
}
fun get(key: String): OffsetDateTime? {
synchronized(map) {
val v = map[key];
if (v == null)
return null;
return OffsetDateTime.of(
LocalDateTime.ofEpochSecond(v, 0, ZoneOffset.UTC),
ZoneOffset.UTC
);
}
}
fun has(key: String): Boolean {
return map.contains(key);
}
fun all(): Map<String, Long>{
synchronized(map) {
return map.toMap();
}
}
fun setAllAndSave(newValues: Map<String, Long>, condition: ((String, Long, Long?) -> Boolean)? = null) {
synchronized(map){
for(kv in newValues){
if(condition == null || condition(kv.key, kv.value, map.get(kv.key)))
map.set(kv.key, kv.value);
}
}
}
fun setAndSave(key: String, value: OffsetDateTime): OffsetDateTime {
synchronized(map) {
map[key] = value.toEpochSecond();
save()
return value
}
}
fun setAndSaveBlocking(key: String, value: OffsetDateTime): OffsetDateTime {
synchronized(map) {
map[key] = value.toEpochSecond();
saveBlocking()
return value
}
}
}

View file

@ -273,6 +273,14 @@ class ManagedStore<T>{
save(obj, withReconstruction, onlyExisting);
}
suspend fun fromReconstruction(reconstruction: String, cache: ImportCache? = null): T {
if(_reconstructStore == null)
throw IllegalStateException("Can't reconstruct as no reconstruction is implemented for this type");
val id = UUID.randomUUID().toString();
return _reconstructStore!!.toObjectWithHeader(id, reconstruction, ReconstructStore.Builder(), cache);
}
suspend fun createFromReconstruction(reconstruction: String, builder: ReconstructStore.Builder, cache: ImportCache? = null): String {
if(_reconstructStore == null)
throw IllegalStateException("Can't reconstruct as no reconstruction is implemented for this type");

View file

@ -2,12 +2,20 @@ package com.futo.platformplayer.sync.internal
import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.activities.MainActivity
import com.futo.platformplayer.api.media.Serializer
import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.Subscription
import com.futo.platformplayer.states.StateApp
import com.futo.platformplayer.states.StateBackup
import com.futo.platformplayer.states.StatePlayer
import com.futo.platformplayer.states.StateSubscriptions
import com.futo.platformplayer.sync.internal.SyncSocketSession.Opcode
import com.futo.platformplayer.sync.models.SendToDevicePackage
import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import java.io.ByteArrayInputStream
import java.nio.ByteBuffer
interface IAuthorizable {
@ -117,18 +125,105 @@ class SyncSession : IAuthorizable {
Logger.i(TAG, "Received ${opcode} (${data.remaining()} bytes)")
//TODO: Abstract this out
when(opcode) {
GJSyncOpcodes.sendToDevices -> {
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
val context = StateApp.instance.contextOrNull;
if(context != null && context is MainActivity) {
val url = String(data.array(), Charsets.UTF_8);
UIDialogs.appToast("Received url from device [${socketSession.remotePublicKey}]:\n{$url}");
context.handleUrl(url);
try {
when (opcode) {
GJSyncOpcodes.sendToDevices -> {
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
val context = StateApp.instance.contextOrNull;
if (context != null && context is MainActivity) {
val dataBody = ByteArray(data.remaining());
val remainder = data.remaining();
data.get(dataBody, 0, remainder);
val json = String(dataBody, Charsets.UTF_8);
val obj = Json.decodeFromString<SendToDevicePackage>(json);
UIDialogs.appToast("Received url from device [${socketSession.remotePublicKey}]:\n{${obj.url}");
context.handleUrl(obj.url, obj.position);
}
};
}
GJSyncOpcodes.syncExport -> {
val dataBody = ByteArray(data.remaining());
val bytesStr = ByteArrayInputStream(data.array(), data.position(), data.remaining());
try {
val exportStruct = StateBackup.ExportStructure.fromZipBytes(bytesStr);
for (store in exportStruct.stores) {
if (store.key.equals("subscriptions", true)) {
val subStore =
StateSubscriptions.instance.getUnderlyingSubscriptionsStore();
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
val pack = SyncSubscriptionsPackage(
store.value.map {
subStore.fromReconstruction(it, exportStruct.cache)
},
StateSubscriptions.instance.getSubscriptionRemovals()
);
handleSyncSubscriptionPackage(this@SyncSession, pack);
}
}
}
} finally {
bytesStr.close();
}
};
}
GJSyncOpcodes.syncSubscriptions -> {
val dataBody = ByteArray(data.remaining());
data.get(dataBody);
val json = String(dataBody, Charsets.UTF_8);
val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json);
handleSyncSubscriptionPackage(this, subPackage);
}
}
}
catch(ex: Exception) {
Logger.w(TAG, "Failed to handle sync package ${opcode}: ${ex.message}", ex);
}
}
private fun handleSyncSubscriptionPackage(origin: SyncSession, pack: SyncSubscriptionsPackage) {
val added = mutableListOf<Subscription>()
for(sub in pack.subscriptions) {
if(!StateSubscriptions.instance.isSubscribed(sub.channel)) {
val removalTime = StateSubscriptions.instance.getSubscriptionRemovalTime(sub.channel.url);
if(sub.creationTime > removalTime) {
val newSub =
StateSubscriptions.instance.addSubscription(sub.channel, sub.creationTime);
added.add(newSub);
}
}
}
if(added.size > 3)
UIDialogs.appToast("${added.size} Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}");
else if(added.size > 0)
UIDialogs.appToast("Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}:\n" +
added.map { it.channel.name }.joinToString("\n"));
if(pack.subscriptions != null && pack.subscriptions.size > 0) {
for (subRemoved in pack.subscriptionRemovals) {
val removed = StateSubscriptions.instance.applySubscriptionRemovals(pack.subscriptionRemovals);
if(removed.size > 3)
UIDialogs.appToast("Removed ${removed.size} Subscriptions from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}");
else if(removed.size > 0)
UIDialogs.appToast("Subscriptions removed from ${origin.remotePublicKey.substring(0, Math.min(8, origin.remotePublicKey.length))}:\n" +
removed.map { it.channel.name }.joinToString("\n"));
}
}
}
fun send(opcode: UByte, data: String) {
send(opcode, data.toByteArray(Charsets.UTF_8));
}
fun send(opcode: UByte, data: ByteArray) {
val sock = _socketSessions.firstOrNull();
if(sock != null){
sock.send(opcode, ByteBuffer.wrap(data));
}
else
throw IllegalStateException("Session has no active sockets");
}
private companion object {

View file

@ -1,12 +1,16 @@
package com.futo.platformplayer.views.adapters
import android.os.Looper
import android.view.LayoutInflater
import android.view.ViewGroup
import androidx.recyclerview.widget.RecyclerView
import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.constructs.Event1
import com.futo.platformplayer.models.Subscription
import com.futo.platformplayer.states.StateApp
import com.futo.platformplayer.states.StateSubscriptions
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
class SubscriptionAdapter : RecyclerView.Adapter<SubscriptionViewHolder> {
private lateinit var _sortedDataset: List<Subscription>;
@ -30,7 +34,10 @@ class SubscriptionAdapter : RecyclerView.Adapter<SubscriptionViewHolder> {
_inflater = inflater;
_confirmationMessage = confirmationMessage;
StateSubscriptions.instance.onSubscriptionsChanged.subscribe { _, _ -> updateDataset(); }
StateSubscriptions.instance.onSubscriptionsChanged.subscribe { _, _ -> if(Looper.myLooper() != Looper.getMainLooper())
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { updateDataset() }
else
updateDataset(); }
updateDataset();
}
@ -43,7 +50,7 @@ class SubscriptionAdapter : RecyclerView.Adapter<SubscriptionViewHolder> {
holder.onTrash.subscribe {
val sub = holder.subscription ?: return@subscribe;
UIDialogs.showConfirmationDialog(_inflater.context, _confirmationMessage, {
StateSubscriptions.instance.removeSubscription(sub.channel.url);
StateSubscriptions.instance.removeSubscription(sub.channel.url, true);
});
};
holder.onSettings.subscribe {

View file

@ -79,7 +79,7 @@ class SubscribeButton : LinearLayout {
}
private fun handleUnSubscribe(url: String) {
setIsLoading(false);
val removed = StateSubscriptions.instance.removeSubscription(url);
val removed = StateSubscriptions.instance.removeSubscription(url, true);
if (removed != null)
UIDialogs.toast(context, context.getString(R.string.unsubscribed_from) + removed.channel.name);
setIsSubscribed(false);

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.8 KiB