WIP: Fix the types

This commit is contained in:
Correl Roush 2021-03-25 10:21:17 -04:00 committed by Correl
parent 1dec15bee9
commit 340e35aee1
10 changed files with 81 additions and 19 deletions

9
functor.py Normal file
View file

@ -0,0 +1,9 @@
from __future__ import annotations
from typing import Any, Callable, Protocol, TypeVar
T = TypeVar("T")
S = TypeVar("S")
class Functor(Protocol[T, S]):
def __fmap__(self, function: Callable[[T], S]) -> Functor[S]:
...

View file

@ -2,6 +2,7 @@ from __future__ import annotations
from typing import Any, Callable, TypeVar
from .functor import Functor
from .tools import flip
T = TypeVar("T")
S = TypeVar("S")
@ -16,7 +17,5 @@ class Applicative(Functor[T]):
# (Functor[Callable[[T], S]]) is reported as incompatible with subclass
# implementations due to a flaw in mypy:
# https://github.com/python/mypy/issues/1317
def apply(self, functor: Any) -> Functor[S]: # pragma: no cover
def apply(self, functor: Functor[Callable[[T], S]]) -> Functor[S]: # pragma: no cover
raise NotImplementedError
__and__ = lambda other, self: Applicative.apply(self, other)

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import functools
from typing import Awaitable, Callable, Iterable, List, TypeVar, Union
from .monad import Monad
from .tools import flip
T = TypeVar("T")
S = TypeVar("S")
@ -54,12 +55,12 @@ class Future(Monad[T]):
future: Future[T] = x if isinstance(x, Future) else Future(x)
return acc.bind(lambda acc_: future.map(lambda x_: acc_ + [x_]))
empty: Future[List[T]] = cls.pure([])
empty: Future[List[T]] = Future.pure([])
return functools.reduce(mcons, xs, empty)
def __await__(self):
return self.awaitable.__await__()
__rshift__ = bind
__and__ = lambda other, self: Future.apply(self, other)
__rand__ = apply
__mul__ = __rmul__ = map

View file

@ -5,6 +5,7 @@ from typing import Callable, Iterable, List as _List, TypeVar
from .monad import Monad
from .monoid import Monoidal
from .tools import flip
T = TypeVar("T")
S = TypeVar("S")
@ -33,7 +34,7 @@ class List(Monad[T], Monoidal[list]):
def mcons(acc: List[_List[T]], x: List[T]) -> List[_List[T]]:
return acc.bind(lambda acc_: x.map(lambda x_: acc_ + [x_]))
empty: List[_List[T]] = cls.pure([])
empty: List[_List[T]] = List.pure([])
return reduce(mcons, xs, empty)
@classmethod
@ -44,6 +45,6 @@ class List(Monad[T], Monoidal[list]):
return List(self.value + other.value)
__add__ = mappend
__and__ = lambda other, self: List.apply(self, other)
__rand__ = apply
__mul__ = __rmul__ = map
__rshift__ = bind

View file

@ -4,6 +4,7 @@ from typing import Any, Callable, Generic, Iterable, List, Optional, TypeVar
from . import result
from .monad import Monad
from .monoid import Monoid
from .tools import flip
T = TypeVar("T")
S = TypeVar("S")
@ -46,7 +47,7 @@ class Maybe(Monad[T]):
def mcons(acc: Maybe[List[T]], x: Maybe[T]) -> Maybe[List[T]]:
return acc.bind(lambda acc_: x.map(lambda x_: acc_ + [x_]))
empty: Maybe[List[T]] = cls.pure([])
empty: Maybe[List[T]] = Maybe.pure([])
return functools.reduce(mcons, xs, empty)
def withDefault(self, default: T) -> T:
@ -86,9 +87,16 @@ class Maybe(Monad[T]):
return Nothing()
__rshift__ = bind
__and__ = lambda other, self: Maybe.apply(self, other)
__mul__ = __rmul__ = map
def __and__(self: Maybe[Callable[[T], S]], other: Maybe[T]) -> Maybe[S]:
if isinstance(self, Just) and not callable(self.value):
return NotImplemented
return other.apply(self)
def __rand__(self, functor: Maybe[Callable[[T], S]]) -> Maybe[S]:
return self.apply(functor)
class Just(Maybe[T]):
def __init__(self, value: T) -> None:

View file

