Source code for specviz.io.yaml_loader

"""This module contains functions that perform the actual data parsing."""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

# STDLIB
import logging
import os

# THIRD-PARTY
import numpy as np
from astropy import units as u
from astropy.io import ascii, fits
from astropy.table import Table
from astropy.wcs import WCS
from astropy.nddata import StdDevUncertainty

# LOCAL
from specviz.core.data import Spectrum1DRef
from specviz.core import linelist
from specviz.core.linelist import LineList

__all__ = [
    'AsciiYamlRegister',
    'FitsYamlRegister',
    'LineListYamlRegister',
    'YamlRegister',
]

# Loader automatically falls back to these units for some cases
default_waveunit = u.Unit('Angstrom')
default_fluxunit = u.Unit('erg / (Angstrom cm2 s)')


[docs]class YamlRegister(object): """ Class to encapsulate the IO registry information for a set of yaml-loaded attributes. """ def __init__(self, reference): """ Initialize this particular YamlRegister. Parameters ---------- reference : dict-like The yaml reference object created by loading a yaml file. """ self._reference = reference
[docs] def identify(self, *args, **kwargs): return (isinstance(args[0], str) and args[0].lower().split('.')[-1] in self._reference.extension)
[docs] def reader(self, filename, **kwargs): raise NotImplementedError()
# NOTE: This is used by both FITS and ASCII. def _set_uncertainty(err_array, err_type): """Uncertainty is dictated by its type. Parameters ---------- err_array : array Uncertainty values. err_type : {'ivar', 'std'} Inverse variance or standard deviation. Returns ------- uncertainty : `~astropy.nddata.nduncertainty.StdDevUncertainty` Standard deviation uncertainty. """ if err_type == 'ivar': err = np.sqrt(1.0 / err_array) else: # 'std' err = err_array return StdDevUncertainty(err) def _flux_unit_from_header(header, key='BUNIT'): """Get flux unit from header. Parameters ---------- header : dict Extracted header. key : str Keyword name. Returns ------- unit : `~astropy.units.core.Unit` Flux unit. This falls back to default flux unit if look-up failed. """ unitname = header.get(key, default_fluxunit.to_string()).lower() # TODO: A more elegant way is to use astropy.units.def_unit() if unitname == 'electrons/s': unitname = 'electron/s' try: unit = u.Unit(unitname) except ValueError as e: unit = default_fluxunit logging.warning(str(e)) return unit def _read_table(hdu, col_idx=0): """Parse FITS table using Astropy first, but use brute force if the former fails. Astropy parsing is very good at extracting unit and mask, along with the data, if FITS table is formatted properly. Brute force guarantees the data but provides no unit nor mask. Parameters ---------- hdu : obj HDU object. col_idx : int Column index to extract the data directly from HDU. This is only used if Astropy parsing fails. Returns ------- tab : `~astropy.table.table.Table` Parsed table. """ # Let Astropy parse the table for us. try: tab = Table.read(hdu, format='fits') # Build manually if we have to. except: tab = Table([hdu.data[col_idx].flatten()]) return tab def _read_table_column(tab, col_idx, to_unit=None, equivalencies=[]): """Read a given Astropy Table column. Parameters ---------- tab : `~astropy.table.Table` FITS table parsed by Astropy. col_idx : int Column index. to_unit : `~astropy.units.core.Unit` or `None` If given, convert data to this unit. equivalencies : list Astropy unit conversion equivalencies, if needed. This might be needed for some flux or wavelength conversions. Returns ------- data : array 1D array of the values. unit : `~astropy.units.core.Unit` Unit, if any. mask : array or `None` Mask of the data, if any. """ cols = tab.colnames # Special handling for single-column table generated by brute force if len(cols) == 1: col_idx = 0 coldat = tab[cols[col_idx]] data = coldat.data unit = coldat.unit # Sometimes, Astropy returns masked column. if hasattr(data, 'mask'): mask = data.mask.flatten() data = data.data.flatten() else: mask = None data = data.flatten() # If data has no unit, just assume it is the output unit. # Otherwise, perform unit conversion. if isinstance(to_unit, u.Unit) and to_unit != u.dimensionless_unscaled: unit = to_unit if unit != u.dimensionless_unscaled and unit != to_unit: data = coldat.to(to_unit, equivalencies).value return data, unit, mask
[docs]class FitsYamlRegister(YamlRegister): """ Defines the generation of `Spectrum1DRef` objects by parsing FITS files with information from YAML files. """
[docs] def reader(self, filename, **kwargs): """This generic function will query the loader factory, which has already loaded the YAML configuration files, in an attempt to parse the associated FITS file. Parameters ---------- filename : str Input filename. filter : str File type for YAML look-up. kwargs : dict Keywords for Astropy reader. """ logging.info("Attempting to open '{}' using filter '{}'.".format( filename, filter)) name = os.path.basename(filename.rstrip(os.sep)).rsplit('.', 1)[0] hdulist = fits.open(filename, **kwargs) meta = self._reference.meta header = dict(hdulist[self._reference.wcs['hdu']].header) meta['header'] = header wcs = WCS(hdulist[self._reference.wcs['hdu']].header) # Usually, all the data should be in this table tab = _read_table(hdulist[self._reference.data['hdu']], col_idx=self._reference.data['col']) # Read flux column data, unit, mask = _read_table_column(tab, self._reference.data['col']) # First try to get flux unit from YAML if self._reference.data.get('unit') is not None: unit = u.Unit(self._reference.data['unit']) # Get flux unit from header if there wasn't one in the table column elif unit is None: unit = _flux_unit_from_header(meta['header']) # Get data mask, if not in column. # 0/False = good data (unlike Layers) if mask is None: mask = np.zeros(data.shape, dtype=np.bool) else: mask = mask.astype(np.bool) # Read in DQ column if it exists # 0/False = good (everything else bad) if hasattr(self._reference, 'mask') and self._reference.mask.get('hdu') is not None: if self._reference.mask['hdu'] == self._reference.data['hdu']: dqtab = tab else: dqtab = _read_table( hdulist[self._reference.mask['hdu']], col_idx=self._reference.mask['col']) mask2 = _read_table_column(dqtab, self._reference.mask['col'])[0] # Data only mask |= mask2.astype(np.bool) # Combine with existing mask # Wavelength constructed from WCS by default dispersion = None disp_unit = None # Read in wavelength column if it exists if hasattr(self._reference, 'dispersion'): if self._reference.dispersion.get('hdu') is not None: if self._reference.dispersion['hdu'] == self._reference.data['hdu']: wavtab = tab else: wavtab = _read_table(hdulist[self._reference.dispersion['hdu']], col_idx=self._reference.dispersion['col']) dispersion, disp_unit = _read_table_column( wavtab, self._reference.dispersion['col'])[:2] # Ignore mask # Overrides wavelength unit from YAML if self._reference.dispersion.get('unit') is not None: disp_unit = u.Unit(self._reference.dispersion['unit']) # If no unit, try to use WCS if disp_unit == u.dimensionless_unscaled: disp_unit = None # Read flux uncertainty if hasattr(self._reference, 'uncertainty') and self._reference.uncertainty.get('hdu') is not None: if self._reference.uncertainty['hdu'] == self._reference.data['hdu']: errtab = tab else: errtab = _read_table( hdulist[self._reference.uncertainty['hdu']], col_idx=self._reference.uncertainty['col']) uncertainty = _read_table_column( errtab, self._reference.uncertainty['col'], to_unit=unit)[0] # Data only uncertainty_type = self._reference.uncertainty.get('type', 'std') else: uncertainty = np.zeros(data.shape) uncertainty_type = 'std' # This is dictated by the type of the uncertainty. uncertainty = _set_uncertainty(uncertainty, uncertainty_type) hdulist.close() return Spectrum1DRef(name=name, data=data, unit=unit, uncertainty=uncertainty, mask=mask, wcs=wcs, dispersion=dispersion, dispersion_unit=disp_unit)
[docs]class AsciiYamlRegister(YamlRegister): """ Defines the generation of `Spectrum1DRef` objects by parsing ASCII files with information from YAML files. """
[docs] def reader(self, filename, **kwargs): """Like :func:`fits_reader` but for ASCII file.""" name = os.path.basename(filename.rstrip(os.sep)).rsplit('.', 1)[0] tab = ascii.read(filename, **kwargs) cols = tab.colnames meta = self._reference.meta meta['header'] = {} # Only loads KEY=VAL comment entries into header if 'comments' in tab.meta: for s in tab.meta['comments']: if '=' not in s: continue s2 = s.split('=') meta['header'][s2[0]] = s2[1] wcs = None wave = tab[cols[self._reference.dispersion['col']]] dispersion = wave.data flux = tab[cols[self._reference.data['col']]] data = flux.data uncertainty = np.zeros(data.shape) uncertainty_type = 'std' if flux.unit is None: unit = u.Unit(self._reference.data.get('unit', default_fluxunit)) else: unit = flux.unit if wave.unit is None: disp_unit = u.Unit(self._reference.dispersion.get('unit', default_waveunit)) else: disp_unit = wave.unit # Since there's no WCS, include the dispersion unit in the meta data meta['header']['cunit'] = [disp_unit.to_string(), unit.to_string()] # 0/False = good data (unlike Layers) mask = np.zeros(data.shape, dtype=np.bool) if hasattr(self._reference, 'uncertainty') and self._reference.uncertainty.get('col') is not None: try: uncertainty = tab[cols[self._reference.uncertainty['col']]].data except IndexError: pass # Input has no uncertainty column else: uncertainty_type = self._reference.uncertainty.get('type', 'std') # This is dictated by the type of the uncertainty. uncertainty = _set_uncertainty(uncertainty, uncertainty_type) if hasattr(self._reference, 'mask') and self._reference.mask.get('col') is not None: try: mask = tab[cols[self._reference.mask['col']]].data.astype(np.bool) except IndexError: pass # Input has no mask column return Spectrum1DRef(name=str(name), data=data, dispersion=dispersion, uncertainty=uncertainty, mask=mask, wcs=wcs, unit=unit, dispersion_unit=disp_unit, meta=meta)
[docs]class LineListYamlRegister(YamlRegister): """ Defines the generation of `Spectrum1DRef` objects by parsing LineList files with information from YAML files. """
[docs] def reader(self, filename, **kwargs): names_list = [] start_list = [] end_list = [] units_list = [] for k in range(len((self._reference.columns))): name = self._reference.columns[k][linelist.COLUMN_NAME] names_list.append(name) start = self._reference.columns[k][linelist.COLUMN_START] end = self._reference.columns[k][linelist.COLUMN_END] start_list.append(start) end_list.append(end) if linelist.UNITS_COLUMN in self._reference.columns[k]: units = self._reference.columns[k][linelist.UNITS_COLUMN] else: units = '' units_list.append(units) tab = ascii.read(filename, format = self._reference.format, names = names_list, col_starts = start_list, col_ends = end_list) for k, colname in enumerate(tab.columns): tab[colname].unit = units_list[k] # The table name (for e.g. display purposes) # is taken from the 'name' element in the # YAML file descriptor. return LineList(tab)