Source code for spec_analysis.spectra

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""The ``spectra`` module provides object representations of spectra and is
responsible for processing spectra (e.g. extinction correction) and sampling
any spectral features.

.. note:: Where applicable, this module uses the Schlegel et al. 1998 dust map
   and Fitzpatrick et al. 1999 extinction law.

Usage Examples
--------------

``Spectrum`` objects are used to represent individual spectra and provide
functionality for correcting MW extinction, rest-framing, and binning the
spectra (**in that specific order!**). A handful of examples demonstrating this
functionality is provided below:

Working with a Single Spectrum
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

First we define a demo spectrum with dummy values for wavelength, flux,
redshift, right ascension (RA) and declination (Dec). All values are defined
in the observer frame.

.. code-block:: python
   :linenos:

   import numpy as np

   from spec_analysis import simulate, spectra

   # Define feature using rest and observer frame average wavelength
   wave = np.arange(4000, 5000)
   lambda_rest = np.mean(wave)
   lambda_observed = lambda_rest - 10
   flux, eflux = simulate.gaussian(wave, mean=lambda_observed, stddev=100)

   # Prepare a demo spectrum object
   spectrum = spectra.Spectrum(wave, flux, ra=0, dec=-1, z=0)

To correct for MW extinction and bin the spectrum, use the ``prepare_spectrum``
method. Several filters are available for binning the spectrum. Here we use a
median filter (``bin_method='median'``) with a window size of ``10``:

.. code-block:: python
   :linenos:

   spectrum.prepare_spectrum(bin_size=10, bin_method='median')

   rest_wave = spectrum.rest_wave  # Rest-framed wavelengths
   rest_flux = spectrum.rest_flux  # Extinction corrected flux
   bin_flux = spectrum.bin_flux    # Filtered, extinction corrected flux
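
To make the binning step concrete, the behavior of the ``median`` filter can be
sketched by hand with plain ``numpy``. This is a simplified illustration only;
the edge handling here (repeating the edge values) differs from the defaults of
the actual filtering routines:

```python
import numpy as np

def median_bin(values, size):
    """Replace each value with the median of a sliding window"""

    half = size // 2
    padded = np.pad(values, half, mode='edge')  # repeat the edge values
    return np.array(
        [np.median(padded[i: i + size]) for i in range(len(values))])

flux = np.array([1.0, 9.0, 2.0, 8.0, 3.0])
print(median_bin(flux, 3))  # [1. 2. 8. 3. 3.]
```

Note how the isolated spikes are suppressed, which is why a median filter is a
common choice for smoothing noisy spectra.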

Once the spectrum is prepared, you can measure its properties for a given
feature. This requires knowing the start and end wavelengths of the feature in
the current spectrum, along with the feature's rest frame wavelength.

.. code-block:: python
   :linenos:

   feat_start = spectrum.rest_wave[10]
   feat_end = spectrum.rest_wave[-10]
   spectrum.sample_feature_properties(feat_start, feat_end, lambda_rest)
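
The call returns nine values: the velocity, pseudo equivalent width, and area,
each accompanied by a formal error and a sampling error. The numbers below are
placeholders purely to illustrate the unpacking; real values come from the
method call above:

```python
# Placeholder values standing in for a real return list, in the
# documented order: velocity, pEW, and area, each with two errors
results = [
    -11000.0, 120.0, 95.0,    # velocity, formal err, sampling err
    110.0, 4.0, 3.5,          # pseudo equivalent width and errors
    2.1e-13, 1.0e-14, 8e-15,  # area and errors
]

(velocity, velocity_err, velocity_samp_err,
 pew, pew_err, pew_samp_err,
 area, area_err, area_samp_err) = results
```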

Custom Callbacks
^^^^^^^^^^^^^^^^

If you want to inject your own custom calculations to the sampling process,
this can be added by specifying the ``callback`` argument.

.. code-block:: python
   :linenos:

   def my_callback(feature):
       '''This function will be called for every iteration / sample

       Args:
           feature (ObservedFeature): The sampled flux as a feature object
       '''

       pass

   spectrum.sample_feature_properties(
       feat_start, feat_end, rest_frame, callback=my_callback)
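
For example, a callback can collect a quantity from every sample for later
inspection. The snippet below demonstrates the pattern with a stand-in object
in place of a real sampled feature (the ``velocity`` attribute mirrors the one
read from each sampled feature during the measurement loop):

```python
from types import SimpleNamespace

# Collect the velocity of every sampled feature
sampled_velocities = []

def record_velocity(feature):
    sampled_velocities.append(feature.velocity)

# Stand-in for a sampled feature, just to demonstrate the callback
record_velocity(SimpleNamespace(velocity=-11000.0))
print(sampled_velocities)
```

In practice ``record_velocity`` would be passed as the ``callback`` argument of
``sample_feature_properties`` rather than called directly.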

Iterating over a Data Release
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``SpectraIterator`` class is used to iterate over spectra from data releases
provided by the ``sndata`` package. Various arguments can be specified to
account for variations in data formatting between different surveys. As an
example, we consider the SDSS survey, which is accessed as follows:

.. code-block:: python
   :linenos:

   from sndata.sdss import Sako18Spec
   from spec_analysis.spectra import SpectraIterator

   # Make sure data is downloaded to your local machine
   data_release = Sako18Spec()
   data_release.download_module_data()

   # Just so we can see the SDSS data model
   demo_table = next(data_release.iter_data())
   print(demo_table)

The data tables from SDSS contain multiple kinds of spectra, including host
galaxy spectra and repeated observations of the supernova. We use a
**pre-processing** function to remove the galaxy spectra and distinguish
between the remaining spectra using their observation time (the ``time``
column in the data tables):

.. code-block:: python
   :linenos:

   # Function called to process data tables before plotting / analysis
   def pre_process(table):
       \"""Pre-process spectral data

       Args:
           table: An astropy table with unprocessed data

       Returns:
           An astropy table with processed data
       \"""

       # Remove galaxy spectra from data tables
       # (and any other spectral data you don't want to consider)
       return table[table['type'] != 'Gal']

   data_iter = SpectraIterator(data_release, pre_process=pre_process, group_by='time')

If we only want to consider data for a subset of targets, we can specify
those targets using their object Id:

.. code-block:: python
   :linenos:

   obj_ids = ['722', '739', '744', '762', '774']
   data_iter = SpectraIterator(
       data_release, obj_ids=obj_ids, pre_process=pre_process, group_by='time')

.. note:: The ``pre_process`` function is called **before** spectra are grouped
   using the ``group_by`` keyword. This allows you to add a custom ``group_by``
   value to the data tables if necessary.
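
As a sketch of that pattern, the pre-processing function below adds a
hypothetical ``night`` key derived from the observation time, which could then
be passed as ``group_by='night'``. A plain dictionary of arrays stands in for
the astropy data table so the snippet is self-contained:

```python
import numpy as np

# Stand-in for an astropy data table: a dict of column arrays
table = {
    'time': np.array([100.2, 100.3, 105.1]),
    'type': np.array(['SN', 'SN', 'Gal']),
}

def pre_process(table):
    # Add a coarser grouping key (hypothetical integer observation night)
    table['night'] = np.floor(table['time']).astype(int)

    # Remove galaxy spectra, as before
    keep = table['type'] != 'Gal'
    return {key: column[keep] for key, column in table.items()}

processed = pre_process(table)
print(processed['night'])  # [100 100]
```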

The ``SpectraIterator`` class expects data tables to have the ``ra``, ``dec``,
and ``z`` keys specified in their meta data. If those values are not available,
they can be added in the pre-processing function.
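
A minimal sketch of that fallback, using ``types.SimpleNamespace`` as a
stand-in for a data table and hypothetical coordinate values (real values
would come from a target catalog for the survey in question):

```python
from types import SimpleNamespace

def pre_process(table):
    # Supply any meta data keys the iterator requires but the survey
    # does not provide (the values here are hypothetical)
    table.meta.setdefault('ra', 20.0)
    table.meta.setdefault('dec', -1.5)
    table.meta.setdefault('z', 0.05)
    return table

# Stand-in for a data table whose meta data only includes a redshift
demo_table = SimpleNamespace(meta={'z': 0.05})
pre_process(demo_table)
```

Using ``setdefault`` means any meta data already supplied by the survey is
left untouched.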

API Documentation
-----------------
"""

import extinction
from pathlib import Path

import numpy as np
import sfdmap
import uncertainties as unc
from scipy.ndimage import gaussian_filter, generic_filter, median_filter
from uncertainties.unumpy import nominal_values

from .exceptions import SamplingRangeError
from .features import ObservedFeature

_file_dir = Path(__file__).resolve().parent
_dust_dir = _file_dir / 'schlegel98_dust_map'
dust_map = sfdmap.SFDMap(_dust_dir)


class Spectrum:
    """Object representation of an observed spectrum"""

    def __init__(self, wave, flux, ra, dec, z, **kwargs):
        """Measures the pEW and area of spectral features

        Args:
            wave (ndarray): Observer frame wavelength
            flux (ndarray): Observer frame flux
            ra (float): The target's right ascension
            dec (float): The target's declination
            z (float): The target's redshift
            Additional kwargs are set as instance attributes
        """

        self.wave = wave
        self.flux = flux
        self.ra = ra
        self.dec = dec
        self.z = z
        for key, value in kwargs.items():
            setattr(self, key, value)

        # Place holders for results of intermediate analyses
        self.rest_wave = None
        self.rest_flux = None
        self.bin_flux = None
        self.feature_bounds = []
    def _correct_extinction(self, rv=3.1):
        """Rest frame the spectrum and correct for MW extinction

        The spectrum is rest-framed and corrected for MW extinction using
        the Schlegel et al. 1998 dust map and the Fitzpatrick et al. 1999
        extinction law. If ``rv`` is not given, a value of 3.1 is used.
        Results are set to the ``self.rest_wave`` and ``self.rest_flux``
        attributes.

        Args:
            rv (float): Rv value to use for extinction
        """

        # Determine extinction
        mwebv = dust_map.ebv(self.ra, self.dec, frame='fk5j2000', unit='degree')
        mag_ext = extinction.fitzpatrick99(self.wave, rv * mwebv, rv)

        # Correct flux to rest-frame
        self.rest_wave = self.wave / (1 + self.z)
        self.rest_flux = self.flux * 10 ** (0.4 * mag_ext)

    def _bin_spectrum(self, bin_size, bin_method):
        """Bin the spectrum to a given resolution

        Bins the values of ``self.rest_flux`` and sets the result to the
        ``self.bin_flux`` attribute.

        Args:
            bin_size (float): The width of the bins
            bin_method (str): Either 'median', 'average', 'sum', or 'gauss'
        """

        if self.rest_wave is None or self.rest_flux is None:
            raise RuntimeError(
                'Spectrum must be corrected for extinction before binning')

        if bin_method == 'sum':
            self.bin_flux = generic_filter(self.rest_flux, sum, bin_size)

        elif bin_method == 'average':
            self.bin_flux = generic_filter(self.rest_flux, np.average, bin_size)

        elif bin_method == 'gauss':
            self.bin_flux = gaussian_filter(self.rest_flux, bin_size)

        elif bin_method == 'median':
            self.bin_flux = median_filter(self.rest_flux, bin_size)

        else:
            raise ValueError(f'Unknown method {bin_method}')
    def prepare_spectrum(self, rv=3.1, bin_size=5, bin_method='median'):
        """Correct for extinction, rest-frame, and bin the spectrum

        Args:
            rv (float): Rv value to use for extinction (Default: 3.1)
            bin_size (float): Bin size in units of Angstroms
            bin_method (str): Either 'median', 'average', 'sum', or 'gauss'
        """

        self._correct_extinction(rv=rv)
        self._bin_spectrum(bin_size=bin_size, bin_method=bin_method.lower())
    def sample_feature_properties(
            self, feat_start, feat_end, rest_frame, nstep=0, callback=None):
        """Calculate the properties of a single feature in a spectrum

        Velocity values are returned in km / s. Error values are determined
        both formally (summed in quadrature) and by re-sampling the feature
        boundaries ``nstep`` flux measurements in either direction.

        Args:
            feat_start (float): Starting wavelength of the feature
            feat_end (float): Ending wavelength of the feature
            rest_frame (float): Rest frame location of the specified feature
            nstep (int): Number of samples taken in each direction
            callback (callable): Function called after every iteration.
                The function is passed the sampled feature.

        Returns:
            - The line velocity
            - The formal error in velocity
            - The sampling error in velocity
            - The equivalent width
            - The formal error in equivalent width
            - The sampling error in equivalent width
            - The feature area
            - The formal error in area
            - The sampling error in area
        """

        # Get indices for beginning and end of the feature
        idx_start = np.where(self.rest_wave == feat_start)[0][0]
        idx_end = np.where(self.rest_wave == feat_end)[0][0]
        if idx_end - idx_start <= 10:
            raise ValueError('Range too small. Please select a wider range')

        # We vary the beginning and end of the feature to estimate the error
        velocity, pequiv_width, area = [], [], []
        for i in np.arange(-nstep, nstep + 1):
            for j in np.arange(nstep, -nstep - 1, -1):

                # Get sub-sampled wavelength/flux
                sample_start_idx = idx_start + i
                sample_end_idx = idx_end + j
                if sample_start_idx < 0 or sample_end_idx >= len(self.rest_wave):
                    raise SamplingRangeError

                sample_wave = self.rest_wave[sample_start_idx: sample_end_idx]
                sample_bflux = self.bin_flux[sample_start_idx: sample_end_idx]
                sample_rflux = self.rest_flux[sample_start_idx: sample_end_idx]

                # Determine feature properties
                feature = ObservedFeature(
                    sample_wave, sample_rflux, sample_bflux, rest_frame)
                velocity.append(feature.velocity)
                pequiv_width.append(feature.pew)
                area.append(feature.area)

                if callback:
                    callback(feature)

        avg_velocity = np.mean(velocity)
        avg_ew = np.mean(pequiv_width)
        avg_area = np.mean(area)

        return [
            unc.nominal_value(avg_velocity),
            unc.std_dev(avg_velocity),
            np.std(nominal_values(velocity)),

            unc.nominal_value(avg_ew),
            unc.std_dev(avg_ew),
            np.std(nominal_values(pequiv_width)),

            unc.nominal_value(avg_area),
            unc.std_dev(avg_area),
            np.std(nominal_values(area))
        ]
class SpectraIterator:
    """Iterator over individual spectra from an ``sndata`` data release"""

    def __init__(self, data_release, obj_ids=None, pre_process=None, group_by='time'):
        """An iterator over individual spectra in a data release

        Instantiates an iterator over spectra in a ``sndata`` data release.
        Spectra are yielded individually as ``Spectrum`` objects. Any meta
        data for a given spectrum is added as an attribute to the
        corresponding ``Spectrum`` object. The time of the observation is
        also included as an attribute.

        Args:
            data_release (SpectroscopicRelease): An sndata style data release
            obj_ids (list): Optionally only consider a subset of Id's
            pre_process (Callable): Function to prepare data before plotting
            group_by (str): Column used to group table rows into individual
                spectra

        Yields:
            ``Spectrum`` objects
        """

        super().__init__()

        # Make sure the passed data release is spectroscopic
        data_type = data_release.data_type
        if data_type != 'spectroscopic':
            raise ValueError(f'Requires spectroscopic data. Passed {data_type}')

        # Store arguments and set defaults
        default_obj_ids = list(data_release.get_available_ids())
        self.obj_ids = default_obj_ids if obj_ids is None else obj_ids
        self.pre_process = pre_process
        self.data_release = data_release
        self.group_by = group_by

        # Build iterator over spectra
        self._iter_data = self._create_spectrum_iterator()
    def _create_spectrum_iterator(self):
        """Instantiate an iterator that yields individual ``Spectrum`` objects"""

        for obj_id in self.obj_ids:

            # Retrieve and format object data
            object_data = self.data_release.get_data_for_id(obj_id)
            if self.pre_process:
                object_data = self.pre_process(object_data)

            # If formatting the data results in an empty table,
            # move on to the next object
            if not object_data:
                continue

            # Yield individual spectra for the object
            # object_data.sort('wavelength')
            object_data = object_data.group_by(self.group_by)
            group_iter = zip(object_data.groups.keys, object_data.groups)
            for group_by_val, spectrum_data in group_iter:
                spectrum = Spectrum(
                    spectrum_data['wavelength'],
                    spectrum_data['flux'],
                    **spectrum_data.meta,  # meta should include ra, dec, and z
                    meta=spectrum_data.meta
                )

                setattr(spectrum, self.group_by, group_by_val[0])
                for column in spectrum_data.colnames:
                    if column not in ('wavelength', 'flux'):
                        col_constant = spectrum_data[column][0]
                        if all(spectrum_data[column] == col_constant):
                            setattr(spectrum, column, col_constant)

                yield spectrum

    def __iter__(self):
        return self._iter_data