#!/usr/bin/env python # system libraries import sys import gc, math # AFNI libraries import option_list as OL import lib_timing as LT import afni_util as UTIL import lib_textdata as TD # ---------------------------------------------------------------------- # globals g_help_string = """ ============================================================================= timing_tool.py - for manipulating and evaluating stimulus timing files (-stim_times format: where each row is a separate run) This program is meant to work with ascii files containing rows of floats ('*' characters are ignored). This is the format used by 3dDeconvolve with the -stim_times option. Some timing files do not need evaluation, such as those where the timing is very consistent. However, it may be important to examine those files from a random timing design. Recall that an ISI (inter-stimulus interval) is the interval of time between the end of one stimulus and start of the next. The basic program operations include: o reporting ISI statistics, such as min/mean/max values per run o reporting overall ISI statistics for a set of timing files o converting stim_times format to stim_file format o adding a constant offset to time o combining multiple timing files into 1 (like '1dcat' + sort) o appending additional timing runs (like 'cat') o sort times per row (though 3dDeconvolve does not require this) o converting between local and global stim times A sample stimulus timing file having 3 runs with 4 stimuli per run might look something like the following. Note that the file does not imply the durations of the stimuli, except that stimuli are generally not allowed to overlap. 17.3 24.0 66.0 71.6 11.0 30.6 49.2 68.5 19.4 28.7 53.8 69.4 The program works on either a single timing element (which can be modified), or a list of them (which cannot be modified). The only real use of a list of timing elements is to show statistics (via -multi_show_isi_stats). 
-------------------------------------------------------------------------- examples: 0. Basic commands: timing_tool.py -help timing_tool.py -hist timing_tool.py -show_valid_opts timing_tool.py -ver 1. Combine the timing of 2 files (extend one timing by another and sort). Write to a new timing file. timing_tool.py -timing stimesB_01_houses.1D \\ -extend stimesB_02_faces.1D \\ -sort \\ -write_timing stimesB_extended.1D 2. Subtract 12 seconds from each stimulus time (to offset TRs dropped prior to the magnetization steady state). timing_tool.py -timing stimesB_01_houses.1D \\ -add_offset -12.0 \\ -write_timing stimesB1_offset12.1D 2b.Similar to 2, but scale times (multiply) by 0.975, perhaps to account for a different TR or stimulus duration. timing_tool.py -timing stimesB_01_houses.1D \\ -scale_data 0.975 \\ -write_timing stimesB1_scaled.1D 2c.Similar to 2, but shift run times so that the first time occurs at the beginning of the run. timing_tool.py -timing stimesB_01_houses.1D \\ -shift_to_run_offset 0 \\ -write_timing stimesB1_offset0.1D 3. Show timing statistics for the 3 timing files generated by example 3 from "make_random_timing -help". To be accurate, specify the run and stimulus durations. timing_tool.py -multi_timing stimesC_*.1D \\ -run_len 200 -multi_stim_dur 3.5 \\ -multi_show_isi_stats 4. Show timing statistics for the timing files generated by example 6 from "make_random_timing -help". Since both the run and stimulus durations vary, 4 run lengths and 3 stimulus durations are given. timing_tool.py -multi_timing stimesF_*.1D \\ -run_len 200 190 185 225 \\ -multi_stim_dur 3.5 4.5 3 \\ -multi_show_isi_stats 5. Partition the stimulus timing file 'response_times.1D' into multiple timing files based on the labels in a partition file, partitions.1D. If partitions.txt contains (0, correct, incorrect), there will be 2 output timing files, new_times_correct.1D and new_times_incorrect.1D. Times where the partition label is '0' will be skipped. 
timing_tool.py -timing response_times.1D \\ -partition partitions.txt new_times 6a.Convert a stim_times timing file to 0/1 stim_file format. Suppose the timing is random where each event lasts 2.5 seconds and runs are of lengths 360, 360 and 400 seconds. Convert timing.txt to sfile.1D on a TR grid of 0.5 seconds (oversampling), where a TR gets an event if at least 30% of the TR is is occupied by stimulus. timing_tool.py -timing timing.txt -timing_to_1D sfile.1D \\ -tr 0.5 -stim_dur 2.5 -min_frac 0.3 \\ -run_len 360 360 400 6b.Evaluate the results. Use waver to convolve sfile.1D with GAM and use 3dDeconvolve to convolve the timing file with BLOCK(2.5). waver -GAM -TR 0.5 -peak 1 -input sfile.1D > waver.1D 3dDeconvolve -nodata 2240 0.5 -concat '1D: 0 720 1440' \\ -polort -1 -num_stimts 1 \\ -stim_times 1 timing.txt 'BLOCK(2.5)' \\ -x1D X.xmat.1D -x1D_stop 1dplot -sepscl sfile.1D waver.1D X.xmat.1D 7. Truncate stimulus times to the beginning of respective TRs. Given a TR of 2.5 seconds and random stimulus times, truncate those times to multiples of the TR (2.5). timing_tool.py -timing timing.txt -tr 2.5 -truncate_times \\ -write_timing trunc_times.txt Here, 11.83 would get truncated down to 10, the largest multiple of 2.5 less than or equal to the original time. 7b.Instead of just truncating the times, round them to the nearest TR, based on some TR fraction. In this example, round up to the next TR when a stimulus occurs at least 70% into a TR, otherwise round down to the beginning. timing_tool.py -timing timing.txt -tr 2.5 -round_times 0.7 \\ -write_timing round_times.txt With no rounding, a time of 11.83 would be truncated to 10.0. But 11.83 is 1.83 seconds into the TR, or is 73.2 percent into the TR. Since it is at least 70% into the TR, it is rounded up to the next one. Here, 11.83 would get rounded up to 12.5. 8. Create an event list from stimulus timing files. The TR is 1.25s, events are ~1 TR long, and require them to occupy at least half of the given TR. 
Specify that rows should be per run and the run durations are all 370. timing_tool.py -multi_timing stimes.*.txt \\ -multi_timing_to_events all.events.txt \\ -tr 1.25 -multi_stim_dur 1 -min_frac 0.5 \\ -per_run -run_len 370 8b.Break the event file into 2, one for a sequence of changing event types, one for a sequence of ISIs (TRs from one event to the next, including the TR of the event). So if the event file from #8 shows: 0 0 3 0 0 0 0 1 0 2 2 0 0 0 ... The resulting event/ISI files would read: event: 0 3 1 2 2 ... ISI: 2 5 2 1 4 ... timing_tool.py -multi_timing stimes.*.txt \\ -multi_timing_to_event_pair events.txt isi.txt \\ -tr 1.25 -multi_stim_dur 1 -min_frac 0.5 \\ -per_run -run_len 370 9a. Convert from global stim times to local. This requires knowing the run lengths, say 4 runs of 200 seconds here. The result will have 4 rows, each starting at time 0. timing_tool.py -timing stim.1D \\ -global_to_local local.1D \\ -run_len 200 200 200 Note that if stim.1D looks like this ( ** but as a single column ** ): 12.3 115 555 654 777 890 then local.1D will look like this: 12.3 115 * 155 254 377 490 It will complain about the 3 times after the last run ends (no run should have times above 200 sec). 9b. Convert from local timing back to global. timing_tool.py -timing local.1D \\ -local_to_global global.1D \\ -run_len 200 200 200 10. Display within-TR statistics of stimulus timing files, to show when stimuli occur within TRs. The -tr option must be specified. a. one file: show offset statistics (using -show_tr_stats) timing_tool.py -timing stim01_houses.txt -tr 2.0 -show_tr_stats b. (one or) many files (use -multi_timing) timing_tool.py -multi_timing stim*.txt -tr 2.0 -show_tr_stats c. only warn about potential problems (use -warn_tr_stats) timing_tool.py -multi_timing stim*.txt -tr 2.0 -warn_tr_stats -------------------------------------------------------------------------- Notes: 1. Action options are performed in the order of the options. 
If the -chrono option is given, everything (but -chrono) is. 2. Either -timing or -multi_timing is required for processing. 3. Option -run_len applies to single or multiple stimulus classes. A single parameter would be used for all runs. Otherwise one duration per run should be supplied. -------------------------------------------------------------------------- basic informational options: -help : show this help -hist : show the module history -show_valid_opts : show all valid options -ver : show the version number ------------------------------------------ options with both single and multi versions (all single first): -timing TIMING_FILE : specify a stimulus timing file to load e.g. -timing stimesB_01_houses.1D Use this option to specify a single stimulus timing file. The user can modify this timing via some of the action options listed below. -show_isi_stats : display timing and ISI statistics With this option, the program will display timing statistics for the single (possibly modified) timing element. If -tr is included, TR offset statistics are also shown. -show_timing_ele : display info on the main timing element With this option, the program will display information regarding the single (possibly modified) timing element. -stim_dur DURATION : specify the stimulus duration, in seconds e.g. -stim_dur 3.5 This option allows the user to specify the duration of the stimulus, as applies to the single timing element. The only use of this is in conjunction with -show_isi_stats. Consider '-show_isi_stats' and '-run_len'. -------------------- -multi_timing FILE1 FILE2 ... : specify multiple timing files to load e.g. -timing stimesB_*.1D Use this option to specify a list of stimulus timing files. The user cannot modify this data, but can display the overall ISI statistics from it. 
Options that pertain to this timing list include: -multi_show_isi_stats -multi_show_timing_ele -multi_stim_dur -run_len -write_all_rest_times -multi_show_isi_stats : display timing and ISI statistics With this option, the program will display timing statistics for the multiple timing files. If -tr is included, TR offset statistics are also shown. If -write_all_rest_times is included, write a file of rest durations. -multi_show_timing_ele : display info on the multiple timing elements With this option, the program will display information regarding the multiple timing element list. -multi_stim_dur DUR1 ... : specify the stimulus duration(s), in seconds e.g. -multi_stim_dur 3.5 e.g. -multi_stim_dur 3.5 4.5 3 This option allows the user to specify the durations of the stimulus classes, as applies to the multiple timing elements. The only use of this is in conjunction with -multi_show_isi_stats. If only one duration is specified, it is applied to all elements. Otherwise, there should be as many stimulus durations as files specified with -multi_timing. Consider '-multi_show_isi_stats' and '-run_len'. ------------------------------------------ action options (apply to single timing element, only): ** Note that these options are processed in the order they are read. See '-chrono' for similar notions. -add_offset OFFSET : add OFFSET to every time in main element e.g. -add_offset -12.0 Use this option to add a single offset to all of the times in the main timing element. For example, if the user deletes 3 4-second TRs from the EPI data, they may wish to subtract 12 seconds from every stimulus time, so that the times match the modified EPI data. Consider '-write_timing'. -add_rows NEW_FILE : append these timing rows to main element e.g. -add_rows more_times.1D Use this option to append rows from NEW_FILE to those of the main timing element. If the user then wrote out the result, it would be identical to using cat: "cat times1.txt times2.txt > both_times.txt". 
Consider '-write_timing'. -extend NEW_FILE : extend the timing rows with those in NEW_FILE e.g. -extend more_times.1D Use this option to extend each row (run) with the times in NEW_FILE. This has an effect similar to that of '1dcat'. Sorting the times is optional, done via '-sort'. Note that 3dDeconvolve does not need the times to be sorted, though it is more understandable to the user. Consider '-sort' and '-write_timing'. -global_to_local LOCAL_NAME.1D : convert from global timing to local e.g. -global_to_local local_times.1D Use this option to convert from global stimulus timing (in a single column format) to local stimulus timing. Run durations must be given of course, to determine which run each stimulus occurs in. Each stimulus time will be adjusted to be an offset into the current run, e.g. if each run is 120 s, a stimulus at time 143.6 would occur in run #2 (1-based) at time 23.6 s. Consider example 9a and options '-run_len' and '-local_to_global'. -local_to_global GLOBAL_NAME.1D : convert from local timing to global e.g. -local_to_global global_times.1D Use this option to convert from local stimulus timing (one row of times per run) to global stimulus timing (a single column of times across the runs, where time is considered continuous across the runs). Run durations must be given of course, to determine which run each stimulus occurs in. Each stimulus time will be adjusted to be an offset from the beginning of the first run, as if there were no breaks between the runs. e.g. if each run is 120 s, a stimulus in run #2 (1-based) at time 23.6 s would be converted to a stimulus at global time 143.6 s. Consider example 9b and options '-run_len' and '-global_to_local'. -partition PART_FILE PREFIX : partition the stimulus timing file e.g. -partition partitions.txt new_times Use this option to partition the input timing file into multiple timing files based on the labels in a partition file, PART_FILE. 
The partition file would have the same number of rows and entries on each row as the timing file, but would contain labels to use in partitioning the times into multiple output files. A label of 0 will cause that timing entry to be dropped. Otherwise, each distinct label will have those times put into its timing file. e.g. timing file: 23.5 46.0 79.3 84.9 116.2 11.4 38.2 69.7 93.5 121.8 partition file: correct 0 0 incorrect incorrect 0 correct 0 correct correct ==> results in new_times_good.1D and new_times_bad.1D new_times_correct.1D: 23.5 0 0 0 0 0 38.2 0 93.5 121.8 new_times_incorrect.1D: 0 0 0 84.9 116.2 * -round_times FRAC : round times to multiples of the TR 0.0 <= FRAC <= 1.0 e.g. -round_times 0.7 All stimulus times will be rounded to a multiple TR, rounding down if the fraction of the TR that has passed is less than FRAC, rounding up otherwise. Using the example of FRAC=0.7, if the TR is 2.5 seconds, then times are rounded down if they occur earlier than 1.75 seconds into the TR. So 11.83 would get rounded up to 12.5, while 11.64 would be rounded down to 10. FRAC = 1.0 is essentially floor() (as in -truncate_times), while FRAC = 0.0 is essentially ceil(). This option requires -tr. Consider example 7b. See also -truncate_times. -scale_data SCALAR : multiply every stim time by SCALAR e.g. -scale_data 0.975 Use this option to scale (multiply) all times by a single value. This might be useful in effectively changing the TR, or changing the stimulus frequency, if it is regular. Consider '-write_timing'. -show_timing : display the current single timing data This prints the current (possibly modified) single timing data to the terminal. If the user is making multiple modifications to the timing data, they may wish to display the updated timing after each step. -show_tr_stats : display within-TR statistics of stimuli This displays the mean, max and stdev of stimulus times modulo the TR, both in seconds and as fractions of the TR. 
See '-warn_tr_stats' for more details. -warn_tr_stats : display within-TR stats only for warnings This is akin to -show_tr_stats, but output is only displayed if there might be a warning based on the timing. Warnings occur when the minimum fraction is positive and the maximum fraction is small (less than -min_frac, 0.3). If such warnings are encountered, particularly in the case of TENT basis functions used in the linear regression, they can affect the X-matrix, essentially scaling beta #0 by the reciprocal of the fraction (noise dependent). In such a case the stimuli are almost TR-locked, and the user might be better off making them exactly TR-locked (by creating new timing files using "timing_tool.py -round_times"). See also '-show_tr_stats', '-min_frac' and '-round_times'. -sort : sort the times, per row (run) This will cause each row (run) of the main timing element to be sorted (from smallest to largest). Such a step may be highly desired after using '-extend', or after some external manipulation that causes the times to be unsorted. Note that 3dDeconvolve does not require sorted timing. Consider '-write_timing'. -timing_to_1D output.1D : convert stim_times format to stim_file e.g. -timing_to_1D stim_file.1D This action is used to convert stimulus times to set (i.e. 1) values in a 1D stim_file. Besides an input -timing file, -tr is needed to specify the timing grid of the output 1D file, -stim_dur is needed to specify the duration of each stimulus (which might cross many output TRs), and -run_len is needed to specify the duration of each (or all) of the runs. The -min_frac option may be applied to give a minimum cutoff for the fraction of a TR occupied by a stimulus required to label that TR as a 1. If not, the default cutoff is 0.3. For example, assume options: '-tr 2', '-stim_dur 4.2', '-min_frac 0.2'. A stimulus at time 9.7 would last until 13.9. 
TRs 0..4 would certainly be 0, TR 5 would also be 0 as the stimulus covers only .15 of the TR (.3 seconds out of 2 seconds). TR 6 would be 1 since it is completely covered, and TR 7 would be 1 since .95 (1.9/2) would be covered. So the resulting 1D file would start with: 0 0 0 0 0 1 1 The main use of this operation is for PPI analysis, to partition the time series (maybe on a fine grid) with 1D files that are 1 when the given stimulus is on and 0 otherwise. Consider -tr, -stim_dur, -min_frac, -run_len. Consider example 6a. -transpose : transpose the data (only if rectangular) This works exactly like 1dtranspose, and requires each row to have the same number of entries (rectangular data). The first row would be swapped with the first column, etc. Consider '-write_timing'. -truncate_times : truncate times to multiples of the TR All stimulus times will be truncated to the largest multiple of the TR that is less than or equal to each respective time. That is to say, shift each stimulus time to the beginning of its TR. This is particularly important when stimulus times are at a constant offset into each TR and at the same time using TENT basis functions for regression (in 3dDeconvolve, say). The shorter the (non-zero) offset, the more correlated the first two tent regressors will be, possibly leading to unpredictable results. This option requires -tr. Consider example 7. -write_timing NEW_FILE : write the current timing to a new file e.g. -write_timing new_times.1D After modifying the timing data, the user will probably want to write out the result. Alternatively, the user could use -show_timing and cut-and-paste to write such a file. ------------------------------------------ action options (apply to multi timing elements, only): -multi_timing_to_events FILE : create event list from stimulus timing e.g. 
-multi_timing_to_events all.events.txt Decide which TR each stimulus event belongs to and make an event file (of TRs) containing a sequence of values between 0 (no event) and N (the index of the event class, for the N timing files). This option requires -tr, -multi_stim_dur, -min_frac and -run_len. Consider example 8. -multi_timing_to_event_pair Efile Ifile : break event file into 2 pieces e.g. -multi_timing_to_event_pair events.txt isi.txt Similar to -multi_timing_to_events, but break the output event file into 2 pieces, an event list and an ISI list. Each event E followed by K zeros in the previous events file would be broken into a single E (in the new event file) and K+1 (in the ISI file). Note that K+1 is appropriate from the assumption that events are 0-duration. The ISI entries should sum to the total number of TRs per run. Suppose the event file shows 2 TRs of rest, event type 3 followed by 4 TRs of rest, event type 1 followed by 1 TR of rest, type 2 and no rest, type 2 and 3 TRs of rest. So it would read: all events: 0 0 3 0 0 0 0 1 0 2 2 0 0 0 ... Then the event_pair files would read: events: 0 3 1 2 2 ... ISIs: 2 5 2 1 4 ... Note that the only 0 events occur at the beginnings of runs. Note that the ISI is always at least 1, for the TR of the event. This option requires -tr, -multi_stim_dur, -min_frac and -run_len. Consider example 8b. ------------------------------------------ general options: -chrono : process options chronologically While the action options are already processed in order, general and -timing options are not, unless the chrono option is given. This allows one to do things like scripting a sequence of operations within a single command. -min_frac FRAC : specify minimum TR fraction e.g. -min_frac 0.1 This option applies to either -timing_to_1D action or -warn_tr_stats. For -warn_tr_stats (or -show), if the maximum tr fraction is below this limit, TRs are considered to be approximately TR-locked. 
For -timing_to_1D, when a random timing stimulus is converted to part of a 0/1 1D file, if the stimulus occupies at least FRAC of a TR, then that TR gets a 1 (meaning it is "on"), else it gets a 0 ("off"). FRAC is required to be within [0,1], though clearly 0 is not very useful. Also, 1 is not recommended unless that TR can be stored precisely as a floating point number. For example, 0.1 cannot be stored exactly, so 0.999 might be safer to basically mean 1.0. Consider -timing_to_1D. -nplaces NPLACES : specify # decimal places used in printing e.g. -nplaces 1 This option allows the user to specify the number of places to the right of the decimal that are used when printing a stimulus time (to the screen via -show_timing or to a file via -write_timing). The default is -1, which uses the minimum needed for accuracy. Consider '-show_timing' and '-write_timing'. -per_run : perform relevant operations per run e.g. -per_run This option applies to -timing_to_1D, so that each 0/1 array is one row per run, as opposed to a single column across runs. -run_len RUN_TIME ... : specify the run duration(s), in seconds e.g. -run_len 300 e.g. -run_len 300 320 280 300 This option allows the user to specify the duration of each run. If only one duration is provided, it is assumed that all runs are of that length of time. Otherwise, the user must specify the same number of runs that are found in the timing files (one run per row). This option applies to both -timing and -multi_timing files. The run durations only matter for displaying ISI statistics. Consider '-show_isi_stats' and '-multi_show_isi_stats'. -tr TR : specify the time resolution in 1D output (in seconds) e.g. -tr 2.0 e.g. -tr 0.1 For any action that write out 1D formatted data (currently just the -timing_to_1D action), this option is used to set the temporal resolution of the data. For example, given -run_len 200 and -tr 0.5, one run would be 400 time points. Consider -timing_to_1D and -run_len. 
-verb LEVEL : set the verbosity level e.g. -verb 3 This option allows the user to specify how verbose the program is. The default level is 1, 0 is quiet, and the maximum is (currently) 4. -write_all_rest_times : write all rest durations to 'timing' file e.g. -write_all_rest_times all_rest.txt In the case of a show_isi_stats option, the user can opt to save all rest (pre-stim, isi, post-stim) durations to a timing-style file. Each row (run) would have one more entry than the number of stimuli (for pre- and post- rest). Note that pre- and post- might be 0. ----------------------------------------------------------------------------- R Reynolds December 2008 ============================================================================= """ g_history = """ timing_tool.py history: 1.0 Dec 01, 2008 - initial/release version 1.1 Jul 23, 2009 - added -partition option 1.2 Sep 16, 2009 - added -scale_data 1.3 Feb 20, 2010 - added -timing_to_1D, -tr and -min_frac 1.4 Mar 17, 2010 - fixed timing_to_1D when some runs are empty 1.5 Jun 09, 2010 - fixed partitioning without zeros 1.6 Jul 11, 2010 - show TR offset stats if -tr and -show_isi_stats 1.7 Jul 12, 2010 - added -truncate_times and -round_times (added for S Durgerian) 1.8 Aug 16, 2010 - use lib_textdata for I/O 1.9 Oct 15, 2010 - added -multi_timing_to_events, -multi_timing_to_event_pair, -per_run (added for N Adleman) 1.10 Oct 16, 2010 - fixed timing_to_1D fractions 1.11 Oct 21, 2010 - added -shift_to_run_offset 1.12 Nov 19, 2010 - moved write_to_timing_file to afni_util.py - added -write_all_rest_times (for J Poore) 1.13 Dec 15, 2010 - use lib_textdata.py for reading timing files - allow empty files as valid (for C Deveney) 1.14 May 25, 2011 - added -global_to_local and -local_to_global (for G Chen) 2.00 Oct 25, 2011 - process married files with current operations 1. AfniMarriedTiming inherits from AfniData (instead of local copies) 2. add all AfniTiming methods to AfniMarriedTiming (as married timing) 3. 
def set_timing(self, fname):
    """read timing from fname and install it as the main timing element

    The element is built with the currently stored stimulus duration
    (self.stim_dur) and verbosity.  Any previously loaded element and
    file name are discarded only once the new file has been read
    successfully.

    self.status is set to 1 on entry and cleared to 0 only on success.

    return 0 on success, 1 if the file could not be read
    """
    # stay in the failure state until the new element is in place
    self.status = 1

    loaded = LT.AfniTiming(fname, dur=self.stim_dur, verb=self.verb)
    if not loaded.ready:
        print("** failed to read timing from '%s'" % fname)
        return 1

    if self.timing:
        # an element already exists: report, then release it and its name
        if self.verb > 0:
            print("-- replacing old timing with that from '%s'" % fname)
        del self.timing, self.fname
    elif self.verb > 1:
        print("++ read timing from file '%s'" % fname)

    self.timing, self.fname = loaded, fname
    self.status = 0

    return 0
