Coverage Report

Created: 2026-03-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_group_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_group_reader.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/Opcodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <string.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <memory>
29
#include <ostream>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/object_pool.h"
34
#include "common/status.h"
35
#include "core/assert_cast.h"
36
#include "core/block/block.h"
37
#include "core/block/column_with_type_and_name.h"
38
#include "core/column/column_const.h"
39
#include "core/column/column_nullable.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_vector.h"
42
#include "core/custom_allocator.h"
43
#include "core/data_type/data_type.h"
44
#include "core/data_type/data_type_string.h"
45
#include "core/data_type/define_primitive_type.h"
46
#include "core/pod_array.h"
47
#include "core/types.h"
48
#include "exprs/create_predicate_function.h"
49
#include "exprs/hybrid_set.h"
50
#include "exprs/vdirect_in_predicate.h"
51
#include "exprs/vectorized_fn_call.h"
52
#include "exprs/vexpr.h"
53
#include "exprs/vexpr_context.h"
54
#include "exprs/vliteral.h"
55
#include "exprs/vslot_ref.h"
56
#include "format/parquet/schema_desc.h"
57
#include "format/parquet/vparquet_column_reader.h"
58
#include "runtime/descriptors.h"
59
#include "runtime/runtime_state.h"
60
#include "runtime/thread_context.h"
61
#include "storage/segment/column_reader.h"
62
63
namespace cctz {
64
class time_zone;
65
} // namespace cctz
66
namespace doris {
67
class RuntimeState;
68
69
namespace io {
70
struct IOContext;
71
} // namespace io
72
} // namespace doris
73
74
namespace doris {
75
#include "common/compile_check_begin.h"
76
const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
77
static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
78
79
RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
80
                               const std::vector<std::string>& read_columns,
81
                               const int32_t row_group_id, const tparquet::RowGroup& row_group,
82
                               const cctz::time_zone* ctz, io::IOContext* io_ctx,
83
                               const PositionDeleteContext& position_delete_ctx,
84
                               const LazyReadContext& lazy_read_ctx, RuntimeState* state,
85
                               const std::set<uint64_t>& column_ids,
86
                               const std::set<uint64_t>& filter_column_ids)
87
35
        : _file_reader(file_reader),
88
35
          _read_table_columns(read_columns),
89
35
          _row_group_id(row_group_id),
90
35
          _row_group_meta(row_group),
91
35
          _remaining_rows(row_group.num_rows),
92
35
          _ctz(ctz),
93
35
          _io_ctx(io_ctx),
94
35
          _position_delete_ctx(position_delete_ctx),
95
35
          _lazy_read_ctx(lazy_read_ctx),
96
35
          _state(state),
97
35
          _obj_pool(new ObjectPool()),
98
35
          _column_ids(column_ids),
99
35
          _filter_column_ids(filter_column_ids) {}
100
101
35
RowGroupReader::~RowGroupReader() {
102
35
    _column_readers.clear();
103
35
    _obj_pool->clear();
104
35
}
105
106
Status RowGroupReader::init(
107
        const FieldDescriptor& schema, RowRanges& row_ranges,
108
        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
109
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
110
        const std::unordered_map<std::string, int>* colname_to_slot_id,
111
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
112
35
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
113
35
    _tuple_descriptor = tuple_descriptor;
114
35
    _row_descriptor = row_descriptor;
115
35
    _col_name_to_slot_id = colname_to_slot_id;
116
35
    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
117
35
    _read_ranges = row_ranges;
118
35
    _remaining_rows = row_ranges.count();
119
120
35
    if (_read_table_columns.empty()) {
121
        // Query task that only select columns in path.
122
1
        return Status::OK();
123
1
    }
124
34
    const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
125
34
    const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
126
34
    size_t max_buf_size =
127
34
            std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_table_columns.size());
128
104
    for (const auto& read_table_col : _read_table_columns) {
129
104
        auto read_file_col = _table_info_node_ptr->children_file_column_name(read_table_col);
130
104
        auto* field = schema.get_column(read_file_col);
131
104
        std::unique_ptr<ParquetColumnReader> reader;
132
104
        RETURN_IF_ERROR(ParquetColumnReader::create(
133
104
                _file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader,
134
104
                max_buf_size, col_offsets, _state, false, _column_ids, _filter_column_ids));
135
104
        if (reader == nullptr) {
136
0
            VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
137
0
            return Status::Corruption("Init row group reader failed");
138
0
        }
139
104
        _column_readers[read_table_col] = std::move(reader);
140
104
    }
141
142
34
    bool disable_dict_filter = false;
143
34
    if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
144
0
        disable_dict_filter = true;
145
0
        _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
146
0
                                 not_single_slot_filter_conjuncts->end());
147
0
    }
148
149
    // Check if single slot can be filtered by dict.
150
34
    if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
151
6
        const std::vector<std::string>& predicate_col_names =
152
6
                _lazy_read_ctx.predicate_columns.first;
153
6
        const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
154
14
        for (size_t i = 0; i < predicate_col_names.size(); ++i) {
155
8
            const std::string& predicate_col_name = predicate_col_names[i];
156
8
            int slot_id = predicate_col_slot_ids[i];
157
8
            auto predicate_file_col_name =
158
8
                    _table_info_node_ptr->children_file_column_name(predicate_col_name);
159
8
            auto field = schema.get_column(predicate_file_col_name);
160
8
            if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type &&
161
8
                _can_filter_by_dict(
162
8
                        slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) {
163
2
                _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
164
6
            } else {
165
6
                if (_slot_id_to_filter_conjuncts->find(slot_id) !=
166
6
                    _slot_id_to_filter_conjuncts->end()) {
167
6
                    for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
168
6
                        _filter_conjuncts.push_back(ctx);
169
6
                    }
170
6
                }
171
6
            }
172
8
        }
173
        // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
174
        // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
175
6
        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
176
4
            auto& [value, slot_desc] = kv.second;
177
4
            auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
