Coverage Report

Created: 2026-07-02 10:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/internal_service.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/internal_service.h"
19
20
#include <assert.h>
21
#include <brpc/closure_guard.h>
22
#include <brpc/controller.h>
23
#include <bthread/bthread.h>
24
#include <bthread/types.h>
25
#include <butil/errno.h>
26
#include <butil/iobuf.h>
27
#include <fcntl.h>
28
#include <fmt/core.h>
29
#include <gen_cpp/DataSinks_types.h>
30
#include <gen_cpp/MasterService_types.h>
31
#include <gen_cpp/PaloInternalService_types.h>
32
#include <gen_cpp/PlanNodes_types.h>
33
#include <gen_cpp/Status_types.h>
34
#include <gen_cpp/Types_types.h>
35
#include <gen_cpp/internal_service.pb.h>
36
#include <gen_cpp/olap_file.pb.h>
37
#include <gen_cpp/segment_v2.pb.h>
38
#include <gen_cpp/types.pb.h>
39
#include <google/protobuf/stubs/callback.h>
40
#include <stddef.h>
41
#include <stdint.h>
42
#include <sys/stat.h>
43
44
#include <algorithm>
45
#include <exception>
46
#include <filesystem>
47
#include <memory>
48
#include <set>
49
#include <sstream>
50
#include <string>
51
#include <utility>
52
#include <vector>
53
54
#include "cloud/cloud_storage_engine.h"
55
#include "cloud/cloud_tablet_mgr.h"
56
#include "cloud/config.h"
57
#include "common/config.h"
58
#include "common/exception.h"
59
#include "common/logging.h"
60
#include "common/metrics/doris_metrics.h"
61
#include "common/metrics/metrics.h"
62
#include "common/signal_handler.h"
63
#include "common/status.h"
64
#include "core/block/block.h"
65
#include "core/data_type/data_type.h"
66
#include "exec/common/variant_util.h"
67
#include "exec/exchange/vdata_stream_mgr.h"
68
#include "exec/rowid_fetcher.h"
69
#include "exec/runtime_filter/runtime_filter_mgr.h"
70
#include "exec/sink/writer/varrow_flight_result_writer.h"
71
#include "exec/sink/writer/vmysql_result_writer.h"
72
#include "exprs/function/dictionary_factory.h"
73
#include "format/arrow/arrow_row_batch.h"
74
#include "format/csv/csv_reader.h"
75
#include "format/generic_reader.h"
76
#include "format/jni/jni_reader.h"
77
#include "format/json/new_json_reader.h"
78
#include "format/native/native_reader.h"
79
#include "format/orc/vorc_reader.h"
80
#include "format/parquet/vparquet_reader.h"
81
#include "format/text/text_reader.h"
82
#ifdef BUILD_RUST_READERS
83
#include "format/lance/lance_rust_reader.h"
84
#endif
85
#include "io/fs/local_file_system.h"
86
#include "io/fs/stream_load_pipe.h"
87
#include "io/io_common.h"
88
#include "load/channel/load_channel_mgr.h"
89
#include "load/channel/load_stream_mgr.h"
90
#include "load/delta_writer/delta_writer.h"
91
#include "load/group_commit/wal/wal_manager.h"
92
#include "load/routine_load/routine_load_task_executor.h"
93
#include "load/stream_load/new_load_stream_mgr.h"
94
#include "load/stream_load/stream_load_context.h"
95
#include "runtime/cache/result_cache.h"
96
#include "runtime/cdc_client_mgr.h"
97
#include "runtime/descriptors.h"
98
#include "runtime/exec_env.h"
99
#include "runtime/fold_constant_executor.h"
100
#include "runtime/fragment_mgr.h"
101
#include "runtime/query_context.h"
102
#include "runtime/result_block_buffer.h"
103
#include "runtime/result_buffer_mgr.h"
104
#include "runtime/runtime_profile.h"
105
#include "runtime/thread_context.h"
106
#include "runtime/workload_group/workload_group.h"
107
#include "runtime/workload_group/workload_group_manager.h"
108
#include "service/backend_options.h"
109
#include "service/http/http_client.h"
110
#include "service/point_query_executor.h"
111
#include "storage/data_dir.h"
112
#include "storage/index/inverted/inverted_index_desc.h"
113
#include "storage/olap_common.h"
114
#include "storage/olap_define.h"
115
#include "storage/rowset/beta_rowset.h"
116
#include "storage/rowset/rowset.h"
117
#include "storage/rowset/rowset_factory.h"
118
#include "storage/rowset/rowset_meta.h"
119
#include "storage/segment/column_reader.h"
120
#include "storage/storage_engine.h"
121
#include "storage/tablet/tablet_fwd.h"
122
#include "storage/tablet/tablet_manager.h"
123
#include "storage/tablet/tablet_schema.h"
124
#include "storage/txn/txn_manager.h"
125
#include "util/async_io.h"
126
#include "util/brpc_client_cache.h"
127
#include "util/brpc_closure.h"
128
#include "util/jdbc_utils.h"
129
#include "util/jsonb/serialize.h"
130
#include "util/md5.h"
131
#include "util/network_util.h"
132
#include "util/proto_util.h"
133
#include "util/stopwatch.hpp"
134
#include "util/string_util.h"
135
#include "util/thrift_util.h"
136
#include "util/time.h"
137
#include "util/uid_util.h"
138
139
namespace google {
140
namespace protobuf {
141
class RpcController;
142
} // namespace protobuf
143
} // namespace google
144
145
namespace doris {
146
#include "common/compile_check_avoid_begin.h"
147
using namespace ErrorCode;
148
149
const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
150
151
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
152
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
153
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
154
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
155
156
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
157
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
158
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
159
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
160
161
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT);
162
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT);
163
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT);
164
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT);
165
166
static bvar::LatencyRecorder g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets");
167
168
bthread_key_t btls_key;
169
170
1.47M
static void thread_context_deleter(void* d) {
171
1.47M
    delete static_cast<ThreadContext*>(d);
172
1.47M
}
173
174
template <typename T>
175
concept CanCancel = requires(T* response) { response->mutable_status(); };
176
177
template <typename T>
178
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
179
0
    brpc::ClosureGuard closure_guard(done);
180
0
    LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
181
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PTabletWriterCancelResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_14PCacheResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PFetchCacheResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
182
183
template <CanCancel T>
184
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
185
0
    brpc::ClosureGuard closure_guard(done);
186
    // Should use status to generate protobuf message, because it will encoding Backend Info
187
    // into the error message and then we could know which backend's pool is full.
188
0
    Status st = Status::Error<TStatusCode::CANCELLED>(
189
0
            "fail to offer request to the work pool, pool={}", pool.get_info());
190
0
    st.to_protobuf(response->mutable_status());
191
0
    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool="
192
0
                 << pool.get_info();
193
0
}
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PTabletWriterOpenResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PExecPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23POpenLoadStreamResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_27PTabletWriterAddBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_25PCancelPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_21PFetchArrowDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26POutfileWriteSuccessResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PFetchTableSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_29PFetchArrowFlightSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PTabletKeyLookupResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_25PJdbcTestConnectionResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_20PFetchColIdsResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26PFetchRemoteSchemaResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_12PProxyResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_20PMergeFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PSendFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PSyncFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_22PPublishFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_15PSendDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_13PCommitResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_15PRollbackResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_19PConstantExprResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26PTransmitRecCTEBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_20PRerunFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_20PResetGlobalRfResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_19PTransmitDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PCheckRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PResetRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PTabletWriteSlaveResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_27PTabletWriteSlaveDoneResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_17PMultiGetResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_13PGlobResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26PGroupCommitInsertResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PGetWalQueueSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_22PGetBeResourceResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PRequestCdcClientResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
194
195
template <typename T>
196
class NewHttpClosure : public ::google::protobuf::Closure {
197
public:
198
    NewHttpClosure(google::protobuf::Closure* done) : _done(done) {}
199
0
    NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_28PTabletWriterAddBlockRequestEEC2EPS1_PN6google8protobuf7ClosureE
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_19PTransmitDataParamsEEC2EPS1_PN6google8protobuf7ClosureE
200
201
0
    void Run() override {
202
0
        if (_request != nullptr) {
203
0
            delete _request;
204
0
            _request = nullptr;
205
0
        }
206
0
        if (_done != nullptr) {
207
0
            _done->Run();
208
0
        }
209
0
        delete this;
210
0
    }
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_28PTabletWriterAddBlockRequestEE3RunEv
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_19PTransmitDataParamsEE3RunEv
211
212
private:
213
    T* _request = nullptr;
214
    google::protobuf::Closure* _done = nullptr;
215
};
216
217
PInternalService::PInternalService(ExecEnv* exec_env)
218
6
        : _exec_env(exec_env),
219
          // heavy threadpool is used for load process and other process that will read disk or access network.
220
6
          _heavy_work_pool(config::brpc_heavy_work_pool_threads != -1
221
6
                                   ? config::brpc_heavy_work_pool_threads
222
6
                                   : std::max(128, CpuInfo::num_cores() * 4),
223
6
                           config::brpc_heavy_work_pool_max_queue_size != -1
224
6
                                   ? config::brpc_heavy_work_pool_max_queue_size
225
6
                                   : std::max(10240, CpuInfo::num_cores() * 320),
226
6
                           "brpc_heavy"),
227
228
          // light threadpool should be only used in query processing logic. All hanlers should be very light, not locked, not access disk.
229
6
          _light_work_pool(config::brpc_light_work_pool_threads != -1
230
6
                                   ? config::brpc_light_work_pool_threads
231
6
                                   : std::max(128, CpuInfo::num_cores() * 4),
232
6
                           config::brpc_light_work_pool_max_queue_size != -1
233
6
                                   ? config::brpc_light_work_pool_max_queue_size
234
6
                                   : std::max(10240, CpuInfo::num_cores() * 320),
235
6
                           "brpc_light"),
236
6
          _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1
237
6
                                          ? config::brpc_arrow_flight_work_pool_threads
238
6
                                          : std::max(512, CpuInfo::num_cores() * 2),
239
6
                                  config::brpc_arrow_flight_work_pool_max_queue_size != -1
240
6
                                          ? config::brpc_arrow_flight_work_pool_max_queue_size
241
6
                                          : std::max(20480, CpuInfo::num_cores() * 640),
242
6
                                  "brpc_arrow_flight") {
243
6
    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
244
6
                         [this]() { return _heavy_work_pool.get_queue_size(); });
245
6
    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
246
6
                         [this]() { return _light_work_pool.get_queue_size(); });
247
6
    REGISTER_HOOK_METRIC(heavy_work_active_threads,
248
6
                         [this]() { return _heavy_work_pool.get_active_threads(); });
249
6
    REGISTER_HOOK_METRIC(light_work_active_threads,
250
6
                         [this]() { return _light_work_pool.get_active_threads(); });
251
252
6
    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
253
6
                         []() { return config::brpc_heavy_work_pool_max_queue_size; });
254
6
    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
255
6
                         []() { return config::brpc_light_work_pool_max_queue_size; });
256
6
    REGISTER_HOOK_METRIC(heavy_work_max_threads,
257
6
                         []() { return config::brpc_heavy_work_pool_threads; });
258
6
    REGISTER_HOOK_METRIC(light_work_max_threads,
259
6
                         []() { return config::brpc_light_work_pool_threads; });
260
261
6
    REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
262
6
                         [this]() { return _arrow_flight_work_pool.get_queue_size(); });
263
6
    REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
264
6
                         [this]() { return _arrow_flight_work_pool.get_active_threads(); });
265
6
    REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
266
6
                         []() { return config::brpc_arrow_flight_work_pool_max_queue_size; });
267
6
    REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
268
6
                         []() { return config::brpc_arrow_flight_work_pool_threads; });
269
270
6
    _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
271
272
6
    CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
273
6
    CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter));
274
6
}
275
276
PInternalServiceImpl::PInternalServiceImpl(StorageEngine& engine, ExecEnv* exec_env)
277
5
        : PInternalService(exec_env), _engine(engine) {}
278
279
2
PInternalServiceImpl::~PInternalServiceImpl() = default;
280
281
2
PInternalService::~PInternalService() {
282
2
    DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
283
2
    DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
284
2
    DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
285
2
    DEREGISTER_HOOK_METRIC(light_work_active_threads);
286
287
2
    DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
288
2
    DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
289
2
    DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
290
2
    DEREGISTER_HOOK_METRIC(light_work_max_threads);
291
292
2
    DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
293
2
    DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
294
2
    DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
295
2
    DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);
296
297
2
    CHECK_EQ(0, bthread_key_delete(btls_key));
298
2
    CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
299
2
}
300
301
void PInternalService::tablet_writer_open(google::protobuf::RpcController* controller,
302
                                          const PTabletWriterOpenRequest* request,
303
                                          PTabletWriterOpenResult* response,
304
46.7k
                                          google::protobuf::Closure* done) {
305
46.8k
    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
306
46.8k
        VLOG_RPC << "tablet writer open, id=" << request->id()
307
0
                 << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();
308
46.8k
        signal::SignalTaskIdKeeper keeper(request->id());
309
46.8k
        brpc::ClosureGuard closure_guard(done);
310
46.8k
        auto st = _exec_env->load_channel_mgr()->open(*request);
311
46.8k
        if (!st.ok()) {
312
0
            LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id()
313
0
                         << ", index_id=" << request->index_id()
314
0
                         << ", txn_id=" << request->txn_id();
315
0
        }
316
46.8k
        st.to_protobuf(response->mutable_status());
317
46.8k
    });
318
46.7k
    if (!ret) {
319
0
        offer_failed(response, done, _heavy_work_pool);
320
0
        return;
321
0
    }
322
46.7k
}
323
324
void PInternalService::exec_plan_fragment(google::protobuf::RpcController* controller,
325
                                          const PExecPlanFragmentRequest* request,
326
                                          PExecPlanFragmentResult* response,
327
143k
                                          google::protobuf::Closure* done) {
328
143k
    timeval tv {};
329
143k
    gettimeofday(&tv, nullptr);
330
143k
    response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
331
144k
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
332
144k
        _exec_plan_fragment_in_pthread(controller, request, response, done);
333
144k
    });
334
143k
    if (!ret) {
335
0
        offer_failed(response, done, _light_work_pool);
336
0
        return;
337
0
    }
338
143k
}
339
340
void PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
341
                                                      const PExecPlanFragmentRequest* request,
342
                                                      PExecPlanFragmentResult* response,
343
254k
                                                      google::protobuf::Closure* done) {
344
254k
    timeval tv1 {};
345
254k
    gettimeofday(&tv1, nullptr);
346
254k
    response->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
347
254k
    brpc::ClosureGuard closure_guard(done);
348
254k
    auto st = Status::OK();
349
18.4E
    bool compact = request->has_compact() ? request->compact() : false;
350
254k
    PFragmentRequestVersion version =
351
18.4E
            request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
352
254k
    try {
353
254k
        st = _exec_plan_fragment_impl(request->request(), version, compact);
354
254k
    } catch (const Exception& e) {
355
0
        st = e.to_status();
356
0
    } catch (const std::exception& e) {
357
0
        st = Status::Error(ErrorCode::INTERNAL_ERROR, e.what());
358
0
    } catch (...) {
359
0
        st = Status::Error(ErrorCode::INTERNAL_ERROR,
360
0
                           "_exec_plan_fragment_impl meet unknown error");
361
0
    }
362
254k
    if (!st.ok()) {
363
1.20k
        LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
364
1.20k
    }
365
253k
    st.to_protobuf(response->mutable_status());
366
253k
    timeval tv2 {};
367
253k
    gettimeofday(&tv2, nullptr);
368
253k
    response->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
369
253k
}
370
371
void PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
372
                                                  const PExecPlanFragmentRequest* request,