def set_stim_dur(self, dur):
    """store the stimulus duration and apply it to the main timing element

    dur : stimulus duration as a float

    A negative duration is taken to mean 'not specified': nothing is
    stored or applied, and it is not treated as an error.

    If a main timing element has already been loaded, its durations are
    re-initialized from dur; otherwise the value is only stored in
    self.stim_dur (which set_timing passes along when a file is read).

    return 0 on success (including the negative no-op case),
           1 if dur is not a float
    """
    if not isinstance(dur, float):
        print("** set_stim_dur: float required, have '%s'" % type(dur))
        return 1

    # negative means unset, so leave any current duration alone
    if dur < 0:
        return 0

    self.stim_dur = dur
    if self.timing:
        self.timing.init_durations(dur)

    if self.verb > 2:
        print('++ applying stim dur: %f' % dur)

    # explicit success status, matching the other return paths
    return 0
def init_options(self):
    """create self.valid_opts and register every option the program accepts

    Options are registered in the order of the table below.  Each table
    entry is (option name, parameter count, help string, extra keyword
    arguments for add_opt).  An empty list is passed as the default
    parameter list of every option.

    return 0
    """
    # the list-valued options are registered with okdash=0
    # NOTE(review): a parameter count of -1 presumably means 'any number
    #               of parameters' - confirm against option_list
    no_dash = {'okdash': 0}
    plain = {}

    opt_table = [
        # short, terminal arguments
        ('-help', 0, 'display program help', plain),
        ('-hist', 0, 'display the modification history', plain),
        ('-show_valid_opts', 0, 'display all valid options', plain),
        ('-ver', 0, 'display the current version number', plain),

        # action options - single data
        ('-add_offset', 1, 'offset all data by the given value', plain),
        ('-add_rows', 1,
         'append the rows (runs) from the given file', plain),
        ('-extend', 1,
         'extend the rows lengths from the given file', plain),
        ('-global_to_local', 1,
         'convert global times to local and write', plain),
        ('-local_to_global', 1,
         'convert local times to global and write', plain),
        ('-partition', 2,
         'partition the events into multiple files', plain),
        ('-round_times', 1, 'round times up if past FRAC of TR', plain),
        ('-scale_data', 1, 'multiply all data by the given value', plain),
        ('-shift_to_run_offset', 1,
         'shift each run to start at time OFFSET', plain),
        ('-show_timing', 0, 'display timing contents', plain),
        ('-sort', 0, 'sort the data, per row', plain),
        ('-timing_to_1D', 1,
         'convert stim_times to 0/1 stim_file', plain),
        ('-transpose', 0,
         'transpose timing data (must be rectangular)', plain),
        ('-truncate_times', 0,
         'truncation times to multiple of TR', plain),
        ('-write_timing', 1,
         'write timing contents to the given file', plain),

        # (ending with matches for multi)
        ('-timing', 1, 'load the given timing file', plain),
        ('-show_isi_stats', 0,
         'show ISI stats for the main timing object', plain),
        ('-show_timing_ele', 0,
         'display info about the main timing element', plain),
        ('-stim_dur', 1,
         'provide a stimulus duration for main timing', plain),

        # action options - multi
        ('-multi_timing', -1,
         'load the given list of timing files', no_dash),
        ('-multi_show_isi_stats', 0,
         'show ISI stats for load_multi_timing objs', plain),
        ('-multi_show_timing_ele', 0,
         'display info about the multi timing elements', plain),
        ('-multi_stim_dur', -1,
         'provide stimulus durations for timing list', no_dash),
        ('-multi_timing_to_events', 1,
         'convert stim_times event file', plain),
        ('-multi_timing_to_event_pair', 2,
         'convert stim_times event/isi files', plain),

        # general options (including multi)
        ('-chrono', 0, 'process options chronologically', plain),
        ('-min_frac', 1, 'min tr fraction (in [0,1.0])', plain),
        ('-nplaces', 1,
         'set number of decimal places for printing', plain),
        ('-per_run', 0, 'perform operations per run', plain),
        ('-run_len', -1,
         'specify the lengths of each run (seconds)', no_dash),
        ('-show_tr_stats', 0,
         'show fractional TR stats timing files', plain),
        ('-warn_tr_stats', 0,
         'warn about bad fractional TR stats', plain),
        ('-tr', 1, 'specify output timing resolution (seconds)', plain),
        ('-verb', 1, 'set the verbose level (default is 1)', plain),
        ('-write_all_rest_times', 1,
         'in isi_stats, save rest durations to file', plain),
    ]

    self.valid_opts = OL.OptionList('valid opts')

    for oname, npar, hstr, extras in opt_table:
        # a new default list per option, so none is shared between options
        self.valid_opts.add_opt(oname, npar, [], helpstr=hstr, **extras)

    return 0
