Coverage Report

Created: 2026-05-23 04:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/parquet_metadata_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/parquet_metadata_reader.h"
19
20
#include <fmt/format.h>
21
22
#include <algorithm>
23
#include <array>
24
#include <cctype>
25
#include <cstring>
26
#include <memory>
27
#include <optional>
28
#include <unordered_map>
29
#include <utility>
30
31
#include "core/block/block.h"
32
#include "core/column/column.h"
33
#include "core/column/column_map.h"
34
#include "core/column/column_nullable.h"
35
#include "core/data_type/data_type_nullable.h"
36
#include "core/field.h"
37
#include "core/string_view.h"
38
#include "core/types.h"
39
#include "format/parquet/parquet_thrift_util.h"
40
#include "format/parquet/schema_desc.h"
41
#include "format/parquet/vparquet_file_metadata.h"
42
#include "format/table/parquet_utils.h"
43
#include "io/file_factory.h"
44
#include "io/fs/file_reader.h"
45
#include "io/hdfs_builder.h"
46
#include "io/io_common.h"
47
#include "runtime/runtime_state.h"
48
#include "util/string_util.h"
49
50
namespace doris {
51
52
using namespace parquet_utils;
53
54
/// Abstract base for the per-mode row producers of ParquetMetadataReader.
///
/// A concrete handler first learns which output slots were requested
/// (`init_slot_pos_map`) and then appends rows for one file's metadata
/// (`append_rows`).
class ParquetMetadataReader::ModeHandler {
public:
    explicit ModeHandler(RuntimeState* state) : _state(state) {}
    virtual ~ModeHandler() = default;

    /// Resolve the positions of the requested slots for this mode.
    virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0;

    /// Append the rows derived from `metadata` (read from `path`) to `columns`.
    virtual Status append_rows(const std::string& path, FileMetaData* metadata,
                               std::vector<MutableColumnPtr>& columns) = 0;

protected:
    RuntimeState* _state = nullptr;

    /// Build a map from `to_lower(slot->col_name())` to the slot's index in `slots`.
    /// When two slots produce the same key, the first one wins (`emplace` does not
    /// overwrite an existing entry).
    static std::unordered_map<std::string, int> _build_name_to_pos_map(
            const std::vector<SlotDescriptor*>& slots) {
        std::unordered_map<std::string, int> positions;
        positions.reserve(slots.size());
        size_t index = 0;
        for (SlotDescriptor* slot : slots) {
            positions.emplace(to_lower(slot->col_name()), static_cast<int>(index));
            ++index;
        }
        return positions;
    }

    /// Fill `slot_pos[i]` with the position registered for `column_names[i]` in
    /// `name_to_pos`, or with -1 when that column name is not present.
    template <size_t N>
    static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos,
                                   const std::array<const char*, N>& column_names,
                                   std::array<int, N>* slot_pos) {
        // Start from "not requested" for every column, then overwrite the hits.
        std::fill(slot_pos->begin(), slot_pos->end(), -1);
        for (size_t col = 0; col < N; ++col) {
            const auto found = name_to_pos.find(column_names[col]);
            if (found == name_to_pos.end()) {
                continue;
            }
            (*slot_pos)[col] = found->second;
        }
    }
};
89
90
class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler {
91
public:
92
0
    explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {}
93
94
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
95
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
96
0
        _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos);
97
0
    }
98
99
    Status append_rows(const std::string& path, FileMetaData* metadata,
100
0
                       std::vector<MutableColumnPtr>& columns) override {
101
0
        const auto& fields = metadata->schema().get_fields_schema();
102
0
        for (const auto& field : fields) {
103
0
            RETURN_IF_ERROR(_append_schema_node(path, field, columns));
104
0
        }
105
0
        return Status::OK();
106
0
    }
107
108
private:
109
    std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {};
110
111
0
    static std::string _repetition_type_to_string(tparquet::FieldRepetitionType::type type) {
112
0
        switch (type) {
113
0
        case tparquet::FieldRepetitionType::REQUIRED:
114
0
            return "REQUIRED";
115
0
        case tparquet::FieldRepetitionType::OPTIONAL:
116
0
            return "OPTIONAL";
117
0
        case tparquet::FieldRepetitionType::REPEATED:
118
0
            return "REPEATED";
119
0
        default:
120
0
            return "UNKNOWN";
121
0
        }
122
0
    }
123
124
    Status _append_schema_node(const std::string& path, const FieldSchema& field,
125
0
                               std::vector<MutableColumnPtr>& columns) {
126
0
        auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) {
127
0
            int pos = _slot_pos[idx];
128
0
            if (pos >= 0) {
129
0
                inserter(columns[pos], std::forward<decltype(args)>(args)...);
130
0
            }
131
0
        };
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_S8_EJS8_EEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_S8_EJS6_EEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_EJEEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_lEJlEEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_S8_EJRS6_EEEDaSM_SO_SR_
132
133
0
        insert_if_requested(SCHEMA_FILE_NAME, insert_string, path);
134
0
        insert_if_requested(SCHEMA_NAME, insert_string, field.parquet_schema.name);
