/root/doris/be/src/olap/merger.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "olap/merger.h" | 
| 19 |  |  | 
| 20 |  | #include <gen_cpp/olap_file.pb.h> | 
| 21 |  | #include <gen_cpp/types.pb.h> | 
| 22 |  | #include <stddef.h> | 
| 23 |  | #include <unistd.h> | 
| 24 |  |  | 
| 25 |  | #include <algorithm> | 
| 26 |  | #include <iterator> | 
| 27 |  | #include <memory> | 
| 28 |  | #include <mutex> | 
| 29 |  | #include <numeric> | 
| 30 |  | #include <ostream> | 
| 31 |  | #include <shared_mutex> | 
| 32 |  | #include <string> | 
| 33 |  | #include <utility> | 
| 34 |  | #include <vector> | 
| 35 |  |  | 
| 36 |  | #include "cloud/config.h" | 
| 37 |  | #include "common/config.h" | 
| 38 |  | #include "common/logging.h" | 
| 39 |  | #include "common/status.h" | 
| 40 |  | #include "olap/base_tablet.h" | 
| 41 |  | #include "olap/iterators.h" | 
| 42 |  | #include "olap/olap_common.h" | 
| 43 |  | #include "olap/olap_define.h" | 
| 44 |  | #include "olap/rowid_conversion.h" | 
| 45 |  | #include "olap/rowset/rowset.h" | 
| 46 |  | #include "olap/rowset/rowset_meta.h" | 
| 47 |  | #include "olap/rowset/rowset_writer.h" | 
| 48 |  | #include "olap/rowset/segment_v2/segment_writer.h" | 
| 49 |  | #include "olap/storage_engine.h" | 
| 50 |  | #include "olap/tablet.h" | 
| 51 |  | #include "olap/tablet_fwd.h" | 
| 52 |  | #include "olap/tablet_meta.h" | 
| 53 |  | #include "olap/tablet_reader.h" | 
| 54 |  | #include "olap/utils.h" | 
| 55 |  | #include "util/slice.h" | 
| 56 |  | #include "vec/core/block.h" | 
| 57 |  | #include "vec/olap/block_reader.h" | 
| 58 |  | #include "vec/olap/vertical_block_reader.h" | 
| 59 |  | #include "vec/olap/vertical_merge_iterator.h" | 
| 60 |  |  | 
| 61 |  | namespace doris { | 
| 62 |  | #include "common/compile_check_begin.h" | 
| 63 |  | Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, | 
| 64 |  |                               const TabletSchema& cur_tablet_schema, | 
| 65 |  |                               const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, | 
| 66 | 48 |                               RowsetWriter* dst_rowset_writer, Statistics* stats_output) { | 
| 67 | 48 |     if (!cur_tablet_schema.cluster_key_uids().empty()) { | 
| 68 | 0 |         return Status::InternalError( | 
| 69 | 0 |                 "mow table with cluster keys does not support non vertical compaction"); | 
| 70 | 0 |     } | 
| 71 | 48 |     vectorized::BlockReader reader; | 
| 72 | 48 |     TabletReader::ReaderParams reader_params; | 
| 73 | 48 |     reader_params.tablet = tablet; | 
| 74 | 48 |     reader_params.reader_type = reader_type; | 
| 75 |  |  | 
| 76 | 48 |     TabletReadSource read_source; | 
| 77 | 48 |     read_source.rs_splits.reserve(src_rowset_readers.size()); | 
| 78 | 144 |     for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { | 
| 79 | 144 |         read_source.rs_splits.emplace_back(rs_reader); | 
| 80 | 144 |     } | 
| 81 | 48 |     read_source.fill_delete_predicates(); | 
| 82 | 48 |     reader_params.set_read_source(std::move(read_source)); | 
| 83 |  |  | 
| 84 | 48 |     reader_params.version = dst_rowset_writer->version(); | 
| 85 |  |  | 
| 86 | 48 |     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); | 
| 87 | 48 |     merge_tablet_schema->copy_from(cur_tablet_schema); | 
| 88 |  |  | 
| 89 |  |     // Merge the columns in delete predicate that not in latest schema in to current tablet schema | 
| 90 | 48 |     for (auto& del_pred_rs : reader_params.delete_predicates) { | 
| 91 | 24 |         merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema()); | 
| 92 | 24 |     } | 
| 93 | 48 |     reader_params.tablet_schema = merge_tablet_schema; | 
| 94 | 48 |     if (!tablet->tablet_schema()->cluster_key_uids().empty()) { | 
| 95 | 0 |         reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr(); | 
| 96 | 0 |     } | 
| 97 |  |  | 
| 98 | 48 |     if (stats_output && stats_output->rowid_conversion) { | 
| 99 | 48 |         reader_params.record_rowids = true; | 
| 100 | 48 |         reader_params.rowid_conversion = stats_output->rowid_conversion; | 
| 101 | 48 |         stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); | 
| 102 | 48 |     } | 
| 103 |  |  | 
| 104 | 48 |     reader_params.return_columns.resize(cur_tablet_schema.num_columns()); | 
| 105 | 48 |     std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0); | 
| 106 | 48 |     reader_params.origin_return_columns = &reader_params.return_columns; | 
| 107 | 48 |     RETURN_IF_ERROR(reader.init(reader_params)); | 
| 108 |  |  | 
| 109 | 48 |     vectorized::Block block = cur_tablet_schema.create_block(reader_params.return_columns); | 
| 110 | 48 |     size_t output_rows = 0; | 
| 111 | 48 |     bool eof = false; | 
| 112 | 626 |     while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) { | 
| 113 | 578 |         auto tablet_state = tablet->tablet_state(); | 
| 114 | 578 |         if (tablet_state != TABLET_RUNNING && tablet_state != TABLET_NOTREADY) { | 
| 115 | 0 |             tablet->clear_cache(); | 
| 116 | 0 |             return Status::Error<INTERNAL_ERROR>("tablet {} is not used any more", | 
| 117 | 0 |                                                  tablet->tablet_id()); | 
| 118 | 0 |         } | 
| 119 |  |  | 
| 120 |  |         // Read one block from block reader | 
| 121 | 578 |         RETURN_NOT_OK_STATUS_WITH_WARN(reader.next_block_with_aggregation(&block, &eof), | 
| 122 | 578 |                                        "failed to read next block when merging rowsets of tablet " + | 
| 123 | 578 |                                                std::to_string(tablet->tablet_id())); | 
| 124 | 578 |         RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->add_block(&block), | 
| 125 | 578 |                                        "failed to write block when merging rowsets of tablet " + | 
| 126 | 578 |                                                std::to_string(tablet->tablet_id())); | 
| 127 |  |  | 
| 128 | 578 |         if (reader_params.record_rowids && block.rows() > 0) { | 
| 129 | 578 |             std::vector<uint32_t> segment_num_rows; | 
| 130 | 578 |             RETURN_IF_ERROR(dst_rowset_writer->get_segment_num_rows(&segment_num_rows)); | 
| 131 | 578 |             stats_output->rowid_conversion->add(reader.current_block_row_locations(), | 
| 132 | 578 |                                                 segment_num_rows); | 
| 133 | 578 |         } | 
| 134 |  |  | 
| 135 | 578 |         output_rows += block.rows(); | 
| 136 | 578 |         block.clear_column_data(); | 
| 137 | 578 |     } | 
| 138 | 48 |     if (ExecEnv::GetInstance()->storage_engine().stopped()) { | 
| 139 | 0 |         return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped", | 
| 140 | 0 |                                              tablet->tablet_id()); | 
| 141 | 0 |     } | 
| 142 |  |  | 
| 143 | 48 |     if (stats_output != nullptr) { | 
| 144 | 48 |         stats_output->output_rows = output_rows; | 
| 145 | 48 |         stats_output->merged_rows = reader.merged_rows(); | 
| 146 | 48 |         stats_output->filtered_rows = reader.filtered_rows(); | 
| 147 | 48 |         stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local; | 
| 148 | 48 |         stats_output->bytes_read_from_remote = | 
| 149 | 48 |                 reader.stats().file_cache_stats.bytes_read_from_remote; | 
| 150 | 48 |         stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache; | 
| 151 | 48 |         if (config::is_cloud_mode()) { | 
| 152 | 0 |             stats_output->cloud_local_read_time = | 
| 153 | 0 |                     reader.stats().file_cache_stats.local_io_timer / 1000; | 
| 154 | 0 |             stats_output->cloud_remote_read_time = | 
| 155 | 0 |                     reader.stats().file_cache_stats.remote_io_timer / 1000; | 
| 156 | 0 |         } | 
| 157 | 48 |     } | 
| 158 |  |  | 
| 159 | 48 |     RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(), | 
| 160 | 48 |                                    "failed to flush rowset when merging rowsets of tablet " + | 
| 161 | 48 |                                            std::to_string(tablet->tablet_id())); | 
| 162 |  |  | 
| 163 | 48 |     return Status::OK(); | 
| 164 | 48 | } | 
| 165 |  |  | 
| 166 |  | // split columns into several groups, make sure all keys in one group | 
| 167 |  | // unique_key should consider sequence&delete column | 
| 168 |  | void Merger::vertical_split_columns(const TabletSchema& tablet_schema, | 
| 169 |  |                                     std::vector<std::vector<uint32_t>>* column_groups, | 
| 170 | 94 |                                     std::vector<uint32_t>* key_group_cluster_key_idxes) { | 
| 171 | 94 |     size_t num_key_cols = tablet_schema.num_key_columns(); | 
| 172 | 94 |     size_t total_cols = tablet_schema.num_columns(); | 
| 173 | 94 |     std::vector<uint32_t> key_columns; | 
| 174 | 182 |     for (auto i = 0; i < num_key_cols; ++i) { | 
| 175 | 88 |         key_columns.emplace_back(i); | 
| 176 | 88 |     } | 
| 177 |  |     // in unique key, sequence & delete sign column should merge with key columns | 
| 178 | 94 |     int32_t sequence_col_idx = -1; | 
| 179 | 94 |     int32_t delete_sign_idx = -1; | 
| 180 |  |     // in key column compaction, seq_col real index is _num_key_columns | 
| 181 |  |     // and delete_sign column is _block->columns() - 1 | 
| 182 | 94 |     if (tablet_schema.keys_type() == KeysType::UNIQUE_KEYS) { | 
| 183 | 49 |         if (tablet_schema.has_sequence_col()) { | 
| 184 | 4 |             sequence_col_idx = tablet_schema.sequence_col_idx(); | 
| 185 | 4 |             key_columns.emplace_back(sequence_col_idx); | 
| 186 | 4 |         } | 
| 187 | 49 |         delete_sign_idx = tablet_schema.field_index(DELETE_SIGN); | 
| 188 | 49 |         if (delete_sign_idx != -1) { | 
| 189 | 43 |             key_columns.emplace_back(delete_sign_idx); | 
| 190 | 43 |         } | 
| 191 | 49 |         if (!tablet_schema.cluster_key_uids().empty()) { | 
| 192 | 0 |             for (const auto& cid : tablet_schema.cluster_key_uids()) { | 
| 193 | 0 |                 auto idx = tablet_schema.field_index(cid); | 
| 194 | 0 |                 DCHECK(idx >= 0) << "could not find cluster key column with unique_id=" << cid | 
| 195 | 0 |                                  << " in tablet schema, table_id=" << tablet_schema.table_id(); | 
| 196 | 0 |                 if (idx >= num_key_cols) { | 
| 197 | 0 |                     key_columns.emplace_back(idx); | 
| 198 | 0 |                 } | 
| 199 | 0 |             } | 
| 200 |  |             // tablet schema unique ids: [1, 2, 5, 3, 6, 4], [1 2] is key columns | 
| 201 |  |             // cluster key unique ids: [3, 1, 4] | 
| 202 |  |             // the key_columns should be [0, 1, 3, 5] | 
| 203 |  |             // the key_group_cluster_key_idxes should be [2, 1, 3] | 
| 204 | 0 |             for (const auto& cid : tablet_schema.cluster_key_uids()) { | 
| 205 | 0 |                 auto idx = tablet_schema.field_index(cid); | 
| 206 | 0 |                 for (auto i = 0; i < key_columns.size(); ++i) { | 
| 207 | 0 |                     if (idx == key_columns[i]) { | 
| 208 | 0 |                         key_group_cluster_key_idxes->emplace_back(i); | 
| 209 | 0 |                         break; | 
| 210 | 0 |                     } | 
| 211 | 0 |                 } | 
| 212 | 0 |             } | 
| 213 | 0 |         } | 
| 214 | 49 |     } | 
| 215 | 94 |     VLOG_NOTICE << "sequence_col_idx=" << sequence_col_idx | 
| 216 | 0 |                 << ", delete_sign_idx=" << delete_sign_idx; | 
| 217 |  |     // for duplicate no keys | 
| 218 | 94 |     if (!key_columns.empty()) { | 
| 219 | 77 |         column_groups->emplace_back(key_columns); | 
| 220 | 77 |     } | 
| 221 |  |  | 
| 222 | 94 |     std::vector<uint32_t> value_columns; | 
| 223 |  |  | 
| 224 | 988 |     for (size_t i = num_key_cols; i < total_cols; ++i) { | 
| 225 | 894 |         if (i == sequence_col_idx || i == delete_sign_idx || | 
| 226 | 894 |             key_columns.end() != std::find(key_columns.begin(), key_columns.end(), i)) { | 
| 227 | 47 |             continue; | 
| 228 | 47 |         } | 
| 229 |  |  | 
| 230 | 847 |         if (!value_columns.empty() && | 
| 231 | 847 |             value_columns.size() % config::vertical_compaction_num_columns_per_group == 0) { | 
| 232 | 140 |             column_groups->push_back(value_columns); | 
| 233 | 140 |             value_columns.clear(); | 
| 234 | 140 |         } | 
| 235 | 847 |         value_columns.push_back(cast_set<uint32_t>(i)); | 
| 236 | 847 |     } | 
| 237 |  |  | 
| 238 | 94 |     if (!value_columns.empty()) { | 
| 239 | 94 |         column_groups->push_back(value_columns); | 
| 240 | 94 |     } | 
| 241 | 94 | } | 
| 242 |  |  | 
| 243 |  | Status Merger::vertical_compact_one_group( | 
| 244 |  |         BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema, | 
| 245 |  |         bool is_key, const std::vector<uint32_t>& column_group, | 
| 246 |  |         vectorized::RowSourcesBuffer* row_source_buf, | 
| 247 |  |         const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, | 
| 248 |  |         RowsetWriter* dst_rowset_writer, uint32_t max_rows_per_segment, Statistics* stats_output, | 
| 249 |  |         std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size, | 
| 250 | 289 |         CompactionSampleInfo* sample_info) { | 
| 251 |  |     // build tablet reader | 
| 252 | 289 |     VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; | 
| 253 | 289 |     vectorized::VerticalBlockReader reader(row_source_buf); | 
| 254 | 289 |     TabletReader::ReaderParams reader_params; | 
| 255 | 289 |     reader_params.is_key_column_group = is_key; | 
| 256 | 289 |     reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes; | 
| 257 | 289 |     reader_params.tablet = tablet; | 
| 258 | 289 |     reader_params.reader_type = reader_type; | 
| 259 |  |  | 
| 260 | 289 |     TabletReadSource read_source; | 
| 261 | 289 |     read_source.rs_splits.reserve(src_rowset_readers.size()); | 
| 262 | 935 |     for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { | 
| 263 | 935 |         read_source.rs_splits.emplace_back(rs_reader); | 
| 264 | 935 |     } | 
| 265 | 289 |     read_source.fill_delete_predicates(); | 
| 266 | 289 |     reader_params.set_read_source(std::move(read_source)); | 
| 267 |  |  | 
| 268 | 289 |     reader_params.version = dst_rowset_writer->version(); | 
| 269 |  |  | 
| 270 | 289 |     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); | 
| 271 | 289 |     merge_tablet_schema->copy_from(tablet_schema); | 
| 272 |  |  | 
| 273 | 289 |     for (auto& del_pred_rs : reader_params.delete_predicates) { | 
| 274 | 125 |         merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema()); | 
| 275 | 125 |     } | 
| 276 |  |  | 
| 277 | 289 |     reader_params.tablet_schema = merge_tablet_schema; | 
| 278 | 289 |     bool has_cluster_key = false; | 
| 279 | 289 |     if (!tablet->tablet_schema()->cluster_key_uids().empty()) { | 
| 280 | 0 |         reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr(); | 
| 281 | 0 |         has_cluster_key = true; | 
| 282 | 0 |     } | 
| 283 |  |  | 
| 284 | 289 |     if (is_key && stats_output && stats_output->rowid_conversion) { | 
| 285 | 83 |         reader_params.record_rowids = true; | 
| 286 | 83 |         reader_params.rowid_conversion = stats_output->rowid_conversion; | 
| 287 | 83 |         stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); | 
| 288 | 83 |     } | 
| 289 |  |  | 
| 290 | 289 |     reader_params.return_columns = column_group; | 
| 291 | 289 |     reader_params.origin_return_columns = &reader_params.return_columns; | 
| 292 | 289 |     reader_params.batch_size = batch_size; | 
| 293 | 289 |     RETURN_IF_ERROR(reader.init(reader_params, sample_info)); | 
| 294 |  |  | 
| 295 | 289 |     vectorized::Block block = tablet_schema.create_block(reader_params.return_columns); | 
| 296 | 289 |     size_t output_rows = 0; | 
| 297 | 289 |     bool eof = false; | 
| 298 | 8.07k |     while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) { | 
| 299 | 7.78k |         auto tablet_state = tablet->tablet_state(); | 
| 300 | 7.78k |         if (tablet_state != TABLET_RUNNING && tablet_state != TABLET_NOTREADY) { | 
| 301 | 0 |             tablet->clear_cache(); | 
| 302 | 0 |             return Status::Error<INTERNAL_ERROR>("tablet {} is not used any more", | 
| 303 | 0 |                                                  tablet->tablet_id()); | 
| 304 | 0 |         } | 
| 305 |  |         // Read one block from block reader | 
| 306 | 7.78k |         RETURN_NOT_OK_STATUS_WITH_WARN(reader.next_block_with_aggregation(&block, &eof), | 
| 307 | 7.78k |                                        "failed to read next block when merging rowsets of tablet " + | 
| 308 | 7.78k |                                                std::to_string(tablet->tablet_id())); | 
| 309 | 7.78k |         RETURN_NOT_OK_STATUS_WITH_WARN( | 
| 310 | 7.78k |                 dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment, | 
| 311 | 7.78k |                                                has_cluster_key), | 
| 312 | 7.78k |                 "failed to write block when merging rowsets of tablet " + | 
| 313 | 7.78k |                         std::to_string(tablet->tablet_id())); | 
| 314 |  |  | 
| 315 | 7.78k |         if (is_key && reader_params.record_rowids && block.rows() > 0) { | 
| 316 | 4.62k |             std::vector<uint32_t> segment_num_rows; | 
| 317 | 4.62k |             RETURN_IF_ERROR(dst_rowset_writer->get_segment_num_rows(&segment_num_rows)); | 
| 318 | 4.62k |             stats_output->rowid_conversion->add(reader.current_block_row_locations(), | 
| 319 | 4.62k |                                                 segment_num_rows); | 
| 320 | 4.62k |         } | 
| 321 | 7.78k |         output_rows += block.rows(); | 
| 322 | 7.78k |         block.clear_column_data(); | 
| 323 | 7.78k |     } | 
| 324 | 289 |     if (ExecEnv::GetInstance()->storage_engine().stopped()) { | 
| 325 | 0 |         return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped", | 
| 326 | 0 |                                              tablet->tablet_id()); | 
| 327 | 0 |     } | 
| 328 |  |  | 
| 329 | 289 |     if (is_key && stats_output != nullptr) { | 
| 330 | 83 |         stats_output->output_rows = output_rows; | 
| 331 | 83 |         stats_output->merged_rows = reader.merged_rows(); | 
| 332 | 83 |         stats_output->filtered_rows = reader.filtered_rows(); | 
| 333 | 83 |         stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local; | 
| 334 | 83 |         stats_output->bytes_read_from_remote = | 
| 335 | 83 |                 reader.stats().file_cache_stats.bytes_read_from_remote; | 
| 336 | 83 |         stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache; | 
| 337 | 83 |         if (config::is_cloud_mode()) { | 
| 338 | 0 |             stats_output->cloud_local_read_time = | 
| 339 | 0 |                     reader.stats().file_cache_stats.local_io_timer / 1000; | 
| 340 | 0 |             stats_output->cloud_remote_read_time = | 
| 341 | 0 |                     reader.stats().file_cache_stats.remote_io_timer / 1000; | 
| 342 | 0 |         } | 
| 343 | 83 |     } | 
| 344 | 289 |     RETURN_IF_ERROR(dst_rowset_writer->flush_columns(is_key)); | 
| 345 |  |  | 
| 346 | 289 |     return Status::OK(); | 
| 347 | 289 | } | 
| 348 |  |  | 
| 349 |  | // for segcompaction | 
| 350 |  | Status Merger::vertical_compact_one_group( | 
| 351 |  |         int64_t tablet_id, ReaderType reader_type, const TabletSchema& tablet_schema, bool is_key, | 
| 352 |  |         const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf, | 
| 353 |  |         vectorized::VerticalBlockReader& src_block_reader, | 
| 354 |  |         segment_v2::SegmentWriter& dst_segment_writer, Statistics* stats_output, | 
| 355 | 22 |         uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion) { | 
| 356 |  |     // TODO: record_rowids | 
| 357 | 22 |     vectorized::Block block = tablet_schema.create_block(column_group); | 
| 358 | 22 |     size_t output_rows = 0; | 
| 359 | 22 |     bool eof = false; | 
| 360 | 138 |     while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) { | 
| 361 |  |         // Read one block from block reader | 
| 362 | 116 |         RETURN_NOT_OK_STATUS_WITH_WARN(src_block_reader.next_block_with_aggregation(&block, &eof), | 
| 363 | 116 |                                        "failed to read next block when merging rowsets of tablet " + | 
| 364 | 116 |                                                std::to_string(tablet_id)); | 
| 365 | 116 |         if (!block.rows()) { | 
| 366 | 0 |             break; | 
| 367 | 0 |         } | 
| 368 | 116 |         RETURN_NOT_OK_STATUS_WITH_WARN(dst_segment_writer.append_block(&block, 0, block.rows()), | 
| 369 | 116 |                                        "failed to write block when merging rowsets of tablet " + | 
| 370 | 116 |                                                std::to_string(tablet_id)); | 
| 371 |  |  | 
| 372 | 116 |         if (is_key && rowid_conversion != nullptr) { | 
| 373 | 30 |             rowid_conversion->add(src_block_reader.current_block_row_locations()); | 
| 374 | 30 |         } | 
| 375 | 116 |         output_rows += block.rows(); | 
| 376 | 116 |         block.clear_column_data(); | 
| 377 | 116 |     } | 
| 378 | 22 |     if (ExecEnv::GetInstance()->storage_engine().stopped()) { | 
| 379 | 0 |         return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped", | 
| 380 | 0 |                                              tablet_id); | 
| 381 | 0 |     } | 
| 382 |  |  | 
| 383 | 22 |     if (is_key && stats_output != nullptr) { | 
| 384 | 11 |         stats_output->output_rows = output_rows; | 
| 385 | 11 |         stats_output->merged_rows = src_block_reader.merged_rows(); | 
| 386 | 11 |         stats_output->filtered_rows = src_block_reader.filtered_rows(); | 
| 387 | 11 |         stats_output->bytes_read_from_local = | 
| 388 | 11 |                 src_block_reader.stats().file_cache_stats.bytes_read_from_local; | 
| 389 | 11 |         stats_output->bytes_read_from_remote = | 
| 390 | 11 |                 src_block_reader.stats().file_cache_stats.bytes_read_from_remote; | 
| 391 | 11 |         stats_output->cached_bytes_total = | 
| 392 | 11 |                 src_block_reader.stats().file_cache_stats.bytes_write_into_cache; | 
| 393 | 11 |     } | 
| 394 |  |  | 
| 395 |  |     // segcompaction produce only one segment at once | 
| 396 | 22 |     RETURN_IF_ERROR(dst_segment_writer.finalize_columns_data()); | 
| 397 | 22 |     RETURN_IF_ERROR(dst_segment_writer.finalize_columns_index(index_size)); | 
| 398 |  |  | 
| 399 | 22 |     if (is_key) { | 
| 400 | 11 |         Slice min_key = dst_segment_writer.min_encoded_key(); | 
| 401 | 11 |         Slice max_key = dst_segment_writer.max_encoded_key(); | 
| 402 | 11 |         DCHECK_LE(min_key.compare(max_key), 0); | 
| 403 | 11 |         key_bounds.set_min_key(min_key.to_string()); | 
| 404 | 11 |         key_bounds.set_max_key(max_key.to_string()); | 
| 405 | 11 |     } | 
| 406 |  |  | 
| 407 | 22 |     return Status::OK(); | 
| 408 | 22 | } | 
| 409 |  |  | 
| 410 | 94 | int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) { | 
| 411 | 94 |     std::unique_lock<std::mutex> lock(tablet->sample_info_lock); | 
| 412 | 94 |     CompactionSampleInfo info = tablet->sample_infos[group_index]; | 
| 413 | 94 |     if (way_cnt <= 0) { | 
| 414 | 0 |         LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " | 
| 415 | 0 |                   << tablet->tablet_id() << " way cnt: " << way_cnt; | 
| 416 | 0 |         return 4096 - 32; | 
| 417 | 0 |     } | 
| 418 | 94 |     int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt; | 
| 419 | 94 |     if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) { | 
| 420 | 0 |         block_mem_limit /= 4; | 
| 421 | 0 |     } | 
| 422 |  |  | 
| 423 | 94 |     int64_t group_data_size = 0; | 
| 424 | 94 |     if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) { | 
| 425 | 0 |         double smoothing_factor = 0.5; | 
| 426 | 0 |         group_data_size = | 
| 427 | 0 |                 int64_t((cast_set<double>(info.group_data_size) * (1 - smoothing_factor)) + | 
| 428 | 0 |                         (cast_set<double>(info.bytes / info.rows) * smoothing_factor)); | 
| 429 | 0 |         tablet->sample_infos[group_index].group_data_size = group_data_size; | 
| 430 | 94 |     } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) { | 
| 431 | 0 |         group_data_size = info.group_data_size; | 
| 432 | 94 |     } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) { | 
| 433 | 0 |         group_data_size = info.bytes / info.rows; | 
| 434 | 0 |         tablet->sample_infos[group_index].group_data_size = group_data_size; | 
| 435 | 94 |     } else { | 
| 436 | 94 |         LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " | 
| 437 | 94 |                   << tablet->tablet_id() << " group data size: " << info.group_data_size | 
| 438 | 94 |                   << " row num: " << info.rows << " consume bytes: " << info.bytes; | 
| 439 | 94 |         return 1024 - 32; | 
| 440 | 94 |     } | 
| 441 |  |  | 
| 442 | 0 |     if (group_data_size <= 0) { | 
| 443 | 0 |         LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: " | 
| 444 | 0 |                      << tablet->tablet_id() << " unexpected group data size: " << group_data_size; | 
| 445 | 0 |         return 4096 - 32; | 
| 446 | 0 |     } | 
| 447 |  |  | 
| 448 | 0 |     tablet->sample_infos[group_index].bytes = 0; | 
| 449 | 0 |     tablet->sample_infos[group_index].rows = 0; | 
| 450 |  | 
 | 
