be/src/load/routine_load/consumer_helpers.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <chrono> |
21 | | #include <string> |
22 | | #include <unordered_map> |
23 | | |
24 | | namespace doris { |
25 | | |
// Accumulates the counters a data consumer reports: how many message
// fetches were tracked, and how many bytes and rows were consumed.
// The track_* mutators are only declared in this header; their
// definitions are provided elsewhere.
class ConsumerMetrics {
public:
    ConsumerMetrics() = default;

    // Record one message fetch operation together with its latency.
    void track_get_msg(int64_t latency_us);

    // Record consumed bytes.
    void track_consume_bytes(int64_t bytes);

    // Record consumed rows.
    void track_consume_rows(int64_t rows);

    // Read-only accessors for the accumulated counters.
    int64_t get_msg_count() const {
        return _get_msg_count;
    }

    int64_t consume_bytes() const {
        return _consume_bytes;
    }

    int64_t consume_rows() const {
        return _consume_rows;
    }

private:
    // Every counter starts at zero for a freshly constructed object.
    int64_t _get_msg_count {0};
    int64_t _consume_bytes {0};
    int64_t _consume_rows {0};
};
50 | | |
// Bounded retry bookkeeping with a configurable backoff interval.
// A policy allows another attempt while the number of retries performed
// so far is strictly below the configured maximum.
class RetryPolicy {
public:
    // max_retries: upper bound on retries (default 3).
    // backoff_ms:  backoff interval stored for retry_with_backoff (default 200).
    explicit RetryPolicy(int max_retries = 3, int backoff_ms = 200)
            : _max_retries(max_retries), _backoff_ms(backoff_ms) {}

    // True while the retry budget has not been exhausted.
    bool should_retry() const {
        return _max_retries > _retry_count;
    }

    // Increment retry count and sleep with backoff (defined out of line).
    void retry_with_backoff();

    // Clear the retry counter; intended to be called after a success.
    void reset() {
        _retry_count = 0;
    }

    // Number of retries recorded since construction or the last reset().
    int retry_count() const {
        return _retry_count;
    }

private:
    int _max_retries;
    int _backoff_ms;
    int _retry_count = 0;
};
74 | | |
// Bookkeeping for exponential backoff used when a consumer is throttled.
// Stores the initial and maximum backoff (both in milliseconds, per the
// parameter names) plus a count of throttle events.
class ThrottleBackoff {
public:
    // initial_backoff_ms: starting backoff value (default 1000).
    // max_backoff_ms:     backoff ceiling (default 10000).
    explicit ThrottleBackoff(int initial_backoff_ms = 1000, int max_backoff_ms = 10000)
            : _initial_backoff_ms(initial_backoff_ms), _max_backoff_ms(max_backoff_ms) {}

    // Increment throttle count and sleep with exponential backoff
    // (defined out of line).
    void backoff_and_sleep();

    // Clear the throttle counter; intended to be called after a success.
    void reset() {
        _throttle_count = 0;
    }

    // Number of throttle events recorded since construction or the last reset().
    int throttle_count() const {
        return _throttle_count;
    }

private:
    int _initial_backoff_ms;
    int _max_backoff_ms;
    int _throttle_count = 0;
};
97 | | |
// Stateless helper for comparing the custom property maps of two consumers.
class PropertyMatcher {
public:
    // Returns true when both maps hold the same number of entries and every
    // (key, value) entry of `props1` is present in `props2` with an equal value.
    // The two map types may differ; each must provide size(), begin()/end()
    // and find(key), and the mapped values must be comparable with `!=`.
    template <typename MapType1, typename MapType2>
    static bool properties_match(const MapType1& props1, const MapType2& props2) {
        // Different entry counts can never match, so bail out before any lookup.
        if (props1.size() != props2.size()) {
            return false;
        }

        for (auto lhs = props1.begin(); lhs != props1.end(); ++lhs) {
            const auto rhs = props2.find(lhs->first);
            if (rhs == props2.end()) {
                // Key exists only in props1.
                return false;
            }
            if (rhs->second != lhs->second) {
                // Same key, different value.
                return false;
            }
        }

        return true;
    }
};
118 | | |
119 | | } // namespace doris |