1
0
mirror of https://github.com/django/django.git synced 2025-07-04 01:39:20 +00:00

gis: Added the gml() and union() GeoQuerySet routines w/tests.

git-svn-id: http://code.djangoproject.com/svn/django/branches/gis@6441 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Justin Bronn 2007-09-30 17:44:13 +00:00
parent 091ba8c2ea
commit 41709bf315
5 changed files with 144 additions and 50 deletions

View File

@ -17,6 +17,9 @@ from django.db.models.query import field_choices, find_field, get_where_clause,
FieldFound, LOOKUP_SEPARATOR, QUERY_TERMS FieldFound, LOOKUP_SEPARATOR, QUERY_TERMS
from django.utils.datastructures import SortedDict from django.utils.datastructures import SortedDict
# These routines default to False
ASGML, ASKML, UNION = (False, False, False)
if settings.DATABASE_ENGINE == 'postgresql_psycopg2': if settings.DATABASE_ENGINE == 'postgresql_psycopg2':
# PostGIS is the spatial database, getting the rquired modules, renaming as necessary. # PostGIS is the spatial database, getting the rquired modules, renaming as necessary.
from django.contrib.gis.db.backend.postgis import \ from django.contrib.gis.db.backend.postgis import \

View File

@ -7,8 +7,14 @@ class GeoManager(Manager):
def get_query_set(self): def get_query_set(self):
return GeoQuerySet(model=self.model) return GeoQuerySet(model=self.model)
def kml(self, field_name, **kwargs): def gml(self, *args, **kwargs):
return self.get_query_set().kml(field_name, **kwargs) return self.get_query_set().gml(*args, **kwargs)
def transform(self, field_name, **kwargs): def kml(self, *args, **kwargs):
return self.get_query_set().transform(field_name, **kwargs) return self.get_query_set().kml(*args, **kwargs)
def transform(self, *args, **kwargs):
return self.get_query_set().transform(*args, **kwargs)
def union(self, *args, **kwargs):
return self.get_query_set().union(*args, **kwargs)

View File

