/root/doris/be/src/service/internal_service.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "service/internal_service.h" | 
| 19 |  |  | 
| 20 |  | #include <assert.h> | 
| 21 |  | #include <brpc/closure_guard.h> | 
| 22 |  | #include <brpc/controller.h> | 
| 23 |  | #include <bthread/bthread.h> | 
| 24 |  | #include <bthread/types.h> | 
| 25 |  | #include <butil/errno.h> | 
| 26 |  | #include <butil/iobuf.h> | 
| 27 |  | #include <fcntl.h> | 
| 28 |  | #include <fmt/core.h> | 
| 29 |  | #include <gen_cpp/DataSinks_types.h> | 
| 30 |  | #include <gen_cpp/MasterService_types.h> | 
| 31 |  | #include <gen_cpp/PaloInternalService_types.h> | 
| 32 |  | #include <gen_cpp/PlanNodes_types.h> | 
| 33 |  | #include <gen_cpp/Status_types.h> | 
| 34 |  | #include <gen_cpp/Types_types.h> | 
| 35 |  | #include <gen_cpp/internal_service.pb.h> | 
| 36 |  | #include <gen_cpp/olap_file.pb.h> | 
| 37 |  | #include <gen_cpp/segment_v2.pb.h> | 
| 38 |  | #include <gen_cpp/types.pb.h> | 
| 39 |  | #include <google/protobuf/stubs/callback.h> | 
| 40 |  | #include <stddef.h> | 
| 41 |  | #include <stdint.h> | 
| 42 |  | #include <sys/stat.h> | 
| 43 |  | #include <vec/data_types/data_type.h> | 
| 44 |  | #include <vec/exec/vjdbc_connector.h> | 
| 45 |  | #include <vec/sink/varrow_flight_result_writer.h> | 
| 46 |  |  | 
| 47 |  | #include <algorithm> | 
| 48 |  | #include <exception> | 
| 49 |  | #include <filesystem> | 
| 50 |  | #include <memory> | 
| 51 |  | #include <set> | 
| 52 |  | #include <sstream> | 
| 53 |  | #include <string> | 
| 54 |  | #include <utility> | 
| 55 |  | #include <vector> | 
| 56 |  |  | 
| 57 |  | #include "cloud/cloud_storage_engine.h" | 
| 58 |  | #include "cloud/cloud_tablet_mgr.h" | 
| 59 |  | #include "cloud/config.h" | 
| 60 |  | #include "common/config.h" | 
| 61 |  | #include "common/exception.h" | 
| 62 |  | #include "common/logging.h" | 
| 63 |  | #include "common/signal_handler.h" | 
| 64 |  | #include "common/status.h" | 
| 65 |  | #include "exec/rowid_fetcher.h" | 
| 66 |  | #include "http/http_client.h" | 
| 67 |  | #include "io/fs/local_file_system.h" | 
| 68 |  | #include "io/fs/stream_load_pipe.h" | 
| 69 |  | #include "io/io_common.h" | 
| 70 |  | #include "olap/data_dir.h" | 
| 71 |  | #include "olap/olap_common.h" | 
| 72 |  | #include "olap/olap_define.h" | 
| 73 |  | #include "olap/rowset/beta_rowset.h" | 
| 74 |  | #include "olap/rowset/rowset.h" | 
| 75 |  | #include "olap/rowset/rowset_factory.h" | 
| 76 |  | #include "olap/rowset/rowset_meta.h" | 
| 77 |  | #include "olap/rowset/segment_v2/column_reader.h" | 
| 78 |  | #include "olap/rowset/segment_v2/inverted_index_desc.h" | 
| 79 |  | #include "olap/storage_engine.h" | 
| 80 |  | #include "olap/tablet_fwd.h" | 
| 81 |  | #include "olap/tablet_manager.h" | 
| 82 |  | #include "olap/tablet_schema.h" | 
| 83 |  | #include "olap/txn_manager.h" | 
| 84 |  | #include "olap/wal/wal_manager.h" | 
| 85 |  | #include "runtime/cache/result_cache.h" | 
| 86 |  | #include "runtime/descriptors.h" | 
| 87 |  | #include "runtime/exec_env.h" | 
| 88 |  | #include "runtime/fold_constant_executor.h" | 
| 89 |  | #include "runtime/fragment_mgr.h" | 
| 90 |  | #include "runtime/load_channel_mgr.h" | 
| 91 |  | #include "runtime/load_stream_mgr.h" | 
| 92 |  | #include "runtime/result_block_buffer.h" | 
| 93 |  | #include "runtime/result_buffer_mgr.h" | 
| 94 |  | #include "runtime/routine_load/routine_load_task_executor.h" | 
| 95 |  | #include "runtime/stream_load/new_load_stream_mgr.h" | 
| 96 |  | #include "runtime/stream_load/stream_load_context.h" | 
| 97 |  | #include "runtime/thread_context.h" | 
| 98 |  | #include "runtime/types.h" | 
| 99 |  | #include "runtime/workload_group/workload_group.h" | 
| 100 |  | #include "runtime/workload_group/workload_group_manager.h" | 
| 101 |  | #include "service/backend_options.h" | 
| 102 |  | #include "service/point_query_executor.h" | 
| 103 |  | #include "util/arrow/row_batch.h" | 
| 104 |  | #include "util/async_io.h" | 
| 105 |  | #include "util/brpc_client_cache.h" | 
| 106 |  | #include "util/brpc_closure.h" | 
| 107 |  | #include "util/doris_metrics.h" | 
| 108 |  | #include "util/md5.h" | 
| 109 |  | #include "util/metrics.h" | 
| 110 |  | #include "util/network_util.h" | 
| 111 |  | #include "util/proto_util.h" | 
| 112 |  | #include "util/runtime_profile.h" | 
| 113 |  | #include "util/stopwatch.hpp" | 
| 114 |  | #include "util/string_util.h" | 
| 115 |  | #include "util/thrift_util.h" | 
| 116 |  | #include "util/time.h" | 
| 117 |  | #include "util/uid_util.h" | 
| 118 |  | #include "vec/common/schema_util.h" | 
| 119 |  | #include "vec/core/block.h" | 
| 120 |  | #include "vec/exec/format/avro//avro_jni_reader.h" | 
| 121 |  | #include "vec/exec/format/csv/csv_reader.h" | 
| 122 |  | #include "vec/exec/format/generic_reader.h" | 
| 123 |  | #include "vec/exec/format/json/new_json_reader.h" | 
| 124 |  | #include "vec/exec/format/orc/vorc_reader.h" | 
| 125 |  | #include "vec/exec/format/parquet/vparquet_reader.h" | 
| 126 |  | #include "vec/exec/format/text/text_reader.h" | 
| 127 |  | #include "vec/functions/dictionary_factory.h" | 
| 128 |  | #include "vec/jsonb/serialize.h" | 
| 129 |  | #include "vec/runtime/vdata_stream_mgr.h" | 
| 130 |  | #include "vec/sink/vmysql_result_writer.h" | 
| 131 |  |  | 
| 132 |  | namespace google { | 
| 133 |  | namespace protobuf { | 
| 134 |  | class RpcController; | 
| 135 |  | } // namespace protobuf | 
| 136 |  | } // namespace google | 
| 137 |  |  | 
| 138 |  | namespace doris { | 
| 139 |  | #include "common/compile_check_avoid_begin.h" | 
| 140 |  | using namespace ErrorCode; | 
| 141 |  |  | 
| 142 |  | const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; | 
| 143 |  |  | 
| 144 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT); | 
| 145 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT); | 
| 146 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT); | 
| 147 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT); | 
| 148 |  |  | 
| 149 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT); | 
| 150 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT); | 
| 151 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); | 
| 152 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); | 
| 153 |  |  | 
| 154 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT); | 
| 155 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT); | 
| 156 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); | 
| 157 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); | 
| 158 |  |  | 
| 159 |  | static bvar::LatencyRecorder g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets"); | 
| 160 |  |  | 
| 161 |  | bthread_key_t btls_key; | 
| 162 |  |  | 
| 163 | 0 | static void thread_context_deleter(void* d) { | 
| 164 | 0 |     delete static_cast<ThreadContext*>(d); | 
| 165 | 0 | } | 
| 166 |  |  | 
| 167 |  | template <typename T> | 
| 168 |  | concept CanCancel = requires(T* response) { response->mutable_status(); }; | 
| 169 |  |  | 
| 170 |  | template <typename T> | 
| 171 | 0 | void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { | 
| 172 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 173 | 0 |     LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info(); | 
| 174 | 0 | } Unexecuted instantiation: _ZN5doris12offer_failedINS_25PTabletWriterCancelResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedINS_14PCacheResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedINS_17PFetchCacheResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE | 
| 175 |  |  | 
| 176 |  | template <CanCancel T> | 
| 177 | 0 | void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { | 
| 178 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 179 |  |     // Should use status to generate protobuf message, because it will encoding Backend Info | 
| 180 |  |     // into the error message and then we could know which backend's pool is full. | 
| 181 | 0 |     Status st = Status::Error<TStatusCode::CANCELLED>( | 
| 182 | 0 |             "fail to offer request to the work pool, pool={}", pool.get_info()); | 
| 183 | 0 |     st.to_protobuf(response->mutable_status()); | 
| 184 | 0 |     LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool=" | 
| 185 | 0 |                  << pool.get_info(); | 
| 186 | 0 | } Unexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PTabletWriterOpenResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PExecPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23POpenLoadStreamResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_27PTabletWriterAddBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_25PCancelPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_21PFetchArrowDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26POutfileWriteSuccessResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PFetchTableSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_29PFetchArrowFlightSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PTabletKeyLookupResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_25PJdbcTestConnectionResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_20PFetchColIdsResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26PFetchRemoteSchemaResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_12PProxyResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_20PMergeFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PSendFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PSyncFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_22PPublishFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_15PSendDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_13PCommitResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_15PRollbackResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_19PConstantExprResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_19PTransmitDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PCheckRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PResetRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_23PTabletWriteSlaveResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_27PTabletWriteSlaveDoneResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_17PMultiGetResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_13PGlobResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_26PGroupCommitInsertResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_24PGetWalQueueSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEEUnexecuted instantiation: _ZN5doris12offer_failedITkNS_9CanCancelENS_22PGetBeResourceResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE | 
| 187 |  |  | 
| 188 |  | template <typename T> | 
| 189 |  | class NewHttpClosure : public ::google::protobuf::Closure { | 
| 190 |  | public: | 
| 191 |  |     NewHttpClosure(google::protobuf::Closure* done) : _done(done) {} | 
| 192 | 0 |     NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_28PTabletWriterAddBlockRequestEEC2EPS1_PN6google8protobuf7ClosureEUnexecuted instantiation: _ZN5doris14NewHttpClosureINS_19PTransmitDataParamsEEC2EPS1_PN6google8protobuf7ClosureE | 
| 193 |  |  | 
| 194 | 0 |     void Run() override { | 
| 195 | 0 |         if (_request != nullptr) { | 
| 196 | 0 |             delete _request; | 
| 197 | 0 |             _request = nullptr; | 
| 198 | 0 |         } | 
| 199 | 0 |         if (_done != nullptr) { | 
| 200 | 0 |             _done->Run(); | 
| 201 | 0 |         } | 
| 202 | 0 |         delete this; | 
| 203 | 0 |     } Unexecuted instantiation: _ZN5doris14NewHttpClosureINS_28PTabletWriterAddBlockRequestEE3RunEvUnexecuted instantiation: _ZN5doris14NewHttpClosureINS_19PTransmitDataParamsEE3RunEv | 
| 204 |  |  | 
| 205 |  | private: | 
| 206 |  |     T* _request = nullptr; | 
| 207 |  |     google::protobuf::Closure* _done = nullptr; | 
| 208 |  | }; | 
| 209 |  |  | 
| 210 |  | PInternalService::PInternalService(ExecEnv* exec_env) | 
| 211 | 0 |         : _exec_env(exec_env), | 
| 212 |  |           // heavy threadpool is used for load process and other process that will read disk or access network. | 
| 213 | 0 |           _heavy_work_pool(config::brpc_heavy_work_pool_threads != -1 | 
| 214 | 0 |                                    ? config::brpc_heavy_work_pool_threads | 
| 215 | 0 |                                    : std::max(128, CpuInfo::num_cores() * 4), | 
| 216 | 0 |                            config::brpc_heavy_work_pool_max_queue_size != -1 | 
| 217 | 0 |                                    ? config::brpc_heavy_work_pool_max_queue_size | 
| 218 | 0 |                                    : std::max(10240, CpuInfo::num_cores() * 320), | 
| 219 | 0 |                            "brpc_heavy"), | 
| 220 |  |  | 
| 221 |  |           // light threadpool should be only used in query processing logic. All hanlers should be very light, not locked, not access disk. | 
| 222 | 0 |           _light_work_pool(config::brpc_light_work_pool_threads != -1 | 
| 223 | 0 |                                    ? config::brpc_light_work_pool_threads | 
| 224 | 0 |                                    : std::max(128, CpuInfo::num_cores() * 4), | 
| 225 | 0 |                            config::brpc_light_work_pool_max_queue_size != -1 | 
| 226 | 0 |                                    ? config::brpc_light_work_pool_max_queue_size | 
| 227 | 0 |                                    : std::max(10240, CpuInfo::num_cores() * 320), | 
| 228 | 0 |                            "brpc_light"), | 
| 229 | 0 |           _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1 | 
| 230 | 0 |                                           ? config::brpc_arrow_flight_work_pool_threads | 
| 231 | 0 |                                           : std::max(512, CpuInfo::num_cores() * 2), | 
| 232 | 0 |                                   config::brpc_arrow_flight_work_pool_max_queue_size != -1 | 
| 233 | 0 |                                           ? config::brpc_arrow_flight_work_pool_max_queue_size | 
| 234 | 0 |                                           : std::max(20480, CpuInfo::num_cores() * 640), | 
| 235 | 0 |                                   "brpc_arrow_flight") { | 
| 236 | 0 |     REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, | 
| 237 | 0 |                          [this]() { return _heavy_work_pool.get_queue_size(); }); | 
| 238 | 0 |     REGISTER_HOOK_METRIC(light_work_pool_queue_size, | 
| 239 | 0 |                          [this]() { return _light_work_pool.get_queue_size(); }); | 
| 240 | 0 |     REGISTER_HOOK_METRIC(heavy_work_active_threads, | 
| 241 | 0 |                          [this]() { return _heavy_work_pool.get_active_threads(); }); | 
| 242 | 0 |     REGISTER_HOOK_METRIC(light_work_active_threads, | 
| 243 | 0 |                          [this]() { return _light_work_pool.get_active_threads(); }); | 
| 244 |  | 
 | 
| 245 | 0 |     REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size, | 
| 246 | 0 |                          []() { return config::brpc_heavy_work_pool_max_queue_size; }); | 
| 247 | 0 |     REGISTER_HOOK_METRIC(light_work_pool_max_queue_size, | 
| 248 | 0 |                          []() { return config::brpc_light_work_pool_max_queue_size; }); | 
| 249 | 0 |     REGISTER_HOOK_METRIC(heavy_work_max_threads, | 
| 250 | 0 |                          []() { return config::brpc_heavy_work_pool_threads; }); | 
| 251 | 0 |     REGISTER_HOOK_METRIC(light_work_max_threads, | 
| 252 | 0 |                          []() { return config::brpc_light_work_pool_threads; }); | 
| 253 |  | 
 | 
| 254 | 0 |     REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, | 
| 255 | 0 |                          [this]() { return _arrow_flight_work_pool.get_queue_size(); }); | 
| 256 | 0 |     REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, | 
| 257 | 0 |                          [this]() { return _arrow_flight_work_pool.get_active_threads(); }); | 
| 258 | 0 |     REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, | 
| 259 | 0 |                          []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); | 
| 260 | 0 |     REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, | 
| 261 | 0 |                          []() { return config::brpc_arrow_flight_work_pool_threads; }); | 
| 262 |  | 
 | 
| 263 | 0 |     _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); | 
| 264 |  | 
 | 
| 265 | 0 |     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); | 
| 266 | 0 |     CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter)); | 
| 267 | 0 | } | 
| 268 |  |  | 
| 269 |  | PInternalServiceImpl::PInternalServiceImpl(StorageEngine& engine, ExecEnv* exec_env) | 
| 270 | 0 |         : PInternalService(exec_env), _engine(engine) {} | 
| 271 |  |  | 
| 272 | 0 | PInternalServiceImpl::~PInternalServiceImpl() = default; | 
| 273 |  |  | 
| 274 | 0 | PInternalService::~PInternalService() { | 
| 275 | 0 |     DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size); | 
| 276 | 0 |     DEREGISTER_HOOK_METRIC(light_work_pool_queue_size); | 
| 277 | 0 |     DEREGISTER_HOOK_METRIC(heavy_work_active_threads); | 
| 278 | 0 |     DEREGISTER_HOOK_METRIC(light_work_active_threads); | 
| 279 |  | 
 | 
| 280 | 0 |     DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size); | 
| 281 | 0 |     DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size); | 
| 282 | 0 |     DEREGISTER_HOOK_METRIC(heavy_work_max_threads); | 
| 283 | 0 |     DEREGISTER_HOOK_METRIC(light_work_max_threads); | 
| 284 |  | 
 | 
| 285 | 0 |     DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); | 
| 286 | 0 |     DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); | 
| 287 | 0 |     DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); | 
| 288 | 0 |     DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); | 
| 289 |  | 
 | 
