"""
Compression tools for PyIMCOM outputs.
Classes
-------
CompressedOutput
Main class for compresseing a FITS file.
Functions
---------
_parser
File name parser; only needed for file names with regular expressions.
ReadFile
Stand-alone function to read a compressed FITS file.
"""
import re
from urllib.parse import urlparse
import numpy as np
from astropy.io import fits
from ..config import Config
# specific compression tools we need
from .i24 import i24compress, i24decompress
[docs]
class CompressedOutput:
"""Class for compressing pyimcom output files.
Parameters
----------
fname : str
File name for uncompressed file.
format : str or None, optional
Compression format.
layers : list, optional
Which layers to de-compress (if given; otherwise de-compresses everything).
Use only for reading (don't write an instance initialized with `layers` to a file).
extraargs : dict, optional
Extra arguments for astropy.io.fits.
Attributes
----------
gzip : bool
Is gzipped?
ftype : str
File type. Currently the only option is 'fits'.
cprstype : str
Compression type (for a full file, not currently used).
origfile : str
Original file name (when opened)
hdul : astropy.io.fits.HDUList
HDU List of information (initially a deep copy of the input file).
cfg : pyimcom.config.Config
Configuration file used to generate this image.
Methods
-------
__init__
Constructor.
compress_2d_image
Wrapper for 2d image compression (staticmethod).
get_compression_dict
Extract compression scheme for a PyIMCOM layer as a dictionary.
decompress_2d_image
Wrapper for 2d image decompression (staticmethod).
compress_layer
Compresses a layer.
decompress
Decompresses the whole file.
recompress
Recompress previously compressed layers.
to_file
Saves to a file.
close
Close associated file.
Notes
-----
At some point, we may add file formats other than FITS, but not yet.
This object is big since it stores a deep copy of the whole file.
Therefore, it is not a good idea to open lots of compressed files at once.
"""
[docs]
def __init__(self, fname, format=None, layers=None, extraargs={}):
# figure out what type of file it is, and if it is gzipped
# determines which layers to decompress
[docs]
self.decompress_layers = layers
if fname[-3:] == ".gz":
self.gzip = True
if format is None:
pref = fname[:-3] if self.gzip else fname
# right now supports fits files
if pref[-5:] == ".fits":
self.ftype = "fits"
self.hdul = fits.open(fname, mode="readonly", decompress_in_memory=True, **extraargs)
if "CPRSTYPE" in self.hdul[0].header:
self.cprstype = self.hdul[0].header["CPRSTYPE"]
else:
self.cprstype = ""
self.hdul[0].header["CPRSTYPE"] = ""
self.cfg = Config(fname, inmode="block")
else:
raise Exception("unrecognized file type")
@staticmethod
[docs]
def compress_2d_image(im, scheme, pars):
"""Wrapper to compress a 2D image.
Parameters
----------
im : np.array
2D image to be compressed.
scheme : str
Name of the compression scheme.
pars : dict
Parameters to be passed to the compression algorithm.
Returns
-------
imout : np.array
The compressed 2D image, same shape as `im`.
ovflow : astropy.io.fits.BinTableHDU
A table of values that overflowed the quantization range for the
compression scheme. Returns None if not used.
"""
if scheme[:3] == "I24":
imout, ovflow = i24compress(im, scheme, pars)
return imout, ovflow
# unrecognized scheme or NULL
return np.copy(im), None
@classmethod
[docs]
def get_compression_dict(cls, hdulist, ilayer):
"""Extract compression scheme for a PyIMCOM layer as a dictionary.
Parameters
----------
hdulist : astropy.io.fits.HDUList
The FITS file as an HDUList.
ilayer : int
The layer number to extract.
Returns
-------
dict
The compression scheme parameter dictionary. Note that all values are
returned as strings, the calling routine must typecast them as appropriate.
If the layer was not
compressed, returns an empty list, ``{}``.
"""
# extract the compression data if that HDU is present
try:
cprs = [x.strip().split(":") for x in hdulist["CPRESS"].data["text"]]
except KeyError:
return {} # stop here for uncompressed files
cprs_thislayer = [item for item in cprs if int(item[0], 16) == ilayer]
return dict(
zip(
[item[1].strip() for item in cprs_thislayer],
[item[2].strip() for item in cprs_thislayer],
strict=False,
)
)
@staticmethod
[docs]
def decompress_2d_image(im, scheme, pars, overflow=None):
"""Wrapper to decompress a 2D image; overflow table used for some formats.
Parameters
----------
im : np.array
The compressed image.
scheme : str
Name of the compression scheme.
pars : dict
Parameters to be passed to the compression algorithm.
overflow : astropy.io.fits.BinTableHDU or None, optional
(If used.) A table of values that overflowed the quantization range for the
compression scheme. Returns None if not used.
Returns
-------
imout : np.array
The de-compressed 2D image, same shape as `im`.
"""
if scheme[:3] == "I24":
return i24decompress(im, scheme, pars, overflow=overflow)
# unrecognized scheme or NULL
return np.copy(im)
[docs]
def compress_layer(self, layerid, scheme=None, pars={}):
"""Compresses the given layerid with the indicated scheme.
The pars is a dictionary of parameters that go with that scheme.
It must be in a format that supports a FITS header.
If scheme is None, then the algorithm will re-compress the layer in the same way was
done previously (if it was compressed before), otherwise it will do the NULL compression.
Parameters
----------
layerid : int
Number of the layer to be compressed.
scheme : str or None, optional
Name of the compression scheme. Defaults to None (no compression).
pars : dict, optional
Parameters to be passed to the compression algorithm.
Returns
-------
None
"""
# refuse to compress the science layer
if layerid == 0:
return
# this failure to write the EXTNAME shouldn't happen, but just in case
if layerid >= 16**4:
return
# make a blank table if there isn't compression information yet
if "CPRESS" not in [hdu.name for hdu in self.hdul]:
hdu = fits.TableHDU.from_columns([fits.Column(name="text", format="A512", array=[], ascii=True)])
hdu.name = "CPRESS"
self.hdul.append(hdu)
# now get compression information
rows = self.hdul["CPRESS"].data["text"].tolist()
# None means we should check if this data has been compressed
# before and use that method.
if scheme is None:
compressiondict = {}
for kv in rows:
layer_, key_, val_ = kv.strip().split(":")
key_ = key_.strip()
val_ = val_.strip()
if int(layer_, 0x10) == layerid:
compressiondict[key_] = val_
if "SCHEME" in compressiondict:
# this was done before
# re-compress without adding new keywords
cd_data, cd_overflow = CompressedOutput.compress_2d_image(
self.hdul[0].data[0, layerid, :, :], compressiondict["SCHEME"], compressiondict
)
self.hdul[0].data[0, layerid, :, :] = 0
newhdu = fits.ImageHDU(cd_data)
for p in compressiondict:
newhdu.header[p] = compressiondict[p]
newhdu.name = f"HSHX{layerid:04X}"
self.hdul.append(newhdu)
cd_overflow.name = f"HSHV{layerid:04X}"
self.hdul.append(cd_overflow)
# print('re-compressed', compressiondict)
return
# we will do the null compression in this case
scheme = "NULL"
cd_data, cd_overflow = CompressedOutput.compress_2d_image(
self.hdul[0].data[0, layerid, :, :], scheme, pars
)
self.hdul[0].data[0, layerid, :, :] = 0
newhdu = fits.ImageHDU(cd_data)
for p in pars:
newhdu.header[p] = pars[p]
rows += [f"{layerid:04X}:{p:8s}:{str(pars[p]):s}"]
newhdu.header["SCHEME"] = scheme
rows += ["{:04X}:{:8s}:{:s}".format(layerid, "SCHEME", scheme)]
newhdu.name = f"HSHX{layerid:04X}"
self.hdul.append(newhdu)
cd_overflow.name = f"HSHV{layerid:04X}"
self.hdul.append(cd_overflow)
# which HDU has the compression data?
j_ = -1
for j in range(len(self.hdul)):
if self.hdul[j].name == "CPRESS":
j_ = j
if j_ == -1:
raise Exception("Can't find CPRESS: this shouldn't happen.")
# now overwrite that one
hdu = fits.TableHDU.from_columns([fits.Column(name="text", format="A512", array=rows, ascii=True)])
hdu.name = "CPRESS"
self.hdul[j_] = hdu
[docs]
def decompress(self):
"""Decompresses all the layers that were compressed by compress_layer."""
j = 0
while j < len(self.hdul):
if self.hdul[j].name[:4] == "HSHX":
layer_target = int(self.hdul[j].name[-4:], 0x10) # this is a layer number
if (self.decompress_layers is not None) and (layer_target not in self.decompress_layers):
j = j + 1
continue
self.hdul[0].data[0, layer_target, :, :] = CompressedOutput.decompress_2d_image(
self.hdul[j].data,
self.hdul[j].header["SCHEME"],
self.hdul[j].header,
overflow=self.hdul["HSHV" + self.hdul[j].name[-4:]],
)
del self.hdul[j]
else:
# we only increment j if the we didn't remove anything, since if we removed an HDU then
# what was hdul[j+1] is now hdul[j]
j = j + 1
# remove the overflow tables
j = 0
while j < len(self.hdul):
if self.hdul[j].name[:4] == "HSHV":
del self.hdul[j]
else:
j = j + 1
[docs]
def recompress(self):
"""Recompresses all the layers that were previously compressed by compress_layer."""
# if this wasn't compressed before, nothing to do
if "CPRESS" not in [hdu.name for hdu in self.hdul]:
return
# figure out which layers were previously compressed
nlayer = np.shape(self.hdul[0].data)[-3]
wascompressed = np.zeros(nlayer, dtype=bool)
for compressnote in self.hdul["CPRESS"].data["text"].tolist():
ilayer = int(compressnote.split(":")[0], 16) # which layer this referred to
wascompressed[ilayer] = True
# print(wascompressed)
for ilayer in range(nlayer):
if wascompressed[ilayer]:
self.compress_layer(ilayer)
[docs]
def to_file(self, fname, overwrite=False):
"""
Saves to a file.
Parameters
----------
fname : str
File name.
overwrite : bool, optional
Whether to overwrite the file.
Returns
-------
None
"""
self.hdul.writeto(fname, overwrite=overwrite)
[docs]
def close(self):
"""Closes the associated file."""
self.hdul.close()
[docs]
def __enter__(self):
"""Entry method."""
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
"""Has context manager automatically close."""
self.close()
return False # do not suppress exception
[docs]
def _parser(fname):
"""
Re-formats a file name containing a regular expression.
Regular expressions are separated with the ^ character and contain a row and column index,
followed by the suffix. For example::
>>> _parser('hello_world/Q_02_31.fits') # no ^, regular file name
'hello_world/Q_02_31.fits'
>>> _parser('hello_world/Row{1:2d}/Q_{0:02d}_{1:02d}^_02_31.fits') # FITS file
'hello_world/Row31/Q_02_31.fits'
>>> _parser('hello_world/Row{1:2d}/Q_{0:02d}_{1:02d}^_02_12.fits.gz') # gzipped; suffix is copied over
'hello_world/Row12/Q_02_12.fits.gz'
This is useful if the files are not all in the same directory.
Parameters
----------
fname : str or str-like
Regular file name (not including ^) or regular expression.
Returns
-------
str
The formatted file name.
"""
fname = str(fname) # converts other objects, e.g., pathlib.Path, to a string
# normal file name: nothing to be done
if "^" not in fname:
return fname
# pattern match
parts = fname.split("^")
sub = parts[1].split(".")
coordstring = sub[0]
m = re.match(r"_(\d+)_(\d+)(\D*)", coordstring)
if m is not None:
ix = int(m.group(1))
iy = int(m.group(2))
term = m.group(3)
suffix = term + "." + ".".join(sub[1:])
outname = "^".join(parts[:-1])
outname = outname.format(ix, iy) + suffix
return outname
[docs]
def ReadFile(fname, layers=None):
"""Wrapper to read a compressed file.
This should read a file just like astropy.io.fits.open(fname, mode='readonly'),
but works even if the file is compressed using compressutils.
Parameters
----------
fname : str or str-like
File name to read.
layers : list, optional
Which layers to de-compress (if given; otherwise de-compresses everything).
Use only for reading (don't write an instance initialized with `layers` to a file).
Notes
-----
This can also be used with the Python context manager, e.g.::
with ReadFile('my.fits.gz') as f:
...
File names with sepcific types of regular expressions are allowed, and unpacked by the ``_parser``
function. Regular expressions are separated with the ^ character and contain a row and column index,
followed by the suffix. For example::
>>> _parser('hello_world/Q_02_31.fits') # no ^, regular file name
'hello_world/Q_02_31.fits'
>>> _parser('hello_world/Row{1:2d}/Q_{0:02d}_{1:02d}^_02_31.fits') # FITS file
'hello_world/Row31/Q_02_31.fits'
>>> _parser('hello_world/Row{1:2d}/Q_{0:02d}_{1:02d}^_02_12.fits.gz') # gzipped; suffix is copied over
'hello_world/Row12/Q_02_12.fits.gz'
"""
fname = _parser(fname) # if the file name is a regular expression.
_o = urlparse(fname)
extraargs = {}
match _o.scheme:
case "" | "file":
pass
case "http" | "https":
extraargs["use_fsspec"] = True
case "s3":
extraargs["use_fsspec"] = True
extraargs["fsspec_kwargs"] = {"anon": True}
# extraargs["fsspec_kwargs"] = {"key": "YOUR-SECRET-KEY-ID", "secret": "YOUR-SECRET-KEY"}
case _:
raise ValueError(f"Scheme {_o.scheme} not supported")
_url = _o.geturl()
f = fits.open(_url, **extraargs)
if "CPRESS" not in [hdu.name for hdu in f]:
return f
else:
f.close()
# otherwise, make a decompressed version
x = CompressedOutput(fname, layers=layers, extraargs=extraargs)
x.decompress()
return fits.HDUList(x.hdul)