be/src/format/parquet/vparquet_group_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/parquet/vparquet_group_reader.h" |
19 | | |
20 | | #include <gen_cpp/Exprs_types.h> |
21 | | #include <gen_cpp/Opcodes_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <gen_cpp/parquet_types.h> |
24 | | #include <string.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <boost/iterator/iterator_facade.hpp> |
28 | | #include <memory> |
29 | | #include <ostream> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/consts.h" |
33 | | #include "common/logging.h" |
34 | | #include "common/object_pool.h" |
35 | | #include "common/status.h" |
36 | | #include "core/assert_cast.h" |
37 | | #include "core/block/block.h" |
38 | | #include "core/block/column_with_type_and_name.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/column/column_nullable.h" |
41 | | #include "core/column/column_string.h" |
42 | | #include "core/column/column_struct.h" |
43 | | #include "core/column/column_vector.h" |
44 | | #include "core/custom_allocator.h" |
45 | | #include "core/data_type/data_type.h" |
46 | | #include "core/data_type/data_type_number.h" |
47 | | #include "core/data_type/data_type_string.h" |
48 | | #include "core/data_type/data_type_struct.h" |
49 | | #include "core/data_type/define_primitive_type.h" |
50 | | #include "core/pod_array.h" |
51 | | #include "core/types.h" |
52 | | #include "exprs/create_predicate_function.h" |
53 | | #include "exprs/hybrid_set.h" |
54 | | #include "exprs/vdirect_in_predicate.h" |
55 | | #include "exprs/vectorized_fn_call.h" |
56 | | #include "exprs/vexpr.h" |
57 | | #include "exprs/vexpr_context.h" |
58 | | #include "exprs/vliteral.h" |
59 | | #include "exprs/vslot_ref.h" |
60 | | #include "format/parquet/schema_desc.h" |
61 | | #include "format/parquet/vparquet_column_reader.h" |
62 | | #include "format/table/iceberg_reader.h" |
63 | | #include "runtime/descriptors.h" |
64 | | #include "runtime/runtime_state.h" |
65 | | #include "runtime/thread_context.h" |
66 | | #include "storage/segment/column_reader.h" |
67 | | |
68 | | namespace cctz { |
69 | | class time_zone; |
70 | | } // namespace cctz |
71 | | namespace doris { |
72 | | class RuntimeState; |
73 | | |
74 | | namespace io { |
75 | | struct IOContext; |
76 | | } // namespace io |
77 | | } // namespace doris |
78 | | |
79 | | namespace doris { |
80 | | #include "common/compile_check_begin.h" |
81 | | |
82 | | namespace { |
83 | | Status build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path, |
84 | | const std::vector<rowid_t>& row_ids, int32_t partition_spec_id, |
85 | | const std::string& partition_data_json, |
86 | 0 | MutableColumnPtr* column_out) { |
87 | 0 | if (type == nullptr || column_out == nullptr) { |
88 | 0 | return Status::InvalidArgument("Invalid iceberg rowid column type or output column"); |
89 | 0 | } |
90 | | |
91 | 0 | MutableColumnPtr column = type->create_column(); |
92 | 0 | ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get()); |
93 | 0 | ColumnStruct* struct_col = nullptr; |
94 | 0 | if (nullable_col != nullptr) { |
95 | 0 | struct_col = |
96 | 0 | check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get()); |
97 | 0 | } else { |
98 | 0 | struct_col = check_and_get_column<ColumnStruct>(column.get()); |
99 | 0 | } |
100 | |
|
101 | 0 | if (struct_col == nullptr || struct_col->tuple_size() < 4) { |
102 | 0 | return Status::InternalError("Invalid iceberg rowid column structure"); |
103 | 0 | } |
104 | | |
105 | 0 | size_t num_rows = row_ids.size(); |
106 | 0 | auto& file_path_col = struct_col->get_column(0); |
107 | 0 | auto& row_pos_col = struct_col->get_column(1); |
108 | 0 | auto& spec_id_col = struct_col->get_column(2); |
109 | 0 | auto& partition_data_col = struct_col->get_column(3); |
110 | |
|
111 | 0 | file_path_col.reserve(num_rows); |
112 | 0 | row_pos_col.reserve(num_rows); |
113 | 0 | spec_id_col.reserve(num_rows); |
114 | 0 | partition_data_col.reserve(num_rows); |
115 | |
|
116 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
117 | 0 | file_path_col.insert_data(file_path.data(), file_path.size()); |
118 | 0 | } |
119 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
120 | 0 | int64_t row_pos = static_cast<int64_t>(row_ids[i]); |
121 | 0 | row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos)); |
122 | 0 | } |
123 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
124 | 0 | int32_t spec_id = partition_spec_id; |
125 | 0 | spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id)); |
126 | 0 | } |
127 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
128 | 0 | partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size()); |
129 | 0 | } |
130 | |
|
131 | 0 | if (nullable_col != nullptr) { |
132 | 0 | nullable_col->get_null_map_data().resize_fill(num_rows, 0); |
133 | 0 | } |
134 | |
|
135 | 0 | *column_out = std::move(column); |
136 | 0 | return Status::OK(); |
137 | 0 | } |
138 | | } // namespace |
139 | | const std::vector<int64_t> RowGroupReader::NO_DELETE = {}; |
140 | | static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max(); |
141 | | |
142 | | RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, |
143 | | const std::vector<std::string>& read_columns, |
144 | | const int32_t row_group_id, const tparquet::RowGroup& row_group, |
145 | | const cctz::time_zone* ctz, io::IOContext* io_ctx, |
146 | | const PositionDeleteContext& position_delete_ctx, |
147 | | const LazyReadContext& lazy_read_ctx, RuntimeState* state, |
148 | | const std::set<uint64_t>& column_ids, |
149 | | const std::set<uint64_t>& filter_column_ids) |
150 | 37 | : _file_reader(file_reader), |
151 | 37 | _read_table_columns(read_columns), |
152 | 37 | _row_group_id(row_group_id), |
153 | 37 | _row_group_meta(row_group), |
154 | 37 | _remaining_rows(row_group.num_rows), |
155 | 37 | _ctz(ctz), |
156 | 37 | _io_ctx(io_ctx), |
157 | 37 | _position_delete_ctx(position_delete_ctx), |
158 | 37 | _lazy_read_ctx(lazy_read_ctx), |
159 | 37 | _state(state), |
160 | 37 | _obj_pool(new ObjectPool()), |
161 | 37 | _column_ids(column_ids), |
162 | 37 | _filter_column_ids(filter_column_ids) {} |
163 | | |
164 | 37 | RowGroupReader::~RowGroupReader() { |
165 | 37 | _column_readers.clear(); |
166 | 37 | _obj_pool->clear(); |
167 | 37 | } |
168 | | |
169 | | Status RowGroupReader::init( |
170 | | const FieldDescriptor& schema, RowRanges& row_ranges, |
171 | | std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
172 | | const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, |
173 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
174 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
175 | 37 | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) { |
176 | 37 | _tuple_descriptor = tuple_descriptor; |
177 | 37 | _row_descriptor = row_descriptor; |
178 | 37 | _col_name_to_slot_id = colname_to_slot_id; |
179 | 37 | _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; |
180 | 37 | _read_ranges = row_ranges; |
181 | 37 | _filter_read_ranges_by_condition_cache(); |
182 | 37 | _remaining_rows = _read_ranges.count(); |
183 | | |
184 | 37 | if (_read_table_columns.empty()) { |
185 | | // Query task that only select columns in path. |
186 | 1 | return Status::OK(); |
187 | 1 | } |
188 | 36 | const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20; |
189 | 36 | const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20; |
190 | 36 | size_t max_buf_size = |
191 | 36 | std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_table_columns.size()); |
192 | 106 | for (const auto& read_table_col : _read_table_columns) { |
193 | 106 | auto read_file_col = _table_info_node_ptr->children_file_column_name(read_table_col); |
194 | 106 | auto* field = schema.get_column(read_file_col); |
195 | 106 | std::unique_ptr<ParquetColumnReader> reader; |
196 | 106 | RETURN_IF_ERROR(ParquetColumnReader::create( |
197 | 106 | _file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader, |
198 | 106 | max_buf_size, col_offsets, _state, false, _column_ids, _filter_column_ids)); |
199 | 106 | if (reader == nullptr) { |
200 | 0 | VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed"; |
201 | 0 | return Status::Corruption("Init row group reader failed"); |
202 | 0 | } |
203 | 106 | _column_readers[read_table_col] = std::move(reader); |
204 | 106 | } |
205 | | |
206 | 36 | bool disable_dict_filter = false; |
207 | 36 | if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { |
208 | 0 | disable_dict_filter = true; |
209 | 0 | _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), |
210 | 0 | not_single_slot_filter_conjuncts->end()); |
211 | 0 | } |
212 | | |
213 | | // Check if single slot can be filtered by dict. |
214 | 36 | if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) { |
215 | 6 | const std::vector<std::string>& predicate_col_names = |
216 | 6 | _lazy_read_ctx.predicate_columns.first; |
217 | 6 | const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second; |
218 | 14 | for (size_t i = 0; i < predicate_col_names.size(); ++i) { |
219 | 8 | const std::string& predicate_col_name = predicate_col_names[i]; |
220 | 8 | int slot_id = predicate_col_slot_ids[i]; |
221 | 8 | if (predicate_col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID || |
222 | 8 | predicate_col_name == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) { |
223 | | // row lineage column can not dict filter. |
224 | 0 | if (_slot_id_to_filter_conjuncts->find(slot_id) != |
225 | 0 | _slot_id_to_filter_conjuncts->end()) { |
226 | 0 | for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { |
227 | 0 | _filter_conjuncts.push_back(ctx); |
228 | 0 | } |
229 | 0 | } |
230 | 0 | continue; |
231 | 0 | } |
232 | | |
233 | 8 | auto predicate_file_col_name = |
234 | 8 | _table_info_node_ptr->children_file_column_name(predicate_col_name); |
235 | 8 | auto field = schema.get_column(predicate_file_col_name); |
236 | 8 | if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type && |
237 | 8 | _can_filter_by_dict( |
238 | 8 | slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) { |
239 | 2 | _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id)); |
240 | 6 | } else { |
241 | 6 | if (_slot_id_to_filter_conjuncts->find(slot_id) != |
242 | 6 | _slot_id_to_filter_conjuncts->end()) { |
243 | 6 | for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { |
244 | 6 | _filter_conjuncts.push_back(ctx); |
245 | 6 | } |
246 | 6 | } |
247 | 6 | } |
248 | 8 | } |
249 | | // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts) |
250 | | // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts. |
251 | 6 | for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { |
252 | 4 | auto& [value, slot_desc] = kv.second; |
253 | 4 | auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id()); |
254 | 4 | if (iter != _slot_id_to_filter_conjuncts->end()) { |
255 | 4 | for (auto& ctx : iter->second) { |
256 | 4 | _filter_conjuncts.push_back(ctx); |
257 | 4 | } |
258 | 4 | } |
259 | 4 | } |
260 | | //For check missing column : missing column == xx, missing column is null,missing column is not null. |
261 | 6 | _filter_conjuncts.insert(_filter_conjuncts.end(), |
262 | 6 | _lazy_read_ctx.missing_columns_conjuncts.begin(), |
263 | 6 | _lazy_read_ctx.missing_columns_conjuncts.end()); |
264 | 6 | RETURN_IF_ERROR(_rewrite_dict_predicates()); |
265 | 6 | } |
266 | | // _state is nullptr in some ut. |
267 | 36 | if (_state && _state->enable_adjust_conjunct_order_by_cost()) { |
268 | 8 | std::ranges::sort(_filter_conjuncts, [](const auto& a, const auto& b) { |
269 | 8 | return a->execute_cost() < b->execute_cost(); |
270 | 8 | }); |
271 | 8 | } |
272 | 36 | return Status::OK(); |
273 | 36 | } |
274 | | |
275 | | bool RowGroupReader::_can_filter_by_dict(int slot_id, |
276 | 8 | const tparquet::ColumnMetaData& column_metadata) { |
277 | 8 | SlotDescriptor* slot = nullptr; |
278 | 8 | const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); |
279 | 14 | for (auto each : slots) { |
280 | 14 | if (each->id() == slot_id) { |
281 | 8 | slot = each; |
282 | 8 | break; |
283 | 8 | } |
284 | 14 | } |
285 | 8 | if (!is_string_type(slot->type()->get_primitive_type()) && |
286 | 8 | !is_var_len_object(slot->type()->get_primitive_type())) { |
287 | 6 | return false; |
288 | 6 | } |
289 | 2 | if (column_metadata.type != tparquet::Type::BYTE_ARRAY) { |
290 | 0 | return false; |
291 | 0 | } |
292 | | |
293 | 2 | if (!is_dictionary_encoded(column_metadata)) { |
294 | 0 | return false; |
295 | 0 | } |
296 | | |
297 | 2 | if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) { |
298 | 0 | return false; |
299 | 0 | } |
300 | | |
301 | | // TODO: The current implementation of dictionary filtering does not take into account |
302 | | // the implementation of NULL values because the dictionary itself does not contain |
303 | | // NULL value encoding. As a result, many NULL-related functions or expressions |
304 | | // cannot work properly, such as is null, is not null, coalesce, etc. |
305 | | // Here we check if the predicate expr is IN or BINARY_PRED. |
306 | | // Implementation of NULL value dictionary filtering will be carried out later. |
307 | 2 | return std::ranges::all_of(_slot_id_to_filter_conjuncts->at(slot_id), [&](const auto& ctx) { |
308 | 2 | return (ctx->root()->node_type() == TExprNodeType::IN_PRED || |
309 | 2 | ctx->root()->node_type() == TExprNodeType::BINARY_PRED) && |
310 | 2 | ctx->root()->children()[0]->node_type() == TExprNodeType::SLOT_REF; |
311 | 2 | }); |
312 | 2 | } |
313 | | |
314 | | // This function is copied from |
315 | | // https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717 |
316 | 2 | bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) { |
317 | | // The Parquet spec allows for column chunks to have mixed encodings |
318 | | // where some data pages are dictionary-encoded and others are plain |
319 | | // encoded. For example, a Parquet file writer might start writing |
320 | | // a column chunk as dictionary encoded, but it will switch to plain |
321 | | // encoding if the dictionary grows too large. |
322 | | // |
323 | | // In order for dictionary filters to skip the entire row group, |
324 | | // the conjuncts must be evaluated on column chunks that are entirely |
325 | | // encoded with the dictionary encoding. There are two checks |
326 | | // available to verify this: |
327 | | // 1. The encoding_stats field on the column chunk metadata provides |
328 | | // information about the number of data pages written in each |
329 | | // format. This allows for a specific check of whether all the |
330 | | // data pages are dictionary encoded. |
331 | | // 2. The encodings field on the column chunk metadata lists the |
332 | | // encodings used. If this list contains the dictionary encoding |
333 | | // and does not include unexpected encodings (i.e. encodings not |
334 | | // associated with definition/repetition levels), then it is entirely |
335 | | // dictionary encoded. |
336 | 2 | if (column_metadata.__isset.encoding_stats) { |
337 | | // Condition #1 above |
338 | 4 | for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) { |
339 | 4 | if (enc_stat.page_type == tparquet::PageType::DATA_PAGE && |
340 | 4 | (enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY && |
341 | 2 | enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) && |
342 | 4 | enc_stat.count > 0) { |
343 | 0 | return false; |
344 | 0 | } |
345 | 4 | } |
346 | 2 | } else { |
347 | | // Condition #2 above |
348 | 0 | bool has_dict_encoding = false; |
349 | 0 | bool has_nondict_encoding = false; |
350 | 0 | for (const tparquet::Encoding::type& encoding : column_metadata.encodings) { |
351 | 0 | if (encoding == tparquet::Encoding::PLAIN_DICTIONARY || |
352 | 0 | encoding == tparquet::Encoding::RLE_DICTIONARY) { |
353 | 0 | has_dict_encoding = true; |
354 | 0 | } |
355 | | |
356 | | // RLE and BIT_PACKED are used for repetition/definition levels |
357 | 0 | if (encoding != tparquet::Encoding::PLAIN_DICTIONARY && |
358 | 0 | encoding != tparquet::Encoding::RLE_DICTIONARY && |
359 | 0 | encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) { |
360 | 0 | has_nondict_encoding = true; |
361 | 0 | break; |
362 | 0 | } |
363 | 0 | } |
364 | | // Not entirely dictionary encoded if: |
365 | | // 1. No dictionary encoding listed |
366 | | // OR |
367 | | // 2. Some non-dictionary encoding is listed |
368 | 0 | if (!has_dict_encoding || has_nondict_encoding) { |
369 | 0 | return false; |
370 | 0 | } |
371 | 0 | } |
372 | | |
373 | 2 | return true; |
374 | 2 | } |
375 | | |
376 | | Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows, |
377 | 49 | bool* batch_eof) { |
378 | 49 | if (_is_row_group_filtered) { |
379 | 2 | *read_rows = 0; |
380 | 2 | *batch_eof = true; |
381 | 2 | return Status::OK(); |
382 | 2 | } |
383 | | |
384 | | // Process external table query task that select columns are all from path. |
385 | 47 | if (_read_table_columns.empty()) { |
386 | 3 | bool modify_row_ids = false; |
387 | 3 | RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, &modify_row_ids)); |
388 | | |
389 | 3 | RETURN_IF_ERROR( |
390 | 3 | _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); |
391 | 3 | RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); |
392 | | |
393 | 3 | RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, modify_row_ids)); |
394 | 3 | RETURN_IF_ERROR(_append_iceberg_rowid_column(block, *read_rows, modify_row_ids)); |
395 | | |
396 | 3 | Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns()); |
397 | 3 | *read_rows = block->rows(); |
398 | 3 | return st; |
399 | 3 | } |
400 | 44 | if (_lazy_read_ctx.can_lazy_read) { |
401 | | // call _do_lazy_read recursively when current batch is skipped |
402 | 4 | return _do_lazy_read(block, batch_size, read_rows, batch_eof); |
403 | 40 | } else { |
404 | 40 | FilterMap filter_map; |
405 | 40 | int64_t batch_base_row = _total_read_rows; |
406 | 40 | RETURN_IF_ERROR((_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size, |
407 | 40 | read_rows, batch_eof, filter_map))); |
408 | 40 | RETURN_IF_ERROR( |
409 | 40 | _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); |
410 | 40 | RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); |
411 | 40 | RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, false)); |
412 | 40 | RETURN_IF_ERROR(_append_iceberg_rowid_column(block, *read_rows, false)); |
413 | | |
414 | 40 | #ifndef NDEBUG |
415 | 125 | for (auto col : *block) { |
416 | 125 | col.column->sanity_check(); |
417 | 125 | DCHECK(block->rows() == col.column->size()) |
418 | 0 | << absl::Substitute("block rows = $0 , column rows = $1, col name = $2", |
419 | 0 | block->rows(), col.column->size(), col.name); |
420 | 125 | } |
421 | 40 | #endif |
422 | | |
423 | 40 | if (block->rows() == 0) { |
424 | 0 | RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block)); |
425 | 0 | *read_rows = block->rows(); |
426 | 0 | #ifndef NDEBUG |
427 | 0 | for (auto col : *block) { |
428 | 0 | col.column->sanity_check(); |
429 | 0 | DCHECK(block->rows() == col.column->size()) |
430 | 0 | << absl::Substitute("block rows = $0 , column rows = $1, col name = $2", |
431 | 0 | block->rows(), col.column->size(), col.name); |
432 | 0 | } |
433 | 0 | #endif |
434 | 0 | return Status::OK(); |
435 | 0 | } |
436 | 40 | { |
437 | 40 | SCOPED_RAW_TIMER(&_predicate_filter_time); |
438 | 40 | RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows)); |
439 | | |
440 | 40 | std::vector<uint32_t> columns_to_filter; |
441 | 40 | int column_to_keep = block->columns(); |
442 | 40 | columns_to_filter.resize(column_to_keep); |
443 | 165 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
444 | 125 | columns_to_filter[i] = i; |
445 | 125 | } |
446 | 40 | if (!_lazy_read_ctx.conjuncts.empty()) { |
447 | 6 | std::vector<IColumn::Filter*> filters; |
448 | 6 | if (_position_delete_ctx.has_filter) { |
449 | 0 | filters.push_back(_pos_delete_filter_ptr.get()); |
450 | 0 | } |
451 | 6 | IColumn::Filter result_filter(block->rows(), 1); |
452 | 6 | bool can_filter_all = false; |
453 | | |
454 | 6 | { |
455 | 6 | RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( |
456 | 6 | _filter_conjuncts, &filters, block, &result_filter, &can_filter_all)); |
457 | 6 | } |
458 | | |
459 | | // Condition cache MISS: mark granules with surviving rows (non-lazy path) |
460 | 6 | if (!can_filter_all) { |
461 | 3 | _mark_condition_cache_granules(result_filter.data(), block->rows(), |
462 | 3 | batch_base_row); |
463 | 3 | } |
464 | | |
465 | 6 | if (can_filter_all) { |
466 | 9 | for (auto& col : columns_to_filter) { |
467 | 9 | std::move(*block->get_by_position(col).column).assume_mutable()->clear(); |
468 | 9 | } |
469 | 3 | Block::erase_useless_column(block, column_to_keep); |
470 | 3 | RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block)); |
471 | 3 | return Status::OK(); |
472 | 3 | } |
473 | | |
474 | 3 | RETURN_IF_CATCH_EXCEPTION( |
475 | 3 | Block::filter_block_internal(block, columns_to_filter, result_filter)); |
476 | 3 | Block::erase_useless_column(block, column_to_keep); |
477 | 34 | } else { |
478 | 34 | RETURN_IF_CATCH_EXCEPTION( |
479 | 34 | RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter))); |
480 | 34 | } |
481 | 37 | RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block)); |
482 | 37 | } |
483 | 37 | #ifndef NDEBUG |
484 | 116 | for (auto col : *block) { |
485 | 116 | col.column->sanity_check(); |
486 | 116 | DCHECK(block->rows() == col.column->size()) |
487 | 0 | << absl::Substitute("block rows = $0 , column rows = $1, col name = $2", |
488 | 0 | block->rows(), col.column->size(), col.name); |
489 | 116 | } |
490 | 37 | #endif |
491 | 37 | *read_rows = block->rows(); |
492 | 37 | return Status::OK(); |
493 | 37 | } |
494 | 44 | } |
495 | | |
496 | | // Maps each batch row to its global parquet file position via _read_ranges, then marks |
497 | | // the corresponding condition cache granule as true if the filter indicates the row survived. |
498 | | // batch_seq_start is the number of rows already read sequentially before this batch |
499 | | // (i.e., _total_read_rows before the batch started). |
500 | | void RowGroupReader::_mark_condition_cache_granules(const uint8_t* filter_data, size_t num_rows, |
501 | 6 | int64_t batch_seq_start) { |
502 | 6 | if (!_condition_cache_ctx || _condition_cache_ctx->is_hit) { |
503 | 6 | return; |
504 | 6 | } |
505 | 0 | auto& cache = *_condition_cache_ctx->filter_result; |
506 | 0 | for (size_t i = 0; i < num_rows; i++) { |
507 | 0 | if (filter_data[i]) { |
508 | | // row-group-relative position of this row |
509 | 0 | int64_t rg_pos = _read_ranges.get_row_index_by_pos(batch_seq_start + i); |
510 | | // global row number in the parquet file |
511 | 0 | size_t granule = (_current_row_group_idx.first_row + rg_pos) / |
512 | 0 | ConditionCacheContext::GRANULE_SIZE; |
513 | 0 | size_t cache_idx = granule - _condition_cache_ctx->base_granule; |
514 | 0 | if (cache_idx < cache.size()) { |
515 | 0 | cache[cache_idx] = true; |
516 | 0 | } |
517 | 0 | } |
518 | 0 | } |
519 | 0 | } |
520 | | |
521 | | // On condition cache HIT, removes row ranges whose granules have no surviving rows from |
522 | | // _read_ranges BEFORE column readers are created. This makes ParquetColumnReader skip I/O |
523 | | // entirely for false-granule rows — both predicate and lazy columns — via its existing |
524 | | // page/row-skipping infrastructure. |
525 | 37 | void RowGroupReader::_filter_read_ranges_by_condition_cache() { |
526 | 37 | if (!_condition_cache_ctx || !_condition_cache_ctx->is_hit) { |
527 | 37 | return; |
528 | 37 | } |
529 | 0 | auto& filter_result = *_condition_cache_ctx->filter_result; |
530 | 0 | if (filter_result.empty()) { |
531 | 0 | return; |
532 | 0 | } |
533 | | |
534 | 0 | auto old_row_count = _read_ranges.count(); |
535 | 0 | _read_ranges = |
536 | 0 | filter_ranges_by_cache(_read_ranges, filter_result, _current_row_group_idx.first_row, |
537 | 0 | _condition_cache_ctx->base_granule); |
538 | 0 | _is_row_group_filtered = _read_ranges.is_empty(); |
539 | 0 | _condition_cache_filtered_rows += old_row_count - _read_ranges.count(); |
540 | 0 | } |
541 | | |
542 | | // Filters read_ranges by removing rows whose cache granule is false. |
543 | | // |
544 | | // Cache index i maps to global granule (base_granule + i), which covers global file |
545 | | // rows [(base_granule+i)*GS, (base_granule+i+1)*GS). Since read_ranges uses |
546 | | // row-group-relative indices and first_row is the global position of the row group's |
547 | | // first row, global granule g maps to row-group-relative range: |
548 | | // [max(0, g*GS - first_row), max(0, (g+1)*GS - first_row)) |
549 | | // |
550 | | // We build a RowRanges of all false-granule regions (in row-group-relative coordinates), |
551 | | // then subtract from read_ranges via ranges_exception. |
552 | | // |
553 | | // Granules beyond cache.size() are kept conservatively (assumed true). |
554 | | // |
555 | | // When base_granule > 0, the cache only covers granules starting from base_granule. |
556 | | // This happens when a Parquet file is split across multiple scan ranges and this reader |
557 | | // only processes row groups starting at a non-zero offset in the file. |
558 | | RowRanges RowGroupReader::filter_ranges_by_cache(const RowRanges& read_ranges, |
559 | | const std::vector<bool>& cache, int64_t first_row, |
560 | 21 | int64_t base_granule) { |
561 | 21 | constexpr int64_t GS = ConditionCacheContext::GRANULE_SIZE; |
562 | 21 | RowRanges filtered_ranges; |
563 | | |
564 | 138 | for (size_t i = 0; i < cache.size(); i++) { |
565 | 117 | if (!cache[i]) { |
566 | 64 | int64_t global_granule = base_granule + static_cast<int64_t>(i); |
567 | 64 | int64_t rg_from = std::max(static_cast<int64_t>(0), global_granule * GS - first_row); |
568 | 64 | int64_t rg_to = |
569 | 64 | std::max(static_cast<int64_t>(0), (global_granule + 1) * GS - first_row); |
570 | 64 | if (rg_from < rg_to) { |
571 | 16 | filtered_ranges.add(RowRange(rg_from, rg_to)); |
572 | 16 | } |
573 | 64 | } |
574 | 117 | } |
575 | | |
576 | 21 | RowRanges result; |
577 | 21 | RowRanges::ranges_exception(read_ranges, filtered_ranges, &result); |
578 | 21 | return result; |
579 | 21 | } |
580 | | |
581 | | Status RowGroupReader::_read_column_data(Block* block, |
582 | | const std::vector<std::string>& table_columns, |
583 | | size_t batch_size, size_t* read_rows, bool* batch_eof, |
584 | 49 | FilterMap& filter_map) { |
585 | 49 | size_t batch_read_rows = 0; |
586 | 49 | bool has_eof = false; |
587 | 123 | for (auto& read_col_name : table_columns) { |
588 | 123 | auto& column_with_type_and_name = |
589 | 123 | block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]); |
590 | 123 | auto& column_ptr = column_with_type_and_name.column; |
591 | 123 | auto& column_type = column_with_type_and_name.type; |
592 | 123 | bool is_dict_filter = false; |
593 | 123 | for (auto& _dict_filter_col : _dict_filter_cols) { |
594 | 0 | if (_dict_filter_col.first == read_col_name) { |
595 | 0 | MutableColumnPtr dict_column = ColumnInt32::create(); |
596 | 0 | if (!_col_name_to_block_idx->contains(read_col_name)) { |
597 | 0 | return Status::InternalError( |
598 | 0 | "Wrong read column '{}' in parquet file, block: {}", read_col_name, |
599 | 0 | block->dump_structure()); |
600 | 0 | } |
601 | 0 | if (column_type->is_nullable()) { |
602 | 0 | block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = |
603 | 0 | std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); |
604 | 0 | block->replace_by_position( |
605 | 0 | (*_col_name_to_block_idx)[read_col_name], |
606 | 0 | ColumnNullable::create(std::move(dict_column), |
607 | 0 | ColumnUInt8::create(dict_column->size(), 0))); |
608 | 0 | } else { |
609 | 0 | block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = |
610 | 0 | std::make_shared<DataTypeInt32>(); |
611 | 0 | block->replace_by_position((*_col_name_to_block_idx)[read_col_name], |
612 | 0 | std::move(dict_column)); |
613 | 0 | } |
614 | 0 | is_dict_filter = true; |
615 | 0 | break; |
616 | 0 | } |
617 | 0 | } |
618 | | |
619 | 123 | size_t col_read_rows = 0; |
620 | 123 | bool col_eof = false; |
621 | | // Should reset _filter_map_index to 0 when reading next column. |
622 | | // select_vector.reset(); |
623 | 123 | _column_readers[read_col_name]->reset_filter_map_index(); |
624 | 309 | while (!col_eof && col_read_rows < batch_size) { |
625 | 186 | size_t loop_rows = 0; |
626 | 186 | RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data( |
627 | 186 | column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name), |
628 | 186 | filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter)); |
629 | 186 | VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name |
630 | 0 | << "' loop_rows=" << loop_rows << " col_read_rows_so_far=" << col_read_rows |
631 | 0 | << std::endl; |
632 | 186 | col_read_rows += loop_rows; |
633 | 186 | } |
634 | 123 | VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name |
635 | 0 | << "' read_rows=" << col_read_rows << std::endl; |
636 | 123 | if (batch_read_rows > 0 && batch_read_rows != col_read_rows) { |
637 | 0 | LOG(WARNING) << "[RowGroupReader] Mismatched read rows among parquet columns. " |
638 | 0 | "previous_batch_read_rows=" |
639 | 0 | << batch_read_rows << ", current_column='" << read_col_name |
640 | 0 | << "', current_col_read_rows=" << col_read_rows; |
641 | 0 | return Status::Corruption("Can't read the same number of rows among parquet columns"); |
642 | 0 | } |
643 | 123 | batch_read_rows = col_read_rows; |
644 | | |
645 | 123 | #ifndef NDEBUG |
646 | 123 | column_ptr->sanity_check(); |
647 | 123 | #endif |
648 | 123 | if (col_eof) { |
649 | 101 | has_eof = true; |
650 | 101 | } |
651 | 123 | } |
652 | | |
653 | 49 | *read_rows = batch_read_rows; |
654 | 49 | *batch_eof = has_eof; |
655 | | |
656 | 49 | return Status::OK(); |
657 | 49 | } |
658 | | |
// Reads one batch with late materialization: first read only the predicate
// columns, evaluate the filter conjuncts on them, and only then read the
// remaining ("lazy") columns for the rows that survived. Batches that are
// filtered out entirely are accumulated in _cached_filtered_rows so whole
// pages can be skipped later.
//
// Outputs:
//   *read_rows - number of surviving rows appended to `block`.
//   *batch_eof - true when the row group is exhausted.
Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
                                     bool* batch_eof) {
    std::unique_ptr<FilterMap> filter_map_ptr = nullptr;
    size_t pre_read_rows;
    bool pre_eof;
    // Identity list [0, origin_column_num) of block columns that may be filtered.
    std::vector<uint32_t> columns_to_filter;
    uint32_t origin_column_num = block->columns();
    columns_to_filter.resize(origin_column_num);
    for (uint32_t i = 0; i < origin_column_num; ++i) {
        columns_to_filter[i] = i;
    }
    IColumn::Filter result_filter;
    // Total predicate-column rows read in this call, across retried batches.
    size_t pre_raw_read_rows = 0;
    // Loop until a batch has at least one surviving row (or EOF / cancellation).
    while (!_state->is_cancelled()) {
        // read predicate columns
        pre_read_rows = 0;
        pre_eof = false;
        FilterMap filter_map;
        // Row-group-relative row offset of this batch; used for condition-cache
        // granule marking below.
        int64_t batch_base_row = _total_read_rows;
        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
                                          &pre_read_rows, &pre_eof, filter_map));
        if (pre_read_rows == 0) {
            DCHECK_EQ(pre_eof, true);
            break;
        }
        pre_raw_read_rows += pre_read_rows;

        // Partition/missing columns that participate in predicates must be
        // materialized before executing the conjuncts.
        RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
                                                _lazy_read_ctx.predicate_partition_columns));
        RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
                                              _lazy_read_ctx.predicate_missing_columns));
        RETURN_IF_ERROR(_fill_row_id_columns(block, pre_read_rows, false));
        RETURN_IF_ERROR(_append_iceberg_rowid_column(block, pre_read_rows, false));

        RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));