| 290 | 0 |     CHECK_EQ(0, bthread_key_delete(btls_key)); | 
| 291 | 0 |     CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); | 
| 292 | 0 | } | 
| 293 |  |  | 
| 294 |  | void PInternalService::tablet_writer_open(google::protobuf::RpcController* controller, | 
| 295 |  |                                           const PTabletWriterOpenRequest* request, | 
| 296 |  |                                           PTabletWriterOpenResult* response, | 
| 297 | 0 |                                           google::protobuf::Closure* done) { | 
| 298 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { | 
| 299 | 0 |         VLOG_RPC << "tablet writer open, id=" << request->id() | 
| 300 | 0 |                  << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); | 
| 301 | 0 |         signal::SignalTaskIdKeeper keeper(request->id()); | 
| 302 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 303 | 0 |         auto st = _exec_env->load_channel_mgr()->open(*request); | 
| 304 | 0 |         if (!st.ok()) { | 
| 305 | 0 |             LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() | 
| 306 | 0 |                          << ", index_id=" << request->index_id() | 
| 307 | 0 |                          << ", txn_id=" << request->txn_id(); | 
| 308 | 0 |         } | 
| 309 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 310 | 0 |     }); | 
| 311 | 0 |     if (!ret) { | 
| 312 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 313 | 0 |         return; | 
| 314 | 0 |     } | 
| 315 | 0 | } | 
| 316 |  |  | 
| 317 |  | void PInternalService::exec_plan_fragment(google::protobuf::RpcController* controller, | 
| 318 |  |                                           const PExecPlanFragmentRequest* request, | 
| 319 |  |                                           PExecPlanFragmentResult* response, | 
| 320 | 0 |                                           google::protobuf::Closure* done) { | 
| 321 | 0 |     timeval tv {}; | 
| 322 | 0 |     gettimeofday(&tv, nullptr); | 
| 323 | 0 |     response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000); | 
| 324 | 0 |     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { | 
| 325 | 0 |         _exec_plan_fragment_in_pthread(controller, request, response, done); | 
| 326 | 0 |     }); | 
| 327 | 0 |     if (!ret) { | 
| 328 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 329 | 0 |         return; | 
| 330 | 0 |     } | 
| 331 | 0 | } | 
| 332 |  |  | 
| 333 |  | void PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, | 
| 334 |  |                                                       const PExecPlanFragmentRequest* request, | 
| 335 |  |                                                       PExecPlanFragmentResult* response, | 
| 336 | 0 |                                                       google::protobuf::Closure* done) { | 
| 337 | 0 |     timeval tv1 {}; | 
| 338 | 0 |     gettimeofday(&tv1, nullptr); | 
| 339 | 0 |     response->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000); | 
| 340 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 341 | 0 |     auto st = Status::OK(); | 
| 342 | 0 |     bool compact = request->has_compact() ? request->compact() : false; | 
| 343 | 0 |     PFragmentRequestVersion version = | 
| 344 | 0 |             request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1; | 
| 345 | 0 |     try { | 
| 346 | 0 |         st = _exec_plan_fragment_impl(request->request(), version, compact); | 
| 347 | 0 |     } catch (const Exception& e) { | 
| 348 | 0 |         st = e.to_status(); | 
| 349 | 0 |     } catch (const std::exception& e) { | 
| 350 | 0 |         st = Status::Error(ErrorCode::INTERNAL_ERROR, e.what()); | 
| 351 | 0 |     } catch (...) { | 
| 352 | 0 |         st = Status::Error(ErrorCode::INTERNAL_ERROR, | 
| 353 | 0 |                            "_exec_plan_fragment_impl meet unknown error"); | 
| 354 | 0 |     } | 
| 355 | 0 |     if (!st.ok()) { | 
| 356 | 0 |         LOG(WARNING) << "exec plan fragment failed, errmsg=" << st; | 
| 357 | 0 |     } | 
| 358 | 0 |     st.to_protobuf(response->mutable_status()); | 
| 359 | 0 |     timeval tv2 {}; | 
| 360 | 0 |     gettimeofday(&tv2, nullptr); | 
| 361 | 0 |     response->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000); | 
| 362 | 0 | } | 
| 363 |  |  | 
| 364 |  | void PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, | 
| 365 |  |                                                   const PExecPlanFragmentRequest* request, | 
| 366 |  |                                                   PExecPlanFragmentResult* response, | 
| 367 | 0 |                                                   google::protobuf::Closure* done) { | 
| 368 | 0 |     timeval tv {}; | 
| 369 | 0 |     gettimeofday(&tv, nullptr); | 
| 370 | 0 |     response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000); | 
| 371 | 0 |     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { | 
| 372 | 0 |         _exec_plan_fragment_in_pthread(controller, request, response, done); | 
| 373 | 0 |     }); | 
| 374 | 0 |     if (!ret) { | 
| 375 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 376 | 0 |         return; | 
| 377 | 0 |     } | 
| 378 | 0 | } | 
| 379 |  |  | 
| 380 |  | void PInternalService::exec_plan_fragment_start(google::protobuf::RpcController* /*controller*/, | 
| 381 |  |                                                 const PExecPlanFragmentStartRequest* request, | 
| 382 |  |                                                 PExecPlanFragmentResult* result, | 
| 383 | 0 |                                                 google::protobuf::Closure* done) { | 
| 384 | 0 |     timeval tv {}; | 
| 385 | 0 |     gettimeofday(&tv, nullptr); | 
| 386 | 0 |     result->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000); | 
| 387 | 0 |     bool ret = _light_work_pool.try_offer([this, request, result, done]() { | 
| 388 | 0 |         timeval tv1 {}; | 
| 389 | 0 |         gettimeofday(&tv1, nullptr); | 
| 390 | 0 |         result->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000); | 
| 391 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 392 | 0 |         auto st = _exec_env->fragment_mgr()->start_query_execution(request); | 
| 393 | 0 |         st.to_protobuf(result->mutable_status()); | 
| 394 | 0 |         timeval tv2 {}; | 
| 395 | 0 |         gettimeofday(&tv2, nullptr); | 
| 396 | 0 |         result->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000); | 
| 397 | 0 |     }); | 
| 398 | 0 |     if (!ret) { | 
| 399 | 0 |         offer_failed(result, done, _light_work_pool); | 
| 400 | 0 |         return; | 
| 401 | 0 |     } | 
| 402 | 0 | } | 
| 403 |  |  | 
| 404 |  | void PInternalService::open_load_stream(google::protobuf::RpcController* controller, | 
| 405 |  |                                         const POpenLoadStreamRequest* request, | 
| 406 |  |                                         POpenLoadStreamResponse* response, | 
| 407 | 0 |                                         google::protobuf::Closure* done) { | 
| 408 | 0 |     bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { | 
| 409 | 0 |         signal::SignalTaskIdKeeper keeper(request->load_id()); | 
| 410 | 0 |         brpc::ClosureGuard done_guard(done); | 
| 411 | 0 |         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); | 
| 412 | 0 |         brpc::StreamOptions stream_options; | 
| 413 |  | 
 | 
| 414 | 0 |         LOG(INFO) << "open load stream, load_id=" << request->load_id() | 
| 415 | 0 |                   << ", src_id=" << request->src_id(); | 
| 416 |  | 
 | 
| 417 | 0 |         for (const auto& req : request->tablets()) { | 
| 418 | 0 |             BaseTabletSPtr tablet; | 
| 419 | 0 |             if (auto res = ExecEnv::get_tablet(req.tablet_id()); !res.has_value()) [[unlikely]] { | 
| 420 | 0 |                 auto st = std::move(res).error(); | 
| 421 | 0 |                 st.to_protobuf(response->mutable_status()); | 
| 422 | 0 |                 cntl->SetFailed(st.to_string()); | 
| 423 | 0 |                 return; | 
| 424 | 0 |             } else { | 
| 425 | 0 |                 tablet = std::move(res).value(); | 
| 426 | 0 |             } | 
| 427 | 0 |             auto resp = response->add_tablet_schemas(); | 
| 428 | 0 |             resp->set_index_id(req.index_id()); | 
| 429 | 0 |             resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write()); | 
| 430 | 0 |             tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); | 
| 431 | 0 |         } | 
| 432 |  |  | 
| 433 | 0 |         LoadStream* load_stream = nullptr; | 
| 434 | 0 |         auto st = _exec_env->load_stream_mgr()->open_load_stream(request, load_stream); | 
| 435 | 0 |         if (!st.ok()) { | 
| 436 | 0 |             st.to_protobuf(response->mutable_status()); | 
| 437 | 0 |             return; | 
| 438 | 0 |         } | 
| 439 |  |  | 
| 440 | 0 |         stream_options.handler = load_stream; | 
| 441 | 0 |         stream_options.idle_timeout_ms = request->idle_timeout_ms(); | 
| 442 | 0 |         DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout", | 
| 443 | 0 |                         { stream_options.idle_timeout_ms = 1; }); | 
| 444 |  | 
 | 
| 445 | 0 |         StreamId streamid; | 
| 446 | 0 |         if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { | 
| 447 | 0 |             st = Status::Cancelled("Fail to accept stream {}", streamid); | 
| 448 | 0 |             st.to_protobuf(response->mutable_status()); | 
| 449 | 0 |             cntl->SetFailed(st.to_string()); | 
| 450 | 0 |             return; | 
| 451 | 0 |         } | 
| 452 |  |  | 
| 453 | 0 |         VLOG_DEBUG << "get streamid =" << streamid; | 
| 454 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 455 | 0 |     }); | 
| 456 | 0 |     if (!ret) { | 
| 457 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 458 | 0 |     } | 
| 459 | 0 | } | 
| 460 |  |  | 
| 461 |  | void PInternalService::tablet_writer_add_block_by_http(google::protobuf::RpcController* controller, | 
| 462 |  |                                                        const ::doris::PEmptyRequest* request, | 
| 463 |  |                                                        PTabletWriterAddBlockResult* response, | 
| 464 | 0 |                                                        google::protobuf::Closure* done) { | 
| 465 | 0 |     PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); | 
| 466 | 0 |     google::protobuf::Closure* new_done = | 
| 467 | 0 |             new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done); | 
| 468 | 0 |     brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); | 
| 469 | 0 |     Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request, | 
| 470 | 0 |                                                                                        cntl); | 
| 471 | 0 |     if (st.ok()) { | 
| 472 | 0 |         tablet_writer_add_block(controller, new_request, response, new_done); | 
| 473 | 0 |     } else { | 
| 474 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 475 | 0 |     } | 
| 476 | 0 | } | 
| 477 |  |  | 
| 478 |  | void PInternalService::tablet_writer_add_block(google::protobuf::RpcController* controller, | 
| 479 |  |                                                const PTabletWriterAddBlockRequest* request, | 
| 480 |  |                                                PTabletWriterAddBlockResult* response, | 
| 481 | 0 |                                                google::protobuf::Closure* done) { | 
| 482 | 0 |     int64_t submit_task_time_ns = MonotonicNanos(); | 
| 483 | 0 |     bool ret = _heavy_work_pool.try_offer([request, response, done, submit_task_time_ns, this]() { | 
| 484 | 0 |         int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; | 
| 485 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 486 | 0 |         int64_t execution_time_ns = 0; | 
| 487 | 0 |         { | 
| 488 | 0 |             SCOPED_RAW_TIMER(&execution_time_ns); | 
| 489 | 0 |             signal::SignalTaskIdKeeper keeper(request->id()); | 
| 490 | 0 |             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); | 
| 491 | 0 |             if (!st.ok()) { | 
| 492 | 0 |                 LOG(WARNING) << "tablet writer add block failed, message=" << st | 
| 493 | 0 |                              << ", id=" << request->id() << ", index_id=" << request->index_id() | 
| 494 | 0 |                              << ", sender_id=" << request->sender_id() | 
| 495 | 0 |                              << ", backend id=" << request->backend_id(); | 
| 496 | 0 |             } | 
| 497 | 0 |             st.to_protobuf(response->mutable_status()); | 
| 498 | 0 |         } | 
| 499 | 0 |         response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); | 
| 500 | 0 |         response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); | 
| 501 | 0 |     }); | 
| 502 | 0 |     if (!ret) { | 
| 503 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 504 | 0 |         return; | 
| 505 | 0 |     } | 
| 506 | 0 | } | 
| 507 |  |  | 
| 508 |  | void PInternalService::tablet_writer_cancel(google::protobuf::RpcController* controller, | 
| 509 |  |                                             const PTabletWriterCancelRequest* request, | 
| 510 |  |                                             PTabletWriterCancelResult* response, | 
| 511 | 0 |                                             google::protobuf::Closure* done) { | 
| 512 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, done]() { | 
| 513 | 0 |         VLOG_RPC << "tablet writer cancel, id=" << request->id() | 
| 514 | 0 |                  << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); | 
| 515 | 0 |         signal::SignalTaskIdKeeper keeper(request->id()); | 
| 516 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 517 | 0 |         auto st = _exec_env->load_channel_mgr()->cancel(*request); | 
| 518 | 0 |         if (!st.ok()) { | 
| 519 | 0 |             LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() | 
| 520 | 0 |                          << ", index_id=" << request->index_id() | 
| 521 | 0 |                          << ", sender_id=" << request->sender_id(); | 
| 522 | 0 |         } | 
| 523 | 0 |     }); | 
| 524 | 0 |     if (!ret) { | 
| 525 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 526 | 0 |         return; | 
| 527 | 0 |     } | 
| 528 | 0 | } | 
| 529 |  |  | 
| 530 |  | Status PInternalService::_exec_plan_fragment_impl( | 
| 531 |  |         const std::string& ser_request, PFragmentRequestVersion version, bool compact, | 
| 532 | 0 |         const std::function<void(RuntimeState*, Status*)>& cb) { | 
| 533 |  |     // Sometimes the BE do not receive the first heartbeat message and it receives request from FE | 
| 534 |  |     // If BE execute this fragment, it will core when it wants to get some property from master info. | 
| 535 | 0 |     if (ExecEnv::GetInstance()->cluster_info() == nullptr) { | 
| 536 | 0 |         return Status::InternalError( | 
| 537 | 0 |                 "Have not receive the first heartbeat message from master, not ready to provide " | 
| 538 | 0 |                 "service"); | 
| 539 | 0 |     } | 
| 540 | 0 |     CHECK(version == PFragmentRequestVersion::VERSION_3) | 
| 541 | 0 |             << "only support version 3, received " << version; | 
| 542 | 0 |     if (version == PFragmentRequestVersion::VERSION_3) { | 
| 543 | 0 |         TPipelineFragmentParamsList t_request; | 
| 544 | 0 |         { | 
| 545 | 0 |             const uint8_t* buf = (const uint8_t*)ser_request.data(); | 
| 546 | 0 |             uint32_t len = ser_request.size(); | 
| 547 | 0 |             RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); | 
| 548 | 0 |         } | 
| 549 |  |  | 
| 550 | 0 |         const auto& fragment_list = t_request.params_list; | 
| 551 | 0 |         if (fragment_list.empty()) { | 
| 552 | 0 |             return Status::InternalError("Invalid TPipelineFragmentParamsList!"); | 
| 553 | 0 |         } | 
| 554 | 0 |         MonotonicStopWatch timer; | 
| 555 | 0 |         timer.start(); | 
| 556 |  |  | 
| 557 |  |         // work for old version frontend | 
| 558 | 0 |         if (!t_request.__isset.runtime_filter_info) { | 
| 559 | 0 |             TRuntimeFilterInfo runtime_filter_info; | 
| 560 | 0 |             auto local_param = fragment_list[0].local_params[0]; | 
| 561 | 0 |             if (local_param.__isset.runtime_filter_params) { | 
| 562 | 0 |                 runtime_filter_info.__set_runtime_filter_params(local_param.runtime_filter_params); | 
| 563 | 0 |             } | 
| 564 | 0 |             if (local_param.__isset.topn_filter_descs) { | 
| 565 | 0 |                 runtime_filter_info.__set_topn_filter_descs(local_param.topn_filter_descs); | 
| 566 | 0 |             } | 
| 567 | 0 |             t_request.__set_runtime_filter_info(runtime_filter_info); | 
| 568 | 0 |         } | 
| 569 |  | 
 | 
| 570 | 0 |         for (const TPipelineFragmentParams& fragment : fragment_list) { | 
| 571 | 0 |             if (cb) { | 
| 572 | 0 |                 RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( | 
| 573 | 0 |                         fragment, QuerySource::INTERNAL_FRONTEND, cb, t_request)); | 
| 574 | 0 |             } else { | 
| 575 | 0 |                 RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( | 
| 576 | 0 |                         fragment, QuerySource::INTERNAL_FRONTEND, t_request)); | 
| 577 | 0 |             } | 
| 578 | 0 |         } | 
| 579 | 0 |         timer.stop(); | 
| 580 | 0 |         double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL; | 
| 581 | 0 |         if (cost_secs > 5) { | 
| 582 | 0 |             LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much", | 
| 583 | 0 |                         fragment_list.size(), print_id(fragment_list.front().query_id), cost_secs); | 
| 584 | 0 |         } | 
| 585 |  | 
 | 
