Source code for rsqsim_api.io.array_operations

"""Raster and grid I/O utilities for GeoTIFF and GMT NetCDF formats."""
import geopandas as gpd
import numpy as np
import os
from netCDF4 import Dataset
import rasterio
from rasterio.warp import calculate_default_transform, reproject, Resampling


[docs] def read_grid(raster_file: str, nan_threshold: float | int = 100, window: str | list | tuple | np.ndarray = None, buffer: float | int = 100, return_meta: bool = False): """ Read a raster file into coordinate and value arrays. Supports optional spatial windowing by bounding box or shapefile extent. Values with absolute magnitude above ``nan_threshold`` are replaced with NaN. Parameters ---------- raster_file : Path to the raster file (any format supported by rasterio). nan_threshold : Values with ``|z| > nan_threshold`` are set to NaN. Defaults to 100. window : Optional spatial subset. May be a path to a vector file (whose bounding box is used), or a 4-element sequence ``[x_min, y_min, x_max, y_max]`` in the raster's CRS. buffer : Extra margin (in CRS units) added around the window bounds. Defaults to 100. return_meta : If ``True``, also return the rasterio metadata dict updated to reflect the windowed read. Returns ------- x : numpy.ndarray 1-D array of easting (column-centre) coordinates. y : numpy.ndarray 1-D array of northing (row-centre) coordinates. z : numpy.ndarray of shape (len(y), len(x)) Data values with out-of-range entries set to NaN. geo_meta : dict, optional Rasterio metadata dict (only returned when ``return_meta=True``). Raises ------ AssertionError If ``raster_file`` does not exist, the window sequence does not have exactly 4 elements, or ``x_max <= x_min`` / ``y_max <= y_min``. """ assert os.path.exists(raster_file) raster = rasterio.open(raster_file) no_data = raster.nodata if window is not None: if isinstance(window, str): assert os.path.exists(window) shp = gpd.GeoDataFrame.from_file(window) x1, y1, x2, y2 = np.array(shp.bounds)[0] else: assert len(window) == 4 x1, y1, x2, y2 = window[:] assert all([x2 > x1, y2 > y1]) buffer_value = 0 if buffer is None else buffer if x1 < raster.bounds.left: x1 = np.round(raster.bounds.left + 1) lowest = min([raster.bounds.bottom, raster.bounds.top]) if y1 < lowest: y1 = np.round(lowest + -1) if raster.bounds.top < raster.bounds.bottom: window_object = rasterio.windows.from_bounds(x1 - buffer_value, y2 + buffer_value, x2 + buffer_value, y1 - buffer_value, transform=raster.transform) else: window_object = rasterio.windows.from_bounds(x1 - buffer_value, y1 - buffer_value, x2 + buffer_value, y2 + buffer_value, transform=raster.transform) z = raster.read(1, window=window_object) geo_transform = raster.window_transform(window_object) else: z = raster.read(1) # Get affine transform from raster geo_transform = raster.transform if return_meta: geo_meta = raster.meta.copy() geo_meta.update({"width": z.shape[1], "height": z.shape[0], "transform": geo_transform}) else: geo_meta = None # extract origin, pixel width and pixel height from affine transform origin_x = geo_transform[2] origin_y = geo_transform[5] pixel_width = geo_transform[0] pixel_height = geo_transform[4] # Width and height of raster in pixels cols = z.shape[1] rows = z.shape[0] # Turn into X and Y coordinates x = origin_x + pixel_width * np.arange(cols) y = origin_y + pixel_height * np.arange(rows) # Set values above threshold to NaN if no_data != np.nan: z[z < -nan_threshold] = np.NaN z[z > nan_threshold] = np.NaN # Close raster raster.close() if return_meta: return x, y, z, geo_meta else: return x, y, z
[docs] def read_tiff(tiff: str, x_correct: float = None, y_correct: float = None, nan_threshold: float | int = 9000, make_y_ascending: bool = False, window: str | list | tuple | np.ndarray = None, buffer: float | int=100, return_meta: bool = False): """ Read a GeoTIFF file into coordinate and value arrays. Thin wrapper around :func:`read_grid` with GeoTIFF-specific defaults and optional coordinate offset correction. Parameters ---------- tiff : Path to the GeoTIFF file. x_correct : Offset added to x coordinates to correct for pixel vs. gridline registration differences (GMT convention). y_correct : Offset added to y coordinates (same purpose as ``x_correct``). nan_threshold : Values with ``|z| > nan_threshold`` are set to NaN. Defaults to 9000 (suitable for elevation data in metres). make_y_ascending : If ``True`` and ``y[0] > y[-1]``, reverse y and the corresponding rows of z so that y increases monotonically. window : Optional spatial subset passed to :func:`read_grid`. buffer : Buffer added around the window bounds in CRS units. return_meta : If ``True``, also return the rasterio metadata dict. Returns ------- x : numpy.ndarray 1-D easting coordinate array. y : numpy.ndarray 1-D northing coordinate array. z : numpy.ndarray of shape (len(y), len(x)) Data values. geo_meta : dict, optional Rasterio metadata (only returned when ``return_meta=True``). Raises ------ AssertionError If ``tiff`` does not exist. """ assert os.path.exists(tiff) # Read grid into x, y ,z if return_meta: x, y, z, geo_meta = read_grid(tiff, nan_threshold=nan_threshold, window=window, buffer=buffer, return_meta=return_meta) else: x, y, z = read_grid(tiff, nan_threshold=nan_threshold, window=window, buffer=buffer) geo_meta = None if make_y_ascending and y[0] > y[-1]: y = y[::-1] z = z[::-1, :] # If necessary, sort out problems with pixel vs. gridline registration (need to supply values) if x_correct is not None: x += x_correct print("Adjusting for GMT-Tiff difference") if y_correct is not None: y += y_correct if return_meta: return x, y, z, geo_meta else: return x, y, z
[docs] def read_gmt_grid(grd_file: str): """ Read a GMT-compatible NetCDF4 grid file. Reads variables named ``x``, ``y``, and ``z`` from the file and fills any masked values with NaN. Faster than using rasterio for NetCDF grids. Parameters ---------- grd_file : Path to the GMT ``.grd`` NetCDF4 file. Returns ------- x : numpy.ndarray 1-D array of x (easting) coordinate values. y : numpy.ndarray 1-D array of y (northing) coordinate values. z : numpy.ndarray of shape (len(y), len(x)) Grid data values; masked values replaced with NaN. Raises ------ AssertionError If ``grd_file`` does not exist or if the file does not contain variables named ``x``, ``y``, and ``z``. """ assert os.path.exists(grd_file) # Open dataset and check that x,y,z are variable names fid = Dataset(grd_file, "r") assert all([a in fid.variables for a in ("x", "y", "z")]) xm, ym, zm = [fid.variables[a][:] for a in ("x", "y", "z")] fid.close() # Check arrays are either numpy arrays or masked arrays for a in (xm, ym, zm): assert any([isinstance(a, b) for b in (np.ma.MaskedArray, np.ndarray)]) x = xm if isinstance(xm, np.ndarray) else xm.filled(fill_value=np.nan) y = ym if isinstance(ym, np.ndarray) else ym.filled(fill_value=np.nan) z = zm if isinstance(zm, np.ndarray) else zm.filled(fill_value=np.nan) return np.array(x), np.array(y), np.array(z)
[docs] def write_tiff(filename: str, x: np.ndarray, y: np.ndarray, z: np.ndarray, epsg: int = 2193, reverse_y: bool = False, compress_lzw: bool = True): """ Write coordinate and data arrays to a GeoTIFF file. Parameters ---------- filename : Output GeoTIFF file path. x : 1-D array of easting coordinates (column centres). y : 1-D array of northing coordinates (row centres). z : numpy.ndarray of shape (len(y), len(x)) Data values to write to band 1. epsg : EPSG code for the output coordinate reference system. Defaults to 2193 (NZTM2000). reverse_y : If ``True``, write the GeoTIFF with y decreasing (top-to-bottom), as preferred by some GIS applications. compress_lzw : If ``True`` (default), apply LZW compression. Raises ------ AssertionError If ``x`` or ``y`` are not 1-D, or if ``z.shape != (len(y), len(x))``. """ # Check data have correct dimensions assert all([a.ndim == 1 for a in (x, y)]) assert z.shape == (len(y), len(x)) # Change into y-ascending format (reverse_y option changes it back later) if y[0] > y[-1]: y = y[::-1] z = z[::-1, :] # To allow writing in correct format z = np.array(z, dtype=np.float64) # Calculate x and y spacing x_spacing = (max(x) - min(x)) / (len(x) - 1) y_spacing = (max(y) - min(y)) / (len(y) - 1) # Set affine transform from x and y if reverse_y: # Sometimes GIS prefer y values to descend. transform = rasterio.transform.Affine(x_spacing, 0., min(x), 0., -1 * y_spacing, max(y)) else: transform = rasterio.transform.Affine(x_spacing, 0., min(x), 0., y_spacing, min(y)) # create tiff profile (no. bands, data type etc.) profile = rasterio.profiles.DefaultGTiffProfile(count=1, dtype=np.float64, transform=transform, width=len(x), height=len(y)) # Set coordinate system if specified if epsg is not None: if epsg not in [2193, 4326, 32759, 32760, 27200]: print("EPSG:{:d} Not a recognised NZ coordinate system...".format(epsg)) print("Writing anyway...") crs = rasterio.crs.CRS.from_epsg(epsg) profile["crs"] = crs # Add compression if desired if compress_lzw: profile["compress"] = "lzw" # Open raster file for writing fid = rasterio.open(filename, "w", **profile) # Write z to band one (depending whether y ascending/descending required). if reverse_y: fid.write(z[-1::-1], 1) else: fid.write(z, 1) # Close file fid.close()
[docs] def write_gmt_grd(x_array: np.ndarray, y_array: np.ndarray, mesh: np.ndarray, grid_name: str): """ Write coordinate and data arrays to a GMT-compatible NetCDF4 grid file. Parameters ---------- x_array : 1-D array of x (easting) coordinate values. y_array : 1-D array of y (northing) coordinate values. mesh : numpy.ndarray of shape (len(y_array), len(x_array)) Data values (z) to store in the ``z`` variable. grid_name : Output file path, typically ending in ``.grd``. Raises ------ AssertionError If ``x_array`` or ``y_array`` are not 1-D, or if ``mesh.shape != (len(y_array), len(x_array))``. """ assert all([a.ndim == 1 for a in (x_array, y_array)]) assert mesh.shape == (len(y_array), len(x_array)) # Reverse y axis if necessary if y_array[0] > y_array[-1]: y_array = y_array[::-1] mesh = mesh[::-1, :] # Open NetCDF file fid = Dataset(grid_name, "w") # Create x- and y-dimensions _ = fid.createDimension('x', len(x_array)) _ = fid.createDimension('y', len(y_array)) # Create variables using these dimensions x_var = fid.createVariable('x', 'd', ('x',), zlib=True) y_var = fid.createVariable('y', 'd', ('y',), zlib=True) z_var = fid.createVariable('z', 'd', ('y', 'x'), zlib=True) # Set variable values x_var[:] = x_array.flatten() y_var[:] = y_array.flatten() z_var[:] = mesh # Close file fid.close()
[docs] def tiff2grd(tiff: str, grd: str, x_correct: float = None, y_correct: float = None, window: list | tuple | int = None, buffer: float | int = 100): """ Convert a GeoTIFF to a GMT NetCDF4 grid file. Parameters ---------- tiff : Path to the input GeoTIFF file. grd : Path for the output GMT ``.grd`` file. x_correct : Offset applied to x coordinates to correct for pixel vs. gridline registration differences. y_correct : Offset applied to y coordinates (same purpose as ``x_correct``). window : Optional spatial subset passed to :func:`read_tiff`. buffer : Buffer added around the window bounds in CRS units. Raises ------ AssertionError If ``tiff`` does not exist. """ assert os.path.exists(tiff) x, y, z = read_tiff(tiff, x_correct=x_correct, y_correct=y_correct, window=window, buffer=buffer) write_gmt_grd(x, y, z, grd)
[docs] def grd2tiff(grd: str, tiff: str, x_correct: float = None, y_correct: float = None): """ Convert a GMT NetCDF4 grid file to a GeoTIFF. Parameters ---------- grd : Path to the input GMT ``.grd`` file. tiff : Path for the output GeoTIFF file. x_correct : Offset applied to x coordinates after reading. y_correct : Offset applied to y coordinates after reading. Raises ------ AssertionError If ``grd`` does not exist. """ assert os.path.exists(grd) # Read in grid x, y, z = read_gmt_grid(grd) # Correct if necessary if x_correct is not None: x_correct -= x_correct print("Adjusting for GMT-Tiff difference") if y_correct is not None: y -= y_correct write_tiff(tiff, x, y, z)
[docs] def array_to_gmt(array: np.ndarray, out_file: str): """ Write a NumPy array to a plain-text GMT-compatible file. Each row of ``array`` is written as a space-separated line of values formatted to 4 decimal places. Parameters ---------- array : 1-D or 2-D array to write. Must have at least 2 elements or columns. out_file : Output file path. Raises ------ AssertionError If the last dimension of ``array`` has fewer than 2 elements. """ assert array.shape[-1] >= 2, "Need 2 or more elements or columns" out_id = open(out_file, "w") if array.ndim == 1: out_str = "{:.4f} " * len(array) + "\n" out_id.write(out_str.format(*array)) else: out_str = "{:.4f} " * len(array[0, :]) + "\n" for row in array: out_id.write(out_str.format(*row)) out_id.close()
[docs] def clip_tiff(in_tiff: str, out_tiff: str, window: str | list | tuple | int, buffer: float | int = 100): """ Clip a GeoTIFF to a spatial window and write the result. Parameters ---------- in_tiff : Path to the input GeoTIFF file. out_tiff : Path for the output clipped GeoTIFF. window : Spatial subset specifier: a path to a vector file or a 4-element sequence ``[x_min, y_min, x_max, y_max]``. buffer : Extra margin added around the window bounds in CRS units. Defaults to 100. Raises ------ AssertionError If ``in_tiff`` does not exist. """ assert os.path.exists(in_tiff) x, y, z = read_tiff(in_tiff, window=window, buffer=buffer) write_tiff(out_tiff, x, y, z)
[docs] def reproject_tiff(in_raster: str, out_raster: str, dst_epsg: int = 4326, window: str | list | tuple | int = None, buffer: float | int = 100, out_format="tiff"): """ Reproject a raster to a different coordinate reference system. Reads the input raster, reprojects it to the target CRS using nearest-neighbour resampling, and writes the result as a GeoTIFF or GMT grid. Parameters ---------- in_raster : Path to the input raster file. out_raster : Path for the output reprojected file. dst_epsg : Target EPSG code. Defaults to 4326 (WGS84 geographic). window : Optional spatial subset applied before reprojection. buffer : Buffer added around the window bounds in CRS units. out_format : Output format: ``"tiff"`` (default) or ``"grd"``. Raises ------ AssertionError If ``out_format`` is not ``"tiff"`` or ``"grd"``. Notes ----- Based on the rasterio reprojection example from the rasterio documentation. """ assert out_format in ("tiff", "grd") dst_crs = "EPSG:{:d}".format(dst_epsg) x, y, z, src_meta = read_tiff(in_raster, return_meta=True, window=window, buffer=buffer) transform, width, height = calculate_default_transform(src_meta["crs"], dst_crs, src_meta["width"], src_meta["height"], min(x), min(y), max(x), max(y)) kwargs = src_meta.copy() kwargs.update({ 'crs': dst_crs, 'transform': transform, 'width': width, 'height': height }) new_z = np.zeros((height, width)) reproject( source=z, destination=new_z, src_transform=src_meta["transform"], src_crs=src_meta["crs"], dst_transform=transform, dst_crs=dst_crs, resampling=Resampling.nearest) origin_x = transform[2] origin_y = transform[5] pixel_width = transform[0] pixel_height = transform[4] # Width and height of raster in pixels cols = width rows = height # Turn into X and Y coordinates new_x = origin_x + pixel_width * np.arange(cols) new_y = origin_y + pixel_height * np.arange(rows) if out_format == "tiff": write_tiff(out_raster, new_x, new_y, new_z, epsg=dst_epsg) else: write_gmt_grd(new_x, new_y, new_z, out_raster)