1 /*
2 * Copyright 2009-2017 Alibaba Cloud All rights reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <alibabacloud/oss/model/SelectObjectRequest.h>
18 #include <iostream>
19 #include <sstream>
20 #include <cstring>
21 #include "ModelError.h"
22 #include "utils/Utils.h"
23 #include "../utils/LogUtils.h"
24 #include "../utils/Crc32.h"
25 #include "../utils/StreamBuf.h"
26
// Number of bytes buffered before a frame's payload is handed on:
// 12 bytes of frame header (version, frame type, payload length, header
// checksum) followed by the 8-byte offset that opens the payload.
constexpr int FRAME_HEADER_LEN = 12 + 8;
28
29 using namespace AlibabaCloud::OSS;
30
31
32 struct SelectObjectFrame {
33 int frame_type;
34 int init_crc32;
35 int32_t header_len;
36 int32_t tail_len;
37 int32_t payload_remains;
38 uint8_t tail[4];
39 uint8_t header[FRAME_HEADER_LEN];
40 uint8_t end_frame[256];
41 uint32_t end_frame_size;
42 uint32_t payload_crc32;
43 };
44
45
46 class SelectObjectStreamBuf : public StreamBufProxy
47 {
48 public:
SelectObjectStreamBuf(std::iostream & stream,int initCrc32)49 SelectObjectStreamBuf(std::iostream& stream, int initCrc32) :
50 StreamBufProxy(stream),
51 lastStatus_(0)
52 {
53 // init frame
54 frame_.init_crc32 = initCrc32;
55 frame_.header_len = 0;
56 frame_.tail_len = 0;
57 frame_.payload_remains = 0;
58 frame_.end_frame_size = 0;
59 };
60
LastStatus()61 int LastStatus()
62 {
63 return lastStatus_;
64 }
65
66 protected:
selectObjectDepackFrame(const char * ptr,int len,int * frame_type,int * payload_len,char ** payload_buf,SelectObjectFrame * frame)67 int selectObjectDepackFrame(const char *ptr, int len, int *frame_type, int *payload_len, char **payload_buf, SelectObjectFrame *frame)
68 {
69 int remain = len;
70 //Version | Frame - Type | Payload Length | Header Checksum | Payload | Payload Checksum
71 //<1 byte> <--3 bytes--> <-- 4 bytes --> <------4 bytes--> <variable><----4bytes------>
72 //Payload
73 //<offset | data>
74 //<8 types><variable>
75
76 // header
77 if (frame->header_len < FRAME_HEADER_LEN) {
78 int copy = FRAME_HEADER_LEN - frame->header_len;
79 copy = ((remain > copy) ? copy : remain);
80 memcpy(frame->header + frame->header_len, ptr, copy);
81 frame->header_len += copy;
82 ptr += copy;
83 remain -= copy;
84
85 // if deal with header done
86 if (frame->header_len == FRAME_HEADER_LEN) {
87 uint32_t payload_length;
88 // calculation payload length
89 payload_length = frame->header[4];
90 payload_length = (payload_length << 8) | frame->header[5];
91 payload_length = (payload_length << 8) | frame->header[6];
92 payload_length = (payload_length << 8) | frame->header[7];
93 frame->payload_remains = payload_length - 8;
94 frame->payload_crc32 = CRC32::CalcCRC(frame->init_crc32, frame->header + 12, 8);
95 }
96 }
97
98 // payload
99 if (frame->payload_remains > 0) {
100 int copy = (frame->payload_remains > remain) ? remain : frame->payload_remains;
101 uint32_t type;
102 type = frame->header[1];
103 type = (type << 8) | frame->header[2];
104 type = (type << 8) | frame->header[3];
105 *frame_type = type;
106 *payload_len = copy;
107 *payload_buf = (char *)ptr;
108 remain -= copy;
109 frame->payload_remains -= copy;
110 frame->payload_crc32 = CRC32::CalcCRC(frame->payload_crc32, ptr, copy);
111 return len - remain;
112 }
113
114 // tail
115 if (frame->tail_len < 4) {
116 int copy = 4 - frame->tail_len;
117 copy = (copy > remain ? remain : copy);
118 memcpy(frame->tail + frame->tail_len, ptr, copy);
119 frame->tail_len += copy;
120 remain -= copy;
121 *frame_type = 0;
122 }
123
124 return len - remain;
125 }
126
selectObjectTransferContent(SelectObjectFrame * frame,const char * ptr,int wanted)127 int selectObjectTransferContent(SelectObjectFrame *frame, const char *ptr, int wanted) {
128 int remain = wanted;
129 char *payload_buf;
130 // the actual length of the write
131 int result = 0;
132 // deal with the whole buffer
133 while (remain > 0) {
134 int frame_type = 0, payload_len = 0;
135 int ret = selectObjectDepackFrame(ptr, remain, &frame_type, &payload_len, &payload_buf, frame);
136 switch (frame_type)
137 {
138 case 0x800001:
139 int temp;
140 temp = static_cast<int>(StreamBufProxy::xsputn(payload_buf, payload_len));
141 if (temp < 0) {
142 return temp;
143 }
144 result += temp;
145 break;
146 case 0x800004: //Continuous Frame
147 break;
148 case 0x800005: //Select object End Frame
149 {
150 int32_t copy = sizeof(frame->end_frame) - frame->end_frame_size;
151 copy = (copy > payload_len) ? payload_len : copy;
152 if (copy > 0) {
153 memcpy(frame->end_frame + frame->end_frame_size, ptr, copy);
154 frame->end_frame_size += copy;
155 }
156 }
157 break;
158 default:
159 // get payload checksum
160 if (frame->tail_len == 4) {
161 // compare check sum
162 uint32_t payload_crc32;
163 payload_crc32 = frame->tail[0];
164 payload_crc32 = (payload_crc32 << 8) | frame->tail[1];
165 payload_crc32 = (payload_crc32 << 8) | frame->tail[2];
166 payload_crc32 = (payload_crc32 << 8) | frame->tail[3];
167 if (payload_crc32 != 0 && payload_crc32 != frame->payload_crc32) {
168 // CRC32 Checksum failed
169 return -1;
170 }
171
172 // reset to get next frame
173 frame->header_len = 0;
174 frame->tail_len = 0;
175 frame->payload_remains = 0;
176 frame->end_frame_size = 0;
177 }
178 break;
179 }
180 ptr += ret;
181 remain -= ret;
182 }
183 return result;
184 }
185
xsputn(const char * ptr,std::streamsize count)186 std::streamsize xsputn(const char *ptr, std::streamsize count)
187 {
188 int result = selectObjectTransferContent(&frame_, ptr, static_cast<int>(count));
189 if (result < 0) {
190 if (result == -1) {
191 lastStatus_ = ARG_ERROR_SELECT_OBJECT_CHECK_SUM_FAILED;
192 }
193 return static_cast<std::streamsize>(result);
194 }
195 return count;
196 }
197
198 private:
199 SelectObjectFrame frame_;
200 int lastStatus_;
201 };
202
203 /////////////////////////////////////////////////////////////
204
SelectObjectRequest(const std::string & bucket,const std::string & key)205 SelectObjectRequest::SelectObjectRequest(const std::string& bucket, const std::string& key) :
206 GetObjectRequest(bucket, key),
207 expressionType_(ExpressionType::SQL),
208 skipPartialDataRecord_(false),
209 maxSkippedRecordsAllowed_(0),
210 inputFormat_(nullptr),
211 outputFormat_(nullptr),
212 streamBuffer_(nullptr),
213 upperContent_(nullptr)
214 {
215 setResponseStreamFactory(ResponseStreamFactory());
216
217 // close CRC Checksum
218 int flag = Flags();
219 flag |= REQUEST_FLAG_CONTENTMD5;
220 flag &= ~REQUEST_FLAG_CHECK_CRC64;
221 setFlags(flag);
222 }
223
setResponseStreamFactory(const IOStreamFactory & factory)224 void SelectObjectRequest::setResponseStreamFactory(const IOStreamFactory& factory)
225 {
226 upperResponseStreamFactory_ = factory;
227 ServiceRequest::setResponseStreamFactory([=]() {
228 streamBuffer_ = nullptr;
229 auto content = upperResponseStreamFactory_();
230 if (!outputFormat_->OutputRawData()) {
231 int initCrc32 = 0;
232 #ifdef ENABLE_OSS_TEST
233 if (!!(Flags() & 0x20000000)) {
234 const char* TAG = "SelectObjectClient";
235 OSS_LOG(LogLevel::LogDebug, TAG, "Payload checksum fail.");
236 initCrc32 = 1;
237 }
238 #endif // ENABLE_OSS_TEST
239 streamBuffer_ = std::make_shared<SelectObjectStreamBuf>(*content, initCrc32);
240 }
241 upperContent_ = content;
242 return content;
243 });
244 }
245
MaxSkippedRecordsAllowed() const246 uint64_t SelectObjectRequest::MaxSkippedRecordsAllowed() const
247 {
248 return maxSkippedRecordsAllowed_;
249 }
250
setSkippedRecords(bool skipPartialDataRecord,uint64_t maxSkippedRecords)251 void SelectObjectRequest::setSkippedRecords(bool skipPartialDataRecord, uint64_t maxSkippedRecords)
252 {
253 skipPartialDataRecord_ = skipPartialDataRecord;
254 maxSkippedRecordsAllowed_ = maxSkippedRecords;
255 }
256
setExpression(const std::string & expression,ExpressionType type)257 void SelectObjectRequest::setExpression(const std::string& expression, ExpressionType type)
258 {
259 expressionType_ = type;
260 expression_ = expression;
261 }
262
setInputFormat(InputFormat & inputFormat)263 void SelectObjectRequest::setInputFormat(InputFormat& inputFormat)
264 {
265 inputFormat_ = &inputFormat;
266 }
267
setOutputFormat(OutputFormat & OutputFormat)268 void SelectObjectRequest::setOutputFormat(OutputFormat& OutputFormat)
269 {
270 outputFormat_ = &OutputFormat;
271 }
272
validate() const273 int SelectObjectRequest::validate() const
274 {
275 int ret = GetObjectRequest::validate();
276 if (ret != 0) {
277 return ret;
278 }
279
280 if (expressionType_ != ExpressionType::SQL) {
281 return ARG_ERROR_SELECT_OBJECT_NOT_SQL_EXPRESSION;
282 }
283 if (inputFormat_ == nullptr || outputFormat_ == nullptr) {
284 return ARG_ERROR_SELECT_OBJECT_NULL_POINT;
285 }
286
287 ret = inputFormat_->validate();
288 if (ret != 0) {
289 return ret;
290 }
291 ret = outputFormat_->validate();
292 if (ret != 0) {
293 return ret;
294 }
295 // check type
296 if (inputFormat_->Type() != outputFormat_->Type()) {
297 return ARG_ERROR_SELECT_OBJECT_PROCESS_NOT_SAME;
298 }
299 return 0;
300 }
301
payload() const302 std::string SelectObjectRequest::payload() const
303 {
304 std::stringstream ss;
305 ss << "<SelectRequest>" << std::endl;
306 // Expression
307 ss << "<Expression>" << Base64Encode(expression_) << "</Expression>" << std::endl;
308 // input format
309 ss << inputFormat_->toXML(1) << std::endl;
310 // output format
311 ss << outputFormat_->toXML() << std::endl;
312 // options
313 ss << "<Options>" << std::endl;
314 ss << "<SkipPartialDataRecord>" << (skipPartialDataRecord_ ? "true" : "false") << "</SkipPartialDataRecord>" << std::endl;
315 ss << "<MaxSkippedRecordsAllowed>" << std::to_string(MaxSkippedRecordsAllowed()) << "</MaxSkippedRecordsAllowed>" << std::endl;
316 ss << "</Options>" << std::endl;
317 ss << "</SelectRequest>" << std::endl;
318 return ss.str();
319 }
320
dispose() const321 int SelectObjectRequest::dispose() const
322 {
323 int ret = 0;
324 if (streamBuffer_ != nullptr) {
325 auto buf = std::static_pointer_cast<SelectObjectStreamBuf>(streamBuffer_);
326 ret = buf->LastStatus();
327 streamBuffer_ = nullptr;
328 }
329 upperContent_ = nullptr;
330 return ret;
331 }
332
specialParameters() const333 ParameterCollection SelectObjectRequest::specialParameters() const
334 {
335 auto parameters = GetObjectRequest::specialParameters();
336 if (inputFormat_) {
337 auto type = inputFormat_->Type();
338 type.append("/select");
339 parameters["x-oss-process"] = type;
340 }
341 return parameters;
342 }
343