"""Raster and grid I/O utilities for GeoTIFF and GMT NetCDF formats."""
import geopandas as gpd
import numpy as np
import os
from netCDF4 import Dataset
import rasterio
from rasterio.warp import calculate_default_transform, reproject, Resampling
[docs]
def read_grid(raster_file: str, nan_threshold: float | int = 100,
window: str | list | tuple | np.ndarray = None,
buffer: float | int = 100, return_meta: bool = False):
"""
Read a raster file into coordinate and value arrays.
Supports optional spatial windowing by bounding box or shapefile
extent. Values with absolute magnitude above ``nan_threshold`` are
replaced with NaN.
Parameters
----------
raster_file :
Path to the raster file (any format supported by rasterio).
nan_threshold :
Values with ``|z| > nan_threshold`` are set to NaN. Defaults
to 100.
window :
Optional spatial subset. May be a path to a vector file (whose
bounding box is used), or a 4-element sequence
``[x_min, y_min, x_max, y_max]`` in the raster's CRS.
buffer :
Extra margin (in CRS units) added around the window bounds.
Defaults to 100.
return_meta :
If ``True``, also return the rasterio metadata dict updated to
reflect the windowed read.
Returns
-------
x : numpy.ndarray
1-D array of easting (column-centre) coordinates.
y : numpy.ndarray
1-D array of northing (row-centre) coordinates.
z : numpy.ndarray of shape (len(y), len(x))
Data values with out-of-range entries set to NaN.
geo_meta : dict, optional
Rasterio metadata dict (only returned when ``return_meta=True``).
Raises
------
AssertionError
If ``raster_file`` does not exist, the window sequence does not
have exactly 4 elements, or ``x_max <= x_min`` / ``y_max <= y_min``.
"""
assert os.path.exists(raster_file)
raster = rasterio.open(raster_file)
no_data = raster.nodata
if window is not None:
if isinstance(window, str):
assert os.path.exists(window)
shp = gpd.GeoDataFrame.from_file(window)
x1, y1, x2, y2 = np.array(shp.bounds)[0]
else:
assert len(window) == 4
x1, y1, x2, y2 = window[:]
assert all([x2 > x1, y2 > y1])
buffer_value = 0 if buffer is None else buffer
if x1 < raster.bounds.left:
x1 = np.round(raster.bounds.left + 1)
lowest = min([raster.bounds.bottom, raster.bounds.top])
if y1 < lowest:
y1 = np.round(lowest + -1)
if raster.bounds.top < raster.bounds.bottom:
window_object = rasterio.windows.from_bounds(x1 - buffer_value, y2 + buffer_value,
x2 + buffer_value, y1 - buffer_value,
transform=raster.transform)
else:
window_object = rasterio.windows.from_bounds(x1 - buffer_value, y1 - buffer_value,
x2 + buffer_value, y2 + buffer_value,
transform=raster.transform)
z = raster.read(1, window=window_object)
geo_transform = raster.window_transform(window_object)
else:
z = raster.read(1)
# Get affine transform from raster
geo_transform = raster.transform
if return_meta:
geo_meta = raster.meta.copy()
geo_meta.update({"width": z.shape[1],
"height": z.shape[0],
"transform": geo_transform})
else:
geo_meta = None
# extract origin, pixel width and pixel height from affine transform
origin_x = geo_transform[2]
origin_y = geo_transform[5]
pixel_width = geo_transform[0]
pixel_height = geo_transform[4]
# Width and height of raster in pixels
cols = z.shape[1]
rows = z.shape[0]
# Turn into X and Y coordinates
x = origin_x + pixel_width * np.arange(cols)
y = origin_y + pixel_height * np.arange(rows)
# Set values above threshold to NaN
if no_data != np.nan:
z[z < -nan_threshold] = np.NaN
z[z > nan_threshold] = np.NaN
# Close raster
raster.close()
if return_meta:
return x, y, z, geo_meta
else:
return x, y, z
[docs]
def read_tiff(tiff: str, x_correct: float = None, y_correct: float = None, nan_threshold: float | int = 9000,
make_y_ascending: bool = False, window: str | list | tuple | np.ndarray = None,
buffer: float | int=100, return_meta: bool = False):
"""
Read a GeoTIFF file into coordinate and value arrays.
Thin wrapper around :func:`read_grid` with GeoTIFF-specific defaults
and optional coordinate offset correction.
Parameters
----------
tiff :
Path to the GeoTIFF file.
x_correct :
Offset added to x coordinates to correct for pixel vs. gridline
registration differences (GMT convention).
y_correct :
Offset added to y coordinates (same purpose as ``x_correct``).
nan_threshold :
Values with ``|z| > nan_threshold`` are set to NaN. Defaults
to 9000 (suitable for elevation data in metres).
make_y_ascending :
If ``True`` and ``y[0] > y[-1]``, reverse y and the corresponding
rows of z so that y increases monotonically.
window :
Optional spatial subset passed to :func:`read_grid`.
buffer :
Buffer added around the window bounds in CRS units.
return_meta :
If ``True``, also return the rasterio metadata dict.
Returns
-------
x : numpy.ndarray
1-D easting coordinate array.
y : numpy.ndarray
1-D northing coordinate array.
z : numpy.ndarray of shape (len(y), len(x))
Data values.
geo_meta : dict, optional
Rasterio metadata (only returned when ``return_meta=True``).
Raises
------
AssertionError
If ``tiff`` does not exist.
"""
assert os.path.exists(tiff)
# Read grid into x, y ,z
if return_meta:
x, y, z, geo_meta = read_grid(tiff, nan_threshold=nan_threshold, window=window, buffer=buffer,
return_meta=return_meta)
else:
x, y, z = read_grid(tiff, nan_threshold=nan_threshold, window=window, buffer=buffer)
geo_meta = None
if make_y_ascending and y[0] > y[-1]:
y = y[::-1]
z = z[::-1, :]
# If necessary, sort out problems with pixel vs. gridline registration (need to supply values)
if x_correct is not None:
x += x_correct
print("Adjusting for GMT-Tiff difference")
if y_correct is not None:
y += y_correct
if return_meta:
return x, y, z, geo_meta
else:
return x, y, z
[docs]
def read_gmt_grid(grd_file: str):
"""
Read a GMT-compatible NetCDF4 grid file.
Reads variables named ``x``, ``y``, and ``z`` from the file and
fills any masked values with NaN. Faster than using rasterio for
NetCDF grids.
Parameters
----------
grd_file :
Path to the GMT ``.grd`` NetCDF4 file.
Returns
-------
x : numpy.ndarray
1-D array of x (easting) coordinate values.
y : numpy.ndarray
1-D array of y (northing) coordinate values.
z : numpy.ndarray of shape (len(y), len(x))
Grid data values; masked values replaced with NaN.
Raises
------
AssertionError
If ``grd_file`` does not exist or if the file does not contain
variables named ``x``, ``y``, and ``z``.
"""
assert os.path.exists(grd_file)
# Open dataset and check that x,y,z are variable names
fid = Dataset(grd_file, "r")
assert all([a in fid.variables for a in ("x", "y", "z")])
xm, ym, zm = [fid.variables[a][:] for a in ("x", "y", "z")]
fid.close()
# Check arrays are either numpy arrays or masked arrays
for a in (xm, ym, zm):
assert any([isinstance(a, b) for b in (np.ma.MaskedArray, np.ndarray)])
x = xm if isinstance(xm, np.ndarray) else xm.filled(fill_value=np.nan)
y = ym if isinstance(ym, np.ndarray) else ym.filled(fill_value=np.nan)
z = zm if isinstance(zm, np.ndarray) else zm.filled(fill_value=np.nan)
return np.array(x), np.array(y), np.array(z)
[docs]
def write_tiff(filename: str, x: np.ndarray, y: np.ndarray, z: np.ndarray, epsg: int = 2193, reverse_y: bool = False,
compress_lzw: bool = True):
"""
Write coordinate and data arrays to a GeoTIFF file.
Parameters
----------
filename :
Output GeoTIFF file path.
x :
1-D array of easting coordinates (column centres).
y :
1-D array of northing coordinates (row centres).
z : numpy.ndarray of shape (len(y), len(x))
Data values to write to band 1.
epsg :
EPSG code for the output coordinate reference system. Defaults
to 2193 (NZTM2000).
reverse_y :
If ``True``, write the GeoTIFF with y decreasing (top-to-bottom),
as preferred by some GIS applications.
compress_lzw :
If ``True`` (default), apply LZW compression.
Raises
------
AssertionError
If ``x`` or ``y`` are not 1-D, or if ``z.shape != (len(y), len(x))``.
"""
# Check data have correct dimensions
assert all([a.ndim == 1 for a in (x, y)])
assert z.shape == (len(y), len(x))
# Change into y-ascending format (reverse_y option changes it back later)
if y[0] > y[-1]:
y = y[::-1]
z = z[::-1, :]
# To allow writing in correct format
z = np.array(z, dtype=np.float64)
# Calculate x and y spacing
x_spacing = (max(x) - min(x)) / (len(x) - 1)
y_spacing = (max(y) - min(y)) / (len(y) - 1)
# Set affine transform from x and y
if reverse_y:
# Sometimes GIS prefer y values to descend.
transform = rasterio.transform.Affine(x_spacing, 0., min(x), 0., -1 * y_spacing, max(y))
else:
transform = rasterio.transform.Affine(x_spacing, 0., min(x), 0., y_spacing, min(y))
# create tiff profile (no. bands, data type etc.)
profile = rasterio.profiles.DefaultGTiffProfile(count=1, dtype=np.float64, transform=transform, width=len(x),
height=len(y))
# Set coordinate system if specified
if epsg is not None:
if epsg not in [2193, 4326, 32759, 32760, 27200]:
print("EPSG:{:d} Not a recognised NZ coordinate system...".format(epsg))
print("Writing anyway...")
crs = rasterio.crs.CRS.from_epsg(epsg)
profile["crs"] = crs
# Add compression if desired
if compress_lzw:
profile["compress"] = "lzw"
# Open raster file for writing
fid = rasterio.open(filename, "w", **profile)
# Write z to band one (depending whether y ascending/descending required).
if reverse_y:
fid.write(z[-1::-1], 1)
else:
fid.write(z, 1)
# Close file
fid.close()
[docs]
def write_gmt_grd(x_array: np.ndarray, y_array: np.ndarray, mesh: np.ndarray, grid_name: str):
"""
Write coordinate and data arrays to a GMT-compatible NetCDF4 grid file.
Parameters
----------
x_array :
1-D array of x (easting) coordinate values.
y_array :
1-D array of y (northing) coordinate values.
mesh : numpy.ndarray of shape (len(y_array), len(x_array))
Data values (z) to store in the ``z`` variable.
grid_name :
Output file path, typically ending in ``.grd``.
Raises
------
AssertionError
If ``x_array`` or ``y_array`` are not 1-D, or if
``mesh.shape != (len(y_array), len(x_array))``.
"""
assert all([a.ndim == 1 for a in (x_array, y_array)])
assert mesh.shape == (len(y_array), len(x_array))
# Reverse y axis if necessary
if y_array[0] > y_array[-1]:
y_array = y_array[::-1]
mesh = mesh[::-1, :]
# Open NetCDF file
fid = Dataset(grid_name, "w")
# Create x- and y-dimensions
_ = fid.createDimension('x', len(x_array))
_ = fid.createDimension('y', len(y_array))
# Create variables using these dimensions
x_var = fid.createVariable('x', 'd', ('x',), zlib=True)
y_var = fid.createVariable('y', 'd', ('y',), zlib=True)
z_var = fid.createVariable('z', 'd', ('y', 'x'), zlib=True)
# Set variable values
x_var[:] = x_array.flatten()
y_var[:] = y_array.flatten()
z_var[:] = mesh
# Close file
fid.close()
[docs]
def tiff2grd(tiff: str, grd: str, x_correct: float = None, y_correct: float = None,
window: list | tuple | int = None, buffer: float | int = 100):
"""
Convert a GeoTIFF to a GMT NetCDF4 grid file.
Parameters
----------
tiff :
Path to the input GeoTIFF file.
grd :
Path for the output GMT ``.grd`` file.
x_correct :
Offset applied to x coordinates to correct for pixel vs. gridline
registration differences.
y_correct :
Offset applied to y coordinates (same purpose as ``x_correct``).
window :
Optional spatial subset passed to :func:`read_tiff`.
buffer :
Buffer added around the window bounds in CRS units.
Raises
------
AssertionError
If ``tiff`` does not exist.
"""
assert os.path.exists(tiff)
x, y, z = read_tiff(tiff, x_correct=x_correct, y_correct=y_correct, window=window, buffer=buffer)
write_gmt_grd(x, y, z, grd)
[docs]
def grd2tiff(grd: str, tiff: str, x_correct: float = None, y_correct: float = None):
"""
Convert a GMT NetCDF4 grid file to a GeoTIFF.
Parameters
----------
grd :
Path to the input GMT ``.grd`` file.
tiff :
Path for the output GeoTIFF file.
x_correct :
Offset applied to x coordinates after reading.
y_correct :
Offset applied to y coordinates after reading.
Raises
------
AssertionError
If ``grd`` does not exist.
"""
assert os.path.exists(grd)
# Read in grid
x, y, z = read_gmt_grid(grd)
# Correct if necessary
if x_correct is not None:
x_correct -= x_correct
print("Adjusting for GMT-Tiff difference")
if y_correct is not None:
y -= y_correct
write_tiff(tiff, x, y, z)
[docs]
def array_to_gmt(array: np.ndarray, out_file: str):
"""
Write a NumPy array to a plain-text GMT-compatible file.
Each row of ``array`` is written as a space-separated line of values
formatted to 4 decimal places.
Parameters
----------
array :
1-D or 2-D array to write. Must have at least 2 elements or
columns.
out_file :
Output file path.
Raises
------
AssertionError
If the last dimension of ``array`` has fewer than 2 elements.
"""
assert array.shape[-1] >= 2, "Need 2 or more elements or columns"
out_id = open(out_file, "w")
if array.ndim == 1:
out_str = "{:.4f} " * len(array) + "\n"
out_id.write(out_str.format(*array))
else:
out_str = "{:.4f} " * len(array[0, :]) + "\n"
for row in array:
out_id.write(out_str.format(*row))
out_id.close()
[docs]
def clip_tiff(in_tiff: str, out_tiff: str, window: str | list | tuple | int, buffer: float | int = 100):
"""
Clip a GeoTIFF to a spatial window and write the result.
Parameters
----------
in_tiff :
Path to the input GeoTIFF file.
out_tiff :
Path for the output clipped GeoTIFF.
window :
Spatial subset specifier: a path to a vector file or a 4-element
sequence ``[x_min, y_min, x_max, y_max]``.
buffer :
Extra margin added around the window bounds in CRS units.
Defaults to 100.
Raises
------
AssertionError
If ``in_tiff`` does not exist.
"""
assert os.path.exists(in_tiff)
x, y, z = read_tiff(in_tiff, window=window, buffer=buffer)
write_tiff(out_tiff, x, y, z)
[docs]
def reproject_tiff(in_raster: str, out_raster: str, dst_epsg: int = 4326,
window: str | list | tuple | int = None, buffer: float | int = 100, out_format="tiff"):
"""
Reproject a raster to a different coordinate reference system.
Reads the input raster, reprojects it to the target CRS using
nearest-neighbour resampling, and writes the result as a GeoTIFF
or GMT grid.
Parameters
----------
in_raster :
Path to the input raster file.
out_raster :
Path for the output reprojected file.
dst_epsg :
Target EPSG code. Defaults to 4326 (WGS84 geographic).
window :
Optional spatial subset applied before reprojection.
buffer :
Buffer added around the window bounds in CRS units.
out_format :
Output format: ``"tiff"`` (default) or ``"grd"``.
Raises
------
AssertionError
If ``out_format`` is not ``"tiff"`` or ``"grd"``.
Notes
-----
Based on the rasterio reprojection example from the rasterio
documentation.
"""
assert out_format in ("tiff", "grd")
dst_crs = "EPSG:{:d}".format(dst_epsg)
x, y, z, src_meta = read_tiff(in_raster, return_meta=True, window=window, buffer=buffer)
transform, width, height = calculate_default_transform(src_meta["crs"],
dst_crs,
src_meta["width"],
src_meta["height"], min(x), min(y), max(x), max(y))
kwargs = src_meta.copy()
kwargs.update({
'crs': dst_crs,
'transform': transform,
'width': width,
'height': height
})
new_z = np.zeros((height, width))
reproject(
source=z,
destination=new_z,
src_transform=src_meta["transform"],
src_crs=src_meta["crs"],
dst_transform=transform,
dst_crs=dst_crs,
resampling=Resampling.nearest)
origin_x = transform[2]
origin_y = transform[5]
pixel_width = transform[0]
pixel_height = transform[4]
# Width and height of raster in pixels
cols = width
rows = height
# Turn into X and Y coordinates
new_x = origin_x + pixel_width * np.arange(cols)
new_y = origin_y + pixel_height * np.arange(rows)
if out_format == "tiff":
write_tiff(out_raster, new_x, new_y, new_z, epsg=dst_epsg)
else:
write_gmt_grd(new_x, new_y, new_z, out_raster)