@ -1,11 +1,13 @@
import operator import operator
from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured
from django.db import connection from django.db import connection
from django.db.models.query import Q, QuerySet, handle_legacy_orderlist, quote_only_if_word, orderfield2column, fill_table_cache from django.db.models.query import EmptyResultSet, Q, QuerySet, handle_legacy_orderlist, quote_only_if_word, orderfield2column, fill_table_cache
from django.db.models.fields import FieldDoesNotExist from django.db.models.fields import FieldDoesNotExist
from django.utils.datastructures import SortedDict from django.utils.datastructures import SortedDict
from django.contrib.gis.db.models.fields import GeometryField from django.contrib.gis.db.models.fields import GeometryField
from django.contrib.gis.db.backend import parse_lookup # parse_lookup depends on the spatial database backend. # parse_lookup depends on the spatial database backend.
from django.contrib.gis.db.backend import parse_lookup, ASGML, ASKML, UNION
from django.contrib.gis.geos import GEOSGeometry
class GeoQ(Q): class GeoQ(Q):
"Geographical query encapsulation object." "Geographical query encapsulation object."
@ -143,34 +145,66 @@ class GeoQuerySet(QuerySet):
#### Methods specific to the GeoQuerySet #### #### Methods specific to the GeoQuerySet ####
def _field_column(self, field): def _field_column(self, field):
"Helper function that returns the database column for the given field."
qn = connection.ops.quote_name qn = connection.ops.quote_name
return "%s.%s" % (qn(self.model._meta.db_table), return "%s.%s" % (qn(self.model._meta.db_table),
qn(field.column)) qn(field.column))
def kml(self, field_name, precision=8): def _geo_column(self, field_name):
"""Returns KML representation of the given field name in a `kml` """
attribute on each element of the QuerySet.""" Helper function that returns False when the given field name is not an
# Is KML output supported? instance of a GeographicField, otherwise, the database column for the
try: geographic field is returned.
from django.contrib.gis.db.backend.postgis import ASKML """
except ImportError: field = self.model._meta.get_field(field_name)
raise ImproperlyConfigured, 'AsKML() only available in PostGIS versions 1.2.1 and greater.' if isinstance(field, GeometryField):
return self._field_column(field)
else:
return False
def gml(self, field_name, precision=8, version=2):
"""
Returns GML representation of the given field in a `gml` attribute
on each element of the GeoQuerySet.
"""
# Is GML output supported?
if not ASGML:
raise ImproperlyConfigured('AsGML() stored procedure not available.')
# Is the given field name a geographic field? # Is the given field name a geographic field?
field = self.model._meta.get_field(field_name) field_col = self._geo_column(field_name)
if not isinstance(field, GeometryField): if not field_col:
raise TypeError, 'KML output only available on GeometryField fields.' raise TypeError('GML output only available on GeometryFields')
field_col = self._field_column(field)
# Adding AsGML function call to SELECT part of the SQL.
return self.extra(select={'gml':'%s(%s,%s,%s)' % (ASGML, field_col, precision, version)})
def kml(self, field_name, precision=8):
"""
Returns KML representation of the given field name in a `kml`
attribute on each element of the GeoQuerySet.
"""
# Is KML output supported?
if not ASKML:
raise ImproperlyConfigured('AsKML() stored procedure not available.')
# Is the given field name a geographic field?
field_col = self._geo_column(field_name)
if not field_col:
raise TypeError('KML output only available on GeometryFields.')
# Adding the AsKML function call to the SELECT part of the SQL. # Adding the AsKML function call to SELECT part of the SQL.
return self.extra(select={'kml':'%s(%s,%s)' % (ASKML, field_col, precision)}) return self.extra(select={'kml':'%s(%s,%s)' % (ASKML, field_col, precision)})
def transform(self, field_name, srid=4326): def transform(self, field_name, srid=4326):
"""Transforms the given geometry field to the given SRID. If no SRID is """
provided, the transformation will default to using 4326 (WGS84).""" Transforms the given geometry field to the given SRID. If no SRID is
provided, the transformation will default to using 4326 (WGS84).
"""
# Is the given field name a geographic field?
field = self.model._meta.get_field(field_name) field = self.model._meta.get_field(field_name)
if not isinstance(field, GeometryField): if not isinstance(field, GeometryField):
raise TypeError, 'ST_Transform() only available for GeometryField fields.' raise TypeError('ST_Transform() only available for GeometryFields')
# Setting the key for the field's column with the custom SELECT SQL to # Setting the key for the field's column with the custom SELECT SQL to
# override the geometry column returned from the database. # override the geometry column returned from the database.
@ -179,4 +213,33 @@ class GeoQuerySet(QuerySet):
connection.ops.quote_name(field.column)) connection.ops.quote_name(field.column))
return self._clone() return self._clone()
def union(self, field_name):
"""
Performs an aggregate union on the given geometry field. Returns
None if the GeoQuerySet is empty.
"""
# Making sure backend supports the Union stored procedure
if not UNION:
raise ImproperlyConfigured('Union stored procedure not available.')
# Getting the geographic field column
field_col = self._geo_column(field_name)
if not field_col:
raise TypeError('Aggregate Union only available on GeometryFields.')
# Getting the SQL for the query.
try:
select, sql, params = self._get_sql_clause()
except EmptyResultSet:
return None
# Replacing the select with a call to the ST_Union stored procedure
# on the geographic field column.
union_sql = ('SELECT %s(%s)' % (UNION, field_col)) + sql
cursor = connection.cursor()
cursor.execute(union_sql, params)
# Pulling the HEXEWKB from the returned cursor.
hex = cursor.fetchone()[0]
if hex: return GEOSGeometry(hex)
else: return None

View File