178
4
            if (iter != _slot_id_to_filter_conjuncts->end()) {
179
4
                for (auto& ctx : iter->second) {
180
4
                    _filter_conjuncts.push_back(ctx);
181
4
                }
182
4
            }
183
4
        }
184
        //For check missing column :   missing column == xx, missing column is null,missing column is not null.
185
6
        _filter_conjuncts.insert(_filter_conjuncts.end(),
186
6
                                 _lazy_read_ctx.missing_columns_conjuncts.begin(),
187
6
                                 _lazy_read_ctx.missing_columns_conjuncts.end());
188
6
        RETURN_IF_ERROR(_rewrite_dict_predicates());
189
6
    }
190
    // _state is nullptr in some ut.
191
34
    if (_state && _state->enable_adjust_conjunct_order_by_cost()) {
192
8
        std::ranges::sort(_filter_conjuncts, [](const auto& a, const auto& b) {
193
8
            return a->execute_cost() < b->execute_cost();
194
8
        });
195
8
    }
196
34
    return Status::OK();
197
34
}
198
199
bool RowGroupReader::_can_filter_by_dict(int slot_id,
200
8
                                         const tparquet::ColumnMetaData& column_metadata) {
201
8
    SlotDescriptor* slot = nullptr;
202
8
    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
203
14
    for (auto each : slots) {
204
14
        if (each->id() == slot_id) {
205
8
            slot = each;
206
8
            break;
207
8
        }
208
14
    }
209
8
    if (!is_string_type(slot->type()->get_primitive_type()) &&
210
8
        !is_var_len_object(slot->type()->get_primitive_type())) {
211
6
        return false;
212
6
    }
213
2
    if (column_metadata.type != tparquet::Type::BYTE_ARRAY) {
214
0
        return false;
215
0
    }
216
217
2
    if (!is_dictionary_encoded(column_metadata)) {
218
0
        return false;
219
0
    }
220
221
2
    if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
222
0
        return false;
223
0
    }
224
225
    // TODO: The current implementation of dictionary filtering does not take into account
226
    //  the implementation of NULL values because the dictionary itself does not contain
227
    //  NULL value encoding. As a result, many NULL-related functions or expressions
228
    //  cannot work properly, such as is null, is not null, coalesce, etc.
229
    //  Here we check if the predicate expr is IN or BINARY_PRED.
230
    //  Implementation of NULL value dictionary filtering will be carried out later.
231
2
    return std::ranges::all_of(_slot_id_to_filter_conjuncts->at(slot_id), [&](const auto& ctx) {
232
2
        return (ctx->root()->node_type() == TExprNodeType::IN_PRED ||
233
2
                ctx->root()->node_type() == TExprNodeType::BINARY_PRED) &&
234
2
               ctx->root()->children()[0]->node_type() == TExprNodeType::SLOT_REF;
235
2
    });
236
2
}
237
238
// This function is copied from
239
// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
240
2
bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
241
    // The Parquet spec allows for column chunks to have mixed encodings
242
    // where some data pages are dictionary-encoded and others are plain
243
    // encoded. For example, a Parquet file writer might start writing
244
    // a column chunk as dictionary encoded, but it will switch to plain
245
    // encoding if the dictionary grows too large.
246
    //
247
    // In order for dictionary filters to skip the entire row group,
248
    // the conjuncts must be evaluated on column chunks that are entirely
249
    // encoded with the dictionary encoding. There are two checks
250
    // available to verify this:
251
    // 1. The encoding_stats field on the column chunk metadata provides
252
    //    information about the number of data pages written in each
253
    //    format. This allows for a specific check of whether all the
254
    //    data pages are dictionary encoded.
255
    // 2. The encodings field on the column chunk metadata lists the
256
    //    encodings used. If this list contains the dictionary encoding
257
    //    and does not include unexpected encodings (i.e. encodings not
258
    //    associated with definition/repetition levels), then it is entirely
259
    //    dictionary encoded.
260
2
    if (column_metadata.__isset.encoding_stats) {
261
        // Condition #1 above
262
4
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
263
4
            if (enc_stat.page_type == tparquet::PageType::DATA_PAGE &&
264
4
                (enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
265
2
                 enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) &&
266
4
                enc_stat.count > 0) {
267
0
                return false;
268
0
            }
269
4
        }
270
2
    } else {
271
        // Condition #2 above
272
0
        bool has_dict_encoding = false;
273
0
        bool has_nondict_encoding = false;
274
0
        for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
275
0
            if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
276
0
                encoding == tparquet::Encoding::RLE_DICTIONARY) {
277
0
                has_dict_encoding = true;
278
0
            }
279
280
            // RLE and BIT_PACKED are used for repetition/definition levels
281
0
            if (encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
282
0
                encoding != tparquet::Encoding::RLE_DICTIONARY &&
283
0
                encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) {
284
0
                has_nondict_encoding = true;
285
0
                break;
286
0
            }
287
0
        }
288
        // Not entirely dictionary encoded if:
289
        // 1. No dictionary encoding listed
290
        // OR
291
        // 2. Some non-dictionary encoding is listed
292
0
        if (!has_dict_encoding || has_nondict_encoding) {
293
0
            return false;
294
0
        }
295
0
    }
296
297
2
    return true;
298
2
}
299
300
Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
301
47
                                  bool* batch_eof) {
302
47
    if (_is_row_group_filtered) {
303
2
        *read_rows = 0;
304
2
        *batch_eof = true;
305
2
        return Status::OK();
306
2
    }
307
308
    // Process external table query task that select columns are all from path.
309
45
    if (_read_table_columns.empty()) {
310
3
        bool modify_row_ids = false;
311
3
        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, &modify_row_ids));
312
313
3
        RETURN_IF_ERROR(
314
3
                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
315
3
        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
316
317
3
        RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, modify_row_ids));
318
319
3
        Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns());
320
3
        *read_rows = block->rows();
321
3
        return st;
322
3
    }