373
                                                  PExecPlanFragmentResult* response,
374
109k
                                                  google::protobuf::Closure* done) {
375
109k
    timeval tv {};
376
109k
    gettimeofday(&tv, nullptr);
377
109k
    response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
378
109k
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
379
109k
        _exec_plan_fragment_in_pthread(controller, request, response, done);
380
109k
    });
381
109k
    if (!ret) {
382
0
        offer_failed(response, done, _light_work_pool);
383
0
        return;
384
0
    }
385
109k
}
386
387
void PInternalService::exec_plan_fragment_start(google::protobuf::RpcController* /*controller*/,
388
                                                const PExecPlanFragmentStartRequest* request,
389
                                                PExecPlanFragmentResult* result,
390
109k
                                                google::protobuf::Closure* done) {
391
109k
    timeval tv {};
392
109k
    gettimeofday(&tv, nullptr);
393
109k
    result->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
394
109k
    bool ret = _light_work_pool.try_offer([this, request, result, done]() {
395
109k
        timeval tv1 {};
396
109k
        gettimeofday(&tv1, nullptr);
397
109k
        result->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
398
109k
        brpc::ClosureGuard closure_guard(done);
399
109k
        auto st = _exec_env->fragment_mgr()->start_query_execution(request);
400
109k
        st.to_protobuf(result->mutable_status());
401
109k
        timeval tv2 {};
402
109k
        gettimeofday(&tv2, nullptr);
403
109k
        result->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
404
109k
    });
405
109k
    if (!ret) {
406
0
        offer_failed(result, done, _light_work_pool);
407
0
        return;
408
0
    }
409
109k
}
410
411
void PInternalService::open_load_stream(google::protobuf::RpcController* controller,
412
                                        const POpenLoadStreamRequest* request,
413
                                        POpenLoadStreamResponse* response,
414
2.81k
                                        google::protobuf::Closure* done) {
415
2.81k
    bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() {
416
2.81k
        signal::SignalTaskIdKeeper keeper(request->load_id());
417
2.81k
        brpc::ClosureGuard done_guard(done);
418
2.81k
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
419
2.81k
        brpc::StreamOptions stream_options;
420
421
2.81k
        LOG(INFO) << "open load stream, load_id=" << request->load_id()
422
2.81k
                  << ", src_id=" << request->src_id();
423
424
2.81k
        std::vector<BaseTabletSPtr> tablets;
425
2.81k
        for (const auto& req : request->tablets()) {
426
1.43k
            BaseTabletSPtr tablet;
427
1.43k
            if (auto res = ExecEnv::get_tablet(req.tablet_id()); !res.has_value()) [[unlikely]] {
428
0
                auto st = std::move(res).error();
429
0
                st.to_protobuf(response->mutable_status());
430
0
                cntl->SetFailed(st.to_string());
431
0
                return;
432
1.43k
            } else {
433
1.43k
                tablet = std::move(res).value();
434
1.43k
            }
435
1.43k
            auto resp = response->add_tablet_schemas();
436
1.43k
            resp->set_index_id(req.index_id());
437
1.43k
            resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
438
1.43k
            tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
439
1.43k
            tablets.push_back(tablet);
440
1.43k
        }
441
2.81k
        if (!tablets.empty()) {
442
1.43k
            auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
443
1.43k
            for (const auto& tablet : tablets) {
444
1.43k
                BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(),
445
1.43k
                                                                     tablet_load_infos);
446
1.43k
            }
447
1.43k
        }
448
449
2.81k
        LoadStream* load_stream = nullptr;
450
2.81k
        auto st = _exec_env->load_stream_mgr()->open_load_stream(request, load_stream);
451
2.81k
        if (!st.ok()) {
452
0
            st.to_protobuf(response->mutable_status());
453
0
            return;
454
0
        }
455
456
2.81k
        stream_options.handler = load_stream;
457
2.81k
        stream_options.idle_timeout_ms = request->idle_timeout_ms();
458
2.81k
        DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
459
2.81k
                        { stream_options.idle_timeout_ms = 1; });
460
461
2.81k
        StreamId streamid;
462
2.81k
        if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
463
0
            st = Status::Cancelled("Fail to accept stream {}", streamid);
464
0
            st.to_protobuf(response->mutable_status());
465
0
            cntl->SetFailed(st.to_string());
466
0
            return;
467
0
        }
468
469
2.81k
        VLOG_DEBUG << "get streamid =" << streamid;
470
2.81k
        st.to_protobuf(response->mutable_status());
471
2.81k
    });
472
2.81k
    if (!ret) {
473
0
        offer_failed(response, done, _heavy_work_pool);
474
0
    }
475
2.81k
}
476
477
void PInternalService::tablet_writer_add_block_by_http(google::protobuf::RpcController* controller,
478
                                                       const ::doris::PEmptyRequest* request,
479
                                                       PTabletWriterAddBlockResult* response,
480
0
                                                       google::protobuf::Closure* done) {
481
0
    PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
482
0
    google::protobuf::Closure* new_done =
483
0
            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
484
0
    brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
485
0
    Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
486
0
                                                                                       cntl);
487
0
    if (st.ok()) {
488
0
        tablet_writer_add_block(controller, new_request, response, new_done);
489
0
    } else {
490
0
        st.to_protobuf(response->mutable_status());
491
0
    }
492
0
}
493
494
void PInternalService::tablet_writer_add_block(google::protobuf::RpcController* controller,
495
                                               const PTabletWriterAddBlockRequest* request,
496
                                               PTabletWriterAddBlockResult* response,
497
48.8k
                                               google::protobuf::Closure* done) {
498
48.8k
    int64_t submit_task_time_ns = MonotonicNanos();
499
48.9k
    bool ret = _heavy_work_pool.try_offer([request, response, done, submit_task_time_ns, this]() {
500
48.9k
        int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
501
48.9k
        brpc::ClosureGuard closure_guard(done);
502
48.9k
        int64_t execution_time_ns = 0;
503
48.9k
        {
504
48.9k
            SCOPED_RAW_TIMER(&execution_time_ns);
505
48.9k
            signal::SignalTaskIdKeeper keeper(request->id());
506
48.9k
            auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
507
48.9k
            if (!st.ok()) {
508
44
                LOG(WARNING) << "tablet writer add block failed, message=" << st
509
44
                             << ", id=" << request->id() << ", index_id=" << request->index_id()
510
44
                             << ", sender_id=" << request->sender_id()
511
44
                             << ", backend id=" << request->backend_id();
512
44
            }
513
48.9k
            st.to_protobuf(response->mutable_status());
514
48.9k
        }
515
48.9k
        response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
516
48.9k
        response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
517
48.9k
    });
518
48.8k
    if (!ret) {
519
0
        offer_failed(response, done, _heavy_work_pool);
520
0
        return;
521
0
    }
522
48.8k
}
523
524
void PInternalService::tablet_writer_cancel(google::protobuf::RpcController* controller,
525
                                            const PTabletWriterCancelRequest* request,
526
                                            PTabletWriterCancelResult* response,
527
144
                                            google::protobuf::Closure* done) {
528
144
    bool ret = _heavy_work_pool.try_offer([this, request, done]() {
529
144
        VLOG_RPC << "tablet writer cancel, id=" << request->id()
530
0
                 << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id();
531
144
        signal::SignalTaskIdKeeper keeper(request->id());
532
144
        brpc::ClosureGuard closure_guard(done);
533
144
        auto st = _exec_env->load_channel_mgr()->cancel(*request);
534
144
        if (!st.ok()) {
535
0
            LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
536
0
                         << ", index_id=" << request->index_id()
537
0
                         << ", sender_id=" << request->sender_id();
538
0
        }
539
144
    });
540
144
    if (!ret) {
541
0
        offer_failed(response, done, _heavy_work_pool);
542
0
        return;
543
0
    }
544
144
}
545
546
Status PInternalService::_exec_plan_fragment_impl(
547
        const std::string& ser_request, PFragmentRequestVersion version, bool compact,
548
254k
        const std::function<void(RuntimeState*, Status*)>& cb) {
549
    // Sometimes the BE do not receive the first heartbeat message and it receives request from FE
550
    // If BE execute this fragment, it will core when it wants to get some property from master info.
551
254k
    if (ExecEnv::GetInstance()->cluster_info() == nullptr) {
552
0
        return Status::InternalError(
553
0
                "Have not receive the first heartbeat message from master, not ready to provide "
554
0
                "service");
555
0
    }
556
254k
    CHECK(version == PFragmentRequestVersion::VERSION_3)
557
0
            << "only support version 3, received " << version;
558
254k
    if (version == PFragmentRequestVersion::VERSION_3) {
559
254k
        TPipelineFragmentParamsList t_request;
560
254k
        {
561
254k
            const uint8_t* buf = (const uint8_t*)ser_request.data();
562
254k
            uint32_t len = ser_request.size();
563
254k
            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
564
254k
        }
565
566
254k
        const auto& fragment_list = t_request.params_list;
567
254k
        if (fragment_list.empty()) {
568
0
            return Status::InternalError("Invalid TPipelineFragmentParamsList!");
569
0
        }
570
254k
        MonotonicStopWatch timer;
571
254k
        timer.start();
572
573
        // work for old version frontend
574
254k
        if (!t_request.__isset.runtime_filter_info) {
575
94.0k
            TRuntimeFilterInfo runtime_filter_info;
576
94.0k
            auto local_param = fragment_list[0].local_params[0];
577
94.0k
            if (local_param.__isset.runtime_filter_params) {
578
93.9k
                runtime_filter_info.__set_runtime_filter_params(local_param.runtime_filter_params);
579
93.9k
            }
580
94.0k
            if (local_param.__isset.topn_filter_descs) {
581
0
                runtime_filter_info.__set_topn_filter_descs(local_param.topn_filter_descs);
582
0
            }
583
94.0k
            t_request.__set_runtime_filter_info(runtime_filter_info);
584
94.0k
        }
585
586
384k
        for (const TPipelineFragmentParams& fragment : fragment_list) {
587
384k
            if (cb) {
588
29
                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
589
29
                        fragment, QuerySource::INTERNAL_FRONTEND, cb, t_request));
590
384k
            } else {
591
384k
                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
592
384k
                        fragment, QuerySource::INTERNAL_FRONTEND, t_request));
593
384k
            }
594
384k
        }
595
252k
        timer.stop();
596
252k
        double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL;
597
252k
        if (cost_secs > 5) {
598
33
            LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much",
599
33
                        fragment_list.size(), print_id(fragment_list.front().query_id), cost_secs);
600
33
        }
601
602
252k
        return Status::OK();
603
254k
    } else {
604
0
        return Status::InternalError("invalid version");
605
0
    }
606
254k
}
607
608
void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*controller*/,
609
                                            const PCancelPlanFragmentRequest* request,
610
                                            PCancelPlanFragmentResult* result,
611
159k
                                            google::protobuf::Closure* done) {
612
159k
    bool ret = _light_work_pool.try_offer([this, request, result, done]() {
613
159k
        brpc::ClosureGuard closure_guard(done);
614
159k
        signal::SignalTaskIdKeeper keeper(request->finst_id());
615
159k
        Status st = Status::OK();
616
617
159k
        const bool has_cancel_reason = request->has_cancel_reason();
618
159k
        const bool has_cancel_status = request->has_cancel_status();
619
        // During upgrade only LIMIT_REACH is used, other reason is changed to internal error
620
159k
        Status actual_cancel_status = Status::OK();
621
        // Convert PPlanFragmentCancelReason to Status
622
159k
        if (has_cancel_status) {
623
            // If fe set cancel status, then it is new FE now, should use cancel status.
624
159k
            actual_cancel_status = Status::create<false>(request->cancel_status());
625
159k
        } else if (has_cancel_reason) {
626
            // If fe not set cancel status, but set cancel reason, should convert cancel reason
627
            // to cancel status here.
628
0
            if (request->cancel_reason() == PPlanFragmentCancelReason::LIMIT_REACH) {
629
0
                actual_cancel_status = Status::Error<ErrorCode::LIMIT_REACH>("limit reach");
630
0
            } else {
631
                // Use cancel reason as error message
632
0
                actual_cancel_status = Status::InternalError(
633
0
                        PPlanFragmentCancelReason_Name(request->cancel_reason()));
634
0
            }
635
0
        } else {
636
0
            actual_cancel_status = Status::InternalError("unknown error");
637
0
        }
638
639
159k
        TUniqueId query_id;
640
159k
        query_id.__set_hi(request->query_id().hi());
641
159k
        query_id.__set_lo(request->query_id().lo());
642
159k
        LOG(INFO) << fmt::format("Cancel query {}, reason: {}", print_id(query_id),
643
159k
                                 actual_cancel_status.to_string());
644
159k
        _exec_env->fragment_mgr()->cancel_query(query_id, actual_cancel_status);
645
646
        // TODO: the logic seems useless, cancel only return Status::OK. remove it
647
159k
        st.to_protobuf(result->mutable_status());
648
159k
    });
649
159k
    if (!ret) {
650
0
        offer_failed(result, done, _light_work_pool);
651
0
        return;
652
0
    }
653
159k
}
654
655
void PInternalService::fetch_data(google::protobuf::RpcController* controller,
656
                                  const PFetchDataRequest* request, PFetchDataResult* result,
657
364k
                                  google::protobuf::Closure* done) {
658
    // fetch_data is a light operation which will put a request rather than wait inplace when there's no data ready.
659
    // when there's data ready, use brpc to send. there's queue in brpc service. won't take it too long.
660
364k
    auto ctx = GetResultBatchCtx::create_shared(result, done);
661
364k
    TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id
662
364k
    std::shared_ptr<MySQLResultBlockBuffer> buffer;
663
364k
    Status st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id, buffer);
664
364k
    if (!st.ok()) {
665
0
        LOG(WARNING) << "Result buffer not found! finst ID: " << print_id(unique_id);
666
0
        return;
667
0
    }
668
364k
    if (st = buffer->get_batch(ctx); !st.ok()) {
669
6
        LOG(WARNING) << "fetch_data failed: " << st.to_string();
670
6
    }
671
364k
}
672
673
void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller,
674
                                        const PFetchArrowDataRequest* request,
675
                                        PFetchArrowDataResult* result,
676
0
                                        google::protobuf::Closure* done) {
677
0
    bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
678
0
        auto ctx = GetArrowResultBatchCtx::create_shared(result, done);
679
0
        TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id
680
0
        std::shared_ptr<ArrowFlightResultBlockBuffer> arrow_buffer;
681
0
        auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id, arrow_buffer);
682
0
        if (!st.ok()) {
683
0
            LOG(WARNING) << "Result buffer not found! Query ID: " << print_id(unique_id);
684
0
            return;
685
0
        }
