Merge pull request #43 from dave-shawley/form-encoding

Add transcoder for `application/x-www-formurlencoded`
This commit is contained in:
Andrew Rabert 2021-10-27 11:21:08 -04:00 committed by GitHub
commit 776f23fee5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 445 additions and 8 deletions

View file

@ -54,6 +54,12 @@ Bundled Transcoders
.. autoclass:: MsgPackTranscoder
:members:
.. autoclass:: FormUrlEncodedTranscoder
:members:
.. autoclass:: FormUrlEncodingOptions
:members:
.. _type-info:
Python Type Information

View file

@ -3,11 +3,16 @@ Version History
:compare:`Next <3.0.4...master>`
--------------------------------
- Add a transcoder for `application/x-www-formurlencoded`_
- Add type annotations (see :ref:`type-info`)
- Return a "406 Not Acceptable" if the :http:header:`Accept` header values cannot be matched
and there is no default content type configured
- Deprecate not having a default content type configured
- Fail gracefully when a transcoder does not exist for the default content type
- Fail gracefully when a transcoder raises a :exc:`TypeError` or :exc:`ValueError` when encoding
the response
.. _application/x-www-formurlencoded: https://url.spec.whatwg.org/#application/x-www-form-urlencoded
:compare:`3.0.4 <3.0.3...3.0.4>` (2 Nov 2020)
---------------------------------------------

View file

@ -87,3 +87,6 @@ exclude = build,env,.eggs
[mypy]
mypy_path = typestubs
strict = True
[yapf]
allow_split_before_dict_value = False

View file

@ -423,8 +423,15 @@ class ContentMixin(web.RequestHandler):
settings.default_content_type)
raise web.HTTPError(500)
else:
content_type, data_bytes = handler.to_bytes(body)
if set_content_type:
self.set_header('Content-Type', content_type)
self.add_header('Vary', 'Accept')
self.write(data_bytes)
try:
content_type, data_bytes = handler.to_bytes(body)
except (TypeError, ValueError) as e:
self._logger.error(
'selected transcoder (%s) failed to encode response '
'body: %s', handler.__class__.__name__, e)
raise web.HTTPError(500, reason='Response Encoding Failure')
else:
if set_content_type:
self.set_header('Content-Type', content_type)
self.add_header('Vary', 'Accept')
self.write(data_bytes)

View file

@ -3,13 +3,17 @@ Bundled media type transcoders.
- :class:`.JSONTranscoder` implements JSON encoding/decoding
- :class:`.MsgPackTranscoder` implements msgpack encoding/decoding
- :class:`.FormUrlEncodedTranscoder` implements the venerable form encoding
"""
from __future__ import annotations
import base64
import dataclasses
import json
import string
import typing
import urllib.parse
import uuid
import collections.abc
@ -21,6 +25,14 @@ except ImportError: # pragma: no cover
from sprockets.mixins.mediatype import handlers, type_info
# Percent-encoding lookup tables keyed by octet value.  Every octet maps
# to its ``%XX`` escape except the ones that are emitted verbatim: ASCII
# letters, ASCII digits, and ``*-_.``.
#
# The table has to cover all 256 possible octets (0x00 through 0xFF)
# since it is indexed with raw byte values; a missing entry would
# surface as a ``KeyError`` while encoding.
_FORM_URLENCODING = {c: '%{:02X}'.format(c) for c in range(256)}
_FORM_URLENCODING.update({ord(c): c for c in string.ascii_letters})
_FORM_URLENCODING.update({ord(c): c for c in string.digits})
_FORM_URLENCODING.update({ord(c): c for c in '*-_.'})

# Same table except that the space character is written as ``+``
# instead of ``%20``.
_FORM_URLENCODING_PLUS = _FORM_URLENCODING.copy()
_FORM_URLENCODING_PLUS[ord(' ')] = '+'
class JSONTranscoder(handlers.TextContentHandler):
"""
@ -238,3 +250,226 @@ class MsgPackTranscoder(handlers.BinaryContentHandler):
raise TypeError('{} is not msgpackable'.format(
datum.__class__.__name__))
def _default_literal_mapping():
    """Build a fresh literal-to-string table for each options instance."""
    return {None: '', True: 'true', False: 'false'}


@dataclasses.dataclass
class FormUrlEncodingOptions:
    """Configuration knobs for :class:`.FormUrlEncodedTranscoder`"""

    encoding: str = 'utf-8'
    """Character encoding used to turn text into bytes before quoting."""

    encode_sequences: bool = False
    """Emit one name=value pair for each element of a sequence value."""

    literal_mapping: dict[typing.Literal[None, True, False],
                          str] = dataclasses.field(
                              default_factory=_default_literal_mapping)
    """Strings that stand in for the supported literal values."""

    space_as_plus: bool = False
    """Write the space character as ``+`` instead of ``%20``."""
