Coverage Report

Created: 2025-04-24 12:23

/root/doris/be/src/service/internal_service.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/internal_service.h"
19
20
#include <assert.h>
21
#include <brpc/closure_guard.h>
22
#include <brpc/controller.h>
23
#include <bthread/bthread.h>
24
#include <bthread/types.h>
25
#include <butil/errno.h>
26
#include <butil/iobuf.h>
27
#include <fcntl.h>
28
#include <fmt/core.h>
29
#include <gen_cpp/DataSinks_types.h>
30
#include <gen_cpp/MasterService_types.h>
31
#include <gen_cpp/PaloInternalService_types.h>
32
#include <gen_cpp/PlanNodes_types.h>
33
#include <gen_cpp/Status_types.h>
34
#include <gen_cpp/Types_types.h>
35
#include <gen_cpp/internal_service.pb.h>
36
#include <gen_cpp/olap_file.pb.h>
37
#include <gen_cpp/segment_v2.pb.h>
38
#include <gen_cpp/types.pb.h>
39
#include <google/protobuf/stubs/callback.h>
40
#include <stddef.h>
41
#include <stdint.h>
42
#include <sys/stat.h>
43
#include <vec/exec/vjdbc_connector.h>
44
45
#include <algorithm>
46
#include <exception>
47
#include <filesystem>
48
#include <memory>
49
#include <set>
50
#include <sstream>
51
#include <string>
52
#include <unordered_map>
53
#include <utility>
54
#include <vector>
55
56
#include "common/config.h"
57
#include "common/consts.h"
58
#include "common/exception.h"
59
#include "common/logging.h"
60
#include "common/signal_handler.h"
61
#include "common/status.h"
62
#include "gen_cpp/BackendService.h"
63
#include "gen_cpp/PaloInternalService_types.h"
64
#include "gen_cpp/internal_service.pb.h"
65
#include "gutil/integral_types.h"
66
#include "http/http_client.h"
67
#include "io/fs/file_writer.h"
68
#include "io/fs/local_file_system.h"
69
#include "io/fs/stream_load_pipe.h"
70
#include "io/io_common.h"
71
#include "olap/data_dir.h"
72
#include "olap/olap_common.h"
73
#include "olap/rowset/beta_rowset.h"
74
#include "olap/rowset/rowset.h"
75
#include "olap/rowset/rowset_factory.h"
76
#include "olap/rowset/rowset_meta.h"
77
#include "olap/rowset/segment_v2/column_reader.h"
78
#include "olap/rowset/segment_v2/common.h"
79
#include "olap/rowset/segment_v2/inverted_index_desc.h"
80
#include "olap/rowset/segment_v2/segment.h"
81
#include "olap/rowset/segment_v2/segment_iterator.h"
82
#include "olap/segment_loader.h"
83
#include "olap/storage_engine.h"
84
#include "olap/tablet.h"
85
#include "olap/tablet_manager.h"
86
#include "olap/tablet_schema.h"
87
#include "olap/txn_manager.h"
88
#include "olap/utils.h"
89
#include "olap/wal/wal_manager.h"
90
#include "runtime/buffer_control_block.h"
91
#include "runtime/cache/result_cache.h"
92
#include "runtime/define_primitive_type.h"
93
#include "runtime/descriptors.h"
94
#include "runtime/exec_env.h"
95
#include "runtime/fold_constant_executor.h"
96
#include "runtime/fragment_mgr.h"
97
#include "runtime/load_channel_mgr.h"
98
#include "runtime/load_stream_mgr.h"
99
#include "runtime/result_buffer_mgr.h"
100
#include "runtime/routine_load/routine_load_task_executor.h"
101
#include "runtime/stream_load/new_load_stream_mgr.h"
102
#include "runtime/stream_load/stream_load_context.h"
103
#include "runtime/thread_context.h"
104
#include "runtime/types.h"
105
#include "service/backend_options.h"
106
#include "service/point_query_executor.h"
107
#include "util/arrow/row_batch.h"
108
#include "util/async_io.h"
109
#include "util/brpc_client_cache.h"
110
#include "util/doris_metrics.h"
111
#include "util/md5.h"
112
#include "util/metrics.h"
113
#include "util/network_util.h"
114
#include "util/proto_util.h"
115
#include "util/ref_count_closure.h"
116
#include "util/runtime_profile.h"
117
#include "util/stopwatch.hpp"
118
#include "util/string_util.h"
119
#include "util/thrift_util.h"
120
#include "util/time.h"
121
#include "util/uid_util.h"
122
#include "vec/columns/column.h"
123
#include "vec/columns/column_string.h"
124
#include "vec/common/schema_util.h"
125
#include "vec/core/block.h"
126
#include "vec/core/column_with_type_and_name.h"
127
#include "vec/data_types/data_type.h"
128
#include "vec/exec/format/avro//avro_jni_reader.h"
129
#include "vec/exec/format/csv/csv_reader.h"
130
#include "vec/exec/format/generic_reader.h"
131
#include "vec/exec/format/json/new_json_reader.h"
132
#include "vec/exec/format/orc/vorc_reader.h"
133
#include "vec/exec/format/parquet/vparquet_reader.h"
134
#include "vec/jsonb/serialize.h"
135
#include "vec/runtime/vdata_stream_mgr.h"
136
137
// Forward declaration only: this file refers to RpcController through pointers
// in the RPC handler signatures.
namespace google::protobuf {
class RpcController;
} // namespace google::protobuf
142
143
namespace doris {
144
using namespace ErrorCode;
145
146
const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
147
148
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
149
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
150
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
151
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
152
153
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
154
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
155
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
156
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
157
158
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT);
159
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT);
160
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT);
161
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT);
162
163
bthread_key_t btls_key;
164
165
0
static void thread_context_deleter(void* d) {
166
0
    delete static_cast<ThreadContext*>(d);
167
0
}
168
169
template <typename T>
170
class NewHttpClosure : public ::google::protobuf::Closure {
171
public:
172
    NewHttpClosure(google::protobuf::Closure* done) : _done(done) {}
173
0
    NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_28PTabletWriterAddBlockRequestEEC2EPS1_PN6google8protobuf7ClosureE
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_19PTransmitDataParamsEEC2EPS1_PN6google8protobuf7ClosureE
174
175
0
    void Run() override {
176
0
        if (_request != nullptr) {
177
0
            delete _request;
178
0
            _request = nullptr;
179
0
        }
180
0
        if (_done != nullptr) {
181
0
            _done->Run();
182
0
        }
183
0
        delete this;
184
0
    }
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_28PTabletWriterAddBlockRequestEE3RunEv
Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_19PTransmitDataParamsEE3RunEv
185
186
private:
187
    T* _request = nullptr;
188
    google::protobuf::Closure* _done = nullptr;
189
};
190
191
template <typename T>
192
concept CanCancel = requires(T* response) { response->mutable_status(); };
193
194
template <CanCancel T>
195
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
196
0
    brpc::ClosureGuard closure_guard(done);
197
    // Should use status to generate protobuf message, because it will encoding Backend Info
198
    // into the error message and then we could know which backend's pool is full.
199
0
    Status st = Status::Error<TStatusCode::CANCELLED>(
200
0
            "fail to offer request to the work pool, pool={}", pool.get_info());
201
0
    st.to_protobuf(response->mutable_status());
202
0
    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool="
203
0
                 << pool.get_info();
204
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriterOpenResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PExecPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23POpenLoadStreamResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriterAddBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PCancelPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_16PFetchDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_21PFetchArrowDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26POutfileWriteSuccessResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PFetchTableSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_29PFetchArrowFlightSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PTabletKeyLookupResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PJdbcTestConnectionResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_20PFetchColIdsResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26PFetchRemoteSchemaResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_12PProxyResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_20PMergeFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSendFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSyncFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_22PPublishFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_15PSendDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_13PCommitResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_15PRollbackResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_19PConstantExprResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_19PTransmitDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PCheckRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PResetRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriteSlaveResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriteSlaveDoneResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PMultiGetResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_13PGlobResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26PGroupCommitInsertResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PGetWalQueueSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_22PGetBeResourceResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
205
206
template <typename T>
207
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
208
0
    brpc::ClosureGuard closure_guard(done);
209
0
    LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
210
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PTabletWriterCancelResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_14PCacheResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PFetchCacheResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
211
212
// Builds the three work pools, registers their gauges, hands the heavy and light
// pools to the load stream manager and creates the bthread-local keys.
//
// Each pool size / queue length comes from its config value; a config value of
// -1 selects a default derived from the core count:
//   heavy / light : threads = max(128, cores * 4), queue = max(10240, cores * 320)
//   arrow flight  : threads = max(512, cores * 2), queue = max(20480, cores * 640)
PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
        : _exec_env(exec_env),
          _heavy_work_pool(config::brpc_heavy_work_pool_threads == -1
                                   ? std::max(128, CpuInfo::num_cores() * 4)
                                   : config::brpc_heavy_work_pool_threads,
                           config::brpc_heavy_work_pool_max_queue_size == -1
                                   ? std::max(10240, CpuInfo::num_cores() * 320)
                                   : config::brpc_heavy_work_pool_max_queue_size,
                           "brpc_heavy"),
          _light_work_pool(config::brpc_light_work_pool_threads == -1
                                   ? std::max(128, CpuInfo::num_cores() * 4)
                                   : config::brpc_light_work_pool_threads,
                           config::brpc_light_work_pool_max_queue_size == -1
                                   ? std::max(10240, CpuInfo::num_cores() * 320)
                                   : config::brpc_light_work_pool_max_queue_size,
                           "brpc_light"),
          _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads == -1
                                          ? std::max(512, CpuInfo::num_cores() * 2)
                                          : config::brpc_arrow_flight_work_pool_threads,
                                  config::brpc_arrow_flight_work_pool_max_queue_size == -1
                                          ? std::max(20480, CpuInfo::num_cores() * 640)
                                          : config::brpc_arrow_flight_work_pool_max_queue_size,
                                  "brpc_arrow_flight") {
    // Live state of the heavy and light pools.
    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
                         [this]() { return _heavy_work_pool.get_queue_size(); });
    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
                         [this]() { return _light_work_pool.get_queue_size(); });
    REGISTER_HOOK_METRIC(heavy_work_active_threads,
                         [this]() { return _heavy_work_pool.get_active_threads(); });
    REGISTER_HOOK_METRIC(light_work_active_threads,
                         [this]() { return _light_work_pool.get_active_threads(); });

    // Limits. These report the raw config values, so they read -1 when the
    // core-count default above is in effect.
    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
                         []() { return config::brpc_heavy_work_pool_max_queue_size; });
    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
                         []() { return config::brpc_light_work_pool_max_queue_size; });
    REGISTER_HOOK_METRIC(heavy_work_max_threads,
                         []() { return config::brpc_heavy_work_pool_threads; });
    REGISTER_HOOK_METRIC(light_work_max_threads,
                         []() { return config::brpc_light_work_pool_threads; });

    // Same four gauges for the arrow flight pool.
    REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
                         [this]() { return _arrow_flight_work_pool.get_queue_size(); });
    REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
                         [this]() { return _arrow_flight_work_pool.get_active_threads(); });
    REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
                         []() { return config::brpc_arrow_flight_work_pool_max_queue_size; });
    REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
                         []() { return config::brpc_arrow_flight_work_pool_threads; });

    // The load stream manager borrows the pools owned by this service.
    _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
    _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);

    // bthread-local keys; both are released again in the destructor.
    CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
    CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter));
}
268
269
0
// Undoes the constructor: deregisters every hook metric that captured `this` or
// reported pool configuration, then deletes the two bthread-local keys.
PInternalServiceImpl::~PInternalServiceImpl() {
    // Heavy / light pool state.
    DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
    DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
    DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
    DEREGISTER_HOOK_METRIC(light_work_active_threads);

    // Heavy / light pool limits.
    DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
    DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
    DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
    DEREGISTER_HOOK_METRIC(light_work_max_threads);

    // Arrow flight pool.
    DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
    DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
    DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
    DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);

    // Keys created in the constructor.
    CHECK_EQ(0, bthread_key_delete(btls_key));
    CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
}
288
289
void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* controller,
290
                                         const PTransmitDataParams* request,
291
                                         PTransmitDataResult* response,
292
0
                                         google::protobuf::Closure* done) {}
293
294
void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* controller,
295
                                                 const PEmptyRequest* request,
296
                                                 PTransmitDataResult* response,
297
0
                                                 google::protobuf::Closure* done) {}
298
299
void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller,
300
                                          const PTransmitDataParams* request,
301
                                          PTransmitDataResult* response,
302
                                          google::protobuf::Closure* done,
303
0
                                          const Status& extract_st) {}
304
305
void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* controller,
306
                                              const PTabletWriterOpenRequest* request,
307
                                              PTabletWriterOpenResult* response,
308
0
                                              google::protobuf::Closure* done) {
309
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
310
0
        VLOG_RPC << "tablet writer open, id=" << request->id()
311
0
                 << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();
312
0
        signal::set_signal_task_id(request->id());
313
0
        brpc::ClosureGuard closure_guard(done);
314
0
        auto st = _exec_env->load_channel_mgr()->open(*request);
315
0
        if (!st.ok()) {
316
0
            LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id()
317
0
                         << ", index_id=" << request->index_id()
318
0
                         << ", txn_id=" << request->txn_id();
319
0
        }
320
0
        st.to_protobuf(response->mutable_status());
321
0
    });
