#!/usr/bin/env python # python3 status: compatible # coding=utf-8 __author__ = "Joshua Zosky" # Modified a bit by gianfranco """ Copyright 2015 Joshua Zosky joshua.e.zosky@gmail.com This file is part of "RetroTS". "RetroTS" is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. "RetroTS" is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with "RetroTS". If not, see . """ import gzip import json from numpy import zeros, size, savetxt, column_stack, shape, array from lib_RetroTS.PeakFinder import peak_finder from lib_RetroTS.PhaseEstimator import phase_estimator from lib_RetroTS.RVT_from_PeakFinder import rvt_from_peakfinder from lib_RetroTS.Show_RVT_Peak import show_rvt_peak def setup_exceptionhook(): """ Overloads default sys.excepthook with our exceptionhook handler. If interactive, our exceptionhook handler will invoke pdb.post_mortem; if not interactive, then invokes default handler. """ def _pdb_excepthook(type, value, tb): if sys.stdin.isatty() and sys.stdout.isatty() and sys.stderr.isatty(): import traceback import pdb traceback.print_exception(type, value, tb) # print() pdb.post_mortem(tb) else: print("We cannot setup exception hook since not in interactive mode") sys.excepthook = _pdb_excepthook def retro_ts( respiration_file=None, cardiac_file=None, phys_fs=None, number_of_slices=None, volume_tr=None, prefix="Output_File_Name", slice_offset=0, slice_major=1, rvt_shifts=list(range(0, 21, 5)), respiration_cutoff_frequency=3, cardiac_cutoff_frequency=3, interpolation_style="linear", fir_order=40, quiet=1, demo=0, rvt_out=1, cardiac_out=1, respiration_out=1, slice_order="alt+z", show_graphs=0, zero_phase_offset=0, legacy_transform=0, phys_file=None, phys_json=None, ): """ :param respiration_file: :param cardiac_file: :param phys_fs: :param number_of_slices: :param volume_tr: :param prefix: :param slice_offset: :param slice_major: :param rvt_shifts: :param respiration_cutoff_frequency: :param cardiac_cutoff_frequency: :param interpolation_style: kind : str or int, optional Specifies the kind of interpolation as a string: "linear", "nearest", "zero", 'slinear', "quadratic", "cubic", Where 'slinear', "quadratic" and "cubic" refer to a spline interpolation of first, second or third order Or as an integer specifying the order of the spline interpolator to use. Default is "linear". :param fir_order: :param quiet: :param demo: :param rvt_out: :param cardiac_out: :param respiration_out: :param slice_order: :param show_graphs: :param legacy_transform: Important-this will specify whether you use the original Matlab code's version or the potentially bug-corrected version for the final phase correction in lib_RetroTS/RVT_from_PeakFinder.py :param phys_file: IDS formatted physio file in tab separated format. May be gzipped. :param phys_json: BIDS formatted physio metadata json file. If not specified the json corresponding to the phys_file will be loaded. :return: """ if not slice_offset: slice_offset = zeros((1, number_of_slices)) main_info = { "respiration_file": respiration_file, "cardiac_file": cardiac_file, "phys_fs": phys_fs, "number_of_slices": number_of_slices, "volume_tr": volume_tr, "prefix": prefix, "slice_offset": slice_offset, "slice_major": slice_major, "rvt_shifts": rvt_shifts, "respiration_cutoff_frequency": respiration_cutoff_frequency, "cardiac_cutoff_frequency": cardiac_cutoff_frequency, "interpolation_style": interpolation_style, # replacement for 'ResamKernel' variable name "fir_order": fir_order, "quiet": quiet, "demo": demo, "rvt_out": rvt_out, "cardiac_out": cardiac_out, "respiration_out": respiration_out, "slice_order": slice_order, "show_graphs": show_graphs, "zero_phase_offset": zero_phase_offset, "legacy_transform": legacy_transform, "phys_file": phys_file, "phsio_json": phys_json, } # Determining main_info['slice_offset'] based upon main_info['slice_order'], main_info['volume_tr'], # and main_info['number_of_slices']. tt = 0.0 # Default float value to start iterations dtt = float(main_info["volume_tr"]) / float( main_info["number_of_slices"] ) # Increments for iteration # init slice_offsets, unless Custom order # (noted by Jogi Ho on board 27 Dec 2017 [rickr]) if ( (main_info["slice_order"] not in ["Custom", "custom"]) or len(main_info["slice_offset"]) != main_info["number_of_slices"] ): main_info["slice_offset"] = [0] * main_info[ "number_of_slices" ] # Initial value for main_info['slice_offset'] slice_file_list = ( [] ) # List for using external file for main_info['slice_offset'] values/ # Indicates if using external file in last loop if main_info["slice_order"][0:3] == "alt": # Alternating? for i in range(0, main_info["number_of_slices"], 2): main_info["slice_offset"][i] = tt tt += dtt for i in range(1, main_info["number_of_slices"], 2): main_info["slice_offset"][i] = tt tt += dtt elif main_info["slice_order"][0:3] == "seq": # Sequential? for i in range(0, main_info["number_of_slices"]): main_info["slice_offset"][i] = tt tt += dtt elif main_info["slice_order"] in ["Custom", "custom"] \ and type(slice_offset) == str: # If slice_order is custom, parse from slice_offset string. # Allow simple or pythonic array form. 1 Dec 2020 [rickr] try: offlist = eval(slice_offset) noff = len(offlist) except: try: offlist = [float(v) for v in slice_offset.split()] except: print("** failed to apply custom slice timing from: %s" \ % slice_offset) return if len(offlist) != main_info["number_of_slices"]: print("** error: slice_offset len = %d, but %d slices" \ % (len(offlist), main_info["number_of_slices"])) return # success, report and apply print("applying custom slice timing, min = %g, max = %g" \ % (min(offlist), max(offlist))) slice_offset = offlist main_info["slice_offset"] = offlist else: # Open external file specified in argument line, # fill SliceFileList with values, then load into main_info['slice_offset'] with open(main_info["slice_order"], "r") as f: for i in f.readlines(): # read times, in seconds slice_file_list.append(float(i)) # Check that slice acquisition times match the number of slices if len(slice_file_list) != main_info["number_of_slices"]: print("Could not read enough slice offsets from file") print("File should have as many offsets as number_of_slices") quit() main_info["slice_offset"] = slice_file_list if ( main_info["slice_order"][3] == "-" and slice_file_list == [] ): # Check for a minus to indicate # a reversed offset list main_info["slice_offset"].reverse() if ( main_info["quiet"] != 1 ): # Show the slice timing (P.S. Printing is very time consuming in python) print("Slice timing: %s" % main_info["slice_offset"]) # Create information copy for each type of signal respiration_info = dict(main_info) respiration_info["frequency_cutoff"] = main_info["respiration_cutoff_frequency"] # Amplitude-based phase for respiration respiration_info["amp_phase"] = 1 # respiration_info['as_percover'] = 50 # Percent overlap of windows for fft # respiration_info['as_windwidth'] = 0 # Window width in seconds for fft, 0 for full window # respiration_info['as_fftwin'] = 0 # 1 == hamming window. 0 == no windowing cardiac_info = dict(main_info) cardiac_info["frequency_cutoff"] = main_info["cardiac_cutoff_frequency"] # Time-based phase for cardiac signal cardiac_info["amp_phase"] = 0 # Handle file inputs if (((phys_file is not None) and (respiration_file is not None)) or ((phys_file is not None) and (cardiac_file is not None))): raise ValueError('You should not pass a BIDS style phsyio file' ' and respiration or cardiac files.') # Get the peaks for respiration_info and cardiac_info # init dicts, may need -cardiac_out 0, for example [16 Nov 2021 rickr] cardiac_peak = {} respiration_peak = {} if phys_file: if phys_json is None: phys_json = phys_file.replace(".gz", "").replace(".tsv", ".json") with open(phys_json, 'rt') as h: phys_meta = json.load(h) phys_ending = phys_file.split(".")[-1] if phys_ending == 'gz': opener = gzip.open else: opener = open phys_dat = {k:[] for k in phys_meta['Columns']} with opener(phys_file, 'rt') as h: for pl in h.readlines(): pls = pl.split("\t") for k,v in zip(phys_meta['Columns'], pls): phys_dat[k].append(float(v)) for k in phys_meta['Columns']: phys_dat[k] = array(phys_dat[k]) if k.lower() == 'respiratory': # create peaks only if asked for 25 May 2021 [rickr] if main_info["respiration_out"] or main_info["rvt_out"]: if not main_info['phys_fs']: respiration_info['phys_fs'] = phys_meta['SamplingFrequency'] respiration_peak, error = peak_finder(respiration_info, v=phys_dat[k]) if error: print("Died in respiratory PeakFinder") return else: # user opted out respiration_peak = {} elif k.lower() == 'cardiac': # create peaks only if asked for 25 May 2021 [rickr] if main_info["cardiac_out"] != 0: if not main_info['phys_fs']: cardiac_info['phys_fs'] = phys_meta['SamplingFrequency'] cardiac_peak, error = peak_finder(cardiac_info,v=phys_dat[k]) if error: print("Died in cardiac PeakFinder") return else: # user opted out cardiac_peak = {} else: print("** warning phys data contains column '%s', but\n" \ " RetroTS only handles cardiac or reipiratory data" % k) else: if respiration_file: respiration_peak, error = peak_finder(respiration_info, respiration_file) if error: print("Died in respiratory PeakFinder") return else: respiration_peak = {} if cardiac_file: cardiac_peak, error = peak_finder(cardiac_info, cardiac_file) if error: print("Died in cardiac PeakFinder") return else: cardiac_peak = {} main_info["resp_peak"] = respiration_peak main_info["card_peak"] = cardiac_peak respiration_info.update(respiration_peak) cardiac_info.update(cardiac_peak) # Get the phase if respiration_peak: print("Estimating phase for respiration_info") respiration_phased = phase_estimator( respiration_info["amp_phase"], respiration_info ) else: respiration_phased = {} if cardiac_peak: print("Estimating phase for cardiac_info") print(cardiac_info["v"]) cardiac_phased = phase_estimator(cardiac_info["amp_phase"], cardiac_info) else: cardiac_phased = {} respiration_info.update(respiration_phased) cardiac_info.update(cardiac_phased) if respiration_phased: print("Computing RVT from peaks") print(respiration_info["p_trace_r"]) rvt = rvt_from_peakfinder(respiration_phased) respiration_info.update(rvt) # Show some results if show_graphs: if respiration_info: print("Showing RVT Peaks for R\n") show_rvt_peak(respiration_info, 1) """ # Not sure if this code is necessary, 'if 0' is never run in MATLAB # Show RVT graphs goes here, currently not important though if 0: # Write retroicor regressors for i in range(0, opt['main_info['number_of_slices']']): fname = '%s.RetroCard.slc%02d.1D' % (opt['Prefix'], i) #wryte3(cardiac_phased['phase_slice_reg'][:,:,i], fname, 1); savetxt(fname, cardiac_phased['phase_slice_reg'][:,:,i], fmt="%12.6G") fname = sprintf('%s.RetroResp.slc%02d.1D', Opt.Prefix, i); # wryte3(respiration_info.phase_slice_reg(:,:,i), fname, 1); savetxt(fname, respiration_phased['phase_slice_reg'][:,:,i], fmt="%12.6G") # And write the RVT puppy, plus or minus a few seconds delay fname = '%s.RetroRVT.1D' % opt['Prefix'] # wryte3(respiration_info.rvtrs_slc, fname, 1); savetxt(fname, rvt['rvtrs_slc'], fmt="%12.6G") """ # also generate files as 3dREMLfit likes them n_n = 0 n_r_v = 0 n_r_p = 0 n_e = 0 if "time_series_time" in respiration_info: n_n = len(respiration_info["time_series_time"]) n_r_p = size(respiration_info["phase_slice_reg"], 1) n_r_v = size(respiration_info["rvtrs_slc"], 0) if "time_series_time" in cardiac_info: # must have cardiac_info n_n = len( cardiac_phased["time_series_time"] ) # ok to overwrite len(respiration_info.tst), should be same. n_e = size(cardiac_phased["phase_slice_reg"], 1) if ( main_info["cardiac_out"] == 0 and main_info["respiration_out"] == 0 and main_info["rvt_out"] == 0 ): print( "Options cardiac_out, respiration_out, and RVT_out all 0.\nNo output required.\n" ) return temp_y_axis = main_info["number_of_slices"] * ( (main_info["rvt_out"]) * int(n_r_v) + (main_info["respiration_out"]) * int(n_r_p) + (main_info["cardiac_out"]) * int(n_e) ) main_info["reml_out"] = zeros((n_n, temp_y_axis)) cnt = 0 head = ( " slice_offsets.txt RetroTS.py -c ECG.1D -r Resp.1D \\ -v 2 -p 50 -n 10 -prefix fred \\ -slice_order slice_offsets.txt ============================================================================ :param -zero_phase_offset: ============================================================================ :param legacy_transform: Important-this will specify whether you use the original Matlab code's version (1) or the potentially bug-corrected version (0) for the final phase correction in lib_RetroTS/RVT_from_PeakFinder.py (default is 0) Output: ================================================================================ Files saved to same folder based on selection for "-respiration_out" and "-cardiac_out". If these options are enabled, than the data will be written to a single output file based on the filename assigned to the option "-prefix". Example: C:\\afni\\python RetroTS.py -r resp_file.dat -c card_file.dat -p 50 -n 20 -v 2 -prefix subject12_regressors -respiration_out 1 -cardiac_out 1 Output: The file "subject12_regressors.slibase.1D" will be saved to current directory, including respiratory regressors and cardiac regressors. """, "-r": None, "-c": None, "-p": None, "-n": None, "-v": None, "-prefix": "Output_File_Name", "-slice_offset": 0, "-slice_major": 1, "-rvt_shifts": list(range(0, 21, 5)), "-respiration_cutoff_frequency": 3, "-cardiac_cutoff_frequency": 3, "-interpolation_style": "linear", "-fir_order": 40, "-quiet": 1, "-demo": 0, "-debug": False, "-rvt_out": 1, "-cardiac_out": 1, "-respiration_out": 1, "-slice_order": "alt+z", "-show_graphs": 0, "-zero_phase_offset": 0, "-legacy_transform": 0, "-phys_file":None, "-phys_json":None } if len(sys.argv) < 2: print( "You need to provide parameters. If you need help, rerun the" 'program using the "-help" argument:' '\n"python RetroTS.py -help"' ) quit() else: opts = sys.argv[1:] temp_opt = None for opt in opts: if opt in opt_dict: if opt == "-help": print(opt_dict[opt]) quit() elif opt == "-debug": setup_exceptionhook() elif temp_opt in opt_dict: opt_dict[temp_opt] = opt else: print("No such command '%s', try:" % opt) for key in list(opt_dict.keys()): print("%s" % key) quit() temp_opt = opt if opt_dict["-p"]: opt_dict["-p"] = float(opt_dict["-p"]) # change phys_fs and volume_tr to float 6 Mar 2017 [rickr] retro_ts( respiration_file=opt_dict["-r"], cardiac_file=opt_dict["-c"], phys_fs=opt_dict["-p"], number_of_slices=int(opt_dict["-n"]), volume_tr=float(opt_dict["-v"]), prefix=opt_dict["-prefix"], slice_offset=opt_dict["-slice_offset"], slice_major=opt_dict["-slice_major"], rvt_shifts=opt_dict["-rvt_shifts"], respiration_cutoff_frequency=opt_dict["-respiration_cutoff_frequency"], cardiac_cutoff_frequency=opt_dict["-cardiac_cutoff_frequency"], interpolation_style=opt_dict["-interpolation_style"], fir_order=opt_dict["-fir_order"], quiet=opt_dict["-quiet"], demo=opt_dict["-demo"], rvt_out= (int(opt_dict["-rvt_out"]) ), cardiac_out= (int(opt_dict["-cardiac_out"])), respiration_out= (int(opt_dict["-respiration_out"])), slice_order=opt_dict["-slice_order"], show_graphs=opt_dict["-show_graphs"], zero_phase_offset=opt_dict["-zero_phase_offset"], legacy_transform=opt_dict["-legacy_transform"], phys_file=opt_dict["-phys_file"], phys_json=opt_dict["-phys_json"] )