686
0
        if (st = arrow_buffer->get_batch(ctx); !st.ok()) {
687
0
            LOG(WARNING) << "fetch_arrow_data failed: " << st.to_string();
688
0
        }
689
0
    });
690
0
    if (!ret) {
691
0
        offer_failed(result, done, _arrow_flight_work_pool);
692
0
        return;
693
0
    }
694
0
}
695
696
void PInternalService::outfile_write_success(google::protobuf::RpcController* controller,
697
                                             const POutfileWriteSuccessRequest* request,
698
                                             POutfileWriteSuccessResult* result,
699
4
                                             google::protobuf::Closure* done) {
700
4
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
701
4
        VLOG_RPC << "outfile write success file";
702
4
        brpc::ClosureGuard closure_guard(done);
703
4
        TResultFileSink result_file_sink;
704
4
        Status st = Status::OK();
705
4
        {
706
4
            const uint8_t* buf = (const uint8_t*)(request->result_file_sink().data());
707
4
            uint32_t len = request->result_file_sink().size();
708
4
            st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
709
4
            if (!st.ok()) {
710
0
                LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
711
0
                st.to_protobuf(result->mutable_status());
712
0
                return;
713
0
            }
714
4
        }
715
716
4
        TResultFileSinkOptions file_options = result_file_sink.file_options;
717
4
        std::stringstream ss;
718
4
        ss << file_options.file_path << file_options.success_file_name;
719
4
        std::string file_name = ss.str();
720
4
        if (result_file_sink.storage_backend_type == TStorageBackendType::LOCAL) {
721
            // For local file writer, the file_path is a local dir.
722
            // Here we do a simple security verification by checking whether the file exists.
723
            // Because the file path is currently arbitrarily specified by the user,
724
            // Doris is not responsible for ensuring the correctness of the path.
725
            // This is just to prevent overwriting the existing file.
726
4
            bool exists = true;
727
4
            st = io::global_local_filesystem()->exists(file_name, &exists);
728
4
            if (!st.ok()) {
729
0
                LOG(WARNING) << "outfile write success filefailed, errmsg = " << st;
730
0
                st.to_protobuf(result->mutable_status());
731
0
                return;
732
0
            }
733
4
            if (exists) {
734
0
                st = Status::InternalError("File already exists: {}", file_name);
735
0
            }
736
4
            if (!st.ok()) {
737
0
                LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
738
0
                st.to_protobuf(result->mutable_status());
739
0
                return;
740
0
            }
741
4
        }
742
743
4
        auto file_type_res =
744
4
                FileFactory::convert_storage_type(result_file_sink.storage_backend_type);
745
4
        if (!file_type_res.has_value()) [[unlikely]] {
746
0
            st = std::move(file_type_res).error();
747
0
            st.to_protobuf(result->mutable_status());
748
0
            LOG(WARNING) << "encounter unkonw type=" << result_file_sink.storage_backend_type
749
0
                         << ", st=" << st;
750
0
            return;
751
0
        }
752
753
4
        auto&& res = FileFactory::create_file_writer(file_type_res.value(), ExecEnv::GetInstance(),
754
4
                                                     file_options.broker_addresses,
755
4
                                                     file_options.broker_properties, file_name,
756
4
                                                     {
757
4
                                                             .write_file_cache = false,
758
4
                                                             .sync_file_data = false,
759
4
                                                     });
760
4
        using T = std::decay_t<decltype(res)>;
761
4
        if (!res.has_value()) [[unlikely]] {
762
0
            st = std::forward<T>(res).error();
763
0
            st.to_protobuf(result->mutable_status());
764
0
            return;
765
0
        }
766
767
4
        std::unique_ptr<doris::io::FileWriter> _file_writer_impl = std::forward<T>(res).value();
768
        // must write somthing because s3 file writer can not writer empty file
769
4
        st = _file_writer_impl->append({"success"});
770
4
        if (!st.ok()) {
771
0
            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
772
0
            st.to_protobuf(result->mutable_status());
773
0
            return;
774
0
        }
775
4
        st = _file_writer_impl->close();
776
4
        if (!st.ok()) {
777
0
            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
778
0
            st.to_protobuf(result->mutable_status());
779
0
            return;
780
0
        }
781
4
    });
782
4
    if (!ret) {
783
0
        offer_failed(result, done, _heavy_work_pool);
784
0
        return;
785
0
    }
786
4
}
787
788
void PInternalService::fetch_table_schema(google::protobuf::RpcController* controller,
789
                                          const PFetchTableSchemaRequest* request,
790
                                          PFetchTableSchemaResult* result,
791
2.02k
                                          google::protobuf::Closure* done) {
792
2.02k
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
793
2.02k
        VLOG_RPC << "fetch table schema";
794
2.02k
        brpc::ClosureGuard closure_guard(done);
795
2.02k
        TFileScanRange file_scan_range;
796
2.02k
        Status st = Status::OK();
797
2.02k
        {
798
2.02k
            const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
799
2.02k
            uint32_t len = request->file_scan_range().size();
800
2.02k
            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
801
2.02k
            if (!st.ok()) {
802
0
                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
803
0
                st.to_protobuf(result->mutable_status());
804
0
                return;
805
0
            }
806
2.02k
        }
807
2.02k
        if (file_scan_range.__isset.ranges == false) {
808
0
            st = Status::InternalError("can not get TFileRangeDesc.");
809
0
            st.to_protobuf(result->mutable_status());
810
0
            return;
811
0
        }
812
2.02k
        if (file_scan_range.__isset.params == false) {
813
0
            st = Status::InternalError("can not get TFileScanRangeParams.");
814
0
            st.to_protobuf(result->mutable_status());
815
0
            return;
816
0
        }
817
2.02k
        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
818
2.02k
        const TFileScanRangeParams& params = file_scan_range.params;
819
820
2.02k
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
821
2.02k
                MemTrackerLimiter::Type::OTHER,
822
2.02k
                fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type,
823
2.02k
                            params.file_type));
824
2.02k
        SCOPED_ATTACH_TASK(mem_tracker);
825
826
        // make sure profile is desctructed after reader cause PrefetchBufferedReader
827
        // might asynchronouslly access the profile
828
2.02k
        std::unique_ptr<RuntimeProfile> profile =
829
2.02k
                std::make_unique<RuntimeProfile>("FetchTableSchema");
830
2.02k
        std::unique_ptr<GenericReader> reader(nullptr);
831
2.02k
        auto io_ctx = std::make_shared<io::IOContext>();
832
2.02k
        auto file_cache_statis = std::make_shared<io::FileCacheStatistics>();
833
2.02k
        auto file_reader_stats = std::make_shared<io::FileReaderStats>();
834
2.02k
        io_ctx->file_cache_stats = file_cache_statis.get();
835
2.02k
        io_ctx->file_reader_stats = file_reader_stats.get();
836
2.02k
        constexpr size_t fetch_schema_batch_size = 4064;
837
        // file_slots is no use, but the lifetime should be longer than reader
838
2.02k
        std::vector<SlotDescriptor*> file_slots;
839
2.02k
        switch (params.format_type) {
840
609
        case TFileFormatType::FORMAT_CSV_PLAIN:
841
609
        case TFileFormatType::FORMAT_CSV_GZ:
842
609
        case TFileFormatType::FORMAT_CSV_BZ2:
843
609
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
844
609
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
845
609
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
846
609
        case TFileFormatType::FORMAT_CSV_LZOP:
847
609
        case TFileFormatType::FORMAT_CSV_DEFLATE: {
848
609
            reader = CsvReader::create_unique(nullptr, profile.get(), nullptr, params, range,
849
609
                                              file_slots, fetch_schema_batch_size, io_ctx.get(),
850
609
                                              io_ctx);
851
609
            break;
852
609
        }
853
2
        case TFileFormatType::FORMAT_TEXT: {
854
2
            reader = TextReader::create_unique(nullptr, profile.get(), nullptr, params, range,
855
2
                                               file_slots, fetch_schema_batch_size, io_ctx.get());
856
2
            break;
857
609
        }
858
951
        case TFileFormatType::FORMAT_PARQUET: {
859
951
            reader = ParquetReader::create_unique(params, range, io_ctx, nullptr);
860
951
            break;
861
609
        }
862
409
        case TFileFormatType::FORMAT_ORC: {
863
409
            reader = OrcReader::create_unique(params, range, fetch_schema_batch_size, "", io_ctx);
864
409
            break;
865
609
        }
866
2
        case TFileFormatType::FORMAT_NATIVE: {
867
2
            reader = NativeReader::create_unique(profile.get(), params, range, io_ctx.get(),
868
2
                                                 nullptr);
869
2
            break;
870
609
        }
871
46
        case TFileFormatType::FORMAT_JSON: {
872
46
            reader = NewJsonReader::create_unique(profile.get(), params, range, file_slots,
873
46
                                                  fetch_schema_batch_size, io_ctx.get(), io_ctx);
874
46
            break;
875
609
        }
876
0
#ifdef BUILD_RUST_READERS
877
8
        case TFileFormatType::FORMAT_LANCE: {
878
8
            reader = LanceRustReader::create_unique(params, range, io_ctx.get());
879
8
            break;
880
609
        }
881
0
#endif
882
0
        default:
883
0
            st = Status::InternalError("Not supported file format in fetch table schema: {}",
884
0
                                       params.format_type);
885
0
            st.to_protobuf(result->mutable_status());
886
0
            return;
887
2.02k
        }
888
2.02k
        if (!st.ok()) {
889
0
            LOG(WARNING) << "failed to create reader, errmsg=" << st;
890
0
            st.to_protobuf(result->mutable_status());
891
0
            return;
892
0
        }
893
2.02k
        st = reader->init_schema_reader();
894
2.02k
        if (!st.ok()) {
895
10
            LOG(WARNING) << "failed to init reader, errmsg=" << st;
896
10
            st.to_protobuf(result->mutable_status());
897
10
            return;
898
10
        }
899
2.01k
        std::vector<std::string> col_names;
900
2.01k
        std::vector<DataTypePtr> col_types;
901
2.01k
        st = reader->get_parsed_schema(&col_names, &col_types);
902
2.01k
        if (!st.ok()) {
903
7
            LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
904
7
            st.to_protobuf(result->mutable_status());
905
7
            return;
906
7
        }
907
2.00k
        result->set_column_nums(col_names.size());
908
15.0k
        for (size_t idx = 0; idx < col_names.size(); ++idx) {
909
13.0k
            result->add_column_names(col_names[idx]);
910
13.0k
        }
911
15.0k
        for (size_t idx = 0; idx < col_types.size(); ++idx) {
912
13.0k
            PTypeDesc* type_desc = result->add_column_types();
913
13.0k
            col_types[idx]->to_protobuf(type_desc);
914
13.0k
        }
915
2.00k
        st.to_protobuf(result->mutable_status());
916
2.00k
    });
917
2.02k
    if (!ret) {
918
0
        offer_failed(result, done, _heavy_work_pool);
919
0
        return;
920
0
    }
921
2.02k
}
922
923
void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
924
                                                 const PFetchArrowFlightSchemaRequest* request,
925
                                                 PFetchArrowFlightSchemaResult* result,
926
55
                                                 google::protobuf::Closure* done) {
927
55
    bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
928
55
        brpc::ClosureGuard closure_guard(done);
929
55
        std::shared_ptr<arrow::Schema> schema;
930
55
        std::shared_ptr<ArrowFlightResultBlockBuffer> buffer;
931
55
        auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer(
932
55
                UniqueId(request->finst_id()).to_thrift(), buffer);
933
55
        if (!st.ok()) {
934
0
            LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
935
0
            st.to_protobuf(result->mutable_status());
936
0
            return;
937
0
        }
938
55
        st = buffer->get_schema(&schema);
939
55
        if (!st.ok()) {
940
0
            LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
941
0
            st.to_protobuf(result->mutable_status());
942
0
            return;
943
0
        }
944
945
55
        std::string schema_str;
946
55
        st = serialize_arrow_schema(&schema, &schema_str);
947
55
        if (st.ok()) {
948
55
            result->set_schema(std::move(schema_str));
949
55
            if (!config::public_host.empty()) {
950
0
                result->set_be_arrow_flight_ip(config::public_host);
951
0
            }
952
55
            if (config::arrow_flight_sql_proxy_port != -1) {
953
0
                result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port);
954
0
            }
955
55
        }
956
55
        st.to_protobuf(result->mutable_status());
957
55
    });
958
55
    if (!ret) {
959
0
        offer_failed(result, done, _arrow_flight_work_pool);
960
0
        return;
961
0
    }
962
55
}
963
964
Status PInternalService::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
965
227
                                            PTabletKeyLookupResponse* response) {
966
227
    PointQueryExecutor executor;
967
227
    RETURN_IF_ERROR(executor.init(request, response));
968
227
    if (response->has_need_resend_query_context() && response->need_resend_query_context()) {
969
2
        return Status::OK();
970
2
    }
971
225
    RETURN_IF_ERROR(executor.lookup_up());
972
222
    executor.print_profile();
973
222
    return Status::OK();
974
225
}
975
976
void PInternalService::tablet_fetch_data(google::protobuf::RpcController* controller,
977
                                         const PTabletKeyLookupRequest* request,
978
                                         PTabletKeyLookupResponse* response,
979
227
                                         google::protobuf::Closure* done) {
980
227
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
981
227
        [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller);
982
227
        brpc::ClosureGuard guard(done);
983
227
        Status st = _tablet_fetch_data(request, response);
984
227
        st.to_protobuf(response->mutable_status());
985
227
    });
986
227
    if (!ret) {
987
0
        offer_failed(response, done, _light_work_pool);
988
0
        return;
989
0
    }
990
227
}
991
992
void PInternalService::test_jdbc_connection(google::protobuf::RpcController* controller,
993
                                            const PJdbcTestConnectionRequest* request,
994
                                            PJdbcTestConnectionResult* result,
995
160
                                            google::protobuf::Closure* done) {
996
160
    if (!doris::config::enable_java_support) {
997
0
        doris::Status status = doris::Status::InternalError(
998
0
                "you can change be config enable_java_support to true and restart be.");
999
0
        status.to_protobuf(result->mutable_status());
1000
0
        done->Run();
1001
0
        return;
1002
0
    }
1003
160
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
1004
160
        VLOG_RPC << "test jdbc connection";
1005
160
        brpc::ClosureGuard closure_guard(done);
1006
160
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
1007
160
                MemTrackerLimiter::Type::OTHER,
1008
160
                fmt::format("InternalService::test_jdbc_connection"));
1009
160
        SCOPED_ATTACH_TASK(mem_tracker);
1010
160
        TTableDescriptor table_desc;
1011
160
        Status st = Status::OK();
1012
160
        {
1013
160
            const uint8_t* buf = (const uint8_t*)request->jdbc_table().data();
1014
160
            uint32_t len = request->jdbc_table().size();
1015
160
            st = deserialize_thrift_msg(buf, &len, false, &table_desc);
1016
160
            if (!st.ok()) {
1017
0
                LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
1018
0
                st.to_protobuf(result->mutable_status());
1019
0
                return;
1020
0
            }
1021
160
        }
1022
160
        TJdbcTable jdbc_table = (table_desc.jdbcTable);
1023
1024
        // Resolve driver URL to absolute file:// path
1025
160
        std::string driver_url;
1026
160
        st = JdbcUtils::resolve_driver_url(jdbc_table.jdbc_driver_url, &driver_url);
