Source code for mindsphere_core.serialization_filter

import six
import tempfile
import datetime
import re
import os
import sys

from . import exceptions
from . import constants

# Types treated as primitives: serialization returns instances of them
# unchanged, and deserialization converts to them by calling the type on the
# raw value.
PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
# Maps the type names that may appear as type strings during deserialization
# to the Python types they stand for.
NATIVE_TYPES_MAPPING = {
    "int": int,
    # "long" is mapped to the same type as "int".
    "long": int,  # noqa: F821
    "float": float,
    "str": str,
    "bool": bool,
    "date": datetime.date,
    "object": object,
}

# Key into ``sys.modules`` naming the module in which model classes are looked
# up by name during deserialization. Empty by default.
# NOTE(review): presumably assigned by the calling service package before any
# model is deserialized -- confirm against callers.
service_model_package = ""


def deserialize(response, response_type):
    """Convert a REST response into an instance of the requested type.

    :param response: RESTResponse object to be deserialized.
    :param response_type: class literal for the deserialized object, or
        string of the class name. The special value ``"file"`` requests a
        file download.
    :return: deserialized object; for ``"file"`` the result of the file
        deserializer (the path the body was saved to).
    """
    # File downloads are saved to disk instead of being parsed into objects.
    wants_file = response_type == "file"
    if wants_file:
        return __deserialize_file(response)

    return __deserialize(response, response_type)
def __deserialize(data, klass):
    """Deserializes dict, list, str into an object.

    ``klass`` may be a class literal or a type string. Type strings of the
    form ``list[T]`` and ``dict(K, V)`` are handled recursively, names found
    in ``NATIVE_TYPES_MAPPING`` are mapped to native types, the date-time
    type name is parsed directly, and any other name is looked up as an
    attribute of the module registered in ``service_model_package``.

    :type data: object
    :param data: dict, list or str.
    :param klass: class literal, or string of class name.
    :return: object, or ``None`` when ``data`` is ``None``.
    """
    if data is None:
        return None

    # isinstance (rather than an exact type comparison) also accepts str
    # subclasses and, on Python 2, unicode type names.
    if isinstance(klass, six.string_types):
        if klass.startswith("list["):
            # Raw string: "\[" is not a valid escape in a normal literal.
            sub_kls = re.match(r"list\[(.*)\]", klass).group(1)
            return [__deserialize(sub_data, sub_kls) for sub_data in data]

        if klass.startswith("dict("):
            # Only the value type (second group) is needed; keys are kept
            # as they arrive.
            sub_kls = re.match(r"dict\(([^,]*), (.*)\)", klass).group(2)
            return {k: __deserialize(v, sub_kls) for k, v in six.iteritems(data)}

        # convert str to class
        if klass in NATIVE_TYPES_MAPPING:
            klass = NATIVE_TYPES_MAPPING[klass]
        elif klass == constants.DATE_TIME_TYPE:
            return __deserialize_datetime(data)
        else:
            # Reading a module-level name needs no ``global`` declaration.
            klass = getattr(sys.modules[service_model_package], klass)

    if klass in PRIMITIVE_TYPES:
        return __deserialize_primitive(data, klass)
    elif klass == object:
        return __deserialize_object(data)
    elif klass == datetime.date:
        return __deserialize_date(data)
    else:
        return __deserialize_model(data, klass)
def __deserialize_file(response):
    """Deserializes body to file

    Saves response body into a file in a temporary folder,
    using the filename from the `Content-Disposition` header if provided.
    Only the final path component of that filename is used, so the file is
    always written inside the temporary folder. When the header is absent or
    carries no usable filename, the body is written to a uniquely named
    temporary file instead.

    :param response: RESTResponse.
    :return: file path.
    """
    # Reserve a unique file; it is kept as the target unless the header
    # supplies a usable name.
    fd, path = tempfile.mkstemp(dir=None)
    os.close(fd)

    content_disposition = response.getheader("Content-Disposition")
    if content_disposition:
        match = re.search(
            r'filename=[\'"]?([^\'"\s]+)[\'"]?', content_disposition
        )
        # The header is controlled by the server: drop any directory part so
        # a value such as "../x" cannot leave the temporary folder.
        filename = os.path.basename(match.group(1)) if match else ""
        if filename and filename not in (os.curdir, os.pardir):
            # The placeholder is only discarded once a replacement exists.
            os.remove(path)
            path = os.path.join(os.path.dirname(path), filename)

    with open(path, "wb") as f:
        f.write(response.data)

    return path
def __deserialize_primitive(data, klass):
    """Convert a raw value to a primitive type by calling the type on it.

    :param data: str.
    :param klass: class literal.
    :return: int, long, float, str, bool. If the conversion raises
        ``UnicodeEncodeError`` the result of ``six.u(data)`` is returned; if
        it raises ``TypeError`` the value is returned unconverted.
    """
    try:
        converted = klass(data)
    except UnicodeEncodeError:
        return six.u(data)
    except TypeError:
        return data
    else:
        return converted
def __deserialize_object(value):
    """Identity deserializer: hand the value back untouched.

    :param value: any object.
    :return: the very same object that was passed in.
    """
    return value
def __deserialize_date(string):
    """Parse a string into a date.

    Parsing is delegated to ``dateutil``. If that package cannot be imported
    the input is handed back unchanged.

    :param string: str.
    :return: date, or the original string when ``dateutil`` is unavailable.
    :raises exceptions.MindsphereClientError: when parsing fails with a
        ``ValueError``.
    """
    try:
        from dateutil.parser import parse as parse_date

        parsed = parse_date(string)
        return parsed.date()
    except ImportError:
        return string
    except ValueError:
        message = (
            "While de-serializing, "
            "failed to parse `{0}` as date object"
        ).format(string)
        raise exceptions.MindsphereClientError(message)
