Coverage Report

Created: 2026-05-27 22:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/group_commit_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/group_commit/group_commit_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <glog/logging.h>
22
23
#include <chrono>
24
25
#include "cloud/config.h"
26
#include "common/compiler_util.h"
27
#include "common/config.h"
28
#include "common/status.h"
29
#include "exec/pipeline/dependency.h"
30
#include "runtime/exec_env.h"
31
#include "runtime/fragment_mgr.h"
32
#include "runtime/memory/mem_tracker_limiter.h"
33
#include "runtime/thread_context.h"
34
#include "util/client_cache.h"
35
#include "util/debug_points.h"
36
#include "util/thrift_rpc_helper.h"
37
#include "util/time.h"
38
39
namespace doris {
40
41
// Process-wide bvar counter. LoadBlockQueue::add_block() increments it each time a writer's
// dependency is blocked because the bytes buffered across all block queues reached
// config::group_commit_queue_mem_limit.
bvar::Adder<uint64_t> group_commit_block_by_memory_counter("group_commit_block_by_memory_counter");
42
43
0
std::string LoadBlockQueue::_get_load_ids() {
44
0
    std::stringstream ss;
45
0
    ss << "[";
46
0
    for (auto& id : _load_ids_to_write_dep) {
47
0
        ss << id.first.to_string() << ", ";
48
0
    }
49
0
    ss << "]";
50
0
    return ss.str();
51
0
}
52
53
// Appends `block` to this queue on behalf of the writer identified by `load_id`.
//
// Unless config::group_commit_wait_replay_wal_finish is set, a non-empty block is buffered in
// _block_queue and accounted in both _data_bytes and the shared _all_block_queues_bytes. The
// block is also written to the WAL when `write_wal` is true or the replay-wait mode is on; a WAL
// failure cancels the whole queue and is returned. When the shared byte counter reaches
// config::group_commit_queue_mem_limit, the writer's dependency is blocked (back-pressure);
// get_block() sets it ready again once the total drops below the limit. Finally the commit
// condition (data size / time interval) is re-evaluated and all read dependencies are set ready.
//
// Returns the queue's error status if it has been cancelled, the runtime state's cancel reason
// if the query was cancelled, the WAL write error, or OK.
Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr<Block> block,
                                 bool write_wal, UniqueId& load_id) {
    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
                    { return Status::InternalError("LoadBlockQueue.add_block.failed"); });
    std::unique_lock l(mutex);
    // `status` is only assigned with `mutex` held, so a single check under the lock suffices.
    RETURN_IF_ERROR(status);
    if (UNLIKELY(runtime_state->is_cancelled())) {
        return runtime_state->cancel_reason();
    }
    LOG(INFO) << "query_id: " << print_id(runtime_state->query_id())
              << ", add block rows=" << block->rows() << ", use group_commit label=" << label;
    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK);
    if (block->rows() > 0) {
        if (!config::group_commit_wait_replay_wal_finish) {
            // The block is not touched between here and the WAL write, so size it once.
            const auto block_bytes = block->bytes();
            _block_queue.emplace_back(block);
            _data_bytes += block_bytes;
            size_t before_block_queues_bytes = _all_block_queues_bytes->load();
            _all_block_queues_bytes->fetch_add(block_bytes, std::memory_order_relaxed);
            VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
                       << "Cur block rows=" << block->rows() << ", bytes=" << block_bytes
                       << ". all block queues bytes from " << before_block_queues_bytes << " to  "
                       << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
                       << ". txn_id=" << txn_id << ", label=" << label
                       << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
        }
        if (write_wal || config::group_commit_wait_replay_wal_finish) {
            auto st = _v_wal_writer->write_wal(block.get());
            if (!st.ok()) {
                _cancel_without_lock(st);
                return st;
            }
        }
        // Memory back-pressure: block this writer until get_block() drains enough data.
        if (!runtime_state->is_cancelled() && status.ok() &&
            _all_block_queues_bytes->load(std::memory_order_relaxed) >=
                    config::group_commit_queue_mem_limit) {
            group_commit_block_by_memory_counter << 1;
            auto dep_it = _load_ids_to_write_dep.find(load_id);
            DCHECK(dep_it != _load_ids_to_write_dep.end());
            if (dep_it != _load_ids_to_write_dep.end() && dep_it->second) {
                dep_it->second->block();
                VLOG_DEBUG << "block add_block for load_id=" << load_id << ", memory="
                           << _all_block_queues_bytes->load(std::memory_order_relaxed)
                           << ". inner load_id=" << load_instance_id << ", label=" << label;
            }
        }
    }
    if (!_need_commit.load()) {
        if (_data_bytes >= _group_commit_data_bytes) {
            VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
                       << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
            _need_commit.store(true);
            data_size_condition = true;
        }
        if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
                                                                  _start_time)
                    .count() >= _group_commit_interval_ms) {
            VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label
                       << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
            _need_commit.store(true);
        }
    }
    for (const auto& read_dep : _read_deps) {
        read_dep->set_ready();
        VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id;
    }
    return Status::OK();
}
121
122
// Hands the next buffered block to the internal group-commit reader.
//
// Out-params: `*find_block` is set when a block was swapped into `block`; `*eos` is set once the
// queue is empty, the commit condition has been reached and no writer is registered any more.
// When the queue is empty and no commit is due yet, `get_block_dep` is blocked (presumably it
// is one of _read_deps and is woken by add_block()/remove_load_id() — confirm with the caller).
// Once the shared buffered bytes drop below config::group_commit_queue_mem_limit, every writer
// dependency is set ready again.
//
// Returns the queue's cancel status if the queue or the query has been cancelled, OK otherwise.
Status LoadBlockQueue::get_block(RuntimeState* runtime_state, Block* block, bool* find_block,
                                 bool* eos, std::shared_ptr<Dependency> get_block_dep) {
    *find_block = false;
    *eos = false;
    std::unique_lock l(mutex);
    if (!status.ok()) {
        // Already cancelled: _cancel_without_lock() has drained the queue and woken all
        // dependencies. Return the original error instead of overwriting it with the query's
        // cancel reason (which is OK when only the queue was cancelled).
        return status;
    }
    if (runtime_state->is_cancelled()) {
        auto st = runtime_state->cancel_reason();
        _cancel_without_lock(st);
        return status;
    }
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::steady_clock::now() - _start_time)
                            .count();
    if (!_need_commit.load() && duration >= _group_commit_interval_ms) {
        _need_commit.store(true);
    }
    if (_block_queue.empty()) {
        // A commit has been due for a long time: log (at most every 10s) which loads keep it open.
        if (_need_commit.load() && duration >= 10 * _group_commit_interval_ms) {
            auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                               std::chrono::steady_clock::now() - _last_print_time)
                                               .count();
            if (last_print_duration >= 10000) {
                _last_print_time = std::chrono::steady_clock::now();
                LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
                          << ", label=" << label << ", instance_id=" << load_instance_id
                          << ", duration=" << duration << ", load_ids=" << _get_load_ids();
            }
        }
        VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty";
        if (!_need_commit.load()) {
            get_block_dep->block();
            VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id;
        }
    } else {
        // Take the block out of the front entry without copying the entry itself; remember its
        // accounted size before the entry is destroyed by pop_front().
        const BlockData& front = _block_queue.front();
        block->swap(*front.block);
        const auto popped_bytes = front.block_bytes;
        *find_block = true;
        _block_queue.pop_front();
        size_t before_block_queues_bytes = _all_block_queues_bytes->load();
        _all_block_queues_bytes->fetch_sub(popped_bytes, std::memory_order_relaxed);
        VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
                   << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes()
                   << ". all block queues bytes from " << before_block_queues_bytes << " to  "
                   << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
                   << ". txn_id=" << txn_id << ", label=" << label
                   << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
    }
    *eos = _block_queue.empty() && _need_commit.load() && _load_ids_to_write_dep.empty();
    // Release memory back-pressure applied in add_block().
    if (_all_block_queues_bytes->load(std::memory_order_relaxed) <
        config::group_commit_queue_mem_limit) {
        for (auto& id : _load_ids_to_write_dep) {
            // add_block() tolerates a null dependency; do the same here.
            if (id.second) {
                id.second->set_ready();
            }
        }
        VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
                   << ". inner load_id=" << load_instance_id << ", label=" << label;
    }
    return Status::OK();
}
184
185
0
// Unregisters the writer `load_id` from this queue.
// Its write dependency is made permanently ready so the writer is never blocked again, and all
// read dependencies are set ready so the reader re-evaluates eos in get_block().
// Returns NotFound if `load_id` is not registered on this queue.
Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
    std::unique_lock l(mutex);
    // Single lookup; the iterator is reused for the erase.
    auto it = _load_ids_to_write_dep.find(load_id);
    if (it == _load_ids_to_write_dep.end()) {
        return Status::NotFound<false>("load_id=" + load_id.to_string() +
                                       " not in block queue, label=" + label);
    }
    // add_block() tolerates a null dependency; do the same here.
    if (it->second) {
        it->second->set_always_ready();
    }
    _load_ids_to_write_dep.erase(it);
    for (const auto& read_dep : _read_deps) {
        read_dep->set_ready();
    }
    VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner load_id=" << load_instance_id;
    return Status::OK();
}
199
200
0
bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) {
201
0
    std::unique_lock l(mutex);
202
0
    return _load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end();
203
0
}
204
205
// Registers `load_id` as a writer of this queue; `put_block_dep` is the dependency add_block()
// blocks for memory back-pressure. Refused with InternalError once the queue needs to commit,
// has been cancelled, or has finished processing. On success group_commit_load_count is bumped.
Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
                                   const std::shared_ptr<Dependency> put_block_dep) {
    std::unique_lock guard(mutex);
    const bool accepting = !_need_commit.load() && status.ok() && !process_finish.load();
    if (!accepting) {
        return Status::InternalError<false>(
                "block queue cannot add load id, id=" + load_instance_id.to_string() +
                ", need_commit=" + (_need_commit.load() ? "true" : "false") +
                ", process_finish=" + (process_finish.load() ? "true" : "false") +
                ", queue_status=" + status.to_string());
    }
    _load_ids_to_write_dep[load_id] = put_block_dep;
    group_commit_load_count.fetch_add(1);
    return Status::OK();
}
219
220
0
void LoadBlockQueue::cancel(const Status& st) {
221
0
    DCHECK(!st.ok());
222
0
    std::unique_lock l(mutex);
223
0
    _cancel_without_lock(st);
224
0
}
225
226
0
void LoadBlockQueue::_cancel_without_lock(const Status& st) {
227
0
    LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label
228
0
              << ", status=" << st.to_string();
229
0
    status =
230
0
            Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string());