322
0
    if (!ret) {
323
0
        offer_failed(response, done, _light_work_pool);
324
0
        return;
325
0
    }
326
0
}
327
328
void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller,
329
                                              const PExecPlanFragmentRequest* request,
330
                                              PExecPlanFragmentResult* response,
331
0
                                              google::protobuf::Closure* done) {
332
0
    timeval tv {};
333
0
    gettimeofday(&tv, nullptr);
334
0
    response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
335
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
336
0
        _exec_plan_fragment_in_pthread(controller, request, response, done);
337
0
    });
338
0
    if (!ret) {
339
0
        offer_failed(response, done, _light_work_pool);
340
0
        return;
341
0
    }
342
0
}
343
344
void PInternalServiceImpl::_exec_plan_fragment_in_pthread(
345
        google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request,
346
0
        PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
347
0
    timeval tv1 {};
348
0
    gettimeofday(&tv1, nullptr);
349
0
    response->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
350
0
    brpc::ClosureGuard closure_guard(done);
351
0
    auto st = Status::OK();
352
0
    bool compact = request->has_compact() ? request->compact() : false;
353
0
    PFragmentRequestVersion version =
354
0
            request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
355
0
    try {
356
0
        st = _exec_plan_fragment_impl(request->request(), version, compact);
357
0
    } catch (const Exception& e) {
358
0
        st = e.to_status();
359
0
    } catch (...) {
360
0
        st = Status::Error(ErrorCode::INTERNAL_ERROR,
361
0
                           "_exec_plan_fragment_impl meet unknown error");
362
0
    }
363
0
    if (!st.ok()) {
364
0
        LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
365
0
    }
366
0
    st.to_protobuf(response->mutable_status());
367
0
    timeval tv2 {};
368
0
    gettimeofday(&tv2, nullptr);
369
0
    response->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
370
0
}
371
372
void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
373
                                                      const PExecPlanFragmentRequest* request,
374
                                                      PExecPlanFragmentResult* response,
375
0
                                                      google::protobuf::Closure* done) {
376
0
    timeval tv {};
377
0
    gettimeofday(&tv, nullptr);
378
0
    response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
379
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
380
0
        _exec_plan_fragment_in_pthread(controller, request, response, done);
381
0
    });
382
0
    if (!ret) {
383
0
        offer_failed(response, done, _light_work_pool);
384
0
        return;
385
0
    }
386
0
}
387
388
void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* /*controller*/,
389
                                                    const PExecPlanFragmentStartRequest* request,
390
                                                    PExecPlanFragmentResult* result,
391
0
                                                    google::protobuf::Closure* done) {
392
0
    timeval tv {};
393
0
    gettimeofday(&tv, nullptr);
394
0
    result->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
395
0
    bool ret = _light_work_pool.try_offer([this, request, result, done]() {
396
0
        timeval tv1 {};
397
0
        gettimeofday(&tv1, nullptr);
398
0
        result->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
399
0
        brpc::ClosureGuard closure_guard(done);
400
0
        auto st = _exec_env->fragment_mgr()->start_query_execution(request);
401
0
        st.to_protobuf(result->mutable_status());
402
0
        timeval tv2 {};
403
0
        gettimeofday(&tv2, nullptr);
404
0
        result->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
405
0
    });
406
0
    if (!ret) {
407
0
        offer_failed(result, done, _light_work_pool);
408
0
        return;
409
0
    }
410
0
}
411
412
void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* controller,
413
                                            const POpenLoadStreamRequest* request,
414
                                            POpenLoadStreamResponse* response,
415
0
                                            google::protobuf::Closure* done) {
416
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
417
0
        signal::set_signal_task_id(request->load_id());
418
0
        brpc::ClosureGuard done_guard(done);
419
0
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
420
0
        brpc::StreamOptions stream_options;
421
422
0
        LOG(INFO) << "open load stream, load_id=" << request->load_id()
423
0
                  << ", src_id=" << request->src_id();
424
425
0
        for (const auto& req : request->tablets()) {
426
0
            TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager();
427
0
            TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id());
428
0
            if (tablet == nullptr) {
429
0
                auto st = Status::NotFound("Tablet {} not found", req.tablet_id());
430
0
                st.to_protobuf(response->mutable_status());
431
0
                cntl->SetFailed(st.to_string());
432
0
                return;
433
0
            }
434
0
            auto resp = response->add_tablet_schemas();
435
0
            resp->set_index_id(req.index_id());
436
0
            resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
437
0
            tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
438
0
        }
439
440
0
        LoadStream* load_stream = nullptr;
441
0
        auto st = _exec_env->load_stream_mgr()->open_load_stream(request, load_stream);
442
0
        if (!st.ok()) {
443
0
            st.to_protobuf(response->mutable_status());
444
0
            return;
445
0
        }
446
447
0
        stream_options.handler = load_stream;
448
0
        stream_options.idle_timeout_ms = request->idle_timeout_ms();
449
0
        DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
450
0
                        { stream_options.idle_timeout_ms = 1; });
451
452
0
        StreamId streamid;
453
0
        if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
454
0
            st = Status::Cancelled("Fail to accept stream {}", streamid);
455
0
            st.to_protobuf(response->mutable_status());
456
0
            cntl->SetFailed(st.to_string());
457
0
            return;
458
0
        }
459
460
0
        VLOG_DEBUG << "get streamid =" << streamid;
461
0
        st.to_protobuf(response->mutable_status());
462
0
    });
463
0
    if (!ret) {
464
0
        offer_failed(response, done, _light_work_pool);
465
0
    }
466
0
}
467
468
void PInternalServiceImpl::tablet_writer_add_block_by_http(
469
        google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request,
470
0
        PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
471
0
    PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
472
0
    google::protobuf::Closure* new_done =
473
0
            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
474
0
    brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
475
0
    Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
476
0
                                                                                       cntl);
477
0
    if (st.ok()) {
478
0
        tablet_writer_add_block(controller, new_request, response, new_done);
479
0
    } else {
480
0
        st.to_protobuf(response->mutable_status());
481
0
    }
482
0
}
483
484
void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller,
485
                                                   const PTabletWriterAddBlockRequest* request,
486
                                                   PTabletWriterAddBlockResult* response,
487
0
                                                   google::protobuf::Closure* done) {
488
0
    int64_t submit_task_time_ns = MonotonicNanos();
489
0
    bool ret = _heavy_work_pool.try_offer([request, response, done, submit_task_time_ns, this]() {
490
0
        int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
491
0
        brpc::ClosureGuard closure_guard(done);
492
0
        int64_t execution_time_ns = 0;
493
0
        {
494
0
            SCOPED_RAW_TIMER(&execution_time_ns);
495
0
            signal::set_signal_task_id(request->id());
496
0
            auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
497
0
            if (!st.ok()) {
498
0
                LOG(WARNING) << "tablet writer add block failed, message=" << st
499
0
                             << ", id=" << request->id() << ", index_id=" << request->index_id()
500
0
                             << ", sender_id=" << request->sender_id()
501
0
                             << ", backend id=" << request->backend_id();
502
0
            }
503
0
            st.to_protobuf(response->mutable_status());
504
0
        }
505
0
        response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
506
0
        response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
507
0
    });
508
0
    if (!ret) {
509
0
        offer_failed(response, done, _heavy_work_pool);
510
0
        return;
511
0
    }
512
0
}
513
514
void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller,
515
                                                const PTabletWriterCancelRequest* request,
516
                                                PTabletWriterCancelResult* response,
517
0
                                                google::protobuf::Closure* done) {
518
0
    bool ret = _light_work_pool.try_offer([this, request, done]() {
519
0
        VLOG_RPC << "tablet writer cancel, id=" << request->id()
520
0
                 << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id();
521
0
        signal::set_signal_task_id(request->id());
522
0
        brpc::ClosureGuard closure_guard(done);
523
0
        auto st = _exec_env->load_channel_mgr()->cancel(*request);
524
0
        if (!st.ok()) {
525
0
            LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
526
0
                         << ", index_id=" << request->index_id()
527
0
                         << ", sender_id=" << request->sender_id();
528
0
        }
529
0
    });
530
0
    if (!ret) {
531
0
        offer_failed(response, done, _light_work_pool);
532
0
        return;
533
0
    }
534
0
}
535
536
// Deserializes a thrift-encoded plan fragment request and submits every fragment it contains
// to the fragment manager.
//
// ser_request: serialized thrift payload. Its concrete type is selected by `version`:
//              VERSION_1 -> TExecPlanFragmentParams,
//              VERSION_2 -> TExecPlanFragmentParamsList,
//              VERSION_3 -> TPipelineFragmentParamsList.
// compact:     forwarded unchanged to deserialize_thrift_msg().
// cb:          optional callback; when non-empty it is forwarded to exec_plan_fragment().
//
// Returns the first non-OK status produced by deserialization or submission. Returns
// InternalError when the master info is not available yet, when a VERSION_3 list is empty, or
// when `version` is unknown. For the list versions a warning is logged if submitting all
// fragments took more than 5 seconds.
Status PInternalServiceImpl::_exec_plan_fragment_impl(
        const std::string& ser_request, PFragmentRequestVersion version, bool compact,
        const std::function<void(RuntimeState*, Status*)>& cb) {
    // Sometimes the BE has not received the first heartbeat message yet when a request from
    // the FE arrives. Executing the fragment in that state would crash as soon as it reads a
    // property from the master info, so reject the request instead.
    if (ExecEnv::GetInstance()->master_info() == nullptr) {
        return Status::InternalError(
                "Have not receive the first heartbeat message from master, not ready to provide "
                "service");
    }

    // Shared deserialization step for all three request versions.
    auto deserialize = [&ser_request, compact](auto* t_request) {
        const auto* buf = reinterpret_cast<const uint8_t*>(ser_request.data());
        auto len = static_cast<uint32_t>(ser_request.size());
        return deserialize_thrift_msg(buf, &len, compact, t_request);
    };

    if (version == PFragmentRequestVersion::VERSION_1) {
        // VERSION_1 should be removed in v1.2
        TExecPlanFragmentParams t_request;
        RETURN_IF_ERROR(deserialize(&t_request));
        if (cb) {
            return _exec_env->fragment_mgr()->exec_plan_fragment(
                    t_request, QuerySource::INTERNAL_FRONTEND, cb);
        } else {
            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request,
                                                                 QuerySource::INTERNAL_FRONTEND);
        }
    } else if (version == PFragmentRequestVersion::VERSION_2) {
        TExecPlanFragmentParamsList t_request;
        RETURN_IF_ERROR(deserialize(&t_request));
        const auto& fragment_list = t_request.paramsList;
        MonotonicStopWatch timer;
        timer.start();

        for (const TExecPlanFragmentParams& params : fragment_list) {
            if (cb) {
                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
                        params, QuerySource::INTERNAL_FRONTEND, cb));
            } else {
                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
                        params, QuerySource::INTERNAL_FRONTEND));
            }
        }

        timer.stop();
        double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL;
        // front() on an empty list is undefined behavior, so only report when there is at
        // least one fragment to take the query id from.
        if (cost_secs > 5 && !fragment_list.empty()) {
            LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much",
                        fragment_list.size(), print_id(fragment_list.front().params.query_id),
                        cost_secs);
        }

        return Status::OK();
    } else if (version == PFragmentRequestVersion::VERSION_3) {
        TPipelineFragmentParamsList t_request;
        RETURN_IF_ERROR(deserialize(&t_request));

        const auto& fragment_list = t_request.params_list;
        if (fragment_list.empty()) {
            return Status::InternalError("Invalid TPipelineFragmentParamsList!");
        }
        MonotonicStopWatch timer;
        timer.start();
        for (const TPipelineFragmentParams& fragment : fragment_list) {
            if (cb) {
                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
                        fragment, QuerySource::INTERNAL_FRONTEND, cb));
            } else {
                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
                        fragment, QuerySource::INTERNAL_FRONTEND));
            }
        }
        timer.stop();
        double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL;
        if (cost_secs > 5) {
            LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much",
                        fragment_list.size(), print_id(fragment_list.front().query_id), cost_secs);
        }

        return Status::OK();
    } else {
        return Status::InternalError("invalid version");
    }
}
626
627
void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* /*controller*/,
628
                                                const PCancelPlanFragmentRequest* request,
629
                                                PCancelPlanFragmentResult* result,
