Coverage Report

Created: 2026-06-04 13:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_group_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_group_reader.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/Opcodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <string.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <memory>
29
#include <numeric>
30
#include <ostream>
31
32
#include "common/config.h"
33
#include "common/logging.h"
34
#include "common/object_pool.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/column/column_const.h"
41
#include "core/column/column_nullable.h"
42
#include "core/column/column_string.h"
43
#include "core/column/column_vector.h"
44
#include "core/custom_allocator.h"
45
#include "core/data_type/data_type.h"
46
#include "core/data_type/data_type_string.h"
47
#include "core/data_type/define_primitive_type.h"
48
#include "core/pod_array.h"
49
#include "core/types.h"
50
#include "exprs/create_predicate_function.h"
51
#include "exprs/hybrid_set.h"
52
#include "exprs/vdirect_in_predicate.h"
53
#include "exprs/vectorized_fn_call.h"
54
#include "exprs/vexpr.h"
55
#include "exprs/vexpr_context.h"
56
#include "exprs/vliteral.h"
57
#include "exprs/vslot_ref.h"
58
#include "format/parquet/schema_desc.h"
59
#include "format/parquet/vparquet_column_reader.h"
60
#include "format/table/iceberg_reader.h"
61
#include "runtime/descriptors.h"
62
#include "runtime/runtime_state.h"
63
#include "runtime/thread_context.h"
64
#include "storage/segment/column_reader.h"
65
66
namespace cctz {
67
class time_zone;
68
} // namespace cctz
69
namespace doris {
70
class RuntimeState;
71
72
namespace io {
73
struct IOContext;
74
} // namespace io
75
} // namespace doris
76
77
namespace doris {
78
79
const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
80
static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
81
82
RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
83
                               const std::vector<std::string>& read_columns,
84
                               const int32_t row_group_id, const tparquet::RowGroup& row_group,
85
                               const cctz::time_zone* ctz, io::IOContext* io_ctx,
86
                               const PositionDeleteContext& position_delete_ctx,
87
                               const LazyReadContext& lazy_read_ctx, RuntimeState* state,
88
                               const std::set<uint64_t>& column_ids,
89
                               const std::set<uint64_t>& filter_column_ids)
90
39
        : _file_reader(file_reader),
91
39
          _read_table_columns(read_columns),
92
39
          _row_group_id(row_group_id),
93
39
          _row_group_meta(row_group),
94
39
          _remaining_rows(row_group.num_rows),
95
39
          _ctz(ctz),
96
39
          _io_ctx(io_ctx),
97
39
          _position_delete_ctx(position_delete_ctx),
98
39
          _lazy_read_ctx(lazy_read_ctx),
99
39
          _state(state),
100
39
          _obj_pool(new ObjectPool()),
101
39
          _column_ids(column_ids),
102
39
          _filter_column_ids(filter_column_ids) {}
103
104
39
RowGroupReader::~RowGroupReader() {
105
39
    if (_obj_pool != nullptr) {
106
39
        _obj_pool->clear();
107
39
    }
108
39
}
109
110
Status RowGroupReader::init(
111
        const FieldDescriptor& schema, RowRanges& row_ranges,
112
        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
113
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
114
        const std::unordered_map<std::string, int>* colname_to_slot_id,
115
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
116
38
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
117
38
    _tuple_descriptor = tuple_descriptor;
118
38
    _row_descriptor = row_descriptor;
119
38
    _col_name_to_slot_id = colname_to_slot_id;
120
38
    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
121
38
    _read_ranges = row_ranges;
122
38
    _filter_read_ranges_by_condition_cache();
123
38
    _remaining_rows = _read_ranges.count();
124
125
38
    if (_read_table_columns.empty()) {
126
        // Query task that only select columns in path.
127
1
        return Status::OK();
128
1
    }
129
37
    const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
130
37
    const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
131
37
    size_t max_buf_size =
132
37
            std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_table_columns.size());
133
108
    for (const auto& read_table_col : _read_table_columns) {
134
108
        auto read_file_col = _table_info_node_ptr->children_file_column_name(read_table_col);
135
108
        auto* field = schema.get_column(read_file_col);
136
108
        std::unique_ptr<ParquetColumnReader> reader;
137
108
        RETURN_IF_ERROR(ParquetColumnReader::create(
138
108
                _file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader,
139
108
                max_buf_size, col_offsets, _state, false, _column_ids, _filter_column_ids));
140
108
        if (reader == nullptr) {
141
0
            VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
142
0
            return Status::Corruption("Init row group reader failed");
143
0
        }
144
108
        _column_readers[read_table_col] = std::move(reader);
145
108
    }
146
147
37
    bool disable_dict_filter = false;
148
37
    if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
149
0
        disable_dict_filter = true;
150
0
        _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
151
0
                                 not_single_slot_filter_conjuncts->end());
152
0
    }
153
154
    // Check if single slot can be filtered by dict.
155
37
    if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
156
6
        const std::vector<std::string>& predicate_col_names =
157
6
                _lazy_read_ctx.predicate_columns.first;
158
6
        const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
159
14
        for (size_t i = 0; i < predicate_col_names.size(); ++i) {
160
8
            const std::string& predicate_col_name = predicate_col_names[i];
161
8
            int slot_id = predicate_col_slot_ids[i];
162
163
8
            if (!_table_format_reader->has_column_optimization(
164
8
                        predicate_col_name,
165
8
                        TableFormatReader::ColumnOptimizationTypes::DICT_FILTER)) {
166
                // Row-lineage style generated columns cannot participate in dict filtering.
167
0
                if (_slot_id_to_filter_conjuncts->find(slot_id) !=
168
0
                    _slot_id_to_filter_conjuncts->end()) {
169
0
                    for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
170
0
                        _filter_conjuncts.push_back(ctx);
171
0
                    }
172
0
                }
173
0
                continue;
174
0
            }
175
176
8
            auto predicate_file_col_name =
177
8
                    _table_info_node_ptr->children_file_column_name(predicate_col_name);
178
8
            auto field = schema.get_column(predicate_file_col_name);
179
8
            if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type &&
180
8
                _can_filter_by_dict(
181
8
                        slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) {
182
2
                _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
183
6
            } else {
184
6
                if (_slot_id_to_filter_conjuncts->find(slot_id) !=
185
6
                    _slot_id_to_filter_conjuncts->end()) {
186
6
                    for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
187
6
                        _filter_conjuncts.push_back(ctx);
188
6
                    }
189
6
                }
190
6
            }
191
8
        }
192
        // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
193
        // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
194
6
        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
195
4
            auto& [value, slot_desc] = kv.second;
196
4
            auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
197
4
            if (iter != _slot_id_to_filter_conjuncts->end()) {
198
4
                for (auto& ctx : iter->second) {
199
4
                    _filter_conjuncts.push_back(ctx);
200
4
                }
201
4
            }
202
4
        }
203
        //For check missing column :   missing column == xx, missing column is null,missing column is not null.
204
6
        _filter_conjuncts.insert(_filter_conjuncts.end(),
205
6
                                 _lazy_read_ctx.missing_columns_conjuncts.begin(),
206
6
                                 _lazy_read_ctx.missing_columns_conjuncts.end());
207
6
        RETURN_IF_ERROR(_rewrite_dict_predicates());
