Add application/x-www-formurlencoded transcoder.

I ended up not using urllib.parse functions since they do not implement
the specification.  The only difference is that the specification
requires that "~" is encoded.

NB - this commit is incomplete since it does not handle calling the
transcoder on simple objects.  mypy will quite correctly fail.
This commit is contained in:
Dave Shawley 2021-10-07 06:44:38 -04:00
parent 4dbc74076d
commit 716bff0d7f
No known key found for this signature in database
GPG key ID: F41A8A99298F8EED
5 changed files with 279 additions and 1 deletions

View file

@ -54,6 +54,12 @@ Bundled Transcoders
.. autoclass:: MsgPackTranscoder
:members:
.. autoclass:: FormUrlEncodedTranscoder
:members:
.. autoclass:: FormUrlEncodingOptions
:members:
.. _type-info:
Python Type Information

View file

@ -3,12 +3,15 @@ Version History
:compare:`Next <3.0.4...master>`
--------------------------------
- Add a transcoder for `application/x-www-formurlencoded`_
- Add type annotations (see :ref:`type-info`)
- Return a "406 Not Acceptable" if the :http:header:`Accept` header values cannot be matched
and there is no default content type configured
- Deprecate not having a default content type configured
- Fail gracefully when a transcoder does not exist for the default content type
.. _application/x-www-formurlencoded: https://url.spec.whatwg.org/#application/x-www-form-urlencoded
:compare:`3.0.4 <3.0.3...3.0.4>` (2 Nov 2020)
---------------------------------------------
- Return a "400 Bad Request" when an invalid Content-Type header is received

View file

@ -87,3 +87,6 @@ exclude = build,env,.eggs
[mypy]
mypy_path = typestubs
strict = True
[yapf]
allow_split_before_dict_value = False

View file

@ -3,13 +3,17 @@ Bundled media type transcoders.
- :class:`.JSONTranscoder` implements JSON encoding/decoding
- :class:`.MsgPackTranscoder` implements msgpack encoding/decoding
- :class:`.FormUrlEncodedTranscoder` implements the venerable form encoding
"""
from __future__ import annotations
import base64
import dataclasses
import json
import string
import typing
import urllib.parse
import uuid
import collections.abc
@ -21,6 +25,14 @@ except ImportError: # pragma: no cover
from sprockets.mixins.mediatype import handlers, type_info
# Octet -> text lookup tables used when serializing form data.
#
# Every possible octet (0x00-0xFF inclusive) starts out percent-encoded
# and the characters that may appear literally -- ASCII letters, digits
# and ``*-_.`` -- are then mapped to themselves.  The table has to cover
# all 256 octets since non-UTF-8 encodings (e.g., latin-1) can produce
# the byte 0xFF.
_FORM_URLENCODING = {c: '%{:02X}'.format(c) for c in range(256)}
_FORM_URLENCODING.update({ord(c): c for c in string.ascii_letters})
_FORM_URLENCODING.update({ord(c): c for c in string.digits})
_FORM_URLENCODING.update({ord(c): c for c in '*-_.'})

# Same table except that a space is written as ``+`` instead of ``%20``.
_FORM_URLENCODING_PLUS = _FORM_URLENCODING.copy()
_FORM_URLENCODING_PLUS[ord(' ')] = '+'
class JSONTranscoder(handlers.TextContentHandler):
"""
@ -238,3 +250,156 @@ class MsgPackTranscoder(handlers.BinaryContentHandler):
raise TypeError('{} is not msgpackable'.format(
datum.__class__.__name__))
def _default_literal_mapping(
) -> dict[typing.Literal[None, True, False], str]:
    """Build a fresh copy of the default literal-to-string mapping."""
    mapping: dict[typing.Literal[None, True, False], str] = {}
    mapping[None] = ''
    mapping[True] = 'true'
    mapping[False] = 'false'
    return mapping


@dataclasses.dataclass
class FormUrlEncodingOptions:
    """Settings that tune how :class:`.FormUrlEncodedTranscoder` behaves."""

    encoding: str = 'utf-8'
    """Character encoding used to turn text into bytes (and back)."""

    literal_mapping: dict[typing.Literal[None, True, False],
                          str] = dataclasses.field(
                              default_factory=_default_literal_mapping)
    """String representation used for each supported literal value."""

    space_as_plus: bool = False
    """Write spaces as ``+`` when set, as ``%20`` otherwise."""