| 586 | 0 |         return Status::OK(); | 
| 587 | 0 |     } else { | 
| 588 | 0 |         return Status::InternalError("invalid version"); | 
| 589 | 0 |     } | 
| 590 | 0 | } | 
| 591 |  |  | 
| 592 |  | void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*controller*/, | 
| 593 |  |                                             const PCancelPlanFragmentRequest* request, | 
| 594 |  |                                             PCancelPlanFragmentResult* result, | 
| 595 | 0 |                                             google::protobuf::Closure* done) { | 
| 596 | 0 |     bool ret = _light_work_pool.try_offer([this, request, result, done]() { | 
| 597 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 598 | 0 |         signal::SignalTaskIdKeeper keeper(request->finst_id()); | 
| 599 | 0 |         Status st = Status::OK(); | 
| 600 |  | 
 | 
| 601 | 0 |         const bool has_cancel_reason = request->has_cancel_reason(); | 
| 602 | 0 |         const bool has_cancel_status = request->has_cancel_status(); | 
| 603 |  |         // During upgrade only LIMIT_REACH is used, other reason is changed to internal error | 
| 604 | 0 |         Status actual_cancel_status = Status::OK(); | 
| 605 |  |         // Convert PPlanFragmentCancelReason to Status | 
| 606 | 0 |         if (has_cancel_status) { | 
| 607 |  |             // If fe set cancel status, then it is new FE now, should use cancel status. | 
| 608 | 0 |             actual_cancel_status = Status::create<false>(request->cancel_status()); | 
| 609 | 0 |         } else if (has_cancel_reason) { | 
| 610 |  |             // If fe not set cancel status, but set cancel reason, should convert cancel reason | 
| 611 |  |             // to cancel status here. | 
| 612 | 0 |             if (request->cancel_reason() == PPlanFragmentCancelReason::LIMIT_REACH) { | 
| 613 | 0 |                 actual_cancel_status = Status::Error<ErrorCode::LIMIT_REACH>("limit reach"); | 
| 614 | 0 |             } else { | 
| 615 |  |                 // Use cancel reason as error message | 
| 616 | 0 |                 actual_cancel_status = Status::InternalError( | 
| 617 | 0 |                         PPlanFragmentCancelReason_Name(request->cancel_reason())); | 
| 618 | 0 |             } | 
| 619 | 0 |         } else { | 
| 620 | 0 |             actual_cancel_status = Status::InternalError("unknown error"); | 
| 621 | 0 |         } | 
| 622 |  | 
 | 
| 623 | 0 |         TUniqueId query_id; | 
| 624 | 0 |         query_id.__set_hi(request->query_id().hi()); | 
| 625 | 0 |         query_id.__set_lo(request->query_id().lo()); | 
| 626 | 0 |         LOG(INFO) << fmt::format("Cancel query {}, reason: {}", print_id(query_id), | 
| 627 | 0 |                                  actual_cancel_status.to_string()); | 
| 628 | 0 |         _exec_env->fragment_mgr()->cancel_query(query_id, actual_cancel_status); | 
| 629 |  |  | 
| 630 |  |         // TODO: the logic seems useless, cancel only return Status::OK. remove it | 
| 631 | 0 |         st.to_protobuf(result->mutable_status()); | 
| 632 | 0 |     }); | 
| 633 | 0 |     if (!ret) { | 
| 634 | 0 |         offer_failed(result, done, _light_work_pool); | 
| 635 | 0 |         return; | 
| 636 | 0 |     } | 
| 637 | 0 | } | 
| 638 |  |  | 
| 639 |  | void PInternalService::fetch_data(google::protobuf::RpcController* controller, | 
| 640 |  |                                   const PFetchDataRequest* request, PFetchDataResult* result, | 
| 641 | 0 |                                   google::protobuf::Closure* done) { | 
| 642 |  |     // fetch_data is a light operation which will put a request rather than wait inplace when there's no data ready. | 
| 643 |  |     // when there's data ready, use brpc to send. there's queue in brpc service. won't take it too long. | 
| 644 | 0 |     auto ctx = vectorized::GetResultBatchCtx::create_shared(result, done); | 
| 645 | 0 |     TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id | 
| 646 | 0 |     std::shared_ptr<vectorized::MySQLResultBlockBuffer> buffer; | 
| 647 | 0 |     Status st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id, buffer); | 
| 648 | 0 |     if (!st.ok()) { | 
| 649 | 0 |         LOG(WARNING) << "Result buffer not found! finst ID: " << print_id(unique_id); | 
| 650 | 0 |         return; | 
| 651 | 0 |     } | 
| 652 | 0 |     if (st = buffer->get_batch(ctx); !st.ok()) { | 
| 653 | 0 |         LOG(WARNING) << "fetch_data failed: " << st.to_string(); | 
| 654 | 0 |     } | 
| 655 | 0 | } | 
| 656 |  |  | 
| 657 |  | void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller, | 
| 658 |  |                                         const PFetchArrowDataRequest* request, | 
| 659 |  |                                         PFetchArrowDataResult* result, | 
| 660 | 0 |                                         google::protobuf::Closure* done) { | 
| 661 | 0 |     bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() { | 
| 662 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 663 | 0 |         auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result); | 
| 664 | 0 |         TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id | 
| 665 | 0 |         std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> arrow_buffer; | 
| 666 | 0 |         auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id, arrow_buffer); | 
| 667 | 0 |         if (!st.ok()) { | 
| 668 | 0 |             LOG(WARNING) << "Result buffer not found! Query ID: " << print_id(unique_id); | 
| 669 | 0 |             return; | 
| 670 | 0 |         } | 
| 671 | 0 |         if (st = arrow_buffer->get_batch(ctx); !st.ok()) { | 
| 672 | 0 |             LOG(WARNING) << "fetch_arrow_data failed: " << st.to_string(); | 
| 673 | 0 |         } | 
| 674 | 0 |     }); | 
| 675 | 0 |     if (!ret) { | 
| 676 | 0 |         offer_failed(result, done, _arrow_flight_work_pool); | 
| 677 | 0 |         return; | 
| 678 | 0 |     } | 
| 679 | 0 | } | 
| 680 |  |  | 
| 681 |  | void PInternalService::outfile_write_success(google::protobuf::RpcController* controller, | 
| 682 |  |                                              const POutfileWriteSuccessRequest* request, | 
| 683 |  |                                              POutfileWriteSuccessResult* result, | 
| 684 | 0 |                                              google::protobuf::Closure* done) { | 
| 685 | 0 |     bool ret = _heavy_work_pool.try_offer([request, result, done]() { | 
| 686 | 0 |         VLOG_RPC << "outfile write success file"; | 
| 687 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 688 | 0 |         TResultFileSink result_file_sink; | 
| 689 | 0 |         Status st = Status::OK(); | 
| 690 | 0 |         { | 
| 691 | 0 |             const uint8_t* buf = (const uint8_t*)(request->result_file_sink().data()); | 
| 692 | 0 |             uint32_t len = request->result_file_sink().size(); | 
| 693 | 0 |             st = deserialize_thrift_msg(buf, &len, false, &result_file_sink); | 
| 694 | 0 |             if (!st.ok()) { | 
| 695 | 0 |                 LOG(WARNING) << "outfile write success file failed, errmsg = " << st; | 
| 696 | 0 |                 st.to_protobuf(result->mutable_status()); | 
| 697 | 0 |                 return; | 
| 698 | 0 |             } | 
| 699 | 0 |         } | 
| 700 |  |  | 
| 701 | 0 |         TResultFileSinkOptions file_options = result_file_sink.file_options; | 
| 702 | 0 |         std::stringstream ss; | 
| 703 | 0 |         ss << file_options.file_path << file_options.success_file_name; | 
| 704 | 0 |         std::string file_name = ss.str(); | 
| 705 | 0 |         if (result_file_sink.storage_backend_type == TStorageBackendType::LOCAL) { | 
| 706 |  |             // For local file writer, the file_path is a local dir. | 
| 707 |  |             // Here we do a simple security verification by checking whether the file exists. | 
| 708 |  |             // Because the file path is currently arbitrarily specified by the user, | 
| 709 |  |             // Doris is not responsible for ensuring the correctness of the path. | 
| 710 |  |             // This is just to prevent overwriting the existing file. | 
| 711 | 0 |             bool exists = true; | 
| 712 | 0 |             st = io::global_local_filesystem()->exists(file_name, &exists); | 
| 713 | 0 |             if (!st.ok()) { | 
| 714 | 0 |                 LOG(WARNING) << "outfile write success filefailed, errmsg = " << st; | 
| 715 | 0 |                 st.to_protobuf(result->mutable_status()); | 
| 716 | 0 |                 return; | 
| 717 | 0 |             } | 
| 718 | 0 |             if (exists) { | 
| 719 | 0 |                 st = Status::InternalError("File already exists: {}", file_name); | 
| 720 | 0 |             } | 
| 721 | 0 |             if (!st.ok()) { | 
| 722 | 0 |                 LOG(WARNING) << "outfile write success file failed, errmsg = " << st; | 
| 723 | 0 |                 st.to_protobuf(result->mutable_status()); | 
| 724 | 0 |                 return; | 
| 725 | 0 |             } | 
| 726 | 0 |         } | 
| 727 |  |  | 
| 728 | 0 |         auto&& res = FileFactory::create_file_writer( | 
| 729 | 0 |                 FileFactory::convert_storage_type(result_file_sink.storage_backend_type), | 
| 730 | 0 |                 ExecEnv::GetInstance(), file_options.broker_addresses, | 
| 731 | 0 |                 file_options.broker_properties, file_name, | 
| 732 | 0 |                 { | 
| 733 | 0 |                         .write_file_cache = false, | 
| 734 | 0 |                         .sync_file_data = false, | 
| 735 | 0 |                 }); | 
| 736 | 0 |         using T = std::decay_t<decltype(res)>; | 
| 737 | 0 |         if (!res.has_value()) [[unlikely]] { | 
| 738 | 0 |             st = std::forward<T>(res).error(); | 
| 739 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 740 | 0 |             return; | 
| 741 | 0 |         } | 
| 742 |  |  | 
| 743 | 0 |         std::unique_ptr<doris::io::FileWriter> _file_writer_impl = std::forward<T>(res).value(); | 
| 744 |  |         // must write somthing because s3 file writer can not writer empty file | 
| 745 | 0 |         st = _file_writer_impl->append({"success"}); | 
| 746 | 0 |         if (!st.ok()) { | 
| 747 | 0 |             LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; | 
| 748 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 749 | 0 |             return; | 
| 750 | 0 |         } | 
| 751 | 0 |         st = _file_writer_impl->close(); | 
| 752 | 0 |         if (!st.ok()) { | 
| 753 | 0 |             LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; | 
| 754 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 755 | 0 |             return; | 
| 756 | 0 |         } | 
| 757 | 0 |     }); | 
| 758 | 0 |     if (!ret) { | 
| 759 | 0 |         offer_failed(result, done, _heavy_work_pool); | 
| 760 | 0 |         return; | 
| 761 | 0 |     } | 
| 762 | 0 | } | 
| 763 |  |  | 
| 764 |  | void PInternalService::fetch_table_schema(google::protobuf::RpcController* controller, | 
| 765 |  |                                           const PFetchTableSchemaRequest* request, | 
| 766 |  |                                           PFetchTableSchemaResult* result, | 
| 767 | 0 |                                           google::protobuf::Closure* done) { | 
| 768 | 0 |     bool ret = _heavy_work_pool.try_offer([request, result, done]() { | 
| 769 | 0 |         VLOG_RPC << "fetch table schema"; | 
| 770 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 771 | 0 |         TFileScanRange file_scan_range; | 
| 772 | 0 |         Status st = Status::OK(); | 
| 773 | 0 |         { | 
| 774 | 0 |             const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); | 
| 775 | 0 |             uint32_t len = request->file_scan_range().size(); | 
| 776 | 0 |             st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); | 
| 777 | 0 |             if (!st.ok()) { | 
| 778 | 0 |                 LOG(WARNING) << "fetch table schema failed, errmsg=" << st; | 
| 779 | 0 |                 st.to_protobuf(result->mutable_status()); | 
| 780 | 0 |                 return; | 
| 781 | 0 |             } | 
| 782 | 0 |         } | 
| 783 | 0 |         if (file_scan_range.__isset.ranges == false) { | 
| 784 | 0 |             st = Status::InternalError("can not get TFileRangeDesc."); | 
| 785 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 786 | 0 |             return; | 
| 787 | 0 |         } | 
| 788 | 0 |         if (file_scan_range.__isset.params == false) { | 
| 789 | 0 |             st = Status::InternalError("can not get TFileScanRangeParams."); | 
| 790 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 791 | 0 |             return; | 
| 792 | 0 |         } | 
| 793 | 0 |         const TFileRangeDesc& range = file_scan_range.ranges.at(0); | 
| 794 | 0 |         const TFileScanRangeParams& params = file_scan_range.params; | 
| 795 |  | 
 | 
| 796 | 0 |         std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( | 
| 797 | 0 |                 MemTrackerLimiter::Type::OTHER, | 
| 798 | 0 |                 fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type, | 
| 799 | 0 |                             params.file_type)); | 
| 800 | 0 |         SCOPED_ATTACH_TASK(mem_tracker); | 
| 801 |  |  | 
| 802 |  |         // make sure profile is desctructed after reader cause PrefetchBufferedReader | 
| 803 |  |         // might asynchronouslly access the profile | 
| 804 | 0 |         std::unique_ptr<RuntimeProfile> profile = | 
| 805 | 0 |                 std::make_unique<RuntimeProfile>("FetchTableSchema"); | 
| 806 | 0 |         std::unique_ptr<vectorized::GenericReader> reader(nullptr); | 
| 807 | 0 |         io::IOContext io_ctx; | 
| 808 | 0 |         io::FileCacheStatistics file_cache_statis; | 
| 809 | 0 |         io_ctx.file_cache_stats = &file_cache_statis; | 
| 810 | 0 |         io::FileReaderStats file_reader_stats; | 
| 811 | 0 |         io_ctx.file_reader_stats = &file_reader_stats; | 
| 812 |  |         // file_slots is no use, but the lifetime should be longer than reader | 
| 813 | 0 |         std::vector<SlotDescriptor*> file_slots; | 
| 814 | 0 |         switch (params.format_type) { | 
| 815 | 0 |         case TFileFormatType::FORMAT_CSV_PLAIN: | 
| 816 | 0 |         case TFileFormatType::FORMAT_CSV_GZ: | 
| 817 | 0 |         case TFileFormatType::FORMAT_CSV_BZ2: | 
| 818 | 0 |         case TFileFormatType::FORMAT_CSV_LZ4FRAME: | 
| 819 | 0 |         case TFileFormatType::FORMAT_CSV_LZ4BLOCK: | 
| 820 | 0 |         case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: | 
| 821 | 0 |         case TFileFormatType::FORMAT_CSV_LZOP: | 
| 822 | 0 |         case TFileFormatType::FORMAT_CSV_DEFLATE: { | 
| 823 | 0 |             reader = vectorized::CsvReader::create_unique(nullptr, profile.get(), nullptr, params, | 
| 824 | 0 |                                                           range, file_slots, &io_ctx); | 
| 825 | 0 |             break; | 
| 826 | 0 |         } | 
| 827 | 0 |         case TFileFormatType::FORMAT_TEXT: { | 
| 828 | 0 |             reader = vectorized::TextReader::create_unique(nullptr, profile.get(), nullptr, params, | 
| 829 | 0 |                                                            range, file_slots, &io_ctx); | 
| 830 | 0 |             break; | 
| 831 | 0 |         } | 
| 832 | 0 |         case TFileFormatType::FORMAT_PARQUET: { | 
| 833 | 0 |             reader = vectorized::ParquetReader::create_unique(params, range, &io_ctx, nullptr); | 
| 834 | 0 |             break; | 
| 835 | 0 |         } | 
| 836 | 0 |         case TFileFormatType::FORMAT_ORC: { | 
| 837 | 0 |             reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx); | 
| 838 | 0 |             break; | 
| 839 | 0 |         } | 
| 840 | 0 |         case TFileFormatType::FORMAT_JSON: { | 
| 841 | 0 |             reader = vectorized::NewJsonReader::create_unique(profile.get(), params, range, | 
| 842 | 0 |                                                               file_slots, &io_ctx); | 
| 843 | 0 |             break; | 
| 844 | 0 |         } | 
| 845 | 0 |         case TFileFormatType::FORMAT_AVRO: { | 
| 846 | 0 |             reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range, | 
| 847 | 0 |                                                               file_slots); | 
| 848 | 0 |             break; | 
| 849 | 0 |         } | 
| 850 | 0 |         default: | 
| 851 | 0 |             st = Status::InternalError("Not supported file format in fetch table schema: {}", | 
| 852 | 0 |                                        params.format_type); | 
| 853 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 854 | 0 |             return; | 
| 855 | 0 |         } | 
| 856 | 0 |         if (!st.ok()) { | 
| 857 | 0 |             LOG(WARNING) << "failed to create reader, errmsg=" << st; | 
| 858 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 859 | 0 |             return; | 
| 860 | 0 |         } | 
| 861 | 0 |         st = reader->init_schema_reader(); | 
| 862 | 0 |         if (!st.ok()) { | 
| 863 | 0 |             LOG(WARNING) << "failed to init reader, errmsg=" << st; | 
| 864 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 865 | 0 |             return; | 
| 866 | 0 |         } | 
| 867 | 0 |         std::vector<std::string> col_names; | 
| 868 | 0 |         std::vector<vectorized::DataTypePtr> col_types; | 
| 869 | 0 |         st = reader->get_parsed_schema(&col_names, &col_types); | 
| 870 | 0 |         if (!st.ok()) { | 
| 871 | 0 |             LOG(WARNING) << "fetch table schema failed, errmsg=" << st; | 
| 872 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 873 | 0 |             return; | 
| 874 | 0 |         } | 
| 875 | 0 |         result->set_column_nums(col_names.size()); | 
| 876 | 0 |         for (size_t idx = 0; idx < col_names.size(); ++idx) { | 
| 877 | 0 |             result->add_column_names(col_names[idx]); | 
| 878 | 0 |         } | 
| 879 | 0 |         for (size_t idx = 0; idx < col_types.size(); ++idx) { | 
| 880 | 0 |             PTypeDesc* type_desc = result->add_column_types(); | 
| 881 | 0 |             col_types[idx]->to_protobuf(type_desc); | 
| 882 | 0 |         } | 
| 883 | 0 |         st.to_protobuf(result->mutable_status()); | 
| 884 | 0 |     }); | 
| 885 | 0 |     if (!ret) { | 
| 886 | 0 |         offer_failed(result, done, _heavy_work_pool); | 
| 887 | 0 |         return; | 
| 888 | 0 |     } | 
| 889 | 0 | } | 
| 890 |  |  | 
| 891 |  | void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController* controller, | 
| 892 |  |                                                  const PFetchArrowFlightSchemaRequest* request, | 
| 893 |  |                                                  PFetchArrowFlightSchemaResult* result, | 
| 894 | 0 |                                                  google::protobuf::Closure* done) { | 
| 895 | 0 |     bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() { | 
| 896 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 897 | 0 |         std::shared_ptr<arrow::Schema> schema; | 
| 898 | 0 |         std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> buffer; | 
| 899 | 0 |         auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer( | 
| 900 | 0 |                 UniqueId(request->finst_id()).to_thrift(), buffer); | 
| 901 | 0 |         if (!st.ok()) { | 
| 902 | 0 |             LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; | 
| 903 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 904 | 0 |             return; | 
| 905 | 0 |         } | 
| 906 | 0 |         st = buffer->get_schema(&schema); | 
| 907 | 0 |         if (!st.ok()) { | 
| 908 | 0 |             LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; | 
| 909 | 0 |             st.to_protobuf(result->mutable_status()); | 
| 910 | 0 |             return; | 
| 911 | 0 |         } | 
| 912 |  |  | 
| 913 | 0 |         std::string schema_str; | 
| 914 | 0 |         st = serialize_arrow_schema(&schema, &schema_str); | 
| 915 | 0 |         if (st.ok()) { | 
| 916 | 0 |             result->set_schema(std::move(schema_str)); | 
| 917 | 0 |             if (!config::public_host.empty()) { | 
| 918 | 0 |                 result->set_be_arrow_flight_ip(config::public_host); | 
| 919 | 0 |             } | 
| 920 | 0 |             if (config::arrow_flight_sql_proxy_port != -1) { | 
| 921 | 0 |                 result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port); | 
| 922 | 0 |             } | 
| 923 | 0 |         } | 
| 924 | 0 |         st.to_protobuf(result->mutable_status()); | 
| 925 | 0 |     }); | 
| 926 | 0 |     if (!ret) { | 
| 927 | 0 |         offer_failed(result, done, _arrow_flight_work_pool); | 
| 928 | 0 |         return; | 
| 929 | 0 |     } | 
| 930 | 0 | } | 
| 931 |  |  | 
| 932 |  | Status PInternalService::_tablet_fetch_data(const PTabletKeyLookupRequest* request, | 
| 933 | 0 |                                             PTabletKeyLookupResponse* response) { | 
| 934 | 0 |     PointQueryExecutor executor; | 
| 935 | 0 |     RETURN_IF_ERROR(executor.init(request, response)); | 
| 936 | 0 |     RETURN_IF_ERROR(executor.lookup_up()); | 
| 937 | 0 |     executor.print_profile(); | 
| 938 | 0 |     return Status::OK(); | 
| 939 | 0 | } | 
| 940 |  |  | 
| 941 |  | void PInternalService::tablet_fetch_data(google::protobuf::RpcController* controller, | 
| 942 |  |                                          const PTabletKeyLookupRequest* request, | 
| 943 |  |                                          PTabletKeyLookupResponse* response, | 
| 944 | 0 |                                          google::protobuf::Closure* done) { | 
| 945 | 0 |     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { | 
| 946 | 0 |         [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller); | 
| 947 | 0 |         brpc::ClosureGuard guard(done); | 
| 948 | 0 |         Status st = _tablet_fetch_data(request, response); | 
| 949 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 950 | 0 |     }); | 
| 951 | 0 |     if (!ret) { | 
| 952 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 953 | 0 |         return; | 
| 954 | 0 |     } | 
| 955 | 0 | } | 
| 956 |  |  | 
| 957 |  | void PInternalService::test_jdbc_connection(google::protobuf::RpcController* controller, | 
| 958 |  |                                             const PJdbcTestConnectionRequest* request, | 
| 959 |  |                                             PJdbcTestConnectionResult* result, | 
| 960 | 0 |                                             google::protobuf::Closure* done) { | 
| 961 | 0 |     bool ret = _heavy_work_pool.try_offer([request, result, done]() { | 
| 962 | 0 |         VLOG_RPC << "test jdbc connection"; | 
| 963 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 964 | 0 |         std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( | 
| 965 | 0 |                 MemTrackerLimiter::Type::OTHER, | 
| 966 | 0 |                 fmt::format("InternalService::test_jdbc_connection")); | 
| 967 | 0 |         SCOPED_ATTACH_TASK(mem_tracker); | 
| 968 | 0 |         TTableDescriptor table_desc; | 
| 969 | 0 |         vectorized::JdbcConnectorParam jdbc_param; | 
| 970 | 0 |         Status st = Status::OK(); | 
| 971 | 0 |         { | 
| 972 | 0 |             const uint8_t* buf = (const uint8_t*)request->jdbc_table().data(); | 
| 973 | 0 |             uint32_t len = request->jdbc_table().size(); | 
| 974 | 0 |             st = deserialize_thrift_msg(buf, &len, false, &table_desc); | 
| 975 | 0 |             if (!st.ok()) { | 
| 976 | 0 |                 LOG(WARNING) << "test jdbc connection failed, errmsg=" << st; | 
| 977 | 0 |                 st.to_protobuf(result->mutable_status()); | 
| 978 | 0 |                 return; | 
| 979 | 0 |             } | 
| 980 | 0 |         } | 
| 981 | 0 |         TJdbcTable jdbc_table = (table_desc.jdbcTable); | 
| 982 | 0 |         jdbc_param.catalog_id = jdbc_table.catalog_id; | 
| 983 | 0 |         jdbc_param.driver_class = jdbc_table.jdbc_driver_class; | 
| 984 | 0 |         jdbc_param.driver_path = jdbc_table.jdbc_driver_url; | 
| 985 | 0 |         jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum; | 
| 986 | 0 |         jdbc_param.jdbc_url = jdbc_table.jdbc_url; | 
| 987 | 0 |         jdbc_param.user = jdbc_table.jdbc_user; | 
| 988 | 0 |         jdbc_param.passwd = jdbc_table.jdbc_password; | 
| 989 | 0 |         jdbc_param.query_string = request->query_str(); | 
| 990 | 0 |         jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type()); | 
| 991 | 0 |         jdbc_param.use_transaction = false; | 
| 992 | 0 |         jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size; | 
| 993 | 0 |         jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size; | 
| 994 | 0 |         jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time; | 
| 995 | 0 |         jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time; | 
| 996 | 0 |         jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive; | 
| 997 |  | 
 | 
| 998 | 0 |         std::unique_ptr<vectorized::JdbcConnector> jdbc_connector; | 
| 999 | 0 |         jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param)); | 
| 1000 |  | 
 | 
