From 716bff0d7f8fe728baf35584a4742700a8aa04d8 Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Thu, 7 Oct 2021 06:44:38 -0400 Subject: [PATCH] Add application/x-www-formurlencoded transcoder. I ended up not using urllib.parse functions since they do not implement the specification. The only difference is that the specification requires that "~" is encoded. NB - this commit is incomplete since it does not handle calling the transcoder on simple objects. mypy will quite correctly fail. --- docs/api.rst | 6 + docs/history.rst | 3 + setup.cfg | 3 + sprockets/mixins/mediatype/transcoders.py | 165 ++++++++++++++++++++++ tests.py | 103 +++++++++++++- 5 files changed, 279 insertions(+), 1 deletion(-) diff --git a/docs/api.rst b/docs/api.rst index b1b0f87..c324972 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -54,6 +54,12 @@ Bundled Transcoders .. autoclass:: MsgPackTranscoder :members: +.. autoclass:: FormUrlEncodedTranscoder + :members: + +.. autoclass:: FormUrlEncodingOptions + :members: + .. _type-info: Python Type Information diff --git a/docs/history.rst b/docs/history.rst index 818ea42..f02c664 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -3,12 +3,15 @@ Version History :compare:`Next <3.0.4...master>` -------------------------------- +- Add a transcoder for `application/x-www-formurlencoded`_ - Add type annotations (see :ref:`type-info`) - Return a "406 Not Acceptable" if the :http:header:`Accept` header values cannot be matched and there is no default content type configured - Deprecate not having a default content type configured - Fail gracefully when a transcoder does not exist for the default content type +.. 
# Octet -> text lookup tables used for percent-encoding.  Every possible
# octet (0x00-0xFF inclusive) starts out percent-encoded; the octets that
# the form-urlencoded serializer leaves untouched (ASCII letters, digits
# and "*-_.") are then mapped to themselves.
_FORM_URLENCODING = {c: '%{:02X}'.format(c) for c in range(256)}
_FORM_URLENCODING.update({ord(c): c for c in string.ascii_letters})
_FORM_URLENCODING.update({ord(c): c for c in string.digits})
_FORM_URLENCODING.update({ord(c): c for c in '*-_.'})

# Same table except that a space is written as "+" instead of "%20".
_FORM_URLENCODING_PLUS = _FORM_URLENCODING.copy()
_FORM_URLENCODING_PLUS[ord(' ')] = '+'


@dataclasses.dataclass
class FormUrlEncodingOptions:
    """Configuration knobs for :class:`.FormUrlEncodedTranscoder`"""
    encoding: str = 'utf-8'
    """Encoding used when generating the byte stream from character data."""

    literal_mapping: typing.Dict[typing.Literal[None, True, False],
                                 str] = dataclasses.field(
                                     default_factory=lambda: {
                                         None: '',
                                         True: 'true',
                                         False: 'false'
                                     })
    """Mapping from supported literal values to strings."""

    space_as_plus: bool = False
    """Quote spaces as ``%20`` or ``+``."""