chrono if uopts.find_opt('-chrono'): self.chrono = 1 # if options are not chronological, process general options now # (so -show options are still in order) if not self.chrono: val, err = self.user_opts.get_type_opt(int, '-verb') if val != None and not err: self.verb = val val, err = uopts.get_type_opt(float, '-min_frac') if val and not err: self.min_frac = val if self.min_frac < 0.0 or self.min_frac > 1.0: print '** invalid -min_frac = %g' % self.min_frac print ' (should be in [0,1])' return 1 val, err = uopts.get_type_opt(int, '-nplaces') if val and not err: self.nplaces = val if uopts.find_opt('-per_run'): self.per_run = 1 val, err = uopts.get_type_opt(float, '-tr') if val and not err: self.tr = val if self.tr <= 0.0: print '** invalid (non-positive) -tr = %g' % self.tr return 1 val, err = uopts.get_type_list(float, '-run_len') if type(val) == type([]) and not err: self.run_len = val # main timing options val, err = uopts.get_string_opt('-timing') if val and not err: if self.set_timing(val): return 1 val, err = uopts.get_type_opt(float, '-stim_dur') if val and not err: self.set_stim_dur(val) val, err = uopts.get_string_list('-multi_timing') if type(val) == type([]) and not err: if self.multi_set_timing(val): return 1 val, err = uopts.get_type_list(float, '-multi_stim_dur') if type(val) == type([]) and not err: self.multi_set_stim_durs(val) val, err = uopts.get_string_opt('-write_all_rest_times') if val and not err: self.all_rest_file = val # ------------------------------------------------------------ # selection and process options: # process sequentially, to make them like a script for opt in uopts.olist: # if all options are chronological, check load and general, too if self.chrono: # continue after any found option if opt.name == '-chrono': continue # main timing options if opt.name == '-timing': if self.set_timing(opt.parlist[0]): return 1 continue elif opt.name == '-multi_timing': if self.multi_set_timing(opt.parlist): return 1 continue elif opt.name == 
'-stim_dur': val, err = self.user_opts.get_type_opt(float, opt=opt) if val != None and err: return 1 else: self.set_stim_dur(val) continue elif opt.name == '-multi_stim_dur': val, err = self.user_opts.get_type_list(float, opt=opt) if val != None and err: return 1 else: self.multi_set_stim_durs(val) continue # general options val, err = uopts.get_type_opt(float, '-min_frac') if val and not err: self.min_frac = val if self.min_frac < 0.0 or self.min_frac > 1.0: print '** invalid -min_frac = %g' % self.min_frac print ' (should be in [0,1])' return 1 elif opt.name == '-nplaces': val, err = self.user_opts.get_type_opt(int, '', opt=opt) if val != None and err: return 1 else: self.nplaces = val continue elif opt.name == '-per_run': self.per_run = 1 continue elif opt.name == '-run_len': val, err = self.user_opts.get_type_list(float, opt=opt) if val != None and err: return 1 else: self.run_len = val continue val, err = uopts.get_type_opt(float, '-tr') if val and not err: self.tr = val if self.tr <= 0.0: print '** invalid -tr = %g' % self.tr return 1 elif opt.name == '-verb': val, err = self.user_opts.get_type_opt(int, '', opt=opt) if val != None and err: return 1 else: self.verb = val continue elif opt.name == '-write_all_rest_times': val, err = self.user_opts.get_string_opt('', opt=opt) if val != None and err: return 1 else: self.all_rest_file = val continue #---------- action options, always chronological ---------- # check multi- options first, then require self.timing if opt.name == '-multi_show_timing_ele': self.show_multi() continue if opt.name == '-add_rows': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_string_opt('', opt=opt) if val != None and err: return 1 newrd = LT.AfniTiming(val,dur=self.stim_dur,verb=self.verb) if not newrd.ready: return 1 if self.timing.add_rows(newrd): return 1 elif opt.name == '-extend': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = 
uopts.get_string_opt('', opt=opt) if val != None and err: return 1 newrd = LT.AfniTiming(val,dur=self.stim_dur,verb=self.verb) if not newrd.ready: return 1 if self.timing.extend_rows(newrd): return 1 elif opt.name == '-partition': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_string_list('', opt=opt) if val != None and err: return 1 if self.timing.partition(val[0], val[1]): return 1 elif opt.name == '-add_offset': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_type_opt(float, opt=opt) if val != None and err: return 1 if self.timing.add_val(val): return 1 elif opt.name == '-scale_data': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_type_opt(float, opt=opt) if val != None and err: return 1 if self.timing.scale_val(val): return 1 elif opt.name == '-shift_to_run_offset': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_type_opt(float, opt=opt) if val != None and err: return 1 if self.timing.shift_to_offset(val): return 1 elif opt.name == '-round_times': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 if self.tr <= 0.0: print "** '%s' requires -tr" % opt.name return 1 val, err = uopts.get_type_opt(float, opt=opt) if val != None and err: return 1 if self.timing.round_times(self.tr, round_frac=val): return 1 elif opt.name == '-truncate_times': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 if self.tr <= 0.0: print "** '%s' requires -tr" % opt.name return 1 if self.timing.round_times(self.tr): return 1 elif opt.name == '-sort': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 self.timing.sort() elif opt.name == '-show_timing': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 if self.verb > 0: print '# ++ timing (%d runs)\n' % len(self.timing.data) print 
UTIL.make_timing_data_string(self.timing.data, nplaces=self.nplaces, verb=self.verb) elif opt.name == '-show_isi_stats': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 if self.show_isi_stats(): return 1 elif opt.name == '-multi_show_isi_stats': if len(self.m_timing) < 1: print "** '%s' requires -multi_timing" % opt.name return 1 if self.multi_show_isi_stats(): return 1 elif opt.name == '-show_tr_stats': if not self.timing and len(self.m_timing) == 0: print "** '%s' requires -timing or -multi_timing" % opt.name return 1 if self.tr <= 0.0: print "** '%s' requires -tr" % opt.name return 1 if self.show_tr_stats(): return 1 elif opt.name == '-warn_tr_stats': if not self.timing and len(self.m_timing) == 0: print "** '%s' requires -timing or -multi_timing" % opt.name return 1 if self.tr <= 0.0: print "** '%s' requires -tr" % opt.name return 1 if self.show_tr_stats(warn=1): return 1 elif opt.name == '-timing_to_1D': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_string_opt('', opt=opt) if val != None and err: return 1 self.write_timing_as_1D(val) # pass event and ISI filenames elif opt.name == '-multi_timing_to_event_pair': if not self.m_timing: print "** '%s' requires -multi_timing" % opt.name return 1 val, err = uopts.get_string_list('', opt=opt) if val != None and err: return 1 self.multi_timing_to_events(val[0], val[1]) # just pass the event filename elif opt.name == '-multi_timing_to_events': if not self.m_timing: print "** '%s' requires -multi_timing" % opt.name return 1 val, err = uopts.get_string_opt('', opt=opt) if val != None and err: return 1 self.multi_timing_to_events(val) elif opt.name == '-transpose': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 if self.timing.transpose(): return 1 elif opt.name == '-show_timing_ele': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 self.timing.show() # this is a write option elif opt.name == 
'-global_to_local': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_string_opt('', opt=opt) if val != None and err: return 1 if self.global_to_local(): return 1 self.write_timing(val) elif opt.name == '-local_to_global': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_string_opt('', opt=opt) if val != None and err: return 1 if self.local_to_global(): return 1 self.write_timing(val) elif opt.name == '-write_timing': if not self.timing: print "** '%s' requires -timing" % opt.name return 1 val, err = uopts.get_string_opt('', opt=opt) if val != None and err: return 1 self.write_timing(val) return 0 def multi_timing_to_events(self, fname, wname=None): """convert multi-stim_times to 0..N event list If wname is not set, write event list to fname. Otherwise, write collapsed event list and ISI list files. """ errs = 0 if len(self.m_timing) < 1: print '** no multi_timing, cannot convert' errs += 1 if not fname: print '** multi_timing_to_events: missing filename' errs += 1 if self.tr <= 0.0: print '** error: -tr must be positive' errs += 1 if len(self.run_len) < 2 and self.run_len[0] == 0: print '** mulit_timing_to_events requires -run_len' errs += 1 if self.min_frac <= 0.0 or self.min_frac > 1.0: print '** error: -min_frac must be in (0.0, 1.0]' errs += 1 if errs: return 1 amtlist = [] for index, timing in enumerate(self.m_timing): errstr, result = timing.timing_to_1D(self.run_len, self.tr, self.min_frac, self.per_run) if errstr: print errstr return 1 if self.verb > 3: print '++ event list %d : %s' % (index+1, result) amtlist.append(result) err, combo = self.combine_multi_1D_lists(amtlist) if err: return 1 # maybe this is all the users wants if not wname: TD.write_1D_file(combo, fname, self.per_run) return 0 err, stimlist, waitlist = self.combo_to_event_and_wait(combo) if err: return 1 TD.write_1D_file(stimlist, fname, self.per_run) TD.write_1D_file(waitlist, wname, self.per_run) 
return 0 def combo_to_event_and_wait(self, combo): """return a list of events (0 for leading rest) and waiting periods (TRs, including events, until next event)""" if type(combo[0]) == type([]): # then process as list of runs elist = [] wlist = [] for llist in combo: rv, ee, ww = self.single_combo_to_EW(llist) if rv: return 1, [], [] elist.append(ee) wlist.append(ww) else: # process as one long list rv, elist, wlist = self.single_combo_to_EW(combo) if rv: return 1, [], [] return 0, elist, wlist def single_combo_to_EW(self, combo): """return a list of events (0 for leading rest) and waiting periods (TRs, including events, until next event)""" elist = [] wlist = [] eposn = 0 for ind, val in enumerate(combo): if val == 0: if ind == 0: # we are at the beginnng of the run elist.append(0) wlist.append(1) else: # wait longer for next event (common) wlist[-1] += 1 else: # a new event elist.append(val) wlist.append(1) return 0, elist, wlist def combine_multi_1D_lists(self, amtlist): """combine multiple 0/1 lists into event lists - if each list is 2 dimensional, work per row """ if self.verb > 1: print '++ creating combo from %d event lists' % len(amtlist) if type(amtlist[0][0]) == type([]): combo = [] # test lengths L0 = amtlist[0] for L in amtlist: if len(L) != len(L0): print '** CM1L: length mis-match: %d vs %d' % (len(L0), len(L)) return 1, [] for rind in range(len(L)): if len(L[rind]) != len(L0[rind]): print '** CM1L: row length mis-match at row %d' % rind return 1, [] if self.verb > 2: print '-- combining event lists per run, %d lists of length %d' \ % (len(amtlist), len(L[0])) # put each row together for rind in range(len(L0)): result = self.combine_1D_lists([L[rind] for L in amtlist]) if result == None: return 1, [] combo.append(result) else: if self.verb > 2: print '-- combining events lists as single sequence' combo = self.combine_1D_lists(amtlist) if self.verb > 2: print '++ have combined TR list: %s' % combo return 0, combo def combine_1D_lists(self, tlist): 
"""combine these lists of events into a composite, ordered by their indices there should be no overlap """ if len(tlist) < 1: return [] if len(tlist) == 1: return tlist[0] combo = [0 for val in tlist[0]] # and fill with every other for lind, llist in enumerate(tlist): for vind, val in enumerate(llist): if val != 0: if combo[vind] != 0: print '** event duplication between lists %d and %d at' \ ' index %d' % (combo[vind], lind+1, vind) combo[vind] = lind+1 # insert 1-based list index here return combo def write_timing_as_1D(self, fname): """convert stim_times to 0/1 format""" if not self.timing: print '** no timing, cannot convert to 1D' return 1 if not fname: print '** write_timing_as_1D: missing filename' return 1 if self.tr <= 0.0: print '** error: -tr must be positive' return 1 if len(self.run_len) < 2 and self.run_len[0] == 0: print '** timing_to_1D requires -run_len' return 1 if self.min_frac <= 0.0 or self.min_frac > 1.0: print '** error: -min_frac must be in (0.0, 1.0]' return 1 errstr, result = self.timing.timing_to_1D(self.run_len, self.tr, self.min_frac, self.per_run) if errstr: print errstr return 1 TD.write_1D_file(result, fname, self.per_run) def global_to_local(self): """convert global times to local, based in run_len array return 0 on success, 1 on any error""" if not self.timing: print '** no timing, cannot convert to local' return 1 if len(self.run_len) < 2 and self.run_len[0] == 0: print '** global_to_local requires -run_len' return 1 return self.timing.global_to_local(self.run_len) def local_to_global(self): """convert local times to global, based in run_len array return 0 on success, 1 on any error""" if not self.timing: print '** no timing, cannot convert to global' return 1 if len(self.run_len) < 2 and self.run_len[0] == 0: print '** local_to_global requires -run_len' return 1 return self.timing.local_to_global(self.run_len) def show_isi_stats(self): if not self.timing: print '** no timing, cannot show stats' return 1 rv = 
self.timing.show_isi_stats(mesg='single element', run_len=self.run_len, tr=self.tr, rest_file=self.all_rest_file) if rv and self.verb > 2: self.timing.make_data_string(nplaces=self.nplaces, mesg='SHOW ISI FAILURE') return 0 def multi_show_isi_stats(self): if len(self.m_timing) < 1: print '** no multi-timing, cannot show stats' return 1 amt = self.m_timing[0].copy() nele = len(self.m_timing) for ind in range(1, nele): amt.extend_rows(self.m_timing[ind]) if self.verb > 2: amt.show('final AMT') rv = amt.show_isi_stats(mesg='%d elements'%nele, run_len=self.run_len, tr=self.tr, rest_file=self.all_rest_file) if rv and self.verb > 2: print amt.make_data_string(nplaces=self.nplaces,mesg='ISI FAILURE') return 0 def show_tr_stats(self, warn=0): """show results from get_TR_offset_stats() if warn, only output warnings (where max is below min_frac) """ if self.min_frac >= 0 and self.min_frac < 1: frac = self.min_frac else: frac = 0.3 if not self.timing and len(self.m_timing) == 0: print '** no timing, cannot show stats' return 1 if self.tr <= 0.0: print "** show_tr_stats requires -tr" return 1 # either way, process as a list tlist = self.m_timing if len(tlist) == 0: tlist = [self.timing] for timing in tlist: rv, rstr = timing.get_TR_offset_stats_str(self.tr, mesg=timing.fname, wlimit=frac) if rv or not warn: print rstr return 0 def test(self, verb=3): # init print '------------------------ initial reads -----------------------' self.verb = verb # these should not fail, so quit if they do # first try AFNI_data4, then regression data if self.set_timing('X.xmat.1D'): return None # reset print '------------------------ reset files -----------------------' # failures print '------------------------ should fail -----------------------' self.set_timing('noxmat') # more tests return None def main(): ard = ATInterface() if not ard: return 1 rv = ard.process_options() if rv > 0: return 1 return ard.status if __name__ == '__main__': sys.exit(main())