Coverage Report

Created: 2026-03-31 18:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_group_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_group_reader.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/Opcodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <string.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <memory>
29
#include <ostream>
30
31
#include "common/config.h"
32
#include "common/consts.h"
33
#include "common/logging.h"
34
#include "common/object_pool.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column_const.h"
40
#include "core/column/column_nullable.h"
41
#include "core/column/column_string.h"
42
#include "core/column/column_struct.h"
43
#include "core/column/column_vector.h"
44
#include "core/custom_allocator.h"
45
#include "core/data_type/data_type.h"
46
#include "core/data_type/data_type_number.h"
47
#include "core/data_type/data_type_string.h"
48
#include "core/data_type/data_type_struct.h"
49
#include "core/data_type/define_primitive_type.h"
50
#include "core/pod_array.h"
51
#include "core/types.h"
52
#include "exprs/create_predicate_function.h"
53
#include "exprs/hybrid_set.h"
54
#include "exprs/vdirect_in_predicate.h"
55
#include "exprs/vectorized_fn_call.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vliteral.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/parquet/schema_desc.h"
61
#include "format/parquet/vparquet_column_reader.h"
62
#include "format/table/iceberg_reader.h"
63
#include "runtime/descriptors.h"
64
#include "runtime/runtime_state.h"
65
#include "runtime/thread_context.h"
66
#include "storage/segment/column_reader.h"
67
68
namespace cctz {
69
class time_zone;
70
} // namespace cctz
71
namespace doris {
72
class RuntimeState;
73
74
namespace io {
75
struct IOContext;
76
} // namespace io
77
} // namespace doris
78
79
namespace doris {
80
#include "common/compile_check_begin.h"
81
82
namespace {
83
Status build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path,
84
                                  const std::vector<rowid_t>& row_ids, int32_t partition_spec_id,
85
                                  const std::string& partition_data_json,
86
0
                                  MutableColumnPtr* column_out) {
87
0
    if (type == nullptr || column_out == nullptr) {
88
0
        return Status::InvalidArgument("Invalid iceberg rowid column type or output column");
89
0
    }
90
91
0
    MutableColumnPtr column = type->create_column();
92
0
    ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get());
93
0
    ColumnStruct* struct_col = nullptr;
94
0
    if (nullable_col != nullptr) {
95
0
        struct_col =
96
0
                check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get());
97
0
    } else {
98
0
        struct_col = check_and_get_column<ColumnStruct>(column.get());
99
0
    }
100
101
0
    if (struct_col == nullptr || struct_col->tuple_size() < 4) {
102
0
        return Status::InternalError("Invalid iceberg rowid column structure");
103
0
    }
104
105
0
    size_t num_rows = row_ids.size();
106
0
    auto& file_path_col = struct_col->get_column(0);
107
0
    auto& row_pos_col = struct_col->get_column(1);
108
0
    auto& spec_id_col = struct_col->get_column(2);
109
0
    auto& partition_data_col = struct_col->get_column(3);
110
111
0
    file_path_col.reserve(num_rows);
112
0
    row_pos_col.reserve(num_rows);
113
0
    spec_id_col.reserve(num_rows);
114
0
    partition_data_col.reserve(num_rows);
115
116
0
    for (size_t i = 0; i < num_rows; ++i) {
117
0
        file_path_col.insert_data(file_path.data(), file_path.size());
118
0
    }
119
0
    for (size_t i = 0; i < num_rows; ++i) {
120
0
        int64_t row_pos = static_cast<int64_t>(row_ids[i]);
121
0
        row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos));
122
0
    }
123
0
    for (size_t i = 0; i < num_rows; ++i) {
124
0
        int32_t spec_id = partition_spec_id;
125
0
        spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id));
126
0
    }
127
0
    for (size_t i = 0; i < num_rows; ++i) {
128
0
        partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size());
129
0
    }
130
131
0
    if (nullable_col != nullptr) {
132
0
        nullable_col->get_null_map_data().resize_fill(num_rows, 0);
133
0
    }
134
135
0
    *column_out = std::move(column);
136
0
    return Status::OK();
137
0
}
138
} // namespace
139
const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
140
static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
141
142
RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
143
                               const std::vector<std::string>& read_columns,
144
                               const int32_t row_group_id, const tparquet::RowGroup& row_group,
145
                               const cctz::time_zone* ctz, io::IOContext* io_ctx,
146
                               const PositionDeleteContext& position_delete_ctx,
147
                               const LazyReadContext& lazy_read_ctx, RuntimeState* state,
148
                               const std::set<uint64_t>& column_ids,
149
                               const std::set<uint64_t>& filter_column_ids)
150
37
        : _file_reader(file_reader),
151
37
          _read_table_columns(read_columns),
152
37
          _row_group_id(row_group_id),
153
37
          _row_group_meta(row_group),
154
37
          _remaining_rows(row_group.num_rows),
155
37
          _ctz(ctz),
156
37
          _io_ctx(io_ctx),
157
37
          _position_delete_ctx(position_delete_ctx),
158
37
          _lazy_read_ctx(lazy_read_ctx),
159
37
          _state(state),
160
37
          _obj_pool(new ObjectPool()),
161
37
          _column_ids(column_ids),
162
37
          _filter_column_ids(filter_column_ids) {}
163
164
37
RowGroupReader::~RowGroupReader() {
165
37
    _column_readers.clear();
166
37
    _obj_pool->clear();
167
37
}
168
169
Status RowGroupReader::init(
170
        const FieldDescriptor& schema, RowRanges& row_ranges,
171
        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
172
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
173
        const std::unordered_map<std::string, int>* colname_to_slot_id,
174
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
175
37
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
176
37
    _tuple_descriptor = tuple_descriptor;
177
37
    _row_descriptor = row_descriptor;
178
37
    _col_name_to_slot_id = colname_to_slot_id;
179
37
    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
180
37
    _read_ranges = row_ranges;
181
37
    _filter_read_ranges_by_condition_cache();
182
37
    _remaining_rows = _read_ranges.count();
183
184
37
    if (_read_table_columns.empty()) {
185
        // Query task that only select columns in path.
186
1
        return Status::OK();
187
1
    }
188
36
    const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
189
36
    const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
190
36
    size_t max_buf_size =
191
36
            std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_table_columns.size());
192
106
    for (const auto& read_table_col : _read_table_columns) {
193
106
        auto read_file_col = _table_info_node_ptr->children_file_column_name(read_table_col);
194
106
        auto* field = schema.get_column(read_file_col);
195
106
        std::unique_ptr<ParquetColumnReader> reader;
196
106
        RETURN_IF_ERROR(ParquetColumnReader::create(
197
106
                _file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader,
198
106
                max_buf_size, col_offsets, _state, false, _column_ids, _filter_column_ids));
199
106
        if (reader == nullptr) {
200
0
            VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
201
0
            return Status::Corruption("Init row group reader failed");
202
0
        }
203
106
        _column_readers[read_table_col] = std::move(reader);
204
106
    }
205
206
36
    bool disable_dict_filter = false;
207
36
    if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
208
0
        disable_dict_filter = true;
209
0
        _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
210
0
                                 not_single_slot_filter_conjuncts->end());
211
0
    }
212
213
    // Check if single slot can be filtered by dict.
214
36
    if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
215
6
        const std::vector<std::string>& predicate_col_names =
216
6
                _lazy_read_ctx.predicate_columns.first;
217
6
        const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
218
14
        for (size_t i = 0; i < predicate_col_names.size(); ++i) {
219
8
            const std::string& predicate_col_name = predicate_col_names[i];
220
8
            int slot_id = predicate_col_slot_ids[i];
221
8
            if (predicate_col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID ||
222
8
                predicate_col_name == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
223
                // row lineage column can not dict filter.
224
0
                if (_slot_id_to_filter_conjuncts->find(slot_id) !=
225
0
                    _slot_id_to_filter_conjuncts->end()) {
226
0
                    for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
227
0
                        _filter_conjuncts.push_back(ctx);
228
0
                    }
229
0
                }
230
0
                continue;
231
0
            }