135
136
0
        if (field.parquet_schema.__isset.type) {
137
0
            insert_if_requested(SCHEMA_TYPE, insert_string,
138
0
                                physical_type_to_string(field.parquet_schema.type));
139
0
        } else {
140
0
            insert_if_requested(SCHEMA_TYPE, insert_null);
141
0
        }
142
143
0
        if (field.parquet_schema.__isset.type_length) {
144
0
            insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int64,
145
0
                                static_cast<Int64>(field.parquet_schema.type_length));
146
0
        } else {
147
0
            insert_if_requested(SCHEMA_TYPE_LENGTH, insert_null);
148
0
        }
149
150
0
        if (field.parquet_schema.__isset.repetition_type) {
151
0
            insert_if_requested(SCHEMA_REPETITION_TYPE, insert_string,
152
0
                                _repetition_type_to_string(field.parquet_schema.repetition_type));
153
0
        } else {
154
0
            insert_if_requested(SCHEMA_REPETITION_TYPE, insert_null);
155
0
        }
156
157
0
        int64_t num_children = field.parquet_schema.__isset.num_children
158
0
                                       ? static_cast<int64_t>(field.parquet_schema.num_children)
159
0
                                       : 0;
160
0
        insert_if_requested(SCHEMA_NUM_CHILDREN, insert_int64, static_cast<Int64>(num_children));
161
162
0
        if (field.parquet_schema.__isset.converted_type) {
163
0
            insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_string,
164
0
                                converted_type_to_string(field.parquet_schema.converted_type));
165
0
        } else {
166
0
            insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_null);
167
0
        }
168
169
0
        if (field.parquet_schema.__isset.scale) {
170
0
            insert_if_requested(SCHEMA_SCALE, insert_int64,
171
0
                                static_cast<Int64>(field.parquet_schema.scale));
172
0
        } else {
173
0
            insert_if_requested(SCHEMA_SCALE, insert_null);
174
0
        }
175
176
0
        if (field.parquet_schema.__isset.precision) {
177
0
            insert_if_requested(SCHEMA_PRECISION, insert_int64,
178
0
                                static_cast<Int64>(field.parquet_schema.precision));
179
0
        } else {
180
0
            insert_if_requested(SCHEMA_PRECISION, insert_null);
181
0
        }
182
183
0
        if (field.parquet_schema.__isset.field_id) {
184
0
            insert_if_requested(SCHEMA_FIELD_ID, insert_int64,
185
0
                                static_cast<Int64>(field.parquet_schema.field_id));
186
0
        } else {
187
0
            insert_if_requested(SCHEMA_FIELD_ID, insert_null);
188
0
        }
189
190
0
        std::string logical = logical_type_to_string(field.parquet_schema);
191
0
        if (logical.empty()) {
192
0
            insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_null);
193
0
        } else {
194
0
            insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string, logical);
195
0
        }
196
197
0
        for (const auto& child : field.children) {
198
0
            RETURN_IF_ERROR(_append_schema_node(path, child, columns));
199
0
        }
200
0
        return Status::OK();
201
0
    }
202
};
203
204
class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler {
205
public:
206
0
    explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {}
207
208
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
209
0
        std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots);
210
0
        _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos);
211
0
    }
212
213
    Status append_rows(const std::string& path, FileMetaData* metadata,
214
0
                       std::vector<MutableColumnPtr>& columns) override {
215
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
216
0
        if (thrift_meta.row_groups.empty()) {
217
0
            return Status::OK();
218
0
        }
219
220
0
        std::unordered_map<std::string, const FieldSchema*> path_map;
221
0
        const auto& fields = metadata->schema().get_fields_schema();
222
0
        for (const auto& field : fields) {
223
0
            build_path_map(field, "", &path_map);
224
0
        }
225
226
0
        const int kv_pos = _slot_pos[META_KEY_VALUE_METADATA];
227
0
        bool has_kv_map = false;
228
0
        Field kv_map_field;
229
0
        if (kv_pos >= 0 && thrift_meta.__isset.key_value_metadata &&
230
0
            !thrift_meta.key_value_metadata.empty()) {
231
0
            Array keys;
232
0
            Array values;
233
0
            keys.reserve(thrift_meta.key_value_metadata.size());
234
0
            values.reserve(thrift_meta.key_value_metadata.size());
235
0
            for (const auto& kv : thrift_meta.key_value_metadata) {
236
0
                keys.emplace_back(Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.key)));
237
0
                if (kv.__isset.value) {
238
0
                    values.emplace_back(
239
0
                            Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.value)));
240
0
                } else {
241
0
                    values.emplace_back(Field {});
242
0
                }
243
0
            }
244
0
            Map map_value;
245
0
            map_value.reserve(2);
246
0
            map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(keys)));
247
0
            map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(values)));
248
0
            kv_map_field = Field::create_field<TYPE_MAP>(std::move(map_value));
249
0
            has_kv_map = true;
250
0
        }