@ -3,7 +3,6 @@ from copy import copy
from unittest import TestSuite, TextTestRunner from unittest import TestSuite, TextTestRunner
from django.contrib.gis.gdal import HAS_GDAL from django.contrib.gis.gdal import HAS_GDAL
# Tests that do not require setting up and tearing down a spatial database. # Tests that do not require setting up and tearing down a spatial database.
test_suite_names = [ test_suite_names = [
'test_geos', 'test_geos',
@ -37,8 +36,7 @@ def run(verbosity=1):
def run_tests(module_list, verbosity=1, interactive=True): def run_tests(module_list, verbosity=1, interactive=True):
""" """
Run the tests that require creation of a spatial database. Does not Run the tests that require creation of a spatial database.
yet work on Windows platforms.
In order to run geographic model tests the DATABASE_USER will require In order to run geographic model tests the DATABASE_USER will require
superuser priviliges. To accomplish this outside the `postgres` user, superuser priviliges. To accomplish this outside the `postgres` user,
@ -49,17 +47,24 @@ def run_tests(module_list, verbosity=1, interactive=True):
/path/to/user/db) to change the database port (e.g. `port = 5433`). /path/to/user/db) to change the database port (e.g. `port = 5433`).
(3) Start this database `pg_ctl -D /path/to/user/db start` (3) Start this database `pg_ctl -D /path/to/user/db start`
Make sure your settings.py matches the settings of the user database. For example, set the On Windows platforms simply use the pgAdmin III utility to add superuser
same port number (`DATABASE_PORT=5433`). DATABASE_NAME or TEST_DATABSE_NAME must be set, priviliges to your database user.
along with DATABASE_USER.
Make sure your settings.py matches the settings of the user database.
For example, set the same port number (`DATABASE_PORT=5433`).
DATABASE_NAME or TEST_DATABSE_NAME must be set, along with DATABASE_USER.
In settings.py set TEST_RUNNER='django.contrib.gis.tests.run_tests'. In settings.py set TEST_RUNNER='django.contrib.gis.tests.run_tests'.
Finally, this assumes that the PostGIS SQL files (lwpostgis.sql and spatial_ref_sys.sql) Finally, this assumes that the PostGIS SQL files (lwpostgis.sql and
are installed in /usr/local/share. If they are not, add `POSTGIS_SQL_PATH=/path/to/sql` spatial_ref_sys.sql) are installed in the directory specified by
in your settings.py. `pg_config --sharedir` (and defaults to /usr/local/share if that fails).
This behavior is overridden if `POSTGIS_SQL_PATH` is in your settings.
Windows users should use the POSTGIS_SQL_PATH because the output
of `pg_config` uses paths like 'C:/PROGRA~1/POSTGR~1/..'.
The tests may be run by invoking `./manage.py test`. Finally, the tests may be run by invoking `./manage.py test`.
""" """
from django.conf import settings from django.conf import settings
from django.contrib.gis.db.backend import create_spatial_db from django.contrib.gis.db.backend import create_spatial_db

View File