| 1001 | 0 |         st = jdbc_connector->test_connection(); | 
| 1002 | 0 |         st.to_protobuf(result->mutable_status()); | 
| 1003 |  | 
 | 
| 1004 | 0 |         Status clean_st = jdbc_connector->clean_datasource(); | 
| 1005 | 0 |         if (!clean_st.ok()) { | 
| 1006 | 0 |             LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg(); | 
| 1007 | 0 |         } | 
| 1008 | 0 |         Status close_st = jdbc_connector->close(); | 
| 1009 | 0 |         if (!close_st.ok()) { | 
| 1010 | 0 |             LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg(); | 
| 1011 | 0 |         } | 
| 1012 | 0 |     }); | 
| 1013 |  | 
 | 
| 1014 | 0 |     if (!ret) { | 
| 1015 | 0 |         offer_failed(result, done, _heavy_work_pool); | 
| 1016 | 0 |         return; | 
| 1017 | 0 |     } | 
| 1018 | 0 | } | 
| 1019 |  |  | 
| 1020 |  | void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, | 
| 1021 |  |                                                         const PFetchColIdsRequest* request, | 
| 1022 |  |                                                         PFetchColIdsResponse* response, | 
| 1023 | 0 |                                                         google::protobuf::Closure* done) { | 
| 1024 | 0 |     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { | 
| 1025 | 0 |         _get_column_ids_by_tablet_ids(controller, request, response, done); | 
| 1026 | 0 |     }); | 
| 1027 | 0 |     if (!ret) { | 
| 1028 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1029 | 0 |         return; | 
| 1030 | 0 |     } | 
| 1031 | 0 | } | 
| 1032 |  |  | 
| 1033 |  | void PInternalServiceImpl::_get_column_ids_by_tablet_ids( | 
| 1034 |  |         google::protobuf::RpcController* controller, const PFetchColIdsRequest* request, | 
| 1035 | 0 |         PFetchColIdsResponse* response, google::protobuf::Closure* done) { | 
| 1036 | 0 |     brpc::ClosureGuard guard(done); | 
| 1037 | 0 |     [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller); | 
| 1038 | 0 |     TabletManager* tablet_mgr = _engine.tablet_manager(); | 
| 1039 | 0 |     const auto& params = request->params(); | 
| 1040 | 0 |     for (const auto& param : params) { | 
| 1041 | 0 |         int64_t index_id = param.indexid(); | 
| 1042 | 0 |         const auto& tablet_ids = param.tablet_ids(); | 
| 1043 | 0 |         std::set<std::set<int32_t>> filter_set; | 
| 1044 | 0 |         std::map<int32_t, const TabletColumn*> id_to_column; | 
| 1045 | 0 |         for (const int64_t tablet_id : tablet_ids) { | 
| 1046 | 0 |             TabletSharedPtr tablet = tablet_mgr->get_tablet(tablet_id); | 
| 1047 | 0 |             if (tablet == nullptr) { | 
| 1048 | 0 |                 std::stringstream ss; | 
| 1049 | 0 |                 ss << "cannot get tablet by id:" << tablet_id; | 
| 1050 | 0 |                 LOG(WARNING) << ss.str(); | 
| 1051 | 0 |                 response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE); | 
| 1052 | 0 |                 response->mutable_status()->add_error_msgs(ss.str()); | 
| 1053 | 0 |                 return; | 
| 1054 | 0 |             } | 
| 1055 |  |             // check schema consistency, column ids should be the same | 
| 1056 | 0 |             const auto& columns = tablet->tablet_schema()->columns(); | 
| 1057 |  | 
 | 
| 1058 | 0 |             std::set<int32_t> column_ids; | 
| 1059 | 0 |             for (const auto& col : columns) { | 
| 1060 | 0 |                 column_ids.insert(col->unique_id()); | 
| 1061 | 0 |             } | 
| 1062 | 0 |             filter_set.insert(std::move(column_ids)); | 
| 1063 |  | 
 | 
| 1064 | 0 |             if (id_to_column.empty()) { | 
| 1065 | 0 |                 for (const auto& col : columns) { | 
| 1066 | 0 |                     id_to_column.insert(std::pair {col->unique_id(), col.get()}); | 
| 1067 | 0 |                 } | 
| 1068 | 0 |             } else { | 
| 1069 | 0 |                 for (const auto& col : columns) { | 
| 1070 | 0 |                     auto it = id_to_column.find(col->unique_id()); | 
| 1071 | 0 |                     if (it == id_to_column.end() || *(it->second) != *col) { | 
| 1072 | 0 |                         ColumnPB prev_col_pb; | 
| 1073 | 0 |                         ColumnPB curr_col_pb; | 
| 1074 | 0 |                         if (it != id_to_column.end()) { | 
| 1075 | 0 |                             it->second->to_schema_pb(&prev_col_pb); | 
| 1076 | 0 |                         } | 
| 1077 | 0 |                         col->to_schema_pb(&curr_col_pb); | 
| 1078 | 0 |                         std::stringstream ss; | 
| 1079 | 0 |                         ss << "consistency check failed: index{ " << index_id << " }" | 
| 1080 | 0 |                            << " got inconsistent schema, prev column: " << prev_col_pb.DebugString() | 
| 1081 | 0 |                            << " current column: " << curr_col_pb.DebugString(); | 
| 1082 | 0 |                         LOG(WARNING) << ss.str(); | 
| 1083 | 0 |                         response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE); | 
| 1084 | 0 |                         response->mutable_status()->add_error_msgs(ss.str()); | 
| 1085 | 0 |                         return; | 
| 1086 | 0 |                     } | 
| 1087 | 0 |                 } | 
| 1088 | 0 |             } | 
| 1089 | 0 |         } | 
| 1090 |  |  | 
| 1091 | 0 |         if (filter_set.size() > 1) { | 
| 1092 |  |             // consistecy check failed | 
| 1093 | 0 |             std::stringstream ss; | 
| 1094 | 0 |             ss << "consistency check failed: index{" << index_id << "}" | 
| 1095 | 0 |                << "got inconsistent schema"; | 
| 1096 | 0 |             LOG(WARNING) << ss.str(); | 
| 1097 | 0 |             response->mutable_status()->set_status_code(TStatusCode::ILLEGAL_STATE); | 
| 1098 | 0 |             response->mutable_status()->add_error_msgs(ss.str()); | 
| 1099 | 0 |             return; | 
| 1100 | 0 |         } | 
| 1101 |  |         // consistency check passed, use the first tablet to be the representative | 
| 1102 | 0 |         TabletSharedPtr tablet = tablet_mgr->get_tablet(tablet_ids[0]); | 
| 1103 | 0 |         const auto& columns = tablet->tablet_schema()->columns(); | 
| 1104 | 0 |         auto entry = response->add_entries(); | 
| 1105 | 0 |         entry->set_index_id(index_id); | 
| 1106 | 0 |         auto col_name_to_id = entry->mutable_col_name_to_id(); | 
| 1107 | 0 |         for (const auto& column : columns) { | 
| 1108 | 0 |             (*col_name_to_id)[column->name()] = column->unique_id(); | 
| 1109 | 0 |         } | 
| 1110 | 0 |     } | 
| 1111 | 0 |     response->mutable_status()->set_status_code(TStatusCode::OK); | 
| 1112 | 0 | } | 
| 1113 |  |  | 
| 1114 |  | template <class RPCResponse> | 
| 1115 |  | struct AsyncRPCContext { | 
| 1116 |  |     RPCResponse response; | 
| 1117 |  |     brpc::Controller cntl; | 
| 1118 |  |     brpc::CallId cid; | 
| 1119 |  | }; | 
| 1120 |  |  | 
| 1121 |  | void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController* controller, | 
| 1122 |  |                                                   const PFetchRemoteSchemaRequest* request, | 
| 1123 |  |                                                   PFetchRemoteSchemaResponse* response, | 
| 1124 | 0 |                                                   google::protobuf::Closure* done) { | 
| 1125 | 0 |     bool ret = _heavy_work_pool.try_offer([request, response, done]() { | 
| 1126 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1127 | 0 |         Status st = Status::OK(); | 
| 1128 | 0 |         std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( | 
| 1129 | 0 |                 MemTrackerLimiter::Type::OTHER, | 
| 1130 | 0 |                 fmt::format("InternalService::fetch_remote_tablet_schema")); | 
| 1131 | 0 |         SCOPED_ATTACH_TASK(mem_tracker); | 
| 1132 | 0 |         if (request->is_coordinator()) { | 
| 1133 |  |             // Spawn rpc request to none coordinator nodes, and finally merge them all | 
| 1134 | 0 |             PFetchRemoteSchemaRequest remote_request(*request); | 
| 1135 |  |             // set it none coordinator to get merged schema | 
| 1136 | 0 |             remote_request.set_is_coordinator(false); | 
| 1137 | 0 |             using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext<PFetchRemoteSchemaResponse>; | 
| 1138 | 0 |             std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts( | 
| 1139 | 0 |                     request->tablet_location_size()); | 
| 1140 | 0 |             for (int i = 0; i < request->tablet_location_size(); ++i) { | 
| 1141 | 0 |                 std::string host = request->tablet_location(i).host(); | 
| 1142 | 0 |                 int32_t brpc_port = request->tablet_location(i).brpc_port(); | 
| 1143 | 0 |                 std::shared_ptr<PBackendService_Stub> stub( | 
| 1144 | 0 |                         ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( | 
| 1145 | 0 |                                 host, brpc_port)); | 
| 1146 | 0 |                 if (stub == nullptr) { | 
| 1147 | 0 |                     LOG(WARNING) << "Failed to init rpc to " << host << ":" << brpc_port; | 
| 1148 | 0 |                     st = Status::InternalError("Failed to init rpc to {}:{}", host, brpc_port); | 
| 1149 | 0 |                     continue; | 
| 1150 | 0 |                 } | 
| 1151 | 0 |                 rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id(); | 
| 1152 | 0 |                 rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms); | 
| 1153 | 0 |                 stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request, | 
| 1154 | 0 |                                                  &rpc_contexts[i].response, brpc::DoNothing()); | 
| 1155 | 0 |             } | 
| 1156 | 0 |             std::vector<TabletSchemaSPtr> schemas; | 
| 1157 | 0 |             for (auto& rpc_context : rpc_contexts) { | 
| 1158 | 0 |                 brpc::Join(rpc_context.cid); | 
| 1159 | 0 |                 if (!st.ok()) { | 
| 1160 |  |                     // make sure all flying rpc request is joined | 
| 1161 | 0 |                     continue; | 
| 1162 | 0 |                 } | 
| 1163 | 0 |                 if (rpc_context.cntl.Failed()) { | 
| 1164 | 0 |                     LOG(WARNING) << "fetch_remote_tablet_schema rpc err:" | 
| 1165 | 0 |                                  << rpc_context.cntl.ErrorText(); | 
| 1166 | 0 |                     ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( | 
| 1167 | 0 |                             rpc_context.cntl.remote_side()); | 
| 1168 | 0 |                     st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}", | 
| 1169 | 0 |                                                rpc_context.cntl.ErrorText()); | 
| 1170 | 0 |                 } | 
| 1171 | 0 |                 if (rpc_context.response.status().status_code() != 0) { | 
| 1172 | 0 |                     st = Status::create(rpc_context.response.status()); | 
| 1173 | 0 |                 } | 
| 1174 | 0 |                 if (rpc_context.response.has_merged_schema()) { | 
| 1175 | 0 |                     TabletSchemaSPtr schema = std::make_shared<TabletSchema>(); | 
| 1176 | 0 |                     schema->init_from_pb(rpc_context.response.merged_schema()); | 
| 1177 | 0 |                     schemas.push_back(schema); | 
| 1178 | 0 |                 } | 
| 1179 | 0 |             } | 
| 1180 | 0 |             if (!schemas.empty() && st.ok()) { | 
| 1181 |  |                 // merge all | 
| 1182 | 0 |                 TabletSchemaSPtr merged_schema; | 
| 1183 | 0 |                 st = vectorized::schema_util::get_least_common_schema(schemas, nullptr, | 
| 1184 | 0 |                                                                       merged_schema); | 
| 1185 | 0 |                 if (!st.ok()) { | 
| 1186 | 0 |                     LOG(WARNING) << "Failed to get least common schema: " << st.to_string(); | 
| 1187 | 0 |                     st = Status::InternalError("Failed to get least common schema: {}", | 
| 1188 | 0 |                                                st.to_string()); | 
| 1189 | 0 |                 } | 
| 1190 | 0 |                 VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure(); | 
| 1191 | 0 |                 merged_schema->reserve_extracted_columns(); | 
| 1192 | 0 |                 merged_schema->to_schema_pb(response->mutable_merged_schema()); | 
| 1193 | 0 |             } | 
| 1194 | 0 |             st.to_protobuf(response->mutable_status()); | 
| 1195 | 0 |             return; | 
| 1196 | 0 |         } else { | 
| 1197 |  |             // This is not a coordinator, get it's tablet and merge schema | 
| 1198 | 0 |             std::vector<int64_t> target_tablets; | 
| 1199 | 0 |             for (int i = 0; i < request->tablet_location_size(); ++i) { | 
| 1200 | 0 |                 const auto& location = request->tablet_location(i); | 
| 1201 | 0 |                 auto backend = BackendOptions::get_local_backend(); | 
| 1202 |  |                 // If this is the target backend | 
| 1203 | 0 |                 if (backend.host == location.host() && config::brpc_port == location.brpc_port()) { | 
| 1204 | 0 |                     target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end()); | 
| 1205 | 0 |                     break; | 
| 1206 | 0 |                 } | 
| 1207 | 0 |             } | 
| 1208 | 0 |             if (!target_tablets.empty()) { | 
| 1209 | 0 |                 std::vector<TabletSchemaSPtr> tablet_schemas; | 
| 1210 | 0 |                 for (int64_t tablet_id : target_tablets) { | 
| 1211 | 0 |                     auto res = ExecEnv::get_tablet(tablet_id); | 
| 1212 | 0 |                     if (!res.has_value()) { | 
| 1213 |  |                         // just ignore | 
| 1214 | 0 |                         LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id; | 
| 1215 | 0 |                         continue; | 
| 1216 | 0 |                     } | 
| 1217 | 0 |                     auto tablet = res.value(); | 
| 1218 | 0 |                     auto rowsets = tablet->get_snapshot_rowset(); | 
| 1219 | 0 |                     auto schema = vectorized::schema_util::VariantCompactionUtil:: | 
| 1220 | 0 |                             calculate_variant_extended_schema(rowsets, tablet->tablet_schema()); | 
| 1221 | 0 |                     tablet_schemas.push_back(schema); | 
| 1222 | 0 |                 } | 
| 1223 | 0 |                 if (!tablet_schemas.empty()) { | 
| 1224 |  |                     // merge all | 
| 1225 | 0 |                     TabletSchemaSPtr merged_schema; | 
| 1226 | 0 |                     st = vectorized::schema_util::get_least_common_schema(tablet_schemas, nullptr, | 
| 1227 | 0 |                                                                           merged_schema); | 
| 1228 | 0 |                     if (!st.ok()) { | 
| 1229 | 0 |                         LOG(WARNING) << "Failed to get least common schema: " << st.to_string(); | 
| 1230 | 0 |                         st = Status::InternalError("Failed to get least common schema: {}", | 
| 1231 | 0 |                                                    st.to_string()); | 
| 1232 | 0 |                     } | 
| 1233 | 0 |                     merged_schema->to_schema_pb(response->mutable_merged_schema()); | 
| 1234 | 0 |                     VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure(); | 
| 1235 | 0 |                 } | 
| 1236 | 0 |             } | 
| 1237 | 0 |             st.to_protobuf(response->mutable_status()); | 
| 1238 | 0 |         } | 
| 1239 | 0 |     }); | 
| 1240 | 0 |     if (!ret) { | 
| 1241 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 1242 | 0 |     } | 
| 1243 | 0 | } | 
| 1244 |  |  | 
| 1245 |  | void PInternalService::report_stream_load_status(google::protobuf::RpcController* controller, | 
| 1246 |  |                                                  const PReportStreamLoadStatusRequest* request, | 
| 1247 |  |                                                  PReportStreamLoadStatusResponse* response, | 
| 1248 | 0 |                                                  google::protobuf::Closure* done) { | 
| 1249 | 0 |     TUniqueId load_id; | 
| 1250 | 0 |     load_id.__set_hi(request->load_id().hi()); | 
| 1251 | 0 |     load_id.__set_lo(request->load_id().lo()); | 
| 1252 | 0 |     Status st = Status::OK(); | 
| 1253 | 0 |     auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id); | 
| 1254 | 0 |     if (!stream_load_ctx) { | 
| 1255 | 0 |         st = Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); | 
| 1256 | 0 |     } | 
| 1257 | 0 |     stream_load_ctx->promise.set_value(st); | 
| 1258 | 0 |     st.to_protobuf(response->mutable_status()); | 
| 1259 | 0 | } | 
| 1260 |  |  | 
| 1261 |  | void PInternalService::get_info(google::protobuf::RpcController* controller, | 
| 1262 |  |                                 const PProxyRequest* request, PProxyResult* response, | 
| 1263 | 0 |                                 google::protobuf::Closure* done) { | 
| 1264 | 0 |     bool ret = _exec_env->routine_load_task_executor()->get_thread_pool().submit_func([this, | 
| 1265 | 0 |                                                                                        request, | 
| 1266 | 0 |                                                                                        response, | 
| 1267 | 0 |                                                                                        done]() { | 
| 1268 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1269 |  |         // PProxyRequest is defined in gensrc/proto/internal_service.proto | 
| 1270 |  |         // Currently it supports 2 kinds of requests: | 
| 1271 |  |         // 1. get all kafka partition ids for given topic | 
| 1272 |  |         // 2. get all kafka partition offsets for given topic and timestamp. | 
| 1273 | 0 |         int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000; | 
| 1274 | 0 |         if (request->has_kafka_meta_request()) { | 
| 1275 | 0 |             const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); | 
| 1276 | 0 |             if (!kafka_request.offset_flags().empty()) { | 
| 1277 | 0 |                 std::vector<PIntegerPair> partition_offsets; | 
| 1278 | 0 |                 Status st = _exec_env->routine_load_task_executor() | 
| 1279 | 0 |                                     ->get_kafka_real_offsets_for_partitions( | 
| 1280 | 0 |                                             request->kafka_meta_request(), &partition_offsets, | 
| 1281 | 0 |                                             timeout_ms); | 
| 1282 | 0 |                 if (st.ok()) { | 
| 1283 | 0 |                     PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); | 
| 1284 | 0 |                     for (const auto& entry : partition_offsets) { | 
| 1285 | 0 |                         PIntegerPair* res = part_offsets->add_offset_times(); | 
| 1286 | 0 |                         res->set_key(entry.key()); | 
| 1287 | 0 |                         res->set_val(entry.val()); | 
| 1288 | 0 |                     } | 
| 1289 | 0 |                 } | 
| 1290 | 0 |                 st.to_protobuf(response->mutable_status()); | 
| 1291 | 0 |                 return; | 
| 1292 | 0 |             } else if (!kafka_request.partition_id_for_latest_offsets().empty()) { | 
| 1293 |  |                 // get latest offsets for specified partition ids | 
| 1294 | 0 |                 std::vector<PIntegerPair> partition_offsets; | 
| 1295 | 0 |                 Status st = _exec_env->routine_load_task_executor() | 
| 1296 | 0 |                                     ->get_kafka_latest_offsets_for_partitions( | 
| 1297 | 0 |                                             request->kafka_meta_request(), &partition_offsets, | 
| 1298 | 0 |                                             timeout_ms); | 
| 1299 | 0 |                 if (st.ok()) { | 
| 1300 | 0 |                     PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); | 
| 1301 | 0 |                     for (const auto& entry : partition_offsets) { | 
| 1302 | 0 |                         PIntegerPair* res = part_offsets->add_offset_times(); | 
| 1303 | 0 |                         res->set_key(entry.key()); | 
| 1304 | 0 |                         res->set_val(entry.val()); | 
| 1305 | 0 |                     } | 
| 1306 | 0 |                 } | 
| 1307 | 0 |                 st.to_protobuf(response->mutable_status()); | 
| 1308 | 0 |                 return; | 
| 1309 | 0 |             } else if (!kafka_request.offset_times().empty()) { | 
| 1310 |  |                 // if offset_times() has elements, which means this request is to get offset by timestamp. | 
| 1311 | 0 |                 std::vector<PIntegerPair> partition_offsets; | 
| 1312 | 0 |                 Status st = _exec_env->routine_load_task_executor() | 
| 1313 | 0 |                                     ->get_kafka_partition_offsets_for_times( | 
| 1314 | 0 |                                             request->kafka_meta_request(), &partition_offsets, | 
| 1315 | 0 |                                             timeout_ms); | 
| 1316 | 0 |                 if (st.ok()) { | 
| 1317 | 0 |                     PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); | 
| 1318 | 0 |                     for (const auto& entry : partition_offsets) { | 
| 1319 | 0 |                         PIntegerPair* res = part_offsets->add_offset_times(); | 
| 1320 | 0 |                         res->set_key(entry.key()); | 
| 1321 | 0 |                         res->set_val(entry.val()); | 
| 1322 | 0 |                     } | 
| 1323 | 0 |                 } | 
| 1324 | 0 |                 st.to_protobuf(response->mutable_status()); | 
| 1325 | 0 |                 return; | 
| 1326 | 0 |             } else { | 
| 1327 |  |                 // get partition ids of topic | 
| 1328 | 0 |                 std::vector<int32_t> partition_ids; | 
| 1329 | 0 |                 Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( | 
| 1330 | 0 |                         request->kafka_meta_request(), &partition_ids); | 
| 1331 | 0 |                 if (st.ok()) { | 
| 1332 | 0 |                     PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); | 
| 1333 | 0 |                     for (int32_t id : partition_ids) { | 
| 1334 | 0 |                         kafka_result->add_partition_ids(id); | 
| 1335 | 0 |                     } | 
| 1336 | 0 |                 } | 
| 1337 | 0 |                 st.to_protobuf(response->mutable_status()); | 
| 1338 | 0 |                 return; | 
| 1339 | 0 |             } | 
| 1340 | 0 |         } | 
| 1341 | 0 |         Status::OK().to_protobuf(response->mutable_status()); | 
| 1342 | 0 |     }); | 
| 1343 | 0 |     if (!ret) { | 
| 1344 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 1345 | 0 |         return; | 
| 1346 | 0 |     } | 
| 1347 | 0 | } | 
| 1348 |  |  | 
| 1349 |  | void PInternalService::update_cache(google::protobuf::RpcController* controller, | 
| 1350 |  |                                     const PUpdateCacheRequest* request, PCacheResponse* response, | 
| 1351 | 0 |                                     google::protobuf::Closure* done) { | 
| 1352 | 0 |     bool ret = _light_work_pool.try_offer([this, request, response, done]() { | 
| 1353 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1354 | 0 |         _exec_env->result_cache()->update(request, response); | 
| 1355 | 0 |     }); | 
| 1356 | 0 |     if (!ret) { | 
| 1357 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1358 | 0 |         return; | 
| 1359 | 0 |     } | 
| 1360 | 0 | } | 
| 1361 |  |  | 
| 1362 |  | void PInternalService::fetch_cache(google::protobuf::RpcController* controller, | 
| 1363 |  |                                    const PFetchCacheRequest* request, PFetchCacheResult* result, | 
| 1364 | 0 |                                    google::protobuf::Closure* done) { | 
| 1365 | 0 |     bool ret = _light_work_pool.try_offer([this, request, result, done]() { | 
| 1366 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1367 | 0 |         _exec_env->result_cache()->fetch(request, result); | 
| 1368 | 0 |     }); | 
| 1369 | 0 |     if (!ret) { | 
| 1370 | 0 |         offer_failed(result, done, _light_work_pool); | 
| 1371 | 0 |         return; | 
| 1372 | 0 |     } | 
| 1373 | 0 | } | 
| 1374 |  |  | 
| 1375 |  | void PInternalService::clear_cache(google::protobuf::RpcController* controller, | 
| 1376 |  |                                    const PClearCacheRequest* request, PCacheResponse* response, | 
| 1377 | 0 |                                    google::protobuf::Closure* done) { | 
| 1378 | 0 |     bool ret = _light_work_pool.try_offer([this, request, response, done]() { | 
| 1379 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1380 | 0 |         _exec_env->result_cache()->clear(request, response); | 
| 1381 | 0 |     }); | 
| 1382 | 0 |     if (!ret) { | 
| 1383 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1384 | 0 |         return; | 
| 1385 | 0 |     } | 
| 1386 | 0 | } | 
| 1387 |  |  | 
| 1388 |  | void PInternalService::merge_filter(::google::protobuf::RpcController* controller, | 
| 1389 |  |                                     const ::doris::PMergeFilterRequest* request, | 
| 1390 |  |                                     ::doris::PMergeFilterResponse* response, | 
| 1391 | 0 |                                     ::google::protobuf::Closure* done) { | 
| 1392 | 0 |     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { | 
| 1393 | 0 |         signal::SignalTaskIdKeeper keeper(request->query_id()); | 
| 1394 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1395 | 0 |         auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); | 
| 1396 | 0 |         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); | 
| 1397 | 0 |         Status st; | 
| 1398 | 0 |         try { | 
| 1399 | 0 |             st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); | 
| 1400 | 0 |         } catch (Exception& e) { | 
| 1401 | 0 |             st = e.to_status(); | 
| 1402 | 0 |         } | 
| 1403 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 1404 | 0 |     }); | 
| 1405 | 0 |     if (!ret) { | 
| 1406 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1407 | 0 |         return; | 
| 1408 | 0 |     } | 
| 1409 | 0 | } | 
| 1410 |  |  | 
| 1411 |  | void PInternalService::send_filter_size(::google::protobuf::RpcController* controller, | 
| 1412 |  |                                         const ::doris::PSendFilterSizeRequest* request, | 
| 1413 |  |                                         ::doris::PSendFilterSizeResponse* response, | 
| 1414 | 0 |                                         ::google::protobuf::Closure* done) { | 
| 1415 | 0 |     bool ret = _light_work_pool.try_offer([this, request, response, done]() { | 
| 1416 | 0 |         signal::SignalTaskIdKeeper keeper(request->query_id()); | 
| 1417 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1418 | 0 |         Status st; | 
| 1419 | 0 |         try { | 
| 1420 | 0 |             st = _exec_env->fragment_mgr()->send_filter_size(request); | 
| 1421 | 0 |         } catch (Exception& e) { | 
| 1422 | 0 |             st = e.to_status(); | 
| 1423 | 0 |         } | 
| 1424 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 1425 | 0 |     }); | 
| 1426 | 0 |     if (!ret) { | 
| 1427 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1428 | 0 |         return; | 
| 1429 | 0 |     } | 
| 1430 | 0 | } | 
| 1431 |  |  | 
| 1432 |  | void PInternalService::sync_filter_size(::google::protobuf::RpcController* controller, | 
| 1433 |  |                                         const ::doris::PSyncFilterSizeRequest* request, | 
| 1434 |  |                                         ::doris::PSyncFilterSizeResponse* response, | 
| 1435 | 0 |                                         ::google::protobuf::Closure* done) { | 
| 1436 | 0 |     bool ret = _light_work_pool.try_offer([this, request, response, done]() { | 
| 1437 | 0 |         signal::SignalTaskIdKeeper keeper(request->query_id()); | 
| 1438 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1439 | 0 |         Status st; | 
| 1440 | 0 |         try { | 
| 1441 | 0 |             st = _exec_env->fragment_mgr()->sync_filter_size(request); | 
| 1442 | 0 |         } catch (Exception& e) { | 
| 1443 | 0 |             st = e.to_status(); | 
| 1444 | 0 |         } | 
| 1445 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 1446 | 0 |     }); | 
| 1447 | 0 |     if (!ret) { | 
| 1448 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1449 | 0 |         return; | 
| 1450 | 0 |     } | 
| 1451 | 0 | } | 
| 1452 |  |  | 
| 1453 |  | void PInternalService::apply_filterv2(::google::protobuf::RpcController* controller, | 
| 1454 |  |                                       const ::doris::PPublishFilterRequestV2* request, | 
| 1455 |  |                                       ::doris::PPublishFilterResponse* response, | 
| 1456 | 0 |                                       ::google::protobuf::Closure* done) { | 
| 1457 | 0 |     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { | 
| 1458 | 0 |         signal::SignalTaskIdKeeper keeper(request->query_id()); | 
| 1459 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1460 | 0 |         auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); | 
| 1461 | 0 |         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); | 
| 1462 | 0 |         VLOG_NOTICE << "rpc apply_filterv2 recv"; | 
| 1463 | 0 |         Status st; | 
| 1464 | 0 |         try { | 
| 1465 | 0 |             st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream); | 
| 1466 | 0 |         } catch (Exception& e) { | 
| 1467 | 0 |             st = e.to_status(); | 
| 1468 | 0 |         } | 
| 1469 | 0 |         if (!st.ok()) { | 
| 1470 | 0 |             LOG(WARNING) << "apply filter meet error: " << st.to_string(); | 
| 1471 | 0 |         } | 
| 1472 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 1473 | 0 |     }); | 
| 1474 | 0 |     if (!ret) { | 
| 1475 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1476 | 0 |         return; | 
| 1477 | 0 |     } | 
| 1478 | 0 | } | 
| 1479 |  |  | 
| 1480 |  | void PInternalService::send_data(google::protobuf::RpcController* controller, | 
| 1481 |  |                                  const PSendDataRequest* request, PSendDataResult* response, | 
| 1482 | 0 |                                  google::protobuf::Closure* done) { | 
| 1483 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { | 
| 1484 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1485 | 0 |         TUniqueId load_id; | 
| 1486 | 0 |         load_id.hi = request->load_id().hi(); | 
| 1487 | 0 |         load_id.lo = request->load_id().lo(); | 
| 1488 |  |         // On 1.2.3 we add load id to send data request and using load id to get pipe | 
| 1489 | 0 |         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id); | 
| 1490 | 0 |         if (stream_load_ctx == nullptr) { | 
| 1491 | 0 |             response->mutable_status()->set_status_code(1); | 
| 1492 | 0 |             response->mutable_status()->add_error_msgs("could not find stream load context"); | 
| 1493 | 0 |         } else { | 
| 1494 | 0 |             auto pipe = stream_load_ctx->pipe; | 
| 1495 | 0 |             for (int i = 0; i < request->data_size(); ++i) { | 
| 1496 | 0 |                 std::unique_ptr<PDataRow> row(new PDataRow()); | 
| 1497 | 0 |                 row->CopyFrom(request->data(i)); | 
| 1498 | 0 |                 Status s = pipe->append(std::move(row)); | 
| 1499 | 0 |                 if (!s.ok()) { | 
| 1500 | 0 |                     response->mutable_status()->set_status_code(1); | 
| 1501 | 0 |                     response->mutable_status()->add_error_msgs(s.to_string()); | 
| 1502 | 0 |                     return; | 
| 1503 | 0 |                 } | 
| 1504 | 0 |             } | 
| 1505 | 0 |             response->mutable_status()->set_status_code(0); | 
| 1506 | 0 |         } | 
| 1507 | 0 |     }); | 
| 1508 | 0 |     if (!ret) { | 
| 1509 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 1510 | 0 |         return; | 
| 1511 | 0 |     } | 
| 1512 | 0 | } | 
| 1513 |  |  | 
| 1514 |  | void PInternalService::commit(google::protobuf::RpcController* controller, | 
| 1515 |  |                               const PCommitRequest* request, PCommitResult* response, | 
| 1516 | 0 |                               google::protobuf::Closure* done) { | 
| 1517 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { | 
| 1518 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1519 | 0 |         TUniqueId load_id; | 
| 1520 | 0 |         load_id.hi = request->load_id().hi(); | 
| 1521 | 0 |         load_id.lo = request->load_id().lo(); | 
| 1522 |  | 
 | 
| 1523 | 0 |         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id); | 
| 1524 | 0 |         if (stream_load_ctx == nullptr) { | 
| 1525 | 0 |             response->mutable_status()->set_status_code(1); | 
| 1526 | 0 |             response->mutable_status()->add_error_msgs("could not find stream load context"); | 
| 1527 | 0 |         } else { | 
| 1528 | 0 |             static_cast<void>(stream_load_ctx->pipe->finish()); | 
| 1529 | 0 |             response->mutable_status()->set_status_code(0); | 
| 1530 | 0 |         } | 
| 1531 | 0 |     }); | 
| 1532 | 0 |     if (!ret) { | 
| 1533 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 1534 | 0 |         return; | 
| 1535 | 0 |     } | 
| 1536 | 0 | } | 
| 1537 |  |  | 
| 1538 |  | void PInternalService::rollback(google::protobuf::RpcController* controller, | 
| 1539 |  |                                 const PRollbackRequest* request, PRollbackResult* response, | 
| 1540 | 0 |                                 google::protobuf::Closure* done) { | 
| 1541 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { | 
| 1542 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1543 | 0 |         TUniqueId load_id; | 
| 1544 | 0 |         load_id.hi = request->load_id().hi(); | 
| 1545 | 0 |         load_id.lo = request->load_id().lo(); | 
| 1546 | 0 |         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id); | 
| 1547 | 0 |         if (stream_load_ctx == nullptr) { | 
| 1548 | 0 |             response->mutable_status()->set_status_code(1); | 
| 1549 | 0 |             response->mutable_status()->add_error_msgs("could not find stream load context"); | 
| 1550 | 0 |         } else { | 
| 1551 | 0 |             stream_load_ctx->pipe->cancel("rollback"); | 
| 1552 | 0 |             response->mutable_status()->set_status_code(0); | 
| 1553 | 0 |         } | 
| 1554 | 0 |     }); | 
| 1555 | 0 |     if (!ret) { | 
| 1556 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 1557 | 0 |         return; | 
| 1558 | 0 |     } | 
| 1559 | 0 | } | 
| 1560 |  |  | 
| 1561 |  | void PInternalService::fold_constant_expr(google::protobuf::RpcController* controller, | 
| 1562 |  |                                           const PConstantExprRequest* request, | 
| 1563 |  |                                           PConstantExprResult* response, | 
| 1564 | 0 |                                           google::protobuf::Closure* done) { | 
| 1565 | 0 |     bool ret = _light_work_pool.try_offer([request, response, done]() { | 
| 1566 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1567 | 0 |         TFoldConstantParams t_request; | 
| 1568 | 0 |         Status st = Status::OK(); | 
| 1569 | 0 |         { | 
| 1570 | 0 |             const uint8_t* buf = (const uint8_t*)request->request().data(); | 
| 1571 | 0 |             uint32_t len = request->request().size(); | 
| 1572 | 0 |             st = deserialize_thrift_msg(buf, &len, false, &t_request); | 
| 1573 | 0 |         } | 
| 1574 | 0 |         if (!st.ok()) { | 
| 1575 | 0 |             LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st | 
| 1576 | 0 |                          << " .and query_id_is: " << t_request.query_id; | 
| 1577 | 0 |             st.to_protobuf(response->mutable_status()); | 
| 1578 | 0 |             return; | 
| 1579 | 0 |         } | 
| 1580 | 0 |         auto fold_func = [&]() -> Status { | 
| 1581 | 0 |             std::unique_ptr<FoldConstantExecutor> fold_executor = | 
| 1582 | 0 |                     std::make_unique<FoldConstantExecutor>(); | 
| 1583 | 0 |             RETURN_IF_ERROR_OR_CATCH_EXCEPTION( | 
| 1584 | 0 |                     fold_executor->fold_constant_vexpr(t_request, response)); | 
| 1585 | 0 |             return Status::OK(); | 
| 1586 | 0 |         }; | 
| 1587 | 0 |         st = fold_func(); | 
| 1588 | 0 |         if (!st.ok()) { | 
| 1589 | 0 |             LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st | 
| 1590 | 0 |                          << " .and query_id_is: " << t_request.query_id; | 
| 1591 | 0 |         } | 
| 1592 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 1593 | 0 |     }); | 
| 1594 | 0 |     if (!ret) { | 
| 1595 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1596 | 0 |         return; | 
| 1597 | 0 |     } | 
| 1598 | 0 | } | 
| 1599 |  |  | 
| 1600 |  | void PInternalService::transmit_block(google::protobuf::RpcController* controller, | 
| 1601 |  |                                       const PTransmitDataParams* request, | 
| 1602 |  |                                       PTransmitDataResult* response, | 
| 1603 | 0 |                                       google::protobuf::Closure* done) { | 
| 1604 | 0 |     int64_t receive_time = GetCurrentTimeNanos(); | 
| 1605 | 0 |     if (config::enable_bthread_transmit_block) { | 
| 1606 | 0 |         response->set_receive_time(receive_time); | 
| 1607 |  |         // under high concurrency, thread pool will have a lot of lock contention. | 
| 1608 |  |         // May offer failed to the thread pool, so that we should avoid using thread | 
| 1609 |  |         // pool here. | 
| 1610 | 0 |         _transmit_block(controller, request, response, done, Status::OK(), 0); | 
| 1611 | 0 |     } else { | 
| 1612 | 0 |         bool ret = _light_work_pool.try_offer([this, controller, request, response, done, | 
| 1613 | 0 |                                                receive_time]() { | 
| 1614 | 0 |             response->set_receive_time(receive_time); | 
| 1615 |  |             // Sometimes transmit block function is the last owner of PlanFragmentExecutor | 
| 1616 |  |             // It will release the object. And the object maybe a JNIContext. | 
| 1617 |  |             // JNIContext will hold some TLS object. It could not work correctly under bthread | 
| 1618 |  |             // Context. So that put the logic into pthread. | 
| 1619 |  |             // But this is rarely happens, so this config is disabled by default. | 
| 1620 | 0 |             _transmit_block(controller, request, response, done, Status::OK(), | 
| 1621 | 0 |                             GetCurrentTimeNanos() - receive_time); | 
| 1622 | 0 |         }); | 
| 1623 | 0 |         if (!ret) { | 
| 1624 | 0 |             offer_failed(response, done, _light_work_pool); | 
| 1625 | 0 |             return; | 
| 1626 | 0 |         } | 
| 1627 | 0 |     } | 
| 1628 | 0 | } | 
| 1629 |  |  | 
| 1630 |  | void PInternalService::transmit_block_by_http(google::protobuf::RpcController* controller, | 
| 1631 |  |                                               const PEmptyRequest* request, | 
| 1632 |  |                                               PTransmitDataResult* response, | 
| 1633 | 0 |                                               google::protobuf::Closure* done) { | 
| 1634 | 0 |     int64_t receive_time = GetCurrentTimeNanos(); | 
| 1635 | 0 |     bool ret = _heavy_work_pool.try_offer([this, controller, response, done, receive_time]() { | 
| 1636 | 0 |         PTransmitDataParams* new_request = new PTransmitDataParams(); | 
| 1637 | 0 |         google::protobuf::Closure* new_done = | 
| 1638 | 0 |                 new NewHttpClosure<PTransmitDataParams>(new_request, done); | 
| 1639 | 0 |         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); | 
| 1640 | 0 |         Status st = | 
| 1641 | 0 |                 attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl); | 
| 1642 | 0 |         _transmit_block(controller, new_request, response, new_done, st, | 
| 1643 | 0 |                         GetCurrentTimeNanos() - receive_time); | 
| 1644 | 0 |     }); | 
| 1645 | 0 |     if (!ret) { | 
| 1646 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 1647 | 0 |         return; | 
| 1648 | 0 |     } | 
| 1649 | 0 | } | 
| 1650 |  |  | 
| 1651 |  | void PInternalService::_transmit_block(google::protobuf::RpcController* controller, | 
| 1652 |  |                                        const PTransmitDataParams* request, | 
| 1653 |  |                                        PTransmitDataResult* response, | 
| 1654 |  |                                        google::protobuf::Closure* done, const Status& extract_st, | 
| 1655 | 0 |                                        const int64_t wait_for_worker) { | 
| 1656 | 0 |     if (request->has_query_id()) { | 
| 1657 | 0 |         VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) | 
| 1658 | 0 |                  << " query_id=" << print_id(request->query_id()) << " node=" << request->node_id(); | 
| 1659 | 0 |     } | 
| 1660 |  |  | 
| 1661 |  |     // The response is accessed when done->Run is called in transmit_block(), | 
| 1662 |  |     // give response a default value to avoid null pointers in high concurrency. | 
| 1663 | 0 |     Status st; | 
| 1664 | 0 |     if (extract_st.ok()) { | 
| 1665 | 0 |         st = _exec_env->vstream_mgr()->transmit_block(request, &done, wait_for_worker); | 
| 1666 | 0 |         if (!st.ok() && !st.is<END_OF_FILE>()) { | 
| 1667 | 0 |             LOG(WARNING) << "transmit_block failed, message=" << st | 
| 1668 | 0 |                          << ", fragment_instance_id=" << print_id(request->finst_id()) | 
| 1669 | 0 |                          << ", node=" << request->node_id() | 
| 1670 | 0 |                          << ", from sender_id: " << request->sender_id() | 
| 1671 | 0 |                          << ", be_number: " << request->be_number() | 
| 1672 | 0 |                          << ", packet_seq: " << request->packet_seq(); | 
| 1673 | 0 |         } | 
| 1674 | 0 |     } else { | 
| 1675 | 0 |         st = extract_st; | 
| 1676 | 0 |     } | 
| 1677 | 0 |     if (done != nullptr) { | 
| 1678 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 1679 | 0 |         done->Run(); | 
| 1680 | 0 |     } | 
| 1681 | 0 | } | 
| 1682 |  |  | 
| 1683 |  | void PInternalService::check_rpc_channel(google::protobuf::RpcController* controller, | 
| 1684 |  |                                          const PCheckRPCChannelRequest* request, | 
| 1685 |  |                                          PCheckRPCChannelResponse* response, | 
| 1686 | 0 |                                          google::protobuf::Closure* done) { | 
| 1687 | 0 |     bool ret = _light_work_pool.try_offer([request, response, done]() { | 
| 1688 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1689 | 0 |         response->mutable_status()->set_status_code(0); | 
| 1690 | 0 |         if (request->data().size() != request->size()) { | 
| 1691 | 0 |             std::stringstream ss; | 
| 1692 | 0 |             ss << "data size not same, expected: " << request->size() | 
| 1693 | 0 |                << ", actual: " << request->data().size(); | 
| 1694 | 0 |             response->mutable_status()->add_error_msgs(ss.str()); | 
| 1695 | 0 |             response->mutable_status()->set_status_code(1); | 
| 1696 |  | 
 | 
| 1697 | 0 |         } else { | 
| 1698 | 0 |             Md5Digest digest; | 
| 1699 | 0 |             digest.update(static_cast<const void*>(request->data().c_str()), | 
| 1700 | 0 |                           request->data().size()); | 
| 1701 | 0 |             digest.digest(); | 
| 1702 | 0 |             if (!iequal(digest.hex(), request->md5())) { | 
| 1703 | 0 |                 std::stringstream ss; | 
| 1704 | 0 |                 ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); | 
| 1705 | 0 |                 response->mutable_status()->add_error_msgs(ss.str()); | 
| 1706 | 0 |                 response->mutable_status()->set_status_code(1); | 
| 1707 | 0 |             } | 
| 1708 | 0 |         } | 
| 1709 | 0 |     }); | 
| 1710 | 0 |     if (!ret) { | 
| 1711 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1712 | 0 |         return; | 
| 1713 | 0 |     } | 
| 1714 | 0 | } | 
| 1715 |  |  | 
| 1716 |  | void PInternalService::reset_rpc_channel(google::protobuf::RpcController* controller, | 
| 1717 |  |                                          const PResetRPCChannelRequest* request, | 
| 1718 |  |                                          PResetRPCChannelResponse* response, | 
| 1719 | 0 |                                          google::protobuf::Closure* done) { | 
| 1720 | 0 |     bool ret = _light_work_pool.try_offer([request, response, done]() { | 
| 1721 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 1722 | 0 |         response->mutable_status()->set_status_code(0); | 
| 1723 | 0 |         if (request->all()) { | 
| 1724 | 0 |             int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); | 
| 1725 | 0 |             if (size > 0) { | 
| 1726 | 0 |                 std::vector<std::string> endpoints; | 
| 1727 | 0 |                 ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); | 
| 1728 | 0 |                 ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); | 
| 1729 | 0 |                 *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; | 
| 1730 | 0 |             } | 
| 1731 | 0 |         } else { | 
| 1732 | 0 |             for (const std::string& endpoint : request->endpoints()) { | 
| 1733 | 0 |                 if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { | 
| 1734 | 0 |                     response->mutable_status()->add_error_msgs(endpoint + ": not found."); | 
| 1735 | 0 |                     continue; | 
| 1736 | 0 |                 } | 
| 1737 |  |  | 
| 1738 | 0 |                 if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { | 
| 1739 | 0 |                     response->add_channels(endpoint); | 
| 1740 | 0 |                 } else { | 
| 1741 | 0 |                     response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); | 
| 1742 | 0 |                 } | 
| 1743 | 0 |             } | 
| 1744 | 0 |             if (request->endpoints_size() != response->channels_size()) { | 
| 1745 | 0 |                 response->mutable_status()->set_status_code(1); | 
| 1746 | 0 |             } | 
| 1747 | 0 |         } | 
| 1748 | 0 |     }); | 
| 1749 | 0 |     if (!ret) { | 
| 1750 | 0 |         offer_failed(response, done, _light_work_pool); | 
| 1751 | 0 |         return; | 
| 1752 | 0 |     } | 
| 1753 | 0 | } | 
| 1754 |  |  | 
| 1755 |  | void PInternalService::hand_shake(google::protobuf::RpcController* controller, | 
| 1756 |  |                                   const PHandShakeRequest* request, PHandShakeResponse* response, | 
| 1757 | 0 |                                   google::protobuf::Closure* done) { | 
| 1758 |  |     // The light pool may be full. Handshake is used to check the connection state of brpc. | 
| 1759 |  |     // Should not be interfered by the thread pool logic. | 
| 1760 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 1761 | 0 |     if (request->has_hello()) { | 
| 1762 | 0 |         response->set_hello(request->hello()); | 
| 1763 | 0 |     } | 
| 1764 | 0 |     response->mutable_status()->set_status_code(0); | 
| 1765 | 0 | } | 
| 1766 |  |  | 
| 1767 |  | constexpr char HttpProtocol[] = "http://"; | 
| 1768 |  | constexpr char DownloadApiPath[] = "/api/_tablet/_download?token="; | 
| 1769 |  | constexpr char FileParam[] = "&file="; | 
| 1770 |  |  | 
| 1771 |  | static std::string construct_url(const std::string& host_port, const std::string& token, | 
| 1772 | 0 |                                  const std::string& path) { | 
| 1773 | 0 |     return fmt::format("{}{}{}{}{}{}", HttpProtocol, host_port, DownloadApiPath, token, FileParam, | 
| 1774 | 0 |                        path); | 
| 1775 | 0 | } | 
| 1776 |  |  | 
| 1777 |  | static Status download_file_action(std::string& remote_file_url, std::string& local_file_path, | 
| 1778 | 0 |                                    uint64_t estimate_timeout, uint64_t file_size) { | 
| 1779 | 0 |     auto download_cb = [remote_file_url, estimate_timeout, local_file_path, | 
| 1780 | 0 |                         file_size](HttpClient* client) { | 
| 1781 | 0 |         RETURN_IF_ERROR(client->init(remote_file_url)); | 
| 1782 | 0 |         client->set_timeout_ms(estimate_timeout * 1000); | 
| 1783 | 0 |         RETURN_IF_ERROR(client->download(local_file_path)); | 
| 1784 |  |  | 
| 1785 | 0 |         if (file_size > 0) { | 
| 1786 |  |             // Check file length | 
| 1787 | 0 |             uint64_t local_file_size = std::filesystem::file_size(local_file_path); | 
| 1788 | 0 |             if (local_file_size != file_size) { | 
| 1789 | 0 |                 LOG(WARNING) << "failed to pull rowset for slave replica. download file " | 
| 1790 | 0 |                                 "length error" | 
| 1791 | 0 |                              << ", remote_path=" << remote_file_url << ", file_size=" << file_size | 
| 1792 | 0 |                              << ", local_file_size=" << local_file_size; | 
| 1793 | 0 |                 return Status::InternalError("downloaded file size is not equal"); | 
| 1794 | 0 |             } | 
| 1795 | 0 |         } | 
| 1796 |  |  | 
| 1797 | 0 |         return io::global_local_filesystem()->permission(local_file_path, | 
| 1798 | 0 |                                                          io::LocalFileSystem::PERMS_OWNER_RW); | 
| 1799 | 0 |     }; | 
| 1800 | 0 |     return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, download_cb); | 
| 1801 | 0 | } | 
| 1802 |  |  | 
| 1803 |  | void PInternalServiceImpl::request_slave_tablet_pull_rowset( | 
| 1804 |  |         google::protobuf::RpcController* controller, const PTabletWriteSlaveRequest* request, | 
| 1805 | 0 |         PTabletWriteSlaveResult* response, google::protobuf::Closure* done) { | 
| 1806 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 1807 | 0 |     const RowsetMetaPB& rowset_meta_pb = request->rowset_meta(); | 
| 1808 | 0 |     const std::string& rowset_path = request->rowset_path(); | 
| 1809 | 0 |     google::protobuf::Map<int64_t, int64_t> segments_size = request->segments_size(); | 
| 1810 | 0 |     google::protobuf::Map<int64_t, PTabletWriteSlaveRequest_IndexSizeMap> indices_size = | 
| 1811 | 0 |             request->inverted_indices_size(); | 
| 1812 | 0 |     std::string host = request->host(); | 
| 1813 | 0 |     int64_t http_port = request->http_port(); | 
| 1814 | 0 |     int64_t brpc_port = request->brpc_port(); | 
| 1815 | 0 |     std::string token = request->token(); | 
| 1816 | 0 |     int64_t node_id = request->node_id(); | 
| 1817 | 0 |     bool ret = _heavy_work_pool.try_offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, | 
| 1818 | 0 |                                            indices_size, http_port, token, rowset_path, this]() { | 
| 1819 | 0 |         TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet( | 
| 1820 | 0 |                 rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash()); | 
| 1821 | 0 |         if (tablet == nullptr) { | 
| 1822 | 0 |             LOG(WARNING) << "failed to pull rowset for slave replica. tablet [" | 
| 1823 | 0 |                          << rowset_meta_pb.tablet_id() | 
| 1824 | 0 |                          << "] is not exist. txn_id=" << rowset_meta_pb.txn_id(); | 
| 1825 | 0 |             _response_pull_slave_rowset(host, brpc_port, rowset_meta_pb.txn_id(), | 
| 1826 | 0 |                                         rowset_meta_pb.tablet_id(), node_id, false); | 
| 1827 | 0 |             return; | 
| 1828 | 0 |         } | 
| 1829 |  |  | 
| 1830 | 0 |         RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); | 
| 1831 | 0 |         std::string rowset_meta_str; | 
| 1832 | 0 |         bool ret = rowset_meta_pb.SerializeToString(&rowset_meta_str); | 
| 1833 | 0 |         if (!ret) { | 
| 1834 | 0 |             LOG(WARNING) << "failed to pull rowset for slave replica. serialize rowset meta " | 
| 1835 | 0 |                             "failed. rowset_id=" | 
| 1836 | 0 |                          << rowset_meta_pb.rowset_id() | 
| 1837 | 0 |                          << ", tablet_id=" << rowset_meta_pb.tablet_id() | 
| 1838 | 0 |                          << ", txn_id=" << rowset_meta_pb.txn_id(); | 
| 1839 | 0 |             _response_pull_slave_rowset(host, brpc_port, rowset_meta_pb.txn_id(), | 
| 1840 | 0 |                                         rowset_meta_pb.tablet_id(), node_id, false); | 
| 1841 | 0 |             return; | 
| 1842 | 0 |         } | 
| 1843 | 0 |         bool parsed = rowset_meta->init(rowset_meta_str); | 
| 1844 | 0 |         if (!parsed) { | 
| 1845 | 0 |             LOG(WARNING) << "failed to pull rowset for slave replica. parse rowset meta string " | 
| 1846 | 0 |                             "failed. rowset_id=" | 
| 1847 | 0 |                          << rowset_meta_pb.rowset_id() | 
| 1848 | 0 |                          << ", tablet_id=" << rowset_meta_pb.tablet_id() | 
| 1849 | 0 |                          << ", txn_id=" << rowset_meta_pb.txn_id(); | 
| 1850 |  |             // return false will break meta iterator, return true to skip this error | 
| 1851 | 0 |             _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1852 | 0 |                                         rowset_meta->tablet_id(), node_id, false); | 
| 1853 | 0 |             return; | 
| 1854 | 0 |         } | 
| 1855 | 0 |         RowsetId remote_rowset_id = rowset_meta->rowset_id(); | 
| 1856 |  |         // change rowset id because it maybe same as other local rowset | 
| 1857 | 0 |         RowsetId new_rowset_id = _engine.next_rowset_id(); | 
| 1858 | 0 |         auto pending_rs_guard = _engine.pending_local_rowsets().add(new_rowset_id); | 
| 1859 | 0 |         rowset_meta->set_rowset_id(new_rowset_id); | 
| 1860 | 0 |         rowset_meta->set_tablet_uid(tablet->tablet_uid()); | 
| 1861 | 0 |         VLOG_CRITICAL << "succeed to init rowset meta for slave replica. rowset_id=" | 
| 1862 | 0 |                       << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id() | 
| 1863 | 0 |                       << ", txn_id=" << rowset_meta->txn_id(); | 
| 1864 |  | 
 | 
