#!/usr/bin/env python
# This library contains functions for managing the outputs of
# compute_ROI_stats.tcsh.
import sys, os, copy
from afnipy import afni_base as ab
from afnipy import afni_util as au
from afnipy import lib_apqc_html_css as lahc
# ============================================================================
# how to map warn levels to Q ratings here
# one Q-rating character per warning level: blank for no warning, '?'
# for the two lowest warning levels, 'x' for the two highest
warn2q = dict(
    none      = ' ',
    undecided = '?',
    mild      = '?',
    medium    = 'x',
    severe    = 'x',
)
# ============================================================================
class all_comp_roi_dset_table:
    """An object to accompany compute_ROI_stats.tcsh text file output.

    This contains a *set* of one or more tables.  On creation, the
    full file text is split into separate tables, each table is
    evaluated by building a comp_roi_dset_table object from it, and
    (optionally) the evaluated tables are written to a text file.
    """

    def __init__(self, ftext, prefix='', fname='input_file',
                 write_out=True, disp_max_warn=False, verb=0):
        """Take a full file text (list of strings) and loop over tables
        within.

        Parameters
        ----------
        ftext : list of str
            full file text, one element per line
        prefix : str
            output file name; if empty, a name is built from fname
        fname : str
            input file name (only used to build a default output name)
        write_out : bool
            write the evaluated tables to disk when done?
        disp_max_warn : bool
            passed along to each comp_roi_dset_table object
        verb : int
            verbosity level, also passed along to each table object
        """

        self.verb            = verb              # verbosity level
        self.ftext           = ftext             # list of str, full file text
        self.prefix          = prefix            # str, output filename radix
        self.fname           = fname             # str, input filename
        self.write_out       = write_out         # bool, make output file?
        self.disp_max_warn   = disp_max_warn     # bool, show max warn str?

        # attributes defined by parsing self.ftext
        self.all_tables_raw  = []                # list of comp_roi_dset* text
        self.all_tables_eval = []                # list of comp_roi_dset_* obj

        # ----- start doing work

        ### [PT] note to self: would have to adjust this to be more
        ### flexible about whitespace/empty lines
        # _tmp = is_comp_roi_str_list_ok(self.ftext)

        self.find_all_tables()
        self.evaluate_all_tables()

        if self.write_out :
            self.write_out_table_file(self.prefix)

    # ---------------------------------------------------

    def write_out_table_file(self, prefix):
        """Save to disk the processed text files: table_values_html.

        Parameters
        ----------
        prefix : str
            full output file name.  If empty or None, the name is the
            input file name (self.fname) minus its extension, plus a
            default postfix; if self.fname is empty too, the literal
            'prefix' plus the postfix is used.
        """

        # default postfixes for particular kinds of tables, if prefix
        # is empty or None
        dict_tables = {
            'values_html' : '_eval_html.txt',
        }

        for table, postfix in dict_tables.items() :
            if prefix :
                opref = prefix
            elif self.fname :
                # strip only a real file extension; os.path.splitext
                # leaves dots in directory names alone, whereas
                # splitting the whole path on '.' would truncate it
                opref = os.path.splitext(self.fname)[0] + postfix
            else:
                opref = 'prefix' + postfix

            # layout: empty line at top, one empty line between
            # consecutive tables, and an empty line at the end
            pieces = ['\n']
            for ii in range(self.n_tables):
                if ii :
                    pieces.append('\n')
                iitab = self.all_tables_eval[ii]
                pieces.append(iitab.assemble_table_values(table))
            pieces.append('\n')

            # context manager closes the file even if the write fails
            with open(opref, "w") as fff :
                fff.write(''.join(pieces))

    def evaluate_all_tables(self):
        """Run the quality evaluations for each table: build one
        comp_roi_dset_table object per raw table and store it in
        all_tables_eval."""

        for table_raw in self.all_tables_raw:
            OBJ = comp_roi_dset_table(table_raw,
                                      disp_max_warn = self.disp_max_warn,
                                      verb = self.verb)
            self.all_tables_eval.append(OBJ)

    def find_all_tables(self):
        """Go through ftext and parcel out into separate tables, each
        stored as a list of str in all_tables_raw; then run
        is_comp_roi_str_list_ok() on each one found."""

        def is_blank(line):
            # a line is a separator if it holds nothing but whitespace
            return len(line.split()) == 0

        # skip any whitespace-only lines at the top of the file
        start = 0
        while start < self.n_ftext and is_blank(self.ftext[start]) :
            start+= 1

        # go 6 lines in (header + at least one ROI row), and then look
        # for the first empty line break (or end of file) to determine
        # where the table ends
        ii = start + 6
        while ii < self.n_ftext :
            if is_blank(self.ftext[ii]) :
                # found a break between/at end of tables
                self.all_tables_raw.append(copy.deepcopy(self.ftext[start:ii]))
                # look for additional whitespace-only lines
                while ii < self.n_ftext and is_blank(self.ftext[ii]) :
                    ii+= 1
                start = ii
                # jump across possible table header height
                ii+= 6
            else:
                ii+= 1

        if start < self.n_ftext and ii != start :
            # must have found another table, one that runs to the end
            # of the file without a trailing empty line
            self.all_tables_raw.append(copy.deepcopy(self.ftext[start:ii]))

        ab.IP("Found {} tables".format(self.n_tables))

        # verify tables
        for table in self.all_tables_raw:
            _tmp = is_comp_roi_str_list_ok(table)

    @property
    def n_ftext(self):
        """Number of lines in ftext list"""
        return len(self.ftext)

    @property
    def n_tables(self):
        """Number of tables in all_tables_raw"""
        return len(self.all_tables_raw)