251
252
0
        for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) {
253
0
            const auto& row_group = thrift_meta.row_groups[rg_index];
254
0
            Int64 row_group_num_rows = static_cast<Int64>(row_group.num_rows);
255
0
            Int64 row_group_num_columns = static_cast<Int64>(row_group.columns.size());
256
0
            Int64 row_group_bytes = static_cast<Int64>(row_group.total_byte_size);
257
0
            Int64 row_group_compressed_bytes = 0;
258
0
            if (row_group.__isset.total_compressed_size) {
259
0
                row_group_compressed_bytes = static_cast<Int64>(row_group.total_compressed_size);
260
0
            } else {
261
0
                for (const auto& col_chunk : row_group.columns) {
262
0
                    if (!col_chunk.__isset.meta_data) {
263
0
                        continue;
264
0
                    }
265
0
                    row_group_compressed_bytes += col_chunk.meta_data.total_compressed_size;
266
0
                }
267
0
            }
268
269
0
            for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) {
270
0
                const auto& column_chunk = row_group.columns[col_idx];
271
0
                if (!column_chunk.__isset.meta_data) {
272
0
                    continue;
273
0
                }
274
0
                const auto& column_meta = column_chunk.meta_data;
275
0
                std::string path_in_schema = join_path(column_meta.path_in_schema);
276
0
                const FieldSchema* schema_field = nullptr;
277
0
                auto it = path_map.find(path_in_schema);
278
0
                if (it != path_map.end()) {
279
0
                    schema_field = it->second;
280
0
                }
281
282
0
                auto insert_if_requested = [&](MetadataColumnIndex idx, auto&& inserter,
283
0
                                               auto&&... args) {
284
0
                    int pos = _slot_pos[idx];
285
0
                    if (pos >= 0) {
286
0
                        inserter(columns[pos], std::forward<decltype(args)>(args)...);
287
0
                    }
288
0
                };
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS8_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJRlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJRKlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJRS6_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS6_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_EJEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_bEJRKbEEEDaSL_SN_SQ_
289
290
0
                insert_if_requested(META_FILE_NAME, insert_string,
291
0
                                    column_chunk.__isset.file_path ? column_chunk.file_path : path);
292
0
                insert_if_requested(META_ROW_GROUP_ID, insert_int64, static_cast<Int64>(rg_index));
293
0
                insert_if_requested(META_ROW_GROUP_NUM_ROWS, insert_int64, row_group_num_rows);
294
0
                insert_if_requested(META_ROW_GROUP_NUM_COLUMNS, insert_int64,
295
0
                                    row_group_num_columns);
296
0
                insert_if_requested(META_ROW_GROUP_BYTES, insert_int64, row_group_bytes);
297
0
                insert_if_requested(META_COLUMN_ID, insert_int64, static_cast<Int64>(col_idx));
298
299
                // `ColumnChunk.file_offset` is deprecated and can be 0 even when page offsets are present.
300
                // Fall back to the first page (dictionary/data) offset to provide a useful value.
301
0
                Int64 file_offset = static_cast<Int64>(column_chunk.file_offset);
302
0
                if (file_offset == 0) {
303
0
                    if (column_meta.__isset.dictionary_page_offset) {
304
0
                        file_offset = static_cast<Int64>(column_meta.dictionary_page_offset);
305
0
                    } else {
306
0
                        file_offset = static_cast<Int64>(column_meta.data_page_offset);
307
0
                    }
308
0
                }
309
0
                insert_if_requested(META_FILE_OFFSET, insert_int64, file_offset);
310
0
                insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values);
311
0
                insert_if_requested(META_PATH_IN_SCHEMA, insert_string, path_in_schema);
312
0
                insert_if_requested(META_TYPE, insert_string,
313
0
                                    physical_type_to_string(column_meta.type));
314
315
0
                if (column_meta.__isset.statistics) {
316
0
                    static const cctz::time_zone kUtc0 = cctz::utc_time_zone();
317
0
                    const cctz::time_zone& ctz = _state != nullptr ? _state->timezone_obj() : kUtc0;
318
319
0
                    const auto& stats = column_meta.statistics;
320
321
0
                    if (stats.__isset.min) {
322
0
                        insert_if_requested(META_STATS_MIN, insert_string,
323
0
                                            decode_statistics_value(schema_field, column_meta.type,
324
0
                                                                    stats.min, ctz));
325
0
                    } else {
326
0
                        insert_if_requested(META_STATS_MIN, insert_null);
327
0
                    }
328
0
                    if (stats.__isset.max) {
329
0
                        insert_if_requested(META_STATS_MAX, insert_string,
330
0
                                            decode_statistics_value(schema_field, column_meta.type,
331
0
                                                                    stats.max, ctz));
332
0
                    } else {
333
0
                        insert_if_requested(META_STATS_MAX, insert_null);
334
0
                    }
335
336
0
                    if (stats.__isset.null_count) {
337
0
                        insert_if_requested(META_STATS_NULL_COUNT, insert_int64, stats.null_count);
338
0
                    } else {
339
0
                        insert_if_requested(META_STATS_NULL_COUNT, insert_null);
340
0
                    }
341
0
                    if (stats.__isset.distinct_count) {
342
0
                        insert_if_requested(META_STATS_DISTINCT_COUNT, insert_int64,
343
0
                                            stats.distinct_count);
344
0
                    } else {
345
0
                        insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null);
346
0
                    }