630
0
                                                google::protobuf::Closure* done) {
631
0
    bool ret = _light_work_pool.try_offer([this, request, result, done]() {
632
0
        brpc::ClosureGuard closure_guard(done);
633
0
        TUniqueId tid;
634
0
        tid.__set_hi(request->finst_id().hi());
635
0
        tid.__set_lo(request->finst_id().lo());
636
0
        signal::set_signal_task_id(tid);
637
0
        Status st = Status::OK();
638
639
0
        const bool has_cancel_reason = request->has_cancel_reason();
640
        // KILL QUERY need send query directly to BE ignore the fragmentid or fragment instance id.
641
        // Actully, not need check fragmentid, but should keep stable on 2.1, so that only call
642
        // cancel_query when queryid is set and fragment id not set.
643
0
        if (request->has_query_id() && !request->has_fragment_id()) {
644
0
            TUniqueId query_id;
645
0
            query_id.__set_hi(request->query_id().hi());
646
0
            query_id.__set_lo(request->query_id().lo());
647
0
            LOG(INFO) << fmt::format(
648
0
                    "Cancel query {}, reason: {}", print_id(query_id),
649
0
                    has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
650
0
                                      : "INTERNAL_ERROR");
651
0
            _exec_env->fragment_mgr()->cancel_query(
652
0
                    query_id,
653
0
                    has_cancel_reason ? request->cancel_reason()
654
0
                                      : PPlanFragmentCancelReason::INTERNAL_ERROR,
655
0
                    "cancel query from FE");
656
0
        } else if (request->has_fragment_id()) {
657
0
            TUniqueId query_id;
658
0
            query_id.__set_hi(request->query_id().hi());
659
0
            query_id.__set_lo(request->query_id().lo());
660
0
            LOG(INFO) << fmt::format(
661
0
                    "Cancel query {}, reason: {}", print_id(query_id),
662
0
                    has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
663
0
                                      : "INTERNAL_ERROR");
664
0
            _exec_env->fragment_mgr()->cancel_fragment(
665
0
                    query_id, request->fragment_id(),
666
0
                    has_cancel_reason ? request->cancel_reason()
667
0
                                      : PPlanFragmentCancelReason::INTERNAL_ERROR);
668
0
        } else {
669
0
            LOG(INFO) << fmt::format(
670
0
                    "Cancel instance {}, reason: {}", print_id(tid),
671
0
                    has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
672
0
                                      : "INTERNAL_ERROR");
673
0
            _exec_env->fragment_mgr()->cancel_instance(
674
0
                    tid, has_cancel_reason ? request->cancel_reason()
675
0
                                           : PPlanFragmentCancelReason::INTERNAL_ERROR);
676
0
        }
677
678
        // TODO: the logic seems useless, cancel only return Status::OK. remove it
679
0
        st.to_protobuf(result->mutable_status());
680
0
    });
681
0
    if (!ret) {
682
0
        offer_failed(result, done, _light_work_pool);
683
0
        return;
684
0
    }
685
0
}
686
687
void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller,
688
                                      const PFetchDataRequest* request, PFetchDataResult* result,
689
0
                                      google::protobuf::Closure* done) {
690
0
    bool ret = _heavy_work_pool.try_offer([this, controller, request, result, done]() {
691
0
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
692
0
        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
693
0
        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
694
0
    });
695
0
    if (!ret) {
696
0
        offer_failed(result, done, _heavy_work_pool);
697
0
        return;
698
0
    }
699
0
}
700
701
void PInternalServiceImpl::fetch_arrow_data(google::protobuf::RpcController* controller,
702
                                            const PFetchArrowDataRequest* request,
703
                                            PFetchArrowDataResult* result,
704
0
                                            google::protobuf::Closure* done) {
705
0
    bool ret = _arrow_flight_work_pool.try_offer([this, controller, request, result, done]() {
706
0
        brpc::ClosureGuard closure_guard(done);
707
0
        auto* cntl = static_cast<brpc::Controller*>(controller);
708
0
        auto* ctx = new GetArrowResultBatchCtx(cntl, result, done);
709
0
        _exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx);
710
0
    });
711
0
    if (!ret) {
712
0
        offer_failed(result, done, _arrow_flight_work_pool);
713
0
        return;
714
0
    }
715
0
}
716
717
void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController* controller,
718
                                                 const POutfileWriteSuccessRequest* request,
719
                                                 POutfileWriteSuccessResult* result,
720
0
                                                 google::protobuf::Closure* done) {
721
0
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
722
0
        VLOG_RPC << "outfile write success file";
723
0
        brpc::ClosureGuard closure_guard(done);
724
0
        TResultFileSink result_file_sink;
725
0
        Status st = Status::OK();
726
0
        {
727
0
            const uint8_t* buf = (const uint8_t*)(request->result_file_sink().data());
728
0
            uint32_t len = request->result_file_sink().size();
729
0
            st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
730
0
            if (!st.ok()) {
731
0
                LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
732
0
                st.to_protobuf(result->mutable_status());
733
0
                return;
734
0
            }
735
0
        }
736
737
0
        TResultFileSinkOptions file_options = result_file_sink.file_options;
738
0
        std::stringstream ss;
739
0
        ss << file_options.file_path << file_options.success_file_name;
740
0
        std::string file_name = ss.str();
741
0
        if (result_file_sink.storage_backend_type == TStorageBackendType::LOCAL) {
742
            // For local file writer, the file_path is a local dir.
743
            // Here we do a simple security verification by checking whether the file exists.
744
            // Because the file path is currently arbitrarily specified by the user,
745
            // Doris is not responsible for ensuring the correctness of the path.
746
            // This is just to prevent overwriting the existing file.
747
0
            bool exists = true;
748
0
            st = io::global_local_filesystem()->exists(file_name, &exists);
749
0
            if (!st.ok()) {
750
0
                LOG(WARNING) << "outfile write success filefailed, errmsg = " << st;
751
0
                st.to_protobuf(result->mutable_status());
752
0
                return;
753
0
            }
754
0
            if (exists) {
755
0
                st = Status::InternalError("File already exists: {}", file_name);
756
0
            }
757
0
            if (!st.ok()) {
758
0
                LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
759
0
                st.to_protobuf(result->mutable_status());
760
0
                return;
761
0
            }
762
0
        }
763
764
0
        std::unique_ptr<doris::io::FileWriter> _file_writer_impl;
765
0
        st = FileFactory::create_file_writer(
766
0
                FileFactory::convert_storage_type(result_file_sink.storage_backend_type),
767
0
                ExecEnv::GetInstance(), file_options.broker_addresses,
768
0
                file_options.broker_properties, file_name, 0, _file_writer_impl);
769
0
        if (!st.ok()) {
770
0
            LOG(WARNING) << "Outfile write success file failed when create file writer , errmsg = "
771
0
                         << st;
772
0
            st.to_protobuf(result->mutable_status());
773
0
            return;
774
0
        }
775
776
        // must write somthing because s3 file writer can not writer empty file
777
0
        st = _file_writer_impl->append({"success"});
778
0
        if (!st.ok()) {
779
0
            LOG(WARNING) << "outfile write success file failed when write success, errmsg = " << st;
780
0
            st.to_protobuf(result->mutable_status());
781
0
            return;
782
0
        }
783
784
0
        st = _file_writer_impl->close();
785
0
        if (!st.ok()) {
786
0
            LOG(WARNING) << "outfile write success file failed when close file writer, errmsg = "
787
0
                         << st;
788
0
            st.to_protobuf(result->mutable_status());
789
0
            return;
790
0
        }
791
0
    });
792
0
    if (!ret) {
793
0
        offer_failed(result, done, _heavy_work_pool);
794
0
        return;
795
0
    }
796
0
}
797
798
void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller,
799
                                              const PFetchTableSchemaRequest* request,
800
                                              PFetchTableSchemaResult* result,
801
0
                                              google::protobuf::Closure* done) {
802
0
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
803
0
        VLOG_RPC << "fetch table schema";
804
0
        brpc::ClosureGuard closure_guard(done);
805
0
        TFileScanRange file_scan_range;
806
0
        Status st = Status::OK();
807
0
        {
808
0
            const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
809
0
            uint32_t len = request->file_scan_range().size();
810
0
            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
811
0
            if (!st.ok()) {
812
0
                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
813
0
                st.to_protobuf(result->mutable_status());
814
0
                return;
815
0
            }
816
0
        }
817
0
        if (file_scan_range.__isset.ranges == false) {
818
0
            st = Status::InternalError("can not get TFileRangeDesc.");
819
0
            st.to_protobuf(result->mutable_status());
820
0
            return;
821
0
        }
822
0
        if (file_scan_range.__isset.params == false) {
823
0
            st = Status::InternalError("can not get TFileScanRangeParams.");
824
0
            st.to_protobuf(result->mutable_status());
825
0
            return;
826
0
        }
827
0
        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
828
0
        const TFileScanRangeParams& params = file_scan_range.params;
829
830
0
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
831
0
                MemTrackerLimiter::Type::OTHER,
832
0
                fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type,
833
0
                            params.file_type));
834
0
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
835
836
        // make sure profile is desctructed after reader cause PrefetchBufferedReader
837
        // might asynchronouslly access the profile
838
0
        std::unique_ptr<RuntimeProfile> profile =
839
0
                std::make_unique<RuntimeProfile>("FetchTableSchema");
840
0
        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
841
0
        io::IOContext io_ctx;
842
0
        io::FileCacheStatistics file_cache_statis;
843
0
        io_ctx.file_cache_stats = &file_cache_statis;
844
0
        switch (params.format_type) {
845
0
        case TFileFormatType::FORMAT_CSV_PLAIN:
846
0
        case TFileFormatType::FORMAT_CSV_GZ:
847
0
        case TFileFormatType::FORMAT_CSV_BZ2:
848
0
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
849
0
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
850
0
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
851
0
        case TFileFormatType::FORMAT_CSV_LZOP:
852
0
        case TFileFormatType::FORMAT_CSV_DEFLATE: {
853
            // file_slots is no use
854
0
            std::vector<SlotDescriptor*> file_slots;
855
0
            reader = vectorized::CsvReader::create_unique(profile.get(), params, range, file_slots,
856
0
                                                          &io_ctx);
857
0
            break;
858
0
        }
859
0
        case TFileFormatType::FORMAT_PARQUET: {
860
0
            reader = vectorized::ParquetReader::create_unique(params, range, &io_ctx, nullptr);
861
0
            break;
862
0
        }
863
0
        case TFileFormatType::FORMAT_ORC: {
864
0
            reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx);
865
0
            break;
866
0
        }
867
0
        case TFileFormatType::FORMAT_JSON: {
868
0
            std::vector<SlotDescriptor*> file_slots;
869
0
            reader = vectorized::NewJsonReader::create_unique(profile.get(), params, range,
870
0
                                                              file_slots, &io_ctx);
871
0
            break;
872
0
        }
873
0
        case TFileFormatType::FORMAT_AVRO: {
874
            // file_slots is no use
875
0
            std::vector<SlotDescriptor*> file_slots;
876
0
            reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range,
877
0
                                                              file_slots);
878
0
            st = ((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader();
879
0
            break;
880
0
        }
881
0
        default:
882
0
            st = Status::InternalError("Not supported file format in fetch table schema: {}",
883
0
                                       params.format_type);
884
0
            st.to_protobuf(result->mutable_status());
885
0
            return;
886
0
        }
887
0
        if (!st.ok()) {
888
0
            LOG(WARNING) << "failed to init reader, errmsg=" << st;
889
0
            st.to_protobuf(result->mutable_status());
890
0
            return;
891
0
        }
892
0
        std::vector<std::string> col_names;
893
0
        std::vector<TypeDescriptor> col_types;
894
0
        st = reader->get_parsed_schema(&col_names, &col_types);
895
0
        if (!st.ok()) {
896
0
            LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
897
0
            st.to_protobuf(result->mutable_status());
898
0
            return;
899
0
        }
900
0
        result->set_column_nums(col_names.size());
901
0
        for (size_t idx = 0; idx < col_names.size(); ++idx) {
902
0
            result->add_column_names(col_names[idx]);
903
0
        }
904
0
        for (size_t idx = 0; idx < col_types.size(); ++idx) {
905
0
            PTypeDesc* type_desc = result->add_column_types();
906
0
            col_types[idx].to_protobuf(type_desc);
907
0
        }
908
0
        st.to_protobuf(result->mutable_status());
909
0
    });
910
0
    if (!ret) {
911
0
        offer_failed(result, done, _heavy_work_pool);
912
0
        return;
913
0
    }
914
0
}
915
916
void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
917
                                                     const PFetchArrowFlightSchemaRequest* request,
918
                                                     PFetchArrowFlightSchemaResult* result,
919
0
                                                     google::protobuf::Closure* done) {
920
0
    bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
921
0
        brpc::ClosureGuard closure_guard(done);
922
0
        std::shared_ptr<arrow::Schema> schema;
923
0
        auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
924
0
                UniqueId(request->finst_id()).to_thrift(), &schema);
925
0
        if (!st.ok()) {
926
0
            LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
927
0
            st.to_protobuf(result->mutable_status());
928
0
            return;
929
0
        }
930
931
0
        std::string schema_str;
932
0
        st = serialize_arrow_schema(&schema, &schema_str);
933
0
        if (st.ok()) {
934
0
            result->set_schema(std::move(schema_str));
935
0
            if (!config::public_host.empty()) {
936
0
                result->set_be_arrow_flight_ip(config::public_host);
937
0
            }
938
0
            if (config::arrow_flight_sql_proxy_port != -1) {
939
0
                result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port);
940
0
            }
941
0
        }
942
0
        st.to_protobuf(result->mutable_status());
943
0
    });
944
0
    if (!ret) {
945
0
        offer_failed(result, done, _arrow_flight_work_pool);
946
0
        return;
947
0
    }
948
0
}
949
950
// Runs a key lookup for `request` with a PointQueryExecutor and lets it fill `response`.
//
// Returns the first non-OK status from the executor's init() or lookup_up(), otherwise OK.
// The executor's profile is written to the debug log when debug logging is enabled, and to
// the INFO log through LOG_EVERY_N with N = 500.
Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
                                                PTabletKeyLookupResponse* response) {
    PointQueryExecutor executor;
    RETURN_IF_ERROR(executor.init(request, response));
    RETURN_IF_ERROR(executor.lookup_up());

    if (VLOG_DEBUG_IS_ON) {
        VLOG_DEBUG << executor.print_profile();
    }
    LOG_EVERY_N(INFO, 500) << executor.print_profile();

    return Status::OK();
}
961
962
void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* controller,
963
                                             const PTabletKeyLookupRequest* request,