class FormUrlEncodedTranscoder:
    """Opinionated transcoder for the venerable x-www-formurlencoded.

    :param encoding_options: keyword parameters are used to initialize
        :class:`FormUrlEncodingOptions`

    This transcoder implements transcoding according to the current
    W3C documentation.  The encoding interface takes mappings or
    sequences of pairs and encodes both the name and value.  The
    following table describes how each supported type is encoded.

    +----------------------------+---------------------------------------+
    | Value / Type               | Encoding                              |
    +============================+=======================================+
    | character strings          | UTF-8 codepoints before percent-      |
    |                            | encoding the resulting bytes          |
    +----------------------------+---------------------------------------+
    | space character            | ``%20`` or ``+``                      |
    +----------------------------+---------------------------------------+
    | :data:`False`              | ``false``                             |
    +----------------------------+---------------------------------------+
    | :data:`True`               | ``true``                              |
    +----------------------------+---------------------------------------+
    | :data:`None`               | the empty string                      |
    +----------------------------+---------------------------------------+
    | numbers                    | ``str(n)``                            |
    +----------------------------+---------------------------------------+
    | byte sequences             | percent-encoded bytes                 |
    +----------------------------+---------------------------------------+
    | :class:`uuid.UUID`         | ``str(u)``                            |
    +----------------------------+---------------------------------------+
    | :class:`datetime.datetime` | result of calling                     |
    |                            | :meth:`~datetime.datetime.isoformat`  |
    +----------------------------+---------------------------------------+

    https://url.spec.whatwg.org/#application/x-www-form-urlencoded

    .. warning::

       Types that are not explicitly mentioned above will result in
       :meth:`to_bytes` simply calling ``str(value)`` and encoding
       the result.  This causes nested sequences to be encoded as
       their ``repr``.  For example, encoding ``{'a': [1, 2]}`` will
       result in ``a=%5B1%2C%202%5D``.  This matches what
       :func:`urllib.parse.urlencode` does by default.

       Better support for sequence values can be enabled by setting
       the :attr:`~FormUrlEncodingOptions.encode_sequences` attribute
       of :attr:`.options`.  This mimics the ``doseq`` parameter of
       :func:`urllib.parse.urlencode`.

    .. attribute:: options
       :type: FormUrlEncodingOptions

       Controls the behavior of the transcoder

    """
    # NOTE(review): the specification linked above spells the media type
    # ``application/x-www-form-urlencoded`` (hyphen between "form" and
    # "urlencoded").  The value below is kept as-is because existing
    # callers may match on it -- confirm before changing.
    content_type = 'application/x-www-formurlencoded'

    def __init__(self, **encoding_options: typing.Any) -> None:
        self.options = FormUrlEncodingOptions(**encoding_options)

    def to_bytes(
            self,
            inst_data: type_info.Serializable,
            encoding: typing.Optional[str] = None) -> typing.Tuple[str, bytes]:
        """Serialize `inst_data` into a byte stream and content type spec.

        :param inst_data: the data to serialize
        :param encoding: optional encoding override

        Serialization is implemented as described in the W3C
        `urlencoded serialization`_ algorithm.  The :attr:`.options`
        attribute controls the configurable details of the encoding
        process.

        The character encoding can be further overridden by specifying the
        `encoding` parameter.

        :returns: tuple of the content type and the resulting bytes
        :raises: :exc:`TypeError` if a supplied value cannot be serialized

        .. _urlencoded serialization: https://url.spec.whatwg.org/
           #urlencoded-serializing

        """
        # Select the appropriate encoding table and use the default
        # character encoding if necessary.  Binding these to local
        # names removes branches from the inner loop.
        chr_map: typing.Mapping[int, str]
        chr_map = (_FORM_URLENCODING_PLUS
                   if self.options.space_as_plus else _FORM_URLENCODING)
        if encoding is None:
            encoding = self.options.encoding

        # Generate a sequence of name+value tuples to encode or
        # directly encode primitives
        try:
            tuples = self._convert_to_tuple_sequence(inst_data)
        except TypeError:
            # hopefully this is a primitive ... if not then the
            # call to _encode will fail below
            tuples = [(inst_data, None)]

        # A value of None means "name only": no ``=`` is written.
        pairs = []
        for name, value in tuples:
            pair = self._encode(name, chr_map, encoding)
            if value is not None:
                pair += '=' + self._encode(value, chr_map, encoding)
            pairs.append(pair)

        # every character left after quoting is ASCII
        return self.content_type, '&'.join(pairs).encode('ascii')

    def from_bytes(
            self,
            data_bytes: bytes,
            encoding: typing.Optional[str] = None) -> type_info.Deserialized:
        """Deserialize `bytes` into a Python object instance.

        :param data_bytes: byte string to deserialize
        :param encoding: optional encoding override

        Deserialization is implemented according to the W3C
        `urlencoded deserialization`_ algorithm.  The :attr:`.options`
        attribute controls the configurable details of the encoding
        process.

        Empty segments between ``&`` separators are skipped, a name
        without ``=`` decodes to the empty string, and when a name
        is repeated the last value wins.

        :returns: the decoded Python object

        .. _urlencoded deserialization: https://url.spec.whatwg.org/
           #urlencoded-parsing

        """
        dequote = (urllib.parse.unquote_plus
                   if self.options.space_as_plus else urllib.parse.unquote)
        if encoding is None:
            encoding = self.options.encoding

        output = []
        for part in data_bytes.decode('ascii').split('&'):
            if not part:
                continue
            name, eq_present, value = part.partition('=')
            name = dequote(name, encoding=encoding)
            if eq_present:
                output.append((name, dequote(value, encoding=encoding)))
            else:
                output.append((name, ''))

        return dict(output)

    def _encode(self, datum: typing.Union[bool, None, float, int, str,
                                          type_info.DefinesIsoFormat],
                char_map: typing.Mapping[int, str], encoding: str) -> str:
        """Percent-encode a single name or value.

        :param datum: the value to encode
        :param char_map: table mapping each octet to its encoded form
        :param encoding: character encoding applied to textual values

        Byte sequences are run through `char_map` as-is.  Every other
        value is first reduced to a string, then encoded to bytes using
        `encoding`, and finally mapped through `char_map`.

        :returns: the encoded string

        """
        if isinstance(datum, str):
            pass  # optimization: skip additional checks for strings
        elif (isinstance(datum, (float, int, uuid.UUID))
              and not isinstance(datum, bool)):
            # bool is a subclass of int so it has to be excluded here
            # to let the literal mapping below handle it
            datum = str(datum)
        elif (isinstance(datum, collections.abc.Hashable)
              and datum in self.options.literal_mapping):
            # the isinstance Hashable check confuses mypy
            datum = self.options.literal_mapping[datum]  # type: ignore
        elif isinstance(datum, (bytearray, bytes, memoryview)):
            return ''.join(char_map[c] for c in datum)
        elif isinstance(datum, type_info.DefinesIsoFormat):
            datum = datum.isoformat()
        else:
            datum = str(datum)

        return ''.join(char_map[c] for c in datum.encode(encoding))

    def _convert_to_tuple_sequence(
        self, value: type_info.Serializable
    ) -> typing.Iterable[typing.Tuple[typing.Any, typing.Any]]:
        """Turn a mapping or an iterable of pairs into name/value tuples.

        :param value: mapping or iterable of two-element items

        When :attr:`~FormUrlEncodingOptions.encode_sequences` is set,
        every iterable value other than a string or byte sequence is
        expanded into one tuple per element.

        :returns: an iterable of ``(name, value)`` tuples
        :raises: :exc:`TypeError` if `value` is neither a mapping nor
            an iterable of pairs

        """
        tuples: typing.Iterable[typing.Tuple[typing.Any, typing.Any]]

        if isinstance(value, collections.abc.Mapping):
            tuples = value.items()
        else:
            try:
                tuples = [(a, b) for a, b in value]  # type: ignore
            except (TypeError, ValueError) as error:
                raise TypeError(
                    'Cannot convert value to sequence of tuples') from error

        if self.options.encode_sequences:
            out_tuples = []
            for a, b in tuples:
                if (not isinstance(b, (bytes, bytearray, memoryview, str))
                        and isinstance(b, collections.abc.Iterable)):
                    for element in b:
                        out_tuples.append((a, element))
                else:
                    out_tuples.append((a, b))
            tuples = out_tuples

        return tuples

