"""
Gas and Dust Python Calculator
==============================
A python module for estimating extinction,
reddening and Hydrogen column densities.
"""
from __future__ import print_function
import os
import warnings
import numpy as np
import pkg_resources
from astropy import units as u
from astropy.coordinates import Galactic
from astropy.io import fits
from astropy.table import Table, join
from astropy.utils.exceptions import AstropyWarning
from astropy.wcs import WCS
from astropy_healpix import HEALPix, nside_to_pixel_area
from regions import PixCoord
[docs]class Map(object):
"""
A class with common methods for HEALpix maps.
"""
_data_path = pkg_resources.resource_filename("gdpyc", "data")
_map_type = None
_maps = []
[docs] @classmethod
def show_maps(cls):
"""
Show available maps.
"""
print("Available maps:")
for key, value in cls._maps.items():
print("- {} ({})".format(key, value))
[docs] @classmethod
def plot_map(cls, map_name, plotname=None):
"""
Full-sky plot (mollweide projection) of the map.
**Note:** this method needs ``healpy`` and ``matplotlib``.
Parameters
----------
map_name : ``str``
Name of the map to be plotted. Use ``show_maps`` method
to see a list of all available maps.
plotname : ``str`` or ``None``, optional
Name of the file where the plot will be saved. If ``None``,
the plot is shown but not saved. Defaults to ``None``.
Returns
-------
plot : ``numpy.ndarray``
2D numpy array with the plot.
"""
import healpy as hp
import matplotlib.pyplot as plt
cls._check_map(map_name)
hpmapfile = "{}_{}_healpix_lowres.fits".format(cls._map_type, map_name)
hpmapfile = os.path.join(cls._data_path, hpmapfile)
hpmap = hp.read_map(hpmapfile)
title = "{} ({})".format(map_name, cls._maps[map_name])
if cls._map_type == "h1_nh":
unit_label = "cm-2"
minval, maxval = 1e19, 3e22
else:
unit_label = "mag"
minval, maxval = 0, hpmap.max()
plot = hp.mollview(
hpmap,
title=title,
norm="hist",
min=minval,
max=maxval,
unit=unit_label,
return_projected_map=True,
)
if plotname is None:
plt.show()
else:
plt.savefig(plotname)
return plot
[docs] @classmethod
def unseen_pixels(cls, map_name, hires=False):
"""
Returns a list of UNSEEN healpix pixels contained in `map_name`.
Parameters
----------
map_name : ``str``
Name of the map to be plotted. Use ``show_maps`` method
to see a list of all available maps.
hires : ``boolean``, optional
Use high resolution map. If the map is not available locally,
it is downloaded. Defaults to ``False``.
"""
hpmap, nside, _ = cls._load_map(map_name, hires)
unseen_pixels = np.where(np.isnan(hpmap))[0]
npixels = len(unseen_pixels)
if npixels:
bad_area = npixels * nside_to_pixel_area(nside).to(u.deg ** 2)
print(
"{} contains {} UNSEEN pixels ({})".format(map_name, npixels, bad_area)
)
return unseen_pixels
@classmethod
def _load_map(cls, map_name, hires=False):
# Load healpix map for map_name.
# If hires is True and the map is not locally available,
# it is downloaded.
cls._check_map(map_name)
if hires:
resolution = "hires"
else:
resolution = "lowres"
hmapfile = "{}_{}_healpix_{}.fits".format(cls._map_type, map_name, resolution)
hmapfile = os.path.join(cls._data_path, hmapfile)
if hires and not os.path.isfile(hmapfile):
print("High resolution map is not locally available.")
print("Downloading {} map...".format(map_name))
cls._download_map(map_name)
with fits.open(hmapfile) as hdu:
nside = hdu[1].header["NSIDE"]
order = hdu[1].header["ORDERING"]
hpmap = hdu[1].data.field(0)
hpmap[hpmap == -1.6375e30] = np.nan # Mask UNSEEN pixels as nan
return hpmap, nside, order
@classmethod
def _download_map(cls, map_name):
from .data import get_map
get_map(map_name, cls._data_path)
@classmethod
def _interpolate(cls, coords, hpmap, nside, order):
hp = HEALPix(nside=nside, order=order, frame=Galactic())
i = hp.interpolate_bilinear_skycoord(coords, hpmap)
if np.any(np.isnan(i)):
warnings.warn(
"Result contains NaN values corresponding to UNSEEN coordinates.",
AstropyWarning,
)
return i
@classmethod
def _check_map(cls, map_name):
if map_name not in cls._maps.keys():
message = "Map {} unknown!\nAvailable maps: {}"
raise ValueError(message.format(map_name, cls._maps.keys()))
[docs]class GasMap(Map):
"""
Class for HI maps.
"""
_map_type = "h1_nh"
_maps = {
"DL": "Dickey & Lockman 1990, Ann. Rev. A&A, 28, 215",
"LAB": "Kalberla et al. 2005, A&A, 440, 775",
"HI4PI": "HI4PI Collaboration et al. 2016, A&A, 594, A116",
}
[docs] @classmethod
def nh(cls, coords, nhmap="LAB", hires=False):
"""
Hydrogen column density in the line-of-sight of `coords`,
using LAMBDA healpix maps [1]_.
Parameters
----------
coords : ``SkyCoord``
Astropy SkyCoord object with the line-of-sight coordinates.
nhmap : ``str``, optional
Name of the HI survey. Use ``show_maps`` method
to see a list of all available maps. Defaults to 'LAB'.
hires : ``boolean``, optional
Use high resolution map. If the map is not available locally,
it is downloaded. Defaults to ``False``.
Returns
-------
nh : ``Quantity``
An Astropy Quantity object with shape like `coords`, in cm**-2.
References
----------
.. [1] https://lambda.gsfc.nasa.gov/product/foreground/fg_diffuse.cfm
"""
nh_hpmap, nside, order = cls._load_map(nhmap, hires=hires)
nh = cls._interpolate(coords, nh_hpmap, nside, order)
return nh << u.cm ** -2
[docs] @classmethod
def nhtotal(cls, coords, hires=False):
"""
Total Hydrogen column density (NHI + 2NH2) in the line-of-sight
of `coords`, using Willingale's method [2]_.
Parameters
----------
coords : ``SkyCoord``
Astropy SkyCoord object with the line-of-sight coordinates.
hires : ``boolean``, optional
Use high resolution maps. If the map is not available locally,
it is downloaded. Defaults to ``False``.
Returns
-------
nhtotal : ``Quantity``
An Astropy Quantity object with shape like `coords`, in cm**-2.
References
----------
.. [2] Willingale et al. 2013, MNRAS, 431, 1.
"""
nHI = cls.nh(coords, nhmap="LAB", hires=hires)
ebv = DustMap.ebv(coords, dustmap="SFD", hires=hires)
nH2max = 7.3e20 * u.cm ** -2
Nc = 3.0e20 * u.cm ** -2
alpha = 1.1
nH2 = nH2max * (1 - np.exp(-nHI * ebv / Nc)) ** alpha
return nHI + 2 * nH2
[docs] @classmethod
def nhf(cls, coords, nhmap="LAB", radius=1.0 * u.deg):
"""
Hydrogen column density in the line-of-sight of `coords`,
using HEASoft fits images (resolution of 0.675 x 0.675 deg) [3]_
and method.
Parameters
----------
coords : ``SkyCoord``
Astropy SkyCoord object with the line-of-sight coordinates.
nhmap : ``str``, optional
Name of the HI survey. Use ``show_maps`` method
to see a list of all available maps. Defaults to 'LAB'.
radius : ``Quantity``, optional
Radius of the circle around `coords` where the nH value is averaged.
An Astropy angular Quantity object, consistent with degrees.
Returns
-------
nh : ``Quantity``
An Astropy Quantity object with shape like `coords`, in cm**-2..
References
----------
.. [3] Original fits files created by K. Kuntz (LAB) and
Steve Snowden (DL).
"""
if nhmap not in ["LAB", "DL"]:
raise ValueError("Only LAB and DL maps are available")
radius = radius.to(u.deg).value
if radius > 3:
raise ValueError("radius must be <= 3 deg!!!")
hmapfile = "{}_{}_heasoft.fits".format(cls._map_type, nhmap)
hmapfile = os.path.join(cls._data_path, hmapfile)
hmapimage = fits.getdata(hmapfile)
wcs = WCS(hmapfile)
# If coords is not an array of coordinates, change to a list
try:
len(coords)
except TypeError:
coords = np.array([coords])
# TODO: vectorize this loop
# numpy iterator for preserving the shape of coords in nh
nh = np.empty_like(coords)
it = np.nditer(coords, flags=["multi_index", "refs_ok"])
while not it.finished:
nh[it.multi_index] = cls._nhftools(
coords[it.multi_index], hmapimage, wcs, radius
)
it.iternext()
if len(nh) == 1:
nh = float(nh)
return nh * u.cm ** -2
@classmethod
def _nhftools(cls, center_sky, image, wcs, radius=1.0):
"""
Quick and dirty python implementation of ftool's nh
"""
# Select a 3 x 3 deg subimage (default in ftool's nh)
box_pixcoord = cls._subimage(center_sky, wcs, size=3.0)
# Select only pixels with non-zero values to avoid border effects
out_mask = image[box_pixcoord.y, box_pixcoord.x] > 0
# Estimate sky distance of each pixel (in deg) to center
box_skycoord = box_pixcoord.to_sky(wcs)
distance = np.empty_like(box_skycoord)
distance[out_mask] = box_skycoord[out_mask].separation(center_sky)
distance[~out_mask] = -99
# Select pixels within radius
good_mask = np.logical_and(distance <= radius, distance > 0)
good_nh = image[box_pixcoord[good_mask].y, box_pixcoord[good_mask].x]
# Estimate weights
weights = (radius - distance[good_mask]) / radius
if len(weights):
nh = np.sum(good_nh * weights) / weights.sum()
else:
message = (
"No points are within {} deg from input position. "
"First good point is at distance {} deg"
)
message = message.format(radius, distance[out_mask].min())
warnings.warn(message, AstropyWarning)
nh = np.nan
return nh
@staticmethod
def _subimage(center_sky, wcs, size=3.0):
center_x, center_y = center_sky.to_pixel(wcs)
npixtot = size / wcs.wcs.cdelt[1]
# Box limits
fpix_x = int(np.round(center_x - (npixtot - 1) / 2.0))
fpix_y = int(np.round(center_y - (npixtot - 1) / 2.0))
lpix_x = int(np.round(center_x + (npixtot - 1) / 2.0))
lpix_y = int(np.round(center_y + (npixtot - 1) / 2.0))
xbox = np.arange(fpix_x, lpix_x + 1)
ybox = np.arange(fpix_y, lpix_y + 1)
# Define box within the pixel limits of the fits image
naxis2, naxis1 = wcs.array_shape
mask_x = np.logical_and(xbox >= 0, xbox < naxis1)
mask_y = np.logical_and(ybox >= 0, ybox < naxis2)
# Grid of pixel coordinates for the subimage
grid_x, grid_y = np.meshgrid(xbox[mask_x], ybox[mask_y])
return PixCoord(x=grid_x, y=grid_y)
[docs]class DustMap(Map):
"""
Class for dust maps.
"""
_map_type = "dust_ebv"
_maps = {
"SFD": "Schlegel, Finkbeiner & Davis 1998, ApJ, 500, 2",
"Planck13": "Planck Collaboration et al. 2013, A&A, 571, A11",
}
[docs] @classmethod
def ebv(cls, coords, dustmap="SFD", hires=False):
"""
E(B - V) in the line-of-sight of `coords`.
Parameters
----------
coords : ``SkyCoord``
Astropy SkyCoord object with the line-of-sight coordinates.
dustmap : ``str``, optional
Name of the dust survey. Use ``show_maps`` method
to see a list of all available maps. Defaults to 'SFD'.
hires : ``boolean``, optional
Use high resolution map. If the map is not available locally,
it is downloaded. Defaults to `False`.
Returns
-------
ebv : ``float`` or ``numpy.ndarray``
Float, or numpy array with shape like `coords`.
"""
ebv_hpmap, nside, order = cls._load_map(dustmap, hires=hires)
ebv = cls._interpolate(coords, ebv_hpmap, nside, order)
return ebv
[docs] @classmethod
def extinction(cls, coords, dustmap="SFD", filters="default", hires=False):
"""
Galactic extinction in the line-of-sight of `coords` for the
bandpasses defined in `filters`. If there are N coords and
M filters, returns an astropy table of shape N x M.
**Note:** E(B - V) to extinction conversion coefficients were
estimated in [4]_ for the SFD map [5]_. Using this method with a
different dust map is possible, but obtaining accurate values is
highly unlikely.
Parameters
----------
coords : ``SkyCoord``
Astropy SkyCoord object with the line-of-sight coordinates.
dustmap : ``str``, optional
Name of the dust survey. Use ``show_maps`` method
to see a list of all available maps. Defaults to 'SFD'.
filters : ``str`` or ``list``, optional
List of the filters (or, equivalently, a single string with
comma separated filters). Use ``show_filters`` method
to see a list of all available maps. If filters is 'default',
returns extinction for the five SDSS bands (u, g, r, i, z).
Defaults to 'default'.
hires : ``boolean``, optional
Use high resolution map. If the map is not available locally,
it is downloaded. Defaults to ``False``.
Returns
-------
ext : ``Table``
Astropy Table with the extinction values at `coords`
for each selected filter.
References
----------
.. [4] Schlafly & Finkbeiner 2011, ApJ, 737, 2, 103.
.. [5] Schlegel, Finkbeiner & Davis 1998, ApJ, 500, 2.
"""
list_filters = cls._parse_filters(filters)
ebv = cls.ebv(coords, dustmap=dustmap, hires=hires)
aebv = cls._ebv_to_ext(list_filters)
if dustmap != "SFD":
warnings.warn(
"Extinction for a dust map other than SFD.\n"
"Results are not reliable!!!",
AstropyWarning,
)
if isinstance(ebv, np.ndarray):
ext = np.matmul(ebv[:, np.newaxis], aebv[np.newaxis, :])
else:
ext = ebv * aebv
return Table(ext, names=list_filters)
[docs] @classmethod
def show_filters(cls, lambda_eff=False):
"""
List of the available filters in the S&F conversion table [4]_.
Parameters
----------
lambda_eff : ``boolean``, optional
If ``True``, also returns the effective wavelength of each filter.
Deafults to ``False``.
"""
aebv = cls._load_sfcoeff()
if lambda_eff:
return aebv["filter"], aebv["lambda_eff"]
else:
return aebv["filter"]
@classmethod
def _ebv_to_ext(cls, filters):
# Conversion coefficients from E(B - V) to extinction in `filters`.
good_filters = cls.show_filters()
for f in filters:
if f not in good_filters:
raise ValueError("Unknown filter: {}".format(f))
aebv = cls._load_sfcoeff()
sortidx = list(range(len(filters)))
filters = Table([filters, sortidx], names=["filter", "sortidx"])
sfcoeff_filters = join(filters, aebv, keys=["filter"], join_type="left")
sfcoeff_filters.sort("sortidx")
return np.array(sfcoeff_filters["AEBV2"]) # AEBV2 assumes RV = 3.1
@classmethod
def _load_sfcoeff(cls):
# Load conversion coefficients from the SFD maps of E(B - V) to
# extinction in several bandpasses, by Schlafly and Finkbeiner (2011).
sfcoeff = os.path.join(cls._data_path, "sfcoeff.fits")
return Table.read(sfcoeff, format="fits")
@classmethod
def _parse_filters(cls, filters):
if isinstance(filters, str):
if filters == "default":
filters = "SDSS_u, SDSS_g, SDSS_r, SDSS_i, SDSS_z"
list_filters = filters.replace(" ", "").split(",")
else:
list_filters = filters
return list_filters