# ---------------------------------------------------------------------------
class comp_roi_dset_table:
    """An object to accompany compute_ROI_stats.tcsh text file output.

    This contains a single dset's table.

    The major input here is:
    table : list of strings from reading a text file created by
        compute_ROI_stats.tcsh; just a single table, if that program
        put multiple ones into its text file.

    Probably the major products of interest from creating this object are:
    self.table_values_html : a copy of the raw input table that includes
        HTML-style warning level coloration
    """

    def __init__(self, table, disp_max_warn=False, verb=0):
        """Take a dset table (list of strings) and go to work
        calculating/evaluating things.

        Parameters
        ----------
        table : list of str
            the lines of one table: 3 top header rows, 1 row of column
            names, 1 row of dashes, then one row per ROI
        disp_max_warn : bool
            report the max warning level found in the table?
        verb : int
            verbosity level
        """

        self.verb              = verb            # verbosity level
        self.table             = table           # list of str, raw table text

        # attributes defined by parsing self.table
        self.table_top         = []              # list of top 3 header rows
        self.table_cols        = []              # col names + whitespace chunks
        self.table_coldash     = []              # row of col name dashes
        self.table_values      = []              # list of Nroi rows (list of str)

        # attributes defined by evaluating table_values for warnings
        self.table_values_html = []              # table_values + HTML colors
        self.table_maxwarn     = []              # max warn level per item

        self.disp_max_warn     = disp_max_warn   # do display max warning lev?

        # ----- start doing work

        _tmp = is_comp_roi_str_list_ok(self.table)

        self.parse_input_table()
        self.init_eval_tables()
        self.evaluate_all_warns()
        self.apply_warns_to_values_html()

        if self.disp_max_warn :
            self.do_display_max_warn()

    # ---------------------------------------------------

    def assemble_table_values(self, table=None):
        """Combine the header+text for table_values*, such as for writing to
        a file; allowed table values are:
            'values', 'values_html'.

        Returns the assembled text as one str; returns '' (after a
        warning) if table is None or is not one of the allowed values.
        """

        # allowed keyword list for 'table'
        tlist = ['values', 'values_html']

        # a single guard covers both a missing and an unrecognized name
        if table is None or table not in tlist :
            ab.WP("No table specified for assembling? Choose from one of:"
                  "{}".format(', '.join(tlist)))
            return ''

        if table == 'values' :
            rows = self.table_values
        else:
            rows = self.table_values_html

        # header pieces first, then each row glued back together
        pieces = [''.join(self.table_top),
                  ''.join(self.table_cols),
                  ''.join(self.table_coldash)]
        pieces.extend(''.join(x) for x in rows)

        return ''.join(pieces)

    def do_display_max_warn(self):
        """Get the maximum warning level across the table, and output it as
        text"""

        max_warn     = 0
        max_warn_str = ''

        for idx in range(self.n_table_values):
            for col in range(self.len_table_cols):
                wstr = self.table_maxwarn[idx][col]
                wlev = lahc.wlevel_ranks[wstr]
                if wlev > max_warn :
                    max_warn     = wlev
                    max_warn_str = wstr

        ab.IP("max warn level : {}".format(max_warn_str))

    def apply_warns_to_values_html(self):
        """Go through all warns, and add HTML wrappers in appropriate
        places for table_values_html."""

        for idx in range(self.n_table_values):
            for col in range(self.len_table_cols):
                wlev = self.table_maxwarn[idx][col]
                self.table_values_html[idx][col] = \
                    wrap_val_with_html_span(self.table_values[idx][col],
                                            wlev=wlev)

    def evaluate_all_warns(self):
        """For each ROI (i.e., each element of table_values), go through all
        possible warn functions, and update table_maxwarn.  (The HTML
        wrapping itself is done afterwards, in
        apply_warns_to_values_html.)"""

        # each of these updates table_maxwarn, keeping the max level
        # recorded so far for each item
        for idx in range(self.n_table_values):
            self.check_nvox(idx)
            self.check_nzer_frac(idx)
            self.check_dvox(idx)
            self.check_tsnr_slope_2575(idx)
            self.check_tsnr_value_75(idx)   # do after TSNR-slope check

    def check_tsnr_slope_2575(self, idx):
        """Run the check on TSNR slope in ROI."""

        # find columns with correct info
        all_col = [self.table_cols.index(name)
                   for name in ('Tmin', 'T25%', 'Tmed', 'T75%', 'Tmax')]

        # get numbers from relevant row; parsed as float (as in
        # check_tsnr_value_75) so a decimal-formatted entry does not
        # raise ValueError the way int() would
        tlist = [float(self.table_values[idx][jj]) for jj in all_col]

        # evaluate warning level, and save
        wlev = warn_roi_stats_tsnr_slope_2575(tlist, verb=self.verb)

        # ... and update warning levels in cols (here, middle 3 of 5)
        for jj in all_col[1:4]:
            self.update_table_maxwarn(wlev, idx, jj)

    def check_tsnr_value_75(self, idx):
        """Run the check on TSNR 75th %ile (instead of checking the max),
        for its absolute magnitude."""

        # find columns with correct info
        col_T75p = self.table_cols.index('T75%')

        # get numbers from relevant row
        T75p = float(self.table_values[idx][col_T75p])

        # evaluate warning level, and save
        wlev = warn_roi_stats_tsnr_value_75(T75p, verb=self.verb)

        # ... and update warning levels in cols
        self.update_table_maxwarn(wlev, idx, col_T75p)

    def check_dvox(self, idx):
        """Run the check on max voxel depth in the ROI."""

        # find columns with correct info
        col_Dvox = self.table_cols.index('Dvox')

        # get numbers from relevant row
        Dvox = float(self.table_values[idx][col_Dvox])

        # evaluate warning level, and save
        wlev = warn_roi_stats_dvox(Dvox, verb=self.verb)

        # ... and update warning levels in cols
        self.update_table_maxwarn(wlev, idx, col_Dvox)

    def check_nzer_frac(self, idx):
        """Run the check on fraction of zero-valued voxels in the ROI;
        the resulting warning level is recorded on the Nzer column."""

        # find columns with correct info
        col_Nvox = self.table_cols.index('Nvox')
        col_Nzer = self.table_cols.index('Nzer')

        # get numbers from relevant row
        Nvox = int(self.table_values[idx][col_Nvox])
        Nzer = int(self.table_values[idx][col_Nzer])

        # evaluate warning level, and save
        wlev = warn_roi_stats_nzer_frac(Nvox, Nzer, verb=self.verb)

        # ... and update warning levels in cols
        self.update_table_maxwarn(wlev, idx, col_Nzer)

    def check_nvox(self, idx):
        """Run the check on total number of voxels in the ROI."""

        # find columns with correct info
        col_Nvox = self.table_cols.index('Nvox')

        # get numbers from relevant row
        Nvox = int(self.table_values[idx][col_Nvox])

        # evaluate warning level, and save
        wlev = warn_roi_stats_nvox(Nvox, verb=self.verb)

        # ... and update warning levels in cols
        self.update_table_maxwarn(wlev, idx, col_Nvox)

    def update_table_maxwarn(self, wlev, idx, col):
        """For a given idx (row in table) and col (column number), update
        the warning level with wlev; that is, record the max warning level
        between what is there already and wlev."""

        # get old warn level value
        wlev_old = self.table_maxwarn[idx][col]

        # ... and update it with wlev, if the latter is larger
        self.table_maxwarn[idx][col] = maxwarn(wlev, wlev_old)

    def parse_input_table(self):
        """Parse the input table, separating it into useful attributes:
        table_top, table_cols, table_coldash and table_values."""

        # store the header part in pieces
        self.table_top     = copy.deepcopy(self.table[:3])
        self.table_cols    = break_line_at_whitespace(self.table[3])
        self.table_coldash = copy.deepcopy(self.table[4])

        # store the table itself: one list of word/whitespace chunks
        # per ROI row
        for i in range(5, self.n_table):
            x = self.table[i]
            y = break_line_at_whitespace(x)
            if len(y) == 0:
                ab.EP("Row {} in full table is empty?".format(i))

            ###z = self.manage_q_col(y)    # not used currently

            # check that we have achieved consistency
            if len(y) != self.len_table_cols :
                ab.WP("[{}]th ROI has {} pieces, but col header has {}?"
                      "".format(i, len(y), self.len_table_cols))
                if self.verb :
                    print('row:', x)

            self.table_values.append(y)

    def init_eval_tables(self):
        """Initialize some evaluation things: table_values_html starts
        as an independent copy of table_values, and table_maxwarn
        starts with 'none' for every item."""

        self.table_values_html = copy.deepcopy(self.table_values)

        # store max warning for each column in table---even white
        # space ones, sillily enough (well, for bookkeeping); each row
        # gets its own list, so rows can be updated independently
        ncol = self.len_table_cols
        self.table_maxwarn = [['none'] * ncol
                              for n in range(self.n_table_values)]

    def manage_q_col(self, y):
        """Make the table_values have a Q column entry in an appropriate
        way.  If they have one already, do nothing.  If they don't have
        one, then create a placeholder one, which is a ' ' carved out
        of the initial white space there.  Here, y is a list of str,
        representing the full ROI row that has been broken up into
        words and whitespace chunks.  Returns a new list; y itself is
        not modified.  **NOT USED CURRENTLY**"""

        z = copy.deepcopy(y)

        if y[0][0] != ' ' :
            # nothing to do in this case: the entry should be a
            # rating already.
            return z

        # no rating present; split [0]th block of len=N whitespace
        # into 2 blocks: one of len=1, and one of len=N-1
        N    = len(y[0])
        z[0] = ' '*(N-1)          # make [0]th block shorter by 1 char
        z.insert(0, ' ')          # insert that space as new [0]th item
        return z

    @property
    def n_table(self):
        """Number of lines in raw input table"""
        return len(self.table)

    @property
    def n_table_top(self):
        """Number of lines in 'table top' part of header"""
        return len(self.table_top)

    @property
    def n_table_values(self):
        """Number of ROI rows in table"""
        return len(self.table_values)

    @property
    def len_table_cols(self):
        """Number of table columns (in broken format, that counts whitespace
        chunks)"""
        return len(self.table_cols)