347
348
                    // Prefer min_value/max_value, but fall back to deprecated min/max so the column
349
                    // is still populated for older files.
350
0
                    std::string encoded_min_value;
351
0
                    std::string encoded_max_value;
352
0
                    bool has_min_value = false;
353
0
                    bool has_max_value = false;
354
0
                    if (stats.__isset.min_value) {
355
0
                        encoded_min_value = stats.min_value;
356
0
                        has_min_value = true;
357
0
                    } else if (stats.__isset.min) {
358
0
                        encoded_min_value = stats.min;
359
0
                        has_min_value = true;
360
0
                    }
361
0
                    if (stats.__isset.max_value) {
362
0
                        encoded_max_value = stats.max_value;
363
0
                        has_max_value = true;
364
0
                    } else if (stats.__isset.max) {
365
0
                        encoded_max_value = stats.max;
366
0
                        has_max_value = true;
367
0
                    }
368
0
                    if (has_min_value) {
369
0
                        insert_if_requested(META_STATS_MIN_VALUE, insert_string,
370
0
                                            decode_statistics_value(schema_field, column_meta.type,
371
0
                                                                    encoded_min_value, ctz));
372
0
                    } else {
373
0
                        insert_if_requested(META_STATS_MIN_VALUE, insert_null);
374
0
                    }
375
0
                    if (has_max_value) {
376
0
                        insert_if_requested(META_STATS_MAX_VALUE, insert_string,
377
0
                                            decode_statistics_value(schema_field, column_meta.type,
378
0
                                                                    encoded_max_value, ctz));
379
0
                    } else {
380
0
                        insert_if_requested(META_STATS_MAX_VALUE, insert_null);
381
0
                    }
382
383
0
                    if (stats.__isset.is_min_value_exact) {
384
0
                        insert_if_requested(META_MIN_IS_EXACT, insert_bool,
385
0
                                            stats.is_min_value_exact);
386
0
                    } else {
387
0
                        insert_if_requested(META_MIN_IS_EXACT, insert_null);
388
0
                    }
389
0
                    if (stats.__isset.is_max_value_exact) {
390
0
                        insert_if_requested(META_MAX_IS_EXACT, insert_bool,
391
0
                                            stats.is_max_value_exact);
392
0
                    } else {
393
0
                        insert_if_requested(META_MAX_IS_EXACT, insert_null);
394
0
                    }
395
0
                } else {
396
0
                    insert_if_requested(META_STATS_MIN, insert_null);
397
0
                    insert_if_requested(META_STATS_MAX, insert_null);
398
0
                    insert_if_requested(META_STATS_NULL_COUNT, insert_null);
399
0
                    insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null);
400
0
                    insert_if_requested(META_STATS_MIN_VALUE, insert_null);
401
0
                    insert_if_requested(META_STATS_MAX_VALUE, insert_null);
402
0
                    insert_if_requested(META_MIN_IS_EXACT, insert_null);
403
0
                    insert_if_requested(META_MAX_IS_EXACT, insert_null);
404
0
                }
405
406
0
                insert_if_requested(META_COMPRESSION, insert_string,
407
0
                                    compression_to_string(column_meta.codec));
408
0
                insert_if_requested(META_ENCODINGS, insert_string,
409
0
                                    encodings_to_string(column_meta.encodings));
410
411
0
                if (column_meta.__isset.index_page_offset) {
412
0
                    insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64,
413
0
                                        column_meta.index_page_offset);
414
0
                } else {
415
0
                    insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null);
416
0
                }
417
0
                if (column_meta.__isset.dictionary_page_offset) {
418
0
                    insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64,
419
0
                                        column_meta.dictionary_page_offset);
420
0
                } else {
421
0
                    insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null);
422
0
                }
423
0
                insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64,
424
0
                                    column_meta.data_page_offset);
425
426
0
                insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64,
427
0
                                    column_meta.total_compressed_size);
428
0
                insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64,
429
0
                                    column_meta.total_uncompressed_size);
430
431
0
                if (kv_pos >= 0) {
432
0
                    if (has_kv_map) {
433
0
                        columns[kv_pos]->insert(kv_map_field);
434
0
                    } else {
435
0
                        insert_null(columns[kv_pos]);
436
0
                    }
437
0
                }
438
439
0
                if (column_meta.__isset.bloom_filter_offset) {
440
0
                    insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_int64,
441
0
                                        column_meta.bloom_filter_offset);
442
0
                } else {
443
0
                    insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_null);
444
0
                }
445
0
                if (column_meta.__isset.bloom_filter_length) {
446
0
                    insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_int64,
447
0
                                        static_cast<Int64>(column_meta.bloom_filter_length));
448
0
                } else {
449
0
                    insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_null);
450
0
                }
451
452
0
                insert_if_requested(META_ROW_GROUP_COMPRESSED_BYTES, insert_int64,
453
0
                                    row_group_compressed_bytes);
