be/src/exec/scan/meta_scanner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/meta_scanner.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/FrontendService.h> |
22 | | #include <gen_cpp/FrontendService_types.h> |
23 | | #include <gen_cpp/HeartbeatService_types.h> |
24 | | #include <gen_cpp/PaloInternalService_types.h> |
25 | | #include <gen_cpp/PlanNodes_types.h> |
26 | | |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <unordered_map> |
30 | | |
31 | | #include "common/cast_set.h" |
32 | | #include "common/logging.h" |
33 | | #include "core/block/block.h" |
34 | | #include "core/column/column.h" |
35 | | #include "core/column/column_nullable.h" |
36 | | #include "core/column/column_string.h" |
37 | | #include "core/column/column_vector.h" |
38 | | #include "core/data_type/define_primitive_type.h" |
39 | | #include "core/types.h" |
40 | | #include "format/table/iceberg_sys_table_jni_reader.h" |
41 | | #include "format/table/parquet_metadata_reader.h" |
42 | | #include "runtime/descriptors.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "runtime/runtime_state.h" |
45 | | #include "util/client_cache.h" |
46 | | #include "util/thrift_rpc_helper.h" |
47 | | |
48 | | namespace doris { |
49 | | class RuntimeProfile; |
50 | | class VExprContext; |
51 | | } // namespace doris |
52 | | |
53 | | namespace doris { |
54 | | #include "common/compile_check_begin.h" |
55 | | |
56 | | MetaScanner::MetaScanner(RuntimeState* state, ScanLocalStateBase* local_state, TupleId tuple_id, |
57 | | const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile, |
58 | | TUserIdentity user_identity) |
59 | 5.30k | : Scanner(state, local_state, limit, profile), |
60 | 5.30k | _meta_eos(false), |
61 | 5.30k | _tuple_id(tuple_id), |
62 | 5.30k | _user_identity(user_identity), |
63 | 5.30k | _scan_range(scan_range.scan_range) {} |
64 | | |
65 | 5.30k | Status MetaScanner::_open_impl(RuntimeState* state) { |
66 | 5.30k | VLOG_CRITICAL << "MetaScanner::open"; |
67 | 5.30k | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
68 | 5.30k | if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) { |
69 | | // TODO: refactor this code |
70 | 0 | auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, |
71 | 0 | _scan_range.meta_scan_range); |
72 | 0 | RETURN_IF_ERROR(reader->init_reader()); |
73 | 0 | static_cast<IcebergSysTableJniReader*>(reader.get()) |
74 | 0 | ->set_col_name_to_block_idx(&_src_block_name_to_idx); |
75 | 0 | _reader = std::move(reader); |
76 | 5.30k | } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PARQUET) { |
77 | 0 | auto reader = ParquetMetadataReader::create_unique(_tuple_desc->slots(), state, _profile, |
78 | 0 | _scan_range.meta_scan_range); |
79 | 0 | RETURN_IF_ERROR(reader->init_reader()); |
80 | 0 | _reader = std::move(reader); |
81 | 5.30k | } else { |
82 | 5.30k | RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); |
83 | 5.30k | } |
84 | 5.30k | return Status::OK(); |
85 | 5.30k | } |
86 | | |
87 | 5.30k | Status MetaScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
88 | 5.30k | VLOG_CRITICAL << "MetaScanner::init"; |
89 | 5.30k | RETURN_IF_ERROR(Scanner::init(_state, conjuncts)); |
90 | 5.30k | _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); |
91 | 5.30k | return Status::OK(); |
92 | 5.30k | } |
93 | | |
94 | 9.71k | Status MetaScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
95 | 9.71k | VLOG_CRITICAL << "MetaScanner::_get_block_impl"; |
96 | 9.71k | if (nullptr == state || nullptr == block || nullptr == eof) { |
97 | 0 | return Status::InternalError("input is NULL pointer"); |
98 | 0 | } |
99 | | |
100 | | // Build name to index map only once on first call |
101 | 9.71k | if (_src_block_name_to_idx.empty()) { |
102 | 5.30k | _src_block_name_to_idx = block->get_name_to_pos_map(); |
103 | 5.30k | } |
104 | | |
105 | 9.71k | if (_reader) { |
106 | | // TODO: This is a temporary workaround; the code is planned to be refactored later. |
107 | 0 | size_t read_rows = 0; |
108 | 0 | return _reader->get_next_block(block, &read_rows, eof); |
109 | 0 | } |
110 | | |
111 | 9.71k | if (_meta_eos == true) { |
112 | 4.40k | *eof = true; |
113 | 4.40k | return Status::OK(); |
114 | 4.40k | } |
115 | | |
116 | 5.30k | auto column_size = _tuple_desc->slots().size(); |
117 | 5.30k | std::vector<MutableColumnPtr> columns(column_size); |
118 | 5.30k | bool mem_reuse = block->mem_reuse(); |
119 | 5.30k | do { |
120 | 5.30k | RETURN_IF_CANCELLED(state); |
121 | | |
122 | 5.30k | columns.resize(column_size); |
123 | 44.0k | for (auto i = 0; i < column_size; i++) { |
124 | 38.7k | if (mem_reuse) { |
125 | 38.7k | columns[i] = block->get_by_position(i).column->assume_mutable(); |
126 | 38.7k | } else { |
127 | 0 | columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); |
128 | 0 | } |
129 | 38.7k | } |
130 | | // fill block |
131 | 5.30k | RETURN_IF_ERROR(_fill_block_with_remote_data(columns)); |
132 | 5.30k | if (_meta_eos == true) { |
133 | 5.30k | if (block->rows() == 0) { |
134 | 883 | *eof = true; |
135 | 883 | } |
136 | 5.30k | break; |
137 | 5.30k | } |
138 | | // Before really use the Block, must clear other ptr of column in block |
139 | | // So here need do std::move and clear in `columns` |
140 | 0 | if (!mem_reuse) { |
141 | 0 | int column_index = 0; |
142 | 0 | for (const auto slot_desc : _tuple_desc->slots()) { |
143 | 0 | block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), |
144 | 0 | slot_desc->get_data_type_ptr(), |
145 | 0 | slot_desc->col_name())); |
146 | 0 | } |
147 | 0 | } else { |
148 | 0 | columns.clear(); |
149 | 0 | } |
150 | 0 | VLOG_ROW << "VMetaScanNode output rows: " << block->rows(); |
151 | 0 | } while (block->rows() == 0 && !(*eof)); |
152 | 5.30k | return Status::OK(); |
153 | 5.30k | } |
154 | | |
155 | 5.30k | Status MetaScanner::_fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns) { |
156 | 5.30k | VLOG_CRITICAL << "MetaScanner::_fill_block_with_remote_data"; |
157 | 43.7k | for (int col_idx = 0; col_idx < columns.size(); col_idx++) { |
158 | 38.7k | auto slot_desc = _tuple_desc->slots()[col_idx]; |
159 | | |
160 | 5.64M | for (int _row_idx = 0; _row_idx < _batch_data.size(); _row_idx++) { |
161 | 5.61M | IColumn* col_ptr = columns[col_idx].get(); |
162 | 5.61M | TCell& cell = _batch_data[_row_idx].column_value[col_idx]; |
163 | 5.61M | if (cell.__isset.isNull && cell.isNull) { |
164 | 0 | DCHECK(slot_desc->is_nullable()) |
165 | 0 | << "cell is null but column is not nullable: " << slot_desc->col_name(); |
166 | 0 | auto& null_col = reinterpret_cast<ColumnNullable&>(*col_ptr); |
167 | 0 | null_col.get_nested_column().insert_default(); |
168 | 0 | null_col.get_null_map_data().push_back(1); |
169 | 5.61M | } else { |
170 | 5.61M | if (slot_desc->is_nullable()) { |
171 | 0 | auto& null_col = reinterpret_cast<ColumnNullable&>(*col_ptr); |
172 | 0 | null_col.get_null_map_data().push_back(0); |
173 | 0 | col_ptr = null_col.get_nested_column_ptr().get(); |
174 | 0 | } |
175 | 5.61M | switch (slot_desc->type()->get_primitive_type()) { |
176 | 5.60k | case TYPE_BOOLEAN: { |
177 | 5.60k | bool data = cell.boolVal; |
178 | 5.60k | assert_cast<ColumnBool*>(col_ptr)->insert_value((uint8_t)data); |
179 | 5.60k | break; |
180 | 0 | } |
181 | 0 | case TYPE_TINYINT: { |
182 | 0 | int8_t data = (int8_t)cell.intVal; |
183 | 0 | assert_cast<ColumnInt8*>(col_ptr)->insert_value(data); |
184 | 0 | break; |
185 | 0 | } |
186 | 0 | case TYPE_SMALLINT: { |
187 | 0 | int16_t data = (int16_t)cell.intVal; |
188 | 0 | assert_cast<ColumnInt16*>(col_ptr)->insert_value(data); |
189 | 0 | break; |
190 | 0 | } |
191 | 60 | case TYPE_INT: { |
192 | 60 | int32_t data = cell.intVal; |
193 | 60 | assert_cast<ColumnInt32*>(col_ptr)->insert_value(data); |
194 | 60 | break; |
195 | 0 | } |
196 | 277 | case TYPE_BIGINT: { |
197 | 277 | int64_t data = cell.longVal; |
198 | 277 | assert_cast<ColumnInt64*>(col_ptr)->insert_value(data); |
199 | 277 | break; |
200 | 0 | } |
201 | 0 | case TYPE_FLOAT: { |
202 | 0 | auto data = static_cast<float>(cell.doubleVal); |
203 | 0 | assert_cast<ColumnFloat32*>(col_ptr)->insert_value(data); |
204 | 0 | break; |
205 | 0 | } |
206 | 10 | case TYPE_DOUBLE: { |
207 | 10 | double data = cell.doubleVal; |
208 | 10 | assert_cast<ColumnFloat64*>(col_ptr)->insert_value(data); |
209 | 10 | break; |
210 | 0 | } |
211 | 0 | case TYPE_DATEV2: { |
212 | 0 | uint32_t data = (uint32_t)cell.longVal; |
213 | 0 | assert_cast<ColumnDateV2*>(col_ptr)->insert_value(data); |
214 | 0 | break; |
215 | 0 | } |
216 | 0 | case TYPE_DATETIMEV2: { |
217 | 0 | uint64_t data = cell.longVal; |
218 | 0 | assert_cast<ColumnDateTimeV2*>(col_ptr)->insert_value(data); |
219 | 0 | break; |
220 | 0 | } |
221 | 5.61M | case TYPE_STRING: |
222 | 5.61M | case TYPE_CHAR: |
223 | 5.61M | case TYPE_VARCHAR: { |
224 | 5.61M | std::string data = cell.stringVal; |
225 | 5.61M | assert_cast<ColumnString*>(col_ptr)->insert_data(data.c_str(), data.length()); |
226 | 5.61M | break; |
227 | 5.61M | } |
228 | 0 | default: { |
229 | 0 | std::string error_msg = |
230 | 0 | fmt::format("Invalid column type {} on column: {}.", |
231 | 0 | slot_desc->type()->get_name(), slot_desc->col_name()); |
232 | 0 | return Status::InternalError(std::string(error_msg)); |
233 | 5.61M | } |
234 | 5.61M | } |
235 | 5.61M | } |
236 | 5.61M | } |
237 | 38.7k | } |
238 | 5.00k | _meta_eos = true; |
239 | 5.00k | return Status::OK(); |
240 | 5.30k | } |
241 | | |
242 | 5.30k | Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { |
243 | 5.30k | VLOG_CRITICAL << "MetaScanner::_fetch_metadata"; |
244 | 5.30k | TFetchSchemaTableDataRequest request; |
245 | 5.30k | switch (meta_scan_range.metadata_type) { |
246 | 0 | case TMetadataType::HUDI: |
247 | 0 | RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request)); |
248 | 0 | break; |
249 | 6 | case TMetadataType::BACKENDS: |
250 | 6 | RETURN_IF_ERROR(_build_backends_metadata_request(meta_scan_range, &request)); |
251 | 6 | break; |
252 | 30 | case TMetadataType::FRONTENDS: |
253 | 30 | RETURN_IF_ERROR(_build_frontends_metadata_request(meta_scan_range, &request)); |
254 | 30 | break; |
255 | 30 | case TMetadataType::FRONTENDS_DISKS: |
256 | 1 | RETURN_IF_ERROR(_build_frontends_disks_metadata_request(meta_scan_range, &request)); |
257 | 1 | break; |
258 | 1 | case TMetadataType::WORKLOAD_SCHED_POLICY: |
259 | 0 | RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range, &request)); |
260 | 0 | break; |
261 | 2 | case TMetadataType::CATALOGS: |
262 | 2 | RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request)); |
263 | 2 | break; |
264 | 2.32k | case TMetadataType::MATERIALIZED_VIEWS: |
265 | 2.32k | RETURN_IF_ERROR(_build_materialized_views_metadata_request(meta_scan_range, &request)); |
266 | 2.32k | break; |
267 | 2.32k | case TMetadataType::PARTITIONS: |
268 | 18 | RETURN_IF_ERROR(_build_partitions_metadata_request(meta_scan_range, &request)); |
269 | 18 | break; |
270 | 158 | case TMetadataType::JOBS: |
271 | 158 | RETURN_IF_ERROR(_build_jobs_metadata_request(meta_scan_range, &request)); |
272 | 158 | break; |
273 | 2.76k | case TMetadataType::TASKS: |
274 | 2.76k | RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request)); |
275 | 2.76k | break; |
276 | 2.76k | case TMetadataType::PARTITION_VALUES: |
277 | 0 | RETURN_IF_ERROR(_build_partition_values_metadata_request(meta_scan_range, &request)); |
278 | 0 | break; |
279 | 0 | default: |
280 | 0 | _meta_eos = true; |
281 | 0 | return Status::OK(); |
282 | 5.30k | } |
283 | | |
284 | | // set filter columns |
285 | 5.30k | std::vector<std::string> filter_columns; |
286 | 38.7k | for (const auto& slot : _tuple_desc->slots()) { |
287 | 38.7k | filter_columns.emplace_back(slot->col_name_lower_case()); |
288 | 38.7k | } |
289 | 5.30k | request.metada_table_params.__set_columns_name(filter_columns); |
290 | | |
291 | | // _state->execution_timeout() is seconds, change to milliseconds |
292 | 5.30k | int time_out = _state->execution_timeout() * 1000; |
293 | 5.30k | TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
294 | 5.30k | TFetchSchemaTableDataResult result; |
295 | 5.30k | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
296 | 5.30k | master_addr.hostname, master_addr.port, |
297 | 5.30k | [&request, &result](FrontendServiceConnection& client) { |
298 | 5.30k | client->fetchSchemaTableData(result, request); |
299 | 5.30k | }, |
300 | 5.30k | time_out)); |
301 | | |
302 | 5.30k | Status status(Status::create(result.status)); |
303 | 5.30k | if (!status.ok()) { |
304 | 0 | LOG(WARNING) << "fetch schema table data from master failed, errmsg=" << status; |
305 | 0 | return status; |
306 | 0 | } |
307 | 5.30k | _batch_data = std::move(result.data_batch); |
308 | 5.30k | return Status::OK(); |
309 | 5.30k | } |
310 | | |
311 | | Status MetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range, |
312 | 0 | TFetchSchemaTableDataRequest* request) { |
313 | 0 | VLOG_CRITICAL << "MetaScanner::_build_hudi_metadata_request"; |
314 | 0 | if (!meta_scan_range.__isset.hudi_params) { |
315 | 0 | return Status::InternalError("Can not find THudiMetadataParams from meta_scan_range."); |
316 | 0 | } |
317 | | |
318 | | // create request |
319 | 0 | request->__set_cluster_name(""); |
320 | 0 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
321 | | |
322 | | // create TMetadataTableRequestParams |
323 | 0 | TMetadataTableRequestParams metadata_table_params; |
324 | 0 | metadata_table_params.__set_metadata_type(TMetadataType::HUDI); |
325 | 0 | metadata_table_params.__set_hudi_metadata_params(meta_scan_range.hudi_params); |
326 | |
|
327 | 0 | request->__set_metada_table_params(metadata_table_params); |
328 | 0 | return Status::OK(); |
329 | 0 | } |
330 | | |
331 | | Status MetaScanner::_build_backends_metadata_request(const TMetaScanRange& meta_scan_range, |
332 | 6 | TFetchSchemaTableDataRequest* request) { |
333 | 6 | VLOG_CRITICAL << "MetaScanner::_build_backends_metadata_request"; |
334 | 6 | if (!meta_scan_range.__isset.backends_params) { |
335 | 0 | return Status::InternalError("Can not find TBackendsMetadataParams from meta_scan_range."); |
336 | 0 | } |
337 | | // create request |
338 | 6 | request->__set_cluster_name(""); |
339 | 6 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
340 | | |
341 | | // create TMetadataTableRequestParams |
342 | 6 | TMetadataTableRequestParams metadata_table_params; |
343 | 6 | metadata_table_params.__set_metadata_type(TMetadataType::BACKENDS); |
344 | 6 | metadata_table_params.__set_backends_metadata_params(meta_scan_range.backends_params); |
345 | | |
346 | 6 | request->__set_metada_table_params(metadata_table_params); |
347 | 6 | return Status::OK(); |
348 | 6 | } |
349 | | |
350 | | Status MetaScanner::_build_frontends_metadata_request(const TMetaScanRange& meta_scan_range, |
351 | 30 | TFetchSchemaTableDataRequest* request) { |
352 | 30 | VLOG_CRITICAL << "MetaScanner::_build_frontends_metadata_request"; |
353 | 30 | if (!meta_scan_range.__isset.frontends_params) { |
354 | 0 | return Status::InternalError("Can not find TFrontendsMetadataParams from meta_scan_range."); |
355 | 0 | } |
356 | | // create request |
357 | 30 | request->__set_cluster_name(""); |
358 | 30 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
359 | | |
360 | | // create TMetadataTableRequestParams |
361 | 30 | TMetadataTableRequestParams metadata_table_params; |
362 | 30 | metadata_table_params.__set_metadata_type(TMetadataType::FRONTENDS); |
363 | 30 | metadata_table_params.__set_frontends_metadata_params(meta_scan_range.frontends_params); |
364 | | |
365 | 30 | request->__set_metada_table_params(metadata_table_params); |
366 | 30 | return Status::OK(); |
367 | 30 | } |
368 | | |
369 | | Status MetaScanner::_build_frontends_disks_metadata_request(const TMetaScanRange& meta_scan_range, |
370 | 1 | TFetchSchemaTableDataRequest* request) { |
371 | 1 | VLOG_CRITICAL << "MetaScanner::_build_frontends_metadata_request"; |
372 | 1 | if (!meta_scan_range.__isset.frontends_params) { |
373 | 0 | return Status::InternalError("Can not find TFrontendsMetadataParams from meta_scan_range."); |
374 | 0 | } |
375 | | // create request |
376 | 1 | request->__set_cluster_name(""); |
377 | 1 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
378 | | |
379 | | // create TMetadataTableRequestParams |
380 | 1 | TMetadataTableRequestParams metadata_table_params; |
381 | 1 | metadata_table_params.__set_metadata_type(TMetadataType::FRONTENDS_DISKS); |
382 | 1 | metadata_table_params.__set_frontends_metadata_params(meta_scan_range.frontends_params); |
383 | | |
384 | 1 | request->__set_metada_table_params(metadata_table_params); |
385 | 1 | return Status::OK(); |
386 | 1 | } |
387 | | |
388 | | Status MetaScanner::_build_workload_sched_policy_metadata_request( |
389 | 0 | const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { |
390 | 0 | VLOG_CRITICAL << "MetaScanner::_build_workload_sched_policy_metadata_request"; |
391 | | |
392 | | // create request |
393 | 0 | request->__set_cluster_name(""); |
394 | 0 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
395 | | |
396 | | // create TMetadataTableRequestParams |
397 | 0 | TMetadataTableRequestParams metadata_table_params; |
398 | 0 | metadata_table_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY); |
399 | 0 | metadata_table_params.__set_current_user_ident(_user_identity); |
400 | |
|
401 | 0 | request->__set_metada_table_params(metadata_table_params); |
402 | 0 | return Status::OK(); |
403 | 0 | } |
404 | | |
405 | | Status MetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range, |
406 | 2 | TFetchSchemaTableDataRequest* request) { |
407 | 2 | VLOG_CRITICAL << "MetaScanner::_build_catalogs_metadata_request"; |
408 | | |
409 | | // create request |
410 | 2 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
411 | | |
412 | | // create TMetadataTableRequestParams |
413 | 2 | TMetadataTableRequestParams metadata_table_params; |
414 | 2 | metadata_table_params.__set_metadata_type(TMetadataType::CATALOGS); |
415 | 2 | metadata_table_params.__set_current_user_ident(_user_identity); |
416 | | |
417 | 2 | request->__set_metada_table_params(metadata_table_params); |
418 | 2 | return Status::OK(); |
419 | 2 | } |
420 | | |
421 | | Status MetaScanner::_build_materialized_views_metadata_request( |
422 | 2.32k | const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { |
423 | 2.32k | VLOG_CRITICAL << "MetaScanner::_build_materialized_views_metadata_request"; |
424 | 2.32k | if (!meta_scan_range.__isset.materialized_views_params) { |
425 | 0 | return Status::InternalError( |
426 | 0 | "Can not find TMaterializedViewsMetadataParams from meta_scan_range."); |
427 | 0 | } |
428 | | |
429 | | // create request |
430 | 2.32k | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
431 | | |
432 | | // create TMetadataTableRequestParams |
433 | 2.32k | TMetadataTableRequestParams metadata_table_params; |
434 | 2.32k | metadata_table_params.__set_metadata_type(TMetadataType::MATERIALIZED_VIEWS); |
435 | 2.32k | metadata_table_params.__set_materialized_views_metadata_params( |
436 | 2.32k | meta_scan_range.materialized_views_params); |
437 | | |
438 | 2.32k | request->__set_metada_table_params(metadata_table_params); |
439 | 2.32k | return Status::OK(); |
440 | 2.32k | } |
441 | | |
442 | | Status MetaScanner::_build_partitions_metadata_request(const TMetaScanRange& meta_scan_range, |
443 | 18 | TFetchSchemaTableDataRequest* request) { |
444 | 18 | VLOG_CRITICAL << "MetaScanner::_build_partitions_metadata_request"; |
445 | 18 | if (!meta_scan_range.__isset.partitions_params) { |
446 | 0 | return Status::InternalError( |
447 | 0 | "Can not find TPartitionsMetadataParams from meta_scan_range."); |
448 | 0 | } |
449 | | |
450 | | // create request |
451 | 18 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
452 | | |
453 | | // create TMetadataTableRequestParams |
454 | 18 | TMetadataTableRequestParams metadata_table_params; |
455 | 18 | metadata_table_params.__set_metadata_type(TMetadataType::PARTITIONS); |
456 | 18 | metadata_table_params.__set_partitions_metadata_params(meta_scan_range.partitions_params); |
457 | | |
458 | 18 | request->__set_metada_table_params(metadata_table_params); |
459 | 18 | return Status::OK(); |
460 | 18 | } |
461 | | |
462 | | Status MetaScanner::_build_jobs_metadata_request(const TMetaScanRange& meta_scan_range, |
463 | 158 | TFetchSchemaTableDataRequest* request) { |
464 | 158 | VLOG_CRITICAL << "MetaScanner::_build_jobs_metadata_request"; |
465 | 158 | if (!meta_scan_range.__isset.jobs_params) { |
466 | 0 | return Status::InternalError("Can not find TJobsMetadataParams from meta_scan_range."); |
467 | 0 | } |
468 | | |
469 | | // create request |
470 | 158 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
471 | | |
472 | | // create TMetadataTableRequestParams |
473 | 158 | TMetadataTableRequestParams metadata_table_params; |
474 | 158 | metadata_table_params.__set_metadata_type(TMetadataType::JOBS); |
475 | 158 | metadata_table_params.__set_jobs_metadata_params(meta_scan_range.jobs_params); |
476 | | |
477 | 158 | request->__set_metada_table_params(metadata_table_params); |
478 | 158 | return Status::OK(); |
479 | 158 | } |
480 | | |
481 | | Status MetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_scan_range, |
482 | 2.76k | TFetchSchemaTableDataRequest* request) { |
483 | 2.76k | VLOG_CRITICAL << "MetaScanner::_build_tasks_metadata_request"; |
484 | 2.76k | if (!meta_scan_range.__isset.tasks_params) { |
485 | 0 | return Status::InternalError("Can not find TTasksMetadataParams from meta_scan_range."); |
486 | 0 | } |
487 | | |
488 | | // create request |
489 | 2.76k | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
490 | | |
491 | | // create TMetadataTableRequestParams |
492 | 2.76k | TMetadataTableRequestParams metadata_table_params; |
493 | 2.76k | metadata_table_params.__set_metadata_type(TMetadataType::TASKS); |
494 | 2.76k | metadata_table_params.__set_tasks_metadata_params(meta_scan_range.tasks_params); |
495 | | |
496 | 2.76k | request->__set_metada_table_params(metadata_table_params); |
497 | 2.76k | return Status::OK(); |
498 | 2.76k | } |
499 | | |
500 | | Status MetaScanner::_build_partition_values_metadata_request( |
501 | 0 | const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { |
502 | 0 | VLOG_CRITICAL << "MetaScanner::_build_partition_values_metadata_request"; |
503 | 0 | if (!meta_scan_range.__isset.partition_values_params) { |
504 | 0 | return Status::InternalError( |
505 | 0 | "Can not find TPartitionValuesMetadataParams from meta_scan_range."); |
506 | 0 | } |
507 | | |
508 | | // create request |
509 | 0 | request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
510 | | |
511 | | // create TMetadataTableRequestParams |
512 | 0 | TMetadataTableRequestParams metadata_table_params; |
513 | 0 | metadata_table_params.__set_metadata_type(TMetadataType::PARTITION_VALUES); |
514 | 0 | metadata_table_params.__set_partition_values_metadata_params( |
515 | 0 | meta_scan_range.partition_values_params); |
516 | |
|
517 | 0 | request->__set_metada_table_params(metadata_table_params); |
518 | 0 | return Status::OK(); |
519 | 0 | } |
520 | | |
521 | 5.30k | Status MetaScanner::close(RuntimeState* state) { |
522 | 5.30k | VLOG_CRITICAL << "MetaScanner::close"; |
523 | 5.30k | if (!_try_close()) { |
524 | 0 | return Status::OK(); |
525 | 0 | } |
526 | 5.30k | if (_reader) { |
527 | 0 | RETURN_IF_ERROR(_reader->close()); |
528 | 0 | } |
529 | 5.30k | RETURN_IF_ERROR(Scanner::close(state)); |
530 | 5.30k | return Status::OK(); |
531 | 5.30k | } |
532 | | |
533 | | #include "common/compile_check_end.h" |
534 | | } // namespace doris |