964
                                             PTabletKeyLookupResponse* response,
965
0
                                             google::protobuf::Closure* done) {
966
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
967
0
        [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
968
0
        brpc::ClosureGuard guard(done);
969
0
        Status st = _tablet_fetch_data(request, response);
970
0
        st.to_protobuf(response->mutable_status());
971
0
    });
972
0
    if (!ret) {
973
0
        offer_failed(response, done, _light_work_pool);
974
0
        return;
975
0
    }
976
0
}
977
978
void PInternalServiceImpl::test_jdbc_connection(google::protobuf::RpcController* controller,
979
                                                const PJdbcTestConnectionRequest* request,
980
                                                PJdbcTestConnectionResult* result,
981
0
                                                google::protobuf::Closure* done) {
982
0
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
983
0
        VLOG_RPC << "test jdbc connection";
984
0
        brpc::ClosureGuard closure_guard(done);
985
0
        TTableDescriptor table_desc;
986
0
        vectorized::JdbcConnectorParam jdbc_param;
987
0
        Status st = Status::OK();
988
0
        {
989
0
            const uint8_t* buf = (const uint8_t*)request->jdbc_table().data();
990
0
            uint32_t len = request->jdbc_table().size();
991
0
            st = deserialize_thrift_msg(buf, &len, false, &table_desc);
992
0
            if (!st.ok()) {
993
0
                LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
994
0
                st.to_protobuf(result->mutable_status());
995
0
                return;
996
0
            }
997
0
        }
998
0
        TJdbcTable jdbc_table = (table_desc.jdbcTable);
999
0
        jdbc_param.catalog_id = jdbc_table.catalog_id;
1000
0
        jdbc_param.driver_class = jdbc_table.jdbc_driver_class;
1001
0
        jdbc_param.driver_path = jdbc_table.jdbc_driver_url;
1002
0
        jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum;
1003
0
        jdbc_param.jdbc_url = jdbc_table.jdbc_url;
1004
0
        jdbc_param.user = jdbc_table.jdbc_user;
1005
0
        jdbc_param.passwd = jdbc_table.jdbc_password;
1006
0
        jdbc_param.query_string = request->query_str();
1007
0
        jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type());
1008
0
        jdbc_param.use_transaction = false;
1009
0
        jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size;
1010
0
        jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size;
1011
0
        jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time;
1012
0
        jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time;
1013
0
        jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive;
1014
1015
0
        std::unique_ptr<vectorized::JdbcConnector> jdbc_connector;
1016
0
        jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param));
1017
1018
0
        st = jdbc_connector->test_connection();
1019
0
        st.to_protobuf(result->mutable_status());
1020
1021
0
        Status clean_st = jdbc_connector->clean_datasource();
1022
0
        if (!clean_st.ok()) {
1023
0
            LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg();
1024
0
        }
1025
0
        Status close_st = jdbc_connector->close();
1026
0
        if (!close_st.ok()) {
1027
0
            LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg();
1028
0
        }
1029
0
    });
1030
1031
0
    if (!ret) {
1032
0
        offer_failed(result, done, _heavy_work_pool);
1033
0
        return;
1034
0
    }
1035
0
}
1036
1037
void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
1038
                                                        const PFetchColIdsRequest* request,
1039
                                                        PFetchColIdsResponse* response,
1040
0
                                                        google::protobuf::Closure* done) {
1041
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1042
0
        _get_column_ids_by_tablet_ids(controller, request, response, done);
1043
0
    });
1044
0
    if (!ret) {
1045
0
        offer_failed(response, done, _light_work_pool);
1046
0
        return;
1047
0
    }
1048
0
}
1049
1050
void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
1051
        google::protobuf::RpcController* controller, const PFetchColIdsRequest* request,
1052
0
        PFetchColIdsResponse* response, google::protobuf::Closure* done) {
1053
0
    brpc::ClosureGuard guard(done);
1054
0
    [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
1055
0
    TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager();
1056
0
    const auto& params = request->params();
1057
0
    for (const auto& param : params) {
1058
0
        int64_t index_id = param.indexid();
1059
0
        auto tablet_ids = param.tablet_ids();
1060
0
        std::set<std::set<int32_t>> filter_set;
1061
0
        std::map<int32_t, const TabletColumn*> id_to_column;
1062
0
        for (const int64_t tablet_id : tablet_ids) {
1063
0
            TabletSharedPtr tablet = tablet_mgr->get_tablet(tablet_id);
1064
0
            if (tablet == nullptr) {
1065
0
                std::stringstream ss;
1066
0
                ss << "cannot get tablet by id:" << tablet_id;
1067
0
                LOG(WARNING) << ss.str();
1068
0
                response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE);
1069
0
                response->mutable_status()->add_error_msgs(ss.str());
1070
0
                return;
1071
0
            }
1072
            // check schema consistency, column ids should be the same
1073
0
            const auto& columns = tablet->tablet_schema()->columns();
1074
1075
0
            std::set<int32_t> column_ids;
1076
0
            for (const auto& col : columns) {
1077
0
                column_ids.insert(col->unique_id());
1078
0
            }
1079
0
            filter_set.insert(column_ids);
1080
1081
0
            if (id_to_column.empty()) {
1082
0
                for (const auto& col : columns) {
1083
0
                    id_to_column.insert(std::pair {col->unique_id(), col.get()});
1084
0
                }
1085
0
            } else {
1086
0
                for (const auto& col : columns) {
1087
0
                    auto it = id_to_column.find(col->unique_id());
1088
0
                    if (it == id_to_column.end() || *(it->second) != *col) {
1089
0
                        ColumnPB prev_col_pb;
1090
0
                        ColumnPB curr_col_pb;
1091
0
                        if (it != id_to_column.end()) {
1092
0
                            it->second->to_schema_pb(&prev_col_pb);
1093
0
                        }
1094
0
                        col->to_schema_pb(&curr_col_pb);
1095
0
                        std::stringstream ss;
1096
0
                        ss << "consistency check failed: index{ " << index_id << " }"
1097
0
                           << " got inconsistent schema, prev column: " << prev_col_pb.DebugString()
1098
0
                           << " current column: " << curr_col_pb.DebugString();
1099
0
                        LOG(WARNING) << ss.str();
1100
0
                        response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE);
1101
0
                        response->mutable_status()->add_error_msgs(ss.str());
1102
0
                        return;
1103
0
                    }
1104
0
                }
1105
0
            }
1106
0
        }
1107
1108
0
        if (filter_set.size() > 1) {
1109
            // consistecy check failed
1110
0
            std::stringstream ss;
1111
0
            ss << "consistency check failed: index{" << index_id << "}"
1112
0
               << "got inconsistent schema";
1113
0
            LOG(WARNING) << ss.str();
1114
0
            response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE);
1115
0
            response->mutable_status()->add_error_msgs(ss.str());
1116
0
            return;
1117
0
        }
1118
        // consistency check passed, use the first tablet to be the representative
1119
0
        TabletSharedPtr tablet = tablet_mgr->get_tablet(tablet_ids[0]);
1120
0
        const auto& columns = tablet->tablet_schema()->columns();
1121
0
        auto entry = response->add_entries();
1122
0
        entry->set_index_id(index_id);
1123
0
        auto col_name_to_id = entry->mutable_col_name_to_id();
1124
0
        for (const auto& column : columns) {
1125
0
            (*col_name_to_id)[column->name()] = column->unique_id();
1126
0
        }
1127
0
    }
1128
0
    response->mutable_status()->set_status_code(TStatusCode::OK);
1129
0
}
1130
1131
template <class RPCResponse>
1132
struct AsyncRPCContext {
1133
    RPCResponse response;
1134
    brpc::Controller cntl;
1135
    brpc::CallId cid;
1136
};
1137
1138
void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
1139
                                                      const PFetchRemoteSchemaRequest* request,
1140
                                                      PFetchRemoteSchemaResponse* response,
1141
0
                                                      google::protobuf::Closure* done) {
1142
0
    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
1143
0
        brpc::ClosureGuard closure_guard(done);
1144
0
        Status st = Status::OK();
1145
0
        if (request->is_coordinator()) {
1146
            // Spawn rpc request to none coordinator nodes, and finally merge them all
1147
0
            PFetchRemoteSchemaRequest remote_request(*request);
1148
            // set it none coordinator to get merged schema
1149
0
            remote_request.set_is_coordinator(false);
1150
0
            using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext<PFetchRemoteSchemaResponse>;
1151
0
            std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts(
1152
0
                    request->tablet_location_size());
1153
0
            for (int i = 0; i < request->tablet_location_size(); ++i) {
1154
0
                std::string host = request->tablet_location(i).host();
1155
0
                int32_t brpc_port = request->tablet_location(i).brpc_port();
1156
0
                std::shared_ptr<PBackendService_Stub> stub(
1157
0
                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
1158
0
                                host, brpc_port));
1159
0
                if (stub == nullptr) {
1160
0
                    LOG(WARNING) << "Failed to init rpc to " << host << ":" << brpc_port;
1161
0
                    st = Status::InternalError("Failed to init rpc to {}:{}", host, brpc_port);
1162
0
                    continue;
1163
0
                }
1164
0
                rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
1165
0
                rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms);
1166
0
                stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request,
1167
0
                                                 &rpc_contexts[i].response, brpc::DoNothing());
1168
0
            }
1169
0
            std::vector<TabletSchemaSPtr> schemas;
1170
0
            for (auto& rpc_context : rpc_contexts) {
1171
0
                brpc::Join(rpc_context.cid);
1172
0
                if (!st.ok()) {
1173
                    // make sure all flying rpc request is joined
1174
0
                    continue;
1175
0
                }
1176
0
                if (rpc_context.cntl.Failed()) {
1177
0
                    LOG(WARNING) << "fetch_remote_tablet_schema rpc err:"
1178
0
                                 << rpc_context.cntl.ErrorText();
1179
0
                    ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
1180
0
                            rpc_context.cntl.remote_side());
1181
0
                    st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}",
1182
0
                                               rpc_context.cntl.ErrorText());
1183
0
                }
1184
0
                if (rpc_context.response.status().status_code() != 0) {
1185
0
                    st = Status::create(rpc_context.response.status());
1186
0
                }
1187
0
                if (rpc_context.response.has_merged_schema()) {
1188
0
                    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
1189
0
                    schema->init_from_pb(rpc_context.response.merged_schema());
1190
0
                    schemas.push_back(schema);
1191
0
                }
1192
0
            }
1193
0
            if (!schemas.empty() && st.ok()) {
1194
                // merge all
1195
0
                TabletSchemaSPtr merged_schema;
1196
0
                static_cast<void>(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
1197
0
                                                                                   merged_schema));
1198
0
                VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure();
1199
0
                merged_schema->reserve_extracted_columns();
1200
0
                merged_schema->to_schema_pb(response->mutable_merged_schema());
1201
0
            }
1202
0
            st.to_protobuf(response->mutable_status());
1203
0
            return;
1204
0
        } else {
1205
            // This is not a coordinator, get it's tablet and merge schema
1206
0
            std::vector<int64_t> target_tablets;
1207
0
            for (int i = 0; i < request->tablet_location_size(); ++i) {
1208
0
                const auto& location = request->tablet_location(i);
1209
0
                auto backend = BackendOptions::get_local_backend();
1210
                // If this is the target backend
1211
0
                if (backend.host == location.host() && config::brpc_port == location.brpc_port()) {
1212
0
                    target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end());
1213
0
                    break;
1214
0
                }
1215
0
            }
1216
0
            if (!target_tablets.empty()) {
1217
0
                std::vector<TabletSchemaSPtr> tablet_schemas;
1218
0
                for (int64_t tablet_id : target_tablets) {
1219
0
                    TabletSharedPtr tablet =
1220
0
                            StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id,
1221
0
                                                                                    false);
1222
0
                    if (tablet == nullptr) {
1223
                        // just ignore
1224
0
                        LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id;
1225
0
                        continue;
1226
0
                    }
1227
0
                    tablet_schemas.push_back(tablet->tablet_schema());
1228
0
                }
1229
0
                if (!tablet_schemas.empty()) {
1230
                    // merge all
1231
0
                    TabletSchemaSPtr merged_schema;
1232
0
                    static_cast<void>(vectorized::schema_util::get_least_common_schema(
1233
0
                            tablet_schemas, nullptr, merged_schema));
1234
0
                    merged_schema->to_schema_pb(response->mutable_merged_schema());
1235
0
                    VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure();
1236
0
                }
1237
0
            }
1238
0
            st.to_protobuf(response->mutable_status());
1239
0
        }
1240
0
    });
1241
0
    if (!ret) {
1242
0
        offer_failed(response, done, _heavy_work_pool);
1243
0
    }
1244
0
}
1245
1246
void PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcController* controller,
1247
                                                     const PReportStreamLoadStatusRequest* request,
1248
                                                     PReportStreamLoadStatusResponse* response,
1249
0
                                                     google::protobuf::Closure* done) {
1250
0
    TUniqueId load_id;
1251
0
    load_id.__set_hi(request->load_id().hi());
1252
0
    load_id.__set_lo(request->load_id().lo());
1253
0
    Status st = Status::OK();
1254
0
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1255
0
    if (!stream_load_ctx) {
1256
0
        st = Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
1257
0
    }
1258
0
    stream_load_ctx->promise.set_value(st);
1259
0
    st.to_protobuf(response->mutable_status());
1260
0
}
1261
1262
void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
1263
                                    const PProxyRequest* request, PProxyResult* response,