class FormUrlEncodedTranscoder:
    """Opinionated transcoder for the venerable x-www-formurlencoded.

    This transcoder implements transcoding according to the current
    WHATWG URL specification.

    * character strings are encoded as UTF-8 codepoints before
      percent-encoding the resulting bytes
    * the space character is represented as ``%20``
    * :data:`False` is represented as ``false``
    * :data:`True` is represented as ``true``
    * :data:`None` is represented as the empty string

    Some of the opinions can be changed by modifying ``self.options``.

    https://url.spec.whatwg.org/#application/x-www-form-urlencoded

    .. attribute:: options
       :type: FormUrlEncodingOptions

       Controls the behavior of the transcoder

    """
    # NOTE(review): the specification linked above spells the media type
    # ``application/x-www-form-urlencoded`` (with a hyphen after "form").
    # The value is left untouched since callers and tests match on it;
    # confirm whether the spelling below is intentional.
    content_type = 'application/x-www-formurlencoded'

    def __init__(self) -> None:
        self.options = FormUrlEncodingOptions()

    def to_bytes(
            self,
            inst_data: type_info.Serializable,
            encoding: typing.Optional[str] = None) -> typing.Tuple[str, bytes]:
        """Serialize `inst_data` into a byte stream and content type spec.

        :param inst_data: the data to serialize, either a mapping or
            an iterable of name+value pairs
        :param encoding: optional encoding override

        Serialization is implemented as described in the WHATWG
        `urlencoded serialization`_ algorithm.  The :attr:`.options`
        attribute controls the configurable details of the encoding
        process.

        The character encoding can be further overridden by specifying the
        `encoding` parameter.

        :returns: tuple of the content type and the resulting bytes
        :raises: :exc:`TypeError` if a supplied value cannot be serialized

        .. _urlencoded serialization: https://url.spec.whatwg.org/
           #urlencoded-serializing

        """
        # Generate a sequence of name+value tuples to encode
        pairs: typing.Iterable[typing.Any]
        if isinstance(inst_data, collections.abc.Mapping):
            pairs = inst_data.items()
        else:
            pairs = inst_data

        # Encode each pair and run the encoded form through the
        # appropriate octet to string mapping table
        chr_map: typing.Mapping[int, str]
        chr_map = (_FORM_URLENCODING_PLUS
                   if self.options.space_as_plus else _FORM_URLENCODING)
        if encoding is None:
            encoding = self.options.encoding

        encoded_pairs = []
        for raw_name, raw_value in pairs:
            name = self._normalize(raw_name).encode(encoding)
            value = self._normalize(raw_value).encode(encoding)
            encoded_pairs.append('{}={}'.format(
                ''.join(chr_map[c] for c in name),
                ''.join(chr_map[c] for c in value)))

        # the mapping tables only emit ASCII so this cannot fail
        return self.content_type, '&'.join(encoded_pairs).encode('ascii')

    def from_bytes(
            self,
            data_bytes: bytes,
            encoding: typing.Optional[str] = None) -> type_info.Deserialized:
        """Deserialize `bytes` into a Python object instance.

        :param data_bytes: byte string to deserialize
        :param encoding: optional encoding override

        Deserialization is implemented according to the WHATWG
        `urlencoded deserialization`_ algorithm.  The :attr:`.options`
        attribute controls the configurable details of the encoding
        process.  Note that ``+`` is only decoded as a space when
        ``options.space_as_plus`` is enabled.

        :returns: a :class:`dict` mapping each decoded name to its
            decoded value; when a name is repeated, the last value wins
        :raises: :exc:`UnicodeDecodeError` if `data_bytes` contains
            octets outside of the ASCII range

        .. _urlencoded deserialization: https://url.spec.whatwg.org/
           #urlencoded-parsing

        """
        dequote = (urllib.parse.unquote_plus
                   if self.options.space_as_plus else urllib.parse.unquote)
        if encoding is None:
            encoding = self.options.encoding

        output = {}
        for part in data_bytes.decode('ascii').split('&'):
            if not part:  # skip empty segments such as "a=1&&b=2"
                continue
            # a segment without "=" partitions into (name, '', '') so
            # the value naturally decodes to the empty string
            name, _, value = part.partition('=')
            output[dequote(name, encoding=encoding)] = dequote(
                value, encoding=encoding)
        return output

    def _normalize(
        self, datum: typing.Union[bool, None, float, int, str,
                                  type_info.DefinesIsoFormat]
    ) -> str:
        """Convert a single name or value into its string form.

        :param datum: the value to convert

        :data:`None`, :data:`True`, and :data:`False` are translated
        using ``options.literal_mapping``.  Numbers and strings are
        passed through :class:`str` and anything that has an
        ``isoformat`` method is represented by calling it.

        :returns: the string representation of `datum`
        :raises: :exc:`TypeError` if `datum` cannot be represented

        """
        # Only consult the literal mapping for *actual* literals.  A plain
        # dictionary lookup would also match 1, 0, 1.0, and 0.0 since they
        # compare (and hash) equal to True and False which would result
        # in the number 1 being serialized as "true".
        if datum is None or isinstance(datum, bool):
            try:
                return self.options.literal_mapping[datum]  # type: ignore
            except KeyError:
                pass  # literal was removed from the mapping, fall through

        if isinstance(datum, (float, int, str)):
            return str(datum)
        if hasattr(datum, 'isoformat'):
            return datum.isoformat()  # type: ignore
        raise TypeError(f'{datum.__class__.__name__} is not serializable')