208
6
    }
209
    // _state is nullptr in some ut.
210
37
    if (_state && _state->enable_adjust_conjunct_order_by_cost()) {
211
8
        std::ranges::sort(_filter_conjuncts, [](const auto& a, const auto& b) {
212
8
            return a->execute_cost() < b->execute_cost();
213
8
        });
214
8
    }
215
37
    return Status::OK();
216
37
}
217
218
bool RowGroupReader::_can_filter_by_dict(int slot_id,
219
8
                                         const tparquet::ColumnMetaData& column_metadata) {
220
8
    SlotDescriptor* slot = nullptr;
221
8
    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
222
14
    for (auto each : slots) {
223
14
        if (each->id() == slot_id) {
224
8
            slot = each;
225
8
            break;
226
8
        }
227
14
    }
228
8
    if (!is_string_type(slot->type()->get_primitive_type()) &&
229
8
        !is_var_len_object(slot->type()->get_primitive_type())) {
230
6
        return false;
231
6
    }
232
2
    if (column_metadata.type != tparquet::Type::BYTE_ARRAY) {
233
0
        return false;
234
0
    }
235
236
2
    if (!is_dictionary_encoded(column_metadata)) {
237
0
        return false;
238
0
    }
239
240
2
    if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
241
0
        return false;
242
0
    }
243
244
    // TODO: The current implementation of dictionary filtering does not take into account
245
    //  the implementation of NULL values because the dictionary itself does not contain
246
    //  NULL value encoding. As a result, many NULL-related functions or expressions
247
    //  cannot work properly, such as is null, is not null, coalesce, etc.
248
    //  Here we check if the predicate expr is IN or BINARY_PRED.
249
    //  Implementation of NULL value dictionary filtering will be carried out later.
250
2
    return std::ranges::all_of(_slot_id_to_filter_conjuncts->at(slot_id), [&](const auto& ctx) {
251
2
        return (ctx->root()->node_type() == TExprNodeType::IN_PRED ||
252
2
                ctx->root()->node_type() == TExprNodeType::BINARY_PRED) &&
253
2
               ctx->root()->children()[0]->node_type() == TExprNodeType::SLOT_REF;
254
2
    });
255
2
}
256
257
// This function is copied from
258
// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
259
3
bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
260
    // The Parquet spec allows for column chunks to have mixed encodings
261
    // where some data pages are dictionary-encoded and others are plain
262
    // encoded. For example, a Parquet file writer might start writing
263
    // a column chunk as dictionary encoded, but it will switch to plain
264
    // encoding if the dictionary grows too large.
265
    //
266
    // In order for dictionary filters to skip the entire row group,
267
    // the conjuncts must be evaluated on column chunks that are entirely
268
    // encoded with the dictionary encoding. There are two checks
269
    // available to verify this:
270
    // 1. The encoding_stats field on the column chunk metadata provides
271
    //    information about the number of data pages written in each
272
    //    format. This allows for a specific check of whether all the
273
    //    data pages are dictionary encoded.
274
    // 2. The encodings field on the column chunk metadata lists the
275
    //    encodings used. If this list contains the dictionary encoding
276
    //    and does not include unexpected encodings (i.e. encodings not
277
    //    associated with definition/repetition levels), then it is entirely
278
    //    dictionary encoded.
279
3
    if (column_metadata.__isset.encoding_stats) {
280
        // Condition #1 above
281
6
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
282
6
            if ((enc_stat.page_type == tparquet::PageType::DATA_PAGE ||
283
6
                 enc_stat.page_type == tparquet::PageType::DATA_PAGE_V2) &&
284
6
                (enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
285
4
                 enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) &&
286
6
                enc_stat.count > 0) {
287
1
                return false;
288
1
            }
289
6
        }
290
3
    } else {
291
        // Condition #2 above
292
0
        bool has_dict_encoding = false;
293
0
        bool has_nondict_encoding = false;
294
0
        for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
295
0
            if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
296
0
                encoding == tparquet::Encoding::RLE_DICTIONARY) {
297
0
                has_dict_encoding = true;
298
0
            }
299
300
            // RLE and BIT_PACKED are used for repetition/definition levels
301
0
            if (encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
302
0
                encoding != tparquet::Encoding::RLE_DICTIONARY &&
303
0
                encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) {
304
0
                has_nondict_encoding = true;
305
0
                break;
306
0
            }
307
0
        }
308
        // Not entirely dictionary encoded if:
309
        // 1. No dictionary encoding listed
310
        // OR
311
        // 2. Some non-dictionary encoding is listed
312
0
        if (!has_dict_encoding || has_nondict_encoding) {
313
0
            return false;
314
0
        }
315
0
    }
316
317
2
    return true;
318
3
}
319
320
Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
321
98
                                  bool* batch_eof) {
322
98
    if (_is_row_group_filtered) {
323
2
        *read_rows = 0;
324
2
        *batch_eof = true;
325
2
        return Status::OK();
326
2
    }
327
328
    // Process external table query task that select columns are all from path.
329
96
    if (_read_table_columns.empty()) {
330
11
        int64_t batch_base_row = _total_read_rows;
331
11
        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
332
333
11
        DCHECK(_table_format_reader);
334
11
        RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
335
11
                block, *read_rows, _lazy_read_ctx.partition_col_names));
336
11
        RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
337
11
                block, *read_rows, _lazy_read_ctx.missing_col_names));
338
11
        RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows));
339
11
        RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, *read_rows));
340
11
        std::vector<uint32_t> columns_to_filter(block->columns());
341
22
        for (uint32_t i = 0; i < columns_to_filter.size(); ++i) {
342
11
            columns_to_filter[i] = i;
343
11
        }
344
11
        IColumn::Filter result_filter;
345
11
        RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
346
11
                _lazy_read_ctx.conjuncts, block, columns_to_filter, block->columns(),
347
11
                result_filter));
348
11
        _mark_condition_cache_granules(result_filter.data(), *read_rows, batch_base_row);
349
11
        *read_rows = block->rows();
350
11
        return Status::OK();
351
11
    }
352
85
    if (_lazy_read_ctx.can_lazy_read) {
353
        // call _do_lazy_read recursively when current batch is skipped
354
12
        return _do_lazy_read(block, batch_size, read_rows, batch_eof);
355
73
    } else {
356
73
        FilterMap filter_map;
357
73
        int64_t batch_base_row = _total_read_rows;
358
73
        RETURN_IF_ERROR((_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
359
73
                                           read_rows, batch_eof, filter_map)));
360
73
        DCHECK(_table_format_reader);
361
73
        RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
362
73
                block, *read_rows, _lazy_read_ctx.partition_col_names));
363
73
        RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
364
73
                block, *read_rows, _lazy_read_ctx.missing_col_names));
365
366
73
        if (_need_current_batch_row_positions()) {
367
5
            RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
368
5
        }
369
73
        RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows));
370
73
        RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, *read_rows));
371
372
73
#ifndef NDEBUG
373
207
        for (auto col : *block) {
374
207
            col.column->sanity_check();
375
207
            DCHECK(block->rows() == col.column->size())
376
0
                    << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
377
0
                                        block->rows(), col.column->size(), col.name);
378
207
        }
