Coverage Report

Created: 2024-11-20 19:28

/root/doris/be/src/pipeline/pipeline_task.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "pipeline_task.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <stddef.h>
24
25
#include <ostream>
26
27
#include "common/status.h"
28
#include "pipeline/exec/operator.h"
29
#include "pipeline/pipeline.h"
30
#include "pipeline_fragment_context.h"
31
#include "runtime/descriptors.h"
32
#include "runtime/query_context.h"
33
#include "runtime/thread_context.h"
34
#include "task_queue.h"
35
#include "util/defer_op.h"
36
#include "util/runtime_profile.h"
37
38
namespace doris {
39
class RuntimeState;
40
} // namespace doris
41
42
namespace doris::pipeline {
43
44
// Constructs a task bound to an existing pipeline and an explicit sink operator.
//
// `_operators` is initialized from `pipeline->_operators`; `_source` is its first
// element and `_root` its last. The task starts unprepared and unopened, in state
// NOT_READY with its data state set to DEPEND_ON_SOURCE.
// The constructor also starts `_pipeline_task_watcher`, whose elapsed time is
// published as "Task5TotalTime" by _fresh_profile_counter().
//
// NOTE(review): front()/back() presumably require `pipeline->_operators` to be
// non-empty — confirm against callers.
PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
                           OperatorPtr& sink, PipelineFragmentContext* fragment_context,
                           RuntimeProfile* parent_profile)
        : _index(index),
          _pipeline(pipeline),
          _prepared(false),
          _opened(false),
          _state(state),
          _cur_state(PipelineTaskState::NOT_READY),
          _data_state(SourceState::DEPEND_ON_SOURCE),
          _fragment_context(fragment_context),
          _parent_profile(parent_profile),
          _operators(pipeline->_operators),
          _source(_operators.front()),
          _root(_operators.back()),
          _sink(sink) {
    _pipeline_task_watcher.start();
}
62
63
// Constructs a task without any operators: `_operators` is empty and `_source`,
// `_root` and `_sink` are all null. Everything else matches the other constructor
// (NOT_READY, DEPEND_ON_SOURCE, unprepared, unopened, `_pipeline_task_watcher` started).
//
// NOTE(review): prepare(), close() and debug_string() in this file dereference
// `_sink` unconditionally, so this overload is presumably meant for a subclass
// that supplies its own operators — confirm against callers.
PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
                           PipelineFragmentContext* fragment_context,
                           RuntimeProfile* parent_profile)
        : _index(index),
          _pipeline(pipeline),
          _prepared(false),
          _opened(false),
          _state(state),
          _cur_state(PipelineTaskState::NOT_READY),
          _data_state(SourceState::DEPEND_ON_SOURCE),
          _fragment_context(fragment_context),
          _parent_profile(parent_profile),
          _operators({}),
          _source(nullptr),
          _root(nullptr),
          _sink(nullptr) {
    _pipeline_task_watcher.start();
}
81
82
0
void PipelineTask::_fresh_profile_counter() {
83
0
    COUNTER_SET(_wait_source_timer, (int64_t)_wait_source_watcher.elapsed_time());
84
0
    COUNTER_SET(_wait_bf_timer, (int64_t)_wait_bf_watcher.elapsed_time());
85
0
    COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
86
0
    COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
87
0
    COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
88
0
    COUNTER_SET(_begin_execute_timer, _begin_execute_time);
89
0
    COUNTER_SET(_eos_timer, _eos_time);
90
0
    COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
91
0
    COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time);
92
0
    COUNTER_SET(_pip_task_total_timer, (int64_t)_pipeline_task_watcher.elapsed_time());
93
0
}
94
95
0
void PipelineTask::_init_profile() {
96
0
    std::stringstream ss;
97
0
    ss << "PipelineTask"
98
0
       << " (index=" << _index << ")";
99
0
    auto* task_profile = new RuntimeProfile(ss.str());
100
0
    _parent_profile->add_child(task_profile, true, nullptr);
101
0
    _task_profile.reset(task_profile);
102
0
    _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
103
104
0
    static const char* exec_time = "ExecuteTime";
105
0
    _exec_timer = ADD_TIMER(_task_profile, exec_time);
106
0
    _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
107
0
    _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
108
0
    _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
109
0
    _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT);
110
0
    _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
111
0
    _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
112
113
0
    _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
114
0
    _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
115
0
    _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
116
0
    _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
117
0
    _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
118
0
    _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
119
0
    _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
