import argparse, logging, os, sys
from datetime import datetime, timedelta
from astropy.io import fits
import numpy as np
import sqlite3
from base.show_info import get_last_record, get_key_table, get_path_to_seg
from base.misc import xdays
# Name of the main SQLite table: one row per timestamp with processing status.
MainTable = 'farside'
# Name of the key/value table that stores the image path formats (fmt_*).
InfoTable = 'farside_meta'
# (keyword, type) pairs requested from the hmi.m_720s series via
# get_key_table(). 't_rec' is parsed with the '%Y.%m.%d_%H:%M:%S_TAI'
# format; the remaining keywords are stored in REAL columns of the main table.
KeyList = [
    ('t_rec', str),
    ('crpix1', float),
    ('crpix2', float),
    ('cdelt1', float),
    ('cdelt2', float),
    ('crota2', float),
    ('crln_obs', float),
    ('crlt_obs', float),
    ('dsun_obs', float),
]
# (removed "[docs]" Sphinx source-link artifact; not valid Python)
def create_info_table(con, infotbl, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr,
                      logger=None, T=None):
    """
    Create the info table that contains the paths to the processed images.

    Any existing table with the same name is dropped first; the four path
    formats are then stored as key/value rows in a single transaction.

    Parameters
    ----------
    con : sqlite3.Connection
        The SQLite3 database connection.
    infotbl : str
        The name of the info table.
    fmt_postel : str
        Path format for Postel images.
    fmt_corr : str
        Path format for corrected images.
    fmt_31hr : str
        Path format for 31-hour images.
    fmt_79hr : str
        Path format for 79-hour images.
    logger : logging.Logger, optional
        The logger for logging messages (default is the root logger).
    T : list of datetime, optional
        List of datetimes, used to estimate walltime (default is None).

    Returns
    -------
    None
    """
    if T is None:
        T = [datetime.now()]
    if logger is None:
        logger = logging.getLogger()
    # --- create info table ---
    # The connection context manager commits on success / rolls back on error,
    # so the DROP/CREATE and the inserts land atomically.
    with con:
        con.executescript(f"""
            DROP TABLE IF EXISTS {infotbl};
            CREATE TABLE {infotbl}(
                key TEXT NOT NULL UNIQUE,
                val);
            """)
        rows = [
            ('fmt_postel', fmt_postel),
            ('fmt_corr', fmt_corr),
            ('fmt_31hr', fmt_31hr),
            ('fmt_79hr', fmt_79hr),
        ]
        con.executemany(f'INSERT INTO {infotbl} VALUES (?,?)', rows)
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} create table "{infotbl}"')
    return