454
0
            }
455
0
        }
456
0
        return Status::OK();
457
0
    }
458
459
private:
460
    std::array<int, META_COLUMN_COUNT> _slot_pos {};
461
};
462
463
class ParquetFileMetadataModeHandler final : public ParquetMetadataReader::ModeHandler {
464
public:
465
0
    explicit ParquetFileMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {}
466
467
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
468
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
469
0
        _init_slot_pos_map(name_to_pos, kFileMetadataColumnNames, &_slot_pos);
470
0
    }
471
472
    Status append_rows(const std::string& path, FileMetaData* metadata,
473
0
                       std::vector<MutableColumnPtr>& columns) override {
474
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
475
476
0
        auto insert_if_requested = [&](FileMetadataColumnIndex idx, auto&& inserter,
477
0
                                       auto&&... args) {
478
0
            int pos = _slot_pos[idx];
479
0
            if (pos >= 0) {
480
0
                inserter(columns[pos], std::forward<decltype(args)>(args)...);
481
0
            }
482
0
        };
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS8_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_EJEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJRS6_EEEDaSL_SN_SQ_
483
484
0
        insert_if_requested(FILE_META_FILE_NAME, insert_string, path);
485
0
        if (thrift_meta.__isset.created_by) {
486
0
            insert_if_requested(FILE_META_CREATED_BY, insert_string, thrift_meta.created_by);
487
0
        } else {
488
0
            insert_if_requested(FILE_META_CREATED_BY, insert_null);
489
0
        }
490
0
        insert_if_requested(FILE_META_NUM_ROWS, insert_int64,
491
0
                            static_cast<Int64>(thrift_meta.num_rows));
492
0
        insert_if_requested(FILE_META_NUM_ROW_GROUPS, insert_int64,
493
0
                            static_cast<Int64>(thrift_meta.row_groups.size()));
494
0
        insert_if_requested(FILE_META_FORMAT_VERSION, insert_int64,
495
0
                            static_cast<Int64>(thrift_meta.version));
496
0
        if (thrift_meta.__isset.encryption_algorithm) {
497
0
            const auto& algo = thrift_meta.encryption_algorithm;
498
0
            std::string algo_name;
499
0
            if (algo.__isset.AES_GCM_V1) {
500
0
                algo_name = "AES_GCM_V1";
501
0
            } else if (algo.__isset.AES_GCM_CTR_V1) {
502
0
                algo_name = "AES_GCM_CTR_V1";
503
0
            }
504
0
            if (!algo_name.empty()) {
505
0
                insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_string, algo_name);
506
0
            } else {
507
0
                insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null);
508
0
            }
509
0
        } else {
510
0
            insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null);
511
0
        }
512
0
        if (thrift_meta.__isset.footer_signing_key_metadata) {
513
0
            insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_string,
514
0
                                thrift_meta.footer_signing_key_metadata);
515
0
        } else {
516
0
            insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_null);
517
0
        }
518
0
        return Status::OK();
519
0
    }
520
521
private:
522
    std::array<int, FILE_META_COLUMN_COUNT> _slot_pos {};
523
};
524
525
class ParquetKeyValueModeHandler final : public ParquetMetadataReader::ModeHandler {
526
public:
527
0
    explicit ParquetKeyValueModeHandler(RuntimeState* state) : ModeHandler(state) {}
528
529
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
530
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
531
0
        _init_slot_pos_map(name_to_pos, kKeyValueColumnNames, &_slot_pos);
532
0
    }
533
534
    Status append_rows(const std::string& path, FileMetaData* metadata,
535
0
                       std::vector<MutableColumnPtr>& columns) override {
536
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
537
0
        if (!thrift_meta.__isset.key_value_metadata || thrift_meta.key_value_metadata.empty()) {
538
0
            return Status::OK();
539
0
        }
540
541
0
        auto insert_if_requested = [&](KeyValueColumnIndex idx, auto&& inserter, auto&&... args) {
542
0
            int pos = _slot_pos[idx];
543
0
            if (pos >= 0) {
544
0
                inserter(columns[pos], std::forward<decltype(args)>(args)...);
545
0
            }
546
0
        };
Unexecuted instantiation: _ZZN5doris26ParquetKeyValueModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19KeyValueColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS8_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetKeyValueModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19KeyValueColumnIndexEOT_DpOT0_E_clIRFvRSG_EJEEEDaSL_SN_SQ_
547
548
0
        for (const auto& kv : thrift_meta.key_value_metadata) {
549
0
            insert_if_requested(KV_FILE_NAME, insert_string, path);
550
0
            insert_if_requested(KV_KEY, insert_string, kv.key);
551
0
            if (kv.__isset.value) {
552
0
                insert_if_requested(KV_VALUE, insert_string, kv.value);
553
0
            } else {
554
0
                insert_if_requested(KV_VALUE, insert_null);
555
0
            }
556
0
        }
557
0
        return Status::OK();
558
0
    }
559
560
private:
561
    std::array<int, KV_COLUMN_COUNT> _slot_pos {};
