Skip to content

Commit

Permalink
fix: Split up Django spatial filters
Browse files Browse the repository at this point in the history
Improved typing
  • Loading branch information
constantinius committed Nov 25, 2021
1 parent 6c3584b commit 484e0b3
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
5 changes: 2 additions & 3 deletions pygeofilter/backends/django/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,15 @@ def spatial_operation(self, node, lhs, rhs):

@handle(ast.Relate)
def spatial_pattern(self, node, lhs, rhs):
return filters.spatial(
return filters.spatial_relate(
lhs,
rhs,
'RELATE',
pattern=node.pattern,
)

@handle(ast.SpatialDistancePredicate, subclasses=True)
def spatial_distance(self, node, lhs, rhs):
return filters.spatial(
return filters.spatial_distance(
lhs,
rhs,
node.op.value,
Expand Down
69 changes: 41 additions & 28 deletions pygeofilter/backends/django/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def negate(sub_filter: Q) -> Q:
"=": "exact"
}

INVERT_COMP = {
INVERT_COMP: Dict[Optional[str], str] = {
"lt": "gt",
"lte": "gte",
"gt": "lt",
Expand All @@ -76,7 +76,7 @@ def negate(sub_filter: Q) -> Q:


def compare(lhs: Union[F, Value], rhs: Union[F, Value], op: str,
mapping_choices: Optional[Dict[str, str]] = None) -> Q:
mapping_choices: Optional[Dict[str, Dict[str, str]]] = None) -> Q:
""" Compare a filter with an expression using a comparison operation
:param lhs: the field to compare
Expand Down Expand Up @@ -144,7 +144,7 @@ def between(lhs: F, low: Union[F, Value], high: Union[F, Value],


def like(lhs: F, pattern: str, nocase: bool = False, not_: bool = False,
mapping_choices: Optional[Dict[str, str]] = None) -> Q:
mapping_choices: Optional[Dict[str, Dict[str, str]]] = None) -> Q:
""" Create a filter to filter elements according to a string attribute using
wildcard expressions.
Expand Down Expand Up @@ -228,7 +228,7 @@ def like(lhs: F, pattern: str, nocase: bool = False, not_: bool = False,


def contains(lhs: F, items: List[Union[F, Value]], not_: bool = False,
mapping_choices: Optional[Dict[str, str]] = None) -> Q:
mapping_choices: Optional[Dict[str, Dict[str, str]]] = None) -> Q:
""" Create a filter to match elements attribute to be in a list of choices.
:param lhs: the field to compare
Expand All @@ -245,19 +245,23 @@ def contains(lhs: F, items: List[Union[F, Value]], not_: bool = False,
:rtype: :class:`django.db.models.Q`
"""

if mapping_choices and lhs.name in mapping_choices:
def map_value(item):
if mapping_choices is not None and lhs.name in mapping_choices:
def map_value(item: Union[str, Value],
choices: Dict[str, str]) -> Union[str, Value]:
try:
if isinstance(item, str):
item = mapping_choices[lhs.name][item]
elif hasattr(item, 'value'):
item = Value(mapping_choices[lhs.name][item.value])
item = choices[item]
elif isinstance(item, Value):
item = Value(choices[item.value])

except KeyError as e:
raise AssertionError("Invalid field value %s" % e)
return item

items = map(map_value, items)
items = [
map_value(item, mapping_choices[lhs.name])
for item in items
]

q = Q(**{"%s__in" % lhs.name: items})
return ~q if not_ else q
Expand Down Expand Up @@ -417,11 +421,6 @@ def spatial(lhs: Union[F, Value], rhs: Union[F, Value], op: str,
"INTERSECTS", "DISJOINT", "CONTAINS", "WITHIN", "TOUCHES", "CROSSES",
"OVERLAPS", "EQUALS", "RELATE", "DWITHIN", "BEYOND"
)
if op == "RELATE":
assert pattern
elif op in ("DWITHIN", "BEYOND"):
assert distance
assert units

# if the left hand side is not a field reference, the comparison
# can be be inverted to try if the right hand side is a field
Expand All @@ -434,18 +433,33 @@ def spatial(lhs: Union[F, Value], rhs: Union[F, Value], op: str,
if not isinstance(lhs, F):
raise ValueError(f'Unable to compare non-field {lhs}')

if op in (
"INTERSECTS", "DISJOINT", "CONTAINS", "WITHIN", "TOUCHES",
"CROSSES", "OVERLAPS", "EQUALS"):
return Q(**{"%s__%s" % (lhs.name, op.lower()): rhs})
elif op == "RELATE":
return Q(**{"%s__relate" % lhs.name: (rhs, pattern)})
elif op in ("DWITHIN", "BEYOND"):
# TODO: maybe use D.unit_attname(units)
d = D(**{UNITS_LOOKUP[units]: distance})
if op == "DWITHIN":
return Q(**{"%s__distance_lte" % lhs.name: (rhs, d, 'spheroid')})
return Q(**{"%s__distance_gte" % lhs.name: (rhs, d, 'spheroid')})
return Q(**{"%s__%s" % (lhs.name, op.lower()): rhs})


def spatial_relate(lhs: Union[F, Value], rhs: Union[F, Value],
pattern: str) -> Q:

if not isinstance(lhs, F):
# TODO: cannot yet invert pattern -> raise
raise ValueError(f'Unable to compare non-field {lhs}')

return Q(**{"%s__relate" % lhs.name: (rhs, pattern)})


def spatial_distance(lhs: Union[F, Value], rhs: Union[F, Value], op: str,
distance: float, units: str) -> Q:
if not isinstance(lhs, F):
lhs, rhs = rhs, lhs

# if neither lhs and rhs are fields, we have to fail here
if not isinstance(lhs, F):
raise ValueError(f'Unable to compare non-field {lhs}')

# TODO: maybe use D.unit_attname(units)
d = D(**{UNITS_LOOKUP[units]: distance})
if op == "DWITHIN":
return Q(**{"%s__distance_lte" % lhs.name: (rhs, d, 'spheroid')})
return Q(**{"%s__distance_gte" % lhs.name: (rhs, d, 'spheroid')})


def bbox(lhs: F, minx: float, miny: float, maxx, maxy: float,
Expand Down Expand Up @@ -484,7 +498,6 @@ def attribute(name: str, field_mapping: Optional[Dict[str, str]] = None) -> F:
:param name: the field filter name
:type name: str
:param field_mapping: the dictionary to use as a lookup.
:type mapping_choices: dict[str, str]
:rtype: :class:`django.db.models.F`
"""
if field_mapping:
Expand Down

0 comments on commit 484e0b3

Please sign in to comment.