Coverage Report

Created: 2025-04-25 17:55

/root/doris/be/src/runtime/query_context.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/query_context.h"
19
20
#include <fmt/core.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <gen_cpp/RuntimeProfile_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
26
#include <algorithm>
27
#include <exception>
28
#include <memory>
29
#include <mutex>
30
#include <utility>
31
#include <vector>
32
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "olap/olap_common.h"
36
#include "pipeline/dependency.h"
37
#include "pipeline/pipeline_fragment_context.h"
38
#include "runtime/exec_env.h"
39
#include "runtime/fragment_mgr.h"
40
#include "runtime/memory/heap_profiler.h"
41
#include "runtime/runtime_query_statistics_mgr.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "runtime/workload_management/query_task_controller.h"
46
#include "runtime_filter/runtime_filter_definitions.h"
47
#include "util/mem_info.h"
48
#include "util/uid_util.h"
49
#include "vec/spill/spill_stream_manager.h"
50
51
namespace doris {
52
53
class DelayReleaseToken : public Runnable {
54
    ENABLE_FACTORY_CREATOR(DelayReleaseToken);
55
56
public:
57
0
    DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ = std::move(token); }
58
0
    ~DelayReleaseToken() override = default;
59
0
    void run() override {}
60
    std::unique_ptr<ThreadPoolToken> token_;
61
};
62
63
0
const std::string toString(QuerySource queryType) {
64
0
    switch (queryType) {
65
0
    case QuerySource::INTERNAL_FRONTEND:
66
0
        return "INTERNAL_FRONTEND";
67
0
    case QuerySource::STREAM_LOAD:
68
0
        return "STREAM_LOAD";
69
0
    case QuerySource::GROUP_COMMIT_LOAD:
70
0
        return "EXTERNAL_QUERY";
71
0
    case QuerySource::ROUTINE_LOAD:
72
0
        return "ROUTINE_LOAD";
73
0
    case QuerySource::EXTERNAL_CONNECTOR:
74
0
        return "EXTERNAL_CONNECTOR";
75
0
    default:
76
0
        return "UNKNOWN";
77
0
    }
78
0
}
79
80
std::shared_ptr<QueryContext> QueryContext::create(TUniqueId query_id, ExecEnv* exec_env,
81
                                                   const TQueryOptions& query_options,
82
                                                   TNetworkAddress coord_addr, bool is_nereids,
83
                                                   TNetworkAddress current_connect_fe,
84
100
                                                   QuerySource query_type) {
85
100
    auto ctx = QueryContext::create_shared(query_id, exec_env, query_options, coord_addr,
86
100
                                           is_nereids, current_connect_fe, query_type);
87
100
    ctx->init_query_task_controller();
88
100
    return ctx;
89
100
}
90
91
QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
92
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
93
                           bool is_nereids, TNetworkAddress current_connect_fe,
94
                           QuerySource query_source)
95
        : _timeout_second(-1),
96
          _query_id(std::move(query_id)),
97
          _exec_env(exec_env),
98
          _is_nereids(is_nereids),
99
          _query_options(query_options),
100
78.6k
          _query_source(query_source) {
101
78.6k
    _init_resource_context();
102
78.6k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
103
78.6k
    _query_watcher.start();
104
78.6k
    _execution_dependency =
105
78.6k
            pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", false);
106
78.6k
    _memory_sufficient_dependency =
107
78.6k
            pipeline::Dependency::create_unique(-1, -1, "MemorySufficientDependency", true);
108
109
78.6k
    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(true);
110
111
78.6k
    _timeout_second = query_options.execution_timeout;
112
113
78.6k
    bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
114
78.6k
                               query_options.query_type == TQueryType::LOAD ||
115
78.6k
                               query_options.query_type == TQueryType::EXTERNAL;
116
78.6k
    DCHECK_EQ(is_query_type_valid, true);
117
118
78.6k
    this->coord_addr = coord_addr;
119
    // current_connect_fe is used for reporting query statistics
120
78.6k
    this->current_connect_fe = current_connect_fe;
121
    // external query has no current_connect_fe
122
78.6k
    if (query_options.query_type != TQueryType::EXTERNAL) {
123
189
        bool is_report_fe_addr_valid =
124
189
                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
125
189
        DCHECK_EQ(is_report_fe_addr_valid, true);
126
189
    }
127
78.6k
    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
128
78.6k
    DorisMetrics::instance()->query_ctx_cnt->increment(1);
129
78.6k
}
130
131
78.6k
void QueryContext::_init_query_mem_tracker() {
132
78.6k
    bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
133
78.6k
    int64_t bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
134
78.6k
    if (bytes_limit > MemInfo::mem_limit() || bytes_limit == -1) {
135
0
        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
136
0
                    << " exceeds process memory limit of "
137
0
                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
138
0
                    << " OR is -1. Using process memory limit instead.";
139
0
        bytes_limit = MemInfo::mem_limit();
140
0
    }
141
    // If the query is a pure load task(streamload, routine load, group commit), then it should not use
142
    // memlimit per query to limit their memory usage.
143
78.6k
    if (is_pure_load_task()) {
144
78.4k
        bytes_limit = MemInfo::mem_limit();
145
78.4k
    }
146
78.6k
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
147
78.6k
    if (_query_options.query_type == TQueryType::SELECT) {
148
186
        query_mem_tracker = MemTrackerLimiter::create_shared(
149
186
                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
150
186
                bytes_limit);
151
78.4k
    } else if (_query_options.query_type == TQueryType::LOAD) {
152
3
        query_mem_tracker = MemTrackerLimiter::create_shared(
153
3
                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
154
3
                bytes_limit);
155
78.4k
    } else if (_query_options.query_type == TQueryType::EXTERNAL) { // spark/flink/etc..
156
78.4k
        query_mem_tracker = MemTrackerLimiter::create_shared(
157
78.4k
                MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", print_id(_query_id)),
158
78.4k
                bytes_limit);
159
78.4k
    } else {
160
0
        LOG(FATAL) << "__builtin_unreachable";
161
0
        __builtin_unreachable();
162
0
    }
163
78.6k
    if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
164
0
        query_mem_tracker->enable_print_log_usage();
165
0
    }
