be/src/format/table/iceberg_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <cstdint> |
22 | | #include <string> |
23 | | #include <unordered_map> |
24 | | #include <unordered_set> |
25 | | #include <utility> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "exprs/vslot_ref.h" |
30 | | #include "format/orc/vorc_reader.h" |
31 | | #include "format/parquet/vparquet_reader.h" |
32 | | #include "format/table/equality_delete.h" |
33 | | #include "format/table/table_format_reader.h" |
34 | | #include "storage/olap_scan_common.h" |
35 | | |
36 | | namespace tparquet { |
37 | | class KeyValue; |
38 | | } // namespace tparquet |
39 | | |
40 | | namespace doris { |
41 | | #include "common/compile_check_begin.h" |
42 | | class RowDescriptor; |
43 | | class RuntimeState; |
44 | | class SlotDescriptor; |
45 | | class TFileRangeDesc; |
46 | | class TFileScanRangeParams; |
47 | | class TIcebergDeleteFileDesc; |
48 | | class TupleDescriptor; |
49 | | |
50 | | namespace io { |
51 | | struct IOContext; |
52 | | } // namespace io |
53 | | template <typename T> |
54 | | class ColumnStr; |
55 | | using ColumnString = ColumnStr<UInt32>; |
56 | | class Block; |
57 | | class GenericReader; |
58 | | class ShardedKVCache; |
59 | | class VExprContext; |
60 | | |
61 | | struct RowLineageColumns { |
62 | | int row_id_column_idx = -1; |
63 | | int last_updated_sequence_number_column_idx = -1; |
64 | | int64_t first_row_id = -1; |
65 | | int64_t last_updated_sequence_number = -1; |
66 | | |
67 | 710 | bool need_row_ids() const { return row_id_column_idx >= 0; } |
68 | 468 | bool has_last_updated_sequence_number_column() const { |
69 | 468 | return last_updated_sequence_number_column_idx >= 0; |
70 | 468 | } |
71 | | }; |
72 | | |
73 | | class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper { |
74 | | public: |
75 | | static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id"; |
76 | | static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = |
77 | | "_last_updated_sequence_number"; |
78 | | |
79 | | IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
80 | | RuntimeState* state, const TFileScanRangeParams& params, |
81 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, |
82 | | FileMetaCache* meta_cache); |
83 | 24.6k | ~IcebergTableReader() override = default; |
84 | | |
85 | 156 | void set_need_row_id_column(bool need) { _need_row_id_column = need; } |
86 | 0 | bool need_row_id_column() const { return _need_row_id_column; } |
87 | 230 | void set_row_id_column_position(int position) { _row_id_column_position = position; } |
88 | | |
89 | | Status init_row_filters() final; |
90 | | |
91 | | Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; |
92 | | |
93 | | enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR }; |
94 | | enum Fileformat { NONE, PARQUET, ORC, AVRO }; |
95 | | |
96 | | virtual void set_delete_rows() = 0; |
97 | | |
98 | 10.0k | bool has_delete_operations() const override { |
99 | 10.0k | return _equality_delete_impls.size() > 0 || TableFormatReader::has_delete_operations(); |
100 | 10.0k | } |
101 | | |
102 | | Status read_deletion_vector(const std::string& data_file_path, |
103 | | const TIcebergDeleteFileDesc& delete_file_desc); |
104 | | |
105 | 472 | void set_row_lineage_columns(std::shared_ptr<RowLineageColumns> row_lineage_columns) { |
106 | 472 | _row_lineage_columns = std::move(row_lineage_columns); |
107 | 472 | } |
108 | | |
109 | | protected: |
110 | | struct IcebergProfile { |
111 | | RuntimeProfile::Counter* num_delete_files; |
112 | | RuntimeProfile::Counter* num_delete_rows; |
113 | | RuntimeProfile::Counter* delete_files_read_time; |
114 | | RuntimeProfile::Counter* delete_rows_sort_time; |
115 | | RuntimeProfile::Counter* parse_delete_file_time; |
116 | | }; |
117 | | using DeleteRows = std::vector<int64_t>; |
118 | | using DeleteFile = phmap::parallel_flat_hash_map< |
119 | | std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
120 | | std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
121 | | std::mutex>; |
122 | | |
123 | | // $row_id metadata column generation state |
124 | | bool _need_row_id_column = false; |
125 | | int _row_id_column_position = -1; |
126 | | /** |
127 | | * https://iceberg.apache.org/spec/#position-delete-files |
128 | | * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning. |
129 | | * Sorting by file_path allows filter pushdown by file in columnar storage formats. |
130 | | * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. |
131 | | */ |
132 | | static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array, |
133 | | int64_t num_delete_rows, std::vector<int64_t>& result); |
134 | | |
135 | 5.63k | static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; } |
136 | | |
137 | | Status _position_delete_base(const std::string data_file_path, |
138 | | const std::vector<TIcebergDeleteFileDesc>& delete_files); |
139 | | virtual Status _process_equality_delete( |
140 | | const std::vector<TIcebergDeleteFileDesc>& delete_files) = 0; |
141 | | void _generate_equality_delete_block(Block* block, |
142 | | const std::vector<std::string>& equality_delete_col_names, |
143 | | const std::vector<DataTypePtr>& equality_delete_col_types); |
144 | | // Equality delete should read the primary columns. Add the missing columns |
145 | | Status _expand_block_if_need(Block* block); |
146 | | // Remove the added delete columns |
147 | | Status _shrink_block_if_need(Block* block); |
148 | | |
149 | | // owned by scan node |
150 | | ShardedKVCache* _kv_cache; |
151 | | IcebergProfile _iceberg_profile; |
152 | | // _iceberg_delete_rows from kv_cache |
153 | | const std::vector<int64_t>* _iceberg_delete_rows = nullptr; |
154 | | |
155 | | // Pointer to external column name to block index mapping (from FileScanner) |
156 | | // Used to dynamically add expand columns for equality delete |
157 | | std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr; |
158 | | |
159 | | Fileformat _file_format = Fileformat::NONE; |
160 | | |
161 | | const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; |
162 | | const int READ_DELETE_FILE_BATCH_SIZE = 102400; |
163 | | |
164 | | // Read a position delete file from the full Iceberg delete descriptor. |
165 | | Status _read_position_delete_file(const TIcebergDeleteFileDesc&, DeleteFile*); |
166 | | |
167 | | // read table colummn + extra equality delete columns |
168 | | std::vector<std::string> _all_required_col_names; |
169 | | |
170 | | // extra equality delete name and type |
171 | | std::vector<std::string> _expand_col_names; |
172 | | std::vector<ColumnWithTypeAndName> _expand_columns; |
173 | | |
174 | | // all ids that need read for eq delete (from all qe delte file.) |
175 | | std::set<int> _equality_delete_col_ids; |
176 | | // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls |
177 | | std::map<std::vector<int>, int> _equality_delete_block_map; |
178 | | // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after |
179 | | // creating entries in _equality_delete_impls. |
180 | | std::vector<Block> _equality_delete_blocks; |
181 | | std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls; |
182 | | |
183 | | // id -> block column name. |
184 | | std::unordered_map<int, std::string> _id_to_block_column_name; |
185 | | |
186 | | std::shared_ptr<RowLineageColumns> _row_lineage_columns; |
187 | | }; |
188 | | |
189 | | class IcebergParquetReader final : public IcebergTableReader { |
190 | | public: |
191 | | ENABLE_FACTORY_CREATOR(IcebergParquetReader); |
192 | | |
193 | | IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
194 | | RuntimeState* state, const TFileScanRangeParams& params, |
195 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, |
196 | | io::IOContext* io_ctx, FileMetaCache* meta_cache) |
197 | 18.0k | : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, |
198 | 18.0k | kv_cache, io_ctx, meta_cache) {} |
199 | | Status init_reader( |
200 | | const std::vector<std::string>& file_col_names, |
201 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
202 | | const VExprContextSPtrs& conjuncts, |
203 | | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
204 | | slot_id_to_predicates, |
205 | | const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, |
206 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
207 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
208 | | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); |
209 | | |
210 | 3.00k | void set_delete_rows() final { |
211 | 3.00k | auto* parquet_reader = (ParquetReader*)(_file_format_reader.get()); |
212 | 3.00k | parquet_reader->set_delete_rows(_iceberg_delete_rows); |
213 | 3.00k | } |
214 | | |
215 | | private: |
216 | | static ColumnIdResult _create_column_ids(const FieldDescriptor* field_desc, |
217 | | const TupleDescriptor* tuple_descriptor); |
218 | | Status _process_equality_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files) final; |
219 | | |
220 | | const FieldDescriptor* _data_file_field_desc = nullptr; |
221 | | }; |
222 | | class IcebergOrcReader final : public IcebergTableReader { |
223 | | public: |
224 | | ENABLE_FACTORY_CREATOR(IcebergOrcReader); |
225 | | |
226 | | IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
227 | | RuntimeState* state, const TFileScanRangeParams& params, |
228 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, |
229 | | FileMetaCache* meta_cache) |
230 | 6.56k | : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, |
231 | 6.56k | kv_cache, io_ctx, meta_cache) {} |
232 | | |
233 | 976 | void set_delete_rows() final { |
234 | 976 | auto* orc_reader = (OrcReader*)_file_format_reader.get(); |
235 | 976 | orc_reader->set_position_delete_rowids(_iceberg_delete_rows); |
236 | 976 | } |
237 | | |
238 | | Status init_reader( |
239 | | const std::vector<std::string>& file_col_names, |
240 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
241 | | const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, |
242 | | const RowDescriptor* row_descriptor, |
243 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
244 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
245 | | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); |
246 | | |
247 | | private: |
248 | | Status _process_equality_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files) final; |
249 | | |
250 | | static ColumnIdResult _create_column_ids(const orc::Type* orc_type, |
251 | | const TupleDescriptor* tuple_descriptor); |
252 | | |
253 | | private: |
254 | | static const std::string ICEBERG_ORC_ATTRIBUTE; |
255 | | const orc::Type* _data_file_type_desc = nullptr; |
256 | | }; |
257 | | |
258 | | #include "common/compile_check_end.h" |
259 | | } // namespace doris |