Coverage Report

Created: 2026-04-27 17:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/kinesis_conf.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/kinesis_conf.h"
19
20
#include "common/logging.h"
21
22
namespace doris {
23
24
// Validates one Kinesis option and records it in the parameter map of every
// API request that uses it.
//
// @param name    option name without the "aws.kinesis." prefix (the prefix is
//                removed in data_consumer.cpp), e.g. "max_records"
// @param value   option value exactly as supplied; stored as a string
// @param errstr  receives a description when CONF_INVALID is returned
// @return CONF_OK when the option was stored, CONF_INVALID when the value was
//         rejected, CONF_UNKNOWN when the name is not recognised
KinesisConf::ConfResult KinesisConf::set(const std::string& name, const std::string& value,
                                         std::string& errstr) {
    // GetRecords API parameter.
    if (name == "max_records") {
        int record_limit = 0;
        if (!parse_int(value, record_limit, errstr)) {
            return CONF_INVALID;
        }
        const bool limit_in_range = record_limit >= 1 && record_limit <= 10000;
        if (!limit_in_range) {
            errstr = "max_records must be between 1 and 10000";
            return CONF_INVALID;
        }
        _get_records_params[name] = value;
        return CONF_OK;
    }

    // Shared by all three APIs, so it is recorded in each parameter map.
    if (name == "stream_arn") {
        _get_records_params[name] = value;
        _get_shard_iterator_params[name] = value;
        _list_shards_params[name] = value;
        return CONF_OK;
    }

    // GetShardIterator API parameter (for AT_TIMESTAMP iterator type).
    if (name == "timestamp") {
        // Parsed only to validate the input; the original string is what gets stored.
        long parsed_timestamp = 0;
        if (!parse_long(value, parsed_timestamp, errstr)) {
            return CONF_INVALID;
        }
        _get_shard_iterator_params[name] = value;
        return CONF_OK;
    }

    // ListShards API parameter.
    if (name == "max_results") {
        int result_limit = 0;
        if (!parse_int(value, result_limit, errstr)) {
            return CONF_INVALID;
        }
        const bool limit_in_range = result_limit >= 1 && result_limit <= 10000;
        if (!limit_in_range) {
            errstr = "max_results must be between 1 and 10000";
            return CONF_INVALID;
        }
        _list_shards_params[name] = value;
        return CONF_OK;
    }

    // Nothing matched: log the name and report it as unknown.
    VLOG_NOTICE << "Unknown Kinesis configuration: " << name;
    return CONF_UNKNOWN;
}
73
74
// Fills a GetRecords request from the supplied shard iterator and the stored
// GetRecords options.
//
// @param request         request object to populate
// @param shard_iterator  iterator value set on the request
// @return Status::OK() on success; Status::InternalError when the stored
//         "max_records" string cannot be converted to an int
Status KinesisConf::apply_to_get_records_request(Aws::Kinesis::Model::GetRecordsRequest& request,
                                                 const std::string& shard_iterator) const {
    request.SetShardIterator(shard_iterator);

    // Optional record limit; stored as a string, so it is converted here.
    const auto limit_entry = _get_records_params.find("max_records");
    if (limit_entry != _get_records_params.end()) {
        const std::string& raw_limit = limit_entry->second;
        try {
            request.SetLimit(std::stoi(raw_limit));
        } catch (const std::exception&) {
            return Status::InternalError("Failed to apply get_records.max_records: {}", raw_limit);
        }
    }

    // Optional stream ARN; an empty value is treated the same as "not configured".
    const auto arn_entry = _get_records_params.find("stream_arn");
    const bool has_arn = arn_entry != _get_records_params.end() && !arn_entry->second.empty();
    if (has_arn) {
        request.SetStreamARN(arn_entry->second);
    }

    return Status::OK();
}
94
95
// Fills a GetShardIterator request from the stream/shard identity, the starting
// position and the stored GetShardIterator options.
//
// Starting position, derived from sequence_number:
//   - empty, "TRIM_HORIZON" or "-2"  -> TRIM_HORIZON
//   - "LATEST" or "-1"               -> LATEST
//   - anything else                  -> AFTER_SEQUENCE_NUMBER with that value
//
// @return Status::OK() on success; Status::InternalError when the stored
//         "timestamp" string cannot be converted to a long
Status KinesisConf::apply_to_get_shard_iterator_request(
        Aws::Kinesis::Model::GetShardIteratorRequest& request, const std::string& stream_name,
        const std::string& shard_id, const std::string& sequence_number) const {
    using Aws::Kinesis::Model::ShardIteratorType;

    request.SetStreamName(stream_name);
    request.SetShardId(shard_id);

    const bool wants_trim_horizon = sequence_number.empty() ||
                                    sequence_number == "TRIM_HORIZON" || sequence_number == "-2";
    const bool wants_latest = sequence_number == "LATEST" || sequence_number == "-1";

    if (wants_trim_horizon) {
        request.SetShardIteratorType(ShardIteratorType::TRIM_HORIZON);
    } else if (wants_latest) {
        request.SetShardIteratorType(ShardIteratorType::LATEST);
    } else {
        // Any other value is treated as a concrete sequence number to resume after.
        request.SetShardIteratorType(ShardIteratorType::AFTER_SEQUENCE_NUMBER);
        request.SetStartingSequenceNumber(sequence_number);
    }

    // Optional stream ARN; an empty value is treated the same as "not configured".
    const auto arn_entry = _get_shard_iterator_params.find("stream_arn");
    const bool has_arn =
            arn_entry != _get_shard_iterator_params.end() && !arn_entry->second.empty();
    if (has_arn) {
        request.SetStreamARN(arn_entry->second);
    }

    // NOTE(review): the timestamp is attached to the request, but this function never
    // selects the AT_TIMESTAMP iterator type — confirm whether that is intended.
    const auto ts_entry = _get_shard_iterator_params.find("timestamp");
    if (ts_entry != _get_shard_iterator_params.end()) {
        const std::string& raw_timestamp = ts_entry->second;
        try {
            const long parsed_timestamp = std::stol(raw_timestamp);
            request.SetTimestamp(Aws::Utils::DateTime(parsed_timestamp));
        } catch (const std::exception&) {
            return Status::InternalError("Failed to apply get_shard_iterator.timestamp: {}",
                                         raw_timestamp);
        }
    }

    return Status::OK();
}
127
128
// Fills a ListShards request from the stream name and the stored ListShards options.
//
// @param request      request object to populate
// @param stream_name  stream name set on the request
// @return Status::OK() on success; Status::InternalError when the stored
//         "max_results" string cannot be converted to an int
Status KinesisConf::apply_to_list_shards_request(Aws::Kinesis::Model::ListShardsRequest& request,
                                                 const std::string& stream_name) const {
    request.SetStreamName(stream_name);

    // Optional stream ARN; an empty value is treated the same as "not configured".
    const auto arn_entry = _list_shards_params.find("stream_arn");
    const bool has_arn = arn_entry != _list_shards_params.end() && !arn_entry->second.empty();
    if (has_arn) {
        request.SetStreamARN(arn_entry->second);
    }

    // Optional result limit; stored as a string, so it is converted here.
    const auto limit_entry = _list_shards_params.find("max_results");
    if (limit_entry != _list_shards_params.end()) {
        const std::string& raw_limit = limit_entry->second;
        try {
            request.SetMaxResults(std::stoi(raw_limit));
        } catch (const std::exception&) {
            return Status::InternalError("Failed to apply list_shards.max_results: {}", raw_limit);
        }
    }

    return Status::OK();
}
148
149
0
// Parses `value` as a base-10 int, requiring the whole string to be numeric.
//
// std::stoi stops at the first character it cannot convert, so inputs such as
// "100abc" or "12.5" would otherwise be accepted as 100 and 12. The number of
// characters consumed is therefore checked against the string length. Leading
// whitespace is still accepted because std::stoi skips it.
//
// @param value   text to parse
// @param result  receives the parsed number; left untouched on failure
// @param errstr  receives "Invalid integer value: <value>" on failure
// @return true when `value` is entirely a valid int, false otherwise
bool KinesisConf::parse_int(const std::string& value, int& result, std::string& errstr) const {
    try {
        std::size_t consumed = 0;
        const int parsed = std::stoi(value, &consumed);
        if (consumed == value.size()) {
            result = parsed;
            return true;
        }
        // Trailing characters remain: fall through to the shared error path.
    } catch (const std::exception&) {
        // Not a number at all, or out of range for int: reported below.
    }
    errstr = "Invalid integer value: " + value;
    return false;
}
158
159
0
// Parses `value` as a base-10 long, requiring the whole string to be numeric.
//
// std::stol stops at the first character it cannot convert, so inputs such as
// "1700000000xyz" would otherwise be accepted. The number of characters
// consumed is therefore checked against the string length. Leading whitespace
// is still accepted because std::stol skips it.
//
// @param value   text to parse
// @param result  receives the parsed number; left untouched on failure
// @param errstr  receives "Invalid long value: <value>" on failure
// @return true when `value` is entirely a valid long, false otherwise
bool KinesisConf::parse_long(const std::string& value, long& result, std::string& errstr) const {
    try {
        std::size_t consumed = 0;
        const long parsed = std::stol(value, &consumed);
        if (consumed == value.size()) {
            result = parsed;
            return true;
        }
        // Trailing characters remain: fall through to the shared error path.
    } catch (const std::exception&) {
        // Not a number at all, or out of range for long: reported below.
    }
    errstr = "Invalid long value: " + value;
    return false;
}
168
169
} // namespace doris