Coverage Report

Created: 2026-03-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/parquet_metadata_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/parquet_metadata_reader.h"
19
20
#include <fmt/format.h>
21
22
#include <algorithm>
23
#include <array>
24
#include <cctype>
25
#include <cstring>
26
#include <memory>
27
#include <optional>
28
#include <unordered_map>
29
#include <utility>
30
31
#include "core/block/block.h"
32
#include "core/column/column_map.h"
33
#include "core/column/column_nullable.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/field.h"
36
#include "core/string_view.h"
37
#include "core/types.h"
38
#include "format/parquet/parquet_thrift_util.h"
39
#include "format/parquet/schema_desc.h"
40
#include "format/parquet/vparquet_file_metadata.h"
41
#include "format/table/parquet_utils.h"
42
#include "io/file_factory.h"
43
#include "io/fs/file_reader.h"
44
#include "io/hdfs_builder.h"
45
#include "io/io_common.h"
46
#include "runtime/runtime_state.h"
47
#include "util/string_util.h"
48
49
namespace doris {
50
51
using namespace parquet_utils;
52
53
class ParquetMetadataReader::ModeHandler {
54
public:
55
0
    explicit ModeHandler(RuntimeState* state) : _state(state) {}
56
0
    virtual ~ModeHandler() = default;
57
58
    virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0;
59
    virtual Status append_rows(const std::string& path, FileMetaData* metadata,
60
                               std::vector<MutableColumnPtr>& columns) = 0;
61
62
protected:
63
    RuntimeState* _state = nullptr;
64
65
    static std::unordered_map<std::string, int> _build_name_to_pos_map(
66
0
            const std::vector<SlotDescriptor*>& slots) {
67
0
        std::unordered_map<std::string, int> name_to_pos;
68
0
        name_to_pos.reserve(slots.size());
69
0
        for (size_t i = 0; i < slots.size(); ++i) {
70
0
            name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast<int>(i));
71
0
        }
72
0
        return name_to_pos;
73
0
    }
74
75
    template <size_t N>
76
    static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos,
77
                                   const std::array<const char*, N>& column_names,
78
0
                                   std::array<int, N>* slot_pos) {
79
0
        slot_pos->fill(-1);
80
0
        for (size_t i = 0; i < column_names.size(); ++i) {
81
0
            auto it = name_to_pos.find(column_names[i]);
82
0
            if (it != name_to_pos.end()) {
83
0
                (*slot_pos)[i] = it->second;
84
0
            }
85
0
        }
86
0
    }
Unexecuted instantiation: _ZN5doris21ParquetMetadataReader11ModeHandler18_init_slot_pos_mapILm11EEEvRKSt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt4hashIS9_ESt8equal_toIS9_ESaISt4pairIKS9_iEEERKSt5arrayIPKcXT_EEPSL_IiXT_EE
Unexecuted instantiation: _ZN5doris21ParquetMetadataReader11ModeHandler18_init_slot_pos_mapILm7EEEvRKSt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt4hashIS9_ESt8equal_toIS9_ESaISt4pairIKS9_iEEERKSt5arrayIPKcXT_EEPSL_IiXT_EE
Unexecuted instantiation: _ZN5doris21ParquetMetadataReader11ModeHandler18_init_slot_pos_mapILm3EEEvRKSt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt4hashIS9_ESt8equal_toIS9_ESaISt4pairIKS9_iEEERKSt5arrayIPKcXT_EEPSL_IiXT_EE
Unexecuted instantiation: _ZN5doris21ParquetMetadataReader11ModeHandler18_init_slot_pos_mapILm29EEEvRKSt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt4hashIS9_ESt8equal_toIS9_ESaISt4pairIKS9_iEEERKSt5arrayIPKcXT_EEPSL_IiXT_EE
87
};
88
89
class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler {
90
public:
91
0
    explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {}
92
93
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
94
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
95
0
        _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos);
96
0
    }
97
98
    Status append_rows(const std::string& path, FileMetaData* metadata,
99
0
                       std::vector<MutableColumnPtr>& columns) override {
100
0
        const auto& fields = metadata->schema().get_fields_schema();
101
0
        for (const auto& field : fields) {
102
0
            RETURN_IF_ERROR(_append_schema_node(path, field, columns));
103
0
        }
104
0
        return Status::OK();
105
0
    }
106
107
private:
108
    std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {};
109
110
0
    static std::string _repetition_type_to_string(tparquet::FieldRepetitionType::type type) {
111
0
        switch (type) {
112
0
        case tparquet::FieldRepetitionType::REQUIRED:
113
0
            return "REQUIRED";
114
0
        case tparquet::FieldRepetitionType::OPTIONAL:
115
0
            return "OPTIONAL";
116
0
        case tparquet::FieldRepetitionType::REPEATED:
117
0
            return "REPEATED";
118
0
        default:
119
0
            return "UNKNOWN";
120
0
        }
121
0
    }
