mirror of
https://github.com/sprockets/sprockets.mixins.mediatype.git
synced 2025-01-01 11:13:21 +00:00
Add support for encoding array.array instances
This commit is contained in:
parent
b15f131df1
commit
f5725ac6dd
4 changed files with 88 additions and 13 deletions
|
@ -8,6 +8,7 @@ Version History
|
|||
- Add support for encoding :func:`dataclasses.dataclass` decorated classes
|
||||
- Add support for encoding :class:`ipaddress.IPv4Address` and :class:`ipaddress.IPv6Address`
|
||||
- Add support for encoding :class:`pathlib.Path`
|
||||
- Add support for encoding :class:`array.array`
|
||||
- Add type annotations (see :ref:`type-info`)
|
||||
- Return a "406 Not Acceptable" if the :http:header:`Accept` header values cannot be matched
|
||||
and there is no default content type configured
|
||||
|
|
|
@ -6,6 +6,9 @@ Bundled media type transcoders.
|
|||
- :class:`.FormUrlEncodedTranscoder` implements the venerable form encoding
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import array
|
||||
import base64
|
||||
import dataclasses
|
||||
import decimal
|
||||
|
@ -122,6 +125,8 @@ class JSONTranscoder(handlers.TextContentHandler):
|
|||
+--------------------------------+------------------------------------+
|
||||
| :class:`pathlib.Path` | Same as ``str(value)`` |
|
||||
+--------------------------------+------------------------------------+
|
||||
| :class:`array.array` | Same as :meth:`array.array.tolist` |
|
||||
+--------------------------------+------------------------------------+
|
||||
|
||||
"""
|
||||
if isinstance(obj, (pathlib.Path, uuid.UUID)):
|
||||
|
@ -136,6 +141,8 @@ class JSONTranscoder(handlers.TextContentHandler):
|
|||
return float(obj)
|
||||
if dataclasses.is_dataclass(obj):
|
||||
return dataclasses.asdict(obj)
|
||||
if isinstance(obj, array.array):
|
||||
return obj.tolist()
|
||||
raise TypeError('{!r} is not JSON serializable'.format(obj))
|
||||
|
||||
|
||||
|
@ -223,6 +230,8 @@ class MsgPackTranscoder(handlers.BinaryContentHandler):
|
|||
+-----------------------------------+-------------------------------+
|
||||
| :class:`pathlib.Path` | Same as ``str(value)`` |
|
||||
+-----------------------------------+-------------------------------+
|
||||
| :class:`array.array` | `array family`_ |
|
||||
+-----------------------------------+-------------------------------+
|
||||
|
||||
.. _nil byte: https://github.com/msgpack/msgpack/blob/
|
||||
0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#formats-nil
|
||||
|
@ -272,9 +281,6 @@ class MsgPackTranscoder(handlers.BinaryContentHandler):
|
|||
if isinstance(datum, (bytes, str)):
|
||||
return datum
|
||||
|
||||
if isinstance(datum, (collections.abc.Sequence, collections.abc.Set)):
|
||||
return [self.normalize_datum(item) for item in datum]
|
||||
|
||||
if dataclasses.is_dataclass(datum):
|
||||
datum = dataclasses.asdict(datum)
|
||||
|
||||
|
@ -284,6 +290,9 @@ class MsgPackTranscoder(handlers.BinaryContentHandler):
|
|||
out[k] = self.normalize_datum(v)
|
||||
return out
|
||||
|
||||
if isinstance(datum, (collections.abc.Iterable, collections.abc.Set)):
|
||||
return [self.normalize_datum(item) for item in datum]
|
||||
|
||||
raise TypeError('{} is not msgpackable'.format(
|
||||
datum.__class__.__name__))
|
||||
|
||||
|
@ -352,6 +361,9 @@ class FormUrlEncodedTranscoder:
|
|||
+--------------------------------+---------------------------------------+
|
||||
| :class:`pathlib.Path` | Same as ``str(value)`` |
|
||||
+--------------------------------+---------------------------------------+
|
||||
| :class:`array.array` | Same as encoding |
|
||||
| | :meth:`array.array.tolist` |
|
||||
+--------------------------------+---------------------------------------+
|
||||
|
||||
https://url.spec.whatwg.org/#application/x-www-form-urlencoded
|
||||
|
||||
|
@ -474,27 +486,30 @@ class FormUrlEncodedTranscoder:
|
|||
return dict(output)
|
||||
|
||||
def _encode(self, datum: typing.Union[bool, None, float, int, str,
|
||||
type_info.SupportsIsoFormat],
|
||||
type_info.SupportsIsoFormat,
|
||||
array.array[typing.Any]],
|
||||
char_map: typing.Mapping[int, str], encoding: str) -> str:
|
||||
if isinstance(datum, str):
|
||||
pass # optimization: skip additional checks for strings
|
||||
str_repr = datum
|
||||
elif (isinstance(datum, (float, int, str, uuid.UUID))
|
||||
and not isinstance(datum, bool)):
|
||||
datum = str(datum)
|
||||
str_repr = str(datum)
|
||||
elif isinstance(datum, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
|
||||
datum = datum.exploded
|
||||
str_repr = datum.exploded
|
||||
elif isinstance(datum, array.array):
|
||||
str_repr = str(datum.tolist())
|
||||
elif (isinstance(datum, collections.abc.Hashable)
|
||||
and datum in self.options.literal_mapping):
|
||||
# the isinstance Hashable check confuses mypy
|
||||
datum = self.options.literal_mapping[datum] # type: ignore
|
||||
str_repr = self.options.literal_mapping[datum] # type: ignore
|
||||
elif isinstance(datum, (bytearray, bytes, memoryview)):
|
||||
return ''.join(char_map[c] for c in datum)
|
||||
elif isinstance(datum, type_info.SupportsIsoFormat):
|
||||
datum = datum.isoformat()
|
||||
str_repr = datum.isoformat()
|
||||
else:
|
||||
datum = str(datum)
|
||||
str_repr = str(datum)
|
||||
|
||||
return ''.join(char_map[c] for c in datum.encode(encoding))
|
||||
return ''.join(char_map[c] for c in str_repr.encode(encoding))
|
||||
|
||||
def _convert_to_tuple_sequence(
|
||||
self, value: type_info.Serializable
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import array
|
||||
import decimal
|
||||
import ipaddress
|
||||
import pathlib
|
||||
|
@ -42,7 +43,7 @@ Serializable = typing.Union[SupportsIsoFormat, None, bool, bytearray, bytes,
|
|||
typing.Sequence, typing.Set, uuid.UUID,
|
||||
decimal.Decimal, SupportsDataclassFields,
|
||||
ipaddress.IPv4Address, ipaddress.IPv6Address,
|
||||
pathlib.Path]
|
||||
pathlib.Path, array.array]
|
||||
"""Types that can be serialized by this library.
|
||||
|
||||
This is the set of types that
|
||||
|
|
60
tests.py
60
tests.py
|
@ -1,3 +1,4 @@
|
|||
import array
|
||||
import base64
|
||||
import dataclasses
|
||||
import datetime
|
||||
|
@ -41,7 +42,7 @@ class Context:
|
|||
self.settings = {}
|
||||
|
||||
|
||||
def pack_string(obj):
|
||||
def pack_string(obj) -> bytes:
|
||||
"""Optimally pack a string according to msgpack format"""
|
||||
payload = str(obj).encode('ASCII')
|
||||
pl = len(payload)
|
||||
|
@ -68,6 +69,22 @@ def pack_bytes(payload):
|
|||
return prefix + payload
|
||||
|
||||
|
||||
def pack_integer(payload):
|
||||
if payload >= 0:
|
||||
nbits = payload.bit_length()
|
||||
codes = [
|
||||
(7, b'', '>0sB'), # special case of no typecode
|
||||
(8, 0xCC, 'BB'),
|
||||
(16, 0xCD, '>BH'),
|
||||
(32, 0xCE, '>BL'),
|
||||
(64, 0xCF, '>BQ'),
|
||||
]
|
||||
for max_bits, typecode, fmt_str in codes:
|
||||
if nbits <= max_bits:
|
||||
return struct.pack(fmt_str, typecode, payload)
|
||||
raise RuntimeError(f'pack_integer cannot pack {payload!r}')
|
||||
|
||||
|
||||
class SendResponseTests(testing.AsyncHTTPTestCase):
|
||||
application: typing.Union[None, web.Application]
|
||||
|
||||
|
@ -379,6 +396,13 @@ class JSONTranscoderTests(unittest.TestCase):
|
|||
dumped = self.transcoder.dumps({'path': p})
|
||||
self.assertEqual(dumped, '{"path":"%s"}' % (p, ))
|
||||
|
||||
def test_that_array_is_supported(self):
|
||||
a = array.array('B')
|
||||
a.extend(range(255))
|
||||
dumped = self.transcoder.dumps({'array': a})
|
||||
self.assertEqual(dumped,
|
||||
'{"array":[%s]}' % (','.join(str(x) for x in a), ))
|
||||
|
||||
|
||||
class ContentSettingsTests(unittest.TestCase):
|
||||
def test_that_handler_listed_in_available_content_types(self):
|
||||
|
@ -622,6 +646,27 @@ class MsgPackTranscoderTests(unittest.TestCase):
|
|||
dumped = self.transcoder.packb(p)
|
||||
self.assertEqual(pack_string(str(p)), dumped)
|
||||
|
||||
def test_that_array_is_packed_as_array(self):
|
||||
a = array.array('B')
|
||||
a.extend(range(255))
|
||||
expected = struct.pack(
|
||||
'>BH',
|
||||
0xDC, # array of between 16 & (2^16)-1 elements
|
||||
len(a),
|
||||
)
|
||||
expected += b''.join(pack_integer(elm) for elm in a)
|
||||
self.assertEqual(expected, self.transcoder.packb(a))
|
||||
|
||||
# msgpack handling for an array of Unicode characters
|
||||
# is to pack them as a list of strings instead of a
|
||||
# list of integers
|
||||
data = 'hi there'
|
||||
a = array.array('u', data)
|
||||
expected = bytes([0x90 | len(data)])
|
||||
for ch in data:
|
||||
expected += pack_string(ch)
|
||||
self.assertEqual(expected, self.transcoder.packb(a))
|
||||
|
||||
|
||||
class FormUrlEncodingTranscoderTests(unittest.TestCase):
|
||||
transcoder: type_info.Transcoder
|
||||
|
@ -793,3 +838,16 @@ class FormUrlEncodingTranscoderTests(unittest.TestCase):
|
|||
p = pathlib.Path(__file__)
|
||||
_, result = self.transcoder.to_bytes({'path': p})
|
||||
self.assertEqual(f'path={str(p)}'.replace('/', '%2F').encode(), result)
|
||||
|
||||
def test_that_arrays_are_supported(self):
|
||||
self.transcoder: transcoders.FormUrlEncodedTranscoder
|
||||
|
||||
a = array.array('B', os.urandom(128))
|
||||
_, expected = self.transcoder.to_bytes({'array': a.tolist()})
|
||||
_, result = self.transcoder.to_bytes({'array': a})
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
self.transcoder.options.encode_sequences = True
|
||||
_, expected = self.transcoder.to_bytes({'array': a.tolist()})
|
||||
_, result = self.transcoder.to_bytes({'array': a})
|
||||
self.assertEqual(expected, result)
|
||||
|
|
Loading…
Reference in a new issue