231
0
    size_t before_block_queues_bytes = _all_block_queues_bytes->load();
232
0
    while (!_block_queue.empty()) {
233
0
        const BlockData& block_data = _block_queue.front();
234
0
        _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
235
0
        _block_queue.pop_front();
236
0
    }
237
0
    VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
238
0
               << "all block queues bytes from " << before_block_queues_bytes << " to "
239
0
               << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
240
0
               << ", txn_id=" << txn_id << ", label=" << label
241
0
               << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
242
0
    for (auto& id : _load_ids_to_write_dep) {
243
0
        id.second->set_always_ready();
244
0
    }
245
0
    for (auto read_dep : _read_deps) {
246
0
        read_dep->set_ready();
247
0
    }
248
0
    VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
249
0
               << ", inner load_id=" << load_instance_id;
250
0
}
251
252
// Finds the LoadBlockQueue that load `load_id` should write into and stores it in
// `load_block_queue`.
//
// Lookup order: a queue `load_id` is already registered on; otherwise the first queue that does
// not need to commit yet and whose schema version / index count match the request (the load is
// registered on it with `put_block_dep`). A non-committing queue with a different schema yields
// DataQualityError. If the first lookup fails, the request is parked in _create_plan_deps with
// `create_plan_dep` blocked, a plan-creation task is submitted, and the lookup is retried once;
// its status (usually an error while the plan is still being created) is returned.
Status GroupCommitTable::get_first_block_load_queue(
        int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id,
        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
        std::shared_ptr<Dependency> create_plan_dep, std::shared_ptr<Dependency> put_block_dep) {
    DCHECK(table_id == _table_id);
    std::unique_lock l(_lock);
    auto find_queue = [&]() -> Status {
        for (const auto& [_, candidate] : _load_block_queues) {
            if (candidate->contain_load_id(load_id)) {
                load_block_queue = candidate;
                return Status::OK();
            }
        }
        for (const auto& [_, candidate] : _load_block_queues) {
            if (candidate->need_commit()) {
                continue;
            }
            const bool schema_matches = base_schema_version == candidate->schema_version &&
                                        index_size == candidate->index_size;
            if (!schema_matches) {
                return Status::DataQualityError<false>(
                        "schema version not match, maybe a schema change is in process. "
                        "Please retry this load manually.");
            }
            if (candidate->add_load_id(load_id, put_block_dep).ok()) {
                load_block_queue = candidate;
                return Status::OK();
            }
        }
        return Status::InternalError<false>("can not get a block queue for table_id: " +
                                            std::to_string(_table_id) + _create_plan_failed_reason);
    };

    if (find_queue().ok()) {
        return Status::OK();
    }
    // No usable queue: park this load until a new plan (and queue) has been created.
    create_plan_dep->block();
    _create_plan_be_exe_version = be_exe_version;
    if (_create_plan_deps.empty()) {
        _create_plan_start_time_ms = MonotonicMillis();
    }
    _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep,
                                                       base_schema_version, index_size));
    [[maybe_unused]] auto submit_st = _submit_create_group_commit_load();
    return find_queue();
}
297
298
0
// Locking entry point: submits a plan-creation task for this table if any load is still waiting
// in _create_plan_deps; returns OK when nothing is pending.
Status GroupCommitTable::submit_create_group_commit_load() {
    std::unique_lock guard(_lock);
    return _create_plan_deps.empty() ? Status::OK() : _submit_create_group_commit_load();
}
305
306
0
// Submits to _thread_pool an asynchronous task that creates a new group commit plan / queue via
// _create_group_commit_load() and attaches the loads waiting in _create_plan_deps to it. At most
// one task is in flight (_is_creating_plan_fragment). Both visible callers hold `_lock`.
//
// If loads have been waiting longer than config::group_commit_create_plan_timeout_ms (when
// positive), no task is submitted: _create_plan_failed_reason records the timeout and all
// waiting loads are released. If the task cannot be submitted, the waiting loads are released
// the same way and the submit error is returned.
Status GroupCommitTable::_submit_create_group_commit_load() {
    if (_is_creating_plan_fragment) {
        return Status::OK();
    }

    // Sets every waiting load's create-plan dependency ready and forgets them (used on timeout
    // and on submit failure, so both paths leave identical state).
    auto release_pending_loads = [this]() {
        for (const auto& [id, load_info] : _create_plan_deps) {
            std::get<0>(load_info)->set_ready();
        }
        _create_plan_deps.clear();
        _create_plan_start_time_ms = 0;
    };

    int64_t timeout_ms = config::group_commit_create_plan_timeout_ms;
    if (timeout_ms > 0 && !_create_plan_deps.empty()) {
        int64_t now_ms = MonotonicMillis();
        if (_create_plan_start_time_ms > 0 && now_ms - _create_plan_start_time_ms > timeout_ms) {
            _create_plan_failed_reason =
                    ". group commit create plan timeout after " + std::to_string(timeout_ms) + "ms";
            release_pending_loads();
            return Status::OK();
        }
    }

    auto mem_tracker = _group_commit_mgr->group_commit_mem_tracker();
    int be_exe_version = _create_plan_be_exe_version;
    _is_creating_plan_fragment = true;
    // The task runs after this function has returned, so capture explicitly: `this` plus copies
    // of the values it needs, never locals by reference.
    auto submit_st = _thread_pool->submit_func([this, be_exe_version, mem_tracker] {
        std::shared_ptr<LoadBlockQueue> created_load_block_queue;
        Status create_group_commit_st = Status::OK();
        std::string create_plan_failed_reason;
        // Runs when the task finishes: clears the in-flight flag, attaches loads still waiting
        // in _create_plan_deps to the new queue (or releases those whose schema cannot match),
        // and reschedules plan creation if it failed or loads are still waiting.
        Defer defer {[&]() {
            bool need_resubmit = !create_group_commit_st.ok();
            std::unique_lock l(_lock);
            _is_creating_plan_fragment = false;
            _create_plan_failed_reason = create_plan_failed_reason;
            if (created_load_block_queue && create_group_commit_st.ok() &&
                !created_load_block_queue->need_commit()) {
                std::vector<UniqueId> success_load_ids;
                for (const auto& [id, load_info] : _create_plan_deps) {
                    auto create_dep = std::get<0>(load_info);
                    auto put_dep = std::get<1>(load_info);
                    if (created_load_block_queue->schema_version == std::get<2>(load_info) &&
                        created_load_block_queue->index_size == std::get<3>(load_info)) {
                        auto st = created_load_block_queue->add_load_id(id, put_dep);
                        if (!st.ok()) {
                            LOG(WARNING) << "failed to add pending load_id into created "
                                            "group commit queue, load_id="
                                         << id << ", label=" << created_load_block_queue->label
                                         << ", status=" << st.to_string();
                            need_resubmit = true;
                        } else {
                            create_dep->set_ready();
                            success_load_ids.emplace_back(id);
                        }
                    } else if (created_load_block_queue->schema_version > std::get<2>(load_info) ||
                               (created_load_block_queue->schema_version ==
                                        std::get<2>(load_info) &&
                                created_load_block_queue->index_size != std::get<3>(load_info))) {
                        // schema version mismatch:
                        //   1. the schema version of created load block queue is newer than the load request
                        //   2. the index size is not equal
                        // set ready for the load request to let it fail
                        create_dep->set_ready();
                        success_load_ids.emplace_back(id);
                    }
                }
                for (const auto& id : success_load_ids) {
                    _create_plan_deps.erase(id);
                }
                if (_create_plan_deps.empty()) {
                    _create_plan_start_time_ms = 0;
                }
            }
            if (!_create_plan_deps.empty()) {
                need_resubmit = true;
            }
            if (need_resubmit && _group_commit_mgr) {
                LOG(INFO) << "resubmit create group commit load task for table: " << _table_id;
                _group_commit_mgr->add_need_create_plan_table(_table_id);
            }
        }};
        create_group_commit_st =
                _create_group_commit_load(be_exe_version, mem_tracker, created_load_block_queue);
        if (!create_group_commit_st.ok()) {
            LOG(WARNING) << "create group commit load error: "
                         << create_group_commit_st.to_string();
            create_plan_failed_reason = ". create group commit load error: " +
                                        create_group_commit_st.to_string().substr(0, 300);
        } else {
            create_plan_failed_reason = "";
        }
    });
    if (!submit_st.ok()) {
        _is_creating_plan_fragment = false;
        _create_plan_failed_reason =
                ". create group commit load error: submit create group commit load task failed: " +
                submit_st.to_string().substr(0, 300);
        release_pending_loads();
        LOG(WARNING) << "submit create group commit load task for table: " << _table_id
                     << ", error: " << submit_st.to_string();
    }
    return submit_st;
}
409
410
0
void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
411
0
    std::unique_lock l(_lock);