View file

@ -4,13 +4,14 @@ import typing
import uuid
try:
from typing import Protocol
from typing import Protocol, runtime_checkable
except ImportError:
# "ignore" is required to avoid an incompatible import
# error due to different bindings of _SpecialForm
from typing_extensions import Protocol # type: ignore
from typing_extensions import Protocol, runtime_checkable # type: ignore
@runtime_checkable
class DefinesIsoFormat(Protocol):
"""An object that has an isoformat method."""
def isoformat(self) -> str:
@ -24,6 +25,10 @@ class HasSettings(Protocol):
"""Application settings."""
# Runtime tuple of the concrete classes that count as "simple" values.
# It is a plain tuple so that it can be handed straight to isinstance().
SerializablePrimitives = (type(None), bool, bytearray, bytes, float, int,
                          memoryview, str, uuid.UUID)
"""Use this with isinstance to identify simple values."""

# Static type alias: the simple values listed above plus objects that
# define ``isoformat`` and the generic mapping, sequence, and set
# containers.
Serializable = typing.Union[DefinesIsoFormat, None, bool, bytearray, bytes,
                            float, int, memoryview, str, typing.Mapping,
                            typing.Sequence, typing.Set, uuid.UUID]

178
tests.py
View file

@ -1,6 +1,7 @@
import base64
import datetime
import json
import math
import os
import pickle
import struct
@ -12,7 +13,8 @@ from ietfparse import algorithms
from tornado import httputil, testing, web
import umsgpack
from sprockets.mixins.mediatype import content, handlers, transcoders
from sprockets.mixins.mediatype import (content, handlers, transcoders,
type_info)
import examples
@ -175,6 +177,35 @@ class SendResponseTests(testing.AsyncHTTPTestCase):
self.assertEqual('application/foo+json',
response.headers.get('Content-Type'))
def test_that_transcoder_failures_result_in_500(self):
    # A transcoder whose to_bytes raises whichever exception class is
    # currently configured on the instance.
    class FailingTranscoder:
        content_type = 'application/vnd.com.example.bad'

        def __init__(self):
            self.exc_class = TypeError

        def to_bytes(self, inst_data, encoding=None):
            raise self.exc_class('I always fail at this')

        def from_bytes(self, data_bytes, encoding=None):
            return {}

    failing = FailingTranscoder()
    content.add_transcoder(self.application, failing)

    # Both exception types are expected to produce the same response.
    for exc_class in (TypeError, ValueError):
        failing.exc_class = exc_class
        request_headers = {
            'Accept': 'application/vnd.com.example.bad',
            'Content-Type': 'application/json',
        }
        response = self.fetch('/', method='POST', body=b'{}',
                              headers=request_headers)
        self.assertEqual(500, response.code)
        self.assertEqual('Response Encoding Failure', response.reason)