323
42
    if (_lazy_read_ctx.can_lazy_read) {
324
        // call _do_lazy_read recursively when current batch is skipped
325
4
        return _do_lazy_read(block, batch_size, read_rows, batch_eof);
326
38
    } else {
327
38
        FilterMap filter_map;
328
38
        RETURN_IF_ERROR((_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
329
38
                                           read_rows, batch_eof, filter_map)));
330
38
        RETURN_IF_ERROR(
331
38
                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
332
38
        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
333
38
        RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, false));
334
335
38
#ifndef NDEBUG
336
123
        for (auto col : *block) {
337
123
            col.column->sanity_check();
338
123
            DCHECK(block->rows() == col.column->size())
339
0
                    << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
340
0
                                        block->rows(), col.column->size(), col.name);
341
123
        }
342
38
#endif
343
344
38
        if (block->rows() == 0) {
345
0
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
346
0
            *read_rows = block->rows();
347
0
#ifndef NDEBUG
348
0
            for (auto col : *block) {
349
0
                col.column->sanity_check();
350
0
                DCHECK(block->rows() == col.column->size())
351
0
                        << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
352
0
                                            block->rows(), col.column->size(), col.name);
353
0
            }
354
0
#endif
355
0
            return Status::OK();
356
0
        }
357
38
        {
358
38
            SCOPED_RAW_TIMER(&_predicate_filter_time);
359
38
            RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
360
361
38
            std::vector<uint32_t> columns_to_filter;
362
38
            int column_to_keep = block->columns();
363
38
            columns_to_filter.resize(column_to_keep);
364
161
            for (uint32_t i = 0; i < column_to_keep; ++i) {
365
123
                columns_to_filter[i] = i;
366
123
            }
367
38
            if (!_lazy_read_ctx.conjuncts.empty()) {
368
6
                std::vector<IColumn::Filter*> filters;
369
6
                if (_position_delete_ctx.has_filter) {
370
0
                    filters.push_back(_pos_delete_filter_ptr.get());
371
0
                }
372
6
                IColumn::Filter result_filter(block->rows(), 1);
373
6
                bool can_filter_all = false;
374
375
6
                {
376
6
                    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
377
6
                            _filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
378
6
                }
379
380
6
                if (can_filter_all) {
381
9
                    for (auto& col : columns_to_filter) {
382
9
                        std::move(*block->get_by_position(col).column).assume_mutable()->clear();
383
9
                    }
384
3
                    Block::erase_useless_column(block, column_to_keep);
385
3
                    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
386
3
                    return Status::OK();
387
3
                }
388
389
3
                RETURN_IF_CATCH_EXCEPTION(
390
3
                        Block::filter_block_internal(block, columns_to_filter, result_filter));
391
3
                Block::erase_useless_column(block, column_to_keep);
392
32
            } else {
393
32
                RETURN_IF_CATCH_EXCEPTION(
394
32
                        RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
395
32
            }
396
35
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
397
35
        }
398
35
#ifndef NDEBUG
399
114
        for (auto col : *block) {
400
114
            col.column->sanity_check();
401
114
            DCHECK(block->rows() == col.column->size())
402
0
                    << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
403
0
                                        block->rows(), col.column->size(), col.name);
404
114
        }
405
35
#endif
406
35
        *read_rows = block->rows();
407
35
        return Status::OK();
408
35
    }
409
42
}
410
411
Status RowGroupReader::_read_column_data(Block* block,
412
                                         const std::vector<std::string>& table_columns,
413
                                         size_t batch_size, size_t* read_rows, bool* batch_eof,
414
47
                                         FilterMap& filter_map) {
415
47
    size_t batch_read_rows = 0;
416
47
    bool has_eof = false;
417
121
    for (auto& read_col_name : table_columns) {
418
121
        auto& column_with_type_and_name =
419
121
                block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]);
420
121
        auto& column_ptr = column_with_type_and_name.column;
421
121
        auto& column_type = column_with_type_and_name.type;
422
121
        bool is_dict_filter = false;
423
121
        for (auto& _dict_filter_col : _dict_filter_cols) {
424
0
            if (_dict_filter_col.first == read_col_name) {
425
0
                MutableColumnPtr dict_column = ColumnInt32::create();
426
0
                if (!_col_name_to_block_idx->contains(read_col_name)) {
427
0
                    return Status::InternalError(
428
0
                            "Wrong read column '{}' in parquet file, block: {}", read_col_name,
429
0
                            block->dump_structure());
430
0
                }
431
0
                if (column_type->is_nullable()) {
432
0
                    block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
433
0
                            std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
434
0
                    block->replace_by_position(
435
0
                            (*_col_name_to_block_idx)[read_col_name],
436
0
                            ColumnNullable::create(std::move(dict_column),
437
0
                                                   ColumnUInt8::create(dict_column->size(), 0)));
438
0
                } else {
439
0
                    block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
440
0
                            std::make_shared<DataTypeInt32>();
441
0
                    block->replace_by_position((*_col_name_to_block_idx)[read_col_name],
442
0
                                               std::move(dict_column));
443
0
                }
444
0
                is_dict_filter = true;
445
0
                break;
446
0
            }
447
0
        }
448
449
121
        size_t col_read_rows = 0;
450
121
        bool col_eof = false;
451
        // Should reset _filter_map_index to 0 when reading next column.
452
        //        select_vector.reset();
453
121
        _column_readers[read_col_name]->reset_filter_map_index();
454
305
        while (!col_eof && col_read_rows < batch_size) {
455
184
            size_t loop_rows = 0;
456
184
            RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
457
184
                    column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name),
458
184
                    filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter));
459
184
            VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
460
0
                       << "' loop_rows=" << loop_rows << " col_read_rows_so_far=" << col_read_rows
461
0
                       << std::endl;
462
184
            col_read_rows += loop_rows;
463
184
        }
464
121
        VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
465
0
                   << "' read_rows=" << col_read_rows << std::endl;
466
121
        if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
467
0
            LOG(WARNING) << "[RowGroupReader] Mismatched read rows among parquet columns. "
