WIP SubsExchange

This commit is contained in:
Kelvin K 2025-03-26 23:28:32 +01:00
parent 7bd687331b
commit 034b8b15ae
13 changed files with 271 additions and 8 deletions

View file

@ -294,6 +294,9 @@ class Settings : FragmentedStorageFileJson() {
@FormField(R.string.show_subscription_group, FieldForm.TOGGLE, R.string.show_subscription_group_description, 5)
var showSubscriptionGroups: Boolean = true;
@FormField(R.string.use_subscription_exchange, FieldForm.TOGGLE, R.string.use_subscription_exchange_description, 6)
var useSubscriptionExchange: Boolean = false;
@FormField(R.string.preview_feed_items, FieldForm.TOGGLE, R.string.preview_feed_items_description, 6)
var previewFeedItems: Boolean = true;

View file

@ -1,5 +1,6 @@
package com.futo.platformplayer.states
import SubsExchangeClient
import com.futo.platformplayer.Settings
import com.futo.platformplayer.api.media.PlatformID
import com.futo.platformplayer.api.media.models.channels.IPlatformChannel
@ -18,6 +19,7 @@ import com.futo.platformplayer.models.SubscriptionGroup
import com.futo.platformplayer.resolveChannelUrl
import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.StringDateMapStorage
import com.futo.platformplayer.stores.StringStorage
import com.futo.platformplayer.stores.StringStringMapStorage
import com.futo.platformplayer.stores.SubscriptionStorage
import com.futo.platformplayer.stores.v2.ReconstructStore
@ -67,10 +69,24 @@ class StateSubscriptions {
val onSubscriptionsChanged = Event2<List<Subscription>, Boolean>();
private val _subsExchangeServer = "https://exchange.grayjay.app/";
private val _subscriptionKey = FragmentedStorage.get<StringStorage>("sub_exchange_key");
init {
global.onUpdateProgress.subscribe { progress, total ->
onFeedProgress.emit(null, progress, total);
}
if(_subscriptionKey.value.isNullOrBlank())
generateNewSubsExchangeKey();
}
fun generateNewSubsExchangeKey(){
_subscriptionKey.setAndSave(SubsExchangeClient.createPrivateKey());
}
fun getSubsExchangeClient(): SubsExchangeClient {
if(_subscriptionKey.value.isNullOrBlank())
throw IllegalStateException("No valid subscription exchange key set");
return SubsExchangeClient(_subsExchangeServer, _subscriptionKey.value);
}
fun getOldestUpdateTime(): OffsetDateTime {
@ -359,7 +375,8 @@ class StateSubscriptions {
}
fun getSubscriptionsFeedWithExceptions(allowFailure: Boolean = false, withCacheFallback: Boolean = false, cacheScope: CoroutineScope, onProgress: ((Int, Int)->Unit)? = null, onNewCacheHit: ((Subscription, IPlatformContent)->Unit)? = null, subGroup: SubscriptionGroup? = null): Pair<IPager<IPlatformContent>, List<Throwable>> {
val algo = SubscriptionFetchAlgorithm.getAlgorithm(_algorithmSubscriptions, cacheScope, allowFailure, withCacheFallback, _subscriptionsPool);
val exchangeClient = if(Settings.instance.subscriptions.useSubscriptionExchange) getSubsExchangeClient() else null;
val algo = SubscriptionFetchAlgorithm.getAlgorithm(_algorithmSubscriptions, cacheScope, allowFailure, withCacheFallback, _subscriptionsPool, exchangeClient);
if(onNewCacheHit != null)
algo.onNewCacheHit.subscribe(onNewCacheHit)

View file

@ -1,5 +1,6 @@
package com.futo.platformplayer.subscription
import SubsExchangeClient
import com.futo.platformplayer.Settings
import com.futo.platformplayer.api.media.models.ResultCapabilities
import com.futo.platformplayer.api.media.platforms.js.JSClient
@ -15,8 +16,9 @@ class SmartSubscriptionAlgorithm(
scope: CoroutineScope,
allowFailure: Boolean = false,
withCacheFallback: Boolean = true,
threadPool: ForkJoinPool? = null
): SubscriptionsTaskFetchAlgorithm(scope, allowFailure, withCacheFallback, threadPool) {
threadPool: ForkJoinPool? = null,
subsExchangeClient: SubsExchangeClient? = null
): SubscriptionsTaskFetchAlgorithm(scope, allowFailure, withCacheFallback, threadPool, subsExchangeClient) {
override fun getSubscriptionTasks(subs: Map<Subscription, List<String>>): List<SubscriptionTask> {
val allTasks: List<SubscriptionTask> = subs.flatMap { entry ->
val sub = entry.key;

View file

@ -1,5 +1,6 @@
package com.futo.platformplayer.subscription
import SubsExchangeClient
import com.futo.platformplayer.api.media.models.contents.IPlatformContent
import com.futo.platformplayer.api.media.platforms.js.JSClient
import com.futo.platformplayer.api.media.structures.IPager
@ -33,11 +34,11 @@ abstract class SubscriptionFetchAlgorithm(
companion object {
public val TAG = "SubscriptionAlgorithm";
fun getAlgorithm(algo: SubscriptionFetchAlgorithms, scope: CoroutineScope, allowFailure: Boolean = false, withCacheFallback: Boolean = false, pool: ForkJoinPool? = null): SubscriptionFetchAlgorithm {
fun getAlgorithm(algo: SubscriptionFetchAlgorithms, scope: CoroutineScope, allowFailure: Boolean = false, withCacheFallback: Boolean = false, pool: ForkJoinPool? = null, withExchangeClient: SubsExchangeClient? = null): SubscriptionFetchAlgorithm {
return when(algo) {
SubscriptionFetchAlgorithms.CACHE -> CachedSubscriptionAlgorithm(scope, allowFailure, withCacheFallback, pool, 50);
SubscriptionFetchAlgorithms.SIMPLE -> SimpleSubscriptionAlgorithm(scope, allowFailure, withCacheFallback, pool);
SubscriptionFetchAlgorithms.SMART -> SmartSubscriptionAlgorithm(scope, allowFailure, withCacheFallback, pool);
SubscriptionFetchAlgorithms.SMART -> SmartSubscriptionAlgorithm(scope, allowFailure, withCacheFallback, pool, withExchangeClient);
}
}
}

View file

@ -1,5 +1,6 @@
package com.futo.platformplayer.subscription
import SubsExchangeClient
import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.activities.MainActivity
import com.futo.platformplayer.api.media.models.ResultCapabilities
@ -10,6 +11,7 @@ import com.futo.platformplayer.api.media.structures.DedupContentPager
import com.futo.platformplayer.api.media.structures.EmptyPager
import com.futo.platformplayer.api.media.structures.IPager
import com.futo.platformplayer.api.media.structures.MultiChronoContentPager
import com.futo.platformplayer.api.media.structures.PlatformContentPager
import com.futo.platformplayer.engine.exceptions.PluginException
import com.futo.platformplayer.engine.exceptions.ScriptCaptchaRequiredException
import com.futo.platformplayer.engine.exceptions.ScriptCriticalException
@ -24,6 +26,8 @@ import com.futo.platformplayer.states.StateCache
import com.futo.platformplayer.states.StatePlatform
import com.futo.platformplayer.states.StatePlugins
import com.futo.platformplayer.states.StateSubscriptions
import com.futo.platformplayer.subsexchange.ChannelRequest
import com.futo.platformplayer.subsexchange.ChannelResolve
import kotlinx.coroutines.CoroutineScope
import java.time.OffsetDateTime
import java.util.concurrent.ExecutionException
@ -35,7 +39,8 @@ abstract class SubscriptionsTaskFetchAlgorithm(
scope: CoroutineScope,
allowFailure: Boolean = false,
withCacheFallback: Boolean = true,
_threadPool: ForkJoinPool? = null
_threadPool: ForkJoinPool? = null,
private val subsExchangeClient: SubsExchangeClient? = null
) : SubscriptionFetchAlgorithm(scope, allowFailure, withCacheFallback, _threadPool) {
@ -45,7 +50,7 @@ abstract class SubscriptionsTaskFetchAlgorithm(
}
override fun getSubscriptions(subs: Map<Subscription, List<String>>): Result {
val tasks = getSubscriptionTasks(subs);
var tasks = getSubscriptionTasks(subs).toMutableList()
val tasksGrouped = tasks.groupBy { it.client }
@ -70,6 +75,21 @@ abstract class SubscriptionsTaskFetchAlgorithm(
val exs: ArrayList<Throwable> = arrayListOf();
val liveTasks = tasks.filter { !it.fromPeek && !it.fromCache };
val contract = subsExchangeClient?.requestContract(*liveTasks.map { ChannelRequest(it.url) }.toTypedArray());
var providedTasks: MutableList<SubscriptionTask>? = null;
if(contract != null && contract.provided.size > 0){
providedTasks = mutableListOf()
for(task in tasks.toList()){
if(!task.fromCache && !task.fromPeek && contract.provided.contains(task.url)) {
providedTasks.add(task);
tasks.remove(task);
}
}
}
val failedPlugins = mutableListOf<String>();
val cachedChannels = mutableListOf<String>()
val forkTasks = executeSubscriptionTasks(tasks, failedPlugins, cachedChannels);
@ -104,6 +124,39 @@ abstract class SubscriptionsTaskFetchAlgorithm(
};
}
}
//Resolve Subscription Exchange
if(contract != null) {
try {
val resolve = subsExchangeClient?.resolveContract(
contract,
*taskResults.filter { it.pager != null }.map {
ChannelResolve(
it.task.url,
it.pager!!.getResults()
)
}.toTypedArray()
);
if (resolve != null) {
for(result in resolve){
val task = providedTasks?.find { it.url == result.channelUrl };
if(task != null) {
taskResults.add(SubscriptionTaskResult(task, PlatformContentPager(result.content, result.content.size), null));
providedTasks?.remove(task);
}
}
}
if (providedTasks != null) {
for(task in providedTasks) {
taskResults.add(SubscriptionTaskResult(task, null, IllegalStateException("No data received from exchange")));
}
}
}
catch(ex: Throwable) {
//TODO: fetch remainder after all?
}
}
Logger.i("StateSubscriptions", "Subscriptions results in ${timeTotal}ms")
//Cache pagers grouped by channel
@ -173,6 +226,8 @@ abstract class SubscriptionsTaskFetchAlgorithm(
Logger.e(StateSubscriptions.TAG, "Subscription peek [${task.sub.channel.name}] failed", ex);
}
}
//Intercepts task.fromCache & task.fromPeek
synchronized(cachedChannels) {
if(task.fromCache || task.fromPeek) {
finished++;

View file

@ -0,0 +1,8 @@
package com.futo.platformplayer.subsexchange
import kotlinx.serialization.Serializable
@Serializable
class ChannelRequest(
var url: String
);

View file

@ -0,0 +1,13 @@
package com.futo.platformplayer.subsexchange
import com.futo.platformplayer.api.media.models.channels.IPlatformChannel
import com.futo.platformplayer.api.media.models.contents.IPlatformContent
import kotlinx.serialization.Serializable
import java.time.OffsetDateTime
@Serializable
class ChannelResolve(
var channelUrl: String,
var content: List<IPlatformContent>,
var channel: IPlatformChannel? = null
)

View file

@ -0,0 +1,17 @@
package com.futo.platformplayer.subsexchange
import com.futo.platformplayer.api.media.models.channels.IPlatformChannel
import com.futo.platformplayer.api.media.models.contents.IPlatformContent
import com.futo.platformplayer.serializers.OffsetDateTimeNullableSerializer
import com.futo.platformplayer.serializers.OffsetDateTimeSerializer
import kotlinx.serialization.Serializable
import java.time.OffsetDateTime
@Serializable
class ChannelResult(
@kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class)
var dateTime: OffsetDateTime,
var channelUrl: String,
var content: List<IPlatformContent>,
var channel: IPlatformChannel? = null
)

View file

@ -0,0 +1,17 @@
package com.futo.platformplayer.subsexchange
import com.futo.platformplayer.serializers.OffsetDateTimeNullableSerializer
import com.futo.platformplayer.serializers.OffsetDateTimeSerializer
import kotlinx.serialization.Serializable
import java.time.OffsetDateTime
@Serializable
class ExchangeContract(
var id: String,
var requests: List<ChannelRequest>,
var provided: List<String> = listOf(),
var required: List<String> = listOf(),
@kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class)
var expired: OffsetDateTime = OffsetDateTime.MIN,
var contractVersion: Int = 1
)

View file

@ -0,0 +1,10 @@
package com.futo.platformplayer.subsexchange
import kotlinx.serialization.Serializable
@Serializable
data class ExchangeContractResolve(
val publicKey: String,
val signature: String,
val data: String
)

View file

@ -0,0 +1,118 @@
import com.futo.platformplayer.subsexchange.ChannelRequest
import com.futo.platformplayer.subsexchange.ChannelResolve
import com.futo.platformplayer.subsexchange.ChannelResult
import com.futo.platformplayer.subsexchange.ExchangeContract
import com.futo.platformplayer.subsexchange.ExchangeContractResolve
import kotlinx.serialization.*
import kotlinx.serialization.json.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.net.HttpURLConnection
import java.net.URL
import java.security.KeyFactory
import java.security.PrivateKey
import java.security.PublicKey
import java.security.Signature
import java.security.interfaces.RSAPrivateKey
import java.security.interfaces.RSAPublicKey
import java.util.Base64
import java.io.InputStreamReader
import java.io.OutputStream
import java.io.OutputStreamWriter
import java.nio.charset.StandardCharsets
import java.security.KeyPairGenerator
import java.security.spec.PKCS8EncodedKeySpec
class SubsExchangeClient(private val server: String, private val privateKey: String) {
private val publicKey: String = extractPublicKey(privateKey)
// Endpoints
// Endpoint: Contract
fun requestContract(vararg channels: ChannelRequest): ExchangeContract {
val data = post("/api/Channel/Contract", Json.encodeToString(channels), "application/json")
return Json.decodeFromString(data)
}
suspend fun requestContractAsync(vararg channels: ChannelRequest): ExchangeContract {
val data = postAsync("/api/Channel/Contract", Json.encodeToString(channels), "application/json")
return Json.decodeFromString(data)
}
// Endpoint: Resolve
fun resolveContract(contract: ExchangeContract, vararg resolves: ChannelResolve): Array<ChannelResult> {
val contractResolve = convertResolves(*resolves)
val result = post("/api/Channel/Resolve?contractId=${contract.id}", Json.encodeToString(contractResolve), "application/json")
return Json.decodeFromString(result)
}
suspend fun resolveContractAsync(contract: ExchangeContract, vararg resolves: ChannelResolve): Array<ChannelResult> {
val contractResolve = convertResolves(*resolves)
val result = postAsync("/api/Channel/Resolve?contractId=${contract.id}", Json.encodeToString(contractResolve), "application/json")
return Json.decodeFromString(result)
}
private fun convertResolves(vararg resolves: ChannelResolve): ExchangeContractResolve {
val data = Json.encodeToString(resolves)
val signature = createSignature(data, privateKey)
return ExchangeContractResolve(
publicKey = publicKey,
signature = signature,
data = data
)
}
// IO methods
private fun post(query: String, body: String, contentType: String): String {
val url = URL("$server$query")
with(url.openConnection() as HttpURLConnection) {
requestMethod = "POST"
setRequestProperty("Content-Type", contentType)
doOutput = true
OutputStreamWriter(outputStream, StandardCharsets.UTF_8).use { it.write(body) }
InputStreamReader(inputStream, StandardCharsets.UTF_8).use {
return it.readText()
}
}
}
private suspend fun postAsync(query: String, body: String, contentType: String): String {
return withContext(Dispatchers.IO) {
post(query, body, contentType)
}
}
// Crypto methods
companion object {
fun createPrivateKey(): String {
val rsa = KeyFactory.getInstance("RSA")
val keyPairGenerator = KeyPairGenerator.getInstance("RSA");
keyPairGenerator.initialize(2048);
val keyPair = keyPairGenerator.generateKeyPair();
return Base64.getEncoder().encodeToString(keyPair.private.encoded);
}
fun extractPublicKey(privateKey: String): String {
val keySpec = PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKey))
val keyFactory = KeyFactory.getInstance("RSA")
val privateKeyObj = keyFactory.generatePrivate(keySpec) as RSAPrivateKey
val publicKeyObj: RSAPublicKey = keyFactory.generatePublic(keySpec) as RSAPublicKey;
return Base64.getEncoder().encodeToString(publicKeyObj.encoded)
}
fun createSignature(data: String, privateKey: String): String {
val keySpec = PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKey))
val keyFactory = KeyFactory.getInstance("RSA")
val rsaPrivateKey = keyFactory.generatePrivate(keySpec) as RSAPrivateKey
val signature = Signature.getInstance("SHA256withRSA")
signature.initSign(rsaPrivateKey)
signature.update(data.toByteArray(Charsets.UTF_8))
val signatureBytes = signature.sign()
return Base64.getEncoder().encodeToString(signatureBytes)
}
}
}

View file

@ -168,7 +168,7 @@
android:layout_height="wrap_content"
android:layout_marginTop="10dp"
android:background="@drawable/background_button_round"
android:hint="Seach.." />
android:hint="Search.." />
<LinearLayout
android:layout_width="match_parent"

View file

@ -412,6 +412,8 @@
<string name="background_switch_audio_description">Optimize bandwidth usage by switching to audio-only stream in background if available, may cause stutter</string>
<string name="subscription_group_menu">Groups</string>
<string name="show_subscription_group">Show Subscription Groups</string>
<string name="use_subscription_exchange">Use Subscription Exchange (Experimental)</string>
<string name="use_subscription_exchange_description">Uses a centralized crowd-sourced server to significantly reduce the required requests for subscriptions, in exchange you submit your subscriptions to the server.</string>
<string name="show_subscription_group_description">If subscription groups should be shown above your subscriptions to filter</string>
<string name="preview_feed_items">Preview Feed Items</string>
<string name="preview_feed_items_description">When the preview feedstyle is used, if items should auto-preview when scrolling over them</string>