mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			457 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			457 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| 
 | |
| from django.contrib.gis.db.models.fields import BaseSpatialField
 | |
| from django.contrib.gis.db.models.functions import Distance
 | |
| from django.contrib.gis.db.models.lookups import DistanceLookupBase, GISLookup
 | |
| from django.contrib.gis.gdal import GDALRaster
 | |
| from django.contrib.gis.geos import GEOSGeometry
 | |
| from django.contrib.gis.measure import D
 | |
| from django.contrib.gis.shortcuts import numpy
 | |
| from django.db import connection
 | |
| from django.db.models import F, Func, Q
 | |
| from django.test import TransactionTestCase, skipUnlessDBFeature
 | |
| from django.test.utils import CaptureQueriesContext
 | |
| 
 | |
| from ..data.rasters.textrasters import JSON_RASTER
 | |
| from .models import RasterModel, RasterRelatedModel
 | |
| 
 | |
| 
 | |
| @skipUnlessDBFeature("supports_raster")
 | |
| class RasterFieldTest(TransactionTestCase):
 | |
|     available_apps = ["gis_tests.rasterapp"]
 | |
| 
 | |
|     def setUp(self):
 | |
|         rast = GDALRaster(
 | |
|             {
 | |
|                 "srid": 4326,
 | |
|                 "origin": [0, 0],
 | |
|                 "scale": [-1, 1],
 | |
|                 "skew": [0, 0],
 | |
|                 "width": 5,
 | |
|                 "height": 5,
 | |
|                 "nr_of_bands": 2,
 | |
|                 "bands": [{"data": range(25)}, {"data": range(25, 50)}],
 | |
|             }
 | |
|         )
 | |
|         model_instance = RasterModel.objects.create(
 | |
|             rast=rast,
 | |
|             rastprojected=rast,
 | |
|             geom="POINT (-95.37040 29.70486)",
 | |
|         )
 | |
|         RasterRelatedModel.objects.create(rastermodel=model_instance)
 | |
| 
 | |
|     def test_field_null_value(self):
 | |
|         """
 | |
|         Test creating a model where the RasterField has a null value.
 | |
|         """
 | |
|         r = RasterModel.objects.create(rast=None)
 | |
|         r.refresh_from_db()
 | |
|         self.assertIsNone(r.rast)
 | |
| 
 | |
|     def test_access_band_data_directly_from_queryset(self):
 | |
|         RasterModel.objects.create(rast=JSON_RASTER)
 | |
|         qs = RasterModel.objects.all()
 | |
|         qs[0].rast.bands[0].data()
 | |
| 
 | |
|     def test_deserialize_with_pixeltype_flags(self):
 | |
|         no_data = 3
 | |
|         rast = GDALRaster(
 | |
|             {
 | |
|                 "srid": 4326,
 | |
|                 "origin": [0, 0],
 | |
|                 "scale": [-1, 1],
 | |
|                 "skew": [0, 0],
 | |
|                 "width": 1,
 | |
|                 "height": 1,
 | |
|                 "nr_of_bands": 1,
 | |
|                 "bands": [{"data": [no_data], "nodata_value": no_data}],
 | |
|             }
 | |
|         )
 | |
|         r = RasterModel.objects.create(rast=rast)
 | |
|         RasterModel.objects.filter(pk=r.pk).update(
 | |
|             rast=Func(F("rast"), function="ST_SetBandIsNoData"),
 | |
|         )
 | |
|         r.refresh_from_db()
 | |
|         band = r.rast.bands[0].data()
 | |
|         if numpy:
 | |
|             band = band.flatten().tolist()
 | |
|         self.assertEqual(band, [no_data])
 | |
|         self.assertEqual(r.rast.bands[0].nodata_value, no_data)
 | |
| 
 | |
|     def test_model_creation(self):
 | |
|         """
 | |
|         Test RasterField through a test model.
 | |
|         """
 | |
|         # Create model instance from JSON raster
 | |
|         r = RasterModel.objects.create(rast=JSON_RASTER)
 | |
|         r.refresh_from_db()
 | |
|         # Test raster metadata properties
 | |
|         self.assertEqual((5, 5), (r.rast.width, r.rast.height))
 | |