# (removed "[docs]" Sphinx source-link artifact; not valid Python)
def create_main_table(con, maintbl, dstart, dstop, fmt_postel, fmt_corr,
                      fmt_31hr, fmt_79hr, logger=None, T=None):
    """
    Create the main table that contains the list of timestamps and processing
    status, while the corresponding hmi.m_720s columns are filled with None.

    One row is inserted per 12-hour step from dstart to dstop; the has_*
    flags and header-derived values are taken from whatever files already
    exist on disk.

    Parameters
    ----------
    con : sqlite3.Connection
        Database connection object.
    maintbl : str
        Name of the main table.
    dstart : datetime
        Start datetime.
    dstop : datetime
        Stop datetime.
    fmt_postel : str
        Image path format for Postel images.
    fmt_corr : str
        Image path format for corrected images.
    fmt_31hr : str
        Image path format for 31-hour images.
    fmt_79hr : str
        Image path format for 79-hour images.
    logger : logging.Logger, optional
        Logger instance (default is the root logger).
    T : list of datetime, optional
        List of timestamps used to estimate walltime.

    Raises
    ------
    ValueError
        If a 79-hr FITS header carries a non-integral N_CORR value.
    """
    if T is None:
        T = [datetime.now()]
    if logger is None:
        logger = logging.getLogger()
    # --- create main table ---
    with con:
        con.executescript(f"""
            DROP TABLE IF EXISTS {maintbl};
            CREATE TABLE {maintbl}(
                date TEXT NOT NULL UNIQUE,
                has_postel INTEGER DEFAULT 0,
                has_corr INTEGER DEFAULT 0,
                has_31hr INTEGER DEFAULT 0,
                has_79hr INTEGER DEFAULT 0,
                N_miss INTEGER,
                N_corr INTEGER,
                L0_ref REAL,
                B0_ref REAL,
                crpix1 REAL,
                crpix2 REAL,
                cdelt1 REAL,
                cdelt2 REAL,
                crota2 REAL,
                crln_obs REAL,
                crlt_obs REAL,
                dsun_obs REAL,
                hmi_m_720s TEXT);
            """)
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} create table "{maintbl}"')
    # --- insert rows into main table ---
    with con:
        # 12-hour cadence: one row per half-day from dstart to dstop.
        for dat in xdays(dstart, dstop, timedelta(days=0.5)):
            infile_postel = dat.strftime(fmt_postel)
            infile_corr = dat.strftime(fmt_corr)
            infile_31hr = dat.strftime(fmt_31hr)
            infile_79hr = dat.strftime(fmt_79hr)
            has_postel = os.path.exists(infile_postel)
            has_corr = os.path.exists(infile_corr)
            has_31hr = os.path.exists(infile_31hr)
            has_79hr = os.path.exists(infile_79hr)
            if has_postel:
                hdr = fits.getheader(infile_postel)
                L0_ref = hdr['ref_L0']
                B0_ref = hdr['ref_B0']
                # NMISS is optional in older headers
                N_miss = hdr['NMISS'] if 'NMISS' in hdr else None
            else:
                L0_ref, B0_ref, N_miss = None, None, None
            N_corr = fits.getheader(infile_79hr)['N_CORR'] if has_79hr else None
            if N_corr:
                # N_CORR arrives as a float from the FITS header; it must be
                # integral.  (Was an assert, which is stripped under -O.)
                if not N_corr.is_integer():
                    raise ValueError(f'N_CORR is not an integer: {N_corr}')
                N_corr = int(N_corr)
            con.execute(f"""
                INSERT INTO {maintbl} (date, has_postel, has_corr, has_31hr,
                                       has_79hr, N_miss, N_corr, L0_ref, B0_ref)
                VALUES (datetime(?),?,?,?,?,?,?,?,?)""",
                [str(dat), has_postel, has_corr, has_31hr, has_79hr,
                 N_miss, N_corr, L0_ref, B0_ref])
    [(nrows,)] = con.execute(f'SELECT count(*) FROM {maintbl}').fetchall()
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} insert {nrows} rows into table "{maintbl}"')
    return
# (removed "[docs]" Sphinx source-link artifact; not valid Python)
def update_hmi_columns(con, maintbl, timestamp1=None, timestamp2=None, logger=None, T=None):
    """
    Update the hmi.m_720s records for existing rows between timestamp1 and
    timestamp2 in the main table. This routine does not insert new rows.

    Parameters
    ----------
    con : sqlite3.Connection
        Database connection object.
    maintbl : str
        Name of the main table.
    timestamp1 : str, optional
        Start timestamp in format of '%Y-%m-%d %H:%M:%S'. If None, all rows will be updated.
    timestamp2 : str, optional
        End timestamp in format of '%Y-%m-%d %H:%M:%S'. If None, all rows will be updated.
    logger : logging.Logger, optional
        Logger instance (default is the root logger).
    T : list of datetime, optional
        List of datetimes used to estimate the walltime.

    Raises
    ------
    RuntimeError
        If the number of records returned for the record set does not match
        the number of selected rows.
    """
    if T is None:
        T = [datetime.now()]
    if logger is None:
        logger = logging.getLogger()
    # read basic info from maintbl
    # If either bound is missing, fall back to updating every row.
    if timestamp1 is None or timestamp2 is None:
        timestamps = con.execute(f'SELECT date FROM {maintbl} ORDER BY date').fetchall()
    else:
        timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= ? and date <= ? ORDER BY date', [timestamp1, timestamp2]).fetchall()
    # flatten the list of 1-tuples into a 1-D array of timestamp strings
    timestamps = np.ravel(timestamps)
    dat1 = datetime.strptime(timestamps[0], '%Y-%m-%d %H:%M:%S')
    dat2 = datetime.strptime(timestamps[-1], '%Y-%m-%d %H:%M:%S')
    nrows = len(timestamps)
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} read timestamps from table "{maintbl}"')
    # retrieve keys from NetDRMS
    # Record-set query covering [dat1, dat2] at a 12-hour cadence.
    ds = 'hmi.m_720s[{}-{}@12h]'.format(
        dat1.strftime('%Y.%m.%d_%H:%M:%S_TAI'),
        dat2.strftime('%Y.%m.%d_%H:%M:%S_TAI'))
    keys = get_key_table(ds, KeyList)
    if len(keys) != nrows:
        raise RuntimeError(f'The number of rows with {ds} is {len(keys)} (expected {nrows})')
    # Sanity check: the fetched t_rec values must match the selected rows
    # one-to-one and in order.
    chk = [str(datetime.strptime(s, '%Y.%m.%d_%H:%M:%S_TAI')) for s in keys['t_rec']]
    assert np.all([chk[i] == timestamps[i] for i in range(nrows)])
    # keys supports column assignment and multi-column slicing below —
    # presumably an astropy Table returned by get_key_table (TODO confirm).
    keys['date'] = timestamps
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} fetch keys for {ds}')
    # retrieve paths from NetDRMS
    path = get_path_to_seg(ds, segname='magnetogram')
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} fetch path for {ds}')
    keys['path'] = path
    # update hmi columns
    # The multi-column slice yields one row per timestamp in the same order
    # as the UPDATE placeholders ('date' last, matching the WHERE clause).
    with con:
        con.executemany(f"""
            UPDATE {maintbl}
            SET crpix1 = ?,
                crpix2 = ?,
                cdelt1 = ?,
                cdelt2 = ?,
                crota2 = ?,
                crln_obs = ?,
                crlt_obs = ?,
                dsun_obs = ?,
                hmi_m_720s = ?
            WHERE date = ?
            """, keys['crpix1','crpix2','cdelt1','cdelt2','crota2','crln_obs','crlt_obs','dsun_obs','path','date'])
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} update hmi records in the table "{maintbl}"')
    return