# ----------------------------------------------------------------------------
def maxwarn(A, B):
    """For two warn level strings, A and B, return the max level.  If A or
    B is not an allowed warn level, produce an error (via ab.EP).

    The ranking of levels is taken from lahc.wlevel_ranks.

    Parameters
    ----------
    A : str
        a warning level str from:
        'none', 'undecided', 'mild', 'medium', 'severe'
    B : str
        a warning level str (same list as above)

    Returns
    -------
    C : str
        max warning level string between A and B (B, in case of a tie)
    """

    all_level = lahc.wlevel_ranks.keys()

    # name the offending argument correctly in each message
    if A not in all_level :
        ab.EP("Input A ({}) is not in allowed list:\n"
              "{}".format(A, ', '.join(all_level)))
    if B not in all_level :
        ab.EP("Input B ({}) is not in allowed list:\n"
              "{}".format(B, ', '.join(all_level)))

    wA = lahc.wlevel_ranks[A]
    wB = lahc.wlevel_ranks[B]

    # return largest (in case of tie, can return either)
    if wA > wB :
        return A
    return B
def warn_roi_stats_tsnr_slope_2575(tlist, verb=0):
    """For a given set of TSNR percentile values in tlist (= Tmin, T25%,
    Tmed, T75%, Tmax), provide warning levels based on the ratio:
        rat = (T75% - T25%)/Tmed
    As the ratio gets larger, the warning level increases.

    Special cases:
    + Tmax == 0 : return 'undecided' (no ratio is computed)
    + Tmed == 0 (with Tmax > 0) : the ratio is undefined, so return
      'severe' if T75% > T25% (ratio would be unbounded), else
      'undecided'

    Parameters
    ----------
    tlist : list or ordered collection
        set of 5 TSNR values, as defined above
    verb : int
        verbosity level

    Returns
    -------
    wlevel : str
        string of warning level, from:
        'none', 'undecided', 'mild', 'medium', 'severe'
    """

    if verb :
        ttt = ', '.join([str(x) for x in tlist])
        ab.IP("TSNR values : {}".format(ttt))

    N = len(tlist)
    if N != 5 :
        ab.EP("Should have exactly 5 TSNR values, not: {})".format(N))

    # Tmin (tlist[0]) is not needed for this check
    t25p = tlist[1]
    tmed = tlist[2]
    t75p = tlist[3]
    tmax = tlist[4]

    if tmax < 0 :
        ab.EP("Can't have negative max TSNR ({})".format(tmax))

    if tmax == 0 :
        return 'undecided'

    spread = float(t75p - t25p)

    # guard the division: the median can be 0 even when the max is not
    if tmed == 0 :
        if spread > 0 :
            return 'severe'
        return 'undecided'

    rat = spread / tmed

    # not sure what these warn level ranges should be!
    if   rat > 2.0 : return 'severe'
    elif rat > 1.5 : return 'medium'
    elif rat > 1.0 : return 'mild'
    elif rat > 0.5 : return 'undecided'
    else:            return 'none'