412
0
    if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) {
413
0
        _create_plan_deps.erase(load_id);
414
0
        return;
415
0
    }
416
0
    for (const auto& [_, inner_block_queue] : _load_block_queues) {
417
0
        if (inner_block_queue->remove_load_id(load_id).ok()) {
418
0
            return;
419
0
        }
420
0
    }
421
0
}
422
423
// Creates a new group commit load for this table.
//
// Steps:
//   1. Asks the master FE (streamLoadPut) to plan an internal
//      "insert into ... select * from group_commit(...)" load.
//   2. Builds a LoadBlockQueue and its WAL for the returned plan and publishes it in
//      _load_block_queues through `created_load_block_queue`.
//   3. Attaches every compatible load waiting in _create_plan_deps; loads whose schema can never
//      match are released so they fail.
//   4. Executes the plan fragment. If execution fails, the load is finished with that error via
//      _finish_group_commit_load() and the error is returned.
//
// RPC, FE and WAL-creation errors are returned before any fragment is executed.
Status GroupCommitTable::_create_group_commit_load(
        int be_exe_version, const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
        std::shared_ptr<LoadBlockQueue>& created_load_block_queue) {
    Status st = Status::OK();
    TStreamLoadPutResult result;
    std::string label;
    // Initialized so that no path can ever read an indeterminate value.
    int64_t txn_id = 0;
    TUniqueId instance_id;
    {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
        UniqueId load_id = UniqueId::gen_uid();
        TUniqueId tload_id;
        tload_id.__set_hi(load_id.hi);
        tload_id.__set_lo(load_id.lo);
        // Replace every '-' of the id with '_': same result as
        // std::regex_replace(id, std::regex("-"), "_") without compiling a regex per plan.
        std::string label_suffix = load_id.to_string();
        for (char& ch : label_suffix) {
            if (ch == '-') {
                ch = '_';
            }
        }
        label = "group_commit_" + label_suffix;
        std::stringstream ss;
        ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label
           << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";
        TStreamLoadPutRequest request;
        request.__set_load_sql(ss.str());
        request.__set_loadId(tload_id);
        request.__set_label(label);
        request.__set_token("group_commit"); // this is a fake, fe not check it now
        request.__set_max_filter_ratio(1.0);
        request.__set_strictMode(false);
        request.__set_partial_update(false);
        // this is an internal interface, use admin to pass the auth check
        request.__set_user("admin");
        if (_exec_env->cluster_info()->backend_id != 0) {
            request.__set_backend_id(_exec_env->cluster_info()->backend_id);
        } else {
            LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
        }
        TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
        st = ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr.hostname, master_addr.port,
                [&result, &request](FrontendServiceConnection& client) {
                    client->streamLoadPut(result, request);
                },
                10000L);
        if (!st.ok()) {
            LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string();
            return st;
        }
        st = Status::create<false>(result.status);
        if (st.ok() && !result.__isset.pipeline_params) {
            st = Status::InternalError("Non-pipeline is disabled!");
        }
        if (!st.ok()) {
            LOG(WARNING) << "create group commit load error, st=" << st.to_string();
            return st;
        }
        auto& pipeline_params = result.pipeline_params;
        auto schema_version = pipeline_params.fragment.output_sink.olap_table_sink.schema.version;
        auto index_size =
                pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size();
        DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id);
        txn_id = pipeline_params.txn_conf.txn_id;
        DCHECK(pipeline_params.local_params.size() == 1);
        instance_id = pipeline_params.local_params[0].fragment_instance_id;
        VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id
                   << ", schema version=" << schema_version << ", index size=" << index_size
                   << ", label=" << label << ", txn_id=" << txn_id
                   << ", instance_id=" << print_id(instance_id);
        {
            auto load_block_queue = std::make_shared<LoadBlockQueue>(
                    instance_id, label, txn_id, schema_version, index_size, _all_block_queues_bytes,
                    result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
                    result.group_commit_data_bytes);
            // NOTE(review): if create_wal() fails, txn `txn_id` from the plan is neither
            // committed nor rolled back here (unlike the _exec_plan_fragment failure path below);
            // confirm that relying on the FE-side transaction timeout is intended.
            RETURN_IF_ERROR(load_block_queue->create_wal(
                    _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
                    pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                    be_exe_version));
            created_load_block_queue = load_block_queue;

            std::unique_lock l(_lock);
            _load_block_queues.emplace(instance_id, load_block_queue);
            // Attach the loads waiting for a plan. The same logic runs again in the task's
            // completion handler in _submit_create_group_commit_load() for loads left over.
            std::vector<UniqueId> success_load_ids;
            for (const auto& [id, load_info] : _create_plan_deps) {
                auto create_dep = std::get<0>(load_info);
                auto put_dep = std::get<1>(load_info);
                if (load_block_queue->schema_version == std::get<2>(load_info) &&
                    load_block_queue->index_size == std::get<3>(load_info)) {
                    if (load_block_queue->add_load_id(id, put_dep).ok()) {
                        create_dep->set_ready();
                        success_load_ids.emplace_back(id);
                    }
                } else if (load_block_queue->schema_version > std::get<2>(load_info) ||
                           (load_block_queue->schema_version == std::get<2>(load_info) &&
                            load_block_queue->index_size != std::get<3>(load_info))) {
                    // schema version mismatch:
                    //   1. the schema version of created load block queue is newer than the load request
                    //   2. the index size is not equal
                    // set ready for the load request to let it fail
                    create_dep->set_ready();
                    success_load_ids.emplace_back(id);
                }
            }
            for (const auto& id : success_load_ids) {
                _create_plan_deps.erase(id);
            }
            if (_create_plan_deps.empty()) {
                _create_plan_start_time_ms = 0;
            }
        }
    }
    st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params);
    if (!st.ok()) {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
        auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id,
                                                   st, nullptr);
        if (!finish_st.ok()) {
            LOG(WARNING) << "finish group commit error, label=" << label
                         << ", st=" << finish_st.to_string();
        }
    }
    return st;
}
542
543
Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_id,
544
                                                   const std::string& label, int64_t txn_id,
