be/src/exec/es/es_scroll_query.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/es/es_scroll_query.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | #include <rapidjson/encodings.h> |
22 | | #include <rapidjson/rapidjson.h> |
23 | | #include <stdlib.h> |
24 | | |
25 | | #include <sstream> |
26 | | |
27 | | #include "exec/es/es_scan_reader.h" |
28 | | #include "rapidjson/document.h" |
29 | | #include "rapidjson/stringbuffer.h" |
30 | | #include "rapidjson/writer.h" |
31 | | |
32 | | namespace doris { |
33 | | |
34 | 0 | ESScrollQueryBuilder::ESScrollQueryBuilder() {} |
35 | | |
36 | 0 | ESScrollQueryBuilder::~ESScrollQueryBuilder() {} |
37 | | |
38 | | std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id, |
39 | 0 | const std::string& scroll) { |
40 | 0 | rapidjson::Document scroll_dsl; |
41 | 0 | rapidjson::Document::AllocatorType& allocator = scroll_dsl.GetAllocator(); |
42 | 0 | scroll_dsl.SetObject(); |
43 | 0 | rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator); |
44 | 0 | scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); |
45 | 0 | rapidjson::Value scroll_value(scroll.c_str(), allocator); |
46 | 0 | scroll_dsl.AddMember("scroll", scroll_value, allocator); |
47 | 0 | rapidjson::StringBuffer buffer; |
48 | 0 | rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
49 | 0 | scroll_dsl.Accept(writer); |
50 | 0 | return buffer.GetString(); |
51 | 0 | } |
52 | 704 | std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scroll_id) { |
53 | 704 | rapidjson::Document delete_scroll_dsl; |
54 | 704 | rapidjson::Document::AllocatorType& allocator = delete_scroll_dsl.GetAllocator(); |
55 | 704 | delete_scroll_dsl.SetObject(); |
56 | 704 | rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator); |
57 | 704 | delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator); |
58 | 704 | rapidjson::StringBuffer buffer; |
59 | 704 | rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
60 | 704 | delete_scroll_dsl.Accept(writer); |
61 | 704 | return buffer.GetString(); |
62 | 704 | } |
63 | | |
64 | | std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties, |
65 | | const std::vector<std::string>& fields, |
66 | | const std::map<std::string, std::string>& docvalue_context, |
67 | 704 | bool* doc_value_mode) { |
68 | 704 | rapidjson::Document es_query_dsl; |
69 | 704 | rapidjson::Document::AllocatorType& allocator = es_query_dsl.GetAllocator(); |
70 | 704 | es_query_dsl.SetObject(); |
71 | | // generate the filter clause |
72 | 704 | rapidjson::Document scratch_document; |
73 | 704 | rapidjson::Value query_node(rapidjson::kObjectType); |
74 | | // use fe generate dsl, it must be placed outside the if, otherwise it will cause problems in AddMember |
75 | 704 | rapidjson::Document fe_query_dsl; |
76 | 704 | DCHECK(properties.find(ESScanReader::KEY_QUERY_DSL) != properties.end()); |
77 | 704 | auto query_dsl = properties.at(ESScanReader::KEY_QUERY_DSL); |
78 | 704 | es_query_dsl.AddMember("query", fe_query_dsl.Parse(query_dsl.c_str(), query_dsl.length()), |
79 | 704 | allocator); |
80 | | |
81 | | // Doris FE already has checked docvalue-scan optimization |
82 | 704 | bool pure_docvalue = true; |
83 | 704 | if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) { |
84 | 700 | pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str()); |
85 | 700 | } else { |
86 | | // check docvalue scan optimization, used for compatibility |
87 | 4 | if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) { |
88 | 4 | pure_docvalue = false; |
89 | 4 | } else { |
90 | 0 | for (auto& select_field : fields) { |
91 | 0 | if (docvalue_context.find(select_field) == docvalue_context.end()) { |
92 | 0 | pure_docvalue = false; |
93 | 0 | break; |
94 | 0 | } |
95 | 0 | } |
96 | 0 | } |
97 | 4 | } |
98 | | |
99 | 704 | *doc_value_mode = pure_docvalue; |
100 | | |
101 | 704 | rapidjson::Value source_node(rapidjson::kArrayType); |
102 | 704 | if (pure_docvalue) { |
103 | 228 | for (auto& select_field : fields) { |
104 | 228 | rapidjson::Value field(docvalue_context.at(select_field).c_str(), allocator); |
105 | 228 | source_node.PushBack(field, allocator); |
106 | 228 | } |
107 | 520 | } else { |
108 | 6.86k | for (auto& select_field : fields) { |
109 | 6.86k | rapidjson::Value field(select_field.c_str(), allocator); |
110 | 6.86k | source_node.PushBack(field, allocator); |
111 | 6.86k | } |
112 | 520 | } |
113 | | |
114 | | // just filter the selected fields for reducing the network cost |
115 | 704 | if (pure_docvalue) { |
116 | 184 | es_query_dsl.AddMember("stored_fields", "_none_", allocator); |
117 | 184 | es_query_dsl.AddMember("docvalue_fields", source_node, allocator); |
118 | 520 | } else { |
119 | 520 | es_query_dsl.AddMember("_source", source_node, allocator); |
120 | 520 | } |
121 | | |
122 | 704 | int size; |
123 | 704 | if (properties.find(ESScanReader::KEY_TERMINATE_AFTER) != properties.end()) { |
124 | 0 | size = atoi(properties.at(ESScanReader::KEY_TERMINATE_AFTER).c_str()); |
125 | 704 | } else { |
126 | 704 | size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str()); |
127 | 704 | } |
128 | | |
129 | 704 | std::string shard_id; |
130 | 704 | if (properties.find(ESScanReader::KEY_SHARD) != properties.end()) { |
131 | 704 | shard_id = properties.at(ESScanReader::KEY_SHARD); |
132 | 704 | } |
133 | | // To maintain consistency with the query, when shard_id is negative, do not add sort node in scroll request body. |
134 | 704 | if (!shard_id.empty() && std::stoi(shard_id) >= 0) { |
135 | 390 | rapidjson::Value sort_node(rapidjson::kArrayType); |
136 | | // use the scroll-scan mode for scan index documents |
137 | 390 | rapidjson::Value field("_doc", allocator); |
138 | 390 | sort_node.PushBack(field, allocator); |
139 | 390 | es_query_dsl.AddMember("sort", sort_node, allocator); |
140 | 390 | } |
141 | | // number of documents returned |
142 | 704 | es_query_dsl.AddMember("size", size, allocator); |
143 | 704 | rapidjson::StringBuffer buffer; |
144 | 704 | rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
145 | 704 | es_query_dsl.Accept(writer); |
146 | 704 | std::string es_query_dsl_json = buffer.GetString(); |
147 | | LOG(INFO) << "Generated ES queryDSL [ " << es_query_dsl_json << " ]"; |
148 | 704 | return es_query_dsl_json; |
149 | 704 | } |
150 | | } // namespace doris |