be/src/exec/scan/es_scanner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/es_scanner.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <ostream> |
22 | | #include <utility> |
23 | | |
24 | | #include "common/logging.h" |
25 | | #include "core/block/block.h" |
26 | | #include "core/block/column_with_type_and_name.h" |
27 | | #include "core/column/column.h" |
28 | | #include "exec/operator/es_scan_operator.h" |
29 | | #include "runtime/descriptors.h" |
30 | | #include "runtime/runtime_profile.h" |
31 | | #include "runtime/runtime_state.h" |
32 | | |
33 | | namespace doris { |
34 | | class VExprContext; |
35 | | } // namespace doris |
36 | | |
// File-local label streamed in front of the method name in this file's
// VLOG_CRITICAL trace lines.
static const std::string NEW_SCANNER_TYPE("EsScanner");
38 | | |
39 | | namespace doris { |
40 | | |
41 | | EsScanner::EsScanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t limit, |
42 | | TupleId tuple_id, const std::map<std::string, std::string>& properties, |
43 | | const std::map<std::string, std::string>& docvalue_context, |
44 | | bool doc_value_mode, RuntimeProfile* profile) |
45 | 0 | : Scanner(state, local_state, limit, profile), |
46 | 0 | _es_eof(false), |
47 | 0 | _properties(properties), |
48 | 0 | _line_eof(false), |
49 | 0 | _batch_eof(false), |
50 | 0 | _tuple_id(tuple_id), |
51 | 0 | _tuple_desc(nullptr), |
52 | 0 | _es_reader(nullptr), |
53 | 0 | _es_scroll_parser(nullptr), |
54 | 0 | _docvalue_context(docvalue_context), |
55 | 0 | _doc_value_mode(doc_value_mode) { |
56 | 0 | _has_prepared = false; |
57 | 0 | } |
58 | | |
59 | 0 | Status EsScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
60 | 0 | VLOG_CRITICAL << NEW_SCANNER_TYPE << "::init"; |
61 | 0 | RETURN_IF_ERROR(Scanner::init(_state, conjuncts)); |
62 | | |
63 | 0 | if (nullptr == state) { |
64 | 0 | return Status::InternalError("input pointer is null."); |
65 | 0 | } |
66 | | |
67 | 0 | _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); |
68 | 0 | if (nullptr == _tuple_desc) { |
69 | 0 | return Status::InternalError("Failed to get tuple descriptor, tuple_id={}", _tuple_id); |
70 | 0 | } |
71 | | |
72 | 0 | const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT); |
73 | 0 | _es_reader.reset(new ESScanReader(host, _properties, _doc_value_mode)); |
74 | 0 | if (_es_reader == nullptr) { |
75 | 0 | return Status::InternalError("Es reader construct failed."); |
76 | 0 | } |
77 | | |
78 | 0 | return Status::OK(); |
79 | 0 | } |
80 | | |
81 | 0 | Status EsScanner::_open_impl(RuntimeState* state) { |
82 | 0 | VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open"; |
83 | |
|
84 | 0 | if (nullptr == state) { |
85 | 0 | return Status::InternalError("input pointer is null."); |
86 | 0 | } |
87 | | |
88 | 0 | if (!_has_prepared) { |
89 | 0 | return Status::InternalError("used before initialize."); |
90 | 0 | } |
91 | | |
92 | 0 | RETURN_IF_CANCELLED(state); |
93 | 0 | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
94 | | |
95 | 0 | RETURN_IF_ERROR(_es_reader->open()); |
96 | | |
97 | 0 | return Status::OK(); |
98 | 0 | } |
99 | | |
100 | 0 | Status EsScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
101 | 0 | VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl"; |
102 | 0 | if (nullptr == state || nullptr == block || nullptr == eof) { |
103 | 0 | return Status::InternalError("input is NULL pointer"); |
104 | 0 | } |
105 | | |
106 | 0 | if (!_has_prepared) { |
107 | 0 | return Status::InternalError("used before initialize."); |
108 | 0 | } |
109 | | |
110 | 0 | RETURN_IF_CANCELLED(state); |
111 | | |
112 | 0 | if (_es_eof == true) { |
113 | 0 | *eof = true; |
114 | 0 | return Status::OK(); |
115 | 0 | } |
116 | | |
117 | 0 | auto column_size = _tuple_desc->slots().size(); |
118 | 0 | std::vector<MutableColumnPtr> columns(column_size); |
119 | |
|
120 | 0 | bool mem_reuse = block->mem_reuse(); |
121 | 0 | const int batch_size = state->batch_size(); |
122 | | // only empty block should be here |
123 | 0 | DCHECK(block->rows() == 0); |
124 | |
|
125 | 0 | do { |
126 | 0 | columns.resize(column_size); |
127 | 0 | for (auto i = 0; i < column_size; i++) { |
128 | 0 | if (mem_reuse) { |
129 | 0 | columns[i] = block->get_by_position(i).column->assume_mutable(); |
130 | 0 | } else { |
131 | 0 | columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); |
132 | 0 | } |
133 | 0 | } |
134 | |
|
135 | 0 | while (columns[0]->size() < batch_size && !_es_eof) { |
136 | 0 | RETURN_IF_CANCELLED(state); |
137 | | // Get from scanner |
138 | 0 | RETURN_IF_ERROR(_get_next(columns)); |
139 | 0 | } |
140 | | |
141 | 0 | if (_es_eof == true) { |
142 | 0 | if (block->rows() == 0) { |
143 | 0 | *eof = true; |
144 | 0 | } |
145 | 0 | break; |
146 | 0 | } |
147 | | |
148 | | // Before really use the Block, must clear other ptr of column in block |
149 | | // So here need do std::move and clear in `columns` |
150 | 0 | if (!mem_reuse) { |
151 | 0 | int column_index = 0; |
152 | 0 | for (const auto slot_desc : _tuple_desc->slots()) { |
153 | 0 | block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), |
154 | 0 | slot_desc->get_data_type_ptr(), |
155 | 0 | slot_desc->col_name())); |
156 | 0 | } |
157 | 0 | } else { |
158 | 0 | columns.clear(); |
159 | 0 | } |
160 | 0 | VLOG_ROW << "EsScanner output rows: " << block->rows(); |
161 | 0 | } while (block->rows() == 0 && !(*eof)); |
162 | 0 | return Status::OK(); |
163 | 0 | } |
164 | | |
165 | 0 | Status EsScanner::_get_next(std::vector<MutableColumnPtr>& columns) { |
166 | 0 | SCOPED_TIMER(_local_state->cast<EsScanLocalState>()._read_timer); |
167 | 0 | if (_line_eof && _batch_eof) { |
168 | 0 | _es_eof = true; |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | | |
172 | 0 | while (!_batch_eof) { |
173 | 0 | if (_line_eof || _es_scroll_parser == nullptr) { |
174 | 0 | RETURN_IF_ERROR(_es_reader->get_next(&_batch_eof, _es_scroll_parser)); |
175 | 0 | if (_batch_eof) { |
176 | 0 | _es_eof = true; |
177 | 0 | return Status::OK(); |
178 | 0 | } |
179 | 0 | } |
180 | | |
181 | 0 | COUNTER_UPDATE(_local_state->cast<EsScanLocalState>()._blocks_read_counter, 1); |
182 | 0 | SCOPED_TIMER(_local_state->cast<EsScanLocalState>()._materialize_timer); |
183 | 0 | RETURN_IF_ERROR(_es_scroll_parser->fill_columns(_tuple_desc, columns, &_line_eof, |
184 | 0 | _docvalue_context, _state->timezone_obj())); |
185 | 0 | if (!_line_eof) { |
186 | 0 | break; |
187 | 0 | } |
188 | 0 | } |
189 | | |
190 | 0 | return Status::OK(); |
191 | 0 | } |
192 | | |
193 | 0 | Status EsScanner::close(RuntimeState* state) { |
194 | 0 | if (!_try_close()) { |
195 | 0 | return Status::OK(); |
196 | 0 | } |
197 | | |
198 | 0 | if (_es_reader != nullptr) { |
199 | 0 | RETURN_IF_ERROR(_es_reader->close()); |
200 | 0 | } |
201 | | |
202 | 0 | RETURN_IF_ERROR(Scanner::close(state)); |
203 | 0 | return Status::OK(); |
204 | 0 | } |
205 | | } // namespace doris |