be/src/format/table/iceberg_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <cstdint> |
22 | | #include <string> |
23 | | #include <unordered_map> |
24 | | #include <unordered_set> |
25 | | #include <utility> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "core/column/column_dictionary.h" |
30 | | #include "core/data_type/define_primitive_type.h" |
31 | | #include "core/data_type/primitive_type.h" |
32 | | #include "exprs/vslot_ref.h" |
33 | | #include "format/orc/vorc_reader.h" |
34 | | #include "format/parquet/vparquet_reader.h" |
35 | | #include "format/table/equality_delete.h" |
36 | | #include "format/table/table_format_reader.h" |
37 | | #include "storage/olap_scan_common.h" |
38 | | |
39 | | namespace tparquet { |
40 | | class KeyValue; |
41 | | } // namespace tparquet |
42 | | |
43 | | namespace doris { |
44 | | #include "common/compile_check_begin.h" |
45 | | class RowDescriptor; |
46 | | class RuntimeState; |
47 | | class SlotDescriptor; |
48 | | class TFileRangeDesc; |
49 | | class TFileScanRangeParams; |
50 | | class TIcebergDeleteFileDesc; |
51 | | class TupleDescriptor; |
52 | | |
53 | | namespace io { |
54 | | struct IOContext; |
55 | | } // namespace io |
56 | | template <typename T> |
57 | | class ColumnStr; |
58 | | using ColumnString = ColumnStr<UInt32>; |
59 | | class Block; |
60 | | class GenericReader; |
61 | | class ShardedKVCache; |
62 | | class VExprContext; |
63 | | // Base class of the Iceberg readers in this header: declares the position-delete, equality-delete and deletion-vector entry points shared by IcebergParquetReader and IcebergOrcReader. |
64 | | class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper { |
65 | | public: |
66 | | struct PositionDeleteRange { // Return type of both _get_range() overloads. |
67 | | std::vector<std::string> data_file_path; |
68 | | std::vector<std::pair<int, int>> range; // NOTE(review): presumably one row range per entry of data_file_path -- confirm in the .cpp. |
69 | | }; |
70 | | |
71 | | IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
72 | | RuntimeState* state, const TFileScanRangeParams& params, |
73 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, |
74 | | FileMetaCache* meta_cache); |
75 | 23.5k | ~IcebergTableReader() override = default; |
76 | | |
77 | | Status init_row_filters() final; |
78 | | |
79 | | Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; |
80 | | // Unscoped on purpose: the enumerators are visible as plain integers inside the class. NOTE(review): presumably matched against the content kind of a delete file descriptor -- confirm. |
81 | | enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR }; |
82 | | enum Fileformat { NONE, PARQUET, ORC, AVRO }; |
83 | | // Implemented by each format-specific subclass to hand _iceberg_delete_rows to its underlying file reader. |
84 | | virtual void set_delete_rows() = 0; |
85 | | |
86 | | Status read_deletion_vector(const std::string& data_file_path, |
87 | | const TIcebergDeleteFileDesc& delete_file_desc); |
88 | | |
89 | | protected: |
90 | | struct IcebergProfile { // Profile counters for delete-file handling; the pointers carry no default initializer here. |
91 | | RuntimeProfile::Counter* num_delete_files; |
92 | | RuntimeProfile::Counter* num_delete_rows; |
93 | | RuntimeProfile::Counter* delete_files_read_time; |
94 | | RuntimeProfile::Counter* delete_rows_sort_time; |
95 | | RuntimeProfile::Counter* parse_delete_file_time; |
96 | | }; |
97 | | using DeleteRows = std::vector<int64_t>; |
98 | | using DeleteFile = phmap::parallel_flat_hash_map< |
99 | | std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
100 | | std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
101 | | std::mutex>; |
102 | | /** |
103 | | * https://iceberg.apache.org/spec/#position-delete-files |
104 | | * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning. |
105 | | * Sorting by file_path allows filter pushdown by file in columnar storage formats. |
106 | | * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. |
107 | | */ |
108 | | static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array, |
109 | | int64_t num_delete_rows, std::vector<int64_t>& result); |
110 | | // Two overloads: one for a dictionary-encoded file_path column, one for a plain string column. |
111 | | PositionDeleteRange _get_range(const ColumnDictI32& file_path_column); |
112 | | |
113 | | PositionDeleteRange _get_range(const ColumnString& file_path_column); |
114 | | // Returns the given path prefixed with "delete_". The misspelled name is kept because callers depend on it. |
115 | 5.26k | static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; } |
116 | | |
117 | | Status _position_delete_base(const std::string data_file_path, |
118 | | const std::vector<TIcebergDeleteFileDesc>& delete_files); |
119 | | virtual Status _process_equality_delete( |
120 | | const std::vector<TIcebergDeleteFileDesc>& delete_files) = 0; |
121 | | void _generate_equality_delete_block(Block* block, |
122 | | const std::vector<std::string>& equality_delete_col_names, |
123 | | const std::vector<DataTypePtr>& equality_delete_col_types); |
124 | | // Equality delete should read the primary columns. Add the missing columns |
125 | | Status _expand_block_if_need(Block* block); |
126 | | // Remove the added delete columns |
127 | | Status _shrink_block_if_need(Block* block); |
128 | | |
129 | | // owned by scan node |
130 | | ShardedKVCache* _kv_cache; |
131 | | IcebergProfile _iceberg_profile; |
132 | | // _iceberg_delete_rows from kv_cache |
133 | | const std::vector<int64_t>* _iceberg_delete_rows = nullptr; |
134 | | |
135 | | // Pointer to external column name to block index mapping (from FileScanner) |
136 | | // Used to dynamically add expand columns for equality delete |
137 | | std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr; |
138 | | |
139 | | Fileformat _file_format = Fileformat::NONE; |
140 | | // The constants below are static: they are identical for every reader, so they are built once per process instead of once per instance. |
141 | | static constexpr int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; |
142 | | static inline const std::string ICEBERG_FILE_PATH = "file_path"; |
143 | | static inline const std::string ICEBERG_ROW_POS = "pos"; |
144 | | static inline const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS}; |
145 | | static inline const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = { |
146 | | {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}}; |
147 | | static constexpr int ICEBERG_FILE_PATH_INDEX = 0; |
148 | | static constexpr int ICEBERG_FILE_POS_INDEX = 1; |
149 | | static constexpr int READ_DELETE_FILE_BATCH_SIZE = 102400; |
150 | | |
151 | | // Reads the position delete file described by the TFileRangeDesc and fills the DeleteFile map. |
152 | | virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0; |
153 | | |
154 | | void _gen_position_delete_file_range(Block& block, DeleteFile* const position_delete, |
155 | | size_t read_rows, bool file_path_column_dictionary_coded); |
156 | | |
157 | | // Table columns to read plus the extra columns required by equality deletes. |
158 | | std::vector<std::string> _all_required_col_names; |
159 | | |
160 | | // extra equality delete name and type |
161 | | std::vector<std::string> _expand_col_names; |
162 | | std::vector<ColumnWithTypeAndName> _expand_columns; |
163 | | |
164 | | // All column ids that must be read for equality deletes (collected from all equality delete files). |
165 | | std::set<int> _equality_delete_col_ids; |
166 | | // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls |
167 | | std::map<std::vector<int>, int> _equality_delete_block_map; |
168 | | // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after |
169 | | // creating entries in _equality_delete_impls. |
170 | | std::vector<Block> _equality_delete_blocks; |
171 | | std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls; |
172 | | |
173 | | // id -> block column name. |
174 | | std::unordered_map<int, std::string> _id_to_block_column_name; |
175 | | }; |
176 | | // Iceberg reader for Parquet data files: implements the format-specific hooks declared by IcebergTableReader. |
177 | | class IcebergParquetReader final : public IcebergTableReader { |
178 | | public: |
179 | | ENABLE_FACTORY_CREATOR(IcebergParquetReader); |
180 | | // Forwards every argument unchanged to the IcebergTableReader constructor. |
181 | | IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
182 | | RuntimeState* state, const TFileScanRangeParams& params, |
183 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, |
184 | | io::IOContext* io_ctx, FileMetaCache* meta_cache) |
185 | 17.5k | : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, |
186 | 17.5k | kv_cache, io_ctx, meta_cache) {} |
187 | | Status init_reader( |
188 | | const std::vector<std::string>& file_col_names, |
189 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
190 | | const VExprContextSPtrs& conjuncts, |
191 | | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
192 | | slot_id_to_predicates, |
193 | | const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, |
194 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
195 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
196 | | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); |
197 | | // Passes _iceberg_delete_rows to the wrapped reader. The downcast is unchecked: _file_format_reader is expected to hold a ParquetReader. |
198 | 2.83k | void set_delete_rows() final { |
199 | 2.83k | auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get()); |
200 | 2.83k | parquet_reader->set_delete_rows(_iceberg_delete_rows); |
201 | 2.83k | } |
202 | | |
203 | | private: |
204 | | static ColumnIdResult _create_column_ids(const FieldDescriptor* field_desc, |
205 | | const TupleDescriptor* tuple_descriptor); |
206 | | |
207 | | Status _read_position_delete_file(const TFileRangeDesc* delete_range, |
208 | | DeleteFile* position_delete) final; |
209 | | Status _process_equality_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files) final; |
210 | | // NOTE(review): presumably the schema of the data file and not owned by this class -- confirm in the .cpp. |
211 | | const FieldDescriptor* _data_file_field_desc = nullptr; |
212 | | }; |
213 | | class IcebergOrcReader final : public IcebergTableReader { // Iceberg reader for ORC data files. |
214 | | public: |
215 | | ENABLE_FACTORY_CREATOR(IcebergOrcReader); |
216 | | // Declared public here; the Parquet sibling in this header keeps the same override private. |
217 | | Status _read_position_delete_file(const TFileRangeDesc* delete_range, |
218 | | DeleteFile* position_delete) final; |
219 | | // Forwards every argument unchanged to the IcebergTableReader constructor. |
220 | | IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
221 | | RuntimeState* state, const TFileScanRangeParams& params, |
222 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, |
223 | | FileMetaCache* meta_cache) |
224 | 6.01k | : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, |
225 | 6.01k | kv_cache, io_ctx, meta_cache) {} |
226 | | // Passes _iceberg_delete_rows to the wrapped reader. The downcast is unchecked: _file_format_reader is expected to hold an OrcReader. |
227 | 812 | void set_delete_rows() final { |
228 | 812 | auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get()); |
229 | 812 | orc_reader->set_position_delete_rowids(_iceberg_delete_rows); |
230 | 812 | } |
231 | | |
232 | | Status init_reader( |
233 | | const std::vector<std::string>& file_col_names, |
234 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
235 | | const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, |
236 | | const RowDescriptor* row_descriptor, |
237 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
238 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
239 | | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); |
240 | | |
241 | | private: |
242 | | Status _process_equality_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files) final; |
243 | | |
244 | | static ColumnIdResult _create_column_ids(const orc::Type* orc_type, |
245 | | const TupleDescriptor* tuple_descriptor); |
246 | | |
247 | | private: |
248 | | static const std::string ICEBERG_ORC_ATTRIBUTE; // Declared only; the definition is not in this header. |
249 | | const orc::Type* _data_file_type_desc = nullptr; |
250 | | }; |
251 | | |
252 | | #include "common/compile_check_end.h" |
253 | | } // namespace doris |