379
73
#endif
380
381
73
        if (block->rows() == 0) {
382
0
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
383
0
            *read_rows = block->rows();
384
0
#ifndef NDEBUG
385
0
            for (auto col : *block) {
386
0
                col.column->sanity_check();
387
0
                DCHECK(block->rows() == col.column->size())
388
0
                        << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
389
0
                                            block->rows(), col.column->size(), col.name);
390
0
            }
391
0
#endif
392
0
            return Status::OK();
393
0
        }
394
73
        {
395
73
            SCOPED_RAW_TIMER(&_predicate_filter_time);
396
73
            RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
397
398
73
            std::vector<uint32_t> columns_to_filter;
399
73
            int column_to_keep = block->columns();
400
73
            columns_to_filter.resize(column_to_keep);
401
280
            for (uint32_t i = 0; i < column_to_keep; ++i) {
402
207
                columns_to_filter[i] = i;
403
207
            }
404
73
            if (!_lazy_read_ctx.conjuncts.empty()) {
405
22
                std::vector<IColumn::Filter*> filters;
406
22
                if (_position_delete_ctx.has_filter) {
407
0
                    filters.push_back(_pos_delete_filter_ptr.get());
408
0
                }
409
22
                IColumn::Filter result_filter(block->rows(), 1);
410
22
                bool can_filter_all = false;
411
412
22
                {
413
22
                    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
414
22
                            _filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
415
22
                }
416
417
                // Condition cache MISS: mark granules with surviving rows (non-lazy path)
418
22
                if (!can_filter_all) {
419
11
                    _mark_condition_cache_granules(result_filter.data(), block->rows(),
420
11
                                                   batch_base_row);
421
11
                }
422
423
22
                if (can_filter_all) {
424
11
                    block->clear_column_data(columns_to_filter);
425
11
                    Block::erase_useless_column(block, column_to_keep);
426
11
                    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
427
11
                    return Status::OK();
428
11
                }
429
430
11
                RETURN_IF_CATCH_EXCEPTION(
431
11
                        Block::filter_block_internal(block, columns_to_filter, result_filter));
432
11
                Block::erase_useless_column(block, column_to_keep);
433
51
            } else {
434
51
                RETURN_IF_CATCH_EXCEPTION(
435
51
                        RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
436
51
            }
437
62
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
438
62
        }
439
62
#ifndef NDEBUG
440
174
        for (auto col : *block) {
441
174
            col.column->sanity_check();
442
174
            DCHECK(block->rows() == col.column->size())
443
0
                    << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
444
0
                                        block->rows(), col.column->size(), col.name);
445
174
        }
446
62
#endif
447
62
        *read_rows = block->rows();
448
62
        return Status::OK();
449
62
    }
450
85
}
451
452
// Maps each batch row to its global parquet file position via _read_ranges, then marks
453
// the corresponding condition cache granule as true if the filter indicates the row survived.
454
// batch_seq_start is the number of rows already read sequentially before this batch
455
// (i.e., _total_read_rows before the batch started).
456
void RowGroupReader::_mark_condition_cache_granules(const uint8_t* filter_data, size_t num_rows,
457
33
                                                    int64_t batch_seq_start) {
458
33
    if (!_condition_cache_ctx || _condition_cache_ctx->is_hit) {
459
33
        return;
460
33
    }
461
0
    auto& cache = *_condition_cache_ctx->filter_result;
462
0
    for (size_t i = 0; i < num_rows; i++) {
463
0
        if (filter_data[i]) {
464
            // row-group-relative position of this row
465
0
            int64_t rg_pos = _read_ranges.get_row_index_by_pos(batch_seq_start + i);
466
            // global row number in the parquet file
467
0
            size_t granule = (_current_row_group_idx.first_row + rg_pos) /
468
0
                             ConditionCacheContext::GRANULE_SIZE;
469
0
            size_t cache_idx = granule - _condition_cache_ctx->base_granule;
470
0
            if (cache_idx < cache.size()) {
471
0
                cache[cache_idx] = true;
472
0
            }
473
0
        }
474
0
    }
475
0
}
476
477
// On condition cache HIT, removes row ranges whose granules have no surviving rows from
478
// _read_ranges BEFORE column readers are created. This makes ParquetColumnReader skip I/O
479
// entirely for false-granule rows — both predicate and lazy columns — via its existing
480
// page/row-skipping infrastructure.
481
38
void RowGroupReader::_filter_read_ranges_by_condition_cache() {
482
38
    if (!_condition_cache_ctx || !_condition_cache_ctx->is_hit) {
483
38
        return;
484
38
    }
485
0
    auto& filter_result = *_condition_cache_ctx->filter_result;
486
0
    if (filter_result.empty()) {
487
0
        return;
488
0
    }
489
490
0
    auto old_row_count = _read_ranges.count();
491
0
    _read_ranges =
492
0
            filter_ranges_by_cache(_read_ranges, filter_result, _current_row_group_idx.first_row,
493
0
                                   _condition_cache_ctx->base_granule);
494
0
    _is_row_group_filtered = _read_ranges.is_empty();
495
0
    _condition_cache_filtered_rows += old_row_count - _read_ranges.count();
496
0
}
497
498
// Filters read_ranges by removing rows whose cache granule is false.
499
//
500
// Cache index i maps to global granule (base_granule + i), which covers global file
501
// rows [(base_granule+i)*GS, (base_granule+i+1)*GS). Since read_ranges uses
502
// row-group-relative indices and first_row is the global position of the row group's
503
// first row, global granule g maps to row-group-relative range:
504
//   [max(0, g*GS - first_row), max(0, (g+1)*GS - first_row))
505
//
506
// We build a RowRanges of all false-granule regions (in row-group-relative coordinates),
507
// then subtract from read_ranges via ranges_exception.
508
//
509
// Granules beyond cache.size() are kept conservatively (assumed true).
510
//
511
// When base_granule > 0, the cache only covers granules starting from base_granule.
512
// This happens when a Parquet file is split across multiple scan ranges and this reader
513
// only processes row groups starting at a non-zero offset in the file.
514
RowRanges RowGroupReader::filter_ranges_by_cache(const RowRanges& read_ranges,
515
                                                 const std::vector<bool>& cache, int64_t first_row,
516
21
                                                 int64_t base_granule) {
517
21
    constexpr int64_t GS = ConditionCacheContext::GRANULE_SIZE;
518
21
    RowRanges filtered_ranges;
519
520
138
    for (size_t i = 0; i < cache.size(); i++) {
521
117
        if (!cache[i]) {
522
64
            int64_t global_granule = base_granule + static_cast<int64_t>(i);
523
64
            int64_t rg_from = std::max(static_cast<int64_t>(0), global_granule * GS - first_row);
524
64
            int64_t rg_to =
525
64
                    std::max(static_cast<int64_t>(0), (global_granule + 1) * GS - first_row);
526
64
            if (rg_from < rg_to) {
527
16
                filtered_ranges.add(RowRange(rg_from, rg_to));
528
16
            }
529
64
        }
530
117
    }
531
532
21
    RowRanges result;
533
21
    RowRanges::ranges_exception(read_ranges, filtered_ranges, &result);
534
21
    return result;
535
21
}
536
537
Status RowGroupReader::_read_column_data(Block* block,
538
                                         const std::vector<std::string>& table_columns,
539
                                         size_t batch_size, size_t* read_rows, bool* batch_eof,
540
106
                                         FilterMap& filter_map) {
541
106
    size_t batch_read_rows = 0;
542
106
    bool has_eof = false;
543
213
    for (auto& read_col_name : table_columns) {
544
213
        uint32_t block_pos = 0;
545
213
        RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name, &block_pos));