166
167
78.6k
    query_mem_tracker->set_enable_reserve_memory(_query_options.__isset.enable_reserve_memory &&
168
78.6k
                                                 _query_options.enable_reserve_memory);
169
78.6k
    _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
170
78.6k
}
171
172
78.6k
void QueryContext::_init_resource_context() {
173
78.6k
    _resource_ctx = ResourceContext::create_shared();
174
78.6k
    _init_query_mem_tracker();
175
78.6k
}
176
177
78.5k
void QueryContext::init_query_task_controller() {
178
78.5k
    _resource_ctx->set_task_controller(QueryTaskController::create(this));
179
78.5k
    _resource_ctx->task_controller()->set_task_id(_query_id);
180
78.5k
    _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
181
78.5k
    _resource_ctx->task_controller()->set_query_type(_query_options.query_type);
182
#ifndef BE_TEST
183
    _exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id),
184
                                                                         _resource_ctx);
185
#endif
186
78.5k
}
187
188
78.6k
QueryContext::~QueryContext() {
189
78.6k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
190
    // If query mem tracker consumption is equal to 0, it means that after the QueryContext was
191
    // created, the query was found to already exist in _query_ctx_map, so the query mem tracker was never used.
192
    // Query mem tracker consumption may be non-zero after use, because memory can be consumed
193
    // on the query mem tracker and released on other trackers.
194
78.6k
    std::string mem_tracker_msg;
195
78.6k
    if (query_mem_tracker()->peak_consumption() != 0) {
196
30
        mem_tracker_msg = fmt::format(
197
30
                "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
198
30
                "PeakUsed={}",
199
30
                print_id(_query_id), MemCounter::print_bytes(query_mem_tracker()->limit()),
200
30
                MemCounter::print_bytes(query_mem_tracker()->consumption()),
201
30
                MemCounter::print_bytes(query_mem_tracker()->peak_consumption()));
202
30
    }
203
78.6k
    [[maybe_unused]] uint64_t group_id = 0;
204
78.6k
    if (workload_group()) {
205
22
        group_id = workload_group()->id(); // before remove
206
22
    }
207
208
78.6k
    _resource_ctx->task_controller()->finish();
209
210
78.6k
    if (enable_profile()) {
211
0
        _report_query_profile();
212
0
    }
213
214
    // Do not release the thread token in the query context's destructor method, because the query
215
    // context may be destructed in the thread token itself. It is very dangerous and may core.
216
    // Also, the thread token needs shutdown, which may take some time and may cause the thread that
217
    // releases the token to hang; that thread may be a pipeline task scheduler thread.
218
78.6k
    if (_thread_token) {
219
0
        Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
220
0
                DelayReleaseToken::create_shared(std::move(_thread_token)));
221
0
        if (!submit_st.ok()) {
222
0
            LOG(WARNING) << "Failed to release query context thread token, query_id "
223
0
                         << print_id(_query_id) << ", error status " << submit_st;
224
0
        }
225
0
    }
226
#ifndef BE_TEST
227
    if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
228
        try {
229
            ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
230
        } catch (std::exception& e) {
231
            LOG(WARNING) << "Dump trace log failed because " << e.what();
232
        }
233
    }
234
#endif
235
78.6k
    _runtime_filter_mgr.reset();