class FormUrlEncodedTranscoder:
    """Opinionated transcoder for the venerable x-www-formurlencoded.

    This transcoder implements transcoding according to the WHATWG
    URL standard.

    * character strings are encoded as UTF-8 bytes before
      percent-encoding the resulting bytes
    * the space character is represented as ``%20``
    * :data:`False` is represented as ``false``
    * :data:`True` is represented as ``true``
    * :data:`None` is represented as the empty string

    Some of the opinions can be changed by modifying ``self.options``.

    https://url.spec.whatwg.org/#application/x-www-form-urlencoded

    .. attribute:: options
       :type: FormUrlEncodingOptions

       Controls the behavior of the transcoder

    """
    # NOTE(review): the registered media type is spelled
    # ``application/x-www-form-urlencoded``.  The value is left as-is
    # because callers and the test-suite compare against this exact string.
    content_type = 'application/x-www-formurlencoded'

    def __init__(self) -> None:
        self.options = FormUrlEncodingOptions()

    def to_bytes(
            self,
            inst_data: 'type_info.Serializable',
            encoding: typing.Optional[str] = None
    ) -> typing.Tuple[str, bytes]:
        """Serialize `inst_data` into a byte stream and content type spec.

        :param inst_data: the data to serialize -- either a mapping or
            an iterable of ``(name, value)`` pairs
        :param encoding: optional encoding override

        Serialization is implemented as described in the WHATWG
        `urlencoded serialization`_ algorithm.  The :attr:`.options`
        attribute controls the configurable details of the encoding
        process.

        The character encoding can be further overridden by specifying the
        `encoding` parameter.

        :returns: tuple of the content type and the resulting bytes
        :raises: :exc:`TypeError` if a supplied value cannot be serialized

        .. _urlencoded serialization: https://url.spec.whatwg.org/
           #urlencoded-serializing

        """
        if isinstance(inst_data, collections.abc.Mapping):
            pairs = inst_data.items()
        else:
            pairs = inst_data

        # Select the octet-to-text table once, outside of the loop.
        chr_map: typing.Mapping[int, str]
        chr_map = (_FORM_URLENCODING_PLUS
                   if self.options.space_as_plus else _FORM_URLENCODING)
        charset = self.options.encoding if encoding is None else encoding

        def quote(text: str) -> str:
            # Encode to octets first, then map every octet through the table.
            return ''.join(chr_map[octet] for octet in text.encode(charset))

        body = '&'.join(
            quote(self._normalize(name)) + '=' + quote(self._normalize(value))
            for name, value in pairs)

        # Every table entry is plain ASCII so this cannot fail.
        return self.content_type, body.encode('ascii')

    def from_bytes(
            self,
            data_bytes: bytes,
            encoding: typing.Optional[str] = None
    ) -> 'type_info.Deserialized':
        """Deserialize `bytes` into a Python object instance.

        :param data_bytes: byte string to deserialize
        :param encoding: optional encoding override

        Deserialization follows the WHATWG `urlencoded deserialization`_
        algorithm.  The :attr:`.options` attribute controls the
        configurable details of the decoding process.

        Empty segments (``&&``) are skipped, a name without ``=`` is
        given the empty string as its value, and when a name is repeated
        the last value wins.

        :returns: :class:`dict` mapping decoded names to decoded values
        :raises: :exc:`UnicodeDecodeError` if `data_bytes` contains
            non-ASCII octets

        .. _urlencoded deserialization: https://url.spec.whatwg.org/
           #urlencoded-parsing

        """
        # NOTE(review): "+" is only decoded as a space when
        # ``options.space_as_plus`` is set, whereas the WHATWG parser always
        # does so -- confirm that this asymmetry is intended.
        dequote = (urllib.parse.unquote_plus
                   if self.options.space_as_plus else urllib.parse.unquote)
        charset = self.options.encoding if encoding is None else encoding

        output: typing.Dict[str, str] = {}
        for part in data_bytes.decode('ascii').split('&'):
            if not part:
                continue
            # partition() yields '' for the value when "=" is absent
            name, _, value = part.partition('=')
            output[dequote(name, encoding=charset)] = dequote(
                value, encoding=charset)

        return output

    def _normalize(
        self, datum: typing.Union[bool, None, float, int, str,
                                  'type_info.DefinesIsoFormat']
    ) -> str:
        """Convert a single name or value into its string form.

        :param datum: the value to convert
        :returns: the string that will be percent-encoded
        :raises: :exc:`TypeError` if `datum` is not a supported type

        """
        # Only genuine None/bool instances may use the literal mapping.
        # ``1 == True`` and ``0 == False`` (with equal hashes), so a plain
        # dictionary lookup would serialize the numbers 1 and 0 as
        # "true" and "false".
        if datum is None or isinstance(datum, bool):
            mapped = self.options.literal_mapping.get(datum)
            if mapped is not None:
                return mapped
            # literal removed from the mapping: use the generic rules below

        if isinstance(datum, (float, int, str)):
            return str(datum)
        if hasattr(datum, 'isoformat'):
            return datum.isoformat()  # type: ignore
        raise TypeError(f'{datum.__class__.__name__} is not serializable')
