Coverage Report

Created: 2026-01-23 16:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/pipeline/task_scheduler.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <chrono>
22
#include <condition_variable>
23
#include <cstddef>
24
#include <list>
25
#include <memory>
26
#include <mutex>
27
#include <utility>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "pipeline_task.h"
32
#include "runtime/query_context.h"
33
#include "runtime/workload_group/workload_group.h"
34
#include "task_queue.h"
35
#include "util/thread.h"
36
#include "util/uid_util.h"
37
38
namespace doris {
39
class ExecEnv;
40
class ThreadPool;
41
} // namespace doris
42
43
namespace doris::pipeline {
44
45
class HybridTaskScheduler;
46
class TaskScheduler {
47
public:
48
    virtual ~TaskScheduler();
49
50
    virtual Status submit(PipelineTaskSPtr task);
51
52
    virtual Status start();
53
54
    virtual void stop();
55
56
0
    virtual std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() {
57
0
        return {{_name, _fix_thread_pool->debug_info()}};
58
0
    }
59
60
protected:
61
    std::string _name;
62
    bool _need_to_stop = false;
63
    bool _shutdown = false;
64
    const int _num_threads;
65
66
private:
67
    friend class HybridTaskScheduler;
68
69
    TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
70
0
            : _name(std::move(name)),
71
0
              _num_threads(core_num),
72
0
              _task_queue(core_num),
73
0
              _cgroup_cpu_ctl(cgroup_cpu_ctl) {
74
0
        LOG(INFO) << "TaskScheduler " << _name << " created with " << core_num << " threads.";
75
0
    }
76
21
    TaskScheduler() : _num_threads(0), _task_queue(0) {}
77
    std::unique_ptr<ThreadPool> _fix_thread_pool;
78
79
    MultiCoreTaskQueue _task_queue;
80
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
81
82
    void _do_work(int index);
83
};
84
85
class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
86
public:
87
    HybridTaskScheduler(int exec_thread_num, int blocking_exec_thread_num, std::string name,
88
                        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
89
0
            : _blocking_scheduler(blocking_exec_thread_num, name + "_blocking_scheduler",
90
0
                                  cgroup_cpu_ctl),
91
0
              _simple_scheduler(exec_thread_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
92
0
    ~HybridTaskScheduler() override {
93
0
        DCHECK(_blocking_scheduler._shutdown)
94
0
                << _blocking_scheduler._name << ": " << _blocking_scheduler._shutdown << " "
95
0
                << _blocking_scheduler._need_to_stop << " " << _blocking_scheduler._num_threads;
96
0
        DCHECK(_simple_scheduler._shutdown)
97
0
                << _simple_scheduler._name << ": " << _simple_scheduler._shutdown << " "
98
0
                << _simple_scheduler._need_to_stop << " " << _simple_scheduler._num_threads;
99
0
    }
100
101
    Status submit(PipelineTaskSPtr task) override;
102
103
    Status start() override;
104
105
    void stop() override;
106
107
0
    std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() override {
108
0
        return {_blocking_scheduler.thread_debug_info()[0],
109
0
                _simple_scheduler.thread_debug_info()[0]};
110
0
    }
111
112
private:
113
    TaskScheduler _blocking_scheduler;
114
    TaskScheduler _simple_scheduler;
115
};
116
} // namespace doris::pipeline