|         self.assertEqual([0.0, -1.0, 0.0, 0.0, 0.0, 1.0], r.rast.geotransform)
 | |
|         self.assertIsNone(r.rast.bands[0].nodata_value)
 | |
|         # Compare srs
 | |
|         self.assertEqual(r.rast.srs.srid, 4326)
 | |
|         # Compare pixel values
 | |
|         band = r.rast.bands[0].data()
 | |
|         # If numpy, convert result to list
 | |
|         if numpy:
 | |
|             band = band.flatten().tolist()
 | |
|         # Loop through rows in band data and assert single
 | |
|         # value is as expected.
 | |
|         self.assertEqual(
 | |
|             [
 | |
|                 0.0,
 | |
|                 1.0,
 | |
|                 2.0,
 | |
|                 3.0,
 | |
|                 4.0,
 | |
|                 5.0,
 | |
|                 6.0,
 | |
|                 7.0,
 | |
|                 8.0,
 | |
|                 9.0,
 | |
|                 10.0,
 | |
|                 11.0,
 | |
|                 12.0,
 | |
|                 13.0,
 | |
|                 14.0,
 | |
|                 15.0,
 | |
|                 16.0,
 | |
|                 17.0,
 | |
|                 18.0,
 | |
|                 19.0,
 | |
|                 20.0,
 | |
|                 21.0,
 | |
|                 22.0,
 | |
|                 23.0,
 | |
|                 24.0,
 | |
|             ],
 | |
|             band,
 | |
|         )
 | |
| 
 | |
|     def test_implicit_raster_transformation(self):
 | |
|         """
 | |
|         Test automatic transformation of rasters with srid different from the
 | |
|         field srid.
 | |
|         """
 | |
|         # Parse json raster
 | |
|         rast = json.loads(JSON_RASTER)
 | |
|         # Update srid to another value
 | |
|         rast["srid"] = 3086
 | |
|         # Save model and get it from db
 | |
|         r = RasterModel.objects.create(rast=rast)
 | |
|         r.refresh_from_db()
 | |
|         # Confirm raster has been transformed to the default srid
 | |
|         self.assertEqual(r.rast.srs.srid, 4326)
 | |
|         # Confirm geotransform is in lat/lon
 | |
|         expected = [
 | |
|             -87.9298551266551,
 | |
|             9.459646421449934e-06,
 | |
|             0.0,
 | |
|             23.94249275457565,
 | |
|             0.0,
 | |
|             -9.459646421449934e-06,
 | |
|         ]
 | |
|         for val, exp in zip(r.rast.geotransform, expected):
 | |
|             self.assertAlmostEqual(exp, val)
 | |
| 
 | |
|     def test_verbose_name_arg(self):
 | |
|         """
 | |
|         RasterField should accept a positional verbose name argument.
 | |
|         """
 | |
|         self.assertEqual(
 | |
|             RasterModel._meta.get_field("rast").verbose_name, "A Verbose Raster Name"
 | |
|         )
 | |
| 
 | |
|     def test_all_gis_lookups_with_rasters(self):
 | |
|         """
 | |
|         Evaluate all possible lookups for all input combinations (i.e.
 | |
|         raster-raster, raster-geom, geom-raster) and for projected and
 | |
|         unprojected coordinate systems. This test just checks that the lookup
 | |
|         can be called, but doesn't check if the result makes logical sense.
 | |
|         """
 | |
|         from django.contrib.gis.db.backends.postgis.operations import PostGISOperations
 | |
| 
 | |
|         # Create test raster and geom.
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         stx_pnt = GEOSGeometry("POINT (-95.370401017314293 29.704867409475465)", 4326)
 | |
|         stx_pnt.transform(3086)
 | |
| 
 | |
|         lookups = [
 | |
|             (name, lookup)
 | |
|             for name, lookup in BaseSpatialField.get_lookups().items()
 | |
|             if issubclass(lookup, GISLookup)
 | |
|         ]
 | |
|         self.assertNotEqual(lookups, [], "No lookups found")
 | |
|         # Loop through all the GIS lookups.
 | |
|         for name, lookup in lookups:
 | |
|             # Construct lookup filter strings.
 | |
