Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions scripts/convert_srg_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import argparse
from pathlib import Path

import numpy as np
from osgeo import gdal


gdal.UseExceptions()


def parse_rsc(rsc_file: Path) -> dict:
"""Parse the metadata from an SRG RSC file

Args:
rsc_file: Path to the RSC file

Returns:
Dictionary with the metadata from the RSC file
"""
types_dict = {
'WIDTH': int,
'FILE_LENGTH': int,
'X_FIRST': float,
'Y_FIRST': float,
'X_STEP': float,
'Y_STEP': float,
'X_UNIT': str,
'Y_UNIT': str,
'Z_OFFSET': float,
'Z_SCALE': float,
'PROJECTION': str,
'xstart': float,
'ystart': float,
'xend': float,
'yend': float,
'xsize': int,
'ysize': int,
}
with open(rsc_file, 'r') as f:

Check failure on line 39 in scripts/convert_srg_outputs.py

View workflow job for this annotation

GitHub Actions / call-ruff-workflow / check-with-ruff

Ruff (UP015)

scripts/convert_srg_outputs.py:39:25: UP015 Unnecessary mode argument
lines = f.readlines()
rsc_dict = {}
for line in lines:
key, value = line.strip().split()
rsc_dict[key] = types_dict[key](value)
return rsc_dict


def parse_sbas_list(sbas_list: Path) -> tuple[list[str], list[list[str]]]:
"""Parse the list of SBAS pairs from a SBAS list file

Args:
sbas_list: Path to the SBAS list file

Returns:
List with the names of the files in the SBAS list
List with the pairs of files in the SBAS list
"""
with open(sbas_list, 'r') as f:

Check failure on line 58 in scripts/convert_srg_outputs.py

View workflow job for this annotation

GitHub Actions / call-ruff-workflow / check-with-ruff

Ruff (UP015)

scripts/convert_srg_outputs.py:58:26: UP015 Unnecessary mode argument
lines = f.readlines()
pairs = [[Path(x).name for x in line.strip().split()[:2]] for line in lines]
files = sorted(list(set([x for pair in pairs for x in pair])))
return files, pairs


def write_geotiff(data: np.ndarray, info: dict, filename: Path, band_names: list[str] = None) -> None:
"""Write a numpy array to a GeoTIFF file

Args:
data: Numpy array with the data to be written
info: Dictionary with the metadata from the DEM RSC file
filename: Path to the output GeoTIFF file
band_names: List with the names of the bands in the output GeoTIFF
"""
if data.dtype == np.float32:
dtype = gdal.GDT_Float32
elif data.dtype == np.int16:
dtype = gdal.GDT_Int16
elif data.dtype == np.int32:
dtype = gdal.GDT_Int32
else:
raise ValueError('Data type not supported')

if len(data.shape) == 3:
n_bands, n_rows, n_cols = data.shape
else:
n_bands = 1
n_rows, n_cols = data.shape

driver = gdal.GetDriverByName('GTiff')
ds = driver.Create(str(filename), n_cols, n_rows, n_bands, dtype)
ds.SetGeoTransform((info['X_FIRST'], info['X_STEP'], 0, info['Y_FIRST'], 0, info['Y_STEP']))
ds.SetProjection('EPSG:4326')
if n_bands == 1:
ds.GetRasterBand(1).WriteArray(data)
else:
for i in range(n_bands):
band = ds.GetRasterBand(i + 1)
band.WriteArray(data[i, :, :])
if band_names is not None:
band.SetDescription(band_names[i])
band.FlushCache()
del band
ds = None


def convert_single_band(file: Path, info: dict, dtype: np.dtype) -> None:
"""Convert a single-band SRG file to a GeoTIFF

Args:
file: Path to the SRG file
info: Dictionary with the metadata from the DEM RSC file
dtype: Numpy data type of the file
"""
data = np.fromfile(file, dtype=dtype).reshape(info['FILE_LENGTH'], -1)
write_geotiff(data, info, file.with_suffix('.tif'))


