"""
RSQSim earthquake catalogue management and analysis.
Provides :class:`RsqSimCatalogue` for loading, filtering, and analysing
earthquake catalogues produced by RSQSim, along with module-level helper
functions for reading Bruce Shaw run directories and combining spatial
bounds.
The catalogue stores per-event metadata (time, magnitude, location,
area, duration) as a Pandas DataFrame alongside parallel flat arrays of
per-patch slip, timing, and event identifiers that can be lazily resolved
to :class:`~rsqsim_api.catalogue.event.RsqSimEvent` objects.
"""
from typing import Any
from collections import abc, Counter, defaultdict
from collections.abc import Iterable
import os
import pickle
from multiprocessing import Queue, Process
from multiprocessing import sharedctypes
import pandas as pd
import numpy as np
import pyproj
from matplotlib import pyplot as plt
from matplotlib import cm, colors
from shapely.geometry import Polygon
import geopandas as gpd
from numpy.random import default_rng
from numba import njit, prange, types, typed
from rsqsim_api.fault.multifault import RsqSimMultiFault, RsqSimSegment
from rsqsim_api.catalogue.event import RsqSimEvent
from rsqsim_api.io.read_utils import read_earthquake_catalogue, read_binary, catalogue_columns, read_csv_and_array, read_text
from rsqsim_api.io.write_utils import write_catalogue_dataframe_and_arrays
from rsqsim_api.tsunami.tsunami import SeaSurfaceDisplacements
from rsqsim_api.visualisation.utilities import plot_coast, plot_background, plot_hillshade_niwa, plot_lake_polygons, \
plot_river_lines, plot_highway_lines, plot_boundary_polygons
from rsqsim_api.io.bruce_shaw_utilities import bruce_subduction
import rsqsim_api.io.rsqsim_constants as csts
from rsqsim_api.catalogue.utilities import calculate_scaling_c, calculate_stress_drop, \
summary_statistics, median_cumulant, mw_to_m0, jit_intersect
[docs]
sensible_ranges = {"t0": (0, 1.e15), "m0": (1.e13, 1.e24), "mw": (2.5, 10.0),
"x": (-180., 1.e8), "y": (-90., 1.e8), "z": (-1.e6, 0),
"area": (0, 1.e12), "dt": (0, 1200)}
[docs]
list_file_suffixes = (".pList", ".eList", ".dList", ".tList")
[docs]
def get_mask(ev_ls, min_patches, faults_with_patches, event_list, patch_list, queue):
"""
Worker function: compute per-event patch masks and post results to a queue.
For each event index in ``ev_ls``, finds the patches belonging to each
fault segment and masks out segments that contribute fewer than
``min_patches`` patches. Results are posted to ``queue`` as
``(event_index, ev_indices, mask)`` tuples.
Parameters
----------
ev_ls : array-like of int
Event indices to process in this worker.
min_patches : int
Minimum patches per fault for a segment to be retained.
faults_with_patches : dict
Mapping of patch number to fault segment number (integer IDs
rather than objects, for serialisability).
event_list : array-like of int
Flat array of event IDs for every patch entry in the catalogue.
patch_list : array-like of int
Flat array of patch IDs parallel to ``event_list``.
queue : multiprocessing.Queue
Output queue; tuples of ``(event_id, ev_indices, mask)`` are
placed here.
"""
patches = np.asarray(patch_list)
events = np.asarray(event_list)
unique_events, unique_event_indices = np.unique(events, return_index=True)
unique_dic = {unique_events[i]: (unique_event_indices[i], unique_event_indices[i + 1]) for i in
range(len(unique_events) - 1)}
unique_dic[unique_events[-1]] = (unique_event_indices[-1], len(events))
for index in ev_ls:
ev_range = unique_dic[index]
ev_indices = np.arange(ev_range[0], ev_range[1])
patch_numbers = patches[ev_indices]
patches_on_fault = defaultdict(list)
[patches_on_fault[faults_with_patches[i]].append(i) for i in patch_numbers]
mask = np.full(len(patch_numbers), True)
for fault in patches_on_fault.keys():
patches_on_this_fault = patches_on_fault[fault]
if len(patches_on_this_fault) < min_patches:
patch_on_fault_indices = np.searchsorted(patch_numbers, patches_on_this_fault)
mask[patch_on_fault_indices] = False
queue.put((index, ev_indices, mask))
[docs]
class RsqSimCatalogue:
"""
Container for an RSQSim earthquake catalogue with per-patch slip data.
Stores the catalogue as a Pandas DataFrame together with four
parallel flat arrays (event IDs, patch IDs, slip values, and rupture
times) that cover the slip distribution for every event. Provides
methods to load data from RSQSim binary output files, filter by
magnitude, time, location, or fault, retrieve fully populated
:class:`~rsqsim_api.catalogue.event.RsqSimEvent` objects, and
produce diagnostic plots.
Attributes
----------
t0, m0, mw : numpy.ndarray or None
Origin time (s), scalar moment (N·m), and moment magnitude
arrays from the catalogue DataFrame (set lazily).
x, y, z : numpy.ndarray or None
Hypocentre NZTM coordinates (m).
area, dt : numpy.ndarray or None
Rupture areas (m²) and durations (s).
"""
def __init__(self):
"""Initialise an empty catalogue; use class methods to populate."""
# Essential attributes
self._catalogue_df = None
self._event_list = None
self._patch_list = None
self._patch_time_list = None
self._patch_slip = None
self._accumulated_slip = None
self._event_mean_slip = None
self._event_length = None
self._event_mean_sdr = None
self._event_length = None
# Useful attributes
self.t0, self.m0, self.mw = (None,) * 3
self.x, self.y, self.z = (None,) * 3
self.area, self.dt = (None,) * 2
@property
[docs]
def catalogue_df(self):
"""pandas.DataFrame: Per-event catalogue with 8 numeric columns."""
return self._catalogue_df
@catalogue_df.setter
def catalogue_df(self, dataframe: pd.DataFrame):
assert dataframe.columns.size == 8, "Should have 8 columns"
assert all([col.dtype in ("float", "int") for i, col in dataframe.items()])
dataframe.columns = catalogue_columns
self._catalogue_df = dataframe
[docs]
def check_list(self, data_list: np.ndarray, data_type: str):
"""
Validate a flat patch/event list array before assignment.
Parameters
----------
data_list : numpy.ndarray
1-D array to validate.
data_type : str
``"i"`` for integer arrays (patch/event lists) or ``"d"``
for floating-point arrays (slip/time lists).
Raises
------
AttributeError
If the catalogue DataFrame has not been loaded yet.
AssertionError
If ``data_type``, dtype, or dimensionality are invalid.
"""
assert data_type in ("i", "d")
if self.catalogue_df is None:
raise AttributeError("Read in main catalogue (eqs.*.out) before list files")
if data_type == "i":
assert data_list.dtype.char in np.typecodes['AllInteger']
else:
assert data_list.dtype.char in np.typecodes['AllFloat']
assert data_list.ndim == 1, "Expecting 1D array as input"
return
@property
[docs]
def event_list(self):
"""numpy.ndarray: Flat integer array of event IDs, one per patch entry."""
return self._event_list
@event_list.setter
def event_list(self, data_list: np.ndarray):
self.check_list(data_list, data_type="i")
if not len(np.unique(data_list)) == len(self.catalogue_df):
print(len(np.unique(data_list)),len(self.catalogue_df))
raise ValueError("Numbers of events in catalogue and supplied list are different!")
self._event_list = data_list
@property
[docs]
def patch_list(self):
"""numpy.ndarray: Flat integer array of patch IDs, parallel to ``event_list``."""
return self._patch_list
@patch_list.setter
def patch_list(self, data_list: np.ndarray):
self.check_list(data_list, data_type="i")
self._patch_list = data_list
@property
[docs]
def patch_time_list(self):
"""numpy.ndarray: Rupture times (s) for each patch entry."""
return self._patch_time_list
@patch_time_list.setter
def patch_time_list(self, data_list: np.ndarray):
self.check_list(data_list, data_type="d")
self._patch_time_list = data_list
@property
[docs]
def patch_slip(self):
"""numpy.ndarray: Slip magnitudes (m) for each patch entry."""
return self._patch_slip
@patch_slip.setter
def patch_slip(self, data_list: np.ndarray):
self.check_list(data_list, data_type="d")
self._patch_slip = data_list
@property
[docs]
def accumulated_slip(self):
"""dict: Mapping of patch ID to total slip accumulated over all events."""
if self._accumulated_slip is None:
self.assign_accumulated_slip()
return self._accumulated_slip
@property
[docs]
def event_mean_slip(self):
"""dict or None: Mapping of event ID to mean slip (m); populated by :meth:`assign_event_mean_slip`."""
return self._event_mean_slip
@property
[docs]
def event_mean_sdr(self):
"""dict or None: Mapping of event ID to ``[strike, dip, rake]``; populated by :meth:`assign_event_mean_sdr`."""
return self._event_mean_sdr
@property
[docs]
def event_length(self):
"""dict or None: Mapping of event ID to rupture length (m); populated by :meth:`assign_event_length`."""
return self._event_length
@classmethod
[docs]
def from_dataframe(cls, dataframe: pd.DataFrame, reproject: list = None):
"""
Construct a catalogue from an existing DataFrame.
Parameters
----------
dataframe : pandas.DataFrame
DataFrame with 8 numeric columns in the standard RSQSim
catalogue column order (t0, m0, mw, x, y, z, area, dt).
reproject : list or tuple of int, optional
``[in_epsg, out_epsg]`` pair. If provided, the ``x`` and
``y`` columns are reprojected using PyProj.
Returns
-------
RsqSimCatalogue
Catalogue populated with the DataFrame (no patch lists).
"""
rsqsim_cat = cls()
if reproject is not None:
assert isinstance(reproject, (tuple, list))
assert len(reproject) == 2
in_epsg, out_epsg = reproject
transformer = pyproj.Transformer.from_crs(in_epsg, out_epsg, always_xy=True)
new_x, new_y = transformer.transform(dataframe["x"], dataframe["y"])
dataframe["x"] = new_x
dataframe["y"] = new_y
rsqsim_cat.catalogue_df = dataframe
return rsqsim_cat
@classmethod
[docs]
def from_catalogue_file(cls, filename: str, reproject: list = None):
"""
Construct a catalogue by reading an RSQSim ``eqs.*.out`` file.
Parameters
----------
filename : str
Path to the RSQSim catalogue file.
reproject : list or tuple of int, optional
``[in_epsg, out_epsg]`` pair for coordinate reprojection.
Returns
-------
RsqSimCatalogue
Catalogue populated from the file (no patch lists).
"""
assert os.path.exists(filename)
catalogue_df = read_earthquake_catalogue(filename)
rsqsim_cat = cls.from_dataframe(catalogue_df, reproject=reproject)
return rsqsim_cat
@classmethod
[docs]
def from_catalogue_file_and_lists(cls, catalogue_file: str, list_file_directory: str,
list_file_prefix: str, read_extra_lists: bool = False, reproject: list = None, serial: bool = False, endian: str = "little"):
"""
Construct a fully populated catalogue from an RSQSim output directory.
Reads the ``eqs.*.out`` catalogue file together with the four
binary list files (``.pList``, ``.eList``, ``.dList``, ``.tList``).
Parameters
----------
catalogue_file : str
Path to the ``eqs.*.out`` catalogue file.
list_file_directory : str
Directory containing the ``.pList``/``.eList``/``.dList``/
``.tList`` binary files.
list_file_prefix : str
Common prefix for the list files (e.g. ``"rundir4627"``).
read_extra_lists : bool, optional
Unused; reserved for future extra list support.
reproject : list or tuple of int, optional
``[in_epsg, out_epsg]`` pair for coordinate reprojection.
serial : bool, optional
If ``True``, read list files as plain text rather than
binary. Defaults to ``False``.
endian : str, optional
Byte order of the binary list files: ``"little"`` (default)
or ``"big"``.
Returns
-------
RsqSimCatalogue
Fully populated catalogue with patch slip distribution.
"""
assert os.path.exists(catalogue_file)
assert os.path.exists(list_file_directory)
standard_list_files = [os.path.join(list_file_directory, list_file_prefix + suffix)
for suffix in list_file_suffixes]
for fname, suffix in zip(standard_list_files, list_file_suffixes):
if not os.path.exists(fname):
raise FileNotFoundError("{} file required to populate event slip distributions".format(suffix))
# Read in catalogue to dataframe and initiate class instance
rcat = cls.from_catalogue_file(catalogue_file, reproject=reproject)
num_events = len(rcat.catalogue_df)
if serial:
event_list = read_text(standard_list_files[1], format="i") - 1
patch_list = read_text(standard_list_files[0], format="i") - 1
patch_slip, patch_time_list = [read_text(fname, format="d") for fname in
standard_list_files[2:]]
else:
patch_list = read_binary(standard_list_files[0], format="i",endian=endian) - 1
event_list = read_binary(standard_list_files[1], format="i",endian=endian) - 1
patch_slip, patch_time_list = [read_binary(fname, format="d",endian=endian) for fname in standard_list_files[2:]]
unique_events = np.unique(event_list)
if len(unique_events) == num_events:
rcat.event_list = event_list
rcat.patch_list = patch_list
rcat.patch_slip, rcat.patch_time_list = patch_slip, patch_time_list
else:
print("Event list does not match catalogue length. Trying to fix...")
if not len(unique_events) > num_events:
raise ValueError("Event list is too short!")
last_event = rcat.catalogue_df.index[-1]
short_events = event_list[event_list <= last_event]
short_patches = patch_list[:len(short_events)]
short_slip = patch_slip[:len(short_events)]
short_time = patch_time_list[:len(short_events)]
rcat.event_list = short_events
rcat.patch_list = short_patches
rcat.patch_slip, rcat.patch_time_list = short_slip, short_time
print("Fixed!")
return rcat
@classmethod
[docs]
def from_dataframe_and_arrays(cls, dataframe: pd.DataFrame, event_list: np.ndarray, patch_list: np.ndarray,
patch_slip: np.ndarray, patch_time_list: np.ndarray,reproject: list = None):
"""
Construct a catalogue from a DataFrame and pre-loaded patch arrays.
Parameters
----------
dataframe : pandas.DataFrame
Per-event catalogue DataFrame.
event_list : numpy.ndarray
Flat 1-D integer array of event IDs, one per patch entry.
patch_list : numpy.ndarray
Flat 1-D integer array of patch IDs parallel to ``event_list``.
patch_slip : numpy.ndarray
Flat 1-D float array of slip magnitudes (m).
patch_time_list : numpy.ndarray
Flat 1-D float array of rupture times (s).
reproject : list or tuple of int, optional
``[in_epsg, out_epsg]`` pair for coordinate reprojection.
Returns
-------
RsqSimCatalogue
Fully populated catalogue.
"""
assert all([arr.ndim == 1 for arr in [event_list, patch_list, patch_slip, patch_time_list]])
list_len = event_list.size
assert all([arr.size == list_len for arr in [patch_list, patch_slip, patch_time_list]])
assert len(np.unique(event_list)) == len(dataframe), "Number of events in dataframe and lists do not match"
rcat = cls.from_dataframe(dataframe,reproject=reproject)
rcat.event_list, rcat.patch_list, rcat.patch_slip, rcat.patch_time_list = [event_list, patch_list,
patch_slip, patch_time_list]
return rcat
@classmethod
[docs]
def from_csv_and_arrays(cls, prefix: str, read_index: bool = True, reproject: list = None):
"""
Construct a catalogue from a CSV and companion NumPy array files.
Parameters
----------
prefix : str
File path prefix used by :func:`~rsqsim_api.io.read_utils.read_csv_and_array`.
read_index : bool, optional
Whether to read the DataFrame index from the CSV.
Defaults to ``True``.
reproject : list or tuple of int, optional
``[in_epsg, out_epsg]`` pair for coordinate reprojection.
Returns
-------
RsqSimCatalogue
Fully populated catalogue.
"""
df, event_ls, patch_ls, slip_ls, time_ls = read_csv_and_array(prefix, read_index=read_index)
return cls.from_dataframe_and_arrays(df, event_ls, patch_ls, slip_ls, time_ls, reproject=reproject)
[docs]
def write_csv_and_arrays(self, prefix: str, directory: str = None, write_index: bool = True):
"""
Write the catalogue DataFrame and patch arrays to CSV and NumPy files.
Parameters
----------
prefix : str
File name prefix for the output files.
directory : str or None, optional
Output directory. Created if it does not exist. If
``None``, files are written to the current directory.
write_index : bool, optional
Whether to write the DataFrame index to the CSV.
Defaults to ``True``.
"""
assert prefix, "Empty prefix!"
if directory is not None:
if not os.path.exists(directory):
os.mkdir(directory)
write_catalogue_dataframe_and_arrays(prefix, self, directory=directory, write_index=write_index)
[docs]
def first_event(self, fault_model: RsqSimMultiFault):
"""Return the first event in the catalogue as an :class:`~rsqsim_api.catalogue.event.RsqSimEvent`."""
return self.events_by_number(int(self.catalogue_df.index[0]), fault_model)[0]
[docs]
def nth_event(self,fault_model: RsqSimMultiFault,n: int):
"""
Return the n-th event (1-based) from the catalogue.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch slip distributions.
n : int
1-based index of the event to retrieve.
Returns
-------
RsqSimEvent
"""
assert isinstance(n,int)
return self.events_by_number(int(self.catalogue_df.index[n-1]), fault_model)[0]
[docs]
def first_n_events(self, number_of_events: int, fault_model: RsqSimMultiFault):
"""
Return the first ``number_of_events`` events from the catalogue.
Parameters
----------
number_of_events : int
Number of events to retrieve.
fault_model : RsqSimMultiFault
Fault model for resolving patch slip distributions.
Returns
-------
list of RsqSimEvent
"""
return self.events_by_number(list(self.catalogue_df.index[:number_of_events]), fault_model)
[docs]
def all_events(self, fault_model: RsqSimMultiFault):
"""
Return all events in the catalogue.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch slip distributions.
Returns
-------
list of RsqSimEvent
"""
return self.events_by_number(list(self.catalogue_df.index), fault_model)
[docs]
def event_outlines(self, fault_model: RsqSimMultiFault, event_numbers: Iterable = None):
"""
Return the Shapely exterior geometries for a set of events.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch slip distributions.
event_numbers : iterable of int, optional
Event IDs to retrieve. If ``None``, all events are used.
Returns
-------
list of shapely.geometry.base.BaseGeometry
Unary-union patch outlines for each event.
"""
if event_numbers is not None:
events = self.events_by_number(event_numbers, fault_model)
else:
events = self.all_events(fault_model)
return [event.exterior for event in events]
[docs]
def filter_df(self, min_t0: fint = None, max_t0: fint = None, min_m0: fint = None,
max_m0: fint = None, min_mw: fint = None, max_mw: fint = None,
min_x: fint = None, max_x: fint = None, min_y: fint = None, max_y: fint = None,
min_z: fint = None, max_z: fint = None, min_area: fint = None, max_area: fint = None,
min_dt: fint = None, max_dt: fint = None):
"""
Return a filtered view of the catalogue DataFrame.
All parameters are optional range constraints. Values outside
``sensible_ranges`` raise a ``ValueError``.
Parameters
----------
min_t0, max_t0 : float or int, optional
Origin time bounds (s).
min_m0, max_m0 : float or int, optional
Scalar moment bounds (N·m).
min_mw, max_mw : float or int, optional
Moment magnitude bounds.
min_x, max_x, min_y, max_y : float or int, optional
Horizontal coordinate bounds (NZTM metres).
min_z, max_z : float or int, optional
Depth bounds (m, negative downward).
min_area, max_area : float or int, optional
Rupture area bounds (m²).
min_dt, max_dt : float or int, optional
Duration bounds (s).
Returns
-------
pandas.DataFrame or None
Filtered DataFrame, or ``None`` if no conditions were
specified.
"""
assert isinstance(self.catalogue_df, pd.DataFrame), "Read in data first!"
conditions_str = ""
range_checks = [(min_t0, max_t0, "t0"), (min_m0, max_m0, "m0"), (min_mw, max_mw, "mw"),
(min_x, max_x, "x"), (min_y, max_y, "y"), (min_z, max_z, "z"),
(min_area, max_area, "area"), (min_dt, max_dt, "dt")]
if all([any([a is not None for a in (min_m0, max_m0)]),
any([a is not None for a in (min_mw, max_mw)])]):
print("Probably no need to filter by both M0 and Mw...")
for range_check in range_checks:
min_i, max_i, label = range_check
if any([a is not None for a in (min_i, max_i)]):
for a in (min_i, max_i):
if not any([a is None, isinstance(a, (float, int))]):
raise ValueError("Min and max {} should be int or float".format(label))
sensible_min, sensible_max = sensible_ranges[label]
if min_i is None:
min_i = sensible_min
if max_i is None:
max_i = sensible_max
sensible_conditions = all([sensible_min <= a <= sensible_max for a in (min_i, max_i)])
if not sensible_conditions:
raise ValueError("{} values should be between {:e} and {:e}".format(label, sensible_min,
sensible_max))
range_condition_str = "{} >= {:e} & {} < {:e}".format(label, min_i, label, max_i)
if not conditions_str:
conditions_str += range_condition_str
else:
conditions_str += " & "
conditions_str += range_condition_str
if not conditions_str:
print("No valid conditions... Copying original catalogue")
return
trimmed_df = self.catalogue_df[self.catalogue_df.eval(conditions_str)]
return trimmed_df
[docs]
def filter_whole_catalogue(self, min_t0: fint = None, max_t0: fint = None, min_m0: fint = None,
max_m0: fint = None, min_mw: fint = None, max_mw: fint = None,
min_x: fint = None, max_x: fint = None, min_y: fint = None, max_y: fint = None,
min_z: fint = None, max_z: fint = None, min_area: fint = None, max_area: fint = None,
min_dt: fint = None, max_dt: fint = None, reset_index: bool = False):
"""
Return a new catalogue filtered by catalogue-level parameter ranges.
Parameters are identical to :meth:`filter_df` with an additional
``reset_index`` option.
Parameters
----------
min_t0, max_t0, min_m0, max_m0, min_mw, max_mw, min_x, max_x,
min_y, max_y, min_z, max_z, min_area, max_area, min_dt, max_dt :
See :meth:`filter_df`.
reset_index : bool, optional
If ``True``, reindex the filtered catalogue starting from 0.
Returns
-------
RsqSimCatalogue
New catalogue containing only events within the specified
parameter ranges.
"""
trimmed_df = self.filter_df(min_t0, max_t0, min_m0, max_m0, min_mw, max_mw, min_x, max_x, min_y, max_y,
min_z, max_z, min_area, max_area, min_dt, max_dt)
event_indices = np.where(np.isin(self.event_list, np.array(trimmed_df.index)))[0]
trimmed_event_ls = self.event_list[event_indices]
trimmed_patch_ls = self.patch_list[event_indices]
trimmed_patch_slip = self.patch_slip[event_indices]
trimmed_patch_time = self.patch_time_list[event_indices]
if reset_index:
trimmed_df.reset_index(inplace=True, drop=True)
unique_indices = np.unique(trimmed_event_ls)
index_array = np.zeros(trimmed_event_ls.shape, dtype=int)
for new_i, old_i in enumerate(unique_indices):
index_array[np.where(trimmed_event_ls == old_i)] = new_i
else:
index_array = trimmed_event_ls
rcat = self.from_dataframe_and_arrays(trimmed_df, event_list=index_array, patch_list=trimmed_patch_ls,
patch_slip=trimmed_patch_slip, patch_time_list=trimmed_patch_time)
return rcat
[docs]
def filter_by_events(self, event_number: int | Iterable[int], reset_index: bool = False):
"""
Return a new catalogue containing only the specified events.
Parameters
----------
event_number : int or iterable of int
One or more event IDs to retain.
reset_index : bool, optional
If ``True``, reindex the filtered catalogue from 0.
Returns
-------
RsqSimCatalogue
Catalogue containing only the selected events.
"""
if isinstance(event_number, (int, np.int32,np.int64)):
ev_ls = [event_number]
else:
assert isinstance(event_number, abc.Iterable), "Expecting either int or array/list of ints"
ev_ls = list(event_number)
assert all([isinstance(a, (int, np.int32,np.int64)) for a in ev_ls])
trimmed_df = self.catalogue_df.loc[ev_ls]
event_indices = np.where(np.isin(self.event_list, np.array(trimmed_df.index)))[0]
trimmed_event_ls = self.event_list[event_indices]
trimmed_patch_ls = self.patch_list[event_indices]
trimmed_patch_slip = self.patch_slip[event_indices]
trimmed_patch_time = self.patch_time_list[event_indices]
if reset_index:
trimmed_df.reset_index(inplace=True, drop=True)
unique_indices = np.unique(trimmed_event_ls)
index_array = np.zeros(trimmed_event_ls.shape, dtype=int)
for new_i, old_i in enumerate(unique_indices):
index_array[np.where(trimmed_event_ls == old_i)] = new_i
print(index_array)
else:
index_array = trimmed_event_ls
rcat = self.from_dataframe_and_arrays(trimmed_df, event_list=index_array, patch_list=trimmed_patch_ls,
patch_slip=trimmed_patch_slip, patch_time_list=trimmed_patch_time)
return rcat
[docs]
def drop_few_patches(self, fault_model: RsqSimMultiFault, min_patches: int = 3):
"""
Return a catalogue with events having fewer than ``min_patches`` dropped.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch distributions.
min_patches : int, optional
Minimum number of patches required for an event to be
retained. Defaults to 3.
Returns
-------
RsqSimCatalogue
Filtered catalogue.
"""
event_list = self.events_by_number(self.catalogue_df.index, fault_model, min_patches=min_patches)
new_ids = [ev.event_id for ev in event_list if len(ev.patches) >= min_patches]
print(len(event_list), new_ids)
return self.filter_by_events(new_ids)
[docs]
def filter_by_fault(self, fault_or_faults: RsqSimMultiFault | RsqSimSegment | list | tuple,
minimum_patches_per_fault: int = None):
"""
Return a catalogue filtered to events that rupture specified faults.
An event is included if at least one of its patches belongs to
any fault in ``fault_or_faults``.
Parameters
----------
fault_or_faults : RsqSimSegment, RsqSimMultiFault, list, or tuple
Fault(s) to filter on.
minimum_patches_per_fault : int or None, optional
If given, an event is only included if it ruptures at least
this many patches on at least one of the target faults.
Returns
-------
RsqSimCatalogue or None
Filtered catalogue, or ``None`` if no matching events found.
"""
if isinstance(fault_or_faults,RsqSimSegment):
fault_ls = [fault_or_faults]
elif isinstance(fault_or_faults,RsqSimMultiFault):
fault_ls=fault_or_faults.faults
else:
fault_ls = list(fault_or_faults)
if minimum_patches_per_fault is not None:
assert isinstance(minimum_patches_per_fault, int)
assert minimum_patches_per_fault > 0
# Collect all fault numbers
all_patches = []
for fault in fault_ls:
all_patches += list(fault.patch_dic.keys())
patch_numbers = np.unique(np.array(all_patches))
#in1d will return true if any value in patch_numbers matches a value in patch_list
#i.e. don't have to rupture all faults in the list, only 1
patch_indices = np.where(np.isin(self.patch_list, patch_numbers))[0]
selected_events = self.event_list[patch_indices]
selected_patches = self.patch_list[patch_indices]
if selected_events.size > 0:
if minimum_patches_per_fault is not None:
events_gt_min = []
for fault in fault_ls:
fault_patches = np.array(list(fault.patch_dic.keys()))
fault_patch_indices = np.where(np.isin(selected_patches, fault_patches))[0]
fault_event_list = selected_events[fault_patch_indices]
events_counter = Counter(fault_event_list)
events_sufficient_patches = np.array([ev for ev, count in events_counter.items()
if count >= minimum_patches_per_fault])
events_gt_min += list(events_sufficient_patches)
event_numbers = np.unique(np.array(events_gt_min))
event_indices = np.where(np.isin(self.event_list, event_numbers))[0]
else:
event_numbers = np.unique(selected_events)
event_indices = np.where(np.isin(self.event_list, event_numbers))[0]
trimmed_df = self.catalogue_df.loc[event_numbers]
filtered_cat = self.from_dataframe(trimmed_df)
filtered_cat.event_list = self.event_list[event_indices]
filtered_cat.patch_list = self.patch_list[event_indices]
filtered_cat.patch_slip = self.patch_slip[event_indices]
filtered_cat.patch_time_list = self.patch_time_list[event_indices]
return filtered_cat
else:
print("No events found on the following faults:")
for fault in fault_ls:
print(fault.name)
return None
[docs]
def filter_not_on_fault(self, fault_or_faults: RsqSimMultiFault | RsqSimSegment | list | tuple,
minimum_patches_per_fault: int = None):
"""
Return a catalogue with events that rupture specified faults removed.
The complement of :meth:`filter_by_fault`: events are rejected if
they rupture any patch on the target faults.
Parameters
----------
fault_or_faults : RsqSimSegment, RsqSimMultiFault, list, or tuple
Fault(s) whose events should be excluded.
minimum_patches_per_fault : int or None, optional
If given, an event is only rejected if it ruptures at least
this many patches on one of the target faults.
Returns
-------
RsqSimCatalogue or None
Filtered catalogue, or ``None`` if no events remain, or
``self`` if no events were on the target faults.
"""
if isinstance(fault_or_faults,RsqSimSegment):
fault_ls = [fault_or_faults]
elif isinstance(fault_or_faults,RsqSimMultiFault):
fault_ls=fault_or_faults.faults
else:
fault_ls = list(fault_or_faults)
if minimum_patches_per_fault is not None:
assert isinstance(minimum_patches_per_fault, int)
assert minimum_patches_per_fault > 0
# Collect all patches we don't want to be involved
all_patches = []
for fault in fault_ls:
all_patches += list(fault.patch_dic.keys())
patch_numbers = np.unique(np.array(all_patches))
#in1d will return true if any value in patch_numbers matches a value in patch_list
#i.e. don't have to rupture all faults in the list, only 1
# choose all events where this is the case, then remove them from the catalogue
patch_indices = np.where(np.isin(self.patch_list, patch_numbers))[0]
selected_events = self.event_list[patch_indices]
selected_patches = self.patch_list[patch_indices]
if selected_events.size > 0:
if minimum_patches_per_fault is not None:
events_gt_min = []
for fault in fault_ls:
fault_patches = np.array(list(fault.patch_dic.keys()))
fault_patch_indices = np.where(np.isin(selected_patches, fault_patches))[0]
fault_event_list = selected_events[fault_patch_indices]
events_counter = Counter(fault_event_list)
events_sufficient_patches = np.array([ev for ev, count in events_counter.items()
if count >= minimum_patches_per_fault])
events_gt_min += list(events_sufficient_patches)
event_numbers2reject = np.unique(np.array(events_gt_min))
event_indices = np.where(np.isin(self.event_list, event_numbers2reject))[0]
else:
event_numbers2reject = np.unique(selected_events)
event_indices = np.where(np.isin(self.event_list, event_numbers2reject))[0]
if len(event_indices) > 0:
trimmed_df = self.catalogue_df.drop(event_numbers2reject)
filtered_cat = self.from_dataframe(trimmed_df)
filtered_cat.event_list = np.delete(self.event_list,event_indices)
filtered_cat.patch_list = np.delete(self.patch_list,event_indices)
filtered_cat.patch_slip = np.delete(self.patch_slip,event_indices)
filtered_cat.patch_time_list = np.delete(self.patch_time_list,event_indices)
return filtered_cat
else:
print("No remaining events")
return None
else:
print("No events found which include:")
for fault in fault_ls:
print(fault.name)
return self
[docs]
def find_surface_rupturing_events(self,fault_model: RsqSimMultiFault,min_slip: float =0.1, method: str = 'vertex',
n_patches: int = 1, max_depth: float = -1000., n_faults: int =1, write_flt_dict: bool = False,
faults2ignore: [list,str] = 'hikurangi'):
"""
Find catalogue event IDs that include surface rupture.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model used to resolve patch slip distributions.
min_slip : float, optional
Minimum slip (m) for a patch to count as ruptured.
Defaults to 0.1 m.
method : str, optional
``"vertex"`` (use the shallowest vertex) or ``"centroid"``
(use the centroid depth) for the depth criterion.
n_patches : int, optional
Minimum number of qualifying surface patches per fault.
Defaults to 1.
max_depth : float, optional
Depth threshold (m, negative). Defaults to -1000 m.
n_faults : int, optional
Minimum number of surface-rupturing faults per event.
Defaults to 1.
write_flt_dict : bool, optional
If ``True``, also return a dict mapping event IDs to lists
of surface-rupturing fault names.
faults2ignore : list of str or str, optional
Fault name(s) to exclude. Defaults to ``"hikurangi"``.
Returns
-------
list of int
Event IDs with surface rupture.
dict, optional
Only returned when ``write_flt_dict=True``; maps event ID
to the list of surface-rupturing fault names.
"""
assert method in ['centroid','vertex'],"Method must be centroid or vertex"
assert max_depth < 0., "depths should be negative"
if write_flt_dict:
flt_dict = {}
surface_ev_ids = []
for event in self.all_events(fault_model=fault_model):
surface_faults = event.find_surface_faults(fault_model, min_slip=min_slip, method=method,
n_patches=n_patches, max_depth=max_depth, faults2ignore=faults2ignore)
if len(surface_faults) >= n_faults:
surface_ev_ids.append(event.event_id)
if write_flt_dict:
flt_dict[event.event_id] = surface_faults
if write_flt_dict:
return surface_ev_ids, flt_dict
else:
return surface_ev_ids
[docs]
def find_multi_fault(self,fault_model: RsqSimMultiFault):
"""
Identify events that rupture more than one named fault segment.
Note that segmentation is based on the fault model's segment
names and may not correspond to other multi-fault definitions.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch slip distributions.
Returns
-------
multifault : list of RsqSimEvent
Events that involve more than one fault segment.
multi_cat : RsqSimCatalogue
Sub-catalogue containing only those events.
"""
multifault = [ev for ev in self.all_events(fault_model) if ev.num_faults > 1]
# and filter catalogue to just these events
multifault_ids = [event.event_id for event in multifault]
multi_cat = self.filter_by_events(multifault_ids)
return multifault,multi_cat
[docs]
def filter_by_region(self, region: Polygon | gpd.GeoSeries, fault_model: RsqSimMultiFault,
event_numbers: Iterable = None):
"""
Filter events by geographic region.
Not yet implemented.
Parameters
----------
region : Polygon or geopandas.GeoSeries
Region geometry (reserved for future use).
fault_model : RsqSimMultiFault
Fault model (reserved for future use).
event_numbers : iterable of int, optional
Reserved for future use.
"""
pass
[docs]
def filter_by_patch_numbers(self, patch_numbers):
"""
Return a catalogue containing only events that ruptured specified patches.
Parameters
----------
patch_numbers : array-like of int
Patch IDs to filter on; events with at least one matching
patch are retained.
Returns
-------
RsqSimCatalogue or None
Filtered catalogue, or ``None`` if no events match.
"""
patch_indices = np.where(np.isin(self.patch_list, patch_numbers))[0]
event_numbers = self.event_list[patch_indices]
if event_numbers.size:
trimmed_df = self.catalogue_df.loc[np.unique(event_numbers)]
filtered_cat = self.from_dataframe(trimmed_df)
filtered_cat.event_list = event_numbers
filtered_cat.patch_list = self.patch_list[patch_indices]
filtered_cat.patch_slip = self.patch_slip[patch_indices]
filtered_cat.patch_time_list = self.patch_time_list[patch_indices]
return filtered_cat
else:
print("No events found!")
return
[docs]
def events_by_number(self, event_number: int | Iterable[int], fault_model: RsqSimMultiFault,
child_processes: int = 0, min_patches: int = 1) -> list[RsqSimEvent]:
"""
Retrieve one or more events as fully populated :class:`~rsqsim_api.catalogue.event.RsqSimEvent` objects.
Parameters
----------
event_number : int or iterable of int
Event ID(s) to retrieve.
fault_model : RsqSimMultiFault
Fault model for resolving patch objects and fault associations.
child_processes : int, optional
If > 0, distribute event processing across this many child
processes using shared memory. Defaults to 0 (serial).
min_patches : int, optional
Minimum patches per fault segment for an event to be
considered. Defaults to 1.
Returns
-------
list of RsqSimEvent
Fully populated events in the order they were requested.
"""
assert isinstance(fault_model,RsqSimMultiFault), "Fault model required"
if isinstance(event_number, (int, np.int32, np.int64)):
ev_ls = [event_number]
else:
assert isinstance(event_number, abc.Iterable), "Expecting either int or array/list of ints"
ev_ls = list(event_number)
assert all([isinstance(a, (int, np.int32, np.int64)) for a in ev_ls])
out_events = []
cat_dict = self.catalogue_df.to_dict(orient='index')
# Stores the first and last index for each event
unique_events, unique_event_indices = np.unique(self.event_list, return_index=True)
unique_dic = {unique_events[i]: (unique_event_indices[i], unique_event_indices[i + 1]) for i in
range(len(unique_events) - 1)}
unique_dic[unique_events[-1]] = (unique_event_indices[-1], len(self.event_list))
if child_processes == 0:
for index in ev_ls:
ev_range = unique_dic[index]
ev_indices = np.arange(ev_range[0], ev_range[1])
patch_numbers = self.patch_list[ev_indices]
patch_slip = self.patch_slip[ev_indices]
patch_time_list = self.patch_time_list[ev_indices]
ev_data = cat_dict[index]
event_i = RsqSimEvent.from_earthquake_list(ev_data['t0'], ev_data['m0'], ev_data['mw'], ev_data['x'],
ev_data['y'], ev_data['z'], ev_data['area'], ev_data['dt'],
patch_numbers=patch_numbers,
patch_slip=patch_slip,
patch_time=patch_time_list,
fault_model=fault_model, min_patches=min_patches,
event_id=index)
out_events.append(event_i)
else:
# Using shared data between processes
event_list = np.ctypeslib.as_ctypes(self.event_list)
raw_event_list = sharedctypes.RawArray(event_list._type_, event_list)
patch_list = np.ctypeslib.as_ctypes(self.patch_list)
raw_patch_list = sharedctypes.RawArray(event_list._type_, patch_list)
queue = Queue() # queue to handle processed events
# much faster to serialize when dealing with numbers instead of objects
faults_with_patches = {patch_num: seg.segment_number for (patch_num, seg) in
fault_model.faults_with_patches.items()}
ev_chunks = np.array_split(np.array(ev_ls),
child_processes) # break events into equal sized chunks for each child process
processes = []
for i in range(child_processes):
p = Process(target=get_mask, args=(
ev_chunks[i], min_patches, faults_with_patches, raw_event_list, raw_patch_list, queue))
p.start()
processes.append(p)
num_events = len(ev_ls)
while len(out_events) < num_events:
index, ev_indices, mask = queue.get()
patch_numbers = self.patch_list[ev_indices]
patch_slip = self.patch_slip[ev_indices]
patch_time = self.patch_time_list[ev_indices]
ev_data = cat_dict[index]
event_i = RsqSimEvent.from_multiprocessing(ev_data['t0'], ev_data['m0'], ev_data['mw'], ev_data['x'],
ev_data['y'], ev_data['z'], ev_data['area'], ev_data['dt'],
patch_numbers, patch_slip, patch_time,
fault_model, mask, event_id=index)
out_events.append(event_i)
for p in processes:
p.join()
return out_events
[docs]
def assign_accumulated_slip(self):
"""
Compute and cache total accumulated slip per patch.
Sums ``patch_slip`` over all events for each unique patch ID
and stores the result in :attr:`_accumulated_slip`. Subsequent
accesses via :attr:`accumulated_slip` return the cached value.
"""
accumulated_slip = {}
for patch_i in np.unique(self.patch_list):
matching = (self.patch_list == patch_i)
accumulated_slip_i = self.patch_slip[matching].sum()
accumulated_slip[patch_i] = accumulated_slip_i
self._accumulated_slip = accumulated_slip
[docs]
def assign_event_mean_slip(self, fault_model: RsqSimMultiFault):
"""
Compute and cache mean slip per event.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch objects.
"""
event_mean_slip = {}
for event in self.all_events(fault_model):
event_mean_slip[event.event_id] = event.mean_slip
self._event_mean_slip = event_mean_slip
[docs]
def assign_event_mean_sdr(self, fault_model: RsqSimMultiFault):
"""
Compute and cache mean strike, dip, and rake per event.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch objects.
"""
event_mean_sdr = {}
for event in self.all_events(fault_model):
event_mean_sdr[event.event_id] = [round(event.mean_strike),round(event.mean_dip),round(event.mean_rake)]
self._event_mean_sdr = event_mean_sdr
[docs]
def assign_event_length(self, fault_model: RsqSimMultiFault):
"""
Compute and cache rupture length per event.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch and fault objects.
"""
event_lengths = {}
for event in self.all_events(fault_model):
event.find_length()
event_lengths[event.event_id] = event.length
self._event_length = event_lengths
[docs]
def plot_accumulated_slip_2d(self, fault_model: RsqSimMultiFault, subduction_cmap: str = "plasma",
crustal_cmap: str = "viridis", show: bool = True,
write: str = None, subplots=None, global_max_sub_slip: int = 0,
global_max_slip: int = 0,
figsize: tuple = (6.4, 4.8), hillshading_intensity: float = 0.0, bounds: tuple = None,
plot_rivers: bool = True, plot_lakes: bool = True,
plot_highways: bool = True, plot_boundaries: bool = False,
create_background: bool = False,
coast_only: bool = True, hillshade_cmap: colors.LinearSegmentedColormap = cm.terrain,
plot_log_scale: bool = False, log_cmap: str = "magma", log_min: float = 1.0,
log_max: float = 100., plot_traces: bool = True, trace_colour: str = "pink",
min_slip_percentile: float = None, min_slip_value: float = None,
plot_zeros: bool = True):
"""
Plot a 2-D map of accumulated slip across all events in the catalogue.
Sums slip over the entire catalogue per patch and plots the
result with separate colourmaps for subduction-interface and
crustal faults.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model whose patches are coloured by accumulated slip.
subduction_cmap : str, optional
Colourmap for subduction-interface patches. Defaults to
``"plasma"``.
crustal_cmap : str, optional
Colourmap for crustal patches. Defaults to ``"viridis"``.
show : bool, optional
If ``True`` (default), call ``plt.show()``.
write : str or None, optional
File path to save the figure.
subplots : tuple or str or None, optional
Existing ``(fig, ax)`` or pickled figure to plot on.
global_max_sub_slip, global_max_slip : float, optional
Fixed colourbar maxima; 0 means per-data maximum.
figsize : tuple of float, optional
Figure size in inches.
hillshading_intensity : float, optional
Hillshading intensity.
bounds : tuple or None, optional
Map extent ``(x_min, y_min, x_max, y_max)`` in NZTM.
plot_rivers, plot_lakes, plot_highways, plot_boundaries : bool, optional
Toggle background map layers.
create_background : bool, optional
Render full background map.
coast_only : bool, optional
Render coastline only as background.
hillshade_cmap : LinearSegmentedColormap, optional
Hillshade colourmap.
plot_log_scale : bool, optional
Use logarithmic colour scaling.
log_cmap, log_min, log_max : optional
Parameters for log-scale colouring.
plot_traces : bool, optional
Plot fault surface traces.
trace_colour : str, optional
Colour for fault traces.
min_slip_percentile : float or None, optional
Slip percentile threshold below which patches are zeroed.
min_slip_value : float or None, optional
Slip value threshold (m) below which patches are zeroed.
plot_zeros : bool, optional
If ``True`` (default), plot zero-slip patches.
Returns
-------
list
Matplotlib ``PolyCollection`` objects for each fault plotted.
"""
if bounds is None and fault_model.bounds is not None:
bounds = fault_model.bounds
if all([min_slip_percentile is not None, min_slip_value is None]):
min_slip = np.percentile(self.patch_slip, min_slip_percentile)
else:
min_slip = min_slip_value
if subplots is not None:
if isinstance(subplots, str):
# Assume pickled figure
with open(subplots, "rb") as pfile:
loaded_subplots = pickle.load(pfile)
fig, ax = loaded_subplots
else:
# Assume matplotlib objects
fig, ax = subplots
elif create_background:
# TODO: add this directory + file to repo data/other_lines/nz-lake-polygons-topo-1250k.shp
fig, ax = plot_background(figsize=figsize, hillshading_intensity=hillshading_intensity,
bounds=bounds, plot_rivers=plot_rivers, plot_lakes=plot_lakes,
plot_highways=plot_highways, plot_boundaries=plot_boundaries,
hillshade_cmap=hillshade_cmap)
elif coast_only:
fig, ax = plot_background(figsize=figsize, hillshading_intensity=hillshading_intensity,
bounds=bounds, plot_rivers=False, plot_lakes=False, plot_highways=False,
plot_boundaries=False, hillshade_cmap=hillshade_cmap)
else:
fig, ax = plt.subplots()
fig.set_size_inches(figsize)
# Find maximum slip on subduction interface
max_slip = 0
colour_dic = {}
for f_i, fault in enumerate(fault_model.faults):
if fault.name in bruce_subduction:
if plot_zeros:
colours = np.zeros(fault.patch_numbers.shape)
else:
colours = np.nan * np.ones(fault.patch_numbers.shape)
for local_id, patch_id in enumerate(fault.patch_numbers):
if patch_id in self.accumulated_slip.keys():
if min_slip is not None:
if self.accumulated_slip[patch_id] >= min_slip:
colours[local_id] = self.accumulated_slip[patch_id]
else:
if self.accumulated_slip[patch_id] > 0.:
colours[local_id] = self.accumulated_slip[patch_id]
colour_dic[f_i] = colours
if np.nanmax(colours) > max_slip:
max_slip = np.nanmax(colours)
max_slip = global_max_sub_slip if global_max_sub_slip > 0 else max_slip
plots = []
# Plot subduction interface
subduction_list = []
subduction_plot = None
for f_i, fault in enumerate(fault_model.faults):
if fault.name in bruce_subduction:
subduction_list.append(fault.name)
if plot_log_scale:
subduction_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles,
facecolors=colour_dic[f_i],
cmap=log_cmap, norm=colors.LogNorm(vmin=log_min, vmax=log_max))
else:
subduction_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles,
facecolors=colour_dic[f_i],
cmap=subduction_cmap, vmin=0., vmax=max_slip)
plots.append(subduction_plot)
max_slip = 0
colour_dic = {}
for f_i, fault in enumerate(fault_model.faults):
if fault.name not in bruce_subduction:
if plot_zeros:
colours = np.zeros(fault.patch_numbers.shape)
else:
colours = np.nan * np.ones(fault.patch_numbers.shape)
for local_id, patch_id in enumerate(fault.patch_numbers):
if patch_id in self.accumulated_slip.keys():
if min_slip is not None:
if self.accumulated_slip[patch_id] >= min_slip:
colours[local_id] = self.accumulated_slip[patch_id]
else:
if self.accumulated_slip[patch_id] > 0.:
colours[local_id] = self.accumulated_slip[patch_id]
colour_dic[f_i] = colours
if np.nanmax(colours) > max_slip:
max_slip = np.nanmax(colours)
max_slip = global_max_slip if global_max_slip > 0 else max_slip
crustal_plot = None
for f_i, fault in enumerate(fault_model.faults):
if fault.name not in bruce_subduction:
if plot_log_scale:
crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles,
facecolors=colour_dic[f_i],
cmap=log_cmap, norm=colors.LogNorm(vmin=log_min, vmax=log_max))
else:
crustal_plot = ax.tripcolor(fault.vertices[:, 0], fault.vertices[:, 1], fault.triangles,
facecolors=colour_dic[f_i],
cmap=crustal_cmap, vmin=0., vmax=max_slip)
plots.append(crustal_plot)
if any([subplots is None, isinstance(subplots, str)]):
if plot_log_scale:
if subduction_list:
sub_cbar = fig.colorbar(subduction_plot, ax=ax)
sub_cbar.set_label("Slip (m)")
elif crustal_plot is not None:
crust_cbar = fig.colorbar(crustal_plot, ax=ax)
crust_cbar.set_label("Slip (m)")
else:
if subduction_list:
sub_cbar = fig.colorbar(subduction_plot, ax=ax)
sub_cbar.set_label("Subduction slip (m)")
if crustal_plot is not None:
crust_cbar = fig.colorbar(crustal_plot, ax=ax)
crust_cbar.set_label("Slip (m)")
plot_coast(ax=ax)
if write is not None:
fig.savefig(write, dpi=300)
if show:
plt.show()
else:
plt.close(fig)
if show and subplots is None:
plt.show()
return plots
[docs]
def plot_mfd(self, plot_type: str = 'differential' , nSamp: int = 1000,
window: float = 80.*csts.seconds_per_year, n_bins: int=50, instrumental_path: str = None, inst_year_min: float = 1940, show: bool = True,
write: str = None, tmin: float = None, tmax: float = None, depth_min: float = None, depth_max: float =None, mmin: float = 4.5, mmax: float = 9.5,
plot_corrected_instrumental: bool = True):
"""
Plot a magnitude–frequency distribution for the catalogue.
Produces a Gutenberg-Richter plot (annual rates vs Mw) in either
differential or cumulative form. Random time-window samples can
be overlaid to show variability.
Parameters
----------
plot_type : str, optional
``"differential"`` (default) or ``"cumulative"``.
nSamp : int, optional
Number of random time-window samples to draw for the
variability envelope. Defaults to 1000.
window : float, optional
Duration (s) of each random sample window. Defaults to
80 years.
n_bins : int, optional
Number of magnitude bins. Defaults to 50.
instrumental_path : str or None, optional
Path to an instrumental seismicity CSV file for comparison.
inst_year_min : float, optional
Minimum year for the instrumental catalogue. Defaults to
1940.
show : bool, optional
If ``True`` (default), call ``plt.show()``.
write : str or None, optional
File path to save the figure.
tmin, tmax : float or None, optional
Time range (s) to use. Defaults to the catalogue range.
depth_min, depth_max : float or None, optional
Depth range (km) for filtering the instrumental catalogue.
mmin, mmax : float, optional
Magnitude range for the histogram/plot. Defaults to
4.5–9.5.
plot_corrected_instrumental : bool, optional
If ``True`` (default), overlay a magnitude-corrected
cumulative curve for the instrumental catalogue.
"""
assert plot_type in ['differential','cumulative'], "plot_type must be one of cumulative or differential"
if instrumental_path is not None:
assert os.path.exists(instrumental_path), "Path to instrumental catalogue not found"
# Read in instrumental seismicity
seismicity = pd.read_csv(instrumental_path, converters={0: lambda s: str(s)}, infer_datetime_format=True)
#check for parameters and find them from catalogue if not specified
if tmin is None:
tmin=self.catalogue_df['t0'].min(axis=0)
if tmax is None:
tmax = self.catalogue_df['t0'].max(axis=0)
# find magnitudes for whole catalogue
rsqsim_mags = self.catalogue_df["mw"]
weightsyrs2 = np.ones(len(rsqsim_mags)) * csts.seconds_per_year / (tmax - tmin)
plt.figure()
if plot_type == "differential":
tints = []
for i in np.arange(1, nSamp, 1):
tinit = tmin + (rng.uniform() * (tmax - tmin - window))
tints.append(tinit)
tfin = tinit + window
mags = self.catalogue_df["mw"].loc[
(self.catalogue_df['t0'] > tinit) & (self.catalogue_df['t0'] <= tfin)]
weightsyrs = np.ones(len(mags)) * csts.seconds_per_year / window
plt.hist(mags, n_bins, weights=weightsyrs, range=(mmin, mmax), histtype='stepfilled', log=True,
edgecolor='lightgray', facecolor='lightgray', alpha=0.1)
# plot last sample with legend
plt.hist(mags, n_bins, weights=weightsyrs, range=(mmin, mmax), histtype='stepfilled', log=True, edgecolor='lightgray',
facecolor='lightgray', alpha=0.1, label=f"{window / csts.seconds_per_year:.0f} yr RSQsim samples")
# plot full histogram
plt.hist(rsqsim_mags, n_bins, weights=weightsyrs2, range=(mmin, mmax), histtype='step', log=True, label="RSQsim",
edgecolor='grey')
if instrumental_path is not None:
#trim to region of interest
x1 = self.catalogue_df['x'].min()
y1 = self.catalogue_df['y'].min()
x2 = self.catalogue_df['x'].max()
y2 = self.catalogue_df['y'].max()
maskmag = ((seismicity['Mpref'] >= mmin) & (seismicity['lons'] > x1) & (seismicity['lons'] < x2) & (
seismicity['lats'] > y1) & (seismicity['lats'] < y2) & (seismicity['year'] >= inst_year_min) & (
seismicity['depths'] > depth_min) & (seismicity['depths'] <= depth_max))
latest_year = np.max(seismicity['year'].loc[seismicity['year'] >= inst_year_min])
instrumental_mags = seismicity["Mpref"].loc[maskmag]
weightsyrs_IM = np.ones(np.size(instrumental_mags)) / np.ptp(
seismicity['year'].loc[seismicity['year'] >= inst_year_min])
plt.hist(instrumental_mags, n_bins, weights=weightsyrs_IM, range=(mmin, mmax), histtype='step',
log=True, label=f"{str(inst_year_min)}-{str(latest_year)}, Mw>{mmin:.1f}", edgecolor='b',
linewidth=1.5)
instrumental_b_value = calculate_b_value(instrumental_mags, time_interval_years=(latest_year - inst_year_min), min_mw=4.5, max_mw=8.5)
print(f"b-value for instrumental catalogue: {instrumental_b_value[0]:.2f}")
plt.ylim([0.0001, 1000])
elif plot_type == "cumulative":
tints = []
for i in np.arange(1, nSamp, 1):
tinit = tmin + (rng.uniform() * (tmax - tmin - window))
tints.append(tinit)
tfin = tinit + window
mags = self.catalogue_df["mw"].loc[
(self.catalogue_df['t0'] > tinit) & (self.catalogue_df['t0'] <= tfin)]
mag_sort = np.sort(mags)[::-1]
mag_cum = np.ones(shape=np.size(mag_sort))
mag_cum = np.cumsum(mag_cum)
weightsyrs = np.ones(len(mags)) * csts.seconds_per_year / (tfin - tinit)
plt.semilogy(mag_sort, mag_cum * weightsyrs, c='lightgray', alpha=0.6, linewidth=0.5)
plt.semilogy(mag_sort, mag_cum * weightsyrs, c='lightgray', alpha=0.6, linewidth=0.5,
label=f"{window / csts.seconds_per_year:.0f} yr samples")
new_sort = np.sort(rsqsim_mags)[::-1]
new_cum = np.ones(shape=np.size(new_sort))
new_cum = np.cumsum(new_cum)
plt.semilogy(new_sort, new_cum * weightsyrs2, c='grey', linewidth=1, label="RSQsim")
if instrumental_path is not None:
# trim to region of interest
x1 = self.catalogue_df['x'].min()
y1 = self.catalogue_df['y'].min()
x2 = self.catalogue_df['x'].max()
y2 = self.catalogue_df['y'].max()
maskmag = ((seismicity['Mpref'] >= mmin) & (seismicity['lons'] > x1) & (seismicity['lons'] < x2) & (
seismicity['lats'] > y1) & (seismicity['lats'] < y2) & (seismicity['year'] >= inst_year_min) & (
seismicity['depths'] > depth_min) & (seismicity['depths'] <= depth_max))
latest_year = np.max(seismicity['year'].loc[seismicity['year'] >= inst_year_min])
instrumental_mags = seismicity["Mpref"].loc[maskmag]
weightsyrs_IM = np.ones(np.size(instrumental_mags)) / np.ptp(
seismicity['year'].loc[seismicity['year'] >= inst_year_min])
IMsort = np.sort(instrumental_mags)[::-1]
IMcum = np.ones(shape=np.size(IMsort))
IMcum = np.cumsum(IMcum)
plt.semilogy(IMsort, IMcum * weightsyrs_IM[0], c='b', linewidth=1,
label=f'{inst_year_min} - {int(latest_year)}, Mw>{mmin:.1f}')
if plot_corrected_instrumental:
correctionFactor = 2 / 3.0 * 1 / 1.5
plt.semilogy(IMsort, IMcum * weightsyrs_IM[0] * correctionFactor, c='b', linestyle=':', linewidth=1,
label=f'{inst_year_min} - {int(latest_year)}, Mw>{mmin:.1f} corr')
xsortNZFull = IMsort[:]
ysortNZFull = IMcum * weightsyrs_IM[0]
plt.ylim([0.0001, 100])
plt.ylabel('N [$yr^{-1}$]')
plt.xlabel('M')
plt.tight_layout()
plt.legend()
if os.path.exists(os.path.dirname(write)):
plt.savefig(write, dpi=450)
else:
print("write directory not found")
if show:
plt.show()
[docs]
def plot_depth_hist(self,fault_model: RsqSimMultiFault, n_bins: int = 10, depth_min: float=None, depth_max: float=None, write: str = None, show: bool = True):
"""
Plot a histogram of hypocentral depths alongside fault-base depths.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model used to extract fault base depths.
n_bins : int, optional
Number of depth bins. Defaults to 10.
depth_min : float or None, optional
Minimum depth (km) for the plot. Defaults to the shallowest
hypocentre.
depth_max : float or None, optional
Maximum depth (km) for the plot. Defaults to the deepest
hypocentre.
write : str or None, optional
File path to save the figure.
show : bool, optional
If ``True`` (default), display the figure.
"""
if depth_min is None:
depth_min=-0.001 * self.catalogue_df['z'].max(axis=0)
if depth_max is None:
depth_max = -0.001 * self.catalogue_df['z'].min(axis=0)
fig, ax = plt.subplots(1, 1)
depths = self.catalogue_df['z'] * -0.001
ax.hist(depths, n_bins, range=(depth_min, depth_max), orientation="horizontal", label="Hypocentral depths")
fault_depths = [fault.max_depth * -0.001 for fault in fault_model.faults]
ax.set_ylim([depth_max, depth_min])
plt.ylabel("Depth (km)")
ax.set_xlabel("# earthquakes")
ax2 = ax.twiny()
ax2.hist(fault_depths, orientation="horizontal", histtype='step', edgecolor='b',
label='Base of faults')
ax2.set_xlabel("# faults")
fig.legend(loc="lower right", bbox_to_anchor=(1, 0), bbox_transform=ax2.transAxes)
if os.path.exists(os.path.dirname(write)):
fig.savefig(write,dpi=450)
if show:
fig.show()
[docs]
def plot_mean_slip_vs_mag(self, fault_model: RsqSimMultiFault, show: bool = True,
write: str = None, plot_rel: bool = True):
"""
Scatter-plot mean patch slip vs moment magnitude for all events.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for computing mean slip per event.
show : bool, optional
If ``True`` (default), display the figure.
write : str or None, optional
File path to save the figure.
plot_rel : bool, optional
If ``True`` (default), overlay the best-fit trend and the
expected 1/3-slope scaling-law line.
"""
# check mean slip is assigned
if self.event_mean_slip is None:
self.assign_event_mean_slip(fault_model)
# create dictionary of magnitudes and mean slips
mag_mean_slip = {}
for event in self.all_events(fault_model):
mag = event.mw
mean_slip = self.event_mean_slip[event.event_id]
mag_mean_slip[mag] = mean_slip
# convert to data frame for easy plotting
slip_dict = pd.DataFrame.from_dict(mag_mean_slip, orient='index', columns=['Mean Slip'])
slip_dict.reset_index(inplace=True)
slip_dict.rename(columns={"index": "mag"}, inplace=True)
slip_dict["log_slip"] = np.log10(slip_dict["Mean Slip"])
# plot
ax = slip_dict.plot.scatter(x="mag", y="log_slip")
if plot_rel:
trend_func=np.polyfit(slip_dict["mag"],slip_dict["log_slip"],1)
trend_fit=np.poly1d(trend_func)
plt.plot(slip_dict["mag"],trend_fit(slip_dict["mag"]),"r--",label="y = {}".format(trend_fit))
offset=(1./3.)*(slip_dict["mag"][0])-slip_dict["log_slip"][0]
plt.plot(slip_dict["mag"],(1./3.)*(slip_dict["mag"])-offset
,"b--",label="y = 1/3 x - {:.2f}".format(offset))
plt.legend()
plt.xlabel("M$_W$")
plt.ylabel("log$_1$$_0$ (Mean Slip (m))")
if write is not None:
plt.savefig(write, dpi=300)
if show:
plt.show()
else:
plt.close()
[docs]
def plot_area_vs_mag(self, fault_model: RsqSimMultiFault, show: bool = True,
write: str = None, plot_rel: bool = True):
"""
Scatter-plot rupture area vs moment magnitude for all events.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for retrieving event areas.
show : bool, optional
If ``True`` (default), display the figure.
write : str or None, optional
File path to save the figure.
plot_rel : bool, optional
If ``True`` (default), overlay the best-fit log-linear trend.
"""
# create dictionary of magnitudes and areas
mag_area = {}
for event in self.all_events(fault_model):
mag = event.mw
mag_area[mag] = event.area
# convert to data frame for easy plotting
area_dict = pd.DataFrame.from_dict(mag_area, orient='index', columns=['Area'])
area_dict.reset_index(inplace=True)
area_dict.rename(columns={"index": "mag"}, inplace=True)
area_dict["log_area"] = np.log10(area_dict["Area"])
# plot
ax = area_dict.plot.scatter(x="mag", y="log_area")
if plot_rel:
log_fit=np.polyfit(area_dict["mag"],area_dict["log_area"],1)
log_func=np.poly1d(log_fit)
plt.plot(area_dict["mag"], log_func(area_dict["mag"]), 'r--',
label="y = {}".format(log_func))
plt.legend()
plt.xlabel("M$_W$")
plt.ylabel("log$_1$$_0$ (Area (m$^2$))")
if write is not None:
plt.savefig(write, dpi=300)
if show:
plt.show()
else:
plt.close()
[docs]
def stress_drops(self, stress_c: float = 2.44):
"""
Compute stress drops for all events.
Parameters
----------
stress_c : float, optional
Shape constant (default 2.44 for a circular crack).
Returns
-------
numpy.ndarray
Stress drop in Pa for each event.
"""
return calculate_stress_drop(self.m0, self.area, stress_c=stress_c)
[docs]
def scaling_c(self):
"""
Compute the Gutenberg-Richter scaling parameter c for all events.
Returns
-------
numpy.ndarray
Scaling parameter c (``Mw - log10(area) + 6``) per event.
"""
return calculate_scaling_c(self.mw, self.area)
[docs]
def scaling_summary_statistics(self, stress_c: float = 2.44):
"""
Return summary statistics of scaling parameters for the catalogue.
Parameters
----------
stress_c : float, optional
Shape constant passed to
:func:`~rsqsim_api.catalogue.utilities.summary_statistics`.
Returns
-------
pandas.Series
Summary statistics including max Mw and percentiles of c and
stress drop.
"""
return summary_statistics(self.catalogue_df, stress_c=stress_c)
[docs]
def all_slip_distributions_to_vtk(self, fault_model: RsqSimMultiFault, output_directory: str,
include_zeros: bool = False, min_slip_value: float = None):
"""
Write VTK slip distribution files for every event in the catalogue.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model for resolving patch objects.
output_directory : str
Existing directory to write ``event<id>.vtk`` files.
include_zeros : bool, optional
If ``True``, include zero-slip patches. Defaults to
``False``.
min_slip_value : float or None, optional
Minimum slip threshold (m) passed to
:meth:`~rsqsim_api.catalogue.event.RsqSimEvent.slip_dist_to_vtk`.
"""
assert os.path.exists(output_directory), "Make directory before writing VTK"
for event in self.all_events(fault_model):
outfile_path = os.path.join(output_directory, f"event{event.event_id}.vtk")
event.slip_dist_to_vtk(outfile_path, include_zeros=include_zeros,min_slip_value=min_slip_value)
[docs]
def calculate_b_value(self, min_mw: float = 0.0, max_mw: float = 10.0, interval=0.1):
"""
Estimate the Gutenberg-Richter b-value for the catalogue.
Parameters
----------
min_mw : float, optional
Minimum magnitude bin centre to include. Defaults to 0.0.
max_mw : float, optional
Maximum magnitude bin centre to include. Defaults to 10.0.
interval : float, optional
Magnitude bin width. Defaults to 0.1.
Returns
-------
tuple of float
``(b_value, intercept)`` from the least-squares fit to the
log-linear cumulative MFD.
"""
time_interval = (self.catalogue_df['t0'].max() - self.catalogue_df['t0'].min())/csts.seconds_per_year
return calculate_b_value(self.catalogue_df["mw"], time_interval, min_mw=min_mw, max_mw=max_mw, interval=interval)
[docs]
def calculate_dominant_magnitudes(self, fault_model: RsqSimMultiFault, unfiltered_catalogue = None,
min_for_median: int = 5):
"""
Compute the dominant (median cumulant) magnitude for each patch.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model providing patch indices.
unfiltered_catalogue : RsqSimCatalogue or None, optional
If provided, use this catalogue's magnitude arrays (useful
when the current catalogue is a filtered subset).
min_for_median : int, optional
Minimum number of matching events for the median to be
computed. Defaults to 5.
Returns
-------
numpy.ndarray
Dominant magnitude (Mw) for each patch in ``fault_model``.
"""
patch_indices = np.array(list(fault_model.patch_dic.keys()), dtype=np.int32)
event_list = self.event_list
if unfiltered_catalogue is not None:
catalogue_mws = np.array(unfiltered_catalogue.catalogue_df["mw"])
catalogue_m0s = np.array(unfiltered_catalogue.catalogue_df["m0"])
else:
catalogue_mws = np.array(self.catalogue_df["mw"])
catalogue_m0s = np.array(self.catalogue_df["m0"])
event_mws = catalogue_mws[event_list]
event_m0s = catalogue_m0s[event_list]
medians = np.zeros_like(patch_indices, dtype=np.float64)
return self.dominant_magnitudes(patch_indices, self.patch_list, event_mws, event_m0s, min_for_median=min_for_median)
@staticmethod
@njit(parallel=True)
[docs]
def dominant_magnitudes(fault_patch_indices: np.ndarray[int | np.int32 | np.int64],
catalogue_patch_array: np.ndarray[int | np.int32 | np.int64],
event_mws: np.ndarray, event_m0s: np.ndarray,
min_for_median: int = 5):
"""
Numba-parallel computation of dominant magnitude per patch.
For each patch index, finds all matching entries in the catalogue
and computes the median cumulant magnitude.
Parameters
----------
fault_patch_indices : numpy.ndarray of int
All patch IDs in the fault model.
catalogue_patch_array : numpy.ndarray of int
Flat array of patch IDs from the catalogue (parallel to
``event_list``).
event_mws : numpy.ndarray of float
Mw for each entry in ``catalogue_patch_array``.
event_m0s : numpy.ndarray of float
M0 (N·m) for each entry in ``catalogue_patch_array``.
min_for_median : int, optional
Minimum matching events required to compute the median.
Patches below this threshold receive 0. Defaults to 5.
Returns
-------
numpy.ndarray of float
Dominant magnitude for each patch (0 if insufficient data).
"""
medians_array = np.zeros_like(fault_patch_indices, dtype=np.float64)
for i in prange(len(fault_patch_indices)):
n_matching = np.count_nonzero(catalogue_patch_array == i)
if n_matching >= min_for_median:
matching = np.flatnonzero(catalogue_patch_array == i)
matching_m0s = event_m0s[matching]
matching_mws = event_mws[matching]
med_cumulant = median_cumulant(matching_m0s, matching_mws)
medians_array[i] = med_cumulant
return medians_array
[docs]
def filter_crustal_events(self, fault_model: RsqSimMultiFault,
subduction_names: tuple = ("hikkerm", "puysegur"),
min_crustal_mw: float = 6.0):
"""
Return event IDs where crustal moment exceeds a magnitude threshold.
Uses Numba-parallel processing to check each event's crustal
moment contribution.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model providing patch areas and crustal patch numbers.
subduction_names : tuple of str, optional
Fault names classified as subduction interface; all other
patches are considered crustal.
min_crustal_mw : float, optional
Minimum crustal moment magnitude threshold. Defaults to 6.0.
Returns
-------
numpy.ndarray of int
Event IDs whose crustal moment meets the threshold.
"""
min_crustal_m0 = mw_to_m0(min_crustal_mw)
crustal_patch_numbers = fault_model.get_crustal_patch_numbers(subduction_faults=subduction_names)
event_indices = np.array(list(self.catalogue_df.index), dtype=np.int32)
all_patch_areas = fault_model.get_patch_areas()
patch_area_list = all_patch_areas[self.patch_list]
return self.filter_crustal_events_parallel(event_indices, crustal_patch_numbers, self.event_list,
self.patch_list, self.patch_slip, patch_area_list, min_crustal_m0)
@staticmethod
@njit(parallel=True)
[docs]
def filter_crustal_events_parallel(event_indices: np.ndarray[int | np.int32 | np.int64],
crustal_patch_numbers: np.ndarray[int | np.int32 | np.int64],
event_list: np.ndarray[int | np.int32 | np.int64],
patch_list: np.ndarray[int | np.int32 | np.int64],
patch_slip: np.ndarray[float | np.float32 | np.float64],
patch_area_list: np.ndarray[float | np.float32 | np.float64],
min_crustal_m0: float | np.float32 | np.float64):
"""
Numba-parallel filter: return events with crustal moment >= threshold.
Parameters
----------
event_indices : numpy.ndarray of int
All event IDs to check.
crustal_patch_numbers : numpy.ndarray of int
Patch IDs classified as crustal (non-subduction).
event_list : numpy.ndarray of int
Flat catalogue event-ID array.
patch_list : numpy.ndarray of int
Flat catalogue patch-ID array.
patch_slip : numpy.ndarray of float
Slip (m) parallel to ``patch_list``.
patch_area_list : numpy.ndarray of float
Patch areas (m²) parallel to ``patch_list``.
min_crustal_m0 : float
Minimum crustal scalar moment (N·m) threshold.
Returns
-------
numpy.ndarray of int
Event IDs whose crustal moment meets ``min_crustal_m0``.
"""
crustal_bool = np.zeros_like(event_indices, dtype=np.int32)
for i in prange(len(event_indices)):
event_i = event_indices[i]
patch_numbers = patch_list[np.where(event_list == event_i)]
crustal_patches_i = jit_intersect(patch_numbers, crustal_patch_numbers)
if len(crustal_patches_i) > 0:
crustal_patch_indices = np.searchsorted(patch_numbers, crustal_patches_i)
patch_slip_i = patch_slip[patch_numbers]
patch_areas_i = patch_area_list[patch_numbers]
crustal_slip_i = patch_slip_i[crustal_patch_indices]
crustal_areas_i = patch_areas_i[crustal_patch_indices]
m0 = np.sum(crustal_slip_i * crustal_areas_i) * 3e10
if m0 >= min_crustal_m0:
crustal_bool[i] = 1
return event_indices[crustal_bool == 1]
[docs]
def match_events_to_crustal_nshm_dicts(self, fault_model: RsqSimMultiFault,
event_ids: np.ndarray[int | np.int32 | np.int64],
crustal_patch_dict: dict, crustal_n_dict: dict,
subduction_names: tuple = ("hikkerm", "puysegur"), threshold_proportion: float = 0.5):
"""
Match events to NSHM subsection indices using a KD-tree patch lookup.
For each event, determines which NSHM subsection indices are
ruptured based on the fraction of the subsection's patches that
slipped.
Parameters
----------
fault_model : RsqSimMultiFault
Fault model providing crustal patch numbers.
event_ids : numpy.ndarray of int
Event IDs to process.
crustal_patch_dict : dict
Mapping of NSHM subsection index (int) to a dict with
``"triangle_indices"`` giving the RSQSim patch IDs.
crustal_n_dict : dict
Mapping of NSHM subsection index (int) to total number of
RSQSim patches in that subsection.
subduction_names : tuple of str, optional
Fault names excluded from the crustal analysis.
threshold_proportion : float, optional
Fraction of subsection patches that must be ruptured for the
subsection to be considered active. Defaults to 0.5.
Returns
-------
numba.typed.Dict
Mapping of event ID (int32) to an array of NSHM subsection
indices (int32) that were ruptured.
"""
crustal_patch_numbers = fault_model.get_crustal_patch_numbers(subduction_faults=subduction_names)
crustal_patch_dict_typed = typed.Dict.empty(types.int32, types.int32[:])
for key in crustal_patch_dict.keys():
crustal_patch_dict_typed[key] = np.array(crustal_patch_dict[key]["triangle_indices"], dtype=np.int32)
crustal_n_dict_typed = typed.Dict.empty(types.int32, types.int32)
for key in crustal_n_dict.keys():
crustal_n_dict_typed[key] = crustal_n_dict[key]
return self.match_crustal_nshm_parallel(event_ids, self.event_list, self.patch_list, crustal_patch_dict_typed,
crustal_n_dict_typed, crustal_patch_numbers,
threshold_proportion=threshold_proportion)
@staticmethod
@njit
[docs]
def match_crustal_nshm_parallel(event_ids: np.ndarray[int | np.int32 | np.int64],
event_list: np.ndarray[int | np.int32 | np.int64],
patch_list: np.ndarray[int | np.int32 | np.int64],
crustal_patch_dict: typed.Dict,
crustal_n_dict: typed.Dict,
crustal_patch_numbers: np.ndarray[int | np.int32 | np.int64],
threshold_proportion: float = 0.5):
"""
Numba JIT-compiled: map events to NSHM subsection indices.
Parameters
----------
event_ids : numpy.ndarray of int32
Events to process.
event_list : numpy.ndarray of int32
Flat catalogue event-ID array.
patch_list : numpy.ndarray of int32
Flat catalogue patch-ID array.
crustal_patch_dict : numba.typed.Dict
Mapping of subsection index to array of RSQSim patch IDs.
crustal_n_dict : numba.typed.Dict
Mapping of subsection index to total patch count.
crustal_patch_numbers : numpy.ndarray of int32
All crustal patch IDs.
threshold_proportion : float, optional
Fraction threshold for subsection inclusion. Defaults to
0.5.
Returns
-------
numba.typed.Dict
Mapping of event ID (int32) to array of active subsection
indices (int32).
"""
nshm_patch_dict = typed.Dict.empty(types.int32, types.int32[:])
nshm_keys = np.array(list(crustal_patch_dict.keys()), dtype=np.int32)
for event_i in range(len(event_ids)):
event_id = event_ids[event_i]
patch_numbers = patch_list[np.where(event_list == event_id)]
crustal_patches_i = jit_intersect(patch_numbers, crustal_patch_numbers)
subsection_bool = np.zeros_like(nshm_keys, dtype=np.int32)
for subsection_i in range(len(nshm_keys)):
subsection_number = nshm_keys[subsection_i]
subsection_patches = crustal_patch_dict[subsection_number]
subsection_patches_i = jit_intersect(crustal_patches_i, subsection_patches)
if len(subsection_patches_i) > 0:
nshm_n_i = crustal_n_dict[subsection_number]
if len(subsection_patches_i) / nshm_n_i >= threshold_proportion:
subsection_bool[subsection_i] = 1
nshm_patch_dict[event_id] = nshm_keys[subsection_bool == 1]
print(event_id)
return nshm_patch_dict
[docs]
def read_bruce(run_dir: str = "/home/UOCNT/arh128/PycharmProjects/rnc2/data/shaw2021/rundir4627",
fault_file: str = "bruce_faults.in", names_file: str = "bruce_names.in",
catalogue_file: str = "eqs..out"):
"""
Read a Bruce Shaw RSQSim run directory and return the fault model and catalogue.
Parameters
----------
run_dir : str
Path to the RSQSim run directory.
fault_file : str, optional
Fault geometry file name. Defaults to ``"bruce_faults.in"``.
names_file : str, optional
Fault names file name. Defaults to ``"bruce_names.in"``.
catalogue_file : str, optional
Catalogue file name. Defaults to ``"eqs..out"``.
Returns
-------
bruce_faults : RsqSimMultiFault
Fault model with UTM coordinates transformed to NZTM.
catalogue : RsqSimCatalogue
Fully populated catalogue.
"""
fault_full = os.path.join(run_dir, fault_file)
names_full = os.path.join(run_dir, names_file)
assert os.path.exists(fault_full)
bruce_faults = RsqSimMultiFault.read_fault_file_bruce(fault_full,
names_full,
transform_from_utm=True)
catalogue_full = os.path.join(run_dir, catalogue_file)
assert os.path.exists(catalogue_full)
catalogue = RsqSimCatalogue.from_catalogue_file_and_lists(catalogue_full,
run_dir, "rundir4627")
return bruce_faults, catalogue
[docs]
def read_bruce_if_necessary(run_dir: str = "/home/UOCNT/arh128/PycharmProjects/rnc2/data/shaw2021/rundir4627",
fault_file: str = "bruce_faults.in", names_file: str = "bruce_names.in",
catalogue_file: str = "eqs..out", default_faults: str = "bruce_faults",
default_cat: str = "catalogue"):
"""
Read a Bruce Shaw run directory only if the variables are not already in global scope.
Parameters
----------
run_dir : str
Path to the RSQSim run directory.
fault_file, names_file, catalogue_file : str, optional
File names for faults, fault names, and the catalogue.
default_faults : str, optional
Global variable name expected to hold the fault model.
Defaults to ``"bruce_faults"``.
default_cat : str, optional
Global variable name expected to hold the catalogue.
Defaults to ``"catalogue"``.
Returns
-------
tuple of (RsqSimMultiFault, RsqSimCatalogue) or None
Returns the fault model and catalogue if they were not already
defined; otherwise returns ``None``.
"""
print(globals())
if not all([a in globals() for a in (default_faults, default_cat)]):
bruce_faults, catalogue = read_bruce(run_dir=run_dir, fault_file=fault_file, names_file=names_file,
catalogue_file=catalogue_file)
return bruce_faults, catalogue
[docs]
def combine_boundaries(bounds1: list, bounds2: list):
"""
Return the bounding box that encompasses both input bounding boxes.
Takes the element-wise minimum for the lower two coordinates and the
element-wise maximum for the upper two, producing a combined extent.
Parameters
----------
bounds1 : list
First bounding box as ``[min_x, min_y, max_x, max_y]``.
bounds2 : list
Second bounding box in the same format.
Returns
-------
list
Combined bounding box ``[min_x, min_y, max_x, max_y]``.
Raises
------
AssertionError
If ``bounds1`` and ``bounds2`` have different lengths.
"""
assert len(bounds1) == len(bounds2)
min_bounds = [min([a, b]) for a, b in zip(bounds1, bounds2)]
max_bounds = [max([a, b]) for a, b in zip(bounds1, bounds2)]
return min_bounds[:2] + max_bounds[2:]
[docs]
def calculate_b_value(magnitudes, time_interval_years, min_mw: float = 0.0, max_mw: float = 10.0, interval = 0.1):
"""
Estimate the Gutenberg-Richter b-value from a magnitude catalogue.
Builds a cumulative MFD histogram, normalises by the observation
interval, and fits a straight line to log₁₀(rate) vs Mw to obtain
the b-value (negative slope) and the a-value (intercept).
Parameters
----------
magnitudes : array-like
Array of moment magnitudes.
time_interval_years : float
Observation duration in years used to convert event counts to
annual rates.
min_mw : float, optional
Lower magnitude cutoff for the regression. Defaults to 0.0.
max_mw : float, optional
Upper magnitude cutoff for the regression. Defaults to 10.0.
interval : float, optional
Magnitude bin width. Defaults to 0.1.
Returns
-------
b_value : float
Estimated b-value (positive, i.e. the negative of the fitted
gradient).
intercept : float
Log₁₀ a-value (y-intercept of the Gutenberg-Richter fit).
"""
magnitudes = np.array(magnitudes)
mfd_bins = np.arange(0., 10. + interval, interval)
mfd_hist = np.histogram(magnitudes, bins=mfd_bins)
bin_centres = mfd_bins[:-1] + interval/2
trimmed_bins = bin_centres[(bin_centres >= min_mw) & (bin_centres <= max_mw + interval)]
cumulative_mfd = np.array([sum(mfd_hist[0][i:]) for i in range(len(mfd_hist[0]))], dtype=float)
trimmed_mfd = cumulative_mfd[(bin_centres >= min_mw) & (bin_centres <= max_mw + interval)]
trimmed_mfd_no_zeros = trimmed_mfd[trimmed_mfd > 0.]
trimmed_mfd_no_zeros /= time_interval_years
trimmed_bins_no_zeros = trimmed_bins[trimmed_mfd > 0.]
grad, intercept = np.polyfit(trimmed_bins_no_zeros, np.log10(trimmed_mfd_no_zeros), 1)
return -1 * grad, intercept