545
                                                   const TUniqueId& instance_id, Status& status,
546
0
                                                   RuntimeState* state) {
547
0
    Status st;
548
0
    Status result_status;
549
0
    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", {
550
0
        status = Status::InternalError("LoadBlockQueue._finish_group_commit_load.err_status");
551
0
    });
552
0
    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error",
553
0
                    { status = Status::InternalError("load_error"); });
554
0
    if (status.ok()) {
555
0
        DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error", {
556
0
            status = Status::InternalError("LoadBlockQueue._finish_group_commit_load.commit_error");
557
0
        });
558
        // commit txn
559
0
        TLoadTxnCommitRequest request;
560
        // deprecated and should be removed in 3.1, use token instead
561
0
        request.__set_auth_code(0);
562
0
        request.__set_token(_exec_env->cluster_info()->curr_auth_token);
563
0
        request.__set_db_id(db_id);
564
0
        request.__set_table_id(table_id);
565
0
        request.__set_txnId(txn_id);
566
0
        request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
567
0
        request.__set_groupCommit(true);
568
0
        request.__set_receiveBytes(state->num_bytes_load_total());
569
0
        if (_exec_env->cluster_info()->backend_id != 0) {
570
0
            request.__set_backendId(_exec_env->cluster_info()->backend_id);
571
0
        } else {
572
0
            LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
573
0
        }