| 1865 | 0 |         auto tablet_scheme = rowset_meta->tablet_schema(); | 
| 1866 | 0 |         for (const auto& segment : segments_size) { | 
| 1867 | 0 |             uint64_t file_size = segment.second; | 
| 1868 | 0 |             uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; | 
| 1869 | 0 |             if (estimate_timeout < config::download_low_speed_time) { | 
| 1870 | 0 |                 estimate_timeout = config::download_low_speed_time; | 
| 1871 | 0 |             } | 
| 1872 |  | 
 | 
| 1873 | 0 |             std::string remote_file_path = | 
| 1874 | 0 |                     local_segment_path(rowset_path, remote_rowset_id.to_string(), segment.first); | 
| 1875 | 0 |             std::string remote_file_url = | 
| 1876 | 0 |                     construct_url(get_host_port(host, http_port), token, remote_file_path); | 
| 1877 |  | 
 | 
| 1878 | 0 |             std::string local_file_path = local_segment_path( | 
| 1879 | 0 |                     tablet->tablet_path(), rowset_meta->rowset_id().to_string(), segment.first); | 
| 1880 |  | 
 | 
| 1881 | 0 |             auto st = download_file_action(remote_file_url, local_file_path, estimate_timeout, | 
| 1882 | 0 |                                            file_size); | 
| 1883 | 0 |             if (!st.ok()) { | 
| 1884 | 0 |                 LOG(WARNING) << "failed to pull rowset for slave replica. failed to download " | 
| 1885 | 0 |                                 "file. url=" | 
| 1886 | 0 |                              << remote_file_url << ", local_path=" << local_file_path | 
| 1887 | 0 |                              << ", txn_id=" << rowset_meta->txn_id(); | 
| 1888 | 0 |                 _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1889 | 0 |                                             rowset_meta->tablet_id(), node_id, false); | 
| 1890 | 0 |                 return; | 
| 1891 | 0 |             } | 
| 1892 | 0 |             VLOG_CRITICAL << "succeed to download file for slave replica. url=" << remote_file_url | 
| 1893 | 0 |                           << ", local_path=" << local_file_path | 
| 1894 | 0 |                           << ", txn_id=" << rowset_meta->txn_id(); | 
| 1895 | 0 |             if (indices_size.find(segment.first) != indices_size.end()) { | 
| 1896 | 0 |                 PTabletWriteSlaveRequest_IndexSizeMap segment_indices_size = | 
| 1897 | 0 |                         indices_size.at(segment.first); | 
| 1898 |  | 
 | 
| 1899 | 0 |                 for (auto index_size : segment_indices_size.index_sizes()) { | 
| 1900 | 0 |                     auto index_id = index_size.indexid(); | 
| 1901 | 0 |                     auto size = index_size.size(); | 
| 1902 | 0 |                     auto suffix_path = index_size.suffix_path(); | 
| 1903 | 0 |                     std::string remote_inverted_index_file; | 
| 1904 | 0 |                     std::string local_inverted_index_file; | 
| 1905 | 0 |                     std::string remote_inverted_index_file_url; | 
| 1906 | 0 |                     if (tablet_scheme->get_inverted_index_storage_format() == | 
| 1907 | 0 |                         InvertedIndexStorageFormatPB::V1) { | 
| 1908 | 0 |                         remote_inverted_index_file = | 
| 1909 | 0 |                                 InvertedIndexDescriptor::get_index_file_path_v1( | 
| 1910 | 0 |                                         InvertedIndexDescriptor::get_index_file_path_prefix( | 
| 1911 | 0 |                                                 remote_file_path), | 
| 1912 | 0 |                                         index_id, suffix_path); | 
| 1913 | 0 |                         remote_inverted_index_file_url = construct_url( | 
| 1914 | 0 |                                 get_host_port(host, http_port), token, remote_inverted_index_file); | 
| 1915 |  | 
 | 
| 1916 | 0 |                         local_inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v1( | 
| 1917 | 0 |                                 InvertedIndexDescriptor::get_index_file_path_prefix( | 
| 1918 | 0 |                                         local_file_path), | 
| 1919 | 0 |                                 index_id, suffix_path); | 
| 1920 | 0 |                     } else { | 
| 1921 | 0 |                         remote_inverted_index_file = | 
| 1922 | 0 |                                 InvertedIndexDescriptor::get_index_file_path_v2( | 
| 1923 | 0 |                                         InvertedIndexDescriptor::get_index_file_path_prefix( | 
| 1924 | 0 |                                                 remote_file_path)); | 
| 1925 | 0 |                         remote_inverted_index_file_url = construct_url( | 
| 1926 | 0 |                                 get_host_port(host, http_port), token, remote_inverted_index_file); | 
| 1927 |  | 
 | 
| 1928 | 0 |                         local_inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( | 
| 1929 | 0 |                                 InvertedIndexDescriptor::get_index_file_path_prefix( | 
| 1930 | 0 |                                         local_file_path)); | 
| 1931 | 0 |                     } | 
| 1932 | 0 |                     st = download_file_action(remote_inverted_index_file_url, | 
| 1933 | 0 |                                               local_inverted_index_file, estimate_timeout, size); | 
| 1934 | 0 |                     if (!st.ok()) { | 
| 1935 | 0 |                         LOG(WARNING) << "failed to pull rowset for slave replica. failed to " | 
| 1936 | 0 |                                         "download " | 
| 1937 | 0 |                                         "file. url=" | 
| 1938 | 0 |                                      << remote_inverted_index_file_url | 
| 1939 | 0 |                                      << ", local_path=" << local_inverted_index_file | 
| 1940 | 0 |                                      << ", txn_id=" << rowset_meta->txn_id(); | 
| 1941 | 0 |                         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1942 | 0 |                                                     rowset_meta->tablet_id(), node_id, false); | 
| 1943 | 0 |                         return; | 
| 1944 | 0 |                     } | 
| 1945 |  |  | 
| 1946 | 0 |                     VLOG_CRITICAL | 
| 1947 | 0 |                             << "succeed to download inverted index file for slave replica. url=" | 
| 1948 | 0 |                             << remote_inverted_index_file_url | 
| 1949 | 0 |                             << ", local_path=" << local_inverted_index_file | 
| 1950 | 0 |                             << ", txn_id=" << rowset_meta->txn_id(); | 
| 1951 | 0 |                 } | 
| 1952 | 0 |             } | 
| 1953 | 0 |         } | 
| 1954 |  |  | 
| 1955 | 0 |         RowsetSharedPtr rowset; | 
| 1956 | 0 |         Status create_status = RowsetFactory::create_rowset( | 
| 1957 | 0 |                 tablet->tablet_schema(), tablet->tablet_path(), rowset_meta, &rowset); | 
| 1958 | 0 |         if (!create_status) { | 
| 1959 | 0 |             LOG(WARNING) << "failed to create rowset from rowset meta for slave replica" | 
| 1960 | 0 |                          << ". rowset_id: " << rowset_meta->rowset_id() | 
| 1961 | 0 |                          << ", rowset_type: " << rowset_meta->rowset_type() | 
| 1962 | 0 |                          << ", rowset_state: " << rowset_meta->rowset_state() | 
| 1963 | 0 |                          << ", tablet_id=" << rowset_meta->tablet_id() | 
| 1964 | 0 |                          << ", txn_id=" << rowset_meta->txn_id(); | 
| 1965 | 0 |             _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1966 | 0 |                                         rowset_meta->tablet_id(), node_id, false); | 
| 1967 | 0 |             return; | 
| 1968 | 0 |         } | 
| 1969 | 0 |         if (rowset_meta->rowset_state() != RowsetStatePB::COMMITTED) { | 
| 1970 | 0 |             LOG(WARNING) << "could not commit txn for slave replica because master rowset state is " | 
| 1971 | 0 |                             "not committed, rowset_state=" | 
| 1972 | 0 |                          << rowset_meta->rowset_state() | 
| 1973 | 0 |                          << ", tablet_id=" << rowset_meta->tablet_id() | 
| 1974 | 0 |                          << ", txn_id=" << rowset_meta->txn_id(); | 
| 1975 | 0 |             _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1976 | 0 |                                         rowset_meta->tablet_id(), node_id, false); | 
| 1977 | 0 |             return; | 
| 1978 | 0 |         } | 
| 1979 | 0 |         Status commit_txn_status = _engine.txn_manager()->commit_txn( | 
| 1980 | 0 |                 tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(), | 
| 1981 | 0 |                 rowset_meta->tablet_id(), tablet->tablet_uid(), rowset_meta->load_id(), rowset, | 
| 1982 | 0 |                 std::move(pending_rs_guard), false); | 
| 1983 | 0 |         if (!commit_txn_status && !commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) { | 
| 1984 | 0 |             LOG(WARNING) << "failed to add committed rowset for slave replica. rowset_id=" | 
| 1985 | 0 |                          << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id() | 
| 1986 | 0 |                          << ", txn_id=" << rowset_meta->txn_id(); | 
| 1987 | 0 |             _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1988 | 0 |                                         rowset_meta->tablet_id(), node_id, false); | 
| 1989 | 0 |             return; | 
| 1990 | 0 |         } | 
| 1991 | 0 |         VLOG_CRITICAL << "succeed to pull rowset for slave replica. successfully to add committed " | 
| 1992 | 0 |                          "rowset: " | 
| 1993 | 0 |                       << rowset_meta->rowset_id() | 
| 1994 | 0 |                       << " to tablet, tablet_id=" << rowset_meta->tablet_id() | 
| 1995 | 0 |                       << ", schema_hash=" << rowset_meta->tablet_schema_hash() | 
| 1996 | 0 |                       << ", txn_id=" << rowset_meta->txn_id(); | 
| 1997 | 0 |         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), | 
| 1998 | 0 |                                     rowset_meta->tablet_id(), node_id, true); | 
| 1999 | 0 |     }); | 
| 2000 | 0 |     if (!ret) { | 
| 2001 | 0 |         offer_failed(response, closure_guard.release(), _heavy_work_pool); | 
| 2002 | 0 |         return; | 
| 2003 | 0 |     } | 
| 2004 | 0 |     Status::OK().to_protobuf(response->mutable_status()); | 
| 2005 | 0 | } | 
| 2006 |  |  | 
| 2007 |  | void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote_host, | 
| 2008 |  |                                                        int64_t brpc_port, int64_t txn_id, | 
| 2009 |  |                                                        int64_t tablet_id, int64_t node_id, | 
| 2010 | 0 |                                                        bool is_succeed) { | 
| 2011 | 0 |     std::shared_ptr<PBackendService_Stub> stub = | 
| 2012 | 0 |             ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(remote_host, | 
| 2013 | 0 |                                                                              brpc_port); | 
| 2014 | 0 |     if (stub == nullptr) { | 
| 2015 | 0 |         LOG(WARNING) << "failed to response result of slave replica to master replica. get rpc " | 
| 2016 | 0 |                         "stub failed, master host=" | 
| 2017 | 0 |                      << remote_host << ", port=" << brpc_port << ", tablet_id=" << tablet_id | 
| 2018 | 0 |                      << ", txn_id=" << txn_id; | 
| 2019 | 0 |         return; | 
| 2020 | 0 |     } | 
| 2021 |  |  | 
| 2022 | 0 |     auto request = std::make_shared<PTabletWriteSlaveDoneRequest>(); | 
| 2023 | 0 |     request->set_txn_id(txn_id); | 
| 2024 | 0 |     request->set_tablet_id(tablet_id); | 
| 2025 | 0 |     request->set_node_id(node_id); | 
| 2026 | 0 |     request->set_is_succeed(is_succeed); | 
| 2027 | 0 |     auto pull_rowset_callback = DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared(); | 
| 2028 | 0 |     auto closure = AutoReleaseClosure< | 
| 2029 | 0 |             PTabletWriteSlaveDoneRequest, | 
| 2030 | 0 |             DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request, | 
| 2031 | 0 |                                                                            pull_rowset_callback); | 
| 2032 | 0 |     closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); | 
| 2033 | 0 |     closure->cntl_->ignore_eovercrowded(); | 
| 2034 | 0 |     stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(), | 
| 2035 | 0 |                                             closure->response_.get(), closure.get()); | 
| 2036 | 0 |     closure.release(); | 
| 2037 |  | 
 | 