b'number=12&boolean=true&null=null&string=anything%20really&empty=' + ) + self.assertEqual(body['number'], '12') + self.assertEqual(body['boolean'], 'true') + self.assertEqual(body['empty'], '') + self.assertEqual(body['null'], 'null') + self.assertEqual(body['string'], 'anything really') + + def test_deserialization_edge_cases(self): + body = self.transcoder.from_bytes(b'') + self.assertEqual({}, body) + + body = self.transcoder.from_bytes(b'&') + self.assertEqual({}, body) + + body = self.transcoder.from_bytes(b'empty&&=no-name&no-value=') + self.assertEqual({'empty': '', '': 'no-name', 'no-value': ''}, body) + + body = self.transcoder.from_bytes(b'repeated=1&repeated=2') + self.assertEqual({'repeated': '2'}, body) + + def test_that_deserialization_encoding_can_be_overridden(self): + body = self.transcoder.from_bytes(b'kolor=%bf%F3%b3ty', + encoding='iso-8859-2') + self.assertEqual({'kolor': 'żółty'}, body) + + def test_simple_serialization(self): + now = datetime.datetime.now() + content_type, result = self.transcoder.to_bytes({ + 'integer': 12, + 'float': math.pi, + 'string': 'percent quoted', + 'datetime': now, + }) + self.assertEqual(content_type, 'application/x-www-formurlencoded') + self.assertEqual( + result.decode(), '&'.join([ + 'integer=12', + f'float={math.pi}', + 'string=percent%20quoted', + 'datetime=' + now.isoformat().replace(':', '%3A'), + ])) + + def test_that_serialization_encoding_can_be_overridden(self): + _, result = self.transcoder.to_bytes([('kolor', 'żółty')], + encoding='iso-8859-2') + self.assertEqual(b'kolor=%bf%f3%b3ty', result.lower()) + + def test_serialization_edge_cases(self): + _, result = self.transcoder.to_bytes([ + ('', ''), + ('', True), + ('', False), + ('', None), + ('name', None), + ]) + self.assertEqual(b'=&=true&=false&=&name=', result) + + def test_serialization_using_plusses(self): + self.transcoder: transcoders.FormUrlEncodedTranscoder + + self.transcoder.options.space_as_plus = True + _, result = 
self.transcoder.to_bytes({'value': 'with space'}) + self.assertEqual(b'value=with+space', result) + + self.transcoder.options.space_as_plus = False + _, result = self.transcoder.to_bytes({'value': 'with space'}) + self.assertEqual(b'value=with%20space', result) + + def test_that_serializing_unsupported_types_fails(self): + with self.assertRaises(TypeError): + self.transcoder.to_bytes({'unsupported': object()}) + + def test_that_required_octets_are_encoded(self): + # build the set of all characters required to be encoded by + # https://url.spec.whatwg.org/#percent-encoded-bytes + pct_chrs = typing.cast(typing.Set[str], set()) + pct_chrs.update({c for c in ' "#<>'}) # query set + pct_chrs.update({c for c in '?`{}'}) # path set + pct_chrs.update({c for c in '/:;=@[^|'}) # userinfo set + pct_chrs.update({c for c in '$%&+,'}) # component set + pct_chrs.update({c for c in "!'()~"}) # formurlencoding set + + test_string = ''.join(pct_chrs) + expected = ''.join('%{:02X}'.format(ord(c)) for c in test_string) + expected = f'test_string={expected}'.encode() + _, result = self.transcoder.to_bytes({'test_string': test_string}) + self.assertEqual(expected, result)