1027
160
        if (!st.ok()) {
1028
0
            st.to_protobuf(result->mutable_status());
1029
0
            return;
1030
0
        }
1031
1032
        // Build params for JdbcConnectionTester
1033
160
        std::map<std::string, std::string> params;
1034
160
        params["jdbc_url"] = jdbc_table.jdbc_url;
1035
160
        params["jdbc_user"] = jdbc_table.jdbc_user;
1036
160
        params["jdbc_password"] = jdbc_table.jdbc_password;
1037
160
        params["jdbc_driver_class"] = jdbc_table.jdbc_driver_class;
1038
160
        params["jdbc_driver_url"] = driver_url;
1039
160
        params["query_sql"] = request->query_str();
1040
160
        params["catalog_id"] = std::to_string(jdbc_table.catalog_id);
1041
160
        params["connection_pool_min_size"] = std::to_string(jdbc_table.connection_pool_min_size);
1042
160
        params["connection_pool_max_size"] = std::to_string(jdbc_table.connection_pool_max_size);
1043
160
        params["connection_pool_max_wait_time"] =
1044
160
                std::to_string(jdbc_table.connection_pool_max_wait_time);
1045
160
        params["connection_pool_max_life_time"] =
1046
160
                std::to_string(jdbc_table.connection_pool_max_life_time);
1047
160
        params["connection_pool_keep_alive"] =
1048
160
                jdbc_table.connection_pool_keep_alive ? "true" : "false";
1049
160
        params["clean_datasource"] = "true";
1050
        // Map jdbc_table_type (TOdbcTableType enum value) to string name
1051
        // for JdbcTypeHandlerFactory to select the correct type handler.
1052
        // This ensures the right validation query is used (e.g. Oracle: "SELECT 1 FROM dual").
1053
160
        if (request->has_jdbc_table_type()) {
1054
160
            std::string type_name;
1055
160
            switch (request->jdbc_table_type()) {
1056
109
            case 0:
1057
109
                type_name = "MYSQL";
1058
109
                break;
1059
19
            case 1:
1060
19
                type_name = "ORACLE";
1061
19
                break;
1062
12
            case 2:
1063
12
                type_name = "POSTGRESQL";
1064
12
                break;
1065
8
            case 3:
1066
8
                type_name = "SQLSERVER";
1067
8
                break;
1068
8
            case 6:
1069
8
                type_name = "CLICKHOUSE";
1070
8
                break;
1071
0
            case 7:
1072
0
                type_name = "SAP_HANA";
1073
0
                break;
1074
0
            case 8:
1075
0
                type_name = "TRINO";
1076
0
                break;
1077
0
            case 9:
1078
0
                type_name = "PRESTO";
1079
0
                break;
1080
2
            case 10:
1081
2
                type_name = "OCEANBASE";
1082
2
                break;
1083
0
            case 11:
1084
0
                type_name = "OCEANBASE_ORACLE";
1085
0
                break;
1086
2
            case 13:
1087
2
                type_name = "DB2";
1088
2
                break;
1089
0
            case 14:
1090
0
                type_name = "GBASE";
1091
0
                break;
1092
0
            default:
1093
0
                break;
1094
160
            }
1095
160
            if (!type_name.empty()) {
1096
160
                params["table_type"] = type_name;
1097
160
            }
1098
160
        }
1099
        // required_fields and columns_types are required by JniReader
1100
160
        params["required_fields"] = "result";
1101
160
        params["columns_types"] = "int";
1102
1103
        // Use JniReader to create JdbcConnectionTester, which tests
1104
        // the connection in its open() method.
1105
160
        auto jni_reader =
1106
160
                std::make_unique<JniReader>("org/apache/doris/jdbc/JdbcConnectionTester", params);
1107
160
        st = jni_reader->open(nullptr, nullptr);
1108
160
        st.to_protobuf(result->mutable_status());
1109
1110
160
        Status close_st = jni_reader->close();
1111
160
        if (!close_st.ok()) {
1112
0
            LOG(WARNING) << "Failed to close JDBC connection tester: " << close_st.msg();
1113
0
        }
1114
160
    });
1115
1116
160
    if (!ret) {
1117
0
        offer_failed(result, done, _heavy_work_pool);
1118
0
        return;
1119
0
    }
1120
160
}
1121
1122
void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
1123
                                                        const PFetchColIdsRequest* request,
1124
                                                        PFetchColIdsResponse* response,
1125
0
                                                        google::protobuf::Closure* done) {
1126
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1127
0
        _get_column_ids_by_tablet_ids(controller, request, response, done);
1128
0
    });
1129
0
    if (!ret) {
1130
0
        offer_failed(response, done, _light_work_pool);
1131
0
        return;
1132
0
    }
1133
0
}
1134
1135
void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
1136
        google::protobuf::RpcController* controller, const PFetchColIdsRequest* request,
1137
0
        PFetchColIdsResponse* response, google::protobuf::Closure* done) {
1138
0
    brpc::ClosureGuard guard(done);
1139
0
    [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller);
1140
0
    TabletManager* tablet_mgr = _engine.tablet_manager();
1141
0
    const auto& params = request->params();
1142
0
    for (const auto& param : params) {
1143
0
        int64_t index_id = param.indexid();
1144
0
        const auto& tablet_ids = param.tablet_ids();
1145
0
        std::set<std::set<int32_t>> filter_set;
1146
0
        std::map<int32_t, const TabletColumn*> id_to_column;
1147
0
        for (const int64_t tablet_id : tablet_ids) {
1148
0
            TabletSharedPtr tablet = tablet_mgr->get_tablet(tablet_id);
1149
0
            if (tablet == nullptr) {
1150
0
                std::stringstream ss;
1151
0
                ss << "cannot get tablet by id:" << tablet_id;
1152
0
                LOG(WARNING) << ss.str();
1153
0
                response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE);
1154
0
                response->mutable_status()->add_error_msgs(ss.str());
1155
0
                return;
1156
0
            }
1157
            // check schema consistency, column ids should be the same
1158
0
            const auto& columns = tablet->tablet_schema()->columns();
1159
1160
0
            std::set<int32_t> column_ids;
1161
0
            for (const auto& col : columns) {
1162
0
                column_ids.insert(col->unique_id());
1163
0
            }
1164
0
            filter_set.insert(std::move(column_ids));
1165
1166
0
            if (id_to_column.empty()) {
1167
0
                for (const auto& col : columns) {
1168
0
                    id_to_column.insert(std::pair {col->unique_id(), col.get()});
1169
0
                }
1170
0
            } else {
1171
0
                for (const auto& col : columns) {
1172
0
                    auto it = id_to_column.find(col->unique_id());
1173
0
                    if (it == id_to_column.end() || *(it->second) != *col) {
1174
0
                        ColumnPB prev_col_pb;
1175
0
                        ColumnPB curr_col_pb;
1176
0
                        if (it != id_to_column.end()) {
1177
0
                            it->second->to_schema_pb(&prev_col_pb);
1178
0
                        }
1179
0
                        col->to_schema_pb(&curr_col_pb);
1180
0
                        std::stringstream ss;
1181
0
                        ss << "consistency check failed: index{ " << index_id << " }"
1182
0
                           << " got inconsistent schema, prev column: " << prev_col_pb.DebugString()
1183
0
                           << " current column: " << curr_col_pb.DebugString();
1184
0
                        LOG(WARNING) << ss.str();
1185
0
                        response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE);
1186
0
                        response->mutable_status()->add_error_msgs(ss.str());
1187
0
                        return;
1188
0
                    }
1189
0
                }
1190
0
            }
1191
0
        }
1192
1193
0
        if (filter_set.size() > 1) {
1194
            // consistecy check failed
1195
0
            std::stringstream ss;
1196
0
            ss << "consistency check failed: index{" << index_id << "}"
1197
0
               << "got inconsistent schema";
1198
0
            LOG(WARNING) << ss.str();
1199
0
            response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE);
1200
0
            response->mutable_status()->add_error_msgs(ss.str());
1201
0
            return;
1202
0
        }
1203
        // consistency check passed, use the first tablet to be the representative
1204
0
        TabletSharedPtr tablet = tablet_mgr->get_tablet(tablet_ids[0]);
1205
0
        const auto& columns = tablet->tablet_schema()->columns();
1206
0
        auto entry = response->add_entries();
1207
0
        entry->set_index_id(index_id);
1208
0
        auto col_name_to_id = entry->mutable_col_name_to_id();
1209
0
        for (const auto& column : columns) {
1210
0
            (*col_name_to_id)[column->name()] = column->unique_id();
1211
0
        }
1212
0
    }
1213
0
    response->mutable_status()->set_status_code(TStatusCode::OK);
1214
0
}
1215
1216
template <class RPCResponse>
1217
struct AsyncRPCContext {
1218
    RPCResponse response;
1219
    brpc::Controller cntl;
1220
    brpc::CallId cid;
1221
};
1222
1223
void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
1224
                                                  const PFetchRemoteSchemaRequest* request,
1225
                                                  PFetchRemoteSchemaResponse* response,
1226
212
                                                  google::protobuf::Closure* done) {
1227
212
    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
1228
212
        brpc::ClosureGuard closure_guard(done);
1229
212
        Status st = Status::OK();
1230
212
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
1231
212
                MemTrackerLimiter::Type::OTHER,
1232
212
                fmt::format("InternalService::fetch_remote_tablet_schema"));
1233
212
        SCOPED_ATTACH_TASK(mem_tracker);
1234
212
        if (request->is_coordinator()) {
1235
            // Spawn rpc request to none coordinator nodes, and finally merge them all
1236
106
            PFetchRemoteSchemaRequest remote_request(*request);
1237
            // set it none coordinator to get merged schema
1238
106
            remote_request.set_is_coordinator(false);
1239
106
            using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext<PFetchRemoteSchemaResponse>;
1240
106
            std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts(
1241
106
                    request->tablet_location_size());
1242
212
            for (int i = 0; i < request->tablet_location_size(); ++i) {
1243
106
                std::string host = request->tablet_location(i).host();
1244
106
                int32_t brpc_port = request->tablet_location(i).brpc_port();
1245
106
                std::shared_ptr<PBackendService_Stub> stub(
1246
106
                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
1247
106
                                host, brpc_port));
1248
106
                if (stub == nullptr) {
1249
0
                    LOG(WARNING) << "Failed to init rpc to " << host << ":" << brpc_port;
1250
0
                    st = Status::InternalError("Failed to init rpc to {}:{}", host, brpc_port);
1251
0
                    continue;
1252
0
                }
1253
106
                rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
1254
106
                rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms);
1255
106
                stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request,
1256
106
                                                 &rpc_contexts[i].response, brpc::DoNothing());
1257
106
            }
1258
106
            std::vector<TabletSchemaSPtr> schemas;
1259
106
            for (auto& rpc_context : rpc_contexts) {
1260
106
                brpc::Join(rpc_context.cid);
1261
106
                if (!st.ok()) {
1262
                    // make sure all flying rpc request is joined
1263
0
                    continue;
1264
0
                }
1265
106
                if (rpc_context.cntl.Failed()) {
1266
0
                    LOG(WARNING) << "fetch_remote_tablet_schema rpc err:"
1267
0
                                 << rpc_context.cntl.ErrorText();
1268
0
                    ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
1269
0
                            rpc_context.cntl.remote_side());
1270
0
                    st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}",
1271
0
                                               rpc_context.cntl.ErrorText());
1272
0
                }
1273
106
                if (rpc_context.response.status().status_code() != 0) {
1274
0
                    st = Status::create(rpc_context.response.status());
1275
0
                }
1276
106
                if (rpc_context.response.has_merged_schema()) {
1277
106
                    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
1278
106
                    schema->init_from_pb(rpc_context.response.merged_schema());
1279
106
                    schemas.push_back(schema);
1280
106
                }
1281
106
            }
1282
106
            if (!schemas.empty() && st.ok()) {
1283
                // merge all
1284
106
                TabletSchemaSPtr merged_schema;
1285
106
                st = variant_util::get_least_common_schema(schemas, nullptr, merged_schema);
1286
106
                if (!st.ok()) {
1287
0
                    LOG(WARNING) << "Failed to get least common schema: " << st.to_string();
1288
0
                    st = Status::InternalError("Failed to get least common schema: {}",
1289
0
                                               st.to_string());
1290
0
                }
1291
106
                VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure();
1292
106
                merged_schema->reserve_extracted_columns();
1293
106
                merged_schema->to_schema_pb(response->mutable_merged_schema());
1294
106
            }
1295
106
            st.to_protobuf(response->mutable_status());
1296
106
            return;
1297
106
        } else {
1298
            // This is not a coordinator, get it's tablet and merge schema
1299
106
            std::vector<int64_t> target_tablets;
1300
106
            for (int i = 0; i < request->tablet_location_size(); ++i) {
1301
106
                const auto& location = request->tablet_location(i);
1302
106
                auto backend = BackendOptions::get_local_backend();
1303
                // If this is the target backend
1304
106
                if (backend.host == location.host() && config::brpc_port == location.brpc_port()) {
1305
106
                    target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end());
1306
106
                    break;
1307
106
                }
1308
106
            }
1309
106
            if (!target_tablets.empty()) {
1310
106
                std::vector<TabletSchemaSPtr> tablet_schemas;
1311
1.41k
                for (int64_t tablet_id : target_tablets) {
1312
1.41k
                    auto res = ExecEnv::get_tablet(tablet_id);
1313
1.41k
                    if (!res.has_value()) {
1314
                        // just ignore
1315
0
                        LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id;
1316
0
                        continue;
1317
0
                    }
1318
1.41k
                    auto tablet = res.value();
1319
1.41k
                    auto rowsets = tablet->get_snapshot_rowset();
1320
1.41k
                    auto schema =
1321
1.41k
                            variant_util::VariantCompactionUtil::calculate_variant_extended_schema(
1322
1.41k
                                    rowsets, tablet->tablet_schema());
1323
1.41k
                    tablet_schemas.push_back(schema);
1324
1.41k
                }
1325
106
                if (!tablet_schemas.empty()) {
1326
                    // merge all
1327
106
                    TabletSchemaSPtr merged_schema;
1328
106
                    st = variant_util::get_least_common_schema(tablet_schemas, nullptr,
1329
106
                                                               merged_schema);
1330
106
                    if (!st.ok()) {
1331
0
                        LOG(WARNING) << "Failed to get least common schema: " << st.to_string();
1332
0
                        st = Status::InternalError("Failed to get least common schema: {}",
1333
0
                                                   st.to_string());
1334
0
                    }
1335
106
                    merged_schema->to_schema_pb(response->mutable_merged_schema());
1336
106
                    VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure();
1337
106
                }
1338
106
            }
1339
106
            st.to_protobuf(response->mutable_status());
1340
106
        }
1341
212
    });
1342
212
    if (!ret) {
1343
0
        offer_failed(response, done, _heavy_work_pool);
1344
0
    }