| 2038 | 0 |     pull_rowset_callback->join(); | 
| 2039 | 0 |     if (pull_rowset_callback->cntl_->Failed()) { | 
| 2040 | 0 |         LOG(WARNING) << "failed to response result of slave replica to master replica, error=" | 
| 2041 | 0 |                      << berror(pull_rowset_callback->cntl_->ErrorCode()) | 
| 2042 | 0 |                      << ", error_text=" << pull_rowset_callback->cntl_->ErrorText() | 
| 2043 | 0 |                      << ", master host: " << remote_host << ", tablet_id=" << tablet_id | 
| 2044 | 0 |                      << ", txn_id=" << txn_id; | 
| 2045 | 0 |     } | 
| 2046 | 0 |     VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master " | 
| 2047 | 0 |                      "replica. master host: " | 
| 2048 | 0 |                   << remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id | 
| 2049 | 0 |                   << ", slave server=" << node_id << ", txn_id=" << txn_id; | 
| 2050 | 0 | } | 
| 2051 |  |  | 
| 2052 |  | void PInternalServiceImpl::response_slave_tablet_pull_rowset( | 
| 2053 |  |         google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request, | 
| 2054 | 0 |         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) { | 
| 2055 | 0 |     bool ret = _heavy_work_pool.try_offer([txn_mgr = _engine.txn_manager(), request, response, | 
| 2056 | 0 |                                            done]() { | 
| 2057 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2058 | 0 |         VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. " | 
| 2059 | 0 |                          "slave server=" | 
| 2060 | 0 |                       << request->node_id() << ", is_succeed=" << request->is_succeed() | 
| 2061 | 0 |                       << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); | 
| 2062 | 0 |         txn_mgr->finish_slave_tablet_pull_rowset(request->txn_id(), request->tablet_id(), | 
| 2063 | 0 |                                                  request->node_id(), request->is_succeed()); | 
| 2064 | 0 |         Status::OK().to_protobuf(response->mutable_status()); | 
| 2065 | 0 |     }); | 
| 2066 | 0 |     if (!ret) { | 
| 2067 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 2068 | 0 |         return; | 
| 2069 | 0 |     } | 
| 2070 | 0 | } | 
| 2071 |  |  | 
| 2072 |  | void PInternalService::multiget_data(google::protobuf::RpcController* controller, | 
| 2073 |  |                                      const PMultiGetRequest* request, PMultiGetResponse* response, | 
| 2074 | 0 |                                      google::protobuf::Closure* done) { | 
| 2075 | 0 |     bool ret = _heavy_work_pool.try_offer([request, response, done]() { | 
| 2076 | 0 |         signal::SignalTaskIdKeeper keeper(request->query_id()); | 
| 2077 |  |         // multi get data by rowid | 
| 2078 | 0 |         MonotonicStopWatch watch; | 
| 2079 | 0 |         watch.start(); | 
| 2080 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2081 | 0 |         response->mutable_status()->set_status_code(0); | 
| 2082 | 0 |         SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker()); | 
| 2083 | 0 |         Status st = RowIdStorageReader::read_by_rowids(*request, response); | 
| 2084 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 2085 | 0 |         LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; | 
| 2086 | 0 |     }); | 
| 2087 | 0 |     if (!ret) { | 
| 2088 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 2089 | 0 |         return; | 
| 2090 | 0 |     } | 
| 2091 | 0 | } | 
| 2092 |  |  | 
| 2093 |  | void PInternalService::multiget_data_v2(google::protobuf::RpcController* controller, | 
| 2094 |  |                                         const PMultiGetRequestV2* request, | 
| 2095 |  |                                         PMultiGetResponseV2* response, | 
| 2096 | 0 |                                         google::protobuf::Closure* done) { | 
| 2097 | 0 |     std::vector<uint64_t> id_set; | 
| 2098 | 0 |     id_set.push_back(request->wg_id()); | 
| 2099 | 0 |     auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(id_set); | 
| 2100 | 0 |     Status st = Status::OK(); | 
| 2101 |  | 
 | 
| 2102 | 0 |     if (!wg) [[unlikely]] { | 
| 2103 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2104 | 0 |         st = Status::Error<TStatusCode::CANCELLED>("fail to find wg: wg id:" + | 
| 2105 | 0 |                                                    std::to_string(request->wg_id())); | 
| 2106 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 2107 | 0 |         return; | 
| 2108 | 0 |     } | 
| 2109 |  |  | 
| 2110 | 0 |     doris::pipeline::TaskScheduler* exec_sched = nullptr; | 
| 2111 | 0 |     vectorized::SimplifiedScanScheduler* scan_sched = nullptr; | 
| 2112 | 0 |     vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr; | 
| 2113 | 0 |     wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched); | 
| 2114 | 0 |     DCHECK(remote_scan_sched); | 
| 2115 |  | 
 | 