# (removed "[docs]" Sphinx source-link artifact; not valid Python)
def insert_new_rows_for_hmi_records(con, maintbl, dat1, dat2, logger=None, T=None):
    """
    Retrieve new hmi records for the time period from dat1 to dat2 (inclusive)
    and insert them into maintbl.

    Parameters
    ----------
    con : sqlite3.Connection
        Database connection object.
    maintbl : str
        Name of the main table.
    dat1 : datetime
        Start datetime for the time range.
    dat2 : datetime
        End datetime for the time range (inclusive).
    logger : logging.Logger, optional
        Logger instance (default is the root logger).
    T : list of datetime, optional
        List of datetimes used to estimate the walltime.
    """
    T = [datetime.now()] if T is None else T
    logger = logging.getLogger() if logger is None else logger
    # Build the NetDRMS record-set query covering [dat1, dat2] at 12 h cadence.
    tai = '%Y.%m.%d_%H:%M:%S_TAI'
    ds = f'hmi.m_720s[{dat1.strftime(tai)}-{dat2.strftime(tai)}@12h]'
    keys = get_key_table(ds, KeyList)
    # Derive the 'date' column ('%Y-%m-%d %H:%M:%S') from the TAI t_rec strings.
    keys['date'] = [str(datetime.strptime(t_rec, tai)) for t_rec in keys['t_rec']]
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} fetch keys for {ds}')
    # Retrieve the magnetogram segment paths from NetDRMS.
    path = get_path_to_seg(ds, segname='magnetogram')
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} fetch path for {ds}')
    keys['path'] = path
    # New rows start with every has_* flag cleared; later runs fill them in.
    with con:
        con.executemany(f"""
            INSERT INTO {maintbl} (date, has_postel, has_corr, has_31hr, has_79hr,
                                   crpix1, crpix2, cdelt1, cdelt2, crota2,
                                   crln_obs, crlt_obs, dsun_obs, hmi_m_720s)
            VALUES (datetime(?),0,0,0,0,?,?,?,?,?,?,?,?,?)""",
            keys['date', 'crpix1', 'crpix2', 'cdelt1', 'cdelt2', 'crota2',
                 'crln_obs', 'crlt_obs', 'dsun_obs', 'path'])
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} insert {len(keys)} rows into table "{maintbl}"')
    return
# Global walltime markers; passed into every helper so elapsed times can be logged.
T = [datetime.now()]

