1 /*
2  * Copyright 2009-2017 Alibaba Cloud All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <alibabacloud/oss/model/SelectObjectRequest.h>
18 #include <iostream>
19 #include <sstream>
20 #include <cstring>
21 #include "ModelError.h"
22 #include "utils/Utils.h"
23 #include "../utils/LogUtils.h"
24 #include "../utils/Crc32.h"
25 #include "../utils/StreamBuf.h"
26 
27 #define FRAME_HEADER_LEN   (12+8)
28 
29 using namespace AlibabaCloud::OSS;
30 
31 
32 struct SelectObjectFrame {
33     int frame_type;
34     int init_crc32;
35     int32_t header_len;
36     int32_t tail_len;
37     int32_t payload_remains;
38     uint8_t tail[4];
39     uint8_t header[FRAME_HEADER_LEN];
40     uint8_t end_frame[256];
41     uint32_t end_frame_size;
42     uint32_t payload_crc32;
43 };
44 
45 
46 class SelectObjectStreamBuf : public StreamBufProxy
47 {
48 public:
SelectObjectStreamBuf(std::iostream & stream,int initCrc32)49     SelectObjectStreamBuf(std::iostream& stream, int initCrc32) :
50         StreamBufProxy(stream),
51         lastStatus_(0)
52     {
53         // init frame
54         frame_.init_crc32 = initCrc32;
55         frame_.header_len = 0;
56         frame_.tail_len = 0;
57         frame_.payload_remains = 0;
58         frame_.end_frame_size = 0;
59     };
60 
LastStatus()61     int LastStatus()
62     {
63         return lastStatus_;
64     }
65 
66 protected:
selectObjectDepackFrame(const char * ptr,int len,int * frame_type,int * payload_len,char ** payload_buf,SelectObjectFrame * frame)67     int selectObjectDepackFrame(const char *ptr, int len, int *frame_type, int *payload_len, char **payload_buf, SelectObjectFrame *frame)
68     {
69         int remain = len;
70         //Version | Frame - Type | Payload Length | Header Checksum | Payload | Payload Checksum
71         //<1 byte> <--3 bytes-->   <-- 4 bytes --> <------4 bytes--> <variable><----4bytes------>
72         //Payload
73         //<offset | data>
74         //<8 types><variable>
75 
76         // header
77         if (frame->header_len < FRAME_HEADER_LEN) {
78             int copy = FRAME_HEADER_LEN - frame->header_len;
79             copy = ((remain > copy) ? copy : remain);
80             memcpy(frame->header + frame->header_len, ptr, copy);
81             frame->header_len += copy;
82             ptr += copy;
83             remain -= copy;
84 
85             // if deal with header done
86             if (frame->header_len == FRAME_HEADER_LEN) {
87                 uint32_t payload_length;
88                 // calculation payload length
89                 payload_length = frame->header[4];
90                 payload_length = (payload_length << 8) | frame->header[5];
91                 payload_length = (payload_length << 8) | frame->header[6];
92                 payload_length = (payload_length << 8) | frame->header[7];
93                 frame->payload_remains = payload_length - 8;
94                 frame->payload_crc32 = CRC32::CalcCRC(frame->init_crc32, frame->header + 12, 8);
95             }
96         }
97 
98         // payload
99         if (frame->payload_remains > 0) {
100             int copy = (frame->payload_remains > remain) ? remain : frame->payload_remains;
101             uint32_t type;
102             type = frame->header[1];
103             type = (type << 8) | frame->header[2];
104             type = (type << 8) | frame->header[3];
105             *frame_type = type;
106             *payload_len = copy;
107             *payload_buf = (char *)ptr;
108             remain -= copy;
109             frame->payload_remains -= copy;
110             frame->payload_crc32 = CRC32::CalcCRC(frame->payload_crc32, ptr, copy);
111             return len - remain;
112         }
113 
114         // tail
115         if (frame->tail_len < 4) {
116             int copy = 4 - frame->tail_len;
117             copy = (copy > remain ? remain : copy);
118             memcpy(frame->tail + frame->tail_len, ptr, copy);
119             frame->tail_len += copy;
120             remain -= copy;
121             *frame_type = 0;
122         }
123 
124         return len - remain;
125     }
126 
selectObjectTransferContent(SelectObjectFrame * frame,const char * ptr,int wanted)127     int selectObjectTransferContent(SelectObjectFrame *frame, const char *ptr, int wanted) {
128         int remain = wanted;
129         char *payload_buf;
130         // the actual length of the write
131         int result = 0;
132         // deal with the whole buffer
133         while (remain > 0) {
134             int  frame_type = 0, payload_len = 0;
135             int ret = selectObjectDepackFrame(ptr, remain, &frame_type, &payload_len, &payload_buf, frame);
136             switch (frame_type)
137             {
138             case 0x800001:
139                 int temp;
140                 temp = static_cast<int>(StreamBufProxy::xsputn(payload_buf, payload_len));
141                 if (temp < 0) {
142                     return temp;
143                 }
144                 result += temp;
145                 break;
146             case 0x800004: //Continuous Frame
147                 break;
148             case 0x800005: //Select object End Frame
149             {
150                 int32_t copy = sizeof(frame->end_frame) - frame->end_frame_size;
151                 copy = (copy > payload_len) ? payload_len : copy;
152                 if (copy > 0) {
153                     memcpy(frame->end_frame + frame->end_frame_size, ptr, copy);
154                     frame->end_frame_size += copy;
155                 }
156             }
157             break;
158             default:
159                 // get payload checksum
160                 if (frame->tail_len == 4) {
161                     // compare check sum
162                     uint32_t payload_crc32;
163                     payload_crc32 = frame->tail[0];
164                     payload_crc32 = (payload_crc32 << 8) | frame->tail[1];
165                     payload_crc32 = (payload_crc32 << 8) | frame->tail[2];
166                     payload_crc32 = (payload_crc32 << 8) | frame->tail[3];
167                     if (payload_crc32 != 0 && payload_crc32 != frame->payload_crc32) {
168                         // CRC32 Checksum failed
169                         return -1;
170                     }
171 
172                     // reset to get next frame
173                     frame->header_len = 0;
174                     frame->tail_len = 0;
175                     frame->payload_remains = 0;
176                     frame->end_frame_size = 0;
177                 }
178                 break;
179             }
180             ptr += ret;
181             remain -= ret;
182         }
183         return result;
184     }
185 
xsputn(const char * ptr,std::streamsize count)186     std::streamsize xsputn(const char *ptr, std::streamsize count)
187     {
188         int result = selectObjectTransferContent(&frame_, ptr, static_cast<int>(count));
189         if (result < 0) {
190             if (result == -1) {
191                 lastStatus_ = ARG_ERROR_SELECT_OBJECT_CHECK_SUM_FAILED;
192             }
193             return static_cast<std::streamsize>(result);
194         }
195         return count;
196     }
197 
198 private:
199     SelectObjectFrame frame_;
200     int lastStatus_;
201 };
202 
203 /////////////////////////////////////////////////////////////
204 
SelectObjectRequest(const std::string & bucket,const std::string & key)205 SelectObjectRequest::SelectObjectRequest(const std::string& bucket, const std::string& key) :
206     GetObjectRequest(bucket, key),
207     expressionType_(ExpressionType::SQL),
208     skipPartialDataRecord_(false),
209     maxSkippedRecordsAllowed_(0),
210     inputFormat_(nullptr),
211     outputFormat_(nullptr),
212     streamBuffer_(nullptr),
213     upperContent_(nullptr)
214 {
215     setResponseStreamFactory(ResponseStreamFactory());
216 
217     // close CRC Checksum
218     int flag = Flags();
219     flag |= REQUEST_FLAG_CONTENTMD5;
220     flag &= ~REQUEST_FLAG_CHECK_CRC64;
221     setFlags(flag);
222 }
223 
setResponseStreamFactory(const IOStreamFactory & factory)224 void SelectObjectRequest::setResponseStreamFactory(const IOStreamFactory& factory)
225 {
226     upperResponseStreamFactory_ = factory;
227     ServiceRequest::setResponseStreamFactory([=]() {
228         streamBuffer_ = nullptr;
229         auto content = upperResponseStreamFactory_();
230         if (!outputFormat_->OutputRawData()) {
231             int initCrc32 = 0;
232 #ifdef ENABLE_OSS_TEST
233             if (!!(Flags() & 0x20000000)) {
234                 const char* TAG = "SelectObjectClient";
235                 OSS_LOG(LogLevel::LogDebug, TAG, "Payload checksum fail.");
236                 initCrc32 = 1;
237             }
238 #endif // ENABLE_OSS_TEST
239             streamBuffer_ = std::make_shared<SelectObjectStreamBuf>(*content, initCrc32);
240         }
241         upperContent_ = content;
242         return content;
243     });
244 }
245 
MaxSkippedRecordsAllowed() const246 uint64_t SelectObjectRequest::MaxSkippedRecordsAllowed() const
247 {
248     return maxSkippedRecordsAllowed_;
249 }
250 
setSkippedRecords(bool skipPartialDataRecord,uint64_t maxSkippedRecords)251 void SelectObjectRequest::setSkippedRecords(bool skipPartialDataRecord, uint64_t maxSkippedRecords)
252 {
253     skipPartialDataRecord_ = skipPartialDataRecord;
254     maxSkippedRecordsAllowed_ = maxSkippedRecords;
255 }
256 
setExpression(const std::string & expression,ExpressionType type)257 void SelectObjectRequest::setExpression(const std::string& expression, ExpressionType type)
258 {
259     expressionType_ = type;
260     expression_ = expression;
261 }
262 
setInputFormat(InputFormat & inputFormat)263 void SelectObjectRequest::setInputFormat(InputFormat& inputFormat)
264 {
265     inputFormat_ = &inputFormat;
266 }
267 
setOutputFormat(OutputFormat & OutputFormat)268 void SelectObjectRequest::setOutputFormat(OutputFormat& OutputFormat)
269 {
270     outputFormat_ = &OutputFormat;
271 }
272 
validate() const273 int SelectObjectRequest::validate() const
274 {
275     int ret = GetObjectRequest::validate();
276     if (ret != 0) {
277         return ret;
278     }
279 
280     if (expressionType_ != ExpressionType::SQL) {
281         return ARG_ERROR_SELECT_OBJECT_NOT_SQL_EXPRESSION;
282     }
283     if (inputFormat_ == nullptr || outputFormat_ == nullptr) {
284         return ARG_ERROR_SELECT_OBJECT_NULL_POINT;
285     }
286 
287     ret = inputFormat_->validate();
288     if (ret != 0) {
289         return ret;
290     }
291     ret = outputFormat_->validate();
292     if (ret != 0) {
293         return ret;
294     }
295     // check type
296     if (inputFormat_->Type() != outputFormat_->Type()) {
297         return ARG_ERROR_SELECT_OBJECT_PROCESS_NOT_SAME;
298     }
299     return 0;
300 }
301 
payload() const302 std::string SelectObjectRequest::payload() const
303 {
304     std::stringstream ss;
305     ss << "<SelectRequest>" << std::endl;
306     // Expression
307     ss << "<Expression>" << Base64Encode(expression_) << "</Expression>" << std::endl;
308     // input format
309     ss << inputFormat_->toXML(1) << std::endl;
310     // output format
311     ss << outputFormat_->toXML() << std::endl;
312     // options
313     ss << "<Options>" << std::endl;
314     ss << "<SkipPartialDataRecord>" << (skipPartialDataRecord_ ? "true" : "false") << "</SkipPartialDataRecord>" << std::endl;
315     ss << "<MaxSkippedRecordsAllowed>" << std::to_string(MaxSkippedRecordsAllowed()) << "</MaxSkippedRecordsAllowed>" << std::endl;
316     ss << "</Options>" << std::endl;
317     ss << "</SelectRequest>" << std::endl;
318     return ss.str();
319 }
320 
dispose() const321 int SelectObjectRequest::dispose() const
322 {
323     int ret = 0;
324     if (streamBuffer_ != nullptr) {
325         auto buf = std::static_pointer_cast<SelectObjectStreamBuf>(streamBuffer_);
326         ret = buf->LastStatus();
327         streamBuffer_ = nullptr;
328     }
329     upperContent_ = nullptr;
330     return ret;
331 }
332 
specialParameters() const333 ParameterCollection SelectObjectRequest::specialParameters() const
334 {
335     auto parameters = GetObjectRequest::specialParameters();
336     if (inputFormat_) {
337         auto type = inputFormat_->Type();
338         type.append("/select");
339         parameters["x-oss-process"] = type;
340     }
341     return parameters;
342 }
343