2013-07-29 19:19:04 +02:00
|
|
|
from __future__ import unicode_literals
|
2011-10-17 18:45:22 +00:00
|
|
|
|
2011-09-17 19:54:52 +00:00
|
|
|
import os
|
2014-08-21 18:47:57 +02:00
|
|
|
import re
|
2013-07-01 14:22:27 +02:00
|
|
|
from unittest import skipUnless
|
2011-10-17 18:45:22 +00:00
|
|
|
|
2015-01-28 07:35:27 -05:00
|
|
|
from django.contrib.gis.gdal import HAS_GDAL
|
2013-09-12 10:30:45 +02:00
|
|
|
from django.core.management import call_command
|
2014-09-17 17:58:20 -04:00
|
|
|
from django.db import connection, connections
|
2014-08-17 19:06:25 +02:00
|
|
|
from django.test import TestCase, skipUnlessDBFeature
|
2013-09-12 10:30:45 +02:00
|
|
|
from django.utils.six import StringIO
|
2011-10-17 18:45:22 +00:00
|
|
|
|
2015-02-10 10:07:44 -05:00
|
|
|
from ..test_data import TEST_DATA
|
|
|
|
|
2013-05-10 23:08:45 -04:00
|
|
|
if HAS_GDAL:
|
2014-12-17 22:00:05 +01:00
|
|
|
from django.contrib.gis.gdal import Driver, GDALException
|
2013-05-10 23:08:45 -04:00
|
|
|
from django.contrib.gis.utils.ogrinspect import ogrinspect
|
2011-09-17 19:54:52 +00:00
|
|
|
|
2013-05-10 23:08:45 -04:00
|
|
|
from .models import AllOGRFields
|
2011-09-17 19:54:52 +00:00
|
|
|
|
2013-05-10 23:08:45 -04:00
|
|
|
|
2014-08-17 19:06:25 +02:00
|
|
|
@skipUnless(HAS_GDAL, "InspectDbTests needs GDAL support")
@skipUnlessDBFeature("gis_enabled")
class InspectDbTests(TestCase):
    """
    Run the ``inspectdb`` management command against individual GIS test
    tables and check the field declarations written to its output.
    """

    def test_geom_columns(self):
        """
        Test the geo-enabled inspectdb command.
        """
        buf = StringIO()
        call_command(
            'inspectdb',
            table_name_filter=lambda name: name == 'inspectapp_allogrfields',
            stdout=buf
        )
        generated = buf.getvalue()

        # When the backend reports that it cannot introspect geometry field
        # types, only a generic GeometryField declaration is expected.
        if connection.features.supports_geometry_field_introspection:
            snippets = [
                'geom = models.PolygonField()',
                'point = models.PointField()',
            ]
        else:
            snippets = [
                'geom = models.GeometryField(',
                'point = models.GeometryField(',
            ]
        # The manager declaration is expected on every backend.
        snippets.append('objects = models.GeoManager()')

        for snippet in snippets:
            self.assertIn(snippet, generated)

    @skipUnlessDBFeature("supports_3d_storage")
    def test_3d_columns(self):
        # Same check as above, but for the table holding 3D geometries.
        buf = StringIO()
        call_command(
            'inspectdb',
            table_name_filter=lambda name: name == 'inspectapp_fields3d',
            stdout=buf
        )
        generated = buf.getvalue()

        if connection.features.supports_geometry_field_introspection:
            snippets = [
                'point = models.PointField(dim=3)',
                'line = models.LineStringField(dim=3)',
                'poly = models.PolygonField(dim=3)',
            ]
        else:
            snippets = [
                'point = models.GeometryField(',
                'line = models.GeometryField(',
                'poly = models.GeometryField(',
            ]
        snippets.append('objects = models.GeoManager()')

        for snippet in snippets:
            self.assertIn(snippet, generated)
