"""Utilities for reading RSQSim output files in text, binary, CSV, DXF, STL, and VTK formats."""
import os
import meshio
import numpy as np
import pandas as pd
import ezdxf
from scipy.spatial import KDTree
[docs]
catalogue_columns = ["t0", "m0", "mw", "x", "y", "z", "area", "dt"]
[docs]
def read_text(file: str, format: str):
"""
Read scalar values from a text file output by RSQSim.
Produced when RSQSim is compiled with serial (text) output mode.
Parameters
----------
file :
Path to the text file to read.
format :
Data type specifier: ``"d"`` for double (float64) or ``"i"``
for integer (int32).
Returns
-------
numpy.ndarray
Flattened 1-D array of values read from the file.
Raises
------
AssertionError
If ``format`` is not ``"d"`` or ``"i"``, or if ``file`` does not
exist.
"""
# Check file existence and that parameter supplied for format makes sense.
assert format in ("d", "i")
assert os.path.exists(file)
if format == "d":
numbers = np.loadtxt(file, dtype=np.float64).flatten()
else:
numbers = np.loadtxt(file, dtype=np.int32).flatten()
return numbers
[docs]
def read_binary(file: str, format: str, endian: str = "little"):
"""
Read scalar values from a binary file output by RSQSim.
Parameters
----------
file :
Path to the binary file to read.
format :
Data type specifier: ``"d"`` for double (float64) or ``"i"``
for integer (int32).
endian :
Byte order of the file. ``"little"`` (default) is standard for
most modern x86 systems; use ``"big"`` for non-standard platforms.
Returns
-------
numpy.ndarray
Flattened 1-D array of values read from the file.
Raises
------
AssertionError
If ``endian`` is not ``"little"`` or ``"big"``, if ``format`` is
not ``"d"`` or ``"i"``, or if ``file`` does not exist.
"""
# Check that parameter supplied for endianness makes sense
assert endian in ("little", "big"), "Must specify either 'big' or 'little' endian"
endian_sign = "<" if endian == "little" else ">"
assert format in ("d", "i")
assert os.path.exists(file)
if format == "d":
numbers = np.fromfile(file, endian_sign + "f8").flatten()
else:
numbers = np.fromfile(file, endian_sign + "i4").flatten()
return numbers
[docs]
def read_csv_and_array(prefix: str, read_index: bool = True):
"""
Read a catalogue CSV and its associated NumPy array files from a common prefix.
Expects files named ``<prefix>_catalogue.csv``, ``<prefix>_events.npy``,
``<prefix>_patches.npy``, ``<prefix>_slip.npy``, and
``<prefix>_slip_time.npy`` to exist on disk.
Parameters
----------
prefix :
File path prefix (with or without trailing underscore).
read_index : bool, optional
If ``True`` (default), read the first column of the CSV as the
DataFrame index.
Returns
-------
list
``[df, events, patches, slip, slip_time]`` where ``df`` is a
``pandas.DataFrame`` and the remaining elements are NumPy arrays
loaded from the corresponding ``.npy`` files.
Raises
------
AssertionError
If ``prefix`` is an empty string.
FileNotFoundError
If any of the five expected files is missing.
"""
assert prefix, "Empty prefix string supplied"
if prefix[-1] != "_":
prefix += "_"
suffixes = ["catalogue.csv", "events.npy", "patches.npy", "slip.npy", "slip_time.npy"]
file_list = [prefix + suffix for suffix in suffixes]
for file, suffix in zip(file_list, suffixes):
if not os.path.exists(file):
raise FileNotFoundError("{} file missing!".format(suffix))
if read_index:
df = pd.read_csv(file_list[0], index_col=0)
else:
df = pd.read_csv(file_list[0])
array_ls = [np.load(file) for file in file_list[1:]]
return [df] + array_ls
[docs]
def read_earthquakes(earthquake_file: str, get_patch: bool = False, eq_start_index: int = None,
eq_end_index: int = None, endian: str = "little"):
"""
Read earthquake event data, inferring companion file names from the event file prefix.
Based on R scripts by Keith Richards-Dinger. The earthquake file is
expected to follow the naming convention ``eqs.<prefix>.out``.
Parameters
----------
earthquake_file :
Path to the RSQSim earthquake output file, usually with a
``.out`` suffix and named ``eqs.<prefix>.out``.
get_patch :
If ``True``, also read per-patch rupture data. Currently unused
placeholder.
eq_start_index :
Zero-based index of the first earthquake to read. Both
``eq_start_index`` and ``eq_end_index`` must be provided together.
eq_end_index :
Zero-based index of the last earthquake to read (exclusive).
endian :
Byte order for binary companion files. Defaults to ``"little"``.
Raises
------
AssertionError
If ``endian`` is not ``"little"`` or ``"big"``, or if
``earthquake_file`` does not exist.
ValueError
If ``eq_start_index >= eq_end_index`` when both are provided.
"""
assert endian in ("little", "big"), "Must specify either 'big' or 'little' endian"
assert os.path.exists(earthquake_file)
if not any([a is None for a in (eq_start_index, eq_end_index)]):
if eq_start_index >= eq_end_index:
raise ValueError("eq_start index should be smaller than eq_end_index!")
# Get full path to file and working directory
abs_file_path = os.path.abspath(earthquake_file)
file_base_name = os.path.basename(abs_file_path)
# Get file prefix from basename
split_by_dots = file_base_name.split(".")
# Check that filename fits expected format
if not all([split_by_dots[0] == "eqs", split_by_dots[-1] == "out"]):
print("Warning: non-standard file name.")
print("Expecting earthquake file name to have the format: eqs.{prefix}.out")
print("using 'catalogue' as prefix...")
prefix = "catalogue"
else:
# Join prefix back together if necessary, warning if empty
prefix_list = split_by_dots[1:-1]
if len(prefix_list) == 1:
prefix = prefix_list[0]
if prefix.strip() == "":
print("Warning: empty prefix string")
else:
prefix = ".".join(*prefix_list)
# Search for binary files in directory
tau_file = abs_file_path + "/tauDot.{}.out".format(prefix)
sigmat_file = abs_file_path + "/sigmaDot.{}.out".format(prefix)
[docs]
def read_earthquake_catalogue(catalogue_file: str):
"""
Read an RSQSim earthquake catalogue text file into a DataFrame.
Parses the catalogue file produced by RSQSim, skipping the header
block that ends with the line ``%%% end input files``, and loads the
subsequent rows into a DataFrame with the standard catalogue columns
``t0, m0, mw, x, y, z, area, dt``.
Parameters
----------
catalogue_file :
Path to the RSQSim catalogue text file.
Returns
-------
pandas.DataFrame
DataFrame with columns ``["t0", "m0", "mw", "x", "y", "z",
"area", "dt"]``, one row per earthquake event.
Raises
------
AssertionError
If ``catalogue_file`` does not exist.
"""
assert os.path.exists(catalogue_file)
with open(catalogue_file, "r") as fid:
data = fid.readlines()
start_eqs = data.index("%%% end input files\n") + 1
data_array = np.loadtxt(data[start_eqs:])
earthquake_catalogue = pd.DataFrame(data_array[:, :8], columns=catalogue_columns)
return earthquake_catalogue
# def read_fault(fault_file_name: str, check_if_grid: bool = True, )
[docs]
def read_ts_coords(filename):
"""
Read vertex and triangle data from a tsurf (``.ts``) file.
Parses the SCEC Community Fault Model tsurf format, extracting
vertex coordinates and triangle connectivity. Based on the MATLAB
script ``ReadAndSaveCfm.m`` by Brendan Meade.
Parameters
----------
filename :
Path to the tsurf ``.ts`` file.
Returns
-------
vrtx : numpy.ndarray of shape (n_vertices, 4)
Vertex data array. Each row is ``[vertex_id, x, y, z]``.
trgl : numpy.ndarray of shape (n_triangles, 3), dtype int
Triangle connectivity array. Each row gives the three vertex IDs
forming a triangle.
tri : numpy.ndarray of shape (n_triangles, 9)
Triangle data array. Each row contains the (x, y, z) coordinates
of all three corners concatenated: ``[x1,y1,z1, x2,y2,z2, x3,y3,z3]``.
Notes
-----
Copyright Paul Kaeufl, July 2014.
Original MATLAB script:
http://structure.rc.fas.harvard.edu/cfm/download/meade/ReadAndSaveCfm.m
"""
f = open(filename, 'r')
lines = f.readlines()
f.close()
idxVrtx = [idx for idx, l in enumerate(lines)
if 'VRTX' in l or 'PVRTX' in l]
idxTrgl = [idx for idx, l in enumerate(lines) if 'TRGL' in l]
nVrtx = len(idxVrtx)
nTrgl = len(idxTrgl)
vrtx = np.zeros((nVrtx, 4))
trgl = np.zeros((nTrgl, 3), dtype='int')
tri = np.zeros((nTrgl, 9))
for k, iVrtx in enumerate(idxVrtx):
line = lines[iVrtx]
tmp = line.split()
vrtx[k] = [int(tmp[1]), float(tmp[2]), float(tmp[3]), float(tmp[4])]
for k, iTrgl in enumerate(idxTrgl):
line = lines[iTrgl]
tmp = line.split(' ')
trgl[k] = [int(tmp[1]), int(tmp[2]), int(tmp[3])]
for l in range(3):
i1 = l * 3
i2 = 3 * (l + 1)
vertex_i = vrtx[vrtx[:, 0] == trgl[k, l]][0]
tri[k, i1:i2] = vertex_i[1:]
return vrtx, trgl, tri
[docs]
def read_dxf(dxf_file: str):
"""
Read a triangulated mesh and boundary polyline from a DXF file.
Expects a DXF file exported from Move containing exactly one
``POLYLINE`` boundary and one or more ``3DFACE`` triangles.
Parameters
----------
dxf_file :
Path to the DXF file.
Returns
-------
triangle_array : numpy.ndarray of shape (n_triangles, 9)
Each row contains the (x, y, z) coordinates of the three triangle
corners concatenated: ``[x1,y1,z1, x2,y2,z2, x3,y3,z3]``.
boundary_array : numpy.ndarray of shape (n_points, 3)
(x, y, z) coordinates of the boundary polyline vertices.
Raises
------
AssertionError
If ``dxf_file`` does not exist, or if the file does not contain
both ``3DFACE`` and ``POLYLINE`` entities.
ValueError
If the file contains more than one ``POLYLINE`` boundary.
"""
assert os.path.exists(dxf_file)
dxf = ezdxf.readfile(dxf_file)
msp = dxf.modelspace()
dxftypes = [e.dxftype() for e in msp]
assert all([a in dxftypes for a in ("3DFACE", "POLYLINE")]), "{}: Expected triangles and boundary".format(dxf_file)
if dxftypes.count("POLYLINE") > 1:
raise ValueError("{}: Too many boundaries lines...".format(dxf_file))
triangle_ls = []
boundary_array = None
for entity in msp:
if entity.dxftype() == "3DFACE":
triangle = np.array([vertex.xyz for vertex in entity])
unique_triangle = np.unique(triangle, axis=0).reshape((9,))
triangle_ls.append(unique_triangle)
elif entity.dxftype() == "POLYLINE":
boundary_ls = []
for point in entity.points():
boundary_ls.append(point.xyz)
boundary_array = np.array(boundary_ls)
triangle_array = np.array(triangle_ls)
return triangle_array, boundary_array
[docs]
def read_stl(stl_file: str, min_point_sep=100.):
"""
Read a triangulated surface mesh from an STL file.
Merges near-duplicate vertices (within ``min_point_sep``) by averaging
their positions and removing degenerate triangles that result from the
merge.
Parameters
----------
stl_file :
Path to the STL mesh file.
min_point_sep :
Minimum separation in metres below which two vertices are
considered duplicates and merged. Defaults to 100.0 m.
Returns
-------
numpy.ndarray of shape (n_triangles, 9)
Each row contains the (x, y, z) coordinates of the three triangle
corners: ``[x1,y1,z1, x2,y2,z2, x3,y3,z3]``.
Raises
------
AssertionError
If ``stl_file`` does not exist or if the mesh contains no
triangular cells.
"""
assert os.path.exists(stl_file)
mesh = meshio.read(stl_file)
assert "triangle" in mesh.cells_dict.keys()
triangles = mesh.cells_dict["triangle"]
mesh_points = mesh.points[:]
point_tree = KDTree(mesh.points)
distances, indices = point_tree.query(mesh.points, k=[2])
problem_indices = indices[distances < min_point_sep]
paired_indices = []
for i1 in problem_indices:
i2 = indices[i1][0]
if not any([i in paired_indices for i in [i1, i2]]):
p1 = mesh.points[i1]
p2 = mesh.points[i2]
mesh_points[i2] = 0.5 * (p1 + p2)
mesh_points[i1] = 0.5 * (p1 + p2)
triangles[triangles == i1] = i2
paired_indices += [i1, i2]
if len(problem_indices) > 0:
num_tris_pre = len(triangles)
tris_num_unique_vertices = np.unique(triangles, axis=1)
tri_lens = np.array([len(np.unique(tri)) for tri in tris_num_unique_vertices])
valid_tri_indices = np.where(tri_lens == 3)[0]
triangles = triangles[valid_tri_indices]
num_tris_post = len(triangles)
if num_tris_post < num_tris_pre:
print("Warning: {} triangles removed from mesh due to duplicate vertices".format(
num_tris_pre - num_tris_post))
point_dict = {i: point for i, point in enumerate(mesh_points)}
mesh_as_array = np.array([np.hstack([point_dict[vertex] for vertex in tri]) for tri in triangles])
return mesh_as_array
[docs]
def read_vtk(vtk_file: str, min_point_sep=1.):
"""
Read a triangulated surface mesh with slip and rake data from a VTK file.
Merges near-duplicate vertices (within ``min_point_sep``) and removes
degenerate triangles, preserving the associated slip and rake cell data.
Parameters
----------
vtk_file :
Path to the VTK mesh file.
min_point_sep :
Minimum separation in metres below which two vertices are
considered duplicates and merged. Defaults to 1.0 m.
Returns
-------
mesh_as_array : numpy.ndarray of shape (n_triangles, 9)
Each row contains the (x, y, z) coordinates of the three triangle
corners: ``[x1,y1,z1, x2,y2,z2, x3,y3,z3]``.
slip : numpy.ndarray of shape (n_triangles,)
Slip magnitude (metres) for each triangle after duplicate removal.
rake : numpy.ndarray of shape (n_triangles,)
Rake angle (degrees) for each triangle after duplicate removal.
Raises
------
AssertionError
If ``vtk_file`` does not exist, if the mesh contains no triangular
cells, or if the ``slip`` and ``rake`` cell data arrays are absent.
"""
assert os.path.exists(vtk_file)
mesh = meshio.read(vtk_file)
assert "triangle" in mesh.cells_dict.keys()
assert all([data in mesh.cell_data.keys() for data in ("slip", "rake")])
triangles = mesh.cells_dict["triangle"]
mesh_points = mesh.points[:]
point_tree = KDTree(mesh.points)
distances, indices = point_tree.query(mesh.points, k=[2])
problem_indices = indices[distances < min_point_sep]
paired_indices = []
for i1 in problem_indices:
i2 = indices[i1][0]
if not any([i in paired_indices for i in [i1, i2]]):
p1 = mesh.points[i1]
p2 = mesh.points[i2]
mesh_points[i2] = 0.5 * (p1 + p2)
mesh_points[i1] = 0.5 * (p1 + p2)
triangles[triangles == i1] = i2
paired_indices += [i1, i2]
if len(problem_indices) > 0:
num_tris_pre = len(triangles)
tris_num_unique_vertices = np.unique(triangles, axis=1)
tri_lens = np.array([len(np.unique(tri)) for tri in tris_num_unique_vertices])
valid_tri_indices = np.where(tri_lens == 3)[0]
triangles = triangles[valid_tri_indices]
slip = mesh.cell_data["slip"][0][valid_tri_indices]
rake = mesh.cell_data["rake"][0][valid_tri_indices]
num_tris_post = len(triangles)
if num_tris_post < num_tris_pre:
print("Warning: {} triangles removed from mesh due to duplicate vertices".format(num_tris_pre - num_tris_post))
else:
slip = mesh.cell_data["slip"][0]
rake = mesh.cell_data["rake"][0]
point_dict = {i: point for i, point in enumerate(mesh_points)}
mesh_as_array = np.array([np.hstack([point_dict[vertex] for vertex in tri]) for tri in triangles])
return mesh_as_array, slip, rake