236
78.6k
    _execution_dependency.reset();
237
78.6k
    _runtime_predicates.clear();
238
78.6k
    file_scan_range_params_map.clear();
239
78.6k
    obj_pool.clear();
240
78.6k
    _merge_controller_handler.reset();
241
242
#ifndef BE_TEST
243
    _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
244
#endif
245
78.6k
    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
246
    // The only message that marks the query's end. Any other message should be appended to it if needed.
247
78.6k
    LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
248
78.6k
}
249
250
18
void QueryContext::set_ready_to_execute(Status reason) {
251
18
    set_execution_dependency_ready();
252
18
    _exec_status.update(reason);
253
18
}
254
255
0
void QueryContext::set_ready_to_execute_only() {
256
0
    set_execution_dependency_ready();
257
0
}
258
259
18
void QueryContext::set_execution_dependency_ready() {
260
18
    _execution_dependency->set_ready();
261
18
}
262
263
18
void QueryContext::set_memory_sufficient(bool sufficient) {
264
18
    if (sufficient) {
265
8
        {
266
8
            _memory_sufficient_dependency->set_ready();
267
8
            _resource_ctx->task_controller()->reset_paused_reason();
268
8
        }
269
10
    } else {
270
10
        _memory_sufficient_dependency->block();
271
10
        _resource_ctx->task_controller()->add_paused_count();
272
10
    }
273
18
}
274
275
18
void QueryContext::cancel(Status new_status, int fragment_id) {
276
18
    if (!_exec_status.update(new_status)) {
277
0
        return;
278
0
    }
279
    // Tasks should be always runnable.
280
18
    _execution_dependency->set_always_ready();
281
18
    _memory_sufficient_dependency->set_always_ready();
282
18
    if ((new_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
283
18
         new_status.is<ErrorCode::MEM_ALLOC_FAILED>()) &&
284
18
        _query_options.__isset.dump_heap_profile_when_mem_limit_exceeded &&
285
18
        _query_options.dump_heap_profile_when_mem_limit_exceeded) {
286
        // if query is cancelled because of query mem limit exceeded, dump heap profile
287
        // at the time of cancellation can get the most accurate memory usage for problem analysis
288
0
        auto wg = workload_group();
289
0
        auto log_str = fmt::format(
290
0
                "Query {} canceled because of memory limit exceeded, dumping memory "
291
0
                "detail profiles. wg: {}. {}",
292
0
                print_id(_query_id), wg ? wg->debug_string() : "null",
293
0
                doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str());
294
0
        LOG_LONG_STRING(INFO, log_str);
295
0
        std::string dot = HeapProfiler::instance()->dump_heap_profile_to_dot();
296
0
        if (!dot.empty()) {
297
0
            dot += "\n-------------------------------------------------------\n";
298
0
            dot += "Copy the text after `digraph` in the above output to "
299
0
                   "http://www.webgraphviz.com to generate a dot graph.\n"
300
0
                   "after start heap profiler, if there is no operation, will print `No nodes "
301
0
                   "to "
302
0
                   "print`."
303
0
                   "If there are many errors: `addr2line: Dwarf Error`,"
304
0
                   "or other FAQ, reference doc: "
305
0
                   "https://doris.apache.org/community/developer-guide/debug-tool/#4-qa\n";
306
0
            auto log_str =
307
0
                    fmt::format("Query {}, dump heap profile to dot: {}", print_id(_query_id), dot);
308
0
            LOG_LONG_STRING(INFO, log_str);
309
0
        }
310
0
    }
311
312
18
    set_ready_to_execute(new_status);
313
18
    cancel_all_pipeline_context(new_status, fragment_id);
314
18
}
315
316
0
void QueryContext::set_load_error_url(std::string error_url) {
317
0
    std::lock_guard<std::mutex> lock(_error_url_lock);
318
0
    _load_error_url = error_url;
319
0
}
320
321
0
std::string QueryContext::get_load_error_url() {
322
0
    std::lock_guard<std::mutex> lock(_error_url_lock);
323
0
    return _load_error_url;
324
0
}
325
326
18
void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
327
18
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
328
18
    {
329
18
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
330
18
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
331
0
            if (fragment_id == f_id) {
332
0
                continue;
333
0
            }
334
0
            ctx_to_cancel.push_back(f_context);
335
0
        }
336
18
    }
337
18
    for (auto& f_context : ctx_to_cancel) {
338
0
        if (auto pipeline_ctx = f_context.lock()) {
339
0
            pipeline_ctx->cancel(reason);
340
0
        }
341
0
    }
