#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
****
Lane
****
Test de docstring
"""
__author__ = ['Alan Loh']
__copyright__ = 'Copyright 2019, nenupytf'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan Loh'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
'Lane'
]
from astropy.time import Time
import os.path as path
from os import getpid
import psutil
import numpy as np
import warnings
from nenupytf.other import header_struct, max_bsn
from nenupytf.stokes import NenuStokes, SpecData
from nenupytf.other import idx_of, to_unix, rebin1d, ProgressBar
# ============================================================= #
# --------------------------- Lane ---------------------------- #
# ============================================================= #
[docs]class Lane(object):
""" Class to properly handle opening of one '*.spectra' file
for a specific lane.
Parameters
----------
spectrum : str
Complete path towards a '*.spectra' file
Attributes
----------
memdata : `numpy.memmap`
Memory map obejct containing the decoded binary data
lane : int
Land index corresponding to the file
sfile : str
Absolute path towards the '*.spectra' file
idx : int
freq_min : float
Minimal observed frequency in MHz
freq_max : float
Maximal observed frequency in MHz
time_min : `astropy.time.Time`
Minimal observed time
time_max : `astropy.time.Time`
maximal observed time
fftlen : int
Number of frequencies within each channel
nfft2int : int
fftovlp : int
apodisation : int
nffte : int
nbchan : int
Number of frequency channels
"""
def __init__(self, spectrum):
self._dtype = None
self.memdata = None
self.lane = None
self.sfile = spectrum
self.beam = None
self.time = None
self.freq = None
def __str__(self):
lane = 'Lane {}: '.format(self.lane)
freq = 'freq=[{}, {}] MHz, '.format(
self.freq_min,
self.freq_max
)
time = 'time=[{}, {}], '.format(
self.time_min.isot,
self.time_max.isot
)
beam = 'beams={}'.format(
np.unique(self._beams)
)
return lane + freq + time + beam
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def sfile(self):
return self._sfile
@sfile.setter
def sfile(self, s):
if not isinstance(s, str):
raise TypeError(
'String expected.'
)
self._sfile = path.abspath(s)
if not path.isfile(self._sfile):
raise FileNotFoundError(
'File {} not found'.format(self._sfile)
)
fname = path.basename(self._sfile)
self.lane = int(
fname.split('_')[-1].replace('.spectra', '')
)
self._load()
self._parse_tf()
return
@property
def time(self):
""" Selected time range
Parameters
----------
time : list
Length-2 list of time in ISO/ISOT or unix.
Returns
-------
time : list
length-2 list of unix timestamps
"""
return self._time
@time.setter
def time(self, t):
if t is None:
t = [self.time_min.unix, self.time_max.unix]
else:
if not isinstance(t, (list, np.ndarray)):
raise TypeError(
'`time` should be an array.'
)
if not len(t) == 2:
raise IndexError(
'`time` should be a length 2 array.'
)
if isinstance(t[0], str) & isinstance(t[1], str):
t0_unix = Time(t[0], precision=7).unix
t1_unix = Time(t[1], precision=7).unix
else:
# Assume unix time
t0_unix = t[0]
t1_unix = t[1]
if t0_unix < self.time_min.unix:
warnings.warn(
'Out of range time selection, default values adopted.'
)
t0_unix = self.time_min.unix
if t1_unix > self.time_max.unix:
warnings.warn(
'Out of range time selection, default values adopted.'
)
t1_unix = self.time_max.unix
if t1_unix < t0_unix:
raise ValueError(
'Stop time < Start time'
)
t = [t0_unix, t1_unix]
self._time = t
return
@property
def freq(self):
""" Selected frequency range in MHz.
Parameters
----------
freq : list
Length-2 list of frequencies
Returns
-------
freq : list
Length-2 list of frequencies
"""
return self._freq
@freq.setter
def freq(self, f):
if f is None:
f = [self.freq_min, self.freq_max]
else:
if not isinstance(f, (list, np.ndarray)):
raise TypeError(
'`freq` should be an array.'
)
if not len(f) == 2:
raise IndexError(
'`freq` should be a length 2 array.'
)
if f[0] < self.freq_min:
warnings.warn(
'Out of range freq selection, default values adopted.'
)
f[0] = self.freq_min
if f[1] > self.freq_max:
warnings.warn(
'Out of range freq selection, default values adopted.'
)
f[1] = self.freq_max
if f[1] < f[0]:
raise ValueError(
'Max freq < Min freq'
)
self._freq = f
return
@property
def beam(self):
""" Selected beam index.
Parameters
----------
beam : int
Selected beam index
"""
return self._beam
@beam.setter
def beam(self, b):
if b is None:
b = self._beams[0]
if not isinstance(b, (int, np.integer)):
raise TypeError(
'`beam` is expected to be an integer.'
)
if b not in self._beams:
raise ValueError(
'Out of range beam selection.'
)
self._beam = b
return
@property
def frequencies(self):
""" Array of frequencies in MHz corresponding
to the selected beam.
"""
channels = self._channels[self._beams == self.beam]
return channels * max_bsn * 1e-6
@property
def freq_min(self):
""" Minimal observed frequency in MHz
"""
return np.min(self.frequencies)
@property
def freq_max(self):
""" Maximal observed frequency in MHz
"""
df = (1.0 / 5.12e-6) * 1e-6
return np.max(self.frequencies) + df
@property
def time_min(self):
""" Minimal observed time.
`astropy.Time` object
"""
return to_unix(self._timestamps[0])
@property
def time_max(self):
""" Maximal observed time.
`astropy.Time` object
"""
times_per_block = self.nffte * self.fftlen * self.nfft2int
sec_dt_block = 5.12e-6 * times_per_block
return to_unix(self._timestamps[-1] + sec_dt_block)
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs] def select(self, stokes='I', time=None, freq=None, beam=None, bp_corr=True):
""" Select data within a lane file.
If the selection appears to be too big regarding
available memory, an error should be raised.
However as rough estimate, try to avoid time range
of more than 15 min and/or frequency range of more
than 10 MHz...
Parameters
----------
time : list
Length-2 list of time range (ISO or ISOT format)
e.g.:
`time=['2019-03-20T11:59:00.0', '2019-03-20T12:20:00.0']`
Default: `None` whole time range selection.
stokes : str
Stokes parameter required (I, Q, U, V, fracV)
Default: `'I'`
freq : list
Length-2 list of frequency range (in MHz)
e.g.:
`freq=[30, 35]`
Default: `None` whole frequency range selection.
beam : int
Beam index, refer to observation setup to see the
details of the different observed beams.
Default: `None` consider index 0.
bp_corr : bool or int, optional, default: `True
Compute the bandpass correction.
`False`: do not compute any correction
`True``: compute the correction with Kaiser coefficients
`'median'`: compute a medianed correction
`'fft'`: correct the bandpass using FFT
Returns
-------
spec : `SpecData`
SpecData object containing the time, the frequency and the data
"""
self.beam = beam
self.time = time
self.freq = freq
if self.time[1] - self.time[0] < self.dt:
raise ValueError(
'Time interval selected < {} sec'.format(self.dt)
)
if self.freq[1] - self.freq[0] < self.df:
raise ValueError(
'Frequency interval selected < {} MHz'.format(self.df)
)
tmin_idx = self._t2bidx(
time=self.time[0],
order='low'
)
tmax_idx = self._t2bidx(
time=self.time[1],
order='high'
)
fmin_idx = self._f2bidx(
frequency=self.freq[0],
order='low'
)
fmax_idx = self._f2bidx(
frequency=self.freq[1],
order='high'
)
self._check_memory(
nt=tmax_idx + 1 - tmin_idx,
nf=fmax_idx + 1 - fmin_idx
)
times = self._get_time(
id_min=tmin_idx,
id_max=tmax_idx + 1
)
t_mask = (times >= to_unix(self.time[0])) &\
(times < to_unix(self.time[1]))
freqs = self._get_freq(
id_min=fmin_idx,
id_max=fmax_idx + 1
)
f_mask = (freqs >= self.freq[0]) &\
(freqs < self.freq[1])
spectrum = NenuStokes(
data=self.memdata['data'],
stokes=stokes,
nffte=self.nffte,
fftlen=self.fftlen,
bp_corr=bp_corr
)[
tmin_idx:tmax_idx + 1,
fmin_idx:fmax_idx + 1,
]
return SpecData(
data=spectrum[t_mask, :][:, f_mask],
time=times[t_mask],
freq=freqs[f_mask],
stokes=stokes
)
[docs] def average(self, stokes='I', time=None, freq=None, beam=None, dt=None, df=None):
""" Average a dynamic spectrum in time and frequency
Parameters
----------
time : list
Length-2 list of time range (ISO or ISOT format)
e.g.:
`time=['2019-03-20T11:59:00.0', '2019-03-20T12:20:00.0']`
Default: `None` whole time range selection.
stokes : str
Stokes parameter required (I, Q, U, V, fracV)
Default: `'I'`
freq : list
Length-2 list of frequency range (in MHz)
e.g.:
`freq=[30, 35]`
Default: `None` whole frequency range selection.
beam : int
Beam index, refer to observation setup to see the
details of the different observed beams.
Default: `None` consider index 0.
dt : float
Time steps in seconds
df : float
Frequency steps in MHz
Returns
-------
spec : `SpecData`
SpecData object containing the time, the frequency and the data
"""
self.beam = beam
self.time = time
self.freq = freq
time_min, time_max = self.time
freq_min, freq_max = self.freq
if dt is None:
dt = (time_max - time_min) / 1000
if df is None:
df = (freq_max - freq_min) / 500
# Prepare the final array
nt = int((time_max - time_min) // dt)
nf = int((freq_max - freq_min) // df)
available_memory = psutil.virtual_memory().available
avg_data_size = nt * nf * np.dtype(np.float32).itemsize
if avg_data_size > available_memory:
raise MemoryError(
'Try to increase dt and/or df'
)
averaged_data = np.zeros((nt, nf), dtype='float32')
averaged_time = np.zeros(nt)#, dtype='float32')
averaged_freq = np.zeros(nf)#, dtype='float32')
# Loop over the time and seelct corresponding data
bar = ProgressBar(valmax=nt, title='Averaging spectra...')
for i in range(nt):
spec = self.select(
stokes=stokes,
time=[time_min + i*dt, time_min + (i + 1)*dt],
freq=freq,
beam=beam
)
# Averaging data in time
d = np.squeeze(np.mean(spec.data, axis=0))
averaged_time[i] = to_unix(np.mean(spec.time.unix)).unix
# Averaging in frequency
d = rebin1d(d, nf)
if i == 0:
averaged_freq[:] = rebin1d(spec.freq, nf)
# Storing into final array
averaged_data[i, :] = d
bar.update()
# return averaged_time, averaged_freq, averaged_data
return SpecData(
data=averaged_data,
time=to_unix(averaged_time),
freq=averaged_freq,
stokes=stokes
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _load(self):
""" Open the .spectra file in memmap mode.
Store it in the `memdata` attribute.
"""
with open(self.sfile, 'rb') as rf:
hd_struct = np.dtype(header_struct)
header = np.frombuffer(
rf.read(hd_struct.itemsize),
count=1,
dtype=hd_struct,
)[0]
for key in [h[0] for h in header_struct]:
setattr(self, key.lower(), header[key])
self.dt = 5.12e-6 * self.fftlen * self.nfft2int
self.block_dt = self.dt * self.nffte # sec
self.df = (1.0 / 5.12e-6 / self.fftlen) * 1e-6
beamlet_struct = np.dtype(
[('lane', 'int32'),
('beam', 'int32'),
('channel', 'int32'),
('fft0', 'float32', (self.nffte, self.fftlen, 2)),
('fft1', 'float32', (self.nffte, self.fftlen, 2))]
)
block_struct = header_struct +\
[('data', beamlet_struct, (self.nbchan))]
self._dtype = np.dtype(block_struct)
itemsize = self._dtype.itemsize
with open(self.sfile, 'rb') as rf:
tmp = np.memmap(rf, dtype='int8', mode='r')
n_blocks = tmp.size * tmp.itemsize // (itemsize)
data = tmp[: n_blocks * itemsize].view(self._dtype)
self.memdata = data
return
def _parse_tf(self):
""" Once the file is open wia memmap, go through it
and store information for each time, frequency,
beam index and lane index.
"""
datacube = self.memdata['data']
self._ntb, self._nfb = datacube['lane'].shape
self._timestamps = np.arange(self._ntb, dtype='float64')
self._timestamps *= self.block_dt
self._timestamps += self.memdata['TIMESTAMP'][0]
# We here assume that the same information
# is repeated at each time block
self._beams = datacube['beam'][0]
self._channels = datacube['channel'][0]
return
def _t2bidx(self, time, order='low'):
""" Time to block index
Parameter
---------
time : str
Time selection in ISO or ISOT.
Returns
-------
index : dict
Dictionnary of lanes <-> block index
"""
indices = {}
idx = idx_of(
array=self._timestamps,
value=time,
order=order
)
return idx
def _f2bidx(self, frequency, order='low'):
""" Frequency to block index
Parameter
---------
frequency : float
Frequency selection in MHz.
Returns
-------
index : dict
Dictionnary of lanes <-> block index
"""
idx = idx_of(
array=self.frequencies,
value=frequency,
order=order
)
idx += np.searchsorted(self._beams, self.beam)
return idx
def _get_time(self, id_min, id_max):
""" Recover the time for a given block selection
Parameters
----------
id_min : int
Index of starting time block
id_max : int
Index of ending time block
Returns
-------
time : `astropy.Time`
Time object array
"""
n_times = (id_max - id_min) * self.nffte
t = np.arange(n_times, dtype='float64')
dt = t * self.dt,
dt += self._timestamps[id_min]
return to_unix(np.squeeze(dt))
def _get_freq(self, id_min, id_max):
""" Recover the frequency for a given block selection
Parameters
----------
id_min : int
Index of starting frequency block
id_max : int
Index of ending frequency block
Returns
-------
frequency : `np.ndarray`
Array of frequencies in MHz
"""
beam_shift = np.searchsorted(self._beams, self.beam)
id_max -= beam_shift
id_min -= beam_shift
f = np.tile(np.arange(self.fftlen, dtype='float64'), (id_max - id_min))
f *= self.df
f += np.repeat(self.frequencies[id_min:id_max], self.fftlen)
return f
def _check_memory(self, nt, nf):
""" Check that the requested data selection does not
exceed the available memory.
"""
vm = psutil.virtual_memory()
mem_gb = vm.available / (1024**3)
n_elements = nt * self.nffte * nf * self.fftlen
fftn_size = n_elements * 4 * np.dtype(np.float32).itemsize
meta_size = n_elements * 3 * np.dtype(np.int32).itemsize
sel_gb = (fftn_size + meta_size) / (1024**3)
if sel_gb > mem_gb * 0.9:
raise MemoryError(
'Try to reduce the selection range'
)
return
# ============================================================= #