diff --git a/libavcodec/executor.c b/libavcodec/executor.c
index 21ebad3def..7a86e894f8 100644
--- a/libavcodec/executor.c
+++ b/libavcodec/executor.c
@@ -48,6 +48,12 @@ typedef struct ThreadInfo {
     ExecutorThread thread;
 } ThreadInfo;
 
+// FIFO of pending tasks, linked through FFTask.next; one Queue per priority level
+typedef struct Queue {
+    FFTask *head;
+    FFTask *tail;
+} Queue;
+
 struct FFExecutor {
     FFTaskCallbacks cb;
     int thread_count;
@@ -60,29 +66,42 @@ struct FFExecutor {
     AVCond cond;
     int die;
 
-    FFTask *tasks;
+    Queue *q;       // array of cb.priorities queues, index 0 is served first
 };
 
-static FFTask* remove_task(FFTask **prev, FFTask *t)
+// Detach and return the task at the head of q, or NULL if q is empty.
+static FFTask* remove_task(Queue *q)
 {
-    *prev = t->next;
-    t->next = NULL;
+    FFTask *t = q->head;
+    if (t) {
+        q->head = t->next;
+        t->next = NULL;
+        if (!q->head)
+            q->tail = NULL;
+    }
     return t;
 }
 
-static void add_task(FFTask **prev, FFTask *t)
+// Append t at the tail of q.
+static void add_task(Queue *q, FFTask *t)
 {
-    t->next = *prev;
-    *prev = t;
+    t->next = NULL;
+    if (!q->head)
+        q->tail = q->head = t;
+    else
+        q->tail = q->tail->next = t;
 }
 
 static int run_one_task(FFExecutor *e, void *lc)
 {
     FFTaskCallbacks *cb = &e->cb;
-    FFTask **prev = &e->tasks;
+    FFTask *t = NULL;
 
-    if (*prev) {
-        FFTask *t = remove_task(prev, *prev);
+    // take the oldest task from the lowest-numbered non-empty queue
+    for (int i = 0; i < e->cb.priorities && !t; i++)
+        t = remove_task(e->q + i);
+
+    if (t) {
         if (e->thread_count > 0)
             ff_mutex_unlock(&e->lock);
         cb->run(t, lc, cb->user_data);
@@ -132,6 +151,7 @@ static void executor_free(FFExecutor *e, const int has_lock, const int has_cond)
         ff_mutex_destroy(&e->lock);
 
     av_free(e->threads);
+    av_free(e->q);
     av_free(e->local_contexts);
 
     av_free(e);
@@ -141,7 +161,7 @@ FFExecutor* ff_executor_alloc(const FFTaskCallbacks *cb, int thread_count)
 {
     FFExecutor *e;
     int has_lock = 0, has_cond = 0;
-    if (!cb || !cb->user_data || !cb->run || !cb->priority_higher)
+    if (!cb || !cb->user_data || !cb->run || cb->priorities <= 0)
         return NULL;
 
     e = av_mallocz(sizeof(*e));
@@ -153,6 +173,10 @@ FFExecutor* ff_executor_alloc(const FFTaskCallbacks *cb, int thread_count)
     if (!e->local_contexts)
         goto free_executor;
 
+    e->q = av_calloc(e->cb.priorities, sizeof(Queue));
+    if (!e->q)
+        goto free_executor;
+
     e->threads = av_calloc(FFMAX(thread_count, 1), sizeof(*e->threads));
     if (!e->threads)
         goto free_executor;
@@ -192,16 +216,11 @@ void ff_executor_free(FFExecutor **executor)
 
 void ff_executor_execute(FFExecutor *e, FFTask *t)
 {
-    FFTaskCallbacks *cb = &e->cb;
-    FFTask **prev;
-
     if (e->thread_count)
         ff_mutex_lock(&e->lock);
-    if (t) {
-        for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
-            /* nothing */;
-        add_task(prev, t);
-    }
+    // unsigned modulo keeps the queue index in [0, priorities) even for a negative priority
+    if (t)
+        add_task(e->q + (unsigned)t->priority % e->cb.priorities, t);
     if (e->thread_count) {
         ff_cond_signal(&e->cond);
         ff_mutex_unlock(&e->lock);
diff --git a/libavcodec/executor.h b/libavcodec/executor.h
index 51763ec25e..cd13d4c518 100644
--- a/libavcodec/executor.h
+++ b/libavcodec/executor.h
@@ -32,6 +32,7 @@ typedef struct FFTask FFTask;
 
 struct FFTask {
     FFTask *next;
+    int priority;   // must be >= 0 and < FFTaskCallbacks.priorities; 0 is served first
 };
 
 typedef struct FFTaskCallbacks {
@@ -39,8 +40,8 @@ typedef struct FFTaskCallbacks {
 
     int local_context_size;
 
-    // return 1 if a's priority > b's priority
-    int (*priority_higher)(const FFTask *a, const FFTask *b);
+    // number of priority levels, must be > 0
+    int priorities;
 
     // run the task
     int (*run)(FFTask *t, void *local_context, void *user_data);
diff --git a/libavcodec/vvc/thread.c b/libavcodec/vvc/thread.c
index a8c19b17cf..d75784e242 100644
--- a/libavcodec/vvc/thread.c
+++ b/libavcodec/vvc/thread.c
@@ -103,13 +103,28 @@ typedef struct VVCFrameThread {
     AVCond cond;
 } VVCFrameThread;
 
+#define PRIORITY_LOWEST 2
 static void add_task(VVCContext *s, VVCTask *t)
 {
-    VVCFrameThread *ft = t->fc->ft;
+    VVCFrameThread *ft  = t->fc->ft;
+    FFTask *task        = &t->u.task;
+    static const int priorities[] = {
+        0,                  // VVC_TASK_STAGE_INIT,
+        0,                  // VVC_TASK_STAGE_PARSE,
+        // For an 8K clip, a CTU line completed in the reference frame may trigger 64 or more inter tasks.
+        // We assign these tasks the lowest priority to avoid being overwhelmed with inter tasks.
+        PRIORITY_LOWEST,    // VVC_TASK_STAGE_INTER
+        1,                  // VVC_TASK_STAGE_RECON,
+        1,                  // VVC_TASK_STAGE_LMCS,
+        1,                  // VVC_TASK_STAGE_DEBLOCK_V,
+        1,                  // VVC_TASK_STAGE_DEBLOCK_H,
+        1,                  // VVC_TASK_STAGE_SAO,
+        1,                  // VVC_TASK_STAGE_ALF,
+    };
 
     atomic_fetch_add(&ft->nb_scheduled_tasks, 1);
-
-    ff_executor_execute(s->executor, &t->u.task);
+    task->priority = priorities[t->stage];
+    ff_executor_execute(s->executor, task);
 }
 
 static void task_init(VVCTask *t, VVCTaskStage stage, VVCFrameContext *fc, const int rx, const int ry)
@@ -372,31 +387,6 @@ static int task_is_stage_ready(VVCTask *t, int add)
     return task_has_target_score(t, stage, score);
 }
 
-#define CHECK(a, b) \
-    do { \
-        if ((a) != (b)) \
-            return (a) < (b); \
-    } while (0)
-
-static int task_priority_higher(const FFTask *_a, const FFTask *_b)
-{
-    const VVCTask *a = (const VVCTask*)_a;
-    const VVCTask *b = (const VVCTask*)_b;
-
-
-    if (a->stage <= VVC_TASK_STAGE_PARSE || b->stage <= VVC_TASK_STAGE_PARSE) {
-        CHECK(a->stage, b->stage);
-        CHECK(a->fc->decode_order, b->fc->decode_order); //decode order
-        CHECK(a->ry, b->ry);
-        return a->rx < b->rx;
-    }
-
-    CHECK(a->fc->decode_order, b->fc->decode_order); //decode order
-    CHECK(a->rx + a->ry + a->stage, b->rx + b->ry + b->stage); //zigzag with type
-    CHECK(a->rx + a->ry, b->rx + b->ry); //zigzag
-    return a->ry < b->ry;
-}
-
 static void check_colocation(VVCContext *s, VVCTask *t)
 {
     const VVCFrameContext *fc = t->fc;
@@ -681,7 +671,7 @@ FFExecutor* ff_vvc_executor_alloc(VVCContext *s, const int thread_count)
     FFTaskCallbacks callbacks = {
         s,
         sizeof(VVCLocalContext),
-        task_priority_higher,
+        PRIORITY_LOWEST + 1,
         task_run,
     };
     return ff_executor_alloc(&callbacks, thread_count);