468
0
                            "previous_batch_read_rows="
469
0
                         << batch_read_rows << ", current_column='" << read_col_name
470
0
                         << "', current_col_read_rows=" << col_read_rows;
471
0
            return Status::Corruption("Can't read the same number of rows among parquet columns");
472
0
        }
473
121
        batch_read_rows = col_read_rows;
474
475
121
#ifndef NDEBUG
476
121
        column_ptr->sanity_check();
477
121
#endif
478
121
        if (col_eof) {
479
99
            has_eof = true;
480
99
        }
481
121
    }
482
483
47
    *read_rows = batch_read_rows;
484
47
    *batch_eof = has_eof;
485
486
47
    return Status::OK();
487
47
}
488
489
Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
490
4
                                     bool* batch_eof) {
491
4
    std::unique_ptr<FilterMap> filter_map_ptr = nullptr;
492
4
    size_t pre_read_rows;
493
4
    bool pre_eof;
494
4
    std::vector<uint32_t> columns_to_filter;
495
4
    uint32_t origin_column_num = block->columns();
496
4
    columns_to_filter.resize(origin_column_num);
497
16
    for (uint32_t i = 0; i < origin_column_num; ++i) {
498
12
        columns_to_filter[i] = i;
499
12
    }
500
4
    IColumn::Filter result_filter;
501
4
    size_t pre_raw_read_rows = 0;
502
6
    while (!_state->is_cancelled()) {
503
        // read predicate columns
504
6
        pre_read_rows = 0;
505
6
        pre_eof = false;
506
6
        FilterMap filter_map;
507
6
        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
508
6
                                          &pre_read_rows, &pre_eof, filter_map));
509
6
        if (pre_read_rows == 0) {
510
0
            DCHECK_EQ(pre_eof, true);
511
0
            break;
512
0
        }
513
6
        pre_raw_read_rows += pre_read_rows;
514
6
        RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
515
6
                                                _lazy_read_ctx.predicate_partition_columns));
516
6
        RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
517
6
                                              _lazy_read_ctx.predicate_missing_columns));
518
6
        RETURN_IF_ERROR(_fill_row_id_columns(block, pre_read_rows, false));
519
520
6
        RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
521
522
6
#ifndef NDEBUG
523
18
        for (auto col : *block) {
524
18
            if (col.column->size() == 0) { // lazy read column.
525
6
                continue;
526
6
            }
527
12
            col.column->sanity_check();
528
12
            DCHECK(pre_read_rows == col.column->size())
529
0
                    << absl::Substitute("pre_read_rows = $0 , column rows = $1, col name = $2",
530
0
                                        pre_read_rows, col.column->size(), col.name);
531
12
        }
532
6
#endif
533
534
6
        bool can_filter_all = false;
535
6
        {
536
6
            SCOPED_RAW_TIMER(&_predicate_filter_time);
537
538
            // generate filter vector
539
6
            if (_lazy_read_ctx.resize_first_column) {
540
                // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
541
                // The following process may be tricky and time-consuming, but we have no other way.
542
6
                block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
543
6
            }
544
6
            result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
545
6
            std::vector<IColumn::Filter*> filters;
546
6
            if (_position_delete_ctx.has_filter) {
547
0
                filters.push_back(_pos_delete_filter_ptr.get());
548
0
            }
549
550
6
            VExprContextSPtrs filter_contexts;
551
12
            for (auto& conjunct : _filter_conjuncts) {
552
12
                filter_contexts.emplace_back(conjunct);
553
12
            }
554
555
6
            {
556
6
                RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
557
6
                                                                &result_filter, &can_filter_all));
558
6
            }
559
560
6
            if (_lazy_read_ctx.resize_first_column) {
561
                // We have to clean the first column to insert right data.
562
6
                block->get_by_position(0).column->assume_mutable()->clear();
563
6
            }
564
6
        }
565
566
0
        const uint8_t* __restrict filter_map_data = result_filter.data();
567
6
        filter_map_ptr = std::make_unique<FilterMap>();
568
6
        RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, can_filter_all));
569
6
        if (filter_map_ptr->filter_all()) {
570
3
            {
571
3
                SCOPED_RAW_TIMER(&_predicate_filter_time);
572
3
                for (const auto& col : _lazy_read_ctx.predicate_columns.first) {
573
                    // clean block to read predicate columns
574
3
                    block->get_by_position((*_col_name_to_block_idx)[col])
575
3
                            .column->assume_mutable()
576
3
                            ->clear();
577
3
                }
578
3
                for (const auto& col : _lazy_read_ctx.predicate_partition_columns) {
579
3
                    block->get_by_position((*_col_name_to_block_idx)[col.first])
580
3
                            .column->assume_mutable()
581
3
                            ->clear();
582
3
                }
583
3
                for (const auto& col : _lazy_read_ctx.predicate_missing_columns) {
584
0
                    block->get_by_position((*_col_name_to_block_idx)[col.first])
585
0
                            .column->assume_mutable()
586
0
                            ->clear();
587
0
                }
588
3
                if (_row_id_column_iterator_pair.first != nullptr) {
589
0
                    block->get_by_position(_row_id_column_iterator_pair.second)
590
0
                            .column->assume_mutable()
591
0
                            ->clear();
592
0
                }
593
3
                Block::erase_useless_column(block, origin_column_num);
594
3
            }
595
596
3
            if (!pre_eof) {
597
                // If continuous batches are skipped, we can cache them to skip a whole page
598
2
                _cached_filtered_rows += pre_read_rows;
599
2
                if (pre_raw_read_rows >= config::doris_scanner_row_num) {
600
0
                    *read_rows = 0;
601
0
                    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
602
0
                    return Status::OK();
603
0
                }
604
2
            } else { // pre_eof
605
                // If filter_map_ptr->filter_all() and pre_eof, we can skip whole row group.
606
1
                *read_rows = 0;
607
1
                *batch_eof = true;
608
1
                _lazy_read_filtered_rows += (pre_read_rows + _cached_filtered_rows);
609
1
                RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
610
1
                return Status::OK();
611
1
            }
612
3
        } else {
613
3
            break;
614
3
        }