#ifndef NDEBUG
        // Debug-only invariant: every materialized column must have exactly
        // pre_read_rows rows (lazy columns are still empty at this point).
        for (auto col : *block) {
            if (col.column->size() == 0) { // lazy read column.
                continue;
            }
            col.column->sanity_check();
            DCHECK(pre_read_rows == col.column->size())
                    << absl::Substitute("pre_read_rows = $0 , column rows = $1, col name = $2",
                                        pre_read_rows, col.column->size(), col.name);
        }
#endif

        bool can_filter_all = false;
        bool resize_first_column = _lazy_read_ctx.resize_first_column;
        // If the iceberg row-id column already occupies position 0 it is fully
        // populated, so the resize workaround below is unnecessary.
        if (resize_first_column && _iceberg_rowid_params.enabled) {
            int row_id_idx = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
            if (row_id_idx == 0) {
                resize_first_column = false;
            }
        }
        {
            SCOPED_RAW_TIMER(&_predicate_filter_time);

            // generate filter vector
            if (resize_first_column) {
                // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
                // The following process may be tricky and time-consuming, but we have no other way.
                block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
            }
            // Start from all-ones; conjuncts and the pos-delete filter AND into it.
            result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
            std::vector<IColumn::Filter*> filters;
            if (_position_delete_ctx.has_filter) {
                filters.push_back(_pos_delete_filter_ptr.get());
            }

            VExprContextSPtrs filter_contexts;
            for (auto& conjunct : _filter_conjuncts) {
                filter_contexts.emplace_back(conjunct);
            }

            {
                RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
                                                                &result_filter, &can_filter_all));
            }

            // Condition cache MISS: mark granules with surviving rows
            if (!can_filter_all) {
                _mark_condition_cache_granules(result_filter.data(), pre_read_rows, batch_base_row);
            }

            if (resize_first_column) {
                // We have to clean the first column to insert right data.
                block->get_by_position(0).column->assume_mutable()->clear();
            }
        }

        const uint8_t* __restrict filter_map_data = result_filter.data();
        filter_map_ptr = std::make_unique<FilterMap>();
        RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, can_filter_all));
        if (filter_map_ptr->filter_all()) {
            // Whole batch filtered out: clear every column populated above so
            // the next iteration can append a fresh batch.
            {
                SCOPED_RAW_TIMER(&_predicate_filter_time);
                for (const auto& col : _lazy_read_ctx.predicate_columns.first) {
                    // clean block to read predicate columns
                    block->get_by_position((*_col_name_to_block_idx)[col])
                            .column->assume_mutable()
                            ->clear();
                }
                for (const auto& col : _lazy_read_ctx.predicate_partition_columns) {
                    block->get_by_position((*_col_name_to_block_idx)[col.first])
                            .column->assume_mutable()
                            ->clear();
                }
                for (const auto& col : _lazy_read_ctx.predicate_missing_columns) {
                    block->get_by_position((*_col_name_to_block_idx)[col.first])
                            .column->assume_mutable()
                            ->clear();
                }
                if (_row_id_column_iterator_pair.first != nullptr) {
                    block->get_by_position(_row_id_column_iterator_pair.second)
                            .column->assume_mutable()
                            ->clear();
                }
                if (_iceberg_rowid_params.enabled) {
                    int row_id_idx =
                            block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
                    if (row_id_idx >= 0) {
                        block->get_by_position(static_cast<size_t>(row_id_idx))
                                .column->assume_mutable()
                                ->clear();
                    }
                }
                Block::erase_useless_column(block, origin_column_num);
            }

            if (!pre_eof) {
                // If continuous batches are skipped, we can cache them to skip a whole page
                _cached_filtered_rows += pre_read_rows;
                // Give control back to the scanner once enough raw rows have
                // been consumed, even though nothing survived yet.
                if (pre_raw_read_rows >= config::doris_scanner_row_num) {
                    *read_rows = 0;
                    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
                    return Status::OK();
                }
            } else { // pre_eof
                // If filter_map_ptr->filter_all() and pre_eof, we can skip whole row group.
                *read_rows = 0;
                *batch_eof = true;
                _lazy_read_filtered_rows += (pre_read_rows + _cached_filtered_rows);
                RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
                return Status::OK();
            }
        } else {
            // At least one row survived: proceed to the lazy-column read.
            break;
        }
    }
    if (_state->is_cancelled()) {
        return Status::Cancelled("cancelled");
    }

    // Loop exited via the pre_read_rows == 0 break: nothing left to read.
    if (filter_map_ptr == nullptr) {
        DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
        *read_rows = 0;
        *batch_eof = true;
        return Status::OK();
    }

    FilterMap& filter_map = *filter_map_ptr;
    // Keeps the merged filter buffer alive while filter_map references it.
    DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr;
    if (_cached_filtered_rows != 0) {
        // Prepend the rows filtered out by earlier all-filtered batches so the
        // lazy read skips them in one pass.
        RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
        pre_read_rows += _cached_filtered_rows;
        _cached_filtered_rows = 0;
    }

    // lazy read columns
    size_t lazy_read_rows;
    bool lazy_eof;
    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
                                      &lazy_read_rows, &lazy_eof, filter_map));

    if (pre_read_rows != lazy_read_rows) {
        return Status::Corruption("Can't read the same number of rows when doing lazy read");
    }
    // pre_eof ^ lazy_eof
    // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof

    // filter data in predicate columns, and remove filter column
    {
        SCOPED_RAW_TIMER(&_predicate_filter_time);
        if (filter_map.has_filter()) {
            std::vector<uint32_t> predicate_columns = _lazy_read_ctx.all_predicate_col_ids;
            // The iceberg row-id column was filled for the whole batch, so it
            // must be filtered together with the predicate columns.
            if (_iceberg_rowid_params.enabled) {
                int row_id_idx = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
                if (row_id_idx >= 0 &&
                    std::find(predicate_columns.begin(), predicate_columns.end(),
                              static_cast<uint32_t>(row_id_idx)) == predicate_columns.end()) {
                    predicate_columns.push_back(static_cast<uint32_t>(row_id_idx));
                }
            }
            RETURN_IF_CATCH_EXCEPTION(
                    Block::filter_block_internal(block, predicate_columns, result_filter));
            Block::erase_useless_column(block, origin_column_num);
        } else {
            Block::erase_useless_column(block, origin_column_num);
        }
    }

    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));

    // Determine the surviving row count from any non-empty column; in debug
    // builds assert that all non-empty columns agree.
    size_t column_num = block->columns();
    size_t column_size = 0;
    for (int i = 0; i < column_num; ++i) {
        size_t cz = block->get_by_position(i).column->size();
        if (column_size != 0 && cz != 0) {
            DCHECK_EQ(column_size, cz);
        }
        if (cz != 0) {
            column_size = cz;
        }
    }
    _lazy_read_filtered_rows += pre_read_rows - column_size;
    *read_rows = column_size;

    *batch_eof = pre_eof;
    // Non-predicate partition/missing columns are filled only for the rows
    // that survived the filter.
    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
    RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