122
123
    Status _append_schema_node(const std::string& path, const FieldSchema& field,
124
0
                               std::vector<MutableColumnPtr>& columns) {
125
0
        auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) {
126
0
            int pos = _slot_pos[idx];
127
0
            if (pos >= 0) {
128
0
                inserter(columns[pos], std::forward<decltype(args)>(args)...);
129
0
            }
130
0
        };
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_S8_EJS8_EEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_S8_EJS6_EEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_EJEEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_lEJlEEEDaSM_SO_SR_
Unexecuted instantiation: _ZZN5doris24ParquetSchemaModeHandler19_append_schema_nodeERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_11FieldSchemaERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISE_EESaISH_EEENKUlNS_13parquet_utils17SchemaColumnIndexEOT_DpOT0_E_clIRFvRSH_S8_EJRS6_EEEDaSM_SO_SR_
131
132
0
        insert_if_requested(SCHEMA_FILE_NAME, insert_string, path);
133
0
        insert_if_requested(SCHEMA_NAME, insert_string, field.parquet_schema.name);
134
135
0
        if (field.parquet_schema.__isset.type) {
136
0
            insert_if_requested(SCHEMA_TYPE, insert_string,
137
0
                                physical_type_to_string(field.parquet_schema.type));
138
0
        } else {
139
0
            insert_if_requested(SCHEMA_TYPE, insert_null);
140
0
        }
141
142
0
        if (field.parquet_schema.__isset.type_length) {
143
0
            insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int64,
144
0
                                static_cast<Int64>(field.parquet_schema.type_length));
145
0
        } else {
146
0
            insert_if_requested(SCHEMA_TYPE_LENGTH, insert_null);
147
0
        }
148
149
0
        if (field.parquet_schema.__isset.repetition_type) {
150
0
            insert_if_requested(SCHEMA_REPETITION_TYPE, insert_string,
151
0
                                _repetition_type_to_string(field.parquet_schema.repetition_type));
152
0
        } else {
153
0
            insert_if_requested(SCHEMA_REPETITION_TYPE, insert_null);
154
0
        }
155
156
0
        int64_t num_children = field.parquet_schema.__isset.num_children
157
0
                                       ? static_cast<int64_t>(field.parquet_schema.num_children)
158
0
                                       : 0;
159
0
        insert_if_requested(SCHEMA_NUM_CHILDREN, insert_int64, static_cast<Int64>(num_children));
160
161
0
        if (field.parquet_schema.__isset.converted_type) {
162
0
            insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_string,
163
0
                                converted_type_to_string(field.parquet_schema.converted_type));
164
0
        } else {
165
0
            insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_null);
166
0
        }
167
168
0
        if (field.parquet_schema.__isset.scale) {
169
0
            insert_if_requested(SCHEMA_SCALE, insert_int64,
170
0
                                static_cast<Int64>(field.parquet_schema.scale));
171
0
        } else {
172
0
            insert_if_requested(SCHEMA_SCALE, insert_null);
173
0
        }
174
175
0
        if (field.parquet_schema.__isset.precision) {
176
0
            insert_if_requested(SCHEMA_PRECISION, insert_int64,
177
0
                                static_cast<Int64>(field.parquet_schema.precision));
178
0
        } else {
179
0
            insert_if_requested(SCHEMA_PRECISION, insert_null);
180
0
        }
181
182
0
        if (field.parquet_schema.__isset.field_id) {
183
0
            insert_if_requested(SCHEMA_FIELD_ID, insert_int64,
184
0
                                static_cast<Int64>(field.parquet_schema.field_id));
185
0
        } else {
186
0
            insert_if_requested(SCHEMA_FIELD_ID, insert_null);
187
0
        }
188
189
0
        std::string logical = logical_type_to_string(field.parquet_schema);
190
0
        if (logical.empty()) {
191
0
            insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_null);
192
0
        } else {
193
0
            insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string, logical);
194
0
        }
195
196
0
        for (const auto& child : field.children) {
197
0
            RETURN_IF_ERROR(_append_schema_node(path, child, columns));
198
0
        }
199
0
        return Status::OK();
200
0
    }
201
};
202
203
class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler {
204
public:
205
0
    explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {}
206
207
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
208
0
        std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots);
209
0
        _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos);
210
0
    }
211
212
    Status append_rows(const std::string& path, FileMetaData* metadata,
213
0
                       std::vector<MutableColumnPtr>& columns) override {
214
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
215
0
        if (thrift_meta.row_groups.empty()) {
216
0
            return Status::OK();
217
0
        }
218
219
0
        std::unordered_map<std::string, const FieldSchema*> path_map;
220
0
        const auto& fields = metadata->schema().get_fields_schema();
221
0
        for (const auto& field : fields) {
222
0
            build_path_map(field, "", &path_map);
223
0
        }
224
225
0
        const int kv_pos = _slot_pos[META_KEY_VALUE_METADATA];
226
0
        bool has_kv_map = false;
227
0
        Field kv_map_field;
228
0
        if (kv_pos >= 0 && thrift_meta.__isset.key_value_metadata &&
229
0
            !thrift_meta.key_value_metadata.empty()) {
230
0
            Array keys;
231
0
            Array values;
232
0
            keys.reserve(thrift_meta.key_value_metadata.size());
233
0
            values.reserve(thrift_meta.key_value_metadata.size());
234
0
            for (const auto& kv : thrift_meta.key_value_metadata) {
235
0
                keys.emplace_back(Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.key)));
236
0
                if (kv.__isset.value) {
237
0
                    values.emplace_back(
238
0
                            Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.value)));
239
0
                } else {
240
0
                    values.emplace_back(Field {});
241
0
                }