546
213
        auto reader_iter = _column_readers.find(read_col_name);
547
213
        if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) {
548
0
            return Status::InternalError("Column reader for '{}' not found in parquet row group",
549
0
                                         read_col_name);
550
0
        }
551
552
213
        auto& column_with_type_and_name = block->safe_get_by_position(block_pos);
553
213
        auto& column_ptr = column_with_type_and_name.column;
554
213
        auto& column_type = column_with_type_and_name.type;
555
213
        bool is_dict_filter = false;
556
213
        for (auto& _dict_filter_col : _dict_filter_cols) {
557
0
            if (_dict_filter_col.first == read_col_name) {
558
0
                MutableColumnPtr dict_column = ColumnInt32::create();
559
0
                if (column_type->is_nullable()) {
560
0
                    block->get_by_position(block_pos).type =
561
0
                            std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
562
0
                    block->replace_by_position(
563
0
                            block_pos,
564
0
                            ColumnNullable::create(std::move(dict_column),
565
0
                                                   ColumnUInt8::create(dict_column->size(), 0)));
566
0
                } else {
567
0
                    block->get_by_position(block_pos).type = std::make_shared<DataTypeInt32>();
568
0
                    block->replace_by_position(block_pos, std::move(dict_column));
569
0
                }
570
0
                is_dict_filter = true;
571
0
                break;
572
0
            }
573
0
        }
574
575
213
        size_t col_read_rows = 0;
576
213
        bool col_eof = false;
577
        // Should reset _filter_map_index to 0 when reading next column.
578
        //        select_vector.reset();
579
213
        reader_iter->second->reset_filter_map_index();
580
489
        while (!col_eof && col_read_rows < batch_size) {
581
276
            size_t loop_rows = 0;
582
276
            RETURN_IF_ERROR(reader_iter->second->read_column_data(
583
276
                    column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name),
584
276
                    filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter));
585
276
            VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
586
0
                       << "' loop_rows=" << loop_rows << " col_read_rows_so_far=" << col_read_rows
587
0
                       << std::endl;
588
276
            col_read_rows += loop_rows;
589
276
        }
590
213
        VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
591
0
                   << "' read_rows=" << col_read_rows << std::endl;
592
213
        if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
593
0
            LOG(WARNING) << "[RowGroupReader] Mismatched read rows among parquet columns. "
594
0
                            "previous_batch_read_rows="
595
0
                         << batch_read_rows << ", current_column='" << read_col_name
596
0
                         << "', current_col_read_rows=" << col_read_rows;
597
0
            return Status::Corruption("Can't read the same number of rows among parquet columns");
598
0
        }
599
213
        batch_read_rows = col_read_rows;
600
601
213
#ifndef NDEBUG
602
213
        column_ptr->sanity_check();
603
213
#endif
604
213
        if (col_eof) {
605
103
            has_eof = true;
606
103
        }
607
213
    }
608
609
106
    *read_rows = batch_read_rows;
610
106
    *batch_eof = has_eof;
611
612
106
    return Status::OK();
613
106
}
614
615
// Reads one batch using lazy materialization: the predicate columns are read and evaluated
// first, and the lazy-read columns are only read (through the resulting FilterMap) once a
// batch with surviving rows, or EOF, is reached.
//
// Batches whose rows are all filtered out are not returned; their row counts are accumulated
// in `_cached_filtered_rows` so the lazy-read columns can skip them together with the next
// surviving batch (see `_rebuild_filter_map`).
//
// @param block       output block; on success it holds `*read_rows` rows.
// @param batch_size  maximum number of rows to read for the predicate columns per iteration.
// @param read_rows   out: number of rows left in `block` after filtering.
// @param batch_eof   out: set from the predicate columns' EOF flag (not written when the
//                    function returns early after `config::doris_scanner_row_num` rows were
//                    filtered out).
// @return Cancelled if the query state is cancelled, Corruption if predicate and lazy columns
//         disagree on the number of rows read, or any error from the column readers.
Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
                                     bool* batch_eof) {
    std::unique_ptr<FilterMap> filter_map_ptr = nullptr;
    size_t pre_read_rows = 0;
    bool pre_eof = false;
    const uint32_t origin_column_num = block->columns();
    IColumn::Filter result_filter;
    size_t pre_raw_read_rows = 0;
    while (!_state->is_cancelled()) {
        // Read the predicate columns.
        pre_read_rows = 0;
        pre_eof = false;
        FilterMap filter_map;
        int64_t batch_base_row = _total_read_rows;
        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
                                          &pre_read_rows, &pre_eof, filter_map));
        if (pre_read_rows == 0) {
            DCHECK_EQ(pre_eof, true);
            break;
        }
        pre_raw_read_rows += pre_read_rows;

        DCHECK(_table_format_reader);
        RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
                block, pre_read_rows, _lazy_read_ctx.predicate_partition_col_names));
        RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
                block, pre_read_rows, _lazy_read_ctx.predicate_missing_col_names));
        if (_need_current_batch_row_positions()) {
            RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows));
        }
        RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, pre_read_rows));
        RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, pre_read_rows));
        RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));

#ifndef NDEBUG
        for (const auto& col : *block) {
            if (col.column->size() == 0) { // lazy read column.
                continue;
            }
            col.column->sanity_check();
            DCHECK(pre_read_rows == col.column->size())
                    << absl::Substitute("pre_read_rows = $0 , column rows = $1, col name = $2",
                                        pre_read_rows, col.column->size(), col.name);
        }