|             combo_keys = [
 | |
|                 field + name
 | |
|                 for field in [
 | |
|                     "rast__",
 | |
|                     "rast__",
 | |
|                     "rastprojected__0__",
 | |
|                     "rast__",
 | |
|                     "rastprojected__",
 | |
|                     "geom__",
 | |
|                     "rast__",
 | |
|                 ]
 | |
|             ]
 | |
|             if issubclass(lookup, DistanceLookupBase):
 | |
|                 # Set lookup values for distance lookups.
 | |
|                 combo_values = [
 | |
|                     (rast, 50, "spheroid"),
 | |
|                     (rast, 0, 50, "spheroid"),
 | |
|                     (rast, 0, D(km=1)),
 | |
|                     (stx_pnt, 0, 500),
 | |
|                     (stx_pnt, D(km=1000)),
 | |
|                     (rast, 500),
 | |
|                     (json.loads(JSON_RASTER), 500),
 | |
|                 ]
 | |
|             elif name == "relate":
 | |
|                 # Set lookup values for the relate lookup.
 | |
|                 combo_values = [
 | |
|                     (rast, "T*T***FF*"),
 | |
|                     (rast, 0, "T*T***FF*"),
 | |
|                     (rast, 0, "T*T***FF*"),
 | |
|                     (stx_pnt, 0, "T*T***FF*"),
 | |
|                     (stx_pnt, "T*T***FF*"),
 | |
|                     (rast, "T*T***FF*"),
 | |
|                     (json.loads(JSON_RASTER), "T*T***FF*"),
 | |
|                 ]
 | |
|             elif name == "isvalid":
 | |
|                 # The isvalid lookup doesn't make sense for rasters.
 | |
|                 continue
 | |
|             elif PostGISOperations.gis_operators[name].func:
 | |
|                 # Set lookup values for all function based operators.
 | |
|                 combo_values = [
 | |
|                     rast,
 | |
|                     (rast, 0),
 | |
|                     (rast, 0),
 | |
|                     (stx_pnt, 0),
 | |
|                     stx_pnt,
 | |
|                     rast,
 | |
|                     json.loads(JSON_RASTER),
 | |
|                 ]
 | |
|             else:
 | |
|                 # Override band lookup for these, as it's not supported.
 | |
|                 combo_keys[2] = "rastprojected__" + name
 | |
|                 # Set lookup values for all other operators.
 | |
|                 combo_values = [
 | |
|                     rast,
 | |
|                     None,
 | |
|                     rast,
 | |
|                     stx_pnt,
 | |
|                     stx_pnt,
 | |
|                     rast,
 | |
|                     json.loads(JSON_RASTER),
 | |
|                 ]
 | |
| 
 | |
|             # Create query filter combinations.
 | |
|             self.assertEqual(
 | |
|                 len(combo_keys),
 | |
|                 len(combo_values),
 | |
|                 "Number of lookup names and values should be the same",
 | |
|             )
 | |
|             combos = [x for x in zip(combo_keys, combo_values) if x[1]]
 | |
|             self.assertEqual(
 | |
|                 [(n, x) for n, x in enumerate(combos) if x in combos[:n]],
 | |
|                 [],
 | |
|                 "There are repeated test lookups",
 | |
|             )
 | |
|             combos = [{k: v} for k, v in combos]
 | |
| 
 | |
|             for combo in combos:
 | |
|                 # Apply this query filter.
 | |
|                 qs = RasterModel.objects.filter(**combo)
 | |
| 
 | |
|                 # Evaluate normal filter qs.
 | |
|                 self.assertIn(qs.count(), [0, 1])
 | |
| 
 | |
|             # Evaluate on conditional Q expressions.
 | |
|             qs = RasterModel.objects.filter(Q(**combos[0]) & Q(**combos[1]))
 | |
|             self.assertIn(qs.count(), [0, 1])
 | |
| 
 | |
|     def test_dwithin_gis_lookup_output_with_rasters(self):
 | |
|         """
 | |
|         Check the logical functionality of the dwithin lookup for different
 | |
|         input parameters.
 | |
|         """
 | |
|         # Create test raster and geom.
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         stx_pnt = GEOSGeometry("POINT (-95.370401017314293 29.704867409475465)", 4326)
 | |
