Source code for nenupytf.stokes.stokes

#! /usr/bin/python3
# -*- coding: utf-8 -*-


"""
    **********
    NenuStokes
    **********
"""


__author__ = ['Alan Loh']
__copyright__ = 'Copyright 2019, nenupytf'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan Loh'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
    'NenuStokes',
    'LaneDSpec',
    'Stokes_I',
    'Stokes_Q',
    'Stokes_U',
    'Stokes_V',
    'CrossXX',
    'CrossYY'
    ]


import numpy as np

from nenupytf.other import allowed_stokes, compute_bandpass


# ============================================================= #
# ------------------------ NenuStokes ------------------------- #
# ============================================================= #
class NenuStokes(object):
    """ Compute a Stokes parameter (or a derived quantity) on a
        selection of the data and return it as a 2D time-frequency
        array.

        Parameters
        ----------
        data : object
            Data handed over to the ``Stokes_*`` / ``Cross*``
            classes of this module (they expect a `numpy.memmap`
            exposing ``fft0`` and ``fft1`` fields).
        stokes : str
            Quantity to compute (case insensitive). It must belong
            to `allowed_stokes`. The quantities handled here are
            ``'i'``, ``'q'``, ``'u'``, ``'v'``, ``'fracv'``,
            ``'xx'``, ``'yy'``, ``'argxy'`` and ``'phasexy'``.
        nffte : int
            Number of time samples per selected time block
            (``n_times = ntblocks * nffte``).
        fftlen : int
            Number of frequency samples per selected frequency
            block (``n_freqs = nfblocks * fftlen``). Must be even.
        bp_corr : bool or str
            Forwarded as ``bandpass`` to :meth:`correct`.
    """

    def __init__(self, data, stokes, nffte, fftlen, bp_corr=True):
        self.data = data
        self.stokes = stokes
        self.nffte = nffte
        self.fftlen = fftlen
        self.ntblocks = None
        self.nfblocks = None
        self.sel_slice = None
        self.bp_corr = bp_corr

    def __getitem__(self, slice_val):
        """ Compute the requested quantity over `slice_val` (a 2D
            selection, see `sel_slice`) and return the corrected
            2D time-frequency array.

            Raises
            ------
            ValueError
                If `slice_val` is not a 2D selection, or if the
                current `stokes` value is not handled here.
        """
        self.sel_slice = slice_val
        kind = self.stokes

        # Quantities read directly from a single helper class
        direct = {
            'i': Stokes_I,
            'q': Stokes_Q,
            'u': Stokes_U,
            'v': Stokes_V,
            'xx': CrossXX,
            'yy': CrossYY,
        }

        if kind in direct:
            data = direct[kind](self.data)[slice_val]
        elif kind == 'fracv':
            stokes_i = Stokes_I(self.data)[slice_val]
            stokes_v = Stokes_V(self.data)[slice_val]
            data = stokes_v / stokes_i
        elif kind in ('argxy', 'phasexy'):
            re = Stokes_U(self.data)[slice_val]
            im = Stokes_V(self.data)[slice_val]
            cross = re + 1j * im
            # NOTE(review): 'argxy' yields the modulus and 'phasexy'
            # the angle of U + iV -- confirm the naming is intended.
            if kind == 'argxy':
                data = np.abs(cross)
            else:
                data = np.angle(cross)
        else:
            # Allowed by `allowed_stokes` but not implemented here:
            # fail explicitly rather than on an unbound variable.
            raise ValueError(
                'Stokes parameter {!r} is not handled.'.format(kind)
            )

        return self.correct(
            data=data,
            bandpass=self.bp_corr
        )

    @property
    def sel_slice(self):
        """ Last 2D selection ``(time_slice, freq_slice)`` or `None`
            if nothing has been selected yet.

            Setting it also updates `ntblocks` and `nfblocks` from
            the ``stop - start`` extent of each slice.
        """
        return self._sel_slice

    @sel_slice.setter
    def sel_slice(self, s):
        if s is None:
            # Store the value so that the getter is usable before
            # any selection has been made.
            self._sel_slice = None
            return
        try:
            n_dims = len(s)
        except TypeError:
            # e.g. a single slice object, which has no length
            n_dims = None
        if n_dims != 2:
            raise ValueError(
                '2D slice expected.'
            )
        self.ntblocks = s[0].stop - s[0].start
        self.nfblocks = s[1].stop - s[1].start
        self._sel_slice = s
        return

    @property
    def stokes(self):
        """ Lower-case name of the quantity to compute. """
        return self._stokes

    @stokes.setter
    def stokes(self, s):
        if not isinstance(s, str):
            raise TypeError('String expected.')
        self._stokes = s.lower()
        if self._stokes not in allowed_stokes:
            raise ValueError('Wrong Stokes parameter.')

    def correct(self, data, bandpass=True):
        """ Transform the data into a 2D time-frequency array,
            invert the two halves of each beamlet and correct for
            the bandpass.

            Parameters
            ----------
            data : numpy.ndarray
                Array whose axes 1 and 2 are swapped before being
                reshaped to ``(ntblocks * nffte, nfblocks * fftlen)``.
            bandpass : bool or str
                * falsy: no bandpass correction;
                * ``'median'``: divide by the median spectrum over
                  time and multiply by the median of each group of
                  `fftlen` channels;
                * ``'fft'``: zero the peaks (and their direct
                  neighbours) of the FFT taken along the frequency
                  axis, then return the modulus of the inverse FFT;
                * any other truthy value: multiply by
                  ``compute_bandpass(fftlen)``.

            Returns
            -------
            numpy.ndarray
                2D array of shape ``(n_times, n_freqs)``.

            Raises
            ------
            ValueError
                If `fftlen` is not even.
        """
        # Make a 2D array
        data = np.swapaxes(data, 1, 2)
        n_times = self.ntblocks * self.nffte
        n_freqs = self.nfblocks * self.fftlen

        # Invert the halves of the beamlet
        if self.fftlen % 2 != 0:
            raise ValueError('Problem with fftlen value!')
        # Integer arithmetic: these values are reshape dimensions
        n_beamlets = int(n_freqs // self.fftlen)
        half_len = int(self.fftlen // 2)
        data = data.reshape((n_times, n_beamlets, 2, half_len))
        data = data[:, :, ::-1, :].reshape((n_times, n_freqs))

        # Bandpass correction
        if not bandpass:
            return data

        if bandpass == 'median':
            spectrum = np.median(data, axis=0)
            folded = spectrum.reshape(
                (int(spectrum.size // self.fftlen), self.fftlen)
            )
            broadband = np.median(folded, axis=1)
            broadband = np.repeat(broadband, self.fftlen)
            return data / spectrum * broadband

        if bandpass == 'fft':
            from scipy.signal import find_peaks
            bp_fft = np.fft.fft(data)
            # Find peaks
            avg_fft = np.mean(np.abs(bp_fft), axis=0)
            p_idx, meta = find_peaks(
                x=avg_fft,
                height=np.median(avg_fft) * 2.
            )
            # Put to zero peaks and neighbouring slices
            p_idx = np.concatenate(
                (p_idx, p_idx + 1, p_idx - 1)
            )
            bp_fft[:, p_idx] = 0.
            return np.abs(np.fft.ifft(bp_fft))

        # Default: theoretical bandpass applied to every beamlet
        bp = compute_bandpass(self.fftlen)
        data = data.reshape(
            (
                n_times,
                int(n_freqs // bp.size),
                bp.size
            )
        )
        data *= bp[np.newaxis, np.newaxis]
        return data.reshape((n_times, n_freqs))
# ============================================================= # # ============================================================= # # ------------------------- LaneDSpec ------------------------- # # ============================================================= #
class LaneDSpec(object):
    """ Base class holding the ``fft0`` and ``fft1`` fields of a
        memory-mapped data set.

        Parameters
        ----------
        data : numpy.memmap
            Structured memory map exposing at least the ``fft0``
            and ``fft1`` fields.

        Attributes
        ----------
        fft0, fft1
            ``data['fft0']`` and ``data['fft1']``.
    """

    def __init__(self, data):
        self.fft0 = None
        self.fft1 = None
        self.mem_data = data

    @property
    def mem_data(self):
        """ NenuFAR TF read data.

            Raises
            ------
            TypeError
                If the value is not a `numpy.memmap`.
            ValueError
                If the ``fft0`` / ``fft1`` fields are missing.
        """
        return self._mem_data

    @mem_data.setter
    def mem_data(self, d):
        if not isinstance(d, np.memmap):
            raise TypeError('Expecting a numpy.memmap object.')
        # `dtype.names` is None for a non-structured array: treat it
        # as "no field" so that the ValueError below is raised.
        fields = d.dtype.names or ()
        if not all(fft in fields for fft in ('fft0', 'fft1')):
            raise ValueError('Wrong level of data.')
        self._get_parameters(d)
        self._mem_data = d
        return

    def _get_parameters(self, data):
        """ Store the ``fft0`` and ``fft1`` fields of `data`. """
        self.fft0 = data['fft0']
        self.fft1 = data['fft1']
        return
# ============================================================= # # ============================================================= # # ------------------------- Stokes_I -------------------------- # # ============================================================= #
class Stokes_I(LaneDSpec):
    """ Stokes I: sum of the components stored along the last axis
        of ``fft0``.

        Parameters
        ----------
        data : numpy.memmap
            See `LaneDSpec`.
    """

    def __init__(self, data):
        super().__init__(data=data)

    def __getitem__(self, slice_val):
        """ Return ``fft0[slice_val]`` summed over its last axis.

            The last axis is addressed as ``-1`` (as the sibling
            classes do with ``[..., k]``) so that any selection is
            supported, not only the ones yielding a 5D array.
        """
        return np.sum(self.fft0[slice_val], axis=-1)
# ============================================================= # # ============================================================= # # ------------------------- Stokes_Q -------------------------- # # ============================================================= #
class Stokes_Q(LaneDSpec):
    """ Stokes Q: difference between the first and the second
        component stored along the last axis of ``fft0``.
    """

    def __init__(self, data):
        super().__init__(data)

    def __getitem__(self, slice_val):
        """ Return ``fft0[slice_val][..., 0] - fft0[slice_val][..., 1]``. """
        chunk = self.fft0[slice_val]
        first = chunk[..., 0]
        second = chunk[..., 1]
        return first - second
# ============================================================= # # ============================================================= # # ------------------------- Stokes_U -------------------------- # # ============================================================= #
class Stokes_U(LaneDSpec):
    """ Stokes U: twice the first component stored along the last
        axis of ``fft1``.
    """

    def __init__(self, data):
        super().__init__(data)

    def __getitem__(self, slice_val):
        """ Return ``fft1[slice_val][..., 0] * 2``. """
        chunk = self.fft1[slice_val]
        return chunk[..., 0] * 2
# ============================================================= # # ============================================================= # # ------------------------- Stokes_V -------------------------- # # ============================================================= #
class Stokes_V(LaneDSpec):
    """ Stokes V: minus twice the second component stored along the
        last axis of ``fft1``.
    """

    def __init__(self, data):
        super().__init__(data)

    def __getitem__(self, slice_val):
        """ Return ``fft1[slice_val][..., 1] * (-2)``. """
        chunk = self.fft1[slice_val]
        return chunk[..., 1] * (-2)
# ============================================================= # # ============================================================= # # -------------------------- CrossXX -------------------------- # # ============================================================= #
class CrossXX(LaneDSpec):
    """ XX: twice the first component stored along the last axis
        of ``fft0``.
    """

    def __init__(self, data):
        super().__init__(data)

    def __getitem__(self, slice_val):
        """ Return ``fft0[slice_val][..., 0] * 2``. """
        chunk = self.fft0[slice_val]
        first = chunk[..., 0]
        return first * 2
# ============================================================= # # ============================================================= # # -------------------------- CrossYY -------------------------- # # ============================================================= #
class CrossYY(LaneDSpec):
    """ YY: twice the second component stored along the last axis
        of ``fft0``.
    """

    def __init__(self, data):
        super().__init__(data)

    def __getitem__(self, slice_val):
        """ Return ``fft0[slice_val][..., 1] * 2``. """
        chunk = self.fft0[slice_val]
        second = chunk[..., 1]
        return second * 2
# ============================================================= #