1264
0
                                    google::protobuf::Closure* done) {
1265
0
    bool ret = _exec_env->routine_load_task_executor()->get_thread_pool().submit_func([this,
1266
0
                                                                                       request,
1267
0
                                                                                       response,
1268
0
                                                                                       done]() {
1269
0
        brpc::ClosureGuard closure_guard(done);
1270
        // PProxyRequest is defined in gensrc/proto/internal_service.proto
1271
        // Currently it supports 2 kinds of requests:
1272
        // 1. get all kafka partition ids for given topic
1273
        // 2. get all kafka partition offsets for given topic and timestamp.
1274
0
        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
1275
0
        if (request->has_kafka_meta_request()) {
1276
0
            const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
1277
0
            if (!kafka_request.offset_flags().empty()) {
1278
0
                std::vector<PIntegerPair> partition_offsets;
1279
0
                Status st = _exec_env->routine_load_task_executor()
1280
0
                                    ->get_kafka_real_offsets_for_partitions(
1281
0
                                            request->kafka_meta_request(), &partition_offsets,
1282
0
                                            timeout_ms);
1283
0
                if (st.ok()) {
1284
0
                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
1285
0
                    for (const auto& entry : partition_offsets) {
1286
0
                        PIntegerPair* res = part_offsets->add_offset_times();
1287
0
                        res->set_key(entry.key());
1288
0
                        res->set_val(entry.val());
1289
0
                    }
1290
0
                }
1291
0
                st.to_protobuf(response->mutable_status());
1292
0
                return;
1293
0
            } else if (!kafka_request.partition_id_for_latest_offsets().empty()) {
1294
                // get latest offsets for specified partition ids
1295
0
                std::vector<PIntegerPair> partition_offsets;
1296
0
                Status st = _exec_env->routine_load_task_executor()
1297
0
                                    ->get_kafka_latest_offsets_for_partitions(
1298
0
                                            request->kafka_meta_request(), &partition_offsets,
1299
0
                                            timeout_ms);
1300
0
                if (st.ok()) {
1301
0
                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
1302
0
                    for (const auto& entry : partition_offsets) {
1303
0
                        PIntegerPair* res = part_offsets->add_offset_times();
1304
0
                        res->set_key(entry.key());
1305
0
                        res->set_val(entry.val());
1306
0
                    }
1307
0
                }
1308
0
                st.to_protobuf(response->mutable_status());
1309
0
                return;
1310
0
            } else if (!kafka_request.offset_times().empty()) {
1311
                // if offset_times() has elements, which means this request is to get offset by timestamp.
1312
0
                std::vector<PIntegerPair> partition_offsets;
1313
0
                Status st = _exec_env->routine_load_task_executor()
1314
0
                                    ->get_kafka_partition_offsets_for_times(
1315
0
                                            request->kafka_meta_request(), &partition_offsets,
1316
0
                                            timeout_ms);
1317
0
                if (st.ok()) {
1318
0
                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
1319
0
                    for (const auto& entry : partition_offsets) {
1320
0
                        PIntegerPair* res = part_offsets->add_offset_times();
1321
0
                        res->set_key(entry.key());
1322
0
                        res->set_val(entry.val());
1323
0
                    }
1324
0
                }
1325
0
                st.to_protobuf(response->mutable_status());
1326
0
                return;
1327
0
            } else {
1328
                // get partition ids of topic
1329
0
                std::vector<int32_t> partition_ids;
1330
0
                Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
1331
0
                        request->kafka_meta_request(), &partition_ids);
1332
0
                if (st.ok()) {
1333
0
                    PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
1334
0
                    for (int32_t id : partition_ids) {
1335
0
                        kafka_result->add_partition_ids(id);
1336
0
                    }
1337
0
                }
1338
0
                st.to_protobuf(response->mutable_status());
1339
0
                return;
1340
0
            }
1341
0
        }
1342
0
        Status::OK().to_protobuf(response->mutable_status());
1343
0
    });
1344
0
    if (!ret) {
1345
0
        offer_failed(response, done, _heavy_work_pool);
1346
0
        return;
1347
0
    }
1348
0
}
1349
1350
void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller,
1351
                                        const PUpdateCacheRequest* request,
1352
0
                                        PCacheResponse* response, google::protobuf::Closure* done) {
1353
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1354
0
        brpc::ClosureGuard closure_guard(done);
1355
0
        _exec_env->result_cache()->update(request, response);
1356
0
    });
1357
0
    if (!ret) {
1358
0
        offer_failed(response, done, _light_work_pool);
1359
0
        return;
1360
0
    }
1361
0
}
1362
1363
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
1364
                                       const PFetchCacheRequest* request, PFetchCacheResult* result,
1365
0
                                       google::protobuf::Closure* done) {
1366
0
    bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
1367
0
        brpc::ClosureGuard closure_guard(done);
1368
0
        _exec_env->result_cache()->fetch(request, result);
1369
0
    });
1370
0
    if (!ret) {
1371
0
        offer_failed(result, done, _heavy_work_pool);
1372
0
        return;
1373
0
    }
1374
0
}
1375
1376
void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller,
1377
                                       const PClearCacheRequest* request, PCacheResponse* response,
1378
0
                                       google::protobuf::Closure* done) {
1379
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1380
0
        brpc::ClosureGuard closure_guard(done);
1381
0
        _exec_env->result_cache()->clear(request, response);
1382
0
    });
1383
0
    if (!ret) {
1384
0
        offer_failed(response, done, _light_work_pool);
1385
0
        return;
1386
0
    }
1387
0
}
1388
1389
void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller,
1390
                                        const ::doris::PMergeFilterRequest* request,
1391
                                        ::doris::PMergeFilterResponse* response,
1392
0
                                        ::google::protobuf::Closure* done) {
1393
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1394
0
        brpc::ClosureGuard closure_guard(done);
1395
0
        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
1396
0
        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
1397
0
        Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
1398
0
        st.to_protobuf(response->mutable_status());
1399
0
    });
1400
0
    if (!ret) {
1401
0
        offer_failed(response, done, _light_work_pool);
1402
0
        return;
1403
0
    }
1404
0
}
1405
1406
void PInternalServiceImpl::send_filter_size(::google::protobuf::RpcController* controller,
1407
                                            const ::doris::PSendFilterSizeRequest* request,
1408
                                            ::doris::PSendFilterSizeResponse* response,
1409
0
                                            ::google::protobuf::Closure* done) {
1410
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1411
0
        brpc::ClosureGuard closure_guard(done);
1412
0
        Status st = _exec_env->fragment_mgr()->send_filter_size(request);
1413
0
        st.to_protobuf(response->mutable_status());
1414
0
    });
1415
0
    if (!ret) {
1416
0
        offer_failed(response, done, _light_work_pool);
1417
0
        return;
1418
0
    }
1419
0
}
1420
1421
void PInternalServiceImpl::sync_filter_size(::google::protobuf::RpcController* controller,
1422
                                            const ::doris::PSyncFilterSizeRequest* request,
1423
                                            ::doris::PSyncFilterSizeResponse* response,
1424
0
                                            ::google::protobuf::Closure* done) {
1425
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1426
0
        brpc::ClosureGuard closure_guard(done);
1427
0
        Status st = _exec_env->fragment_mgr()->sync_filter_size(request);
1428
0
        st.to_protobuf(response->mutable_status());
1429
0
    });
1430
0
    if (!ret) {
1431
0
        offer_failed(response, done, _light_work_pool);
1432
0
        return;
1433
0
    }
1434
0
}
1435
1436
void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller,
1437
                                        const ::doris::PPublishFilterRequest* request,
1438
                                        ::doris::PPublishFilterResponse* response,
1439
0
                                        ::google::protobuf::Closure* done) {
1440
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1441
0
        brpc::ClosureGuard closure_guard(done);
1442
0
        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
1443
0
        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
1444
0
        UniqueId unique_id(request->query_id());
1445
0
        VLOG_NOTICE << "rpc apply_filter recv";
1446
0
        Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
1447
0
        if (!st.ok()) {
1448
0
            LOG(WARNING) << "apply filter meet error: " << st.to_string();
1449
0
        }
1450
0
        st.to_protobuf(response->mutable_status());
1451
0
    });
1452
0
    if (!ret) {
1453
0
        offer_failed(response, done, _light_work_pool);
1454
0
        return;
1455
0
    }
1456
0
}
1457
1458
void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* controller,
1459
                                          const ::doris::PPublishFilterRequestV2* request,
1460
                                          ::doris::PPublishFilterResponse* response,
1461
0
                                          ::google::protobuf::Closure* done) {
1462
0
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
1463
0
        brpc::ClosureGuard closure_guard(done);
1464
0
        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
1465
0
        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
1466
0
        UniqueId unique_id(request->query_id());
1467
0
        VLOG_NOTICE << "rpc apply_filterv2 recv";
1468
0
        Status st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream);
1469
0
        if (!st.ok()) {
1470
0
            LOG(WARNING) << "apply filter meet error: " << st.to_string();
1471
0
        }
1472
0
        st.to_protobuf(response->mutable_status());
1473
0
    });
1474
0
    if (!ret) {
1475
0
        offer_failed(response, done, _light_work_pool);
1476
0
        return;
1477
0
    }
1478
0
}
1479
1480
void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
1481
                                     const PSendDataRequest* request, PSendDataResult* response,
1482
0
                                     google::protobuf::Closure* done) {
1483
0
    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
1484
0
        brpc::ClosureGuard closure_guard(done);
1485
0
        TUniqueId load_id;
1486
0
        load_id.hi = request->load_id().hi();
1487
0
        load_id.lo = request->load_id().lo();
1488
        // On 1.2.3 we add load id to send data request and using load id to get pipe
1489
0
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1490
0
        if (stream_load_ctx == nullptr) {
1491
0
            response->mutable_status()->set_status_code(1);
1492
0
            response->mutable_status()->add_error_msgs("could not find stream load context");
1493
0
        } else {
1494
0
            auto pipe = stream_load_ctx->pipe;
1495
0
            for (int i = 0; i < request->data_size(); ++i) {
1496
0
                std::unique_ptr<PDataRow> row(new PDataRow());
1497
0
                row->CopyFrom(request->data(i));
1498
0
                Status s = pipe->append(std::move(row));
1499
0
                if (!s.ok()) {
1500
0
                    response->mutable_status()->set_status_code(1);
1501
0
                    response->mutable_status()->add_error_msgs(s.to_string());
1502
0
                    return;
1503
0
                }
1504
0
            }
1505
0
            response->mutable_status()->set_status_code(0);
1506
0
        }
1507
0
    });
1508
0
    if (!ret) {
1509
0
        offer_failed(response, done, _heavy_work_pool);
1510
0
        return;
1511
0
    }
1512
0
}
1513
1514
void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
1515
                                  const PCommitRequest* request, PCommitResult* response,
1516
0
                                  google::protobuf::Closure* done) {
1517
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1518
0
        brpc::ClosureGuard closure_guard(done);
1519
0
        TUniqueId load_id;
1520
0
        load_id.hi = request->load_id().hi();
1521
0
        load_id.lo = request->load_id().lo();
1522
1523
0
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1524
0
        if (stream_load_ctx == nullptr) {
1525
0
            response->mutable_status()->set_status_code(1);
1526
0
            response->mutable_status()->add_error_msgs("could not find stream load context");
1527
0
        } else {
1528
0
            static_cast<void>(stream_load_ctx->pipe->finish());
1529
0
            response->mutable_status()->set_status_code(0);
1530
0
        }
1531
0
    });
1532
0
    if (!ret) {
1533
0
        offer_failed(response, done, _light_work_pool);
1534
0
        return;
1535
0
    }
1536
0
}
1537
1538
void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
1539
                                    const PRollbackRequest* request, PRollbackResult* response,
1540
0
                                    google::protobuf::Closure* done) {
1541
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1542
0
        brpc::ClosureGuard closure_guard(done);
1543
0
        TUniqueId load_id;
1544
0
        load_id.hi = request->load_id().hi();
1545
0
        load_id.lo = request->load_id().lo();
1546
0
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
1547
0
        if (stream_load_ctx == nullptr) {
1548
0
            response->mutable_status()->set_status_code(1);
1549
0
            response->mutable_status()->add_error_msgs("could not find stream load context");
1550
0
        } else {
1551
0
            stream_load_ctx->pipe->cancel("rollback");
1552
0
            response->mutable_status()->set_status_code(0);
1553
0
        }
1554
0
    });
1555
0
    if (!ret) {
1556
0
        offer_failed(response, done, _light_work_pool);
1557
0
        return;
1558
0
    }
1559
0
}
1560
1561
void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller,
1562
                                              const PConstantExprRequest* request,
1563
                                              PConstantExprResult* response,
1564
0
                                              google::protobuf::Closure* done) {
1565
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
1566
0
        brpc::ClosureGuard closure_guard(done);
1567
0
        Status st = _fold_constant_expr(request->request(), response);
1568
0
        st.to_protobuf(response->mutable_status());
1569
0
    });
1570
0
    if (!ret) {
1571
0
        offer_failed(response, done, _light_work_pool);
1572
0
        return;
1573
0
    }