242
0
            }
243
0
            Map map_value;
244
0
            map_value.reserve(2);
245
0
            map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(keys)));
246
0
            map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(values)));
247
0
            kv_map_field = Field::create_field<TYPE_MAP>(std::move(map_value));
248
0
            has_kv_map = true;
249
0
        }
250
251
0
        for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) {
252
0
            const auto& row_group = thrift_meta.row_groups[rg_index];
253
0
            Int64 row_group_num_rows = static_cast<Int64>(row_group.num_rows);
254
0
            Int64 row_group_num_columns = static_cast<Int64>(row_group.columns.size());
255
0
            Int64 row_group_bytes = static_cast<Int64>(row_group.total_byte_size);
256
0
            Int64 row_group_compressed_bytes = 0;
257
0
            if (row_group.__isset.total_compressed_size) {
258
0
                row_group_compressed_bytes = static_cast<Int64>(row_group.total_compressed_size);
259
0
            } else {
260
0
                for (const auto& col_chunk : row_group.columns) {
261
0
                    if (!col_chunk.__isset.meta_data) {
262
0
                        continue;
263
0
                    }
264
0
                    row_group_compressed_bytes += col_chunk.meta_data.total_compressed_size;
265
0
                }
266
0
            }
267
268
0
            for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) {
269
0
                const auto& column_chunk = row_group.columns[col_idx];
270
0
                if (!column_chunk.__isset.meta_data) {
271
0
                    continue;
272
0
                }
273
0
                const auto& column_meta = column_chunk.meta_data;
274
0
                std::string path_in_schema = join_path(column_meta.path_in_schema);
275
0
                const FieldSchema* schema_field = nullptr;
276
0
                auto it = path_map.find(path_in_schema);
277
0
                if (it != path_map.end()) {
278
0
                    schema_field = it->second;
279
0
                }
280
281
0
                auto insert_if_requested = [&](MetadataColumnIndex idx, auto&& inserter,
282
0
                                               auto&&... args) {
283
0
                    int pos = _slot_pos[idx];
284
0
                    if (pos >= 0) {
285
0
                        inserter(columns[pos], std::forward<decltype(args)>(args)...);
286
0
                    }
287
0
                };
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS8_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJRlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJRKlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJRS6_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS6_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_EJEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19MetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_bEJRKbEEEDaSL_SN_SQ_
288
289
0
                insert_if_requested(META_FILE_NAME, insert_string,
290
0
                                    column_chunk.__isset.file_path ? column_chunk.file_path : path);
291
0
                insert_if_requested(META_ROW_GROUP_ID, insert_int64, static_cast<Int64>(rg_index));
292
0
                insert_if_requested(META_ROW_GROUP_NUM_ROWS, insert_int64, row_group_num_rows);
293
0
                insert_if_requested(META_ROW_GROUP_NUM_COLUMNS, insert_int64,
294
0
                                    row_group_num_columns);
295
0
                insert_if_requested(META_ROW_GROUP_BYTES, insert_int64, row_group_bytes);
296
0
                insert_if_requested(META_COLUMN_ID, insert_int64, static_cast<Int64>(col_idx));
297
298
                // `ColumnChunk.file_offset` is deprecated and can be 0 even when page offsets are present.
299
                // Fall back to the first page (dictionary/data) offset to provide a useful value.
300
0
                Int64 file_offset = static_cast<Int64>(column_chunk.file_offset);
301
0
                if (file_offset == 0) {
302
0
                    if (column_meta.__isset.dictionary_page_offset) {
303
0
                        file_offset = static_cast<Int64>(column_meta.dictionary_page_offset);
304
0
                    } else {
305
0
                        file_offset = static_cast<Int64>(column_meta.data_page_offset);
306
0
                    }
307
0
                }
308
0
                insert_if_requested(META_FILE_OFFSET, insert_int64, file_offset);
309
0
                insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values);
310
0
                insert_if_requested(META_PATH_IN_SCHEMA, insert_string, path_in_schema);
311
0
                insert_if_requested(META_TYPE, insert_string,
312
0
                                    physical_type_to_string(column_meta.type));
313
314
0
                if (column_meta.__isset.statistics) {
315
0
                    static const cctz::time_zone kUtc0 = cctz::utc_time_zone();
316
0
                    const cctz::time_zone& ctz = _state != nullptr ? _state->timezone_obj() : kUtc0;
317
318
0
                    const auto& stats = column_meta.statistics;
319
320
0
                    if (stats.__isset.min) {
321
0
                        insert_if_requested(META_STATS_MIN, insert_string,
322
0
                                            decode_statistics_value(schema_field, column_meta.type,
323
0
                                                                    stats.min, ctz));
324
0
                    } else {
325
0
                        insert_if_requested(META_STATS_MIN, insert_null);
326
0
                    }
327
0
                    if (stats.__isset.max) {
328
0
                        insert_if_requested(META_STATS_MAX, insert_string,
329
0
                                            decode_statistics_value(schema_field, column_meta.type,
330
0
                                                                    stats.max, ctz));
331
0
                    } else {
332
0
                        insert_if_requested(META_STATS_MAX, insert_null);
333
0
                    }
334
335
0
                    if (stats.__isset.null_count) {
336
0
                        insert_if_requested(META_STATS_NULL_COUNT, insert_int64, stats.null_count);
337
0
                    } else {
338
0
                        insert_if_requested(META_STATS_NULL_COUNT, insert_null);
339
0
                    }
340
0
                    if (stats.__isset.distinct_count) {
341
0
                        insert_if_requested(META_STATS_DISTINCT_COUNT, insert_int64,
342
0
                                            stats.distinct_count);
343
0
                    } else {
344
0
                        insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null);
345
0
                    }