615
6
    }
616
3
    if (_state->is_cancelled()) {
617
0
        return Status::Cancelled("cancelled");
618
0
    }
619
620
3
    if (filter_map_ptr == nullptr) {
621
0
        DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
622
0
        *read_rows = 0;
623
0
        *batch_eof = true;
624
0
        return Status::OK();
625
0
    }
626
627
3
    FilterMap& filter_map = *filter_map_ptr;
628
3
    DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr;
629
3
    if (_cached_filtered_rows != 0) {
630
0
        RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
631
0
        pre_read_rows += _cached_filtered_rows;
632
0
        _cached_filtered_rows = 0;
633
0
    }
634
635
    // lazy read columns
636
3
    size_t lazy_read_rows;
637
3
    bool lazy_eof;
638
3
    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
639
3
                                      &lazy_read_rows, &lazy_eof, filter_map));
640
641
3
    if (pre_read_rows != lazy_read_rows) {
642
0
        return Status::Corruption("Can't read the same number of rows when doing lazy read");
643
0
    }
644
    // pre_eof ^ lazy_eof
645
    // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof
646
647
    // filter data in predicate columns, and remove filter column
648
3
    {
649
3
        SCOPED_RAW_TIMER(&_predicate_filter_time);
650
3
        if (filter_map.has_filter()) {
651
0
            RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
652
0
                    block, _lazy_read_ctx.all_predicate_col_ids, result_filter));
653
0
            Block::erase_useless_column(block, origin_column_num);
654
655
3
        } else {
656
3
            Block::erase_useless_column(block, origin_column_num);
657
3
        }
658
3
    }
659
660
3
    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
661
662
3
    size_t column_num = block->columns();
663
3
    size_t column_size = 0;
664
12
    for (int i = 0; i < column_num; ++i) {
665
9
        size_t cz = block->get_by_position(i).column->size();
666
9
        if (column_size != 0 && cz != 0) {
667
6
            DCHECK_EQ(column_size, cz);
668
6
        }
669
9
        if (cz != 0) {
670
9
            column_size = cz;
671
9
        }
672
9
    }
673
3
    _lazy_read_filtered_rows += pre_read_rows - column_size;
674
3
    *read_rows = column_size;
675
676
3
    *batch_eof = pre_eof;
677
3
    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
678
3
    RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
679
3
#ifndef NDEBUG
680
9
    for (auto col : *block) {
681
9
        col.column->sanity_check();
682
9
        DCHECK(block->rows() == col.column->size())
683
0
                << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
684
0
                                    block->rows(), col.column->size(), col.name);
685
9
    }
686
3
#endif
687
3
    return Status::OK();
