from .msgConfiguration import msg_wrapper
from .plotting import *
from scipy import interpolate
from sklearn.metrics import mean_squared_error
import numpy as np
# import matplotlib.pyplot as pl
from scipy.optimize import curve_fit
import sys
import bisect
# import warnings
# def fxn():
# warnings.warn("deprecated", DeprecationWarning)
# with warnings.catch_warnings(action="ignore"):
# fxn()
[docs]
def clean_rfi(x, y, log=""):
"""
Clean the RFI in the data using iterative rms cuts.
Parameters:
x (array): Array of the independent variable
y (array): Array of the dependent variable
log (object): file logging object
returns:
finX (array): the data representing the x-axis after the removal of rfi data
finY (array):the data representing the y-axis after the removal of rfi data
rmsBeforeClean (int): the rms before removal of rfi data
rmsAfterClean (int): the rms after removal of rfi data
finspl (array): the spline of the cleaned data
pointsDeleted (int): number of points deleted when cleaning the data
"""
msg_wrapper("debug", log.debug, "Cleaning the data of RFI")
# spline the data
splined_data = spline(x, y,log=log)
scanLen = len(x)
resRFI, rmsBeforeClean, rmsAfterClean, finX, finY, finRes, finMaxSpl, finspl, pointsDeleted = clean_data(
splined_data, x, y, scanLen,log)
msg_wrapper("debug", log.debug, "Splined RMS before: after cleaning -> {:.3f}: {:.3f} \n".format
(rmsBeforeClean, rmsAfterClean))
return finX, finY, rmsBeforeClean, rmsAfterClean, finspl, pointsDeleted
[docs]
def clean_data(spl, x, y, scanLen, log=""):
"""
Clean the data using iterative fitting.
Args:
spl : 1d array
the splined data
x : 1d array
data representing the x-axis
y : 1d array
data representing the y-axis
scanLen : int
length of the drift scan array
log : object
file logging object
Returns:
resRFI: 1d array
the residual before the rfi has been removed
rmsBeforeClean: int
the rms before removing rfi data
rmsAfterClean: int
the rms after removal of rfi data
finX: 1d array
the data representing the x-axis after the removal of rfi data
finY: 1d array
the data representing the y-axis after the removal of rfi data
finRes: 1d array
the residual of the cleaned data after the rfi has been removed
finMaxSpl: int
the maximum of the spline of the cleaned data
finspl: 1d array
the spline of the cleaned data
pointsDeleted: int
number of points deleted when cleaning the data
"""
msg_wrapper("debug", log.debug, "Iterative cleaning of RFI")
# calculate the residual and clean the data
resRFI, rmsBeforeClean = calc_residual(spl, y,log)
finX, finY, rmsAfterClean, finRes, finMaxSpl, finspl, pointsDeleted = clean_data_iterative_fitting(
x, y, scanLen, resRFI, rmsBeforeClean,log)
return resRFI, rmsBeforeClean, rmsAfterClean, finX, finY, finRes, finMaxSpl, finspl, pointsDeleted
[docs]
def calc_residual(model, data, log=""):
"""
Calculate the residual and rms between the model and the data.
Parameters:
model (array): 1D array containing the model data
data (array): 1D array containing the raw data
log (object): file logging object
Returns
-------
res: 1d array
the residual
rms: int
the rms value
"""
res = np.array(model - data)
rms = np.sqrt(mean_squared_error(data, model))
return res, rms
[docs]
def clean_data_iterative_fitting(x, y, scanLen, res, rms, log="",x2=""):
''' Find the best fit to the data by iteratively eliminating data points
that fall beyond an established cut-off limit.
Parameters
----------
x : 1d array
data representing the x-axis
y : 1d array
data representing the y-axis
scanLen : int
length of the drift scan array
res: 1d array
the residual
rms: int
the rms value
log : object
file logging object
x2 : 1d array
filenames
Returns
-------
finalX: 1d array
the data representing the x-axis after the removal of rfi data
finalY: 1d array
the data representing the y-axis after the removal of rfi data
finalRms: int
the rms after removal of rfi data
finRes: 1d array
the residual of the cleaned data after the rfi has been removed
finMaxSpl: int
the maximum of the spline of the cleaned data
finalSplinedData: 1d array
the spline of the cleaned data
pointsDeleted: int
number of points deleted when cleaning the data
'''
#msg_wrapper("debug", log.debug, "Performing RFI cuts on data")
# ToDo: Make this more effecient
# set initial values
smallestY = y
smallestX = x
# set final value parameters
finalNames=[]
finalX = []
finalY = []
finalRms = []
finalRes = []
finalMaxSpl = []
finalSplinedData = []
smallest = True
loop = 0
while smallest:
#While you don't have the smallest rms, process data
loop = loop+1
#print('loop: ',loop)
#Remove spikes
if len(x2)==0:
newX, newY = _remove_RFI(smallestX, smallestY, res, rms)
else:
newX, newY ,newNames= _remove_RFI(smallestX, smallestY, res, rms,log,x2)
finalNames=newNames
newX = np.array(newX)
newY = np.array(newY)
#spline the data if you found RFI
splineData2 = spline(newX, newY,log=log)
maxSplineData2 = max(splineData2)
res2, rms2 = calc_residual(splineData2, newY)
if rms2 < rms: # get better values
rms = rms2
res = res2
smallestX = newX
smallestY = newY
finalX = newX
finalY = newY
finalRms = rms2
finalRes = res2
finalSplinedData = splineData2
finalMaxSpl = maxSplineData2
if len(x2)!=0:
finalNames=newNames
smallest = True
else:
smallest = False
# If the rms cut only looped once, values dont change
if loop == 1:
# If you already have good data, keep values as they are
finalX = x
finalY = y
finalRms = rms
finalRes = res
#spline the data
finalSplinedData = spline(x, y,log=log)
finalMaxSpl = max(finalSplinedData)
if len(x2)==0:
#print('p')
finalNames=x2
pointsDeleted = scanLen-len(finalY)
if len(x2)==0:
return finalX, finalY, finalRms, finalRes, finalMaxSpl, finalSplinedData, pointsDeleted
else:
print(f'\n{abs(len(x)-len(finalNames))} entries removed, after {loop} iterations\n{len(finalNames)} remaining\n')
return finalX, finalY, finalRms, finalRes, finalMaxSpl, finalSplinedData, pointsDeleted, finalNames
def _remove_RFI(x, y, res, rms, log="",x2=""):
'''
Removes data points with a residual cutt-off
more or less than 2.7*rms
Parameters
----------
x : 1d array
data representing the x-axis
y : 1d array
data representing the y-axis
res: 1d array
the residual
rms: int
the rms value
log : object
file logging object
Returns
-------
cleanX: 1d array
array of data representing the x-axis
cleanY: 1d array
array of data representing the cleaned y-axis data
'''
scanLen = len(x)
cleanX, cleanY, names = ([] for i in range(3)) # create new lists
cut=3#2.7
if len(x2)==0:
for i in range(scanLen):
if(abs(res[i]) > cut * rms): # 2.5, 2.7 3.0
pass
else:
# Keep points that lie within the cut_off
cleanX.append(x[i])
cleanY.append(y[i])
return cleanX, cleanY
else:
#print('n')
for i in range(scanLen):
if(abs(res[i]) > cut* rms): # 2.5, 2.7 3.0
pass
else:
# Keep points that lie within the cut_off
cleanX.append(x[i])
cleanY.append(y[i])
names.append(x2[i])
#print('len names: ',len(names))
return cleanX, cleanY, names
[docs]
def gauss_lin(x, *p):
"""
Generate a gaussian plus first-order polynomial to fit the drift
scan beam. Note that the width of the Gaussian is hard-coded
to the half-power beamwidth.
Parameters
----------
x : 1D array
1D array of data representing the x-axis
p : tuple
tuple of gaussian parameters
Returns
-------
gaussFit: 1d array
array of data representing the gaussian fit
"""
amp, mu, hpbw, a, c = p
sigma = hpbw/(2*np.sqrt(2*np.log(2)))
return amp*np.exp(-(x-mu)**2/(2.*sigma**2)) + a*x + c
[docs]
def spline(x, y, anchor_points=9, order=3,log=""):
'''
Given a set of data points (x,y) determine a smooth spline
approximation of degree k on the interval x[0] <= x <= x[n]
Args:
x (array): 1D array of data representing the x-axis
y (array): 1D array of data representing the y-axis
anchor_points (int): the number of anchor points in the data
order (int): polynomial order to fit, preferrably a cubic spline
Return:
spline_fit (array): Spline of the data
'''
try:
msg_wrapper("debug", log.debug, "Spline the data to get estimate of underlying signal")
except:
pass
scanLen = len(x)
if scanLen <= anchor_points*2:
print("Too few points to interpolate, consider entering lower knot points")
print("spline not set")
spline_fit=[]
else:
#anchor_points=7
anchor_points_intervals = scanLen/anchor_points # intervals where anchor points will be placed
anchor_points_pos = [] # list to hold positions where anchor_points will be placed olong the array
anchor_points_counter = 0 # position counter or locator
# create a list of the positions where the anchor_points will be located
#anchor_points=10
for i in range(anchor_points-1):
anchor_points_counter = anchor_points_counter + anchor_points_intervals
anchor_points_pos.append(int(anchor_points_counter))
# interpolate the data
# create linearly spaced data points
x1 = np.linspace(1, scanLen, scanLen)
x2 = np.array((anchor_points_pos), int) # create array of anchor_points positions
try:
tck = interpolate.splrep(x1, y, k=order, task=-1,
t=x2) # interpolate, k=5 is max
spline_fit = interpolate.splev(x1, tck, der=0)
except:
print("Failed to interpolate, Error on input data,too few points, min required = 9")
spline_fit=y
return spline_fit#,anchor_points_pos
[docs]
def gauss(x, *p):
"""
Gaussian for fitting the beam
"""
amp, mu, hpbw = p
sigma = hpbw/(2*np.sqrt(2*np.log(2))) #a bit messy but not sure how to pass a constant through the curve fitting
return amp*np.exp(-(x-mu)**2/(2.*sigma**2))
[docs]
def test_gauss_fit(x,y,p0,log=""):
"""
Fit the data using a gaussian
Arguments:
p0: initial fit guess
"""
# fit initial guess parameters to data and get best fit parameters
# returns best fit coeffecients and errors
# Curve fitting is a type of optimization that finds an optimal set
# of parameters for a defined function that best fits a given set of observations.
try:
coeff, covarMatrix = curve_fit(gauss_lin, x,y, p0)
fit = gauss_lin(x, *coeff)
try:
msg_wrapper("info", log.info, f"Passed gaussian fit test, Max peak = {max(fit):.3f} [K]")
except:
pass
return coeff,fit,""
except Exception as e:
print(e)
try:
msg_wrapper("warning", log.warning, "gaussian curve_fit algorithm failed")
except:
pass
flag=3
return [],[],flag
[docs]
def test_position_validity(localMaxPositions,localMinPositions,maxPoints):
"""
Test the position validity of the local min/max positions. The
local minimum positions cannot fall within the range of the
local maximum positions.
Args:
localMaxPositions (list): list of local max positions
localMinPositions (list): list of local min positions
p (int): maximum number of permissable points.
Returns:
pointsToDelete: index list of positions to delete
"""
pointsToDelete = []
halfMaxPoints=int(maxPoints/2)
for i in range(len(localMaxPositions)):
# create list of max positions bounded by the condition
window=np.arange(localMaxPositions[i]-halfMaxPoints,localMaxPositions[i]+halfMaxPoints,1)
for j in range(len(localMinPositions)):
if localMinPositions[j] in window:
print(j, localMinPositions[j] , i, localMaxPositions[i])#, window)
pointsToDelete.append(localMinPositions[j] )
return pointsToDelete
[docs]
def locate_baseline_blocks_auto(x,y,peakCenterGuess, hfnbw,log,saveLoc):
"""
Find the locations to fit a baseline automatically.
These locations are found/determined by fitting a
spline to the data and using the
locations of the local minimum as baseline regions.
Parameters:
peakCenterGuess (float): value of x at peak center in x array
hfnbw (float): half the first null beam width
log (object): loffing object
"""
# setup parameters
msg = "" # message to write on plot
flag = 0 # error flag, default to no error detected
# generate a spline to help locate where to best fit our baseline model.
yspl = spline(x, y,log=log)
# find location of maximum peak, assumed to be at center
try:
peakPosition = (np.where(x >= peakCenterGuess)[0])[0]
peakSpline = np.where(yspl == max(yspl))[0]
except Exception:
msg = "Failed to determine center peak location"
msg_wrapper("warning",log.warning,msg)
flag = 22
return [], [], [], 0, flag, msg, 0, 0, 0, 0
# LOCATE LOCAL MINUMUM/MAXIMUM POSITIONS
# these values are used to figure out where to
# select our baseline points/locations for
msg_wrapper("debug",log.debug,'Locate local min and max positions')
localMinPositions = (np.diff(np.sign(np.diff(yspl))) > 0).nonzero()[0] + 1 # local min positions / first nulls
localMaxPositions = (np.diff(np.sign(np.diff(yspl))) < 0).nonzero()[0] + 1 # local max positions / peak
msg=f'Found positions of possible local mins at: {localMinPositions}, and local maxs at: {localMaxPositions}'
msg_wrapper("debug",log.debug,msg)
# get maximum of driftscan
ymax = max(yspl)
# Delete local min point within 50 points of either side of a local max point from the positions found above
maxPoints=50
pointsToDelete=test_position_validity(localMaxPositions,localMinPositions,maxPoints)
# If any invalid points found, delete them
if len(pointsToDelete)!=0:
for k in range(len(pointsToDelete)):
if pointsToDelete[k] in localMinPositions:
localMinPositions=list(localMinPositions)
ind=localMinPositions.index(pointsToDelete[k])
del localMinPositions[ind]
#ensure that the points are in an array
localMinPositions=np.array(localMinPositions)
msg=f'After validating points, the accepted positions for -> Min locs: {localMinPositions}, and Max locs: {localMaxPositions}'
msg_wrapper("debug",log.debug,msg)
# if len(localMinPositions)=0 try to establish other possible locations
if len(localMinPositions) == 0 :
msg = "- Failed to locate local min positions, set to FNBW locs"
msg_wrapper("warning", log.warning, msg)
flag = 21
# sys.exit()
try:
h=np.where(x<=-hfnbw)[0][-1]
print(h)
except:
h=x[0]
try:
#TODO write better notes on why i do this
hh=[np.where(x>=hfnbw)[0][0]][0]
except:
hh=x[-1]
localMinPositions=[h,hh]
# if len(localMinPositions) still = 0 and len(localMaxPositions) == 0
if len(localMinPositions) == 0 or len(localMaxPositions) == 0:
msg = "Failed to locate local min and max positions"
msg_wrapper("warning", log.warning, msg)
flag = 19
# print(msg)
# sys.exit()
return [], [], [], 1, flag, msg, 0, 0, 0, 0
# localMinPositions must be = 2, number of local minimum positions found
# IF MORE ARE FOUND WE MOST PROBABLY HAVE SIDELOBES
numberOfMinPositions = len(localMinPositions)
# Find the index or insertion point for the peakPosition in the
# locMinPositions list. i.e. where we most likely expect the
# peak to be located.
peakInd = bisect.bisect(localMinPositions, peakPosition)
scanLen = len(x)
# basic parameter info
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*30)
msg_wrapper("debug", log.debug,"Drift scan basic info: ")
msg_wrapper("debug", log.debug,"-"*30)
msg_wrapper("debug", log.debug,f"Found local minimums at = {localMinPositions}")
msg_wrapper("debug", log.debug,f"Found local maximums at = {localMaxPositions}")
msg_wrapper("debug", log.debug, f"no. of local min positions = {numberOfMinPositions}")
msg_wrapper('debug', log.debug, f"index of peak in local mins = {peakInd}") # i.e. if peak were inserted between the local mins, what would its index be
# check location of peak
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Peak locations: ")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, f"Peak pos from Gauss fit: {peakPosition}")
msg_wrapper("debug", log.debug, f"Peak pos from Spline fit: {peakSpline[0]}")
msg_wrapper("debug", log.debug, f"Peak pos if at mid of scan: {int(scanLen/2)}")
# Ensure peak falls within expected range, beyond 25% of the beginning of the drift scan and
# below 75% of the end of the drift scan.
peakLocMinLimit = int(scanLen*.25)
peakLocMaxLimit = int(scanLen*.75)
# If the code has struggled to locate the local min/max,
if ( peakPosition < peakLocMinLimit or peakPosition > peakLocMaxLimit ):
msg = "Peak not within expected range."
msg_wrapper("info", log.info, msg)
flag=20
msg_wrapper("info", log.info, f"Peak limit left: {peakLocMinLimit}")
msg_wrapper("info", log.info, f"Peak position: {peakPosition}")
msg_wrapper("info", log.info, f"Peak limit right: {peakLocMaxLimit}")
# sys.exit()
return [],[],[],1,flag,msg,[],[],0,0
else:
# locate/setup fnbw location on left and right of beam
# if we can't locate base locations program defaults
# to fnbw locations
try:
leftFNBWPoint = (np.where(x >= (peakCenterGuess-hfnbw))[0])[0]
except:
leftFNBWPoint = 0
try:
rightFNBWPoint = (np.where(x >= (peakCenterGuess+hfnbw))[0])[0]
except:
rightFNBWPoint = len(x)
if rightFNBWPoint == len(x) or leftFNBWPoint == 0:
flag=27
msg="Failed to locate FNBW locations"
msg_wrapper("info", log.info, msg)
# print(msg)
# sys.exit()
return [], [], [], 1, flag, msg, 0, 0, 0, 0
# msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Locate/setup location of FNBW: ")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, f"FNBW: {hfnbw*2}")
msg_wrapper("debug", log.debug, f"HFNBW: {hfnbw}")
msg_wrapper("debug", log.debug, f"left fnbw loc: {leftFNBWPoint}")
msg_wrapper("debug", log.debug, f"right fnbw loc: {rightFNBWPoint}")
# SEARCH FOR MINIMUMS
# We only need two points, one on left of peak and one on right of peak
# if we have more points this means there are side lobes and we
# need to adjust the baseline accordingly
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Finding locations of local minimum from scan: ")
msg_wrapper("debug", log.debug, "-"*20)
# set fault parameters
rf = 0 # right is faulty rf=1
lf = 0 # left is faulty lf=1
if peakInd == 0 and len(localMinPositions) == 1:
# there is only one peak index position found
msg_wrapper("debug", log.debug, f'peak ind: {peakInd}, local min pos: {len(localMinPositions)}')
if localMinPositions[peakInd] < peakPosition:
# the position found is to the left of the assumed peak location
minPosOnLeftOfPeak = localMinPositions
minPosOnRightOfPeak = rightFNBWPoint
flag = 6
rf = 1
msg = "Failed to locate right min pos"
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,
"leftMinLoc found: {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug, "setting rightMinLoc: {}\n".format(
minPosOnRightOfPeak))
if localMinPositions[peakInd] > peakPosition:
# the position found is to the right of the assumed peak location
minPosOnLeftOfPeak = leftFNBWPoint
minPosOnRightOfPeak = localMinPositions
flag = 7
lf = 1
msg = "Failed to locate left min pos"
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,"setting leftMinLoc: {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug,"rightMinLoc found: {}\n".format(
minPosOnRightOfPeak))
elif peakInd < len(localMinPositions):
msg_wrapper("debug", log.debug, f'peak ind < local min pos: {peakInd} < {len(localMinPositions)}; i.e. one peak, 2 or more local mins' )
if (localMinPositions[peakInd] < peakPosition) or (localMinPositions[peakInd] > peakPosition):
# grab all the local min positions to the left and right of the peak
minPosOnLeftOfPeak = localMinPositions[:peakInd]
minPosOnRightOfPeak = localMinPositions[peakInd:]
msg_wrapper("debug", log.debug,f"all leftMinLoc: {minPosOnLeftOfPeak}")
msg_wrapper("debug", log.debug,f"all rightMinLoc: {minPosOnRightOfPeak}")
if len(minPosOnLeftOfPeak) == 0:
# there are no left min positons, set left min to fnbw point
minPosOnLeftOfPeak = leftFNBWPoint
flag = 7
lf = 1
msg = "Failed to locate left min pos"
msg_wrapper("info", log.info, msg)
if len(minPosOnRightOfPeak) == 0:
# there are no right min positions, set right min to fnbw point
minPosOnRightOfPeak = rightFNBWPoint
flag = 6
rf = 1
msg = "Failed to locate right min pos"
msg_wrapper("info", log.info, msg)
else:
if localMinPositions[peakInd-1] < peakPosition:
# can't find local mins beyond peak
minPosOnLeftOfPeak = localMinPositions[:peakInd]
minPosOnRightOfPeak = rightFNBWPoint
flag = 6
rf = 1
msg = "Failed to locate right min pos"
print(msg)
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,"leftMinLoc: {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug,"rightMinLoc: {}\n".format(
minPosOnRightOfPeak))
if localMinPositions[0] > peakPosition:
# all local mins are beyond peak
minPosOnLeftOfPeak = leftFNBWPoint
minPosOnRightOfPeak = localMinPositions
flag = 7
lf = 1
msg = "Failed to locate left min pos"
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,
"setting leftMinLocs : {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug, "rightMinLoc beyond peak: {}\n".format(
minPosOnRightOfPeak))
# Determine if data has large sidelobes by evaluating the region
# around the center peak
maxPosInMaxs = localMaxPositions[np.abs(localMaxPositions-peakPosition).argmin()]
indOfmaxPosInMaxs = np.where(localMaxPositions == maxPosInMaxs)[0]
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Searching for sidelobes")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, f"local max positions: {localMaxPositions}")
msg_wrapper("debug", log.debug, f"maxPosInMaxs: {maxPosInMaxs}")
msg_wrapper("debug", log.debug, f"IndexOfMaxPos: {indOfmaxPosInMaxs}")
msg_wrapper("debug", log.debug, f"lenmaxpos: {len(localMaxPositions)} ")
sidelobes = 0 # data contains no sidelobes
if len(localMaxPositions) > 1:
# theres more than one peak
if len(localMaxPositions) == indOfmaxPosInMaxs[0]:
leftSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]-1]
rightSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]]
else:
if indOfmaxPosInMaxs == len(localMaxPositions)-1:
leftSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]-1]
rightSidelobe = []
else:
leftSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]-1]
rightSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]+1]
maxPeakInSpline = yspl[maxPosInMaxs]
halfOfMaxPeakInSpline = 0.5*maxPeakInSpline
# check if sidelobes are larger than half the peak
if (yspl[leftSidelobe] >= halfOfMaxPeakInSpline) or (yspl[rightSidelobe] >= halfOfMaxPeakInSpline):
# large sidelobes detected
msg_wrapper("info", log.info, "Large sidelobes detected: {} < {} < {}".format(
yspl[leftSidelobe], maxPeakInSpline, yspl[rightSidelobe]))
sidelobes = 1
msg = "Large sidelobes detected"
flag = 9
else:
msg_wrapper("info", log.info, "No sidelobes detected, moving on")
elif len(localMaxPositions) == 1:
msg_wrapper("info", log.info, "Passed sidelobe check: No sidelobes detected\n")
else:
msg_wrapper("info", log.info, "\nFailed to locate a maximum")
sys.exit()
maxloc = np.where(yspl==max(yspl))[0]
#print(maxloc,maxloc[0],len(yspl),yspl[0],yspl[-1])
# Make sure peak lies within FNBW window or points
if maxloc[0] < leftFNBWPoint or maxloc[0]> rightFNBWPoint:
# found peak in noise
msg_wrapper("info", log.info,"peak found in baselocs")
# sys.exit()
msg = "peak found in baselocs"
flag = 16
# we have sidelobes
# find local min positions ----------------------------------------
locminpos = (np.diff(np.sign(np.diff(yspl))) > 0).nonzero()[0] + 1
baseLine, leftBaselineBlock, rightBaselineBlock, minPosOnLeftOfPeak, minPosOnRightOfPeak, lp, rp = get_base(
locminpos,int((scanLen*.05)/2), len(x))
#locminpos, int((scanLen*.05)/2), len(x)), # why 5% ???
# ----------------------------------------------------------------
#print(lp, rp)
# pl.plot(x,y)
# pl.plot(x,yspl)
# pl.plot(x[leftBaselineBlock], y[leftBaselineBlock], 'b.')
# pl.plot(x[rightBaselineBlock],y[rightBaselineBlock],'m.')
# pl.plot(x[lp], y[lp], 'k.')
# pl.plot(x[rp],y[rp],'k*')
# pl.plot(x[minPosOnLeftOfPeak],yspl[minPosOnLeftOfPeak],"y*")
# pl.plot(x[minPosOnRightOfPeak], yspl[minPosOnRightOfPeak], "r*")
# pl.show()
# pl.close()
# sys.exit()
sidelobes=1
#sys.exit()
return baseLine, leftBaselineBlock, rightBaselineBlock, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak,lp,rp
#return [],[],[],1,flag,msg,[],[],0,0
else:
# Check if you returned anything: sanity check
try:
type(minPosOnRightOfPeak)
# check you are returned something, dummy check
except:
flag=6
msg="Failed to locate right min pos"
msg_wrapper("info", log.info, msg)
return [], [], [], np.nan, flag, msg, np.nan, np.nan, [], []
try:
# check you are returned something, dummy check
type(minPosOnLeftOfPeak)
except:
flag=7
msg="Failed to locate left min pos"
msg_wrapper("info", log.info, msg)
return [], [], [], np.nan, flag, msg, np.nan, np.nan, [], []
# print(type(minPosOnLeftOfPeak).__name__)
# Get the locations of the local minimum positions, if not found, return the position to be within
# a hundred points from both extreme ends of the data array.
if(type(minPosOnRightOfPeak).__name__) != 'ndarray':
if(type(minPosOnRightOfPeak).__name__) == 'list':
pass
else:
try:
minPosOnRightOfPeak=[minPosOnRightOfPeak]
except:
minPosOnRightOfPeak = [len(x)-100]
if(type(minPosOnLeftOfPeak).__name__) != 'ndarray':
if(type(minPosOnRightOfPeak).__name__) == 'list':
ls=[]
# check all values are integers
try:
for val in minPosOnLeftOfPeak:
# print(type(val).__name__, val)
if "float" in (type(val).__name__):
pass
else:
ls.append(val)
minPosOnLeftOfPeak=ls
except:
pass
else:
try:
minPosOnLeftOfPeak=[minPosOnLeftOfPeak]
except:
minPosOnLeftOfPeak = [100]
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("info", log.info,"Getting center of baseline blocks on left and right of peak: ")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("info", log.info,f"min pos left: {x[minPosOnLeftOfPeak]} @ loc/s {minPosOnLeftOfPeak}")
msg_wrapper("info", log.info,f"min pos right: {x[minPosOnRightOfPeak]} @ loc/s {minPosOnRightOfPeak}")
msg_wrapper("info", log.info,f"scan length: {scanLen}")
# Plot possible locations to fit your baseline
saveTo=f'{saveLoc}_baselocs.png'
plotBaselineEstimate(x, y, yspl, minPosOnLeftOfPeak, minPosOnRightOfPeak, \
"left local minumums", "right local minimums", "Baseline points selection", saveTo)
# Locate the baselines
# the number of points to use for baseline on each side of beam
# limited to 5% of length of the scan, this works well for
# situations where there are large sidelobes e.g. Jupiter, so
# we apply it to all scans, if fit is bad you can always use the GUI
# to fit by hand.
maxPointsInBaselineBlock = int(scanLen*.05)
hMaxPointsInBaselineBlock = int(maxPointsInBaselineBlock/2)
nums = np.arange(1, scanLen, 1) # array of numbers from 1 to len(list)
# set left and right baseline points placeholders
lb=[]
rb=[]
# Get baseline block on left of beam
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug,"Get baseline block on left of beam")
msg_wrapper("debug", log.debug, "-"*20)
if type(minPosOnLeftOfPeak).__name__ == "int64":
if lf == 1:
# left is faulty, set baseline points to half of the 5% limit
leftBaselineBlock = np.arange(0, minPosOnLeftOfPeak-hMaxPointsInBaselineBlock, 1)
lb = lb+[0, minPosOnLeftOfPeak-hMaxPointsInBaselineBlock]
else:
leftBaselineBlock = np.arange(
minPosOnLeftOfPeak-hMaxPointsInBaselineBlock, minPosOnLeftOfPeak+hMaxPointsInBaselineBlock, 1)
lb = lb+[minPosOnLeftOfPeak-hMaxPointsInBaselineBlock, minPosOnLeftOfPeak+hMaxPointsInBaselineBlock]
if minPosOnLeftOfPeak < len(leftBaselineBlock):
# min pos left is too close to the edge, adjust accordingly
leftBaselineBlock = np.arange(0, len(leftBaselineBlock), 1)
lb = lb+[0, len(leftBaselineBlock)]
msg_wrapper("debug", log.debug,f"mix: {leftBaselineBlock}")
sys.exit()
else:
leftBaselineBlock = []
slots = []
if lf == 0: # left not faulty
for i in range(len(minPosOnLeftOfPeak)):
slots = (np.arange(
minPosOnLeftOfPeak[i]-hMaxPointsInBaselineBlock, minPosOnLeftOfPeak[i]+hMaxPointsInBaselineBlock, 1))
lb = lb+[minPosOnLeftOfPeak[i]-hMaxPointsInBaselineBlock,
minPosOnLeftOfPeak[i]+hMaxPointsInBaselineBlock]
for j in range(len(slots)):
leftBaselineBlock.append(slots[j])
else:
# struggled to find left base so use all data to fnbw point
if len(minPosOnLeftOfPeak)==1:
leftBaselineBlock = np.arange(0,minPosOnLeftOfPeak[0],1)
lb = lb+[0,minPosOnLeftOfPeak[0]]
flag=7
msg="-- Failed to locate left min pos"
msg_wrapper("debug", log.debug,msg)
# TODO: Think about this a little bit more,
msg_wrapper("debug", log.debug, f"lb: {lb}")
# msg_wrapper("debug", log.debug, f"leftBaselineBlock: {leftBaselineBlock}")
# Get baseline block on right of beam
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug,"Get baseline block on right of beam")
msg_wrapper("debug", log.debug, "-"*20)
if type(minPosOnRightOfPeak).__name__ == "int64":
if rf == 1:
flag=6
msg="Failed to locate right min pos"
rightBaselineBlock = np.arange(minPosOnRightOfPeak+hMaxPointsInBaselineBlock, scanLen, 1)
rb= rb+[minPosOnRightOfPeak +
hMaxPointsInBaselineBlock, scanLen]
msg_wrapper("debug", log.debug,msg)
else:
rightBaselineBlock = np.arange(minPosOnRightOfPeak-hMaxPointsInBaselineBlock, minPosOnRightOfPeak+hMaxPointsInBaselineBlock, 1)
rb=rb+[minPosOnRightOfPeak-hMaxPointsInBaselineBlock,
minPosOnRightOfPeak+hMaxPointsInBaselineBlock]
if (scanLen - len(rightBaselineBlock)) < minPosOnRightOfPeak:
# min pos right is too close to the edge, adjust accordingly
rightBaselineBlock = np.arange(scanLen - len(rightBaselineBlock), scanLen, 1)
rb= rb+[scanLen - len(rightBaselineBlock), scanLen]
else:
rightBaselineBlock=[]
slots = []
#rf=[]
if rf == 0:
for i in range(len(minPosOnRightOfPeak)):
end=minPosOnRightOfPeak[i]+hMaxPointsInBaselineBlock
if end>len(x):
end = len(x)#-1
else:
pass
slots=(np.arange(
minPosOnRightOfPeak[i]-hMaxPointsInBaselineBlock, end-1, 1))
rb = rb+[minPosOnRightOfPeak[i]-hMaxPointsInBaselineBlock, end-1]
for j in range(len(slots)):
rightBaselineBlock.append(slots[j])
else:
rightBaselineBlock = np.arange(
scanLen-maxPointsInBaselineBlock, scanLen, 1)
rb = rb+[scanLen-maxPointsInBaselineBlock, scanLen-1]
flag = 6
msg = "Failed to locate right min pos"
msg_wrapper("debug", log.debug,msg)
msg_wrapper("debug", log.debug, f"rb: {rb}")
# ensure data makes sense
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug,"Ensure data makes sense: more sanity checks")
msg_wrapper("debug", log.debug, "-"*20)
try:
# find if data contains negatives and move/shift points forward
zeroIndex = leftBaselineBlock.index(0)
#print("\nzeroIndex found at index: {}".format(zeroIndex))
# find the difference between adjacent numbers and identify
# where to allocate shift
res = np.array([leftBaselineBlock[i + 1] - leftBaselineBlock[i] for i in range(len(leftBaselineBlock)-1)])
if len(res)>0:
# if more than one location for baseline is selected,
# determine shift and adjust accordingly
l = np.where(res!=1)[0]
#("shift parameter: ",l)
#print("value at shift parametera: ",leftBaselineBlock[l[0]],"\n")
left = leftBaselineBlock[zeroIndex:l[0]+1]
right = leftBaselineBlock[l[0]+1:]
s = np.arange(
leftBaselineBlock[l[0]]+1, leftBaselineBlock[l[0]]+1+zeroIndex, 1)
shiftedBlock = left+list(s)+right #np.arange(0, shift,1)
#print("shiftedleftbaselineblock: ", shiftedBlock)
leftBaselineBlock = shiftedBlock
msg_wrapper("debug", log.debug, "Shifted baseline block")
else:
pass
except Exception:
pass
try:
# find if data contains goes beyond max of scan
# and move/shift points backwards
maxIndex = rightBaselineBlock.index(scanLen)
#print("maxIndex: {}".format(maxIndex))
# find the difference between adjacent numbers and identify
# where to allocate shift
res = np.array([rightBaselineBlock[i + 1] - rightBaselineBlock[i] for i in range(len(rightBaselineBlock)-1)])
#print(res)
if len(res)>0:
# if more than one location for baseline is selected,
# determine shift and adjust accordingly
l = np.where(res != 1)[0]
#print("shift parameter: ",l)
if len(l) == 0:
msg_wrapper("debug", log.debug, "Shifting entire block")
last = rightBaselineBlock[-1]
shift =abs(scanLen-last)
shiftedBlock = np.arange(
rightBaselineBlock[0]-shift, scanLen, 1)
#print("shiftedrightbaselineblock: ", shiftedBlock)
rightBaselineBlock = shiftedBlock
else:
#print("value at shift parameterb: ",
# rightBaselineBlock[l[0]], "\n")
msg_wrapper("debug", log.debug, "Shifting block")
if len(l)==1:
left = rightBaselineBlock[:l[0]+1]
right = rightBaselineBlock[l[0]+1:]
shift = abs(rightBaselineBlock[-1]-scanLen)
s = np.arange(
rightBaselineBlock[l[0]+1]-shift, rightBaselineBlock[-1]-shift, 1)
shiftedBlock = left+list(s)
#print("shiftedrightbaselineblock: ", shiftedBlock)
rightBaselineBlock = shiftedBlock
elif len(l)==2:
#print("Length l > 1")
# find value closest to max
nearest = min(l, key=lambda t: abs(t-maxIndex))
index=[np.where(l==nearest)[0]][0]
#print(nearest,index[0])
#print("value at shift parameter: ",
# rightBaselineBlock[nearest], "\n")
# shift backwards
left = rightBaselineBlock[:nearest+1]
right = rightBaselineBlock[nearest+1:]
#print("left: ", left)
#print("right: ", right)
shift = abs(rightBaselineBlock[-1]-scanLen)
#print("shifting back by: ", shift)
s = np.arange(
rightBaselineBlock[nearest+1]-shift, rightBaselineBlock[-1]-shift, 1)
shiftedBlock = left+list(s)
#print("shiftedrightbaselineblock: ", shiftedBlock)
rightBaselineBlock = shiftedBlock
else:
#print("l > 2")
msg = "Too many shift parameters."
flag=18
msg_wrapper("debug", log.debug, f"Too many shift parameters: Peak limit left: {peakLocMinLimit}, Peak position: {peakPosition}, Peak limit right: {peakLocMaxLimit}")
sys.exit()
return [],[],[],1,flag,msg,np.nan
except Exception:
pass
# msg_wrapper("debug", log.debug, f"lb: {lb}")
# msg_wrapper("debug", log.debug, f"rb: {rb}")
# print(len(x),len(y),len(yspl))
# print(leftBaselineBlock)
# print(rightBaselineBlock)
saveTo=f'{saveLoc}_baselocsOutline.png'
plotBaselineEstimate(x, y, yspl, lb, rb,'baselocs at left of beam','baselocs at rigth of beam',\
"Plot with baseline blocks outlined", saveTo, leftBaselineBlock, rightBaselineBlock)
# create a single baseline block
baseLine = list(leftBaselineBlock) + list(rightBaselineBlock)
baseLine = np.array(baseLine, int)
return baseLine, leftBaselineBlock, rightBaselineBlock, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak,lb,rb
[docs]
def correct_drift(xBase, yBase, x, log, order=1):
'''
Correct for a drifting baseline in the scan by fitting
a first order polynomial to a region with no
signal.
Parameters
-----------
xBase : x data of the baseline
yBase : y data of the baseline
x : x data of the entire drift scan
'''
# fit the baseline and get best fit coeffecients
driftModel, driftRes, driftRms, driftCoeffs = calc_residual_and_rms(
xBase, yBase,log, order)
dataModel = np.polyval(driftCoeffs, x)
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the baseline")
msg_wrapper("info", log.info,"-"*30)
msg = "Fit = {:.3}x + ({:.3}), rms error = {:.3}".format(driftCoeffs[0], driftCoeffs[1], driftRms)
msg_wrapper("info", log.info, msg)
return dataModel, driftModel, driftRes, driftRms, driftCoeffs
[docs]
def calc_residual_and_rms(x, y, log, order=1):
"""Calculate the residual and rms from data
Args:
x (array): 1d array
data representing the x-axis
y (array): 1d array
data representing the y-axis
deg(int): degree of the polynomial
Return:
model (array): model of
res (array):
rms (float):
coeff():
"""
#TODO: remove redundant code
# fit the baseline and get best fit coeffecients
coeffs = poly_coeff(x, y, order)
msg_wrapper("debug",log.debug,f'coeffs: {coeffs}')
# get a model for the fit using the best fit coeffecients
model = np.polyval(coeffs, x)
# Calculate the residual and rms
res, rms = calc_residual(y, model)
return model, res, rms, coeffs
[docs]
def poly_coeff(x, y, deg):
'''
Calculate the polynomial coeffecients depending
on the degree/order of the polynomial
Args:
x (array): 1d array
data representing the x-axis
y (array): 1d array
data representing the y-axis
deg(int): degree of the polynomial
Return:
array of polynomial fitted data
'''
return np.polyfit(x, y, deg)
[docs]
def fit_beam(x, y, p, fnbw, force, log, saveTag, fitTheoretical, autoFit=None):
"""
Fit single beam data.
Parameters:
x (array): 1D array of data representing the x-axis
y (array): 1D array of data representing the y-axis
p (list): list of initial fit parameters
fnbw (float): source first null beam width from file
dec (float): source declination
data (dict): dictionary of source parameters
scanNum (int): Value representing the current scan
force (str): String to determine whether to force a fit or not
log (object): loffing object
"""
# Setup parameters
scanLen = len(x) # length of the scan
hhpbw = p[2]/2 # half of the hpbw
hfnbw = fnbw/2 # half of the fnbw
flag = 0 # default flag set to zero
msg = "" # flag message to go on plot
# Try to fit a gaussian to determine peak location
# this also works as a bad scan filter
coeff, fit, flag = test_gauss_fit(x, y, p,log)
if len(coeff)==0:
if force=="y":
pass
else:
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
if autoFit !=None and len(autoFit)>1:
# If given fitting points
#------------------------
#print(autoFit, autoFit['baselocs'])
#print(float(autoFit['baselocs']))
# fit baseline
# 2) correct the drift in the data and get the residual and rms
baseLocs=[]#np.where()[0]
for i in range(len(autoFit['baselocs'])):
if i%2==0:
#print(autoFit['baselocs'][i],autoFit['baselocs'][i+1])
if i==0:
baseLocsl=np.where((x>=autoFit['baselocs'][i]) & (x<=autoFit['baselocs'][i+1]))[0]
else:
baseLocsr=np.where((x>=autoFit['baselocs'][i]) & (x<=autoFit['baselocs'][i+1]))[0]
d=np.where((x>=autoFit['baselocs'][i]) & (x<=autoFit['baselocs'][i+1]))[0]
baseLocs=baseLocs+list(d)
#print(baseLocs)
#baseLocsl=x[b1]
#baseLocsr=x[b2]
lb=[baseLocsl[0],baseLocsl[-1]]
rb=[baseLocsr[0],baseLocsr[-1]]
#print(lb,rb)
#sys.exit()
# fit baseline blocks
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(
x[baseLocs], y[baseLocs], x,log)
# 4) apply a polynomial fit to the baseline data
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1))
# 5) Subtract the polynomial fitted to the baseline to get corrected beam
yCorrected = y - lin_first_null(x)
# 6) get sline of corrected data
s = spline(x, yCorrected)
plotCorrectedData(x,yCorrected,baseLocsl,baseLocsr,'Corrected data','blocks','Plot of baseline corrected data',f'{saveTo}corrected.png',xlabel="",ylabel="")
plot_overlap(x,yCorrected,x,s,'Plot of splined data','corrected','spline fit',f'{saveTo}splined.png',xlabel="",ylabel="")
# pl.title("Baseline corrected data")
# pl.xlabel("Scandist [Deg]")
# pl.ylabel("Ta [K]")
# #pl.plot(x,y)
# pl.plot(x, yCorrected, 'k',label="baseline corrected data")
# #pl.plot(x[main_beam], yCorrected[main_beam])
# pl.plot(x[baseLocs], yCorrected[baseLocs],".")
# pl.plot(x[lb], yCorrected[lb],".")
# #pl.plot(x,s)
# pl.plot(x,np.zeros_like(x),'k')
# #pl.grid()
# pl.legend(loc="best")
# try:
# pl.savefig(saveFolder+"baseline_corrected_data.png")
# except:
# pass
# #pl.show()
# pl.close()
# #sys.exit()
# fit a polynomial to peak data
msg_wrapper("info", log.info, "Fit the peak")
print("*"*60)
hmain_beam=np.where((x>=autoFit['peaklocs'][0]) & (x<=autoFit['peaklocs'][1]))[0]
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beam], ypeak)
# pl.title("Plot of final peak fitted data")
# pl.xlabel("Scandist [Deg]")
# pl.ylabel("Ta [K]")
# pl.plot(x, yCorrected, "k", label="corrected data")
# #pl.plot(x[main_beam],yCorrected[main_beam])
# pl.plot(x[hmain_beam], yCorrected[hmain_beam])
# #pl.plot(x,fit)
# pl.plot(x[hmain_beam],ypeak,"r",label="Ta[K] = %.3f +- %.3f" %(max(ypeak),err_peak))
# pl.plot(x,np.zeros(scanLen),"k")
# #pl.grid()
# pl.legend(loc="best")
# try:
# pl.savefig(saveFolder+"peak_fit_data.png")
# except:
# pass
# #pl.show()
# pl.close()
#sys.exit()
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[hmain_beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beam, "", driftRes, driftRms, driftCoeffs, fitRes, coeff[1],
# flag, baseLocsl, baseLocsr, baseLocs, peakLoc,lb,rb
ret={
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":coeff[1],
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":lb,"baseRight":rb
}
return ret
sys.exit()
else:
# 1. Locate baseline blocks
# These are the blocks that will be used to correct the drift in the data
if force=="y" and flag==3:
baseLocsl=[]
baseLocsr=[]
else:
if fitTheoretical=="y":
#baseLocs, baseLocsl, baseLocsr, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak, lb, rb = locate_baseline_blocks_auto(
#x, y, coeff[1], hfnbw,log)
baseLocs, baseLocsl, baseLocsr, sidelobes = locate_baseline_blocks_fnbw(
x, hfnbw,)
#print(baseLocsl,baseLocsr)
flag = 26
msg = "Fitting left and right theoretical (FNBW) points"
minPosOnLeftOfPeak = baseLocsl[int(len(baseLocsl)/2)]
minPosOnRightOfPeak = baseLocsr[int(len(baseLocsr)/2)]
lb = [baseLocsl[0],baseLocsl[-1]]
rb = [baseLocsr[0],baseLocsr[-1]]
#sys.exit()
else:
msg_wrapper("info",log.info,"AUTO fitting started.")
baseLocs, baseLocsl, baseLocsr, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak, lb, rb = locate_baseline_blocks_auto(
x, y, coeff[1], hfnbw,log, saveTag)
if 'int' in type(minPosOnRightOfPeak).__name__ or 'float' in type(minPosOnRightOfPeak).__name__ or len(minPosOnRightOfPeak) == 0:
print("Failed to locate right min pos")
flag=6
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, 6, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
elif 'int' in type(minPosOnLeftOfPeak).__name__ or len(minPosOnLeftOfPeak) == 0:
print("Failed to locate left min pos")
flag=7
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, 7, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
if len(baseLocsl) == 0 or len(baseLocsr) == 0:
# one side of the basline has could not be located possibly due to rfi
msg = "*failed to locate base locs"
msg_wrapper("warning", log.warning, msg)
sys.exit()
flag=30
# Force the fit
if force=="y":
print("Forcing the fit")
# set baseline points and fit edges or fnbw points
#print()
# 1. try fitting the data with the centre at 0
baseLocs, baseLocsl, baseLocsr, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak, lb, rb = locate_baseline_blocks_auto(x, y, 0, hfnbw,log)
msg="force fitted local mins"
if len(baseLocsl) == 0 or len(baseLocsr) == 0:
# 2. try fitting the hfnbw points
baseLocsl = (np.where(x <= (-hfnbw))[0])
baseLocsr = (np.where(x >= (hfnbw))[0])
baseLocs = list(baseLocsl)+list(baseLocsr)
msg="force fitted fnbw locs"
sidelobes=2 #error fitting the data
# if both methods failed, exit
if len(baseLocsl) == 0 or len(baseLocsr) == 0:
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [], np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
'''pl.title("Plot of baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
#pl.plot(x,y)
pl.plot(x, y, 'k',label="baseline corrected data")
#pl.plot(x[main_beam], yCorrected[main_beam])
pl.plot(x[baseLocs], y[baseLocs],".")
pl.show()
pl.close()
sys.exit()'''
# fit the baseline
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(x[baseLocs], y[baseLocs], x,log)
# get main beam
try:
main_beam = np.where(np.logical_and(x >= coeff[1]-hfnbw, x <= coeff[1]+hfnbw))[0]
except:
print("Failed to get coeff[1], setting beam to fnbw window")
main_beam = np.where(np.logical_and(
x >= hfnbw, x <= hfnbw))[0]
# 4) apply a polynomial fit to the baseline data
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1))
# 5) Subtract the polynomial fitted to the baseline to get corrected beam
yCorrected = y - lin_first_null(x)
# 6) get sline of corrected data
s = spline(x, yCorrected)
pl.title("Plot of baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
#pl.plot(x,y)
pl.plot(x, yCorrected, 'k',label="baseline corrected data")
#pl.plot(x[main_beam], yCorrected[main_beam])
pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
#pl.plot(x,s)
pl.plot(x,np.zeros_like(x),'k')
pl.grid()
pl.legend(loc="best")
try:
pl.savefig(saveFolder+"baseline_corrected_data.png")
except:
pass
#pl.show()
pl.close()
#sys.exit()
# fit the peak
# 3) GET LOCATIONS OF DATA FOR THE MAIN BEAM ONLY
# this is limited by the fnbw
# check peak is at centre
main_beam = np.where(np.logical_and(
x >= -hfnbw, x <= hfnbw))[0]
# 4) apply a polynomial fit to the baseline data
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1))
# 5) Subtract the polynomial fitted to the baseline to get corrected beam
yCorrected = y - lin_first_null(x)
# 6) get sline of corrected data
s = spline(x, yCorrected)
# 7. Fit the peak
# find max location
msg_wrapper("info", log.info, "Fit the peak")
print("*"*60)
maxp = max(s)
maxloc = np.where(s == maxp)[0]
xmaxloc = x[maxloc[0]]
# max is not at center, found at extreme ends of scan
if (max(s) == s[0]) or (max(s) == s[-1]):
msg = "max of peak is not at center, failed peak fit "
msg_wrapper("warning", log.warning, msg)
flag = 10
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# 7) Get indices where the x values are within the main beam
# Get top 30% of beam
xMainBeam = x[main_beam]
yMainBeam = s[main_beam]
maxs = max(s[main_beam])
loc = np.where(s[main_beam] == maxs)[0]
midxMainBeam = xMainBeam[loc[0]]
# Get main beam for peak fitting
try:
hmain_beamn = np.where(np.logical_and(
xMainBeam >= coeff[1]-hhpbw, xMainBeam <= coeff[1]+hhpbw))[0]
except:
hmain_beamn = np.where(np.logical_and(
xMainBeam >= -hhpbw, xMainBeam <= hhpbw))[0]
# Fit top 50% or 30% depending on sidelobe confirmation
if sidelobes == 1:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
flag = 9
msg = "large sidelobes detected"
msg_wrapper("warning", log.warning, msg)
else:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.7*maxp)[0]
flag = 0
if (len(hmain_beamp) == 0) or len(hmain_beamn) ==0:
msg = "couldn't find peak main beam data"
flag=4
msg_wrapper("warning", log.warning, msg)
# find peak of spline
sloc = np.where(s == max(s))[0]
print("sloc: ", sloc, ", len: ", len(
yCorrected), ", mid: ", len(yCorrected)/2)
#print(data)
pl.title("Baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
pl.plot(x,y)
pl.plot(x, yCorrected, 'k',label="baseline corrected data")
pl.plot(x[main_beam], yCorrected[main_beam])
pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
#pl.plot(x,s)
pl.plot(x,np.zeros_like(x),'k')
#pl.grid()
pl.legend(loc="best")
#pl.savefig(saveFolder+"baseline_corrected_data.png")
pl.show()
pl.close()
sys.exit()
# peak shifted left
if sloc < len(yCorrected)/2 and sloc != 0:
print("Peak is shifted to left, but not first element")
print(x[sloc], hhpbw)
# look for peak around new max loc
beam = np.where(np.logical_and(
x >= x[sloc]-hhpbw, x <= x[sloc]+hhpbw))[0]
# fit peak
if len(beam) > 0:
ypeak = np.polyval(np.polyfit(
x[beam], yCorrected[beam], 2), x[beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(
yCorrected[beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, beam, msg, driftRes, driftRms, driftCoeffs, fitRes,
# np.nan, flag, baseLocsl, baseLocsr, baseLocs, peakLoc
ret={"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[]
}
return ret
else:
# couldn't locate peak
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
# peak shifted right
print("Peak is shifted to left, but not first element")
print(x[sloc], hhpbw)
# look for peak around new max loc
beam = np.where(np.logical_and(
x >= x[sloc]-hhpbw, x <= x[sloc]+hhpbw))[0]
# fit peak
if len(beam) > 0:
ypeak = np.polyval(np.polyfit(
x[beam], yCorrected[beam], 2), x[beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(
yCorrected[beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[beam])[ploc[0]]
#sys.exit()
# return max(ypeak), ypeak, err_peak, yCorrected, beam, msg, driftRes, driftRms, driftCoeffs, fitRes, np.nan,
# flag, baseLocsl, baseLocsr, baseLocs, peakLoc
return {"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[]
}
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
if hmain_beam[0] > baseLocsr[-1] or hmain_beam[-1] < baseLocsl[0]:
# peak loctaion is beyond fnbw locations
msg = "peak location is beyond fnbw locations"
msg_wrapper("warning", log.warning, msg)
flag = 24
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# fit a polynomial to peak data
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
# check if peak was fit correctly
if max(ypeak) == ypeak[0] or max(ypeak) == ypeak[-1]:
# peak is concave, try fitting wider range
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
if (max(ypeak) == ypeak[0]) or (max(ypeak) == ypeak[-1]):
# still struggling to fit ?, stop fitting
msg = "failed to accurately establish peak fit location, 1"
msg_wrapper("warning", log.warning, msg)
flag = 5
pl.title("Plot of baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
#pl.plot(x,y)
pl.plot(x, yCorrected, 'k',label="baseline corrected data")
pl.plot(x,s)
pl.plot(x[hmain_beamp], yCorrected[hmain_beamp],label=msg)
pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
#pl.plot(x,s)
pl.plot(x,np.zeros_like(x),'k')
pl.grid()
pl.legend(loc="best")
#pl.savefig(saveFolder+"baseline_corrected_data.png")
#pl.show()
pl.close()
#sys.exit()
# find peak of spline
sloc = np.where(s==max(s))[0]
print("sloc: ",sloc,", len: ", len(yCorrected),", mid: ",len(yCorrected)/2)
# peak shifted left
if sloc < len(yCorrected)/2 and sloc!=0:
print("Peak is shifted to left, but not first element")
print(x[sloc],hhpbw)
# look for peak around new max loc
beam = np.where(np.logical_and(
x >= x[sloc]-hhpbw, x <= x[sloc]+hhpbw))[0]
# fit peak
if len(beam) > 0:
ypeak = np.polyval(np.polyfit(
x[beam], yCorrected[beam], 2), x[beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, beam, msg, driftRes, driftRms, driftCoeffs, fitRes, np.nan,
# flag, baseLocsl, baseLocsr, baseLocs,peakLoc
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[],
}
else:
# couldn't locate peak
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[hmain_beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beam, "", driftRes, driftRms, driftCoeffs, fitRes, np.nan,
# flag, baseLocsl, baseLocsr, baseLocs,peakLoc
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[]
}
else:
flag = 37
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# 2) correct the drift in the data and get the residual and rms
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(
x[baseLocs], y[baseLocs], x,log)
# 3) GET LOCATIONS OF DATA FOR THE MAIN BEAM ONLY
# this is limited by the fnbw
# check peak is at centre
main_beam = np.where(np.logical_and(
x >= coeff[1]-hfnbw, x <= coeff[1]+hfnbw))[0]
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1)) # 4) apply a polynomial fit to the baseline data
yCorrected = y - lin_first_null(x) # 5) Subtract the polynomial fitted to the baseline to get corrected beam
s = spline(x, yCorrected) # 6) get spline of corrected data
saveTo=f'{saveTag}corrected.png'
plotCorrectedData(x,yCorrected,baseLocsl,baseLocsr,"baseline corrected data",'baseLocs','Plot of corrected data',saveTo)
# 7. Fit the peak
# find max location
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the peak")
msg_wrapper("info", log.info,"-"*30)
maxp = max(s)
maxloc = np.where(s == maxp)[0]
# xmaxloc = x[maxloc[0]]
# max is not at center, found at extreme ends of scan
if (max(s) == s[0]) or (max(s) == s[-1]):
msg = "max of peak is not at center, failed peak fit "
msg_wrapper("warning", log.warning, msg)
flag = 10
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# 7) Get indices where the x values are within the main beam
# Get top 30% of beam
xMainBeam = x[main_beam]
yMainBeam = s[main_beam]
try:
maxs = max(s[main_beam])
except:
msg = "max of peak is not at center, failed peak fit "
msg_wrapper("warning", log.warning, msg)
flag = 10
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
loc = np.where(s[main_beam] == maxs)[0]
midxMainBeam = xMainBeam[loc[0]]
# Try to fit a gaussian to confirm center peak location after correction of scan
try:
p0=[maxs, midxMainBeam, p[2], 0, 0]
coeff, covarMatrix = curve_fit(gauss_lin, x,yCorrected, p0)
fit = gauss_lin(x, *coeff)
# coeff, fit = fit_gauss_lin(
# x, yCorrected, [maxs, midxMainBeam, p[2], 0, 0])
except Exception:
msg = "couldnt find max in main beam"
msg_wrapper("warning", log.warning, msg)
flag = 11
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# Get main beam for peak fitting
hmain_beamn = np.where(np.logical_and(
xMainBeam >= coeff[1]-hhpbw, xMainBeam <= coeff[1]+hhpbw))[0]
# Fit top 50% or 30% depending on sidelobe confirmation
if sidelobes == 1:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
flag = 9
msg_wrapper("warning", log.warning, "large sidelobes detected")
else:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.7*maxp)[0]
flag = 0
if (len(hmain_beamp) == 0) or len(hmain_beamn) ==0:
msg = "couldn't find peak main beam data"
flag=4
msg_wrapper("warning", log.warning, msg)
# sys.exit()
#print(data)
# pl.title("Baseline corrected data")
# pl.xlabel("Scandist [Deg]")
# pl.ylabel("Ta [K]")
# pl.plot(x,y)
# #pl.plot(x, yCorrected, 'k',label="baseline corrected data")
# pl.plot(x[main_beam], yCorrected[main_beam])
# pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
# pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
# #pl.plot(x,s)
# pl.plot(x,np.zeros_like(x),'k')
# #pl.grid()
# pl.legend(loc="best")
# #pl.savefig(saveFolder+"baseline_corrected_data.png")
# #pl.show()
# pl.close()
# #sys.exit()
# #sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
if (len(hmain_beamp) < 0.5*len(hmain_beamn)):
msg = "peak main beam data too noisy"
flag = 23
msg_wrapper("warning", log.warning, msg)
# sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
if hmain_beam[0] > baseLocsr[-1] or hmain_beam[-1] < baseLocsl[0]:
# peak loctaion is beyond fnbw locations
msg = "peak location is beyond fnbw locations"
msg_wrapper("warning", log.warning, msg)
flag = 24
# sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# fit a polynomial to peak data
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
# # check if peak was fit correctly
msg=f'Test fit makes sense - left: {ypeak[0]:.3f}, center: {max(ypeak):.3f}, right: {ypeak[-1]:.3f}'
msg_wrapper("debug", log.debug, msg)
# print(max(ypeak),ypeak[0],ypeak[-1])
if max(ypeak) == ypeak[0] or max(ypeak) == ypeak[-1]:
# peak is concave, try fitting wider range
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
if (max(ypeak) == ypeak[0]) or (max(ypeak) == ypeak[-1]):
# still struggling to fit ?, try this then stop fitting
try:
# LOCATE LOCAL MINUMUM/MAXIMUM POSITIONS
# these values are used to figure out where to
# select our baseline points/locations
localMinPositions = (np.diff(np.sign(np.diff(s))) > 0).nonzero()[
0] + 1 # local min positions / first nulls
localMaxPositions = (np.diff(np.sign(np.diff(s))) < 0).nonzero()[
0] + 1 # local max positions / peak
msg=f'mins: {localMinPositions}, maxs: {localMaxPositions}'
msg_wrapper("debug", log.debug, msg)
# find peak closest to zero
locs=x[localMaxPositions]
k = min(locs, key=lambda x:abs(x-0))
locs=list(locs)
#print(f'locs: {locs}, k: {k}')
msg=f'pos of possible center peak: {locs.index(k)}'
msg_wrapper("debug", log.debug, msg)
# use this peak as the main peak
kpos=np.where(x==k)[0]
msg=f'peak is at loc: {kpos}'
msg_wrapper("debug", log.debug, msg)
ymax=s[kpos] #max(s)
yhalf= ymax/2
# find surrounding mins
l=min(localMinPositions, key=lambda x:abs(x-kpos))
ppos=list(localMinPositions).index(l)
# print(l,ppos,kpos)
# print(localMinPositions)
if l>kpos:
v=[localMinPositions[ppos-1],localMinPositions[ppos]]
print('window - ',localMinPositions[ppos-1],kpos,localMinPositions[ppos])
left=localMinPositions[ppos-1]
right=localMinPositions[ppos]
elif l < kpos:
print(localMinPositions[ppos],kpos,localMinPositions[ppos+1])
left=localMinPositions[ppos]
right=localMinPositions[ppos+1]
print('l < kpos')
#sys.exit()
else:
print(localMinPositions[ppos],kpos,localMinPositions[ppos+1])
left=localMinPositions[ppos]
right=localMinPositions[ppos+1]
print('l ? kpos')
sys.exit()
top = np.where((s >= yhalf)&(x>x[left])&(x<x[right]))
# peak is concave, try fitting wider range
hmain_beamp = top#np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
#hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
ypeak = np.polyval(np.polyfit(x[hmain_beamp],
yCorrected[hmain_beamp], 2), x[hmain_beamp])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beamp], ypeak)
# find final peak loc
#ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[kpos])#[ploc[0]]
msg = "attempted to accurately establish peak fit location"
msg_wrapper("warning", log.warning, msg)
flag = 37
sys.exit()
pl.title("Plot of final peak fitted data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
pl.plot(x, yCorrected, "k", label="corrected data")
#pl.plot(x[main_beam],yCorrected[main_beam])
pl.plot(x[hmain_beamp], yCorrected[hmain_beamp])
#pl.plot(x,fit)
pl.plot(x[hmain_beamp],ypeak,"r",label="Ta[K] = %.3f +- %.3f" %(max(ypeak),err_peak))
pl.plot(x,np.zeros(scanLen),"k")
#pl.grid()
pl.legend(loc="best")
try:
pl.savefig(saveFolder+"peak_fit_data.png")
except:
pass
#pl.show()
pl.close()
#sys.exit()
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beamp,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":coeff[1],
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":lb,"baseRight":rb,
}
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beamp, "", driftRes, driftRms, driftCoeffs, fitRes,
# coeff[1], flag, baseLocsl, baseLocsr, baseLocs, peakLoc,lb,rb
except:
msg = "failed to accurately establish peak fit location"
msg_wrapper("warning", log.warning, msg)
flag = 5
print(msg)
# sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# print(max(ypeak),ypeak[0],ypeak[-1],abs(min(yCorrected)) > max(yCorrected))
if (abs(min(yCorrected)) > max(yCorrected)):
# still struggling to fit ?, stop fitting
msg = "min > max"
msg_wrapper("warning", log.warning, msg)
flag = 25
# sys.exit()
if force=="y":
pass
else:
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beam], ypeak)
saveTo=f'{saveTag}_peak_fit.png'
plotPeakFit("Plot of final peak fitted data",x,yCorrected,ypeak,err_peak,hmain_beam,saveTo)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[hmain_beam])[ploc[0]]
# print('out')
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":coeff[1],
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":lb,"baseRight":rb,
}
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beam, "", driftRes, driftRms, driftCoeffs, fitRes, coeff[1], flag, baseLocsl, baseLocsr, baseLocs, peakLoc,lb,rb
[docs]
def fit_dual_beam(x, y, hpbw, fnbw, factor,saveTo,log): #, npts, dec, srcType, data, scanNum, force, log):
# Setup parameters
scanLen = len(x) # length of the scan
midLeftLoc = int(scanLen/4) # estimated location of peak on left beam
midRightLoc = midLeftLoc * 3 # estimated location of peak on right beam
hhpbw = hpbw/2 # half of the hpbw
hfnbw = fnbw/2 # half of the fnbw
factoredfnbw = (fnbw*factor) # fnbw multiplied by factor
flag=0 # default flag set to zero
msg="" # flag message to go on plot image
# saveFolder="currentScanPlots/"
fl = 0 # failed left gaussian fit
fr = 0 # failed right gaussian fit
# LOCATE BASELINE BLOCKS
msg_wrapper("debug", log.debug, "-- Locate baseline blocks")
# we don't worry about sidelobes here so baseline
# blocks are set to edges or fnbw points
ptLimit = int(scanLen*0.04) # Point limit, number of allowed points per base block
baseLocsLeft = np.arange(0,ptLimit,1)
baseLocsRight = np.arange(scanLen-ptLimit,scanLen,1)
baseLocs = list(baseLocsLeft)+list(baseLocsRight)
if len(baseLocsLeft) == 0 or len(baseLocsRight) == 0:
msg = "failed to locate base locs"
flag = 30
msg_wrapper("warning", log.warning, msg)
# sys.exit()
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
msg_wrapper("debug", log.debug,
f'BaseLocsLeft: {baseLocsLeft[0]} to {baseLocsLeft[-1]} = {len(baseLocsLeft)} pts')
msg_wrapper("debug", log.debug,
f'BaseLocsLeft: {baseLocsRight[0]} to {baseLocsRight[-1]} = {len(baseLocsRight)} pts')
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(x[baseLocs], y[baseLocs], x,log)
# Correct the data and get global max/min
yCorrected = y-dataModel
maxyloc = np.argmax(yCorrected)
minyloc = np.argmin(yCorrected)
maxy = yCorrected[maxyloc]
miny = yCorrected[minyloc]
# Spline the data and get global max/min
yspl = spline(x, yCorrected)
ysplmaxloc = np.argmax(yspl)
ysplminloc = np.argmin(yspl)
ysplmax = yspl[ysplmaxloc]
ysplmin = yspl[ysplminloc]
# make plots
plotCorrectedData(x,yCorrected,baseLocsLeft,baseLocsRight,'Corrected data','Baseline blocks','Plot of baseline corrected data',f'{saveTo}corrected.png',xlabel="",ylabel="")
plot_overlap(x,yCorrected,x,yspl,'Plot of splined data','corrected','spline fit',f'{saveTo}splined.png',xlabel="",ylabel="")
# sys.exit()
msg_wrapper("debug", log.debug, "maxyloc: {}, maxy: {:.3f}, minyloc: {}, miny: {:.3f}".format(maxyloc, maxy, minyloc, miny))
msg_wrapper("debug", log.debug, "maxyloc: {}, maxy: {:.3f}, minyloc: {}, miny: {:.3f}\n".format(ysplmaxloc, ysplmax, ysplminloc, ysplmin))
msg_wrapper("debug", log.debug, "A beam peakloc: {} \n# B beam peakloc: {}".format(ysplmaxloc, ysplminloc))
# A/B BEAM DATA PROCESSING
AbeamScan = np.where(np.logical_and(x > -factoredfnbw, x < 0))[0]
BbeamScan = np.where(np.logical_and(x > 0, x < factoredfnbw))[0]
if len(AbeamScan) == 0 or len(BbeamScan) == 0:
msg="A/B beam scan data == 0, no data"
flag = 8
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
msg_wrapper("debug", log.debug, "- Beam seperation:")
msg_wrapper("debug", log.debug, "Left beam indeces: {} to {}".format(AbeamScan[0], AbeamScan[-1]))
msg_wrapper("debug", log.debug, "Right beam indeces: {} to {}\n".format(
BbeamScan[0], BbeamScan[-1]))
msg_wrapper("debug", log.debug, "- Data Limits")
msg_wrapper("debug", log.debug, "base left: {}, drift A left: {}, peak A: {}, drift A right: {}".format(baseLocsLeft[-1], AbeamScan[0], ysplminloc, AbeamScan[-1]))
msg_wrapper("debug", log.debug, "drift B left: {}, peak B: {}, drift B right: {}, base right: {}\n".format(
BbeamScan[0], ysplmaxloc, BbeamScan[-1], baseLocsRight[0]))
# figure out orientation of beam. With some scans the beams
# are flipped e.g, pks2326-502, A beam is positive, whereas
# j0450-81 A beam is negative.
# find value closest to zero and use the other value to determine
# which side to fit positive/negative peak
lstA=[min(yspl[AbeamScan]), max(yspl[AbeamScan])]
minA = min(lstA,key=abs)
fa=""
if minA==min(yspl[AbeamScan]):
# fit A beam positive B beam negative
#print("Fitting positive")
fa="p"
else:
# fit A beam negative B beam positive
#print("Fitting negative")
fa="n"
beamCut = 0.6 # fitting top 40%, 0.7 (30% for cals) or 0.5 (50% for targets) should make this an option
# Ensure peak is within accepted limits
if fa=="p":
if ysplmaxloc > AbeamScan[-1]:# or ysplminloc > beamScan[0]:
msg = "Max is beyond left baseline block"
flag = 31
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ysplminloc < BbeamScan[0]:#baseLocsRight[0]: # ysplminloc < BbeamScan[0] or
msg="Min is beyond right baseline block"
# print(msg)
flag=32
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# Try to fit a gaussian to determine peak parameters
try:
# set initial parameters for data fitting
p = [max(y), x[midLeftLoc], hpbw, .01, .0]
# Try to fit a gaussian to determine peak location
# this also works as a bad scan filter
coeffl, fitl, flagl = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
# coeffl, covar_matrixl, fitLeft = fit_gauss_lin(
# x[AbeamScan], y[AbeamScan], p)
except Exception:
fl=1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitLeft = y[AbeamScan]
flag = 3
try:
# set initial parameters for data fitting
p = [min(y), x[midRightLoc], hpbw, .01, .0]
coeffr, fitr, flagr = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
# coeffr, covar_matrixr, fitRight = fit_gauss_lin(
# x[BbeamScan], y[BbeamScan], p)
except Exception:
fr = 1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitRight = y[BbeamScan]
flag = 3
# Determine peak fitting location
BbeamLeftLimit = x[ysplminloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
BbeamRightLimit = x[ysplminloc]+hhpbw
AbeamLeftLimit = x[ysplmaxloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
AbeamRightLimit = x[ysplmaxloc]+hhpbw
leftlocs = np.where(np.logical_and(
x >= AbeamLeftLimit, x <= AbeamRightLimit))[0]
rightlocs = np.where(np.logical_and(
x >= BbeamLeftLimit, x <= BbeamRightLimit))[0]
# select part of beam to fit
if ysplmax < 0.1:
flag = 36
msg = "fit entire left beam, max yspl < 0.1"
msg_wrapper("warning", log.warning, msg)
leftMainBeamLocs = leftlocs
else:
topCut = np.where(yspl[leftlocs] >= ysplmax*beamCut)[0]
leftMainBeamLocs = leftlocs[0]+np.array(topCut)
if ysplmin > -0.1:
flag = 35
msg = "fit entire right beam, min yspl > -0.1"
msg_wrapper("warning", log.warning, msg)
rightMainBeamLocs = rightlocs
else:
bottomCut = np.where(yspl[rightlocs] <= ysplmin*beamCut)[0]
rightMainBeamLocs = rightlocs[0]+np.array(bottomCut)
if fa=="n":
# A beam is min
if ysplmaxloc < AbeamScan[-1]:# or ysplminloc > beamScan[0]:
msg = "Max is beyond left baseline block"
flag = 31
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ysplminloc > BbeamScan[0]:#baseLocsRight[0]: # ysplminloc < BbeamScan[0] or
msg="Min is beyond right baseline block"
# print(msg)
flag=32
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# Try to fit a gaussian to determine peak parameters
try:
# set initial parameters for data fitting
p = [min(y), x[midLeftLoc], hpbw, .01, .0]
coeffl, fitl, flag = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
# coeffl, covar_matrixl, fitLeft = fit_gauss_lin(
# x[AbeamScan], y[AbeamScan], p)
except Exception:
fl=1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitLeft = y[AbeamScan]
flag = 3
try:
# set initial parameters for data fitting
p = [max(y), x[midRightLoc], hpbw, .01, .0]
coeffr, fitr, flagr = test_gauss_fit( x[BbeamScan], y[BbeamScan], p,log)
# coeffr, covar_matrixr, fitRight = fit_gauss_lin(
# x[BbeamScan], y[BbeamScan], p)
except Exception:
fr = 1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitRight = y[BbeamScan]
flag = 3
# Determine peak fitting location
AbeamLeftLimit = x[ysplminloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
AbeamRightLimit = x[ysplminloc]+hhpbw
BbeamLeftLimit = x[ysplmaxloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
BbeamRightLimit = x[ysplmaxloc]+hhpbw
leftlocs = np.where(np.logical_and(
x >= AbeamLeftLimit, x <= AbeamRightLimit))[0]
rightlocs = np.where(np.logical_and(
x >= BbeamLeftLimit, x <= BbeamRightLimit))[0]
#select part of beam to fit
if ysplmax < 0.1:
flag = 36
msg = "fit entire left beam, max yspl < 0.1"
msg_wrapper("warning", log.warning, msg)
rightMainBeamLocs = rightlocs
else:
topCut = np.where(yspl[rightlocs] >= ysplmax*beamCut)[0]
rightMainBeamLocs = rightlocs[0]+np.array(topCut)
if ysplmin > -0.1:
flag = 35
msg = "fit entire right beam, min yspl > -0.1"
msg_wrapper("warning", log.warning, msg)
leftMainBeamLocs = leftlocs
else:
bottomCut = np.where(yspl[leftlocs] <= ysplmin*beamCut)[0]
leftMainBeamLocs = leftlocs[0]+np.array(bottomCut)
# fit left peak
ypeakl = np.polyval(np.polyfit(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 2), x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(yCorrected[leftMainBeamLocs], ypeakl)
# fit right peak
ypeakr = np.polyval(np.polyfit(
x[rightMainBeamLocs], yCorrected[rightMainBeamLocs], 2), x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(yCorrected[rightMainBeamLocs], ypeakr)
if fa=="p":
ymin = min(ypeakr)
ymax = max(ypeakl)
else:
ymin = min(ypeakl)
ymax = max(ypeakr)
msg_wrapper("debug", log.debug, "A/B beam peak")
if fa=="p":
msg_wrapper("debug", log.debug, "left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1]))
msg_wrapper("debug", log.debug, "left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1]))
ypeakrdata = x[rightMainBeamLocs]
ypeakldata = x[leftMainBeamLocs]
# check data doesn't overlap
overlapRight = set(baseLocsRight) & set(rightMainBeamLocs)
overlapLeft = set(baseLocsLeft) & set(leftMainBeamLocs)
# overlapbeams = set(leftMainBeamLocs) & set(rightMainBeamLocs)
msg=("checking for overlapping beams: ")
msg_wrapper("debug", log.debug, msg)
if len(overlapLeft) != 0:
msg = "beams don't overlap on left"
msg_wrapper("debug", log.debug, msg)
if leftMainBeamLocs[0] > baseLocsLeft[int(
len(baseLocsLeft)*.8)]:
pass
else:
overlap = next(iter(overlapLeft))
shift = list(leftMainBeamLocs).index(int(overlap))
msg = "Overlap found on A beam"
flag = 33
msg_wrapper("warning", log.warning, msg)
# move beam to the left
f = abs(len(leftMainBeamLocs)-shift)
leftMainBeamLocs = abs(leftMainBeamLocs+f)
# fit left peak
ypeakl = np.polyval(np.polyfit(
x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 2), x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(
yCorrected[leftMainBeamLocs], ypeakl)
ymax = max(ypeakl)
msg="left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakl[0] >= ymax or ypeakl[-1] >= ymax):
ymax = np.nan
err_peakl = np.nan
else:
flag = 36
msg = "fit entire left beam"
msg_wrapper("debug", log.debug, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if len(overlapRight) != 0:
overlap = next(iter(overlapRight))
shift = list(rightMainBeamLocs).index(int(overlap))
msg = "Overlap found on B beam"
flag = 34
msg_wrapper("warning", log.warning, msg)
# move beam to the RIGHT
f = abs(len(rightMainBeamLocs)-shift)
rightMainBeamLocs = abs(rightMainBeamLocs-f)
msg="beam shifted to left by {} points".format(f)
msg_wrapper("debug", log.debug, msg)
# fit right peak
ypeakr = np.polyval(np.polyfit(
x[rightMainBeamLocs], yCorrected[rightMainBeamLocs], 2), x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(
yCorrected[rightMainBeamLocs], ypeakr)
ymin = min(ypeakr)
msg="left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakr[0] <= ymin or ypeakr[-1] <= ymin):
ymin = np.nan
err_peakr = np.nan
else:
flag = 35
msg = "fit entire right beam"
msg_wrapper("debug", log.debug, msg)
ypeakrdata = x[rightMainBeamLocs]
#pl.plot(x, yCorrected)
'''#pl.plot(x[leftlocs],yCorrected[leftlocs])
#pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
#pl.plot(x[rightlocs],yCorrected[rightlocs])
pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
#pl.plot(x[leftMainBeamLocs], ypeakl)
pl.plot(x[rightMainBeamLocs], ypeakr)
pl.plot(x, np.zeros(scanLen))
pl.plot(x[baseLocs], yCorrected[baseLocs], ".")
#pl.show()
pl.close()
#sys.exit()'''
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[leftMainBeamLocs])[-1] > 0):
flag = 28
msg = "left beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[rightMainBeamLocs])[-1] < 0):
flag = 29
msg = "right beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# import matplotlib.pyplot as pl
# pl.plot(x, yCorrected, label="corrected data")
# pl.plot(x[leftlocs], yCorrected[leftlocs], label="peakl data")
# pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
# pl.plot(x[rightlocs], yCorrected[rightlocs], label="peakr data")
# pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
# pl.plot(x[leftMainBeamLocs],
# ypeakl, label="peak fitl: {:.3f} +- {:.3f}".format(ymax, err_peakl))
# pl.plot(x[rightMainBeamLocs], ypeakr,
# label="peak fitr: {:.3f} +- {:.3f}".format(ymin, err_peakr))
# pl.plot(x[baseLocs], yCorrected[baseLocs], ".", label="baselocs")
# pl.plot(x, np.zeros(scanLen))
# pl.legend(loc="best")
# pl.grid()
# #pl.show()
# try:
# pl.savefig(saveFolder+"fitted.png")
# except:
# pass
# pl.close()
# sys.exit()
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the peaks")
msg_wrapper("info", log.info,"-"*30)
msg="\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
ymax, err_peakl, ymin, err_peakr)
msg_wrapper("info", log.info, msg)
# find final peak loc
ploca = np.where(ypeakl == ymax)[0]
if len(ploca)==0:
peakLoca=np.nan
else:
peakLoca = (x[leftMainBeamLocs])[ploca[0]]
# find final peak loc
plocb = np.where(ypeakr == ymin)[0]
if len(plocb)==0:
peakLocb=np.nan
else:
peakLocb = (x[rightMainBeamLocs])[plocb[0]]
return {"correctedData":yCorrected,"driftRes":driftRes,"driftRms":driftRms,
"driftCoeffs":driftCoeffs, "baseLocsCombined":baseLocs,
"baseLocsLeft":leftMainBeamLocs,"baseLocsRight":rightMainBeamLocs,
"leftPeakData":ypeakldata,"leftPeakModelData":ypeakl,
"leftPeakFit":ymax, "leftPeakFitErr":err_peakl,"leftPeakFitRes":fitResl,
"rightPeakData":ypeakrdata,"rightPeakModelData":ypeakr,
"rightPeakFit":ymin, "rightPeakFitErr":err_peakr,"rightPeakFitRes":fitResr,
"msg":"","midXValueLeft":peakLoca,"midXValueRight":peakLocb,
"flag":flag
}
else:
msg_wrapper("debug", log.debug, "left: {:.3f}, min: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymin, ypeakl[-1]))
msg_wrapper("debug", log.debug, "left: {:.3f}, max: {:.3f}, right{:.3f}".format(
ypeakr[0], ymax, ypeakr[-1]))
ypeakrdata = x[rightMainBeamLocs]
ypeakldata = x[leftMainBeamLocs]
# check data doesn't overlap
overlapRight = set(baseLocsRight) & set(rightMainBeamLocs)
overlapLeft = set(baseLocsLeft) & set(leftMainBeamLocs)
# overlapbeams = set(leftMainBeamLocs) & set(rightMainBeamLocs)
msg=("checking for overlapping beams: ")
msg_wrapper("debug", log.debug, msg)
if len(overlapLeft) != 0:
msg = "beams don't overlap on left"
msg_wrapper("debug", log.debug, msg)
if leftMainBeamLocs[0] > baseLocsLeft[int(
len(baseLocsLeft)*.8)]:
pass
else:
overlap = next(iter(overlapLeft))
shift = list(leftMainBeamLocs).index(int(overlap))
msg = "Overlap found on A beam"
flag = 33
msg_wrapper("warning", log.warning, msg)
# move beam to the left
f = abs(len(leftMainBeamLocs)-shift)
leftMainBeamLocs = abs(leftMainBeamLocs+f)
# fit left peak
ypeakl = np.polyval(np.polyfit(
x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 2), x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(
yCorrected[leftMainBeamLocs], ypeakl)
ymax = max(ypeakl)
msg="left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakl[0] <= ymax or ypeakl[-1] <= ymax):
ymax = np.nan
err_peakl = np.nan
else:
flag = 36
msg = "fit entire left beam"
msg_wrapper("debug", log.debug, msg)
ypeakldata = x[leftMainBeamLocs]
'''pl.plot(x, yCorrected)
pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 'b')
#pl.plot(x[rightlocs],yCorrected[rightlocs])
#pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
pl.plot(x[leftMainBeamLocs], ypeakl, 'r')
#pl.plot(x[rightMainBeamLocs], ypeakr)
pl.plot(x, np.zeros(scanLen))
pl.plot(x[baseLocs], yCorrected[baseLocs], ".")
#pl.show()
pl.close()
#sys.exit()'''
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if len(overlapRight) != 0:
overlap = next(iter(overlapRight))
shift = list(rightMainBeamLocs).index(int(overlap))
msg = "Overlap found on B beam"
flag = 34
msg_wrapper("warning", log.warning, msg)
# move beam to the RIGHT
f = abs(len(rightMainBeamLocs)-shift)
rightMainBeamLocs = abs(rightMainBeamLocs-f)
msg="beam shifted to left by {} points".format(f)
msg_wrapper("debug", log.debug, msg)
# fit right peak
ypeakr = np.polyval(np.polyfit(
x[rightMainBeamLocs], yCorrected[rightMainBeamLocs], 2), x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(
yCorrected[rightMainBeamLocs], ypeakr)
ymin = min(ypeakr)
msg="left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakr[0] <= ymin or ypeakr[-1] <= ymin):
ymin = np.nan
err_peakr = np.nan
else:
flag = 35
msg = "fit entire right beam"
msg_wrapper("debug", log.debug, msg)
ypeakrdata = x[rightMainBeamLocs]
#pl.plot(x, yCorrected)
'''#pl.plot(x[leftlocs],yCorrected[leftlocs])
#pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
#pl.plot(x[rightlocs],yCorrected[rightlocs])
pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
#pl.plot(x[leftMainBeamLocs], ypeakl)
pl.plot(x[rightMainBeamLocs], ypeakr)
pl.plot(x, np.zeros(scanLen))
pl.plot(x[baseLocs], yCorrected[baseLocs], ".")
#pl.show()
pl.close()
#sys.exit()'''
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[leftMainBeamLocs])[-1] > 0):
flag = 28
msg = "left beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[rightMainBeamLocs])[-1] < 0):
flag = 29
msg = "right beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
# return [], [], [], np.nan, [], \
# [], [], np.nan, np.nan, [], \
# [], [], np.nan, np.nan, [],\
# msg, flag, np.nan, np.nan
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# import matplotlib.pyplot as pl
# pl.plot(x, yCorrected, label="corrected data")
# pl.plot(x[leftlocs], yCorrected[leftlocs], label="peakl data")
# pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
# pl.plot(x[rightlocs], yCorrected[rightlocs], label="peakr data")
# pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
# pl.plot(x[leftMainBeamLocs],
# ypeakl, label="peak fitl: {:.3f} +- {:.3f}".format(ymin, err_peakl))
# pl.plot(x[rightMainBeamLocs], ypeakr,
# label="peak fitr: {:.3f} +- {:.3f}".format(ymax, err_peakr))
# pl.plot(x[baseLocs], yCorrected[baseLocs], ".", label="baselocs")
# pl.plot(x, np.zeros(scanLen))
# pl.legend(loc="best")
# pl.show()
# try:
# pl.savefig(saveFolder+"fitted.png")
# except:
# pass
# pl.close()
# sys.exit()
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the peaks.")
msg_wrapper("info", log.info,"-"*30)
msg="\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
ymin, err_peakl, ymax, err_peakr)
msg_wrapper("info", log.info, msg)
# find final peak loc
ploca = np.where(ypeakl == ymin)[0]
if len(ploca)==0:
peakLoca=np.nan
else:
peakLoca = (x[leftMainBeamLocs])[ploca[0]]
# find final peak loc
plocb = np.where(ypeakr == ymax)[0]
if len(plocb)==0:
peakLocb=np.nan
else:
peakLocb = (x[rightMainBeamLocs])[plocb[0]]
return {"correctedData":yCorrected,"driftRes":driftRes,"driftRms":driftRms,
"driftCoeffs":driftCoeffs, "baseLocsCombined":baseLocs,
"baseLocsLeft":leftMainBeamLocs,"baseLocsRight":rightMainBeamLocs,
"leftPeakData":ypeakldata,"leftPeakModelData":ypeakl,
"leftPeakFit":ymax, "leftPeakFitErr":err_peakl,"leftPeakFitRes":fitResl,
"rightPeakData":ypeakrdata,"rightPeakModelData":ypeakr,
"rightPeakFit":ymin, "rightPeakFitErr":err_peakr,"rightPeakFitRes":fitResr,
"msg":"","midXValueLeft":peakLoca,"midXValueRight":peakLocb,
"flag":flag
}
# return yCorrected, driftRes, driftRms, driftCoeffs[0], baseLocs, \
# ypeakldata, ypeakl, ymin, err_peakl, fitResl, \
# ypeakrdata, ypeakr, ymax, err_peakr, fitResr,\
# msg, flag, peakLoca, peakLocb
[docs]
def get_base(localMinPositions, block_width, scan_len):
""" get the baseline block/s from data with large sidelobes. """
base=[] # entire basline block points
localMinPositions=sorted(localMinPositions)
basel=[] # left baseline blocks
baser=[] # right baseline blocks
lp=[]
rp=[]
basell=[]
baserr=[]
ind=4
for i in range(len(localMinPositions)):
#print(localMinPositions[i], block_width)
if localMinPositions[i]<=block_width:
# get everything before and after
# print("0",localMinPositions[i], block_width+localMinPositions[i])
base=base+list(np.arange(0,block_width+localMinPositions[i]+1))
if localMinPositions[i] <= scan_len/2:
basel = basel+list(np.arange(0, block_width+localMinPositions[i]+1))
lp=lp+[localMinPositions[i]]
#print("1- ",localMinPositions[i])
basell.append(0)
basell.append(block_width+localMinPositions[i]+1)
elif localMinPositions[i] > scan_len/2:
baser = baser + \
list(np.arange(0, block_width+localMinPositions[i]+1))
rp=rp+[localMinPositions[i]]
#print("2+ ", localMinPositions[i])
baserr.append(0)
baserr.append(block_width+localMinPositions[i]+1)
elif localMinPositions[i]>=scan_len-block_width:
# get everything before and after
#print(localMinPositions[i]-block_width, localMinPositions[i], scan_len)
base=base+list(np.arange(localMinPositions[i]-block_width,scan_len))
if localMinPositions[i] <= scan_len/2:
basel = basel+list(
np.arange(localMinPositions[i]-block_width, scan_len))
lp=lp+[localMinPositions[i]]
#print("3- ", localMinPositions[i])
basell.append(localMinPositions[i]-block_width)
basell.append(scan_len)
elif localMinPositions[i] > scan_len/2:
baser = baser+list(
np.arange(localMinPositions[i]-block_width, scan_len-1))
baserr.append(localMinPositions[i]-block_width)
baserr.append(scan_len-1)
rp = rp+[localMinPositions[i]]
#print("4+ ", localMinPositions[i])
else:
#print(localMinPositions[i]-block_width, localMinPositions[i], block_width+localMinPositions[i])
base=base+list(np.arange(localMinPositions[i]-block_width,block_width+localMinPositions[i]))
if localMinPositions[i] <= scan_len/2:
#print("5- ", localMinPositions[i])
basel=basel+list(
np.arange(localMinPositions[i]-block_width, block_width+localMinPositions[i]))
#print("6- ",lp,localMinPositions[i])
lp = lp+[localMinPositions[i]]
basell.append(localMinPositions[i]-block_width)
basell.append(block_width+localMinPositions[i])
elif localMinPositions[i] > scan_len/2:
baser= baser+list(
np.arange(localMinPositions[i]-block_width, block_width+localMinPositions[i]))
rp = rp+[localMinPositions[i]]
end = block_width+localMinPositions[i]
baserr.append(localMinPositions[i]-block_width)
if end >=scan_len:
baserr.append(scan_len)
return base, basel,baser,basell,baserr,lp,rp