346
347
                    // Prefer min_value/max_value, but fall back to deprecated min/max so the column
348
                    // is still populated for older files.
349
0
                    std::string encoded_min_value;
350
0
                    std::string encoded_max_value;
351
0
                    bool has_min_value = false;
352
0
                    bool has_max_value = false;
353
0
                    if (stats.__isset.min_value) {
354
0
                        encoded_min_value = stats.min_value;
355
0
                        has_min_value = true;
356
0
                    } else if (stats.__isset.min) {
357
0
                        encoded_min_value = stats.min;
358
0
                        has_min_value = true;
359
0
                    }
360
0
                    if (stats.__isset.max_value) {
361
0
                        encoded_max_value = stats.max_value;
362
0
                        has_max_value = true;
363
0
                    } else if (stats.__isset.max) {
364
0
                        encoded_max_value = stats.max;
365
0
                        has_max_value = true;
366
0
                    }
367
0
                    if (has_min_value) {
368
0
                        insert_if_requested(META_STATS_MIN_VALUE, insert_string,
369
0
                                            decode_statistics_value(schema_field, column_meta.type,
370
0
                                                                    encoded_min_value, ctz));
371
0
                    } else {
372
0
                        insert_if_requested(META_STATS_MIN_VALUE, insert_null);
373
0
                    }
374
0
                    if (has_max_value) {
375
0
                        insert_if_requested(META_STATS_MAX_VALUE, insert_string,
376
0
                                            decode_statistics_value(schema_field, column_meta.type,
377
0
                                                                    encoded_max_value, ctz));
378
0
                    } else {
379
0
                        insert_if_requested(META_STATS_MAX_VALUE, insert_null);
380
0
                    }
381
382
0
                    if (stats.__isset.is_min_value_exact) {
383
0
                        insert_if_requested(META_MIN_IS_EXACT, insert_bool,
384
0
                                            stats.is_min_value_exact);
385
0
                    } else {
386
0
                        insert_if_requested(META_MIN_IS_EXACT, insert_null);
387
0
                    }
388
0
                    if (stats.__isset.is_max_value_exact) {
389
0
                        insert_if_requested(META_MAX_IS_EXACT, insert_bool,
390
0
                                            stats.is_max_value_exact);
391
0
                    } else {
392
0
                        insert_if_requested(META_MAX_IS_EXACT, insert_null);
393
0
                    }
394
0
                } else {
395
0
                    insert_if_requested(META_STATS_MIN, insert_null);
396
0
                    insert_if_requested(META_STATS_MAX, insert_null);
397
0
                    insert_if_requested(META_STATS_NULL_COUNT, insert_null);
398
0
                    insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null);
399
0
                    insert_if_requested(META_STATS_MIN_VALUE, insert_null);
400
0
                    insert_if_requested(META_STATS_MAX_VALUE, insert_null);
401
0
                    insert_if_requested(META_MIN_IS_EXACT, insert_null);
402
0
                    insert_if_requested(META_MAX_IS_EXACT, insert_null);
403
0
                }
404
405
0
                insert_if_requested(META_COMPRESSION, insert_string,
406
0
                                    compression_to_string(column_meta.codec));
407
0
                insert_if_requested(META_ENCODINGS, insert_string,
408
0
                                    encodings_to_string(column_meta.encodings));
409
410
0
                if (column_meta.__isset.index_page_offset) {
411
0
                    insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64,
412
0
                                        column_meta.index_page_offset);
413
0
                } else {
414
0
                    insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null);
415
0
                }
416
0
                if (column_meta.__isset.dictionary_page_offset) {
417
0
                    insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64,
418
0
                                        column_meta.dictionary_page_offset);
419
0
                } else {
420
0
                    insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null);
421
0
                }
422
0
                insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64,
423
0
                                    column_meta.data_page_offset);
424
425
0
                insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64,
426
0
                                    column_meta.total_compressed_size);
427
0
                insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64,
428
0
                                    column_meta.total_uncompressed_size);
429
430
0
                if (kv_pos >= 0) {
431
0
                    if (has_kv_map) {
432
0
                        columns[kv_pos]->insert(kv_map_field);
433
0
                    } else {
434
0
                        insert_null(columns[kv_pos]);
435
0
                    }
436
0
                }
437
438
0
                if (column_meta.__isset.bloom_filter_offset) {
439
0
                    insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_int64,
440
0
                                        column_meta.bloom_filter_offset);
441
0
                } else {
442
0
                    insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_null);
443
0
                }
444
0
                if (column_meta.__isset.bloom_filter_length) {
445
0
                    insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_int64,
446
0
                                        static_cast<Int64>(column_meta.bloom_filter_length));
447
0
                } else {
448
0
                    insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_null);
449
0
                }