574
0
        if (state) {
575
0
            request.__set_commitInfos(state->tablet_commit_infos());
576
0
        }
577
0
        TLoadTxnCommitResult result;
578
0
        TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
579
0
        int retry_times = 0;
580
0
        while (retry_times < config::mow_stream_load_commit_retry_times) {
581
0
            st = ThriftRpcHelper::rpc<FrontendServiceClient>(
582
0
                    master_addr.hostname, master_addr.port,
583
0
                    [&request, &result](FrontendServiceConnection& client) {
584
0
                        client->loadTxnCommit(result, request);
585
0
                    },
586
0
                    config::txn_commit_rpc_timeout_ms);
587
0
            if (st.ok()) {
588
0
                result_status = Status::create(result.status);
589
                // DELETE_BITMAP_LOCK_ERROR will be retried
590
0
                if (result_status.ok() ||
591
0
                    !result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
592
0
                    break;
593
0
                }
594
0
                LOG_WARNING("Failed to commit txn on group commit")
595
0
                        .tag("label", label)
596
0
                        .tag("txn_id", txn_id)
597
0
                        .tag("retry_times", retry_times)
598
0
                        .error(result_status);
599
0
            } else {
600
0
                LOG_WARNING("Failed to commit txn on group commit")
601
0
                        .tag("label", label)
602
0
                        .tag("txn_id", txn_id)
603
0
                        .tag("retry_times", retry_times)
604
0
                        .error(st);
605
0
            }
606
0
            retry_times++;
607
0
        }
