Source code for updatedb


import argparse, logging, os, sys
from datetime import datetime, timedelta
from astropy.io import fits
import numpy as np
import sqlite3


from base.show_info import get_last_record, get_key_table, get_path_to_seg
from base.misc import xdays


MainTable = 'farside'
InfoTable = 'farside_meta'

KeyList = [
        ('t_rec',    str),
        ('crpix1',   float),
        ('crpix2',   float),
        ('cdelt1',   float),
        ('cdelt2',   float),
        ('crota2',   float),
        ('crln_obs', float),
        ('crlt_obs', float),
        ('dsun_obs', float),
        ]


[docs] def create_info_table(con, infotbl, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr, logger=None, T=None): """ Thiis function creates the info table that contains the paths to the processed images. Parameters ---------- con : sqlite3.Connection The SQLite3 database connection. infotbl : str The name of the info table. fmt_postel : str Path format for Postel images. fmt_corr : str Path format for corrected images. fmt_31hr : str Path format for 31-hour images. fmt_79hr : str Path format for 79-hour images. logger : Optional[logging.RootLogger], optional The logger for logging messages (default is None). T : datetime, optional List of datetime, used to estimate walltime (default is None). Returns ------- None """ if T is None: T = [datetime.now()] if logger is None: logger = logging.getLogger() # --- create info table --- with con: con.executescript(f""" DROP TABLE IF EXISTS {infotbl}; CREATE TABLE {infotbl}( key TEXT NOT NULL UNIQUE, val); """) rows = [ ('fmt_postel', fmt_postel), ('fmt_corr', fmt_corr), ('fmt_31hr', fmt_31hr), ('fmt_79hr', fmt_79hr), ] con.executemany(f'INSERT INTO {infotbl} VALUES (?,?)', rows) T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} create table "{infotbl}"') return
[docs] def create_main_table(con, maintbl, dstart, dstop, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr, logger=None, T=None): """ This function creates the main table that contains the list of timestamps and processing status, while the corresponding hmi.m_720s columns are filled with None. Parameters ---------- con : sqlite3.connect Database connection object. maintbl : str Name of the main table. dstart : datetime Start datetime. dstop : datetime Stop datetime. fmt_postel : str Image path format for POSTEL images. fmt_corr : str Image path format for corrected images. fmt_31hr : str Image path format for 31-hour images. fmt_79hr : str Image path format for 79-hour images. logger : logging.RootLogger Logger instance. T : list of datetime List of timestamps used to estimate walltime. """ if T is None: T = [datetime.now()] if logger is None: logger = logging.getLogger() # --- create main table --- with con: con.executescript(f""" DROP TABLE IF EXISTS {maintbl}; CREATE TABLE {maintbl}( date TEXT NOT NULL UNIQUE, has_postel INTEGER DEFAULT 0, has_corr INTEGER DEFAULT 0, has_31hr INTEGER DEFAULT 0, has_79hr INTEGER DEFAULT 0, N_miss INTEGER, N_corr INTEGER, L0_ref REAL, B0_ref REAL, crpix1 REAL, crpix2 REAL, cdelt1 REAL, cdelt2 REAL, crota2 REAL, crln_obs REAL, crlt_obs REAL, dsun_obs REAL, hmi_m_720s TEXT); """) T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} create table "{maintbl}"') # --- insert rows into main table --- with con: for irow,dat in enumerate(xdays(dstart, dstop, timedelta(days=0.5))): infile_postel = dat.strftime(fmt_postel) infile_corr = dat.strftime(fmt_corr) infile_31hr = dat.strftime(fmt_31hr) infile_79hr = dat.strftime(fmt_79hr) has_postel = os.path.exists(infile_postel) has_corr = os.path.exists(infile_corr) has_31hr = os.path.exists(infile_31hr) has_79hr = os.path.exists(infile_79hr) if has_postel: hdr = fits.getheader(infile_postel) L0_ref = hdr['ref_L0'] B0_ref = hdr['ref_B0'] N_miss = hdr['NMISS'] if 'NMISS' in hdr else None else: L0_ref, B0_ref, N_miss = None, None, None N_corr = fits.getheader(infile_79hr)['N_CORR'] if has_79hr else None if N_corr: assert N_corr.is_integer() N_corr = int(N_corr) con.execute(f""" INSERT INTO {maintbl} (date, has_postel, has_corr, has_31hr, has_79hr, N_miss, N_corr, L0_ref, B0_ref) VALUES (datetime(?),?,?,?,?,?,?,?,?)""", [str(dat), has_postel, has_corr, has_31hr, has_79hr, N_miss, N_corr, L0_ref, B0_ref]) [(nrows,)] = con.execute(f'SELECT count(*) FROM {maintbl}').fetchall() T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} insert {nrows} rows into table "{maintbl}"') return
[docs] def update_hmi_columns(con, maintbl, timestamp1=None, timestamp2=None, logger=None, T=None): """ Update the hmi.m_720s records for existing rows between timestamp1 and timestamp2 in the main table. This routine does not insert new rows. Parameters ---------- con : sqlite3.connect Database connection object. maintbl : str Name of the main table. timestamp1 : str Start timestamp in format of '%Y-%m-%d %H:%M:%S'. If None, all rows will be updated. timestamp2 : str End timestamp in format of '%Y-%m-%d %H:%M:%S'. If None, all rows will be updated. logger : logging.RootLogger Logger instance. T : list of datetime List of datetimes used to estimate the walltime. """ if T is None: T = [datetime.now()] if logger is None: logger = logging.getLogger() # read basic info from maintbl if timestamp1 is None or timestamp2 is None: timestamps = con.execute(f'SELECT date FROM {maintbl} ORDER BY date').fetchall() else: timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= ? and date <= ? ORDER BY date', [timestamp1, timestamp2]).fetchall() timestamps = np.ravel(timestamps) dat1 = datetime.strptime(timestamps[0], '%Y-%m-%d %H:%M:%S') dat2 = datetime.strptime(timestamps[-1], '%Y-%m-%d %H:%M:%S') nrows = len(timestamps) T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} read timestamps from table "{maintbl}"') # retrieve keys from NetDRMS ds = 'hmi.m_720s[{}-{}@12h]'.format( dat1.strftime('%Y.%m.%d_%H:%M:%S_TAI'), dat2.strftime('%Y.%m.%d_%H:%M:%S_TAI')) keys = get_key_table(ds, KeyList) if len(keys) != nrows: raise RuntimeError(f'The number of rows with {ds} is {len(keys)} (expected {nrows})') chk = [str(datetime.strptime(s, '%Y.%m.%d_%H:%M:%S_TAI')) for s in keys['t_rec']] assert np.all([chk[i] == timestamps[i] for i in range(nrows)]) keys['date'] = timestamps T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} fetch keys for {ds}') # retrieve paths from NetDRMS path = get_path_to_seg(ds, segname='magnetogram') T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} fetch path for {ds}') keys['path'] = path # update hmi columns with con: con.executemany(f""" UPDATE {maintbl} SET crpix1 = ?, crpix2 = ?, cdelt1 = ?, cdelt2 = ?, crota2 = ?, crln_obs = ?, crlt_obs = ?, dsun_obs = ?, hmi_m_720s = ? WHERE date = ? """, keys['crpix1','crpix2','cdelt1','cdelt2','crota2','crln_obs','crlt_obs','dsun_obs','path','date']) T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} update hmi records in the table "{maintbl}"') return
[docs] def insert_new_rows_for_hmi_records(con, maintbl, dat1, dat2, logger=None, T=None): """ Retrieve new hmi records for the time period from dat1 to dat2 (inclusive) and insert them into maintbl. Parameters ---------- con : sqlite3.connect Database connection object. maintbl : str Name of the main table. dat1 : datetime Start datetime for the time range. dat2 : datetime End datetime for the time range (inclusive). logger : logging.RootLogger Logger instance. T : list of datetime List of datetimes used to estimate the walltime. """ if T is None: T = [datetime.now()] if logger is None: logger = logging.getLogger() # retrieve keys from NetDRMS ds = 'hmi.m_720s[{}-{}@12h]'.format( dat1.strftime('%Y.%m.%d_%H:%M:%S_TAI'), dat2.strftime('%Y.%m.%d_%H:%M:%S_TAI')) keys = get_key_table(ds, KeyList) keys['date'] = [str(datetime.strptime(s, '%Y.%m.%d_%H:%M:%S_TAI')) for s in keys['t_rec']] T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} fetch keys for {ds}') # retrieve paths from NetDRMS path = get_path_to_seg(ds, segname='magnetogram') T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} fetch path for {ds}') keys['path'] = path # insert new rows with con: con.executemany(f""" INSERT INTO {maintbl} (date, has_postel, has_corr, has_31hr, has_79hr, crpix1, crpix2, cdelt1, cdelt2, crota2, crln_obs, crlt_obs, dsun_obs, hmi_m_720s) VALUES (datetime(?),0,0,0,0,?,?,?,?,?,?,?,?,?)""", keys['date', 'crpix1', 'crpix2', 'cdelt1', 'cdelt2', 'crota2', 'crln_obs', 'crlt_obs', 'dsun_obs', 'path']) T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} insert {len(keys)} rows into table "{maintbl}"') return
T = [datetime.now()] # --- argparse --- Annita modified this so that sphinx can run, because Sphinx is not designed to handle command-line parsing, and attempting to do so will cause it to generate incorrect documentation. if __name__ == "__main__": # Parse command-line arguments parser = argparse.ArgumentParser(description=f""" Update the processing status for the table "{MainTable}" in GOSIP_DBFILE. If GOSIP_DBFILE does not exist, construct the tables "{MainTable}" and "{InfoTable}" from scratch. """) parser.add_argument('-l', '--loglevel', nargs='?', type=str, help='logging level: critical, error, warning, info, debug', default=None, # the behavior when -l is not present const='info') # the behavior when -l is present but without an argument parser.add_argument('-D', '--dbfile', type=str, default=None, help=""" Path to the sqlite database. Expect the filename extension to be "db". If not given, use the environment variable GOSIP_DBFILE.""") parser.add_argument('-S', '--starttime', type=str, default=None, help=f""" Start time (included) in format of %%Y-%%m-%%d %%H:%%M:%%S. If not given, use the row right after the last row that has_79hr from table "{MainTable}" in GOSIP_DBFILE.""") parser.add_argument('-E', '--endtime', type=str, default=None, help=f""" End time (included) in format of %%Y-%%m-%%d %%H:%%M:%%S. If not given, use the last row from table "{MainTable}" in GOSIP_DBFILE.""") parser.add_argument('-I', '--insert-new-hmi', action='store_true', help=""" Insert new hmi.m_720s records if available.""") parser.add_argument('-U', '--update-old-hmi', action='store_true', help=""" Update the hmi.m_720s records in existing rows as well.""") parser.add_argument('--fmt_postel', type=str, default=None, help=f""" Path format to Postel images. If not given, use the value from table "{InfoTable}" in GOSIP_DBFILE.""") parser.add_argument('--fmt_corr', type=str, default=None, help=f""" Path format to CORR images. If not given, use the value from table "{InfoTable}" in GOSIP_DBFILE.""") parser.add_argument('--fmt_31hr', type=str, default=None, help=f""" Path format to 31-hr farside images. If not given, use the value from table "{InfoTable}" in GOSIP_DBFILE.""") parser.add_argument('--fmt_79hr', type=str, default=None, help=f""" Path format to 79-hr farside images. If not given, use the value from table "{InfoTable}" in GOSIP_DBFILE.""") args = parser.parse_args() if args.loglevel is None: logging.disable(logging.CRITICAL) else: loglevel = getattr(logging, args.loglevel.upper(), None) if not isinstance(loglevel, int): raise RuntimeError(f'Unknown logging level: {args.loglevel}') logging.basicConfig(format='%(levelname)s %(message)s', level=loglevel, stream=sys.stderr) logger = logging.getLogger() dbfile = args.dbfile fmt_postel = args.fmt_postel fmt_corr = args.fmt_corr fmt_31hr = args.fmt_31hr fmt_79hr = args.fmt_79hr starttime = args.starttime endtime = args.endtime maintbl = MainTable infotbl = InfoTable is_insert_new = args.insert_new_hmi is_update_old = args.update_old_hmi if not dbfile: if 'GOSIP_DBFILE' in os.environ: dbfile = os.environ['GOSIP_DBFILE'] else: raise RuntimeError('GOSIP_DBFILE is not set') logger.info(f'dbfile: {dbfile}') logger.info(f'main table: {maintbl}') logger.info(f'info table: {infotbl}') logger.info(f'insert new hmi: {is_insert_new}') logger.info(f'update old hmi: {is_update_old}') # --- open the database --- if not dbfile.endswith('db'): raise RuntimeError(f'DBFILE filename extension is not "db": {dbfile}') file_exists = os.path.exists(dbfile) sqlite3.register_adapter(np.int64, int) sqlite3.register_adapter(np.int32, int) con = sqlite3.connect(dbfile) if not file_exists: logger.warning(f'DBFILE does not exist: {dbfile}') if not (fmt_postel and fmt_corr and fmt_31hr and fmt_79hr): raise RuntimeError('--fmt_postel, --fmt_corr, --fmt_31hr, and --fmt_79hr must be given') else: logger.info(f'fmt_postel: {fmt_postel}') logger.info(f'fmt_corr: {fmt_corr}') logger.info(f'fmt_31hr: {fmt_31hr}') logger.info(f'fmt_79hr: {fmt_79hr}') dstart = datetime(2010,5,1) dstop = get_last_record('hmi.m_720s') create_info_table(con, infotbl, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr, logger, T) create_main_table(con, maintbl, dstart, dstop, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr, logger, T) update_hmi_columns(con, maintbl, logger=logger, T=T) con.close() T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} close {dbfile}') logger.info(f'{T[-1]-T[0]} total elapsed time') raise SystemExit # --- table does not exist --- tbls = con.execute(f'SELECT name FROM sqlite_master WHERE type="table"').fetchall() tbls = np.ravel(tbls) if maintbl not in tbls or infotbl not in tbls: logger.warning(f'Table "{maintbl}" and/or "{infotbl}" does not exist') if not (fmt_postel and fmt_corr and fmt_31hr and fmt_79hr): raise RuntimeError('--fmt_postel, --fmt_corr, --fmt_31hr, and --fmt_79hr must be given') else: logger.info(f'fmt_postel: {fmt_postel}') logger.info(f'fmt_corr: {fmt_corr}') logger.info(f'fmt_31hr: {fmt_31hr}') logger.info(f'fmt_79hr: {fmt_79hr}') create_info_table(con, infotbl, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr, logger, T) dstart = datetime(2010,5,1) dstop = get_last_record('hmi.m_720s') create_main_table(con, maintbl, dstart, dstop, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr, logger, T) update_hmi_columns(con, maintbl, logger=logger, T=T) con.close() T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} close {dbfile}') logger.info(f'{T[-1]-T[0]} total elapsed time') raise SystemExit # --- check the lastest hmi record --- if is_insert_new: [(old_hmi_last,new_hmi_start,)] = con.execute(f'SELECT max(date),datetime(max(date), "+12 hours") FROM {maintbl}').fetchall() old_hmi_last = datetime.strptime(old_hmi_last, '%Y-%m-%d %H:%M:%S') new_hmi_start = datetime.strptime(new_hmi_start, '%Y-%m-%d %H:%M:%S') new_hmi_last = get_last_record('hmi.m_720s') if new_hmi_last >= new_hmi_start: T.append(datetime.now()) logger.warning(f'The latest timestamp in hmi.m_720s {new_hmi_last} is ahead of the one in DBFILE {old_hmi_last}') insert_new_rows_for_hmi_records(con, maintbl, new_hmi_start, new_hmi_last, logger, T) else: assert (new_hmi_last - old_hmi_last) < timedelta(hours=12) logger.info(f'No new rows to insert. DBFILE has the latest record {old_hmi_last} from local NetDRMS') # --- retrieve timestamps from datebase --- if starttime: if endtime: timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= "{starttime}" AND date <= "{endtime}" ORDER BY date').fetchall() else: timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= "{starttime}" ORDER BY date').fetchall() else: [(N_79hr,)] = con.execute(f'SELECT count(*) FROM {maintbl} WHERE has_79hr').fetchall() if N_79hr > 0: [(first_one_that_has_no_79hr,)] = con.execute(f'SELECT datetime(max(date), "+12 hours") FROM {maintbl} WHERE has_79hr').fetchall() else: [(first_one_that_has_no_79hr,)] = con.execute(f'SELECT min(date) FROM {maintbl}').fetchall() if endtime: timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= "{first_one_that_has_no_79hr}" AND date <= "{endtime}" ORDER BY date').fetchall() else: timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= "{first_one_that_has_no_79hr}" ORDER BY date').fetchall() timestamps = np.ravel(timestamps) if len(timestamps) > 0: first = timestamps[0] last = timestamps[-1] else: logger.info('starttime: {starttime}') logger.info('endtime: {endtime}') logger.warning('No timestamps selected') con.close() T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} close {dbfile}') logger.info(f'{T[-1]-T[0]} total elapsed time') raise SystemExit if starttime: logger.info('Time range to be updated:') else: logger.info('Time range where 79hr farside images are absent:') logger.info(f'first: {first}') logger.info(f'last: {last}') # --- update existing hmi records as well --- if is_update_old: logger.info(f'Update existing hmi.m_720s records in DBFILE as well') update_hmi_columns(con, maintbl, first, last, logger=logger, T=T) # --- retrieve fmt from datebase if not given --- if not fmt_postel: [(fmt_postel,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_postel"').fetchall() if not fmt_corr: [(fmt_corr,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_corr"').fetchall() if not fmt_31hr: [(fmt_31hr,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_31hr"').fetchall() if not fmt_79hr: [(fmt_79hr,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_79hr"').fetchall() logger.info(f'fmt_postel: {fmt_postel}') logger.info(f'fmt_corr: {fmt_corr}') logger.info(f'fmt_31hr: {fmt_31hr}') logger.info(f'fmt_79hr: {fmt_79hr}') T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} read parameters') with con: logger.debug('%-19s %-10s %-8s %-8s %-8s %-6s %-6s %-10s %-10s', 'date', 'has_postel', 'has_corr', 'has_31hr', 'has_79hr', 'N_miss', 'N_corr', 'L0_ref', 'B0_ref') logger.debug(f'{"":-<19s} {"":-<10s} {"":-<8s} {"":-<8s} {"":-<8s} {"":-<6s} {"":-<6s} {"":-<10s} {"":-<10s}') for timestamp in timestamps: dat = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S') infile_postel = dat.strftime(fmt_postel) infile_corr = dat.strftime(fmt_corr) infile_31hr = dat.strftime(fmt_31hr) infile_79hr = dat.strftime(fmt_79hr) has_postel = os.path.exists(infile_postel) has_corr = os.path.exists(infile_corr) has_31hr = os.path.exists(infile_31hr) has_79hr = os.path.exists(infile_79hr) if has_postel: hdr = fits.getheader(infile_postel) L0_ref = hdr['ref_L0'] B0_ref = hdr['ref_B0'] N_miss = hdr['NMISS'] if 'NMISS' in hdr else None else: L0_ref, B0_ref, N_miss = None, None, None N_corr = fits.getheader(infile_79hr)['N_CORR'] if has_79hr else None if N_corr: assert N_corr.is_integer() N_corr = int(N_corr) con.execute(f""" UPDATE {maintbl} SET has_postel = ?, has_corr = ?, has_31hr = ?, has_79hr = ?, N_miss = ?, N_corr = ?, L0_ref = ?, B0_ref = ? WHERE date = ? """, [has_postel, has_corr, has_31hr, has_79hr, N_miss, N_corr, L0_ref, B0_ref, timestamp]) logger.debug('%-19s %-10s %-8s %-8s %-8s %-6s %-6s %-10s %-10s', timestamp, has_postel, has_corr, has_31hr, has_79hr, N_miss, N_corr, L0_ref, B0_ref) T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} update {timestamps.size} row(s)') con.close() T.append(datetime.now()) logger.info(f'{T[-1]-T[-2]} close {dbfile}') logger.info(f'{T[-1]-T[0]} total elapsed time')