562
};
563
564
class ParquetBloomProbeModeHandler final : public ParquetMetadataReader::ModeHandler {
565
public:
566
    ParquetBloomProbeModeHandler(RuntimeState* state, TFileType::type file_type,
567
                                 std::map<std::string, std::string> properties, std::string column,
568
                                 std::string literal)
569
0
            : ModeHandler(state),
570
0
              _file_type(file_type),
571
0
              _properties(std::move(properties)),
572
0
              _column(std::move(column)),
573
0
              _literal(std::move(literal)) {}
574
575
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
576
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
577
0
        _init_slot_pos_map(name_to_pos, kBloomProbeColumnNames, &_slot_pos);
578
0
    }
579
580
    Status append_rows(const std::string& path, FileMetaData* metadata,
581
0
                       std::vector<MutableColumnPtr>& columns) override {
582
0
        const FieldSchema* schema = metadata->schema().get_column(_column);
583
0
        if (schema == nullptr) {
584
0
            return Status::InvalidArgument(
585
0
                    fmt::format("Column '{}' not found for parquet_bloom_probe", _column));
586
0
        }
587
0
        int parquet_col_id = schema->physical_column_index;
588
0
        PrimitiveType primitive_type = _get_primitive(schema->data_type);
589
0
        if (!ParquetPredicate::bloom_filter_supported(primitive_type)) {
590
0
            return Status::InvalidArgument(
591
0
                    fmt::format("Column '{}' type {} does not support parquet bloom filter probe",
592
0
                                _column, primitive_type));
593
0
        }
594
595
0
        std::string encoded_literal;
596
0
        RETURN_IF_ERROR(
597
0
                _encode_literal(schema->physical_type, primitive_type, _literal, &encoded_literal));
598
599
0
        io::FileSystemProperties system_properties;
600
0
        system_properties.system_type = _file_type;
601
0
        system_properties.properties = _properties;
602
0
        io::FileDescription file_desc;
603
0
        file_desc.path = path;
604
0
        io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader(
605
0
                system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr));
606
0
        io::IOContext io_ctx;
607
608
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
609
0
        if (thrift_meta.row_groups.empty()) {
610
0
            return Status::OK();
611
0
        }
612
613
0
        for (size_t rg_idx = 0; rg_idx < thrift_meta.row_groups.size(); ++rg_idx) {
614
0
            if (parquet_col_id < 0 ||
615
0
                parquet_col_id >= thrift_meta.row_groups[rg_idx].columns.size()) {
616
0
                return Status::InvalidArgument(fmt::format(
617
0
                        "Invalid column index {} for parquet_bloom_probe", parquet_col_id));
618
0
            }
619
0
            const auto& column_chunk = thrift_meta.row_groups[rg_idx].columns[parquet_col_id];
620
0
            std::optional<bool> excludes;
621
0
            if (column_chunk.__isset.meta_data &&
622
0
                column_chunk.meta_data.__isset.bloom_filter_offset) {
623
0
                ParquetPredicate::ColumnStat stat;
624
0
                auto st = ParquetPredicate::read_bloom_filter(column_chunk.meta_data, file_reader,
625
0
                                                              &io_ctx, &stat);
626
0
                if (st.ok() && stat.bloom_filter) {
627
0
                    bool might_contain = stat.bloom_filter->test_bytes(encoded_literal.data(),
628
0
                                                                       encoded_literal.size());
629
0
                    excludes = !might_contain;
630
0
                }
631
0
            }
632
0
            _emit_row(path, static_cast<Int64>(rg_idx), excludes, columns);
633
0
        }
634
0
        return Status::OK();
635
0
    }
636
637
private:
638
    std::array<int, BLOOM_COLUMN_COUNT> _slot_pos {};
639
    TFileType::type _file_type;
640
    std::map<std::string, std::string> _properties;
641
    std::string _column;
642
    std::string _literal;
643
644
0
    PrimitiveType _get_primitive(const DataTypePtr& type) const {
645
0
        if (auto nullable = typeid_cast<const DataTypeNullable*>(type.get())) {
646
0
            return nullable->get_nested_type()->get_primitive_type();
647
0
        }
648
0
        return type->get_primitive_type();
649
0
    }
650
651
    Status _encode_literal(tparquet::Type::type physical_type, PrimitiveType primitive_type,
652
0
                           const std::string& literal, std::string* out) const {
653
0
        try {
654
0
            switch (physical_type) {
655
0
            case tparquet::Type::INT32: {
656
0
                int64_t v = std::stoll(literal);
657
0
                int32_t v32 = static_cast<int32_t>(v);
658
0
                out->assign(reinterpret_cast<const char*>(&v32), sizeof(int32_t));
659
0
                return Status::OK();
660
0
            }
661
0
            case tparquet::Type::INT64: {
662
0
                int64_t v = std::stoll(literal);
663
0
                out->assign(reinterpret_cast<const char*>(&v), sizeof(int64_t));
664
0
                return Status::OK();
665
0
            }
666
0
            case tparquet::Type::FLOAT: {
667
0
                float v = std::stof(literal);
668
0
                out->assign(reinterpret_cast<const char*>(&v), sizeof(float));
669
0
                return Status::OK();
670
0
            }
671
0
            case tparquet::Type::DOUBLE: {
672
0
                double v = std::stod(literal);
673
0
                out->assign(reinterpret_cast<const char*>(&v), sizeof(double));
674
0
                return Status::OK();
675
0
            }
676
0
            case tparquet::Type::BYTE_ARRAY: {
677
                // For string/blob, use raw bytes from the literal.
678
0
                *out = literal;
679
0
                return Status::OK();
680
0
            }
681
0
            default:
682
0
                break;
683
0
            }
684
0
        } catch (const std::exception& e) {
685
0
            return Status::InvalidArgument(fmt::format(
686
0
                    "Failed to parse literal '{}' for parquet bloom probe: {}", literal, e.what()));
687
0
        }
688
0
        return Status::NotSupported(
689
0
                fmt::format("Physical type {} for column '{}' not supported in parquet_bloom_probe",
690
0
                            physical_type, _column));
691
0
    }
