Source code for rsqsim_api.catalogue.event

"""
RSQSim earthquake event representation.

Provides :class:`RsqSimEvent`, which stores the catalogue parameters
(time, magnitude, location) and the per-patch slip distribution for a
single simulated rupture, together with methods for computing derived
quantities and exporting slip distributions in a variety of formats.

Also provides :class:`OpenQuakeMultiSquareRupture` for packaging events
as multi-planar ruptures compatible with the OpenQuake engine.
"""
import fnmatch
import json
import operator
import os
import pickle
import xml.etree.ElementTree as ElemTree
from collections import defaultdict
from collections.abc import Iterable
from math import isclose
from string import digits
from xml.dom import minidom

import geopandas as gpd
import meshio
import numpy as np
import pandas as pd
from matplotlib import cm, colors
from matplotlib import pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter, FFMpegWriter
from matplotlib.widgets import Slider
from mpl_toolkits.axes_grid1 import make_axes_locatable
from pyproj import Transformer
from shapely.geometry import LineString, Polygon, MultiPolygon, Point
from shapely.ops import unary_union
from scipy.spatial import KDTree

from rsqsim_api.fault.multifault import RsqSimMultiFault
from rsqsim_api.fault.patch import OpenQuakeRectangularPatch
from rsqsim_api.io.bruce_shaw_utilities import bruce_subduction
from rsqsim_api.io.mesh_utils import array_to_mesh, quads_to_vtk
from rsqsim_api.visualisation.utilities import plot_coast, plot_background
from rsqsim_api.catalogue.utilities import weighted_circular_mean, m0_to_mw