608
0
        DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
609
0
                        { result_status = Status::InternalError("commit_success_and_rpc_error"); });
610
0
    } else {
611
        // abort txn
612
0
        TLoadTxnRollbackRequest request;
613
        // deprecated and should be removed in 3.1, use token instead
614
0
        request.__set_auth_code(0);
615
0
        request.__set_token(_exec_env->cluster_info()->curr_auth_token);
616
0
        request.__set_db_id(db_id);
617
0
        request.__set_txnId(txn_id);
618
0
        request.__set_reason(status.to_string());
619
0
        TLoadTxnRollbackResult result;
620
0
        TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
621
0
        st = ThriftRpcHelper::rpc<FrontendServiceClient>(
622
0
                master_addr.hostname, master_addr.port,
623
0
                [&request, &result](FrontendServiceConnection& client) {
624
0
                    client->loadTxnRollback(result, request);
625
0
                });
626
0
        if (st.ok()) {
627
0
            result_status = Status::create<false>(result.status);
628
0
        }
629
0
        DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", {
630
0
            std ::string msg = "abort txn";
631
0
            LOG(INFO) << "debug promise set: " << msg;
632
0
            ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
633
0
                    Status ::InternalError(msg));
634
0
        });
635
0
    }
636
0
    std::shared_ptr<LoadBlockQueue> load_block_queue;
637
0
    {
638
0
        std::lock_guard<std::mutex> l(_lock);
639
0
        auto it = _load_block_queues.find(instance_id);
640
0
        if (it != _load_block_queues.end()) {
641
0
            load_block_queue = it->second;
642
0
            if (!status.ok()) {
643
0
                load_block_queue->cancel(status);
644
0
            }
645
            //close wal
646
0
            RETURN_IF_ERROR(load_block_queue->close_wal());
647
            // notify sync mode loads
648
0
            {
649
0
                std::unique_lock l2(load_block_queue->mutex);
650
0
                load_block_queue->process_finish = true;
651
0
                for (auto dep : load_block_queue->dependencies) {
652
0
                    dep->set_always_ready();
653
0
                }
654
0
            }
655
0
        }
656
0
        _load_block_queues.erase(instance_id);
657
0
    }
658
0
    if (!load_block_queue) {
659
0
        LOG(WARNING) << "finish group commit can not find load block queue, label=" << label
660
0
                     << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id);
661
0
    }
662
    // status: exec_plan_fragment result
663
    // st: commit txn rpc status
664
    // result_status: commit txn result
665
0
    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st", {
666
0
        st = Status::InternalError("LoadBlockQueue._finish_group_commit_load.err_st");
667
0
    });