|         stx_pnt.transform(3086)
 | |
| 
 | |
|         # Filter raster with different lookup raster formats.
 | |
|         qs = RasterModel.objects.filter(rastprojected__dwithin=(rast, D(km=1)))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         qs = RasterModel.objects.filter(
 | |
|             rastprojected__dwithin=(json.loads(JSON_RASTER), D(km=1))
 | |
|         )
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         qs = RasterModel.objects.filter(rastprojected__dwithin=(JSON_RASTER, D(km=1)))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter in an unprojected coordinate system.
 | |
|         qs = RasterModel.objects.filter(rast__dwithin=(rast, 40))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter with band index transform.
 | |
|         qs = RasterModel.objects.filter(rast__1__dwithin=(rast, 1, 40))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
|         qs = RasterModel.objects.filter(rast__1__dwithin=(rast, 40))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
|         qs = RasterModel.objects.filter(rast__dwithin=(rast, 1, 40))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter raster by geom.
 | |
|         qs = RasterModel.objects.filter(rast__dwithin=(stx_pnt, 500))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         qs = RasterModel.objects.filter(rastprojected__dwithin=(stx_pnt, D(km=10000)))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         qs = RasterModel.objects.filter(rast__dwithin=(stx_pnt, 5))
 | |
|         self.assertEqual(qs.count(), 0)
 | |
| 
 | |
|         qs = RasterModel.objects.filter(rastprojected__dwithin=(stx_pnt, D(km=100)))
 | |
|         self.assertEqual(qs.count(), 0)
 | |
| 
 | |
|         # Filter geom by raster.
 | |
|         qs = RasterModel.objects.filter(geom__dwithin=(rast, 500))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter through related model.
 | |
|         qs = RasterRelatedModel.objects.filter(rastermodel__rast__dwithin=(rast, 40))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter through related model with band index transform
 | |
|         qs = RasterRelatedModel.objects.filter(rastermodel__rast__1__dwithin=(rast, 40))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter through conditional statements.
 | |
|         qs = RasterModel.objects.filter(
 | |
|             Q(rast__dwithin=(rast, 40))
 | |
|             & Q(rastprojected__dwithin=(stx_pnt, D(km=10000)))
 | |
|         )
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|         # Filter through different lookup.
 | |
|         qs = RasterModel.objects.filter(rastprojected__bbcontains=rast)
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|     def test_lookup_input_tuple_too_long(self):
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         msg = "Tuple too long for lookup bbcontains."
 | |
|         with self.assertRaisesMessage(ValueError, msg):
 | |
|             RasterModel.objects.filter(rast__bbcontains=(rast, 1, 2))
 | |
| 
 | |
|     def test_lookup_input_band_not_allowed(self):
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         qs = RasterModel.objects.filter(rast__bbcontains=(rast, 1))
 | |
|         msg = "Band indices are not allowed for this operator, it works on bbox only."
 | |
|         with self.assertRaisesMessage(ValueError, msg):
 | |
|             qs.count()
 | |
| 
 | |
|     def test_isvalid_lookup_with_raster_error(self):
 | |
|         qs = RasterModel.objects.filter(rast__isvalid=True)
 | |
|         msg = (
 | |
|             "IsValid function requires a GeometryField in position 1, got RasterField."
 | |
|         )
 | |
|         with self.assertRaisesMessage(TypeError, msg):
 | |
|             qs.count()
 | |
| 
 | |
|     def test_result_of_gis_lookup_with_rasters(self):
 | |
|         # Point is in the interior
 | |
|         qs = RasterModel.objects.filter(
 | |
|             rast__contains=GEOSGeometry("POINT (-0.5 0.5)", 4326)
 | |
|         )
 | |
|         self.assertEqual(qs.count(), 1)
 | |
|         # Point is in the exterior
 | |
|         qs = RasterModel.objects.filter(
 | |
|             rast__contains=GEOSGeometry("POINT (0.5 0.5)", 4326)
 | |
|         )
 | |
|         self.assertEqual(qs.count(), 0)
 | |
|         # A point on the boundary is not contained properly
 | |
|         qs = RasterModel.objects.filter(
 | |
|             rast__contains_properly=GEOSGeometry("POINT (0 0)", 4326)
 | |
|         )
 | |
