mirror of
https://gitlab.futo.org/videostreaming/grayjay.git
synced 2025-04-20 03:24:50 +00:00
WIP Smart subscriptions
This commit is contained in:
parent
f234564952
commit
b3a3e459a4
10 changed files with 360 additions and 142 deletions
|
@ -16,6 +16,7 @@ class AddSourceOptionsActivity : AppCompatActivity() {
|
|||
|
||||
lateinit var _buttonQR: BigButton;
|
||||
lateinit var _buttonURL: BigButton;
|
||||
lateinit var _buttonPlugins: BigButton;
|
||||
|
||||
private val _qrCodeResultLauncher = registerForActivityResult(ActivityResultContracts.StartActivityForResult()) { result ->
|
||||
val scanResult = IntentIntegrator.parseActivityResult(result.resultCode, result.data)
|
||||
|
@ -51,6 +52,7 @@ class AddSourceOptionsActivity : AppCompatActivity() {
|
|||
|
||||
_buttonQR = findViewById(R.id.option_qr);
|
||||
_buttonURL = findViewById(R.id.option_url);
|
||||
_buttonPlugins = findViewById(R.id.option_plugins);
|
||||
|
||||
_buttonBack.setOnClickListener {
|
||||
finish();
|
||||
|
|
|
@ -63,6 +63,7 @@ class SourceDetailFragment : MainFragment() {
|
|||
|
||||
private val _sourceHeader: SourceHeaderView;
|
||||
private val _sourceButtons: LinearLayout;
|
||||
private val _sourceAdvancedButtons: LinearLayout;
|
||||
private val _layoutLoader: FrameLayout;
|
||||
private val _imageSpinner: ImageView;
|
||||
|
||||
|
@ -82,6 +83,7 @@ class SourceDetailFragment : MainFragment() {
|
|||
this.fragment = fragment;
|
||||
_sourceHeader = findViewById(R.id.source_header);
|
||||
_sourceButtons = findViewById(R.id.source_buttons);
|
||||
_sourceAdvancedButtons = findViewById(R.id.advanced_source_buttons);
|
||||
_settingsAppForm = findViewById(R.id.source_app_setings);
|
||||
_settingsForm = findViewById(R.id.source_settings);
|
||||
_layoutLoader = findViewById(R.id.layout_loader);
|
||||
|
@ -283,6 +285,17 @@ class SourceDetailFragment : MainFragment() {
|
|||
for (group in groups) {
|
||||
_sourceButtons.addView(group);
|
||||
}
|
||||
|
||||
val advancedButtons = BigButtonGroup(c, "Advanced",
|
||||
BigButton(c, "Edit Code", "Modify the source of this plugin", R.drawable.ic_code) {
|
||||
|
||||
}.apply {
|
||||
this.alpha = 0.5f;
|
||||
}
|
||||
)
|
||||
|
||||
_sourceAdvancedButtons.removeAllViews();
|
||||
_sourceAdvancedButtons.addView(advancedButtons);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.futo.platformplayer.models
|
||||
|
||||
import com.futo.platformplayer.api.media.models.ResultCapabilities
|
||||
import com.futo.platformplayer.api.media.models.channels.IPlatformChannel
|
||||
import com.futo.platformplayer.api.media.models.channels.SerializedChannel
|
||||
import com.futo.platformplayer.api.media.models.contents.IPlatformContent
|
||||
|
@ -39,6 +40,7 @@ class Subscription {
|
|||
|
||||
//Last video interval
|
||||
var uploadInterval : Int = 0;
|
||||
var uploadStreamInterval : Int = 0;
|
||||
var uploadPostInterval : Int = 0;
|
||||
|
||||
|
||||
|
@ -46,13 +48,68 @@ class Subscription {
|
|||
this.channel = channel;
|
||||
}
|
||||
|
||||
fun shouldFetchStreams() = lastLiveStream.getNowDiffDays() < 7;
|
||||
fun shouldFetchLiveStreams() = lastLiveStream.getNowDiffDays() < 14;
|
||||
fun shouldFetchPosts() = lastPost.getNowDiffDays() < 2;
|
||||
fun shouldFetchVideos() = true;
|
||||
fun shouldFetchStreams() = doFetchStreams && lastLiveStream.getNowDiffDays() < 7;
|
||||
fun shouldFetchLiveStreams() = doFetchLive && lastLiveStream.getNowDiffDays() < 14;
|
||||
fun shouldFetchPosts() = doFetchPosts && lastPost.getNowDiffDays() < 2;
|
||||
|
||||
fun getClient() = StatePlatform.instance.getChannelClientOrNull(channel.url);
|
||||
|
||||
fun updateChannel(channel: IPlatformChannel) {
|
||||
this.channel = SerializedChannel.fromChannel(channel);
|
||||
}
|
||||
|
||||
fun updateSubscriptionState(type: String, initialPage: List<IPlatformContent>) {
|
||||
val interval: Int;
|
||||
val mostRecent: OffsetDateTime?;
|
||||
if(!initialPage.isEmpty()) {
|
||||
val newestVideoDays = initialPage[0].datetime?.getNowDiffDays()?.toInt() ?: 0;
|
||||
val diffs = mutableListOf<Int>()
|
||||
for(i in (initialPage.size - 1) downTo 1) {
|
||||
val currentVideoDays = initialPage[i].datetime?.getNowDiffDays();
|
||||
val nextVideoDays = initialPage[i - 1].datetime?.getNowDiffDays();
|
||||
|
||||
if(currentVideoDays != null && nextVideoDays != null) {
|
||||
val diff = nextVideoDays - currentVideoDays;
|
||||
diffs.add(diff.toInt());
|
||||
}
|
||||
}
|
||||
val averageDiff = if(diffs.size > 0)
|
||||
newestVideoDays.coerceAtLeast(diffs.average().toInt())
|
||||
else
|
||||
newestVideoDays;
|
||||
interval = averageDiff.coerceAtLeast(1);
|
||||
mostRecent = initialPage[0].datetime;
|
||||
}
|
||||
else {
|
||||
interval = 5;
|
||||
mostRecent = null;
|
||||
}
|
||||
when(type) {
|
||||
ResultCapabilities.TYPE_VIDEOS -> {
|
||||
uploadInterval = interval;
|
||||
if(mostRecent != null)
|
||||
lastVideo = mostRecent;
|
||||
lastVideoUpdate = OffsetDateTime.now();
|
||||
}
|
||||
ResultCapabilities.TYPE_MIXED -> {
|
||||
uploadInterval = interval;
|
||||
if(mostRecent != null)
|
||||
lastVideo = mostRecent;
|
||||
lastVideoUpdate = OffsetDateTime.now();
|
||||
}
|
||||
ResultCapabilities.TYPE_STREAMS -> {
|
||||
uploadStreamInterval = interval;
|
||||
if(mostRecent != null)
|
||||
lastLiveStream = mostRecent;
|
||||
lastStreamUpdate = OffsetDateTime.now();
|
||||
}
|
||||
ResultCapabilities.TYPE_POSTS -> {
|
||||
uploadPostInterval = interval;
|
||||
if(mostRecent != null)
|
||||
lastPost = mostRecent;
|
||||
lastPostUpdate = OffsetDateTime.now();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -63,7 +63,7 @@ class StateSubscriptions {
|
|||
var globalSubscriptionExceptions: List<Throwable> = listOf()
|
||||
private set;
|
||||
|
||||
private val _algorithmSubscriptions = SubscriptionFetchAlgorithms.SIMPLE;
|
||||
private val _algorithmSubscriptions = SubscriptionFetchAlgorithms.SMART;
|
||||
|
||||
private var _lastGlobalSubscriptionProgress: Int = 0;
|
||||
private var _lastGlobalSubscriptionTotal: Int = 0;
|
||||
|
@ -248,138 +248,6 @@ class StateSubscriptions {
|
|||
|
||||
val result = algo.getSubscriptions(subUrls);
|
||||
return Pair(result.pager, result.exceptions);
|
||||
/*
|
||||
val subsPager: Array<IPager<IPlatformContent>>;
|
||||
val exs: ArrayList<Throwable> = arrayListOf();
|
||||
|
||||
val tasks = mutableListOf<ForkJoinTask<Pair<Subscription, IPager<IPlatformContent>?>>>();
|
||||
var finished = 0;
|
||||
val exceptionMap: HashMap<Subscription, Throwable> = hashMapOf();
|
||||
val concurrency = Settings.instance.subscriptions.getSubscriptionsConcurrency();
|
||||
val failedPlugins = arrayListOf<String>();
|
||||
for (sub in getSubscriptions().filter { StatePlatform.instance.hasEnabledChannelClient(it.channel.url) }) {
|
||||
tasks.add(_subscriptionsPool.submit<Pair<Subscription, IPager<IPlatformContent>?>> {
|
||||
val toIgnore = synchronized(failedPlugins){ failedPlugins.toList() };
|
||||
|
||||
var polycentricProfile : PolycentricCache.CachedPolycentricProfile? = null;
|
||||
val getProfileTime = measureTimeMillis {
|
||||
try {
|
||||
polycentricProfile = PolycentricCache.instance.getCachedProfile(sub.channel.url);
|
||||
if (polycentricProfile == null) {
|
||||
Logger.i("StateSubscriptions", "Get polycentric profile not cached");
|
||||
polycentricProfile = runBlocking { PolycentricCache.instance.getProfileAsync(sub.channel.id) };
|
||||
} else {
|
||||
Logger.i("StateSubscriptions", "Get polycentric profile cached");
|
||||
}
|
||||
}
|
||||
catch(ex: Throwable) {
|
||||
Logger.w(TAG, "Polycentric getCachedProfile failed for subscriptions", ex);
|
||||
//TODO: Some way to communicate polycentric failing without blocking here
|
||||
//UIDialogs.toast("Polycentric failed\n" + ex.message, false);
|
||||
//UIDialogs.showGeneralErrorDialog(it, "Polycentric getCachedProfile failed for subscriptions", ex);
|
||||
}
|
||||
}
|
||||
|
||||
Logger.i("StateSubscriptions", "Get polycentric profile time ${getProfileTime}ms");
|
||||
|
||||
var pager: IPager<IPlatformContent>;
|
||||
try {
|
||||
val time = measureTimeMillis {
|
||||
val profile = polycentricProfile?.profile
|
||||
pager = if (profile != null)
|
||||
StatePolycentric.instance.getChannelContent(profile, true, concurrency, toIgnore)
|
||||
else
|
||||
StatePlatform.instance.getChannelContent(sub.channel.url, true, concurrency, toIgnore);
|
||||
|
||||
if (cacheScope != null)
|
||||
pager = ChannelContentCache.cachePagerResults(cacheScope, pager) {
|
||||
onNewCacheHit?.invoke(sub, it);
|
||||
};
|
||||
|
||||
finished++;
|
||||
onProgress?.invoke(finished, tasks.size);
|
||||
}
|
||||
Logger.i(
|
||||
"StateSubscriptions",
|
||||
"Subscription [${sub.channel.name}] results in ${time}ms"
|
||||
);
|
||||
}
|
||||
catch(ex: Throwable) {
|
||||
Logger.e(TAG, "Subscription [${sub.channel.name}] failed", ex);
|
||||
finished++;
|
||||
onProgress?.invoke(finished, tasks.size);
|
||||
val channelEx = ChannelException(sub.channel, ex);
|
||||
synchronized(exceptionMap) {
|
||||
exceptionMap.put(sub, channelEx);
|
||||
}
|
||||
if(ex is ScriptCaptchaRequiredException) {
|
||||
synchronized(failedPlugins) {
|
||||
//Fail all subscription calls to plugin if it has a captcha issue
|
||||
if(ex.config is SourcePluginConfig && !failedPlugins.contains(ex.config.id)) {
|
||||
Logger.w(TAG, "Subscriptionsgnoring plugin [${ex.config.name}] due to Captcha");
|
||||
failedPlugins.add(ex.config.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if(ex is ScriptCriticalException) {
|
||||
synchronized(failedPlugins) {
|
||||
//Fail all subscription calls to plugin if it has a critical issue
|
||||
if(ex.config is SourcePluginConfig && !failedPlugins.contains(ex.config.id)) {
|
||||
Logger.w(TAG, "Subscriptions ignoring plugin [${ex.config.name}] due to critical exception:\n" + ex.message);
|
||||
failedPlugins.add(ex.config.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
if(!withCacheFallback)
|
||||
throw channelEx;
|
||||
else {
|
||||
Logger.i(TAG, "Channel ${sub.channel.name} failed, substituting with cache");
|
||||
pager = ChannelContentCache.instance.getChannelCachePager(sub.channel.url);
|
||||
}
|
||||
}
|
||||
return@submit Pair(sub, pager);
|
||||
});
|
||||
}
|
||||
val timeTotal = measureTimeMillis {
|
||||
val taskResults = arrayListOf<IPager<IPlatformContent>>();
|
||||
for(task in tasks) {
|
||||
try {
|
||||
val result = task.get();
|
||||
if(result != null) {
|
||||
if(result.second != null)
|
||||
taskResults.add(result.second!!);
|
||||
if(exceptionMap.containsKey(result.first)) {
|
||||
val ex = exceptionMap[result.first];
|
||||
if(ex != null) {
|
||||
val nonRuntimeEx = findNonRuntimeException(ex);
|
||||
if (nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException))
|
||||
exs.add(nonRuntimeEx);
|
||||
else
|
||||
throw ex.cause ?: ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (ex: ExecutionException) {
|
||||
val nonRuntimeEx = findNonRuntimeException(ex.cause);
|
||||
if(nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException))
|
||||
exs.add(nonRuntimeEx);
|
||||
else
|
||||
throw ex.cause ?: ex;
|
||||
};
|
||||
}
|
||||
subsPager = taskResults.toTypedArray();
|
||||
}
|
||||
Logger.i("StateSubscriptions", "Subscriptions results in ${timeTotal}ms")
|
||||
|
||||
if(subsPager.size <= 0 && exs.any())
|
||||
throw exs.first();
|
||||
|
||||
Logger.i(TAG, "Subscription pager with ${subsPager.size} channels");
|
||||
val pager = MultiChronoContentPager(subsPager, allowFailure, 15);
|
||||
pager.initialize();
|
||||
//return Pair(pager, exs);
|
||||
return Pair(DedupContentPager(pager), exs);
|
||||
*/
|
||||
}
|
||||
|
||||
//New Migration
|
||||
|
|
|
@ -66,6 +66,8 @@ class SimpleSubscriptionAlgorithm(
|
|||
val subsPager: Array<IPager<IPlatformContent>>;
|
||||
val exs: ArrayList<Throwable> = arrayListOf();
|
||||
|
||||
Logger.i(TAG, "getSubscriptions [Simple]");
|
||||
|
||||
val tasks = mutableListOf<ForkJoinTask<Pair<Subscription, IPager<IPlatformContent>?>>>();
|
||||
var finished = 0;
|
||||
val exceptionMap: HashMap<Subscription, Throwable> = hashMapOf();
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
package com.futo.platformplayer.subscription
|
||||
|
||||
import com.futo.platformplayer.api.media.models.ResultCapabilities
|
||||
import com.futo.platformplayer.api.media.models.contents.IPlatformContent
|
||||
import com.futo.platformplayer.api.media.platforms.js.JSClient
|
||||
import com.futo.platformplayer.api.media.structures.IPager
|
||||
import com.futo.platformplayer.getNowDiffDays
|
||||
import com.futo.platformplayer.getNowDiffHours
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.models.Subscription
|
||||
import com.futo.platformplayer.states.StatePlatform
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import java.util.concurrent.ForkJoinPool
|
||||
|
||||
|
@ -12,12 +17,91 @@ class SmartSubscriptionAlgorithm(
|
|||
allowFailure: Boolean = false,
|
||||
withCacheFallback: Boolean = true,
|
||||
threadPool: ForkJoinPool? = null
|
||||
): SubscriptionFetchAlgorithm(scope, allowFailure, withCacheFallback, threadPool) {
|
||||
override fun countRequests(subs: Map<Subscription, List<String>>): Map<JSClient, Int> {
|
||||
TODO("Not yet implemented")
|
||||
): SubscriptionsTaskFetchAlgorithm(scope, allowFailure, withCacheFallback, threadPool) {
|
||||
override fun getSubscriptionTasks(subs: Map<Subscription, List<String>>): List<SubscriptionTask> {
|
||||
val allTasks: List<SubscriptionTask> = subs.flatMap { entry ->
|
||||
val sub = entry.key;
|
||||
//Get all urls associated with this subscriptions
|
||||
val allPlatforms = entry.value.associateWith { StatePlatform.instance.getChannelClientOrNull(it) }
|
||||
.filterValues { it is JSClient };
|
||||
|
||||
//For every platform, get all sub-queries associated with that platform
|
||||
return@flatMap allPlatforms
|
||||
.filter { it.value != null }
|
||||
.flatMap {
|
||||
val url = it.key;
|
||||
val client = it.value!! as JSClient;
|
||||
val capabilities = client.getChannelCapabilities();
|
||||
|
||||
if(capabilities.hasType(ResultCapabilities.TYPE_MIXED))
|
||||
return@flatMap listOf(SubscriptionTask(client, sub, it.key, ResultCapabilities.TYPE_MIXED));
|
||||
else {
|
||||
val types = listOf(
|
||||
if(sub.shouldFetchVideos()) ResultCapabilities.TYPE_VIDEOS else null,
|
||||
if(sub.shouldFetchStreams()) ResultCapabilities.TYPE_STREAMS else null,
|
||||
if(sub.shouldFetchPosts()) ResultCapabilities.TYPE_POSTS else null,
|
||||
if(sub.shouldFetchLiveStreams()) ResultCapabilities.TYPE_LIVE else null
|
||||
).filterNotNull().filter { capabilities.hasType(it) };
|
||||
return@flatMap types.map {
|
||||
SubscriptionTask(client, sub, url, it);
|
||||
};
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
val ordering = allTasks.groupBy { it.client }
|
||||
.map { Pair(it.key, it.value.sortedBy { calculateUpdateUrgency(it.sub, it.type) }) };
|
||||
|
||||
val finalTasks = mutableListOf<SubscriptionTask>();
|
||||
|
||||
|
||||
for(clientTasks in ordering) {
|
||||
val limit = clientTasks.first.config.subscriptionRateLimit;
|
||||
if(limit == null || limit <= 0)
|
||||
finalTasks.addAll(clientTasks.second);
|
||||
else {
|
||||
val fetchTasks = clientTasks.second.take(limit);
|
||||
val cacheTasks = clientTasks.second.drop(limit);
|
||||
|
||||
for(cacheTask in cacheTasks)
|
||||
cacheTask.fromCache = true;
|
||||
|
||||
Logger.i(TAG, "Subscription Client Budget [${clientTasks.first.name}]: ${fetchTasks.size}/${limit}")
|
||||
|
||||
finalTasks.addAll(fetchTasks + cacheTasks);
|
||||
}
|
||||
}
|
||||
|
||||
return finalTasks;
|
||||
}
|
||||
|
||||
override fun getSubscriptions(subs: Map<Subscription, List<String>>): Result {
|
||||
TODO("Not yet implemented")
|
||||
|
||||
fun calculateUpdateUrgency(sub: Subscription, type: String): Int {
|
||||
val lastItem = when(type) {
|
||||
ResultCapabilities.TYPE_VIDEOS -> sub.lastVideo;
|
||||
ResultCapabilities.TYPE_STREAMS -> sub.lastLiveStream;
|
||||
ResultCapabilities.TYPE_LIVE -> sub.lastLiveStream;
|
||||
ResultCapabilities.TYPE_POSTS -> sub.lastPost;
|
||||
else -> sub.lastVideo; //TODO: minimum of all
|
||||
};
|
||||
val lastUpdate = when(type) {
|
||||
ResultCapabilities.TYPE_VIDEOS -> sub.lastVideoUpdate;
|
||||
ResultCapabilities.TYPE_STREAMS -> sub.lastLiveStreamUpdate;
|
||||
ResultCapabilities.TYPE_LIVE -> sub.lastLiveStreamUpdate;
|
||||
ResultCapabilities.TYPE_POSTS -> sub.lastPostUpdate;
|
||||
else -> sub.lastVideoUpdate; //TODO: minimum of all
|
||||
};
|
||||
val interval = when(type) {
|
||||
ResultCapabilities.TYPE_VIDEOS -> sub.uploadInterval;
|
||||
ResultCapabilities.TYPE_STREAMS -> sub.uploadStreamInterval;
|
||||
ResultCapabilities.TYPE_LIVE -> sub.uploadStreamInterval;
|
||||
ResultCapabilities.TYPE_POSTS -> sub.uploadPostInterval;
|
||||
else -> sub.uploadInterval; //TODO: minimum of all
|
||||
};
|
||||
val lastItemDaysAgo = lastItem.getNowDiffHours();
|
||||
val lastUpdateHoursAgo = lastUpdate.getNowDiffHours();
|
||||
val expectedHours = lastUpdateHoursAgo.toDouble() - (interval*24);
|
||||
|
||||
return (expectedHours * 100).toInt();
|
||||
}
|
||||
}
|
|
@ -34,6 +34,8 @@ abstract class SubscriptionFetchAlgorithm(
|
|||
);
|
||||
|
||||
companion object {
|
||||
public val TAG = "SubscriptionAlgorithm";
|
||||
|
||||
fun getAlgorithm(algo: SubscriptionFetchAlgorithms, scope: CoroutineScope, allowFailure: Boolean = false, withCacheFallback: Boolean = false, pool: ForkJoinPool? = null): SubscriptionFetchAlgorithm {
|
||||
return when(algo) {
|
||||
SubscriptionFetchAlgorithms.CACHE -> CachedSubscriptionAlgorithm(150, scope, allowFailure, withCacheFallback, pool);
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
package com.futo.platformplayer.subscription
|
||||
|
||||
import com.futo.platformplayer.Settings
|
||||
import com.futo.platformplayer.UIDialogs
|
||||
import com.futo.platformplayer.api.media.models.ResultCapabilities
|
||||
import com.futo.platformplayer.api.media.models.contents.IPlatformContent
|
||||
import com.futo.platformplayer.api.media.platforms.js.JSClient
|
||||
import com.futo.platformplayer.api.media.platforms.js.SourcePluginConfig
|
||||
import com.futo.platformplayer.api.media.structures.DedupContentPager
|
||||
import com.futo.platformplayer.api.media.structures.IPager
|
||||
import com.futo.platformplayer.api.media.structures.MultiChronoContentPager
|
||||
import com.futo.platformplayer.cache.ChannelContentCache
|
||||
import com.futo.platformplayer.engine.exceptions.PluginException
|
||||
import com.futo.platformplayer.engine.exceptions.ScriptCaptchaRequiredException
|
||||
import com.futo.platformplayer.engine.exceptions.ScriptCriticalException
|
||||
import com.futo.platformplayer.exceptions.ChannelException
|
||||
import com.futo.platformplayer.findNonRuntimeException
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.models.Subscription
|
||||
import com.futo.platformplayer.states.StatePlatform
|
||||
import com.futo.platformplayer.states.StateSubscriptions
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import java.lang.IllegalStateException
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.ForkJoinPool
|
||||
import java.util.concurrent.ForkJoinTask
|
||||
import kotlin.system.measureTimeMillis
|
||||
|
||||
abstract class SubscriptionsTaskFetchAlgorithm(
|
||||
scope: CoroutineScope,
|
||||
allowFailure: Boolean = false,
|
||||
withCacheFallback: Boolean = true,
|
||||
_threadPool: ForkJoinPool? = null
|
||||
) : SubscriptionFetchAlgorithm(scope, allowFailure, withCacheFallback, _threadPool) {
|
||||
|
||||
|
||||
override fun countRequests(subs: Map<Subscription, List<String>>): Map<JSClient, Int> {
|
||||
return getSubscriptionTasks(subs).groupBy { it.client }.toList()
|
||||
.associate { Pair(it.first, it.second.filter { !it.fromCache }.size) };
|
||||
}
|
||||
|
||||
override fun getSubscriptions(subs: Map<Subscription, List<String>>): Result {
|
||||
val tasks = getSubscriptionTasks(subs);
|
||||
|
||||
Logger.i(TAG, "Starting Subscriptions Fetch:\n" +
|
||||
" Tasks: ${tasks.filter { !it.fromCache }.size}\n" +
|
||||
" Cached: ${tasks.filter { it.fromCache }.size}");
|
||||
try {
|
||||
//TODO: Remove this
|
||||
UIDialogs.toast("Tasks: ${tasks.filter { !it.fromCache }.size}\n" +
|
||||
"Cached: ${tasks.filter { it.fromCache }.size}", false);
|
||||
} catch (ex: Throwable){}
|
||||
|
||||
val exs: ArrayList<Throwable> = arrayListOf();
|
||||
val taskResults = arrayListOf<IPager<IPlatformContent>>();
|
||||
|
||||
val forkTasks = mutableListOf<ForkJoinTask<SubscriptionTaskResult>>();
|
||||
var finished = 0;
|
||||
val exceptionMap: HashMap<Subscription, Throwable> = hashMapOf();
|
||||
val concurrency = Settings.instance.subscriptions.getSubscriptionsConcurrency();
|
||||
val failedPlugins = arrayListOf<String>();
|
||||
val cachedChannels = arrayListOf<String>();
|
||||
|
||||
for(task in tasks) {
|
||||
val forkTask = threadPool.submit<SubscriptionTaskResult> {
|
||||
synchronized(cachedChannels) {
|
||||
if(task.fromCache) {
|
||||
finished++;
|
||||
onProgress.emit(finished, forkTasks.size);
|
||||
if(cachedChannels.contains(task.url))
|
||||
return@submit SubscriptionTaskResult(task, null, null);
|
||||
else {
|
||||
cachedChannels.add(task.url);
|
||||
return@submit SubscriptionTaskResult(task, ChannelContentCache.instance.getChannelCachePager(task.url), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val shouldIgnore = synchronized(failedPlugins) { failedPlugins.contains(task.client.id) };
|
||||
if(shouldIgnore)
|
||||
return@submit SubscriptionTaskResult(task, null, null); //skipped
|
||||
|
||||
var taskEx: Throwable? = null;
|
||||
var pager: IPager<IPlatformContent>;
|
||||
try {
|
||||
val time = measureTimeMillis {
|
||||
pager = StatePlatform.instance.getChannelContent(task.client,
|
||||
task.url, task.type, ResultCapabilities.ORDER_CHONOLOGICAL);
|
||||
|
||||
pager = ChannelContentCache.cachePagerResults(scope, pager) {
|
||||
onNewCacheHit.emit(task.sub, it);
|
||||
};
|
||||
|
||||
val initialPage = pager.getResults();
|
||||
task.sub.updateSubscriptionState(task.type, initialPage);
|
||||
StateSubscriptions.instance.saveSubscription(task.sub);
|
||||
|
||||
finished++;
|
||||
onProgress.emit(finished, forkTasks.size);
|
||||
}
|
||||
Logger.i("StateSubscriptions", "Subscription [${task.sub.channel.name}]:${task.type} results in ${time}ms");
|
||||
return@submit SubscriptionTaskResult(task, pager, null);
|
||||
} catch (ex: Throwable) {
|
||||
Logger.e(StateSubscriptions.TAG, "Subscription [${task.sub.channel.name}] failed", ex);
|
||||
val channelEx = ChannelException(task.sub.channel, ex);
|
||||
finished++;
|
||||
onProgress.emit(finished, forkTasks.size);
|
||||
if (!withCacheFallback)
|
||||
throw channelEx;
|
||||
else {
|
||||
Logger.i(StateSubscriptions.TAG, "Channel ${task.sub.channel.name} failed, substituting with cache");
|
||||
pager = ChannelContentCache.instance.getChannelCachePager(task.sub.channel.url);
|
||||
taskEx = ex;
|
||||
}
|
||||
}
|
||||
return@submit SubscriptionTaskResult(task, null, taskEx);
|
||||
}
|
||||
forkTasks.add(forkTask);
|
||||
}
|
||||
|
||||
val timeTotal = measureTimeMillis {
|
||||
for(task in forkTasks) {
|
||||
try {
|
||||
val result = task.get();
|
||||
if(result != null) {
|
||||
if(result.pager != null)
|
||||
taskResults.add(result.pager!!);
|
||||
if(exceptionMap.containsKey(result.task.sub)) {
|
||||
val ex = exceptionMap[result.task.sub];
|
||||
if(ex != null) {
|
||||
val nonRuntimeEx = findNonRuntimeException(ex);
|
||||
if (nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException))
|
||||
exs.add(nonRuntimeEx);
|
||||
else
|
||||
throw ex.cause ?: ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (ex: ExecutionException) {
|
||||
val nonRuntimeEx = findNonRuntimeException(ex.cause);
|
||||
if(nonRuntimeEx != null && (nonRuntimeEx is PluginException || nonRuntimeEx is ChannelException))
|
||||
exs.add(nonRuntimeEx);
|
||||
else
|
||||
throw ex.cause ?: ex;
|
||||
};
|
||||
}
|
||||
}
|
||||
Logger.i("StateSubscriptions", "Subscriptions results in ${timeTotal}ms")
|
||||
val pager = MultiChronoContentPager(taskResults, allowFailure, 15);
|
||||
pager.initialize();
|
||||
|
||||
return Result(DedupContentPager(pager), exs);
|
||||
}
|
||||
|
||||
abstract fun getSubscriptionTasks(subs: Map<Subscription, List<String>>): List<SubscriptionTask>;
|
||||
|
||||
|
||||
class SubscriptionTask(
|
||||
val client: JSClient,
|
||||
val sub: Subscription,
|
||||
val url: String,
|
||||
val type: String,
|
||||
var fromCache: Boolean = false
|
||||
);
|
||||
|
||||
class SubscriptionTaskResult(
|
||||
val task: SubscriptionTask,
|
||||
val pager: IPager<IPlatformContent>?,
|
||||
val exception: Throwable?
|
||||
)
|
||||
}
|
|
@ -61,8 +61,18 @@
|
|||
android:layout_height="wrap_content"
|
||||
android:alpha="0.5"
|
||||
android:layout_marginTop="5dp"
|
||||
android:layout_marginBottom="20dp"
|
||||
android:layout_marginBottom="5dp"
|
||||
app:buttonText="@string/install_by_url"
|
||||
app:buttonSubText="@string/enter_url_explain"
|
||||
app:buttonIcon="@drawable/ic_link" />
|
||||
<com.futo.platformplayer.views.buttons.BigButton
|
||||
android:id="@+id/option_plugins"
|
||||
android:layout_width="match_parent"
|
||||
android:layout_height="wrap_content"
|
||||
android:alpha="0.5"
|
||||
android:layout_marginTop="5dp"
|
||||
android:layout_marginBottom="20dp"
|
||||
app:buttonText="Install by Store"
|
||||
app:buttonSubText="Browse plugins published through Polycentric."
|
||||
app:buttonIcon="@drawable/ic_sources" />
|
||||
</LinearLayout>
|
|
@ -57,6 +57,15 @@
|
|||
</com.futo.platformplayer.views.fields.FieldForm>
|
||||
|
||||
</LinearLayout>
|
||||
|
||||
<LinearLayout
|
||||
android:id="@+id/advanced_source_buttons"
|
||||
android:layout_width="match_parent"
|
||||
android:layout_height="wrap_content"
|
||||
android:layout_marginTop="12dp"
|
||||
android:orientation="vertical">
|
||||
|
||||
</LinearLayout>
|
||||
</LinearLayout>
|
||||
</ScrollView>
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue