Source code for nenupytf.read.spectrum

#! /usr/bin/python3
# -*- coding: utf-8 -*-


"""
    ********
    Spectrum
    ********


    The :mod:`.read` module contains two main object classes,
    namely :class:`.Lane` and :class:`.Spectrum`.
    Both are aiming at easy data selection from *NenuFAR/UnDySPuTeD*
    binary files, which are created one for each lane of the backend,
    depending on the observation setup.
    
    The former is working at the lowest file level, hence handling 
    *NenuFAR/UnDySPuTeD* files independently from one another.

    :class:`.Spectrum` is a higher level object class, able to work on
    different files from a given observation. Therefore, if for example,
    a particular observed beam is spread over two lane files (each one
    covering half of the frequency bandwidth), a full frequency selection
    would return data gathered from those two files.
    To do so, :class:`.Spectrum` first determine which files are 
    encompassing the data selection and then calls for :class:`.Lane`
    to perform the refined selection.

    
    **Initialization**
    
    The :class:`.Spectrum` object needs to be provided with a path to
    an observation directory, for e.g., ``'/path/to/observation'``, by
    default the current directory is used.

    >>> from nenupytf.read import Spectrum
    >>> spectrum = Spectrum('/path/to/observation') 


    **Data selection**

    High-rate time-frequency data can be selected among different
    parameters (handled as attributes of the class :class:`.Spectrum`),
    namely: :attr:`Spectrum.beam`, :attr:`Spectrum.time` and
    :attr:`Spectrum.freq`. These attributes can also be set while
    they are passed as keyword arguments in :func:`Spectrum.select`
    and :func:`Spectrum.average`.

    Observation parameters can be quickly displayed to ease the
    parameter selection thanks to :func:`.info`:

    >>> spectrum.info()

    * :attr:`Spectrum.beam` defines the selected beam index.
    * :attr:`Spectrum.time` defines the selection time range.
    * :attr:`Spectrum.freq` defines the selection frequency range.


    **Polyphase filter correction**
    
    Reconstructed sub-bands may not display a flat bandpass due
    to polyphase filter response. It may be usefull to correct
    for this effect and reduce dynamic spectra artefacts.

    Several correction options are available, set by user at the 
    :func:`Spectrum.select` and :func:`Spectrum.average` function
    levels with the :attr:`bp_corr` keyword. By default, this
    keyword is set to ``bp_corr=True`` to apply a precomputed
    correction. The default option is pretty fast to apply and is
    convenient in most situations although it may leave smal 
    artefacts at the sub-band edges. Same kind of artefacts
    are present if ``bp_corr='fft'``, however the computation
    time is increased as the sub-band bandpasses are corrected
    within the Fourier domain. ``bp_corr='median'`` correction
    allows for most reduced aretfacts. However, the latter may
    significantly alter the signal if the dynamic spectrum is not
    relatively smooth, use with caution! 
"""


__author__ = ['Alan Loh']
__copyright__ = 'Copyright 2019, nenupytf'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan Loh'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
    'Spectrum'
    ]


from nenupytf.read import ObsRepo, Lane
from nenupytf.stokes import SpecData
from nenupytf.other import to_unix, ProgressBar

import numpy as np


