be/src/storage/index/indexed_column_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/index/indexed_column_writer.h" |
19 | | |
20 | | #include <gen_cpp/segment_v2.pb.h> |
21 | | |
22 | | #include <ostream> |
23 | | #include <string> |
24 | | |
25 | | #include "common/logging.h" |
26 | | #include "io/fs/file_writer.h" |
27 | | #include "storage/index/index_page.h" |
28 | | #include "storage/key_coder.h" |
29 | | #include "storage/olap_common.h" |
30 | | #include "storage/segment/encoding_info.h" |
31 | | #include "storage/segment/options.h" |
32 | | #include "storage/segment/page_builder.h" |
33 | | #include "storage/segment/page_io.h" |
34 | | #include "storage/segment/page_pointer.h" |
35 | | #include "storage/types.h" |
36 | | #include "util/block_compression.h" |
37 | | #include "util/slice.h" |
38 | | |
39 | | namespace doris { |
40 | | namespace segment_v2 { |
41 | | #include "common/compile_check_begin.h" |
42 | | |
43 | | IndexedColumnWriter::IndexedColumnWriter(const IndexedColumnWriterOptions& options, |
44 | | const TypeInfo* type_info, io::FileWriter* file_writer) |
45 | 16.0k | : _options(options), |
46 | 16.0k | _type_info(type_info), |
47 | 16.0k | _file_writer(file_writer), |
48 | 16.0k | _num_values(0), |
49 | 16.0k | _num_data_pages(0), |
50 | 16.0k | _disk_size(0), |
51 | 16.0k | _value_key_coder(nullptr), |
52 | 16.0k | _compress_codec(nullptr) { |
53 | 16.0k | _first_value.resize(_type_info->size()); |
54 | 16.0k | } |
55 | | |
56 | 16.0k | IndexedColumnWriter::~IndexedColumnWriter() = default; |
57 | | |
58 | 16.0k | Status IndexedColumnWriter::init() { |
59 | 16.0k | const EncodingInfo* encoding_info; |
60 | 16.0k | RETURN_IF_ERROR(EncodingInfo::get(_type_info->type(), _options.encoding, {}, &encoding_info)); |
61 | 16.0k | _options.encoding = encoding_info->encoding(); |
62 | | // should store more concrete encoding type instead of DEFAULT_ENCODING |
63 | | // because the default encoding of a data type can be changed in the future |
64 | 16.0k | DCHECK_NE(_options.encoding, DEFAULT_ENCODING); |
65 | | |
66 | 16.0k | PageBuilder* data_page_builder = nullptr; |
67 | 16.0k | PageBuilderOptions builder_option; |
68 | 16.0k | builder_option.need_check_bitmap = false; |
69 | 16.0k | builder_option.data_page_size = _options.data_page_size; |
70 | 16.0k | RETURN_IF_ERROR(encoding_info->create_page_builder(builder_option, &data_page_builder)); |
71 | 16.0k | _data_page_builder.reset(data_page_builder); |
72 | | |
73 | 16.0k | if (_options.write_ordinal_index) { |
74 | 16.0k | _ordinal_index_builder.reset(new IndexPageBuilder(_options.index_page_size, true)); |
75 | 16.0k | } |
76 | 16.0k | if (_options.write_value_index) { |
77 | 206 | _value_index_builder.reset(new IndexPageBuilder(_options.index_page_size, true)); |
78 | 206 | _value_key_coder = get_key_coder(_type_info->type()); |
79 | 206 | } |
80 | | |
81 | 16.0k | if (_options.compression != NO_COMPRESSION) { |
82 | 206 | RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec)); |
83 | 206 | } |
84 | 16.0k | return Status::OK(); |
85 | 16.0k | } |
86 | | |
87 | 154k | Status IndexedColumnWriter::add(const void* value) { |
88 | 154k | if (_options.write_value_index && _data_page_builder->count() == 0) { |
89 | | // remember page's first value because it's used to build value index |
90 | 239 | _type_info->deep_copy(_first_value.data(), value, _arena); |
91 | 239 | } |
92 | 154k | size_t num_to_write = 1; |
93 | 154k | RETURN_IF_ERROR( |
94 | 154k | _data_page_builder->add(reinterpret_cast<const uint8_t*>(value), &num_to_write)); |
95 | 154k | CHECK(num_to_write == 1 || num_to_write == 0); |
96 | 154k | if (num_to_write == 0) { |
97 | 0 | CHECK(_data_page_builder->is_page_full()); |
98 | | // current page is already full, we need to first flush the current page, |
99 | | // and then add the value to the new page |
100 | 0 | size_t num_val; |
101 | 0 | RETURN_IF_ERROR(_finish_current_data_page(num_val)); |
102 | 0 | return add(value); |
103 | 0 | } |
104 | 154k | _num_values++; |
105 | 154k | size_t num_val; |
106 | 154k | if (_data_page_builder->is_page_full()) { |
107 | 33 | RETURN_IF_ERROR(_finish_current_data_page(num_val)); |
108 | 33 | } |
109 | 154k | return Status::OK(); |
110 | 154k | } |
111 | | |
112 | 16.1k | Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) { |
113 | 16.1k | auto num_values_in_page = _data_page_builder->count(); |
114 | 16.1k | num_val = num_values_in_page; |
115 | 16.1k | if (num_values_in_page == 0) { |
116 | 10 | return Status::OK(); |
117 | 10 | } |
118 | 16.1k | ordinal_t first_ordinal = _num_values - num_values_in_page; |
119 | | |
120 | | // IndexedColumn doesn't have NULLs, thus data page body only contains encoded values |
121 | 16.1k | OwnedSlice page_body; |
122 | 16.1k | RETURN_IF_ERROR(_data_page_builder->finish(&page_body)); |
123 | 16.1k | RETURN_IF_ERROR(_data_page_builder->reset()); |
124 | | |
125 | 16.1k | PageFooterPB footer; |
126 | 16.1k | footer.set_type(DATA_PAGE); |
127 | 16.1k | footer.set_uncompressed_size(static_cast<uint32_t>(page_body.slice().get_size())); |
128 | 16.1k | footer.mutable_data_page_footer()->set_first_ordinal(first_ordinal); |
129 | 16.1k | footer.mutable_data_page_footer()->set_num_values(num_values_in_page); |
130 | 16.1k | footer.mutable_data_page_footer()->set_nullmap_size(0); |
131 | | |
132 | 16.1k | uint64_t start_size = _file_writer->bytes_appended(); |
133 | 16.1k | RETURN_IF_ERROR(PageIO::compress_and_write_page( |
134 | 16.1k | _compress_codec, _options.compression_min_space_saving, _file_writer, |
135 | 16.1k | {page_body.slice()}, footer, &_last_data_page)); |
136 | 16.1k | _num_data_pages++; |
137 | 16.1k | _disk_size += (_file_writer->bytes_appended() - start_size); |
138 | | |
139 | 16.1k | if (_options.write_ordinal_index) { |
140 | 16.1k | std::string key; |
141 | 16.1k | KeyCoderTraits<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::full_encode_ascending( |
142 | 16.1k | &first_ordinal, &key); |
143 | 16.1k | _ordinal_index_builder->add(key, _last_data_page); |
144 | 16.1k | } |
145 | | |
146 | 16.1k | if (_options.write_value_index) { |
147 | 239 | std::string key; |
148 | 239 | _value_key_coder->full_encode_ascending(_first_value.data(), &key); |
149 | | // TODO short separate key optimize |
150 | 239 | _value_index_builder->add(key, _last_data_page); |
151 | | // TODO record last key in short separate key optimize |
152 | 239 | } |
153 | 16.1k | return Status::OK(); |
154 | 16.1k | } |
155 | | |
156 | 16.0k | Status IndexedColumnWriter::finish(IndexedColumnMetaPB* meta) { |
157 | 16.0k | size_t num_val_in_page; |
158 | 16.0k | RETURN_IF_ERROR(_finish_current_data_page(num_val_in_page)); |
159 | 16.0k | if (_options.write_ordinal_index) { |
160 | 16.0k | RETURN_IF_ERROR( |
161 | 16.0k | _flush_index(_ordinal_index_builder.get(), meta->mutable_ordinal_index_meta())); |
162 | 16.0k | } |
163 | 16.0k | if (_options.write_value_index) { |
164 | 206 | RETURN_IF_ERROR(_flush_index(_value_index_builder.get(), meta->mutable_value_index_meta())); |
165 | 206 | } |
166 | 16.0k | meta->set_data_type(int(_type_info->type())); |
167 | 16.0k | meta->set_encoding(_options.encoding); |
168 | 16.0k | meta->set_num_values(_num_values); |
169 | 16.0k | meta->set_compression(_options.compression); |
170 | | // `_finish_current_data_page` will be called in `add` function when page is full, |
171 | | // so num_val_in_page will be zero in this case. |
172 | 16.0k | if (_num_data_pages <= 1 && num_val_in_page != 0) { |
173 | 16.0k | DCHECK(num_val_in_page == _num_values) |
174 | 0 | << "num_val_in_page: " << num_val_in_page << ", _num_values: " << _num_values; |
175 | 16.0k | } |
176 | 16.0k | return Status::OK(); |
177 | 16.0k | } |
178 | | |
179 | 16.3k | Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeMetaPB* meta) { |
180 | 16.3k | if (_num_data_pages <= 1) { |
181 | 16.2k | meta->set_is_root_data_page(true); |
182 | 16.2k | _last_data_page.to_proto(meta->mutable_root_page()); |
183 | 16.2k | } else { |
184 | 42 | OwnedSlice page_body; |
185 | 42 | PageFooterPB page_footer; |
186 | 42 | index_builder->finish(&page_body, &page_footer); |
187 | | |
188 | 42 | PagePointer pp; |
189 | 42 | uint64_t start_size = _file_writer->bytes_appended(); |
190 | 42 | RETURN_IF_ERROR(PageIO::compress_and_write_page( |
191 | 42 | _compress_codec, _options.compression_min_space_saving, _file_writer, |
192 | 42 | {page_body.slice()}, page_footer, &pp)); |
193 | 42 | _disk_size += (_file_writer->bytes_appended() - start_size); |
194 | | |
195 | 42 | meta->set_is_root_data_page(false); |
196 | 42 | pp.to_proto(meta->mutable_root_page()); |
197 | 42 | } |
198 | 16.3k | return Status::OK(); |
199 | 16.3k | } |
200 | | |
201 | | #include "common/compile_check_end.h" |
202 | | } // namespace segment_v2 |
203 | | } // namespace doris |