Source code for repurpose.ts2img

# Copyright (c) 2020, TU Wien, Department of Geodesy and Geoinformation
# All rights reserved.

# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#    * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of TU Wien, Department of Geodesy and Geoinformation
#      nor the names of its contributors may be used to endorse or promote
#      products derived from this software without specific prior written
#      permission.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import numpy as np


[docs]def agg_tsmonthly(ts, **kwargs): """ Parameters ---------- ts : pandas.DataFrame time series of a point kwargs : dict any additional keyword arguments that are given to the ts2img object during initialization Returns ------- ts_agg : pandas.DataFrame aggregated time series, they all must have the same length otherwise it can not work each column of this DataFrame will be a layer in the image """ # very simple example # aggregate to monthly timestamp # should also make sure that the output has a certain length return ts.asfreq("M")
[docs]class Ts2Img(object): """ Takes a time series dataset and converts it into an image dataset. A custom aggregate function should be given otherwise a daily mean will be used Parameters ---------- tsreader: object object that implements a iter_ts method which iterates over pandas time series and has a grid attribute that is a pytesmo BasicGrid or CellGrid imgwriter: object writer object that implements a write_ts method that takes a list of grid point indices and a 2D array containing the time series data agg_func: function function that takes a pandas DataFrame and returns an aggregated pandas DataFrame ts_buffer: int how many time series to read before writing to disk, constrained by the working memory the process should use. """ def __init__(self, tsreader, imgwriter, agg_func=None, ts_buffer=1000): self.agg_func = agg_func if self.agg_func is None: try: self.agg_func = tsreader.agg_ts2img except AttributeError: self.agg_func = agg_tsmonthly self.tsreader = tsreader self.imgwriter = imgwriter self.ts_buffer = ts_buffer
[docs] def calc(self, **tsaggkw): """ does the conversion from time series to images """ for gpis, ts in self.tsbulk(**tsaggkw): self.imgwriter.write_ts(gpis, ts)
[docs] def tsbulk(self, gpis=None, **tsaggkw): """ iterator over gpi and time series arrays of size self.ts_buffer Parameters ---------- gpis: iterable, optional if given these gpis will be used, can be practical if the gpis are managed by an external class e.g. for parallel processing tsaggkw: dict Keywords to give to the time series aggregation function Returns ------- gpi_array: numpy.array numpy array of gpis in this batch ts_bulk: dict of numpy arrays for each variable one numpy array of shape (len(gpi_array), len(ts_aggregated)) """ # have to use the grid iteration as long as iter_ts only returns # data frame and no time series object including relevant metadata # of the time series i = 0 gpi_bulk = [] ts_bulk = {} ts_index = None if gpis is None: # get grid points can return either 3 or 4 values # depending on the grid type, gpis is the first in both cases gpi_info = list(self.tsreader.grid.grid_points()) gpis = np.array(gpi_info[0], dtype=int) for gpi in gpis: gpi_bulk.append(gpi) ts = self.tsreader.read_ts(gpi) ts_agg = self.agg_func(ts, **tsaggkw) for column in ts_agg.columns: try: ts_bulk[column].append(ts_agg[column].values) except KeyError: ts_bulk[column] = [] ts_bulk[column].append(ts_agg[column].values) if ts_index is None: ts_index = ts_agg.index i += 1 if i >= self.ts_buffer: for key in ts_bulk: ts_bulk[key] = np.vstack(ts_bulk[key]) gpi_array = np.hstack(gpi_bulk) yield gpi_array, ts_bulk ts_bulk = {} gpi_bulk = [] i = 0 if i > 0: for key in ts_bulk: ts_bulk[key] = np.vstack(ts_bulk[key]) gpi_array = np.hstack(gpi_bulk) yield gpi_array, ts_bulk