#endif

        bool can_filter_all = false;
        {
            SCOPED_RAW_TIMER(&_predicate_filter_time);

            // Generate the filter vector.
            if (_lazy_read_ctx.resize_first_column) {
                // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
                // The following process may be tricky and time-consuming, but we have no other way.
                auto column_guard = block->mutate_column_scoped(0);
                column_guard.mutable_column()->resize(pre_read_rows);
            }
            result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
            std::vector<IColumn::Filter*> filters;
            if (_position_delete_ctx.has_filter) {
                filters.push_back(_pos_delete_filter_ptr.get());
            }

            VExprContextSPtrs filter_contexts(_filter_conjuncts.begin(), _filter_conjuncts.end());
            RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
                                                            &result_filter, &can_filter_all));

            // Condition cache MISS: mark granules with surviving rows.
            if (!can_filter_all) {
                _mark_condition_cache_granules(result_filter.data(), pre_read_rows, batch_base_row);
            }

            if (_lazy_read_ctx.resize_first_column) {
                // We have to clean the first column to insert right data.
                block->clear_column_data(std::vector<uint32_t> {0});
            }
        }

        // `result_filter` outlives `filter_map_ptr` within this call, so the map may refer to it.
        filter_map_ptr = std::make_unique<FilterMap>();
        RETURN_IF_ERROR(filter_map_ptr->init(result_filter.data(), pre_read_rows, can_filter_all));
        if (!filter_map_ptr->filter_all()) {
            // At least one row survives: go on to read the lazy columns.
            break;
        }

        // Every row of this batch was filtered out: drop the predicate-side data.
        {
            SCOPED_RAW_TIMER(&_predicate_filter_time);
            std::vector<uint32_t> columns_to_clear;
            columns_to_clear.reserve(_lazy_read_ctx.predicate_columns.first.size() +
                                     _lazy_read_ctx.predicate_partition_columns.size() +
                                     _lazy_read_ctx.predicate_missing_columns.size());
            for (const auto& col : _lazy_read_ctx.predicate_columns.first) {
                // clean block to read predicate columns
                uint32_t block_pos = 0;
                RETURN_IF_ERROR(_get_block_column_pos(*block, col, &block_pos));
                columns_to_clear.emplace_back(block_pos);
            }
            for (const auto& col : _lazy_read_ctx.predicate_partition_columns) {
                uint32_t block_pos = 0;
                RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos));
                columns_to_clear.emplace_back(block_pos);
            }
            for (const auto& col : _lazy_read_ctx.predicate_missing_columns) {
                uint32_t block_pos = 0;
                RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos));
                columns_to_clear.emplace_back(block_pos);
            }
            block->clear_column_data(columns_to_clear);
            RETURN_IF_ERROR(_table_format_reader->clear_synthesized_columns(block));
            RETURN_IF_ERROR(_table_format_reader->clear_generated_columns(block));
            Block::erase_useless_column(block, origin_column_num);
        }

        if (pre_eof) {
            // If filter_map_ptr->filter_all() and pre_eof, we can skip whole row group.
            *read_rows = 0;
            *batch_eof = true;
            _lazy_read_filtered_rows += (pre_read_rows + _cached_filtered_rows);
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
            return Status::OK();
        }

        // If continuous batches are skipped, we can cache them to skip a whole page.
        _cached_filtered_rows += pre_read_rows;
        if (pre_raw_read_rows >= config::doris_scanner_row_num) {
            *read_rows = 0;
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
            return Status::OK();
        }
    }
    if (_state->is_cancelled()) {
        return Status::Cancelled("cancelled");
    }

    if (filter_map_ptr == nullptr) {
        DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
        *read_rows = 0;
        *batch_eof = true;
        RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
        return Status::OK();
    }

    FilterMap& filter_map = *filter_map_ptr;
    // Owns the rebuilt selection buffer; must stay alive while `filter_map` is used below.
    DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr;
    if (_cached_filtered_rows != 0) {
        RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
        pre_read_rows += _cached_filtered_rows;
        _cached_filtered_rows = 0;
    }

    // Read the lazy columns for the same row span as the predicate columns
    // (including any previously skipped rows).
    size_t lazy_read_rows = 0;
    bool lazy_eof = false;
    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
                                      &lazy_read_rows, &lazy_eof, filter_map));

    if (pre_read_rows != lazy_read_rows) {
        return Status::Corruption("Can't read the same number of rows when doing lazy read");
    }
    // `lazy_eof` is intentionally ignored: the lazy columns are asked for exactly
    // `pre_read_rows` rows, so their EOF flag may differ from `pre_eof`, which is the one
    // reported for this batch.

    // Filter data in predicate columns, and remove the temporary filter columns.
    {
        SCOPED_RAW_TIMER(&_predicate_filter_time);
        if (filter_map.has_filter()) {
            RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
                    block, _lazy_read_ctx.all_predicate_col_ids, result_filter));
        }
        Block::erase_useless_column(block, origin_column_num);
    }

    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));

    // All non-empty columns must agree on the row count; still-empty columns (filled below)
    // are ignored.
    size_t column_size = 0;
    const size_t column_num = block->columns();
    for (size_t i = 0; i < column_num; ++i) {
        const size_t cz = block->get_by_position(i).column->size();
        if (cz == 0) {
            continue;
        }
        if (column_size != 0) {
            DCHECK_EQ(column_size, cz);
        }
        column_size = cz;
    }
    _lazy_read_filtered_rows += pre_read_rows - column_size;
    *read_rows = column_size;

    *batch_eof = pre_eof;
    DCHECK(_table_format_reader);
    RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
            block, column_size, _lazy_read_ctx.partition_col_names));
    RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
            block, column_size, _lazy_read_ctx.missing_col_names));
#ifndef NDEBUG
    for (const auto& col : *block) {
        col.column->sanity_check();
        DCHECK(block->rows() == col.column->size())
                << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
                                    block->rows(), col.column->size(), col.name);
    }