|         self.assertEqual(qs.count(), 0)
 | |
|         # Raster is located left of the point
 | |
|         qs = RasterModel.objects.filter(rast__left=GEOSGeometry("POINT (1 0)", 4326))
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|     def test_lookup_with_raster_bbox(self):
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         # Shift raster upward
 | |
|         rast.origin.y = 2
 | |
|         # The raster in the model is not strictly below
 | |
|         qs = RasterModel.objects.filter(rast__strictly_below=rast)
 | |
|         self.assertEqual(qs.count(), 0)
 | |
|         # Shift raster further upward
 | |
|         rast.origin.y = 6
 | |
|         # The raster in the model is strictly below
 | |
|         qs = RasterModel.objects.filter(rast__strictly_below=rast)
 | |
|         self.assertEqual(qs.count(), 1)
 | |
| 
 | |
|     def test_lookup_with_polygonized_raster(self):
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         # Move raster to overlap with the model point on the left side
 | |
|         rast.origin.x = -95.37040 + 1
 | |
|         rast.origin.y = 29.70486
 | |
|         # Raster overlaps with point in model
 | |
|         qs = RasterModel.objects.filter(geom__intersects=rast)
 | |
|         self.assertEqual(qs.count(), 1)
 | |
|         # Change left side of raster to be nodata values
 | |
|         rast.bands[0].data(data=[0, 0, 0, 1, 1], shape=(5, 1))
 | |
|         rast.bands[0].nodata_value = 0
 | |
|         qs = RasterModel.objects.filter(geom__intersects=rast)
 | |
|         # Raster does not overlap anymore after polygonization
 | |
|         # where the nodata zone is not included.
 | |
|         self.assertEqual(qs.count(), 0)
 | |
| 
 | |
|     def test_lookup_value_error(self):
 | |
|         # Test with invalid dict lookup parameter
 | |
|         obj = {}
 | |
|         msg = "Couldn't create spatial object from lookup value '%s'." % obj
 | |
|         with self.assertRaisesMessage(ValueError, msg):
 | |
|             RasterModel.objects.filter(geom__intersects=obj)
 | |
|         # Test with invalid string lookup parameter
 | |
|         obj = "00000"
 | |
|         msg = "Couldn't create spatial object from lookup value '%s'." % obj
 | |
|         with self.assertRaisesMessage(ValueError, msg):
 | |
|             RasterModel.objects.filter(geom__intersects=obj)
 | |
| 
 | |
|     def test_db_function_errors(self):
 | |
|         """
 | |
|         Errors are raised when using DB functions with raster content.
 | |
|         """
 | |
|         point = GEOSGeometry("SRID=3086;POINT (-697024.9213808845 683729.1705516104)")
 | |
|         rast = GDALRaster(json.loads(JSON_RASTER))
 | |
|         msg = "Distance function requires a geometric argument in position 2."
 | |
|         with self.assertRaisesMessage(TypeError, msg):
 | |
|             RasterModel.objects.annotate(distance_from_point=Distance("geom", rast))
 | |
|         with self.assertRaisesMessage(TypeError, msg):
 | |
|             RasterModel.objects.annotate(
 | |
|                 distance_from_point=Distance("rastprojected", rast)
 | |
|             )
 | |
|         msg = (
 | |
|             "Distance function requires a GeometryField in position 1, got RasterField."
 | |
|         )
 | |
|         with self.assertRaisesMessage(TypeError, msg):
 | |
|             RasterModel.objects.annotate(
 | |
|                 distance_from_point=Distance("rastprojected", point)
 | |
|             ).count()
 | |
| 
 | |
|     def test_lhs_with_index_rhs_without_index(self):
 | |
|         with CaptureQueriesContext(connection) as queries:
 | |
|             RasterModel.objects.filter(
 | |
|                 rast__0__contains=json.loads(JSON_RASTER)
 | |
|             ).exists()
 | |
|         # It's easier to check the indexes in the generated SQL than to write
 | |
|         # tests that cover all index combinations.
 | |
|         self.assertRegex(queries[-1]["sql"], r"WHERE ST_Contains\([^)]*, 1, [^)]*, 1\)")
 |