Coverage Report

Created: 2025-12-11 01:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/schema_change.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/Descriptors_types.h>
23
#include <glog/logging.h>
24
#include <stddef.h>
25
#include <stdint.h>
26
27
#include <memory>
28
#include <ostream>
29
#include <set>
30
#include <shared_mutex>
31
#include <string>
32
#include <unordered_map>
33
#include <unordered_set>
34
#include <utility>
35
#include <vector>
36
37
#include "common/config.h"
38
#include "common/object_pool.h"
39
#include "common/status.h"
40
#include "olap/column_mapping.h"
41
#include "olap/olap_common.h"
42
#include "olap/rowset/pending_rowset_helper.h"
43
#include "olap/rowset/rowset.h"
44
#include "olap/rowset/rowset_reader.h"
45
#include "olap/rowset/rowset_writer.h"
46
#include "olap/storage_engine.h"
47
#include "olap/tablet.h"
48
#include "olap/tablet_fwd.h"
49
#include "olap/tablet_schema.h"
50
#include "runtime/descriptors.h"
51
#include "runtime/memory/mem_tracker.h"
52
#include "runtime/runtime_state.h"
53
#include "vec/data_types/data_type.h"
54
55
namespace doris {
56
// Forward declarations so this header does not need the full definitions.
// The enums are declared with their fixed underlying type (opaque enum declarations),
// which makes them complete types that can be used by value below.
enum AlterTabletType : int;
enum RowsetTypePB : int;
enum SegmentsOverlapPB : int;

class DeleteHandler;
class Field;
class TAlterInvertedIndexReq;
class TAlterTabletReqV2;
class TExpr;

namespace vectorized {
class Block;
class OlapBlockDataConvertor;
} // namespace vectorized
69
70
class BlockChanger {
71
public:
72
    BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl,
73
                 std::shared_ptr<RuntimeState> state);
74
75
    ~BlockChanger();
76
77
    ColumnMapping* get_mutable_column_mapping(size_t column_index);
78
79
    Status change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const;
80
81
0
    void set_where_expr(const std::shared_ptr<TExpr>& where_expr) { _where_expr = where_expr; }
82
83
0
    void set_type(AlterTabletType type) { _type = type; }
84
85
0
    void set_compatible_version(int32_t version) noexcept { _fe_compatible_version = version; }
86
87
0
    bool has_where() const { return _where_expr != nullptr; }
88
89
private:
90
    static Status _check_cast_valid(vectorized::ColumnPtr ref_column,
91
                                    vectorized::ColumnPtr new_column);
92
93
    // @brief column-mapping specification of new schema
94
    SchemaMapping _schema_mapping;
95
96
    DescriptorTbl _desc_tbl;
97
98
    std::shared_ptr<TExpr> _where_expr;
99
100
    AlterTabletType _type;
101
102
    int32_t _fe_compatible_version = -1;
103
104
    std::shared_ptr<RuntimeState> _state;
105
};
106
107
class SchemaChange {
108
public:
109
0
    SchemaChange() = default;
110
0
    virtual ~SchemaChange() = default;
111
112
    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
113
                           BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet,
114
                           TabletSchemaSPtr base_tablet_schema,
115
0
                           TabletSchemaSPtr new_tablet_schema) {
116
0
        if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
117
0
            RETURN_IF_ERROR(rowset_writer->flush());
118
0
            return Status::OK();
119
0
        }
120
121
0
        _filtered_rows = 0;
122
0
        _merged_rows = 0;
123
0
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_inner_process(rowset_reader, rowset_writer, new_tablet,
124
0
                                                          base_tablet_schema, new_tablet_schema));
125
126
        // Check row num changes
127
0
        if (!_check_row_nums(rowset_reader, *rowset_writer)) {
128
0
            return Status::Error<ErrorCode::ALTER_STATUS_ERR>("SchemaChange check row nums failed");
129
0
        }
130
131
0
        LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