232
233
8
            auto predicate_file_col_name =
234
8
                    _table_info_node_ptr->children_file_column_name(predicate_col_name);
235
8
            auto field = schema.get_column(predicate_file_col_name);
236
8
            if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type &&
237
8
                _can_filter_by_dict(
238
8
                        slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) {
239
2
                _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
240
6
            } else {
241
6
                if (_slot_id_to_filter_conjuncts->find(slot_id) !=
242
6
                    _slot_id_to_filter_conjuncts->end()) {
243
6
                    for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
244
6
                        _filter_conjuncts.push_back(ctx);
245
6
                    }
246
6
                }
247
6
            }
248
8
        }
249
        // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
250
        // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
251
6
        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
252
4
            auto& [value, slot_desc] = kv.second;
253
4
            auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
254
4
            if (iter != _slot_id_to_filter_conjuncts->end()) {
255
4
                for (auto& ctx : iter->second) {
256
4
                    _filter_conjuncts.push_back(ctx);
257
4
                }
258
4
            }
259
4
        }
260
        //For check missing column :   missing column == xx, missing column is null,missing column is not null.
261
6
        _filter_conjuncts.insert(_filter_conjuncts.end(),
262
6
                                 _lazy_read_ctx.missing_columns_conjuncts.begin(),
263
6
                                 _lazy_read_ctx.missing_columns_conjuncts.end());
264
6
        RETURN_IF_ERROR(_rewrite_dict_predicates());
265
6
    }
266
    // _state is nullptr in some ut.
267
36
    if (_state && _state->enable_adjust_conjunct_order_by_cost()) {
268
8
        std::ranges::sort(_filter_conjuncts, [](const auto& a, const auto& b) {
269
8
            return a->execute_cost() < b->execute_cost();
270
8
        });
271
8
    }
272
36
    return Status::OK();
273
36
}
274
275
bool RowGroupReader::_can_filter_by_dict(int slot_id,
276
8
                                         const tparquet::ColumnMetaData& column_metadata) {
277
8
    SlotDescriptor* slot = nullptr;
278
8
    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
279
14
    for (auto each : slots) {
280
14
        if (each->id() == slot_id) {
281
8
            slot = each;
282
8
            break;
283
8
        }
284
14
    }
285
8
    if (!is_string_type(slot->type()->get_primitive_type()) &&
286
8
        !is_var_len_object(slot->type()->get_primitive_type())) {
287
6
        return false;
288
6
    }
289
2
    if (column_metadata.type != tparquet::Type::BYTE_ARRAY) {
290
0
        return false;
291
0
    }
292
293
2
    if (!is_dictionary_encoded(column_metadata)) {
294
0
        return false;
295
0
    }
296
297
2
    if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
298
0
        return false;
299
0
    }
300
301
    // TODO: The current implementation of dictionary filtering does not take into account
302
    //  the implementation of NULL values because the dictionary itself does not contain
303
    //  NULL value encoding. As a result, many NULL-related functions or expressions
304
    //  cannot work properly, such as is null, is not null, coalesce, etc.
305
    //  Here we check if the predicate expr is IN or BINARY_PRED.
306
    //  Implementation of NULL value dictionary filtering will be carried out later.
307
2
    return std::ranges::all_of(_slot_id_to_filter_conjuncts->at(slot_id), [&](const auto& ctx) {
308
2
        return (ctx->root()->node_type() == TExprNodeType::IN_PRED ||
309
2
                ctx->root()->node_type() == TExprNodeType::BINARY_PRED) &&
310
2
               ctx->root()->children()[0]->node_type() == TExprNodeType::SLOT_REF;
311
2
    });
312
2
}
313
314
// This function is copied from
315
// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
316
2
bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
317
    // The Parquet spec allows for column chunks to have mixed encodings
318
    // where some data pages are dictionary-encoded and others are plain
319
    // encoded. For example, a Parquet file writer might start writing
320
    // a column chunk as dictionary encoded, but it will switch to plain
321
    // encoding if the dictionary grows too large.
322
    //
323
    // In order for dictionary filters to skip the entire row group,
324
    // the conjuncts must be evaluated on column chunks that are entirely
325
    // encoded with the dictionary encoding. There are two checks
326
    // available to verify this:
327
    // 1. The encoding_stats field on the column chunk metadata provides
328
    //    information about the number of data pages written in each
329
    //    format. This allows for a specific check of whether all the
330
    //    data pages are dictionary encoded.
331
    // 2. The encodings field on the column chunk metadata lists the
332
    //    encodings used. If this list contains the dictionary encoding
333
    //    and does not include unexpected encodings (i.e. encodings not
334
    //    associated with definition/repetition levels), then it is entirely
335
    //    dictionary encoded.
336
2
    if (column_metadata.__isset.encoding_stats) {
337
        // Condition #1 above
338
4
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
339
4
            if (enc_stat.page_type == tparquet::PageType::DATA_PAGE &&
340
4
                (enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
341
2
                 enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) &&
342
4
                enc_stat.count > 0) {
343
0
                return false;
344
0
            }
345
4
        }
346
2
    } else {
347
        // Condition #2 above
348
0
        bool has_dict_encoding = false;
349
0
        bool has_nondict_encoding = false;
350
0
        for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
351
0
            if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
352
0
                encoding == tparquet::Encoding::RLE_DICTIONARY) {
353
0
                has_dict_encoding = true;
354
0
            }
355
356
            // RLE and BIT_PACKED are used for repetition/definition levels
357
0
            if (encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
358
0
                encoding != tparquet::Encoding::RLE_DICTIONARY &&
359
0
                encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) {
360
0
                has_nondict_encoding = true;
361
0
                break;
362
0
            }
363
0
        }
364
        // Not entirely dictionary encoded if:
365
        // 1. No dictionary encoding listed
366
        // OR
367
        // 2. Some non-dictionary encoding is listed
368
0
        if (!has_dict_encoding || has_nondict_encoding) {
369
0
            return false;
370
0
        }
371
0
    }
372
373
2
    return true;
374
2
}
375
376
Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
377
49
                                  bool* batch_eof) {
378
49
    if (_is_row_group_filtered) {
379
2
        *read_rows = 0;
380
2
        *batch_eof = true;
381
2
        return Status::OK();
382
2
    }
383
384
    // Process external table query task that select columns are all from path.
385
47
    if (_read_table_columns.empty()) {
386
3
        bool modify_row_ids = false;
387
3
        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, &modify_row_ids));
388
389
3
        RETURN_IF_ERROR(
390
3
                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
391
3
        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
392
393
3
        RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, modify_row_ids));
394
3
        RETURN_IF_ERROR(_append_iceberg_rowid_column(block, *read_rows, modify_row_ids));
395
396
3
        Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns());
397
3
        *read_rows = block->rows();
398
3
        return st;
399
3
    }
400
44
    if (_lazy_read_ctx.can_lazy_read) {
401
        // call _do_lazy_read recursively when current batch is skipped
402
4
        return _do_lazy_read(block, batch_size, read_rows, batch_eof);
403
40
    } else {
404
40
        FilterMap filter_map;
405
40
        int64_t batch_base_row = _total_read_rows;
406
40
        RETURN_IF_ERROR((_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
407
40
                                           read_rows, batch_eof, filter_map)));
408
40
        RETURN_IF_ERROR(
409
40
                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
410
40
        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
411
40
        RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, false));
412
40
        RETURN_IF_ERROR(_append_iceberg_rowid_column(block, *read_rows, false));