450
451
0
                insert_if_requested(META_ROW_GROUP_COMPRESSED_BYTES, insert_int64,
452
0
                                    row_group_compressed_bytes);
453
0
            }
454
0
        }
455
0
        return Status::OK();
456
0
    }
457
458
private:
459
    std::array<int, META_COLUMN_COUNT> _slot_pos {};
460
};
461
462
class ParquetFileMetadataModeHandler final : public ParquetMetadataReader::ModeHandler {
463
public:
464
0
    explicit ParquetFileMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {}
465
466
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
467
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
468
0
        _init_slot_pos_map(name_to_pos, kFileMetadataColumnNames, &_slot_pos);
469
0
    }
470
471
    Status append_rows(const std::string& path, FileMetaData* metadata,
472
0
                       std::vector<MutableColumnPtr>& columns) override {
473
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
474
475
0
        auto insert_if_requested = [&](FileMetadataColumnIndex idx, auto&& inserter,
476
0
                                       auto&&... args) {
477
0
            int pos = _slot_pos[idx];
478
0
            if (pos >= 0) {
479
0
                inserter(columns[pos], std::forward<decltype(args)>(args)...);
480
0
            }
481
0
        };
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS8_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_EJEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_lEJlEEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris30ParquetFileMetadataModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils23FileMetadataColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJRS6_EEEDaSL_SN_SQ_
482
483
0
        insert_if_requested(FILE_META_FILE_NAME, insert_string, path);
484
0
        if (thrift_meta.__isset.created_by) {
485
0
            insert_if_requested(FILE_META_CREATED_BY, insert_string, thrift_meta.created_by);
486
0
        } else {
487
0
            insert_if_requested(FILE_META_CREATED_BY, insert_null);
488
0
        }
489
0
        insert_if_requested(FILE_META_NUM_ROWS, insert_int64,
490
0
                            static_cast<Int64>(thrift_meta.num_rows));
491
0
        insert_if_requested(FILE_META_NUM_ROW_GROUPS, insert_int64,
492
0
                            static_cast<Int64>(thrift_meta.row_groups.size()));
493
0
        insert_if_requested(FILE_META_FORMAT_VERSION, insert_int64,
494
0
                            static_cast<Int64>(thrift_meta.version));
495
0
        if (thrift_meta.__isset.encryption_algorithm) {
496
0
            const auto& algo = thrift_meta.encryption_algorithm;
497
0
            std::string algo_name;
498
0
            if (algo.__isset.AES_GCM_V1) {
499
0
                algo_name = "AES_GCM_V1";
500
0
            } else if (algo.__isset.AES_GCM_CTR_V1) {
501
0
                algo_name = "AES_GCM_CTR_V1";
502
0
            }
503
0
            if (!algo_name.empty()) {
504
0
                insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_string, algo_name);
505
0
            } else {
506
0
                insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null);
507
0
            }
508
0
        } else {
509
0
            insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null);
510
0
        }
511
0
        if (thrift_meta.__isset.footer_signing_key_metadata) {
512
0
            insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_string,
513
0
                                thrift_meta.footer_signing_key_metadata);
514
0
        } else {
515
0
            insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_null);
516
0
        }
517
0
        return Status::OK();
518
0
    }
519
520
private:
521
    std::array<int, FILE_META_COLUMN_COUNT> _slot_pos {};
522
};
523
524
class ParquetKeyValueModeHandler final : public ParquetMetadataReader::ModeHandler {
525
public:
526
0
    explicit ParquetKeyValueModeHandler(RuntimeState* state) : ModeHandler(state) {}
527
528
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
529
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
530
0
        _init_slot_pos_map(name_to_pos, kKeyValueColumnNames, &_slot_pos);
531
0
    }
532
533
    Status append_rows(const std::string& path, FileMetaData* metadata,
534
0
                       std::vector<MutableColumnPtr>& columns) override {
535
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
536
0
        if (!thrift_meta.__isset.key_value_metadata || thrift_meta.key_value_metadata.empty()) {
537
0
            return Status::OK();
538
0
        }
539
540
0
        auto insert_if_requested = [&](KeyValueColumnIndex idx, auto&& inserter, auto&&... args) {
541
0
            int pos = _slot_pos[idx];
542
0
            if (pos >= 0) {
543
0
                inserter(columns[pos], std::forward<decltype(args)>(args)...);
544
0
            }
545
0
        };
Unexecuted instantiation: _ZZN5doris26ParquetKeyValueModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19KeyValueColumnIndexEOT_DpOT0_E_clIRFvRSG_S8_EJS8_EEEDaSL_SN_SQ_
Unexecuted instantiation: _ZZN5doris26ParquetKeyValueModeHandler11append_rowsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_12FileMetaDataERSt6vectorINS_3COWINS_7IColumnEE11mutable_ptrISD_EESaISG_EEENKUlNS_13parquet_utils19KeyValueColumnIndexEOT_DpOT0_E_clIRFvRSG_EJEEEDaSL_SN_SQ_
546
547
0
        for (const auto& kv : thrift_meta.key_value_metadata) {
548
0
            insert_if_requested(KV_FILE_NAME, insert_string, path);
549
0
            insert_if_requested(KV_KEY, insert_string, kv.key);
550
0
            if (kv.__isset.value) {
551
0
                insert_if_requested(KV_VALUE, insert_string, kv.value);
552
0
            } else {
553
0
                insert_if_requested(KV_VALUE, insert_null);
554
0
            }
555
0
        }
556
0
        return Status::OK();
557
0
    }
