From b3a3e459a4acc6163570ff458bf0af30c413c86b Mon Sep 17 00:00:00 2001 From: Kelvin Date: Wed, 1 Nov 2023 00:09:05 +0100 Subject: [PATCH] WIP Smart subscriptions --- .../activities/AddSourceOptionsActivity.kt | 2 + .../mainactivity/main/SourceDetailFragment.kt | 13 ++ .../platformplayer/models/Subscription.kt | 63 ++++++- .../states/StateSubscriptions.kt | 134 +------------- .../SimpleSubscriptionAlgorithm.kt | 2 + .../SmartSubscriptionAlgorithm.kt | 94 +++++++++- .../SubscriptionFetchAlgorithm.kt | 2 + .../SubscriptionsTaskFetchAlgorithm.kt | 171 ++++++++++++++++++ .../layout/activity_add_source_options.xml | 12 +- .../res/layout/fragment_source_detail.xml | 9 + 10 files changed, 360 insertions(+), 142 deletions(-) create mode 100644 app/src/main/java/com/futo/platformplayer/subscription/SubscriptionsTaskFetchAlgorithm.kt diff --git a/app/src/main/java/com/futo/platformplayer/activities/AddSourceOptionsActivity.kt b/app/src/main/java/com/futo/platformplayer/activities/AddSourceOptionsActivity.kt index f328039f..c42c5198 100644 --- a/app/src/main/java/com/futo/platformplayer/activities/AddSourceOptionsActivity.kt +++ b/app/src/main/java/com/futo/platformplayer/activities/AddSourceOptionsActivity.kt @@ -16,6 +16,7 @@ class AddSourceOptionsActivity : AppCompatActivity() { lateinit var _buttonQR: BigButton; lateinit var _buttonURL: BigButton; + lateinit var _buttonPlugins: BigButton; private val _qrCodeResultLauncher = registerForActivityResult(ActivityResultContracts.StartActivityForResult()) { result -> val scanResult = IntentIntegrator.parseActivityResult(result.resultCode, result.data) @@ -51,6 +52,7 @@ class AddSourceOptionsActivity : AppCompatActivity() { _buttonQR = findViewById(R.id.option_qr); _buttonURL = findViewById(R.id.option_url); + _buttonPlugins = findViewById(R.id.option_plugins); _buttonBack.setOnClickListener { finish(); diff --git a/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/SourceDetailFragment.kt b/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/SourceDetailFragment.kt index 66b4f33b..0693f9d8 100644 --- a/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/SourceDetailFragment.kt +++ b/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/SourceDetailFragment.kt @@ -63,6 +63,7 @@ class SourceDetailFragment : MainFragment() { private val _sourceHeader: SourceHeaderView; private val _sourceButtons: LinearLayout; + private val _sourceAdvancedButtons: LinearLayout; private val _layoutLoader: FrameLayout; private val _imageSpinner: ImageView; @@ -82,6 +83,7 @@ class SourceDetailFragment : MainFragment() { this.fragment = fragment; _sourceHeader = findViewById(R.id.source_header); _sourceButtons = findViewById(R.id.source_buttons); + _sourceAdvancedButtons = findViewById(R.id.advanced_source_buttons); _settingsAppForm = findViewById(R.id.source_app_setings); _settingsForm = findViewById(R.id.source_settings); _layoutLoader = findViewById(R.id.layout_loader); @@ -283,6 +285,17 @@ class SourceDetailFragment : MainFragment() { for (group in groups) { _sourceButtons.addView(group); } + + val advancedButtons = BigButtonGroup(c, "Advanced", + BigButton(c, "Edit Code", "Modify the source of this plugin", R.drawable.ic_code) { + + }.apply { + this.alpha = 0.5f; + } + ) + + _sourceAdvancedButtons.removeAllViews(); + _sourceAdvancedButtons.addView(advancedButtons); } diff --git a/app/src/main/java/com/futo/platformplayer/models/Subscription.kt b/app/src/main/java/com/futo/platformplayer/models/Subscription.kt index de6c5ee2..9579dfcf 100644 --- a/app/src/main/java/com/futo/platformplayer/models/Subscription.kt +++ b/app/src/main/java/com/futo/platformplayer/models/Subscription.kt @@ -1,5 +1,6 @@ package com.futo.platformplayer.models +import com.futo.platformplayer.api.media.models.ResultCapabilities import com.futo.platformplayer.api.media.models.channels.IPlatformChannel import com.futo.platformplayer.api.media.models.channels.SerializedChannel import com.futo.platformplayer.api.media.models.contents.IPlatformContent @@ -39,6 +40,7 @@ class Subscription { //Last video interval var uploadInterval : Int = 0; + var uploadStreamInterval : Int = 0; var uploadPostInterval : Int = 0; @@ -46,13 +48,68 @@ class Subscription { this.channel = channel; } - fun shouldFetchStreams() = lastLiveStream.getNowDiffDays() < 7; - fun shouldFetchLiveStreams() = lastLiveStream.getNowDiffDays() < 14; - fun shouldFetchPosts() = lastPost.getNowDiffDays() < 2; + fun shouldFetchVideos() = true; + fun shouldFetchStreams() = doFetchStreams && lastLiveStream.getNowDiffDays() < 7; + fun shouldFetchLiveStreams() = doFetchLive && lastLiveStream.getNowDiffDays() < 14; + fun shouldFetchPosts() = doFetchPosts && lastPost.getNowDiffDays() < 2; fun getClient() = StatePlatform.instance.getChannelClientOrNull(channel.url); fun updateChannel(channel: IPlatformChannel) { this.channel = SerializedChannel.fromChannel(channel); } + + fun updateSubscriptionState(type: String, initialPage: List) { + val interval: Int; + val mostRecent: OffsetDateTime?; + if(!initialPage.isEmpty()) { + val newestVideoDays = initialPage[0].datetime?.getNowDiffDays()?.toInt() ?: 0; + val diffs = mutableListOf() + for(i in (initialPage.size - 1) downTo 1) { + val currentVideoDays = initialPage[i].datetime?.getNowDiffDays(); + val nextVideoDays = initialPage[i - 1].datetime?.getNowDiffDays(); + + if(currentVideoDays != null && nextVideoDays != null) { + val diff = nextVideoDays - currentVideoDays; + diffs.add(diff.toInt()); + } + } + val averageDiff = if(diffs.size > 0) + newestVideoDays.coerceAtLeast(diffs.average().toInt()) + else + newestVideoDays; + interval = averageDiff.coerceAtLeast(1); + mostRecent = initialPage[0].datetime; + } + else { + interval = 5; + mostRecent = null; + } + when(type) { + ResultCapabilities.TYPE_VIDEOS -> { + uploadInterval = interval; + if(mostRecent != null) + lastVideo = mostRecent; + lastVideoUpdate = OffsetDateTime.now(); + } + ResultCapabilities.TYPE_MIXED -> { + uploadInterval = interval; + if(mostRecent != null) + lastVideo = mostRecent; + lastVideoUpdate = OffsetDateTime.now(); + } + ResultCapabilities.TYPE_STREAMS -> { + uploadStreamInterval = interval; + if(mostRecent != null) + lastLiveStream = mostRecent; + lastStreamUpdate = OffsetDateTime.now(); + } + ResultCapabilities.TYPE_POSTS -> { + uploadPostInterval = interval; + if(mostRecent != null) + lastPost = mostRecent; + lastPostUpdate = OffsetDateTime.now(); + } + } + } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt b/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt index 40b153e9..a6274d26 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt @@ -63,7 +63,7 @@ class StateSubscriptions { var globalSubscriptionExceptions: List = listOf() private set; - private val _algorithmSubscriptions = SubscriptionFetchAlgorithms.SIMPLE; + private val _algorithmSubscriptions = SubscriptionFetchAlgorithms.SMART; private var _lastGlobalSubscriptionProgress: Int = 0; private var _lastGlobalSubscriptionTotal: Int = 0; @@ -248,138 +248,6 @@ class StateSubscriptions { val result = algo.getSubscriptions(subUrls); return Pair(result.pager, result.exceptions); - /* - val subsPager: Array>; - val exs: ArrayList = arrayListOf(); - - val tasks = mutableListOf?>>>(); - var finished = 0; - val exceptionMap: HashMap = hashMapOf(); - val concurrency = Settings.instance.subscriptions.getSubscriptionsConcurrency(); - val failedPlugins = arrayListOf(); - for (sub in getSubscriptions().filter { StatePlatform.instance.hasEnabledChannelClient(it.channel.url) }) { - tasks.add(_subscriptionsPool.submit?>> { - val toIgnore = synchronized(failedPlugins){ failedPlugins.toList() }; - - var polycentricProfile : PolycentricCache.CachedPolycentricProfile? = null; - val getProfileTime = measureTimeMillis { - try { - polycentricProfile = PolycentricCache.instance.getCachedProfile(sub.channel.url); - if (polycentricProfile == null) { - Logger.i("StateSubscriptions", "Get polycentric profile not cached"); - polycentricProfile = runBlocking { PolycentricCache.instance.getProfileAsync(sub.channel.id) }; - } else { - Logger.i("StateSubscriptions", "Get polycentric profile cached"); - } - } - catch(ex: Throwable) { - Logger.w(TAG, "Polycentric getCachedProfile failed for subscriptions", ex); - //TODO: Some way to communicate polycentric failing without blocking here - //UIDialogs.toast("Polycentric failed\n" + ex.message, false); - //UIDialogs.showGeneralErrorDialog(it, "Polycentric getCachedProfile failed for subscriptions", ex); - } - } - - Logger.i("StateSubscriptions", "Get polycentric profile time ${getProfileTime}ms"); - - var pager: IPager; - try { - val time = measureTimeMillis { - val profile = polycentricProfile?.profile - pager = if (profile != null) - StatePolycentric.instance.getChannelContent(profile, true, concurrency, toIgnore) - else - StatePlatform.instance.getChannelContent(sub.channel.url, true, concurrency, toIgnore); - - if (cacheScope != null) - pager = ChannelContentCache.cachePagerResults(cacheScope, pager) { - onNewCacheHit?.invoke(sub, it); - }; - - finished++; - onProgress?.invoke(finished, tasks.size); - } - Logger.i( - "StateSubscriptions", - "Subscription [${sub.channel.name}] results in ${time}ms" - ); - } - catch(ex: Throwable) { - Logger.e(TAG, "Subscription [${sub.channel.name}] failed", ex); - finished++; - onProgress?.invoke(finished, tasks.size); - val channelEx = ChannelException(sub.channel, ex); - synchronized(exceptionMap) { - exceptionMap.put(sub, channelEx); - } - if(ex is ScriptCaptchaRequiredException) { - synchronized(failedPlugins) { - //Fail all subscription calls to plugin if it has a captcha issue - if(ex.config is SourcePluginConfig && !failedPlugins.contains(ex.config.id)) { - Logger.w(TAG, "Subscriptionsgnoring plugin [${ex.config.name}] due to Captcha"); - failedPlugins.add(ex.config.id); - } - } - } - else if(ex is ScriptCriticalException) { - synchronized(failedPlugins) { - //Fail all subscription calls to plugin if it has a critical issue - if(ex.config is SourcePluginConfig && !failedPlugins.contains(ex.config.id)) { - Logger.w(TAG, "Subscriptions ignoring plugin [${ex.config.name}] due to critical exception:\n" + ex.message); - failedPlugins.add(ex.config.id); - } - } - } - if(!withCacheFallback) - throw channelEx; - else { - Logger.i(TAG, "Channel ${sub.channel.name} failed, substituting with cache"); - pager = ChannelContentCache.instance.getChannelCachePager(sub.channel.url); - } - } - return@submit Pair(sub, pager); - }); - } - val timeTotal = measureTimeMillis { - val taskResults = arrayListOf>(); - for(task in tasks) { - try { - val result = task.get(); - if(result != null) { - if(result.second != null) - taskResults.add(result.second!!); - if(exceptionMap.containsKey(result.first)) { - val ex = exceptionMap[result.first]; - if(ex != null) { - val nonRuntimeEx = findNonRuntimeException(ex); - if (nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException)) - exs.add(nonRuntimeEx); - else - throw ex.cause ?: ex; - } - } - } - } catch (ex: ExecutionException) { - val nonRuntimeEx = findNonRuntimeException(ex.cause); - if(nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException)) - exs.add(nonRuntimeEx); - else - throw ex.cause ?: ex; - }; - } - subsPager = taskResults.toTypedArray(); - } - Logger.i("StateSubscriptions", "Subscriptions results in ${timeTotal}ms") - - if(subsPager.size <= 0 && exs.any()) - throw exs.first(); - - Logger.i(TAG, "Subscription pager with ${subsPager.size} channels"); - val pager = MultiChronoContentPager(subsPager, allowFailure, 15); - pager.initialize(); - //return Pair(pager, exs); - return Pair(DedupContentPager(pager), exs); - */ } //New Migration diff --git a/app/src/main/java/com/futo/platformplayer/subscription/SimpleSubscriptionAlgorithm.kt b/app/src/main/java/com/futo/platformplayer/subscription/SimpleSubscriptionAlgorithm.kt index 86965492..af96ffcd 100644 --- a/app/src/main/java/com/futo/platformplayer/subscription/SimpleSubscriptionAlgorithm.kt +++ b/app/src/main/java/com/futo/platformplayer/subscription/SimpleSubscriptionAlgorithm.kt @@ -66,6 +66,8 @@ class SimpleSubscriptionAlgorithm( val subsPager: Array>; val exs: ArrayList = arrayListOf(); + Logger.i(TAG, "getSubscriptions [Simple]"); + val tasks = mutableListOf?>>>(); var finished = 0; val exceptionMap: HashMap = hashMapOf(); diff --git a/app/src/main/java/com/futo/platformplayer/subscription/SmartSubscriptionAlgorithm.kt b/app/src/main/java/com/futo/platformplayer/subscription/SmartSubscriptionAlgorithm.kt index 77889254..60e2bb5e 100644 --- a/app/src/main/java/com/futo/platformplayer/subscription/SmartSubscriptionAlgorithm.kt +++ b/app/src/main/java/com/futo/platformplayer/subscription/SmartSubscriptionAlgorithm.kt @@ -1,9 +1,14 @@ package com.futo.platformplayer.subscription +import com.futo.platformplayer.api.media.models.ResultCapabilities import com.futo.platformplayer.api.media.models.contents.IPlatformContent import com.futo.platformplayer.api.media.platforms.js.JSClient import com.futo.platformplayer.api.media.structures.IPager +import com.futo.platformplayer.getNowDiffDays +import com.futo.platformplayer.getNowDiffHours +import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.models.Subscription +import com.futo.platformplayer.states.StatePlatform import kotlinx.coroutines.CoroutineScope import java.util.concurrent.ForkJoinPool @@ -12,12 +17,91 @@ class SmartSubscriptionAlgorithm( allowFailure: Boolean = false, withCacheFallback: Boolean = true, threadPool: ForkJoinPool? = null -): SubscriptionFetchAlgorithm(scope, allowFailure, withCacheFallback, threadPool) { - override fun countRequests(subs: Map>): Map { - TODO("Not yet implemented") +): SubscriptionsTaskFetchAlgorithm(scope, allowFailure, withCacheFallback, threadPool) { + override fun getSubscriptionTasks(subs: Map>): List { + val allTasks: List = subs.flatMap { entry -> + val sub = entry.key; + //Get all urls associated with this subscriptions + val allPlatforms = entry.value.associateWith { StatePlatform.instance.getChannelClientOrNull(it) } + .filterValues { it is JSClient }; + + //For every platform, get all sub-queries associated with that platform + return@flatMap allPlatforms + .filter { it.value != null } + .flatMap { + val url = it.key; + val client = it.value!! as JSClient; + val capabilities = client.getChannelCapabilities(); + + if(capabilities.hasType(ResultCapabilities.TYPE_MIXED)) + return@flatMap listOf(SubscriptionTask(client, sub, it.key, ResultCapabilities.TYPE_MIXED)); + else { + val types = listOf( + if(sub.shouldFetchVideos()) ResultCapabilities.TYPE_VIDEOS else null, + if(sub.shouldFetchStreams()) ResultCapabilities.TYPE_STREAMS else null, + if(sub.shouldFetchPosts()) ResultCapabilities.TYPE_POSTS else null, + if(sub.shouldFetchLiveStreams()) ResultCapabilities.TYPE_LIVE else null + ).filterNotNull().filter { capabilities.hasType(it) }; + return@flatMap types.map { + SubscriptionTask(client, sub, url, it); + }; + } + }; + }; + + val ordering = allTasks.groupBy { it.client } + .map { Pair(it.key, it.value.sortedBy { calculateUpdateUrgency(it.sub, it.type) }) }; + + val finalTasks = mutableListOf(); + + + for(clientTasks in ordering) { + val limit = clientTasks.first.config.subscriptionRateLimit; + if(limit == null || limit <= 0) + finalTasks.addAll(clientTasks.second); + else { + val fetchTasks = clientTasks.second.take(limit); + val cacheTasks = clientTasks.second.drop(limit); + + for(cacheTask in cacheTasks) + cacheTask.fromCache = true; + + Logger.i(TAG, "Subscription Client Budget [${clientTasks.first.name}]: ${fetchTasks.size}/${limit}") + + finalTasks.addAll(fetchTasks + cacheTasks); + } + } + + return finalTasks; } - override fun getSubscriptions(subs: Map>): Result { - TODO("Not yet implemented") + + fun calculateUpdateUrgency(sub: Subscription, type: String): Int { + val lastItem = when(type) { + ResultCapabilities.TYPE_VIDEOS -> sub.lastVideo; + ResultCapabilities.TYPE_STREAMS -> sub.lastLiveStream; + ResultCapabilities.TYPE_LIVE -> sub.lastLiveStream; + ResultCapabilities.TYPE_POSTS -> sub.lastPost; + else -> sub.lastVideo; //TODO: minimum of all + }; + val lastUpdate = when(type) { + ResultCapabilities.TYPE_VIDEOS -> sub.lastVideoUpdate; + ResultCapabilities.TYPE_STREAMS -> sub.lastLiveStreamUpdate; + ResultCapabilities.TYPE_LIVE -> sub.lastLiveStreamUpdate; + ResultCapabilities.TYPE_POSTS -> sub.lastPostUpdate; + else -> sub.lastVideoUpdate; //TODO: minimum of all + }; + val interval = when(type) { + ResultCapabilities.TYPE_VIDEOS -> sub.uploadInterval; + ResultCapabilities.TYPE_STREAMS -> sub.uploadStreamInterval; + ResultCapabilities.TYPE_LIVE -> sub.uploadStreamInterval; + ResultCapabilities.TYPE_POSTS -> sub.uploadPostInterval; + else -> sub.uploadInterval; //TODO: minimum of all + }; + val lastItemDaysAgo = lastItem.getNowDiffHours(); + val lastUpdateHoursAgo = lastUpdate.getNowDiffHours(); + val expectedHours = lastUpdateHoursAgo.toDouble() - (interval*24); + + return (expectedHours * 100).toInt(); } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionFetchAlgorithm.kt b/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionFetchAlgorithm.kt index 187cf0e4..4df2b6fd 100644 --- a/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionFetchAlgorithm.kt +++ b/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionFetchAlgorithm.kt @@ -34,6 +34,8 @@ abstract class SubscriptionFetchAlgorithm( ); companion object { + public val TAG = "SubscriptionAlgorithm"; + fun getAlgorithm(algo: SubscriptionFetchAlgorithms, scope: CoroutineScope, allowFailure: Boolean = false, withCacheFallback: Boolean = false, pool: ForkJoinPool? = null): SubscriptionFetchAlgorithm { return when(algo) { SubscriptionFetchAlgorithms.CACHE -> CachedSubscriptionAlgorithm(150, scope, allowFailure, withCacheFallback, pool); diff --git a/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionsTaskFetchAlgorithm.kt b/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionsTaskFetchAlgorithm.kt new file mode 100644 index 00000000..2a9de5bc --- /dev/null +++ b/app/src/main/java/com/futo/platformplayer/subscription/SubscriptionsTaskFetchAlgorithm.kt @@ -0,0 +1,171 @@ +package com.futo.platformplayer.subscription + +import com.futo.platformplayer.Settings +import com.futo.platformplayer.UIDialogs +import com.futo.platformplayer.api.media.models.ResultCapabilities +import com.futo.platformplayer.api.media.models.contents.IPlatformContent +import com.futo.platformplayer.api.media.platforms.js.JSClient +import com.futo.platformplayer.api.media.platforms.js.SourcePluginConfig +import com.futo.platformplayer.api.media.structures.DedupContentPager +import com.futo.platformplayer.api.media.structures.IPager +import com.futo.platformplayer.api.media.structures.MultiChronoContentPager +import com.futo.platformplayer.cache.ChannelContentCache +import com.futo.platformplayer.engine.exceptions.PluginException +import com.futo.platformplayer.engine.exceptions.ScriptCaptchaRequiredException +import com.futo.platformplayer.engine.exceptions.ScriptCriticalException +import com.futo.platformplayer.exceptions.ChannelException +import com.futo.platformplayer.findNonRuntimeException +import com.futo.platformplayer.logging.Logger +import com.futo.platformplayer.models.Subscription +import com.futo.platformplayer.states.StatePlatform +import com.futo.platformplayer.states.StateSubscriptions +import kotlinx.coroutines.CoroutineScope +import java.lang.IllegalStateException +import java.util.concurrent.ExecutionException +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.ForkJoinTask +import kotlin.system.measureTimeMillis + +abstract class SubscriptionsTaskFetchAlgorithm( + scope: CoroutineScope, + allowFailure: Boolean = false, + withCacheFallback: Boolean = true, + _threadPool: ForkJoinPool? = null +) : SubscriptionFetchAlgorithm(scope, allowFailure, withCacheFallback, _threadPool) { + + + override fun countRequests(subs: Map>): Map { + return getSubscriptionTasks(subs).groupBy { it.client }.toList() + .associate { Pair(it.first, it.second.filter { !it.fromCache }.size) }; + } + + override fun getSubscriptions(subs: Map>): Result { + val tasks = getSubscriptionTasks(subs); + + Logger.i(TAG, "Starting Subscriptions Fetch:\n" + + " Tasks: ${tasks.filter { !it.fromCache }.size}\n" + + " Cached: ${tasks.filter { it.fromCache }.size}"); + try { + //TODO: Remove this + UIDialogs.toast("Tasks: ${tasks.filter { !it.fromCache }.size}\n" + + "Cached: ${tasks.filter { it.fromCache }.size}", false); + } catch (ex: Throwable){} + + val exs: ArrayList = arrayListOf(); + val taskResults = arrayListOf>(); + + val forkTasks = mutableListOf>(); + var finished = 0; + val exceptionMap: HashMap = hashMapOf(); + val concurrency = Settings.instance.subscriptions.getSubscriptionsConcurrency(); + val failedPlugins = arrayListOf(); + val cachedChannels = arrayListOf(); + + for(task in tasks) { + val forkTask = threadPool.submit { + synchronized(cachedChannels) { + if(task.fromCache) { + finished++; + onProgress.emit(finished, forkTasks.size); + if(cachedChannels.contains(task.url)) + return@submit SubscriptionTaskResult(task, null, null); + else { + cachedChannels.add(task.url); + return@submit SubscriptionTaskResult(task, ChannelContentCache.instance.getChannelCachePager(task.url), null); + } + } + } + + val shouldIgnore = synchronized(failedPlugins) { failedPlugins.contains(task.client.id) }; + if(shouldIgnore) + return@submit SubscriptionTaskResult(task, null, null); //skipped + + var taskEx: Throwable? = null; + var pager: IPager; + try { + val time = measureTimeMillis { + pager = StatePlatform.instance.getChannelContent(task.client, + task.url, task.type, ResultCapabilities.ORDER_CHONOLOGICAL); + + pager = ChannelContentCache.cachePagerResults(scope, pager) { + onNewCacheHit.emit(task.sub, it); + }; + + val initialPage = pager.getResults(); + task.sub.updateSubscriptionState(task.type, initialPage); + StateSubscriptions.instance.saveSubscription(task.sub); + + finished++; + onProgress.emit(finished, forkTasks.size); + } + Logger.i("StateSubscriptions", "Subscription [${task.sub.channel.name}]:${task.type} results in ${time}ms"); + return@submit SubscriptionTaskResult(task, pager, null); + } catch (ex: Throwable) { + Logger.e(StateSubscriptions.TAG, "Subscription [${task.sub.channel.name}] failed", ex); + val channelEx = ChannelException(task.sub.channel, ex); + finished++; + onProgress.emit(finished, forkTasks.size); + if (!withCacheFallback) + throw channelEx; + else { + Logger.i(StateSubscriptions.TAG, "Channel ${task.sub.channel.name} failed, substituting with cache"); + pager = ChannelContentCache.instance.getChannelCachePager(task.sub.channel.url); + taskEx = ex; + } + } + return@submit SubscriptionTaskResult(task, null, taskEx); + } + forkTasks.add(forkTask); + } + + val timeTotal = measureTimeMillis { + for(task in forkTasks) { + try { + val result = task.get(); + if(result != null) { + if(result.pager != null) + taskResults.add(result.pager!!); + if(exceptionMap.containsKey(result.task.sub)) { + val ex = exceptionMap[result.task.sub]; + if(ex != null) { + val nonRuntimeEx = findNonRuntimeException(ex); + if (nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException)) + exs.add(nonRuntimeEx); + else + throw ex.cause ?: ex; + } + } + } + } catch (ex: ExecutionException) { + val nonRuntimeEx = findNonRuntimeException(ex.cause); + if(nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException)) + exs.add(nonRuntimeEx); + else + throw ex.cause ?: ex; + }; + } + } + Logger.i("StateSubscriptions", "Subscriptions results in ${timeTotal}ms") + val pager = MultiChronoContentPager(taskResults, allowFailure, 15); + pager.initialize(); + + return Result(DedupContentPager(pager), exs); + } + + abstract fun getSubscriptionTasks(subs: Map>): List; + + + class SubscriptionTask( + val client: JSClient, + val sub: Subscription, + val url: String, + val type: String, + var fromCache: Boolean = false + ); + + class SubscriptionTaskResult( + val task: SubscriptionTask, + val pager: IPager?, + val exception: Throwable? + ) +} \ No newline at end of file diff --git a/app/src/main/res/layout/activity_add_source_options.xml b/app/src/main/res/layout/activity_add_source_options.xml index 9df6f1d3..b3a7f8d6 100644 --- a/app/src/main/res/layout/activity_add_source_options.xml +++ b/app/src/main/res/layout/activity_add_source_options.xml @@ -61,8 +61,18 @@ android:layout_height="wrap_content" android:alpha="0.5" android:layout_marginTop="5dp" - android:layout_marginBottom="20dp" + android:layout_marginBottom="5dp" app:buttonText="@string/install_by_url" app:buttonSubText="@string/enter_url_explain" app:buttonIcon="@drawable/ic_link" /> + \ No newline at end of file diff --git a/app/src/main/res/layout/fragment_source_detail.xml b/app/src/main/res/layout/fragment_source_detail.xml index 806a630e..b40628cd 100644 --- a/app/src/main/res/layout/fragment_source_detail.xml +++ b/app/src/main/res/layout/fragment_source_detail.xml @@ -57,6 +57,15 @@ + + + +