132
0
                  << ", source_filtered_rows=" << rowset_reader->filtered_rows()
133
0
                  << ", source_merged_rows=" << rowset_reader->merged_rows()
134
0
                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
135
0
                  << ", new_index_rows=" << rowset_writer->num_rows()
136
0
                  << ", writer_filtered_rows=" << rowset_writer->num_rows_filtered();
137
0
        return Status::OK();
138
0
    }
139
140
0
    uint64_t filtered_rows() const { return _filtered_rows; }
141
142
0
    uint64_t merged_rows() const { return _merged_rows; }
143
144
protected:
145
0
    void _add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; }
146
147
0
    void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
148
149
    virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
150
                                  BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
151
0
                                  TabletSchemaSPtr new_tablet_schema) {
152
0
        return Status::NotSupported("inner process unsupported.");
153
0
    }
154
155
0
    virtual bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const {
156
0
        if (reader->rowset()->num_rows() - reader->filtered_rows() - reader->merged_rows() !=
157
0
            writer.num_rows() + writer.num_rows_filtered() + _merged_rows + _filtered_rows) {
158
0
            LOG(WARNING) << "fail to check row num! "
159
0
                         << "source_rows=" << reader->rowset()->num_rows()
160
0
                         << ", source_filtered_rows=" << reader->filtered_rows()
161
0
                         << ", source_merged_rows=" << reader->merged_rows()
162
0
                         << ", written_rows=" << writer.num_rows()
163
0
                         << ", writer_filtered_rows=" << writer.num_rows_filtered()
164
0
                         << ", merged_rows=" << merged_rows()
165
0
                         << ", filtered_rows=" << filtered_rows();
166
0
            if (!config::ignore_schema_change_check) {
167
0
                return false;
168
0
            }
169
0
        }
170
0
        return true;
171
0
    }
172
173
private:
174
    uint64_t _filtered_rows {};
175
    uint64_t _merged_rows {};
176
177
protected:
178
    std::vector<bool> _row_same_bit;
179
};
180
181
class LinkedSchemaChange : public SchemaChange {
182
public:
183
0
    LinkedSchemaChange() = default;
184
0
    ~LinkedSchemaChange() override = default;
185
186
    Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
187
                   BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet,
188
                   TabletSchemaSPtr base_tablet_schema,
189
                   TabletSchemaSPtr new_tablet_schema) override;
190
191
private:
192
    DISALLOW_COPY_AND_ASSIGN(LinkedSchemaChange);
193
};
194
195
class VSchemaChangeDirectly : public SchemaChange {
196
public:
197
0
    VSchemaChangeDirectly(const BlockChanger& changer) : _changer(changer) {}
198
199
private:
200
    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
201
                          BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
202
                          TabletSchemaSPtr new_tablet_schema) override;
203
204
0
    bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
205
0
        return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
206
0
    }
207
208
    const BlockChanger& _changer;
209
};
210
211
class VBaseSchemaChangeWithSorting : public SchemaChange {
212
public:
213
    VBaseSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation);
214
0
    ~VBaseSchemaChangeWithSorting() override = default;
215
216
    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
217
                          BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
218
                          TabletSchemaSPtr new_tablet_schema) override;
219
220
    virtual Result<RowsetSharedPtr> _internal_sorting(
221
            const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
222
            const Version& temp_delta_versions, int64_t newest_write_timestamp,
223
            BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
224
            SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema);
225
226
    Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
227
                             BaseTabletSPtr new_tablet, TabletSchemaSPtr new_tablet_schema);
228
229
protected:
230
    // for external sorting
231
    // src_rowsets to store the rowset generated by internal sorting
232
    std::vector<RowsetSharedPtr> _src_rowsets;
233
234
private:
235
0
    bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
236
0
        return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
237
0
    }
238
239
    const BlockChanger& _changer;
240
    size_t _memory_limitation;
241
    Version _temp_delta_versions;
242
    std::unique_ptr<MemTracker> _mem_tracker;