558
559
private:
560
    std::array<int, KV_COLUMN_COUNT> _slot_pos {};
561
};
562
563
class ParquetBloomProbeModeHandler final : public ParquetMetadataReader::ModeHandler {
564
public:
565
    ParquetBloomProbeModeHandler(RuntimeState* state, TFileType::type file_type,
566
                                 std::map<std::string, std::string> properties, std::string column,
567
                                 std::string literal)
568
0
            : ModeHandler(state),
569
0
              _file_type(file_type),
570
0
              _properties(std::move(properties)),
571
0
              _column(std::move(column)),
572
0
              _literal(std::move(literal)) {}
573
574
0
    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
575
0
        const auto& name_to_pos = _build_name_to_pos_map(slots);
576
0
        _init_slot_pos_map(name_to_pos, kBloomProbeColumnNames, &_slot_pos);
577
0
    }
578
579
    Status append_rows(const std::string& path, FileMetaData* metadata,
580
0
                       std::vector<MutableColumnPtr>& columns) override {
581
0
        const FieldSchema* schema = metadata->schema().get_column(_column);
582
0
        if (schema == nullptr) {
583
0
            return Status::InvalidArgument(
584
0
                    fmt::format("Column '{}' not found for parquet_bloom_probe", _column));
585
0
        }
586
0
        int parquet_col_id = schema->physical_column_index;
587
0
        PrimitiveType primitive_type = _get_primitive(schema->data_type);
588
0
        if (!ParquetPredicate::bloom_filter_supported(primitive_type)) {
589
0
            return Status::InvalidArgument(
590
0
                    fmt::format("Column '{}' type {} does not support parquet bloom filter probe",
591
0
                                _column, primitive_type));
592
0
        }
593
594
0
        std::string encoded_literal;
595
0
        RETURN_IF_ERROR(
596
0
                _encode_literal(schema->physical_type, primitive_type, _literal, &encoded_literal));
597
598
0
        io::FileSystemProperties system_properties;
599
0
        system_properties.system_type = _file_type;
600
0
        system_properties.properties = _properties;
601
0
        io::FileDescription file_desc;
602
0
        file_desc.path = path;
603
0
        io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader(
604
0
                system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr));
605
0
        io::IOContext io_ctx;
606
607
0
        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
608
0
        if (thrift_meta.row_groups.empty()) {
609
0
            return Status::OK();
610
0
        }
611
612
0
        for (size_t rg_idx = 0; rg_idx < thrift_meta.row_groups.size(); ++rg_idx) {
613
0
            if (parquet_col_id < 0 ||
614
0
                parquet_col_id >= thrift_meta.row_groups[rg_idx].columns.size()) {
615
0
                return Status::InvalidArgument(fmt::format(
616
0
                        "Invalid column index {} for parquet_bloom_probe", parquet_col_id));
617
0
            }
618
0
            const auto& column_chunk = thrift_meta.row_groups[rg_idx].columns[parquet_col_id];
619
0
            std::optional<bool> excludes;
620
0
            if (column_chunk.__isset.meta_data &&
621
0
                column_chunk.meta_data.__isset.bloom_filter_offset) {
622
0
                ParquetPredicate::ColumnStat stat;
623
0
                auto st = ParquetPredicate::read_bloom_filter(column_chunk.meta_data, file_reader,
624
0
                                                              &io_ctx, &stat);
625
0
                if (st.ok() && stat.bloom_filter) {
626
0
                    bool might_contain = stat.bloom_filter->test_bytes(encoded_literal.data(),
627
0
                                                                       encoded_literal.size());
628
0
                    excludes = !might_contain;
629
0
                }
630
0
            }
631
0
            _emit_row(path, static_cast<Int64>(rg_idx), excludes, columns);
632
0
        }
633
0
        return Status::OK();
634
0
    }
635
636
private:
637
    std::array<int, BLOOM_COLUMN_COUNT> _slot_pos {};
638
    TFileType::type _file_type;
639
    std::map<std::string, std::string> _properties;
640
    std::string _column;
641
    std::string _literal;
642
643
0
    PrimitiveType _get_primitive(const DataTypePtr& type) const {
644
0
        if (auto nullable = typeid_cast<const DataTypeNullable*>(type.get())) {
645
0
            return nullable->get_nested_type()->get_primitive_type();
646
0
        }
647
0
        return type->get_primitive_type();
648
0
    }