692
693
    void _emit_row(const std::string& path, Int64 row_group_id, std::optional<bool> excludes,
694
0
                   std::vector<MutableColumnPtr>& columns) {
695
0
        if (_slot_pos[BLOOM_FILE_NAME] >= 0) {
696
0
            insert_string(columns[_slot_pos[BLOOM_FILE_NAME]], path);
697
0
        }
698
0
        if (_slot_pos[BLOOM_ROW_GROUP_ID] >= 0) {
699
0
            insert_int32(columns[_slot_pos[BLOOM_ROW_GROUP_ID]], static_cast<Int32>(row_group_id));
700
0
        }
701
0
        if (_slot_pos[BLOOM_EXCLUDES] >= 0) {
702
0
            int32_t excludes_val = -1; // -1: no bloom filter present
703
0
            if (excludes.has_value()) {
704
0
                excludes_val = excludes.value() ? 1 : 0;
705
0
            }
706
0
            insert_int32(columns[_slot_pos[BLOOM_EXCLUDES]], excludes_val);
707
0
        }
708
0
    }
709
};
710
711
// Store the runtime state, the output slots and the scan range for later use by
// init_reader(). The profile argument is accepted but not used by this constructor.
ParquetMetadataReader::ParquetMetadataReader(std::vector<SlotDescriptor*> slots,
                                             RuntimeState* state, RuntimeProfile* /*profile*/,
                                             TMetaScanRange scan_range)
        : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) {}