1574
0
}
1575
1576
// Deserializes `ser_request` as a thrift TFoldConstantParams and evaluates it
// with a FoldConstantExecutor, which fills `response`.
//
// Returns the deserialization error if decoding fails; otherwise returns the
// executor's status, logging a warning when it is not OK.
Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
                                                 PConstantExprResult* response) {
    TFoldConstantParams params;
    {
        const auto* bytes = reinterpret_cast<const uint8_t*>(ser_request.data());
        uint32_t byte_count = static_cast<uint32_t>(ser_request.size());
        RETURN_IF_ERROR(deserialize_thrift_msg(bytes, &byte_count, false, &params));
    }
    auto executor = std::make_unique<FoldConstantExecutor>();
    Status status = executor->fold_constant_vexpr(params, response);
    if (status.ok()) {
        return status;
    }
    LOG(WARNING) << "exec fold constant expr failed, errmsg=" << status
                 << " .and query_id_is: " << executor->query_id_string();
    return status;
}
1592
1593
void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller,
1594
                                          const PTransmitDataParams* request,
1595
                                          PTransmitDataResult* response,
1596
0
                                          google::protobuf::Closure* done) {
1597
0
    int64_t receive_time = GetCurrentTimeNanos();
1598
0
    response->set_receive_time(receive_time);
1599
1600
    // under high concurrency, thread pool will have a lot of lock contention.
1601
    // May offer failed to the thread pool, so that we should avoid using thread
1602
    // pool here.
1603
0
    _transmit_block(controller, request, response, done, Status::OK());
1604
0
}
1605
1606
void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller,
1607
                                                  const PEmptyRequest* request,
1608
                                                  PTransmitDataResult* response,
1609
0
                                                  google::protobuf::Closure* done) {
1610
0
    bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() {
1611
0
        PTransmitDataParams* new_request = new PTransmitDataParams();
1612
0
        google::protobuf::Closure* new_done =
1613
0
                new NewHttpClosure<PTransmitDataParams>(new_request, done);
1614
0
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
1615
0
        Status st =
1616
0
                attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
1617
0
        _transmit_block(controller, new_request, response, new_done, st);
1618
0
    });
1619
0
    if (!ret) {
1620
0
        offer_failed(response, done, _heavy_work_pool);
1621
0
        return;
1622
0
    }
1623
0
}
1624
1625
void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
1626
                                           const PTransmitDataParams* request,
1627
                                           PTransmitDataResult* response,
1628
                                           google::protobuf::Closure* done,
1629
0
                                           const Status& extract_st) {
1630
0
    if (request->has_query_id()) {
1631
0
        VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
1632
0
                 << " query_id=" << print_id(request->query_id()) << " node=" << request->node_id();
1633
0
    }
1634
1635
    // The response is accessed when done->Run is called in transmit_block(),
1636
    // give response a default value to avoid null pointers in high concurrency.
1637
0
    Status st;
1638
0
    if (extract_st.ok()) {
1639
0
        st = _exec_env->vstream_mgr()->transmit_block(request, &done);
1640
0
        if (!st.ok() && !st.is<END_OF_FILE>()) {
1641
0
            LOG(WARNING) << "transmit_block failed, message=" << st
1642
0
                         << ", fragment_instance_id=" << print_id(request->finst_id())
1643
0
                         << ", node=" << request->node_id()
1644
0
                         << ", from sender_id: " << request->sender_id()
1645
0
                         << ", be_number: " << request->be_number()
1646
0
                         << ", packet_seq: " << request->packet_seq();
1647
0
        }
1648
0
    } else {
1649
0
        st = extract_st;
1650
0
    }
1651
0
    if (done != nullptr) {
1652
0
        st.to_protobuf(response->mutable_status());
1653
0
        done->Run();
1654
0
    }
1655
0
}
1656
1657
void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* controller,
1658
                                             const PCheckRPCChannelRequest* request,
1659
                                             PCheckRPCChannelResponse* response,
1660
0
                                             google::protobuf::Closure* done) {
1661
0
    bool ret = _light_work_pool.try_offer([request, response, done]() {
1662
0
        brpc::ClosureGuard closure_guard(done);
1663
0
        response->mutable_status()->set_status_code(0);
1664
0
        if (request->data().size() != request->size()) {
1665
0
            std::stringstream ss;
1666
0
            ss << "data size not same, expected: " << request->size()
1667
0
               << ", actual: " << request->data().size();
1668
0
            response->mutable_status()->add_error_msgs(ss.str());
1669
0
            response->mutable_status()->set_status_code(1);
1670
1671
0
        } else {
1672
0
            Md5Digest digest;
1673
0
            digest.update(static_cast<const void*>(request->data().c_str()),
1674
0
                          request->data().size());
1675
0
            digest.digest();
1676
0
            if (!iequal(digest.hex(), request->md5())) {
1677
0
                std::stringstream ss;
1678
0
                ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
1679
0
                response->mutable_status()->add_error_msgs(ss.str());
1680
0
                response->mutable_status()->set_status_code(1);
1681
0
            }
1682
0
        }
1683
0
    });
1684
0
    if (!ret) {
1685
0
        offer_failed(response, done, _light_work_pool);
1686
0
        return;
1687
0
    }
1688
0
}
1689
1690
void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* controller,
1691
                                             const PResetRPCChannelRequest* request,
1692
                                             PResetRPCChannelResponse* response,
1693
0
                                             google::protobuf::Closure* done) {
1694
0
    bool ret = _light_work_pool.try_offer([request, response, done]() {
1695
0
        brpc::ClosureGuard closure_guard(done);
1696
0
        response->mutable_status()->set_status_code(0);
1697
0
        if (request->all()) {
1698
0
            int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
1699
0
            if (size > 0) {
1700
0
                std::vector<std::string> endpoints;
1701
0
                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
1702
0
                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
1703
0
                *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
1704
0
            }
1705
0
        } else {
1706
0
            for (const std::string& endpoint : request->endpoints()) {
1707
0
                if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
1708
0
                    response->mutable_status()->add_error_msgs(endpoint + ": not found.");
1709
0
                    continue;
1710
0
                }
1711
1712
0
                if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
1713
0
                    response->add_channels(endpoint);
1714
0
                } else {
1715
0
                    response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
1716
0
                }
1717
0
            }
1718
0
            if (request->endpoints_size() != response->channels_size()) {
1719
0
                response->mutable_status()->set_status_code(1);
1720
0
            }
1721
0
        }
1722
0
    });
1723
0
    if (!ret) {
1724
0
        offer_failed(response, done, _light_work_pool);
1725
0
        return;
1726
0
    }
1727
0
}
1728
1729
void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller,
1730
                                      const PHandShakeRequest* request,
1731
                                      PHandShakeResponse* response,
1732
0
                                      google::protobuf::Closure* done) {
1733
0
    brpc::ClosureGuard closure_guard(done);
1734
0
    if (request->has_hello()) {
1735
0
        response->set_hello(request->hello());
1736
0
    }
1737
0
    response->mutable_status()->set_status_code(0);
1738
0
}
1739
1740
// Pieces of the tablet download URL:
//   http://<host_port>/api/_tablet/_download?token=<token>&file=<path>
constexpr char HttpProtocol[] = "http://";
constexpr char DownloadApiPath[] = "/api/_tablet/_download?token=";
constexpr char FileParam[] = "&file=";

// Builds the HTTP download URL for `path` on the backend at `host_port`, authenticated with
// `token`. The arguments are inserted verbatim; no escaping is applied.
std::string construct_url(const std::string& host_port, const std::string& token,
                          const std::string& path) {
    std::string url(HttpProtocol);
    url.append(host_port);
    url.append(DownloadApiPath);
    url.append(token);
    url.append(FileParam);
    url.append(path);
    return url;
}
1749
1750
// Returns the path of a segment data file: "<tablet_path>/<rowset_id>_<segment>.dat".
std::string construct_file_path(const std::string& tablet_path, const std::string& rowset_id,
                                int64_t segment) {
    std::string file_path = tablet_path;
    file_path += "/";
    file_path += rowset_id;
    file_path += "_";
    file_path += std::to_string(segment);
    file_path += ".dat";
    return file_path;
}
1754
1755
// Downloads `remote_file_url` to `local_file_path`, retrying up to DOWNLOAD_FILE_MAX_RETRY
// times through HttpClient::execute_with_retry.
//
// estimate_timeout: per-attempt timeout in seconds (converted to milliseconds for the client).
// file_size:        expected size in bytes; when > 0 the downloaded file must match it exactly.
//
// On success the local file's permissions are set to owner read/write. Returns a non-OK
// Status if any step fails.
static Status download_file_action(std::string& remote_file_url, std::string& local_file_path,
                                   uint64_t estimate_timeout, uint64_t file_size) {
    auto download_cb = [remote_file_url, estimate_timeout, local_file_path,
                        file_size](HttpClient* client) {
        RETURN_IF_ERROR(client->init(remote_file_url));
        client->set_timeout_ms(estimate_timeout * 1000);
        RETURN_IF_ERROR(client->download(local_file_path));

        if (file_size > 0) {
            // Check file length. Use the non-throwing overload: every other failure in this
            // callback is reported through Status, and an exception here would escape it.
            std::error_code ec;
            uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
            if (ec) {
                LOG(WARNING) << "failed to pull rowset for slave replica. failed to get size of "
                                "downloaded file"
                             << ", remote_path=" << remote_file_url
                             << ", local_path=" << local_file_path
                             << ", error=" << ec.message();
                return Status::InternalError("failed to get size of downloaded file");
            }
            if (local_file_size != file_size) {
                LOG(WARNING) << "failed to pull rowset for slave replica. download file "
                                "length error"
                             << ", remote_path=" << remote_file_url << ", file_size=" << file_size
                             << ", local_file_size=" << local_file_size;
                return Status::InternalError("downloaded file size is not equal");
            }
        }

        return io::global_local_filesystem()->permission(local_file_path,
                                                         io::LocalFileSystem::PERMS_OWNER_RW);
    };
    return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, download_cb);
}
1780
1781
void PInternalServiceImpl::request_slave_tablet_pull_rowset(
1782
        google::protobuf::RpcController* controller, const PTabletWriteSlaveRequest* request,
1783
0
        PTabletWriteSlaveResult* response, google::protobuf::Closure* done) {
1784
0
    brpc::ClosureGuard closure_guard(done);
1785
0
    RowsetMetaPB rowset_meta_pb = request->rowset_meta();
1786
0
    std::string rowset_path = request->rowset_path();
1787
0
    google::protobuf::Map<int64, int64> segments_size = request->segments_size();
1788
0
    google::protobuf::Map<int64, PTabletWriteSlaveRequest_IndexSizeMap> indices_size =
1789
0
            request->inverted_indices_size();
1790
0
    std::string host = request->host();
1791
0
    int64_t http_port = request->http_port();
1792
0
    int64_t brpc_port = request->brpc_port();
1793
0
    std::string token = request->token();
1794
0
    int64_t node_id = request->node_id();
1795
0
    bool ret = _heavy_work_pool.try_offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
1796
0
                                           indices_size, http_port, token, rowset_path, this]() {
1797
0
        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
1798
0
                rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
1799
0
        if (tablet == nullptr) {
1800
0
            LOG(WARNING) << "failed to pull rowset for slave replica. tablet ["
1801
0
                         << rowset_meta_pb.tablet_id()
1802
0
                         << "] is not exist. txn_id=" << rowset_meta_pb.txn_id();
1803
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta_pb.txn_id(),
1804
0
                                        rowset_meta_pb.tablet_id(), node_id, false);
1805
0
            return;
1806
0
        }
1807
1808
0
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
1809
0
        std::string rowset_meta_str;
1810
0
        bool ret = rowset_meta_pb.SerializeToString(&rowset_meta_str);
1811
0
        if (!ret) {
1812
0
            LOG(WARNING) << "failed to pull rowset for slave replica. serialize rowset meta "
1813
0
                            "failed. rowset_id="
1814
0
                         << rowset_meta_pb.rowset_id()
1815
0
                         << ", tablet_id=" << rowset_meta_pb.tablet_id()
1816
0
                         << ", txn_id=" << rowset_meta_pb.txn_id();
1817
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta_pb.txn_id(),
1818
0
                                        rowset_meta_pb.tablet_id(), node_id, false);
1819
0
            return;
1820
0
        }
1821
0
        bool parsed = rowset_meta->init(rowset_meta_str);
1822
0
        if (!parsed) {
1823
0
            LOG(WARNING) << "failed to pull rowset for slave replica. parse rowset meta string "
1824
0
                            "failed. rowset_id="
1825
0
                         << rowset_meta_pb.rowset_id()
1826
0
                         << ", tablet_id=" << rowset_meta_pb.tablet_id()
1827
0
                         << ", txn_id=" << rowset_meta_pb.txn_id();
1828
            // return false will break meta iterator, return true to skip this error
1829
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1830
0
                                        rowset_meta->tablet_id(), node_id, false);
1831
0
            return;
1832
0
        }
1833
0
        RowsetId remote_rowset_id = rowset_meta->rowset_id();
1834
        // change rowset id because it maybe same as other local rowset
1835
0
        RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
1836
0
        auto pending_rs_guard =
1837
0
                StorageEngine::instance()->pending_local_rowsets().add(new_rowset_id);
1838
0
        rowset_meta->set_rowset_id(new_rowset_id);
1839
0
        rowset_meta->set_tablet_uid(tablet->tablet_uid());
1840
0
        VLOG_CRITICAL << "succeed to init rowset meta for slave replica. rowset_id="
1841
0
                      << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id()
1842
0
                      << ", txn_id=" << rowset_meta->txn_id();
1843
1844
0
        auto tablet_scheme = rowset_meta->tablet_schema();