| 2116 | 0 |     st = remote_scan_sched->submit_scan_task( | 
| 2117 | 0 |             vectorized::SimplifiedScanTask( | 
| 2118 | 0 |                     [request, response, done]() { | 
| 2119 | 0 |                         SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker()); | 
| 2120 | 0 |                         signal::set_signal_task_id(request->query_id()); | 
| 2121 |  |                         // multi get data by rowid | 
| 2122 | 0 |                         MonotonicStopWatch watch; | 
| 2123 | 0 |                         watch.start(); | 
| 2124 | 0 |                         brpc::ClosureGuard closure_guard(done); | 
| 2125 | 0 |                         response->mutable_status()->set_status_code(0); | 
| 2126 | 0 |                         Status st = RowIdStorageReader::read_by_rowids(*request, response); | 
| 2127 | 0 |                         st.to_protobuf(response->mutable_status()); | 
| 2128 | 0 |                         LOG(INFO) << "multiget_data finished, cost(us):" | 
| 2129 | 0 |                                   << watch.elapsed_time() / 1000; | 
| 2130 | 0 |                         return true; | 
| 2131 | 0 |                     }, | 
| 2132 | 0 |                     nullptr, nullptr), | 
| 2133 | 0 |             fmt::format("{}-multiget_data_v2", print_id(request->query_id()))); | 
| 2134 |  | 
 | 