668
0
    if (status.ok() && st.ok() &&
669
0
        (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
670
0
        if (!config::group_commit_wait_replay_wal_finish) {
671
0
            auto delete_st = _exec_env->wal_mgr()->delete_wal(table_id, txn_id);
672
0
            if (!delete_st.ok()) {
673
0
                LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" << delete_st.to_string();
674
0
            }
675
0
        }
676
0
    } else {
677
0
        std::string wal_path;
678
0
        RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
679
0
        RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path));
680
0
    }
681
0
    std::stringstream ss;
682
0
    ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label
683
0
       << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
684
0
       << ", exec_plan_fragment status=" << status.to_string()
685
0
       << ", commit/abort txn rpc status=" << st.to_string()
686
0
       << ", commit/abort txn status=" << result_status.to_string();
687
0
    if (load_block_queue) {
688
0
        ss << ", this group commit includes " << load_block_queue->group_commit_load_count
689
0
           << " loads, flush because meet "
690
0
           << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition";
691
0
    } else {
692
0
        ss << ", load block queue is missing when finishing group commit";
693
0
    }
694
0
    ss << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
695
0
    if (state) {
696
0
        if (!state->get_error_log_file_path().empty()) {
697
0
            ss << ", error_url=" << state->get_error_log_file_path();
698
0
        }
699
0
        if (!state->get_first_error_msg().empty()) {
700
0
            ss << ", first_error_msg=" << state->get_first_error_msg();
701
0
        }
702
0
        ss << ", rows=" << state->num_rows_load_success();
703
0
    }
704
0
    LOG(INFO) << ss.str();
705
0
    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", {
706
0
        if (dp->param<int64_t>("table_id", -1) == table_id) {
707
0
            std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string();
708
0
            LOG(INFO) << "table_id" << std::to_string(table_id) << " set debug promise: " << msg;
709
0
            ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
710
0
                    Status ::InternalError(msg));
711
0
        }
712
0
    };);
713
0
    return st;
714
0
}
715
716
Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
717
                                             const std::string& label, int64_t txn_id,
718
0
                                             const TPipelineFragmentParams& pipeline_params) {
719
0
    auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) {
720
0
        DCHECK(state);
721
0
        auto finish_st = _finish_group_commit_load(db_id, table_id, label, txn_id,
722
0
                                                   state->fragment_instance_id(), *status, state);
723
0
        if (!finish_st.ok()) {
724
0
            LOG(WARNING) << "finish group commit error, label=" << label
725
0
                         << ", st=" << finish_st.to_string();
726
0
        }
727
0
    };
728
729
0
    TPipelineFragmentParamsList mocked;
730
0
    return _exec_env->fragment_mgr()->exec_plan_fragment(
731
0
            pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb, mocked);
732
0
}
733
734
Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
735
                                              std::shared_ptr<LoadBlockQueue>& load_block_queue,
736
0
                                              std::shared_ptr<Dependency> get_block_dep) {
737
0
    std::unique_lock l(_lock);
738
0
    auto it = _load_block_queues.find(instance_id);
739
0
    if (it == _load_block_queues.end()) {
740
0
        return Status::InternalError("group commit load instance " + print_id(instance_id) +
741
0
                                     " not found");
742
0
    }
743
0
    load_block_queue = it->second;
744
0
    load_block_queue->append_read_dependency(get_block_dep);
745
0
    return Status::OK();
746
0
}
747
748
0
GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
749
0
    static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
750
0
                              .set_min_threads(1)
751
0
                              .set_max_threads(config::group_commit_insert_threads)
752
0
                              .build(&_thread_pool));
753
0
    _all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0);
754
0
    _group_commit_mem_tracker =
755
0
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "GroupCommit");
756
0
    _create_plan_thread = std::thread(&GroupCommitMgr::_create_plan_worker, this);
757
0
}
758
759
0
GroupCommitMgr::~GroupCommitMgr() {
760
0
    stop();
761
0
    LOG(INFO) << "GroupCommitMgr is destoried";
762
0
}
763
764
0
void GroupCommitMgr::stop() {
765
0
    {
766
0
        std::lock_guard<std::mutex> l(_need_create_plan_lock);
767
0
        if (_stopped) {
768
0
            return;
769
0
        }
770
0
        _stopped = true;
771
0
    }
772
0
    _need_create_plan_cv.notify_all();
773
0
    if (_create_plan_thread.joinable()) {
774
0
        _create_plan_thread.join();
775
0
    }
776
0
    _thread_pool->shutdown();
777
0
    LOG(INFO) << "GroupCommitMgr is stopped";
778
0
}
779
780
0
void GroupCommitMgr::add_need_create_plan_table(int64_t table_id) {
781
0
    {
782
0
        std::lock_guard<std::mutex> l(_need_create_plan_lock);
783
0
        if (_stopped) {
784
0
            return;
785
0
        }
786
0
        _need_create_plan_tables.insert(table_id);
787
0
    }
788
0
    _need_create_plan_cv.notify_one();
789
0
}
790
791
0
void GroupCommitMgr::_create_plan_worker() {
792
0
    SCOPED_INIT_THREAD_CONTEXT();
793
0
    while (true) {
794
0
        std::set<int64_t> need_create_plan_tables;
795
0
        {
796
0
            std::unique_lock<std::mutex> l(_need_create_plan_lock);
797
0
            _need_create_plan_cv.wait(
798
0
                    l, [this] { return _stopped || !_need_create_plan_tables.empty(); });
799
0
            if (_stopped && _need_create_plan_tables.empty()) {
800
0
                return;
801
0
            }
802
0
            need_create_plan_tables.swap(_need_create_plan_tables);
803
0
        }
804
0
        for (const auto table_id : need_create_plan_tables) {
805
0
            std::shared_ptr<GroupCommitTable> group_commit_table;
806
0
            {
807
0
                std::lock_guard<std::mutex> l(_lock);
808
0
                auto it = _table_map.find(table_id);
809
0
                if (it == _table_map.end()) {
810
0
                    continue;
811
0
                }
812
0
                group_commit_table = it->second;
813
0
            }
814
0
            auto st = group_commit_table->submit_create_group_commit_load();
815
0
            if (!st.ok()) {
816
0
                LOG(WARNING) << "submit create group commit load task from worker for table: "
817
0
                             << table_id << ", error: " << st.to_string();
818
0
            }
819
0
        }
820
0
    }
821
0
}
822
823
Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id,
824
                                                  int64_t base_schema_version, int64_t index_size,