103
tests.py
View file

@ -1,6 +1,7 @@
import base64
import datetime
import json
import math
import os
import pickle
import struct
@ -12,7 +13,8 @@ from ietfparse import algorithms
from tornado import httputil, testing, web
import umsgpack
from sprockets.mixins.mediatype import content, handlers, transcoders
from sprockets.mixins.mediatype import (content, handlers, transcoders,
type_info)
import examples
@ -520,3 +522,102 @@ class MsgPackTranscoderTests(unittest.TestCase):
new_callable=lambda: None):
with self.assertRaises(RuntimeError):
transcoders.MsgPackTranscoder()
class FormUrlEncodingTranscoderTests(unittest.TestCase):
    """Exercise the form encoding transcoder in both directions."""

    transcoder: type_info.Transcoder

    def setUp(self):
        super().setUp()
        self.transcoder = transcoders.FormUrlEncodedTranscoder()

    def test_simple_deserialization(self):
        decoded = self.transcoder.from_bytes(
            b'number=12&boolean=true&null=null&string=anything%20really&empty='
        )
        # every value comes back as a string, nothing is interpreted
        expectations = {
            'number': '12',
            'boolean': 'true',
            'empty': '',
            'null': 'null',
            'string': 'anything really',
        }
        for name, expected in expectations.items():
            self.assertEqual(decoded[name], expected)

    def test_deserialization_edge_cases(self):
        cases = [
            (b'', {}),
            (b'&', {}),
            (b'empty&&=no-name&no-value=',
             {'empty': '', '': 'no-name', 'no-value': ''}),
            (b'repeated=1&repeated=2', {'repeated': '2'}),
        ]
        for payload, expected in cases:
            decoded = self.transcoder.from_bytes(payload)
            self.assertEqual(expected, decoded)

    def test_that_deserialization_encoding_can_be_overridden(self):
        decoded = self.transcoder.from_bytes(b'kolor=%bf%F3%b3ty',
                                             encoding='iso-8859-2')
        self.assertEqual({'kolor': 'żółty'}, decoded)

    def test_simple_serialization(self):
        timestamp = datetime.datetime.now()
        payload = {
            'integer': 12,
            'float': math.pi,
            'string': 'percent quoted',
            'datetime': timestamp,
        }
        content_type, encoded = self.transcoder.to_bytes(payload)
        self.assertEqual(content_type, 'application/x-www-formurlencoded')

        # colons in the ISO-8601 timestamp are percent-encoded
        expected_parts = [
            'integer=12',
            f'float={math.pi}',
            'string=percent%20quoted',
            'datetime=' + timestamp.isoformat().replace(':', '%3A'),
        ]
        self.assertEqual(encoded.decode(), '&'.join(expected_parts))

    def test_that_serialization_encoding_can_be_overridden(self):
        pairs = [('kolor', 'żółty')]
        _, encoded = self.transcoder.to_bytes(pairs, encoding='iso-8859-2')
        self.assertEqual(b'kolor=%bf%f3%b3ty', encoded.lower())

    def test_serialization_edge_cases(self):
        pairs = [
            ('', ''),
            ('', True),
            ('', False),
            ('', None),
            ('name', None),
        ]
        _, encoded = self.transcoder.to_bytes(pairs)
        self.assertEqual(b'=&=true&=false&=&name=', encoded)

    def test_serialization_using_plusses(self):
        self.transcoder: transcoders.FormUrlEncodedTranscoder

        # toggle the option on and then off again
        for use_plus, expected in ((True, b'value=with+space'),
                                   (False, b'value=with%20space')):
            self.transcoder.options.space_as_plus = use_plus
            _, encoded = self.transcoder.to_bytes({'value': 'with space'})
            self.assertEqual(expected, encoded)

    def test_that_serializing_unsupported_types_fails(self):
        unsupported = {'unsupported': object()}
        with self.assertRaises(TypeError):
            self.transcoder.to_bytes(unsupported)

    def test_that_required_octets_are_encoded(self):
        # build the set of all characters required to be encoded by
        # https://url.spec.whatwg.org/#percent-encoded-bytes
        required = typing.cast(typing.Set[str], set())
        required |= set(' "#<>')  # query set
        required |= set('?`{}')  # path set
        required |= set('/:;=@[^|')  # userinfo set
        required |= set('$%&+,')  # component set
        required |= set("!'()~")  # formurlencoding set

        test_string = ''.join(required)
        quoted = ''.join('%{:02X}'.format(ord(c)) for c in test_string)
        expected = f'test_string={quoted}'.encode()
        _, encoded = self.transcoder.to_bytes({'test_string': test_string})
        self.assertEqual(expected, encoded)