def __deserialize_datetime(string):
    """Parse a string into a datetime.

    The string should be in iso8601 datetime format. Parsing is delegated to
    ``dateutil``. If that package cannot be imported the input is handed
    back unchanged.

    :param string: str.
    :return: datetime, or the original string when ``dateutil`` is
        unavailable.
    :raises exceptions.MindsphereClientError: when parsing fails with a
        ``ValueError``.
    """
    try:
        from dateutil.parser import parse as parse_datetime

        parsed = parse_datetime(string)
        return parsed
    except ImportError:
        return string
    except ValueError:
        message = (
            "While de-serializing, "
            "failed to parse `{0}` as datetime object"
        ).format(string)
        raise exceptions.MindsphereClientError(message)
def __deserialize_model(data, klass):
    """Deserializes list or dict to model.

    Each attribute declared in ``klass.attribute_types`` is filled from the
    entry of ``data`` stored under its mapped key. An attribute whose mapped
    key is ``constants.FIELDS`` and which is not present in ``data`` under
    that key is instead built from every entry of ``data`` except the
    time, start-time and end-time keys.

    :param data: dict, list.
    :param klass: class literal.
    :return: model object, or ``data`` itself when ``klass`` declares no
        attribute types and has no ``get_real_child_model``.
    """
    if not klass.attribute_types and not hasattr(klass, "get_real_child_model"):
        return data

    kwargs = {}
    if klass.attribute_types is not None:
        # Keys of ``data`` that never become dynamic fields.
        reserved_keys = (constants.TIME, constants.STARTTIME, constants.ENDTIME)
        for attr, attr_type in six.iteritems(klass.attribute_types):
            json_key = klass.attribute_map[attr]
            if (
                data is not None
                and json_key in data
                and isinstance(data, (list, dict))
            ):
                kwargs[attr] = __deserialize(data[json_key], attr_type)
            elif json_key == constants.FIELDS:
                fields = {}
                if attr_type.startswith("dict("):
                    # Raw string: "\(" is not a valid escape in a normal
                    # literal. Only the value type (second group) is used.
                    sub_kls = re.match(r"dict\(([^,]*), (.*)\)", attr_type).group(2)
                    for k, v in six.iteritems(data):
                        if k not in reserved_keys:
                            fields[k] = __deserialize(v, sub_kls)
                # NOTE(review): the collapsed source does not show whether
                # this assignment sits inside the ``if`` above; it is placed
                # at this level because ``fields`` is initialised before it.
                kwargs[attr] = fields

    instance = klass(**kwargs)

    # A model exposing ``get_real_child_model`` may name a more specific
    # class for this payload; if it does, deserialize again as that class.
    if hasattr(instance, "get_real_child_model"):
        klass_name = instance.get_real_child_model(data)
        if klass_name:
            instance = __deserialize(data, klass_name)
    return instance
def sanitize_for_serialization(obj):
    """Builds a JSON POST object.

    If obj is None, return None.
    If obj is str, int, long, float, bool, return directly.
    If obj is datetime.datetime, datetime.date
        convert to string in iso8601 format.
    If obj is list, sanitize each element in the list.
    If obj is tuple, sanitize each element and return a tuple.
    If obj is dict, sanitize each value of the dict.
    If obj is a model, sanitize its properties dict; when the model has a
        ``fields`` attribute, the entries stored under ``constants.FIELDS``
        are merged into the top level of the result.

    :param obj: The data to serialize.
    :return: The serialized form of data.
    """
    if obj is None:
        return None
    if isinstance(obj, PRIMITIVE_TYPES):
        return obj
    if isinstance(obj, list):
        return [sanitize_for_serialization(sub_obj) for sub_obj in obj]
    if isinstance(obj, tuple):
        return tuple(sanitize_for_serialization(sub_obj) for sub_obj in obj)
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()

    if isinstance(obj, dict):
        obj_dict = obj
    else:
        # Convert the model to a dict keyed by the JSON names from
        # ``attribute_map``, keeping only attributes whose value is not None.
        obj_dict = {
            obj.attribute_map[attr]: getattr(obj, attr)
            for attr in obj.attribute_types
            if getattr(obj, attr) is not None
        }
        # None-valued attributes were dropped above, so the FIELDS key may be
        # missing; ``.get`` avoids a KeyError for a model whose fields are
        # unset.
        if hasattr(obj, "fields") and obj_dict.get(constants.FIELDS) is not None:
            # Merge the dynamic fields into the top level, then drop the
            # container entry.
            obj_dict.update(obj_dict[constants.FIELDS])
            del obj_dict[constants.FIELDS]

    return {
        key: sanitize_for_serialization(val)
        for key, val in six.iteritems(obj_dict)
    }
def sanitize_headers(headers):
    """Stringify the values of custom headers in place.

    The requests module only accepts string header values. Entries named
    ``Content-Type``, ``Authorization`` or ``Accept`` are left alone, as are
    values that are ``None`` or already of type ``str``.

    :param headers: dict of header names to values; modified in place.
    :return: the same ``headers`` object.
    """
    untouched_names = ['Content-Type', 'Authorization', 'Accept']
    for name, current in headers.items():
        if name in untouched_names:
            continue
        if current is None or type(current) is str:
            continue
        # Replacing the value of an existing key is safe while iterating.
        headers[name] = str(current)
    return headers