413
414
40
#ifndef NDEBUG
415
125
        for (auto col : *block) {
416
125
            col.column->sanity_check();
417
125
            DCHECK(block->rows() == col.column->size())
418
0
                    << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
419
0
                                        block->rows(), col.column->size(), col.name);
420
125
        }
421
40
#endif
422
423
40
        if (block->rows() == 0) {
424
0
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
425
0
            *read_rows = block->rows();
426
0
#ifndef NDEBUG
427
0
            for (auto col : *block) {
428
0
                col.column->sanity_check();
429
0
                DCHECK(block->rows() == col.column->size())
430
0
                        << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
431
0
                                            block->rows(), col.column->size(), col.name);
432
0
            }
433
0
#endif
434
0
            return Status::OK();
435
0
        }
436
40
        {
437
40
            SCOPED_RAW_TIMER(&_predicate_filter_time);
438
40
            RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
439
440
40
            std::vector<uint32_t> columns_to_filter;
441
40
            int column_to_keep = block->columns();
442
40
            columns_to_filter.resize(column_to_keep);
443
165
            for (uint32_t i = 0; i < column_to_keep; ++i) {
444
125
                columns_to_filter[i] = i;
445
125
            }
446
40
            if (!_lazy_read_ctx.conjuncts.empty()) {
447
6
                std::vector<IColumn::Filter*> filters;
448
6
                if (_position_delete_ctx.has_filter) {
449
0
                    filters.push_back(_pos_delete_filter_ptr.get());
450
0
                }
451
6
                IColumn::Filter result_filter(block->rows(), 1);
452
6
                bool can_filter_all = false;
453
454
6
                {
455
6
                    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
456
6
                            _filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
457
6
                }
458
459
                // Condition cache MISS: mark granules with surviving rows (non-lazy path)
460
6
                if (!can_filter_all) {
461
3
                    _mark_condition_cache_granules(result_filter.data(), block->rows(),
462
3
                                                   batch_base_row);
463
3
                }
464
465
6
                if (can_filter_all) {
466
9
                    for (auto& col : columns_to_filter) {
467
9
                        std::move(*block->get_by_position(col).column).assume_mutable()->clear();
468
9
                    }
469
3
                    Block::erase_useless_column(block, column_to_keep);
470
3
                    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
471
3
                    return Status::OK();
472
3
                }
473
474
3
                RETURN_IF_CATCH_EXCEPTION(
475
3
                        Block::filter_block_internal(block, columns_to_filter, result_filter));
476
3
                Block::erase_useless_column(block, column_to_keep);
477
34
            } else {
478
34
                RETURN_IF_CATCH_EXCEPTION(
479
34
                        RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
480
34
            }
481
37
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
482
37
        }
483
37
#ifndef NDEBUG
484
116
        for (auto col : *block) {
485
116
            col.column->sanity_check();
486
116
            DCHECK(block->rows() == col.column->size())
487
0
                    << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
488
0
                                        block->rows(), col.column->size(), col.name);
489
116
        }
490
37
#endif
491
37
        *read_rows = block->rows();
492
37
        return Status::OK();
493
37
    }
494
44
}
495
496
// Maps each batch row to its global parquet file position via _read_ranges, then marks
497
// the corresponding condition cache granule as true if the filter indicates the row survived.
498
// batch_seq_start is the number of rows already read sequentially before this batch
499
// (i.e., _total_read_rows before the batch started).
500
void RowGroupReader::_mark_condition_cache_granules(const uint8_t* filter_data, size_t num_rows,
501
6
                                                    int64_t batch_seq_start) {
502
6
    if (!_condition_cache_ctx || _condition_cache_ctx->is_hit) {
503
6
        return;
504
6
    }
505
0
    auto& cache = *_condition_cache_ctx->filter_result;
506
0
    for (size_t i = 0; i < num_rows; i++) {
507
0
        if (filter_data[i]) {
508
            // row-group-relative position of this row
509
0
            int64_t rg_pos = _read_ranges.get_row_index_by_pos(batch_seq_start + i);
510
            // global row number in the parquet file
511
0
            size_t granule = (_current_row_group_idx.first_row + rg_pos) /
512
0
                             ConditionCacheContext::GRANULE_SIZE;
513
0
            size_t cache_idx = granule - _condition_cache_ctx->base_granule;
514
0
            if (cache_idx < cache.size()) {
515
0
                cache[cache_idx] = true;
516
0
            }
517
0
        }
518
0
    }
519
0
}
520
521
// On condition cache HIT, removes row ranges whose granules have no surviving rows from
522
// _read_ranges BEFORE column readers are created. This makes ParquetColumnReader skip I/O
523
// entirely for false-granule rows — both predicate and lazy columns — via its existing
524
// page/row-skipping infrastructure.
525
37
void RowGroupReader::_filter_read_ranges_by_condition_cache() {
526
37
    if (!_condition_cache_ctx || !_condition_cache_ctx->is_hit) {
527
37
        return;
528
37
    }
529
0
    auto& filter_result = *_condition_cache_ctx->filter_result;
530
0
    if (filter_result.empty()) {
531
0
        return;
532
0
    }
533
534
0
    auto old_row_count = _read_ranges.count();
535
0
    _read_ranges =
536
0
            filter_ranges_by_cache(_read_ranges, filter_result, _current_row_group_idx.first_row,
537
0
                                   _condition_cache_ctx->base_granule);
538
0
    _is_row_group_filtered = _read_ranges.is_empty();
539
0
    _condition_cache_filtered_rows += old_row_count - _read_ranges.count();
540
0
}
541
542
// Filters read_ranges by removing rows whose cache granule is false.
543
//
544
// Cache index i maps to global granule (base_granule + i), which covers global file
545
// rows [(base_granule+i)*GS, (base_granule+i+1)*GS). Since read_ranges uses
546
// row-group-relative indices and first_row is the global position of the row group's
547
// first row, global granule g maps to row-group-relative range:
548
//   [max(0, g*GS - first_row), max(0, (g+1)*GS - first_row))
549
//
550
// We build a RowRanges of all false-granule regions (in row-group-relative coordinates),
551
// then subtract from read_ranges via ranges_exception.
552
//
553
// Granules beyond cache.size() are kept conservatively (assumed true).
554
//
555
// When base_granule > 0, the cache only covers granules starting from base_granule.
556
// This happens when a Parquet file is split across multiple scan ranges and this reader
557
// only processes row groups starting at a non-zero offset in the file.
558
RowRanges RowGroupReader::filter_ranges_by_cache(const RowRanges& read_ranges,
559
                                                 const std::vector<bool>& cache, int64_t first_row,
560
21
                                                 int64_t base_granule) {
561
21
    constexpr int64_t GS = ConditionCacheContext::GRANULE_SIZE;
562
21
    RowRanges filtered_ranges;
563
564
138
    for (size_t i = 0; i < cache.size(); i++) {
565
117
        if (!cache[i]) {
566
64
            int64_t global_granule = base_granule + static_cast<int64_t>(i);
567
64
            int64_t rg_from = std::max(static_cast<int64_t>(0), global_granule * GS - first_row);
568
64
            int64_t rg_to =
569
64
                    std::max(static_cast<int64_t>(0), (global_granule + 1) * GS - first_row);
570
64
            if (rg_from < rg_to) {
571
16
                filtered_ranges.add(RowRange(rg_from, rg_to));
572
16
            }
573
64
        }
574
117
    }
575
576
21
    RowRanges result;
577
21
    RowRanges::ranges_exception(read_ranges, filtered_ranges, &result);
578
21
    return result;
579
21
}
580
581
Status RowGroupReader::_read_column_data(Block* block,
582
                                         const std::vector<std::string>& table_columns,
583
                                         size_t batch_size, size_t* read_rows, bool* batch_eof,
584
49
                                         FilterMap& filter_map) {
585
49
    size_t batch_read_rows = 0;
586
49
    bool has_eof = false;
587
123
    for (auto& read_col_name : table_columns) {
588
123
        auto& column_with_type_and_name =
589
123
                block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]);