688
3
}
689
690
Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
                                           DorisUniqueBufferPtr<uint8_t>& filter_map_data,
                                           size_t pre_read_rows) const {
    // Nothing cached from a previous batch: the current filter map is valid as-is.
    if (_cached_filtered_rows == 0) {
        return Status::OK();
    }
    const size_t total_rows = _cached_filtered_rows + pre_read_rows;
    // An all-filtered map stays all-filtered; re-init it with the widened row count.
    if (filter_map.filter_all()) {
        RETURN_IF_ERROR(filter_map.init(nullptr, total_rows, true));
        return Status::OK();
    }

    // Build a merged map: the cached rows are prepended as filtered-out (0) and the
    // current batch keeps its previous per-row verdicts (all-pass when no map exists).
    filter_map_data = make_unique_buffer<uint8_t>(total_rows);
    uint8_t* merged = filter_map_data.get();
    memset(merged, 0, _cached_filtered_rows);
    const uint8_t* previous = filter_map.filter_map_data();
    if (previous == nullptr) {
        // select_vector.filter_all() == true is already built.
        memset(merged + _cached_filtered_rows, 1, pre_read_rows);
    } else {
        memcpy(merged + _cached_filtered_rows, previous, pre_read_rows);
    }
    RETURN_IF_ERROR(filter_map.init(merged, total_rows, false));
    return Status::OK();
}
719
720
Status RowGroupReader::_fill_partition_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns) {
    // Format options shared by every partition-value deserialization below.
    DataTypeSerDe::FormatOptions format_options;
    for (const auto& [col_name, value_and_slot] : partition_columns) {
        const auto& [value, slot_desc] = value_and_slot;
        // Destination column inside the block; the block exposes a const pointer,
        // but this writer legitimately mutates it.
        auto dest_column = block->get_by_position((*_col_name_to_block_idx)[col_name]).column;
        auto* dest_ptr = const_cast<IColumn*>(dest_column.get());
        auto serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        // Be careful when reading empty rows from parquet row groups.
        Status st = serde->deserialize_column_from_fixed_json(*dest_ptr, slice, rows,
                                                              &num_deserialized, format_options);
        if (st != Status::OK()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually written : "
                    "{}",
                    slot_desc->col_name(), value, num_deserialized, rows);
        }
    }
    return Status::OK();
}
750
751
// Fills columns that are required by the query but absent from the parquet file.
// For each missing column:
//  - no default expression (nullptr context): pad the (nullable) block column
//    with `rows` default/null entries;
//  - default expression present: evaluate it against the block, resize the
//    result to `rows`, expand a possible ColumnConst, and replace the block
//    column, re-wrapping as nullable when the original column type is nullable.
// Returns InternalError if a missing column name is not found in the block.
Status RowGroupReader::_fill_missing_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    for (const auto& kv : missing_columns) {
        if (!_col_name_to_block_idx->contains(kv.first)) {
            return Status::InternalError("Missing column: {} not found in block {}", kv.first,
                                         block->dump_structure());
        }
        if (kv.second == nullptr) {
            // no default column, fill with null
            auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
                                          .column->assume_mutable();
            auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
            nullable_column->insert_many_defaults(rows);
        } else {
            // fill with default value
            const auto& ctx = kv.second;
            ColumnPtr result_column_ptr;
            // PT1 => dest primitive type
            RETURN_IF_ERROR(ctx->execute(block, result_column_ptr));
            // NOTE(review): when the result column is shared (use_count() != 1) the
            // block column is left untouched — presumably ctx->execute() returned a
            // column already present in the block; confirm against VExprContext.
            if (result_column_ptr->use_count() == 1) {
                // call resize because the first column of _src_block_ptr may not be filled by reader,
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
                // has only one row.
                auto mutable_column = result_column_ptr->assume_mutable();
                mutable_column->resize(rows);
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
                auto origin_column_type =
                        block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
                bool is_nullable = origin_column_type->is_nullable();
                block->replace_by_position(
                        (*_col_name_to_block_idx)[kv.first],
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
            }
        }
    }
    return Status::OK();
}
790
791
// Computes the row count of the next batch without reading any column data
// (used when the scan needs no real column values). Two paths:
//  - position-delete filter active: advance through the sorted delete-row list
//    to subtract deleted rows from [start_row_id, end_row_id), and, when a
//    row-id iterator is attached, also rebuild _current_batch_row_ids with the
//    surviving row ids (setting *modify_row_ids = true);
//  - no filter: simply count _remaining_rows down in batch_size steps.
// *batch_eof signals that this row group's rows are exhausted.
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof,
                                         bool* modify_row_ids) {
    *modify_row_ids = false;
    if (_position_delete_ctx.has_filter) {
        int64_t start_row_id = _position_delete_ctx.current_row_id;
        int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
                                      _position_delete_ctx.last_row_id);
        int64_t num_delete_rows = 0;
        // Remember where this batch's deletes begin so the row-id rebuild below
        // can re-walk only the deletes that fall inside [start_row_id, end_row_id).
        auto before_index = _position_delete_ctx.index;
        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
            const int64_t& delete_row_id =
                    _position_delete_ctx.delete_rows[_position_delete_ctx.index];
            if (delete_row_id < start_row_id) {
                // Stale delete entry before this batch: skip it permanently.
                _position_delete_ctx.index++;
                before_index = _position_delete_ctx.index;
            } else if (delete_row_id < end_row_id) {
                num_delete_rows++;
                _position_delete_ctx.index++;
            } else { // delete_row_id >= end_row_id
                break;
            }
        }
        *read_rows = end_row_id - start_row_id - num_delete_rows;
        _position_delete_ctx.current_row_id = end_row_id;
        *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;

        if (_row_id_column_iterator_pair.first != nullptr) {
            *modify_row_ids = true;
            _current_batch_row_ids.clear();
            _current_batch_row_ids.resize(*read_rows);
            size_t idx = 0;
            // Emit every row id in the window except the deleted ones, walking
            // delete entries from before_index up to the cursor advanced above.
            for (auto id = start_row_id; id < end_row_id; id++) {
                if (before_index < _position_delete_ctx.index &&
                    id == _position_delete_ctx.delete_rows[before_index]) {
                    before_index++;
                    continue;
                }
                _current_batch_row_ids[idx++] = (rowid_t)id;
            }
        }
    } else {
        if (batch_size < _remaining_rows) {
            *read_rows = batch_size;
            _remaining_rows -= batch_size;
            *batch_eof = false;
        } else {
            *read_rows = _remaining_rows;
            _remaining_rows = 0;
            *batch_eof = true;
        }
    }
    _total_read_rows += *read_rows;
    return Status::OK();
}
845
846
5
// Rebuilds _current_batch_row_ids with the file-absolute row ids (offset by
// _current_row_group_idx.first_row) of the next `read_rows` rows, walking this
// row group's read ranges and skipping the _total_read_rows rows already
// consumed by earlier batches.
Status RowGroupReader::_get_current_batch_row_id(size_t read_rows) {
    _current_batch_row_ids.clear();
    _current_batch_row_ids.resize(read_rows);

    int64_t idx = 0;
    // Cumulative row count of the ranges visited so far, used to locate where
    // _total_read_rows falls inside the range list.
    int64_t read_range_rows = 0;
    for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) {
        auto range = _read_ranges.get_range(range_idx);
        if (read_rows == 0) {
            break;
        }
        // Only ranges extending past the already-consumed prefix contribute rows.
        if (read_range_rows + (range.to() - range.from()) > _total_read_rows) {
            // First unread row id within this range.
            int64_t fi =
                    std::max(_total_read_rows, read_range_rows) - read_range_rows + range.from();
            // NOTE(review): std::max(range.to(), fi) looks like a defensive clamp so
            // the length never goes negative; fi < range.to() should already hold here.
            size_t len = std::min(read_rows, (size_t)(std::max(range.to(), fi) - fi));

            read_rows -= len;

            for (auto i = 0; i < len; i++) {
                _current_batch_row_ids[idx++] =
                        (rowid_t)(fi + i + _current_row_group_idx.first_row);
            }
        }
        read_range_rows += range.to() - range.from();
    }
    return Status::OK();
}
873
874
Status RowGroupReader::_fill_row_id_columns(Block* block, size_t read_rows,
                                            bool is_current_row_ids) {
    // No row-id iterator attached: nothing to fill.
    if (_row_id_column_iterator_pair.first == nullptr) {
        return Status::OK();
    }
    // Unless the caller already rebuilt _current_batch_row_ids, compute the
    // row ids for this batch now.
    if (!is_current_row_ids) {
        RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
    }
    auto row_id_col = block->get_by_position(_row_id_column_iterator_pair.second)
                              .column->assume_mutable();
    RETURN_IF_ERROR(_row_id_column_iterator_pair.first->read_by_rowids(
            _current_batch_row_ids.data(), _current_batch_row_ids.size(), row_id_col));
    return Status::OK();
}
888
889
44
// Builds _pos_delete_filter_ptr: a per-row filter of `read_rows` entries
// (1 = keep, 0 = positionally deleted) aligned with the rows produced by the
// current batch. Walks the sorted delete-row list, translating each delete's
// row-group-relative index into a batch-relative index via the read ranges,
// and clears the matching filter slot. Always advances _total_read_rows by
// read_rows before returning. When no position-delete filter is configured,
// the filter pointer is reset to null (meaning "keep everything").
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
    if (!_position_delete_ctx.has_filter) {
        _pos_delete_filter_ptr.reset(nullptr);
        _total_read_rows += read_rows;
        return Status::OK();
    }
    // Start with all rows kept; clear slots for deleted rows below.
    _pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
    auto* __restrict _pos_delete_filter_data = _pos_delete_filter_ptr->data();
    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
        const int64_t delete_row_index_in_row_group =
                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
                _position_delete_ctx.first_row_id;
        int64_t read_range_rows = 0;
        size_t remaining_read_rows = _total_read_rows + read_rows;
        for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) {
            auto range = _read_ranges.get_range(range_idx);
            if (delete_row_index_in_row_group < range.from()) {
                // Delete falls in a gap between ranges: it hits no produced row.
                ++_position_delete_ctx.index;
                break;
            } else if (delete_row_index_in_row_group < range.to()) {
                // Delete falls inside this range: map it to a batch-relative index.
                int64_t index = (delete_row_index_in_row_group - range.from()) + read_range_rows -
                                _total_read_rows;
                // NOTE(review): read_rows is size_t, so `read_rows - 1` wraps if
                // read_rows == 0 — presumably unreachable here; confirm callers.
                if (index > read_rows - 1) {
                    // Delete belongs to a later batch; stop for now.
                    _total_read_rows += read_rows;
                    return Status::OK();
                }
                _pos_delete_filter_data[index] = 0;
                ++_position_delete_ctx.index;
                break;
            } else { // delete_row >= range.last_row
            }

            int64_t range_size = range.to() - range.from();
            // Don't search next range when there is no remaining_read_rows.
            if (remaining_read_rows <= range_size) {
                _total_read_rows += read_rows;
                return Status::OK();
            } else {
                remaining_read_rows -= range_size;
                read_range_rows += range_size;
            }
        }
    }
    _total_read_rows += read_rows;
    return Status::OK();
}
935
936
// need exception safety
937
Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
938
32
                                     const std::vector<uint32_t>& columns_to_filter) {
939
32
    if (_pos_delete_filter_ptr) {
940
0
        RETURN_IF_CATCH_EXCEPTION(
941
0
                Block::filter_block_internal(block, columns_to_filter, (*_pos_delete_filter_ptr)));
942
0
    }
943
32
    Block::erase_useless_column(block, column_to_keep);
944
945
32
    return Status::OK();
946
32
}
947
948
6
// For every column selected for dictionary filtering, evaluates the column's
// filter conjuncts directly against the column's dictionary values and rewrites
// them into predicates on int32 dict codes:
//  - if the conjuncts filter out every dictionary value, the whole row group is
//    marked filtered (_is_row_group_filtered) and we return early;
//  - if too many codes survive (> MAX_DICT_CODE_PREDICATE_TO_REWRITE), dict
//    filtering is abandoned for that column and the original conjuncts are kept
//    as ordinary filter conjuncts;
//  - otherwise the surviving codes are turned into an EQ/IN predicate by
//    _rewrite_dict_conjuncts().
// Note: iterates _dict_filter_cols with manual iterator control because entries
// may be erased mid-loop.
Status RowGroupReader::_rewrite_dict_predicates() {
    SCOPED_RAW_TIMER(&_dict_filter_rewrite_time);
    for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
        std::string& dict_filter_col_name = it->first;
        int slot_id = it->second;
        // 1. Get dictionary values to a string column.
        MutableColumnPtr dict_value_column = ColumnString::create();
        bool has_dict = false;
        RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
                dict_value_column, &has_dict));