1845
0
        for (auto& segment : segments_size) {
1846
0
            uint64_t file_size = segment.second;
1847
0
            uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
1848
0
            if (estimate_timeout < config::download_low_speed_time) {
1849
0
                estimate_timeout = config::download_low_speed_time;
1850
0
            }
1851
1852
0
            std::string remote_file_path =
1853
0
                    construct_file_path(rowset_path, remote_rowset_id.to_string(), segment.first);
1854
0
            std::string remote_file_url =
1855
0
                    construct_url(get_host_port(host, http_port), token, remote_file_path);
1856
1857
0
            std::string local_file_path = construct_file_path(
1858
0
                    tablet->tablet_path(), rowset_meta->rowset_id().to_string(), segment.first);
1859
1860
0
            auto st = download_file_action(remote_file_url, local_file_path, estimate_timeout,
1861
0
                                           file_size);
1862
0
            if (!st.ok()) {
1863
0
                LOG(WARNING) << "failed to pull rowset for slave replica. failed to download "
1864
0
                                "file. url="
1865
0
                             << remote_file_url << ", local_path=" << local_file_path
1866
0
                             << ", txn_id=" << rowset_meta->txn_id();
1867
0
                _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1868
0
                                            rowset_meta->tablet_id(), node_id, false);
1869
0
                return;
1870
0
            }
1871
0
            VLOG_CRITICAL << "succeed to download file for slave replica. url=" << remote_file_url
1872
0
                          << ", local_path=" << local_file_path
1873
0
                          << ", txn_id=" << rowset_meta->txn_id();
1874
0
            if (indices_size.find(segment.first) != indices_size.end()) {
1875
0
                PTabletWriteSlaveRequest_IndexSizeMap segment_indices_size =
1876
0
                        indices_size.at(segment.first);
1877
1878
0
                for (auto index_size : segment_indices_size.index_sizes()) {
1879
0
                    auto index_id = index_size.indexid();
1880
0
                    auto size = index_size.size();
1881
0
                    auto suffix_path = index_size.suffix_path();
1882
0
                    std::string remote_inverted_index_file;
1883
0
                    std::string local_inverted_index_file;
1884
0
                    std::string remote_inverted_index_file_url;
1885
0
                    if (tablet_scheme->get_inverted_index_storage_format() !=
1886
0
                        InvertedIndexStorageFormatPB::V1) {
1887
0
                        remote_inverted_index_file =
1888
0
                                InvertedIndexDescriptor::get_index_file_name(remote_file_path);
1889
0
                        remote_inverted_index_file_url = construct_url(
1890
0
                                get_host_port(host, http_port), token, remote_inverted_index_file);
1891
1892
0
                        local_inverted_index_file =
1893
0
                                InvertedIndexDescriptor::get_index_file_name(local_file_path);
1894
0
                    } else {
1895
0
                        remote_inverted_index_file = InvertedIndexDescriptor::get_index_file_name(
1896
0
                                remote_file_path, index_id, suffix_path);
1897
0
                        remote_inverted_index_file_url = construct_url(
1898
0
                                get_host_port(host, http_port), token, remote_inverted_index_file);
1899
1900
0
                        local_inverted_index_file = InvertedIndexDescriptor::get_index_file_name(
1901
0
                                local_file_path, index_id, suffix_path);
1902
0
                    }
1903
0
                    st = download_file_action(remote_inverted_index_file_url,
1904
0
                                              local_inverted_index_file, estimate_timeout, size);
1905
0
                    if (!st.ok()) {
1906
0
                        LOG(WARNING) << "failed to pull rowset for slave replica. failed to "
1907
0
                                        "download "
1908
0
                                        "file. url="
1909
0
                                     << remote_inverted_index_file_url
1910
0
                                     << ", local_path=" << local_inverted_index_file
1911
0
                                     << ", txn_id=" << rowset_meta->txn_id();
1912
0
                        _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1913
0
                                                    rowset_meta->tablet_id(), node_id, false);
1914
0
                        return;
1915
0
                    }
1916
1917
0
                    VLOG_CRITICAL
1918
0
                            << "succeed to download inverted index file for slave replica. url="
1919
0
                            << remote_inverted_index_file_url
1920
0
                            << ", local_path=" << local_inverted_index_file
1921
0
                            << ", txn_id=" << rowset_meta->txn_id();
1922
0
                }
1923
0
            }
1924
0
        }
1925
1926
0
        RowsetSharedPtr rowset;
1927
0
        Status create_status = RowsetFactory::create_rowset(
1928
0
                tablet->tablet_schema(), tablet->tablet_path(), rowset_meta, &rowset);
1929
0
        if (!create_status) {
1930
0
            LOG(WARNING) << "failed to create rowset from rowset meta for slave replica"
1931
0
                         << ". rowset_id: " << rowset_meta->rowset_id()
1932
0
                         << ", rowset_type: " << rowset_meta->rowset_type()
1933
0
                         << ", rowset_state: " << rowset_meta->rowset_state()
1934
0
                         << ", tablet_id=" << rowset_meta->tablet_id()
1935
0
                         << ", txn_id=" << rowset_meta->txn_id();
1936
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1937
0
                                        rowset_meta->tablet_id(), node_id, false);
1938
0
            return;
1939
0
        }
1940
0
        if (rowset_meta->rowset_state() != RowsetStatePB::COMMITTED) {
1941
0
            LOG(WARNING) << "could not commit txn for slave replica because master rowset state is "
1942
0
                            "not committed, rowset_state="
1943
0
                         << rowset_meta->rowset_state()
1944
0
                         << ", tablet_id=" << rowset_meta->tablet_id()
1945
0
                         << ", txn_id=" << rowset_meta->txn_id();
1946
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1947
0
                                        rowset_meta->tablet_id(), node_id, false);
1948
0
            return;
1949
0
        }
1950
0
        Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
1951
0
                tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(),
1952
0
                rowset_meta->tablet_id(), tablet->tablet_uid(), rowset_meta->load_id(), rowset,
1953
0
                std::move(pending_rs_guard), false);
1954
0
        if (!commit_txn_status && !commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
1955
0
            LOG(WARNING) << "failed to add committed rowset for slave replica. rowset_id="
1956
0
                         << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id()
1957
0
                         << ", txn_id=" << rowset_meta->txn_id();
1958
0
            _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1959
0
                                        rowset_meta->tablet_id(), node_id, false);
1960
0
            return;
1961
0
        }
1962
0
        VLOG_CRITICAL << "succeed to pull rowset for slave replica. successfully to add committed "
1963
0
                         "rowset: "
1964
0
                      << rowset_meta->rowset_id()
1965
0
                      << " to tablet, tablet_id=" << rowset_meta->tablet_id()
1966
0
                      << ", schema_hash=" << rowset_meta->tablet_schema_hash()
1967
0
                      << ", txn_id=" << rowset_meta->txn_id();
1968
0
        _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
1969
0
                                    rowset_meta->tablet_id(), node_id, true);
1970
0
    });
1971
0
    if (!ret) {
1972
0
        offer_failed(response, closure_guard.release(), _heavy_work_pool);
1973
0
        return;
1974
0
    }
1975
0
    Status::OK().to_protobuf(response->mutable_status());
1976
0
}
1977
1978
void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote_host,
1979
                                                       int64_t brpc_port, int64_t txn_id,
1980
                                                       int64_t tablet_id, int64_t node_id,
1981
0
                                                       bool is_succeed) {
1982
0
    std::shared_ptr<PBackendService_Stub> stub =
1983
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(remote_host,
1984
0
                                                                             brpc_port);
1985
0
    if (stub == nullptr) {
1986
0
        LOG(WARNING) << "failed to response result of slave replica to master replica. get rpc "
1987
0
                        "stub failed, master host="
1988
0
                     << remote_host << ", port=" << brpc_port << ", tablet_id=" << tablet_id
1989
0
                     << ", txn_id=" << txn_id;
1990
0
        return;
1991
0
    }
1992
1993
0
    auto request = std::make_shared<PTabletWriteSlaveDoneRequest>();
1994
0
    request->set_txn_id(txn_id);
1995
0
    request->set_tablet_id(tablet_id);
1996
0
    request->set_node_id(node_id);
1997
0
    request->set_is_succeed(is_succeed);
1998
0
    auto pull_rowset_callback = DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared();
1999
0
    auto closure = AutoReleaseClosure<
2000
0
            PTabletWriteSlaveDoneRequest,
2001
0
            DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request,
2002
0
                                                                           pull_rowset_callback);
2003
0
    closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
2004
0
    closure->cntl_->ignore_eovercrowded();
2005
0
    stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(),
2006
0
                                            closure->response_.get(), closure.get());
2007
0
    closure.release();
2008
2009
0
    pull_rowset_callback->join();
2010
0
    if (pull_rowset_callback->cntl_->Failed()) {
2011
0
        LOG(WARNING) << "failed to response result of slave replica to master replica, error="
2012
0
                     << berror(pull_rowset_callback->cntl_->ErrorCode())
2013
0
                     << ", error_text=" << pull_rowset_callback->cntl_->ErrorText()
2014
0
                     << ", master host: " << remote_host << ", tablet_id=" << tablet_id
2015
0
                     << ", txn_id=" << txn_id;
2016
0
    }
2017
0
    VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master "
2018
0
                     "replica. master host: "
2019
0
                  << remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id
2020
0
                  << ", slave server=" << node_id << ", txn_id=" << txn_id;
2021
0
}
2022
2023
void PInternalServiceImpl::response_slave_tablet_pull_rowset(
2024
        google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
2025
0
        PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
2026
0
    bool ret = _light_work_pool.try_offer([request, response, done]() {
2027
0
        brpc::ClosureGuard closure_guard(done);
2028
0
        VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. "
2029
0
                         "slave server="
2030
0
                      << request->node_id() << ", is_succeed=" << request->is_succeed()
2031
0
                      << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
2032
0
        StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
2033
0
                request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
2034
0
        Status::OK().to_protobuf(response->mutable_status());
2035
0
    });
2036
0
    if (!ret) {
2037
0
        offer_failed(response, done, _heavy_work_pool);
2038
0
        return;
2039
0
    }
2040
0
}
2041
2042
template <typename Func>
2043
0
auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
2044
0
    MonotonicStopWatch watch;
2045
0
    watch.start();
2046
0
    auto res = fn();
2047
0
    *cost += watch.elapsed_time() / 1000 / 1000;
2048
0
    return res;
2049
0
}
Unexecuted instantiation: internal_service.cpp:_ZN5doris15scope_timer_runIZNS_20PInternalServiceImpl10_multi_getERKNS_16PMultiGetRequestEPNS_17PMultiGetResponseEE3$_1EEDTclfp_EET_Pl
Unexecuted instantiation: internal_service.cpp:_ZN5doris15scope_timer_runIZNS_20PInternalServiceImpl10_multi_getERKNS_16PMultiGetRequestEPNS_17PMultiGetResponseEE3$_2EEDTclfp_EET_Pl
Unexecuted instantiation: internal_service.cpp:_ZN5doris15scope_timer_runIZNS_20PInternalServiceImpl10_multi_getERKNS_16PMultiGetRequestEPNS_17PMultiGetResponseEE3$_4EEDTclfp_EET_Pl
Unexecuted instantiation: internal_service.cpp:_ZN5doris15scope_timer_runIZNS_20PInternalServiceImpl10_multi_getERKNS_16PMultiGetRequestEPNS_17PMultiGetResponseEE3$_5EEDTclfp_EET_Pl
2050
2051
struct IteratorKey {
2052
    int64_t tablet_id;
2053
    RowsetId rowset_id;
2054
    uint64_t segment_id;
2055
    int slot_id;
2056
2057
    // unordered map std::equal_to
2058
0
    bool operator==(const IteratorKey& rhs) const {
2059
0
        return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
2060
0
               segment_id == rhs.segment_id && slot_id == rhs.slot_id;
2061
0
    }
2062
};
2063
2064
struct HashOfIteratorKey {
2065
0
    size_t operator()(const IteratorKey& key) const {
2066
0
        size_t seed = 0;
2067
0
        seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
2068
0
        seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi), seed);
2069
0
        seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi), seed);
2070
0
        seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo), seed);
2071
0
        seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
2072
0
        seed = HashUtil::hash64(&key.slot_id, sizeof(key.slot_id), seed);
2073
0
        return seed;
2074
0
    }