717
718
0
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept here so the mode handler types are complete when the
// owning pointer is destroyed — confirm against the header.
ParquetMetadataReader::~ParquetMetadataReader() = default;
719
720
0
// Parse the scan range, create the handler that matches the resolved mode and let it map
// its output columns onto the requested slots.
Status ParquetMetadataReader::init_reader() {
    RETURN_IF_ERROR(_init_from_scan_range(_scan_range));

    switch (_mode_type) {
    case Mode::SCHEMA:
        _mode_handler = std::make_unique<ParquetSchemaModeHandler>(_state);
        break;
    case Mode::FILE_METADATA:
        _mode_handler = std::make_unique<ParquetFileMetadataModeHandler>(_state);
        break;
    case Mode::KEY_VALUE_METADATA:
        _mode_handler = std::make_unique<ParquetKeyValueModeHandler>(_state);
        break;
    case Mode::BLOOM_PROBE:
        _mode_handler = std::make_unique<ParquetBloomProbeModeHandler>(
                _state, _file_type, _properties, _bloom_column, _bloom_literal);
        break;
    default:
        // Every other mode is served by the generic metadata handler.
        _mode_handler = std::make_unique<ParquetMetadataModeHandler>(_state);
        break;
    }

    _mode_handler->init_slot_pos_map(_slots);
    return Status::OK();
}
737
738
0
// Copy the parquet parameters of `scan_range` into the reader: file paths, mode, file type,
// file system properties and the bloom-probe arguments.
// Returns InvalidArgument when the parquet parameters, the paths or the file type are
// missing, or when bloom-probe mode lacks its column or literal.
Status ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) {
    if (!scan_range.__isset.parquet_params) {
        return Status::InvalidArgument(
                "Missing parquet parameters for parquet_meta table function");
    }
    const TParquetMetadataParams& params = scan_range.parquet_params;

    // Serialized splits on the scan range win over the paths stored in the parameters.
    const bool has_splits =
            scan_range.__isset.serialized_splits && !scan_range.serialized_splits.empty();
    const bool has_param_paths = params.__isset.paths && !params.paths.empty();
    if (has_splits) {
        _paths.assign(scan_range.serialized_splits.begin(), scan_range.serialized_splits.end());
    } else if (has_param_paths) {
        _paths.assign(params.paths.begin(), params.paths.end());
    } else {
        return Status::InvalidArgument("Property 'path' must be set for parquet_meta");
    }

    _mode = MODE_METADATA; // default
    if (params.__isset.mode) {
        _mode = params.mode;
    }

    if (!params.__isset.file_type) {
        return Status::InvalidArgument("Property 'file_type' must be set for parquet_metadata");
    }
    _file_type = params.file_type;

    if (params.__isset.properties) {
        _properties = params.properties;
    }
    if (params.__isset.bloom_column) {
        _bloom_column = params.bloom_column;
    }
    if (params.__isset.bloom_literal) {
        _bloom_literal = params.bloom_literal;
    }

    // Mode names are matched case-insensitively; `_mode` is then replaced by the canonical
    // spelling. Any unrecognized name selects the metadata mode.
    std::string normalized_mode = _mode;
    for (char& ch : normalized_mode) {
        ch = static_cast<char>(std::tolower(static_cast<unsigned char>(ch)));
    }
    if (normalized_mode == MODE_SCHEMA) {
        _mode_type = Mode::SCHEMA;
        _mode = MODE_SCHEMA;
    } else if (normalized_mode == MODE_FILE_METADATA) {
        _mode_type = Mode::FILE_METADATA;
        _mode = MODE_FILE_METADATA;
    } else if (normalized_mode == MODE_KEY_VALUE_METADATA) {
        _mode_type = Mode::KEY_VALUE_METADATA;
        _mode = MODE_KEY_VALUE_METADATA;
    } else if (normalized_mode == MODE_BLOOM_PROBE) {
        _mode_type = Mode::BLOOM_PROBE;
        _mode = MODE_BLOOM_PROBE;
    } else {
        _mode_type = Mode::METADATA;
        _mode = MODE_METADATA;
    }

    const bool bloom_args_complete = !_bloom_column.empty() && !_bloom_literal.empty();
    if (_mode_type == Mode::BLOOM_PROBE && !bloom_args_complete) {
        return Status::InvalidArgument(
                "Properties 'bloom_column' and 'bloom_literal' must be set for "
                "parquet_bloom_probe");
    }
    return Status::OK();
}
801
802
0
// Produce all rows in a single call and report eof on the following one.
// `*read_rows` receives the number of rows placed in `block`; `*eof` is set right away when
// no rows were produced.
Status ParquetMetadataReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
    if (_eof) {
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    }

    const size_t slot_count = _slots.size();
    size_t row_count = 0;
    if (block->mem_reuse()) {
        // Reused block: drop the old row data but keep the existing column objects.
        auto scoped_columns = block->mutate_columns_scoped();
        auto& reused = scoped_columns.mutable_columns();
        for (size_t idx = 0; idx < slot_count; ++idx) {
            reused[idx]->clear();
        }

        RETURN_IF_ERROR(_build_rows(reused));
        row_count = reused[0]->size();
    } else {
        // Fresh block: build one empty column per slot, fill them, then hand them over.
        std::vector<MutableColumnPtr> fresh(slot_count);
        for (size_t idx = 0; idx < slot_count; ++idx) {
            fresh[idx] = _slots[idx]->get_empty_mutable_column();
        }

        RETURN_IF_ERROR(_build_rows(fresh));
        for (size_t idx = 0; idx < slot_count; ++idx) {
            block->insert(ColumnWithTypeAndName(std::move(fresh[idx]),
                                                _slots[idx]->get_data_type_ptr(),
                                                _slots[idx]->col_name()));
        }
        row_count = block->rows();
    }

    _eof = true;
    *read_rows = row_count;
    *eof = (row_count == 0);
    return Status::OK();
}
841
842
// Iterate all configured paths and append metadata rows into the provided columns.
843
0
Status ParquetMetadataReader::_build_rows(std::vector<MutableColumnPtr>& columns) {
844
0
    for (const auto& path : _paths) {
845
0
        RETURN_IF_ERROR(_append_file_rows(path, columns));
846
0
    }
847
0
    return Status::OK();
848
0
}
849
850
// Open a single Parquet file, read its footer, and dispatch to schema/metadata handlers.
851
Status ParquetMetadataReader::_append_file_rows(const std::string& path,
852
0
                                                std::vector<MutableColumnPtr>& columns) {
853
0
    io::FileSystemProperties system_properties;
854
0
    system_properties.system_type = _file_type;
855
0
    system_properties.properties = _properties;
856
0
    if (_file_type == TFileType::FILE_HDFS) {
857
0
        system_properties.hdfs_params = ::doris::parse_properties(system_properties.properties);
858
0
    }
859
0
    io::FileDescription file_desc;
860
0
    file_desc.path = path;
861
0
    io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader(
862
0
            system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr));
863
864
0
    std::unique_ptr<FileMetaData> file_metadata;
865
0
    size_t meta_size = 0;
866
0
    io::IOContext io_ctx;
867
0
    RETURN_IF_ERROR(
868
0
            parse_thrift_footer(file_reader, &file_metadata, &meta_size, &io_ctx, true, true));
869
870
0
    if (_mode_handler == nullptr) {
871
0
        return Status::InternalError(
872
0
                "Parquet metadata reader is not initialized with mode handler");
873
0
    }
874
0
    return _mode_handler->append_rows(path, file_metadata.get(), columns);
875
0
}
876
877
0
// Nothing is released here; the call always reports success.
Status ParquetMetadataReader::close() {
    return Status::OK();
}
880
881
} // namespace doris