Source code for src.common.calibrate

# =========================================================================== #
# File: calc_pss.py                                                           #
# Author: Pfesesani V. van Zyl                                                #
# =========================================================================== #

# Standard library imports
# --------------------------------------------------------------------------- #
import numpy as np 
import sys
from .miscellaneousFunctions import catch_zeroDivError
# =========================================================================== #

[docs] def calc_pss(flux, Ta, errTa): """ Calculate the Point source sensitivity (PSS) and its error for data with no pointing correction applied. Args: flux: the flux density of the source Ta: the antenna temperature errTa: the error in the antenna temperature Returns: pss: the point source sensitivity errPSS: the error in the point source sensitivity appEff: the apperture effeciency """ try: pss = flux/2.0/Ta errPSS = np.sqrt(errTa**2/Ta**2)*pss appEff = 1380.0/np.pi/(25.9/2.0)**2/pss except ZeroDivisionError: pss = .0 errPSS = .0 appEff = .0 return pss, errPSS, appEff
[docs] def calc_tcorr(Ta, pc, data): """ Calculate the antenna temperature correction for high frequencies. Args: Ta - the on scan antenna temperature pc - the pointing correction data - the dictionary containing all the drift scan parameters Returns: corrTa - the corrected antenna temperature """ # print(data['FRONTEND'],data['BEAMTYPE'],data['OBJECT']) # sys.exit() if data["FRONTEND"] == "01.3S": if data["OBJECT"].upper() == "JUPITER": # Only applying a size correction factor and atmospheric correction to Jupiter # See van Zyl (2023) - in Prep corrTa = Ta * pc * data["ATMOS_ABSORPTION_CORR"] * data["SIZE_CORRECTION_FACTOR"] return corrTa else: # do we also apply a atmospheric correcttion factor directly to the target source ? # find out, for now I'm not applyin it # tests this ASAP corrTa = Ta * pc #* data["ATMOS_ABSORPTION_CORR"] return corrTa else: corrTa = Ta*pc return corrTa
[docs] def test_for_nans(Ta,errTa): """ Test if data has nans Returns: Ta : antenna temperature errTa: error in antenna temperature """ if str(Ta) == "nan" or Ta is None or Ta==np.nan: Ta=0.0 errTa=0.0 else: pass return Ta, errTa
[docs] def calc_pc_pss(scanNum, hpsTa, errHpsTa, hpnTa, errHpnTa, onTa, errOnTa, flux,data): """ Calculate the pss for pointing corrected observations. Args: scanNum: the index of the current scan, see plot_manager.py hpsTa: the half power south antenna temperature errHpsTa: the error in the half power south antenna temperature hpnTa: the half power north antenna temperature errHpnTa: the error in the half power north antenna temperature onTa: the on source antenna temperature errOnTa: the error in the on source antenna temperature flux: the source flux density data: the dictionary containing all the drift scan parameters Returns: pss: the point source sensitivity errPss: the error in the point source sensitivity appEff: the apperture effeciency corrTa: the corrected antenna temperature errCorrTa: the error in the corrected antenna temperature appEff: the apperture effeciency """ # Check if data has nans onTa, errOnTa = test_for_nans(onTa,errOnTa) hpsTa, errHpsTa = test_for_nans(hpsTa,errHpsTa) hpnTa, errHpnTa = test_for_nans(hpnTa,errHpnTa) if onTa != 0.0: #if no hpnTa if((hpnTa == 0) and (hpsTa != 0)): #pc = exp[((ln(hpnTa) - ln(onTa) + ln(2))**2)/4ln(2)] #corrTa = onTa x pc # calculate the pointing and corrected antenna temp and its error pc, der1, der2 = calc_pc_eq(hpsTa, onTa) corrTa = calc_tcorr(scanNum,onTa, pc, data) errCorrTa = np.sqrt((errOnTa**2)*(der2**2) + (errHpsTa**2)*(der1**2)) # Calculate the PSS and appeture effeciency pss = abs(float(flux)/2.0/corrTa) # 0.0 errPss = np.sqrt(abs(errCorrTa/corrTa))*pss appEff = 1380.0/np.pi/(25.9/2.0)**2/pss #print("\n,errCorrTa: %.3f, corrTa: %.3f, PSS: %.3f, errPSS: %.3f\n" % # (errCorrTa, corrTa, pss,errPss)) #if no hpsTa elif((hpnTa != 0) and (hpsTa == 0)): # pc = exp[((ln(onTa) - ln(hpnTa) + ln(2))**2)/4ln(2)] # corrTa = onTa x pc # calculate the pointing and corrected antenna temp and its error pc,der1, der2 = calc_pc_eq(onTa, hpnTa) corrTa = calc_tcorr(scanNum,onTa, pc, data) errCorrTa = np.sqrt((errOnTa**2)*(der1**2) + (errHpnTa**2)*(der2**2)) # Calculate the PSS and apeture effeciency pss = abs(float(flux)/2.0/corrTa) # 0.0 errPss = np.sqrt(abs(errCorrTa/corrTa))*pss appEff = 1380.0/np.pi/(25.9/2.0)**2/pss #print("\n,errCorrTa: %.3f, corrTa: %.3f, PSS: %.3f, errPSS: %.3f\n" % # (errCorrTa, corrTa, pss, errPss)) #if all present elif((hpnTa != 0) and (hpsTa != 0)): # pc = exp[((ln(hpsTa) - ln(hpnTa)**2)/16ln(2)] # corrTa = Ton x pc # calculate the pointing and corrected antenna temp and its error term2 = 16*np.log(2) pc, der1, der2 = calc_pc_eq(hpsTa, hpnTa, term2) corrTa = calc_tcorr(scanNum, onTa, pc, data) errCorrTa = np.sqrt((errHpnTa**2)*(der2**2) + (errHpsTa**2)*(der1**2)) # Calculate the PSS and apeture effeciency pss = abs(float(flux)/2.0/corrTa) errPss = np.sqrt(abs(errCorrTa/corrTa))*pss appEff = 1380.0/np.pi/(25.9/2.0)**2/pss #print("\nTcorerr: %.3f, corrTa: %.3f, PSS: %.3f, errPSS: %.3f\n" % # (errCorrTa, corrTa, pss, errPss)) #if no hpnTa/hpsTa if((hpnTa == 0) and (hpsTa == 0)): pss, errPss, appEff = set_pss_to_zero() corrTa, errCorrTa, pc = set_corr_ta_to_zero() else: # if there's no on scan observation, everything defaults to zero pss, errPss, appEff = set_pss_to_zero() corrTa, errCorrTa, pc = set_corr_ta_to_zero() return pss, errPss, pc, corrTa, errCorrTa, appEff
[docs] def calibrate( hpsTa, errHpsTa, hpnTa, errHpnTa, onTa, errOnTa, data, log): """ Calculate the pointing corrected antenna temperature. Parameters: hpsTa: the half power south antenna temperature errHpsTa: the error in the half power south antenna temperature hpnTa: the half power north antenna temperature errHpnTa: the error in the half power north antenna temperature onTa: the on source antenna temperature errOnTa: the error in the on source antenna temperature data: the dictionary containing all the drift scan parameters Returns: pc: the pointing correction corrTa: the corrected antenna temperature errCorrTa: the error in the corrected antenna temperature """ # # Check if data has nans onTa, errOnTa = test_for_nans(onTa, errOnTa) hpsTa, errHpsTa = test_for_nans(hpsTa, errHpsTa) hpnTa, errHpnTa = test_for_nans(hpnTa, errHpnTa) if onTa != 0.0: #if no hpnTa if((hpnTa == 0) and (hpsTa != 0)): #pc = exp[((ln(hpsTa) - ln(Ton) + ln(2))**2)/4ln(2)] #corrTa = Ton x pc # calculate the pointing and corrected antenna temp and its error pc, der1, der2 = calc_pc_eq(hpsTa, onTa) corrTa = calc_tcorr( onTa, pc, data) errCorrTa = np.sqrt((errOnTa**2)*(der2**2) + (errHpsTa**2)*(der1**2)) #if no hpsTa elif((hpnTa != 0) and (hpsTa == 0)): # pc = exp[((ln(Ton) - ln(hpnTa) + ln(2))**2)/4ln(2)] # corrTa = Ton x pc # calculate the pointing and corrected antenna temp and its error pc, der1, der2 = calc_pc_eq(onTa, hpnTa) corrTa = calc_tcorr(onTa, pc, data) errCorrTa = np.sqrt((errOnTa**2)*(der1**2) + (errHpnTa**2)*(der2**2)) #if all present elif((hpnTa != 0) and (hpsTa != 0)): # pc = exp[((ln(hpsTa) - ln(hpnTa)**2)/16ln(2)] # corrTa = Ton x pc # calculate the pointing and corrected antenna temp and its error term2 = 16*np.log(2) pc, der1, der2 = calc_pc_eq(hpsTa, hpnTa, term2) corrTa = calc_tcorr( onTa, pc, data) errCorrTa = np.sqrt((errHpnTa**2)*(der2**2) + (errHpsTa**2)*(der1**2)) #if no hpnTa/hpsTa if((hpnTa == 0) and (hpsTa == 0)): corrTa, errCorrTa, pc = set_corr_ta_to_zero() else: corrTa, errCorrTa, pc = set_corr_ta_to_zero() #print("\npc: {}, corrTa: {}, Tcorrerr: {}\n".format(pc, corrTa, errCorrTa)) return pc, corrTa, errCorrTa
[docs] def calc_pc_eq(Ta1, Ta2, term2=""): """ Calculate the pointing correction and its derivatives. The pointing equations are given in the method calc_pc_pss() Parameters: Ta1: first temperature Ta2: second temperature term2: the second term of the pointing correction equation as defined in thesis/reference. Returns: pc: the pointing correction der1: first derivative with respect to Ta1 der2: first derivative with respect to Ta2 """ if term2=="": term1 = (np.log(abs(Ta1)) - np.log(abs(Ta2)) - np.log(2))**2 term2 = 4*np.log(2) else: term1 = (np.log(abs(Ta1)) - np.log(abs(Ta2)))**2 term3 = term1/term2 pc = np.exp(term3) # calculate the derivatives der1 = pc * 2.0 * term3 * (1.0/Ta1) der2 = pc * 2.0 * term3 * (-1.0/Ta2) return pc, der1, der2
[docs] def set_pss_to_zero(): """ Set the estimation of the pss and apperture effeciency to zero """ pss, errPss, appEff = .0, .0, .0 return pss, errPss, appEff
[docs] def set_corr_ta_to_zero(): """ Set the estimation of the corrected antenna temp to zero. """ corrTa, corrTaErr, pc = .0, .0, .0 return corrTa, corrTaErr, pc
[docs] def calc_flux(ta,dta,pss,dpss): """Calculate the flux density. """ ta=float(ta) dta=float(dta) pss=float(pss) dpss=float(dpss) if float(ta) != np.nan and float(pss)!=np.nan: # print(ta,pss) flux=ta*pss lta=catch_zeroDivError(ta,dta) lps=catch_zeroDivError(pss,dpss) errta=(lta)**2 errpss=(lps)**2 dflux=flux*np.sqrt(errta+errpss) else: flux=np.nan dflux=np.nan return flux,dflux
[docs] def calc_totFlux(lcp,dlcp,rcp,drcp): ''' Calculate the total flux density of the source''' sumflux=np.sum([lcp,rcp]) n=(np.array([lcp,rcp])>0).sum() fluxtot=(sumflux/n)*2 dfluxtot=(2/n)*np.sqrt(dlcp**2+drcp**2) return fluxtot,dfluxtot
[docs] def calc_dualtotFlux(slcp,dslcp,srcp,dsrcp,bslcp,bdslcp,bsrcp,bdsrcp): ''' Calculate the total flux density of the source''' sumflux=np.sum([slcp,srcp,bslcp,bsrcp]) n=(np.array([slcp,srcp,bslcp,bsrcp])>0).sum() fluxtot=(sumflux/n)*2 dfluxtot=(2/n)*np.sqrt(dslcp**2+dsrcp**2+bdslcp**2+bdsrcp**2) return fluxtot,dfluxtot
[docs] def getpss(pssdf, date): """ Get the PSS of a given bin time-range. pssdf: dataframe containing the pss bins. date: the date used to estimate the required bin range in order to extract the PSS. """ date=float(date) pss=np.nan dpss=np.nan # print(pssdf.columns) for k in range(len(pssdf)): start = float(pssdf['MJD_START'].iloc[k]) end = float(pssdf['MJD_END'].iloc[k]) # print(start,date,end) if date>=start and date<=end: # print(start,date,end) pss=pssdf['PSS'].iloc[k] dpss=pssdf['PSS_STDEV'].iloc[k] return pss,dpss else: pass return np.nan,np.nan