mirror of
https://github.com/correl/openapi-core.git
synced 2024-11-25 11:09:53 +00:00
deserialize data form media type
This commit is contained in:
parent
7a83f019c5
commit
852c081068
3 changed files with 61 additions and 5 deletions
|
@ -1,4 +1,6 @@
|
||||||
from openapi_core.deserializing.media_types.util import json_loads, form_loads
|
from openapi_core.deserializing.media_types.util import (
|
||||||
|
json_loads, urlencoded_form_loads, data_form_loads,
|
||||||
|
)
|
||||||
|
|
||||||
from openapi_core.deserializing.media_types.deserializers import (
|
from openapi_core.deserializing.media_types.deserializers import (
|
||||||
PrimitiveDeserializer,
|
PrimitiveDeserializer,
|
||||||
|
@ -9,7 +11,8 @@ class MediaTypeDeserializersFactory(object):
|
||||||
|
|
||||||
MEDIA_TYPE_DESERIALIZERS = {
|
MEDIA_TYPE_DESERIALIZERS = {
|
||||||
'application/json': json_loads,
|
'application/json': json_loads,
|
||||||
'application/x-www-form-urlencoded': form_loads,
|
'application/x-www-form-urlencoded': urlencoded_form_loads,
|
||||||
|
'multipart/form-data': data_form_loads,
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, custom_deserializers=None):
|
def __init__(self, custom_deserializers=None):
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
from email.parser import BytesParser
|
||||||
from json import loads
|
from json import loads
|
||||||
|
|
||||||
from six import binary_type
|
from six import binary_type
|
||||||
|
@ -11,5 +12,19 @@ def json_loads(value):
|
||||||
return loads(value)
|
return loads(value)
|
||||||
|
|
||||||
|
|
||||||
def form_loads(value):
|
def urlencoded_form_loads(value):
|
||||||
return dict(parse_qsl(value))
|
return dict(parse_qsl(value))
|
||||||
|
|
||||||
|
|
||||||
|
def data_form_loads(value):
|
||||||
|
if issubclass(type(value), str):
|
||||||
|
value = value.encode()
|
||||||
|
parser = BytesParser()
|
||||||
|
parts = parser.parsebytes(value)
|
||||||
|
return dict(
|
||||||
|
(
|
||||||
|
part.get_param('name', header='content-disposition'),
|
||||||
|
part.get_payload(decode=True),
|
||||||
|
)
|
||||||
|
for part in parts.get_payload()
|
||||||
|
)
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email.mime.nonmultipart import MIMENonMultipart
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from openapi_core.deserializing.exceptions import DeserializeError
|
from openapi_core.deserializing.exceptions import DeserializeError
|
||||||
|
@ -7,6 +10,24 @@ from openapi_core.deserializing.media_types.factories import (
|
||||||
from openapi_core.schema.media_types.models import MediaType
|
from openapi_core.schema.media_types.models import MediaType
|
||||||
|
|
||||||
|
|
||||||
|
class MIMEFormdata(MIMENonMultipart):
|
||||||
|
def __init__(self, keyname, *args, **kwargs):
|
||||||
|
super(MIMEFormdata, self).__init__(*args, **kwargs)
|
||||||
|
self.add_header(
|
||||||
|
"Content-Disposition", "form-data; name=\"%s\"" % keyname)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_multipart_formdata(fields):
|
||||||
|
m = MIMEMultipart("form-data")
|
||||||
|
|
||||||
|
for field, value in fields.items():
|
||||||
|
data = MIMEFormdata(field, "text", "plain")
|
||||||
|
data.set_payload(value)
|
||||||
|
m.attach(data)
|
||||||
|
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
class TestMediaTypeDeserializer(object):
|
class TestMediaTypeDeserializer(object):
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -31,7 +52,7 @@ class TestMediaTypeDeserializer(object):
|
||||||
|
|
||||||
assert result == {}
|
assert result == {}
|
||||||
|
|
||||||
def test_form_urlencoded_empty(self, deserializer_factory):
|
def test_urlencoded_form_empty(self, deserializer_factory):
|
||||||
media_type = MediaType('application/x-www-form-urlencoded')
|
media_type = MediaType('application/x-www-form-urlencoded')
|
||||||
value = ''
|
value = ''
|
||||||
|
|
||||||
|
@ -39,7 +60,7 @@ class TestMediaTypeDeserializer(object):
|
||||||
|
|
||||||
assert result == {}
|
assert result == {}
|
||||||
|
|
||||||
def test_form_urlencoded_simple(self, deserializer_factory):
|
def test_urlencoded_form_simple(self, deserializer_factory):
|
||||||
media_type = MediaType('application/x-www-form-urlencoded')
|
media_type = MediaType('application/x-www-form-urlencoded')
|
||||||
value = 'param1=test'
|
value = 'param1=test'
|
||||||
|
|
||||||
|
@ -47,6 +68,23 @@ class TestMediaTypeDeserializer(object):
|
||||||
|
|
||||||
assert result == {'param1': 'test'}
|
assert result == {'param1': 'test'}
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('value', [b'', ''])
|
||||||
|
def test_data_form_empty(self, deserializer_factory, value):
|
||||||
|
media_type = MediaType('multipart/form-data')
|
||||||
|
|
||||||
|
result = deserializer_factory(media_type)(value)
|
||||||
|
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
def test_data_form_simple(self, deserializer_factory):
|
||||||
|
media_type = MediaType('multipart/form-data')
|
||||||
|
formdata = encode_multipart_formdata({'param1': 'test'})
|
||||||
|
value = str(formdata)
|
||||||
|
|
||||||
|
result = deserializer_factory(media_type)(value)
|
||||||
|
|
||||||
|
assert result == {'param1': b'test'}
|
||||||
|
|
||||||
def test_custom_simple(self, deserializer_factory):
|
def test_custom_simple(self, deserializer_factory):
|
||||||
custom_mimetype = 'application/custom'
|
custom_mimetype = 'application/custom'
|
||||||
media_type = MediaType(custom_mimetype)
|
media_type = MediaType(custom_mimetype)
|
||||||
|
|
Loading…
Reference in a new issue