__author__ = "Joshua Zosky" """ Copyright 2015 Joshua Zosky joshua.e.zosky@gmail.com This file is part of "RetroTS". "RetroTS" is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. "RetroTS" is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with "RetroTS". If not, see . """ from numpy import zeros, ones, nonzero, pi, argmin, sin, cos from numpy import size, arange, clip, histogram, r_, Inf, divide, append, delete, array from .zscale import z_scale import matplotlib.pyplot as plt def my_hist(x, bin_centers): """ This frivolous yet convenient conversion from bin-edges to bin-centers is from Stack Overflow user Bas Swinckels http://stackoverflow.com/questions/18065951/why-does-numpy-histogram-python-leave-off-one-element-as-compared-to-hist-in-m :param x:dataset :param bin_centers:bin values in a list to be moved from edges to centers :return: counts = the data in bin centers ready for pyplot.bar """ bin_edges = r_[-Inf, 0.5 * (bin_centers[:-1] + bin_centers[1:]), Inf] counts, edges = histogram(x, bin_edges) return counts def phase_base(amp_type, phasee): """ :type phasee: object :param amp_type: if 0, it is a time-based phase estimation if 1, it is an amplitude-based phase estimation :param phasee: phasee information :return: """ if amp_type == 0: # Calculate the phase of the trace, with the peak to be the start of the phase nptrc = len(phasee["tp_trace"]) phasee["phase"] = -2 * ones(size(phasee["t"])) i = 0 j = 0 while i <= (nptrc - 2): while phasee["t"][j] < phasee["tp_trace"][i + 1]: if phasee["t"][j] >= phasee["tp_trace"][i]: # Note: Using a constant 244 period for each interval # causes slope discontinuity within a period. # One should resample period[i] so that it is # estimated at each time in phasee['t'][j], # dunno if that makes much of a difference in the end however. if j == 10975: pass phasee["phase"][j] = ( phasee["t"][j] - phasee["tp_trace"][i] ) / phasee["prd"][i] + phasee["zero_phase_offset"] if phasee["phase"][j] < 0: phasee["phase"][j] = -phasee["phase"][j] if phasee["phase"][j] > 1: phasee["phase"][j] -= 1 j += 1 if i == 124: pass i += 1 # Remove the points flagged as unset temp = nonzero(phasee["phase"] < -1) phasee["phase"][temp] = 0.0 # Change phase to radians phasee["phase"] = phasee["phase"] * 2 * pi else: # phase based on amplitude # at first scale to the max mxamp = max(phasee["p_trace"]) phasee["phase_pol"] = [] gR = z_scale(phasee["v"], 0, mxamp) # Scale, per Glover 2000's paper bins = arange(0.01, 1.01, 0.01) * mxamp hb_value = my_hist(gR, bins) # hb_value = histogram(gR, bins) if phasee["show_graphs"] == 1: center = (bins[:-1] + bins[1:]) / 2 plt.bar(center, hb_value[: len(hb_value) - 1]) # , align='center') plt.show() # find the polarity of each time point in v i = 0 itp = 0 inp = 0 tp = phasee["tp_trace"][0] tn = phasee["tn_trace"][0] while ( (i <= len(phasee["v"])) and (phasee["t"][i] < tp) and (phasee["t"][i] < tn) ): phasee["phase_pol"].append(0) i += 1 if tp < tn: # Expiring phase (peak behind us) cpol = -1 itp = 1 else: # Inspiring phase (bottom behind us) cpol = 1 inp = 1 phasee["phase_pol"] = zeros( size(phasee["v"]) ) # Not sure why you would replace the # list that you created 10 lines prior to this # Add a fake point to tptrace and tntrace to avoid ugly if statements phasee["tp_trace"] = append(phasee["tp_trace"], phasee["t"][-1]) phasee["tn_trace"] = append(phasee["tn_trace"], phasee["t"][-1]) while i < len(phasee["v"]): phasee["phase_pol"][i] = cpol if phasee["t"][i] == phasee["tp_trace"][itp]: cpol = -1 itp = min((itp + 1), (len(phasee["tp_trace"]) - 1)) elif phasee["t"][i] == phasee["tn_trace"][inp]: cpol = 1 inp = min((inp + 1), (len(phasee["tn_trace"]) - 1)) # cpol, inp, itp, i, R i += 1 phasee["tp_trace"] = delete(phasee["tp_trace"], -1) phasee["tn_trace"] = delete(phasee["tn_trace"], -1) if phasee["show_graphs"] == 1: # clf plt.plot(phasee["t"], gR, "b") ipositive = nonzero(phasee["phase_pol"] > 0) ipositive = ipositive[0] ipositive_x = [] for i in ipositive: ipositive_x.append(phasee["t"][i]) ipositive_y = zeros(size(ipositive_x)) ipositive_y.fill(0.55 * mxamp) plt.plot(ipositive_x, ipositive_y, "r.") inegative = nonzero(phasee["phase_pol"] < 0) inegative = inegative[0] inegative_x = [] for i in inegative: inegative_x.append(phasee["t"][i]) inegative_y = zeros(size(inegative_x)) inegative_y.fill(0.45 * mxamp) plt.plot(inegative_x, inegative_y, "g.") plt.show() # Now that we have the polarity, without computing sign(dR/dt) # as in Glover et al 2000, calculate the phase per eq. 3 of that paper # First the sum in the numerator for i, val in enumerate(gR): gR[i] = round(val / mxamp * 100) + 1 gR = clip(gR, 0, 99) shb = sum(hb_value) hbsum = [] hbsum.append(float(hb_value[0]) / shb) for i in range(1, 100): hbsum.append(hbsum[i - 1] + (float(hb_value[i]) / shb)) for i in range(len(phasee["t"])): phasee["phase"].append(pi * hbsum[int(gR[i]) - 1] * phasee["phase_pol"][i]) phasee["phase"] = array(phasee["phase"]) # Time series time vector phasee["time_series_time"] = arange( 0, (max(phasee["t"]) - 0.5 * phasee["volume_tr"]), phasee["volume_tr"] ) # Python uses half open ranges, so we need to catch the case when the stop # is evenly divisible by the step and add one more to the time series in # order to match Matlab, which uses closed ranges 1 Jun 2017 [D Nielson] if (max(phasee["t"]) - 0.5 * phasee["volume_tr"]) % phasee["volume_tr"] == 0: phasee["time_series_time"] = append( phasee["time_series_time"], [phasee["time_series_time"][-1] + phasee["volume_tr"]], ) phasee["phase_slice"] = zeros( (len(phasee["time_series_time"]), phasee["number_of_slices"]) ) phasee["phase_slice_reg"] = zeros( (len(phasee["time_series_time"]), 4, phasee["number_of_slices"]) ) for i_slice in range(phasee["number_of_slices"]): tslc = phasee["time_series_time"] + phasee["slice_offset"][i_slice] for i in range(len(phasee["time_series_time"])): imin = argmin(abs(tslc[i] - phasee["t"])) # mi = abs(tslc[i] - phasee['t']) # probably not needed phasee["phase_slice"][i, i_slice] = phasee["phase"][imin] # Make regressors for each slice phasee["phase_slice_reg"][:, 0, i_slice] = sin( phasee["phase_slice"][:, i_slice] ) phasee["phase_slice_reg"][:, 1, i_slice] = cos( phasee["phase_slice"][:, i_slice] ) phasee["phase_slice_reg"][:, 2, i_slice] = sin( 2 * phasee["phase_slice"][:, i_slice] ) phasee["phase_slice_reg"][:, 3, i_slice] = cos( 2 * phasee["phase_slice"][:, i_slice] ) if phasee["quiet"] == 0 and phasee["show_graphs"] == 1: print("--> Calculated phase") plt.subplot(413) a = divide(divide(phasee["phase"], 2), pi) plt.plot(phasee["t"], divide(divide(phasee["phase"], 2), pi), "m") if "phase_r" in phasee: plt.plot(phasee["tR"], divide(divide(phasee["phase_r"], 2), pi), "m-.") plt.subplot(414) plt.plot( phasee["time_series_time"], phasee["phase_slice"][:, 1], "ro", phasee["time_series_time"], phasee["phase_slice"][:, 2], "bo", phasee["time_series_time"], phasee["phase_slice"][:, 2], "b-", ) plt.plot(phasee["t"], phasee["phase"], "k") # grid on # title it plt.title(phasee["v_name"]) plt.show() # Need to implement this yet # if phasee['Demo']: # uiwait(msgbox('Press button to resume', 'Pausing', 'modal')) return phasee def phase_estimator(amp_phase, phase_info): """ v_name='', amp_phase=0, t=[], x=[], iz=[], # zero crossing (peak) locations p_trace=[], tp_trace=[], n_trace=[], tn_trace=[], prd=[], t_mid_prd=[], p_trace_mid_prd=[], phase=[], rv=[], rvt=[], var_vector=[], phys_fs=(1 / 0.025), zero_phase_offset=0.5, quiet=0, resample_fs=(1 / 0.025), f_cutoff=10, fir_order=80, resample_kernel='linear', demo=0, as_window_width=0, as_percover=0, as_fftwin=0, sep_dups=0, phasee_list=0, show_graphs=0 """ """ Example: PhaseEstimator.phase_estimator(amp_phase, info_dictionary) or PhaseEstimator.phase_estimator(v) where v is a list if v is a matrix, each column is processed separately. :param var_vector: column vector--list of list(s) :param phys_fs: Sampling frequency :param zero_phase_offset: Fraction of the period that corresponds to a phase of 0 0.5 means the middle of the period, 0 means the 1st peak :param quiet: :param resample_fs: :param frequency_cutoff: :param fir_order: BC ??? :param resample_kernel: :param demo: :param as_window_width: :param as_percover: :param fftwin: :param sep_dups: :return: *_phased: phase estimation of input signal """ phasee = dict( v_name="", t=[], x=[], iz=[], # zero crossing (peak) locations volume_tr=2, p_trace=[], tp_trace=[], n_trace=[], tn_trace=[], prd=[], t_mid_prd=[], p_trace_mid_prd=[], phase=[], rv=[], rvt=[], var_vector=[], phys_fs=(1 / 0.025), zero_phase_offset=0.5, quiet=0, resample_fs=(1 / 0.025), frequency_cutoff=10, fir_order=80, resample_kernel="linear", demo=0, as_window_width=0, as_percover=0, as_fftwin=0, sep_dups=0, phasee_list=0, show_graphs=0, number_of_slices=0, ) phasee.update(phase_info) if isinstance(phasee["phasee_list"], type([])): return_phase_list = [] for phasee_column in phasee["phasee_list"]: return_phase.append(phase_base(amp_phase, phasee_column)) return return_phase_list else: return_phase = phase_base(amp_phase, phasee) return return_phase