Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/information_schema/schema_rowsets_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "information_schema/schema_rowsets_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <memory>
25
#include <shared_mutex>
26
#include <string>
27
#include <utility>
28
29
#include "cloud/cloud_storage_engine.h"
30
#include "cloud/cloud_tablet.h"
31
#include "cloud/cloud_tablet_mgr.h"
32
#include "cloud/config.h"
33
#include "common/status.h"
34
#include "core/data_type/define_primitive_type.h"
35
#include "core/string_ref.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/runtime_profile.h"
38
#include "runtime/runtime_state.h"
39
#include "storage/olap_common.h"
40
#include "storage/rowset/rowset.h"
41
#include "storage/rowset/rowset_meta.h"
42
#include "storage/storage_engine.h"
43
#include "storage/tablet/tablet.h"
44
#include "storage/tablet/tablet_manager.h"
45
46
namespace doris {
47
class Block;
48
49
std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
50
        //   name,       type,          size,     is_null
51
        {"BACKEND_ID", TYPE_BIGINT, sizeof(int64_t), true},
52
        {"ROWSET_ID", TYPE_VARCHAR, sizeof(StringRef), true},
53
        {"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true},
54
        {"ROWSET_NUM_ROWS", TYPE_BIGINT, sizeof(int64_t), true},
55
        {"TXN_ID", TYPE_BIGINT, sizeof(int64_t), true},
56
        {"NUM_SEGMENTS", TYPE_BIGINT, sizeof(int64_t), true},
57
        {"START_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
58
        {"END_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
59
        {"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
60
        {"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
61
        {"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
62
        {"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
63
        {"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
64
        {"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true},
65
66
};
67
68
// Registers the rowsets column layout with the SchemaScanner base and starts with
// backend_id_ unset (0) and the output cursor _rowsets_idx at the first rowset.
// backend_id_ is assigned later in start().
SchemaRowsetsScanner::SchemaRowsetsScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROWSETS),
          backend_id_(0),
          _rowsets_idx(0) {}
72
73
0
// Prepares the scan: records the id of the backend serving the query and collects the
// rowsets to report into rowsets_ via _get_all_rowsets().
// Returns InternalError if the scanner has not been initialized, or any error
// propagated from _get_all_rowsets().
Status SchemaRowsetsScanner::start(RuntimeState* state) {
    if (_is_init) {
        backend_id_ = state->backend_id();
        RETURN_IF_ERROR(_get_all_rowsets());
        return Status::OK();
    }
    // init() was never run on this scanner.
    return Status::InternalError("used before initialized.");
}
81
82
0
// Appends a shared reference to every rowset of the tablets visible to this backend
// onto rowsets_. Each tablet's header lock is held in shared mode while its rowsets
// are read.
//
// Cloud mode: visits only the tablets returned by the cloud tablet manager's
// get_weak_tablets(); tablets that have already been released are skipped.
// Local mode: visits every tablet returned by the local tablet manager.
//
// NOTE(review): rowsets_ is not cleared here, so calling this twice would report
// duplicate rows. It is presumably called once per scan (from start()); confirm.
Status SchemaRowsetsScanner::_get_all_rowsets() {
    if (config::is_cloud_mode()) {
        // only query cloud tablets in lru cache instead of all tablets
        std::vector<std::weak_ptr<CloudTablet>> tablets =
                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr().get_weak_tablets();
        for (const std::weak_ptr<CloudTablet>& weak_tablet : tablets) {
            // lock() checks liveness and takes ownership in one atomic step. A separate
            // expired() check followed by lock() can race with eviction and yield null.
            std::shared_ptr<CloudTablet> tablet = weak_tablet.lock();
            if (tablet == nullptr) {
                continue;
            }
            std::shared_lock header_rlock(tablet->get_header_lock());
            for (const auto& entry : tablet->rowset_map()) {
                rowsets_.emplace_back(entry.second);
            }
        }
        return Status::OK();
    }

    std::vector<TabletSharedPtr> tablets =
            ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_all_tablet();
    for (const auto& tablet : tablets) {
        // all rowset
        std::vector<std::pair<Version, RowsetSharedPtr>> all_rowsets;
        {
            std::shared_lock header_rlock(tablet->get_header_lock());
            tablet->acquire_version_and_rowsets(&all_rowsets);
        }
        // all_rowsets is a per-tablet local, so its pointers can be moved out rather
        // than copied (avoids an extra atomic refcount round-trip per rowset).
        for (auto& version_and_rowset : all_rowsets) {
            rowsets_.emplace_back(std::move(version_and_rowset.second));
        }
    }
    return Status::OK();
}
114
115
0
// Emits the next batch of rowset rows into `block`.
// Sets *eos to true (and leaves `block` untouched) once every rowset collected in
// rowsets_ has been emitted; otherwise sets *eos to false and delegates to
// _fill_block_impl(). Returns InternalError if the scanner was not initialized or if
// either output pointer is null.
Status SchemaRowsetsScanner::get_next_block_internal(Block* block, bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }
    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("input pointer is nullptr.");
    }

    // The cursor has reached the end of rowsets_.
    const bool nothing_left = _rowsets_idx >= rowsets_.size();
    *eos = nothing_left;
    return nothing_left ? Status::OK() : _fill_block_impl(block);
}
130
131
0
// Writes the next batch of rowsets_ (starting at _rowsets_idx, at most
// kMaxRowsPerBlock rows) into `block`, one column at a time in _s_tbls_columns order.
// It then advances _rowsets_idx past the emitted rows.
// Expects _rowsets_idx <= rowsets_.size(); otherwise the remaining-count subtraction
// underflows. get_next_block_internal() only calls this while rows remain.
// Returns the first error reported by fill_dest_column_for_range(), if any.
Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
    SCOPED_TIMER(_fill_block_timer);
    // Upper bound on rows emitted per call; any remainder is served by later calls.
    // Typed as size_t (not the literal 1000UL) so std::min deduces a single type on
    // every platform, including those where size_t is not unsigned long.
    constexpr size_t kMaxRowsPerBlock = 1000;
    const size_t fill_rowsets_num = std::min(kMaxRowsPerBlock, rowsets_.size() - _rowsets_idx);
    const size_t fill_idx_begin = _rowsets_idx;
    // One pointer per output row, handed to fill_dest_column_for_range() and reused for
    // every column. The pointees must stay alive until that call returns.
    std::vector<void*> datas(fill_rowsets_num);

    // Fills column `col_idx` for this batch. `get_value` maps a rowset to the column's
    // value. Its declared return type is the element type `datas` points to, so it must
    // be the C++ type fill_dest_column_for_range() expects for that column.
    // Pointers are formed as &srcs[offset] with a batch-relative offset. Using
    // data() + absolute index would step past the end of srcs (undefined behaviour) for
    // every batch after the first.
    // Rowsets are read through const references, so no shared_ptr refcount churn per cell.
    auto fill_column = [&](int col_idx, auto&& get_value) -> Status {
        using ValueT = decltype(get_value(rowsets_[fill_idx_begin]));
        std::vector<ValueT> srcs(fill_rowsets_num);
        for (size_t offset = 0; offset < fill_rowsets_num; ++offset) {
            srcs[offset] = get_value(rowsets_[fill_idx_begin + offset]);
            datas[offset] = &srcs[offset];
        }
        return fill_dest_column_for_range(block, col_idx, datas);
    };

    // BACKEND_ID: the same value for every row, so all entries point at one local.
    {
        int64_t src = backend_id_;
        for (void*& data : datas) {
            data = &src;
        }
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas));
    }
    // ROWSET_ID: the StringRefs point into rowset_ids, so both vectors must outlive the
    // fill call. rowset_ids is pre-sized, so its strings never move once referenced.
    {
        std::vector<std::string> rowset_ids(fill_rowsets_num);
        std::vector<StringRef> strs(fill_rowsets_num);
        for (size_t offset = 0; offset < fill_rowsets_num; ++offset) {
            rowset_ids[offset] = rowsets_[fill_idx_begin + offset]->rowset_id().to_string();
            strs[offset] = StringRef(rowset_ids[offset].c_str(), rowset_ids[offset].size());
            datas[offset] = &strs[offset];
        }
        RETURN_IF_ERROR(fill_dest_column_for_range(block, 1, datas));
    }
    // TABLET_ID
    RETURN_IF_ERROR(fill_column(2, [](const RowsetSharedPtr& rowset) -> int64_t {
        return rowset->rowset_meta()->tablet_id();
    }));
    // ROWSET_NUM_ROWS
    RETURN_IF_ERROR(fill_column(
            3, [](const RowsetSharedPtr& rowset) -> int64_t { return rowset->num_rows(); }));
    // TXN_ID
    RETURN_IF_ERROR(fill_column(
            4, [](const RowsetSharedPtr& rowset) -> int64_t { return rowset->txn_id(); }));
    // NUM_SEGMENTS
    RETURN_IF_ERROR(fill_column(
            5, [](const RowsetSharedPtr& rowset) -> int64_t { return rowset->num_segments(); }));
    // START_VERSION
    RETURN_IF_ERROR(fill_column(
            6, [](const RowsetSharedPtr& rowset) -> int64_t { return rowset->start_version(); }));
    // END_VERSION
    RETURN_IF_ERROR(fill_column(
            7, [](const RowsetSharedPtr& rowset) -> int64_t { return rowset->end_version(); }));
    // INDEX_DISK_SIZE
    RETURN_IF_ERROR(fill_column(8, [](const RowsetSharedPtr& rowset) -> int64_t {
        return rowset->index_disk_size();
    }));
    // DATA_DISK_SIZE
    RETURN_IF_ERROR(fill_column(9, [](const RowsetSharedPtr& rowset) -> int64_t {
        return rowset->data_disk_size();
    }));
    // CREATION_TIME: unix seconds converted in the session time zone (_timezone_obj).
    RETURN_IF_ERROR(fill_column(10, [this](const RowsetSharedPtr& rowset) -> VecDateTimeValue {
        VecDateTimeValue value {};
        const int64_t creation_time = rowset->creation_time();
        value.from_unixtime(creation_time, _timezone_obj);
        return value;
    }));
    // NEWEST_WRITE_TIMESTAMP: same conversion as CREATION_TIME.
    RETURN_IF_ERROR(fill_column(11, [this](const RowsetSharedPtr& rowset) -> VecDateTimeValue {
        VecDateTimeValue value {};
        const int64_t newest_write_timestamp = rowset->newest_write_timestamp();
        value.from_unixtime(newest_write_timestamp, _timezone_obj);
        return value;
    }));
    // SCHEMA_VERSION
    RETURN_IF_ERROR(fill_column(12, [](const RowsetSharedPtr& rowset) -> int32_t {
        return rowset->tablet_schema()->schema_version();
    }));
    // COMMIT_TSO
    RETURN_IF_ERROR(fill_column(
            13, [](const RowsetSharedPtr& rowset) -> int64_t { return rowset->commit_tso(); }));

    _rowsets_idx += fill_rowsets_num;
    return Status::OK();
}
284
} // namespace doris