825
                                                  const UniqueId& load_id,
826
                                                  std::shared_ptr<LoadBlockQueue>& load_block_queue,
827
                                                  int be_exe_version,
828
                                                  std::shared_ptr<Dependency> create_plan_dep,
829
0
                                                  std::shared_ptr<Dependency> put_block_dep) {
830
0
    std::shared_ptr<GroupCommitTable> group_commit_table;
831
0
    {
832
0
        std::lock_guard wlock(_lock);
833
0
        if (_table_map.find(table_id) == _table_map.end()) {
834
0
            _table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
835
0
                                                 _exec_env, _thread_pool.get(), db_id, table_id,
836
0
                                                 _all_block_queues_bytes, this));
837
0
        }
838
0
        group_commit_table = _table_map[table_id];
839
0
    }
840
0
    RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
841
0
            table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version,
842
0
            create_plan_dep, put_block_dep));
843
0
    return Status::OK();
844
0
}
845
846
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
847
                                            std::shared_ptr<LoadBlockQueue>& load_block_queue,
848
0
                                            std::shared_ptr<Dependency> get_block_dep) {
849
0
    std::shared_ptr<GroupCommitTable> group_commit_table;
850
0
    {
851
0
        std::lock_guard<std::mutex> l(_lock);
852
0
        auto it = _table_map.find(table_id);
853
0
        if (it == _table_map.end()) {
854
0
            return Status::NotFound("table_id: " + std::to_string(table_id) +
855
0
                                    ", instance_id: " + print_id(instance_id) + " dose not exist");
856
0
        }
857
0
        group_commit_table = it->second;
858
0
    }
859
0
    return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep);
860
0
}
861
862
0
void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) {
863
0
    std::lock_guard wlock(_lock);
864
0
    if (_table_map.find(table_id) != _table_map.end()) {
865
0
        _table_map.find(table_id)->second->remove_load_id(load_id);
866
0
    }
867
0
}
868
869
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
870
                                  const std::string& import_label, WalManager* wal_manager,
871
0
                                  std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
872
0
    std::string real_label = config::group_commit_wait_replay_wal_finish
873
0
                                     ? import_label + "_test_wait"
874
0
                                     : import_label;
875
0
    RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
876
0
            db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION));
877
0
    _v_wal_writer = std::make_shared<VWalWriter>(db_id, tb_id, wal_id, real_label, wal_manager,
878
0
                                                 slot_desc, be_exe_version);
879
0
    return _v_wal_writer->init();
880
0
}
881
882
0
Status LoadBlockQueue::close_wal() {
883
0
    if (_v_wal_writer != nullptr) {
884
0
        RETURN_IF_ERROR(_v_wal_writer->close());
885
0
    }
886
0
    return Status::OK();
887
0
}
888
889
0
void LoadBlockQueue::append_dependency(std::shared_ptr<Dependency> finish_dep) {
890
0
    std::lock_guard<std::mutex> lock(mutex);
891
    // If not finished, dependencies should be blocked.
892
0
    if (!process_finish) {
893
0
        finish_dep->block();
894
0
        dependencies.push_back(finish_dep);
895
0
    }
896
0
}
897
898
0
void LoadBlockQueue::append_read_dependency(std::shared_ptr<Dependency> read_dep) {
899
0
    std::lock_guard<std::mutex> lock(mutex);
900
0
    _read_deps.push_back(read_dep);
901
0
}
902
903
0
bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
904
0
    DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; });
905
0
    auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
906
0
    size_t available_bytes = 0;
907
0
    {
908
0
        Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, &available_bytes);
909
0
        if (!st.ok()) {
910
0
            LOG(WARNING) << "get wal dir available size failed, st=" << st.to_string();
911
0
        }
912
0
    }
913
0
    if (estimated_wal_bytes < available_bytes) {
914
0
        Status st =
915
0
                wal_mgr->update_wal_dir_estimated_wal_bytes(_wal_base_path, estimated_wal_bytes, 0);
916
0
        if (!st.ok()) {
917
0
            LOG(WARNING) << "update wal dir estimated_wal_bytes failed, reason: " << st.to_string();
918
0
        }
919
0
        return true;
920
0
    } else {
921
0
        return false;
922
0
    }
923
0
}
924
925
} // namespace doris