#ifndef NDEBUG
    for (auto col : *block) {
        col.column->sanity_check();
        DCHECK(block->rows() == col.column->size())
                << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
                                    block->rows(), col.column->size(), col.name);
    }
#endif
    return Status::OK();
}
892 | | |
893 | | Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map, |
894 | | DorisUniqueBufferPtr<uint8_t>& filter_map_data, |
895 | 0 | size_t pre_read_rows) const { |
896 | 0 | if (_cached_filtered_rows == 0) { |
897 | 0 | return Status::OK(); |
898 | 0 | } |
899 | 0 | size_t total_rows = _cached_filtered_rows + pre_read_rows; |
900 | 0 | if (filter_map.filter_all()) { |
901 | 0 | RETURN_IF_ERROR(filter_map.init(nullptr, total_rows, true)); |
902 | 0 | return Status::OK(); |
903 | 0 | } |
904 | | |
905 | 0 | filter_map_data = make_unique_buffer<uint8_t>(total_rows); |
906 | 0 | auto* map = filter_map_data.get(); |
907 | 0 | for (size_t i = 0; i < _cached_filtered_rows; ++i) { |
908 | 0 | map[i] = 0; |
909 | 0 | } |
910 | 0 | const uint8_t* old_map = filter_map.filter_map_data(); |
911 | 0 | if (old_map == nullptr) { |
912 | | // select_vector.filter_all() == true is already built. |
913 | 0 | for (size_t i = _cached_filtered_rows; i < total_rows; ++i) { |
914 | 0 | map[i] = 1; |
915 | 0 | } |
916 | 0 | } else { |
917 | 0 | memcpy(map + _cached_filtered_rows, old_map, pre_read_rows); |
918 | 0 | } |
919 | 0 | RETURN_IF_ERROR(filter_map.init(map, total_rows, false)); |
920 | 0 | return Status::OK(); |
921 | 0 | } |
922 | | |
923 | | Status RowGroupReader::_fill_partition_columns( |
924 | | Block* block, size_t rows, |
925 | | const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& |
926 | 52 | partition_columns) { |
927 | 52 | DataTypeSerDe::FormatOptions _text_formatOptions; |
928 | 52 | for (const auto& kv : partition_columns) { |
929 | 15 | auto doris_column = block->get_by_position((*_col_name_to_block_idx)[kv.first]).column; |
930 | | // obtained from block*, it is a mutable object. |
931 | 15 | auto* col_ptr = const_cast<IColumn*>(doris_column.get()); |
932 | 15 | const auto& [value, slot_desc] = kv.second; |
933 | 15 | auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); |
934 | 15 | Slice slice(value.data(), value.size()); |
935 | 15 | uint64_t num_deserialized = 0; |
936 | | // Be careful when reading empty rows from parquet row groups. |
937 | 15 | if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows, |
938 | 15 | &num_deserialized, |
939 | 15 | _text_formatOptions) != Status::OK()) { |
940 | 0 | return Status::InternalError("Failed to fill partition column: {}={}", |
941 | 0 | slot_desc->col_name(), value); |
942 | 0 | } |
943 | 15 | if (num_deserialized != rows) { |
944 | 0 | return Status::InternalError( |
945 | 0 | "Failed to fill partition column: {}={} ." |
946 | 0 | "Number of rows expected to be written : {}, number of rows actually written : " |
947 | 0 | "{}", |
948 | 0 | slot_desc->col_name(), value, num_deserialized, rows); |
949 | 0 | } |
950 | 15 | } |
951 | 52 | return Status::OK(); |
952 | 52 | } |
953 | | |
// Fills columns that exist in the query schema but not in the parquet file:
// either with NULLs (no default expression) or by evaluating the column's
// default-value expression and broadcasting it to `rows` rows.
Status RowGroupReader::_fill_missing_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    for (const auto& kv : missing_columns) {
        if (!_col_name_to_block_idx->contains(kv.first)) {
            return Status::InternalError("Missing column: {} not found in block {}", kv.first,
                                         block->dump_structure());
        }
        if (kv.second == nullptr) {
            // no default column, fill with null
            auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
                                          .column->assume_mutable();
            auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
            nullable_column->insert_many_defaults(rows);
        } else {
            // fill with default value
            const auto& ctx = kv.second;
            ColumnPtr result_column_ptr;
            // PT1 => dest primitive type
            RETURN_IF_ERROR(ctx->execute(block, result_column_ptr));
            // NOTE(review): when use_count() != 1 the column is shared and is
            // left untouched here — presumably it already lives in the block;
            // verify against VExprContext::execute's ownership contract.
            if (result_column_ptr->use_count() == 1) {
                // call resize because the first column of _src_block_ptr may not be filled by reader,
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
                // has only one row.
                auto mutable_column = result_column_ptr->assume_mutable();
                mutable_column->resize(rows);
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
                // Wrap in ColumnNullable when the destination type requires it.
                auto origin_column_type =
                        block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
                bool is_nullable = origin_column_type->is_nullable();
                block->replace_by_position(
                        (*_col_name_to_block_idx)[kv.first],
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
            }
        }
    }
    return Status::OK();
}
993 | | |
// Advances the reader for a batch when no actual columns need to be read
// (e.g. count-only scans): computes how many logical rows the batch covers
// after position deletes, and optionally materializes the surviving row ids.
//
// @param modify_row_ids set to true when _current_batch_row_ids was rebuilt
//        and downstream consumers must use it.
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof,
                                         bool* modify_row_ids) {
    *modify_row_ids = false;
    if (_position_delete_ctx.has_filter) {
        // [start_row_id, end_row_id) is this batch's row-id window, clipped to
        // the row group's last row.
        int64_t start_row_id = _position_delete_ctx.current_row_id;
        int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
                                      _position_delete_ctx.last_row_id);
        int64_t num_delete_rows = 0;
        // before_index remembers where this window's deletes start so the
        // row-id loop below can re-walk them.
        auto before_index = _position_delete_ctx.index;
        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
            const int64_t& delete_row_id =
                    _position_delete_ctx.delete_rows[_position_delete_ctx.index];
            if (delete_row_id < start_row_id) {
                // Delete precedes the window: skip it permanently.
                _position_delete_ctx.index++;
                before_index = _position_delete_ctx.index;
            } else if (delete_row_id < end_row_id) {
                num_delete_rows++;
                _position_delete_ctx.index++;
            } else { // delete_row_id >= end_row_id
                break;
            }
        }
        *read_rows = end_row_id - start_row_id - num_delete_rows;
        _position_delete_ctx.current_row_id = end_row_id;
        *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;

        // Rebuild row ids only when someone downstream consumes them.
        if (_row_id_column_iterator_pair.first != nullptr || _iceberg_rowid_params.enabled ||
            (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids())) {
            *modify_row_ids = true;
            _current_batch_row_ids.clear();
            _current_batch_row_ids.resize(*read_rows);
            size_t idx = 0;
            // Emit every row id in the window that is not position-deleted.
            for (auto id = start_row_id; id < end_row_id; id++) {
                if (before_index < _position_delete_ctx.index &&
                    id == _position_delete_ctx.delete_rows[before_index]) {
                    before_index++;
                    continue;
                }
                _current_batch_row_ids[idx++] = (rowid_t)id;
            }
        }
    } else {
        // No position deletes: simply count down _remaining_rows.
        if (batch_size < _remaining_rows) {
            *read_rows = batch_size;
            _remaining_rows -= batch_size;
            *batch_eof = false;
        } else {
            *read_rows = _remaining_rows;
            _remaining_rows = 0;
            *batch_eof = true;
        }
        if (_iceberg_rowid_params.enabled) {
            *modify_row_ids = true;
            RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
        }
    }
    _total_read_rows += *read_rows;
    return Status::OK();
}
1053 | | |
1054 | 5 | Status RowGroupReader::_get_current_batch_row_id(size_t read_rows) { |
1055 | 5 | _current_batch_row_ids.clear(); |
1056 | 5 | _current_batch_row_ids.resize(read_rows); |
1057 | | |
1058 | 5 | int64_t idx = 0; |
1059 | 5 | int64_t read_range_rows = 0; |
1060 | 19 | for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) { |
1061 | 14 | auto range = _read_ranges.get_range(range_idx); |
1062 | 14 | if (read_rows == 0) { |
1063 | 0 | break; |
1064 | 0 | } |
1065 | 14 | if (read_range_rows + (range.to() - range.from()) > _total_read_rows) { |
1066 | 14 | int64_t fi = |
1067 | 14 | std::max(_total_read_rows, read_range_rows) - read_range_rows + range.from(); |
1068 | 14 | size_t len = std::min(read_rows, (size_t)(std::max(range.to(), fi) - fi)); |
1069 | | |
1070 | 14 | read_rows -= len; |
1071 | | |
1072 | 28 | for (auto i = 0; i < len; i++) { |
1073 | 14 | _current_batch_row_ids[idx++] = |
1074 | 14 | (rowid_t)(fi + i + _current_row_group_idx.first_row); |
1075 | 14 | } |
1076 | 14 | } |
1077 | 14 | read_range_rows += range.to() - range.from(); |
1078 | 14 | } |
1079 | 5 | return Status::OK(); |
1080 | 5 | } |
1081 | | |
// Populates row-id-derived columns for the current batch: the explicit row-id
// column (via its column iterator) and, when row-lineage tracking is active,
// the lineage row-id and last-updated-sequence-number columns. Lineage values
// are written only into rows whose null map is set (i.e. not provided by the
// file itself).
Status RowGroupReader::_fill_row_id_columns(Block* block, size_t read_rows,
                                            bool is_current_row_ids) {
    const bool need_row_ids =
            _row_id_column_iterator_pair.first != nullptr ||
            (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids());
    // When the caller did not already rebuild _current_batch_row_ids, do it here.
    if (need_row_ids && !is_current_row_ids) {
        RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
    }
    if (_row_id_column_iterator_pair.first != nullptr) {
        auto col = block->get_by_position(_row_id_column_iterator_pair.second)
                           .column->assume_mutable();
        RETURN_IF_ERROR(_row_id_column_iterator_pair.first->read_by_rowids(
                _current_batch_row_ids.data(), _current_batch_row_ids.size(), col));
    }

    // Lineage row-id column: null rows get first_row_id + batch row id.
    if (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids() &&
        _row_lineage_columns->first_row_id >= 0) {
        auto col = block->get_by_position(_row_lineage_columns->row_id_column_idx)
                           .column->assume_mutable();
        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
        auto& null_map = nullable_column->get_null_map_data();
        auto& data =
                assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
        for (size_t i = 0; i < read_rows; ++i) {
            if (null_map[i] != 0) {
                // Clear the null flag and write the computed lineage row id.
                null_map[i] = 0;
                data[i] = _row_lineage_columns->first_row_id +
                          static_cast<int64_t>(_current_batch_row_ids[i]);
            }
        }
    }

    // Lineage sequence-number column: null rows get the file-level constant.
    if (_row_lineage_columns != nullptr &&
        _row_lineage_columns->has_last_updated_sequence_number_column() &&
        _row_lineage_columns->last_updated_sequence_number >= 0) {
        auto col = block->get_by_position(
                                _row_lineage_columns->last_updated_sequence_number_column_idx)
                           .column->assume_mutable();
        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
        auto& null_map = nullable_column->get_null_map_data();
        auto& data =
                assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
        for (size_t i = 0; i < read_rows; ++i) {
            if (null_map[i] != 0) {
                null_map[i] = 0;
                data[i] = _row_lineage_columns->last_updated_sequence_number;
            }
        }
    }

    return Status::OK();
}
1134 | | |
// Builds the synthetic Iceberg _row_id struct column
// (file_path, row_position, partition_spec_id, partition_data) for the current
// batch. If the block already has a column with that name its contents are
// replaced; otherwise a new column is inserted at the configured position and
// the name->position map is refreshed.
Status RowGroupReader::_append_iceberg_rowid_column(Block* block, size_t read_rows,
                                                    bool is_current_row_ids) {
    if (!_iceberg_rowid_params.enabled) {
        return Status::OK();
    }
    // Rebuild the batch row ids unless the caller already did.
    if (!is_current_row_ids) {
        RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
    }

    int row_id_idx = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
    if (row_id_idx >= 0) {
        // Column exists: overwrite it in place, keeping its declared type.
        auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_idx));
        MutableColumnPtr row_id_column;
        RETURN_IF_ERROR(build_iceberg_rowid_column(
                col_with_type.type, _iceberg_rowid_params.file_path, _current_batch_row_ids,
                _iceberg_rowid_params.partition_spec_id, _iceberg_rowid_params.partition_data_json,
                &row_id_column));
        col_with_type.column = std::move(row_id_column);
    } else {
        // Column absent: construct the struct type and insert a new column.
        DataTypes field_types;
        field_types.push_back(std::make_shared<DataTypeString>());
        field_types.push_back(std::make_shared<DataTypeInt64>());
        field_types.push_back(std::make_shared<DataTypeInt32>());
        field_types.push_back(std::make_shared<DataTypeString>());

        std::vector<std::string> field_names = {"file_path", "row_position", "partition_spec_id",
                                                "partition_data"};

        auto row_id_type = std::make_shared<DataTypeStruct>(field_types, field_names);
        MutableColumnPtr row_id_column;
        RETURN_IF_ERROR(build_iceberg_rowid_column(
                row_id_type, _iceberg_rowid_params.file_path, _current_batch_row_ids,
                _iceberg_rowid_params.partition_spec_id, _iceberg_rowid_params.partition_data_json,
                &row_id_column));
        // Clamp an out-of-range configured position to "append at the end".
        int insert_pos = _iceberg_rowid_params.row_id_column_pos;
        if (insert_pos < 0 || insert_pos > static_cast<int>(block->columns())) {
            insert_pos = static_cast<int>(block->columns());
        }
        block->insert(static_cast<size_t>(insert_pos),
                      ColumnWithTypeAndName(std::move(row_id_column), row_id_type,
                                            doris::BeConsts::ICEBERG_ROWID_COL));
    }

    // Positions may have shifted after an insert; refresh the lookup map.
    if (_col_name_to_block_idx != nullptr) {
        *_col_name_to_block_idx = block->get_name_to_pos_map();
    }

    return Status::OK();
}
1184 | | |
// Builds _pos_delete_filter_ptr for the next `read_rows` rows: a filter of
// ones with zeros at positions covered by the row group's position-delete
// list, translated through the read ranges. Always advances _total_read_rows
// by read_rows before returning.
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
    if (!_position_delete_ctx.has_filter) {
        _pos_delete_filter_ptr.reset(nullptr);
        _total_read_rows += read_rows;
        return Status::OK();
    }
    _pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
    auto* __restrict _pos_delete_filter_data = _pos_delete_filter_ptr->data();
    // Each outer iteration maps one delete row id onto a batch-relative index.
    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
        // Delete row id relative to the start of this row group.
        const int64_t delete_row_index_in_row_group =
                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
                _position_delete_ctx.first_row_id;
        int64_t read_range_rows = 0;
        size_t remaining_read_rows = _total_read_rows + read_rows;
        for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) {
            auto range = _read_ranges.get_range(range_idx);
            if (delete_row_index_in_row_group < range.from()) {
                // Delete falls in a gap between ranges: it hits no read row.
                ++_position_delete_ctx.index;
                break;
            } else if (delete_row_index_in_row_group < range.to()) {
                // Batch-relative index of the deleted row.
                int64_t index = (delete_row_index_in_row_group - range.from()) + read_range_rows -
                                _total_read_rows;
                // NOTE(review): `read_rows - 1` is unsigned, so the int64_t
                // `index` is converted to unsigned here; a negative index (if
                // it can occur) wraps huge and triggers this early return
                // rather than an out-of-bounds write. Do not "fix" to a signed
                // compare without verifying index >= 0 always holds.
                if (index > read_rows - 1) {
                    // Delete belongs to a future batch; stop here.
                    _total_read_rows += read_rows;
                    return Status::OK();
                }
                _pos_delete_filter_data[index] = 0;
                ++_position_delete_ctx.index;
                break;
            } else { // delete_row >= range.last_row
            }

            int64_t range_size = range.to() - range.from();
            // Don't search next range when there is no remaining_read_rows.
            if (remaining_read_rows <= range_size) {
                _total_read_rows += read_rows;
                return Status::OK();
            } else {
                remaining_read_rows -= range_size;
                read_range_rows += range_size;
            }
        }
    }
    _total_read_rows += read_rows;
    return Status::OK();
}
1231 | | |
1232 | | // need exception safety |
1233 | | Status RowGroupReader::_filter_block(Block* block, int column_to_keep, |
1234 | 34 | const std::vector<uint32_t>& columns_to_filter) { |
1235 | 34 | if (_pos_delete_filter_ptr) { |
1236 | 0 | RETURN_IF_CATCH_EXCEPTION( |
1237 | 0 | Block::filter_block_internal(block, columns_to_filter, (*_pos_delete_filter_ptr))); |
1238 | 0 | } |
1239 | 34 | Block::erase_useless_column(block, column_to_keep); |
1240 | | |
1241 | 34 | return Status::OK(); |
1242 | 34 | } |
1243 | | |
// For each dictionary-encoded string column with eligible predicates:
// evaluate the predicates once against the column's dictionary values, and
// either (a) mark the whole row group filtered, (b) rewrite the predicates as
// an IN filter over the surviving dictionary codes, or (c) fall back to
// normal row-level filtering when too many codes survive.
Status RowGroupReader::_rewrite_dict_predicates() {
    SCOPED_RAW_TIMER(&_dict_filter_rewrite_time);
    // Iterator-based loop: entries may be erased (fallback case) or kept.
    for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
        std::string& dict_filter_col_name = it->first;
        int slot_id = it->second;
        // 1. Get dictionary values to a string column.
        MutableColumnPtr dict_value_column = ColumnString::create();
        bool has_dict = false;
        RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
                dict_value_column, &has_dict));