@ -6,7 +6,6 @@ class GeoModelTest(unittest.TestCase):
def test01_initial_sql(self): def test01_initial_sql(self):
"Testing geographic initial SQL." "Testing geographic initial SQL."
# Ensuring that data was loaded from initial SQL. # Ensuring that data was loaded from initial SQL.
self.assertEqual(2, Country.objects.count()) self.assertEqual(2, Country.objects.count())
self.assertEqual(8, City.objects.count()) self.assertEqual(8, City.objects.count())
@ -71,23 +70,28 @@ class GeoModelTest(unittest.TestCase):
self.assertEqual(ply, State.objects.get(name='NullState').poly) self.assertEqual(ply, State.objects.get(name='NullState').poly)
nullstate.delete() nullstate.delete()
def test03_kml(self): def test03a_kml(self):
"Testing KML output from the database using GeoManager.kml()." "Testing KML output from the database using GeoManager.kml()."
# Should throw an error trying to get KML from a non-geometry field. # Should throw a TypeError when trying to obtain KML from a
try: # non-geometry field.
qs = City.objects.all().kml('name') qs = City.objects.all()
except TypeError: self.assertRaises(TypeError, qs.kml, 'name')
pass
else:
self.fail('Expected a TypeError exception')
# Ensuring the KML is as expected. # Ensuring the KML is as expected.
ptown = City.objects.kml('point', precision=9).get(name='Pueblo') ptown = City.objects.kml('point', precision=9).get(name='Pueblo')
self.assertEqual('<Point><coordinates>-104.609252,38.255001,0</coordinates></Point>', ptown.kml) self.assertEqual('<Point><coordinates>-104.609252,38.255001,0</coordinates></Point>', ptown.kml)
def test04_transform(self): def test03b_gml(self):
"Testing the transform() queryset method." "Testing GML output from the database using GeoManager.gml()."
# Should throw a TypeError when tyring to obtain GML from a
# non-geometry field.
qs = City.objects.all()
self.assertRaises(TypeError, qs.gml, 'name')
ptown = City.objects.gml('point', precision=9).get(name='Pueblo')
self.assertEqual('<gml:Point srsName="EPSG:4326"><gml:coordinates>-104.609252,38.255001</gml:coordinates></gml:Point>', ptown.gml)
def test04_transform(self):
"Testing the transform() GeoManager method."
# Pre-transformed points for Houston and Pueblo. # Pre-transformed points for Houston and Pueblo.
htown = fromstr('POINT(1947516.83115183 6322297.06040572)', srid=3084) htown = fromstr('POINT(1947516.83115183 6322297.06040572)', srid=3084)
ptown = fromstr('POINT(992363.390841912 481455.395105533)', srid=2774) ptown = fromstr('POINT(992363.390841912 481455.395105533)', srid=2774)
@ -103,8 +107,7 @@ class GeoModelTest(unittest.TestCase):
self.assertAlmostEqual(ptown.y, p.point.y, 8) self.assertAlmostEqual(ptown.y, p.point.y, 8)
def test10_contains_contained(self): def test10_contains_contained(self):
"Testing the 'contained' and 'contains' lookup types." "Testing the 'contained', 'contains', and 'bbcontains' lookup types."
# Getting Texas, yes we were a country -- once ;) # Getting Texas, yes we were a country -- once ;)
texas = Country.objects.get(name='Texas') texas = Country.objects.get(name='Texas')
@ -137,9 +140,13 @@ class GeoModelTest(unittest.TestCase):
self.assertEqual(0, len(Country.objects.filter(mpoly__contains=pueblo.point))) # Query w/GEOSGeometry object self.assertEqual(0, len(Country.objects.filter(mpoly__contains=pueblo.point))) # Query w/GEOSGeometry object
self.assertEqual(0, len(Country.objects.filter(mpoly__contains=okcity.point.wkt))) # Qeury w/WKT self.assertEqual(0, len(Country.objects.filter(mpoly__contains=okcity.point.wkt))) # Qeury w/WKT
# OK City is contained w/in bounding box of Texas.
qs = Country.objects.filter(mpoly__bbcontains=okcity.point)
self.assertEqual(1, len(qs))
self.assertEqual('Texas', qs[0].name)
def test11_lookup_insert_transform(self): def test11_lookup_insert_transform(self):
"Testing automatic transform for lookups and inserts." "Testing automatic transform for lookups and inserts."
# San Antonio in 'WGS84' (SRID 4326) and 'NAD83(HARN) / Texas Centric Lambert Conformal' (SRID 3084) # San Antonio in 'WGS84' (SRID 4326) and 'NAD83(HARN) / Texas Centric Lambert Conformal' (SRID 3084)
sa_4326 = 'POINT (-98.493183 29.424170)' sa_4326 = 'POINT (-98.493183 29.424170)'
sa_3084 = 'POINT (1645978.362408288754523 6276356.025927528738976)' # Used ogr.py in gdal 1.4.1 for this transform sa_3084 = 'POINT (1645978.362408288754523 6276356.025927528738976)' # Used ogr.py in gdal 1.4.1 for this transform
@ -161,7 +168,6 @@ class GeoModelTest(unittest.TestCase):
def test12_null_geometries(self): def test12_null_geometries(self):
"Testing NULL geometry support." "Testing NULL geometry support."
# Querying for both NULL and Non-NULL values. # Querying for both NULL and Non-NULL values.
nullqs = State.objects.filter(poly__isnull=True) nullqs = State.objects.filter(poly__isnull=True)
validqs = State.objects.filter(poly__isnull=False) validqs = State.objects.filter(poly__isnull=False)
@ -182,7 +188,6 @@ class GeoModelTest(unittest.TestCase):
def test13_left_right(self): def test13_left_right(self):
"Testing the 'left' and 'right' lookup types." "Testing the 'left' and 'right' lookup types."
# Left: A << B => true if xmax(A) < xmin(B) # Left: A << B => true if xmax(A) < xmin(B)
# Right: A >> B => true if xmin(A) > xmax(B) # Right: A >> B => true if xmin(A) > xmax(B)
# See: BOX2D_left() and BOX2D_right() in lwgeom_box2dfloat4.c in PostGIS source. # See: BOX2D_left() and BOX2D_right() in lwgeom_box2dfloat4.c in PostGIS source.
@ -261,6 +266,18 @@ class GeoModelTest(unittest.TestCase):
c = City() c = City()
self.assertEqual(c.point, None) self.assertEqual(c.point, None)
def test17_union(self):
"Testing the union() GeoManager method."
tx = Country.objects.get(name='Texas').mpoly
# Houston, Dallas, San Antonio
union = fromstr('MULTIPOINT(-98.493183 29.424170,-96.801611 32.782057,-95.363151 29.763374)')
qs = City.objects.filter(point__within=tx)
self.assertRaises(TypeError, qs.union, 'name')
u = qs.union('point')
self.assertEqual(True, union.equals_exact(u, 10)) # Going up to 10 digits of precision.
qs = City.objects.filter(name='NotACity')
self.assertEqual(None, qs.union('point'))
def suite(): def suite():
s = unittest.TestSuite() s = unittest.TestSuite()
s.addTest(unittest.makeSuite(GeoModelTest)) s.addTest(unittest.makeSuite(GeoModelTest))