Coverage Report

Created: 2026-04-09 15:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/information_schema/schema_rowsets_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "information_schema/schema_rowsets_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <memory>
25
#include <shared_mutex>
26
#include <string>
27
#include <utility>
28
29
#include "cloud/cloud_storage_engine.h"
30
#include "cloud/cloud_tablet.h"
31
#include "cloud/cloud_tablet_mgr.h"
32
#include "cloud/config.h"
33
#include "common/status.h"
34
#include "core/data_type/define_primitive_type.h"
35
#include "core/string_ref.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/runtime_profile.h"
38
#include "runtime/runtime_state.h"
39
#include "storage/olap_common.h"
40
#include "storage/rowset/rowset.h"
41
#include "storage/rowset/rowset_meta.h"
42
#include "storage/storage_engine.h"
43
#include "storage/tablet/tablet.h"
44
#include "storage/tablet/tablet_manager.h"
45
46
namespace doris {
47
class Block;
48
49
#include "common/compile_check_begin.h"
50
51
std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
52
        //   name,       type,          size,     is_null
53
        {"BACKEND_ID", TYPE_BIGINT, sizeof(int64_t), true},
54
        {"ROWSET_ID", TYPE_VARCHAR, sizeof(StringRef), true},
55
        {"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true},
56
        {"ROWSET_NUM_ROWS", TYPE_BIGINT, sizeof(int64_t), true},
57
        {"TXN_ID", TYPE_BIGINT, sizeof(int64_t), true},
58
        {"NUM_SEGMENTS", TYPE_BIGINT, sizeof(int64_t), true},
59
        {"START_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
60
        {"END_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
61
        {"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
62
        {"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
63
        {"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
64
        {"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
65
        {"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
66
        {"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true},
67
68
};
69
70
// Builds a scanner for information_schema ROWSETS using the static column table.
// The backend id is filled in later by start(); the scan cursor begins at rowset 0.
SchemaRowsetsScanner::SchemaRowsetsScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROWSETS),
          backend_id_(0),
          _rowsets_idx(0) {}
74
75
0
// Prepares the scan: records the backend id from `state` and snapshots the rowsets
// visible on this backend into rowsets_.
// Returns InternalError if the scanner has not been initialized, otherwise whatever
// _get_all_rowsets() returns.
Status SchemaRowsetsScanner::start(RuntimeState* state) {
    if (_is_init) {
        backend_id_ = state->backend_id();
        return _get_all_rowsets();
    }
    return Status::InternalError("used before initialized.");
}
83
84
0
// Appends every rowset visible on this backend to rowsets_.
//
// Cloud mode: only tablets returned by the cloud tablet manager's get_weak_tablets()
// (the cached tablets, not every tablet) are visited; a tablet that has already been
// released is skipped. Local mode: every tablet from the local tablet manager is
// visited. In both modes the tablet's header lock is held in shared mode while its
// rowsets are read.
// Always returns Status::OK().
Status SchemaRowsetsScanner::_get_all_rowsets() {
    if (config::is_cloud_mode()) {
        // only query cloud tablets in lru cache instead of all tablets
        std::vector<std::weak_ptr<CloudTablet>> tablets =
                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr().get_weak_tablets();
        for (const std::weak_ptr<CloudTablet>& weak_tablet : tablets) {
            // Promote once and test the result. Checking expired() and then calling
            // lock() is racy: the tablet may be released in between, leaving lock()
            // to return an empty pointer that would then be dereferenced.
            std::shared_ptr<CloudTablet> tablet = weak_tablet.lock();
            if (tablet == nullptr) {
                continue;
            }
            std::shared_lock header_rlock(tablet->get_header_lock());
            for (const auto& entry : tablet->rowset_map()) {
                rowsets_.emplace_back(entry.second);
            }
        }
        return Status::OK();
    }

    std::vector<TabletSharedPtr> tablets =
            ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_all_tablet();
    for (const auto& tablet : tablets) {
        // Snapshot all (version, rowset) pairs under the shared header lock, then
        // append them after the lock is released.
        std::vector<std::pair<Version, RowsetSharedPtr>> all_rowsets;
        {
            std::shared_lock header_rlock(tablet->get_header_lock());
            tablet->acquire_version_and_rowsets(&all_rowsets);
        }
        for (const auto& version_and_rowset : all_rowsets) {
            rowsets_.emplace_back(version_and_rowset.second);
        }
    }
    return Status::OK();
}
116
117
0
// Produces the next batch of rows into `block`.
// Sets *eos to true (and leaves `block` untouched) once every snapshotted rowset has
// been emitted; otherwise sets *eos to false and fills one batch via _fill_block_impl().
// Returns InternalError when uninitialized or when either pointer argument is null.
Status SchemaRowsetsScanner::get_next_block_internal(Block* block, bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }
    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("input pointer is nullptr.");
    }

    const bool exhausted = _rowsets_idx >= rowsets_.size();
    *eos = exhausted;
    if (exhausted) {
        return Status::OK();
    }
    return _fill_block_impl(block);
}
132
133
0
// Writes the next batch of up to kMaxRowsetsPerBlock rowsets, starting at _rowsets_idx,
// into `block`, one fill_dest_column_for_range() call per column in _s_tbls_columns
// order, then advances _rowsets_idx past the batch.
// Returns the first non-OK status from fill_dest_column_for_range(); in that case
// _rowsets_idx is not advanced. Caller must ensure _rowsets_idx < rowsets_.size().
Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
    SCOPED_TIMER(_fill_block_timer);
    // Upper bound on the number of rowsets emitted per call.
    constexpr size_t kMaxRowsetsPerBlock = 1000;
    const size_t fill_idx_begin = _rowsets_idx;
    const size_t fill_rowsets_num =
            std::min(kMaxRowsetsPerBlock, rowsets_.size() - fill_idx_begin);
    // datas[k] points at the value for the k-th row of the batch; it is reused by
    // every column, so the pointed-to storage only has to live until that column's
    // fill_dest_column_for_range() call returns.
    std::vector<void*> datas(fill_rowsets_num);

    // Rowset for batch row k, k in [0, fill_rowsets_num). Returned by reference to
    // avoid a shared_ptr refcount round-trip per cell.
    auto rowset_at = [&](size_t k) -> const RowsetSharedPtr& {
        return rowsets_[fill_idx_begin + k];
    };

    // Sets values[k] = get(rowset_at(k)), points datas[k] at values[k] (indexing the
    // element directly, so no pointer ever leaves the buffer), and writes column `col`.
    // `values` must already hold fill_rowsets_num elements.
    auto fill_values = [&](auto& values, size_t col, auto&& get) -> Status {
        for (size_t k = 0; k < fill_rowsets_num; ++k) {
            values[k] = get(rowset_at(k));
            datas[k] = &values[k];
        }
        return fill_dest_column_for_range(block, col, datas);
    };

    // fill_values() for a BIGINT column.
    auto fill_int64 = [&](size_t col, auto&& get) -> Status {
        std::vector<int64_t> values(fill_rowsets_num);
        return fill_values(values, col, get);
    };

    // DATETIME column: converts the unix timestamp returned by `get_unixtime` using
    // the scanner's time zone (_timezone_obj).
    auto fill_datetime = [&](size_t col, auto&& get_unixtime) -> Status {
        std::vector<VecDateTimeValue> values(fill_rowsets_num);
        for (size_t k = 0; k < fill_rowsets_num; ++k) {
            const int64_t unixtime = get_unixtime(rowset_at(k));
            values[k].from_unixtime(unixtime, _timezone_obj);
            datas[k] = &values[k];
        }
        return fill_dest_column_for_range(block, col, datas);
    };

    // BACKEND_ID: the same value for every row of the batch.
    {
        int64_t src_backend_id = backend_id_;
        std::fill(datas.begin(), datas.end(), &src_backend_id);
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas));
    }
    // ROWSET_ID: each StringRef only borrows its bytes, so the std::string copies must
    // stay alive until the column has been written.
    {
        std::vector<std::string> rowset_ids(fill_rowsets_num);
        std::vector<StringRef> refs(fill_rowsets_num);
        for (size_t k = 0; k < fill_rowsets_num; ++k) {
            rowset_ids[k] = rowset_at(k)->rowset_id().to_string();
            refs[k] = StringRef(rowset_ids[k].c_str(), rowset_ids[k].size());
            datas[k] = &refs[k];
        }
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 1, datas));
    }
    // TABLET_ID
    RETURN_IF_ERROR(fill_int64(2, [](const RowsetSharedPtr& rs) {
        return rs->rowset_meta()->tablet_id();
    }));
    // ROWSET_NUM_ROWS
    RETURN_IF_ERROR(fill_int64(3, [](const RowsetSharedPtr& rs) { return rs->num_rows(); }));
    // TXN_ID
    RETURN_IF_ERROR(fill_int64(4, [](const RowsetSharedPtr& rs) { return rs->txn_id(); }));
    // NUM_SEGMENTS
    RETURN_IF_ERROR(
            fill_int64(5, [](const RowsetSharedPtr& rs) { return rs->num_segments(); }));
    // START_VERSION
    RETURN_IF_ERROR(
            fill_int64(6, [](const RowsetSharedPtr& rs) { return rs->start_version(); }));
    // END_VERSION
    RETURN_IF_ERROR(fill_int64(7, [](const RowsetSharedPtr& rs) { return rs->end_version(); }));
    // INDEX_DISK_SIZE
    RETURN_IF_ERROR(
            fill_int64(8, [](const RowsetSharedPtr& rs) { return rs->index_disk_size(); }));
    // DATA_DISK_SIZE
    RETURN_IF_ERROR(
            fill_int64(9, [](const RowsetSharedPtr& rs) { return rs->data_disk_size(); }));
    // CREATION_TIME
    RETURN_IF_ERROR(
            fill_datetime(10, [](const RowsetSharedPtr& rs) { return rs->creation_time(); }));
    // NEWEST_WRITE_TIMESTAMP
    RETURN_IF_ERROR(fill_datetime(11, [](const RowsetSharedPtr& rs) {
        return rs->newest_write_timestamp();
    }));
    // SCHEMA_VERSION (INT column)
    {
        std::vector<int32_t> schema_versions(fill_rowsets_num);
        RETURN_IF_ERROR(fill_values(schema_versions, 12, [](const RowsetSharedPtr& rs) {
            return rs->tablet_schema()->schema_version();
        }));
    }
    // COMMIT_TSO
    RETURN_IF_ERROR(fill_int64(13, [](const RowsetSharedPtr& rs) { return rs->commit_tso(); }));

    _rowsets_idx += fill_rowsets_num;
    return Status::OK();
}
286
} // namespace doris