#ifndef NDEBUG
        dict_value_column->sanity_check();
#endif
        size_t dict_value_column_size = dict_value_column->size();
        DCHECK(has_dict);
        // 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
        // 2.1 Build a temp block from the dict string column to match the conjuncts executing.
        Block temp_block;
        int dict_pos = -1;
        int index = 0;
        // Mirror the tuple layout: the dict column goes at its slot position,
        // all other slots get empty placeholder columns.
        for (const auto slot_desc : _tuple_descriptor->slots()) {
            if (slot_desc->id() == slot_id) {
                auto data_type = slot_desc->get_data_type_ptr();
                if (data_type->is_nullable()) {
                    temp_block.insert(
                            {ColumnNullable::create(
                                     std::move(
                                             dict_value_column), // NOLINT(bugprone-use-after-move)
                                     ColumnUInt8::create(dict_value_column_size, 0)),
                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
                             ""});
                } else {
                    temp_block.insert(
                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
                }
                dict_pos = index;

            } else {
                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                        slot_desc->get_data_type_ptr(),
                                                        slot_desc->col_name()));
            }
            ++index;
        }

        // 2.2 Execute conjuncts.
        VExprContextSPtrs ctxs;
        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
        if (iter != _slot_id_to_filter_conjuncts->end()) {
            for (auto& ctx : iter->second) {
                ctxs.push_back(ctx);
            }
        } else {
            std::stringstream msg;
            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
            return Status::NotFound(msg.str());
        }

        if (dict_pos != 0) {
            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
            // The following process may be tricky and time-consuming, but we have no other way.
            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
        }
        // One filter entry per dictionary value.
        IColumn::Filter result_filter(temp_block.rows(), 1);
        bool can_filter_all;
        {
            RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block,
                                                            &result_filter, &can_filter_all));
        }
        if (dict_pos != 0) {
            // We have to clean the first column to insert right data.
            temp_block.get_by_position(0).column->assume_mutable()->clear();
        }

        // If can_filter_all = true, can filter this row group.
        if (can_filter_all) {
            _is_row_group_filtered = true;
            return Status::OK();
        }

        // 3. Get dict codes.
        // A dictionary code is simply the value's index in the dict column.
        std::vector<int32_t> dict_codes;
        for (size_t i = 0; i < result_filter.size(); ++i) {
            if (result_filter[i]) {
                dict_codes.emplace_back(i);
            }
        }

        // About Performance: if dict_column size is too large, it will generate a large IN filter.
        if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
            // Fallback: drop dict filtering for this column and evaluate the
            // original conjuncts row by row instead.
            it = _dict_filter_cols.erase(it);
            for (auto& ctx : ctxs) {
                _filter_conjuncts.push_back(ctx);
            }
            continue;
        }

        // 4. Rewrite conjuncts.
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(
                dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
        ++it;
    }
    return Status::OK();
}
1348 | | |
1349 | | Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, |
1350 | 0 | bool is_nullable) { |
1351 | 0 | VExprSPtr root; |
1352 | 0 | if (dict_codes.size() == 1) { |
1353 | 0 | { |
1354 | 0 | TFunction fn; |
1355 | 0 | TFunctionName fn_name; |
1356 | 0 | fn_name.__set_db_name(""); |
1357 | 0 | fn_name.__set_function_name("eq"); |
1358 | 0 | fn.__set_name(fn_name); |
1359 | 0 | fn.__set_binary_type(TFunctionBinaryType::BUILTIN); |
1360 | 0 | std::vector<TTypeDesc> arg_types; |
1361 | 0 | arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); |
1362 | 0 | arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); |
1363 | 0 | fn.__set_arg_types(arg_types); |
1364 | 0 | fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); |
1365 | 0 | fn.__set_has_var_args(false); |
1366 | |
|
1367 | 0 | TExprNode texpr_node; |
1368 | 0 | texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); |
1369 | 0 | texpr_node.__set_node_type(TExprNodeType::BINARY_PRED); |
1370 | 0 | texpr_node.__set_opcode(TExprOpcode::EQ); |
1371 | 0 | texpr_node.__set_fn(fn); |
1372 | 0 | texpr_node.__set_num_children(2); |
1373 | 0 | texpr_node.__set_is_nullable(is_nullable); |
1374 | 0 | root = VectorizedFnCall::create_shared(texpr_node); |
1375 | 0 | } |
1376 | 0 | { |
1377 | 0 | SlotDescriptor* slot = nullptr; |
1378 | 0 | const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); |
1379 | 0 | for (auto each : slots) { |
1380 | 0 | if (each->id() == slot_id) { |
1381 | 0 | slot = each; |
1382 | 0 | break; |
1383 | 0 | } |
1384 | 0 | } |
1385 | 0 | root->add_child(VSlotRef::create_shared(slot)); |
1386 | 0 | } |
1387 | 0 | { |
1388 | 0 | TExprNode texpr_node; |
1389 | 0 | texpr_node.__set_node_type(TExprNodeType::INT_LITERAL); |
1390 | 0 | texpr_node.__set_type(create_type_desc(TYPE_INT)); |
1391 | 0 | TIntLiteral int_literal; |
1392 | 0 | int_literal.__set_value(dict_codes[0]); |
1393 | 0 | texpr_node.__set_int_literal(int_literal); |
1394 | 0 | texpr_node.__set_is_nullable(is_nullable); |
1395 | 0 | root->add_child(VLiteral::create_shared(texpr_node)); |
1396 | 0 | } |
1397 | 0 | } else { |
1398 | 0 | { |
1399 | 0 | TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); |
1400 | 0 | TExprNode node; |
1401 | 0 | node.__set_type(type_desc); |
1402 | 0 | node.__set_node_type(TExprNodeType::IN_PRED); |
1403 | 0 | node.in_predicate.__set_is_not_in(false); |
1404 | 0 | node.__set_opcode(TExprOpcode::FILTER_IN); |
1405 | | // VdirectInPredicate assume is_nullable = false. |
1406 | 0 | node.__set_is_nullable(false); |
1407 | |
|
1408 | 0 | std::shared_ptr<HybridSetBase> hybrid_set( |
1409 | 0 | create_set(PrimitiveType::TYPE_INT, dict_codes.size(), false)); |
1410 | 0 | for (int j = 0; j < dict_codes.size(); ++j) { |
1411 | 0 | hybrid_set->insert(&dict_codes[j]); |
1412 | 0 | } |
1413 | 0 | root = VDirectInPredicate::create_shared(node, hybrid_set); |
1414 | 0 | } |
1415 | 0 | { |
1416 | 0 | SlotDescriptor* slot = nullptr; |
1417 | 0 | const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); |
1418 | 0 | for (auto each : slots) { |
1419 | 0 | if (each->id() == slot_id) { |
1420 | 0 | slot = each; |
1421 | 0 | break; |
1422 | 0 | } |
1423 | 0 | } |
1424 | 0 | root->add_child(VSlotRef::create_shared(slot)); |
1425 | 0 | } |
1426 | 0 | } |
1427 | 0 | VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root); |
1428 | 0 | RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor)); |
1429 | 0 | RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state)); |
1430 | 0 | _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx); |
1431 | 0 | _filter_conjuncts.push_back(rewritten_conjunct_ctx); |
1432 | 0 | return Status::OK(); |
1433 | 0 | } |
1434 | | |
1435 | 44 | Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { |
1436 | 44 | for (auto& dict_filter_cols : _dict_filter_cols) { |
1437 | 0 | if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { |
1438 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
1439 | 0 | "Wrong read column '{}' in parquet file, block: {}", |
1440 | 0 | dict_filter_cols.first, block->dump_structure()); |
1441 | 0 | } |
1442 | 0 | ColumnWithTypeAndName& column_with_type_and_name = |
1443 | 0 | block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]); |
1444 | 0 | const ColumnPtr& column = column_with_type_and_name.column; |
1445 | 0 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { |
1446 | 0 | const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); |
1447 | 0 | const auto* dict_column = assert_cast<const ColumnInt32*>(nested_column.get()); |
1448 | 0 | DCHECK(dict_column); |
1449 | |
|
1450 | 0 | auto string_column = DORIS_TRY( |
1451 | 0 | _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( |
1452 | 0 | dict_column)); |
1453 | |
|
1454 | 0 | column_with_type_and_name.type = |
1455 | 0 | std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); |
1456 | 0 | block->replace_by_position( |
1457 | 0 | (*_col_name_to_block_idx)[dict_filter_cols.first], |
1458 | 0 | ColumnNullable::create(std::move(string_column), |
1459 | 0 | nullable_column->get_null_map_column_ptr())); |
1460 | 0 | } else { |
1461 | 0 | const auto* dict_column = assert_cast<const ColumnInt32*>(column.get()); |
1462 | 0 | auto string_column = DORIS_TRY( |
1463 | 0 | _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( |
1464 | 0 | dict_column)); |
1465 | |
|
1466 | 0 | column_with_type_and_name.type = std::make_shared<DataTypeString>(); |
1467 | 0 | block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first], |
1468 | 0 | std::move(string_column)); |
1469 | 0 | } |
1470 | 0 | } |
1471 | 44 | return Status::OK(); |
1472 | 44 | } |
1473 | | |
1474 | 37 | ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() { |
1475 | 37 | ParquetColumnReader::ColumnStatistics st; |
1476 | 106 | for (auto& reader : _column_readers) { |
1477 | 106 | auto ost = reader.second->column_statistics(); |
1478 | 106 | st.merge(ost); |
1479 | 106 | } |
1480 | 37 | return st; |
1481 | 37 | } |
1482 | | #include "common/compile_check_end.h" |
1483 | | |
1484 | | } // namespace doris |