def warn_roi_stats_tsnr_value_75(t75p, verb=0):
    """Pick a warning level from the 75th %ile TSNR value in an ROI.

    The lower the value, the stronger the warning.  A value of exactly
    0 gets 'none' here.

    Parameters
    ----------
    t75p : int/float
        75th %ile value of TSNR in the ROI
    verb : int
        verbosity level

    Returns
    -------
    wlevel : str
        string of warning level, from:
        'none', 'undecided', 'mild', 'medium', 'severe'
    """

    if verb :
        ab.IP("t75p: {}".format(t75p))

    if t75p < 0:
        ab.EP("Can't have negative t75p ({})".format(t75p))

    # no vox; gets flagged elsewhere
    if t75p == 0 :
        return 'none'

    # (lower bound, level) pairs, checked from the highest bound down
    ladder = (
        (100, 'none'),
        ( 80, 'mild'),
        ( 50, 'medium'),
    )
    for floor, level in ladder :
        if t75p >= floor :
            return level

    return 'severe'
def warn_roi_stats_nvox(nvox, verb=0):
    """Pick a warning level from the number of voxels in an ROI.

    The fewer voxels the ROI has, the stronger the warning.  A count
    of exactly 0 gets 'none' here.

    Parameters
    ----------
    nvox : int
        number of voxels in ROI
    verb : int
        verbosity level

    Returns
    -------
    wlevel : str
        string of warning level, from:
        'none', 'undecided', 'mild', 'medium', 'severe'
    """

    if verb :
        ab.IP("nvox: {}".format(nvox))

    if nvox < 0:
        ab.EP("Can't have negative nvox ({})".format(nvox))

    # no vox; gets flagged elsewhere
    if nvox == 0 :
        return 'none'

    # (exclusive lower bound, level) pairs, largest bound first
    ladder = (
        (15, 'none'),
        ( 9, 'mild'),
        ( 4, 'medium'),
    )
    for bound, level in ladder :
        if nvox > bound :
            return level

    return 'severe'
