Coverage Report

Created: 2025-10-16 20:27

/root/doris/be/src/runtime/fragment_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/fragment_mgr.h"
19
20
#include <brpc/controller.h>
21
#include <bvar/latency_recorder.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/DorisExternalService_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/HeartbeatService_types.h>
27
#include <gen_cpp/Metrics_types.h>
28
#include <gen_cpp/PaloInternalService_types.h>
29
#include <gen_cpp/PlanNodes_types.h>
30
#include <gen_cpp/Planner_types.h>
31
#include <gen_cpp/QueryPlanExtra_types.h>
32
#include <gen_cpp/RuntimeProfile_types.h>
33
#include <gen_cpp/Types_types.h>
34
#include <gen_cpp/internal_service.pb.h>
35
#include <pthread.h>
36
#include <sys/time.h>
37
#include <thrift/TApplicationException.h>
38
#include <thrift/Thrift.h>
39
#include <thrift/protocol/TDebugProtocol.h>
40
#include <thrift/transport/TTransportException.h>
41
#include <unistd.h>
42
43
#include <algorithm>
44
#include <cstddef>
45
#include <ctime>
46
47
// IWYU pragma: no_include <bits/chrono.h>
48
#include <chrono> // IWYU pragma: keep
49
#include <cstdint>
50
#include <map>
51
#include <memory>
52
#include <mutex>
53
#include <sstream>
54
#include <unordered_map>
55
#include <unordered_set>
56
#include <utility>
57
58
#include "common/config.h"
59
#include "common/exception.h"
60
#include "common/logging.h"
61
#include "common/object_pool.h"
62
#include "common/status.h"
63
#include "common/utils.h"
64
#include "io/fs/stream_load_pipe.h"
65
#include "pipeline/pipeline_fragment_context.h"
66
#include "runtime/client_cache.h"
67
#include "runtime/descriptors.h"
68
#include "runtime/exec_env.h"
69
#include "runtime/frontend_info.h"
70
#include "runtime/primitive_type.h"
71
#include "runtime/query_context.h"
72
#include "runtime/runtime_query_statistics_mgr.h"
73
#include "runtime/runtime_state.h"
74
#include "runtime/stream_load/new_load_stream_mgr.h"
75
#include "runtime/stream_load/stream_load_context.h"
76
#include "runtime/stream_load/stream_load_executor.h"
77
#include "runtime/thread_context.h"
78
#include "runtime/types.h"
79
#include "runtime/workload_group/workload_group.h"
80
#include "runtime/workload_group/workload_group_manager.h"
81
#include "runtime_filter/runtime_filter_consumer.h"
82
#include "runtime_filter/runtime_filter_mgr.h"
83
#include "service/backend_options.h"
84
#include "util/brpc_client_cache.h"
85
#include "util/debug_points.h"
86
#include "util/debug_util.h"
87
#include "util/doris_metrics.h"
88
#include "util/network_util.h"
89
#include "util/runtime_profile.h"
90
#include "util/thread.h"
91
#include "util/threadpool.h"
92
#include "util/thrift_util.h"
93
#include "util/uid_util.h"
94
95
namespace doris {
96
#include "common/compile_check_begin.h"
97
98
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
99
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
100
101
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
102
103
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
104
bvar::Status<uint64_t> g_fragment_last_active_time(
105
        "fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
106
                                             std::chrono::system_clock::now().time_since_epoch())
107
                                             .count());
108
109
0
uint64_t get_fragment_executing_count() {
110
0
    return g_fragment_executing_count.get_value();
111
0
}
112
0
uint64_t get_fragment_last_active_time() {
113
0
    return g_fragment_last_active_time.get_value();
114
0
}
115
116
0
std::string to_load_error_http_path(const std::string& file_name) {
117
0
    if (file_name.empty()) {
118
0
        return "";
119
0
    }
120
0
    if (file_name.compare(0, 4, "http") == 0) {
121
0
        return file_name;
122
0
    }
123
0
    std::stringstream url;
124
0
    url << "http://" << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
125
0
        << "/api/_load_error_log?"
126
0
        << "file=" << file_name;
127
0
    return url.str();
128
0
}
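
The helper above rewrites a bare load-error file name into a URL served by this backend's /api/_load_error_log endpoint, passing through names that are already URLs. A behavior sketch, assuming a hypothetical local host 10.0.0.5 and webserver_port 8040 (both values are illustrative, not taken from the report):

    // Illustrative expectations only, not part of fragment_mgr.cpp:
    //   to_load_error_http_path("")                  -> ""
    //   to_load_error_http_path("http://fe/err.log") -> "http://fe/err.log"  (already a URL)
    //   to_load_error_http_path("error_log_123")     ->
    //       "http://10.0.0.5:8040/api/_load_error_log?file=error_log_123"
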
129
130
using apache::thrift::TException;
131
using apache::thrift::transport::TTransportException;
132
133
static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
134
0
                                            std::unordered_set<TUniqueId>& query_set) {
135
0
    TFetchRunningQueriesResult rpc_result;
136
0
    TFetchRunningQueriesRequest rpc_request;
137
138
0
    Status client_status;
139
0
    const int32_t timeout_ms = 3 * 1000;
140
0
    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
141
0
                                         fe_info.info.coordinator_address, timeout_ms,
142
0
                                         &client_status);
143
    // Abort this fe.
144
0
    if (!client_status.ok()) {
145
0
        LOG_WARNING("Failed to get client for {}, reason is {}",
146
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
147
0
                    client_status.to_string());
148
0
        return Status::InternalError("Failed to get client for {}, reason is {}",
149
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
150
0
                                     client_status.to_string());
151
0
    }
152
153
    // do rpc
154
0
    try {
155
0
        try {
156
0
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
157
0
        } catch (const apache::thrift::transport::TTransportException& e) {
158
0
            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
159
0
            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
160
0
            if (!client_status.ok()) {
161
0
                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
162
0
                return Status::InternalError("Reopen failed, reason: {}",
163
0
                                             client_status.to_string_no_stack());
164
0
            }
165
166
0
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
167
0
        }
168
0
    } catch (apache::thrift::TException& e) {
169
        // May happen while the cluster is upgrading or on any other network error.
170
0
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
171
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
172
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
173
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
174
0
                                     e.what());
175
0
    }
176
177
    // Avoid logic error in frontend.
178
0
    if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) {
179
0
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
180
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
181
0
                    doris::to_string(rpc_result.status.status_code));
182
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
183
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
184
0
                                     doris::to_string(rpc_result.status.status_code));
185
0
    }
186
187
0
    if (!rpc_result.__isset.running_queries) {
188
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
189
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
190
0
                                     "running_queries is not set");
191
0
    }
192
193
0
    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
194
0
                                              rpc_result.running_queries.end());
195
0
    return Status::OK();
196
0
};
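
_do_fetch_running_queries_rpc follows a common thrift-client idiom: attempt the call once, reopen the cached connection on a TTransportException and retry exactly once, and treat any further TException as a hard failure. A std-only condensation of that idiom, with hypothetical names (TransportError, call_with_one_retry) standing in for the thrift types used above:

    #include <functional>
    #include <stdexcept>
    #include <string>

    // Stand-in for apache::thrift::transport::TTransportException.
    struct TransportError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    // Returns an empty string on success, otherwise an error message.
    std::string call_with_one_retry(const std::function<void()>& do_rpc,
                                    const std::function<bool()>& reopen) {
        try {
            try {
                do_rpc();                   // first attempt
            } catch (const TransportError&) {
                if (!reopen()) {            // refresh the pooled connection
                    return "reopen failed";
                }
                do_rpc();                   // single retry after reopen
            }
        } catch (const std::exception& e) { // any other failure is fatal
            return std::string("rpc failed: ") + e.what();
        }
        return "";
    }
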
197
198
0
static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
199
0
    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
200
0
            ExecEnv::GetInstance()->get_running_frontends();
201
202
0
    std::map<int64_t, std::unordered_set<TUniqueId>> result;
203
0
    std::vector<FrontendInfo> qualified_fes;
204
205
0
    for (const auto& fe : running_fes) {
206
        // Only consider normal frontends.
207
0
        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
208
0
            qualified_fes.push_back(fe.second);
209
0
        } else {
210
0
            return {};
211
0
        }
212
0
    }
213
214
0
    for (const auto& fe_addr : qualified_fes) {
215
0
        const int64_t process_uuid = fe_addr.info.process_uuid;
216
0
        std::unordered_set<TUniqueId> query_set;
217
0
        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
218
0
        if (!st.ok()) {
219
            // Return an empty result; the cancel worker will then do nothing.
220
0
            return {};
221
0
        }
222
223
        // frontend_info and process_uuid have been checked in the rpc threads.
224
0
        result[process_uuid] = query_set;
225
0
    }
226
227
0
    return result;
228
0
}
229
230
1
inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
231
1
    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
232
1
    value = HashUtil::hash(&query_id.hi, 8, value);
233
1
    return value % capacity;
234
1
}
235
236
0
inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
237
0
    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
238
0
    value = HashUtil::hash(&key.first.hi, 8, value);
239
0
    return value % capacity;
240
0
}
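
Both get_map_id overloads pick a shard by chaining the two 64-bit halves of the id through HashUtil::hash (the first digest seeds the second) and reducing modulo the shard count. A toy equivalent using std::hash and a boost-style combine, purely for illustration (HashUtil's actual hash function differs):

    #include <cstddef>
    #include <cstdint>
    #include <functional>

    size_t toy_map_id(uint64_t lo, uint64_t hi, size_t capacity) {
        size_t seed = std::hash<uint64_t>{}(lo);
        // Fold in the high half, seeded by the low half's digest.
        seed ^= std::hash<uint64_t>{}(hi) + 0x9e3779b97f4a7c15ULL + (seed << 6) + (seed >> 2);
        return seed % capacity;  // select one of the map partitions
    }
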
241
242
template <typename Key, typename Value, typename ValueType>
243
14
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
14
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
1.80k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
1.79k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
1.79k
                            phmap::flat_hash_map<Key, Value>()};
248
1.79k
    }
249
14
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES6_EC2Ev
Line
Count
Source
243
7
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
7
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
903
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
896
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
896
                            phmap::flat_hash_map<Key, Value>()};
248
896
    }
249
7
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_EC2Ev
Line
Count
Source
243
7
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
7
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
903
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
896
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
896
                            phmap::flat_hash_map<Key, Value>()};
248
896
    }
249
7
}
250
251
template <typename Key, typename Value, typename ValueType>
252
1
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
253
1
    auto id = get_map_id(query_id, _internal_map.size());
254
1
    {
255
1
        std::shared_lock lock(*_internal_map[id].first);
256
1
        auto& map = _internal_map[id].second;
257
1
        auto search = map.find(query_id);
258
1
        if (search != map.end()) {
259
0
            return search->second;
260
0
        }
261
1
        return std::shared_ptr<ValueType>(nullptr);
262
1
    }
263
1
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E4findERKS1_
Line
Count
Source
252
1
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
253
1
    auto id = get_map_id(query_id, _internal_map.size());
254
1
    {
255
1
        std::shared_lock lock(*_internal_map[id].first);
256
1
        auto& map = _internal_map[id].second;
257
1
        auto search = map.find(query_id);
258
1
        if (search != map.end()) {
259
0
            return search->second;
260
0
        }
261
1
        return std::shared_ptr<ValueType>(nullptr);
262
1
    }
263
1
}
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES6_E4findERKS3_
264
265
template <typename Key, typename Value, typename ValueType>
266
Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
267
0
        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
268
0
    auto id = get_map_id(query_id, _internal_map.size());
269
0
    {
270
0
        std::unique_lock lock(*_internal_map[id].first);
271
0
        auto& map = _internal_map[id].second;
272
0
        auto search = map.find(query_id);
273
0
        if (search != map.end()) {
274
0
            query_ctx = search->second.lock();
275
0
        }
276
0
        if (!query_ctx) {
277
0
            return function(map);
278
0
        }
279
0
        return Status::OK();
280
0
    }
281
0
}
282
283
template <typename Key, typename Value, typename ValueType>
284
0
void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
285
0
    auto id = get_map_id(query_id, _internal_map.size());
286
0
    {
287
0
        std::unique_lock lock(*_internal_map[id].first);
288
0
        auto& map = _internal_map[id].second;
289
0
        map.erase(query_id);
290
0
    }
291
0
}
292
293
template <typename Key, typename Value, typename ValueType>
294
void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
295
0
                                                         std::shared_ptr<ValueType> query_ctx) {
296
0
    auto id = get_map_id(query_id, _internal_map.size());
297
0
    {
298
0
        std::unique_lock lock(*_internal_map[id].first);
299
0
        auto& map = _internal_map[id].second;
300
0
        map.insert({query_id, query_ctx});
301
0
    }
302
0
}
303
304
template <typename Key, typename Value, typename ValueType>
305
14
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
306
1.79k
    for (auto& pair : _internal_map) {
307
1.79k
        std::unique_lock lock(*pair.first);
308
1.79k
        auto& map = pair.second;
309
1.79k
        map.clear();
310
1.79k
    }
311
14
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5clearEv
Line
Count
Source
305
7
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
306
896
    for (auto& pair : _internal_map) {
307
896
        std::unique_lock lock(*pair.first);
308
896
        auto& map = pair.second;
309
896
        map.clear();
310
896
    }
311
7
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES6_E5clearEv
Line
Count
Source
305
7
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
306
896
    for (auto& pair : _internal_map) {
307
896
        std::unique_lock lock(*pair.first);
308
896
        auto& map = pair.second;
309
896
        map.clear();
310
896
    }
311
7
}
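
ConcurrentContextMap spreads entries across config::num_query_ctx_map_partitions shards, each pairing its own std::shared_mutex with a phmap::flat_hash_map, so operations on different queries rarely contend. A std-only toy of the same layout (an illustration of the idea, not the Doris class):

    #include <functional>
    #include <shared_mutex>
    #include <unordered_map>
    #include <vector>

    template <typename K, typename V>
    class ShardedMap {
    public:
        explicit ShardedMap(size_t shard_count) : _shards(shard_count) {}

        void insert(const K& k, V v) {
            Shard& s = shard(k);
            std::unique_lock lock(s.mu);      // writers take the exclusive lock
            s.map.insert_or_assign(k, std::move(v));
        }

        V find(const K& k) {                  // default-constructed V if absent
            Shard& s = shard(k);
            std::shared_lock lock(s.mu);      // readers share one shard's lock
            auto it = s.map.find(k);
            return it == s.map.end() ? V{} : it->second;
        }

    private:
        struct Shard {
            std::shared_mutex mu;
            std::unordered_map<K, V> map;
        };
        Shard& shard(const K& k) { return _shards[std::hash<K>{}(k) % _shards.size()]; }
        std::vector<Shard> _shards;
    };
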
312
313
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
314
7
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
315
7
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
316
7
    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
317
318
7
    auto s = Thread::create(
319
7
            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
320
7
            &_cancel_thread);
321
7
    CHECK(s.ok()) << s.to_string();
322
323
7
    s = ThreadPoolBuilder("FragmentMgrAsyncWorkThreadPool")
324
7
                .set_min_threads(config::fragment_mgr_async_work_pool_thread_num_min)
325
7
                .set_max_threads(config::fragment_mgr_async_work_pool_thread_num_max)
326
7
                .set_max_queue_size(config::fragment_mgr_async_work_pool_queue_size)
327
7
                .build(&_thread_pool);
328
7
    CHECK(s.ok()) << s.to_string();
329
7
}
330
331
7
FragmentMgr::~FragmentMgr() = default;
332
333
7
void FragmentMgr::stop() {
334
7
    DEREGISTER_HOOK_METRIC(fragment_instance_count);
335
7
    _stop_background_threads_latch.count_down();
336
7
    if (_cancel_thread) {
337
7
        _cancel_thread->join();
338
7
    }
339
340
7
    _thread_pool->shutdown();
341
    // Only this thread may delete the maps now.
342
7
    _query_ctx_map.clear();
343
7
    _pipeline_map.clear();
344
7
}
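
stop() performs an ordered shutdown: count down the latch so the cancel worker's timed wait returns, join the worker thread, then shut down the async pool and clear both context maps. Rewritten with standard primitives instead of Doris's CountDownLatch (a sketch of the handshake, not the real API):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    std::mutex m;
    std::condition_variable cv;
    bool stopping = false;

    void cancel_worker_loop() {
        std::unique_lock<std::mutex> lk(m);
        do {
            // ... one cancellation sweep over the pipeline/query maps ...
        } while (!cv.wait_for(lk, std::chrono::seconds(1), [] { return stopping; }));
    }

    void request_stop() {
        {
            std::lock_guard<std::mutex> lk(m);
            stopping = true;
        }
        cv.notify_all();  // wake the worker promptly; the caller then joins the thread
    }
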
345
346
0
std::string FragmentMgr::to_http_path(const std::string& file_name) {
347
0
    std::stringstream url;
348
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
349
0
        << "/api/_download_load?"
350
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
351
0
    return url.str();
352
0
}
353
354
Status FragmentMgr::trigger_pipeline_context_report(
355
0
        const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
356
0
    return _thread_pool->submit_func([this, req, ctx]() {
357
0
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
358
0
        coordinator_callback(req);
359
0
        if (!req.done) {
360
0
            ctx->refresh_next_report_time();
361
0
        }
362
0
    });
363
0
}
364
365
// There can only be one of these callbacks in-flight at any moment, because
366
// it is only invoked from the executor's reporting thread.
367
// Also, the reported status will always reflect the most recent execution status,
368
// including the final status when execution finishes.
369
0
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
370
0
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
371
0
    if (req.coord_addr.hostname == "external") {
372
        // External queries (flink/spark reading tablets) do not need to report to the FE.
373
0
        return;
374
0
    }
375
0
    int callback_retries = 10;
376
0
    const int sleep_ms = 1000;
377
0
    Status exec_status = req.status;
378
0
    Status coord_status;
379
0
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
380
0
    do {
381
0
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
382
0
                                                            req.coord_addr, &coord_status);
383
0
        if (!coord_status.ok()) {
384
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
385
0
        }
386
0
    } while (!coord_status.ok() && callback_retries-- > 0);
387
388
0
    if (!coord_status.ok()) {
389
0
        std::stringstream ss;
390
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
391
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
392
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
393
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
394
0
        return;
395
0
    }
396
397
0
    TReportExecStatusParams params;
398
0
    params.protocol_version = FrontendServiceVersion::V1;
399
0
    params.__set_query_id(req.query_id);
400
0
    params.__set_backend_num(req.backend_num);
401
0
    params.__set_fragment_instance_id(req.fragment_instance_id);
402
0
    params.__set_fragment_id(req.fragment_id);
403
0
    params.__set_status(exec_status.to_thrift());
404
0
    params.__set_done(req.done);
405
0
    params.__set_query_type(req.runtime_state->query_type());
406
0
    params.__isset.profile = false;
407
408
0
    DCHECK(req.runtime_state != nullptr);
409
410
0
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
411
0
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
412
0
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
413
0
    } else {
414
0
        DCHECK(!req.runtime_states.empty());
415
0
        if (!req.runtime_state->output_files().empty()) {
416
0
            params.__isset.delta_urls = true;
417
0
            for (auto& it : req.runtime_state->output_files()) {
418
0
                params.delta_urls.push_back(to_http_path(it));
419
0
            }
420
0
        }
421
0
        if (!params.delta_urls.empty()) {
422
0
            params.__isset.delta_urls = true;
423
0
        }
424
0
    }
425
426
    // load rows
427
0
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
428
0
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
429
0
    static std::string s_unselected_rows = "unselected.rows";
430
0
    int64_t num_rows_load_success = 0;
431
0
    int64_t num_rows_load_filtered = 0;
432
0
    int64_t num_rows_load_unselected = 0;
433
0
    if (req.runtime_state->num_rows_load_total() > 0 ||
434
0
        req.runtime_state->num_rows_load_filtered() > 0 ||
435
0
        req.runtime_state->num_finished_range() > 0) {
436
0
        params.__isset.load_counters = true;
437
438
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
439
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
440
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
441
0
        params.__isset.fragment_instance_reports = true;
442
0
        TFragmentInstanceReport t;
443
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
444
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
445
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
446
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
447
0
        params.fragment_instance_reports.push_back(t);
448
0
    } else if (!req.runtime_states.empty()) {
449
0
        for (auto* rs : req.runtime_states) {
450
0
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
451
0
                rs->num_finished_range() > 0) {
452
0
                params.__isset.load_counters = true;
453
0
                num_rows_load_success += rs->num_rows_load_success();
454
0
                num_rows_load_filtered += rs->num_rows_load_filtered();
455
0
                num_rows_load_unselected += rs->num_rows_load_unselected();
456
0
                params.__isset.fragment_instance_reports = true;
457
0
                TFragmentInstanceReport t;
458
0
                t.__set_fragment_instance_id(rs->fragment_instance_id());
459
0
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
460
0
                t.__set_loaded_rows(rs->num_rows_load_total());
461
0
                t.__set_loaded_bytes(rs->num_bytes_load_total());
462
0
                params.fragment_instance_reports.push_back(t);
463
0
            }
464
0
        }
465
0
    }
466
0
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
467
0
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
468
0
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
469
470
0
    if (!req.load_error_url.empty()) {
471
0
        params.__set_tracking_url(req.load_error_url);
472
0
    }
473
0
    if (!req.first_error_msg.empty()) {
474
0
        params.__set_first_error_msg(req.first_error_msg);
475
0
    }
476
0
    for (auto* rs : req.runtime_states) {
477
0
        if (rs->wal_id() > 0) {
478
0
            params.__set_txn_id(rs->wal_id());
479
0
            params.__set_label(rs->import_label());
480
0
        }
481
0
    }
482
0
    if (!req.runtime_state->export_output_files().empty()) {
483
0
        params.__isset.export_files = true;
484
0
        params.export_files = req.runtime_state->export_output_files();
485
0
    } else if (!req.runtime_states.empty()) {
486
0
        for (auto* rs : req.runtime_states) {
487
0
            if (!rs->export_output_files().empty()) {
488
0
                params.__isset.export_files = true;
489
0
                params.export_files.insert(params.export_files.end(),
490
0
                                           rs->export_output_files().begin(),
491
0
                                           rs->export_output_files().end());
492
0
            }
493
0
        }
494
0
    }
495
0
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
496
0
        params.__isset.commitInfos = true;
497
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
498
0
    } else if (!req.runtime_states.empty()) {
499
0
        for (auto* rs : req.runtime_states) {
500
0
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
501
0
                params.__isset.commitInfos = true;
502
0
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
503
0
            }
504
0
        }
505
0
    }
506
0
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
507
0
        params.__isset.errorTabletInfos = true;
508
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
509
0
    } else if (!req.runtime_states.empty()) {
510
0
        for (auto* rs : req.runtime_states) {
511
0
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
512
0
                params.__isset.errorTabletInfos = true;
513
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
514
0
                                               rs_eti.end());
515
0
            }
516
0
        }
517
0
    }
518
0
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
519
0
        params.__isset.hive_partition_updates = true;
520
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
521
0
                                             hpu.end());
522
0
    } else if (!req.runtime_states.empty()) {
523
0
        for (auto* rs : req.runtime_states) {
524
0
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
525
0
                params.__isset.hive_partition_updates = true;
526
0
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
527
0
                                                     rs_hpu.begin(), rs_hpu.end());
528
0
            }
529
0
        }
530
0
    }
531
0
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
532
0
        params.__isset.iceberg_commit_datas = true;
533
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
534
0
                                           icd.end());
535
0
    } else if (!req.runtime_states.empty()) {
536
0
        for (auto* rs : req.runtime_states) {
537
0
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
538
0
                params.__isset.iceberg_commit_datas = true;
539
0
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
540
0
                                                   rs_icd.begin(), rs_icd.end());
541
0
            }
542
0
        }
543
0
    }
544
545
    // Send new errors to coordinator
546
0
    req.runtime_state->get_unreported_errors(&(params.error_log));
547
0
    params.__isset.error_log = (!params.error_log.empty());
548
549
0
    if (_exec_env->cluster_info()->backend_id != 0) {
550
0
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
551
0
    }
552
553
0
    TReportExecStatusResult res;
554
0
    Status rpc_status;
555
556
0
    VLOG_DEBUG << "reportExecStatus params is "
557
0
               << apache::thrift::ThriftDebugString(params).c_str();
558
0
    if (!exec_status.ok()) {
559
0
        LOG(WARNING) << "report error status: " << exec_status.msg()
560
0
                     << " to coordinator: " << req.coord_addr
561
0
                     << ", query id: " << print_id(req.query_id);
562
0
    }
563
0
    try {
564
0
        try {
565
0
            (*coord)->reportExecStatus(res, params);
566
0
        } catch ([[maybe_unused]] TTransportException& e) {
567
#ifndef ADDRESS_SANITIZER
568
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
569
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
570
                         << req.coord_addr << ", err: " << e.what();
571
#endif
572
0
            rpc_status = coord->reopen();
573
574
0
            if (!rpc_status.ok()) {
575
                // we need to cancel the execution of this fragment
576
0
                req.cancel_fn(rpc_status);
577
0
                return;
578
0
            }
579
0
            (*coord)->reportExecStatus(res, params);
580
0
        }
581
582
0
        rpc_status = Status::create<false>(res.status);
583
0
    } catch (TException& e) {
584
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
585
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
586
0
    }
587
588
0
    if (!rpc_status.ok()) {
589
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
590
0
                 print_id(req.query_id), rpc_status.to_string());
591
        // we need to cancel the execution of this fragment
592
0
        req.cancel_fn(rpc_status);
593
0
    }
594
0
}
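
coordinator_callback first obtains an FE connection inside a bounded retry loop (up to 10 attempts, sleeping 1s between failures) before reporting, and only then falls into the same reopen-once-and-retry reporting idiom seen earlier. The connect loop, condensed to std-only form (illustrative; try_connect is a hypothetical stand-in for constructing FrontendServiceConnection):

    #include <chrono>
    #include <functional>
    #include <thread>

    bool connect_with_retries(const std::function<bool()>& try_connect,
                              int retries, std::chrono::milliseconds backoff) {
        bool ok = false;
        do {
            ok = try_connect();
            if (!ok) {
                std::this_thread::sleep_for(backoff);  // back off before retrying
            }
        } while (!ok && retries-- > 0);
        return ok;
    }
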
595
596
0
static void empty_function(RuntimeState*, Status*) {}
597
598
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
599
                                       const QuerySource query_source,
600
0
                                       const TPipelineFragmentParamsList& parent) {
601
0
    if (params.txn_conf.need_txn) {
602
0
        std::shared_ptr<StreamLoadContext> stream_load_ctx =
603
0
                std::make_shared<StreamLoadContext>(_exec_env);
604
0
        stream_load_ctx->db = params.txn_conf.db;
605
0
        stream_load_ctx->db_id = params.txn_conf.db_id;
606
0
        stream_load_ctx->table = params.txn_conf.tbl;
607
0
        stream_load_ctx->txn_id = params.txn_conf.txn_id;
608
0
        stream_load_ctx->id = UniqueId(params.query_id);
609
0
        stream_load_ctx->put_result.__set_pipeline_params(params);
610
0
        stream_load_ctx->use_streaming = true;
611
0
        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
612
0
        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
613
0
        stream_load_ctx->label = params.import_label;
614
0
        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
615
0
        stream_load_ctx->timeout_second = 3600;
616
0
        stream_load_ctx->auth.token = params.txn_conf.token;
617
0
        stream_load_ctx->need_commit_self = true;
618
0
        stream_load_ctx->need_rollback = true;
619
0
        auto pipe = std::make_shared<io::StreamLoadPipe>(
620
0
                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
621
0
                -1 /* total_length */, true /* use_proto */);
622
0
        stream_load_ctx->body_sink = pipe;
623
0
        stream_load_ctx->pipe = pipe;
624
0
        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
625
626
0
        RETURN_IF_ERROR(
627
0
                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
628
629
0
        RETURN_IF_ERROR(
630
0
                _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx, parent));
631
0
        return Status::OK();
632
0
    } else {
633
0
        return exec_plan_fragment(params, query_source, empty_function, parent);
634
0
    }
635
0
}
636
637
// Stage 2: prepare finished, then wait for the FE's instruction to execute.
638
0
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
639
0
    TUniqueId query_id;
640
0
    query_id.__set_hi(request->query_id().hi());
641
0
    query_id.__set_lo(request->query_id().lo());
642
0
    auto q_ctx = get_query_ctx(query_id);
643
0
    if (q_ctx) {
644
0
        q_ctx->set_ready_to_execute(Status::OK());
645
0
        LOG_INFO("Query {} start execution", print_id(query_id));
646
0
    } else {
647
0
        return Status::InternalError(
648
0
                "Failed to get query fragments context. Query {} may be "
649
0
                "timeout or be cancelled. host: {}",
650
0
                print_id(query_id), BackendOptions::get_localhost());
651
0
    }
652
0
    return Status::OK();
653
0
}
654
655
0
void FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
656
0
    int64_t now = duration_cast<std::chrono::milliseconds>(
657
0
                          std::chrono::system_clock::now().time_since_epoch())
658
0
                          .count();
659
0
    g_fragment_executing_count << -1;
660
0
    g_fragment_last_active_time.set_value(now);
661
662
0
    _pipeline_map.erase(key);
663
0
}
664
665
122k
void FragmentMgr::remove_query_context(const TUniqueId& key) {
666
#ifndef BE_TEST
667
    _query_ctx_map.erase(key);
668
#endif
669
122k
}
670
671
1
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
672
1
    auto val = _query_ctx_map.find(query_id);
673
1
    if (auto q_ctx = val.lock()) {
674
0
        return q_ctx;
675
0
    }
676
1
    return nullptr;
677
1
}
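
get_query_ctx works because _query_ctx_map stores weak_ptr<QueryContext>: promoting the stored pointer with lock() either yields a live shared_ptr or signals that every owner has already released the context. A minimal std-only illustration:

    #include <memory>

    struct Ctx {};  // stand-in for doris::QueryContext

    std::shared_ptr<Ctx> promote(const std::weak_ptr<Ctx>& w) {
        if (auto q = w.lock()) {
            return q;     // context still alive; safe to use
        }
        return nullptr;   // all owners gone: treat the query as finished/unknown
    }
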
678
679
Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params,
680
                                             const TPipelineFragmentParamsList& parent,
681
                                             QuerySource query_source,
682
0
                                             std::shared_ptr<QueryContext>& query_ctx) {
683
0
    auto query_id = params.query_id;
684
0
    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
685
0
        return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}",
686
0
                                     print_id(query_id));
687
0
    });
688
689
    // Find _query_ctx_map, in case some other request has already
690
    // created the query fragments context.
691
0
    query_ctx = get_query_ctx(query_id);
692
0
    if (params.is_simplified_param) {
693
        // Get common components from _query_ctx_map
694
0
        if (!query_ctx) {
695
0
            return Status::InternalError(
696
0
                    "Failed to get query fragments context. Query {} may be timeout or be "
697
0
                    "cancelled. host: {}",
698
0
                    print_id(query_id), BackendOptions::get_localhost());
699
0
        }
700
0
    } else {
701
0
        if (!query_ctx) {
702
0
            RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
703
0
                    query_id, query_ctx,
704
0
                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
705
0
                            -> Status {
706
0
                        WorkloadGroupPtr workload_group_ptr = nullptr;
707
0
                        std::vector<uint64_t> wg_id_set;
708
0
                        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
709
0
                            for (auto& wg : params.workload_groups) {
710
0
                                wg_id_set.push_back(wg.id);
711
0
                            }
712
0
                        }
713
0
                        workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id_set);
714
715
                        // The first fragment of a query has arrived; print logs.
716
0
                        LOG(INFO) << "query_id: " << print_id(query_id)
717
0
                                  << ", coord_addr: " << params.coord
718
0
                                  << ", total fragment num on current host: "
719
0
                                  << params.fragment_num_on_host
720
0
                                  << ", fe process uuid: " << params.query_options.fe_process_uuid
721
0
                                  << ", query type: " << params.query_options.query_type
722
0
                                  << ", report audit fe:" << params.current_connect_fe
723
0
                                  << ", use wg:" << workload_group_ptr->id() << ","
724
0
                                  << workload_group_ptr->name();
725
726
                        // This may be the first fragment request of the query.
727
                        // Create the query fragments context.
728
0
                        query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
729
0
                                                         params.coord, params.is_nereids,
730
0
                                                         params.current_connect_fe, query_source);
731
0
                        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
732
0
                        RETURN_IF_ERROR(DescriptorTbl::create(
733
0
                                &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
734
                        // set file scan range params
735
0
                        if (params.__isset.file_scan_params) {
736
0
                            query_ctx->file_scan_range_params_map = params.file_scan_params;
737
0
                        }
738
739
0
                        query_ctx->query_globals = params.query_globals;
740
741
0
                        if (params.__isset.resource_info) {
742
0
                            query_ctx->user = params.resource_info.user;
743
0
                            query_ctx->group = params.resource_info.group;
744
0
                            query_ctx->set_rsc_info = true;
745
0
                        }
746
747
0
                        if (params.__isset.ai_resources) {
748
0
                            query_ctx->set_ai_resources(params.ai_resources);
749
0
                        }
750
751
0
                        RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
752
753
0
                        if (parent.__isset.runtime_filter_info) {
754
0
                            auto info = parent.runtime_filter_info;
755
0
                            if (info.__isset.runtime_filter_params) {
756
0
                                auto handler =
757
0
                                        std::make_shared<RuntimeFilterMergeControllerEntity>();
758
0
                                RETURN_IF_ERROR(
759
0
                                        handler->init(query_ctx, info.runtime_filter_params));
760
0
                                query_ctx->set_merge_controller_handler(handler);
761
762
0
                                query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
763
0
                                        info.runtime_filter_params);
764
0
                            }
765
0
                            if (info.__isset.topn_filter_descs) {
766
0
                                query_ctx->init_runtime_predicates(info.topn_filter_descs);
767
0
                            }
768
0
                        }
769
770
                        // There is some logic in the query ctx's dtor, so we cannot check for existence and delete the
771
                        // temp query ctx now. For example, the query id may be removed from the workload group's query set.
772
0
                        map.insert({query_id, query_ctx});
773
0
                        return Status::OK();
774
0
                    }));
775
0
        }
776
0
    }
777
0
    return Status::OK();
778
0
}
779
780
0
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
781
0
    fmt::memory_buffer debug_string_buffer;
782
0
    size_t i = 0;
783
0
    {
784
0
        fmt::format_to(debug_string_buffer,
785
0
                       "{} pipeline fragment contexts are still running! duration_limit={}\n",
786
0
                       _pipeline_map.num_items(), duration);
787
0
        timespec now;
788
0
        clock_gettime(CLOCK_MONOTONIC, &now);
789
790
0
        _pipeline_map.apply([&](phmap::flat_hash_map<
791
0
                                    std::pair<TUniqueId, int>,
792
0
                                    std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
793
0
                                    -> Status {
794
0
            std::set<TUniqueId> query_id_set;
795
0
            for (auto& it : map) {
796
0
                auto elapsed = it.second->elapsed_time() / 1000000000;
797
0
                if (elapsed < duration) {
798
                    // Only display tasks that have been running for more than {duration} seconds.
799
0
                    continue;
800
0
                }
801
0
                if (!query_id_set.contains(it.first.first)) {
802
0
                    query_id_set.insert(it.first.first);
803
0
                    fmt::format_to(
804
0
                            debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
805
0
                            print_id(it.first.first),
806
0
                            it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
807
808
0
                    if (it.second->get_query_ctx()->get_merge_controller_handler()) {
809
0
                        fmt::format_to(debug_string_buffer, "{}\n",
810
0
                                       it.second->get_query_ctx()
811
0
                                               ->get_merge_controller_handler()
812
0
                                               ->debug_string());
813
0
                    }
814
0
                }
815
816
0
                auto timeout_second = it.second->timeout_second();
817
0
                fmt::format_to(
818
0
                        debug_string_buffer,
819
0
                        "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}): {}\n",
820
0
                        i, elapsed, timeout_second, it.second->is_timeout(now),
821
0
                        it.second->debug_string());
822
0
                i++;
823
0
            }
824
0
            return Status::OK();
825
0
        });
826
0
    }
827
0
    return fmt::to_string(debug_string_buffer);
828
0
}
829
830
0
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
831
0
    if (auto q_ctx = get_query_ctx(query_id)) {
832
0
        return q_ctx->print_all_pipeline_context();
833
0
    } else {
834
0
        return fmt::format(
835
0
                "Dump pipeline tasks failed: Query context (query id = {}) not found. \n",
836
0
                print_id(query_id));
837
0
    }
838
0
}
839
840
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
841
                                       QuerySource query_source, const FinishCallback& cb,
842
0
                                       const TPipelineFragmentParamsList& parent) {
843
0
    VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is "
844
0
             << apache::thrift::ThriftDebugString(params).c_str();
845
    // sometimes the TPipelineFragmentParams debug string is too long and glog
846
    // will truncate the log line, so print the query options separately for debugging purposes
847
0
    VLOG_ROW << "Query: " << print_id(params.query_id) << "query options is "
848
0
             << apache::thrift::ThriftDebugString(params.query_options).c_str();
849
850
0
    std::shared_ptr<QueryContext> query_ctx;
851
0
    RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, query_ctx));
852
0
    SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
853
0
    int64_t duration_ns = 0;
854
0
    std::shared_ptr<pipeline::PipelineFragmentContext> context =
855
0
            std::make_shared<pipeline::PipelineFragmentContext>(
856
0
                    query_ctx->query_id(), params, query_ctx, _exec_env, cb,
857
0
                    [this](const ReportStatusRequest& req, auto&& ctx) {
858
0
                        return this->trigger_pipeline_context_report(req, std::move(ctx));
859
0
                    });
860
0
    {
861
0
        SCOPED_RAW_TIMER(&duration_ns);
862
0
        Status prepare_st = Status::OK();
863
0
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(_thread_pool.get()),
864
0
                                         prepare_st);
865
0
        DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.prepare_failed", {
866
0
            prepare_st = Status::Aborted("FragmentMgr.exec_plan_fragment.prepare_failed");
867
0
        });
868
0
        if (!prepare_st.ok()) {
869
0
            query_ctx->cancel(prepare_st, params.fragment_id);
870
0
            return prepare_st;
871
0
        }
872
0
    }
873
0
    g_fragmentmgr_prepare_latency << (duration_ns / 1000);
874
875
0
    DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
876
0
                    { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
877
0
    {
878
0
        int64_t now = duration_cast<std::chrono::milliseconds>(
879
0
                              std::chrono::system_clock::now().time_since_epoch())
880
0
                              .count();
881
0
        g_fragment_executing_count << 1;
882
0
        g_fragment_last_active_time.set_value(now);
883
884
        // A (query_id, fragment_id) pair executes on only one BE; this block locks _pipeline_map.
885
0
        auto res = _pipeline_map.find({params.query_id, params.fragment_id});
886
0
        if (res != nullptr) {
887
0
            return Status::InternalError(
888
0
                    "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
889
0
                    print_id(params.query_id), params.fragment_id);
890
0
        }
891
0
        _pipeline_map.insert({params.query_id, params.fragment_id}, context);
892
0
    }
893
894
0
    if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
895
0
        query_ctx->set_ready_to_execute_only();
896
0
    }
897
898
0
    query_ctx->set_pipeline_context(params.fragment_id, context);
899
900
0
    RETURN_IF_ERROR(context->submit());
901
0
    return Status::OK();
902
0
}
903
904
1
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
905
1
    std::shared_ptr<QueryContext> query_ctx = nullptr;
906
1
    {
907
1
        if (auto q_ctx = get_query_ctx(query_id)) {
908
0
            query_ctx = q_ctx;
909
1
        } else {
910
1
            LOG(WARNING) << "Query " << print_id(query_id)
911
1
                         << " does not exists, failed to cancel it";
912
1
            return;
913
1
        }
914
1
    }
915
0
    query_ctx->cancel(reason);
916
0
    remove_query_context(query_id);
917
0
    LOG(INFO) << "Query " << print_id(query_id)
918
0
              << " is cancelled and removed. Reason: " << reason.to_string();
919
0
}
920
921
7
void FragmentMgr::cancel_worker() {
922
7
    LOG(INFO) << "FragmentMgr cancel worker start working.";
923
924
7
    timespec check_invalid_query_last_timestamp;
925
7
    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
926
927
12
    do {
928
12
        std::vector<TUniqueId> queries_lost_coordinator;
929
12
        std::vector<TUniqueId> queries_timeout;
930
12
        std::vector<TUniqueId> queries_pipeline_task_leak;
931
        // Fe process uuid -> set<QueryId>
932
12
        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
933
12
        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
934
12
                ExecEnv::GetInstance()->get_running_frontends();
935
936
12
        timespec now;
937
12
        clock_gettime(CLOCK_MONOTONIC, &now);
938
939
12
        if (config::enable_pipeline_task_leakage_detect &&
940
12
            now.tv_sec - check_invalid_query_last_timestamp.tv_sec >
941
0
                    config::pipeline_task_leakage_detect_period_secs) {
942
0
            check_invalid_query_last_timestamp = now;
943
0
            running_queries_on_all_fes = _get_all_running_queries_from_fe();
944
12
        } else {
945
12
            running_queries_on_all_fes.clear();
946
12
        }
947
948
12
        std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
949
12
        _pipeline_map.apply(
950
12
                [&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
951
12
                                         std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
952
1.53k
                        -> Status {
953
1.53k
                    ctx.reserve(ctx.size() + map.size());
954
1.53k
                    for (auto& pipeline_itr : map) {
955
0
                        ctx.push_back(pipeline_itr.second);
956
0
                    }
957
1.53k
                    return Status::OK();
958
1.53k
                });
959
12
        for (auto& c : ctx) {
960
0
            c->clear_finished_tasks();
961
0
        }
962
963
12
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
964
12
        {
965
12
            std::vector<std::shared_ptr<QueryContext>> contexts;
966
12
            _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>&
967
1.53k
                                             map) -> Status {
968
1.53k
                for (auto it = map.begin(); it != map.end();) {
969
0
                    if (auto q_ctx = it->second.lock()) {
970
0
                        contexts.push_back(q_ctx);
971
0
                        if (q_ctx->is_timeout(now)) {
972
0
                            LOG_WARNING("Query {} is timeout", print_id(it->first));
973
0
                            queries_timeout.push_back(it->first);
974
0
                        } else if (config::enable_brpc_connection_check) {
975
0
                            auto brpc_stubs = q_ctx->get_using_brpc_stubs();
976
0
                            for (auto& item : brpc_stubs) {
977
0
                                if (!brpc_stub_with_queries.contains(item.second)) {
978
0
                                    brpc_stub_with_queries.emplace(item.second,
979
0
                                                                   BrpcItem {item.first, {q_ctx}});
980
0
                                } else {
981
0
                                    brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
982
0
                                }
983
0
                            }
984
0
                        }
985
0
                        ++it;
986
0
                    } else {
987
0
                        it = map.erase(it);
988
0
                    }
989
0
                }
990
1.53k
                return Status::OK();
991
1.53k
            });
992
12
            std::vector<std::shared_ptr<QueryContext>> {}.swap(contexts);
993
994
            // We use a very conservative cancel strategy.
995
            // 0. If there are no running frontends, do not cancel any queries.
996
            // 1. If query's process uuid is zero, do not cancel
997
            // 2. If same process uuid, do not cancel
998
            // 3. If fe has zero process uuid, do not cancel
999
12
            if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
1000
0
                LOG_EVERY_N(WARNING, 10)
1001
0
                        << "Could not find any running frontends, maybe we are upgrading or "
1002
0
                           "starting? "
1003
0
                        << "We will not cancel any outdated queries in this situation.";
1004
12
            } else {
1005
12
                std::vector<std::shared_ptr<QueryContext>> q_contexts;
1006
12
                _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
1007
12
                                                              std::weak_ptr<QueryContext>>& map)
1008
1.53k
                                             -> Status {
1009
1.53k
                    for (const auto& it : map) {
1010
0
                        if (auto q_ctx = it.second.lock()) {
1011
0
                            q_contexts.push_back(q_ctx);
1012
0
                            const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
1013
1014
0
                            if (fe_process_uuid == 0) {
1015
                                // zero means this query is from an older version fe or
1016
                                // this fe is starting
1017
0
                                continue;
1018
0
                            }
1019
1020
                            // If the query is not running on any frontend, cancel it.
1021
0
                            if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
1022
0
                                itr != running_queries_on_all_fes.end()) {
1023
                                // Query not found on this frontend, and the query arrived before the last check
1024
0
                                if (itr->second.find(it.first) == itr->second.end() &&
1025
                                    // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
1026
                                    // tv_sec is enough; we do not need to check tv_nsec.
1027
0
                                    q_ctx->get_query_arrival_timestamp().tv_sec <
1028
0
                                            check_invalid_query_last_timestamp.tv_sec &&
1029
0
                                    q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
1030
0
                                    queries_pipeline_task_leak.push_back(q_ctx->query_id());
1031
0
                                    LOG_INFO(
1032
0
                                            "Query {}, type {} is not found on any frontends, "
1033
0
                                            "maybe it "
1034
0
                                            "is leaked.",
1035
0
                                            print_id(q_ctx->query_id()),
1036
0
                                            toString(q_ctx->get_query_source()));
1037
0
                                    continue;
1038
0
                                }
1039
0
                            }
1040
1041
0
                            auto itr = running_fes.find(q_ctx->coord_addr);
1042
0
                            if (itr != running_fes.end()) {
1043
0
                                if (fe_process_uuid == itr->second.info.process_uuid ||
1044
0
                                    itr->second.info.process_uuid == 0) {
1045
0
                                    continue;
1046
0
                                } else {
1047
0
                                    LOG_WARNING(
1048
0
                                            "Coordinator of query {} restarted, going to cancel "
1049
0
                                            "it.",
1050
0
                                            print_id(q_ctx->query_id()));
1051
0
                                }
1052
0
                            } else {
1053
                                // In some rare cases, the rpc port of the follower is not updated in time,
1054
                                // then the port of this follower will be zero, but actually it is still running,
1055
                                // and the BE has already received the query from this follower.
1056
                                // So we need to check whether the host is in running_fes.
1057
0
                                bool fe_host_is_standing =
1058
0
                                        std::any_of(running_fes.begin(), running_fes.end(),
1059
0
                                                    [&q_ctx](const auto& fe) {
1060
0
                                                        return fe.first.hostname ==
1061
0
                                                                       q_ctx->coord_addr.hostname &&
1062
0
                                                               fe.first.port == 0;
1063
0
                                                    });
1064
0
                                if (fe_host_is_standing) {
1065
0
                                    LOG_WARNING(
1066
0
                                            "Coordinator {}:{} is not found, but its host is still "
1067
0
                                            "running with an unstable brpc port, not going to "
1068
0
                                            "cancel "
1069
0
                                            "it.",
1070
0
                                            q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
1071
0
                                            print_id(q_ctx->query_id()));
1072
0
                                    continue;
1073
0
                                } else {
1074
0
                                    LOG_WARNING(
1075
0
                                            "Could not find target coordinator {}:{} of query {}, "
1076
0
                                            "going to "
1077
0
                                            "cancel it.",
1078
0
                                            q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
1079
0
                                            print_id(q_ctx->query_id()));
1080
0
                                }
1081
0
                            }
1082
0
                        }
1083
                        // Coordinator of this query has already died or the query context has been released.
1084
0
                        queries_lost_coordinator.push_back(it.first);
1085
0
                    }
1086
1.53k
                    return Status::OK();
1087
1.53k
                });
1088
12
            }
1089
12
        }
1090
1091
12
        if (config::enable_brpc_connection_check) {
1092
0
            for (auto it : brpc_stub_with_queries) {
1093
0
                if (!it.first) {
1094
0
                    LOG(WARNING) << "brpc stub is nullptr, skip it.";
1095
0
                    continue;
1096
0
                }
1097
0
                _check_brpc_available(it.first, it.second);
1098
0
            }
1099
0
        }
1100
1101
12
        if (!queries_lost_coordinator.empty()) {
1102
0
            LOG(INFO) << "There are " << queries_lost_coordinator.size()
1103
0
                      << " queries need to be cancelled, coordinator dead or restarted.";
1104
0
        }
1105
1106
12
        for (const auto& qid : queries_timeout) {
1107
0
            cancel_query(qid,
1108
0
                         Status::Error<ErrorCode::TIMEOUT>(
1109
0
                                 "FragmentMgr cancel worker going to cancel timeout instance "));
1110
0
        }
1111
1112
12
        for (const auto& qid : queries_pipeline_task_leak) {
1113
            // Cancel the query, and maybe try to report debug info to fe so that we can
1114
            // collect debug info via sql or http api instead of searching the log.
1115
0
            cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>(
1116
0
                                      "Potential pipeline task leakage"));
1117
0
        }
1118
1119
12
        for (const auto& qid : queries_lost_coordinator) {
1120
0
            cancel_query(qid, Status::Error<ErrorCode::CANCELLED>(
1121
0
                                      "Source frontend is not running or restarted"));
1122
0
        }
1123
1124
12
    } while (!_stop_background_threads_latch.wait_for(
1125
12
            std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
1126
7
    LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
1127
7
}
1128
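The liveness decision above reduces to comparing the process_uuid a query captured from its coordinator at registration time against the process_uuid the same frontend currently reports via heartbeat. Below is a minimal standalone sketch of that decision table; FeKey, Verdict, and judge_coordinator are hypothetical stand-ins introduced here for illustration, not existing Doris types.

#include <cstdint>
#include <map>
#include <string>
#include <tuple>

// Hypothetical, simplified stand-ins for the frontend bookkeeping.
struct FeKey {
    std::string host;
    int port;
    bool operator<(const FeKey& o) const {
        return std::tie(host, port) < std::tie(o.host, o.port);
    }
};
enum class Verdict { KeepRunning, CancelRestarted, CancelNotFound };

// Decide what to do with a query that recorded `query_uuid` at
// registration, given the current heartbeat view `running_fes`
// (coordinator address -> currently reported process_uuid).
Verdict judge_coordinator(const std::map<FeKey, int64_t>& running_fes,
                          const FeKey& coord, int64_t query_uuid) {
    auto it = running_fes.find(coord);
    if (it == running_fes.end()) {
        return Verdict::CancelNotFound; // coordinator vanished from heartbeats
    }
    // uuid 0 means "unknown" (old FE version or FE still starting): keep.
    if (query_uuid == 0 || it->second == 0 || query_uuid == it->second) {
        return Verdict::KeepRunning;
    }
    return Verdict::CancelRestarted; // same address, new process: FE restarted
}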
1129
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
1130
0
                                        const BrpcItem& brpc_item) {
1131
0
    const std::string message = "hello doris!";
1132
0
    std::string error_message;
1133
0
    int32_t failed_count = 0;
1134
0
    const int64_t check_timeout_ms =
1135
0
            std::max<int64_t>(100, config::brpc_connection_check_timeout_ms);
1136
1137
0
    while (true) {
1138
0
        PHandShakeRequest request;
1139
0
        request.set_hello(message);
1140
0
        PHandShakeResponse response;
1141
0
        brpc::Controller cntl;
1142
0
        cntl.set_timeout_ms(check_timeout_ms);
1143
0
        cntl.set_max_retry(10);
1144
0
        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
1145
1146
0
        if (cntl.Failed()) {
1147
0
            error_message = cntl.ErrorText();
1148
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1149
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1150
0
        } else if (response.has_status() && response.status().status_code() == 0) {
1151
0
            break;
1152
0
        } else {
1153
0
            error_message = response.DebugString();
1154
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1155
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1156
0
        }
1157
0
        failed_count++;
1158
0
        if (failed_count == 2) {
1159
0
            for (const auto& query_wptr : brpc_item.queries) {
1160
0
                auto query = query_wptr.lock();
1161
0
                if (query && !query->is_cancelled()) {
1162
0
                    query->cancel(Status::InternalError("brpc(dest: {}:{}) check failed: {}",
1163
0
                                                        brpc_item.network_address.hostname,
1164
0
                                                        brpc_item.network_address.port,
1165
0
                                                        error_message));
1166
0
                }
1167
0
            }
1168
1169
0
            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
1170
0
                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
1171
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
1172
0
                    brpc_item.network_address.hostname, brpc_item.network_address.port);
1173
0
            break;
1174
0
        }
1175
0
    }
1176
0
}
1177
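_check_brpc_available retries the handshake so that a single transient timeout does not take queries down; only the second consecutive failure cancels dependent queries and evicts the stub. The two-strike shape in isolation, with the RPC abstracted behind std::function (a sketch of the pattern, not the Doris API):

#include <functional>
#include <string>

// Probe an endpoint up to `max_failures` times. On the final failure,
// run `on_dead` once (in FragmentMgr this cancels the queries bound to
// the stub and erases the stub from the client cache).
bool probe_with_two_strikes(const std::function<bool(std::string*)>& try_handshake,
                            const std::function<void(const std::string&)>& on_dead,
                            int max_failures = 2) {
    std::string error;
    for (int failed = 0; failed < max_failures;) {
        if (try_handshake(&error)) {
            return true; // endpoint answered, stop probing
        }
        if (++failed == max_failures) {
            on_dead(error); // second strike: cancel and evict
        }
    }
    return false;
}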
1178
0
void FragmentMgr::debug(std::stringstream& ss) {}
1179
/*
1180
 * 1. resolve opaqued_query_plan to thrift structure
1181
 * 2. build TPipelineFragmentParams
1182
 */
1183
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
1184
                                                const TQueryPlanInfo& t_query_plan_info,
1185
                                                const TUniqueId& query_id,
1186
                                                const TUniqueId& fragment_instance_id,
1187
0
                                                std::vector<TScanColumnDesc>* selected_columns) {
1188
    // set up desc tbl
1189
0
    DescriptorTbl* desc_tbl = nullptr;
1190
0
    ObjectPool obj_pool;
1191
0
    Status st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
1192
0
    if (!st.ok()) {
1193
0
        LOG(WARNING) << "open context error: extract DescriptorTbl failure";
1194
0
        std::stringstream msg;
1195
0
        msg << " create DescriptorTbl error, should not be modified after returned Doris FE "
1196
0
               "processed";
1197
0
        return Status::InvalidArgument(msg.str());
1198
0
    }
1199
0
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
1200
0
    if (tuple_desc == nullptr) {
1201
0
        LOG(WARNING) << "open context error: extract TupleDescriptor failure";
1202
0
        std::stringstream msg;
1203
0
        msg << " get  TupleDescriptor error, should not be modified after returned Doris FE "
1204
0
               "processed";
1205
0
        return Status::InvalidArgument(msg.str());
1206
0
    }
1207
    // process selected columns from slots
1208
0
    for (const SlotDescriptor* slot : tuple_desc->slots()) {
1209
0
        TScanColumnDesc col;
1210
0
        col.__set_name(slot->col_name());
1211
0
        col.__set_type(to_thrift(slot->type()->get_primitive_type()));
1212
0
        selected_columns->emplace_back(std::move(col));
1213
0
    }
1214
1215
0
    VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: "
1216
0
               << apache::thrift::ThriftDebugString(t_query_plan_info);
1217
    // assign the params used to execute the PlanFragment
1218
0
    TPipelineFragmentParams exec_fragment_params;
1219
0
    exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
1220
0
    exec_fragment_params.__set_is_simplified_param(false);
1221
0
    exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
1222
0
    exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);
1223
1224
    // assign the params used for executing this PlanFragment itself
1225
0
    TPipelineInstanceParams fragment_exec_params;
1226
0
    exec_fragment_params.query_id = query_id;
1227
0
    fragment_exec_params.fragment_instance_id = fragment_instance_id;
1228
0
    exec_fragment_params.coord.hostname = "external";
1229
0
    std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
1230
0
    std::vector<TScanRangeParams> scan_ranges;
1231
0
    std::vector<int64_t> tablet_ids = params.tablet_ids;
1232
0
    TNetworkAddress address;
1233
0
    address.hostname = BackendOptions::get_localhost();
1234
0
    address.port = doris::config::be_port;
1235
0
    std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info;
1236
0
    for (auto tablet_id : params.tablet_ids) {
1237
0
        TPaloScanRange scan_range;
1238
0
        scan_range.db_name = params.database;
1239
0
        scan_range.table_name = params.table;
1240
0
        auto iter = tablet_info.find(tablet_id);
1241
0
        if (iter != tablet_info.end()) {
1242
0
            TTabletVersionInfo info = iter->second;
1243
0
            scan_range.tablet_id = tablet_id;
1244
0
            scan_range.version = std::to_string(info.version);
1245
            // Useless, but it is a required field in TPaloScanRange
1246
0
            scan_range.version_hash = "0";
1247
0
            scan_range.schema_hash = std::to_string(info.schema_hash);
1248
0
            scan_range.hosts.push_back(address);
1249
0
        } else {
1250
0
            std::stringstream msg;
1251
0
            msg << "tablet_id: " << tablet_id << " not found";
1252
0
            LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
1253
0
            return Status::NotFound(msg.str());
1254
0
        }
1255
0
        TScanRange doris_scan_range;
1256
0
        doris_scan_range.__set_palo_scan_range(scan_range);
1257
0
        TScanRangeParams scan_range_params;
1258
0
        scan_range_params.scan_range = doris_scan_range;
1259
0
        scan_ranges.push_back(scan_range_params);
1260
0
    }
1261
0
    per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
1262
0
    fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
1263
0
    exec_fragment_params.local_params.push_back(fragment_exec_params);
1264
0
    TQueryOptions query_options;
1265
0
    query_options.batch_size = params.batch_size;
1266
0
    query_options.execution_timeout = params.execution_timeout;
1267
0
    query_options.mem_limit = params.mem_limit;
1268
0
    query_options.query_type = TQueryType::EXTERNAL;
1269
0
    query_options.be_exec_version = BeExecVersionManager::get_newest_version();
1270
0
    exec_fragment_params.__set_query_options(query_options);
1271
0
    VLOG_ROW << "external exec_plan_fragment params is "
1272
0
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
1273
1274
0
    TPipelineFragmentParamsList mocked;
1275
0
    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR, mocked);
1276
0
}
1277
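exec_external_plan_fragment fails the whole request as soon as one requested tablet is missing from the plan's tablet_info map. That all-or-nothing lookup, separated from the Thrift structs, looks like the following sketch; TabletVersion and resolve_tablets are hypothetical names introduced here for illustration.

#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for TTabletVersionInfo.
struct TabletVersion {
    int64_t version;
    int64_t schema_hash;
};

// Resolve every requested tablet or report the first missing one,
// mirroring the all-or-nothing loop in exec_external_plan_fragment.
std::optional<std::string> resolve_tablets(
        const std::vector<int64_t>& tablet_ids,
        const std::map<int64_t, TabletVersion>& tablet_info,
        std::vector<std::pair<int64_t, TabletVersion>>* out) {
    for (int64_t id : tablet_ids) {
        auto it = tablet_info.find(id);
        if (it == tablet_info.end()) {
            return "tablet_id: " + std::to_string(id) + " not found";
        }
        out->emplace_back(id, it->second);
    }
    return std::nullopt; // all tablets resolved
}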
1278
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
1279
0
                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
1280
0
    UniqueId queryid = request->query_id();
1281
0
    TUniqueId query_id;
1282
0
    query_id.__set_hi(queryid.hi);
1283
0
    query_id.__set_lo(queryid.lo);
1284
0
    if (auto q_ctx = get_query_ctx(query_id)) {
1285
0
        SCOPED_ATTACH_TASK(q_ctx.get());
1286
0
        RuntimeFilterMgr* runtime_filter_mgr = q_ctx->runtime_filter_mgr();
1287
0
        DCHECK(runtime_filter_mgr != nullptr);
1288
1289
        // 1. get the target filters
1290
0
        std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
1291
0
                runtime_filter_mgr->get_consume_filters(request->filter_id());
1292
1293
        // 2. create the filter wrapper to replace or ignore/disable the target filters
1294
0
        if (!filters.empty()) {
1295
0
            RETURN_IF_ERROR(filters[0]->assign(*request, attach_data));
1296
0
            std::ranges::for_each(filters, [&](auto& filter) { filter->signal(filters[0].get()); });
1297
0
        }
1298
0
    }
1299
0
    return Status::OK();
1300
0
}
1301
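apply_filterv2 above and the three filter handlers that follow each inline the same protobuf-to-thrift id conversion before looking up the query context. A hedged sketch of how that boilerplate could be factored; to_thrift_id is a hypothetical helper relying on this file's UniqueId and TUniqueId types, not an existing Doris function.

// Hypothetical helper: convert the protobuf-side UniqueId into the
// thrift TUniqueId used as the key of _query_ctx_map.
static TUniqueId to_thrift_id(const UniqueId& uid) {
    TUniqueId tid;
    tid.__set_hi(uid.hi);
    tid.__set_lo(uid.lo);
    return tid;
}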
1302
0
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
1303
0
    UniqueId queryid = request->query_id();
1304
0
    TUniqueId query_id;
1305
0
    query_id.__set_hi(queryid.hi);
1306
0
    query_id.__set_lo(queryid.lo);
1307
1308
0
    if (config::enable_debug_points &&
1309
0
        DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) {
1310
0
        return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
1311
0
    }
1312
1313
0
    if (auto q_ctx = get_query_ctx(query_id)) {
1314
0
        return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request);
1315
0
    } else {
1316
0
        return Status::EndOfFile(
1317
0
                "Send filter size failed: Query context (query-id: {}) not found, maybe "
1318
0
                "finished",
1319
0
                queryid.to_string());
1320
0
    }
1321
0
}
1322
1323
0
Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
1324
0
    UniqueId queryid = request->query_id();
1325
0
    TUniqueId query_id;
1326
0
    query_id.__set_hi(queryid.hi);
1327
0
    query_id.__set_lo(queryid.lo);
1328
0
    if (auto q_ctx = get_query_ctx(query_id)) {
1329
0
        try {
1330
0
            return q_ctx->runtime_filter_mgr()->sync_filter_size(request);
1331
0
        } catch (const Exception& e) {
1332
0
            return Status::InternalError(
1333
0
                    "Sync filter size failed: Query context (query-id: {}) error: {}",
1334
0
                    queryid.to_string(), e.what());
1335
0
        }
1336
0
    } else {
1337
0
        return Status::EndOfFile(
1338
0
                "Sync filter size failed: Query context (query-id: {}) already finished",
1339
0
                queryid.to_string());
1340
0
    }
1341
0
}
1342
1343
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
1344
0
                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
1345
0
    UniqueId queryid = request->query_id();
1346
1347
0
    TUniqueId query_id;
1348
0
    query_id.__set_hi(queryid.hi);
1349
0
    query_id.__set_lo(queryid.lo);
1350
0
    if (auto q_ctx = get_query_ctx(query_id)) {
1351
0
        SCOPED_ATTACH_TASK(q_ctx.get());
1352
0
        if (!q_ctx->get_merge_controller_handler()) {
1353
0
            return Status::InternalError("Merge filter failed: Merge controller handler is null");
1354
0
        }
1355
0
        return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
1356
0
    } else {
1357
0
        return Status::EndOfFile(
1358
0
                "Merge filter size failed: Query context (query-id: {}) already finished",
1359
0
                queryid.to_string());
1360
0
    }
1361
0
}
1362
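Note the convention shared by send_filter_size, sync_filter_size, and merge_filter above: a missing query context yields Status::EndOfFile rather than a hard error, since the query may simply have finished before the filter RPC arrived. A caller-side sketch of honoring that convention; handle_filter_rpc is hypothetical, and the is<ErrorCode::END_OF_FILE>() check assumes the Status API used elsewhere in this file.

// Hypothetical caller: EndOfFile means "query already gone", which is
// benign here; anything else is a real failure and is propagated.
static Status handle_filter_rpc(Status rpc_status) {
    if (rpc_status.ok() || rpc_status.is<ErrorCode::END_OF_FILE>()) {
        return Status::OK();
    }
    return rpc_status;
}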
1363
void FragmentMgr::get_runtime_query_info(
1364
0
        std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) {
1365
0
    std::vector<std::shared_ptr<QueryContext>> contexts;
1366
0
    _query_ctx_map.apply(
1367
0
            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
1368
0
                for (auto iter = map.begin(); iter != map.end();) {
1369
0
                    if (auto q_ctx = iter->second.lock()) {
1370
0
                        _resource_ctx_list->push_back(q_ctx->resource_ctx());
1371
0
                        contexts.push_back(q_ctx);
1372
0
                        iter++;
1373
0
                    } else {
1374
0
                        iter = map.erase(iter);
1375
0
                    }
1376
0
                }
1377
0
                return Status::OK();
1378
0
            });
1379
0
}
1380
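get_runtime_query_info above doubles as garbage collection: entries whose weak_ptr has expired are erased during the same pass that collects live contexts. The lock-or-erase idiom in isolation, over a plain std::unordered_map rather than the phmap wrapper (a generic sketch):

#include <memory>
#include <unordered_map>
#include <vector>

// Collect live objects and drop dead entries in a single pass.
// unordered_map::erase(it) returns the next iterator, keeping the
// loop valid while mutating the map.
template <typename K, typename T>
std::vector<std::shared_ptr<T>> sweep_and_collect(
        std::unordered_map<K, std::weak_ptr<T>>& map) {
    std::vector<std::shared_ptr<T>> live;
    for (auto it = map.begin(); it != map.end();) {
        if (auto sp = it->second.lock()) {
            live.push_back(std::move(sp));
            ++it;
        } else {
            it = map.erase(it); // expired: remove while iterating
        }
    }
    return live;
}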
1381
Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
1382
0
                                             TReportExecStatusParams* exec_status) {
1383
0
    if (exec_status == nullptr) {
1384
0
        return Status::InvalidArgument("exes_status is nullptr");
1385
0
    }
1386
1387
0
    std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
1388
0
    if (query_context == nullptr) {
1389
0
        return Status::NotFound("Query {} not found or released", print_id(query_id));
1390
0
    }
1391
1392
0
    *exec_status = query_context->get_realtime_exec_status();
1393
1394
0
    return Status::OK();
1395
0
}
1396
1397
0
Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) {
1398
0
    if (query_stats == nullptr) {
1399
0
        return Status::InvalidArgument("query_stats is nullptr");
1400
0
    }
1401
1402
0
    return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
1403
0
            print_id(query_id), query_stats);
1404
0
}
1405
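A minimal caller of the status accessor above, showing the expected handling when the query was released between listing and fetching; try_report is hypothetical, and the is<ErrorCode::NOT_FOUND>() check assumes the Status API used elsewhere in this file.

// Hypothetical: snapshot a query's execution status, treating a
// just-released query as a non-error.
static void try_report(FragmentMgr* mgr, const TUniqueId& query_id) {
    TReportExecStatusParams params;
    Status st = mgr->get_realtime_exec_status(query_id, &params);
    if (st.is<ErrorCode::NOT_FOUND>()) {
        return; // query finished and was released in the meantime
    }
    // ... on success, forward `params` to the frontend ...
}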
1406
#include "common/compile_check_end.h"
1407
1408
} // namespace doris