| 2135 | 0 |     if (!st.ok()) { | 
| 2136 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2137 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 2138 | 0 |     } | 
| 2139 | 0 | } | 
| 2140 |  |  | 
| 2141 |  | void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcController* cntl_base, | 
| 2142 |  |                                                       const PGetTabletVersionsRequest* request, | 
| 2143 |  |                                                       PGetTabletVersionsResponse* response, | 
| 2144 | 0 |                                                       google::protobuf::Closure* done) { | 
| 2145 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 2146 | 0 |     VLOG_DEBUG << "receive get tablet versions request: " << request->DebugString(); | 
| 2147 | 0 |     _engine.get_tablet_rowset_versions(request, response); | 
| 2148 | 0 | } | 
| 2149 |  |  | 
| 2150 |  | void PInternalService::glob(google::protobuf::RpcController* controller, | 
| 2151 |  |                             const PGlobRequest* request, PGlobResponse* response, | 
| 2152 | 0 |                             google::protobuf::Closure* done) { | 
| 2153 | 0 |     bool ret = _heavy_work_pool.try_offer([request, response, done]() { | 
| 2154 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2155 | 0 |         std::vector<io::FileInfo> files; | 
| 2156 | 0 |         Status st = io::global_local_filesystem()->safe_glob(request->pattern(), &files); | 
| 2157 | 0 |         if (st.ok()) { | 
| 2158 | 0 |             for (auto& file : files) { | 
| 2159 | 0 |                 PGlobResponse_PFileInfo* pfile = response->add_files(); | 
| 2160 | 0 |                 pfile->set_file(file.file_name); | 
| 2161 | 0 |                 pfile->set_size(file.file_size); | 
| 2162 | 0 |             } | 
| 2163 | 0 |         } | 
| 2164 | 0 |         st.to_protobuf(response->mutable_status()); | 
| 2165 | 0 |     }); | 
| 2166 | 0 |     if (!ret) { | 
| 2167 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 2168 | 0 |         return; | 
| 2169 | 0 |     } | 
| 2170 | 0 | } | 
| 2171 |  |  | 
| 2172 |  | void PInternalService::group_commit_insert(google::protobuf::RpcController* controller, | 
| 2173 |  |                                            const PGroupCommitInsertRequest* request, | 
| 2174 |  |                                            PGroupCommitInsertResponse* response, | 
| 2175 | 0 |                                            google::protobuf::Closure* done) { | 
| 2176 | 0 |     TUniqueId load_id; | 
| 2177 | 0 |     load_id.__set_hi(request->load_id().hi()); | 
| 2178 | 0 |     load_id.__set_lo(request->load_id().lo()); | 
| 2179 | 0 |     std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>(); | 
| 2180 | 0 |     std::shared_ptr<bool> is_done = std::make_shared<bool>(false); | 
| 2181 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, response, done, load_id, lock, | 
| 2182 | 0 |                                            is_done]() { | 
| 2183 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2184 | 0 |         std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); | 
| 2185 | 0 |         auto pipe = std::make_shared<io::StreamLoadPipe>( | 
| 2186 | 0 |                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, | 
| 2187 | 0 |                 -1 /* total_length */, true /* use_proto */); | 
| 2188 | 0 |         ctx->pipe = pipe; | 
| 2189 | 0 |         Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx); | 
| 2190 | 0 |         if (st.ok()) { | 
| 2191 | 0 |             try { | 
| 2192 | 0 |                 st = _exec_plan_fragment_impl( | 
| 2193 | 0 |                         request->exec_plan_fragment_request().request(), | 
| 2194 | 0 |                         request->exec_plan_fragment_request().version(), | 
| 2195 | 0 |                         request->exec_plan_fragment_request().compact(), | 
| 2196 | 0 |                         [&, response, done, load_id, lock, is_done](RuntimeState* state, | 
| 2197 | 0 |                                                                     Status* status) { | 
| 2198 | 0 |                             std::lock_guard<std::mutex> lock1(*lock); | 
| 2199 | 0 |                             if (*is_done) { | 
| 2200 | 0 |                                 return; | 
| 2201 | 0 |                             } | 
| 2202 | 0 |                             *is_done = true; | 
| 2203 | 0 |                             brpc::ClosureGuard cb_closure_guard(done); | 
| 2204 | 0 |                             response->set_label(state->import_label()); | 
| 2205 | 0 |                             response->set_txn_id(state->wal_id()); | 
| 2206 | 0 |                             response->set_loaded_rows(state->num_rows_load_success()); | 
| 2207 | 0 |                             response->set_filtered_rows(state->num_rows_load_filtered()); | 
| 2208 | 0 |                             status->to_protobuf(response->mutable_status()); | 
| 2209 | 0 |                             if (!state->get_error_log_file_path().empty()) { | 
| 2210 | 0 |                                 response->set_error_url( | 
| 2211 | 0 |                                         to_load_error_http_path(state->get_error_log_file_path())); | 
| 2212 | 0 |                             } | 
| 2213 | 0 |                             if (!state->get_first_error_msg().empty()) { | 
| 2214 | 0 |                                 response->set_first_error_msg(state->get_first_error_msg()); | 
| 2215 | 0 |                             } | 
| 2216 | 0 |                             _exec_env->new_load_stream_mgr()->remove(load_id); | 
| 2217 | 0 |                         }); | 
| 2218 | 0 |             } catch (const Exception& e) { | 
| 2219 | 0 |                 st = e.to_status(); | 
| 2220 | 0 |             } catch (const std::exception& e) { | 
| 2221 | 0 |                 st = Status::Error(ErrorCode::INTERNAL_ERROR, e.what()); | 
| 2222 | 0 |             } catch (...) { | 
| 2223 | 0 |                 st = Status::Error(ErrorCode::INTERNAL_ERROR, | 
| 2224 | 0 |                                    "_exec_plan_fragment_impl meet unknown error"); | 
| 2225 | 0 |             } | 
| 2226 | 0 |             if (!st.ok()) { | 
| 2227 | 0 |                 LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id) | 
| 2228 | 0 |                              << ", errmsg=" << st; | 
| 2229 | 0 |                 std::lock_guard<std::mutex> lock1(*lock); | 
| 2230 | 0 |                 if (*is_done) { | 
| 2231 | 0 |                     closure_guard.release(); | 
| 2232 | 0 |                 } else { | 
| 2233 | 0 |                     *is_done = true; | 
| 2234 | 0 |                     st.to_protobuf(response->mutable_status()); | 
| 2235 | 0 |                     _exec_env->new_load_stream_mgr()->remove(load_id); | 
| 2236 | 0 |                 } | 
| 2237 | 0 |             } else { | 
| 2238 | 0 |                 closure_guard.release(); | 
| 2239 | 0 |                 for (int i = 0; i < request->data().size(); ++i) { | 
| 2240 | 0 |                     std::unique_ptr<PDataRow> row(new PDataRow()); | 
| 2241 | 0 |                     row->CopyFrom(request->data(i)); | 
| 2242 | 0 |                     st = pipe->append(std::move(row)); | 
| 2243 | 0 |                     if (!st.ok()) { | 
| 2244 | 0 |                         break; | 
| 2245 | 0 |                     } | 
| 2246 | 0 |                 } | 
| 2247 | 0 |                 if (st.ok()) { | 
| 2248 | 0 |                     static_cast<void>(pipe->finish()); | 
| 2249 | 0 |                 } | 
| 2250 | 0 |             } | 
| 2251 | 0 |         } | 
| 2252 | 0 |     }); | 
| 2253 | 0 |     if (!ret) { | 
| 2254 | 0 |         _exec_env->new_load_stream_mgr()->remove(load_id); | 
| 2255 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 2256 | 0 |         return; | 
| 2257 | 0 |     } | 
| 2258 | 0 | }; | 
| 2259 |  |  | 
| 2260 |  | void PInternalService::get_wal_queue_size(google::protobuf::RpcController* controller, | 
| 2261 |  |                                           const PGetWalQueueSizeRequest* request, | 
| 2262 |  |                                           PGetWalQueueSizeResponse* response, | 
| 2263 | 0 |                                           google::protobuf::Closure* done) { | 
| 2264 | 0 |     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { | 
| 2265 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2266 | 0 |         Status st = Status::OK(); | 
| 2267 | 0 |         auto table_id = request->table_id(); | 
| 2268 | 0 |         auto count = _exec_env->wal_mgr()->get_wal_queue_size(table_id); | 
| 2269 | 0 |         response->set_size(count); | 
| 2270 | 0 |         response->mutable_status()->set_status_code(st.code()); | 
| 2271 | 0 |     }); | 
| 2272 | 0 |     if (!ret) { | 
| 2273 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 2274 | 0 |     } | 
| 2275 | 0 | } | 
| 2276 |  |  | 
| 2277 |  | void PInternalService::get_be_resource(google::protobuf::RpcController* controller, | 
| 2278 |  |                                        const PGetBeResourceRequest* request, | 
| 2279 |  |                                        PGetBeResourceResponse* response, | 
| 2280 | 0 |                                        google::protobuf::Closure* done) { | 
| 2281 | 0 |     bool ret = _heavy_work_pool.try_offer([response, done]() { | 
| 2282 | 0 |         brpc::ClosureGuard closure_guard(done); | 
| 2283 | 0 |         int64_t mem_limit = MemInfo::mem_limit(); | 
| 2284 | 0 |         int64_t mem_usage = PerfCounters::get_vm_rss(); | 
| 2285 |  | 
 | 
| 2286 | 0 |         PGlobalResourceUsage* global_resource_usage = response->mutable_global_be_resource_usage(); | 
| 2287 | 0 |         global_resource_usage->set_mem_limit(mem_limit); | 
| 2288 | 0 |         global_resource_usage->set_mem_usage(mem_usage); | 
| 2289 |  | 
 | 
| 2290 | 0 |         Status st = Status::OK(); | 
| 2291 | 0 |         response->mutable_status()->set_status_code(st.code()); | 
| 2292 | 0 |     }); | 
| 2293 | 0 |     if (!ret) { | 
| 2294 | 0 |         offer_failed(response, done, _heavy_work_pool); | 
| 2295 | 0 |     } | 
| 2296 | 0 | } | 
| 2297 |  |  | 
| 2298 |  | void PInternalService::delete_dictionary(google::protobuf::RpcController* controller, | 
| 2299 |  |                                          const PDeleteDictionaryRequest* request, | 
| 2300 |  |                                          PDeleteDictionaryResponse* response, | 
| 2301 | 0 |                                          google::protobuf::Closure* done) { | 
| 2302 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 2303 | 0 |     Status st = ExecEnv::GetInstance()->dict_factory()->delete_dict(request->dictionary_id()); | 
| 2304 | 0 |     st.to_protobuf(response->mutable_status()); | 
| 2305 | 0 | } | 
| 2306 |  |  | 
| 2307 |  | void PInternalService::commit_refresh_dictionary(google::protobuf::RpcController* controller, | 
| 2308 |  |                                                  const PCommitRefreshDictionaryRequest* request, | 
| 2309 |  |                                                  PCommitRefreshDictionaryResponse* response, | 
| 2310 | 0 |                                                  google::protobuf::Closure* done) { | 
| 2311 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 2312 | 0 |     Status st = ExecEnv::GetInstance()->dict_factory()->commit_refresh_dict( | 
| 2313 | 0 |             request->dictionary_id(), request->version_id()); | 
| 2314 | 0 |     st.to_protobuf(response->mutable_status()); | 
| 2315 | 0 | } | 
| 2316 |  |  | 
| 2317 |  | void PInternalService::abort_refresh_dictionary(google::protobuf::RpcController* controller, | 
| 2318 |  |                                                 const PAbortRefreshDictionaryRequest* request, | 
| 2319 |  |                                                 PAbortRefreshDictionaryResponse* response, | 
| 2320 | 0 |                                                 google::protobuf::Closure* done) { | 
| 2321 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 2322 | 0 |     Status st = ExecEnv::GetInstance()->dict_factory()->abort_refresh_dict(request->dictionary_id(), | 
| 2323 | 0 |                                                                            request->version_id()); | 
| 2324 | 0 |     st.to_protobuf(response->mutable_status()); | 
| 2325 | 0 | } | 
| 2326 |  |  | 
| 2327 |  | void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller, | 
| 2328 |  |                                           const PGetTabletRowsetsRequest* request, | 
| 2329 |  |                                           PGetTabletRowsetsResponse* response, | 
| 2330 | 0 |                                           google::protobuf::Closure* done) { | 
| 2331 | 0 |     DCHECK(config::is_cloud_mode()); | 
| 2332 | 0 |     auto start_time = GetMonoTimeMicros(); | 
| 2333 | 0 |     Defer defer { | 
| 2334 | 0 |             [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }}; | 
| 2335 | 0 |     brpc::ClosureGuard closure_guard(done); | 
| 2336 | 0 |     LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString(); | 
| 2337 | 0 |     if (!request->has_tablet_id() || !request->has_version_start() || !request->has_version_end()) { | 
| 2338 | 0 |         Status::InvalidArgument("missing params tablet/version_start/version_end") | 
| 2339 | 0 |                 .to_protobuf(response->mutable_status()); | 
| 2340 | 0 |         return; | 
| 2341 | 0 |     } | 
| 2342 | 0 |     CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud(); | 
| 2343 |  | 
 | 
| 2344 | 0 |     auto maybe_tablet = | 
| 2345 | 0 |             storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false, | 
| 2346 | 0 |                                             /*syn_delete_bitmap*/ false, /*delete_bitmap*/ nullptr, | 
| 2347 | 0 |                                             /*local_only*/ true); | 
| 2348 | 0 |     if (!maybe_tablet) { | 
| 2349 | 0 |         maybe_tablet.error().to_protobuf(response->mutable_status()); | 
| 2350 | 0 |         return; | 
| 2351 | 0 |     } | 
| 2352 | 0 |     auto tablet = maybe_tablet.value(); | 
| 2353 | 0 |     Result<CaptureRowsetResult> ret; | 
| 2354 | 0 |     { | 
| 2355 | 0 |         std::shared_lock l(tablet->get_header_lock()); | 
| 2356 | 0 |         ret = tablet->capture_consistent_rowsets_unlocked( | 
| 2357 | 0 |                 {request->version_start(), request->version_end()}, | 
| 2358 | 0 |                 CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false}); | 
| 2359 | 0 |     } | 
| 2360 | 0 |     if (!ret) { | 
| 2361 | 0 |         ret.error().to_protobuf(response->mutable_status()); | 
| 2362 | 0 |         return; | 
| 2363 | 0 |     } | 
| 2364 | 0 |     auto rowsets = std::move(ret.value().rowsets); | 
| 2365 | 0 |     for (const auto& rs : rowsets) { | 
| 2366 | 0 |         RowsetMetaPB meta; | 
| 2367 | 0 |         rs->rowset_meta()->to_rowset_pb(&meta); | 
| 2368 | 0 |         response->mutable_rowsets()->Add(std::move(meta)); | 
| 2369 | 0 |     } | 
| 2370 | 0 |     if (request->has_delete_bitmap_keys()) { | 
| 2371 | 0 |         DCHECK(tablet->enable_unique_key_merge_on_write()); | 
| 2372 | 0 |         auto delete_bitmap = std::move(ret.value().delete_bitmap); | 
| 2373 | 0 |         auto keys_pb = request->delete_bitmap_keys(); | 
| 2374 | 0 |         size_t len = keys_pb.rowset_ids().size(); | 
| 2375 | 0 |         DCHECK_EQ(len, keys_pb.segment_ids().size()); | 
| 2376 | 0 |         DCHECK_EQ(len, keys_pb.versions().size()); | 
| 2377 | 0 |         std::set<DeleteBitmap::BitmapKey> keys; | 
| 2378 | 0 |         for (size_t i = 0; i < len; ++i) { | 
| 2379 | 0 |             RowsetId rs_id; | 
| 2380 | 0 |             rs_id.init(keys_pb.rowset_ids(i)); | 
| 2381 | 0 |             keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i)); | 
| 2382 | 0 |         } | 
| 2383 | 0 |         auto diffset = delete_bitmap->diffset(keys).to_pb(); | 
| 2384 | 0 |         *response->mutable_delete_bitmap() = std::move(diffset); | 
| 2385 | 0 |     } | 
| 2386 | 0 |     Status::OK().to_protobuf(response->mutable_status()); | 
| 2387 | 0 | } | 
| 2388 |  |  | 
| 2389 |  | #include "common/compile_check_avoid_end.h" | 
| 2390 |  | } // namespace doris |