Source code for rsqsim_api.tsunami.gf_netcdf

"""
Utilities for reading and computing Green's-function sea-surface displacement (SSD) from netCDF files.

Provides :class:`LookupPatch` for indexing individual patch Green's
functions within multi-file netCDF datasets, and functions for
computing and saving per-event SSD grids using superposition of patch
responses.
"""
import numpy as np
import netCDF4
from glob import glob
from concurrent.futures import ThreadPoolExecutor


[docs] class LookupPatch: """ Index entry mapping a fault patch to its location within a netCDF Green's-function dataset. Attributes ---------- patch_index : int Global patch identifier. dset : netCDF4.Dataset Open netCDF4 dataset containing the Green's-function data for this patch. dset_index : int Local index of this patch within ``dset``. """ def __init__(self, patch_index: int, dset: netCDF4.Dataset, dset_index: int): """ Parameters ---------- patch_index : int Global patch identifier. dset : netCDF4.Dataset Open netCDF4 dataset. dset_index : int Local index of the patch within ``dset``. """
[docs] self.patch_index = patch_index
[docs] self.dset = dset
[docs] self.dset_index = dset_index
[docs] def create_lookup_dict(search_string: str): """ Build a patch-lookup dictionary from a set of netCDF Green's-function files. Opens all files matching ``search_string`` and constructs a mapping from global patch index to :class:`LookupPatch` objects. Parameters ---------- search_string : str Glob pattern for the netCDF files, e.g. ``"/path/to/gf_files/*.nc"``. Returns ------- dict Mapping of patch index (int) to :class:`LookupPatch`. """ lookup_dict = {} nc_list = [] files = glob(search_string) for fname in files: nc_list.append(netCDF4.Dataset(fname, "r")) for dset in nc_list: for local_i, patch_i in enumerate(dset["index"][:]): lookup_dict[patch_i] = LookupPatch(patch_i, dset, local_i) return lookup_dict
[docs] def sea_surface_displacements(input_tuple): """ Compute the sea-surface displacement grid for a single event. Superposes the per-patch Green's functions scaled by the event's patch slip values. Returns NaN-filled arrays for events whose patches are not all present in the lookup dictionary. Parameters ---------- input_tuple : tuple ``(event, lookup)`` where ``event`` is an :class:`~rsqsim_api.catalogue.event.RsqSimEvent` and ``lookup`` is a dict mapping patch index to :class:`LookupPatch`. Returns ------- event_id : int The event identifier. disp : numpy.ndarray Sea-surface displacement grid (shape matches the Green's function grid). Filled with ``NaN`` if any patch is missing from the lookup. """ event, lookup = input_tuple event_id = event.event_id disp_shape = lookup[0].dset["ssd"][0].data.shape disp = np.zeros(disp_shape) try: for slip, patch in zip(event.patch_slip, event.patches): patch_gf = lookup[patch.patch_number] disp += slip * patch_gf.dset["ssd"][patch_gf.dset_index] except KeyError: disp = np.ones(disp_shape) * np.nan return event_id, disp
[docs] def sea_surface_displacements_multi(event_ls: list, lookup: dict, out_netcdf: str, num_processes: int = None): """ Compute and save sea-surface displacements for multiple events to a netCDF file. Uses a :class:`~concurrent.futures.ThreadPoolExecutor` to compute displacements in parallel, then writes the results to a new netCDF file with dimensions ``(event_id, y, x)``. Parameters ---------- event_ls : list List of :class:`~rsqsim_api.catalogue.event.RsqSimEvent` objects. lookup : dict Patch-lookup dictionary from :func:`create_lookup_dict`. out_netcdf : str Output netCDF file path. num_processes : int or None, optional Maximum number of worker threads. Defaults to ``None`` (determined by :class:`~concurrent.futures.ThreadPoolExecutor` default). """ with ThreadPoolExecutor(max_workers=num_processes) as executor: results = executor.map(sea_surface_displacements, [(event, lookup) for event in event_ls]) template_dset = lookup[0].dset x_data = template_dset["x"][:] y_data = template_dset["y"][:] event_id_ls = [event.event_id for event in event_ls] out_dset = netCDF4.Dataset(out_netcdf, "w") out_dset.set_always_mask(False) for dim, dim_len in zip(["x", "y", "event_id"], [x_data.size, y_data.size, len(event_ls)]): out_dset.createDimension(dim, dim_len) out_dset.createVariable("x", np.float32, ("x",)) out_dset.createVariable("y", np.float32, ("y",)) out_dset["x"][:] = x_data out_dset["y"][:] = y_data out_dset.createVariable("event_id", int, ("event_id",)) out_dset.createVariable("ssd", np.float32, ("event_id", "y", "x"), zlib=True) result_list = list(results) result_id_ls = [result[0] for result in result_list] num_events = len(event_id_ls) for i, event_id in enumerate(event_id_ls): print(f"{i}/{num_events}") event_index = result_id_ls.index(event_id) event_disp = result_list[event_index][1] out_dset["event_id"][i] = event_id out_dset["ssd"][i] = event_disp out_dset.close()