import os
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
# import from furry_parakeet
from ..config import Config
# local imports
from .imsubtract import run_imsubtract_single
[docs]
def run_imsubtract_all(cfg_file, workers=4, max_imgs=None, display=None, local_output=False, mmap=None):
"""
Main routine to run imsubtract on all images in the cache.
Parameters
----------
cfg_file: str
Path to the config file.
workers: int, optional
Number of workers to use for parallel processing. Default is 4.
max_imgs: int, optional
If provided, does computations for a maximum number of SCAs. Most users will
want the default of None; this is provided mainly for testing.
display: str or None, optional
Display location for intermediate steps. Default is None.
local_output: bool, optional
Whether to direct the file to local output instead of the cache directory.
mmap : str or str-like, optional
Directory to put temporary mmap files.
"""
# Additional imports
import multiprocessing as mp
import traceback
# load the file using Config and get information
cfgdata = Config(cfg_file)
cacheinfo = cfgdata.inlayercache
# separate the path from the inlayercache info
m = re.search(r"^(.*)\/(.*)", cacheinfo)
if m:
path = m.group(1)
stem = m.group(2)
# create empty list of exposures
exps = []
# find all the fits files and add them to the list
for _, _, files in os.walk(path):
for file in files:
if file.startswith(stem) and file.endswith(".fits") and file[-6].isdigit():
exps.append(file)
# print("List of exposures:", exps)
# Run imsubtract on each exposure in parallel using ProcessPoolExecutor
count = 0
start_method = "forkserver" if os.name.lower() == "posix" else "spawn"
ctx = mp.get_context(start_method)
nfail = 0
with ProcessPoolExecutor(max_workers=workers, mp_context=ctx) as executor:
futures = []
for exp in exps:
if max_imgs is not None and count > max_imgs:
break
m2 = re.search(r"(\w*)_0*(\d*)_(\d*).fits", exp)
if m2:
obsid = int(m2.group(2))
scaid = int(m2.group(3))
futures.append(
executor.submit(
run_imsubtract_single,
cfgdata,
scaid,
obsid,
path,
exp,
display=display,
fft_workers=None,
local_output=local_output,
max_layers=max_imgs,
mmap=mmap,
)
)
count += 1
# Wait for all futures to complete
for future in as_completed(futures):
try:
future.result()
print(f"Completed {count}/{len(futures)}", flush=True)
except Exception as e:
nfail += 1
print(f"Worker failed with exception {e}", flush=True)
traceback.print_exc()
if nfail > 0:
print(f"{nfail}/{len(futures)} instances of run_imsubtract_single failed.", flush=True)