590
123
        auto& column_ptr = column_with_type_and_name.column;
591
123
        auto& column_type = column_with_type_and_name.type;
592
123
        bool is_dict_filter = false;
593
123
        for (auto& _dict_filter_col : _dict_filter_cols) {
594
0
            if (_dict_filter_col.first == read_col_name) {
595
0
                MutableColumnPtr dict_column = ColumnInt32::create();
596
0
                if (!_col_name_to_block_idx->contains(read_col_name)) {
597
0
                    return Status::InternalError(
598
0
                            "Wrong read column '{}' in parquet file, block: {}", read_col_name,
599
0
                            block->dump_structure());
600
0
                }
601
0
                if (column_type->is_nullable()) {
602
0
                    block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
603
0
                            std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
604
0
                    block->replace_by_position(
605
0
                            (*_col_name_to_block_idx)[read_col_name],
606
0
                            ColumnNullable::create(std::move(dict_column),
607
0
                                                   ColumnUInt8::create(dict_column->size(), 0)));
608
0
                } else {
609
0
                    block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
610
0
                            std::make_shared<DataTypeInt32>();
611
0
                    block->replace_by_position((*_col_name_to_block_idx)[read_col_name],
612
0
                                               std::move(dict_column));
613
0
                }
614
0
                is_dict_filter = true;
615
0
                break;
616
0
            }
617
0
        }
618
619
123
        size_t col_read_rows = 0;
620
123
        bool col_eof = false;
621
        // Should reset _filter_map_index to 0 when reading next column.
622
        //        select_vector.reset();
623
123
        _column_readers[read_col_name]->reset_filter_map_index();
624
309
        while (!col_eof && col_read_rows < batch_size) {
625
186
            size_t loop_rows = 0;
626
186
            RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
627
186
                    column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name),
628
186
                    filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter));
629
186
            VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
630
0
                       << "' loop_rows=" << loop_rows << " col_read_rows_so_far=" << col_read_rows
631
0
                       << std::endl;
632
186
            col_read_rows += loop_rows;
633
186
        }
634
123
        VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
635
0
                   << "' read_rows=" << col_read_rows << std::endl;
636
123
        if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
637
0
            LOG(WARNING) << "[RowGroupReader] Mismatched read rows among parquet columns. "
638
0
                            "previous_batch_read_rows="
639
0
                         << batch_read_rows << ", current_column='" << read_col_name
640
0
                         << "', current_col_read_rows=" << col_read_rows;
641
0
            return Status::Corruption("Can't read the same number of rows among parquet columns");
642
0
        }
643
123
        batch_read_rows = col_read_rows;
644
645
123
#ifndef NDEBUG
646
123
        column_ptr->sanity_check();
647
123
#endif
648
123
        if (col_eof) {
649
101
            has_eof = true;
650
101
        }
651
123
    }
652
653
49
    *read_rows = batch_read_rows;
654
49
    *batch_eof = has_eof;
655
656
49
    return Status::OK();
657
49
}
658
659
Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
660
4
                                     bool* batch_eof) {
661
4
    std::unique_ptr<FilterMap> filter_map_ptr = nullptr;
662
4
    size_t pre_read_rows;
663
4
    bool pre_eof;
664
4
    std::vector<uint32_t> columns_to_filter;
665
4
    uint32_t origin_column_num = block->columns();
666
4
    columns_to_filter.resize(origin_column_num);
667
16
    for (uint32_t i = 0; i < origin_column_num; ++i) {
668
12
        columns_to_filter[i] = i;
669
12
    }
670
4
    IColumn::Filter result_filter;
671
4
    size_t pre_raw_read_rows = 0;
672
6
    while (!_state->is_cancelled()) {
673
        // read predicate columns
674
6
        pre_read_rows = 0;
675
6
        pre_eof = false;
676
6
        FilterMap filter_map;
677
6
        int64_t batch_base_row = _total_read_rows;
678
6
        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
679
6
                                          &pre_read_rows, &pre_eof, filter_map));
680
6
        if (pre_read_rows == 0) {
681
0
            DCHECK_EQ(pre_eof, true);
682
0
            break;
683
0
        }
684
6
        pre_raw_read_rows += pre_read_rows;
685
686
6
        RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
687
6
                                                _lazy_read_ctx.predicate_partition_columns));
688
6
        RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
689
6
                                              _lazy_read_ctx.predicate_missing_columns));
690
6
        RETURN_IF_ERROR(_fill_row_id_columns(block, pre_read_rows, false));
691
6
        RETURN_IF_ERROR(_append_iceberg_rowid_column(block, pre_read_rows, false));
692
693
6
        RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
694
695
6
#ifndef NDEBUG
696
18
        for (auto col : *block) {
697
18
            if (col.column->size() == 0) { // lazy read column.
698
6
                continue;
699
6
            }
700
12
            col.column->sanity_check();
701
12
            DCHECK(pre_read_rows == col.column->size())
702
0
                    << absl::Substitute("pre_read_rows = $0 , column rows = $1, col name = $2",
703
0
                                        pre_read_rows, col.column->size(), col.name);
704
12
        }
705
6
#endif
706
707
6
        bool can_filter_all = false;
708
6
        bool resize_first_column = _lazy_read_ctx.resize_first_column;
709
6
        if (resize_first_column && _iceberg_rowid_params.enabled) {
710
0
            int row_id_idx = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
711
0
            if (row_id_idx == 0) {
712
0
                resize_first_column = false;
713
0
            }
714
0
        }
715
6
        {
716
6
            SCOPED_RAW_TIMER(&_predicate_filter_time);
717
718
            // generate filter vector
719
6
            if (resize_first_column) {
720
                // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
721
                // The following process may be tricky and time-consuming, but we have no other way.
722
6
                block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
723
6
            }
724
6
            result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
725
6
            std::vector<IColumn::Filter*> filters;
726
6
            if (_position_delete_ctx.has_filter) {
727
0
                filters.push_back(_pos_delete_filter_ptr.get());
728
0
            }
729
730
6
            VExprContextSPtrs filter_contexts;
731
12
            for (auto& conjunct : _filter_conjuncts) {
732
12
                filter_contexts.emplace_back(conjunct);
733
12
            }
734
735
6
            {
736
6
                RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
737
6
                                                                &result_filter, &can_filter_all));
738
6
            }
739
740
            // Condition cache MISS: mark granules with surviving rows
741
6
            if (!can_filter_all) {
742
3
                _mark_condition_cache_granules(result_filter.data(), pre_read_rows, batch_base_row);
743
3
            }
744
745
6
            if (resize_first_column) {
746
                // We have to clean the first column to insert right data.
747
6
                block->get_by_position(0).column->assume_mutable()->clear();
748
6
            }
749
6
        }
750
751
0
        const uint8_t* __restrict filter_map_data = result_filter.data();
752
6
        filter_map_ptr = std::make_unique<FilterMap>();
753
6
        RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, can_filter_all));