#endif
    return Status::OK();
}
833
834
// Extends `filter_map` so it also covers the `_cached_filtered_rows` rows that were skipped
// (fully filtered) before the current batch.
//
// The rebuilt map has `_cached_filtered_rows + pre_read_rows` entries: the cached prefix is
// all 0 (filtered), followed by the current batch's selection. If the current batch is itself
// filter-all, the map is simply re-initialised as filter-all over the total row count.
//
// @param filter_map       selection of the current batch; re-initialised in place.
// @param filter_map_data  out: owns the rebuilt selection buffer. `filter_map` is initialised
//                         with a pointer into it, so the caller should keep it alive while
//                         `filter_map` is used (presumably FilterMap does not copy — confirm).
// @param pre_read_rows    number of rows in the current batch.
Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
                                           DorisUniqueBufferPtr<uint8_t>& filter_map_data,
                                           size_t pre_read_rows) const {
    if (_cached_filtered_rows == 0) {
        return Status::OK();
    }
    const size_t total_rows = _cached_filtered_rows + pre_read_rows;
    if (filter_map.filter_all()) {
        RETURN_IF_ERROR(filter_map.init(nullptr, total_rows, true));
        return Status::OK();
    }

    filter_map_data = make_unique_buffer<uint8_t>(total_rows);
    uint8_t* map = filter_map_data.get();
    // Rows skipped in earlier batches were all filtered out.
    std::fill_n(map, _cached_filtered_rows, static_cast<uint8_t>(0));
    const uint8_t* old_map = filter_map.filter_map_data();
    if (old_map == nullptr) {
        // No selection vector and not filter-all (handled above): every row of the current
        // batch is selected.
        std::fill_n(map + _cached_filtered_rows, pre_read_rows, static_cast<uint8_t>(1));
    } else {
        memcpy(map + _cached_filtered_rows, old_map, pre_read_rows);
    }
    RETURN_IF_ERROR(filter_map.init(map, total_rows, false));
    return Status::OK();
}
863
864
// Fills each partition column of `block` with `rows` copies of its constant partition value,
// deserialized from text with the slot's data-type serde.
//
// @param partition_columns  column name -> (textual partition value, slot descriptor).
// @return InternalError if the value cannot be deserialized or the number of rows written
//         differs from `rows`; any error from `_get_block_column_pos`.
Status RowGroupReader::_fill_partition_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns) {
    DataTypeSerDe::FormatOptions text_format_options;
    for (const auto& [col_name, partition_value] : partition_columns) {
        uint32_t block_pos = 0;
        RETURN_IF_ERROR(_get_block_column_pos(*block, col_name, &block_pos));
        auto doris_column = block->get_by_position(block_pos).column;
        // obtained from block*, it is a mutable object.
        auto* col_ptr = const_cast<IColumn*>(doris_column.get());
        const auto& [value, slot_desc] = partition_value;
        auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        // Be careful when reading empty rows from parquet row groups.
        if (!text_serde
                     ->deserialize_column_from_fixed_json(*col_ptr, slice, rows, &num_deserialized,
                                                          text_format_options)
                     .ok()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            // Argument order matches the message: expected (`rows`) first, then actual.
            return Status::InternalError(
                    "Failed to fill partition column: {}={}. "
                    "Number of rows expected to be written: {}, number of rows actually "
                    "written: {}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}
896
897
// Fills every column in `missing_columns` that exists in `block` with `rows` values.
//
// A column mapped to nullptr is padded with `rows` default (null) values. Otherwise its
// default-value expression is executed against `block`; when the resulting column is not
// shared (use_count() == 1) it is resized to `rows`, expanded if constant, wrapped as nullable
// when the block column is nullable, and then replaces the block column.
Status RowGroupReader::_fill_missing_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    for (const auto& [missing_name, default_ctx] : missing_columns) {
        uint32_t target_pos = 0;
        RETURN_IF_ERROR(_get_block_column_pos(*block, missing_name, &target_pos));

        if (default_ctx == nullptr) {
            // No default expression: fill with nulls.
            auto guard = block->mutate_column_scoped(target_pos);
            auto& target_column = guard.mutable_column();
            auto* nullable_target = assert_cast<ColumnNullable*>(target_column.get());
            nullable_target->insert_many_defaults(rows);
            continue;
        }

        // Evaluate the default-value expression (PT1 => dest primitive type).
        ColumnPtr evaluated;
        RETURN_IF_ERROR(default_ctx->execute(block, evaluated));
        if (evaluated->use_count() != 1) {
            // NOTE(review): a shared result column is not written back, so this column is
            // left unfilled — confirm this is intended.
            continue;
        }
        // The first column of the block may not have been filled by the reader yet, so
        // block->rows() can be wrong and `execute()` may have produced a single row.
        auto resized = evaluated->assert_mutable();
        resized->resize(rows);
        // The result may be a ColumnConst; expand it to a full column.
        evaluated = evaluated->convert_to_full_column_if_const();
        const bool target_is_nullable = block->get_by_position(target_pos).type->is_nullable();
        if (target_is_nullable) {
            block->replace_by_position(target_pos, make_nullable(evaluated));
        } else {
            block->replace_by_position(target_pos, evaluated);
        }
    }
    return Status::OK();
}
932
933
// Looks up the position of `column_name` in `block` via `_col_name_to_block_idx`.
//
// @param position  out: the column index; written only on success.
// @return InternalError if the name->index map is unset, the name is not in the map, or the
//         mapped index is not smaller than `block.columns()`.
Status RowGroupReader::_get_block_column_pos(const Block& block, const std::string& column_name,
                                             uint32_t* position) const {
    if (!_col_name_to_block_idx) {
        return Status::InternalError(
                "Column name to block index map is not set when reading parquet column '{}', "
                "block: {}",
                column_name, block.dump_structure());
    }

    const auto found = _col_name_to_block_idx->find(column_name);
    if (found == _col_name_to_block_idx->end()) {
        return Status::InternalError("Column '{}' not found in block index map, block: {}",
                                     column_name, block.dump_structure());
    }

    const auto mapped_pos = found->second;
    if (mapped_pos >= block.columns()) {
        return Status::InternalError(
                "Column '{}' maps to invalid block position {}, block columns: {}, block: {}",
                column_name, mapped_pos, block.columns(), block.dump_structure());
    }

    *position = mapped_pos;
    return Status::OK();
}
955
956
106
bool RowGroupReader::_need_current_batch_row_positions() const {
957
106
    DCHECK(_table_format_reader);
958
106
    return _table_format_reader->has_synthesized_column_handlers() ||
959
106
           _table_format_reader->has_generated_column_handlers();
960
106
}
961
962
11
// Advances the reader by up to `batch_size` rows without reading any column data.
//
// Without position deletes, the rows come from `_remaining_rows`. With position deletes, the
// batch covers row ids [current_row_id, min(current_row_id + batch_size, last_row_id)), and
// delete entries falling in that span are excluded from `*read_rows`. When synthesized or
// generated columns need row positions, `_current_batch_row_ids` is filled for the returned
// rows. `_total_read_rows` is advanced by `*read_rows` in both cases.
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) {
    if (!_position_delete_ctx.has_filter) {
        if (batch_size >= _remaining_rows) {
            // Last batch of the row group.
            *read_rows = _remaining_rows;
            _remaining_rows = 0;
            *batch_eof = true;
        } else {
            *read_rows = batch_size;
            _remaining_rows -= batch_size;
            *batch_eof = false;
        }
        if (_need_current_batch_row_positions()) {
            RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
        }
        _total_read_rows += *read_rows;
        return Status::OK();
    }

    auto& del_ctx = _position_delete_ctx;
    const int64_t first_id = del_ctx.current_row_id;
    const int64_t stop_id =
            std::min(del_ctx.current_row_id + static_cast<int64_t>(batch_size), del_ctx.last_row_id);

    // Consume delete entries up to `stop_id`; entries before `first_id` are stale and skipped.
    int64_t deleted_in_batch = 0;
    auto first_hit = del_ctx.index;
    for (; del_ctx.index < del_ctx.end_index; ++del_ctx.index) {
        const int64_t delete_id = del_ctx.delete_rows[del_ctx.index];
        if (delete_id < first_id) {
            first_hit = del_ctx.index + 1;
        } else if (delete_id < stop_id) {
            ++deleted_in_batch;
        } else {
            break;
        }
    }

    *read_rows = stop_id - first_id - deleted_in_batch;
    del_ctx.current_row_id = stop_id;
    *batch_eof = del_ctx.current_row_id == del_ctx.last_row_id;

    if (_need_current_batch_row_positions()) {
        _current_batch_row_ids.clear();
        _current_batch_row_ids.resize(*read_rows);
        size_t out_pos = 0;
        auto next_hit = first_hit;
        for (int64_t row_id = first_id; row_id < stop_id; ++row_id) {
            const bool is_deleted =
                    next_hit < del_ctx.index && row_id == del_ctx.delete_rows[next_hit];
            if (is_deleted) {
                ++next_hit;
            } else {
                _current_batch_row_ids[out_pos++] = static_cast<rowid_t>(row_id);
            }
        }
    }

    _total_read_rows += *read_rows;
    return Status::OK();
}
1016
1017
5
// Fills `_current_batch_row_ids` with the row ids of the next `read_rows` rows.
//
// The batch starts at the `_total_read_rows`-th row counted across `_read_ranges` (callers in
// this file invoke this before advancing `_total_read_rows` for the batch). Each id is the
// row's index within the row group plus `_current_row_group_idx.first_row`.
Status RowGroupReader::_get_current_batch_row_id(size_t read_rows) {
    _current_batch_row_ids.clear();
    _current_batch_row_ids.resize(read_rows);

    size_t out_idx = 0;
    // Rows covered by the ranges already visited.
    int64_t read_range_rows = 0;
    for (size_t range_idx = 0; range_idx < _read_ranges.range_size() && read_rows > 0;
         range_idx++) {
        auto range = _read_ranges.get_range(range_idx);
        const int64_t range_rows = range.to() - range.from();
        if (read_range_rows + range_rows > _total_read_rows) {
            // First row of this range that belongs to the current batch.
            const int64_t fi =
                    std::max(_total_read_rows, read_range_rows) - read_range_rows + range.from();
            const size_t len =
                    std::min(read_rows, static_cast<size_t>(std::max(range.to(), fi) - fi));
            read_rows -= len;
            for (size_t i = 0; i < len; ++i) {
                _current_batch_row_ids[out_idx++] = static_cast<rowid_t>(
                        fi + static_cast<int64_t>(i) + _current_row_group_idx.first_row);
            }
        }
        read_range_rows += range_rows;
    }
    return Status::OK();
}
1044
1045
95
// Builds `_pos_delete_filter_ptr` for the next `read_rows` rows (1 = keep, 0 = deleted) from
// the position-delete entries, then advances `_total_read_rows` by `read_rows`.
// Without position deletes the filter is reset to null.
//
// Delete entries are consumed in order starting at `_position_delete_ctx.index`. An entry
// that falls after the current batch is left in place for a later call.
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
    if (!_position_delete_ctx.has_filter) {
        _pos_delete_filter_ptr.reset(nullptr);
        _total_read_rows += read_rows;
        return Status::OK();
    }
    _pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
    auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data();
    const auto batch_rows = static_cast<int64_t>(read_rows);
    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
        const int64_t delete_row_index_in_row_group =
                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
                _position_delete_ctx.first_row_id;
        int64_t read_range_rows = 0;
        size_t remaining_read_rows = _total_read_rows + read_rows;
        bool consumed = false;
        for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) {
            auto range = _read_ranges.get_range(range_idx);
            if (delete_row_index_in_row_group < range.from()) {
                // The deleted row lies in a gap that is not read: drop the entry.
                ++_position_delete_ctx.index;
                consumed = true;
                break;
            }
            if (delete_row_index_in_row_group < range.to()) {
                const int64_t index = (delete_row_index_in_row_group - range.from()) +
                                      read_range_rows - _total_read_rows;
                // Outside the current batch (also covers read_rows == 0, where the old
                // `index > read_rows - 1` check underflowed and wrote out of bounds).
                if (index < 0 || index >= batch_rows) {
                    _total_read_rows += read_rows;
                    return Status::OK();
                }
                pos_delete_filter_data[index] = 0;
                ++_position_delete_ctx.index;
                consumed = true;
                break;
            }
            // delete_row >= range.to(): look at the next range.
            const int64_t range_size = range.to() - range.from();
            // Don't search next range when there is no remaining_read_rows.
            if (remaining_read_rows <= static_cast<size_t>(range_size)) {
                _total_read_rows += read_rows;
                return Status::OK();
            }
            remaining_read_rows -= range_size;
            read_range_rows += range_size;
        }
        if (!consumed) {
            // The deleted row lies beyond every read range; without this the while loop
            // would retry the same entry forever.
            break;
        }
    }
    _total_read_rows += read_rows;
    return Status::OK();
}
1091
1092
// need exception safety
1093
// Applies the position-delete filter, when one was built, to `columns_to_filter` of `block`.
// It then calls `Block::erase_useless_column(block, column_to_keep)`, presumably dropping
// temporary columns appended after the first `column_to_keep` (confirm in Block).
// Exceptions thrown while filtering are converted to a Status.
Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
                                     const std::vector<uint32_t>& columns_to_filter) {
    if (auto* delete_filter = _pos_delete_filter_ptr.get(); delete_filter != nullptr) {
        RETURN_IF_CATCH_EXCEPTION(
                Block::filter_block_internal(block, columns_to_filter, *delete_filter));
    }
    Block::erase_useless_column(block, column_to_keep);
    return Status::OK();
}
1103
1104
6
// Evaluates the conjuncts of each dictionary-filter column against its dictionary values and
// rewrites them over dictionary codes.
//
// For every entry of `_dict_filter_cols`:
//  * if the conjuncts report `can_filter_all`, `_is_row_group_filtered` is set and the
//    function returns immediately;
//  * if more than MAX_DICT_CODE_PREDICATE_TO_REWRITE codes pass, the column is removed from
//    `_dict_filter_cols` and its conjuncts are appended to `_filter_conjuncts`;
//  * otherwise `_rewrite_dict_conjuncts` is called with the passing codes.
// @return NotFound if a column's slot has no entry in `_slot_id_to_filter_conjuncts`, or any
//         error from the readers / expression evaluation.
Status RowGroupReader::_rewrite_dict_predicates() {
    SCOPED_RAW_TIMER(&_dict_filter_rewrite_time);
    auto it = _dict_filter_cols.begin();
    while (it != _dict_filter_cols.end()) {
        std::string& col_name = it->first;
        const int slot_id = it->second;

        // 1. Load the dictionary values into a string column.
        MutableColumnPtr dict_values = ColumnString::create();
        bool has_dict = false;
        RETURN_IF_ERROR(
                _column_readers[col_name]->read_dict_values_to_column(dict_values, &has_dict));
#ifndef NDEBUG
        dict_values->sanity_check();
#endif
        const size_t dict_size = dict_values->size();
        DCHECK(has_dict);

        // 2. Build a temp block shaped like the tuple: the dict column takes the place of the
        //    filtered slot, every other slot gets an empty column.
        Block temp_block;
        int dict_pos = -1;
        const auto& slots = _tuple_descriptor->slots();
        for (int pos = 0; pos < static_cast<int>(slots.size()); ++pos) {
            auto* slot_desc = slots[pos];
            if (slot_desc->id() != slot_id) {
                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                        slot_desc->get_data_type_ptr(),
                                                        slot_desc->col_name()));
                continue;
            }
            auto data_type = slot_desc->get_data_type_ptr();
            if (data_type->is_nullable()) {
                temp_block.insert(
                        {ColumnNullable::create(
                                 std::move(dict_values), // NOLINT(bugprone-use-after-move)
                                 ColumnUInt8::create(dict_size, 0)),
                         std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
                         ""});
            } else {
                temp_block.insert({std::move(dict_values), std::make_shared<DataTypeString>(), ""});
            }
            dict_pos = pos;
        }

        // 2.2 Collect and execute the conjuncts bound to this slot.
        auto conjunct_it = _slot_id_to_filter_conjuncts->find(slot_id);
        if (conjunct_it == _slot_id_to_filter_conjuncts->end()) {
            std::stringstream msg;
            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
            return Status::NotFound(msg.str());
        }
        VExprContextSPtrs ctxs(conjunct_it->second.begin(), conjunct_it->second.end());

        const bool dict_not_first = dict_pos != 0;
        if (dict_not_first) {
            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
            // The following process may be tricky and time-consuming, but we have no other way.
            temp_block.get_by_position(0).column->assert_mutable()->resize(dict_size);
        }
        IColumn::Filter result_filter(temp_block.rows(), 1);
        bool can_filter_all = false;
        RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block,
                                                        &result_filter, &can_filter_all));
        if (dict_not_first) {
            // We have to clean the first column to insert right data.
            temp_block.get_by_position(0).column->assert_mutable()->clear();
        }

        // If can_filter_all = true, can filter this row group.
        if (can_filter_all) {
            _is_row_group_filtered = true;
            return Status::OK();
        }

        // 3. Collect the codes of the dictionary entries that passed.
        std::vector<int32_t> dict_codes;
        for (size_t code = 0; code < result_filter.size(); ++code) {
            if (result_filter[code]) {
                dict_codes.emplace_back(code);
            }
        }

        // About Performance: if dict_column size is too large, it will generate a large IN filter.
        if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
            it = _dict_filter_cols.erase(it);
            for (const auto& ctx : ctxs) {
                _filter_conjuncts.push_back(ctx);
            }
            continue;
        }

        // 4. Rewrite conjuncts.
        const bool dict_is_nullable = temp_block.get_by_position(dict_pos).column->is_nullable();
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_is_nullable));
        ++it;
    }
    return Status::OK();
}
1208
1209
// Rewrites the predicate on column `slot_id` into a predicate over the Int32 dictionary codes
// in `dict_codes`. The rewritten expression is prepared, opened, and appended to both
// `_dict_filter_conjuncts` and `_filter_conjuncts`.
//
//   - exactly one code: `slot = <code>` (BINARY_PRED / EQ through the builtin "eq" function)
//   - otherwise:        `slot IN (<codes>)` backed by a hybrid set (VDirectInPredicate)
//
// @param dict_codes  dictionary codes that satisfy the original predicate.
// @param slot_id     id of the slot in `_tuple_descriptor` that the predicate refers to.
// @param is_nullable nullability recorded on the EQ predicate and its literal. The IN
//                    predicate is always built as non-nullable.
// @return InternalError if `slot_id` is not present in `_tuple_descriptor`. Otherwise, the
//         status of preparing/opening the rewritten expression.
Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
                                               bool is_nullable) {
    // Resolve the slot once: both predicate shapes reference the same column. A missing slot
    // used to reach VSlotRef::create_shared as nullptr, so it is now reported as an error.
    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
    const auto slot_it =
            std::find_if(slots.begin(), slots.end(),
                         [slot_id](const SlotDescriptor* each) { return each->id() == slot_id; });
    if (slot_it == slots.end()) {
        return Status::InternalError(
                "Slot {} not found in tuple descriptor while rewriting dict conjuncts", slot_id);
    }
    SlotDescriptor* slot = *slot_it;

    VExprSPtr root;
    if (dict_codes.size() == 1) {
        // Build `slot = dict_codes[0]` as an INT = INT builtin comparison.
        TFunction fn;
        TFunctionName fn_name;
        fn_name.__set_db_name("");
        fn_name.__set_function_name("eq");
        fn.__set_name(fn_name);
        fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
        std::vector<TTypeDesc> arg_types;
        arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
        arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
        fn.__set_arg_types(arg_types);
        fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
        fn.__set_has_var_args(false);

        TExprNode eq_node;
        eq_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
        eq_node.__set_node_type(TExprNodeType::BINARY_PRED);
        eq_node.__set_opcode(TExprOpcode::EQ);
        eq_node.__set_fn(fn);
        eq_node.__set_num_children(2);
        eq_node.__set_is_nullable(is_nullable);
        root = VectorizedFnCall::create_shared(eq_node);

        // Left operand: the column (slot) reference.
        root->add_child(VSlotRef::create_shared(slot));

        // Right operand: the single dictionary code as an INT literal.
        TExprNode literal_node;
        literal_node.__set_node_type(TExprNodeType::INT_LITERAL);
        literal_node.__set_type(create_type_desc(PrimitiveType::TYPE_INT));
        TIntLiteral int_literal;
        int_literal.__set_value(dict_codes[0]);
        literal_node.__set_int_literal(int_literal);
        literal_node.__set_is_nullable(is_nullable);
        root->add_child(VLiteral::create_shared(literal_node));
    } else {
        // Build `slot IN (dict_codes...)`.
        TExprNode in_node;
        in_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
        in_node.__set_node_type(TExprNodeType::IN_PRED);
        in_node.in_predicate.__set_is_not_in(false);
        in_node.__set_opcode(TExprOpcode::FILTER_IN);
        // VDirectInPredicate assumes is_nullable = false.
        in_node.__set_is_nullable(false);

        std::shared_ptr<HybridSetBase> hybrid_set(
                create_set(PrimitiveType::TYPE_INT, dict_codes.size(), false));
        for (auto& code : dict_codes) {
            hybrid_set->insert(&code);
        }
        root = VDirectInPredicate::create_shared(in_node, hybrid_set);

        // Sole child: the column (slot) reference tested for membership.
        root->add_child(VSlotRef::create_shared(slot));
    }

    VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
    _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
    _filter_conjuncts.push_back(rewritten_conjunct_ctx);
    return Status::OK();
}
1294
1295
85
// Decodes every dictionary-filtered column in `block` in place. Each such column currently
// holds Int32 dictionary codes. It is replaced by the string column that its parquet column
// reader produces, and the column's data type becomes String (wrapped in Nullable when the
// original column was nullable; the original null map is reused).
//
// @param block block whose dict-filtered columns are converted.
// @return an error if a column position cannot be resolved, if no column reader exists for a
//         dict-filtered column, or if the reader fails to decode the codes.
Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
    for (auto& entry : _dict_filter_cols) {
        const auto& col_name = entry.first;

        uint32_t pos = 0;
        RETURN_IF_ERROR(_get_block_column_pos(*block, col_name, &pos));

        const auto found = _column_readers.find(col_name);
        const bool has_reader = found != _column_readers.end() && found->second != nullptr;
        if (!has_reader) {
            return Status::InternalError("Column reader for '{}' not found in parquet row group",
                                         col_name);
        }
        auto& col_reader = found->second;

        ColumnWithTypeAndName& target = block->get_by_position(pos);
        const auto* nullable = check_and_get_column<ColumnNullable>(*target.column);

        if (nullable == nullptr) {
            // Plain Int32 code column: decode and swap in a non-nullable String column.
            const auto* codes = assert_cast<const ColumnInt32*>(target.column.get());
            auto decoded = DORIS_TRY(col_reader->convert_dict_column_to_string_column(codes));
            target.type = std::make_shared<DataTypeString>();
            block->replace_by_position(pos, std::move(decoded));
            continue;
        }

        // Nullable(Int32) code column: decode the nested codes and keep the original null map.
        const ColumnPtr& nested = nullable->get_nested_column_ptr();
        const auto* codes = assert_cast<const ColumnInt32*>(nested.get());
        DCHECK(codes);
        auto decoded = DORIS_TRY(col_reader->convert_dict_column_to_string_column(codes));
        target.type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
        block->replace_by_position(
                pos, ColumnNullable::create(std::move(decoded),
                                            nullable->get_null_map_column_ptr()));
    }
    return Status::OK();
}
1330
1331
38
// Aggregates the per-column read statistics of every column reader in this row group.
// Calls `merge` on a default-constructed ColumnStatistics once per reader, in
// `_column_readers` iteration order.
//
// @return the merged statistics across all column readers.
ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() {
    ParquetColumnReader::ColumnStatistics merged;
    std::for_each(_column_readers.begin(), _column_readers.end(), [&merged](auto& entry) {
        auto per_column = entry.second->column_statistics();
        merged.merge(per_column);
    });
    return merged;
}
1339
1340
} // namespace doris