[docs] transformer_nztm2wgs = Transformer.from_crs(2193, 4326, always_xy=True)
[docs] class RsqSimEvent: """ A single earthquake event from an RSQSim catalogue. Stores basic seismic catalogue parameters and, when populated, the patch-level slip distribution including patch objects, slip magnitudes, and rupture timing. Attributes ---------- event_id : int or None Unique integer identifier for the event in the catalogue. t0 : float or None Origin time in seconds from the start of the simulation. m0 : float or None Scalar seismic moment in N·m. mw : float or None Moment magnitude. x, y, z : float or None Hypocentre coordinates in NZTM (metres, EPSG:2193). ``z`` is negative downward. area : float or None Total rupture area in m². dt : float or None Rupture duration in seconds. patches : list or None List of :class:`~rsqsim_api.fault.patch.RsqSimTriangularPatch` objects that slipped in this event. patch_slip : numpy.ndarray or None Slip magnitude (m) for each entry in ``patches``. faults : list or None Unique :class:`~rsqsim_api.fault.segment.RsqSimSegment` objects involved in this event. patch_time : numpy.ndarray or None Absolute rupture time (s) for each patch. patch_numbers : numpy.ndarray or None Global patch IDs corresponding to ``patches``. length : float or None Estimated rupture length (m) along fault traces. """ def __init__(self): """ Initialise an empty RsqSimEvent. All attributes are set to ``None``. Use the class methods :meth:`from_catalogue_array` or :meth:`from_earthquake_list` to construct a populated event. """ # Event ID
[docs] self.event_id = None
# Origin time
[docs] self.t0 = None
# Seismic moment and mw
[docs] self.m0 = None
[docs] self.mw = None
# Hypocentre location self.x, self.y, self.z = (None,) * 3 # Rupture area
[docs] self.area = None
# Rupture duration
[docs] self.dt = None
# Parameters for slip distributions
[docs] self.patches = None
[docs] self.patch_slip = None
[docs] self.faults = None
[docs] self.patch_time = None
[docs] self.patch_numbers = None
self._mean_slip = None
[docs] self.length = None
self._mean_strike = None self._mean_strike_180 = None self._mean_dip = None self._mean_rake = None self._first_fault = None @property
[docs] def num_faults(self): """Number of unique fault segments involved in this event.""" return len(self.faults)
@property
[docs] def bounds(self): """ Axis-aligned bounding box of the rupture in NZTM coordinates. Returns ------- list of float ``[x_min, y_min, x_max, y_max]`` in metres (NZTM). """ x1 = min([min(fault.vertices[:, 0]) for fault in self.faults]) y1 = min([min(fault.vertices[:, 1]) for fault in self.faults]) x2 = max([max(fault.vertices[:, 0]) for fault in self.faults]) y2 = max([max(fault.vertices[:, 1]) for fault in self.faults]) return [x1, y1, x2, y2]
@property
[docs] def exterior(self): """ Shapely geometry representing the union of all ruptured patch polygons. Returns ------- shapely.geometry.base.BaseGeometry Union of all patch outlines as a Shapely geometry object. """ return unary_union([patch.as_polygon() for patch in self.patches])
@property
[docs] def mean_slip(self): """Mean slip (m) averaged over all ruptured patches.""" return self._mean_slip
@property
[docs] def mean_strike(self): """Mean strike (degrees, 0–360) averaged over all ruptured patches.""" return self._mean_strike
@property
[docs] def mean_strike_180(self): """Mean strike (degrees, 0–180) averaged over all ruptured patches.""" return self._mean_strike_180
@property
[docs] def mean_dip(self): """Mean dip (degrees) averaged over all ruptured patches.""" return self._mean_dip
@property
[docs] def mean_rake(self): """Mean rake (degrees) averaged over all ruptured patches.""" return self._mean_rake
[docs] def find_first_fault(self, fault_model: RsqSimMultiFault, name: bool = True): """ Return the fault that first ruptured in this event. Parameters ---------- fault_model : RsqSimMultiFault Fault model used to look up patch-to-fault associations. name : bool, optional If ``True`` (default), return the fault name string. If ``False``, return the fault segment object. Returns ------- str or RsqSimSegment The name (or object) of the fault that hosted the first ruptured patch. """ first_patch = self.patches[np.where(self.patch_time == np.min(self.patch_time))[0][0]] first_fault = fault_model.faults_with_patches[first_patch.patch_number] if name: return first_fault.name else: return first_fault
@classmethod
[docs] def from_catalogue_array(cls, t0: float, m0: float, mw: float, x: float, y: float, z: float, area: float, dt: float, event_id: int = None): """ Construct an event from basic catalogue parameters. Creates an event with catalogue-level metadata only (no patch slip distribution). Parameters ---------- t0 : float Origin time in seconds from the simulation start. m0 : float Scalar seismic moment in N·m. mw : float Moment magnitude. x : float Hypocentre easting in NZTM (m). y : float Hypocentre northing in NZTM (m). z : float Hypocentre depth in NZTM (m, negative downward). area : float Total rupture area in m². dt : float Rupture duration in seconds. event_id : int, optional Unique catalogue event ID. Returns ------- RsqSimEvent Event populated with catalogue parameters; patch attributes remain ``None``. """ event = cls() event.t0, event.m0, event.mw, event.x, event.y, event.z = [t0, m0, mw, x, y, z] event.area, event.dt = [area, dt] event.event_id = event_id return event
@property
[docs] def patch_outline_gs(self): """ GeoSeries of individual patch outlines in NZTM (EPSG:2193). Returns ------- geopandas.GeoSeries One Shapely polygon per ruptured patch, with CRS set to EPSG:2193 (NZTM). """ return gpd.GeoSeries([patch.as_polygon() for patch in self.patches], crs=2193)
@classmethod
[docs] def from_earthquake_list(cls, t0: float, m0: float, mw: float, x: float, y: float, z: float, area: float, dt: float, patch_numbers: list | np.ndarray | tuple, patch_slip: list | np.ndarray | tuple, patch_time: list | np.ndarray | tuple, fault_model: RsqSimMultiFault, filter_single_patches: bool = True, min_patches: int = 10, min_slip: float | int = 1, event_id: int = None): """ Construct an event from catalogue parameters and patch slip data. Filters out fault segments that contribute fewer than ``min_patches`` patches, then resolves patch IDs to patch objects and fault objects using ``fault_model``. Parameters ---------- t0 : float Origin time in seconds. m0 : float Scalar seismic moment in N·m. mw : float Moment magnitude. x : float Hypocentre easting in NZTM (m). y : float Hypocentre northing in NZTM (m). z : float Hypocentre depth in NZTM (m, negative downward). area : float Total rupture area in m². dt : float Rupture duration in seconds. patch_numbers : array-like Global patch IDs that slipped in this event. patch_slip : array-like Slip magnitude (m) for each entry in ``patch_numbers``. patch_time : array-like Absolute rupture time (s) for each entry in ``patch_numbers``. fault_model : RsqSimMultiFault Fault model providing the patch and fault dictionaries. filter_single_patches : bool, optional Unused; retained for API compatibility. min_patches : int, optional Minimum number of patches a fault must contribute to be retained. Defaults to 10. min_slip : float or int, optional Unused; retained for API compatibility. event_id : int, optional Unique catalogue event ID. Returns ------- RsqSimEvent Fully populated event including patch slip distribution and fault objects. """ event = cls.from_catalogue_array( t0, m0, mw, x, y, z, area, dt, event_id=event_id) faults_with_patches = fault_model.faults_with_patches patches_on_fault = defaultdict(list) [patches_on_fault[faults_with_patches[i]].append(i) for i in patch_numbers] mask = np.full(len(patch_numbers), True) for fault in patches_on_fault.keys(): patches_on_this_fault = patches_on_fault[fault] if len(patches_on_this_fault) < min_patches: # Finds the indices of values in patches_on_this_fault in patch_numbers patch_on_fault_indices = np.searchsorted(patch_numbers, patches_on_this_fault) mask[patch_on_fault_indices] = False event.patch_numbers = patch_numbers[mask] event.patch_slip = patch_slip[mask] event.patch_time = patch_time[mask] if event.patch_numbers.size > 1: patchnum_lookup = operator.itemgetter(*(event.patch_numbers)) event.patches = list(patchnum_lookup(fault_model.patch_dic)) event.faults = list(set(patchnum_lookup(fault_model.faults_with_patches))) elif event.patch_numbers.size == 1: event.patches = [fault_model.patch_dic[event.patch_numbers[0]]] event.faults = [fault_model.faults_with_patches[event.patch_numbers[0]]] else: event.patches = [] event.faults = [] print( f"Event {event_id} doesn't rupture more than {min_patches} patches on any fault. \n Decrease min_patches if you want a fault + patches returned.") return event
@classmethod
[docs] def from_multiprocessing(cls, t0: float, m0: float, mw: float, x: float, y: float, z: float, area: float, dt: float, patch_numbers: list | np.ndarray | tuple, patch_slip: list | np.ndarray | tuple, patch_time: list | np.ndarray | tuple, fault_model: RsqSimMultiFault, mask: list, event_id: int = None): """ Construct an event from pre-computed multiprocessing results. Similar to :meth:`from_earthquake_list`, but accepts a boolean ``mask`` that has already been applied to filter patches, rather than recomputing it here. Parameters ---------- t0 : float Origin time in seconds. m0 : float Scalar seismic moment in N·m. mw : float Moment magnitude. x : float Hypocentre easting in NZTM (m). y : float Hypocentre northing in NZTM (m). z : float Hypocentre depth in NZTM (m, negative downward). area : float Total rupture area in m². dt : float Rupture duration in seconds. patch_numbers : array-like Global patch IDs that slipped in this event. patch_slip : array-like Slip magnitude (m) for each entry in ``patch_numbers``. patch_time : array-like Absolute rupture time (s) for each entry in ``patch_numbers``. fault_model : RsqSimMultiFault Fault model providing patch and fault dictionaries. mask : list of bool Boolean mask selecting the patches to retain from ``patch_numbers``. event_id : int, optional Unique catalogue event ID. Returns ------- RsqSimEvent Fully populated event. """ event = cls.from_catalogue_array( t0, m0, mw, x, y, z, area, dt, event_id=event_id) event.patch_numbers = patch_numbers[mask] event.patch_slip = patch_slip[mask] event.patch_time = patch_time[mask] if event.patch_numbers.size > 0: patchnum_lookup = operator.itemgetter(*(event.patch_numbers)) event.patches = list(patchnum_lookup(fault_model.patch_dic)) event.faults = list(set(patchnum_lookup(fault_model.faults_with_patches))) else: event.patches = [] event.faults = [] return event
[docs] def sub_events_by_fault(self, fault_model: RsqSimMultiFault, min_slip: float = 0.1) -> list["RsqSimEvent"]: """ Split the event into per-fault sub-events. For each fault involved in the event, a new :class:`RsqSimEvent` is created containing only the patches on that fault. Faults where no patch has slip >= ``min_slip`` are skipped. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch and fault dictionaries. min_slip : float, optional Minimum slip threshold (m) for a patch to be included in a sub-event. Defaults to 0.1 m. Returns ------- list of RsqSimEvent One event per fault that contributed qualifying slip. """ m0s = self.make_fault_moment_dict(fault_model=fault_model, mu=3.0e10, by_cfm_names=False) subevents = [] for fault in self.faults: fault_patches = np.array(list(fault.patch_dic.keys())) fault_patch_numbers = self.patch_numbers[np.isin(self.patch_numbers, fault_patches)] fault_patch_slip = self.patch_slip[np.isin(self.patch_numbers, fault_patch_numbers)] fault_patch_time = self.patch_time[np.isin(self.patch_numbers, fault_patch_numbers)] fault_t0 = self.t0 + np.min(fault_patch_time) fault_dt = np.max(fault_patch_time) - np.min(fault_patch_time) fault_m0 = m0s[fault.name] fault_mw = m0_to_mw(fault_m0) fault_area = np.sum([fault.patch_dic[number].area for number in fault_patch_numbers]) fault_first_patch = fault_patch_numbers[np.where(fault_patch_time == np.min(fault_patch_time))[0][0]] fault_first_patch = fault_model.patch_dic[fault_first_patch] fault_x, fault_y, fault_z = fault_first_patch.centre # Check if any slip is above the minimum slip threshold if np.any(fault_patch_slip >= min_slip): subevent = RsqSimEvent.from_earthquake_list( fault_t0, fault_m0, fault_mw, fault_x, fault_y, fault_z, fault_area, fault_dt, fault_patch_numbers, fault_patch_slip, fault_patch_time, fault_model=fault_model, event_id=self.event_id) subevents.append(subevent) return subevents
[docs] def find_mean_slip(self): """ Compute and cache the mean slip across all ruptured patches. Sets :attr:`_mean_slip` (accessed via :attr:`mean_slip`) to the arithmetic mean of ``patch_slip``. Does nothing if no patches are loaded. """ if self.patches: total_slip = np.sum(self.patch_slip) npatches = len(self.patches) if all([total_slip > 0., npatches > 0]): self._mean_slip = total_slip / npatches
[docs] def find_mean_strike(self): """ Compute and cache the arithmetic mean strike (0–360°). Sets :attr:`_mean_strike` (accessed via :attr:`mean_strike`). Does nothing if no patches are loaded. """ if self.patches: cumstrike = 0. for patch in self.patches: cumstrike += patch.strike npatches = len(self.patches) if npatches > 0: self._mean_strike = cumstrike / npatches
[docs] def find_mean_strike_180(self): """ Compute and cache the mean strike folded into the range 0–180°. Strikes in 180–360° are reduced by 180° before averaging, so that conjugate orientations are treated as equivalent. Sets :attr:`_mean_strike_180` (accessed via :attr:`mean_strike_180`). Does nothing if no patches are loaded. """ if self.patches: cumstrike = 0. for patch in self.patches: strike = patch.strike if 0.<= strike <180: cumstrike += strike else: assert(0.<= strike - 180 < 180), "strike not in range 0 - 360" cumstrike += (strike - 180.) npatches = len(self.patches) if npatches > 0: self._mean_strike_180 = cumstrike / npatches
[docs] def find_mean_dip(self): """ Compute and cache the arithmetic mean dip (degrees). Sets :attr:`_mean_dip` (accessed via :attr:`mean_dip`). Does nothing if no patches are loaded. """ if self.patches: cumdip = 0. npatches = len(self.patches) for patch in self.patches: cumdip += patch.dip if npatches > 0: self._mean_dip = cumdip / npatches
[docs] def find_mean_rake(self): """ Compute and cache the arithmetic mean rake (degrees). Sets :attr:`_mean_rake` (accessed via :attr:`mean_rake`). Does nothing if no patches are loaded. """ if self.patches: cumrake = 0. for patch in self.patches: cumrake += patch.rake npatches = len(self.patches) if npatches > 0: self._mean_rake = cumrake / npatches
[docs] def find_length(self, min_slip_percentile: float | None = None): """ Estimate and cache the rupture length along fault surface traces. For each fault, the range of distances of patch centroids projected onto the fault trace is summed to give a cumulative rupture length. Sets :attr:`length`. Parameters ---------- min_slip_percentile : float or None, optional Unused; reserved for future filtering by slip percentile. """ if self.patches: rupture_length = 0. for fault in self.faults: fault_trace = fault.trace patch_locs = [] for patch in self.patches: centroid = Point(patch.centre[:2]) patch_dist = fault_trace.project(centroid) patch_locs.append(patch_dist) rupture_length += np.ptp(patch_locs) self.length = rupture_length
[docs] def make_fault_moment_dict(self, fault_model: RsqSimMultiFault, mu: float = 3.0e10, by_cfm_names: bool = True, min_m0: float = 0.): """ Build a dictionary of seismic moment released on each fault. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch and fault dictionaries. mu : float, optional Shear modulus in Pa. Defaults to 3×10¹⁰ Pa (30 GPa). by_cfm_names : bool, optional If ``True`` (default), merge sub-segments by stripping trailing digits and hyphens to recover CFM fault names. If ``False``, use the raw fault segment names. min_m0 : float, optional Minimum moment (N·m) for a fault to be included. Defaults to 0 (include all faults). Returns ------- dict Mapping of fault name (str) to scalar moment (float, N·m). """ assert self.faults is not None, "Event has no faults, can't calculate moment" m0_dict = {} for fault in self.faults: fault_patches = np.array(list(fault.patch_dic.keys())) fault_patch_numbers = self.patch_numbers[np.isin(self.patch_numbers, fault_patches)] fault_patch_slip = self.patch_slip[np.isin(self.patch_numbers, fault_patches)] areas = [fault.patch_dic[number].area for number in fault_patch_numbers] m0 = sum(fault_patch_slip * areas * mu) if m0 > min_m0: m0_dict[fault.name] = m0 if by_cfm_names: # make lookup for fault segment names fault_short_dict = {} fault_short_names = [] rm_digs = str.maketrans('', '', digits) rm_hyph = str.maketrans('', '', "-") for name in fault_model.names: short_name = name.translate(rm_digs) short_name = short_name.translate(rm_hyph) fault_short_names += short_name fault_short_dict[name] = short_name m0_df = pd.DataFrame.from_dict(m0_dict, orient='index', columns=['M0']) m0_df["segName"] = [fault_short_dict[key] for key in m0_df.index] m0_df.reset_index(inplace=True, drop=True) m0_per_segment = m0_df.groupby(by="segName").sum() m0_seg_dict = dict(zip(m0_per_segment.index, m0_per_segment.M0)) else: m0_seg_dict = m0_dict return m0_seg_dict
[docs] def make_moment_prop_dict(self, fault_model: RsqSimMultiFault, mu: float = 3.0e10, by_cfm_names: bool = True): """ Build a dictionary of each fault's fractional contribution to total moment. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch and fault dictionaries. mu : float, optional Shear modulus in Pa. Defaults to 3×10¹⁰ Pa. by_cfm_names : bool, optional Passed to :meth:`make_fault_moment_dict`; if ``True`` (default), merge sub-segments to CFM fault names. Returns ------- dict Mapping of fault name (str) to proportion of total event moment (float, 0–1), sorted in descending order. """ m0_dict = self.make_fault_moment_dict(fault_model=fault_model, mu=mu, by_cfm_names=by_cfm_names) prop_dict = {} for fault_name in m0_dict.keys(): prop_dict[fault_name] = m0_dict[fault_name] / self.m0 # and sort prop_dict_sorted_keys = sorted(prop_dict, key=prop_dict.get, reverse=True) prop_dict_sorted = dict(zip(prop_dict_sorted_keys, [prop_dict[x] for x in prop_dict_sorted_keys])) return prop_dict_sorted
[docs] def plot_slip_2d(self, subduction_cmap: str = "plasma", crustal_cmap: str = "viridis", show: bool = True, extra_sub_list: list = None, write: str = None, subplots=None, global_max_sub_slip: int = 0, global_max_slip: int = 0, figsize: tuple = (6.4, 4.8), hillshading_intensity: float = 0.0, bounds: tuple = None, plot_rivers: bool = True, plot_lakes: bool = True, plot_highways: bool = True, plot_boundaries: bool = False, create_background: bool = False, coast_only: bool = True, hillshade_cmap: colors.LinearSegmentedColormap = cm.terrain, plot_log_scale: bool = False, log_cmap: str = "magma", log_min: float = 1.0, log_max: float = 100., plot_traces: bool = True, trace_colour: str = "pink", land_color: str = 'antiquewhite', min_slip_percentile: float = None, min_slip_value: float = None, plot_zeros: bool = True, wgs: bool = False, title: str = None, plot_edge_label: bool = True, plot_cbars: bool = True, alpha: float = 1.0, coast_on_top: bool = False): """ Plot a 2-D map of the slip distribution for this event. Subduction-interface faults and crustal faults are coloured with separate colourmaps. Optionally overlays a coastal/background map and saves to file. Parameters ---------- subduction_cmap : str, optional Matplotlib colourmap name for subduction-interface patches. Defaults to ``"plasma"``. crustal_cmap : str, optional Matplotlib colourmap name for crustal patches. Defaults to ``"viridis"``. show : bool, optional If ``True`` (default), call ``plt.show()`` after plotting. extra_sub_list : list of str, optional Additional fault names to treat as subduction interface, supplementing the built-in ``bruce_subduction`` list. write : str or None, optional File path to save the figure (e.g. ``"event_001.png"``). If ``None``, the figure is not saved. subplots : tuple or str or None, optional Existing ``(fig, ax)`` tuple or path to a pickled figure to plot on top of. If ``None``, a new figure is created. global_max_sub_slip : float, optional Fixed colourbar maximum for subduction slip. If 0, the per-event maximum is used. global_max_slip : float, optional Fixed colourbar maximum for crustal slip. If 0, the per-event maximum is used. figsize : tuple of float, optional Figure size in inches ``(width, height)``. hillshading_intensity : float, optional Hillshading intensity passed to ``plot_background``. bounds : tuple or None, optional Map extent ``(x_min, y_min, x_max, y_max)`` in NZTM. Defaults to the event bounding box. plot_rivers, plot_lakes, plot_highways, plot_boundaries : bool, optional Toggle background map layers. create_background : bool, optional If ``True``, render a full background map. coast_only : bool, optional If ``True`` (default), only render the coastline as background. hillshade_cmap : LinearSegmentedColormap, optional Colourmap for hillshading. plot_log_scale : bool, optional If ``True``, use logarithmic colour scaling. log_cmap : str, optional Colourmap for log-scale plots. Defaults to ``"magma"``. log_min, log_max : float, optional Colour scale limits for log-scale plots. plot_traces : bool, optional If ``True``, plot fault surface traces. trace_colour : str, optional Colour for fault traces. land_color : str, optional Background land colour. min_slip_percentile : float or None, optional If set, only show patches above this slip percentile. min_slip_value : float or None, optional If set, only show patches with slip >= this value (m). plot_zeros : bool, optional If ``True`` (default), plot zero-slip patches. wgs : bool, optional If ``True``, use WGS84 coordinates rather than NZTM. title : str or None, optional Figure super-title. plot_edge_label : bool, optional If ``True``, show axis edge labels on the background map. plot_cbars : bool, optional If ``True`` (default), add colourbars to the figure. alpha : float, optional Patch transparency (0–1). Defaults to 1. coast_on_top : bool, optional If ``True``, redraw the coastline on top of the slip patches. Returns ------- list Matplotlib ``PolyCollection`` objects for each fault plotted. """ assert self.patches is not None, "Need to populate object with patches!" if all([bounds is None, self.bounds is not None]): bounds = self.bounds if all([min_slip_percentile is not None, min_slip_value is None]): min_slip = np.percentile(self.patch_slip, min_slip_percentile) else: min_slip = min_slip_value if subplots is not None: if isinstance(subplots, str): # Assume pickled figure with open(subplots, "rb") as pfile: loaded_subplots = pickle.load(pfile) fig, background_ax = loaded_subplots ax = background_ax["main_figure"] else: # Assume matplotlib objects fig, ax = subplots assert isinstance(fig, plt.Figure), "subplots must be a matplotlib figure or a pickled figure" assert isinstance(ax, plt.Axes), "subplots must be a matplotlib figure or a pickled figure" elif create_background: fig, background_ax = plot_background(figsize=figsize, hillshading_intensity=hillshading_intensity, bounds=bounds, plot_rivers=plot_rivers, plot_lakes=plot_lakes, plot_highways=plot_highways, plot_boundaries=plot_boundaries, hillshade_cmap=hillshade_cmap, wgs=wgs, land_color=land_color, plot_edge_label=plot_edge_label) ax = background_ax["main_figure"] elif coast_only: fig, background_ax = plot_background(figsize=figsize, hillshading_intensity=hillshading_intensity, bounds=bounds, plot_rivers=False, plot_lakes=False, plot_highways=False, plot_boundaries=False, hillshade_cmap=hillshade_cmap, wgs=wgs, land_color=land_color, plot_edge_label=plot_edge_label) ax = background_ax["main_figure"] else: fig, ax = plt.subplots() fig.set_size_inches(figsize) # Find maximum slip for subduction interface # Find maximum slip to scale colourbar max_slip = 0 if extra_sub_list is not None: sub_list = bruce_subduction + extra_sub_list else: sub_list = bruce_subduction colour_dic = {} for f_i, fault in enumerate(self.faults): if fault.name in sub_list: if plot_zeros: colours = np.zeros(fault.patch_numbers.shape) else: colours = np.nan * np.ones(fault.patch_numbers.shape) for local_id, patch_id in enumerate(fault.patch_numbers): if patch_id in self.patch_numbers: slip_index = np.argwhere(self.patch_numbers == patch_id)[0] if min_slip is not None: if self.patch_slip[slip_index] >= min_slip: colours[local_id] = self.patch_slip[slip_index] else: if self.patch_slip[slip_index] > 0.: colours[local_id] = self.patch_slip[slip_index] colour_dic[f_i] = colours if np.nanmax(colours) > max_slip: max_slip = np.nanmax(colours) max_slip = global_max_sub_slip if global_max_sub_slip > 0 else max_slip plots = [] # Plot subduction interface subduction_list = [] subduction_plot = None for f_i, fault in enumerate(self.faults): if fault.name in sub_list: subduction_list.append(fault.name) if plot_log_scale: subduction_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=colour_dic[f_i], cmap=log_cmap, norm=colors.LogNorm(vmin=log_min, vmax=log_max), alpha=alpha) else: subduction_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=colour_dic[f_i], cmap=subduction_cmap, vmin=0., vmax=max_slip, alpha=alpha) plots.append(subduction_plot) max_slip = 0 colour_dic = {} for f_i, fault in enumerate(self.faults): if fault.name not in sub_list: if plot_zeros: colours = np.zeros(fault.patch_numbers.shape) else: colours = np.nan * np.ones(fault.patch_numbers.shape) for local_id, patch_id in enumerate(fault.patch_numbers): if patch_id in self.patch_numbers: slip_index = np.argwhere(self.patch_numbers == patch_id)[0] if min_slip is not None: if self.patch_slip[slip_index] >= min_slip: colours[local_id] = self.patch_slip[slip_index] else: if self.patch_slip[slip_index] > 0.: colours[local_id] = self.patch_slip[slip_index] colour_dic[f_i] = colours if np.nanmax(colours) > max_slip: max_slip = np.nanmax(colours) max_slip = global_max_slip if global_max_slip > 0 else max_slip crustal_plot = None # check for 90 degree dipping faults vert_faults = [fault for fault in self.faults if fault.mean_dip == 90.] if len(vert_faults) == 0: for f_i, fault in enumerate(self.faults): if isinstance(fault.trace, LineString): ax.plot(*fault.trace.coords.xy, color='black', linestyle='dashed', linewidth=0.1) else: try: merged_coords = [list(geom.coords) for geom in fault.trace.geoms] merged_trace = LineString([trace for sublist in merged_coords for trace in sublist]) ax.plot(*merged_trace.coords.xy, color='black', linestyle='dashed', linewidth=0.1) except: pass if fault.name not in sub_list: if plot_log_scale: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=colour_dic[f_i], cmap=log_cmap, norm=colors.LogNorm(vmin=log_min, vmax=log_max), alpha=alpha) else: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=colour_dic[f_i], cmap=crustal_cmap, vmin=0., vmax=max_slip, alpha=alpha) plots.append(crustal_plot) elif len(self.faults) == 1: fault = self.faults[0] f_i = 0 if isinstance(fault.trace, LineString): ax.plot(*fault.trace.coords.xy, color='black', linestyle='dashed', linewidth=0.1) else: try: merged_coords = [list(geom.coords) for geom in fault.trace.geoms] merged_trace = LineString([trace for sublist in merged_coords for trace in sublist]) ax.plot(*merged_trace.coords.xy, color='black', linestyle='dashed', linewidth=0.1) except: pass if fault.name not in sub_list: if plot_log_scale: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 2], fault.triangles, facecolors=colour_dic[f_i], cmap=log_cmap, norm=colors.LogNorm(vmin=log_min, vmax=log_max), alpha=alpha) else: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 2], fault.triangles, facecolors=colour_dic[f_i], cmap=crustal_cmap, vmin=0., vmax=max_slip, alpha=alpha) plots.append(crustal_plot) else: for f_i, fault in enumerate(self.faults): if isinstance(fault.trace, LineString): ax.plot(*fault.trace.coords.xy, color='black', linestyle='dashed', linewidth=0.1) else: try: merged_coords = [list(geom.coords) for geom in fault.trace.geoms] merged_trace = LineString([trace for sublist in merged_coords for trace in sublist]) ax.plot(*merged_trace.coords.xy, color='black', linestyle='dashed', linewidth=0.1) except: pass if fault.name not in sub_list: if fault in vert_faults: pass else: if plot_log_scale: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=colour_dic[f_i], cmap=log_cmap, norm=colors.LogNorm(vmin=log_min, vmax=log_max)) else: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=colour_dic[f_i], cmap=crustal_cmap, vmin=0., vmax=max_slip) plots.append(crustal_plot) if plot_cbars: if any([subplots is None, isinstance(subplots, str)]): if plot_log_scale: if subduction_list: sub_cbar = fig.colorbar(subduction_plot, ax=ax) sub_cbar.set_label("Slip (m)") elif crustal_plot is not None: crust_cbar = fig.colorbar(crustal_plot, ax=ax) crust_cbar.set_label("Crustal slip (m)") else: if subduction_list: sub_cbar = fig.colorbar(subduction_plot, ax=ax) sub_cbar.set_label("Subduction slip (m)") if crustal_plot is not None: crust_cbar = fig.colorbar(crustal_plot, ax=ax) crust_cbar.set_label("Crustal slip (m)") if coast_on_top: plot_coast(ax=ax, wgs=wgs, linewidth=0.5, edgecolor='k') if title: plt.suptitle(title) if write is not None: fig.savefig(write, dpi=300) if show: plt.show() else: plt.close(fig) if show and subplots is None: plt.show() return plots
[docs] def plot_slip_evolution(self, subduction_cmap: str = "plasma", crustal_cmap: str = "viridis", show: bool = True, step_size: int = 1, write: str = None, fps: int = 20, file_format: str = "gif", figsize: tuple = (6.4, 4.8), extra_sub_list: list = None): """ Animate the temporal evolution of slip propagation for this event. Produces a ``FuncAnimation`` that steps through rupture time, progressively revealing each patch as it slips. Can be saved as a GIF or video file. Parameters ---------- subduction_cmap : str, optional Matplotlib colourmap for subduction-interface patches. Defaults to ``"plasma"``. crustal_cmap : str, optional Matplotlib colourmap for crustal patches. Defaults to ``"viridis"``. show : bool, optional If ``True`` (default), display the animation interactively. step_size : int, optional Time step (s) between animation frames. Defaults to 1. write : str or None, optional Output file path prefix (without extension). If ``None`` the animation is not saved. fps : int, optional Frames per second for the saved animation. Defaults to 20. file_format : str, optional Output format: ``"gif"``, ``"mov"``, ``"avi"``, or ``"mp4"``. Defaults to ``"gif"``. figsize : tuple of float, optional Figure size in inches ``(width, height)``. extra_sub_list : list of str, optional Additional fault names to treat as subduction interface. """ assert file_format in ("gif", "mov", "avi", "mp4") assert len(self.faults) > 0, "Can't plot an event with no faults." fig, ax = plt.subplots() fig.set_size_inches(figsize) ax.set_facecolor('w') plt.figure(facecolor='w') plot_coast(ax, clip_boundary=self.bounds) ax.set_aspect("equal") ax.get_xaxis().set_visible(True) ax.get_yaxis().set_visible(True) colour_dic = {} timestamps = defaultdict(set) subduction_max_slip = 0 crustal_max_slip = 0 if extra_sub_list is not None: sub_list = [*bruce_subduction, *extra_sub_list] else: sub_list = bruce_subduction subduction_list = [] for f_i, fault in enumerate(self.faults): colours = np.zeros(fault.patch_numbers.shape) times = np.zeros(fault.patch_numbers.shape) for local_id, patch_id in enumerate(fault.patch_numbers): if patch_id in self.patch_numbers: slip_index = np.searchsorted(self.patch_numbers, patch_id) times[local_id] = step_size * np.rint((self.patch_time[slip_index] - self.t0) / step_size) colours[local_id] = self.patch_slip[slip_index] timestamps[times[local_id]].add(f_i) colour_dic[f_i] = (colours, times) if fault.name in sub_list: subduction_list.append(fault.name) if max(colours) > subduction_max_slip: subduction_max_slip = max(colours) else: if max(colours) > crustal_max_slip: crustal_max_slip = max(colours) plots = {} subduction_plot = None for f_i, fault in enumerate(self.faults): init_colours = np.zeros(fault.patch_numbers.shape) if isinstance(fault.trace, LineString): ax.plot(*fault.trace.coords.xy, color='gray', linestyle='dashed') else: try: merged_coords = [list(geom.coords) for geom in fault.trace.geoms] merged_trace = LineString([trace for sublist in merged_coords for trace in sublist]) ax.plot(*merged_trace.coords.xy, color='gray', linestyle='dashed') except: pass if fault.name in sub_list: subduction_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=init_colours, cmap=subduction_cmap, vmin=0, vmax=subduction_max_slip) plots[f_i] = (subduction_plot, init_colours) crustal_plot = None for f_i, fault in enumerate(self.faults): init_colours = np.zeros(fault.patch_numbers.shape) if fault.name not in sub_list: crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles, facecolors=init_colours, cmap=crustal_cmap, vmin=0, vmax=crustal_max_slip) plots[f_i] = (crustal_plot, init_colours) ax_divider = make_axes_locatable(ax) ax_time = ax_divider.append_axes("bottom", size="3%", pad=0.5) time_slider = Slider(ax_time, 'Time (s)', 0, step_size * round(self.dt / step_size) + step_size, valinit=0, valstep=step_size) # Build colorbars padding = 0.25 if subduction_list: sub_ax = ax_divider.append_axes("right", size="5%", pad=padding) sub_cbar = fig.colorbar( subduction_plot, cax=sub_ax) sub_cbar.set_label("Subduction slip (m)") padding += 0.25 if crustal_plot is not None: crust_ax = ax_divider.append_axes("right", size="5%", pad=padding) crust_cbar = fig.colorbar( crustal_plot, cax=crust_ax) crust_cbar.set_label("Slip (m)") def update_plot(num): time = time_slider.valmin + num * step_size time_slider.set_val(time) if time in timestamps: for f_i in timestamps[time]: plot, curr_colours = plots[f_i] fault_times = colour_dic[f_i][1] filter_time_indices = np.argwhere(fault_times == time).flatten() curr_colours[filter_time_indices] = colour_dic[f_i][0][filter_time_indices] plot.update({'array': curr_colours}) elif time == time_slider.valmax: for f_i, fault in enumerate(self.faults): plot, curr_colours = plots[f_i] init_colors = np.zeros(fault.patch_numbers.shape) curr_colours[:] = init_colors[:] plot.update({'array': curr_colours}) fig.canvas.draw_idle() frames = int((time_slider.valmax - time_slider.valmin) / step_size) + 1 animation = FuncAnimation(fig, update_plot, interval=50, frames=frames) if write is not None: writer = PillowWriter(fps=fps) if file_format == "gif" else FFMpegWriter(fps=fps) animation.save(f"{write}.{file_format}", writer, savefig_kwargs=dict(facecolor='w')) if show: plt.show()
[docs] def find_surface_faults(self,fault_model: RsqSimMultiFault,min_slip: float =0.1, method: str = 'vertex', n_patches: int = 1, max_depth: float = -1000., faults2ignore: [list,str] ='hikurangi'): """ Identify fault segments with surface-rupturing patches. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing the patch dictionary. min_slip : float, optional Minimum slip (m) for a patch to count as ruptured. Defaults to 0.1 m. method : str, optional Depth criterion: ``"vertex"`` uses the shallowest vertex of a patch; ``"centroid"`` uses the patch centroid depth. Defaults to ``"vertex"``. n_patches : int, optional Minimum number of qualifying surface patches required for a fault to be included in the output. Defaults to 1. max_depth : float, optional Depth threshold (m, negative downward). Patches shallower than this are considered surface-rupturing. Defaults to -1000 m. faults2ignore : list of str or str, optional Fault name(s) to exclude from the search (e.g. subduction interface). Defaults to ``"hikurangi"``. Returns ------- list of str Names of fault segments that have surface rupture. """ assert method in ['centroid', 'vertex'], "Method must be centroid or vertex" assert max_depth < 0., "depths should be negative" if issubclass(type(faults2ignore),str): faults2ignore =[faults2ignore] surface_faults = [] for fault in self.faults: if not fault.name in faults2ignore: surface_patches = [] for patch_id in fault.patch_numbers: if patch_id in self.patch_numbers: patch = fault.patch_dic[patch_id] if method == 'vertex': patch_zs = patch.vertices.flatten()[[2, 5, 8]] patch_z = np.max(patch_zs) # use max because depths are negative elif method == 'centroid': patch_z = patch.centre[2] else: AssertionError('method must be vertex or centroid') if patch_z > max_depth: patch_ev_index = np.searchsorted(self.patch_numbers, patch_id) patch_slip = self.patch_slip[patch_ev_index] if patch_slip >= min_slip: surface_patches.append(patch_ev_index) if len(surface_patches) >= n_patches: surface_faults.append(fault.name) return surface_faults
[docs] def split_by_fault(self, fault_model: RsqSimMultiFault, min_slip: float = 0.1, min_patches: int = 1): """ Create per-fault sub-event objects for this event. Parameters ---------- fault_model : RsqSimMultiFault Fault model used to resolve patch IDs and fault names. min_slip : float, optional Minimum moment (N·m) for a fault to be included (passed as ``min_m0`` to :meth:`make_fault_moment_dict`). Defaults to 0.1. min_patches : int, optional Unused; retained for API compatibility. Returns ------- dict Mapping of fault name (str) to its corresponding :class:`RsqSimEvent` sub-event. """ assert self.faults is not None, "Event has no faults, can't split by fault" subevents = {} moment_dict = self.make_fault_moment_dict(fault_model=fault_model, min_m0=min_slip) for i, fault in enumerate(self.faults): if fault.name in moment_dict.keys(): sub_m0 = moment_dict[fault.name] sub_mw = m0_to_mw(sub_m0) fault_patches = np.array(list(fault.patch_dic.keys())) fault_patch_numbers = self.patch_numbers[np.isin(self.patch_numbers, fault_patches)] fault_patch_slip = self.patch_slip[np.isin(self.patch_numbers, fault_patches)] fault_patch_time = self.patch_time[np.isin(self.patch_numbers, fault_patches)] sub_event = self.from_earthquake_list(t0=self.t0, m0=sub_m0, mw=sub_mw, x=self.x, y=self.y, z=self.z, area=self.area, dt=self.dt, patch_numbers=fault_patch_numbers, patch_slip=fault_patch_slip, patch_time=fault_patch_time, fault_model=fault_model, event_id=i) subevents[fault.name] = sub_event return subevents
[docs] def slip_dist_array(self, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False): """ Build a dense array of the slip distribution across all patches. Each row corresponds to one triangular patch and contains the three vertex coordinates followed by slip, rake, and rupture time. Parameters ---------- include_zeros : bool, optional If ``True`` (default), include zero-slip patches (patches on involved faults that did not slip in this event). min_slip_percentile : float or None, optional If provided (and ``min_slip_value`` is ``None``), patches with slip below this percentile are excluded or zeroed. min_slip_value : float or None, optional If provided, patches with slip below this value (m) are excluded or zeroed. nztm_to_lonlat : bool, optional If ``True``, transform vertex coordinates from NZTM to WGS84 longitude/latitude before including them. Returns ------- numpy.ndarray of shape (n_patches, 12) Columns: ``[x1,y1,z1, x2,y2,z2, x3,y3,z3, slip_m, rake_deg, time_s]``. """ all_patches = [] if all([min_slip_percentile is not None, min_slip_value is None]): min_slip = np.percentile(self.patch_slip, min_slip_percentile) else: min_slip = min_slip_value for fault in self.faults: for patch_id in fault.patch_numbers: if patch_id in self.patch_numbers: patch = fault.patch_dic[patch_id] if nztm_to_lonlat: triangle_corners = patch.vertices_lonlat.flatten() else: triangle_corners = patch.vertices.flatten() slip_index = np.searchsorted(self.patch_numbers, patch_id) time = self.patch_time[slip_index] - self.t0 slip_mag = self.patch_slip[slip_index] if min_slip is not None: if slip_mag >= min_slip: patch_line = np.hstack([triangle_corners, np.array([slip_mag, patch.rake, time])]) all_patches.append(patch_line) elif include_zeros: patch = fault.patch_dic[patch_id] patch_line = np.hstack([triangle_corners, np.array([0., 0., 0.])]) all_patches.append(patch_line) else: patch_line = np.hstack([triangle_corners, np.array([slip_mag, patch.rake, time])]) all_patches.append(patch_line) elif include_zeros: patch = fault.patch_dic[patch_id] if nztm_to_lonlat: triangle_corners = patch.vertices_lonlat.flatten() else: triangle_corners = patch.vertices.flatten() patch_line = np.hstack([triangle_corners, np.array([0., 0., 0.])]) all_patches.append(patch_line) return np.array(all_patches)
[docs] def slip_dist_bounds(self, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False): """ Compute the bounding box of the slip distribution array. Parameters ---------- include_zeros : bool, optional Passed to :meth:`slip_dist_array`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_array`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_array`. nztm_to_lonlat : bool, optional Passed to :meth:`slip_dist_array`. Returns ------- tuple of float ``(x_min, y_min, x_max, y_max)`` extents of the slip distribution patch vertices. """ slip_dist_array = self.slip_dist_array(include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value, nztm_to_lonlat=nztm_to_lonlat) min_x = np.min(slip_dist_array[:, [0, 3, 6]]) max_x = np.max(slip_dist_array[:, [0, 3, 6]]) min_y = np.min(slip_dist_array[:, [1, 4, 7]]) max_y = np.max(slip_dist_array[:, [1, 4, 7]]) return min_x, min_y, max_x, max_y
[docs] def slip_dist_to_mesh(self, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False): """ Convert the slip distribution to a meshio Mesh with cell data. Parameters ---------- include_zeros : bool, optional Passed to :meth:`slip_dist_array`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_array`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_array`. nztm_to_lonlat : bool, optional Passed to :meth:`slip_dist_array`. Returns ------- meshio.Mesh Triangulated mesh with ``cell_data`` containing ``"slip"``, ``"rake"``, and ``"time"`` arrays. """ slip_dist_array = self.slip_dist_array(include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value) mesh = array_to_mesh(slip_dist_array[:, :9]) data_dic = {} for label, index in zip(["slip", "rake", "time"], [9, 10, 11]): data_dic[label] = slip_dist_array[:, index] mesh.cell_data = data_dic return mesh
[docs] def slip_dist_to_vtk(self, vtk_file: str, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None): """ Write the slip distribution to a VTK file. Parameters ---------- vtk_file : str Output VTK file path. include_zeros : bool, optional Passed to :meth:`slip_dist_to_mesh`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_to_mesh`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_to_mesh`. """ mesh = self.slip_dist_to_mesh(include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value) mesh.write(vtk_file, file_format="vtk")
[docs] def slip_dist_to_obj(self, obj_file: str, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None): """ Write the slip distribution to a Wavefront OBJ file. Parameters ---------- obj_file : str Output OBJ file path. include_zeros : bool, optional Passed to :meth:`slip_dist_to_mesh`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_to_mesh`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_to_mesh`. """ mesh = self.slip_dist_to_mesh(include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value) mesh.write(obj_file, file_format="obj")
[docs] def slip_dist_to_txt(self, txt_file, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False): """ Write the slip distribution to a space-delimited text file. Each row contains the three vertex coordinates of a patch followed by slip, rake, and rupture time. Parameters ---------- txt_file : str or path-like Output text file path. include_zeros : bool, optional Passed to :meth:`slip_dist_array`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_array`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_array`. nztm_to_lonlat : bool, optional If ``True``, vertex coordinates are in WGS84 lon/lat and the header is adjusted accordingly. """ if nztm_to_lonlat: header = "lon1 lat1 z1 lon2 lat2 z2 lon3 lat3 z3 slip_m rake_deg time_s" else: header = "x1 y1 z1 x2 y2 z2 x3 y3 z3 slip_m rake_deg time_s" slip_dist_array = self.slip_dist_array(include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value, nztm_to_lonlat=nztm_to_lonlat) np.savetxt(txt_file, slip_dist_array, fmt="%.6f", delimiter=" ", header=header)
[docs] def slip_dist_to_gdf(self, gdf_file: str, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False, crs="2193"): """ Build a GeoDataFrame of the slip distribution. Creates a GeoDataFrame where each row is a triangular patch polygon with slip, rake, and time attributes. Parameters ---------- gdf_file : str or None Unused; retained for API compatibility. include_zeros : bool, optional Passed to :meth:`slip_dist_array`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_array`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_array`. nztm_to_lonlat : bool, optional If ``True``, reproject the result from NZTM to WGS84. crs : str, optional CRS for the output GeoDataFrame. Defaults to ``"2193"`` (NZTM / EPSG:2193). Returns ------- geopandas.GeoDataFrame GeoDataFrame with columns ``["slip", "rake", "time"]`` and a ``geometry`` column of triangular patch polygons. """ slip_dist_array = self.slip_dist_array(include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value, nztm_to_lonlat=nztm_to_lonlat) geometry = [Polygon([(slip_dist_array[i, 0], slip_dist_array[i, 1]), (slip_dist_array[i, 3], slip_dist_array[i, 4]), (slip_dist_array[i, 6], slip_dist_array[i, 7])]) for i in range(len(slip_dist_array))] gdf = gpd.GeoDataFrame(slip_dist_array[:, 9:], columns=["slip", "rake", "time"], geometry=geometry, crs=crs) if nztm_to_lonlat: assert crs == "2193" gdf.to_crs("EPSG:4326", inplace=True) return gdf
[docs] def slip_dist_to_geojson(self, geojson_file: str, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False): """ Write the slip distribution to a GeoJSON file. Parameters ---------- geojson_file : str Output GeoJSON file path. include_zeros : bool, optional Passed to :meth:`slip_dist_to_gdf`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_to_gdf`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_to_gdf`. nztm_to_lonlat : bool, optional If ``True``, reproject patches to WGS84 before writing. """ gdf = self.slip_dist_to_gdf(gdf_file=None, include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value, nztm_to_lonlat=nztm_to_lonlat) gdf.to_file(geojson_file, driver="GeoJSON")
[docs] def slip_dist_to_shapefile(self, shapefile_file: str, include_zeros: bool = True, min_slip_percentile: float = None, min_slip_value: float = None, nztm_to_lonlat: bool = False): """ Write the slip distribution to an ESRI Shapefile. Parameters ---------- shapefile_file : str Output shapefile path (without extension). include_zeros : bool, optional Passed to :meth:`slip_dist_to_gdf`. min_slip_percentile : float or None, optional Passed to :meth:`slip_dist_to_gdf`. min_slip_value : float or None, optional Passed to :meth:`slip_dist_to_gdf`. nztm_to_lonlat : bool, optional If ``True``, reproject patches to WGS84 before writing. """ gdf = self.slip_dist_to_gdf(gdf_file=None, include_zeros=include_zeros, min_slip_percentile=min_slip_percentile, min_slip_value=min_slip_value, nztm_to_lonlat=nztm_to_lonlat) gdf.to_file(shapefile_file)
[docs] def slip_dist_to_gmt(self, fault_model: RsqSimMultiFault, gmt_prefix: str, min_slip_value: float = None, nztm_to_lonlat: bool = False, subduction_names: Iterable = ("hikkerm", "puysegur")): """ Write the slip distribution to GMT multi-segment text files. Writes two files: ``<gmt_prefix>_crustal.gmt`` for crustal faults and ``<gmt_prefix>_subduction.gmt`` for subduction-interface faults. Each patch is written as a ``>-Z<slip>`` segment header followed by vertex coordinates. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing the patch dictionary. gmt_prefix : str Prefix for the output GMT file names. min_slip_value : float or None, optional If set, only patches with slip >= this value (m) are written. nztm_to_lonlat : bool, optional Unused; reserved for future coordinate transformation. subduction_names : iterable of str, optional Fault names to classify as subduction interface. Defaults to ``("hikkerm", "puysegur")``. """ crustal_faults = [fault for fault in self.faults if fault.name not in subduction_names] subduction_faults = [fault for fault in self.faults if fault.name in subduction_names] if crustal_faults: crustal_patch_ids = [] crustal_patch_slip = [] crustal_patch_depth = [] for fault in crustal_faults: fault_patches = fault.patch_numbers event_fault_indices = np.isin(self.patch_numbers, fault_patches) event_fault_slip = self.patch_slip[event_fault_indices] if min_slip_value is not None: event_gt_min_slip_indices = event_fault_slip > min_slip_value filtered_patch_indices = np.where(event_gt_min_slip_indices & event_fault_indices)[0] else: filtered_patch_indices = np.where(event_fault_indices)[0] crustal_patch_ids.extend(self.patch_numbers[filtered_patch_indices]) crustal_patch_slip.extend(self.patch_slip[filtered_patch_indices]) for slip_index in filtered_patch_indices: patch_id = self.patch_numbers[slip_index] patch = fault.patch_dic[patch_id] depth = np.mean(patch.vertices[:, 2]) crustal_patch_depth.append(depth) if len(crustal_patch_ids) > 0: sorted_depths = np.argsort(crustal_patch_depth) crustal_patch_ids = np.array(crustal_patch_ids)[sorted_depths] crustal_patch_slip = np.array(crustal_patch_slip)[sorted_depths] crustal_patch_depth = np.array(crustal_patch_depth)[sorted_depths] with open(f"{gmt_prefix}_crustal.gmt", "w") as f: for patch_id, slip, depth in zip(crustal_patch_ids, crustal_patch_slip, crustal_patch_depth): patch = fault_model.patch_dic[patch_id] f.write(f">-Z{slip:.6f}\n") for vertex in patch.vertices: f.write(f"{vertex[0]} {vertex[1]} {vertex[2]}\n") if subduction_faults: with open(f"{gmt_prefix}_subduction.gmt", "w") as f: for fault in subduction_faults: fault_patches = fault.patch_numbers event_fault_indices = np.isin(self.patch_numbers, fault_patches) event_fault_slip = self.patch_slip[event_fault_indices] if min_slip_value is not None: event_gt_min_slip_indices = event_fault_slip > min_slip_value filtered_patch_indices = np.where(event_gt_min_slip_indices & event_fault_indices)[0] else: filtered_patch_indices = np.where(event_fault_indices)[0] for slip_index in filtered_patch_indices: patch_id = self.patch_numbers[slip_index] patch = fault.patch_dic[patch_id] slip = self.patch_slip[slip_index] f.write(f">-Z{slip:.6f}\n") for vertex in patch.vertices: f.write(f"{vertex[0]} {vertex[1]} {vertex[2]}\n")
[docs] def discretize_tiles(self, tile_list: list[Polygon], probability: float, rake: float): """ Select tiles that overlap the rupture exterior by at least 50 %. Parameters ---------- tile_list : list of Polygon Candidate rectangular tiles in NZTM coordinates. probability : float Unused; retained for API compatibility with :meth:`discretize_openquake`. rake : float Unused; retained for API compatibility. Returns ------- geopandas.GeoSeries GeoSeries (EPSG:2193) of tiles where the intersection with the rupture exterior covers at least half the tile area. """ included_tiles = [] for tile in tile_list: overlapping = tile.intersects(self.exterior) if overlapping: intersection = tile.intersection(self.exterior) if intersection.area >= 0.5 * tile.area: included_tiles.append(tile) out_gs = gpd.GeoSeries(included_tiles, crs=2193) return out_gs
[docs] def discretize_openquake(self, tile_list: list[Polygon], probability: float, rake: float): """ Build an OpenQuake multi-planar rupture from overlapping tiles. Selects tiles from ``tile_list`` that overlap the rupture exterior by at least 50 %, then packages them as an :class:`OpenQuakeMultiSquareRupture`. Parameters ---------- tile_list : list of Polygon Candidate rectangular tiles in NZTM coordinates. probability : float Probability of occurrence passed to :class:`OpenQuakeMultiSquareRupture`. rake : float Mean rake (degrees) passed to :class:`OpenQuakeMultiSquareRupture`. Returns ------- OpenQuakeMultiSquareRupture or None Rupture object if any tiles overlap; ``None`` otherwise. """ included_tiles = [] for tile in tile_list: overlapping = tile.intersects(self.exterior) if overlapping: intersection = tile.intersection(self.exterior) if intersection.area >= 0.5 * tile.area: included_tiles.append(tile) out_gs = gpd.GeoSeries(included_tiles, crs=2193) out_gs_wgs = out_gs.to_crs(epsg=4326) if out_gs_wgs.size > 0: oq_rup = OpenQuakeMultiSquareRupture(list(out_gs.geometry), magnitude=self.mw, rake=rake, hypocentre=np.array([self.x, self.y, self.z]), event_id=self.event_id, probability=probability) return oq_rup else: return
[docs] def discretize_openquake_ktree(self, fault_model: RsqSimMultiFault, quads_dict: dict, probability: float, subduction_names: Iterable = ("hikkerm", "puysegur"), min_moment = 1.e18, min_slip = 0.1, tile_size: float = 5000., write_mesh: bool = False, write_geojson: bool = False, xml_dir: str = None, threshold: float = 0.5): """ Build OpenQuake XML ruptures using a KD-tree tile assignment. Uses :meth:`slip_dist_quads_ktree` to find ruptured quadrilateral tiles per fault, then writes separate XML files for crustal and subduction components. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch and fault dictionaries. quads_dict : dict Mapping of fault segment name to a ``(n, 4, 3)`` array of pre-computed quadrilateral tile corner coordinates. probability : float Probability of occurrence for the OpenQuake rupture. subduction_names : iterable of str, optional Fault names classified as subduction interface. min_moment : float, optional Minimum moment (N·m) for a fault to be included. min_slip : float, optional Minimum slip (m) for a patch to count as ruptured. tile_size : float, optional Tile size hint (m); unused here but passed through for context. write_mesh : bool, optional If ``True``, also write VTK mesh files for each component. write_geojson : bool, optional If ``True``, also write GeoJSON files for each component. xml_dir : str or None, optional Directory in which to write output files. Defaults to the current working directory. threshold : float, optional Fraction of patches in a tile that must be ruptured for the tile to be included. Passed to :meth:`slip_dist_quads_ktree`. """ tiles_dict = self.slip_dist_quads_ktree(quads_dict=quads_dict, fault_model=fault_model,min_moment=min_moment, min_slip=min_slip, threshold_for_inclusion=threshold) crustal_faults = [key for key in tiles_dict.keys() if key not in subduction_names] if crustal_faults: if all([tiles_dict[key].size == 0 for key in crustal_faults]): crustal_faults = [] subduction_faults = [key for key in tiles_dict.keys() if key in subduction_names] if subduction_faults: if all([tiles_dict[key].size == 0 for key in subduction_faults]): subduction_faults = [] if subduction_faults: subduction_tiles = np.vstack([tiles_dict[key] for key in subduction_faults]) if write_mesh: mesh = quads_to_vtk(subduction_tiles) vtk_name = f"event_{self.event_id}_subduction.vtk" if xml_dir is not None: vtk_name = os.path.join(xml_dir, vtk_name) mesh.write(vtk_name, file_format="vtk") subduction_tiles_gs = gpd.GeoSeries([Polygon(subduction_tile) for subduction_tile in subduction_tiles], crs=2193) if write_geojson: geojson_name = f"event_{self.event_id}_subduction.geojson" if xml_dir is not None: geojson_name = os.path.join(xml_dir, geojson_name) subduction_tiles_gs.to_file(geojson_name, driver="GeoJSON") subduction_component = self.get_subduction_component(fault_model=fault_model, subduction_names=subduction_names, min_moment=min_moment, min_slip=min_slip) if subduction_component is not None: subduction_mw, subduction_rake, subduction_centroid = subduction_component oq_rup = OpenQuakeMultiSquareRupture(list(subduction_tiles_gs.geometry), magnitude=subduction_mw, rake=subduction_rake, hypocentre=subduction_centroid, event_id=self.event_id, probability=probability) out_name = f"event_{self.event_id}_subduction.xml" if xml_dir is not None: out_file = os.path.join(xml_dir, out_name) else: out_file = out_name oq_rup.to_oq_xml(out_file) if crustal_faults: crustal_tiles = np.vstack([tiles_dict[key] for key in crustal_faults if tiles_dict[key].size > 0]) if write_mesh: mesh = quads_to_vtk(crustal_tiles) vtk_name = f"event_{self.event_id}_crustal.vtk" if xml_dir is not None: vtk_name = os.path.join(xml_dir, vtk_name) mesh.write(vtk_name, file_format="vtk") crustal_tiles_gs = gpd.GeoSeries([Polygon(crustal_tile) for crustal_tile in crustal_tiles], crs=2193) if write_geojson: geojson_name = f"event_{self.event_id}_crustal.geojson" if xml_dir is not None: geojson_name = os.path.join(xml_dir, geojson_name) crustal_tiles_gs.to_file(geojson_name, driver="GeoJSON") crustal_component = self.get_crustal_component(fault_model=fault_model, crustal_names=crustal_faults, min_moment=min_moment, min_slip=min_slip) if crustal_component is not None and not crustal_tiles_gs.is_empty.all(): crustal_mw, crustal_rake, crustal_centroid = crustal_component oq_rup = OpenQuakeMultiSquareRupture(list(crustal_tiles_gs.geometry), magnitude=crustal_mw, rake=crustal_rake, hypocentre=crustal_centroid, event_id=self.event_id, probability=probability) out_name = f"event_{self.event_id}_crustal.xml" if xml_dir is not None: out_file = os.path.join(xml_dir, out_name) else: out_file = out_name oq_rup.to_oq_xml(out_file)
[docs] def event_to_json(self, fault_model: RsqSimMultiFault, path2cfm: str, catalogue_version: str = 'v1', xml_dir: str = 'OQ-events', wgs84: bool = False, subd_tile_size: float = 15000., tile_size: float = 5000., tectonic_region: str = "NZ"): """ Export event slip distribution to a ShakeMap-compatible GeoJSON file. Discretises the rupturing faults into rectangular tiles, selects those near slipping patches, converts coordinates to WGS84, and writes a GeoJSON file with metadata to ``<xml_dir>/event_<id>/<id>.json``. Parameters ---------- fault_model : RsqSimMultiFault Fault model used to merge and discretise fault segments. path2cfm : str Path to the Community Fault Model directory (required for ``catalogue_version="v2"`` name lookups). catalogue_version : str, optional ``"v1"`` or ``"v2"``. Affects which fault name dictionary is used. Defaults to ``"v1"``. xml_dir : str, optional Output directory. Defaults to ``"OQ-events"``. wgs84 : bool, optional If ``True``, hypocentre coordinates are already in WGS84 (lon/lat). If ``False`` (default), NZTM coordinates are transformed to WGS84. subd_tile_size : float, optional Tile size (m) used when discretising subduction faults. Defaults to 15000 m. tile_size : float, optional Tile size (m) used when discretising crustal faults. Defaults to 5000 m. tectonic_region : str, optional Tectonic region tag written into the metadata. Defaults to ``"NZ"``. """ # setup assert os.path.exists(path2cfm), "Path to CFM does not exist" if catalogue_version == 'v2': fault_model.make_v2_name_dic(path2cfm=path2cfm) name_dict = fault_model.v2_name_dic else: name_dict = dict(zip(fault_model.names, fault_model.names)) # create output directory if needed outdir = os.path.join(xml_dir, f'event_{self.event_id}') if not os.path.exists(outdir): os.makedirs(outdir) ### create metadata string for event # start with timestamp (not sure this is really necessary) ev_time = self.t0 yrs = f"{np.floor(ev_time / (365.25 * 24. * 3600.)):.0f}" months = f"{np.floor((ev_time - (float(yrs) * 365.25 * 24. * 3600.)) / (3600. * 24. * 30.4)):02.0f}" remain_ym = ev_time - (float(yrs) * (365.25 * 24. * 3600.) + float(months) * (3600. * 24. * 30.4)) days = np.floor(remain_ym / (24. * 3600.)) hours = np.floor((remain_ym - days * (24. * 3600.)) / 3600.) mins = np.floor((remain_ym - days * (24. * 3600.) - (hours * 3600.)) / 60.) secs = remain_ym - days * (24. * 3600.) - (hours * 3600.) - mins * 60. timestring = f"{yrs}-{months}-{days:02.0f}T{hours:02.0f}:{mins:02.0f}:{secs:02.6f}Z" # location if not wgs84: lon, lat = transformer_nztm2wgs.transform(self.x, self.y) else: lon = self.x lat = self.y metadata = {"lat": np.round(lat, decimals=3), "id": f"rsq{catalogue_version}{self.event_id}", "mech": "ALL", "mag": self.mw, "rake": self.mean_rake, "reference": f"rsqsim_{catalogue_version}", "netid": f"{tectonic_region}", "depth": f'{self.z / 1000.:.1f}', "network": f"rsqsim_{catalogue_version}", "locstring": f"{tectonic_region}", "time": f"{timestring}", "lon": np.round(lon, decimals=3), "productcode": f"rsq{catalogue_version}{self.event_id}"} ### convert faults to quadrilaterals # which faults are involved in this event? faults = RsqSimMultiFault(self.faults) faultNames = faults.names # find corresponding larger/cfm faults allFaults = np.unique([name_dict[name] for name in faultNames]) subdFaults = np.unique([name_dict[name] for name in faultNames if fnmatch.fnmatch(name, "*puy*") or fnmatch.fnmatch(name, "*hik*")]) # create empty list to store polygons poly_list = [] # iterate over faults which participate in the event for fName in allFaults: try: # need to find all parts of the fault, then later select those which have non-zero slip fault_merged = fault_model.merge_segments(fName, name_dict=name_dict, fault_name=fName) if len(subdFaults) > 0 and fName in subdFaults: # find average dip using subduction tile size dip_angle = fault_merged.get_average_dip(subd_tile_size) # Discretize into rectangular tiles new_fault_rect = fault_merged.discretize_rectangular_tiles(tile_size=subd_tile_size) else: # average dip dip_angle = fault_merged.get_average_dip() # Discretize into rectangular tiles new_fault_rect = fault_merged.discretize_rectangular_tiles(tile_size=tile_size) # want to discard patches which don't slip in the event for quad in new_fault_rect: # find nearest triangular patches (to check slip is non-zero on the quads which get passed to the json) approx_centroid = np.mean(quad, axis=0) # find closest patches is only in 2d so then need to check depth nearest_patches_ids = faults.find_closest_patches(approx_centroid[0], approx_centroid[1]) nearest_patches = [faults.patch_dic[patch_id] for patch_id in nearest_patches_ids] min_z_diff = min([np.abs(patch.centre[2] - approx_centroid[2]) for patch in nearest_patches]) nearest_patches_z = [patch for patch in nearest_patches if np.abs(patch.centre[2] - approx_centroid[2]) == min_z_diff] patch_ids = [patch.patch_number for patch in nearest_patches_z] # find associated slip slip = 0. for patch_id in patch_ids: if patch_id in self.patch_numbers: slip_index = np.searchsorted(self.patch_numbers, patch_id) slip_mag = self.patch_slip[slip_index] slip += slip_mag mean_slip = slip / len(patch_ids) # check slip isn't 0/ less than a mm if mean_slip > 1.e-3: # convert coordinates to lat lon if needed if not wgs84: x2, y2 = transformer_nztm2wgs.transform(quad[:, 0], quad[:, 1]) else: x2, y2 = quad[:, 0], quad[:, 1] # and round to 3dp quad[:, 0] = np.round(x2, decimals=3) quad[:, 1] = np.round(y2, decimals=3) # need depths to be positive and in km new_depths = np.zeros(np.shape(quad[:, 2])) for i, depth in enumerate(quad[:, 2]): # prevent 0s being written as -0.0 if isclose(depth, 0.0, abs_tol=0.2): new_depths[i] = 0. else: new_depths[i] = np.round(-1. * depth / 1000., decimals=2) quad[:, 2] = new_depths # sort into correct order for json (shallowest points first) quad_sorted = np.sort(quad, axis=0) poly_list.append(Polygon(quad_sorted)) except: print(f"{fName} could not be discretised - event slip distribution will be incomplete") all_segs = MultiPolygon(poly_list) polys = gpd.GeoSeries(all_segs) # write to initial json polys.to_file(os.path.join(outdir, f'{self.event_id}.json'), driver='GeoJSON') # read back in to edit properties with open(os.path.join(outdir, f'{self.event_id}.json'), 'r') as jfile: pjson = json.load(jfile) pjson['metadata'] = metadata pjson['features'][0]['properties'] = {"rupture type": "rupture extent"} # hack to remove extra set of brackets pjson['features'][0]['geometry']['coordinates'] = [ [item[0] for item in pjson['features'][0]['geometry']['coordinates'][:]]] # and write back out to json with open(os.path.join(outdir, f'{self.event_id}.json'), 'w') as jfile: json.dump(pjson, jfile)
[docs] def event_to_OQ_xml(self, fault_model: RsqSimMultiFault, path2cfm: str, catalogue_version: str = 'v2', xml_dir: str = 'OQ_events', subd_tile_size: float = 15000., tile_size: float = 5000., probability: float = 0.9, tectonic_region: str = 'NZ',min_mag: float=6.0,hypocentre: list = None,nztm2wgs: bool = True): """ Export the event as an OpenQuake multi-planar rupture XML file. Discretises faults exceeding ``min_mag`` into rectangular tiles, selects tiles near ruptured patches, and writes an OpenQuake NRML XML file to ``<xml_dir>/<event_id>/event_<event_id>.xml``. Parameters ---------- fault_model : RsqSimMultiFault Fault model for segment merging and discretisation. path2cfm : str Path to the Community Fault Model directory. catalogue_version : str, optional ``"v1"`` or ``"v2"``. Defaults to ``"v2"``. xml_dir : str, optional Output directory. Defaults to ``"OQ_events"``. subd_tile_size : float, optional Tile size (m) for subduction faults. Defaults to 15000 m. tile_size : float, optional Tile size (m) for crustal faults. Defaults to 5000 m. probability : float, optional Probability of occurrence for the rupture. Defaults to 0.9. tectonic_region : str, optional Tectonic region tag. Defaults to ``"NZ"``. min_mag : float, optional Only include faults whose moment corresponds to at least this magnitude. Defaults to 6.0. hypocentre : list or None, optional ``[x, y, z]`` hypocentre override. If ``None``, uses the event hypocentre. nztm2wgs : bool, optional If ``True`` (default), convert NZTM coordinates to WGS84 before writing. """ assert os.path.exists(path2cfm), "Path to CFM does not exist" if catalogue_version == 'v2': fault_model.make_v2_name_dic(path2cfm=path2cfm) name_dict = fault_model.v2_name_dic else: name_dict = dict(zip(fault_model.names, fault_model.names)) unique_names = set(name_dict.values()) # which faults are involved in this event? M0s = self.make_fault_moment_dict(fault_model=fault_model, by_cfm_names=False) min_M0=10.**(1.5*(min_mag+6.03)) faults = RsqSimMultiFault(self.faults) faultNames = [fault.name for fault in self.faults] allFaults = [] for fault in faultNames: print(fault, M0s[fault]) if M0s[fault] > min_M0: allFaults.append(fault) subdFaults = np.unique([name_dict[name] for name in faultNames if fnmatch.fnmatch(name, "*puysegar*") or fnmatch.fnmatch(name, "*hikurangi*")]) outdir = os.path.join(xml_dir, f'{self.event_id}') if not os.path.exists(outdir): os.makedirs(outdir) poly_list = [] for fName in allFaults: try: # print(fName) # need to find all parts of the fault, then later select those which have non-zero slip fault_merged = fault_model.merge_segments(fName, name_dict=name_dict, fault_name=fName) if len(subdFaults) > 0 and fName in subdFaults: dip_angle = fault_merged.get_average_dip(subd_tile_size) new_fault_rect = fault_merged.discretize_rectangular_tiles(tile_size=subd_tile_size) else: # Average dip dip_angle = fault_merged.get_average_dip() # Discretize into rectangular tiles new_fault_rect = fault_merged.discretize_rectangular_tiles(tile_size=tile_size) # want to discard patches which don't slip in the event for quad in new_fault_rect: # find nearest patch approx_centroid = np.mean(quad, axis=0) # find closest patches is only in 2d so then need to check depth nearest_patches_ids = faults.find_closest_patches(approx_centroid[0], approx_centroid[1]) nearest_patches = [faults.patch_dic[patch_id] for patch_id in nearest_patches_ids] min_z_diff = min([np.abs(patch.centre[2] - approx_centroid[2]) for patch in nearest_patches]) nearest_patches_z = [patch for patch in nearest_patches if np.abs(patch.centre[2] - approx_centroid[2]) == min_z_diff] patch_ids = [patch.patch_number for patch in nearest_patches_z] # find associated slip slip = 0. for patch_id in patch_ids: if patch_id in self.patch_numbers: slip_index = np.searchsorted(self.patch_numbers, patch_id) slip_mag = self.patch_slip[slip_index] slip += slip_mag mean_slip = slip / len(patch_ids) # check slip isn't 0/ less than a mm if mean_slip > 1.e-3: poly_list.append(Polygon(quad)) except: print(f"{fName} could not be discretised - event slip distribution will be incomplete") # set parameters for OQ # parameters for openquake if hypocentre is None: hypocentre = np.array([self.x, self.y, self.z]) print(f"Check hypocentre: {hypocentre}") evname = f'event_{self.event_id}_OQ' event_asOQ = OpenQuakeMultiSquareRupture(tile_list=poly_list, probability=probability, magnitude=self.mw, rake=self.mean_rake, hypocentre=hypocentre, event_id=self.event_id, name=evname, tectonic_region=tectonic_region,nztm2wgs=nztm2wgs) event_asOQ.to_oq_xml(write=os.path.join(outdir, f'event_{self.event_id}.xml')) return
[docs] def slip_dist_quads_ktree(self, fault_model: RsqSimMultiFault, quads_dict: dict, min_moment: float = 1.e+18, min_slip: float = 0.,threshold_for_inclusion: float = 0.5, slip_per_quad: bool = False): """ Find ruptured quadrilateral tiles for each fault using a KD-tree. For each fault segment that exceeds ``min_moment``, uses a KD-tree on pre-computed quad centroids to assign triangular patches to quads, then returns quads where the fraction of ruptured patches exceeds ``threshold_for_inclusion``. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch dictionaries and patch centres. quads_dict : dict Mapping of fault segment name to a ``(n, 4, 3)`` array of quad corner coordinates (NZTM, m). min_moment : float, optional Minimum moment (N·m) for a fault to be considered. Defaults to 1×10¹⁸ N·m. min_slip : float, optional Minimum patch slip (m) for a patch to count as ruptured. Defaults to 0. threshold_for_inclusion : float, optional Fraction of a quad's assigned patches that must be ruptured for the quad to be included. Defaults to 0.5. slip_per_quad : bool, optional If ``True``, compute and store average slip per quad (not yet returned in output). Returns ------- dict Mapping of fault segment name to a ``(n_ruptured, 4, 3)`` array of ruptured quad corner coordinates. """ moment_dict = self.make_fault_moment_dict(fault_model=fault_model, min_m0=min_moment, by_cfm_names=False) moment_quads = [key for key in moment_dict.keys() if key in quads_dict.keys()] missing_quads = [key for key in moment_dict.keys() if key not in moment_quads] if missing_quads: print("Warning: some fault segments have no associated quads") print(missing_quads) fault_patches = np.array(list(fault_model.patch_dic.keys())) ruptured_quads_dict = {} for name in moment_quads: segment_quads = quads_dict[name] if segment_quads.size > 0: segment = fault_model.name_dic[name] segment_quad_centroids = segment_quads.mean(axis=1) ruptured_patch_numbers = self.patch_numbers[np.isin(self.patch_numbers, fault_patches) & (self.patch_slip > min_slip)] segment_patch_centroids = segment.get_patch_centres() ruptured_patch_centroids = segment_patch_centroids[np.isin(segment.patch_numbers, ruptured_patch_numbers)] tree = KDTree(segment_quad_centroids) _, all_patch_indices = tree.query(segment_patch_centroids) _, ruptured_patch_indices = tree.query(ruptured_patch_centroids) ruptured_quads = [] quad_slip = [] for i, quad in enumerate(segment_quads): num_triangles = (all_patch_indices == i).sum() num_ruptured_triangles = (ruptured_patch_indices == i).sum() if num_ruptured_triangles / num_triangles > threshold_for_inclusion: ruptured_quads.append(quad) if slip_per_quad: areas = np.array([segment.patch_dic[patch_number].area for patch_number in ruptured_patch_indices]) slip = self.patch_slip[np.isin(self.patch_numbers, ruptured_patch_indices)] average_slip = np.average(slip, weights=areas) quad_slip.append(average_slip) ruptured_quads_dict[name] = np.array(ruptured_quads) return ruptured_quads_dict
[docs] def to_oq_points(self, fault_model: RsqSimMultiFault): """ Convert event to OpenQuake point-source representation. Not yet implemented. Parameters ---------- fault_model : RsqSimMultiFault Fault model (reserved for future use). """ pass
[docs] def get_crustal_component(self, fault_model: RsqSimMultiFault, crustal_names: list, min_moment: float = 1.e+18, min_slip: float = 0.): """ Compute moment, mean rake, and hypocentre for crustal fault components. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch dictionaries and patch moments. crustal_names : list of str Names of fault segments to treat as crustal. min_moment : float, optional Minimum moment (N·m) for a fault to be considered. Defaults to 1×10¹⁸ N·m. min_slip : float, optional Minimum patch slip (m) to count as ruptured. Defaults to 0. Returns ------- tuple of (float, float, numpy.ndarray) or None ``(mw, mean_rake, hypocentre_xyz)`` for the crustal component, or ``None`` if no crustal faults qualify. """ moment_dict = self.make_fault_moment_dict(fault_model=fault_model, min_m0=min_moment, by_cfm_names=False) if any([name in crustal_names for name in moment_dict.keys()]): crustal_moment = 0. fault_patches = np.array(list(fault_model.patch_dic.keys())) rake_list = [] moment_list = [] hypocentre_time = 1.e20 hypocentre = None for name, moment in moment_dict.items(): if name in crustal_names: crustal_moment += moment segment = fault_model.name_dic[name] ruptured_patch_numbers = self.patch_numbers[ np.isin(self.patch_numbers, fault_patches) & (self.patch_slip > min_slip)] rakes = segment.rake[np.isin(segment.patch_numbers, ruptured_patch_numbers)] rake_list.append(rakes) patch_moment = segment.patch_moments[np.isin(segment.patch_numbers, ruptured_patch_numbers)] moment_list.append(patch_moment) patch_numbers_this_segment = segment.patch_numbers[ np.isin(segment.patch_numbers, ruptured_patch_numbers)] patch_times_this_segment = self.patch_time[np.isin(self.patch_numbers, patch_numbers_this_segment)] segment_first_patch = fault_model.patch_dic[ patch_numbers_this_segment[np.argmin(patch_times_this_segment)]] if patch_times_this_segment.min() < hypocentre_time: hypocentre_time = patch_times_this_segment.min() hypocentre = segment_first_patch.centre patch_moment_array = np.hstack(moment_list) rake_array = np.hstack(rake_list) mean_rake = weighted_circular_mean(rake_array, patch_moment_array) crustal_mw = m0_to_mw(crustal_moment) crustal_hypocentre = hypocentre hyp_x, hyp_y, hyp_z = crustal_hypocentre return crustal_mw, mean_rake, np.array([hyp_x, hyp_y, hyp_z]) else: return None
[docs] def get_subduction_component(self, fault_model: RsqSimMultiFault, subduction_names: list, min_moment: float = 1.e+18, min_slip: float = 0.): """ Compute moment, mean rake, and hypocentre for subduction fault components. Parameters ---------- fault_model : RsqSimMultiFault Fault model providing patch dictionaries and patch moments. subduction_names : list of str Names of fault segments to treat as subduction interface. min_moment : float, optional Minimum moment (N·m) for a fault to be considered. Defaults to 1×10¹⁸ N·m. min_slip : float, optional Minimum patch slip (m) to count as ruptured. Defaults to 0. Returns ------- tuple of (float, float, numpy.ndarray) or None ``(mw, mean_rake, hypocentre_xyz)`` for the subduction component, or ``None`` if no subduction faults qualify. """ moment_dict = self.make_fault_moment_dict(fault_model=fault_model, min_m0=min_moment, by_cfm_names=False) if any([name in subduction_names for name in moment_dict.keys()]): subduction_moment = 0. fault_patches = np.array(list(fault_model.patch_dic.keys())) rake_list = [] moment_list = [] hypocentre_time = 1.e20 hypocentre = None for name, moment in moment_dict.items(): if name in subduction_names: subduction_moment += moment segment = fault_model.name_dic[name] ruptured_patch_numbers = self.patch_numbers[ np.isin(self.patch_numbers, fault_patches) & (self.patch_slip > min_slip)] rakes = segment.rake[np.isin(segment.patch_numbers, ruptured_patch_numbers)] rake_list.append(rakes) patch_moment = segment.patch_moments[np.isin(segment.patch_numbers, ruptured_patch_numbers)] moment_list.append(patch_moment) patch_numbers_this_segment = segment.patch_numbers[np.isin(segment.patch_numbers, ruptured_patch_numbers)] patch_times_this_segment = self.patch_time[np.isin(self.patch_numbers, patch_numbers_this_segment)] segment_first_patch = fault_model.patch_dic[patch_numbers_this_segment[np.argmin(patch_times_this_segment)]] if patch_times_this_segment.min() < hypocentre_time: hypocentre_time = patch_times_this_segment.min() hypocentre = segment_first_patch.centre patch_moment_array = np.hstack(moment_list) rake_array = np.hstack(rake_list) mean_rake = weighted_circular_mean(rake_array, patch_moment_array) subduction_mw = m0_to_mw(subduction_moment) subduction_hypocentre = hypocentre hyp_x, hyp_y, hyp_z = subduction_hypocentre return subduction_mw, mean_rake, np.array([hyp_x, hyp_y, hyp_z]) else: return None
[docs] def slip_dist_to_quads(self, fault_model: RsqSimMultiFault, path2cfm: str, catalogue_version: str = 'v2', vtk_dir: str = 'fault_vtks', subd_tile_size: float = 15000., tile_size: float = 5000., ): """ Write per-fault slip distributions as quadrilateral VTK meshes. Discretises each involved fault into rectangular tiles, assigns the average slip of the nearest triangular patches to each tile, and writes a VTK file per fault to ``<vtk_dir>/event_<id>_quad/<fault_name>_<event_id>.vtk``. Parameters ---------- fault_model : RsqSimMultiFault Fault model for segment merging and discretisation. path2cfm : str Path to the Community Fault Model directory. catalogue_version : str, optional ``"v1"`` or ``"v2"``. Defaults to ``"v2"``. vtk_dir : str, optional Output directory for VTK files. Defaults to ``"fault_vtks"``. subd_tile_size : float, optional Tile size (m) for subduction faults. Defaults to 15000 m. tile_size : float, optional Tile size (m) for crustal faults. Defaults to 5000 m. """ assert os.path.exists(path2cfm), "Path to CFM does not exist" if catalogue_version == 'v2': fault_model.make_v2_name_dic(path2cfm=path2cfm) name_dict = fault_model.v2_name_dic else: name_dict = dict(zip(fault_model.names, fault_model.names)) unique_names = set(name_dict.values()) # which faults are involved in this event? faults = RsqSimMultiFault(self.faults) faultNames = faults.names # find corresponding larger/cfm faults allFaults = np.unique([name_dict[name] for name in faultNames]) subdFaults = np.unique([name_dict[name] for name in faultNames if fnmatch.fnmatch(name, "*puysegar*") or fnmatch.fnmatch(name, "*hikurangi*")]) outdir = os.path.join(vtk_dir, f'event_{self.event_id}_quad') if not os.path.exists(outdir): os.makedirs(outdir) poly_list = [] for fName in allFaults: try: # print(fName) # need to find all parts of the fault, then later select those which have non-zero slip fault_merged = fault_model.merge_segments(fName, name_dict=name_dict, fault_name=fName) if len(subdFaults) > 0 and fName in subdFaults: dip_angle = fault_merged.get_average_dip(subd_tile_size) new_fault_rect = fault_merged.discretize_rectangular_tiles(tile_size=subd_tile_size) else: # Average dip dip_angle = fault_merged.get_average_dip() # Discretize into rectangular tiles new_fault_rect = fault_merged.discretize_rectangular_tiles(tile_size=tile_size) # Turn into quadrilateral mesh vertices = np.unique(np.vstack([rect for rect in new_fault_rect]), axis=0) vertex_dict = {tuple(vertex): i for i, vertex in enumerate(vertices)} new_fault_rect_indices = [[vertex_dict[tuple(vertex)] for vertex in quad] for quad in new_fault_rect] mesh = meshio.Mesh(points=vertices, cells={"quad": new_fault_rect_indices}) # assign slip to these slip_list = [] for quad in new_fault_rect: # find nearest patch approx_centroid = np.mean(quad, axis=0) # find closest patches is only in 2d so then need to check depth nearest_patches_ids = faults.find_closest_patches(approx_centroid[0], approx_centroid[1]) nearest_patches = [faults.patch_dic[patch_id] for patch_id in nearest_patches_ids] min_z_diff = min([np.abs(patch.centre[2] - approx_centroid[2]) for patch in nearest_patches]) nearest_patches_z = [patch for patch in nearest_patches if np.abs(patch.centre[2] - approx_centroid[2]) == min_z_diff] patch_ids = [patch.patch_number for patch in nearest_patches_z] # find associated slip slip = 0. for patch_id in patch_ids: if patch_id in self.patch_numbers: slip_index = np.searchsorted(self.patch_numbers, patch_id) slip_mag = self.patch_slip[slip_index] slip += slip_mag mean_slip = slip / len(patch_ids) slip_list.append(mean_slip) mesh.cell_data = {"slip": np.array(slip_list)} meshio.write(os.path.join(outdir, f'{fName}_{self.event_id}.vtk'), mesh, file_format="vtk") except: print(f"{fName} could not be discretised - event slip distribution will be incomplete") return
[docs] class OpenQuakeMultiSquareRupture: """ Multi-planar rupture representation for the OpenQuake engine. Packages a list of rectangular tile polygons (each representing a fault-surface patch) together with event metadata into the structure needed to write OpenQuake NRML ``multiPlanesRupture`` XML. Attributes ---------- patches : list of OpenQuakeRectangularPatch Individual rectangular patches converted from input polygons. prob : float Probability of occurrence. magnitude : float Moment magnitude. rake : float Mean rake (degrees). hypocentre : numpy.ndarray Hypocentre coordinates ``[x, y, z]`` in the input CRS. inv_prob : float Complementary probability (``1 - prob``). hyp_depth : float Hypocentre depth in km (positive downward). hyp_lon, hyp_lat : float Hypocentre longitude and latitude in WGS84 degrees. event_id : int Catalogue event identifier. name : str Descriptive rupture name. tectonic_region : str Tectonic region tag for OpenQuake. """ def __init__(self, tile_list: list[Polygon], probability: float, magnitude: float, rake: float, hypocentre: np.ndarray, event_id: int, name: str = "Subduction earthquake", tectonic_region: str = "subduction", nztm2wgs: bool = True): """ Initialise an OpenQuake multi-planar rupture. Parameters ---------- tile_list : list of Polygon Shapely polygons representing rectangular fault-surface tiles (in NZTM coordinates if ``nztm2wgs=True``). probability : float Probability of occurrence. magnitude : float Moment magnitude. rake : float Mean rake in degrees. hypocentre : numpy.ndarray ``[x, y, z]`` coordinates of the hypocentre. If ``nztm2wgs=True``, ``x`` and ``y`` are NZTM eastings and northings (m); ``z`` is depth in m (negative downward). event_id : int Catalogue event identifier. name : str, optional Rupture name written to the XML. Defaults to ``"Subduction earthquake"``. tectonic_region : str, optional Tectonic region tag. Defaults to ``"subduction"``. nztm2wgs : bool, optional If ``True`` (default), transform hypocentre from NZTM to WGS84 longitude/latitude. """
[docs] self.patches = [OpenQuakeRectangularPatch.from_polygon(tile) for tile in tile_list]
[docs] self.prob = probability
[docs] self.magnitude = magnitude
[docs] self.rake = rake
[docs] self.hypocentre = hypocentre
[docs] self.inv_prob = 1. - probability
[docs] self.hyp_depth = -1.e-3 * hypocentre[-1]
if nztm2wgs: self.hyp_lon, self.hyp_lat = transformer_nztm2wgs.transform(hypocentre[0], hypocentre[1]) else: self.hyp_lon, self.hyp_lat = hypocentre[0:2]
[docs] self.event_id = event_id
[docs] self.name = name
[docs] self.tectonic_region = tectonic_region
[docs] def to_oq_xml(self, write: str = None): """ Build (and optionally write) an OpenQuake NRML rupture XML element. Constructs a ``<multiPlanesRupture>`` element containing magnitude, rake, hypocenter, and one ``<planarSurface>`` per patch. Parameters ---------- write : str or None, optional Output file path. If the path does not end with ``".xml"`` the extension is appended automatically. If ``None``, the XML is not written to disk. Returns ------- xml.etree.ElementTree.Element The root ``<nrml>`` element (regardless of whether the file was written). """ source_element = ElemTree.Element("nrml", attrib={"xmlns": "http://openquake.org/xmlns/nrml/0.4", "xmlns:gml": "http://www.opengis.net/gml" }) multi_patch_elem = ElemTree.Element("multiPlanesRupture", attrib={"probs_occur": f"{self.inv_prob:.4f} {self.prob:.4f}"}) mag_elem = ElemTree.Element("magnitude") mag_elem.text = f"{self.magnitude:.2f}" multi_patch_elem.append(mag_elem) rake_elem = ElemTree.Element("rake") rake_elem.text = f"{self.rake:.1f}" multi_patch_elem.append(rake_elem) hyp_element = ElemTree.Element("hypocenter", attrib={"depth": f"{self.hyp_depth:.3f}", "lat": f"{self.hyp_lat:.4f}", "lon": f"{self.hyp_lon:.4f}"}) multi_patch_elem.append(hyp_element) for patch in self.patches: multi_patch_elem.append(patch.to_oq_xml()) source_element.append(multi_patch_elem) if write is not None: assert isinstance(write, str) if write[-4:] != ".xml": write += ".xml" elmstr = ElemTree.tostring(source_element, encoding="UTF-8", method="xml") xml_dom = minidom.parseString(elmstr) pretty_xml_str = xml_dom.toprettyxml(indent=" ", encoding="utf-8") with open(write, "wb") as xml: xml.write(pretty_xml_str) return source_element