2075
};
2076
2077
struct IteratorItem {
2078
    std::unique_ptr<ColumnIterator> iterator;
2079
    // for holding the reference of segment to avoid use after release
2080
    SegmentSharedPtr segment;
2081
};
2082
2083
// Fetches the rows named by request.row_locs() and writes them into `response`.
//
// Each row location (tablet, rowset, segment, ordinal) is resolved in turn:
//   - with request.fetch_row_store() set, the row is read from the row-store column and
//     appended to response->binary_row_data();
//   - otherwise every requested slot is read from column storage into a block, which is
//     serialized into response->block() at the end.
// Rows whose tablet or rowset can no longer be found are skipped silently. Once a rowset has
// been resolved, the row location is echoed into response->row_locs() when the iteration ends,
// whichever way it ends.
//
// Returns the first error hit while loading segments, reading data, or serializing the block.
Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
                                        PMultiGetResponse* response) {
    OlapReaderStatistics stats;
    vectorized::Block result_block;
    int64_t acquire_tablet_ms = 0;
    int64_t acquire_rowsets_ms = 0;
    int64_t acquire_segments_ms = 0;
    int64_t lookup_row_data_ms = 0;

    // init desc
    TupleDescriptor desc(request.desc());
    std::vector<SlotDescriptor> slots;
    // Reserve up front: add_slot() is handed the address of each element, so the vector
    // must not reallocate while it is being filled.
    slots.reserve(request.slots().size());
    for (const auto& pslot : request.slots()) {
        slots.push_back(SlotDescriptor(pslot));
        desc.add_slot(&slots.back());
    }

    // init read schema
    TabletSchema full_read_schema;
    for (const ColumnPB& column_pb : request.column_desc()) {
        full_read_schema.append_column(TabletColumn(column_pb));
    }

    // Column iterators are reused across rows that hit the same (tablet, rowset, segment, slot).
    std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map;
    // read row by row
    for (int i = 0; i < request.row_locs_size(); ++i) {
        const auto& row_loc = request.row_locs(i);
        MonotonicStopWatch watch;
        watch.start();
        TabletSharedPtr tablet = scope_timer_run(
                [&]() {
                    return StorageEngine::instance()->tablet_manager()->get_tablet(
                            row_loc.tablet_id(), true /*include deleted*/);
                },
                &acquire_tablet_ms);
        RowsetId rowset_id;
        rowset_id.init(row_loc.rowset_id());
        if (!tablet) {
            continue;
        }
        // We ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp();
        BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(scope_timer_run(
                [&]() { return StorageEngine::instance()->get_quering_rowset(rowset_id); },
                &acquire_rowsets_ms));
        if (!rowset) {
            LOG(INFO) << "no such rowset " << rowset_id;
            continue;
        }
        size_t row_size = 0;
        // Runs when this iteration ends (including `continue` and error returns): logs the
        // per-row cost and echoes the row location into the response.
        Defer _defer([&]() {
            LOG_EVERY_N(INFO, 100)
                    << "multiget_data single_row, cost(us):" << watch.elapsed_time() / 1000
                    << ", row_size:" << row_size;
            *response->add_row_locs() = row_loc;
        });
        SegmentCacheHandle segment_cache;
        RETURN_IF_ERROR(scope_timer_run(
                [&]() {
                    return SegmentLoader::instance()->load_segments(rowset, &segment_cache, true);
                },
                &acquire_segments_ms));
        // find segment
        auto it = std::find_if(segment_cache.get_segments().cbegin(),
                               segment_cache.get_segments().cend(),
                               [&row_loc](const segment_v2::SegmentSharedPtr& seg) {
                                   return seg->id() == row_loc.segment_id();
                               });
        if (it == segment_cache.get_segments().end()) {
            continue;
        }
        segment_v2::SegmentSharedPtr segment = *it;
        GlobalRowLoacation row_location(row_loc.tablet_id(), rowset->rowset_id(),
                                        row_loc.segment_id(), row_loc.ordinal_id());
        // fetch by row store, more efficient way
        if (request.fetch_row_store()) {
            CHECK(tablet->tablet_schema()->store_row_column());
            RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
            std::string* value = response->add_binary_row_data();
            RETURN_IF_ERROR(scope_timer_run(
                    [&]() {
                        return tablet->lookup_row_data({}, loc, rowset, &desc, stats, *value);
                    },
                    &lookup_row_data_ms));
            row_size = value->size();
            continue;
        }

        // fetch by column store
        if (result_block.is_empty_column()) {
            result_block = vectorized::Block(desc.slots(), request.row_locs().size());
        }
        VLOG_DEBUG << "Read row location "
                   << fmt::format("{}, {}, {}, {}", row_location.tablet_id,
                                  row_location.row_location.rowset_id.to_string(),
                                  row_location.row_location.segment_id,
                                  row_location.row_location.row_id);
        const auto& slot_descs = desc.slots();
        for (size_t x = 0; x < slot_descs.size(); ++x) {
            auto row_id = static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
            vectorized::MutableColumnPtr column =
                    result_block.get_by_position(x).column->assume_mutable();
            IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
                                      .rowset_id = rowset_id,
                                      .segment_id = row_loc.segment_id(),
                                      .slot_id = slot_descs[x]->id()};
            // operator[] creates an empty item the first time a key is seen.
            IteratorItem& iterator_item = iterator_map[iterator_key];
            if (iterator_item.segment == nullptr) {
                // hold the reference
                iterator_item.segment = segment;
            }
            // Always read through the segment the cached iterator belongs to.
            segment = iterator_item.segment;
            RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema, slot_descs[x],
                                                            row_id, column, stats,
                                                            iterator_item.iterator));
        }
    }
    // serialize block if not empty
    if (!result_block.is_empty_column()) {
        VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
                   << ", be_exec_version:" << request.be_exec_version();
        [[maybe_unused]] size_t compressed_size = 0;
        [[maybe_unused]] size_t uncompressed_size = 0;
        int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
        RETURN_IF_ERROR(result_block.serialize(be_exec_version, response->mutable_block(),
                                               &uncompressed_size, &compressed_size,
                                               segment_v2::CompressionTypePB::LZ4));
    }

    LOG(INFO) << "Query stats: "
              << fmt::format(
                         "hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
                         "io_latency:{}ns, "
                         "uncompressed_bytes_read:{},"
                         "bytes_read:{},"
                         "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
                         "lookup_row_data_ms:{}",
                         stats.cached_pages_num, stats.total_pages_num, stats.compressed_bytes_read,
                         stats.io_ns, stats.uncompressed_bytes_read, stats.bytes_read,
                         acquire_tablet_ms, acquire_rowsets_ms, acquire_segments_ms,
                         lookup_row_data_ms);
    return Status::OK();
}
2225
2226
void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* controller,
2227
                                         const PMultiGetRequest* request,
2228
                                         PMultiGetResponse* response,
2229
0
                                         google::protobuf::Closure* done) {
2230
0
    bool ret = _light_work_pool.try_offer([request, response, done, this]() {
2231
0
        signal::set_signal_task_id(request->query_id());
2232
        // multi get data by rowid
2233
0
        MonotonicStopWatch watch;
2234
0
        watch.start();
2235
0
        brpc::ClosureGuard closure_guard(done);
2236
0
        response->mutable_status()->set_status_code(0);
2237
0
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
2238
0
        Status st = _multi_get(*request, response);
2239
0
        st.to_protobuf(response->mutable_status());
2240
0
        LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
2241
0
    });
2242
0
    if (!ret) {
2243
0
        offer_failed(response, done, _heavy_work_pool);
2244
0
        return;
2245
0
    }
2246
0
}
2247
2248
void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcController* cntl_base,
2249
                                                      const PGetTabletVersionsRequest* request,
2250
                                                      PGetTabletVersionsResponse* response,
2251
0
                                                      google::protobuf::Closure* done) {
2252
0
    brpc::ClosureGuard closure_guard(done);
2253
0
    VLOG_DEBUG << "receive get tablet versions request: " << request->DebugString();
2254
0
    StorageEngine::instance()->get_tablet_rowset_versions(request, response);
2255
0
}
2256
2257
void PInternalServiceImpl::glob(google::protobuf::RpcController* controller,
2258
                                const PGlobRequest* request, PGlobResponse* response,
2259
0
                                google::protobuf::Closure* done) {
2260
0
    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
2261
0
        brpc::ClosureGuard closure_guard(done);
2262
0
        std::vector<io::FileInfo> files;
2263
0
        Status st = io::global_local_filesystem()->safe_glob(request->pattern(), &files);
2264
0
        if (st.ok()) {
2265
0
            for (auto& file : files) {
2266
0
                PGlobResponse_PFileInfo* pfile = response->add_files();
2267
0
                pfile->set_file(file.file_name);
2268
0
                pfile->set_size(file.file_size);
2269
0
            }
2270
0
        }
2271
0
        st.to_protobuf(response->mutable_status());
2272
0
    });
2273
0
    if (!ret) {
2274
0
        offer_failed(response, done, _heavy_work_pool);
2275
0
        return;
2276
0
    }
2277
0
}
2278
2279
void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController* controller,
2280
                                               const PGroupCommitInsertRequest* request,
2281
                                               PGroupCommitInsertResponse* response,
2282
0
                                               google::protobuf::Closure* done) {
2283
0
    TUniqueId load_id;
2284
0
    load_id.__set_hi(request->load_id().hi());
2285
0
    load_id.__set_lo(request->load_id().lo());
2286
0
    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
2287
0
    std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
2288
0
    bool ret = _light_work_pool.try_offer([this, request, response, done, load_id, lock,
2289
0
                                           is_done]() {
2290
0
        brpc::ClosureGuard closure_guard(done);
2291
0
        std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
2292
0
        auto pipe = std::make_shared<io::StreamLoadPipe>(
2293
0
                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
2294
0
                -1 /* total_length */, true /* use_proto */);
2295
0
        ctx->pipe = pipe;
2296
0
        Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx);
2297
0
        if (st.ok()) {
2298
0
            try {
2299
0
                st = _exec_plan_fragment_impl(
2300
0
                        request->exec_plan_fragment_request().request(),
2301
0
                        request->exec_plan_fragment_request().version(),
2302
0
                        request->exec_plan_fragment_request().compact(),
2303
0
                        [&, response, done, load_id, lock, is_done](RuntimeState* state,
2304
0
                                                                    Status* status) {
2305
0
                            std::lock_guard<std::mutex> lock1(*lock);
2306
0
                            if (*is_done) {
2307
0
                                return;
2308
0
                            }
2309
0
                            *is_done = true;
2310
0
                            brpc::ClosureGuard cb_closure_guard(done);
2311
0
                            response->set_label(state->import_label());
2312
0
                            response->set_txn_id(state->wal_id());
2313
0
                            response->set_loaded_rows(state->num_rows_load_success());
2314
0
                            response->set_filtered_rows(state->num_rows_load_filtered());
2315
0
                            status->to_protobuf(response->mutable_status());
2316
0
                            if (!state->get_error_log_file_path().empty()) {
2317
0
                                response->set_error_url(
2318
0
                                        to_load_error_http_path(state->get_error_log_file_path()));
2319
0
                            }
2320
0
                            _exec_env->new_load_stream_mgr()->remove(load_id);
2321
0
                        });
2322
0
            } catch (const Exception& e) {
2323
0
                st = e.to_status();
2324
0
            } catch (...) {
2325
0
                st = Status::Error(ErrorCode::INTERNAL_ERROR,
2326
0
                                   "_exec_plan_fragment_impl meet unknown error");
2327
0
            }
2328
0
            if (!st.ok()) {
2329
0
                LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id)
2330
0
                             << ", errmsg=" << st;
2331
0
                std::lock_guard<std::mutex> lock1(*lock);
2332
0
                if (*is_done) {
2333
0
                    closure_guard.release();
2334
0
                } else {
2335
0
                    *is_done = true;
2336
0
                    st.to_protobuf(response->mutable_status());
2337
0
                    _exec_env->new_load_stream_mgr()->remove(load_id);
2338
0
                }
2339
0
            } else {
2340
0
                closure_guard.release();
2341
0
                for (int i = 0; i < request->data().size(); ++i) {
2342
0
                    std::unique_ptr<PDataRow> row(new PDataRow());
2343
0
                    row->CopyFrom(request->data(i));
2344
0
                    st = pipe->append(std::move(row));
2345
0
                    if (!st.ok()) {
2346
0
                        break;
2347
0
                    }
2348
0
                }
2349
0
                if (st.ok()) {
2350
0
                    static_cast<void>(pipe->finish());
2351
0
                }
2352
0
            }
2353
0
        }
2354
0
    });
2355
0
    if (!ret) {
2356
0
        _exec_env->new_load_stream_mgr()->remove(load_id);
2357
0
        offer_failed(response, done, _light_work_pool);
2358
0
        return;
2359
0
    }
2360
0
};
2361
2362
void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* controller,
2363
                                              const PGetWalQueueSizeRequest* request,
2364
                                              PGetWalQueueSizeResponse* response,
2365
0
                                              google::protobuf::Closure* done) {
2366
0
    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
2367
0
        brpc::ClosureGuard closure_guard(done);
2368
0
        Status st = Status::OK();
2369
0
        auto table_id = request->table_id();
2370
0
        auto count = _exec_env->wal_mgr()->get_wal_queue_size(table_id);
2371
0
        response->set_size(count);
2372
0
        response->mutable_status()->set_status_code(st.code());
2373
0
    });
2374
0
    if (!ret) {
2375
0
        offer_failed(response, done, _light_work_pool);
2376
0
    }
2377
0
}
2378
2379
void PInternalServiceImpl::get_be_resource(google::protobuf::RpcController* controller,
2380
                                           const PGetBeResourceRequest* request,
2381
                                           PGetBeResourceResponse* response,
2382
0
                                           google::protobuf::Closure* done) {
2383
0
    bool ret = _light_work_pool.try_offer([response, done]() {
2384
0
        brpc::ClosureGuard closure_guard(done);
2385
0
        int64_t mem_limit = MemInfo::mem_limit();
2386
0
        int64_t mem_usage = PerfCounters::get_vm_rss();
2387
2388
0
        PGlobalResourceUsage* global_resource_usage = response->mutable_global_be_resource_usage();
2389
0
        global_resource_usage->set_mem_limit(mem_limit);
2390
0
        global_resource_usage->set_mem_usage(mem_usage);
2391
2392
0
        Status st = Status::OK();
2393
0
        response->mutable_status()->set_status_code(st.code());
2394
0
    });
2395
0
    if (!ret) {
2396
0
        offer_failed(response, done, _light_work_pool);
2397
0
    }
2398
0
}
2399
2400
} // namespace doris