data_lake_processor.c
Go to the documentation of this file.
1 
9 #include "data_lake_processor.h"
10 #include "data_lake_object.h"
11 #include "data_lake_json.h"
14 #include "mcl_core/mcl_memory.h"
16 
17 #define SINGLE_OBJECT_SIZE 1
18 
19 static const char bearer_format[] = "Bearer %s";
20 static const char string_identifier[] = "%s";
21 static const char application_json[] = "application/json";
22 static const char content_type[] = "Content-Type";
23 static const char azure_storage_url[] = "blob.core.windows.net";
24 static const char azure_header_blob_type[] = "x-ms-blob-type";
25 static const char azure_header_block_blob[] = "BlockBlob";
26 
27 // Function used to add authorization header to the request.
28 static mcl_error_t _add_authorization_header(mcl_http_request_t *request, const char *access_token);
29 
30 // Since signed URLs may not be in the same order with the request body, array may save time.
31 // It also helps to implement a function which can handle both single object and multiple objects.
33 
34 // Function to get signed urls from the server.
35 static mcl_error_t _generate_upload_urls(data_lake_processor_t *processor, data_lake_object_t **object_array, mcl_size_t array_size, const char *subtenant_id);
36 
37 // Function to clear signed urls.
38 static void _clear_signed_urls(data_lake_object_t **object_array, mcl_size_t array_size);
39 
40 // Check signed URL if data lake service uses Azure Storage.
42 
44 {
45  mcl_error_t code;
46  data_lake_object_t *single_object = object;
47 
48  MCL_DEBUG_ENTRY("data_lake_processor_t *processor = <%p>, data_lake_object_t *object = <%p>, const char *subtenant_id = <%p>",
49  processor, object, subtenant_id);
50 
51  code = _generate_upload_urls(processor, &single_object, SINGLE_OBJECT_SIZE, subtenant_id);
52 
53  MCL_DEBUG_LEAVE("retVal = <%d>", code);
54  return code;
55 }
56 
57 mcl_error_t data_lake_processor_generate_upload_urls(data_lake_processor_t *processor, mcl_list_t *object_list, const char *subtenant_id)
58 {
59  mcl_error_t code;
60  data_lake_object_t **object_array;
61 
62  MCL_DEBUG_ENTRY("data_lake_processor_t *processor = <%p>, mcl_list_t *object_list = <%p>, const char *subtenant_id = <%p>",
63  processor, object_list, subtenant_id);
64 
65  object_array = _data_lake_object_list_to_array(object_list);
66 
67  if (MCL_NULL == object_array)
68  {
69  code = MCL_OUT_OF_MEMORY;
70  }
71  else
72  {
73  code = _generate_upload_urls(processor, object_array, object_list->count, subtenant_id);
74  }
75 
76  MCL_FREE(object_array);
77 
78  MCL_DEBUG_LEAVE("retVal = <%d>", code);
79  return code;
80 }
81 
// Uploads one data lake object through the processor's HTTP client.
//
// NOTE(review): the function header is not visible in this listing; the entry trace below names the
// parameters `processor` and `object`, so this is presumably data_lake_processor_upload() - confirm.
//
// Visible flow: validate the object, initialize a request, configure it, send it with
// processor->http_client, then either log success or take the return code from
// mcl_http_response_get_status(). Each step runs only while code is still MCL_OK. The request and
// the response are both destroyed before returning.
//
// NOTE(review): the bodies of the "Set ..." branches, the body of the Azure branch and the condition
// that selects the success branch are missing from this listing - restore them from the original file.
83 {
84  // Variable declaration.
85  mcl_error_t code;
86  mcl_http_request_t *request = MCL_NULL;
87  mcl_http_response_t *response = MCL_NULL;
89 
90  MCL_DEBUG_ENTRY("data_lake_processor_t *processor = <%p>, data_lake_object_t *object = <%p>", processor, object);
91 
92  // Validate object.
93  code = data_lake_object_validate(object);
94 
95  // Initialize request.
96  if (MCL_OK == code)
97  {
98  code = mcl_http_request_initialize(&request);
99  }
100 
101  // Set method.
102  if (MCL_OK == code)
103  {
105  }
106 
107  // Set URL.
108  if (MCL_OK == code)
109  {
111  }
112 
113  // Set callback.
114  if (MCL_OK == code)
115  {
117  }
118 
119  // Set user context.
120  if (MCL_OK == code)
121  {
123  }
124 
125  // Set payload size.
126  if (MCL_OK == code)
127  {
129  }
130 
// Extra step taken only when the object's signed URL is recognized as an Azure Storage URL.
131  if ((MCL_OK == code) && (MCL_TRUE == _check_url_if_it_is_for_azure_storage(object)))
132  {
134  }
135 
136  // Send request.
137  if (MCL_OK == code)
138  {
139  code = mcl_http_client_send(processor->http_client, request, &response);
140  }
141 
// The request is destroyed on every path, including when an earlier step failed.
142  mcl_http_request_destroy(&request);
143 
// NOTE(review): response is dereferenced below whenever the send returned MCL_OK; this assumes the
// client always provides a response in that case - confirm.
144  if (MCL_OK == code)
145  {
147  {
148  MCL_INFO("Uploaded to data lake successfully.");
149  }
150  else
151  {
// The returned code is taken from mcl_http_response_get_status() for this response.
152  code = mcl_http_response_get_status(response);
153  MCL_ERROR("HTTP %d received from server for the request.", response->status_code);
154 
155  if (MCL_NULL != response->payload)
156  {
// The payload is printed with an explicit length.
// NOTE(review): if MCL_ERROR is printf-style, "%.*s" expects an int precision, but payload_size is
// mcl_size_t - confirm the macro's behavior.
157  MCL_ERROR("HTTP Response:\n%.*s", response->payload_size, response->payload);
158  }
159  }
160  }
161 
162  mcl_http_response_destroy(&response);
163 
164  MCL_DEBUG_LEAVE("retVal = <%d>", code);
165  return code;
166 }
167 
168 static mcl_error_t _add_authorization_header(mcl_http_request_t *request, const char *access_token)
169 {
170  // Variable declaration.
172  mcl_size_t jwt_length;
173  char *jwt;
174 
175  MCL_DEBUG_ENTRY("mcl_http_request_t *request = <%p>, const char *access_token = <%p>", request, access_token);
176 
177  // Size of jwt including null character.
178  jwt_length = (sizeof(bearer_format) - sizeof(string_identifier)) + mcl_string_util_strlen(access_token) + MCL_NULL_CHAR_SIZE;
179 
180  // Allocate buffer for jwt.
181  jwt = MCL_MALLOC(jwt_length);
182 
183  if (MCL_NULL != jwt)
184  {
185  code = mcl_string_util_snprintf(jwt, jwt_length, bearer_format, access_token);
186 
187  if (MCL_OK == code)
188  {
189  code = mcl_http_request_add_header(request, "Authorization", jwt);
190  }
191 
192  MCL_FREE(jwt);
193  }
194 
195  MCL_DEBUG_LEAVE("retVal = <%d>", code);
196  return code;
197 }
198 
// Requests signed upload URLs for the objects in object_array and matches them to the objects.
//
// Visible flow: free the signed URLs currently held by the objects, build a JSON body from the
// objects, initialize and configure a request, add the authorization header, send the request with
// processor->http_client, and on MCL_HTTP_STATUS_CODE_CREATED pass the response payload to
// data_lake_json_match_signed_urls_with_objects(). For any other status the return code is taken
// from mcl_http_response_get_status(). Each step runs only while code is still MCL_OK.
//
// processor    : supplies client_id, access_token and http_client.
// object_array : array of object pointers; array_size is its element count.
// subtenant_id : forwarded to data_lake_json_from_objects().
//
// NOTE(review): several statements are missing from this listing (the bodies of the "Set method",
// "Set URL", "Set body" and "Add content type header" branches and the statement following
// body_size) - restore them from the original file.
199 static mcl_error_t _generate_upload_urls(data_lake_processor_t *processor, data_lake_object_t **object_array, mcl_size_t array_size, const char *subtenant_id)
200 {
201  mcl_error_t code = MCL_OK;
202  mcl_http_request_t *request = MCL_NULL;
203  mcl_size_t client_id_length;
204  mcl_http_response_t *response = MCL_NULL;
205  char *body = MCL_NULL;
207 
// NOTE(review): array_size is mcl_size_t but is printed with "%lu" - confirm the two agree on
// every target platform.
208  MCL_DEBUG_ENTRY("data_lake_processor_t *processor = <%p>, data_lake_object_t **object_array = <%p>, mcl_size_t array_size = <%lu>, "\
209  "const char *subtenant_id = <%p>", processor, object_array, array_size, subtenant_id);
210 
// The old URLs are freed before anything can fail, so a failure below leaves the objects without
// their previous signed URLs.
211  // First clear signed urls.
212  _clear_signed_urls(object_array, array_size);
213 
// The length is used only when the response is matched against the objects further down.
214  // Calculate client id length.
215  client_id_length = mcl_string_util_strlen(processor->client_id);
216 
217  // Create json body.
218  code = data_lake_json_from_objects(object_array, array_size, processor->client_id, subtenant_id, &body);
219 
220  // Initialize request.
221  if (MCL_OK == code)
222  {
223  code = mcl_http_request_initialize(&request);
224  }
225 
226  // Set method.
227  if (MCL_OK == code)
228  {
230  }
231 
232  // Set URL.
233  if (MCL_OK == code)
234  {
236  }
237 
238  // Set body.
239  if (MCL_OK == code)
240  {
242  }
243 
244  // Set body size.
245  if (MCL_OK == code)
246  {
// The body is measured as a null-terminated string.
247  mcl_size_t body_size = mcl_string_util_strlen(body);
248 
250  }
251 
252  // Add content type header.
253  if (MCL_OK == code)
254  {
256  }
257 
258  // Add authorization header.
259  if (MCL_OK == code)
260  {
261  code = _add_authorization_header(request, processor->access_token);
262  }
263 
264  // Send request.
265  if (MCL_OK == code)
266  {
267  code = mcl_http_client_send(processor->http_client, request, &response);
268  }
269 
// Body and request are released on every path; body is still MCL_NULL when its creation failed.
270  MCL_FREE(body);
271  mcl_http_request_destroy(&request);
272 
// NOTE(review): response is dereferenced below whenever the send returned MCL_OK; this assumes the
// client always provides a response in that case - confirm.
273  if (MCL_OK == code)
274  {
// Only MCL_HTTP_STATUS_CODE_CREATED is treated as success here.
275  if (MCL_HTTP_STATUS_CODE_CREATED == response->status_code)
276  {
277  code = data_lake_json_match_signed_urls_with_objects(object_array, array_size, (char *)response->payload, response->payload_size, client_id_length);
278  }
279  else
280  {
// The returned code is taken from mcl_http_response_get_status() for this response.
281  code = mcl_http_response_get_status(response);
282  MCL_ERROR("HTTP %d received from server for the request.", response->status_code);
283 
284  if (MCL_NULL != response->payload)
285  {
// NOTE(review): if MCL_ERROR is printf-style, "%.*s" expects an int precision, but payload_size is
// mcl_size_t - confirm the macro's behavior.
286  MCL_ERROR("HTTP Response:\n%.*s", response->payload_size, response->payload);
287  }
288  }
289  }
290 
291  mcl_http_response_destroy(&response);
292 
293  MCL_DEBUG_LEAVE("retVal = <%d>", code);
294  return code;
295 }
296 
297 static void _clear_signed_urls(data_lake_object_t **object_array, mcl_size_t array_size)
298 {
299  mcl_size_t index;
300  MCL_DEBUG_ENTRY("data_lake_object_t **object_array = <%p>, mcl_size_t array_size = <%lu>", object_array, array_size);
301 
302  for (index = 0; index < array_size; ++index)
303  {
304  MCL_FREE(object_array[index]->signed_url);
305  }
306 
307  MCL_DEBUG_LEAVE("retVal = <void>");
308 }
309 
311 {
312  data_lake_object_t **data_lake_object_array;
313  mcl_size_t count = object_list->count;
314 
315  MCL_DEBUG_ENTRY("mcl_list_t *object_list = <%p>", object_list);
316 
317  // Size of pointer, not actual struct.
318  data_lake_object_array = MCL_MALLOC(count * sizeof(data_lake_object_t *));
319 
320  if (MCL_NULL != data_lake_object_array)
321  {
322  mcl_size_t index;
323 
324  for (index = 0; index < count; ++index)
325  {
326  mcl_list_node_t *node;
327 
328  // Get next data lake object.
329  (void) mcl_list_next(object_list, &node);
330 
331  data_lake_object_array[index] = (mcl_data_lake_object_t *) node->data;
332  }
333  }
334 
335  MCL_DEBUG_LEAVE("retVal = <%p>", data_lake_object_array);
336  return data_lake_object_array;
337 }
338 
// Reports whether object->signed_url is recognized as an Azure Storage URL.
//
// NOTE(review): the function header is not visible in this listing; the entry trace below names the
// parameter `object`, so this is presumably _check_url_if_it_is_for_azure_storage() - confirm.
//
// Returns MCL_TRUE only when the condition after the scan holds, MCL_FALSE otherwise.
// Assumes object->signed_url is a non-NULL, null-terminated string: it is measured and indexed
// without a check.
340 {
341  mcl_bool_t is_for_azure = MCL_FALSE;
342  mcl_int32_t index = 0;
343  mcl_size_t url_length;
344 
345  MCL_DEBUG_ENTRY("data_lake_object_t *object = <%p>", object);
346 
347  url_length = mcl_string_util_strlen(object->signed_url);
348 
// Scan forward until a '.' has been consumed or index is no longer below url_length. Because of the
// post-increment, index ends one position past the last character that was tested.
// NOTE(review): index is mcl_int32_t while url_length is mcl_size_t, so the comparison mixes signed
// and unsigned operands.
349  while (('.' != object->signed_url[index++]) && (index < url_length))
350  {
351  // Empty body.
352  }
353 
// NOTE(review): url_length - sizeof(azure_storage_url) is computed in unsigned arithmetic, so it
// wraps to a very large value when the URL is shorter than the constant (terminator included) and
// this bound then no longer protects what follows.
// NOTE(review): the second half of this condition is missing from this listing - restore it from
// the original file.
354  if ((index < (url_length - sizeof(azure_storage_url))) &&
356  {
357  is_for_azure = MCL_TRUE;
358  }
359 
360  MCL_DEBUG_LEAVE("retVal = <%d>", (int)is_for_azure);
361  return is_for_azure;
362 }
MCL_CORE_EXPORT mcl_error_t mcl_http_request_initialize(mcl_http_request_t **http_request)
char * access_token
Access token.
mcl_error_t data_lake_processor_generate_upload_url(data_lake_processor_t *processor, data_lake_object_t *object, const char *subtenant_id)
char * signed_url
Signed URL for the object.
static const char string_identifier[]
static const char content_type[]
static const char azure_storage_url[]
size_t mcl_size_t
static mcl_bool_t _check_url_if_it_is_for_azure_storage(data_lake_object_t *object)
int32_t mcl_int32_t
E_MCL_HTTP_STATUS_CODE status_code
MCL_OK
mcl_size_t payload_size
#define SINGLE_OBJECT_SIZE
MCL_HTTP_REQUEST_PARAMETER_BODY_SIZE
MCL_CORE_EXPORT mcl_error_t mcl_http_request_set_parameter(mcl_http_request_t *http_request, E_MCL_HTTP_REQUEST_PARAMETER parameter, const void *value)
mcl_int32_t mcl_error_t
char * upload_url_generation_url
Endpoint to generate upload URL.
MCL_HTTP_REQUEST_PARAMETER_STREAM_DATA
Data lake configuration module header file.
#define MCL_DEBUG_ENTRY(...)
MCL_HTTP_PUT
MCL_HTTP_REQUEST_PARAMETER_STREAM_CALLBACK
static const char azure_header_block_blob[]
#define MCL_FALSE
mcl_error_t data_lake_json_match_signed_urls_with_objects(data_lake_object_t **object_array, mcl_size_t array_size, char *json, mcl_size_t json_size, mcl_size_t client_id_length)
mcl_error_t data_lake_processor_upload(data_lake_processor_t *processor, data_lake_object_t *object)
MCL_CORE_EXPORT mcl_size_t mcl_string_util_strlen(const char *buffer)
MCL_CORE_EXPORT mcl_error_t mcl_list_next(mcl_list_t *list, mcl_list_node_t **node)
MCL_CORE_EXPORT mcl_error_t mcl_http_request_add_header(mcl_http_request_t *http_request, const char *header_name, const char *header_value)
Data lake object module header file.
E_MCL_HTTP_METHOD
#define MCL_NULL
MCL_HTTP_REQUEST_PARAMETER_URL
static const char bearer_format[]
#define MCL_ERROR(...)
static mcl_error_t _generate_upload_urls(data_lake_processor_t *processor, data_lake_object_t **object_array, mcl_size_t array_size, const char *subtenant_id)
mcl_error_t data_lake_json_from_objects(data_lake_object_t **object_array, mcl_size_t array_size, const char *client_id, const char *subtenant_id, char **json)
mcl_error_t data_lake_object_validate(data_lake_object_t *object)
#define MCL_FREE(p)
const char * client_id
Client id.
mcl_size_t size
Size of the object.
static void _clear_signed_urls(data_lake_object_t **object_array, mcl_size_t array_size)
MCL_CORE_EXPORT void mcl_http_request_destroy(mcl_http_request_t **http_request)
mcl_uint8_t * payload
mcl_data_lake_upload_callback upload_callback
Callback used by http client to copy object data to http request.
static const char application_json[]
MCL_HTTP_REQUEST_PARAMETER_BODY
mcl_error_t data_lake_processor_generate_upload_urls(data_lake_processor_t *processor, mcl_list_t *object_list, const char *subtenant_id)
static mcl_error_t _add_authorization_header(mcl_http_request_t *request, const char *access_token)
MCL_CORE_EXPORT mcl_error_t mcl_http_response_get_status(mcl_http_response_t *http_response)
MCL_CORE_EXPORT mcl_error_t mcl_string_util_snprintf(char *string, mcl_size_t length, const char *format,...)
Data lake json module header file.
MCL_CORE_EXPORT mcl_error_t mcl_http_client_send(mcl_http_client_t *http_client, mcl_http_request_t *http_request, mcl_http_response_t **http_response)
MCL_HTTP_POST
mcl_uint8_t mcl_bool_t
MCL_HTTP_STATUS_CODE_SUCCESS
struct mcl_data_lake_object_t mcl_data_lake_object_t
MCL_CORE_EXPORT void mcl_http_response_destroy(mcl_http_response_t **http_response)
MCL_OUT_OF_MEMORY
#define MCL_NULL_CHAR_SIZE
mcl_size_t count
#define MCL_MALLOC(bytes)
Data lake processor module header file.
MCL_CORE_EXPORT mcl_bool_t mcl_string_util_memcmp(const void *block_1, const void *block_2, mcl_size_t count)
#define MCL_DEBUG_LEAVE(...)
#define MCL_TRUE
static data_lake_object_t ** _data_lake_object_list_to_array(mcl_list_t *object_list)
MCL_HTTP_REQUEST_PARAMETER_METHOD
mcl_http_client_t * http_client
Http client handle.
Data lake common module interface header file.
void * user_context
Context which will be passed to the callback.
MCL_HTTP_STATUS_CODE_CREATED
#define MCL_INFO(...)
static const char azure_header_blob_type[]