def warn_roi_stats_nzer_frac(nvox, nzer, verb=0):
    """Pick a warning level from the fraction of zero-valued voxels in an
    ROI: frac = nzer/nvox.

    The larger the fraction, the stronger the warning.  If either
    count is 0, the level is 'none' and no fraction is computed.

    Parameters
    ----------
    nvox : int
        number of voxels in ROI
    nzer : int
        number of zero-valued voxels in the ROI
    verb : int
        verbosity level

    Returns
    -------
    wlevel : str
        string of warning level, from:
        'none', 'undecided', 'mild', 'medium', 'severe'
    """

    if verb :
        ab.IP("nvox: {}, nzer: {}".format(nvox, nzer))

    if nvox < 0 or nzer < 0 :
        ab.EP("Can't have negative nvox ({}) or nzer ({})".format(nvox, nzer))

    # nvox == 0: no vox; gets flagged elsewhere
    # nzer == 0: nothing to warn about
    if nvox == 0 or nzer == 0 :
        return 'none'

    frac = float(nzer) / nvox

    # (exclusive upper bound, level) pairs, smallest bound first
    ladder = (
        (0.05, 'none'),
        (0.1,  'undecided'),
        (0.2,  'mild'),
        (0.35, 'medium'),
    )
    for ceiling, level in ladder :
        if frac < ceiling :
            return level

    return 'severe'