754
6
        if (filter_map_ptr->filter_all()) {
755
3
            {
756
3
                SCOPED_RAW_TIMER(&_predicate_filter_time);
757
3
                for (const auto& col : _lazy_read_ctx.predicate_columns.first) {
758
                    // clean block to read predicate columns
759
3
                    block->get_by_position((*_col_name_to_block_idx)[col])
760
3
                            .column->assume_mutable()
761
3
                            ->clear();
762
3
                }
763
3
                for (const auto& col : _lazy_read_ctx.predicate_partition_columns) {
764
3
                    block->get_by_position((*_col_name_to_block_idx)[col.first])
765
3
                            .column->assume_mutable()
766
3
                            ->clear();
767
3
                }
768
3
                for (const auto& col : _lazy_read_ctx.predicate_missing_columns) {
769
0
                    block->get_by_position((*_col_name_to_block_idx)[col.first])
770
0
                            .column->assume_mutable()
771
0
                            ->clear();
772
0
                }
773
3
                if (_row_id_column_iterator_pair.first != nullptr) {
774
0
                    block->get_by_position(_row_id_column_iterator_pair.second)
775
0
                            .column->assume_mutable()
776
0
                            ->clear();
777
0
                }
778
3
                if (_iceberg_rowid_params.enabled) {
779
0
                    int row_id_idx =
780
0
                            block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
781
0
                    if (row_id_idx >= 0) {
782
0
                        block->get_by_position(static_cast<size_t>(row_id_idx))
783
0
                                .column->assume_mutable()
784
0
                                ->clear();
785
0
                    }
786
0
                }
787
3
                Block::erase_useless_column(block, origin_column_num);
788
3
            }
789
790
3
            if (!pre_eof) {
791
                // If continuous batches are skipped, we can cache them to skip a whole page
792
2
                _cached_filtered_rows += pre_read_rows;
793
2
                if (pre_raw_read_rows >= config::doris_scanner_row_num) {
794
0
                    *read_rows = 0;
795
0
                    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
796
0
                    return Status::OK();
797
0
                }
798
2
            } else { // pre_eof
799
                // If filter_map_ptr->filter_all() and pre_eof, we can skip whole row group.
800
1
                *read_rows = 0;
801
1
                *batch_eof = true;
802
1
                _lazy_read_filtered_rows += (pre_read_rows + _cached_filtered_rows);
803
1
                RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
804
1
                return Status::OK();
805
1
            }
806
3
        } else {
807
3
            break;
808
3
        }
809
6
    }
810
3
    if (_state->is_cancelled()) {
811
0
        return Status::Cancelled("cancelled");
812
0
    }
813
814
3
    if (filter_map_ptr == nullptr) {
815
0
        DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
816
0
        *read_rows = 0;
817
0
        *batch_eof = true;
818
0
        return Status::OK();
819
0
    }
820
821
3
    FilterMap& filter_map = *filter_map_ptr;
822
3
    DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr;
823
3
    if (_cached_filtered_rows != 0) {
824
0
        RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
825
0
        pre_read_rows += _cached_filtered_rows;
826
0
        _cached_filtered_rows = 0;
827
0
    }
828
829
    // lazy read columns
830
3
    size_t lazy_read_rows;
831
3
    bool lazy_eof;
832
3
    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
833
3
                                      &lazy_read_rows, &lazy_eof, filter_map));
834
835
3
    if (pre_read_rows != lazy_read_rows) {
836
0
        return Status::Corruption("Can't read the same number of rows when doing lazy read");
837
0
    }
838
    // pre_eof ^ lazy_eof
839
    // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof
840
841
    // filter data in predicate columns, and remove filter column
842
3
    {
843
3
        SCOPED_RAW_TIMER(&_predicate_filter_time);
844
3
        if (filter_map.has_filter()) {
845
0
            std::vector<uint32_t> predicate_columns = _lazy_read_ctx.all_predicate_col_ids;
846
0
            if (_iceberg_rowid_params.enabled) {
847
0
                int row_id_idx = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
848
0
                if (row_id_idx >= 0 &&
849
0
                    std::find(predicate_columns.begin(), predicate_columns.end(),
850
0
                              static_cast<uint32_t>(row_id_idx)) == predicate_columns.end()) {
851
0
                    predicate_columns.push_back(static_cast<uint32_t>(row_id_idx));
852
0
                }
853
0
            }
854
0
            RETURN_IF_CATCH_EXCEPTION(
855
0
                    Block::filter_block_internal(block, predicate_columns, result_filter));
856
0
            Block::erase_useless_column(block, origin_column_num);
857
858
3
        } else {
859
3
            Block::erase_useless_column(block, origin_column_num);
860
3
        }
861
3
    }
862
863
3
    RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
864
865
3
    size_t column_num = block->columns();
866
3
    size_t column_size = 0;
867
12
    for (int i = 0; i < column_num; ++i) {
868
9
        size_t cz = block->get_by_position(i).column->size();
869
9
        if (column_size != 0 && cz != 0) {
870
6
            DCHECK_EQ(column_size, cz);
871
6
        }
872
9
        if (cz != 0) {
873
9
            column_size = cz;
874
9
        }
875
9
    }
876
3
    _lazy_read_filtered_rows += pre_read_rows - column_size;
877
3
    *read_rows = column_size;
878
879
3
    *batch_eof = pre_eof;
880
3
    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
881
3
    RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
882
3
#ifndef NDEBUG
883
9
    for (auto col : *block) {
884
9
        col.column->sanity_check();
885
9
        DCHECK(block->rows() == col.column->size())
886
0
                << absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
887
0
                                    block->rows(), col.column->size(), col.name);
888
9
    }
889
3
#endif
890
3
    return Status::OK();
891
3
}
892
893
Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
894
                                           DorisUniqueBufferPtr<uint8_t>& filter_map_data,
895
0
                                           size_t pre_read_rows) const {
896
0
    if (_cached_filtered_rows == 0) {
897
0
        return Status::OK();
898
0
    }
899
0
    size_t total_rows = _cached_filtered_rows + pre_read_rows;
900
0
    if (filter_map.filter_all()) {
901
0
        RETURN_IF_ERROR(filter_map.init(nullptr, total_rows, true));
902
0
        return Status::OK();
903
0
    }
904
905
0
    filter_map_data = make_unique_buffer<uint8_t>(total_rows);
906
0
    auto* map = filter_map_data.get();
907
0
    for (size_t i = 0; i < _cached_filtered_rows; ++i) {
908
0
        map[i] = 0;
909
0
    }
910
0
    const uint8_t* old_map = filter_map.filter_map_data();
911
0
    if (old_map == nullptr) {
912
        // select_vector.filter_all() == true is already built.
913
0
        for (size_t i = _cached_filtered_rows; i < total_rows; ++i) {
914
0
            map[i] = 1;
915
0
        }
916
0
    } else {
917
0
        memcpy(map + _cached_filtered_rows, old_map, pre_read_rows);
918
0
    }
919
0
    RETURN_IF_ERROR(filter_map.init(map, total_rows, false));
920
0
    return Status::OK();
921
0
}
922
923
Status RowGroupReader::_fill_partition_columns(
924
        Block* block, size_t rows,
925
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
926
52
                partition_columns) {
927
52
    DataTypeSerDe::FormatOptions _text_formatOptions;
928
52
    for (const auto& kv : partition_columns) {
929
15
        auto doris_column = block->get_by_position((*_col_name_to_block_idx)[kv.first]).column;
930
        // obtained from block*, it is a mutable object.
931
15
        auto* col_ptr = const_cast<IColumn*>(doris_column.get());
932
15
        const auto& [value, slot_desc] = kv.second;
933
15
        auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
934
15
        Slice slice(value.data(), value.size());
935
15
        uint64_t num_deserialized = 0;
936
        // Be careful when reading empty rows from parquet row groups.
937
15
        if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
938
15
                                                            &num_deserialized,
939
15
                                                            _text_formatOptions) != Status::OK()) {
940
0
            return Status::InternalError("Failed to fill partition column: {}={}",
941
0
                                         slot_desc->col_name(), value);
942
0
        }
943
15
        if (num_deserialized != rows) {
944
0
            return Status::InternalError(
945
0
                    "Failed to fill partition column: {}={} ."
946
0
                    "Number of rows expected to be written : {}, number of rows actually written : "
947
0
                    "{}",
948
0
                    slot_desc->col_name(), value, num_deserialized, rows);
949
0
        }