def convert_displacement(displacement_file: Path, info: dict, band_names: list[str]) -> None:
"""Convert the velocity file to separate GeoTIFFs for amplitude and displacement
Save each date as a separate band

Args:
displacement_file: Path to the SRG displacement file
info: Dictionary with the metadata from the DEM RSC file
band_names: List with the dates of the SBAS pairs
"""
data = np.fromfile(displacement_file, dtype=np.float32).reshape(-1, info['WIDTH'])

amplitude = data[0::2, :]
amplitude = amplitude.reshape(len(band_names), info['FILE_LENGTH'], info['WIDTH'])
write_geotiff(amplitude, info, displacement_file.parent / 'amplitude.tif', band_names=band_names)

displacement = data[1::2, :]
displacement = displacement.reshape(len(band_names), info['FILE_LENGTH'], info['WIDTH'])
write_geotiff(displacement, info, displacement_file.parent / 'displacement.tif', band_names=band_names)


def convert_velocity(velocity_file: Path, info: dict, band_names: list[str]) -> None:
"""Convert the velocity file to a GeoTIFF
Save each date as a separate band

Args:
velocity_file: Path to the SRG velocity file
info: Dictionary with the metadata from the DEM RSC file
band_names: List with the dates of the SBAS pairs
"""
data = np.fromfile(velocity_file, dtype=np.float32).reshape(-1, info['WIDTH'])
velocity = data.reshape(len(band_names), info['FILE_LENGTH'], info['WIDTH'])
write_geotiff(velocity, info, velocity_file.parent / 'velocity.tif', band_names=band_names)


def convert_files(sbas_dir: Path) -> None:
"""Convert the contents of a SRG SBAS directory to GeoTIFFs

Args:
sbas_dir: Path to the SBAS directory for a SRG run
"""
info = parse_rsc(sbas_dir / 'dem.rsc')
files, pairs = parse_sbas_list(sbas_dir / 'sbas_list')
band_names = [x[17:25] for x in files][1:]
convert_single_band(sbas_dir / 'dem', info, dtype=np.int16)
convert_single_band(sbas_dir / 'npts', info, dtype=np.int32)
convert_single_band(sbas_dir / 'stacktime', info, dtype=np.int32)
convert_displacement(sbas_dir / 'displacement', info, band_names)
convert_velocity(sbas_dir / 'velocity', info, band_names)


def main():
"""CLI for the convert_srg_outputs.py script

Example:
python convert_srg_outputs.py ./path/to/srg_sbas_directory
"""
parser = argparse.ArgumentParser(description='Convert the contents of a SRG SBAS directory to GeoTIFF')
parser.add_argument('directory', type=str, help='SRG SBAS directory')
args = parser.parse_args()
args.directory = Path(args.directory)
convert_files(args.directory)


if __name__ == '__main__':
main()
145 changes: 145 additions & 0 deletions scripts/view_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import argparse
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from convert_srg_outputs import parse_rsc


