Source code for pyimcom.diagnostics.dynrange

"""
Code to construct dynamic range estimates from block files.

Functions
---------
gen_dynrange_data
    Generates dynamic range data and writes to files.

"""

import json
import re
import sys
import warnings
from os.path import exists

import healpy
import numpy as np
from astropy import wcs

from ..compress.compressutils import ReadFile
from .outimage_utils.helper import HDU_to_bels

[docs] nscale = 1
# nscale=10 # this line is for bug compensation --- will remove it later
[docs] def gen_dynrange_data(inpath, outstem, rpix_try=50, nblockmax=100): """Takes files from inpath and writes histograms to outstem. inpath should be a function that takes ix and iy and returns a file name Optional input parameters: rpix_try : int = radius over which to compute profiles (needs to be an integer less than the padding, will truncate) nblockmax : maximum block number to consider (default 100, only need to reduce this if you want to make a report generate faster for testing) The output files are: Histograms: outstem +'_sqrtS_hist.dat': histogram of noise amplification factor sqrtS outstem +'_neff_hist.dat': histogram of effective exposure number Both of these have a header that indicates the fraction of data that is off scale high. dynamic range file: outstem +'_dynrange.dat': table of percentiles of noisy star images (Columns are radius and [1,5,25,50,75,95,99] percentiles) Returns a dictionary of which files were successfully generated: output[key] = filename (if successful), None (if not successful) Current file keys: 'SQRTS', 'NEFF', 'DYNRANGE' Header information is in keys 'SQRTS_HEADER' and 'NEFF_HEADER' Number of blocks read is in 'COUNTBLOCK' """ # initialization of output output = {"SQRTS": None, "NEFF": None, "DYNRANGE": None, "COUNTBLOCK": 0} # initialize table of pixel values vals = [] for _ in range(rpix_try): vals += [np.zeros((0,), dtype=np.float32)] is_first = True # histogram initialization N_noise = 100 d_noise = 0.02 countnoise = np.zeros((N_noise, 2)) countnoise[:, 0] = d_noise * np.linspace(0.5, N_noise - 0.5, N_noise) tnoise = 0.0 tnoise_gt = 0.0 N_neff = 100 d_neff = 0.1 countneff = np.zeros((N_neff, 2)) countneff[:, 0] = d_neff * np.linspace(0.5, N_neff - 0.5, N_neff) tneff = 0.0 tneff_gt = 0.0 # now loop over the blocks for iby in range(nblockmax): for ibx in range(nblockmax): # get the input file (if it exists -- otherwise keep going) try: infile = inpath(ibx, iby) if not exists(infile): continue except (FileNotFoundError, ValueError): continue # if this is the first block we find, get the configuration file if is_first: is_first = False config = "" with ReadFile(infile) as f: for g in f["CONFIG"].data["text"].tolist(): config += g + " " configStruct = json.loads(config) blocksize = ( int(configStruct["OUTSIZE"][0]) * int(configStruct["OUTSIZE"][1]) * float(configStruct["OUTSIZE"][2]) / 3600.0 * np.pi / 180 ) # radians rs = 1.5 * blocksize / np.sqrt(2.0) # search radius # padding region around the edge bd = int(configStruct["OUTSIZE"][1]) * int(configStruct["PAD"]) rpix = min(rpix_try, bd - 1) # figure out which layer we want layers = [""] + configStruct["EXTRAINPUT"] framenumber = 0 res = 9 nstarlayer = {} for i in range(len(layers))[::-1]: m = re.match(r"^nstar(\d+),([^,]+),([^,]+),([^,]+)$", layers[i]) if m: framenumber = i res = int(m.group(1)) nstarlayer = { "RESOLUTION": res, "FLUX": float(m.group(2)), "BACKGROUND": float(m.group(3)), "SEED": int(m.group(4)), } print("# using layer", framenumber, "resolution", res) print("# rs=", rs) # now we know this file exists with ReadFile(infile, layers=[framenumber]) as f: n = np.shape(f[0].data)[-1] mywcs = wcs.WCS(f[0].header) starmap = f[0].data[0, framenumber, :, :] # now extract histogram information try: sigma_ = 10 ** ( -0.5 * HDU_to_bels(f["SIGMA"]) * f["SIGMA"].data[0, bd : n - bd, bd : n - bd] ) # noise standard deviation in units of input noise for j in range(N_noise): countnoise[j, 1] = countnoise[j, 1] + np.count_nonzero( np.logical_and(sigma_ / d_noise >= j, sigma_ / d_noise < j + 1) ) tnoise = tnoise + np.size(sigma_) tnoise_gt = tnoise_gt + np.count_nonzero(sigma_ >= d_noise * N_noise) except KeyError: warnings.warn("No valid noise frame: " + infile) try: neff_ = 10 ** ( HDU_to_bels(f["EFFCOVER"]) * f["EFFCOVER"].data[0, bd : n - bd, bd : n - bd] * nscale ) # effective coverage for j in range(N_neff): countneff[j, 1] = countneff[j, 1] + np.count_nonzero( np.logical_and(neff_ / d_neff >= j, neff_ / d_neff < j + 1) ) tneff = tneff + np.size(neff_) tneff_gt = tneff_gt + np.count_nonzero(neff_ >= d_neff * N_neff) except KeyError: warnings.warn("No valid coverage frame: " + infile) # identify which HEALpix positions we have ra_cent, dec_cent = mywcs.all_pix2world( [(n - 1) / 2], [(n - 1) / 2], [0.0], [0.0], 0, ra_dec_order=True ) ra_cent = ra_cent[0] dec_cent = dec_cent[0] vec = healpy.ang2vec(ra_cent, dec_cent, lonlat=True) qp = healpy.query_disc(2**res, vec, rs, nest=False) ra_hpix, dec_hpix = healpy.pix2ang(2**res, qp, nest=False, lonlat=True) npix = len(ra_hpix) x, y, z1, z2 = mywcs.all_world2pix(ra_hpix, dec_hpix, np.zeros((npix,)), np.zeros((npix,)), 0) xi = np.rint(x).astype(np.int16) yi = np.rint(y).astype(np.int16) grp = np.where( np.logical_and( np.logical_and(xi >= bd, xi < n - bd), np.logical_and(yi >= bd, yi < n - bd) ) ) ra_hpix = ra_hpix[grp] dec_hpix = dec_hpix[grp] x = x[grp] xi = xi[grp] y = y[grp] yi = yi[grp] npix = len(x) print("# read grid position:", (ibx, iby), n, "number of HEALPix pixels =", npix) # complain if it didn't find a star for ipix in range(npix): if starmap[yi[ipix], xi[ipix]] < 1000: print( "block", ibx, iby, "star", ipix, "pos", xi[ipix], yi[ipix], "val", starmap[yi[ipix], xi[ipix]], ) sys.stdout.flush() # extract profile around each object x_, y_ = np.meshgrid(range(n), range(n)) tempvals = [None] * rpix for j in range(rpix): tempvals[j] = np.zeros((0,), dtype=np.float32) for ipix in range(npix): xmin = np.clip(np.floor(x[ipix]).astype(np.int16) - rpix - 1, 0, n) xmax = np.clip(np.ceil(x[ipix]).astype(np.int16) + rpix + 1, 0, n) ymin = np.clip(np.floor(y[ipix]).astype(np.int16) - rpix - 1, 0, n) ymax = np.clip(np.ceil(y[ipix]).astype(np.int16) + rpix + 1, 0, n) r = np.floor( np.sqrt( (x_[ymin:ymax, xmin:xmax] - x[ipix]) ** 2 + (y_[ymin:ymax, xmin:xmax] - y[ipix]) ** 2 ) ).astype(np.int16) for j in range(rpix): tempvals[j] = np.concatenate((tempvals[j], starmap[ymin:ymax, xmin:xmax][r == j])) for j in range(rpix): vals[j] = np.concatenate((vals[j], tempvals[j])) output["COUNTBLOCK"] += 1 outst = "" for j in range(rpix): outst += f"{j:3d} {np.size(vals[j]):8d}" for q in [1, 5, 25, 50, 75, 95, 99]: outst += f" {np.percentile(vals[j], q):12.5E}" outst += "\n" ofile = outstem + "_dynrange.dat" with open(ofile, "w") as fn: fn.write(outst) if framenumber > 0: output["DYNRANGE"] = ofile # save histograms ofile = outstem + "_sqrtS_hist.dat" np.savetxt( ofile, countnoise, header=f" {np.amax(countnoise[:, 1]):11.5E} {100 * tnoise_gt / tnoise:9.6f}" ) output["SQRTS"] = ofile output["SQRTS_HEADER"] = (np.amax(countnoise[:, 1]), 100 * tnoise_gt / tnoise) ofile = outstem + "_neff_hist.dat" np.savetxt(ofile, countneff, header=f" {np.amax(countneff[:, 1]):11.5E} {100 * tneff_gt / tneff:9.6f}") output["NEFF"] = ofile output["NEFF_HEADER"] = (np.amax(countneff[:, 1]), 100 * tneff_gt / tneff) output["NSTARLAYER"] = nstarlayer return output
""" Command line driver. Format: python3 -m dynrange <file stem> <output filename> """ if __name__ == "__main__":
[docs] def fn(ibx, iby): # noqa: D103 return sys.argv[1] + f"_{ibx:02d}_{iby:02d}.fits"
output = gen_dynrange_data(fn, sys.argv[2]) print("**", output)