be/src/format_v2/table/hive_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table/hive_reader.h" |
19 | | |
20 | | #include "common/consts.h" |
21 | | #include "format_v2/column_mapper.h" |
22 | | #include "format_v2/file_reader.h" |
23 | | #include "runtime/runtime_state.h" |
24 | | |
25 | | namespace doris::format::hive { |
26 | | namespace { |
27 | | |
28 | 8 | TFileFormatType::type format_type_from_context(const format::ProjectedColumnBuildContext& context) { |
29 | 8 | DORIS_CHECK(context.scan_params != nullptr); |
30 | 8 | if (context.range != nullptr && context.range->__isset.format_type) { |
31 | 0 | return context.range->format_type; |
32 | 0 | } |
33 | 8 | return context.scan_params->format_type; |
34 | 8 | } |
35 | | |
36 | 8 | bool use_column_position_mapping(const format::ProjectedColumnBuildContext& context) { |
37 | 8 | if (context.runtime_state == nullptr || context.scan_params == nullptr) { |
38 | 0 | return false; |
39 | 0 | } |
40 | 8 | switch (format_type_from_context(context)) { |
41 | 7 | case TFileFormatType::FORMAT_PARQUET: |
42 | 7 | return !context.runtime_state->query_options().hive_parquet_use_column_names; |
43 | 1 | default: |
44 | 1 | return false; |
45 | 8 | } |
46 | 8 | } |
47 | | |
48 | | bool is_file_column_position_slot(const TFileScanSlotInfo& slot_info, |
49 | 5 | const std::string& column_name) { |
50 | 5 | if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || |
51 | 5 | column_name == BeConsts::ICEBERG_ROWID_COL) { |
52 | 0 | return false; |
53 | 0 | } |
54 | 5 | if (slot_info.__isset.is_file_slot) { |
55 | 5 | return slot_info.is_file_slot; |
56 | 5 | } |
57 | 0 | return !slot_info.__isset.category || slot_info.category != TColumnCategory::PARTITION_KEY; |
58 | 5 | } |
59 | | |
60 | | } // namespace |
61 | | |
62 | 2 | Status HiveReader::prepare_split(const format::SplitReadOptions& options) { |
63 | 2 | if (options.current_split_format != _format) { |
64 | 1 | return Status::InternalError( |
65 | 1 | "Hive scan expects all splits to use the same file format, " |
66 | 1 | "initialized_format={}, current_split_format={}", |
67 | 1 | static_cast<int>(_format), static_cast<int>(options.current_split_format)); |
68 | 1 | } |
69 | 1 | return format::TableReader::prepare_split(options); |
70 | 2 | } |
71 | | |
72 | 5 | format::TableColumnMappingMode HiveReader::mapping_mode() const { |
73 | | // Hive-specific behavior: choose the column matching mode based on file format and the |
74 | | // matching session variable. |
75 | | // - hive_orc_use_column_names / hive_parquet_use_column_names == true |
76 | | // => BY_NAME (modern Hive default, match by column name) |
77 | | // - those options == false |
78 | | // => BY_INDEX (mainly for Hive1 ORC `_col0` / `_col1`, match by top-level position; |
79 | | // Parquet exposes the same switch for consistency) |
80 | | // TableReader updates `_format` in prepare_split(), so this is evaluated per split. |
81 | 5 | DORIS_CHECK(_runtime_state != nullptr); |
82 | 5 | const auto& query_options = _runtime_state->query_options(); |
83 | 5 | bool use_column_names = true; |
84 | 5 | if (_format == format::FileFormat::ORC) { |
85 | 0 | use_column_names = query_options.hive_orc_use_column_names; |
86 | 5 | } else if (_format == format::FileFormat::PARQUET) { |
87 | 2 | use_column_names = query_options.hive_parquet_use_column_names; |
88 | 3 | } else if (_format == format::FileFormat::CSV || _format == format::FileFormat::TEXT || |
89 | 3 | _format == format::FileFormat::JSON) { |
90 | | // Hive CSV/TEXT/JSON readers synthesize a file-local schema from FE-provided file slots |
91 | | // because these formats do not carry embedded column names or field ids. The scan params' |
92 | | // format-specific attributes still tell the physical reader how to read values, while the |
93 | | // table-level mapper can safely match the synthesized file schema by table column name. |
94 | 3 | use_column_names = true; |
95 | 3 | } else { |
96 | 0 | DORIS_CHECK(false) << "HiveReader does not support this file reader format"; |
97 | 0 | } |
98 | | |
99 | 5 | return use_column_names ? format::TableColumnMappingMode::BY_NAME |
100 | 5 | : format::TableColumnMappingMode::BY_INDEX; |
101 | 5 | } |
102 | | |
103 | | Status HiveReader::annotate_projected_column(const TFileScanSlotInfo& slot_info, |
104 | | format::ProjectedColumnBuildContext* context, |
105 | 6 | format::ColumnDefinition* column) const { |
106 | 6 | RETURN_IF_ERROR(format::TableReader::annotate_projected_column(slot_info, context, column)); |
107 | 6 | DORIS_CHECK(context != nullptr); |
108 | 6 | DORIS_CHECK(column != nullptr); |
109 | 6 | if (!use_column_position_mapping(*context) || |
110 | 6 | !is_file_column_position_slot(slot_info, column->name)) { |
111 | 2 | return Status::OK(); |
112 | 2 | } |
113 | 4 | const auto* scan_params = context->scan_params; |
114 | 4 | DORIS_CHECK(scan_params != nullptr); |
115 | 4 | if (!scan_params->__isset.column_idxs || |
116 | 4 | context->next_file_column_idx >= scan_params->column_idxs.size()) { |
117 | 1 | return Status::InvalidArgument( |
118 | 1 | "Hive positional column mapping is missing file index for column '{}', " |
119 | 1 | "required file slot ordinal={}, column_idxs_size={}", |
120 | 1 | column->name, context->next_file_column_idx, |
121 | 1 | scan_params->__isset.column_idxs ? scan_params->column_idxs.size() : 0); |
122 | 1 | } |
123 | 3 | const auto file_index = scan_params->column_idxs[context->next_file_column_idx]; |
124 | 3 | if (file_index < 0) { |
125 | 0 | return Status::InvalidArgument( |
126 | 0 | "Hive positional column mapping has negative file index {} for column '{}'", |
127 | 0 | file_index, column->name); |
128 | 0 | } |
129 | 3 | column->identifier = Field::create_field<TYPE_INT>(file_index); |
130 | 3 | ++context->next_file_column_idx; |
131 | 3 | return Status::OK(); |
132 | 3 | } |
133 | | |
134 | | Status HiveReader::validate_projected_columns( |
135 | 2 | const format::ProjectedColumnBuildContext& context) const { |
136 | 2 | if (!use_column_position_mapping(context)) { |
137 | 0 | return Status::OK(); |
138 | 0 | } |
139 | 2 | DORIS_CHECK(context.scan_params != nullptr); |
140 | 2 | if (context.scan_params->__isset.column_idxs && |
141 | 2 | context.next_file_column_idx != context.scan_params->column_idxs.size()) { |
142 | 0 | return Status::InvalidArgument( |
143 | 0 | "Hive positional column mapping has unused file indexes: consumed={}, " |
144 | 0 | "column_idxs_size={}", |
145 | 0 | context.next_file_column_idx, context.scan_params->column_idxs.size()); |
146 | 0 | } |
147 | 2 | return Status::OK(); |
148 | 2 | } |
149 | | |
150 | | } // namespace doris::format::hive |