class GetRequestBodyTests(testing.AsyncHTTPTestCase):
def setUp(self):
@ -520,3 +551,148 @@ class MsgPackTranscoderTests(unittest.TestCase):
new_callable=lambda: None):
with self.assertRaises(RuntimeError):
transcoders.MsgPackTranscoder()
class FormUrlEncodingTranscoderTests(unittest.TestCase):
    # Each test gets a freshly constructed transcoder from setUp, so
    # option changes made by one test are not seen by another.
    transcoder: type_info.Transcoder

    def setUp(self):
        super().setUp()
        self.transcoder = transcoders.FormUrlEncodedTranscoder()

    def test_simple_deserialization(self):
        decoded = self.transcoder.from_bytes(
            b'number=12&boolean=true&null=null&string=anything%20really&empty='
        )
        # every value comes back as a string
        expectations = [
            ('number', '12'),
            ('boolean', 'true'),
            ('empty', ''),
            ('null', 'null'),
            ('string', 'anything really'),
        ]
        for name, expected in expectations:
            self.assertEqual(decoded[name], expected)

    def test_deserialization_edge_cases(self):
        cases = [
            (b'', {}),
            (b'&', {}),
            (b'empty&&=no-name&no-value=',
             {'empty': '', '': 'no-name', 'no-value': ''}),
            (b'repeated=1&repeated=2', {'repeated': '2'}),
        ]
        for raw, expected in cases:
            decoded = self.transcoder.from_bytes(raw)
            self.assertEqual(expected, decoded)

    def test_that_deserialization_encoding_can_be_overridden(self):
        decoded = self.transcoder.from_bytes(b'kolor=%bf%F3%b3ty',
                                             encoding='iso-8859-2')
        self.assertEqual({'kolor': 'żółty'}, decoded)

    def test_simple_serialization(self):
        timestamp = datetime.datetime.now()
        identifier = uuid.uuid4()
        payload = {
            'integer': 12,
            'float': math.pi,
            'string': 'percent quoted',
            'datetime': timestamp,
            'id': identifier,
        }
        content_type, encoded = self.transcoder.to_bytes(payload)
        self.assertEqual(content_type, 'application/x-www-formurlencoded')

        expected_pairs = [
            'integer=12',
            f'float={math.pi}',
            'string=percent%20quoted',
            'datetime=' + timestamp.isoformat().replace(':', '%3A'),
            f'id={identifier}',
        ]
        self.assertEqual(encoded.decode(), '&'.join(expected_pairs))

    def test_that_serialization_encoding_can_be_overridden(self):
        _, encoded = self.transcoder.to_bytes([('kolor', 'żółty')],
                                              encoding='iso-8859-2')
        # hex digit case is not significant here
        self.assertEqual(b'kolor=%bf%f3%b3ty', encoded.lower())

    def test_serialization_edge_cases(self):
        pairs = [
            ('', ''),
            ('', True),
            ('', False),
            ('', None),
            ('name', None),
        ]
        _, encoded = self.transcoder.to_bytes(pairs)
        self.assertEqual(b'=&=true&=false&&name', encoded)

    def test_serialization_using_plusses(self):
        self.transcoder: transcoders.FormUrlEncodedTranscoder

        cases = [
            (True, b'value=with+space'),
            (False, b'value=with%20space'),
        ]
        for use_plus, expected in cases:
            self.transcoder.options.space_as_plus = use_plus
            _, encoded = self.transcoder.to_bytes({'value': 'with space'})
            self.assertEqual(expected, encoded)

    def test_that_serializing_unsupported_types_stringifies(self):
        unsupported = object()
        # quick & dirty URL encoding
        quoting = {0x20: '%20', 0x3C: '%3C', 0x3E: '%3E'}
        expected = str(unsupported).translate(quoting)

        _, encoded = self.transcoder.to_bytes({'unsupported': unsupported})
        self.assertEqual(f'unsupported={expected}'.encode(), encoded)

    def test_that_required_octets_are_encoded(self):
        # build the set of all characters required to be encoded by
        # https://url.spec.whatwg.org/#percent-encoded-bytes
        character_sets = (
            ' "#<>',  # query set
            '?`{}',  # path set
            '/:;=@[^|',  # userinfo set
            '$%&+,',  # component set
            "!'()~",  # formurlencoding set
        )
        pct_chrs = typing.cast(typing.Set[str], set())
        for characters in character_sets:
            pct_chrs.update(characters)

        test_string = ''.join(pct_chrs)
        quoted = ''.join('%{:02X}'.format(ord(c)) for c in test_string)
        expected = f'test_string={quoted}'.encode()

        _, encoded = self.transcoder.to_bytes({'test_string': test_string})
        self.assertEqual(expected, encoded)

    def test_serialization_of_primitives(self):
        identifier = uuid.uuid4()
        cases = [
            (None, b''),
            ('a string', b'a%20string'),
            (10, b'10'),
            (2.3, str(2.3).encode()),
            (True, b'true'),
            (False, b'false'),
            (b'\xfe\xed\xfa\xce', b'%FE%ED%FA%CE'),
            (memoryview(b'\xfe\xed\xfa\xce'), b'%FE%ED%FA%CE'),
            (identifier, str(identifier).encode()),
        ]
        for primitive, expected in cases:
            _, encoded = self.transcoder.to_bytes(primitive)
            self.assertEqual(expected, encoded)

    def test_serialization_with_empty_literal_map(self):
        self.transcoder: transcoders.FormUrlEncodedTranscoder

        # without a mapping the literals fall back to str(value)
        self.transcoder.options.literal_mapping.clear()
        for literal in (None, True, False):
            _, encoded = self.transcoder.to_bytes(literal)
            self.assertEqual(str(literal).encode(), encoded)

    def test_serialization_of_sequences(self):
        self.transcoder: transcoders.FormUrlEncodedTranscoder

        payload = {
            'list': [1, 2],
            'tuple': (1, 2),
            'set': {1, 2},
            'str': 'val',
        }

        # sequences are stringified when expansion is switched off
        self.transcoder.options.encode_sequences = False
        _, encoded = self.transcoder.to_bytes(payload)
        self.assertEqual((b'list=%5B1%2C%202%5D&tuple=%281%2C%202%29'
                          b'&set=%7B1%2C%202%7D&str=val'), encoded)

        # ... and become repeated names when it is switched on
        self.transcoder.options.encode_sequences = True
        _, encoded = self.transcoder.to_bytes(payload)
        self.assertEqual(b'list=1&list=2&tuple=1&tuple=2&set=1&set=2&str=val',
                         encoded)