950
15
    }
951
52
    return Status::OK();
952
52
}
953
954
Status RowGroupReader::_fill_missing_columns(
955
        Block* block, size_t rows,
956
52
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
957
52
    for (const auto& kv : missing_columns) {
958
0
        if (!_col_name_to_block_idx->contains(kv.first)) {
959
0
            return Status::InternalError("Missing column: {} not found in block {}", kv.first,
960
0
                                         block->dump_structure());
961
0
        }
962
0
        if (kv.second == nullptr) {
963
            // no default column, fill with null
964
0
            auto mutable_column = block->get_by_position((*_col_name_to_block_idx)[kv.first])
965
0
                                          .column->assume_mutable();
966
0
            auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
967
0
            nullable_column->insert_many_defaults(rows);
968
0
        } else {
969
            // fill with default value
970
0
            const auto& ctx = kv.second;
971
0
            ColumnPtr result_column_ptr;
972
            // PT1 => dest primitive type
973
0
            RETURN_IF_ERROR(ctx->execute(block, result_column_ptr));
974
0
            if (result_column_ptr->use_count() == 1) {
975
                // call resize because the first column of _src_block_ptr may not be filled by reader,
976
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
977
                // has only one row.
978
0
                auto mutable_column = result_column_ptr->assume_mutable();
979
0
                mutable_column->resize(rows);
980
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
981
0
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
982
0
                auto origin_column_type =
983
0
                        block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
984
0
                bool is_nullable = origin_column_type->is_nullable();
985
0
                block->replace_by_position(
986
0
                        (*_col_name_to_block_idx)[kv.first],
987
0
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
988
0
            }
989
0
        }
990
0
    }
991
52
    return Status::OK();
992
52
}
993
994
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof,
995
3
                                         bool* modify_row_ids) {
996
3
    *modify_row_ids = false;
997
3
    if (_position_delete_ctx.has_filter) {
998
0
        int64_t start_row_id = _position_delete_ctx.current_row_id;
999
0
        int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
1000
0
                                      _position_delete_ctx.last_row_id);
1001
0
        int64_t num_delete_rows = 0;
1002
0
        auto before_index = _position_delete_ctx.index;
1003
0
        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
1004
0
            const int64_t& delete_row_id =
1005
0
                    _position_delete_ctx.delete_rows[_position_delete_ctx.index];
1006
0
            if (delete_row_id < start_row_id) {
1007
0
                _position_delete_ctx.index++;
1008
0
                before_index = _position_delete_ctx.index;
1009
0
            } else if (delete_row_id < end_row_id) {
1010
0
                num_delete_rows++;
1011
0
                _position_delete_ctx.index++;
1012
0
            } else { // delete_row_id >= end_row_id
1013
0
                break;
1014
0
            }
1015
0
        }
1016
0
        *read_rows = end_row_id - start_row_id - num_delete_rows;
1017
0
        _position_delete_ctx.current_row_id = end_row_id;
1018
0
        *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;
1019
1020
0
        if (_row_id_column_iterator_pair.first != nullptr || _iceberg_rowid_params.enabled ||
1021
0
            (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids())) {
1022
0
            *modify_row_ids = true;
1023
0
            _current_batch_row_ids.clear();
1024
0
            _current_batch_row_ids.resize(*read_rows);
1025
0
            size_t idx = 0;
1026
0
            for (auto id = start_row_id; id < end_row_id; id++) {
1027
0
                if (before_index < _position_delete_ctx.index &&
1028
0
                    id == _position_delete_ctx.delete_rows[before_index]) {
1029
0
                    before_index++;
1030
0
                    continue;
1031
0
                }
1032
0
                _current_batch_row_ids[idx++] = (rowid_t)id;
1033
0
            }
1034
0
        }
1035
3
    } else {
1036
3
        if (batch_size < _remaining_rows) {
1037
2
            *read_rows = batch_size;
1038
2
            _remaining_rows -= batch_size;
1039
2
            *batch_eof = false;
1040
2
        } else {
1041
1
            *read_rows = _remaining_rows;
1042
1
            _remaining_rows = 0;
1043
1
            *batch_eof = true;
1044
1
        }
1045
3
        if (_iceberg_rowid_params.enabled) {
1046
0
            *modify_row_ids = true;
1047
0
            RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
1048
0
        }
1049
3
    }
1050
3
    _total_read_rows += *read_rows;
1051
3
    return Status::OK();
1052
3
}
1053
1054
5
Status RowGroupReader::_get_current_batch_row_id(size_t read_rows) {
1055
5
    _current_batch_row_ids.clear();
1056
5
    _current_batch_row_ids.resize(read_rows);
1057
1058
5
    int64_t idx = 0;
1059
5
    int64_t read_range_rows = 0;
1060
19
    for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) {
1061
14
        auto range = _read_ranges.get_range(range_idx);
1062
14
        if (read_rows == 0) {
1063
0
            break;
1064
0
        }
1065
14
        if (read_range_rows + (range.to() - range.from()) > _total_read_rows) {
1066
14
            int64_t fi =
1067
14
                    std::max(_total_read_rows, read_range_rows) - read_range_rows + range.from();
1068
14
            size_t len = std::min(read_rows, (size_t)(std::max(range.to(), fi) - fi));
1069
1070
14
            read_rows -= len;
1071
1072
28
            for (auto i = 0; i < len; i++) {
1073
14
                _current_batch_row_ids[idx++] =
1074
14
                        (rowid_t)(fi + i + _current_row_group_idx.first_row);
1075
14
            }
1076
14
        }
1077
14
        read_range_rows += range.to() - range.from();
1078
14
    }
1079
5
    return Status::OK();
1080
5
}
1081
1082
Status RowGroupReader::_fill_row_id_columns(Block* block, size_t read_rows,
1083
49
                                            bool is_current_row_ids) {
1084
49
    const bool need_row_ids =
1085
49
            _row_id_column_iterator_pair.first != nullptr ||
1086
49
            (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids());
1087
49
    if (need_row_ids && !is_current_row_ids) {
1088
5
        RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
1089
5
    }
1090
49
    if (_row_id_column_iterator_pair.first != nullptr) {
1091
5
        auto col = block->get_by_position(_row_id_column_iterator_pair.second)
1092
5
                           .column->assume_mutable();
1093
5
        RETURN_IF_ERROR(_row_id_column_iterator_pair.first->read_by_rowids(
1094
5
                _current_batch_row_ids.data(), _current_batch_row_ids.size(), col));
1095
5
    }
1096
1097
49
    if (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids() &&
1098
49
        _row_lineage_columns->first_row_id >= 0) {
1099
0
        auto col = block->get_by_position(_row_lineage_columns->row_id_column_idx)
1100
0
                           .column->assume_mutable();
1101
0
        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
1102
0
        auto& null_map = nullable_column->get_null_map_data();
1103
0
        auto& data =
1104
0
                assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
1105
0
        for (size_t i = 0; i < read_rows; ++i) {
1106
0
            if (null_map[i] != 0) {
1107
0
                null_map[i] = 0;
1108
0
                data[i] = _row_lineage_columns->first_row_id +
1109
0
                          static_cast<int64_t>(_current_batch_row_ids[i]);
1110
0
            }
1111
0
        }
1112
0
    }
1113
1114
49
    if (_row_lineage_columns != nullptr &&
1115
49
        _row_lineage_columns->has_last_updated_sequence_number_column() &&
1116
49
        _row_lineage_columns->last_updated_sequence_number >= 0) {
1117
0
        auto col = block->get_by_position(
1118
0
                                _row_lineage_columns->last_updated_sequence_number_column_idx)
1119
0
                           .column->assume_mutable();
1120
0
        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
1121
0
        auto& null_map = nullable_column->get_null_map_data();
1122
0
        auto& data =
1123
0
                assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
1124
0
        for (size_t i = 0; i < read_rows; ++i) {
1125
0
            if (null_map[i] != 0) {
1126
0
                null_map[i] = 0;
1127
0
                data[i] = _row_lineage_columns->last_updated_sequence_number;
1128
0
            }
1129
0
        }
1130
0
    }
1131
1132
49
    return Status::OK();
1133
49
}
1134
1135
Status RowGroupReader::_append_iceberg_rowid_column(Block* block, size_t read_rows,
1136
49
                                                    bool is_current_row_ids) {
1137
49
    if (!_iceberg_rowid_params.enabled) {
1138
49
        return Status::OK();
1139
49
    }
1140
0
    if (!is_current_row_ids) {
1141
0
        RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
1142
0
    }
1143
1144
0
    int row_id_idx = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
1145
0
    if (row_id_idx >= 0) {
1146
0
        auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_idx));