120
0
    _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
121
0
    _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
122
0
    _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
123
0
    _wait_bf_counts = ADD_COUNTER(_task_profile, "WaitBfTimes", TUnit::UNIT);
124
0
    _wait_dependency_counts = ADD_COUNTER(_task_profile, "WaitDenpendencyTimes", TUnit::UNIT);
125
0
    _pending_finish_counts = ADD_COUNTER(_task_profile, "PendingFinishTimes", TUnit::UNIT);
126
127
0
    _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime");
128
0
    _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime");
129
0
    _src_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime");
130
0
    _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime");
131
0
    _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime");
132
0
    _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime");
133
0
}
134
135
0
// Prepares the task for scheduling.
//
// Creates the task profile, prepares the sink and then every operator in order,
// records the sink and operator ids as profile info strings, allocates the working
// block and finally moves the task to RUNNABLE.
//
// @param state runtime state forwarded to each operator's prepare().
// @return the first non-OK status returned by the sink or an operator, otherwise OK.
Status PipelineTask::prepare(RuntimeState* state) {
    DCHECK(_sink);
    DCHECK(_cur_state == PipelineTaskState::NOT_READY);
    _init_profile();
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_prepare_timer);
    RETURN_IF_ERROR(_sink->prepare(state));
    for (auto& op : _operators) {
        RETURN_IF_ERROR(op->prepare(state));
    }

    _task_profile->add_info_string("Sink",
                                   fmt::format("{}(dst_id={})", _sink->get_name(), _sink->id()));

    // Build "[name(node_id=N), name(node_id=M), ...]".
    // Names and ids are passed as arguments to a constant format string. Feeding an
    // already formatted string back in as the format string would re-parse any '{'
    // or '}' in an operator name as a replacement field.
    // The brackets are emitted unconditionally so the list stays balanced even when
    // there are no operators.
    fmt::memory_buffer operator_ids_str;
    fmt::format_to(operator_ids_str, "[");
    for (size_t i = 0; i < _operators.size(); i++) {
        fmt::format_to(operator_ids_str, "{}{}(node_id={})", i == 0 ? "" : ", ",
                       _operators[i]->get_name(), _operators[i]->id());
    }
    fmt::format_to(operator_ids_str, "]");
    _task_profile->add_info_string("OperatorIds(source2root)", fmt::to_string(operator_ids_str));

    _block = doris::vectorized::Block::create_unique();

    // The initial state must be RUNNABLE so that the first execute() can do the
    // preparation work (e.g. initialize runtime filters).
    set_state(PipelineTaskState::RUNNABLE);
    _prepared = true;
    return Status::OK();
}
171
172
0
bool PipelineTask::has_dependency() {
173
0
    if (_dependency_finish) {
174
0
        return false;
175
0
    }
176
0
    if (_fragment_context->is_canceled()) {
177
0
        _dependency_finish = true;
178
0
        return false;
179
0
    }
180
0
    if (_pipeline->has_dependency()) {
181
0
        return true;
182
0
    }
183
184
0
    if (!query_context()->is_ready_to_execute()) {
185
0
        return true;
186
0
    }
187
188
    // runtime filter is a dependency
189
0
    _dependency_finish = true;
190
0
    return false;
191
0
}
192
193
0
// Opens every operator in order and then the sink (when one is set).
// Returns the first non-OK status immediately; `_opened` becomes true only when
// everything opened successfully.
Status PipelineTask::_open() {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_open_timer);

    for (auto& op : _operators) {
        RETURN_IF_ERROR(op->open(_state));
    }
    if (_sink) {
        RETURN_IF_ERROR(_sink->open(_state));
    }

    _opened = true;
    return Status::OK();
}
206
207
0
// Stores the queue that execute() and close() report their elapsed time to.
// A null queue is allowed; both callers check before using it.
void PipelineTask::set_task_queue(TaskQueue* task_queue) {
    this->_task_queue = task_queue;
}
210
211
0
// Runs the task for one scheduling slice.
//
// On the first call(s) the operators are opened. After that, blocks are pulled from
// the root operator and pushed into the sink until the source or sink blocks, the
// time slice is used up, the fragment is canceled, or end of stream is reached.
//
// @param eos set to true when the source finished or the sink reported END_OF_FILE.
// @return OK when the task merely blocked or yielded; the sink's END_OF_FILE status
//         when the sink reported it; otherwise the first error encountered.
Status PipelineTask::execute(bool* eos) {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_TIMER(_exec_timer);
    SCOPED_ATTACH_TASK(_state);
    int64_t elapsed_ns = 0;

    ThreadCpuStopWatch cpu_watch;
    cpu_watch.start();
    // On every exit path: report the elapsed time to the task queue and charge the
    // consumed CPU time to the task, the query statistics and the workload group.
    Defer report_usage {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, elapsed_ns);
        }
        int64_t cpu_ns = cpu_watch.elapsed_time();
        _task_cpu_timer->update(cpu_ns);
        auto cpu_stats = query_context()->get_cpu_statistics();
        if (cpu_stats) {
            cpu_stats->add_cpu_nanos(cpu_ns);
        }
        query_context()->update_wg_cpu_adder(cpu_ns);
    }};

    // The task must be runnable at this point.
    *eos = false;
    if (!_opened && !_fragment_context->is_canceled()) {
        {
            SCOPED_RAW_TIMER(&elapsed_ns);
            // A non-OK _open_status means open was already attempted. Only the two
            // "wait" codes (PIP_WAIT_FOR_RF / PIP_WAIT_FOR_SC) allow another attempt;
            // any other failure is returned as-is.
            if (!(_open_status.ok() || _open_status.is<ErrorCode::PIP_WAIT_FOR_RF>() ||
                  _open_status.is<ErrorCode::PIP_WAIT_FOR_SC>())) {
                return _open_status;
            }

            // Open is executed here without checking dependencies first (e.g. the
            // second start RPC may not have arrived yet). Returning an open error
            // directly would cancel the query, and that RPC would then fail to find
            // the query because it has already been canceled and removed.
            _open_status = _open();
            if (_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
                set_state(PipelineTaskState::BLOCKED_FOR_RF);
                return Status::OK();
            }
            if (_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
                set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
                return Status::OK();
            }

            if (!_open_status.ok()) {
                // Open failed while a dependency is still pending: go back to the
                // queue instead of failing now.
                if (has_dependency()) {
                    set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
                    return Status::OK();
                }
                // Open failed with no dependency pending: return the error so the
                // query is canceled.
                RETURN_IF_ERROR(_open_status);
            }
        }

        if (has_dependency()) {
            set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
            return Status::OK();
        }
        if (!source_can_read()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
            return Status::OK();
        }
        if (!sink_can_write()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SINK);
            return Status::OK();
        }
    }

    auto status = Status::OK();
    _task_profile->add_info_string("TaskState", "Runnable");
    this->set_begin_execute_time();
    while (!_fragment_context->is_canceled()) {
        // source_can_read() is not consulted while the last get_block() reported
        // MORE_DATA.
        if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
            _task_profile->add_info_string("TaskState", "BlockedBySource");
            break;
        }
        if (!sink_can_write()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SINK);
            _task_profile->add_info_string("TaskState", "BlockedBySink");
            break;
        }
        if (elapsed_ns > THREAD_TIME_SLICE) {
            COUNTER_UPDATE(_yield_counts, 1);
            _task_profile->add_info_string("TaskState", "Yield");
            break;
        }
        // TODO llj: Pipeline entity should_yield
        SCOPED_RAW_TIMER(&elapsed_ns);
        _block->clear_column_data(_root->row_desc().num_materialized_slots());
        auto* current_block = _block.get();

        // Pull one block from the operator chain.
        {
            SCOPED_TIMER(_get_block_timer);
            _get_block_counter->update(1);
            RETURN_IF_ERROR(_root->get_block(_state, current_block, _data_state));
        }
        *eos = _data_state == SourceState::FINISHED;

        // Empty blocks are not sent to the sink unless they carry end of stream.
        if (_block->rows() != 0 || *eos) {
            SCOPED_TIMER(_sink_timer);
            status = _sink->sink(_state, current_block, _data_state);
            const bool sink_reached_eof = status.is<ErrorCode::END_OF_FILE>();
            if (!sink_reached_eof) {
                RETURN_IF_ERROR(status);
            }
            if (sink_reached_eof) {
                *eos = true;
            }
            if (*eos) { // just return, the scheduler will do the finish work
                _task_profile->add_info_string("TaskState", "Finished");
                break;
            }
        }
    }

    // Currently only join and set-operation nodes add dependencies; the join probe
    // can start once the join sink has reached eos.
    if (*eos) {
        _finish_p_dependency();
    }

    // If the status is EOF (the sink returns it when the downstream fragment has
    // finished), it is returned to the caller as-is.
    return status;
}
327
328
0
// Closes the sink and then every operator, and folds the close time into the
// profile when the task had been opened.
//
// All operators are closed even if an earlier close fails. The returned status is
// the sink's if it failed, otherwise the first failing operator's, otherwise OK.
// `exec_status` is not used by this implementation.
Status PipelineTask::close(Status exec_status) {
    int64_t close_ns = 0;
    // Report the time spent closing to the task queue on exit.
    Defer report_close_time {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, close_ns);
        }
    }};

    Status result;
    {
        SCOPED_RAW_TIMER(&close_ns);
        result = _sink->close(_state);
        for (auto& op : _operators) {
            auto op_status = op->close(_state);
            // Keep only the first failure.
            if (result.ok() && !op_status.ok()) {
                result = op_status;
            }
        }
    }

    if (_opened) {
        _fresh_profile_counter();
        COUNTER_SET(_close_timer, close_ns);
        COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
    }
    return result;
}
353
354
0
// Returns the query context obtained from the owning fragment context.
QueryContext* PipelineTask::query_context() {
    return this->_fragment_context->get_query_ctx();
}
357
358
// The FSM see PipelineTaskState's comment
359
0
void PipelineTask::set_state(PipelineTaskState state) {
360
0
    DCHECK(_cur_state != PipelineTaskState::FINISHED);
361
362
0
    if (_cur_state == state) {
363
0
        return;
364
0
    }
365
0
    if (_cur_state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
366
0
        if (state == PipelineTaskState::RUNNABLE) {
367
0
            _wait_source_watcher.stop();
368
0
        }
369
0
    } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_SINK) {
370
0
        if (state == PipelineTaskState::RUNNABLE) {
371
0
            _wait_sink_watcher.stop();
372
0
        }
373
0
    } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_RF) {
374
0
        if (state == PipelineTaskState::RUNNABLE) {
375
0
            _wait_bf_watcher.stop();
376
0
        }
377
0
    } else if (_cur_state == PipelineTaskState::RUNNABLE) {
378
0
        COUNTER_UPDATE(_block_counts, 1);
379
0
        if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
380
0
            _wait_source_watcher.start();
381
0
            COUNTER_UPDATE(_block_by_source_counts, 1);
382
0
        } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
