Coverage Report

Created: 2026-03-16 23:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/information_schema/schema_rowsets_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "information_schema/schema_rowsets_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <memory>
25
#include <shared_mutex>
26
#include <string>
27
#include <utility>
28
29
#include "cloud/cloud_storage_engine.h"
30
#include "cloud/cloud_tablet.h"
31
#include "cloud/cloud_tablet_mgr.h"
32
#include "cloud/config.h"
33
#include "common/status.h"
34
#include "core/data_type/define_primitive_type.h"
35
#include "core/string_ref.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/runtime_profile.h"
38
#include "runtime/runtime_state.h"
39
#include "storage/olap_common.h"
40
#include "storage/rowset/rowset.h"
41
#include "storage/rowset/rowset_meta.h"
42
#include "storage/storage_engine.h"
43
#include "storage/tablet/tablet.h"
44
#include "storage/tablet/tablet_manager.h"
45
46
namespace doris {
47
class Block;
48
49
#include "common/compile_check_begin.h"
50
51
std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
52
        //   name,       type,          size,     is_null
53
        {"BACKEND_ID", TYPE_BIGINT, sizeof(int64_t), true},
54
        {"ROWSET_ID", TYPE_VARCHAR, sizeof(StringRef), true},
55
        {"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true},
56
        {"ROWSET_NUM_ROWS", TYPE_BIGINT, sizeof(int64_t), true},
57
        {"TXN_ID", TYPE_BIGINT, sizeof(int64_t), true},
58
        {"NUM_SEGMENTS", TYPE_BIGINT, sizeof(int64_t), true},
59
        {"START_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
60
        {"END_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
61
        {"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
62
        {"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
63
        {"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
64
        {"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
65
        {"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
66
67
};
68
69
// Builds a scanner for the SCH_ROWSETS schema table using the static column layout.
// The backend id and the read cursor into rowsets_ both start at zero; start() fills in
// the real backend id and the rowset snapshot.
SchemaRowsetsScanner::SchemaRowsetsScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROWSETS),
          backend_id_(0),
          _rowsets_idx(0) {}
73
74
0
// Prepares the scanner for reading: remembers the backend id reported by `state` and
// collects the rowsets to emit. Returns an InternalError when the scanner has not been
// initialized, or whatever error _get_all_rowsets() reports.
Status SchemaRowsetsScanner::start(RuntimeState* state) {
    if (_is_init) {
        backend_id_ = state->backend_id();
        RETURN_IF_ERROR(_get_all_rowsets());
        return Status::OK();
    }
    return Status::InternalError("used before initialized.");
}
82
83
0
// Appends every rowset this backend currently exposes to rowsets_.
//
// In cloud mode only the tablets returned by the cloud tablet manager's get_weak_tablets()
// are visited; otherwise all tablets of the local tablet manager are. Each tablet's header
// lock is held in shared mode only while that tablet's rowsets are being read.
// Always returns Status::OK().
Status SchemaRowsetsScanner::_get_all_rowsets() {
    if (config::is_cloud_mode()) {
        // only query cloud tablets in lru cache instead of all tablets
        std::vector<std::weak_ptr<CloudTablet>> tablets =
                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr().get_weak_tablets();
        for (const std::weak_ptr<CloudTablet>& weak_tablet : tablets) {
            // lock() is the only race-free liveness check. Testing expired() first and
            // locking afterwards leaves a window in which the last owner can release the
            // tablet, making lock() return nullptr and the dereference below crash.
            auto tablet = weak_tablet.lock();
            if (tablet == nullptr) {
                continue;
            }
            std::shared_lock rowset_ldlock(tablet->get_header_lock());
            for (const auto& version_and_rowset : tablet->rowset_map()) {
                rowsets_.emplace_back(version_and_rowset.second);
            }
        }
        return Status::OK();
    }

    std::vector<TabletSharedPtr> tablets =
            ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_all_tablet();
    for (const auto& tablet : tablets) {
        // all rowset
        std::vector<std::pair<Version, RowsetSharedPtr>> all_rowsets;
        {
            // Keep the critical section limited to taking the snapshot.
            std::shared_lock rowset_ldlock(tablet->get_header_lock());
            tablet->acquire_version_and_rowsets(&all_rowsets);
        }
        // all_rowsets is a throwaway snapshot, so hand its references over instead of
        // copying each shared_ptr.
        for (auto& version_and_rowset : all_rowsets) {
            rowsets_.emplace_back(std::move(version_and_rowset.second));
        }
    }
    return Status::OK();
}
115
116
0
// Produces the next batch of rows.
//
// Sets *eos to true and returns OK once every collected rowset has been emitted;
// otherwise sets *eos to false and delegates to _fill_block_impl(). Returns an
// InternalError when the scanner is not initialized or when either pointer is null.
Status SchemaRowsetsScanner::get_next_block_internal(Block* block, bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }
    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("input pointer is nullptr.");
    }

    const bool exhausted = _rowsets_idx >= rowsets_.size();
    *eos = exhausted;
    if (exhausted) {
        return Status::OK();
    }
    return _fill_block_impl(block);
}
131
132
0
// Appends the next batch of rowsets (at most 1000 per call) to `block`, one schema column at
// a time, then advances _rowsets_idx past the rows that were written.
//
// `datas` holds one pointer per output row and is reused for every column: each column stages
// its values in a buffer that stays alive until fill_dest_column_for_range() returns, and
// datas[row] points at the staged value for that row. Column indices follow the order of
// _s_tbls_columns.
//
// Returns the first error reported by fill_dest_column_for_range(), otherwise Status::OK().
Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
    SCOPED_TIMER(_fill_block_timer);
    // Spell the batch limit as size_t so std::min deduces a single type on every platform
    // (a 1000UL literal only matches size_t where size_t happens to be unsigned long).
    const size_t fill_rowsets_num = std::min<size_t>(1000, rowsets_.size() - _rowsets_idx);
    const size_t fill_idx_begin = _rowsets_idx;
    std::vector<void*> datas(fill_rowsets_num);

    // Stages one int64 value per rowset, obtained through `value_of`, and writes the batch to
    // column `column_idx`. Rowsets are bound by const reference so that no shared_ptr
    // reference count is touched per row.
    auto fill_int64_column = [&](size_t column_idx, auto&& value_of) -> Status {
        std::vector<int64_t> srcs(fill_rowsets_num);
        for (size_t row = 0; row < fill_rowsets_num; ++row) {
            const RowsetSharedPtr& rowset = rowsets_[fill_idx_begin + row];
            srcs[row] = value_of(rowset);
            datas[row] = srcs.data() + row;
        }
        return fill_dest_column_for_range(block, column_idx, datas);
    };

    // Same as above for DATETIME columns: `unixtime_of` yields a unix timestamp which is
    // converted with the scanner's timezone before being written.
    auto fill_datetime_column = [&](size_t column_idx, auto&& unixtime_of) -> Status {
        std::vector<VecDateTimeValue> srcs(fill_rowsets_num);
        for (size_t row = 0; row < fill_rowsets_num; ++row) {
            const RowsetSharedPtr& rowset = rowsets_[fill_idx_begin + row];
            const int64_t unixtime = unixtime_of(rowset);
            srcs[row].from_unixtime(unixtime, _timezone_obj);
            datas[row] = srcs.data() + row;
        }
        return fill_dest_column_for_range(block, column_idx, datas);
    };

    // BACKEND_ID
    {
        // Every row reports the same backend, so all rows share one staged value.
        int64_t src = backend_id_;
        std::fill(datas.begin(), datas.end(), static_cast<void*>(&src));
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas));
    }
    // ROWSET_ID
    {
        // rowset_ids owns the characters; strs holds the views that datas points at.
        std::vector<std::string> rowset_ids(fill_rowsets_num);
        std::vector<StringRef> strs(fill_rowsets_num);
        for (size_t row = 0; row < fill_rowsets_num; ++row) {
            const RowsetSharedPtr& rowset = rowsets_[fill_idx_begin + row];
            rowset_ids[row] = rowset->rowset_id().to_string();
            strs[row] = StringRef(rowset_ids[row].c_str(), rowset_ids[row].size());
            datas[row] = strs.data() + row;
        }
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 1, datas));
    }
    // TABLET_ID
    RETURN_IF_ERROR(fill_int64_column(
            2, [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta()->tablet_id(); }));
    // ROWSET_NUM_ROWS
    RETURN_IF_ERROR(fill_int64_column(
            3, [](const RowsetSharedPtr& rowset) { return rowset->num_rows(); }));
    // TXN_ID
    RETURN_IF_ERROR(fill_int64_column(
            4, [](const RowsetSharedPtr& rowset) { return rowset->txn_id(); }));
    // NUM_SEGMENTS
    RETURN_IF_ERROR(fill_int64_column(
            5, [](const RowsetSharedPtr& rowset) { return rowset->num_segments(); }));
    // START_VERSION
    RETURN_IF_ERROR(fill_int64_column(
            6, [](const RowsetSharedPtr& rowset) { return rowset->start_version(); }));
    // END_VERSION
    RETURN_IF_ERROR(fill_int64_column(
            7, [](const RowsetSharedPtr& rowset) { return rowset->end_version(); }));
    // INDEX_DISK_SIZE
    RETURN_IF_ERROR(fill_int64_column(
            8, [](const RowsetSharedPtr& rowset) { return rowset->index_disk_size(); }));
    // DATA_DISK_SIZE
    RETURN_IF_ERROR(fill_int64_column(
            9, [](const RowsetSharedPtr& rowset) { return rowset->data_disk_size(); }));
    // CREATION_TIME
    RETURN_IF_ERROR(fill_datetime_column(
            10, [](const RowsetSharedPtr& rowset) { return rowset->creation_time(); }));
    // NEWEST_WRITE_TIMESTAMP
    RETURN_IF_ERROR(fill_datetime_column(
            11, [](const RowsetSharedPtr& rowset) { return rowset->newest_write_timestamp(); }));
    // SCHEMA_VERSION
    {
        // The only 32-bit column, so it stages its own buffer instead of using the helper.
        std::vector<int32_t> srcs(fill_rowsets_num);
        for (size_t row = 0; row < fill_rowsets_num; ++row) {
            const RowsetSharedPtr& rowset = rowsets_[fill_idx_begin + row];
            srcs[row] = rowset->tablet_schema()->schema_version();
            datas[row] = srcs.data() + row;
        }
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
    }

    _rowsets_idx += fill_rowsets_num;
    return Status::OK();
}
275
} // namespace doris