1345
212
}
1346
1347
void PInternalService::report_stream_load_status(google::protobuf::RpcController* controller,
1348
                                                 const PReportStreamLoadStatusRequest* request,
1349
                                                 PReportStreamLoadStatusResponse* response,
1350
0
                                                 google::protobuf::Closure* done) {
1351
0
    TUniqueId load_id;
1352
0
    load_id.__set_hi(request->load_id().hi());
1353
0
    load_id.__set_lo(request->load_id().lo());
1354
0
    Status st = Status::OK();
1355
0
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1356
0
    if (!stream_load_ctx) {
1357
0
        st = Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
1358
0
    }
1359
0
    stream_load_ctx->load_status_promise.set_value(st);
1360
0
    st.to_protobuf(response->mutable_status());
1361
0
}
1362
1363
void PInternalService::get_info(google::protobuf::RpcController* controller,
1364
                                const PProxyRequest* request, PProxyResult* response,
1365
1.41k
                                google::protobuf::Closure* done) {
1366
1.41k
    bool ret = _exec_env->routine_load_task_executor()->get_thread_pool().submit_func([this,
1367
1.41k
                                                                                       request,
1368
1.41k
                                                                                       response,
1369
1.41k
                                                                                       done]() {
1370
1.41k
        brpc::ClosureGuard closure_guard(done);
1371
        // PProxyRequest is defined in gensrc/proto/internal_service.proto
1372
        // Currently it supports 2 kinds of requests:
1373
        // 1. get all kafka partition ids for given topic
1374
        // 2. get all kafka partition offsets for given topic and timestamp.
1375
1.41k
        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
1376
1.41k
        if (request->has_kafka_meta_request()) {
1377
1.41k
            const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
1378
1.41k
            if (!kafka_request.offset_flags().empty()) {
1379
66
                std::vector<PIntegerPair> partition_offsets;
1380
66
                Status st = _exec_env->routine_load_task_executor()
1381
66
                                    ->get_kafka_real_offsets_for_partitions(
1382
66
                                            request->kafka_meta_request(), &partition_offsets,
1383
66
                                            timeout_ms);
1384
66
                if (st.ok()) {
1385
66
                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
1386
66
                    for (const auto& entry : partition_offsets) {
1387
66
                        PIntegerPair* res = part_offsets->add_offset_times();
1388
66
                        res->set_key(entry.key());
1389
66
                        res->set_val(entry.val());
1390
66
                    }
1391
66
                }
1392
66
                st.to_protobuf(response->mutable_status());
1393
66
                return;
1394
1.34k
            } else if (!kafka_request.partition_id_for_latest_offsets().empty()) {
1395
                // get latest offsets for specified partition ids
1396
1.27k
                std::vector<PIntegerPair> partition_offsets;
1397
1.27k
                Status st = _exec_env->routine_load_task_executor()
1398
1.27k
                                    ->get_kafka_latest_offsets_for_partitions(
1399
1.27k
                                            request->kafka_meta_request(), &partition_offsets,
1400
1.27k
                                            timeout_ms);
1401
1.27k
                if (st.ok()) {
1402
1.27k
                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
1403
1.27k
                    for (const auto& entry : partition_offsets) {
1404
1.27k
                        PIntegerPair* res = part_offsets->add_offset_times();
1405
1.27k
                        res->set_key(entry.key());
1406
1.27k
                        res->set_val(entry.val());
1407
1.27k
                    }
1408
1.27k
                }
1409
1.27k
                st.to_protobuf(response->mutable_status());
1410
1.27k
                return;
1411
1.27k
            } else if (!kafka_request.offset_times().empty()) {
1412
                // if offset_times() has elements, which means this request is to get offset by timestamp.
1413
1
                std::vector<PIntegerPair> partition_offsets;
1414
1
                Status st = _exec_env->routine_load_task_executor()
1415
1
                                    ->get_kafka_partition_offsets_for_times(
1416
1
                                            request->kafka_meta_request(), &partition_offsets,
1417
1
                                            timeout_ms);
1418
1
                if (st.ok()) {
1419
1
                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
1420
1
                    for (const auto& entry : partition_offsets) {
1421
1
                        PIntegerPair* res = part_offsets->add_offset_times();
1422
1
                        res->set_key(entry.key());
1423
1
                        res->set_val(entry.val());
1424
1
                    }
1425
1
                }
1426
1
                st.to_protobuf(response->mutable_status());
1427
1
                return;
1428
78
            } else {
1429
                // get partition ids of topic
1430
78
                std::vector<int32_t> partition_ids;
1431
78
                Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
1432
78
                        request->kafka_meta_request(), &partition_ids);
1433
78
                if (st.ok()) {
1434
75
                    PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
1435
75
                    for (int32_t id : partition_ids) {
1436
75
                        kafka_result->add_partition_ids(id);
1437
75
                    }
1438
75
                }
1439
78
                st.to_protobuf(response->mutable_status());
1440
78
                return;
1441
78
            }
1442
1.41k
        }
1443
0
        if (request->has_kinesis_meta_request()) {
1444
0
            std::vector<std::string> shard_ids;
1445
0
            Status st = _exec_env->routine_load_task_executor()->get_kinesis_shard_meta(
1446
0
                    request->kinesis_meta_request(), &shard_ids);
1447
0
            if (st.ok()) {
1448
0
                PKinesisMetaProxyResult* kinesis_result = response->mutable_kinesis_meta_result();
1449
0
                for (const auto& shard_id : shard_ids) {
1450
0
                    kinesis_result->add_shard_ids(shard_id);
1451
0
                }
1452
0
            }
1453
0
            st.to_protobuf(response->mutable_status());
1454
0
            return;
1455
0
        }
1456
0
        Status::OK().to_protobuf(response->mutable_status());
1457
0
    });
1458
1.41k
    if (!ret) {
1459
0
        offer_failed(response, done, _heavy_work_pool);
1460
0
        return;
1461
0
    }
1462
1.41k
}
1463
1464
void PInternalService::update_cache(google::protobuf::RpcController* controller,
1465
                                    const PUpdateCacheRequest* request, PCacheResponse* response,
1466
66.3k
                                    google::protobuf::Closure* done) {
1467
66.3k
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1468
66.3k
        brpc::ClosureGuard closure_guard(done);
1469
66.3k
        _exec_env->result_cache()->update(request, response);
1470
66.3k
    });
1471
66.3k
    if (!ret) {
1472
0
        offer_failed(response, done, _light_work_pool);
1473
0
        return;
1474
0
    }
1475
66.3k
}
1476
1477
void PInternalService::fetch_cache(google::protobuf::RpcController* controller,
1478
                                   const PFetchCacheRequest* request, PFetchCacheResult* result,
1479
3.65k
                                   google::protobuf::Closure* done) {
1480
3.65k
    bool ret = _light_work_pool.try_offer([this, request, result, done]() {
1481
3.65k
        brpc::ClosureGuard closure_guard(done);
1482
3.65k
        _exec_env->result_cache()->fetch(request, result);
1483
3.65k
    });
1484
3.65k
    if (!ret) {
1485
0
        offer_failed(result, done, _light_work_pool);
1486
0
        return;
1487
0
    }
1488
3.65k
}
1489
1490
void PInternalService::clear_cache(google::protobuf::RpcController* controller,
1491
                                   const PClearCacheRequest* request, PCacheResponse* response,
1492
0
                                   google::protobuf::Closure* done) {
1493
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1494
0
        brpc::ClosureGuard closure_guard(done);
1495
0
        _exec_env->result_cache()->clear(request, response);
1496
0
    });
1497
0
    if (!ret) {
1498
0
        offer_failed(response, done, _light_work_pool);
1499
0
        return;
1500
0
    }
1501
0
}
1502
1503
void PInternalService::merge_filter(::google::protobuf::RpcController* controller,
1504
                                    const ::doris::PMergeFilterRequest* request,
1505
                                    ::doris::PMergeFilterResponse* response,
1506
3.71k
                                    ::google::protobuf::Closure* done) {
1507
3.72k
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1508
3.72k
        signal::SignalTaskIdKeeper keeper(request->query_id());
1509
3.72k
        brpc::ClosureGuard closure_guard(done);
1510
3.72k
        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
1511
3.72k
        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
1512
3.72k
        Status st;
1513
3.72k
        try {
1514
3.72k
            st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
1515
3.72k
        } catch (Exception& e) {
1516
0
            st = e.to_status();
1517
0
        }
1518
3.72k
        st.to_protobuf(response->mutable_status());
1519
3.71k
    });
1520
3.71k
    if (!ret) {
1521
0
        offer_failed(response, done, _light_work_pool);
1522
0
        return;
1523
0
    }
1524
3.71k
}
1525
1526
void PInternalService::send_filter_size(::google::protobuf::RpcController* controller,
1527
                                        const ::doris::PSendFilterSizeRequest* request,
1528
                                        ::doris::PSendFilterSizeResponse* response,
1529
266
                                        ::google::protobuf::Closure* done) {
1530
266
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1531
266
        signal::SignalTaskIdKeeper keeper(request->query_id());
1532
266
        brpc::ClosureGuard closure_guard(done);
1533
266
        Status st;
1534
266
        try {
1535
266
            st = _exec_env->fragment_mgr()->send_filter_size(request);
1536
266
        } catch (Exception& e) {
1537
0
            st = e.to_status();
1538
0
        }
1539
266
        st.to_protobuf(response->mutable_status());
1540
266
    });
1541
266
    if (!ret) {
1542
0
        offer_failed(response, done, _light_work_pool);
1543
0
        return;
1544
0
    }
1545
266
}
1546
1547
void PInternalService::sync_filter_size(::google::protobuf::RpcController* controller,
1548
                                        const ::doris::PSyncFilterSizeRequest* request,
1549
                                        ::doris::PSyncFilterSizeResponse* response,
1550
266
                                        ::google::protobuf::Closure* done) {
1551
266
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1552
266
        signal::SignalTaskIdKeeper keeper(request->query_id());
1553
266
        brpc::ClosureGuard closure_guard(done);
1554
266
        Status st;
1555
266
        try {
1556
266
            st = _exec_env->fragment_mgr()->sync_filter_size(request);
1557
266
        } catch (Exception& e) {
1558
0
            st = e.to_status();
1559
0
        }
1560
266
        st.to_protobuf(response->mutable_status());
1561
266
    });
1562
266
    if (!ret) {
1563
0
        offer_failed(response, done, _light_work_pool);
1564
0
        return;
1565
0
    }
1566
266
}
1567
1568
void PInternalService::apply_filterv2(::google::protobuf::RpcController* controller,
1569
                                      const ::doris::PPublishFilterRequestV2* request,
1570
                                      ::doris::PPublishFilterResponse* response,
1571
1.97k
                                      ::google::protobuf::Closure* done) {
1572
1.97k
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1573
42
        signal::SignalTaskIdKeeper keeper(request->query_id());
1574
42
        brpc::ClosureGuard closure_guard(done);
1575
42
        const butil::IOBuf& request_attachment =
1576
42
                static_cast<brpc::Controller*>(controller)->request_attachment();
1577
42
        butil::IOBuf apply_attachment = request_attachment;
1578
42
        butil::IOBuf forward_attachment = request_attachment;
1579
42
        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(apply_attachment);
1580
42
        VLOG_NOTICE << "rpc apply_filterv2 recv";
1581
42
        Status st;
1582
42
        try {
1583
42
            st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream);
1584
42
        } catch (Exception& e) {
1585
0
            st = e.to_status();
1586
0
        }
1587
42
        if (!st.ok()) {
1588
0
            LOG(WARNING) << "apply filter meet error: " << st.to_string();
1589
0
        }
1590
42
        std::weak_ptr<QueryContext> forward_ctx;
1591
42
        if (auto query_ctx = _exec_env->fragment_mgr()->get_query_ctx(
1592
42
                    UniqueId(request->query_id()).to_thrift())) {
1593
42
            if (!query_ctx->ignore_runtime_filter_error()) {
1594
42
                forward_ctx = query_ctx;
1595
42
            }
1596
42
        }
1597
42
        Status forward_st = forward_runtime_filter(*request, forward_attachment, forward_ctx);
1598
42
        if (!forward_st.ok()) {
1599
0
            LOG(WARNING) << "forward runtime filter meet error: " << forward_st.to_string();
1600
0
            if (st.ok()) {
1601
0
                st = std::move(forward_st);
1602
0
            }
1603
0
        }
1604
42
        st.to_protobuf(response->mutable_status());
1605
42
    });
1606
1.97k
    if (!ret) {
1607
0
        offer_failed(response, done, _light_work_pool);
1608
0
        return;
1609
0
    }
1610
1.97k
}
1611
1612
void PInternalService::send_data(google::protobuf::RpcController* controller,
1613
                                 const PSendDataRequest* request, PSendDataResult* response,
1614
44
                                 google::protobuf::Closure* done) {
1615
44
    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
1616
44
        brpc::ClosureGuard closure_guard(done);
1617
44
        TUniqueId load_id;
1618
44
        load_id.hi = request->load_id().hi();
1619
44
        load_id.lo = request->load_id().lo();
1620
        // On 1.2.3 we add load id to send data request and using load id to get pipe
1621
44
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1622
44
        if (stream_load_ctx == nullptr) {
1623
0
            response->mutable_status()->set_status_code(1);
1624
0
            response->mutable_status()->add_error_msgs("could not find stream load context");
1625
44
        } else {
1626
44
            auto pipe = stream_load_ctx->pipe;
1627
157
            for (int i = 0; i < request->data_size(); ++i) {
1628
113
                std::unique_ptr<PDataRow> row(new PDataRow());
1629
113
                row->CopyFrom(request->data(i));
1630
113
                Status s = pipe->append(std::move(row));
1631
113
                if (!s.ok()) {
1632
0
                    response->mutable_status()->set_status_code(1);
1633
0
                    response->mutable_status()->add_error_msgs(s.to_string());
1634
0
                    return;
1635
0
                }
1636
113
            }
1637
44
            response->mutable_status()->set_status_code(0);
1638
44
        }
1639
44
    });
1640
44
    if (!ret) {
1641
0
        offer_failed(response, done, _heavy_work_pool);
1642
0
        return;
1643
0
    }
1644
44
}
1645
1646
void PInternalService::commit(google::protobuf::RpcController* controller,
1647
                              const PCommitRequest* request, PCommitResult* response,
1648
44
                              google::protobuf::Closure* done) {
1649
44
    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
1650
44
        brpc::ClosureGuard closure_guard(done);
1651
44
        TUniqueId load_id;
1652
44
        load_id.hi = request->load_id().hi();
1653
44
        load_id.lo = request->load_id().lo();
1654
1655
44
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1656
44
        if (stream_load_ctx == nullptr) {
1657
0
            response->mutable_status()->set_status_code(1);
1658
0
            response->mutable_status()->add_error_msgs("could not find stream load context");
1659
44
        } else {
1660
44
            static_cast<void>(stream_load_ctx->pipe->finish());
1661
44
            response->mutable_status()->set_status_code(0);
1662
44
        }
1663
44
    });
1664
44
    if (!ret) {
1665
0
        offer_failed(response, done, _heavy_work_pool);
1666
0
        return;
1667
0
    }
1668
44
}
1669
1670
void PInternalService::rollback(google::protobuf::RpcController* controller,
1671
                                const PRollbackRequest* request, PRollbackResult* response,
1672
5
                                google::protobuf::Closure* done) {
1673
5
    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
1674
5
        brpc::ClosureGuard closure_guard(done);
1675
5
        TUniqueId load_id;
1676
5
        load_id.hi = request->load_id().hi();
1677
5
        load_id.lo = request->load_id().lo();
1678
5
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1679
5
        if (stream_load_ctx == nullptr) {
1680
0
            response->mutable_status()->set_status_code(1);
1681
0
            response->mutable_status()->add_error_msgs("could not find stream load context");
1682
5
        } else {
1683
5
            stream_load_ctx->pipe->cancel("rollback");
1684
5
            response->mutable_status()->set_status_code(0);
1685
5
        }
1686
5
    });