def load_image(file_path: Path, info: dict):
suffix = file_path.suffix
if suffix == '.geo':
ftype = 'slc'
data = np.fromfile(file_path, dtype=np.float32).reshape(info['FILE_LENGTH'], -1)
real = data[:, 0::2]
imag = data[:, 1::2]
data = np.abs(real + 1j * imag)
elif suffix == '.amp':
ftype = 'amp'
data = np.fromfile(file_path, dtype=np.float32).reshape(info['FILE_LENGTH'], -1)
elif suffix == '.int':
ftype = 'int'
data = np.fromfile(file_path, dtype=np.float32).reshape(info['FILE_LENGTH'], -1)
real = data[:, 0::2]
imag = data[:, 1::2]
data = np.angle(real + 1j * imag)
elif suffix == '.unw':
ftype = 'unw'
data = np.fromfile(file_path, dtype=np.float32).reshape(info['FILE_LENGTH'], -1)
data = data[:, data.shape[1] // 2 :]
elif suffix == '.cc':
ftype = 'cc'
data = np.fromfile(file_path, dtype=np.float32).reshape(info['FILE_LENGTH'], -1)
data = data[:, data.shape[1] // 2 :]
elif suffix == '':
name = file_path.name
if name == 'dem':
ftype = 'dem'
data = np.fromfile(file_path, dtype=np.int16).reshape(info['FILE_LENGTH'], -1)
elif name == 'npts':
ftype = 'npts'
data = np.fromfile(file_path, dtype=np.int32).reshape(info['FILE_LENGTH'], -1)
elif name == 'stacktime':
ftype = 'stacktime'
data = np.fromfile(file_path, dtype=np.int32).reshape(info['FILE_LENGTH'], -1)
elif name == 'velocity':
raise NotImplementedError('Loading of velocity files not implemented.')
elif name == 'displacement':
raise NotImplementedError('Loading of displacement files not implemented.')
else:
raise ValueError(f'File {file_path} not recognized')
else:
raise ValueError(f'File {file_path} not recognized')
return ftype, data


def plot_image(file_path: Path, rsc_path: Path):
info = parse_rsc(rsc_path)
ftype, data = load_image(file_path, info)
title = file_path.name

cmap = 'gray'
if ftype == 'int':
cmap = 'hsv'

f, ax = plt.subplots(1, 1, figsize=(15, 10))
im = ax.imshow(data, cmap=cmap, vmin=float(np.percentile(data, 1)), vmax=float(np.percentile(data, 99)))
f.colorbar(im, ax=ax, shrink=0.5)
ax.set_title(title)
plt.tight_layout()
plt.show()


def plot_hist(ax, array, color, label):
ax.hist(
array.flatten(),
bins=50,
color=color,
label=label,
alpha=0.25,
range=(np.percentile(array, 1), np.percentile(array, 99)),
)


def plot_diff(file_path: Path, file_path2: Path, rsc_path: Path, save: bool = False):
info = parse_rsc(rsc_path)
ftype, data = load_image(file_path, info)
title = file_path.name

ftype2, data2 = load_image(file_path2, info)
if ftype != ftype2:
raise ValueError('Files must be of the same type to plot the difference.')
diff = np.abs(data - data2)
title = f'{file_path}\n-\n{file_path2}'

cmap = 'gray'
if ftype == 'int':
cmap = 'hsv'

f, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 10))
im = ax1.imshow(diff, cmap=cmap, vmin=np.percentile(diff, 1), vmax=np.percentile(diff, 99))
plot_hist(ax2, diff, 'black', 'Difference')
f.colorbar(im, ax=ax1, shrink=0.5)
ax1.set_title(title)
plt.tight_layout()
if not save:
plt.show()
else:
out_path = file_path.parent / f'{file_path.stem}_minus_{file_path2.stem}.png'
plt.savefig(out_path, dpi=300)
print(f'Saved figure to {out_path}')
plt.close(f)


def plots():
image1 = Path('stanford/raw/S1A_IW_RAW__0SDV_20241006T161640_20241006T161717_055984_06D8A0_FC33.geo')
image2s = list(Path('hawaii_old').glob('*.geo'))
rsc = Path('stanford/elevation.dem.rsc')
for image2 in image2s:
plot_diff(image1, image2, rsc, save=True)


def main():
"""CLI for the convert_srg_outputs.py script

Example:
python convert_srg_outputs.py ./path/to/srg_sbas_directory
"""
parser = argparse.ArgumentParser(description='Convert the contents of a SRG SBAS directory to GeoTIFF')
parser.add_argument('file', type=Path, help='Path to file to display.')
parser.add_argument('rsc', type=Path, help='Path to companion RSC file.')
parser.add_argument(
'file2',
type=Path,
help='Path a second file. Image displayed will be the difference between the two files.',
nargs='?',
)
args = parser.parse_args()
if args.file2 is None:
plot_image(args.file, args.rsc)
else:
plot_diff(args.file, args.file2, args.rsc)


if __name__ == '__main__':
main()
Loading