649
650
    Status _encode_literal(tparquet::Type::type physical_type, PrimitiveType primitive_type,
651
0
                           const std::string& literal, std::string* out) const {
652
0
        try {
653
0
            switch (physical_type) {
654
0
            case tparquet::Type::INT32: {
655
0
                int64_t v = std::stoll(literal);
656
0
                int32_t v32 = static_cast<int32_t>(v);
657
0
                out->assign(reinterpret_cast<const char*>(&v32), sizeof(int32_t));
658
0
                return Status::OK();
659
0
            }
660
0
            case tparquet::Type::INT64: {
661
0
                int64_t v = std::stoll(literal);
662
0
                out->assign(reinterpret_cast<const char*>(&v), sizeof(int64_t));
663
0
                return Status::OK();
664
0
            }
665
0
            case tparquet::Type::FLOAT: {
666
0
                float v = std::stof(literal);
667
0
                out->assign(reinterpret_cast<const char*>(&v), sizeof(float));
668
0
                return Status::OK();
669
0
            }
670
0
            case tparquet::Type::DOUBLE: {
671
0
                double v = std::stod(literal);
672
0
                out->assign(reinterpret_cast<const char*>(&v), sizeof(double));
673
0
                return Status::OK();
674
0
            }
675
0
            case tparquet::Type::BYTE_ARRAY: {
676
                // For string/blob, use raw bytes from the literal.
677
0
                *out = literal;
678
0
                return Status::OK();
679
0
            }
680
0
            default:
681
0
                break;
682
0
            }
683
0
        } catch (const std::exception& e) {
684
0
            return Status::InvalidArgument(fmt::format(
685
0
                    "Failed to parse literal '{}' for parquet bloom probe: {}", literal, e.what()));
686
0
        }
687
0
        return Status::NotSupported(
688
0
                fmt::format("Physical type {} for column '{}' not supported in parquet_bloom_probe",
689
0
                            physical_type, _column));
690
0
    }
691
692
    void _emit_row(const std::string& path, Int64 row_group_id, std::optional<bool> excludes,
693
0
                   std::vector<MutableColumnPtr>& columns) {
694
0
        if (_slot_pos[BLOOM_FILE_NAME] >= 0) {
695
0
            insert_string(columns[_slot_pos[BLOOM_FILE_NAME]], path);
696
0
        }
697
0
        if (_slot_pos[BLOOM_ROW_GROUP_ID] >= 0) {
698
0
            insert_int32(columns[_slot_pos[BLOOM_ROW_GROUP_ID]], static_cast<Int32>(row_group_id));
699
0
        }
700
0
        if (_slot_pos[BLOOM_EXCLUDES] >= 0) {
701
0
            int32_t excludes_val = -1; // -1: no bloom filter present
702
0
            if (excludes.has_value()) {
703
0
                excludes_val = excludes.value() ? 1 : 0;
704
0
            }
705
0
            insert_int32(columns[_slot_pos[BLOOM_EXCLUDES]], excludes_val);
706
0
        }
707
0
    }
708
};
709
710
ParquetMetadataReader::ParquetMetadataReader(std::vector<SlotDescriptor*> slots,
711
                                             RuntimeState* state, RuntimeProfile* profile,
712
                                             TMetaScanRange scan_range)
713
0
        : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) {
714
0
    (void)profile;
715
0
}
716
717
0
ParquetMetadataReader::~ParquetMetadataReader() = default;
718
719
0
Status ParquetMetadataReader::init_reader() {
720
0
    RETURN_IF_ERROR(_init_from_scan_range(_scan_range));
721
0
    if (_mode_type == Mode::SCHEMA) {
722
0
        _mode_handler = std::make_unique<ParquetSchemaModeHandler>(_state);
723
0
    } else if (_mode_type == Mode::FILE_METADATA) {
724
0
        _mode_handler = std::make_unique<ParquetFileMetadataModeHandler>(_state);
725
0
    } else if (_mode_type == Mode::KEY_VALUE_METADATA) {
726
0
        _mode_handler = std::make_unique<ParquetKeyValueModeHandler>(_state);
727
0
    } else if (_mode_type == Mode::BLOOM_PROBE) {
728
0
        _mode_handler = std::make_unique<ParquetBloomProbeModeHandler>(
729
0
                _state, _file_type, _properties, _bloom_column, _bloom_literal);
730
0
    } else {
731
0
        _mode_handler = std::make_unique<ParquetMetadataModeHandler>(_state);
732
0
    }
733
0
    _mode_handler->init_slot_pos_map(_slots);
734
0
    return Status::OK();
735
0
}
736
737
0
Status ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) {
738
0
    if (!scan_range.__isset.parquet_params) {
739
0
        return Status::InvalidArgument(
740
0
                "Missing parquet parameters for parquet_meta table function");
741
0
    }
742
0
    const TParquetMetadataParams& params = scan_range.parquet_params;
743
0
    std::vector<std::string> resolved_paths;
744
0
    if (scan_range.__isset.serialized_splits && !scan_range.serialized_splits.empty()) {
745
0
        resolved_paths.assign(scan_range.serialized_splits.begin(),
746
0
                              scan_range.serialized_splits.end());
747
0
    } else if (params.__isset.paths && !params.paths.empty()) {
748
0
        resolved_paths.assign(params.paths.begin(), params.paths.end());
749
0
    } else {
750
0
        return Status::InvalidArgument("Property 'path' must be set for parquet_meta");
751
0
    }
752
0
    _paths.swap(resolved_paths);
753
754
0
    if (params.__isset.mode) {
755
0
        _mode = params.mode;
756
0
    } else {
757
0
        _mode = MODE_METADATA; // default
758
0
    }
759
760
0
    if (params.__isset.file_type) {
761
0
        _file_type = params.file_type;
762
0
    } else {
763
0
        return Status::InvalidArgument("Property 'file_type' must be set for parquet_metadata");
764
0
    }
765
0
    if (params.__isset.properties) {
766
0
        _properties = params.properties;
767
0
    }
768
0
    if (params.__isset.bloom_column) {
769
0
        _bloom_column = params.bloom_column;
770
0
    }
771
0
    if (params.__isset.bloom_literal) {
772
0
        _bloom_literal = params.bloom_literal;
773
0
    }
774
0
    std::string lower_mode = _mode;
775
0
    std::ranges::transform(lower_mode, lower_mode.begin(),
776
0
                           [](unsigned char c) { return std::tolower(c); });
777
0
    if (lower_mode == MODE_SCHEMA) {
778
0
        _mode_type = Mode::SCHEMA;
779
0
        _mode = MODE_SCHEMA;
780
0
    } else if (lower_mode == MODE_FILE_METADATA) {
781
0
        _mode_type = Mode::FILE_METADATA;
782
0
        _mode = MODE_FILE_METADATA;
783
0
    } else if (lower_mode == MODE_KEY_VALUE_METADATA) {
784
0
        _mode_type = Mode::KEY_VALUE_METADATA;
785
0
        _mode = MODE_KEY_VALUE_METADATA;
786
0
    } else if (lower_mode == MODE_BLOOM_PROBE) {
787
0
        _mode_type = Mode::BLOOM_PROBE;
788
0
        _mode = MODE_BLOOM_PROBE;
789
0
    } else {
790
0
        _mode_type = Mode::METADATA;
791
0
        _mode = MODE_METADATA;
792
0
    }
793
0
    if (_mode_type == Mode::BLOOM_PROBE && (_bloom_column.empty() || _bloom_literal.empty())) {
794
0
        return Status::InvalidArgument(
795
0
                "Properties 'bloom_column' and 'bloom_literal' must be set for "
796
0
                "parquet_bloom_probe");
797
0
    }
798
0
    return Status::OK();
799
0
}
800
801
0
Status ParquetMetadataReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
802
0
    if (_eof) {
803
0
        *eof = true;
804
0
        *read_rows = 0;
805
0
        return Status::OK();
806
0
    }