383
0
            _wait_sink_watcher.start();
384
0
            COUNTER_UPDATE(_block_by_sink_counts, 1);
385
0
        } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
386
0
            _wait_bf_watcher.start();
387
0
            COUNTER_UPDATE(_wait_bf_counts, 1);
388
0
        } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
389
0
            COUNTER_UPDATE(_wait_dependency_counts, 1);
390
0
        } else if (state == PipelineTaskState::PENDING_FINISH) {
391
0
            COUNTER_UPDATE(_pending_finish_counts, 1);
392
0
        }
393
0
    }
394
395
0
    _cur_state = state;
396
0
}
397
398
0
std::string PipelineTask::debug_string() {
399
0
    fmt::memory_buffer debug_string_buffer;
400
401
0
    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
402
0
    fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
403
0
                   print_id(fragment_context()->get_fragment_instance_id()));
404
405
0
    fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
406
0
                   PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
407
0
    {
408
0
        std::stringstream profile_ss;
409
0
        _fresh_profile_counter();
410
0
        _task_profile->pretty_print(&profile_ss, "");
411
0
        fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
412
0
    }
413
0
    fmt::format_to(debug_string_buffer,
414
0
                   "PipelineTask[this = {}, state = {}]\noperators: ", (void*)this,
415
0
                   get_state_name(_cur_state));
416
0
    for (size_t i = 0; i < _operators.size(); i++) {
417
0
        fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
418
0
                       _operators[i]->debug_string());
419
0
        std::stringstream profile_ss;
420
0
        _operators[i]->get_runtime_profile()->pretty_print(&profile_ss, std::string(i * 2, ' '));
421
0
        fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
422
0
    }
423
0
    fmt::format_to(debug_string_buffer, "\n{}{}", std::string(_operators.size() * 2, ' '),
424
0
                   _sink->debug_string());
425
0
    {
426
0
        std::stringstream profile_ss;
427
0
        _sink->get_runtime_profile()->pretty_print(&profile_ss,
428
0
                                                   std::string(_operators.size() * 2, ' '));
429
0
        fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
430
0
    }
431
0
    return fmt::to_string(debug_string_buffer);
432
0
}
433
434
} // namespace doris::pipeline