Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/information_schema/schema_partitions_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "information_schema/schema_partitions_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <stdint.h>
23
24
#include "core/block/block.h"
25
#include "core/data_type/data_type_factory.hpp"
26
#include "core/string_ref.h"
27
#include "information_schema/schema_helper.h"
28
#include "runtime/exec_env.h"
29
#include "runtime/runtime_state.h"
30
#include "util/client_cache.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
class RuntimeState;
37
class Block;
38
39
std::vector<SchemaScanner::ColumnDesc> SchemaPartitionsScanner::_s_tbls_columns = {
40
        //   name,       type,          size,     is_null
41
        {"PARTITION_ID", TYPE_BIGINT, sizeof(int64_t), true},
42
        {"TABLE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
43
        {"TABLE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true},
44
        {"TABLE_NAME", TYPE_VARCHAR, sizeof(StringRef), false},
45
        {"PARTITION_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
46
        {"SUBPARTITION_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
47
        {"PARTITION_ORDINAL_POSITION", TYPE_INT, sizeof(int32_t), true},
48
        {"SUBPARTITION_ORDINAL_POSITION", TYPE_INT, sizeof(int32_t), true},
49
        {"PARTITION_METHOD", TYPE_VARCHAR, sizeof(StringRef), true},
50
        {"SUBPARTITION_METHOD", TYPE_VARCHAR, sizeof(StringRef), true},
51
        {"PARTITION_EXPRESSION", TYPE_VARCHAR, sizeof(StringRef), true},
52
        {"SUBPARTITION_EXPRESSION", TYPE_VARCHAR, sizeof(StringRef), true},
53
        {"PARTITION_DESCRIPTION", TYPE_STRING, sizeof(StringRef), true},
54
        {"TABLE_ROWS", TYPE_BIGINT, sizeof(int64_t), true},
55
        {"AVG_ROW_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
56
        {"DATA_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
57
        {"MAX_DATA_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
58
        {"INDEX_LENGTH", TYPE_BIGINT, sizeof(int64_t), true},
59
        {"DATA_FREE", TYPE_BIGINT, sizeof(int64_t), true},
60
        {"CREATE_TIME", TYPE_BIGINT, sizeof(int64_t), false},
61
        {"UPDATE_TIME", TYPE_DATETIME, sizeof(int128_t), true},
62
        {"CHECK_TIME", TYPE_DATETIME, sizeof(int128_t), true},
63
        {"CHECKSUM", TYPE_BIGINT, sizeof(int64_t), true},
64
        {"PARTITION_COMMENT", TYPE_STRING, sizeof(StringRef), false},
65
        {"NODEGROUP", TYPE_VARCHAR, sizeof(StringRef), true},
66
        {"TABLESPACE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
67
        {"LOCAL_DATA_SIZE", TYPE_STRING, sizeof(StringRef), true},
68
        {"REMOTE_DATA_SIZE", TYPE_STRING, sizeof(StringRef), true},
69
        {"STATE", TYPE_STRING, sizeof(StringRef), true},
70
        {"REPLICA_ALLOCATION", TYPE_STRING, sizeof(StringRef), true},
71
        {"REPLICA_NUM", TYPE_INT, sizeof(int32_t), true},
72
        {"STORAGE_POLICY", TYPE_STRING, sizeof(StringRef), true},
73
        {"STORAGE_MEDIUM", TYPE_STRING, sizeof(StringRef), true},
74
        {"COOLDOWN_TIME_MS", TYPE_STRING, sizeof(StringRef), true},
75
        {"LAST_CONSISTENCY_CHECK_TIME", TYPE_STRING, sizeof(StringRef), true},
76
        {"BUCKET_NUM", TYPE_INT, sizeof(int32_t), true},
77
        {"COMMITTED_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
78
        {"VISIBLE_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
79
        {"PARTITION_KEY", TYPE_STRING, sizeof(StringRef), true},
80
        {"RANGE", TYPE_STRING, sizeof(StringRef), true},
81
        {"DISTRIBUTION", TYPE_STRING, sizeof(StringRef), true},
82
};
83
84
SchemaPartitionsScanner::SchemaPartitionsScanner()
85
0
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PARTITIONS) {}
86
87
0
SchemaPartitionsScanner::~SchemaPartitionsScanner() {}
88
89
0
Status SchemaPartitionsScanner::start(RuntimeState* state) {
90
0
    if (!_is_init) {
91
0
        return Status::InternalError("used before initialized.");
92
0
    }
93
0
    SCOPED_TIMER(_get_db_timer);
94
0
    TGetDbsParams db_params;
95
0
    if (_param->common_param->db) {
96
0
        db_params.__set_pattern(*(_param->common_param->db));
97
0
    }
98
0
    if (_param->common_param->catalog) {
99
0
        db_params.__set_catalog(*(_param->common_param->catalog));
100
0
    }
101
0
    if (_param->common_param->current_user_ident) {
102
0
        db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
103
0
    }
104
105
0
    if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
106
0
        RETURN_IF_ERROR(SchemaHelper::get_db_names(
107
0
                *(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
108
0
    } else {
109
0
        return Status::InternalError("IP or port doesn't exists");
110
0
    }
111
0
    _block_rows_limit = state->batch_size();
112
0
    _rpc_timeout_ms = state->execution_timeout() * 1000;
113
0
    return Status::OK();
114
0
}
115
116
0
Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
117
0
    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
118
119
0
    TSchemaTableRequestParams schema_table_request_params;
120
0
    for (int i = 0; i < _s_tbls_columns.size(); i++) {
121
0
        schema_table_request_params.__isset.columns_name = true;
122
0
        schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name);
123
0
    }
124
0
    schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);
125
0
    schema_table_request_params.__set_catalog(*_param->common_param->catalog);
126
0
    schema_table_request_params.__set_dbId(dbId);
127
0
    if (_param->common_param->thread_id > 0) {
128
0
        schema_table_request_params.__set_thread_id(_param->common_param->thread_id);
129
0
    }
130
0
    schema_table_request_params.__set_time_zone(_timezone);
131
132
0
    TFetchSchemaTableDataRequest request;
133
0
    request.__set_schema_table_name(TSchemaTableName::PARTITIONS);
134
0
    request.__set_schema_table_params(schema_table_request_params);
135
136
0
    TFetchSchemaTableDataResult result;
137
138
0
    RETURN_IF_ERROR(SchemaHelper::fetch_schema_table_data(master_addr.hostname, master_addr.port,
139
0
                                                          request, &result));
140
0
    RETURN_IF_ERROR(fill_db_partitions(result));
141
0
    return Status::OK();
142
0
}
143
144
0
Status SchemaPartitionsScanner::fill_db_partitions(TFetchSchemaTableDataResult& result) {
145
0
    Status status(Status::create(result.status));
146
0
    if (!status.ok()) {
147
0
        LOG(WARNING) << "fetch table options from FE failed, errmsg=" << status;
148
0
        return status;
149
0
    }
150
0
    std::vector<TRow> result_data = result.data_batch;
151
152
0
    _partitions_block = Block::create_unique();
153
0
    for (int i = 0; i < _s_tbls_columns.size(); ++i) {
154
0
        auto data_type =
155
0
                DataTypeFactory::instance().create_data_type(_s_tbls_columns[i].type, true);
156
0
        _partitions_block->insert(ColumnWithTypeAndName(data_type->create_column(), data_type,
157
0
                                                        _s_tbls_columns[i].name));
158
0
    }
159
0
    _partitions_block->reserve(_block_rows_limit);
160
0
    if (!result_data.empty()) {
161
0
        auto col_size = result_data[0].column_value.size();
162
0
        if (col_size != _s_tbls_columns.size()) {
163
0
            return Status::InternalError<false>("table options schema is not match for FE and BE");
164
0
        }
165
0
    }
166
167
0
    for (int i = 0; i < result_data.size(); i++) {
168
0
        TRow row = result_data[i];
169
0
        for (int j = 0; j < _s_tbls_columns.size(); j++) {
170
0
            RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _partitions_block.get(),
171
0
                                                _s_tbls_columns[j].type));
172
0
        }
173
0
    }
174
0
    return Status::OK();
175
0
}
176
177
0
bool SchemaPartitionsScanner::check_and_mark_eos(bool* eos) const {
178
0
    if (_row_idx == _total_rows) {
179
0
        *eos = true;
180
0
        if (_db_index < _db_result.db_ids.size()) {
181
0
            *eos = false;
182
0
        }
183
0
        return true;
184
0
    }
185
0
    return false;
186
0
}
187
188
0
Status SchemaPartitionsScanner::get_next_block_internal(Block* block, bool* eos) {
189
0
    if (!_is_init) {
190
0
        return Status::InternalError("Used before initialized.");
191
0
    }
192
193
0
    if (nullptr == block || nullptr == eos) {
194
0
        return Status::InternalError("input pointer is nullptr.");
195
0
    }
196
0
    SCOPED_TIMER(_fill_block_timer);
197
198
0
    if ((_partitions_block == nullptr) || (_row_idx == _total_rows)) {
199
0
        if (_db_index < _db_result.db_ids.size()) {
200
0
            RETURN_IF_ERROR(get_onedb_info_from_fe(_db_result.db_ids[_db_index]));
201
0
            _row_idx = 0; // reset row index so that it start filling for next block.
202
0
            _total_rows = (int)_partitions_block->rows();
203
0
            _db_index++;
204
0
        }
205
0
    }
206
207
0
    if (check_and_mark_eos(eos)) {
208
0
        return Status::OK();
209
0
    }
210
211
0
    int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
212
0
    MutableBlock mblock = MutableBlock::build_mutable_block(block);
213
0
    RETURN_IF_ERROR(mblock.add_rows(_partitions_block.get(), _row_idx, current_batch_rows));
214
0
    _row_idx += current_batch_rows;
215
216
0
    if (!check_and_mark_eos(eos)) {
217
0
        *eos = false;
218
0
    }
219
0
    return Status::OK();
220
0
}
221
222
} // namespace doris