1687
5
    if (!ret) {
1688
0
        offer_failed(response, done, _heavy_work_pool);
1689
0
        return;
1690
0
    }
1691
5
}
1692
1693
void PInternalService::fold_constant_expr(google::protobuf::RpcController* controller,
1694
                                          const PConstantExprRequest* request,
1695
                                          PConstantExprResult* response,
1696
568
                                          google::protobuf::Closure* done) {
1697
568
    bool ret = _light_work_pool.try_offer([request, response, done]() {
1698
568
        brpc::ClosureGuard closure_guard(done);
1699
568
        TFoldConstantParams t_request;
1700
568
        Status st = Status::OK();
1701
568
        {
1702
568
            const uint8_t* buf = (const uint8_t*)request->request().data();
1703
568
            uint32_t len = request->request().size();
1704
568
            st = deserialize_thrift_msg(buf, &len, false, &t_request);
1705
568
        }
1706
568
        if (!st.ok()) {
1707
0
            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
1708
0
                         << " .and query_id_is: " << t_request.query_id;
1709
0
            st.to_protobuf(response->mutable_status());
1710
0
            return;
1711
0
        }
1712
568
        auto fold_func = [&]() -> Status {
1713
568
            std::unique_ptr<FoldConstantExecutor> fold_executor =
1714
568
                    std::make_unique<FoldConstantExecutor>();
1715
568
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
1716
568
                    fold_executor->fold_constant_vexpr(t_request, response));
1717
530
            return Status::OK();
1718
568
        };
1719
568
        st = fold_func();
1720
568
        if (!st.ok()) {
1721
38
            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
1722
38
                         << " .and query_id_is: " << t_request.query_id;
1723
38
        }
1724
568
        st.to_protobuf(response->mutable_status());
1725
568
    });
1726
568
    if (!ret) {
1727
0
        offer_failed(response, done, _light_work_pool);
1728
0
        return;
1729
0
    }
1730
568
}
1731
1732
void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller,
1733
                                              const PTransmitRecCTEBlockParams* request,
1734
                                              PTransmitRecCTEBlockResult* response,
1735
4.22k
                                              google::protobuf::Closure* done) {
1736
4.22k
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1737
4.22k
        brpc::ClosureGuard closure_guard(done);
1738
4.22k
        auto st = _exec_env->fragment_mgr()->transmit_rec_cte_block(
1739
4.22k
                UniqueId(request->query_id()).to_thrift(),
1740
4.22k
                UniqueId(request->fragment_instance_id()).to_thrift(), request->node_id(),
1741
4.22k
                request->blocks(), request->eos());
1742
4.22k
        st.to_protobuf(response->mutable_status());
1743
4.22k
    });
1744
4.22k
    if (!ret) {
1745
0
        offer_failed(response, done, _light_work_pool);
1746
0
        return;
1747
0
    }
1748
4.22k
}
1749
1750
void PInternalService::rerun_fragment(google::protobuf::RpcController* controller,
1751
                                      const PRerunFragmentParams* request,
1752
                                      PRerunFragmentResult* response,
1753
10.7k
                                      google::protobuf::Closure* done) {
1754
10.7k
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1755
        // Use shared_ptr<ClosureGuard> so we can transfer ownership to the PFC.
1756
        // For wait_for_destroy/final_close, the guard is stored in the PFC and the RPC
1757
        // response is deferred until the PFC is fully destroyed. For rebuild/submit,
1758
        // the guard fires immediately when this lambda returns.
1759
10.7k
        std::shared_ptr<brpc::ClosureGuard> closure_guard =
1760
10.7k
                std::make_shared<brpc::ClosureGuard>(done);
1761
10.7k
        auto st = _exec_env->fragment_mgr()->rerun_fragment(
1762
10.7k
                closure_guard, UniqueId(request->query_id()).to_thrift(), request->fragment_id(),
1763
10.7k
                request->stage());
1764
10.7k
        st.to_protobuf(response->mutable_status());
1765
10.7k
    });
1766
10.7k
    if (!ret) {
1767
0
        offer_failed(response, done, _light_work_pool);
1768
0
        return;
1769
0
    }
1770
10.7k
}
1771
1772
void PInternalService::reset_global_rf(google::protobuf::RpcController* controller,
1773
                                       const PResetGlobalRfParams* request,
1774
                                       PResetGlobalRfResult* response,
1775
2.04k
                                       google::protobuf::Closure* done) {
1776
2.04k
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1777
2.04k
        brpc::ClosureGuard closure_guard(done);
1778
2.04k
        auto st = _exec_env->fragment_mgr()->reset_global_rf(
1779
2.04k
                UniqueId(request->query_id()).to_thrift(), request->filter_ids());
1780
2.04k
        st.to_protobuf(response->mutable_status());
1781
2.04k
    });
1782
2.04k
    if (!ret) {
1783
0
        offer_failed(response, done, _light_work_pool);
1784
0
        return;
1785
0
    }
1786
2.04k
}
1787
1788
void PInternalService::transmit_block(google::protobuf::RpcController* controller,
1789
                                      const PTransmitDataParams* request,
1790
                                      PTransmitDataResult* response,
1791
1.31M
                                      google::protobuf::Closure* done) {
1792
1.31M
    int64_t receive_time = GetCurrentTimeNanos();
1793
1.32M
    if (config::enable_bthread_transmit_block) {
1794
1.32M
        response->set_receive_time(receive_time);
1795
        // under high concurrency, thread pool will have a lot of lock contention.
1796
        // May offer failed to the thread pool, so that we should avoid using thread
1797
        // pool here.
1798
1.32M
        _transmit_block(controller, request, response, done, Status::OK(), 0);
1799
18.4E
    } else {
1800
18.4E
        bool ret = _light_work_pool.try_offer([this, controller, request, response, done,
1801
18.4E
                                               receive_time]() {
1802
0
            response->set_receive_time(receive_time);
1803
            // Sometimes transmit block function is the last owner of PlanFragmentExecutor
1804
            // It will release the object. And the object maybe a JNIContext.
1805
            // JNIContext will hold some TLS object. It could not work correctly under bthread
1806
            // Context. So that put the logic into pthread.
1807
            // But this is rarely happens, so this config is disabled by default.
1808
0
            _transmit_block(controller, request, response, done, Status::OK(),
1809
0
                            GetCurrentTimeNanos() - receive_time);
1810
0
        });
1811
18.4E
        if (!ret) {
1812
0
            offer_failed(response, done, _light_work_pool);
1813
0
            return;
1814
0
        }
1815
18.4E
    }
1816
1.31M
}
1817
1818
void PInternalService::transmit_block_by_http(google::protobuf::RpcController* controller,
1819
                                              const PEmptyRequest* request,
1820
                                              PTransmitDataResult* response,
1821
0
                                              google::protobuf::Closure* done) {
1822
0
    int64_t receive_time = GetCurrentTimeNanos();
1823
0
    bool ret = _heavy_work_pool.try_offer([this, controller, response, done, receive_time]() {
1824
0
        PTransmitDataParams* new_request = new PTransmitDataParams();
1825
0
        google::protobuf::Closure* new_done =
1826
0
                new NewHttpClosure<PTransmitDataParams>(new_request, done);
1827
0
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
1828
0
        Status st =
1829
0
                attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
1830
0
        _transmit_block(controller, new_request, response, new_done, st,
1831
0
                        GetCurrentTimeNanos() - receive_time);
1832
0
    });
1833
0
    if (!ret) {
1834
0
        offer_failed(response, done, _heavy_work_pool);
1835
0
        return;
1836
0
    }
1837
0
}
1838
1839
void PInternalService::_transmit_block(google::protobuf::RpcController* controller,
1840
                                       const PTransmitDataParams* request,
1841
                                       PTransmitDataResult* response,
1842
                                       google::protobuf::Closure* done, const Status& extract_st,
1843
1.30M
                                       const int64_t wait_for_worker) {
1844
1.31M
    if (request->has_query_id()) {
1845
18.4E
        VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
1846
18.4E
                 << " query_id=" << print_id(request->query_id()) << " node=" << request->node_id();
1847
1.31M
    }
1848
1849
    // The response is accessed when done->Run is called in transmit_block(),
1850
    // give response a default value to avoid null pointers in high concurrency.
1851
1.30M
    Status st;
1852
1.31M
    if (extract_st.ok()) {
1853
1.31M
        st = _exec_env->vstream_mgr()->transmit_block(request, &done, wait_for_worker);
1854
1.31M
        if (!st.ok() && !st.is<END_OF_FILE>()) {
1855
0
            LOG(WARNING) << "transmit_block failed, message=" << st
1856
0
                         << ", fragment_instance_id=" << print_id(request->finst_id())
1857
0
                         << ", node=" << request->node_id()
1858
0
                         << ", from sender_id: " << request->sender_id()
1859
0
                         << ", be_number: " << request->be_number()
1860
0
                         << ", packet_seq: " << request->packet_seq();
1861
0
        }
1862
18.4E
    } else {
1863
18.4E
        st = extract_st;
1864
18.4E
    }
1865
1.31M
    if (done != nullptr) {
1866
1.31M
        st.to_protobuf(response->mutable_status());
1867
1.31M
        done->Run();
1868
1.31M
    }
1869
1.30M
}
1870
1871
void PInternalService::check_rpc_channel(google::protobuf::RpcController* controller,
1872
                                         const PCheckRPCChannelRequest* request,
1873
                                         PCheckRPCChannelResponse* response,
1874
0
                                         google::protobuf::Closure* done) {
1875
0
    bool ret = _light_work_pool.try_offer([request, response, done]() {
1876
0
        brpc::ClosureGuard closure_guard(done);
1877
0
        response->mutable_status()->set_status_code(0);
1878
0
        if (request->data().size() != request->size()) {
1879
0
            std::stringstream ss;
1880
0
            ss << "data size not same, expected: " << request->size()
1881
0
               << ", actual: " << request->data().size();
1882
0
            response->mutable_status()->add_error_msgs(ss.str());
1883
0
            response->mutable_status()->set_status_code(1);
1884
1885
0
        } else {
1886
0
            Md5Digest digest;
1887
0
            digest.update(static_cast<const void*>(request->data().c_str()),
1888
0
                          request->data().size());
1889
0
            digest.digest();
1890
0
            if (!iequal(digest.hex(), request->md5())) {
1891
0
                std::stringstream ss;
1892
0
                ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
1893
0
                response->mutable_status()->add_error_msgs(ss.str());
1894
0
                response->mutable_status()->set_status_code(1);
1895
0
            }
1896
0
        }
1897
0
    });
1898
0
    if (!ret) {
1899
0
        offer_failed(response, done, _light_work_pool);
1900
0
        return;
1901
0
    }
1902
0
}
1903
1904
void PInternalService::reset_rpc_channel(google::protobuf::RpcController* controller,
1905
                                         const PResetRPCChannelRequest* request,
1906
                                         PResetRPCChannelResponse* response,
1907
0
                                         google::protobuf::Closure* done) {
1908
0
    bool ret = _light_work_pool.try_offer([request, response, done]() {
1909
0
        brpc::ClosureGuard closure_guard(done);
1910
0
        response->mutable_status()->set_status_code(0);
1911
0
        if (request->all()) {
1912
0
            int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
1913
0
            if (size > 0) {
1914
0
                std::vector<std::string> endpoints;
1915
0
                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
1916
0
                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
1917
0
                *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
1918
0
            }
1919
0
        } else {
1920
0
            for (const std::string& endpoint : request->endpoints()) {
1921
0
                if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
1922
0
                    response->mutable_status()->add_error_msgs(endpoint + ": not found.");
1923
0
                    continue;
1924
0
                }
1925
1926
0
                if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
1927
0
                    response->add_channels(endpoint);
1928
0
                } else {
1929
0
                    response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
1930
0
                }
1931
0
            }
1932
0
            if (request->endpoints_size() != response->channels_size()) {
1933
0
                response->mutable_status()->set_status_code(1);
1934
0
            }
1935
0
        }
1936
0
    });
1937
0
    if (!ret) {
1938
0
        offer_failed(response, done, _light_work_pool);
1939
0
        return;
1940
0
    }
1941
0
}
1942
1943
void PInternalService::hand_shake(google::protobuf::RpcController* controller,
1944
                                  const PHandShakeRequest* request, PHandShakeResponse* response,
1945
3.10k
                                  google::protobuf::Closure* done) {
1946
    // The light pool may be full. Handshake is used to check the connection state of brpc.
1947
    // Should not be interfered by the thread pool logic.
1948
3.10k
    brpc::ClosureGuard closure_guard(done);
1949
3.10k
    if (request->has_hello()) {
1950
3.10k
        response->set_hello(request->hello());
1951
3.10k
    }
1952
3.10k
    response->mutable_status()->set_status_code(0);
1953
3.10k
}
1954
1955
constexpr char HttpProtocol[] = "http://";
1956
constexpr char DownloadApiPath[] = "/api/_tablet/_download?token=";
1957
constexpr char FileParam[] = "&file=";
1958
1959
static std::string construct_url(const std::string& host_port, const std::string& token,
1960
0
                                 const std::string& path) {
1961
0
    return fmt::format("{}{}{}{}{}{}", HttpProtocol, host_port, DownloadApiPath, token, FileParam,
1962
0
                       path);
1963
0
}
1964
1965
static Status download_file_action(std::string& remote_file_url, std::string& local_file_path,
1966
0
                                   uint64_t estimate_timeout, uint64_t file_size) {
1967
0
    auto download_cb = [remote_file_url, estimate_timeout, local_file_path,
1968
0
                        file_size](HttpClient* client) {
1969
0
        RETURN_IF_ERROR(client->init(remote_file_url));
1970
0
        client->set_timeout_ms(estimate_timeout * 1000);
1971
0
        RETURN_IF_ERROR(client->download(local_file_path));
1972
1973
0
        if (file_size > 0) {
1974
            // Check file length
1975
0
            uint64_t local_file_size = std::filesystem::file_size(local_file_path);
1976
0
            if (local_file_size != file_size) {
1977
0
                LOG(WARNING) << "failed to pull rowset for slave replica. download file "
1978
0
                                "length error"
1979
0
                             << ", remote_path=" << remote_file_url << ", file_size=" << file_size
1980
0
                             << ", local_file_size=" << local_file_size;
1981
0
                return Status::InternalError("downloaded file size is not equal");
1982
0
            }
1983
0
        }
1984
1985
0
        return io::global_local_filesystem()->permission(local_file_path,
1986
0
                                                         io::LocalFileSystem::PERMS_OWNER_RW);
1987
0
    };
1988
0
    return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, download_cb);