|
|
|
|
|
2013-09-12 10:30:45 +02:00
|
|
|
|
2014-08-17 19:06:25 +02:00
|
|
|
@skipUnless(HAS_GDAL, "OGRInspectTest needs GDAL support")
@skipUnlessDBFeature("gis_enabled")
class OGRInspectTest(TestCase):
    """
    Check the model source generated by ``ogrinspect`` for shapefiles and,
    when an OGR database connection can be built, for a database table.
    """
    maxDiff = 1024

    def test_poly(self):
        shp_file = os.path.join(TEST_DATA, 'test_poly', 'test_poly.shp')
        model_def = ogrinspect(shp_file, 'MyModel')

        # Field lines are indented by four spaces, as ogrinspect writes them
        # inside the generated class body.
        expected = [
            '# This is an auto-generated Django model module created by ogrinspect.',
            'from django.contrib.gis.db import models',
            '',
            'class MyModel(models.Model):',
            '    float = models.FloatField()',
            '    int = models.FloatField()',
            '    str = models.CharField(max_length=80)',
            '    geom = models.PolygonField(srid=-1)',
            '    objects = models.GeoManager()',
        ]

        self.assertEqual(model_def, '\n'.join(expected))

    def test_date_field(self):
        shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
        model_def = ogrinspect(shp_file, 'City')

        expected = [
            '# This is an auto-generated Django model module created by ogrinspect.',
            'from django.contrib.gis.db import models',
            '',
            'class City(models.Model):',
            '    name = models.CharField(max_length=80)',
            '    population = models.FloatField()',
            '    density = models.FloatField()',
            '    created = models.DateField()',
            '    geom = models.PointField(srid=-1)',
            '    objects = models.GeoManager()',
        ]

        self.assertEqual(model_def, '\n'.join(expected))

    def test_time_field(self):
        # Getting the database identifier used by OGR, if None returned
        # GDAL does not have the support compiled in.
        ogr_db = get_ogr_db_string()
        if not ogr_db:
            self.skipTest("Unable to setup an OGR connection to your database")

        try:
            # Writing shapefiles via GDAL currently does not support writing OGRTime
            # fields, so we need to actually use a database
            model_def = ogrinspect(ogr_db, 'Measurement',
                                   layer_key=AllOGRFields._meta.db_table,
                                   decimal=['f_decimal'])
        except GDALException:
            self.skipTest("Unable to setup an OGR connection to your database")

        self.assertTrue(model_def.startswith(
            '# This is an auto-generated Django model module created by ogrinspect.\n'
            'from django.contrib.gis.db import models\n'
            '\n'
            'class Measurement(models.Model):\n'
        ))

        # The ordering of model fields might vary depending on several factors (version of GDAL, etc.)
        self.assertIn('    f_decimal = models.DecimalField(max_digits=0, decimal_places=0)', model_def)
        self.assertIn('    f_int = models.IntegerField()', model_def)
        self.assertIn('    f_datetime = models.DateTimeField()', model_def)
        self.assertIn('    f_time = models.TimeField()', model_def)
        self.assertIn('    f_float = models.FloatField()', model_def)
        self.assertIn('    f_char = models.CharField(max_length=10)', model_def)
        self.assertIn('    f_date = models.DateField()', model_def)

        # The geometry field must be immediately followed by the manager line.
        self.assertIsNotNone(re.search(
            r'    geom = models.PolygonField\(([^\)])*\)\n'  # Some backends may have srid=-1
            r'    objects = models.GeoManager\(\)', model_def))

    def test_management_command(self):
        # The ``ogrinspect`` management command should write the generated
        # model to the stream passed as stdout.
        shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
        out = StringIO()
        call_command('ogrinspect', shp_file, 'City', stdout=out)
        output = out.getvalue()
        self.assertIn('class City(models.Model):', output)
|
|
|
|
|
2011-09-17 19:54:52 +00:00
|
|
|
|
|
|
|
def get_ogr_db_string():
    """
    Construct the DB string that GDAL will use to inspect the database.
    GDAL will create its own connection to the database, so we re-use the
    connection settings from the Django test.

    Returns the OGR connection string, or None when no usable string can be
    built: GDAL is unavailable, the configured backend has no entry in the
    driver map below, GDAL fails to initialize the matching driver, or the
    database name is ":memory:".
    """
    # Without GDAL the Driver class is never imported, so nothing can be
    # probed; report "no connection string" explicitly.
    if not HAS_GDAL:
        return None

    db = connections.databases['default']

    # Map from the django backend into the OGR driver name and database identifier
    # http://www.gdal.org/ogr/ogr_formats.html
    #
    # TODO: Support Oracle (OCI).
    #
    # Each value is (driver name, database identifier template, separator
    # used to join the connection parameters).
    drivers = {
        'django.contrib.gis.db.backends.postgis': ('PostgreSQL', "PG:dbname='%(db_name)s'", ' '),
        'django.contrib.gis.db.backends.mysql': ('MySQL', 'MYSQL:"%(db_name)s"', ','),
        'django.contrib.gis.db.backends.spatialite': ('SQLite', '%(db_name)s', '')
    }

    db_engine = db['ENGINE']
    if db_engine not in drivers:
        return None

    drv_name, db_str, param_sep = drivers[db_engine]

    # Ensure that GDAL library has driver support for the database.
    # Only GDAL's own error is treated as "driver unavailable"; anything
    # else (including KeyboardInterrupt) is allowed to propagate.
    try:
        Driver(drv_name)
    except GDALException:
        return None

    # SQLite/Spatialite in-memory databases
    if db['NAME'] == ":memory:":
        return None

    # Build the params of the OGR database connection string
    params = [db_str % {'db_name': db['NAME']}]

    optional_params = (
        ('HOST', "host='%s'"),
        ('PORT', "port='%s'"),
        ('USER', "user='%s'"),
        ('PASSWORD', "password='%s'"),
    )
    for key, template in optional_params:
        value = db.get(key, None)
        # Don't add the parameter if it is not in django's settings
        if value:
            params.append(template % value)

    return param_sep.join(params)
|