from repurpose.process import parallel_process, idx_chunks
import logging
import numpy as np
import pandas as pd
import xarray as xr
from pygeogrids.grids import BasicGrid, CellGrid
import os
from typing import Union
from pynetcf.time_series import (
GriddedNcOrthoMultiTs,
GriddedNcContiguousRaggedTs,
GriddedNcIndexedRaggedTs
)
from datetime import datetime
import warnings
from repurpose.stack import Regular3dimImageStack
"""
TODOs:
- add possibility to use resampling methods other than nearest neighbour
- integrate repurpose.resample module
- allows weighting functions etc.
- similar to resample, use multiple neighbours when available for image pixel
- further harmonisation with pynetcf interface
- time ranges for images instead of time stamps
"""
def _convert(converter: 'Ts2Img',
writer: Regular3dimImageStack,
img_gpis: np.ndarray,
lons: np.ndarray,
lats: np.ndarray,
preprocess_func=None,
preprocess_kwargs=None) -> xr.Dataset:
"""
Wrapper to allow parallelization of the conversion process.
This is kept outside the Ts2Img class for parallelization.
"""
for gpi, lon, lat in zip(img_gpis, lons, lats):
ts = converter._read_nn(lon, lat)
if ts is None:
continue
if preprocess_func is not None:
preprocess_func = np.atleast_1d(preprocess_func)
if preprocess_kwargs is None:
preprocess_kwargs = [{}] * len(preprocess_func)
preprocess_kwargs = np.atleast_1d(preprocess_kwargs)
if len(preprocess_func) != len(preprocess_kwargs):
raise ValueError("Length of preprocess_func and "
"preprocess_kwargs is different")
for func, kwargs in zip(preprocess_func, preprocess_kwargs):
ts = func(ts, **kwargs)
if np.any(np.isin(ts.columns, Ts2Img._protected_vars)):
raise ValueError(
f"Time series contains protected variables. "
f"Please rename them: {Ts2Img._protected_vars}"
)
writer.write_ts(ts, gpi)
return writer.ds
def _write_img(
image: xr.Dataset,
dt: datetime,
out_path: str,
filename: str,
annual_folder: bool = True,
encoding: dict = None,
):
"""
Wrapper to allow writing several images in parallel (see
Ts2Img.store_netcdf_images).
This is kept outside the Ts2Img class for parallelization.
"""
if annual_folder:
out_path = os.path.join(out_path, f"{dt.year:04}")
os.makedirs(out_path, exist_ok=True)
image.attrs['date_created'] = f"File created: {datetime.now()}"
image.to_netcdf(os.path.join(out_path, filename),
engine='netcdf4', encoding=encoding)
image.close()
del image
[docs]class Ts2Img:
"""
Takes a time series dataset and converts it into a set of images.
Images are stored on a regular grid. This includes a spatial and temporal
lookup, ie resampling of the time series data to a regular 2d grid as well
as assigning time series time stamps to images.
Protected variable names (used internally) are:
timedelta_seconds, index_other, distance_other
gpi, lon, lat
Parameters
----------
ts_reader: GriddedNcOrthoMultiTs or GriddedNcContiguousRaggedTs or GriddedNcIndexedRaggedTs
A reader that returns a time series for a given lon/lat combination.
The class method defined in `read_name` is called to read a pandas
DataFrame that has a DateTimeIndex and the variables as columns for
a location.
img_grid: BasicGrid or CellGrid
A regular grid that defines the output images. Must be rectangular and
have a 2d shape attribute. Can be a spatial subset of the time series
grid and contain points that are missing in the time series (filled
with nan). For each grid point, we search the closest time series
(within `max_dist` of ts_reader).
timestamps: pd.DateTimeIndex
Each data point in the loaded time series must be assigned to an image.
This defines the temporal sampling of the image stack. Each time stamp
is a separate image.
The closest time stamp from the time series will be stored in the
according image, other data that would be assinged to the same image
are DISCARDED!
In this case a higher frequency (eg 12-hourly) should be chosen.
A too low frequency here means that information is lost.
A too high frequency here means that data is split up into many images.
variables: dict or list[str] or None, optional (default: None)
Data variables to be read from the time series and transfer to the
images. Must exist in the time series. If a dict is given, then the
variables are renamed after reading.
Ideally a fill value for each variable (new name) is given in
'fill_values'.
If None, all variables are read.
read_function: str, optional (default: 'read')
Name of the method in `ts_reader` that takes a lon/lat pair and returns
a pandas DataFrame with a DateTimeIndex and the variables as columns.
max_dist: float, optional (default: 0.25)
Maximum distance around an image grid cell to tool for a time series.
If mutliple are found, only the nearest one is used!
time_colloction: bool, optional (default: True)
Relevant when converting data with varying time stamps per location.
For each image time stamp find the closest time series time stamp
afterwards. Then convert time series time stamps to deltas (>0) from
the image time stamps and store them in a new image variable
'timedelta_seconds'.
loglevel: str, optional (default: 'WARNING')
Logging level.
Must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'.
ignore_errors: bool, optional (default: False)
Instead of raising an exception, log errors and continue the
process. E.g. to skip individual corrupt files.
backend: str, optional (default: 'threading')
Which backend joblib should use. Default is 'threading',
other options are 'multiprocessing' and 'loky'
"""
# Some variables are generated internally and cannot be used.
# If there are variable with the same name, rename them using the
# `rename` keyword of the `.calc()` method.
_protected_vars = ['timedelta_seconds']
def __init__(self, ts_reader, img_grid, timestamps,
variables=None, read_function='read',
max_dist=18000, time_collocation=True, loglevel="WARNING",
ignore_errors=False, backend='threading'):
self.backend = backend
self.ts_reader = ts_reader
self.img_grid: CellGrid = Regular3dimImageStack._eval_grid(img_grid)
self.timestamps = timestamps
self.ignore_errors = ignore_errors
if variables is not None:
if not isinstance(variables, dict):
variables = {v: v for v in variables}
self.variables = variables
self.read_function = read_function
self.max_dist = max_dist
self.time_collocation = time_collocation
self.loglevel = loglevel
self.stack = None
def _cell_writer(self, cell: int, timestamps_chunk: pd.DatetimeIndex):
"""
Create sub-cube for cell to write time series of current chunk to.
This is only in memory, writing to disk is done after collecting all
cells.
"""
# RegularGriddedImageStack for a single cell
cellgrid = self.img_grid.subgrid_from_cells(cell)
n_lats = np.unique(cellgrid.activearrlat).size
n_lons = np.unique(cellgrid.activearrlon).size
cellgrid.shape = (n_lats, n_lons)
writer = Regular3dimImageStack(
cellgrid, timestamps=timestamps_chunk, zlib=False,
time_collocation=self.time_collocation)
return writer
def _read_nn(self, lon: float, lat: float) -> Union[pd.DataFrame, None]:
"""
Wrapper around read function. Read nearest time series within max_dist.
Log error when no GPI is found in time series + rename columns if
necessary.
"""
_, dist = self.ts_reader.grid.find_nearest_gpi(lon, lat)
if dist > self.max_dist:
return None
try:
ts = self.ts_reader.__getattribute__(self.read_function)(lon, lat)
except Exception as e:
logging.error(f"Error reading Time series data at "
f"lon: {lon}, lat: {lat}: {e}")
return None
if (self.variables is not None) and (ts is not None):
ts = ts.rename(columns=self.variables)[self.variables.values()]
return ts
def _calc_chunk(self, timestamps, preprocess_func=None, preprocess_kwargs=None,
log_path=None, n_proc=1):
"""
Create image stack from time series for the passed timestamps.
See: self.calc
"""
self.timestamps = timestamps
logging.info(f"Processing chunk from {timestamps[0]} to "
f"{timestamps[-1]}")
# Transfer time series to images, parallel for cells
STATIC_KWARGS = {
'converter': self,
'preprocess_func': preprocess_func,
'preprocess_kwargs': preprocess_kwargs,
}
ITER_KWARGS = {'writer': [], 'img_gpis': [], 'lons': [], 'lats': []}
for cell in np.unique(self.img_grid.activearrcell):
ITER_KWARGS['writer'].append(self._cell_writer(cell, timestamps))
img_gpi, lons, lats = self.img_grid.grid_points_for_cell(cell)
ITER_KWARGS['img_gpis'].append(img_gpi)
ITER_KWARGS['lons'].append(lons)
ITER_KWARGS['lats'].append(lats)
stack = parallel_process(
_convert, ITER_KWARGS, STATIC_KWARGS, n_proc=n_proc,
show_progress_bars=True, log_path=log_path, backend=self.backend,
verbose=False, ignore_errors=self.ignore_errors)
stack = xr.combine_by_coords(stack)
# positive latitudes are on top
stack = stack.reindex(dict(time=stack['time'],
lat=stack['lat'][::-1],
lon=stack['lon']))
return stack
[docs] def calc(self, path_out, format_out='slice', preprocess=None,
preprocess_kwargs=None, postprocess=None, postprocess_kwargs=None,
fn_template="{datetime}.nc",
drop_empty=False, encoding=None, zlib=True, glob_attrs=None,
var_attrs=None, var_fillvalues=None, var_dtypes=None,
img_buffer=100, n_proc=1):
"""
Perform conversion of all time series to images. This will first split
timestamps into processing chunks (img_buffer) and then - for each
chunk - iterate through all cells (parallel) in the img_grid, and
transfer the time series for each pixel into the image stack.
Parameters
----------
path_out: str
Path to the output directory where the files are written to.
format_out: str, optional (default: 'slice')
- slice: write each time step as a separate file. In this case
the fn_template must contain a placeholder {datetime} where
the date is inserted for each image
- stack: write all time steps into one file. In this case if there
is a {datetime} placeholder in the fn_template, then the time
range is inserted.
preprocess: callable or list[Callable], optional (default: None)
Function that is applied to each time series before converting it.
The first argument is the data frame that the reader returns.
Additional keyword arguments can be passed via `preprocess_kwargs`.
The function must return a data frame of the same form as the input
data, i.e. with a datetime index and at least one column of data.
Note: As an alternative to a preprocessing function, consider
applying an adapter to the reader class. Adapters also perform
preprocessing, see `pytesmo.validation_framework.adapters`
A simple example for a preprocessing function to compute the sum:
```
def preprocess_add(df: pd.DataFrame, **preprocess_kwargs) \
-> pd.DataFrame:
df['var3'] = df['var1'] + df['var2']
return df
```
preprocess_kwargs: dict or list[dict], optional (default: None)
Keyword arguments for the preprocess function. If None are given,
then the preprocessing function is is called with only the input
data frame and no additional arguments (see example above).
postprocess: Callable or list[Callable], optional (default: None)
Function(s) applied to the image stack after loading the data
and before writing it to disk. The function must take an xarray
Dataset as the first argument and return an xarray Dataset of the
same form.
A simple example for a preprocessing function to add a new variable
from the sum of two existing variables:
```
def postprocess_add(stack: xr.Dataset, **postprocess_kwargs) \
-> xr.Dataset
stack = stack.assign(var3=lambda x: x['var0'] + x['var2'])
return stack
```
postprocess_kwargs: dict or list[dict], optional (default: None)
Keyword arguments for the postprocess function(s). If None are given,
then the postprocess function is called with only the input
image stack and no additional arguments (see example above).
fn_template: str, optional (default: "{datetime}.nc")
Template for the output image file names.
If format_out is 'slice', then a placeholder {datetime} must be
in the fn_template, which will be replaced by the timestamp of each
image.
If format_out is 'stack', then no {datetime} placeholder is
required. If it's till provided, the time range of the stack is
inserted.
drop_empty: bool, optional (default: False)
Images where all data variables are empty are removed from
the stack after loading / before writing. Otherwise, emtpy images
will be written to disk.
encoding: dict of dicts, optional (default: None)
Encoding kwargs for each variable. Are passed to netcdf for storing
the files to apply dtype, scale_factor, add_offset, etc.
Make sure that the encoding is consistent with the data and fill
values (`var_fillvalues`).
For example, conversion to int16 for data values between 0 and 100
can result in data loss.
e.g. {'sm': {'dtype': 'int32', 'scale_factor': 0.01}}
zlib: bool, optional (default: True)
If True, then the netcdf files are compressed using zlib
compression for all data variables.
glob_attrs: dict, optional (default: None)
Additional global attributes that are added to the netcdf file.
e.g. {'product': 'ASCAT 12.5 TS'}
var_attrs: dict of dicts, optional (default: None)
Additional variable attributes that are added to the netcdf file.
The dict must have the following structure:
{varname: {'attrname': value}}, e.g
{'sm': {'long_name': 'soil moisture', 'units': 'm3 m-3'}, ...}
In case variable was renamed, use the new name here!
var_fillvalues: dict, optional (default: None)
Fill values for each variable. By default, nan is used for all
variables (you can also use the `encoding` parameter to set a
fill value when writing to disk).
In case variable was renamed, use the new name here!
var_dtypes: dict, optional (default: None)
Data types for each variable. By default, float32 is used for all
variables (you can also use the `encoding` parameter to set a
dtype when writing to disk).
In case variable was renamed, use the new name here!
img_buffer: int, optional (default: 100)
Size of the stack before writing to disk. Larger stacks need
more memory but will lead to faster conversion.
Passing -1 means that the whole stack loaded into memory at once.
n_proc: int, optional (default: 1)
Number of processes to use for parallel processing. We parallelize
by 5 deg. grid cell.
"""
if format_out not in ['slice', 'stack']:
raise ValueError("format_out must be 'slice' or 'stack'")
if format_out == 'slice' and '{datetime}' not in fn_template:
raise ValueError("fn_template must contain {datetime} for "
"format_out='slice'")
log_path = os.path.join(path_out, '000_logs')
os.makedirs(log_path, exist_ok=True)
dt_index_chunks = list(idx_chunks(self.timestamps, int(img_buffer)))
for timestamps in dt_index_chunks:
self.stack = self._calc_chunk(timestamps,
preprocess, preprocess_kwargs,
log_path, n_proc)
if drop_empty:
vars = [var for var in self.stack.data_vars if var not in
self._protected_vars]
idx_empty = []
for i in range(len(self.stack.time)):
img = self.stack[vars].isel(time=i)
for var in vars:
if not np.all([np.all(np.isnan(img[var].values))]):
break
else:
idx_empty.append(i)
self.stack = self.stack.drop_isel(time=idx_empty)
if var_fillvalues is not None:
for var, fillvalue in var_fillvalues.items():
self.stack[var].values = np.nan_to_num(
self.stack[var].values, nan=fillvalue)
if var_dtypes is not None:
for var, dtype in var_dtypes.items():
self.stack[var] = self.stack[var].astype(dtype)
encoding = encoding or {}
# activate zlib compression for all data variables
if zlib is True:
for var in self.stack.data_vars:
if var not in encoding:
encoding[var] = {}
encoding[var]['zlib'] = True
encoding[var]['complevel'] = 6
if glob_attrs is not None:
self.stack.attrs.update(glob_attrs)
if var_attrs is not None:
for var in var_attrs:
self.stack[var].attrs.update(var_attrs[var])
if postprocess is not None:
# this is done as late as possible
postprocess = np.atleast_1d(postprocess)
if postprocess_kwargs is None:
postprocess_kwargs = [{}] * len(postprocess)
postprocess_kwargs = np.atleast_1d(postprocess_kwargs)
if len(postprocess) != len(postprocess_kwargs):
raise ValueError("postprocess and postprocess_kwargs "
"have different lengths")
for func, kwargs in zip(postprocess, postprocess_kwargs):
self.stack = func(self.stack, **kwargs)
if self.stack['time'].size == 0:
warnings.warn("No images in stack to write to disk.")
self.stack = None
elif format_out == 'stack':
if '{datetime}' in fn_template:
dt_from = pd.to_datetime(self.stack.time.values[0])\
.to_pydatetime().strftime('%Y%m%dT%H%M%S')
dt_to = pd.to_datetime(self.stack.time.values[-1])\
.to_pydatetime().strftime('%Y%m%dT%H%M%S')
fname = fn_template.format(datetime=f"{dt_from}_{dt_to}")
else:
fname = fn_template
self.stack.to_netcdf(os.path.join(path_out, fname),
encoding=encoding)
self.stack = None
elif format_out == 'slice':
self.store_netcdf_images(
path_out, fn_template, keep=False, encoding=encoding,
annual_folder=True, n_proc=n_proc)
else:
raise NotImplementedError("Unknown `format_out`")
[docs] def store_netcdf_images(self, path_out, fn_template=f"{datetime}.nc",
encoding=None, annual_folder=True,
keep=False, n_proc=1):
"""
Write the (global) merged image stack to netcdf files.
Parameters
----------
path_out: str
Path to the output directory where the files are written to.
fn_template: str, optional (default: None)
Template for the output image file names. Must contain a placeholder
{datetime} where the image date is inserted.
encoding: dict (default: None)
Encoding for the netcdf variables. The keys are the variable names,
If True, then the images are grouped by year, and images for each
year a written to a separate folder.
annual_folder: bool, optional (default: True)
If True, then the images are grouped by year, and images for each
year a written to a separate folder.
keep: bool, optional (default: False)
If True, then the image stack is kept in memory during writing.
This is only needed if anything else should be done with the stack
after writing it to disk.
If False (recommended), then the stack is gradually deleted to empty
memory during writing.
n_proc: int, optional (default: 1)
Number of processes to use for reading cells and writing images
in parallel. Merging cells after reading is not parallelised and
might be a bottleneck.
"""
# Slice stack and write down individual images with according file
# names
if self.stack is None:
warnings.warn("No stack loaded, nothing to write")
images = []
dts = []
filenames = []
drop_z = []
for z in self.stack['time'].values:
dt = pd.to_datetime(z).to_pydatetime()
filename = fn_template.format(
datetime=datetime.strftime(dt, '%Y%m%d%H%M%S'))
image = self.stack.sel({'time': [dt]})
image.attrs['id'] = filename
drop_z.append(dt)
images.append(image)
dts.append(dt)
filenames.append(filename)
if not keep:
if len(drop_z) > 10: # to avoid too many duplicates in memory
self.stack = self.stack.drop_sel({'time': drop_z})
drop_z = []
if not keep:
del self.stack
self.stack = None
ITER_KWARGS = {'image': images, 'filename': filenames, 'dt': dts}
STATIC_KWARGS = {'out_path': path_out, 'annual_folder': annual_folder,
'encoding': encoding}
_ = parallel_process(
FUNC=_write_img,
ITER_KWARGS=ITER_KWARGS,
STATIC_KWARGS=STATIC_KWARGS,
n_proc=n_proc,
show_progress_bars=True,
verbose=False,
loglevel=self.loglevel,
ignore_errors=self.ignore_errors,
backend='multiprocessing',
)
del ITER_KWARGS, STATIC_KWARGS, images, _
def __del__(self):
if self.stack is not None:
self.stack.close()