1147
0
        MutableColumnPtr row_id_column;
1148
0
        RETURN_IF_ERROR(build_iceberg_rowid_column(
1149
0
                col_with_type.type, _iceberg_rowid_params.file_path, _current_batch_row_ids,
1150
0
                _iceberg_rowid_params.partition_spec_id, _iceberg_rowid_params.partition_data_json,
1151
0
                &row_id_column));
1152
0
        col_with_type.column = std::move(row_id_column);
1153
0
    } else {
1154
0
        DataTypes field_types;
1155
0
        field_types.push_back(std::make_shared<DataTypeString>());
1156
0
        field_types.push_back(std::make_shared<DataTypeInt64>());
1157
0
        field_types.push_back(std::make_shared<DataTypeInt32>());
1158
0
        field_types.push_back(std::make_shared<DataTypeString>());
1159
1160
0
        std::vector<std::string> field_names = {"file_path", "row_position", "partition_spec_id",
1161
0
                                                "partition_data"};
1162
1163
0
        auto row_id_type = std::make_shared<DataTypeStruct>(field_types, field_names);
1164
0
        MutableColumnPtr row_id_column;
1165
0
        RETURN_IF_ERROR(build_iceberg_rowid_column(
1166
0
                row_id_type, _iceberg_rowid_params.file_path, _current_batch_row_ids,
1167
0
                _iceberg_rowid_params.partition_spec_id, _iceberg_rowid_params.partition_data_json,
1168
0
                &row_id_column));
1169
0
        int insert_pos = _iceberg_rowid_params.row_id_column_pos;
1170
0
        if (insert_pos < 0 || insert_pos > static_cast<int>(block->columns())) {
1171
0
            insert_pos = static_cast<int>(block->columns());
1172
0
        }
1173
0
        block->insert(static_cast<size_t>(insert_pos),
1174
0
                      ColumnWithTypeAndName(std::move(row_id_column), row_id_type,
1175
0
                                            doris::BeConsts::ICEBERG_ROWID_COL));
1176
0
    }
1177
1178
0
    if (_col_name_to_block_idx != nullptr) {
1179
0
        *_col_name_to_block_idx = block->get_name_to_pos_map();
1180
0
    }
1181
1182
0
    return Status::OK();
1183
0
}
1184
1185
46
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
1186
46
    if (!_position_delete_ctx.has_filter) {
1187
46
        _pos_delete_filter_ptr.reset(nullptr);
1188
46
        _total_read_rows += read_rows;
1189
46
        return Status::OK();
1190
46
    }
1191
0
    _pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
1192
0
    auto* __restrict _pos_delete_filter_data = _pos_delete_filter_ptr->data();
1193
0
    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
1194
0
        const int64_t delete_row_index_in_row_group =
1195
0
                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
1196
0
                _position_delete_ctx.first_row_id;
1197
0
        int64_t read_range_rows = 0;
1198
0
        size_t remaining_read_rows = _total_read_rows + read_rows;
1199
0
        for (size_t range_idx = 0; range_idx < _read_ranges.range_size(); range_idx++) {
1200
0
            auto range = _read_ranges.get_range(range_idx);
1201
0
            if (delete_row_index_in_row_group < range.from()) {
1202
0
                ++_position_delete_ctx.index;
1203
0
                break;
1204
0
            } else if (delete_row_index_in_row_group < range.to()) {
1205
0
                int64_t index = (delete_row_index_in_row_group - range.from()) + read_range_rows -
1206
0
                                _total_read_rows;
1207
0
                if (index > read_rows - 1) {
1208
0
                    _total_read_rows += read_rows;
1209
0
                    return Status::OK();
1210
0
                }
1211
0
                _pos_delete_filter_data[index] = 0;
1212
0
                ++_position_delete_ctx.index;
1213
0
                break;
1214
0
            } else { // delete_row >= range.last_row
1215
0
            }
1216
1217
0
            int64_t range_size = range.to() - range.from();
1218
            // Don't search next range when there is no remaining_read_rows.
1219
0
            if (remaining_read_rows <= range_size) {
1220
0
                _total_read_rows += read_rows;
1221
0
                return Status::OK();
1222
0
            } else {
1223
0
                remaining_read_rows -= range_size;
1224
0
                read_range_rows += range_size;
1225
0
            }
1226
0
        }
1227
0
    }
1228
0
    _total_read_rows += read_rows;
1229
0
    return Status::OK();
1230
0
}
1231
1232
// need exception safety
1233
Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
1234
34
                                     const std::vector<uint32_t>& columns_to_filter) {
1235
34
    if (_pos_delete_filter_ptr) {
1236
0
        RETURN_IF_CATCH_EXCEPTION(
1237
0
                Block::filter_block_internal(block, columns_to_filter, (*_pos_delete_filter_ptr)));
1238
0
    }
1239
34
    Block::erase_useless_column(block, column_to_keep);
1240
1241
34
    return Status::OK();
1242
34
}
1243
1244
6
Status RowGroupReader::_rewrite_dict_predicates() {
1245
6
    SCOPED_RAW_TIMER(&_dict_filter_rewrite_time);
1246
6
    for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
1247
2
        std::string& dict_filter_col_name = it->first;
1248
2
        int slot_id = it->second;
1249
        // 1. Get dictionary values to a string column.
1250
2
        MutableColumnPtr dict_value_column = ColumnString::create();
1251
2
        bool has_dict = false;
1252
2
        RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
1253
2
                dict_value_column, &has_dict));
1254
2
#ifndef NDEBUG
1255
2
        dict_value_column->sanity_check();
1256
2
#endif
1257
2
        size_t dict_value_column_size = dict_value_column->size();
1258
2
        DCHECK(has_dict);
1259
        // 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
1260
        // 2.1 Build a temp block from the dict string column to match the conjuncts executing.
1261
2
        Block temp_block;
1262
2
        int dict_pos = -1;
1263
2
        int index = 0;
1264
4
        for (const auto slot_desc : _tuple_descriptor->slots()) {
1265
4
            if (slot_desc->id() == slot_id) {
1266
2
                auto data_type = slot_desc->get_data_type_ptr();
1267
2
                if (data_type->is_nullable()) {
1268
0
                    temp_block.insert(
1269
0
                            {ColumnNullable::create(
1270
0
                                     std::move(
1271
0
                                             dict_value_column), // NOLINT(bugprone-use-after-move)
1272
0
                                     ColumnUInt8::create(dict_value_column_size, 0)),
1273
0
                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
1274
0
                             ""});
1275
2
                } else {
1276
2
                    temp_block.insert(
1277
2
                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
1278
2
                }
1279
2
                dict_pos = index;
1280
1281
2
            } else {
1282
2
                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
1283
2
                                                        slot_desc->get_data_type_ptr(),
1284
2
                                                        slot_desc->col_name()));
1285
2
            }
