from __future__ import absolute_import import collections import numpy as np import pp def test(nx=50, ny=50): # NOT WORKING # all wavenumbers are cm-1 wn_max = 100.0 wn_min = 50.0 resolution_min = 300.0 delta_wn = wn_min / resolution_min # spectra are evenly sampled from 0 to wn_max with spacing delta_wn nfreq = np.ceil(wn_max / delta_wn) wn_max = delta_wn * nfreq freqs = np.arange(nfreq) freqs *= delta_wn # construct a dummy 'sky' # numpy default indexing is C-style so that the rightmost index # 'changes the fastest'. sky_s = np.zeros([nfreq, ny, nx], np.float) sky_s[:,ny/2,nx/2] = 1.0 # x coords in radians arcsec2rad = 1.0 / 206265 sky_x = (np.arange(nx) - nx/2) * 1.0 * arcsec2rad sky_y = (np.arange(ny) - ny/2) * 1.0 * arcsec2rad # dummy baselines (each baseline entry is [u,v] in centimetres) baselines = [] for i in range(500): baselines.append([20000.0, 20000.0]) df = DoubleFourier(sky_s=sky_s, freqs=freqs, sky_x=sky_x, sky_y=sky_y, baselines=baselines) df.matlab_transform() class DoubleFourier(object): """Class to compute interferograms. """ def __init__(self, parameters, previous_results): self.parameters = parameters self.previous_results = previous_results # paralle processing stuff ppservers = () self.job_server = pp.Server(ppservers=ppservers) print 'DoubleFourier starting pp with %s workers' % \ self.job_server.get_ncpus() self.result = collections.OrderedDict() def run(self): print 'DoubleFourier.run' fts = self.previous_results['fts'] frequency_axis = fts['fts_wn'] opd_max = fts['opd_max'] fts_nsample = fts['ftsnsample'] vdrive = fts['vdrive'] delta_opd = fts['delta_opd'] times = np.arange(int(fts_nsample), dtype=np.float) times *= (opd_max / vdrive) / float(fts_nsample-1) beamsgenerator = self.previous_results['beamsgenerator'] uvmapgenerator = self.previous_results['uvmapgenerator'] bxby = uvmapgenerator['bxby'] skygenerator = self.previous_results['skygenerator'] skymodel = skygenerator['sky model'] spatial_axis = self.result['spatial axis'] = skygenerator['spatial axis'] self.result['frequency axis'] = frequency_axis # assuming nx is even then transform has 0 freq at origin and [nx/2] is # Nyquist frequency. Nyq freq = 0.5 * Nyquist sampling freq. # Assume further that the fft is shifted so that 0 freq is at nx/2 nx = len(spatial_axis) spatial_freq_axis = np.arange(-nx/2, nx/2, dtype=np.float) sample_freq = (180.0 * 3600.0 / np.pi) / (spatial_axis[1] - spatial_axis[0]) spatial_freq_axis *= (sample_freq / nx) self.result['spatial frequency axis'] = spatial_freq_axis self.result['baseline interferograms'] = collections.OrderedDict() # for baseline in bxby: for baseline in bxby[:3]: print baseline measurement = np.zeros(np.shape(times)) # FTS path diff and possibly baseline itself vary with time for tindex,t in enumerate(times): # for tindex,t in enumerate(times[:1]): sky_now = skymodel # what is the system looking at? # add 'errors' in order # 1. baseline should be perp to centre of field. # If baseline is tilted then origin of sky map shifts. # (I think effect could be corrected by changing # FTS sample position to compensate.?) # for now assume 0 error but do full calculation for timing # purposes baseline_dx = 0.0 baseline_dy = 0.0 # calculate xpos, ypos in units of pixel - numpy arrays # [row,col] nx = len(spatial_axis) colpos = float(nx-1) * float (baseline_dx - spatial_axis[0]) / \ (spatial_axis[-1] - spatial_axis[0]) rowpos = float(nx-1) * float (baseline_dy - spatial_axis[0]) / \ (spatial_axis[-1] - spatial_axis[0]) if colpos < 0 or colpos > (nx-1) or rowpos < 0 or rowpos > \ (nx-1): raise Exception, \ 'baseline centre outside limits of sky model' # calculate fourier phase shift to move point at [0,0] to # [rowpos, colpos] shiftx = np.zeros([nx], np.complex) shiftx[:nx/2] = np.arange(nx/2, dtype=np.complex) shiftx[nx/2:] = np.arange(-nx/2, 0, dtype=np.complex) shiftx = np.exp((-2.0j * np.pi * colpos * shiftx) / float(nx)) shifty = np.zeros([nx], np.complex) shifty[:nx/2] = np.arange(nx/2, dtype=np.complex) shifty[nx/2:] = np.arange(-nx/2, 0, dtype=np.complex) shifty = np.exp((-2.0j * np.pi * rowpos * shifty) / float(nx)) shift = np.ones([nx,nx], np.complex) for j in range(nx): shift[j,:] *= shiftx for i in range(nx): shift[:,i] *= shifty # go through freq planes and shift them for iwn,wn in enumerate(frequency_axis): # move centre of image to array origin temp = np.fft.fftshift(sky_now[:,:,iwn]) # 2d fft temp = np.fft.fft2(temp) # apply phase shift temp *= shift # transform and shift back temp = np.fft.ifft2(temp) temp = np.fft.fftshift(temp) # imag part should still be 0 # print 'max real', np.max(np.real(temp)) # print 'max imag', np.max(np.imag(temp)) temp = np.abs(temp) sky_now[:,:,iwn] = np.fft.fftshift(temp) if t == times[0]: self.result['sky at time 0'] = sky_now # 2. telescopes should be centred on centre of field # Telescopes collect flux from the 'sky' and pass # it to the FTS beam combiner. In doing this each # telescope multiplies the sky emission by its # amplitude beam response - always real but with # negative areas. Is this correct? Gives right # answer for 'no error' case. # multiply sky by amplitude beam 1 * amplitude beam 2 for iwn,wn in enumerate(frequency_axis): # calculate shifted beams here # for now assume no errors and just use beam # calculated earlier amplitude_beam = beamsgenerator['primary amplitude beam']\ [wn] amp_beam_1 = amplitude_beam.data amp_beam_2 = amplitude_beam.data sky_now[:,:,iwn] *= amp_beam_1 * amp_beam_2 if t == times[0]: self.result['sky*beams at time 0'] = sky_now # 3. baseline error revisited # derive baseline at this time baseline_error = 0.0 baseline_now = baseline + baseline_error # print 'baseline_now', baseline_now # gamma_12[(baseline,t)] = 0.0 fft_now = np.zeros(np.shape(sky_now), np.complex) spectrum = np.zeros(np.shape(frequency_axis), np.complex) for iwn,wn in enumerate(frequency_axis): # derive shift needed to place baseline at one of FFT coords # this depends on physical baseline and frequency baseline_now_lambdas = baseline_now * wn * 100.0 # print 'baseline lambda', wn, baseline_now_lambdas # calculate baseline position in units of pixels of FFTed sky - # numpy arrays [row,col] colpos = float(nx-1) * float(baseline_now_lambdas[0] - spatial_freq_axis[0]) / \ (spatial_freq_axis[-1] - spatial_freq_axis[0]) rowpos = float(nx-1) * float(baseline_now_lambdas[1] - spatial_freq_axis[0]) / \ (spatial_freq_axis[-1] - spatial_freq_axis[0]) # print 'spatial colpos, rowpos', colpos, rowpos # calculate fourier phase shift to move point at [rowpos,colpos] to # [0,0] shiftx = np.zeros([nx], np.complex) shiftx[:nx/2] = np.arange(nx/2, dtype=np.complex) shiftx[nx/2:] = np.arange(-nx/2, 0, dtype=np.complex) shiftx = np.exp((-2.0j * np.pi * colpos * shiftx) / float(nx)) shifty = np.zeros([nx], np.complex) shifty[:nx/2] = np.arange(nx/2, dtype=np.complex) shifty[nx/2:] = np.arange(-nx/2, 0, dtype=np.complex) shifty = np.exp((-2.0j * np.pi * rowpos * shifty) / float(nx)) shift = np.ones([nx,nx], np.complex) for j in range(nx): shift[j,:] *= shiftx for i in range(nx): shift[:,i] *= shifty # move centre of sky image to origin temp = np.fft.fftshift(sky_now[:,:,iwn]) # apply phase shift temp *= shift # 2d fft temp = np.fft.fft2(temp) # shift 0 freq to centre of array fft_now[:,:,iwn] = np.fft.ifftshift(temp) spectrum[iwn] = temp[0,0] if t == times[0]: self.result['skyfft at time 0'] = fft_now # 3. FTS sampling should be accurate # derive lag due to FTS path difference # 0 error for now mirror_error = 0.0 opd = 2.0 * (vdrive * t + mirror_error) print 'opd', opd, vdrive, t, mirror_error opd_ipos = opd / delta_opd # make spectrum symmetric nfreq = len(frequency_axis) symmetric_spectrum = np.zeros([2*nfreq]) symmetric_spectrum[:nfreq] = spectrum symmetric_spectrum[nfreq:] = spectrum[::-1] # calculate shift needed to move point at opd to 0 shift = np.zeros([2*nfreq], dtype=np.complex) shift[:nfreq] = np.arange(nfreq, dtype=np.complex) shift[nfreq:] = np.arange(-nfreq, 0, dtype=np.complex) shift = np.exp((-2.0j * np.pi * opd_ipos * shift) / float(nfreq)) # apply phase shift and fft symmetric_spectrum *= shift spectrum_fft = np.fft.fft(symmetric_spectrum) measurement[tindex] = spectrum_fft[0] self.result['baseline interferograms'][tuple(baseline)] = measurement return self.result def matlab_transform(self): # readers should look at Izumi et al. 2006, Applied Optics, 45, 2576 # for theoretical background. Names of variables in the code correspond # to that work. # For now, assume 2 light collectors giving one baseline at a time. interferograms = {} for baseline in self.baselines: interferogram = 0 # baseline length (cm) and position angle bu = baseline[0] bv = baseline[1] mod_b = np.sqrt(pow(bu,2) + pow(bv,2)) ang_b = np.arctan2(bv, bu) # loop over sky pixels covered by primary beam nx = self.sky_s.shape[1] ny = self.sky_s.shape[2] for j in range(ny): for i in range(nx): # inverse fft of emission spectrum at this point temp = np.fft.ifft(self.sky_s[:,j,i]) # move 0 frequency to centre of array temp = np.fft.fftshift(temp) # length (radians) and position angle of theta vector mod_theta = np.sqrt(pow(self.sky_x[i],2) + pow(self.sky_y[j],2)) ang_theta = np.arctan2(self.sky_y[j], self.sky_x[i]) # calculate b.theta (the projection of b on theta) # and the corresponding delay in units of wavelength at # Nyquist frequency delay = mod_theta * mod_b * np.cos(ang_b - ang_theta) * self.freqs[-1] # sampling is done at twice Nyquist freq so shift transformed # spectrum by 2 * delay samples (approximated to nint) # NOTE factor of 2 discrepency with matlab version! I think # this is because there the variable 'Nyq' is the Nyquist # sampling rate, not the Nyquist frequency. temp = np.roll(temp, int(round(2.0 * delay))) # want only the real part of the result interferogram += np.real(temp) def __repr__(self): return 'DoubleFourier'