243
};
244
245
// @breif schema change with sorting
246
// Mixin for local StorageEngine
247
class VLocalSchemaChangeWithSorting final : public VBaseSchemaChangeWithSorting {
248
public:
249
    VLocalSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation,
250
                                  StorageEngine& local_storage_engine)
251
0
            : VBaseSchemaChangeWithSorting(changer, memory_limitation),
252
0
              _local_storage_engine(local_storage_engine) {}
253
0
    ~VLocalSchemaChangeWithSorting() override = default;
254
255
    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
256
                          BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
257
                          TabletSchemaSPtr new_tablet_schema) override;
258
259
    Result<RowsetSharedPtr> _internal_sorting(
260
            const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
261
            const Version& temp_delta_versions, int64_t newest_write_timestamp,
262
            BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
263
            SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) override;
264
265
private:
266
    StorageEngine& _local_storage_engine;
267
    std::vector<PendingRowsetGuard> _pending_rs_guards;
268
};
269
270
struct AlterMaterializedViewParam {
271
    std::string column_name;
272
    std::string origin_column_name;
273
    std::shared_ptr<TExpr> expr;
274
};
275
276
struct SchemaChangeParams {
277
    AlterTabletType alter_tablet_type;
278
    bool enable_unique_key_merge_on_write = false;
279
    std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
280
    DeleteHandler* delete_handler = nullptr;
281
    std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
282
    DescriptorTbl* desc_tbl = nullptr;
283
    ObjectPool pool;
284
    int32_t be_exec_version;
285
    std::string vault_id;
286
    bool output_to_file_cache;
287
    std::shared_ptr<RuntimeState> runtime_state;
288
};
289
290
class SchemaChangeJob {
291
public:
292
    SchemaChangeJob(StorageEngine& local_storage_engine, const TAlterTabletReqV2& request,
293
                    const std::string& job_id);
294
    Status process_alter_tablet(const TAlterTabletReqV2& request);
295
296
    bool tablet_in_converting(int64_t tablet_id);
297
298
    static Status parse_request(const SchemaChangeParams& sc_params,
299
                                TabletSchema* base_tablet_schema, TabletSchema* new_tablet_schema,
300
                                BlockChanger* changer, bool* sc_sorting, bool* sc_directly);
301
302
private:
303
    std::unique_ptr<SchemaChange> _get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
304
0
                                                    bool sc_directly, int64_t mem_limit) {
305
0
        if (sc_sorting) {
306
0
            return std::make_unique<VLocalSchemaChangeWithSorting>(changer, mem_limit,
307
0
                                                                   _local_storage_engine);
308
0
        }
309
310
0
        if (sc_directly) {
311
0
            return std::make_unique<VSchemaChangeDirectly>(changer);
312
0
        }
313
314
0
        return std::make_unique<LinkedSchemaChange>();
315
0
    }
316
317
    Status _get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed,
318
                                       RowsetSharedPtr* max_rowset);
319
320
    Status _do_process_alter_tablet(const TAlterTabletReqV2& request);
321
322
    Status _validate_alter_result(const TAlterTabletReqV2& request);
323
324
    Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
325
                                       int64_t* real_alter_version);
326
327
    // Initialization Settings for creating a default value
328
    static Status _init_column_mapping(ColumnMapping* column_mapping,
329
                                       const TabletColumn& column_schema, const std::string& value);
330
331
    Status _calc_delete_bitmap_for_mow_table(int64_t alter_version);
332
333
    StorageEngine& _local_storage_engine;
334
    TabletSharedPtr _base_tablet;
335
    TabletSharedPtr _new_tablet;
336
    TabletSchemaSPtr _base_tablet_schema;
337
    TabletSchemaSPtr _new_tablet_schema;
338
    std::shared_mutex _mutex;
339
    std::unordered_set<int64_t> _tablet_ids_in_converting;
340
    std::set<std::string> _supported_functions;
341
    std::string _job_id;
342
};
343
} // namespace doris