def warn_roi_stats_dvox(dvox, verb=0):
    """Pick a warning level from the max volumetric depth (dvox) of an
    ROI, counted in units of (isotropic) voxel dimension.

    The smaller dvox is, the stronger the warning; lower dvox suggests
    greater influence of partial voluming, for example.  A value of
    exactly 0 gets 'none' here, and the strongest level this function
    returns is 'medium'.

    Parameters
    ----------
    dvox : float
        max voxel depth of ROI (units: number of voxels, which are likely
        isotropic)
    verb : int
        verbosity level

    Returns
    -------
    wlevel : str
        string of warning level, from:
        'none', 'undecided', 'mild', 'medium', 'severe'
    """

    if verb :
        ab.IP("dvox: {}".format(dvox))

    # sanity checks on the input value
    if dvox < 0 :
        ab.EP("Can't have negative dvox ({})".format(dvox))
    elif dvox != 0 and dvox < 1 :
        ab.WP("Shouldn't have sub-unity dvox ({})".format(dvox))

    # dvox == 0 means no vox; gets flagged elsewhere
    if dvox == 0 or dvox >= 1.7 :
        return 'none'

    # though, 1.4 is a common val
    if dvox >= 1.4 :
        return 'mild'

    return 'medium'
# ============================================================================
def wrap_val_with_html_span(x, wlev='none', style='background'):
    """Take a column entry x, and either wrap it HTML-style in a
    colored span (if the warning level wlev is high enough), or return
    it plainly as a string.  We use the same colormapping rules as main
    APQC HTML (lahc.wlevel_colors); note that 'none' will be ignored,
    though (no green, just leave plain).

    NOTE(review): the span markup below was reconstructed; confirm
    the exact CSS wanted for each style against the APQC HTML.

    Parameters
    ----------
    x : str
        an entry in the table_values table to (possibly) be wrapped in HTML
        coloration
    wlev : str
        the warning level associated with x, which determines if and what
        color will be wrapped
    style : str
        a keyword for the kind of coloration to add; valid args are:
        'background', 'border', 'underline'

    Returns
    -------
    sss : str
        the table entry either just returned back plainly (no change) or
        wrapped in color
    """

    # warning level not high enough to add color
    if wlev == 'none' :
        return """{}""".format(x)

    # warning level *is* high enough to add color
    wcol = lahc.wlevel_colors[wlev]

    if style == 'background' :
        css = """background-color: {};""".format(wcol)
    elif style == 'border' :
        # this does not work well, shifts things
        css = """border: 2px solid {};""".format(wcol)
    elif style == 'underline' :
        css = """text-decoration: underline; """
        css+= """text-decoration-color: {};""".format(wcol)
    else:
        # unknown keyword: report it explicitly, rather than failing
        # later on an unassigned variable
        ab.EP("Unrecognized style '{}'; valid args are: "
              "background, border, underline".format(style))
        # only reached if the error call returns; fall back to plain text
        return """{}""".format(x)

    sss = """<span style="{}">""".format(css)
    sss+= """{}</span>""".format(x)

    return sss