#ifndef NDEBUG
        dict_value_column->sanity_check();
#endif
        size_t dict_value_column_size = dict_value_column->size();
        DCHECK(has_dict);
        // 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
        // 2.1 Build a temp block from the dict string column to match the conjuncts executing.
        Block temp_block;
        int dict_pos = -1;
        int index = 0;
        for (const auto slot_desc : _tuple_descriptor->slots()) {
            if (slot_desc->id() == slot_id) {
                auto data_type = slot_desc->get_data_type_ptr();
                if (data_type->is_nullable()) {
                    // Wrap the dict values in a nullable column with an all-zero
                    // null map so the conjuncts see the slot's declared type.
                    temp_block.insert(
                            {ColumnNullable::create(
                                     std::move(
                                             dict_value_column), // NOLINT(bugprone-use-after-move)
                                     ColumnUInt8::create(dict_value_column_size, 0)),
                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
                             ""});
                } else {
                    temp_block.insert(
                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
                }
                dict_pos = index;

            } else {
                // Other slots get empty placeholder columns of the right type.
                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                        slot_desc->get_data_type_ptr(),
                                                        slot_desc->col_name()));
            }
            ++index;
        }

        // 2.2 Execute conjuncts.
        VExprContextSPtrs ctxs;
        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
        if (iter != _slot_id_to_filter_conjuncts->end()) {
            for (auto& ctx : iter->second) {
                ctxs.push_back(ctx);
            }
        } else {
            std::stringstream msg;
            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
            return Status::NotFound(msg.str());
        }

        if (dict_pos != 0) {
            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
            // The following process may be tricky and time-consuming, but we have no other way.
            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
        }
        IColumn::Filter result_filter(temp_block.rows(), 1);
        bool can_filter_all;
        {
            RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block,
                                                            &result_filter, &can_filter_all));
        }
        if (dict_pos != 0) {
            // We have to clean the first column to insert right data.
            temp_block.get_by_position(0).column->assume_mutable()->clear();
        }

        // If can_filter_all = true, can filter this row group.
        if (can_filter_all) {
            _is_row_group_filtered = true;
            return Status::OK();
        }

        // 3. Get dict codes.
        // The dict code of a value is its position in the dictionary, so the
        // surviving filter positions ARE the surviving codes.
        std::vector<int32_t> dict_codes;
        for (size_t i = 0; i < result_filter.size(); ++i) {
            if (result_filter[i]) {
                dict_codes.emplace_back(i);
            }
        }

        // About Performance: if dict_column size is too large, it will generate a large IN filter.
        if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
            it = _dict_filter_cols.erase(it);
            for (auto& ctx : ctxs) {
                _filter_conjuncts.push_back(ctx);
            }
            continue;
        }

        // 4. Rewrite conjuncts.
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(
                dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
        ++it;
    }
    return Status::OK();
}
1052
1053
// Builds a replacement predicate on the int32 dict codes of slot `slot_id`:
//  - exactly one surviving code  -> `slot == code` (BINARY_PRED / EQ with an
//    INT_LITERAL child);
//  - multiple surviving codes    -> an IN predicate backed by a hybrid set
//    (VDirectInPredicate, which assumes is_nullable == false).
// The rewritten conjunct is prepared and opened against _state/_row_descriptor,
// then appended to both _dict_filter_conjuncts and _filter_conjuncts.
Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
                                               bool is_nullable) {
    VExprSPtr root;
    if (dict_codes.size() == 1) {
        {
            // Describe the builtin eq(int, int) -> boolean function node.
            TFunction fn;
            TFunctionName fn_name;
            fn_name.__set_db_name("");
            fn_name.__set_function_name("eq");
            fn.__set_name(fn_name);
            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
            std::vector<TTypeDesc> arg_types;
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
            fn.__set_arg_types(arg_types);
            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
            fn.__set_has_var_args(false);

            TExprNode texpr_node;
            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
            texpr_node.__set_opcode(TExprOpcode::EQ);
            texpr_node.__set_fn(fn);
            texpr_node.__set_num_children(2);
            texpr_node.__set_is_nullable(is_nullable);
            root = VectorizedFnCall::create_shared(texpr_node);
        }
        {
            // Left child: a slot ref for the dict-filtered column.
            SlotDescriptor* slot = nullptr;
            const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
            for (auto each : slots) {
                if (each->id() == slot_id) {
                    slot = each;
                    break;
                }
            }
            root->add_child(VSlotRef::create_shared(slot));
        }
        {
            // Right child: the single surviving dict code as an int literal.
            TExprNode texpr_node;
            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
            texpr_node.__set_type(create_type_desc(TYPE_INT));
            TIntLiteral int_literal;
            int_literal.__set_value(dict_codes[0]);
            texpr_node.__set_int_literal(int_literal);
            texpr_node.__set_is_nullable(is_nullable);
            root->add_child(VLiteral::create_shared(texpr_node));
        }
    } else {
        {
            // IN predicate node over the surviving dict codes.
            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
            TExprNode node;
            node.__set_type(type_desc);
            node.__set_node_type(TExprNodeType::IN_PRED);
            node.in_predicate.__set_is_not_in(false);
            node.__set_opcode(TExprOpcode::FILTER_IN);
            // VdirectInPredicate assume is_nullable = false.
            node.__set_is_nullable(false);

            std::shared_ptr<HybridSetBase> hybrid_set(
                    create_set(PrimitiveType::TYPE_INT, dict_codes.size(), false));
            for (int j = 0; j < dict_codes.size(); ++j) {
                hybrid_set->insert(&dict_codes[j]);
            }
            root = VDirectInPredicate::create_shared(node, hybrid_set);
        }
        {
            // Child: slot ref for the dict-filtered column.
            SlotDescriptor* slot = nullptr;
            const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
            for (auto each : slots) {
                if (each->id() == slot_id) {
                    slot = each;
                    break;
                }
            }
            root->add_child(VSlotRef::create_shared(slot));
        }
    }
    VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
    _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
    _filter_conjuncts.push_back(rewritten_conjunct_ctx);
    return Status::OK();
}
1138
1139
42
// After filtering, converts every dict-filtered column in the block from its
// temporary int32 dict-code representation back to the real string column,
// via the column reader's dictionary. For nullable columns the existing null
// map is preserved around the converted nested column. The block entry's type
// is updated to (nullable) DataTypeString accordingly.
Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
    for (auto& dict_filter_cols : _dict_filter_cols) {
        if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
            throw Exception(ErrorCode::INTERNAL_ERROR,
                            "Wrong read column '{}' in parquet file, block: {}",
                            dict_filter_cols.first, block->dump_structure());
        }
        ColumnWithTypeAndName& column_with_type_and_name =
                block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]);
        const ColumnPtr& column = column_with_type_and_name.column;
        if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
            // Nullable path: convert the nested dict-code column and re-wrap it
            // with the original null map.
            const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
            const auto* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
            DCHECK(dict_column);

            auto string_column = DORIS_TRY(
                    _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
                            dict_column));

            column_with_type_and_name.type =
                    std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
            block->replace_by_position(
                    (*_col_name_to_block_idx)[dict_filter_cols.first],
                    ColumnNullable::create(std::move(string_column),
                                           nullable_column->get_null_map_column_ptr()));
        } else {
            // Non-nullable path: replace the dict-code column directly.
            const auto* dict_column = assert_cast<const ColumnInt32*>(column.get());
            auto string_column = DORIS_TRY(
                    _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
                            dict_column));

            column_with_type_and_name.type = std::make_shared<DataTypeString>();
            block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
                                       std::move(string_column));
        }
    }
    return Status::OK();
}
1177
1178
35
// Aggregates the per-column read statistics of every column reader in this
// row group into a single ColumnStatistics value.
ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() {
    ParquetColumnReader::ColumnStatistics merged;
    for (const auto& [col_name, reader] : _column_readers) {
        auto stats = reader->column_statistics();
        merged.merge(stats);
    }
    return merged;
}
1186
#include "common/compile_check_end.h"
1187
1188
} // namespace doris