# --- argparse --- Annita modified this so that sphinx can run, because Sphinx
# is not designed to handle command-line parsing, and attempting to do so will
# cause it to generate incorrect documentation.
if __name__ == "__main__":
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description=f"""
        Update the processing status for the table "{MainTable}" in
        GOSIP_DBFILE. If GOSIP_DBFILE does not exist, construct the tables
        "{MainTable}" and "{InfoTable}" from scratch.
        """)
    parser.add_argument('-l', '--loglevel', nargs='?', type=str,
                        help='logging level: critical, error, warning, info, debug',
                        default=None,  # the behavior when -l is not present
                        const='info')  # the behavior when -l is present but without an argument
    parser.add_argument('-D', '--dbfile', type=str, default=None, help="""
        Path to the sqlite database. Expect the filename extension to be "db".
        If not given, use the environment variable GOSIP_DBFILE.""")
    parser.add_argument('-S', '--starttime', type=str, default=None, help=f"""
        Start time (included) in format of %%Y-%%m-%%d %%H:%%M:%%S. If not
        given, use the row right after the last row that has_79hr from table
        "{MainTable}" in GOSIP_DBFILE.""")
    parser.add_argument('-E', '--endtime', type=str, default=None, help=f"""
        End time (included) in format of %%Y-%%m-%%d %%H:%%M:%%S. If not given,
        use the last row from table "{MainTable}" in GOSIP_DBFILE.""")
    parser.add_argument('-I', '--insert-new-hmi', action='store_true', help="""
        Insert new hmi.m_720s records if available.""")
    parser.add_argument('-U', '--update-old-hmi', action='store_true', help="""
        Update the hmi.m_720s records in existing rows as well.""")
    parser.add_argument('--fmt_postel', type=str, default=None, help=f"""
        Path format to Postel images. If not given, use the value from
        table "{InfoTable}" in GOSIP_DBFILE.""")
    parser.add_argument('--fmt_corr', type=str, default=None, help=f"""
        Path format to CORR images. If not given, use the value from
        table "{InfoTable}" in GOSIP_DBFILE.""")
    parser.add_argument('--fmt_31hr', type=str, default=None, help=f"""
        Path format to 31-hr farside images. If not given, use the value from
        table "{InfoTable}" in GOSIP_DBFILE.""")
    parser.add_argument('--fmt_79hr', type=str, default=None, help=f"""
        Path format to 79-hr farside images. If not given, use the value from
        table "{InfoTable}" in GOSIP_DBFILE.""")
    args = parser.parse_args()
    # --- configure logging ---
    if args.loglevel is None:
        logging.disable(logging.CRITICAL)  # no -l: silence all logging
    else:
        loglevel = getattr(logging, args.loglevel.upper(), None)
        if not isinstance(loglevel, int):
            raise RuntimeError(f'Unknown logging level: {args.loglevel}')
        logging.basicConfig(format='%(levelname)s %(message)s',
                            level=loglevel, stream=sys.stderr)
    logger = logging.getLogger()
    dbfile = args.dbfile
    fmt_postel = args.fmt_postel
    fmt_corr = args.fmt_corr
    fmt_31hr = args.fmt_31hr
    fmt_79hr = args.fmt_79hr
    starttime = args.starttime
    endtime = args.endtime
    maintbl = MainTable
    infotbl = InfoTable
    is_insert_new = args.insert_new_hmi
    is_update_old = args.update_old_hmi
    if not dbfile:
        if 'GOSIP_DBFILE' in os.environ:
            dbfile = os.environ['GOSIP_DBFILE']
        else:
            raise RuntimeError('GOSIP_DBFILE is not set')
    logger.info(f'dbfile: {dbfile}')
    logger.info(f'main table: {maintbl}')
    logger.info(f'info table: {infotbl}')
    logger.info(f'insert new hmi: {is_insert_new}')
    logger.info(f'update old hmi: {is_update_old}')
    # --- open the database ---
    if not dbfile.endswith('db'):
        raise RuntimeError(f'DBFILE filename extension is not "db": {dbfile}')
    file_exists = os.path.exists(dbfile)
    # numpy integer scalars are not understood by sqlite3 without an adapter
    sqlite3.register_adapter(np.int64, int)
    sqlite3.register_adapter(np.int32, int)
    con = sqlite3.connect(dbfile)
    # --- database file does not exist: build everything from scratch ---
    if not file_exists:
        logger.warning(f'DBFILE does not exist: {dbfile}')
        if not (fmt_postel and fmt_corr and fmt_31hr and fmt_79hr):
            raise RuntimeError('--fmt_postel, --fmt_corr, --fmt_31hr, and --fmt_79hr must be given')
        else:
            logger.info(f'fmt_postel: {fmt_postel}')
            logger.info(f'fmt_corr: {fmt_corr}')
            logger.info(f'fmt_31hr: {fmt_31hr}')
            logger.info(f'fmt_79hr: {fmt_79hr}')
        dstart = datetime(2010,5,1)
        dstop = get_last_record('hmi.m_720s')
        create_info_table(con, infotbl, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr,
                          logger, T)
        create_main_table(con, maintbl, dstart, dstop, fmt_postel, fmt_corr,
                          fmt_31hr, fmt_79hr, logger, T)
        update_hmi_columns(con, maintbl, logger=logger, T=T)
        con.close()
        T.append(datetime.now())
        logger.info(f'{T[-1]-T[-2]} close {dbfile}')
        logger.info(f'{T[-1]-T[0]} total elapsed time')
        raise SystemExit
    # --- tables do not exist: build them inside the existing database ---
    tbls = con.execute('SELECT name FROM sqlite_master WHERE type="table"').fetchall()
    tbls = np.ravel(tbls)
    if maintbl not in tbls or infotbl not in tbls:
        logger.warning(f'Table "{maintbl}" and/or "{infotbl}" does not exist')
        if not (fmt_postel and fmt_corr and fmt_31hr and fmt_79hr):
            raise RuntimeError('--fmt_postel, --fmt_corr, --fmt_31hr, and --fmt_79hr must be given')
        else:
            logger.info(f'fmt_postel: {fmt_postel}')
            logger.info(f'fmt_corr: {fmt_corr}')
            logger.info(f'fmt_31hr: {fmt_31hr}')
            logger.info(f'fmt_79hr: {fmt_79hr}')
        create_info_table(con, infotbl, fmt_postel, fmt_corr, fmt_31hr, fmt_79hr,
                          logger, T)
        dstart = datetime(2010,5,1)
        dstop = get_last_record('hmi.m_720s')
        create_main_table(con, maintbl, dstart, dstop, fmt_postel, fmt_corr,
                          fmt_31hr, fmt_79hr, logger, T)
        update_hmi_columns(con, maintbl, logger=logger, T=T)
        con.close()
        T.append(datetime.now())
        logger.info(f'{T[-1]-T[-2]} close {dbfile}')
        logger.info(f'{T[-1]-T[0]} total elapsed time')
        raise SystemExit
    # --- check the latest hmi record ---
    if is_insert_new:
        # Last timestamp currently in the table and the next expected one (+12 h).
        [(old_hmi_last,new_hmi_start,)] = con.execute(f'SELECT max(date),datetime(max(date), "+12 hours") FROM {maintbl}').fetchall()
        old_hmi_last = datetime.strptime(old_hmi_last, '%Y-%m-%d %H:%M:%S')
        new_hmi_start = datetime.strptime(new_hmi_start, '%Y-%m-%d %H:%M:%S')
        new_hmi_last = get_last_record('hmi.m_720s')
        if new_hmi_last >= new_hmi_start:
            T.append(datetime.now())
            logger.warning(f'The latest timestamp in hmi.m_720s {new_hmi_last} is ahead of the one in DBFILE {old_hmi_last}')
            insert_new_rows_for_hmi_records(con, maintbl, new_hmi_start, new_hmi_last,
                                            logger, T)
        else:
            # sanity check: the table must never be ahead of NetDRMS by a full step
            assert (new_hmi_last - old_hmi_last) < timedelta(hours=12)
            logger.info(f'No new rows to insert. DBFILE has the latest record {old_hmi_last} from local NetDRMS')
    # --- retrieve timestamps from database ---
    # Bound values are passed as SQL parameters (the original interpolated the
    # CLI strings directly into the query).
    if starttime:
        if endtime:
            timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= ? AND date <= ? ORDER BY date', [starttime, endtime]).fetchall()
        else:
            timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= ? ORDER BY date', [starttime]).fetchall()
    else:
        [(N_79hr,)] = con.execute(f'SELECT count(*) FROM {maintbl} WHERE has_79hr').fetchall()
        if N_79hr > 0:
            # first row after the last one that already has a 79-hr image
            [(first_one_that_has_no_79hr,)] = con.execute(f'SELECT datetime(max(date), "+12 hours") FROM {maintbl} WHERE has_79hr').fetchall()
        else:
            [(first_one_that_has_no_79hr,)] = con.execute(f'SELECT min(date) FROM {maintbl}').fetchall()
        if endtime:
            timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= ? AND date <= ? ORDER BY date', [first_one_that_has_no_79hr, endtime]).fetchall()
        else:
            timestamps = con.execute(f'SELECT date FROM {maintbl} WHERE date >= ? ORDER BY date', [first_one_that_has_no_79hr]).fetchall()
    timestamps = np.ravel(timestamps)
    if len(timestamps) > 0:
        first = timestamps[0]
        last = timestamps[-1]
    else:
        # BUG FIX: these two messages were plain strings missing the f-prefix
        # and logged the literal text '{starttime}' / '{endtime}'.
        logger.info(f'starttime: {starttime}')
        logger.info(f'endtime: {endtime}')
        logger.warning('No timestamps selected')
        con.close()
        T.append(datetime.now())
        logger.info(f'{T[-1]-T[-2]} close {dbfile}')
        logger.info(f'{T[-1]-T[0]} total elapsed time')
        raise SystemExit
    if starttime:
        logger.info('Time range to be updated:')
    else:
        logger.info('Time range where 79hr farside images are absent:')
    logger.info(f'first: {first}')
    logger.info(f'last: {last}')
    # --- update existing hmi records as well ---
    if is_update_old:
        logger.info('Update existing hmi.m_720s records in DBFILE as well')
        update_hmi_columns(con, maintbl, first, last, logger=logger, T=T)
    # --- retrieve fmt from database if not given ---
    if not fmt_postel:
        [(fmt_postel,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_postel"').fetchall()
    if not fmt_corr:
        [(fmt_corr,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_corr"').fetchall()
    if not fmt_31hr:
        [(fmt_31hr,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_31hr"').fetchall()
    if not fmt_79hr:
        [(fmt_79hr,)] = con.execute(f'SELECT val FROM {infotbl} WHERE key="fmt_79hr"').fetchall()
    logger.info(f'fmt_postel: {fmt_postel}')
    logger.info(f'fmt_corr: {fmt_corr}')
    logger.info(f'fmt_31hr: {fmt_31hr}')
    logger.info(f'fmt_79hr: {fmt_79hr}')
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} read parameters')
    # --- re-scan the filesystem and refresh the status columns ---
    with con:
        logger.debug('%-19s %-10s %-8s %-8s %-8s %-6s %-6s %-10s %-10s',
                     'date', 'has_postel', 'has_corr', 'has_31hr', 'has_79hr',
                     'N_miss', 'N_corr', 'L0_ref', 'B0_ref')
        logger.debug(f'{"":-<19s} {"":-<10s} {"":-<8s} {"":-<8s} {"":-<8s} {"":-<6s} {"":-<6s} {"":-<10s} {"":-<10s}')
        for timestamp in timestamps:
            dat = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
            infile_postel = dat.strftime(fmt_postel)
            infile_corr = dat.strftime(fmt_corr)
            infile_31hr = dat.strftime(fmt_31hr)
            infile_79hr = dat.strftime(fmt_79hr)
            has_postel = os.path.exists(infile_postel)
            has_corr = os.path.exists(infile_corr)
            has_31hr = os.path.exists(infile_31hr)
            has_79hr = os.path.exists(infile_79hr)
            if has_postel:
                hdr = fits.getheader(infile_postel)
                L0_ref = hdr['ref_L0']
                B0_ref = hdr['ref_B0']
                N_miss = hdr['NMISS'] if 'NMISS' in hdr else None
            else:
                L0_ref, B0_ref, N_miss = None, None, None
            N_corr = fits.getheader(infile_79hr)['N_CORR'] if has_79hr else None
            if N_corr:
                # N_CORR arrives as a float from the FITS header; it must be
                # integral.  (Was an assert, which is stripped under -O.)
                if not N_corr.is_integer():
                    raise ValueError(f'N_CORR is not an integer: {N_corr}')
                N_corr = int(N_corr)
            con.execute(f"""
                UPDATE {maintbl}
                SET has_postel = ?,
                    has_corr = ?,
                    has_31hr = ?,
                    has_79hr = ?,
                    N_miss = ?,
                    N_corr = ?,
                    L0_ref = ?,
                    B0_ref = ?
                WHERE date = ?
                """, [has_postel, has_corr, has_31hr, has_79hr,
                      N_miss, N_corr, L0_ref, B0_ref, timestamp])
            logger.debug('%-19s %-10s %-8s %-8s %-8s %-6s %-6s %-10s %-10s',
                         timestamp, has_postel, has_corr, has_31hr, has_79hr, N_miss, N_corr, L0_ref, B0_ref)
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} update {timestamps.size} row(s)')
    con.close()
    T.append(datetime.now())
    logger.info(f'{T[-1]-T[-2]} close {dbfile}')
    logger.info(f'{T[-1]-T[0]} total elapsed time')