# ----------------------------------------------------------------------------
def break_line_at_whitespace(x, verb=0):
    """Take a string x and break it into a list of strings that are broken
    up at whitespace boundaries.  This is different than the str.split()
    method, because it preserves all characters (even whitespace), but
    chunks them in sequence into substrings.

    Parameters
    ----------
    x : str
        input string to be broken up
    verb : int
        verbosity level

    Returns
    -------
    y : list
        list of strings from x, broken up at whitespace boundaries
        (alternating runs of whitespace and non-whitespace).  At the
        end, should have: ''.join(y) == x.  An empty x gives [''].
    """

    # ----- simple checks

    if not isinstance(x, str) :
        ab.EP("Input x must be of type str, not: {}"
              "".format(type(x).__name__))

    N = len(x)
    if N == 0 :
        return ['']

    if verb :
        ab.IP("Input string length: {} char".format(N))

    # ----- main work

    y   = []                    # init empty list for all str
    bot = 0                     # init starting index of each search

    # walk through the non-whitespace words in order
    for word in x.split() :
        # find next word, starting from end of last one
        top = x.find(word, bot)
        # any gap before the word is a whitespace chunk
        if bot != top :
            y.append(x[bot:top])
        y.append(word)
        # update starting index for next search
        bot = top + len(word)

    # get any whitespace that is after final word
    if bot != N :
        y.append(x[bot:])

    if verb :
        ab.IP("Output list has length: {} elements".format(len(y)))

    # ----- check if we lost anything

    ystr = ''.join(y)
    if ystr != x :
        # pass a single message string, as every other warning call in
        # this file does; the strings themselves are shown below when
        # verbose
        ab.WP("Reconstituted broken string y differs from x")
        if verb :
            print("   x: '{}'".format(x))
            print("   y: '{}'".format(ystr))

    return y
def is_comp_roi_str_list_ok(L):
    """Preliminary evaluation of the input table (which is a list of str),
    for fundamental/necessary properties.  Basically, fail (via ab.EP)
    if something looks wrong and just return zero if we successfully
    run the gamut of properties.

    Checks made, in order:
    + L is a list
    + L has at least 6 lines
    + every element of L is a str
    + line [0] starts with 'dset:'
    + line [3] has 'R' as its first non-whitespace char

    Parameters
    ----------
    L : list of str
        the lines of a single table

    Returns
    -------
    0 : int
        returned when no check fails
    """

    # check type of input
    if not isinstance(L, list) :
        ab.EP("Table must be a list, not: {}".format(type(L).__name__))

    # check minimum len of input
    N = len(L)
    if not(N) :
        ab.EP("Table is empty")
    elif N < 6 :
        ab.EP("Table must have at least 6 lines (to have >=1 ROI). "
              "This one has only: {}".format(N))

    # check input contents (list of str)
    for x in L:
        if not isinstance(x, str) :
            ab.EP("Table must contain only str, not: {}"
                  "".format(type(x).__name__))

    # [0]th row must start with 'dset:'
    lstart = L[0][:5]
    if lstart != 'dset:' :
        ab.EP("Table must start with 'dset:', not {}".format(lstart))

    # [3]rd row must have 'R' (for 'ROI') as first non-whitespace char;
    # a blank row has no such char, and must be reported as an error
    # here rather than raising an IndexError
    words  = L[3].split()
    lstart = words[0][0] if words else ''
    if lstart != 'R' :
        ab.EP("Line [3]'s first non-whitespace char must be 'R', "
              "not {}".format(lstart))

    return 0
# ============================================================================
if __name__ == '__main__' :
    # Developer example: read one stats text file from a hard-coded
    # location, evaluate all tables in it, and write the result next
    # to the input (same name, minus the extension).

    # ----- earlier example: a single table
    #fname = '/home/ptaylor/AFNI_data6/FT_analysis/'
    #fname+= 'sub-456.results_FT.rest.15/t.tsnr_stats_regress8/'
    #fname+= 'stats_MNI-Glasser.txt'
    #
    #x = au.read_text_file(fname, strip=False)
    #
    #OBJ = comp_roi_dset_table(x)

    # ----- earlier example: a file with several tables
    #fname2 = '/home/ptaylor/AFNI_data6/FT_analysis/'
    #fname2+= 'sub-456.results_FT.rest.16/tsnr_stats_regress/'
    #fname2+= 'stats_COMBO9.txt'
    #prefout = '.'.join(fname2.split('.')[:-1])
    #x2 = au.read_text_file(fname2, strip=False)
    #OBJ2 = all_comp_roi_dset_table(x2, prefix = prefout)

    # ----- current example
    fname2 = (
        '/home/ptaylor/AFNI_data6/FT_analysis/FT.results/tsnr_stats_regress/'
        'stats_APQC_atlas.txt'
    )

    # use input file for output pref: everything before the last '.'
    prefout = fname2.rsplit('.', 1)[0]

    x2   = au.read_text_file(fname2, strip=False)
    OBJ2 = all_comp_roi_dset_table(x2, prefix=prefout)