Coverage Report

Created: 2026-04-15 19:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/information_schema/schema_partitions_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "information_schema/schema_partitions_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <stdint.h>
23
24
#include "core/block/block.h"
25
#include "core/data_type/data_type_factory.hpp"
26
#include "core/string_ref.h"
27
#include "information_schema/schema_helper.h"
28
#include "runtime/exec_env.h"
29
#include "runtime/runtime_state.h"
30
#include "util/client_cache.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
35
class RuntimeState;
36
class Block;
37
38
std::vector<SchemaScanner::ColumnDesc> SchemaPartitionsScanner::_s_tbls_columns = {
39
        //   name,       type,          size,     is_null
40
        {"PARTITION_ID", TYPE_BIGINT, sizeof(int64_t), true},
41
        {"TABLE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
42
        {"TABLE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true},
43
        {"TABLE_NAME", TYPE_VARCHAR, sizeof(StringRef), false},
44
        {"PARTITION_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
45
        {"SUBPARTITION_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
46
        {"PARTITION_ORDINAL_POSITION", TYPE_INT, sizeof(int32_t), true},
47
        {"SUBPARTITION_ORDINAL_POSITION", TYPE_INT, sizeof(int32_t), true},
48
        {"PARTITION_METHOD", TYPE_VARCHAR, sizeof(StringRef), true},
49
        {"SUBPARTITION_METHOD", TYPE_VARCHAR, sizeof(StringRef), true},
50
        {"PARTITION_EXPRESSION", TYPE_VARCHAR, sizeof(StringRef), true},
51
        {"SUBPARTITION_EXPRESSION", TYPE_VARCHAR, sizeof(StringRef), true},
52
        {"PARTITION_DESCRIPTION", TYPE_STRING, sizeof(StringRef), true},
53
        {"TABLE_ROWS", TYPE_BIGINT, sizeof(int64_t), true},
54
        {"AVG_ROW_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
55
        {"DATA_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
56
        {"MAX_DATA_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
57
        {"INDEX_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
58
        {"DATA_FREE", TYPE_BIGINT, sizeof(int64_t), true},
59
        {"CREATE_TIME", TYPE_BIGINT, sizeof(int64_t), false},
60
        {"UPDATE_TIME", TYPE_DATETIME, sizeof(int128_t), true},
61
        {"CHECK_TIME", TYPE_DATETIME, sizeof(int128_t), true},
62
        {"CHECKSUM", TYPE_BIGINT, sizeof(int64_t), true},
63
        {"PARTITION_COMMENT", TYPE_STRING, sizeof(StringRef), false},
64
        {"NODEGROUP", TYPE_VARCHAR, sizeof(StringRef), true},
65
        {"TABLESPACE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
66
        {"LOCAL_DATA_SIZE", TYPE_STRING, sizeof(StringRef), true},
67
        {"REMOTE_DATA_SIZE", TYPE_STRING, sizeof(StringRef), true},
68
        {"STATE", TYPE_STRING, sizeof(StringRef), true},
69
        {"REPLICA_ALLOCATION", TYPE_STRING, sizeof(StringRef), true},
70
        {"REPLICA_NUM", TYPE_INT, sizeof(int32_t), true},
71
        {"STORAGE_POLICY", TYPE_STRING, sizeof(StringRef), true},
72
        {"STORAGE_MEDIUM", TYPE_STRING, sizeof(StringRef), true},
73
        {"COOLDOWN_TIME_MS", TYPE_STRING, sizeof(StringRef), true},
74
        {"LAST_CONSISTENCY_CHECK_TIME", TYPE_STRING, sizeof(StringRef), true},
75
        {"BUCKET_NUM", TYPE_INT, sizeof(int32_t), true},
76
        {"COMMITTED_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
77
        {"VISIBLE_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
78
        {"PARTITION_KEY", TYPE_STRING, sizeof(StringRef), true},
79
        {"RANGE", TYPE_STRING, sizeof(StringRef), true},
80
        {"DISTRIBUTION", TYPE_STRING, sizeof(StringRef), true},
81
};
82
83
// Hands this scanner's column descriptors to the SchemaScanner base, tagged with the
// SCH_PARTITIONS schema table type.
SchemaPartitionsScanner::SchemaPartitionsScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PARTITIONS) {}
85
86
0
// Nothing to release by hand; members clean up after themselves.
SchemaPartitionsScanner::~SchemaPartitionsScanner() = default;
87
88
0
// Prepares the scan: loads the list of databases into _db_result through
// SchemaHelper::get_db_names, using the address found in the scan parameters, and caches
// the batch size and RPC timeout taken from `state`.
//
// The db pattern, catalog and user identity are forwarded only when the scan parameters
// carry them.
//
// Returns InternalError when the scanner has not been initialized or when the ip/port is
// missing from the scan parameters; otherwise the status of get_db_names, or OK.
Status SchemaPartitionsScanner::start(RuntimeState* state) {
    if (!_is_init) {
        return Status::InternalError("used before initialized.");
    }
    SCOPED_TIMER(_get_db_timer);

    const auto& common = *_param->common_param;

    TGetDbsParams db_params;
    if (common.db) {
        db_params.__set_pattern(*(common.db));
    }
    if (common.catalog) {
        db_params.__set_catalog(*(common.catalog));
    }
    if (common.current_user_ident) {
        db_params.__set_current_user_ident(*(common.current_user_ident));
    }

    // Without a complete address there is nobody to ask for the database list.
    if (nullptr == common.ip || 0 == common.port) {
        return Status::InternalError("IP or port doesn't exists");
    }
    RETURN_IF_ERROR(
            SchemaHelper::get_db_names(*(common.ip), common.port, db_params, &_db_result));

    _block_rows_limit = state->batch_size();
    // execution_timeout() is scaled by 1000 to obtain the millisecond value stored here.
    _rpc_timeout_ms = state->execution_timeout() * 1000;
    return Status::OK();
}
114
115
0
// Fetches the partition rows of a single database from the master FE and loads them into
// _partitions_block (via fill_db_partitions).
//
// dbId: id of the database whose partitions are requested; sent as the request's dbId.
//
// Returns InternalError when the scan parameters carry no user identity or no catalog,
// otherwise the status of the FE RPC or of fill_db_partitions().
Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
    const auto& common = *_param->common_param;

    // start() treats both of these as optional, so they can legitimately be absent.
    // Dereferencing them unconditionally would be a null dereference; fail cleanly instead.
    if (!common.current_user_ident || !common.catalog) {
        return Status::InternalError(
                "current_user_ident or catalog is not set for partitions schema scan");
    }

    // Copy the address so the request uses one consistent hostname/port pair.
    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;

    TSchemaTableRequestParams schema_table_request_params;
    // Request exactly the columns this scanner exposes, in descriptor order.
    schema_table_request_params.__isset.columns_name = true;
    schema_table_request_params.columns_name.reserve(_s_tbls_columns.size());
    for (const auto& column : _s_tbls_columns) {
        schema_table_request_params.columns_name.emplace_back(column.name);
    }
    schema_table_request_params.__set_current_user_ident(*common.current_user_ident);
    schema_table_request_params.__set_catalog(*common.catalog);
    schema_table_request_params.__set_dbId(dbId);
    // thread_id is forwarded only when it holds a positive value.
    if (common.thread_id > 0) {
        schema_table_request_params.__set_thread_id(common.thread_id);
    }
    schema_table_request_params.__set_time_zone(_timezone);

    TFetchSchemaTableDataRequest request;
    request.__set_schema_table_name(TSchemaTableName::PARTITIONS);
    request.__set_schema_table_params(schema_table_request_params);

    TFetchSchemaTableDataResult result;
    RETURN_IF_ERROR(SchemaHelper::fetch_schema_table_data(master_addr.hostname, master_addr.port,
                                                          request, &result));
    return fill_db_partitions(result);
}
142
143
0
// Rebuilds _partitions_block from one FE response.
//
// result: response of the schema-table fetch; its status is checked first and its
//         data_batch rows are then appended cell by cell to a freshly created block whose
//         columns follow _s_tbls_columns (every column is created as nullable).
//
// Returns the FE status when it is not OK, InternalError when any row does not have
// exactly one value per descriptor in _s_tbls_columns, or the first error reported by
// insert_block_column(); OK otherwise.
Status SchemaPartitionsScanner::fill_db_partitions(TFetchSchemaTableDataResult& result) {
    Status status(Status::create(result.status));
    if (!status.ok()) {
        LOG(WARNING) << "fetch partitions from FE failed, errmsg=" << status;
        return status;
    }
    // Bind by reference: the rows are only read, so copying the batch would duplicate
    // every cell for nothing.
    const auto& result_data = result.data_batch;

    _partitions_block = Block::create_unique();
    for (const auto& column : _s_tbls_columns) {
        auto data_type = DataTypeFactory::instance().create_data_type(column.type, true);
        _partitions_block->insert(
                ColumnWithTypeAndName(data_type->create_column(), data_type, column.name));
    }
    _partitions_block->reserve(_block_rows_limit);

    // Validate every row before inserting anything: the fill loop below indexes
    // column_value[j] for each descriptor, so a short row would be read out of bounds.
    for (const auto& row : result_data) {
        if (row.column_value.size() != _s_tbls_columns.size()) {
            return Status::InternalError<false>(
                    "partitions schema does not match between FE and BE");
        }
    }

    const int column_count = static_cast<int>(_s_tbls_columns.size());
    for (const auto& row : result_data) {
        for (int j = 0; j < column_count; ++j) {
            RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _partitions_block.get(),
                                                _s_tbls_columns[j].type));
        }
    }
    return Status::OK();
}
175
176
0
bool SchemaPartitionsScanner::check_and_mark_eos(bool* eos) const {
177
0
    if (_row_idx == _total_rows) {
178
0
        *eos = true;
179
0
        if (_db_index < _db_result.db_ids.size()) {
180
0
            *eos = false;
181
0
        }
182
0
        return true;
183
0
    }
184
0
    return false;
185
0
}
186
187
0
// Appends the next batch of partition rows to `block`.
//
// Rows are served from _partitions_block, at most _block_rows_limit per call. When no
// block has been loaded yet or the loaded one is fully consumed, the next database in
// _db_result.db_ids (if any) is fetched first.
//
// block: destination block; must not be null.
// eos:   set to true once the buffered rows are consumed and no database remains;
//        must not be null.
//
// Returns InternalError for an uninitialized scanner or null arguments, otherwise any
// error from fetching a database or from appending rows; OK on success.
Status SchemaPartitionsScanner::get_next_block_internal(Block* block, bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }
    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("input pointer is nullptr.");
    }
    SCOPED_TIMER(_fill_block_timer);

    const bool buffer_exhausted = (_partitions_block == nullptr) || (_row_idx == _total_rows);
    if (buffer_exhausted && _db_index < _db_result.db_ids.size()) {
        RETURN_IF_ERROR(get_onedb_info_from_fe(_db_result.db_ids[_db_index]));
        // Start reading the freshly loaded block from its first row.
        _row_idx = 0;
        _total_rows = static_cast<int>(_partitions_block->rows());
        _db_index++;
    }

    // Nothing buffered to emit (e.g. the database just loaded has no rows): eos has been
    // decided by the helper, return without touching `block`.
    if (check_and_mark_eos(eos)) {
        return Status::OK();
    }

    int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
    MutableBlock mblock = MutableBlock::build_mutable_block(block);
    RETURN_IF_ERROR(mblock.add_rows(_partitions_block.get(), _row_idx, current_batch_rows));
    _row_idx += current_batch_rows;

    // Rows are still buffered after this batch, so the stream cannot be finished yet.
    if (!check_and_mark_eos(eos)) {
        *eos = false;
    }
    return Status::OK();
}
220
221
} // namespace doris