be/src/load/routine_load/kinesis_conf.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/routine_load/kinesis_conf.h" |
19 | | |
20 | | #include "common/logging.h" |
21 | | |
22 | | namespace doris { |
23 | | |
24 | | KinesisConf::ConfResult KinesisConf::set(const std::string& name, const std::string& value, |
25 | 0 | std::string& errstr) { |
26 | | // Determine which API(s) this parameter belongs to based on its semantic meaning |
27 | | // All parameters come in as simple names (e.g., "max_records", "stream_arn") |
28 | | // after "aws.kinesis." prefix is removed in data_consumer.cpp |
29 | |
|
30 | 0 | if (name == "max_records") { |
31 | | // GetRecords API parameter |
32 | 0 | int max_records; |
33 | 0 | if (!parse_int(value, max_records, errstr)) { |
34 | 0 | return CONF_INVALID; |
35 | 0 | } |
36 | 0 | if (max_records < 1 || max_records > 10000) { |
37 | 0 | errstr = "max_records must be between 1 and 10000"; |
38 | 0 | return CONF_INVALID; |
39 | 0 | } |
40 | 0 | _get_records_params[name] = value; |
41 | 0 | return CONF_OK; |
42 | 0 | } else if (name == "stream_arn") { |
43 | | // Used by all three APIs |
44 | 0 | _get_records_params[name] = value; |
45 | 0 | _get_shard_iterator_params[name] = value; |
46 | 0 | _list_shards_params[name] = value; |
47 | 0 | return CONF_OK; |
48 | 0 | } else if (name == "timestamp") { |
49 | | // GetShardIterator API parameter (for AT_TIMESTAMP iterator type) |
50 | 0 | long timestamp; |
51 | 0 | if (!parse_long(value, timestamp, errstr)) { |
52 | 0 | return CONF_INVALID; |
53 | 0 | } |
54 | 0 | _get_shard_iterator_params[name] = value; |
55 | 0 | return CONF_OK; |
56 | 0 | } else if (name == "max_results") { |
57 | | // ListShards API parameter |
58 | 0 | int max_results; |
59 | 0 | if (!parse_int(value, max_results, errstr)) { |
60 | 0 | return CONF_INVALID; |
61 | 0 | } |
62 | 0 | if (max_results < 1 || max_results > 10000) { |
63 | 0 | errstr = "max_results must be between 1 and 10000"; |
64 | 0 | return CONF_INVALID; |
65 | 0 | } |
66 | 0 | _list_shards_params[name] = value; |
67 | 0 | return CONF_OK; |
68 | 0 | } |
69 | | |
70 | 0 | VLOG_NOTICE << "Unknown Kinesis configuration: " << name; |
71 | 0 | return CONF_UNKNOWN; |
72 | 0 | } |
73 | | |
74 | | Status KinesisConf::apply_to_get_records_request(Aws::Kinesis::Model::GetRecordsRequest& request, |
75 | 0 | const std::string& shard_iterator) const { |
76 | 0 | request.SetShardIterator(shard_iterator); |
77 | |
|
78 | 0 | auto it = _get_records_params.find("max_records"); |
79 | 0 | if (it != _get_records_params.end()) { |
80 | 0 | try { |
81 | 0 | request.SetLimit(std::stoi(it->second)); |
82 | 0 | } catch (const std::exception&) { |
83 | 0 | return Status::InternalError("Failed to apply get_records.max_records: {}", it->second); |
84 | 0 | } |
85 | 0 | } |
86 | | |
87 | 0 | it = _get_records_params.find("stream_arn"); |
88 | 0 | if (it != _get_records_params.end() && !it->second.empty()) { |
89 | 0 | request.SetStreamARN(it->second); |
90 | 0 | } |
91 | |
|
92 | 0 | return Status::OK(); |
93 | 0 | } |
94 | | |
95 | | Status KinesisConf::apply_to_get_shard_iterator_request( |
96 | | Aws::Kinesis::Model::GetShardIteratorRequest& request, const std::string& stream_name, |
97 | 0 | const std::string& shard_id, const std::string& sequence_number) const { |
98 | 0 | request.SetStreamName(stream_name); |
99 | 0 | request.SetShardId(shard_id); |
100 | |
|
101 | 0 | if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" || sequence_number == "-2") { |
102 | 0 | request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON); |
103 | 0 | } else if (sequence_number == "LATEST" || sequence_number == "-1") { |
104 | 0 | request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST); |
105 | 0 | } else { |
106 | 0 | request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER); |
107 | 0 | request.SetStartingSequenceNumber(sequence_number); |
108 | 0 | } |
109 | |
|
110 | 0 | auto it = _get_shard_iterator_params.find("stream_arn"); |
111 | 0 | if (it != _get_shard_iterator_params.end() && !it->second.empty()) { |
112 | 0 | request.SetStreamARN(it->second); |
113 | 0 | } |
114 | |
|
115 | 0 | it = _get_shard_iterator_params.find("timestamp"); |
116 | 0 | if (it != _get_shard_iterator_params.end()) { |
117 | 0 | try { |
118 | 0 | request.SetTimestamp(Aws::Utils::DateTime(std::stol(it->second))); |
119 | 0 | } catch (const std::exception&) { |
120 | 0 | return Status::InternalError("Failed to apply get_shard_iterator.timestamp: {}", |
121 | 0 | it->second); |
122 | 0 | } |
123 | 0 | } |
124 | | |
125 | 0 | return Status::OK(); |
126 | 0 | } |
127 | | |
128 | | Status KinesisConf::apply_to_list_shards_request(Aws::Kinesis::Model::ListShardsRequest& request, |
129 | 0 | const std::string& stream_name) const { |
130 | 0 | request.SetStreamName(stream_name); |
131 | |
|
132 | 0 | auto it = _list_shards_params.find("stream_arn"); |
133 | 0 | if (it != _list_shards_params.end() && !it->second.empty()) { |
134 | 0 | request.SetStreamARN(it->second); |
135 | 0 | } |
136 | |
|
137 | 0 | it = _list_shards_params.find("max_results"); |
138 | 0 | if (it != _list_shards_params.end()) { |
139 | 0 | try { |
140 | 0 | request.SetMaxResults(std::stoi(it->second)); |
141 | 0 | } catch (const std::exception&) { |
142 | 0 | return Status::InternalError("Failed to apply list_shards.max_results: {}", it->second); |
143 | 0 | } |
144 | 0 | } |
145 | | |
146 | 0 | return Status::OK(); |
147 | 0 | } |
148 | | |
149 | 0 | bool KinesisConf::parse_int(const std::string& value, int& result, std::string& errstr) const { |
150 | 0 | try { |
151 | 0 | result = std::stoi(value); |
152 | 0 | return true; |
153 | 0 | } catch (const std::exception&) { |
154 | 0 | errstr = "Invalid integer value: " + value; |
155 | 0 | return false; |
156 | 0 | } |
157 | 0 | } |
158 | | |
159 | 0 | bool KinesisConf::parse_long(const std::string& value, long& result, std::string& errstr) const { |
160 | 0 | try { |
161 | 0 | result = std::stol(value); |
162 | 0 | return true; |
163 | 0 | } catch (const std::exception&) { |
164 | 0 | errstr = "Invalid long value: " + value; |
165 | 0 | return false; |
166 | 0 | } |
167 | 0 | } |
168 | | |
169 | | } // namespace doris |