1286
4
            ++index;
1287
4
        }
1288
1289
        // 2.2 Execute conjuncts.
1290
2
        VExprContextSPtrs ctxs;
1291
2
        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
1292
2
        if (iter != _slot_id_to_filter_conjuncts->end()) {
1293
2
            for (auto& ctx : iter->second) {
1294
2
                ctxs.push_back(ctx);
1295
2
            }
1296
2
        } else {
1297
0
            std::stringstream msg;
1298
0
            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
1299
0
            return Status::NotFound(msg.str());
1300
0
        }
1301
1302
2
        if (dict_pos != 0) {
1303
            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
1304
            // The following process may be tricky and time-consuming, but we have no other way.
1305
0
            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
1306
0
        }
1307
2
        IColumn::Filter result_filter(temp_block.rows(), 1);
1308
2
        bool can_filter_all;
1309
2
        {
1310
2
            RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block,
1311
2
                                                            &result_filter, &can_filter_all));
1312
2
        }
1313
2
        if (dict_pos != 0) {
1314
            // We have to clean the first column to insert right data.
1315
0
            temp_block.get_by_position(0).column->assume_mutable()->clear();
1316
0
        }
1317
1318
        // If can_filter_all = true, can filter this row group.
1319
2
        if (can_filter_all) {
1320
2
            _is_row_group_filtered = true;
1321
2
            return Status::OK();
1322
2
        }
1323
1324
        // 3. Get dict codes.
1325
0
        std::vector<int32_t> dict_codes;
1326
0
        for (size_t i = 0; i < result_filter.size(); ++i) {
1327
0
            if (result_filter[i]) {
1328
0
                dict_codes.emplace_back(i);
1329
0
            }
1330
0
        }
1331
1332
        // About Performance: if dict_column size is too large, it will generate a large IN filter.
1333
0
        if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
1334
0
            it = _dict_filter_cols.erase(it);
1335
0
            for (auto& ctx : ctxs) {
1336
0
                _filter_conjuncts.push_back(ctx);
1337
0
            }
1338
0
            continue;
1339
0
        }
1340
1341
        // 4. Rewrite conjuncts.
1342
0
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(
1343
0
                dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
1344
0
        ++it;
1345
0
    }
1346
4
    return Status::OK();
1347
6
}
1348
1349
Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
1350
0
                                               bool is_nullable) {
1351
0
    VExprSPtr root;
1352
0
    if (dict_codes.size() == 1) {
1353
0
        {
1354
0
            TFunction fn;
1355
0
            TFunctionName fn_name;
1356
0
            fn_name.__set_db_name("");
1357
0
            fn_name.__set_function_name("eq");
1358
0
            fn.__set_name(fn_name);
1359
0
            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
1360
0
            std::vector<TTypeDesc> arg_types;
1361
0
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
1362
0
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
1363
0
            fn.__set_arg_types(arg_types);
1364
0
            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
1365
0
            fn.__set_has_var_args(false);
1366
1367
0
            TExprNode texpr_node;
1368
0
            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
1369
0
            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
1370
0
            texpr_node.__set_opcode(TExprOpcode::EQ);
1371
0
            texpr_node.__set_fn(fn);
1372
0
            texpr_node.__set_num_children(2);
1373
0
            texpr_node.__set_is_nullable(is_nullable);
1374
0
            root = VectorizedFnCall::create_shared(texpr_node);
1375
0
        }
1376
0
        {
1377
0
            SlotDescriptor* slot = nullptr;
1378
0
            const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
1379
0
            for (auto each : slots) {
1380
0
                if (each->id() == slot_id) {
1381
0
                    slot = each;
1382
0
                    break;
1383
0
                }
1384
0
            }
1385
0
            root->add_child(VSlotRef::create_shared(slot));
1386
0
        }
1387
0
        {
1388
0
            TExprNode texpr_node;
1389
0
            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
1390
0
            texpr_node.__set_type(create_type_desc(TYPE_INT));
1391
0
            TIntLiteral int_literal;
1392
0
            int_literal.__set_value(dict_codes[0]);
1393
0
            texpr_node.__set_int_literal(int_literal);
1394
0
            texpr_node.__set_is_nullable(is_nullable);
1395
0
            root->add_child(VLiteral::create_shared(texpr_node));
1396
0
        }
1397
0
    } else {
1398
0
        {
1399
0
            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
1400
0
            TExprNode node;
1401
0
            node.__set_type(type_desc);
1402
0
            node.__set_node_type(TExprNodeType::IN_PRED);
1403
0
            node.in_predicate.__set_is_not_in(false);
1404
0
            node.__set_opcode(TExprOpcode::FILTER_IN);
1405
            // VdirectInPredicate assume is_nullable = false.
1406
0
            node.__set_is_nullable(false);
1407
1408
0
            std::shared_ptr<HybridSetBase> hybrid_set(
1409
0
                    create_set(PrimitiveType::TYPE_INT, dict_codes.size(), false));
1410
0
            for (int j = 0; j < dict_codes.size(); ++j) {
1411
0
                hybrid_set->insert(&dict_codes[j]);
1412
0
            }
1413
0
            root = VDirectInPredicate::create_shared(node, hybrid_set);
1414
0
        }
1415
0
        {
1416
0
            SlotDescriptor* slot = nullptr;
1417
0
            const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
1418
0
            for (auto each : slots) {
1419
0
                if (each->id() == slot_id) {
1420
0
                    slot = each;
1421
0
                    break;
1422
0
                }
1423
0
            }
1424
0
            root->add_child(VSlotRef::create_shared(slot));
1425
0
        }
1426
0
    }
1427
0
    VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
1428
0
    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
1429
0
    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
1430
0
    _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
1431
0
    _filter_conjuncts.push_back(rewritten_conjunct_ctx);
1432
0
    return Status::OK();
1433
0
}
1434
1435
44
Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
1436
44
    for (auto& dict_filter_cols : _dict_filter_cols) {
1437
0
        if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
1438
0
            throw Exception(ErrorCode::INTERNAL_ERROR,
1439
0
                            "Wrong read column '{}' in parquet file, block: {}",
1440
0
                            dict_filter_cols.first, block->dump_structure());
1441
0
        }
1442
0
        ColumnWithTypeAndName& column_with_type_and_name =
1443
0
                block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]);
1444
0
        const ColumnPtr& column = column_with_type_and_name.column;
1445
0
        if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
1446
0
            const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
1447
0
            const auto* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
1448
0
            DCHECK(dict_column);
1449
1450
0
            auto string_column = DORIS_TRY(
1451
0
                    _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
1452
0
                            dict_column));
1453
1454
0
            column_with_type_and_name.type =
1455
0
                    std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
1456
0
            block->replace_by_position(
1457
0
                    (*_col_name_to_block_idx)[dict_filter_cols.first],
1458
0
                    ColumnNullable::create(std::move(string_column),
1459
0
                                           nullable_column->get_null_map_column_ptr()));
1460
0
        } else {
1461
0
            const auto* dict_column = assert_cast<const ColumnInt32*>(column.get());
1462
0
            auto string_column = DORIS_TRY(
1463
0
                    _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
1464
0
                            dict_column));
1465
1466
0
            column_with_type_and_name.type = std::make_shared<DataTypeString>();
1467
0
            block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
1468
0
                                       std::move(string_column));
1469
0
        }
1470
0
    }
1471
44
    return Status::OK();
1472
44
}
1473
1474
37
ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() {
1475
37
    ParquetColumnReader::ColumnStatistics st;
1476
106
    for (auto& reader : _column_readers) {
1477
106
        auto ost = reader.second->column_statistics();
1478
106
        st.merge(ost);
1479
106
    }
1480
37
    return st;
1481
37
}
1482
#include "common/compile_check_end.h"
1483
1484
} // namespace doris