1989
0
}
1990
1991
void PInternalServiceImpl::request_slave_tablet_pull_rowset(
1992
        google::protobuf::RpcController* controller, const PTabletWriteSlaveRequest* request,
1993
0
        PTabletWriteSlaveResult* response, google::protobuf::Closure* done) {
1994
0
    brpc::ClosureGuard closure_guard(done);
1995
0
    const RowsetMetaPB& rowset_meta_pb = request->rowset_meta();
1996
0
    const std::string& rowset_path = request->rowset_path();
1997
0
    google::protobuf::Map<int64_t, int64_t> segments_size = request->segments_size();
1998
0
    google::protobuf::Map<int64_t, PTabletWriteSlaveRequest_IndexSizeMap> indices_size =
1999
0
            request->inverted_indices_size();
2000
0
    std::string host = request->host();
2001
0
    int64_t http_port = request->http_port();
2002
0
    int64_t brpc_port = request->brpc_port();
2003
0
    std::string token = request->token();
2004
0
    int64_t node_id = request->node_id();
2005
0
    bool ret = _heavy_work_pool.try_offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
2006
0
                                           indices_size, http_port, token, rowset_path, this]() {
2007
0
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(
2008
0
                rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
2009
0
        if (tablet == nullptr) {
2010
0
            LOG(WARNING) << "failed to pull rowset for slave replica. tablet ["
2011
0
                         << rowset_meta_pb.tablet_id()
2012
0
                         << "] is not exist. txn_id=" << rowset_meta_pb.txn_id();
2013
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta_pb.txn_id(),
2014
0
                                        rowset_meta_pb.tablet_id(), node_id, false);
2015
0
            return;
2016
0
        }
2017
2018
0
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
2019
0
        std::string rowset_meta_str;
2020
0
        bool ret = rowset_meta_pb.SerializeToString(&rowset_meta_str);
2021
0
        if (!ret) {
2022
0
            LOG(WARNING) << "failed to pull rowset for slave replica. serialize rowset meta "
2023
0
                            "failed. rowset_id="
2024
0
                         << rowset_meta_pb.rowset_id()
2025
0
                         << ", tablet_id=" << rowset_meta_pb.tablet_id()
2026
0
                         << ", txn_id=" << rowset_meta_pb.txn_id();
2027
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta_pb.txn_id(),
2028
0
                                        rowset_meta_pb.tablet_id(), node_id, false);
2029
0
            return;
2030
0
        }
2031
0
        bool parsed = rowset_meta->init(rowset_meta_str);
2032
0
        if (!parsed) {
2033
0
            LOG(WARNING) << "failed to pull rowset for slave replica. parse rowset meta string "
2034
0
                            "failed. rowset_id="
2035
0
                         << rowset_meta_pb.rowset_id()
2036
0
                         << ", tablet_id=" << rowset_meta_pb.tablet_id()
2037
0
                         << ", txn_id=" << rowset_meta_pb.txn_id();
2038
            // return false will break meta iterator, return true to skip this error
2039
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2040
0
                                        rowset_meta->tablet_id(), node_id, false);
2041
0
            return;
2042
0
        }
2043
0
        RowsetId remote_rowset_id = rowset_meta->rowset_id();
2044
        // change rowset id because it maybe same as other local rowset
2045
0
        RowsetId new_rowset_id = _engine.next_rowset_id();
2046
0
        auto pending_rs_guard = _engine.pending_local_rowsets().add(new_rowset_id);
2047
0
        rowset_meta->set_rowset_id(new_rowset_id);
2048
0
        rowset_meta->set_tablet_uid(tablet->tablet_uid());
2049
0
        VLOG_CRITICAL << "succeed to init rowset meta for slave replica. rowset_id="
2050
0
                      << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id()
2051
0
                      << ", txn_id=" << rowset_meta->txn_id();
2052
2053
0
        auto tablet_scheme = rowset_meta->tablet_schema();
2054
0
        for (const auto& segment : segments_size) {
2055
0
            uint64_t file_size = segment.second;
2056
0
            uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
2057
0
            if (estimate_timeout < config::download_low_speed_time) {
2058
0
                estimate_timeout = config::download_low_speed_time;
2059
0
            }
2060
2061
0
            std::string remote_file_path =
2062
0
                    local_segment_path(rowset_path, remote_rowset_id.to_string(), segment.first);
2063
0
            std::string remote_file_url =
2064
0
                    construct_url(get_host_port(host, http_port), token, remote_file_path);
2065
2066
0
            std::string local_file_path = local_segment_path(
2067
0
                    tablet->tablet_path(), rowset_meta->rowset_id().to_string(), segment.first);
2068
2069
0
            auto st = download_file_action(remote_file_url, local_file_path, estimate_timeout,
2070
0
                                           file_size);
2071
0
            if (!st.ok()) {
2072
0
                LOG(WARNING) << "failed to pull rowset for slave replica. failed to download "
2073
0
                                "file. url="
2074
0
                             << remote_file_url << ", local_path=" << local_file_path
2075
0
                             << ", txn_id=" << rowset_meta->txn_id();
2076
0
                _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2077
0
                                            rowset_meta->tablet_id(), node_id, false);
2078
0
                return;
2079
0
            }
2080
0
            VLOG_CRITICAL << "succeed to download file for slave replica. url=" << remote_file_url
2081
0
                          << ", local_path=" << local_file_path
2082
0
                          << ", txn_id=" << rowset_meta->txn_id();
2083
0
            if (indices_size.find(segment.first) != indices_size.end()) {
2084
0
                PTabletWriteSlaveRequest_IndexSizeMap segment_indices_size =
2085
0
                        indices_size.at(segment.first);
2086
2087
0
                for (auto index_size : segment_indices_size.index_sizes()) {
2088
0
                    auto index_id = index_size.indexid();
2089
0
                    auto size = index_size.size();
2090
0
                    auto suffix_path = index_size.suffix_path();
2091
0
                    std::string remote_inverted_index_file;
2092
0
                    std::string local_inverted_index_file;
2093
0
                    std::string remote_inverted_index_file_url;
2094
0
                    if (tablet_scheme->get_inverted_index_storage_format() ==
2095
0
                        InvertedIndexStorageFormatPB::V1) {
2096
0
                        remote_inverted_index_file =
2097
0
                                InvertedIndexDescriptor::get_index_file_path_v1(
2098
0
                                        InvertedIndexDescriptor::get_index_file_path_prefix(
2099
0
                                                remote_file_path),
2100
0
                                        index_id, suffix_path);
2101
0
                        remote_inverted_index_file_url = construct_url(
2102
0
                                get_host_port(host, http_port), token, remote_inverted_index_file);
2103
2104
0
                        local_inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v1(
2105
0
                                InvertedIndexDescriptor::get_index_file_path_prefix(
2106
0
                                        local_file_path),
2107
0
                                index_id, suffix_path);
2108
0
                    } else {
2109
0
                        remote_inverted_index_file =
2110
0
                                InvertedIndexDescriptor::get_index_file_path_v2(
2111
0
                                        InvertedIndexDescriptor::get_index_file_path_prefix(
2112
0
                                                remote_file_path));
2113
0
                        remote_inverted_index_file_url = construct_url(
2114
0
                                get_host_port(host, http_port), token, remote_inverted_index_file);
2115
2116
0
                        local_inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
2117
0
                                InvertedIndexDescriptor::get_index_file_path_prefix(
2118
0
                                        local_file_path));
2119
0
                    }
2120
0
                    st = download_file_action(remote_inverted_index_file_url,
2121
0
                                              local_inverted_index_file, estimate_timeout, size);
2122
0
                    if (!st.ok()) {
2123
0
                        LOG(WARNING) << "failed to pull rowset for slave replica. failed to "
2124
0
                                        "download "
2125
0
                                        "file. url="
2126
0
                                     << remote_inverted_index_file_url
2127
0
                                     << ", local_path=" << local_inverted_index_file
2128
0
                                     << ", txn_id=" << rowset_meta->txn_id();
2129
0
                        _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2130
0
                                                    rowset_meta->tablet_id(), node_id, false);
2131
0
                        return;
2132
0
                    }
2133
2134
0
                    VLOG_CRITICAL
2135
0
                            << "succeed to download inverted index file for slave replica. url="
2136
0
                            << remote_inverted_index_file_url
2137
0
                            << ", local_path=" << local_inverted_index_file
2138
0
                            << ", txn_id=" << rowset_meta->txn_id();
2139
0
                }
2140
0
            }
2141
0
        }
2142
2143
0
        RowsetSharedPtr rowset;
2144
0
        Status create_status = RowsetFactory::create_rowset(
2145
0
                tablet->tablet_schema(), tablet->tablet_path(), rowset_meta, &rowset);
2146
0
        if (!create_status) {
2147
0
            LOG(WARNING) << "failed to create rowset from rowset meta for slave replica"
2148
0
                         << ". rowset_id: " << rowset_meta->rowset_id()
2149
0
                         << ", rowset_type: " << rowset_meta->rowset_type()
2150
0
                         << ", rowset_state: " << rowset_meta->rowset_state()
2151
0
                         << ", tablet_id=" << rowset_meta->tablet_id()
2152
0
                         << ", txn_id=" << rowset_meta->txn_id();
2153
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2154
0
                                        rowset_meta->tablet_id(), node_id, false);
2155
0
            return;
2156
0
        }
2157
0
        if (rowset_meta->rowset_state() != RowsetStatePB::COMMITTED) {
2158
0
            LOG(WARNING) << "could not commit txn for slave replica because master rowset state is "
2159
0
                            "not committed, rowset_state="
2160
0
                         << rowset_meta->rowset_state()
2161
0
                         << ", tablet_id=" << rowset_meta->tablet_id()
2162
0
                         << ", txn_id=" << rowset_meta->txn_id();
2163
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2164
0
                                        rowset_meta->tablet_id(), node_id, false);
2165
0
            return;
2166
0
        }
2167
0
        Status commit_txn_status = _engine.txn_manager()->commit_txn(
2168
0
                tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(),
2169
0
                rowset_meta->tablet_id(), tablet->tablet_uid(), rowset_meta->load_id(), rowset,
2170
0
                std::move(pending_rs_guard), false);
2171
0
        if (!commit_txn_status && !commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
2172
0
            LOG(WARNING) << "failed to add committed rowset for slave replica. rowset_id="
2173
0
                         << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id()
2174
0
                         << ", txn_id=" << rowset_meta->txn_id();
2175
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2176
0
                                        rowset_meta->tablet_id(), node_id, false);
2177
0
            return;
2178
0
        }
2179
0
        VLOG_CRITICAL << "succeed to pull rowset for slave replica. successfully to add committed "
2180
0
                         "rowset: "
2181
0
                      << rowset_meta->rowset_id()
2182
0
                      << " to tablet, tablet_id=" << rowset_meta->tablet_id()
2183
0
                      << ", schema_hash=" << rowset_meta->tablet_schema_hash()
2184
0
                      << ", txn_id=" << rowset_meta->txn_id();
2185
0
        _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
2186
0
                                    rowset_meta->tablet_id(), node_id, true);
2187
0
    });
2188
0
    if (!ret) {
2189
0
        offer_failed(response, closure_guard.release(), _heavy_work_pool);
2190
0
        return;
2191
0
    }
2192
0
    Status::OK().to_protobuf(response->mutable_status());
2193
0
}
2194
2195
void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote_host,
2196
                                                       int64_t brpc_port, int64_t txn_id,
2197
                                                       int64_t tablet_id, int64_t node_id,
2198
0
                                                       bool is_succeed) {
2199
0
    std::shared_ptr<PBackendService_Stub> stub =
2200
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(remote_host,
2201
0
                                                                             brpc_port);
2202
0
    if (stub == nullptr) {
2203
0
        LOG(WARNING) << "failed to response result of slave replica to master replica. get rpc "
2204
0
                        "stub failed, master host="
2205
0
                     << remote_host << ", port=" << brpc_port << ", tablet_id=" << tablet_id
2206
0
                     << ", txn_id=" << txn_id;
2207
0
        return;
2208
0
    }
2209
2210
0
    auto request = std::make_shared<PTabletWriteSlaveDoneRequest>();
2211
0
    request->set_txn_id(txn_id);
2212
0
    request->set_tablet_id(tablet_id);
2213
0
    request->set_node_id(node_id);
2214
0
    request->set_is_succeed(is_succeed);
2215
0
    auto pull_rowset_callback = DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared();
2216
0
    auto closure = AutoReleaseClosure<
2217
0
            PTabletWriteSlaveDoneRequest,
2218
0
            DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request,
2219
0
                                                                           pull_rowset_callback);
2220
0
    closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
2221
0
    closure->cntl_->ignore_eovercrowded();
2222
0
    stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(),
2223
0
                                            closure->response_.get(), closure.get());
2224
0
    closure.release();
2225
2226
0
    pull_rowset_callback->join();
2227
0
    if (pull_rowset_callback->cntl_->Failed()) {
2228
0
        LOG(WARNING) << "failed to response result of slave replica to master replica, error="
2229
0
                     << berror(pull_rowset_callback->cntl_->ErrorCode())
2230
0
                     << ", error_text=" << pull_rowset_callback->cntl_->ErrorText()
2231
0
                     << ", master host: " << remote_host << ", tablet_id=" << tablet_id
2232
0
                     << ", txn_id=" << txn_id;
2233
0
    }
2234
0
    VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master "
2235
0
                     "replica. master host: "
2236
0
                  << remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id
2237
0
                  << ", slave server=" << node_id << ", txn_id=" << txn_id;
2238
0
}
2239
2240
void PInternalServiceImpl::response_slave_tablet_pull_rowset(
2241
        google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
2242
0
        PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
2243
0
    bool ret = _heavy_work_pool.try_offer([txn_mgr = _engine.txn_manager(), request, response,
2244
0
                                           done]() {
2245
0
        brpc::ClosureGuard closure_guard(done);
2246
0
        VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. "
2247
0
                         "slave server="
2248
0
                      << request->node_id() << ", is_succeed=" << request->is_succeed()
2249
0
                      << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
2250
0
        txn_mgr->finish_slave_tablet_pull_rowset(request->txn_id(), request->tablet_id(),
2251
0
                                                 request->node_id(), request->is_succeed());
2252
0
        Status::OK().to_protobuf(response->mutable_status());
2253
0
    });
2254
0
    if (!ret) {
2255
0
        offer_failed(response, done, _heavy_work_pool);
2256
0
        return;
2257
0
    }
2258
0
}
2259
2260
void PInternalService::multiget_data(google::protobuf::RpcController* controller,
2261
                                     const PMultiGetRequest* request, PMultiGetResponse* response,
2262
0
                                     google::protobuf::Closure* done) {
2263
0
    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
2264
0
        signal::SignalTaskIdKeeper keeper(request->query_id());
2265
        // multi get data by rowid
2266
0
        MonotonicStopWatch watch;
2267
0
        watch.start();
2268
0
        brpc::ClosureGuard closure_guard(done);
2269
0
        response->mutable_status()->set_status_code(0);
2270
0
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
2271
0
        Status st = RowIdStorageReader::read_by_rowids(*request, response);
2272
0
        st.to_protobuf(response->mutable_status());
2273
0
        LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
2274
0
    });
2275
0
    if (!ret) {
2276
0
        offer_failed(response, done, _heavy_work_pool);
2277
0
        return;
2278
0
    }