342
18
}
343
344
0
std::string QueryContext::print_all_pipeline_context() {
345
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
346
0
    fmt::memory_buffer debug_string_buffer;
347
0
    size_t i = 0;
348
0
    {
349
0
        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
350
0
                       _fragment_id_to_pipeline_ctx.size(), print_id(_query_id));
351
352
0
        {
353
0
            std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
354
0
            for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
355
0
                ctx_to_print.push_back(f_context);
356
0
            }
357
0
        }
358
0
        for (auto& f_context : ctx_to_print) {
359
0
            if (auto pipeline_ctx = f_context.lock()) {
360
0
                auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
361
0
                fmt::format_to(debug_string_buffer,
362
0
                               "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
363
0
                               pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
364
0
                i++;
365
0
            }
366
0
        }
367
0
    }
368
0
    return fmt::to_string(debug_string_buffer);
369
0
}
370
371
void QueryContext::set_pipeline_context(
372
0
        const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
373
0
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
374
0
    _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
375
0
}
376
377
0
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
378
0
    if (workload_group()) {
379
0
        if (_task_scheduler) {
380
0
            return _task_scheduler;
381
0
        }
382
0
    }
383
0
    return _exec_env->pipeline_task_scheduler();
384
0
}
385
386
22
void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
387
22
    _resource_ctx->set_workload_group(wg);
388
    // Should add query first, then the workload group will not be deleted.
389
    // see task_group_manager::delete_workload_group_by_ids
390
22
    workload_group()->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
391
22
                                          &_remote_scan_task_scheduler);
392
22
}
393
394
void QueryContext::add_fragment_profile(
395
        int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
396
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
397
0
    if (pipeline_profiles.empty()) {
398
0
        std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
399
0
                                      print_id(this->_query_id), fragment_id);
400
0
        LOG_ERROR(msg);
401
0
        DCHECK(false) << msg;
402
0
        return;
403
0
    }
404
405
0
#ifndef NDEBUG
406
0
    for (const auto& p : pipeline_profiles) {
407
0
        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}",
408
0
                                            print_id(this->_query_id), fragment_id);
409
0
    }
410
0
#endif
411
412
0
    std::lock_guard<std::mutex> l(_profile_mutex);
413
0
    VLOG_ROW << fmt::format(
414
0
            "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ",
415
0
            print_id(this->_query_id), fragment_id, pipeline_profiles.size());
416
417
0
    _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
418
419
0
    if (load_channel_profile != nullptr) {
420
0
        _load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile));
421
0
    }
422
0
}
423
424
0
void QueryContext::_report_query_profile() {
425
0
    std::lock_guard<std::mutex> lg(_profile_mutex);
426
427
0
    for (auto& [fragment_id, fragment_profile] : _profile_map) {
428
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
429
430
0
        if (_load_channel_profile_map.contains(fragment_id)) {
431
0
            load_channel_profile = _load_channel_profile_map[fragment_id];
432
0
        }
433
434
0
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
435
0
                _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
436
0
    }
437
438
0
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
439
0
}
440
441
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
442
0
QueryContext::_collect_realtime_query_profile() {
443
0
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;
444
0
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
445
0
    for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
446
0
        if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
447
0
            if (fragment_ctx == nullptr) {
448
0
                std::string msg =
449
0
                        fmt::format("PipelineFragmentContext is nullptr, query {} fragment_id: {}",
450
0
                                    print_id(_query_id), fragment_id);
451
0
                LOG_ERROR(msg);
452
0
                DCHECK(false) << msg;
453
0
                continue;
454
0
            }
455
456
0
            auto profile = fragment_ctx->collect_realtime_profile();
457
458
0
            if (profile.empty()) {
459
0
                std::string err_msg = fmt::format(
460
0
                        "Get nothing when collecting profile, query {}, fragment_id: {}",
461
0
                        print_id(_query_id), fragment_id);
462
0
                LOG_ERROR(err_msg);
463
0
                DCHECK(false) << err_msg;
464
0
                continue;
465
0
            }
466
467
0
            res.insert(std::make_pair(fragment_id, profile));
468
0
        }
469
0
    }
470
471
0
    return res;
472
0
}
473
474
0
TReportExecStatusParams QueryContext::get_realtime_exec_status() {
475
0
    TReportExecStatusParams exec_status;
476
477
0
    auto realtime_query_profile = _collect_realtime_query_profile();
478
0
    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
479
480
0
    for (auto load_channel_profile : _load_channel_profile_map) {
481
0
        if (load_channel_profile.second != nullptr) {
482
0
            load_channel_profiles.push_back(load_channel_profile.second);
483
0
        }
484
0
    }
485
486
0
    exec_status = RuntimeQueryStatisticsMgr::create_report_exec_status_params(
487
0
            this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles),
488
0
            /*is_done=*/false);
489
490
0
    return exec_status;
491
0
}
492
493
} // namespace doris