# ============================================================= #
# ------------------------- Spectrum -------------------------- #
# ============================================================= #
[docs]class Spectrum(ObsRepo): """ :class:`Spectrum` is the class designed to work with several lane files from the *UnDySPuTeD* backend. Once instanciated, a `Spectrum` object has a view of all file informations within the :attr:`directory`. This means that a selection may involve several lane files to search for data, regardless of the lane index. :param directory: Directory where observation files are stored :type directory: str, optional :Example: >>> from nenupytf.read import Spectrum >>> s = Spectrum('/path/to/observation/') """ def __init__(self, directory=''): super().__init__(repo=directory) self.beam = None self.freq = None self.time = None # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def beam(self): """ Beam selection. This attribute can be set either directly or via specific keyword arguments in :func:`select()` and :func:`average()`. Default value is `0`. :setter: Beam index :getter: Beam index :type: int :Example: >>> from nenupytf.read import Spectrum >>> s = Spectrum('/path/to/observation/') >>> s.beam = 1 """ return self._beam @beam.setter def beam(self, b): if b is None: b = self.desctab['beam'][0] self._beam = b self._bmask = (self.desctab['beam'] == b) return @property def time(self): """ Time range selection. This attribute can be set either directly or via specific keyword arguments in :func:`select()` and :func:`average()`. Default value is `[obs_tmin, obs_tmax]` :setter: Length-2 list defining the selected time range: `[time_min, time_max]` where `time_min` and `time_max` are in ISO/ISOT formats :getter: Time range :type: list :Example: >>> from nenupytf.read import Spectrum >>> s = Spectrum('/path/to/observation/') >>> s.time = ['2019-10-03 14:30:00', '2019-10-03 18:31:01.34'] """ return self._time.copy() @time.setter def time(self, t): tmin = self.desctab['tmin'] tmax = self.desctab['tmax'] if t is None: t = [ tmin.min(), tmax.max() ] else: t = [to_unix(ti).unix for ti in t] if len(t) != 2: raise IndexError( 'time should be a length 2 list' ) self._time = t self._tmask = ((tmin <= t[0]) * (t[0] <= tmax)) +\ ((t[0] <= tmin) * (tmin <= t[1])) return @property def freq(self): """ Frequency range selection. This attribute can be set either directly or via specific keyword arguments in :func:`select()` and :func:`average()`. Default value is `[obs_fmin, obs_fmax]` :setter: Length-2 list defining the selected frequency range: `[freq_min, freq_max]` where `freq_min` and `freq_max` are in MHz :getter: Frequency range :type: list :Example: >>> from nenupytf.read import Spectrum >>> s = Spectrum('/path/to/observation/') >>> s.freq = [54, 66] """ return self._freq.copy() @freq.setter def freq(self, f): fmin = self.desctab['fmin'] fmax = self.desctab['fmax'] if f is None: f = [ fmin.min(), fmax.max() ] if len(f) != 2: raise IndexError( 'freq should be a length 2 list' ) self._freq = f self._fmask = ((fmin <= f[0]) * (f[0] <= fmax)) +\ ((f[0] <= fmin) * (fmin <= f[1])) return # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def select(self, stokes='I', bp_corr=True, **kwargs): r""" Select among the data stored in the directory according to a :attr:`Spectrum.time` range, a :attr:`Spectrum.freq` range and a :attr:`Spectrum.beam` index and return them converted in the chosen Stokes parameter. NenuFAR-TF data can be spread over several lane files. This will select the data nonetheless and concatenate what is needed to ouptut a single :class:`.SpecData` object. This may be usefull to call for :func:`ObsRepo.info()` method prior to select the data in order to know the time and frequency boundaries of the observation as well as recorded beam indices. :param stokes: Stokes parameter value to convert raw data to, allowed values are `{'I', 'Q', 'U', 'V', 'fracV', 'XX', 'YY'}`, defaults to `'I'` :type stokes: str, optional :param bp_corr: Compute the bandpass correction, defaults to `True`, possible values are * `False`: do not compute any correction * `True`: compute the correction with Kaiser coefficients * `'median'`: compute a medianed correction * `'fft'`: correct the bandpass using FFT :type bp_corr: bool, str, optional :param \**kwargs: See below for keyword arguments: :param freq: Frequency range in MHz passed as a lenght-2 list. :type freq: list, optional :param time: Time range in ISOT or ISO format passed as a length-2 list. :type time: list, optional :param beam: Beam index. :type beam: int, optional :returns: `SpecData` object, embedding the stacked averaged spectra :rtype: :class:`.SpecData` :Example: >>> from nenupytf.read import Spectrum >>> s = Spectrum('/path/to/observation/') >>> spec = s.select( time=['2019-10-03 14:30:00', '2019-10-03 14:30:10.34'], freq=[34.5, 40], stokes='I' ) .. seealso:: :func:`average()` :class:`.SpecData` .. warning:: This may take a significant time to process depending on the time and frequency ranges. """ self._parameters(**kwargs) mask = self._bmask * self._fmask * self._tmask if all(~mask): raise ValueError( 'Empty selection, check parameter ranges' ) for f in self.desctab[mask]['file']: l = Lane(f) if not 'spec' in locals(): spec = l.select( stokes=stokes, time=[to_unix(t).isot for t in self.time], freq=self.freq, beam=self.beam, bp_corr=bp_corr ) else: # We assume here that the time is the same, # only frequency is spread over different lanes. # This will concatenate spectra from different # lanes: spec = spec & l.select( stokes=stokes, time=[to_unix(t).isot for t in self.time], freq=self.freq, beam=self.beam, bp_corr=bp_corr ) del l return spec
[docs] def average( self, stokes='I', df=1., dt=1., bp_corr=True, **kwargs ): r""" Average in time and frequency *NenuFAR/UnDySPuTeD* high rate time-frequency data. Data are read by time blocks of size `dt` before averaging by computing the mean over time. Then, the spectrum is rebinned to a reconstructed frequency axis with spaces around `df`. :param stokes: Stokes parameter value to convert raw data to, allowed values are `{'I', 'Q', 'U', 'V', 'fracV', 'XX', 'YY'}`, defaults to `'I'` :type stokes: str, optional :param df: Frequency resolution in MHz on which the averaging is performed, defaults to `1.` :type df: int, float, optional :param dt: Time resolution in seconds on which the average is performed, defaults to `1.` :type dt: int, float, optional :param bp_corr: Compute the bandpass correction, defaults to `True`, possible values are * `False`: do not compute any correction * `True`: compute the correction with Kaiser coefficients * `'median'`: compute a medianed correction * `'fft'`: correct the bandpass using FFT :type bp_corr: bool, str, optional :param \**kwargs: See below for keyword arguments: :param freq: Frequency range in MHz passed as a lenght-2 list. :type freq: list, optional :param time: Time range in ISOT or ISO format passed as a length-2 list. :type time: list, optional :param beam: Beam index. :type beam: int, optional :returns: `SpecData` object, embedding the stacked averaged spectra :rtype: :class:`.SpecData` :Example: >>> from nenupytf.read import Spectrum >>> s = Spectrum('/path/to/observation/') >>> spec = s.average( time=['2019-10-03 14:30:00', '2019-10-03 18:31:01.34'], freq=[34.5, 40], dt=0.01, df=0.5, stokes='I' ) .. seealso:: :func:`select()` :class:`.SpecData` .. warning:: This may take a significant time to process depending on the time and frequency ranges and the required time and frequency resolution. """ self._parameters(**kwargs) # Keep track of inputs parameters beam = self.beam freq = self.freq.copy() time = self.time.copy() start, stop = time # Predefined array ntimes = int(np.ceil((stop - start)/dt)) nfreqs = int((freq[1]-freq[0])/df) avg_data = np.zeros( (ntimes, nfreqs) ) avg_time = np.zeros(ntimes) avg_freq = np.zeros(nfreqs) i = 0 bar = ProgressBar( valmax=ntimes, title='Averaging...') while start < stop - dt: tmp_spec = self.select( stokes=stokes, time=[start, start+dt], freq=freq, beam=beam, bp_corr=bp_corr, ).tmean().frebin((freq[1]-freq[0])/df) avg_data[i, :] = tmp_spec.amp avg_time[i] = start + dt/2 avg_freq = tmp_spec.freq i += 1 start += dt bar.update() spec = SpecData( data=avg_data, time=to_unix(avg_time), freq=avg_freq, stokes=stokes ) return spec
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _parameters(self, **kwargs): """ Read the selection parameters """ for key, value in kwargs.items(): if key == 'beam': self.beam = value if key == 'freq': self.freq = value if key == 'time': self.time = value return
# ============================================================= #