2279
0
}
2280
2281
void PInternalService::multiget_data_v2(google::protobuf::RpcController* controller,
2282
                                        const PMultiGetRequestV2* request,
2283
                                        PMultiGetResponseV2* response,
2284
2.17k
                                        google::protobuf::Closure* done) {
2285
2.17k
    std::vector<uint64_t> id_set;
2286
2.17k
    id_set.push_back(request->wg_id());
2287
2.17k
    auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(id_set);
2288
2.17k
    Status st = Status::OK();
2289
2290
2.17k
    if (!wg) [[unlikely]] {
2291
0
        brpc::ClosureGuard closure_guard(done);
2292
0
        st = Status::Error<TStatusCode::CANCELLED>("fail to find wg: wg id:" +
2293
0
                                                   std::to_string(request->wg_id()));
2294
0
        st.to_protobuf(response->mutable_status());
2295
0
        return;
2296
0
    }
2297
2298
2.17k
    doris::TaskScheduler* exec_sched = nullptr;
2299
2.17k
    ScannerScheduler* scan_sched = nullptr;
2300
2.17k
    ScannerScheduler* remote_scan_sched = nullptr;
2301
2.17k
    wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
2302
2.17k
    DCHECK(remote_scan_sched);
2303
2304
2.17k
    st = remote_scan_sched->submit_scan_task(
2305
2.17k
            SimplifiedScanTask(
2306
2.17k
                    [request, response, done]() {
2307
2.17k
                        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
2308
2.17k
                        signal::set_signal_task_id(request->query_id());
2309
                        // multi get data by rowid
2310
2.17k
                        MonotonicStopWatch watch;
2311
2.17k
                        watch.start();
2312
2.17k
                        brpc::ClosureGuard closure_guard(done);
2313
2.17k
                        response->mutable_status()->set_status_code(0);
2314
2.17k
                        Status st = RowIdStorageReader::read_by_rowids(*request, response);
2315
2.17k
                        st.to_protobuf(response->mutable_status());
2316
2.17k
                        LOG(INFO) << "multiget_data finished, cost(us):"
2317
2.17k
                                  << watch.elapsed_time() / 1000;
2318
2.17k
                        return true;
2319
2.17k
                    },
2320
2.17k
                    nullptr, nullptr),
2321
2.17k
            fmt::format("{}-multiget_data_v2", print_id(request->query_id())));
2322
2323
2.17k
    if (!st.ok()) {
2324
0
        brpc::ClosureGuard closure_guard(done);
2325
0
        st.to_protobuf(response->mutable_status());
2326
0
    }
2327
2.17k
}
2328
2329
void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcController* cntl_base,
2330
                                                      const PGetTabletVersionsRequest* request,
2331
                                                      PGetTabletVersionsResponse* response,
2332
0
                                                      google::protobuf::Closure* done) {
2333
0
    brpc::ClosureGuard closure_guard(done);
2334
0
    VLOG_DEBUG << "receive get tablet versions request: " << request->DebugString();
2335
0
    _engine.get_tablet_rowset_versions(request, response);
2336
0
}
2337
2338
void PInternalService::glob(google::protobuf::RpcController* controller,
2339
                            const PGlobRequest* request, PGlobResponse* response,
2340
237
                            google::protobuf::Closure* done) {
2341
237
    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
2342
237
        brpc::ClosureGuard closure_guard(done);
2343
237
        std::vector<io::FileInfo> files;
2344
237
        Status st = io::global_local_filesystem()->safe_glob(request->pattern(), &files);
2345
237
        if (st.ok()) {
2346
238
            for (auto& file : files) {
2347
238
                PGlobResponse_PFileInfo* pfile = response->add_files();
2348
238
                pfile->set_file(file.file_name);
2349
238
                pfile->set_size(file.file_size);
2350
238
            }
2351
226
        }
2352
237
        st.to_protobuf(response->mutable_status());
2353
237
    });
2354
237
    if (!ret) {
2355
0
        offer_failed(response, done, _heavy_work_pool);
2356
0
        return;
2357
0
    }
2358
237
}
2359
2360
void PInternalService::group_commit_insert(google::protobuf::RpcController* controller,
2361
                                           const PGroupCommitInsertRequest* request,
2362
                                           PGroupCommitInsertResponse* response,
2363
29
                                           google::protobuf::Closure* done) {
2364
29
    TUniqueId load_id;
2365
29
    load_id.__set_hi(request->load_id().hi());
2366
29
    load_id.__set_lo(request->load_id().lo());
2367
29
    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
2368
29
    std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
2369
29
    bool ret = _heavy_work_pool.try_offer([this, request, response, done, load_id, lock,
2370
29
                                           is_done]() {
2371
29
        brpc::ClosureGuard closure_guard(done);
2372
29
        std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
2373
29
        auto pipe = std::make_shared<io::StreamLoadPipe>(
2374
29
                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
2375
29
                -1 /* total_length */, true /* use_proto */);
2376
29
        ctx->pipe = pipe;
2377
29
        Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx);
2378
29
        if (st.ok()) {
2379
29
            try {
2380
29
                st = _exec_plan_fragment_impl(
2381
29
                        request->exec_plan_fragment_request().request(),
2382
29
                        request->exec_plan_fragment_request().version(),
2383
29
                        request->exec_plan_fragment_request().compact(),
2384
29
                        [&, response, done, load_id, lock, is_done](RuntimeState* state,
2385
29
                                                                    Status* status) {
2386
29
                            std::lock_guard<std::mutex> lock1(*lock);
2387
29
                            if (*is_done) {
2388
0
                                return;
2389
0
                            }
2390
29
                            *is_done = true;
2391
29
                            brpc::ClosureGuard cb_closure_guard(done);
2392
29
                            response->set_label(state->import_label());
2393
29
                            response->set_txn_id(state->wal_id());
2394
29
                            response->set_loaded_rows(state->num_rows_load_success());
2395
29
                            response->set_filtered_rows(state->num_rows_load_filtered());
2396
29
                            status->to_protobuf(response->mutable_status());
2397
29
                            if (!state->get_error_log_file_path().empty()) {
2398
0
                                response->set_error_url(
2399
0
                                        to_load_error_http_path(state->get_error_log_file_path()));
2400
0
                            }
2401
29
                            if (!state->get_first_error_msg().empty()) {
2402
0
                                response->set_first_error_msg(state->get_first_error_msg());
2403
0
                            }
2404
29
                            _exec_env->new_load_stream_mgr()->remove(load_id);
2405
29
                        });
2406
29
            } catch (const Exception& e) {
2407
0
                st = e.to_status();
2408
0
            } catch (const std::exception& e) {
2409
0
                st = Status::Error(ErrorCode::INTERNAL_ERROR, e.what());
2410
0
            } catch (...) {
2411
0
                st = Status::Error(ErrorCode::INTERNAL_ERROR,
2412
0
                                   "_exec_plan_fragment_impl meet unknown error");
2413
0
            }
2414
29
            if (!st.ok()) {
2415
0
                LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id)
2416
0
                             << ", errmsg=" << st;
2417
0
                std::lock_guard<std::mutex> lock1(*lock);
2418
0
                if (*is_done) {
2419
0
                    closure_guard.release();
2420
0
                } else {
2421
0
                    *is_done = true;
2422
0
                    st.to_protobuf(response->mutable_status());
2423
0
                    _exec_env->new_load_stream_mgr()->remove(load_id);
2424
0
                }
2425
29
            } else {
2426
29
                closure_guard.release();
2427
66
                for (int i = 0; i < request->data().size(); ++i) {
2428
37
                    std::unique_ptr<PDataRow> row(new PDataRow());
2429
37
                    row->CopyFrom(request->data(i));
2430
37
                    st = pipe->append(std::move(row));
2431
37
                    if (!st.ok()) {
2432
0
                        break;
2433
0
                    }
2434
37
                }
2435
29
                if (st.ok()) {
2436
29
                    static_cast<void>(pipe->finish());
2437
29
                }
2438
29
            }
2439
29
        }
2440
29
    });
2441
29
    if (!ret) {
2442
0
        _exec_env->new_load_stream_mgr()->remove(load_id);
2443
0
        offer_failed(response, done, _heavy_work_pool);
2444
0
        return;
2445
0
    }
2446
29
};
2447
2448
void PInternalService::get_wal_queue_size(google::protobuf::RpcController* controller,
2449
                                          const PGetWalQueueSizeRequest* request,
2450
                                          PGetWalQueueSizeResponse* response,
2451
1.14k
                                          google::protobuf::Closure* done) {
2452
1.15k
    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
2453
1.15k
        brpc::ClosureGuard closure_guard(done);
2454
1.15k
        Status st = Status::OK();
2455
1.15k
        auto table_id = request->table_id();
2456
1.15k
        auto count = _exec_env->wal_mgr()->get_wal_queue_size(table_id);
2457
1.15k
        response->set_size(count);
2458
1.15k
        response->mutable_status()->set_status_code(st.code());
2459
1.15k
    });
2460
1.14k
    if (!ret) {
2461
0
        offer_failed(response, done, _heavy_work_pool);
2462
0
    }
2463
1.14k
}
2464
2465
void PInternalService::get_be_resource(google::protobuf::RpcController* controller,
2466
                                       const PGetBeResourceRequest* request,
2467
                                       PGetBeResourceResponse* response,
2468
0
                                       google::protobuf::Closure* done) {
2469
0
    bool ret = _heavy_work_pool.try_offer([response, done]() {
2470
0
        brpc::ClosureGuard closure_guard(done);
2471
0
        int64_t mem_limit = MemInfo::mem_limit();
2472
0
        int64_t mem_usage = PerfCounters::get_vm_rss();
2473
2474
0
        PGlobalResourceUsage* global_resource_usage = response->mutable_global_be_resource_usage();
2475
0
        global_resource_usage->set_mem_limit(mem_limit);
2476
0
        global_resource_usage->set_mem_usage(mem_usage);
2477
2478
0
        Status st = Status::OK();
2479
0
        response->mutable_status()->set_status_code(st.code());
2480
0
    });
2481
0
    if (!ret) {
2482
0
        offer_failed(response, done, _heavy_work_pool);
2483
0
    }
2484
0
}
2485
2486
void PInternalService::delete_dictionary(google::protobuf::RpcController* controller,
2487
                                         const PDeleteDictionaryRequest* request,
2488
                                         PDeleteDictionaryResponse* response,
2489
3
                                         google::protobuf::Closure* done) {
2490
3
    brpc::ClosureGuard closure_guard(done);
2491
3
    Status st = ExecEnv::GetInstance()->dict_factory()->delete_dict(request->dictionary_id());
2492
3
    st.to_protobuf(response->mutable_status());
2493
3
}
2494
2495
void PInternalService::commit_refresh_dictionary(google::protobuf::RpcController* controller,
2496
                                                 const PCommitRefreshDictionaryRequest* request,
2497
                                                 PCommitRefreshDictionaryResponse* response,
2498
86
                                                 google::protobuf::Closure* done) {
2499
86
    brpc::ClosureGuard closure_guard(done);
2500
86
    Status st = ExecEnv::GetInstance()->dict_factory()->commit_refresh_dict(
2501
86
            request->dictionary_id(), request->version_id());
2502
86
    st.to_protobuf(response->mutable_status());
2503
86
}
2504
2505
void PInternalService::abort_refresh_dictionary(google::protobuf::RpcController* controller,
2506
                                                const PAbortRefreshDictionaryRequest* request,
2507
                                                PAbortRefreshDictionaryResponse* response,
2508
3
                                                google::protobuf::Closure* done) {
2509
3
    brpc::ClosureGuard closure_guard(done);
2510
3
    Status st = ExecEnv::GetInstance()->dict_factory()->abort_refresh_dict(request->dictionary_id(),
2511
3
                                                                           request->version_id());
2512
3
    st.to_protobuf(response->mutable_status());
2513
3
}
2514
2515
void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller,
2516
                                          const PGetTabletRowsetsRequest* request,
2517
                                          PGetTabletRowsetsResponse* response,
2518
0
                                          google::protobuf::Closure* done) {
2519
0
    DCHECK(config::is_cloud_mode());
2520
0
    auto start_time = GetMonoTimeMicros();
2521
0
    Defer defer {
2522
0
            [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }};
2523
0
    brpc::ClosureGuard closure_guard(done);
2524
0
    LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString();
2525
0
    if (!request->has_tablet_id() || !request->has_version_start() || !request->has_version_end()) {
2526
0
        Status::InvalidArgument("missing params tablet/version_start/version_end")
2527
0
                .to_protobuf(response->mutable_status());
2528
0
        return;
2529
0
    }
2530
0
    CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud();
2531
2532
0
    auto maybe_tablet =
2533
0
            storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false,
2534
0
                                            /*syn_delete_bitmap*/ false, /*delete_bitmap*/ nullptr,
2535
0
                                            /*local_only*/ true);
2536
0
    if (!maybe_tablet) {
2537
0
        maybe_tablet.error().to_protobuf(response->mutable_status());
2538
0
        return;
2539
0
    }
2540
0
    auto tablet = maybe_tablet.value();
2541
0
    Result<CaptureRowsetResult> ret;
2542
0
    {
2543
0
        std::shared_lock l(tablet->get_header_lock());
2544
0
        ret = tablet->capture_consistent_rowsets_unlocked(
2545
0
                {request->version_start(), request->version_end()},
2546
0
                CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false});
2547
0
    }
2548
0
    if (!ret) {
2549
0
        ret.error().to_protobuf(response->mutable_status());
2550
0
        return;
2551
0
    }
2552
0
    auto rowsets = std::move(ret.value().rowsets);
2553
0
    for (const auto& rs : rowsets) {
2554
0
        RowsetMetaPB meta;
2555
0
        rs->rowset_meta()->to_rowset_pb(&meta);
2556
0
        response->mutable_rowsets()->Add(std::move(meta));
2557
0
    }
2558
0
    if (request->has_delete_bitmap_keys()) {
2559
0
        DCHECK(tablet->enable_unique_key_merge_on_write());
2560
0
        auto delete_bitmap = std::move(ret.value().delete_bitmap);
2561
0
        auto keys_pb = request->delete_bitmap_keys();
2562
0
        size_t len = keys_pb.rowset_ids().size();
2563
0
        DCHECK_EQ(len, keys_pb.segment_ids().size());
2564
0
        DCHECK_EQ(len, keys_pb.versions().size());
2565
0
        std::set<DeleteBitmap::BitmapKey> keys;
2566
0
        for (size_t i = 0; i < len; ++i) {
2567
0
            RowsetId rs_id;
2568
0
            rs_id.init(keys_pb.rowset_ids(i));
2569
0
            keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i));
2570
0
        }
2571
0
        auto diffset = delete_bitmap->diffset(keys).to_pb();
2572
0
        *response->mutable_delete_bitmap() = std::move(diffset);
2573
0
    }
2574
0
    Status::OK().to_protobuf(response->mutable_status());
2575
0
}
2576
2577
void PInternalService::request_cdc_client(google::protobuf::RpcController* controller,
2578
                                          const PRequestCdcClientRequest* request,
2579
                                          PRequestCdcClientResult* result,
2580
2.23k
                                          google::protobuf::Closure* done) {
2581
2.23k
    bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
2582
2.23k
        _exec_env->cdc_client_mgr()->request_cdc_client_impl(request, result, done);
2583
2.23k
    });
2584
2585
2.23k
    if (!ret) {
2586
0
        offer_failed(result, done, _heavy_work_pool);
2587
0
        return;
2588
0
    }
2589
2.23k
}
2590
2591
#include "common/compile_check_avoid_end.h"
2592
} // namespace doris