| 451 | 0 |     int64_t batch_size = block_mem_limit / group_data_size; | 
| 452 | 0 |     int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L)); | 
| 453 | 0 |     LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() | 
| 454 | 0 |               << " group data size: " << info.group_data_size << " row num: " << info.rows | 
| 455 | 0 |               << " consume bytes: " << info.bytes << " way cnt: " << way_cnt | 
| 456 | 0 |               << " batch size: " << res; | 
| 457 | 0 |     return res; | 
| 458 | 0 | } | 
| 459 |  |  | 
| 460 |  | // steps to do vertical merge: | 
| 461 |  | // 1. split columns into column groups | 
| 462 |  | // 2. compact groups one by one, generate a row_source_buf when compact key group | 
| 463 |  | // and use this row_source_buf to compact value column groups | 
| 464 |  | // 3. build output rowset | 
| 465 |  | Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, | 
| 466 |  |                                       const TabletSchema& tablet_schema, | 
| 467 |  |                                       const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, | 
| 468 |  |                                       RowsetWriter* dst_rowset_writer, | 
| 469 |  |                                       uint32_t max_rows_per_segment, int64_t merge_way_num, | 
| 470 | 83 |                                       Statistics* stats_output) { | 
| 471 | 83 |     LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); | 
| 472 | 83 |     std::vector<std::vector<uint32_t>> column_groups; | 
| 473 | 83 |     std::vector<uint32_t> key_group_cluster_key_idxes; | 
| 474 | 83 |     vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes); | 
| 475 |  |  | 
| 476 | 83 |     vectorized::RowSourcesBuffer row_sources_buf( | 
| 477 | 83 |             tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type); | 
| 478 | 83 |     { | 
| 479 | 83 |         std::unique_lock<std::mutex> lock(tablet->sample_info_lock); | 
| 480 | 83 |         tablet->sample_infos.resize(column_groups.size(), {0, 0, 0}); | 
| 481 | 83 |     } | 
| 482 |  |     // compact group one by one | 
| 483 | 372 |     for (auto i = 0; i < column_groups.size(); ++i) { | 
| 484 | 289 |         VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); | 
| 485 | 289 |         bool is_key = (i == 0); | 
| 486 | 289 |         int64_t batch_size = config::compaction_batch_size != -1 | 
| 487 | 289 |                                      ? config::compaction_batch_size | 
| 488 | 289 |                                      : estimate_batch_size(i, tablet, merge_way_num); | 
| 489 | 289 |         CompactionSampleInfo sample_info; | 
| 490 | 289 |         Status st = vertical_compact_one_group( | 
| 491 | 289 |                 tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf, | 
| 492 | 289 |                 src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output, | 
| 493 | 289 |                 key_group_cluster_key_idxes, batch_size, &sample_info); | 
| 494 | 289 |         { | 
| 495 | 289 |             std::unique_lock<std::mutex> lock(tablet->sample_info_lock); | 
| 496 | 289 |             tablet->sample_infos[i] = sample_info; | 
| 497 | 289 |         } | 
| 498 | 289 |         RETURN_IF_ERROR(st); | 
| 499 | 289 |         if (is_key) { | 
| 500 | 83 |             RETURN_IF_ERROR(row_sources_buf.flush()); | 
| 501 | 83 |         } | 
| 502 | 289 |         RETURN_IF_ERROR(row_sources_buf.seek_to_begin()); | 
| 503 | 289 |     } | 
| 504 |  |  | 
| 505 |  |     // finish compact, build output rowset | 
| 506 | 83 |     VLOG_NOTICE << "finish compact groups"; | 
| 507 | 83 |     RETURN_IF_ERROR(dst_rowset_writer->final_flush()); | 
| 508 |  |  | 
| 509 | 83 |     return Status::OK(); | 
| 510 | 83 | } | 
| 511 |  | #include "common/compile_check_end.h" | 
| 512 |  | } // namespace doris |