@ -4,6 +4,7 @@ import inspect
from typing import Any, Callable, Generic, Iterable, List, TypeVar
from .monad import Monad
from .tools import flip
T = TypeVar("T")
S = TypeVar("S")
@ -58,7 +59,7 @@ class Reader(Monad[T], Generic[Env, T]):
def mcons(acc: Reader[Env, List[T]], x: Reader[Env, T]) -> Reader[Env, List[T]]:
return acc.bind(lambda acc_: x.map(lambda x_: acc_ + [x_]))
empty: Reader[Env, List[T]] = cls.pure([])
empty: Reader[Env, List[T]] = Reader.pure([])
return reduce(mcons, xs, empty)
def __eq__(self, other: object): # pragma: no cover
@ -72,4 +73,4 @@ class Reader(Monad[T], Generic[Env, T]):
__mul__ = __rmul__ = map
__rshift__ = bind
__and__ = lambda other, self: Reader.apply(self, other)
__rand__ = apply

View file

@ -4,6 +4,7 @@ from typing import Any, Callable, Generic, Iterable, List, Optional, TypeVar
from . import maybe
from .monad import Monad
from .tools import flip
T = TypeVar("T")
S = TypeVar("S")
@ -60,7 +61,7 @@ class Result(Monad[T], Generic[T, E]):
def mcons(acc: Result[List[T], E], x: Result[T, E]) -> Result[List[T], E]:
return acc.bind(lambda acc_: x.map(lambda x_: acc_ + [x_]))
empty: Result[List[T], E] = cls.pure([])
empty: Result[List[T], E] = Result.pure([])
return functools.reduce(mcons, xs, empty)
def withDefault(self, default: T) -> T:
@ -90,7 +91,23 @@ class Result(Monad[T], Generic[T, E]):
return None
__rshift__ = bind
__and__ = lambda other, self: Result.apply(self, other)
def and(self: Result[Callable[[T], S], E], other: Result[T, E]) -> Result[S, E]:
if isinstance(self, Ok) and not callable(self.value):
return other.rand(self)
return other.apply(self)
def rand(self, functor: Result[Callable[[T], S], E]) -> Result[S, E]:
return self.apply(functor)
def __and__(self: Result[Callable[[T], S], E], other: Result[T, E]) -> Result[S, E]:
if isinstance(self, Ok) and not callable(self.value):
return NotImplemented
return other.apply(self)
def __rand__(self, functor: Result[Callable[[T], S], E]) -> Result[S, E]:
return self.apply(functor)
__mul__ = __rmul__ = map

16
monads/tools.py Normal file
View file

@ -0,0 +1,16 @@
from functools import wraps
from typing import Callable, TypeVar
A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
def flip(function: Callable[[A, B], C]) -> Callable[[B, A], C]:
"""Flip the arguments of a 2-argument function."""
@wraps(function)
def flipped(b: B, a: A) -> C:
return function(a, b)
return flipped

View file

@ -1,18 +1,26 @@
import pytest # type: ignore
from typing import Callable, List
from typing import Callable, List, TypeVar
from monads.maybe import Maybe, Just, Nothing, maybe, first, last
from monads.result import Ok, Err
T = TypeVar("T")
def identity(x: T) -> T:
return x
def test_types() -> None:
m: Maybe[int] = Maybe.pure(1)
map: Maybe[int] = m.map(lambda x: x)
map_operator: Maybe[int] = m * (lambda x: x)
lifted_identity: Maybe[Callable[[int], int]] = Maybe.pure(identity)
map: Maybe[int] = m.map(identity)
map_operator: Maybe[int] = m * identity
bind: Maybe[int] = m.bind(lambda x: Maybe.pure(x))
bind_operator: Maybe[int] = m >> (lambda x: Maybe.pure(x))
apply: Maybe[int] = m.apply(Maybe.pure(lambda x: x))
apply_operator: Maybe[int] = Maybe.pure(lambda x: x) & m
apply: Maybe[int] = m.apply(Maybe.pure(identity))
apply_operator: Maybe[int] = lifted_identity & m
sequence: Maybe[List[int]] = Maybe.sequence([m])

View file

@ -7,12 +7,14 @@ from monads.result import Result, Ok, Err, safe
def test_types() -> None:
m: Result[int, str] = Result.pure(1)
increment: Callable[[int], int] = lambda x: x + 1
lifted_increment: Result[Callable[[int], int], str] = Result.pure(increment)
map: Result[int, str] = m.map(lambda x: x)
map_operator: Result[int, str] = m * (lambda x: x)
bind: Result[int, str] = m.bind(lambda x: Result.pure(x))
bind_operator: Result[int, str] = m >> (lambda x: Result.pure(x))
apply: Result[int, str] = m.apply(Result.pure(lambda x: x))
apply_operator: Result[int, str] = Result.pure(lambda x: x) & m
apply_operator: Result[int, str] = lifted_increment & m
sequence: Result[List[int], str] = Result.sequence([m])