807
808
    // Scanner may call multiple times; we surface data once and mark eof on the next call.
809
    // When reusing a Block, wipe row data but keep column structure intact.
810
0
    bool mem_reuse = block->mem_reuse();
811
0
    std::vector<MutableColumnPtr> columns(_slots.size());
812
0
    if (mem_reuse) {
813
0
        block->clear_column_data();
814
0
        for (size_t i = 0; i < _slots.size(); ++i) {
815
0
            columns[i] = block->get_by_position(i).column->assume_mutable();
816
0
        }
817
0
    } else {
818
0
        for (size_t i = 0; i < _slots.size(); ++i) {
819
0
            columns[i] = _slots[i]->get_empty_mutable_column();
820
0
        }
821
0
    }
822
823
0
    size_t rows_before = block->rows();
824
0
    RETURN_IF_ERROR(_build_rows(columns));
825
826
0
    if (!mem_reuse) {
827
0
        for (size_t i = 0; i < _slots.size(); ++i) {
828
0
            block->insert(ColumnWithTypeAndName(
829
0
                    std::move(columns[i]), _slots[i]->get_data_type_ptr(), _slots[i]->col_name()));
830
0
        }
831
0
    } else {
832
0
        columns.clear();
833
0
    }
834
835
0
    size_t produced = block->rows() - rows_before;
836
0
    *read_rows = produced;
837
0
    _eof = true;
838
0
    *eof = (produced == 0);
839
0
    return Status::OK();
840
0
}
841
842
// Iterate all configured paths and append metadata rows into the provided columns.
843
0
Status ParquetMetadataReader::_build_rows(std::vector<MutableColumnPtr>& columns) {
844
0
    for (const auto& path : _paths) {
845
0
        RETURN_IF_ERROR(_append_file_rows(path, columns));
846
0
    }
847
0
    return Status::OK();
848
0
}
849
850
// Open a single Parquet file, read its footer, and dispatch to schema/metadata handlers.
851
Status ParquetMetadataReader::_append_file_rows(const std::string& path,
852
0
                                                std::vector<MutableColumnPtr>& columns) {
853
0
    io::FileSystemProperties system_properties;
854
0
    system_properties.system_type = _file_type;
855
0
    system_properties.properties = _properties;
856
0
    if (_file_type == TFileType::FILE_HDFS) {
857
0
        system_properties.hdfs_params = ::doris::parse_properties(system_properties.properties);
858
0
    }
859
0
    io::FileDescription file_desc;
860
0
    file_desc.path = path;
861
0
    io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader(
862
0
            system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr));
863
864
0
    std::unique_ptr<FileMetaData> file_metadata;
865
0
    size_t meta_size = 0;
866
0
    io::IOContext io_ctx;
867
0
    RETURN_IF_ERROR(
868
0
            parse_thrift_footer(file_reader, &file_metadata, &meta_size, &io_ctx, true, true));
869
870
0
    if (_mode_handler == nullptr) {
871
0
        return Status::InternalError(
872
0
                "Parquet metadata reader is not initialized with mode handler");
873
0
    }
874
0
    return _mode_handler->append_rows(path, file_metadata.get(), columns);
875
0
}
876
877
0
Status ParquetMetadataReader::close() {
878
0
    return Status::OK();
879
0
}
880
881
} // namespace doris