from pathlib import Path import h5py import click import numpy as np import pandas as pd import nibabel as nib from nilearn import glm, plotting from nilearn.glm.first_level import FirstLevelModel from nilearn.glm.first_level import compute_regressor from nilearn.interfaces.fmriprep import load_confounds_strategy from nilearn.glm.first_level.hemodynamic_models import orthogonalize def _poly_drift(order, frame_times): """ Vendored from nilearn/glm/first_level/design_matrix. Create a polynomial drift matrix. Parameters ---------- order : :obj:`int`, Number of polynomials in the drift model. frame_times : array of shape(n_scans), Time stamps used to sample polynomials. Returns ------- pol : ndarray, shape(n_scans, order + 1) Estimated polynomial drifts plus a constant regressor. """ order = int(order) pol = np.zeros((np.size(frame_times), order + 1)) tmax = float(frame_times.max()) for k in range(order + 1): pol[:, k] = (frame_times / tmax) ** k pol = orthogonalize(pol) pol = np.hstack((pol[:, 1:], pol[:, :1])) return pol def _def_lsa_model(events_df): """ Transform the DataFrame for LSA Parameters ---------- events_df : pd.DataFrame Returns ------- lsa_events_df : pd.DataFrame """ # copy the df to avoid in-place modification lsa_events_df = events_df.copy() # for each uniique condition, create a trial counter conditions = lsa_events_df["trial_type"].unique() condition_counter = dict.fromkeys(conditions, 0) # iterate over the df, generating new conditions for each trial for i_trial, trial in lsa_events_df.iterrows(): trial_condition = trial["trial_type"] condition_counter[trial_condition] += 1 # We use a unique delimiter here (``__``) that shouldn't be in the # original condition names trial_name = f"{trial_condition}__{condition_counter[trial_condition]:03d}" lsa_events_df.loc[i_trial, "trial_type"] = trial_name return lsa_events_df def _label_cond(row): """ Parameters ---------- row : pd.Series # TODO : Double-check that these are being correctly generated """ view_cond, _ = row["subcondition"].split("-", 1) if not row["error"] and view_cond == "unseen": cond = "correct_rej" elif not row["error"] and view_cond == "seen": cond = "hit" elif row["error"] is True and view_cond == "unseen": cond = "false_alarm" elif row["error"] is True and view_cond == "seen": cond = "miss" else: cond = pd.NA return cond def _get_files(subject, data_dir): """ Parameters ---------- subject : str CNeuroMod subject for analysis. Must be in ['sub-01', 'sub-02', 'sub-03', 'sub-06'] data_dir : str or Pathlike Location on disk with CNeuroMod things.fmriprep dataset. Assumes this has previously been installed with datalad. Returns ------- img_files : list List of fMRIPrep-preprocessed NifTI image files for the given subject, T1w space events : list List of events files for each subject, used to generate design matrices masks : list List of brain masks generated by fMRIPrep for each run, T1w space """ img_files = sorted( Path(data_dir, "things.fmriprep", subject).rglob( "*space-T1w_desc-preproc_bold.nii.gz" ) ) masks = sorted( Path(data_dir, "things.fmriprep", subject).rglob( "*space-T1w_desc-brain_mask.nii.gz" ) ) events = sorted( Path( data_dir, "things.fmriprep", "sourcedata", "things", subject, ).rglob("*events.tsv") ) # drop ses-01 / ses-001 from images, masks, events img_files = list(filter(lambda i: ("ses-01" not in str(i)), img_files)) masks = list(filter(lambda m: ("ses-01" not in str(m)), masks)) events = list(filter(lambda e: ("ses-01" not in str(e)), events)) if subject == "sub-01": # exceptionally sub-01, ses-30, run-1 had no responses ; # need to additionally filter these out img_files = list( filter( lambda i: ("sub-01_ses-30_task-things_run-1" not in str(i)), img_files ) ) masks = list( filter(lambda m: ("sub-01_ses-30_task-things_run-1" not in str(m)), masks) ) events = list( filter(lambda e: ("sub-01_ses-30_task-things_run-01" not in str(e)), events) ) if subject == "sub-06": # exceptionally sub-06, ses-08, run-6 was dropped after preprocessing ; # not yet removed from the full dataset, so filter accordingly img_files = list( filter( lambda i: ("sub-06_ses-08_task-things_run-6" not in str(i)), img_files ) ) masks = list( filter(lambda m: ("sub-06_ses-08_task-things_run-6" not in str(m)), masks) ) events = list( filter(lambda e: ("sub-06_ses-08_task-things_run-06" not in str(e)), events) ) return img_files, events, masks def _gen_fmri_glm(img, event, mask, t_r=1.49, smoothing_fwhm=5, reaction_time=False): """ Parameters ---------- img : str or Pathlike event : str or Pathlike mask : str or Pathlik t_r : float Default 1.49 smoothing_fwhm : int Default 5 reaction_time : Bool Default False Returns ------- stats_img : dict design_matrix : pd.DataFrame, optional """ # load in events files and create memory conditions # based on performance try: n_scans = nib.load(img).shape[-1] frame_times = np.arange(n_scans) * t_r minutes_scanned = (n_scans * t_r) / 60 design_df = pd.DataFrame(np.repeat([1.0], n_scans), columns=["constant"]) # drop trials with no response, since memory effect is not defined df = pd.read_csv(event, sep="\t") for index, row in df.iterrows(): if pd.isnull(row["error"]): df.drop(index, inplace=True) # define LSA model for memory conditions df["memory_cond"] = df.apply(_label_cond, axis=1) memory_events = pd.DataFrame( { "trial_type": df.memory_cond, "onset": df.onset, "duration": df.duration, "amplitude": 1.0, } ) memory_events = _def_lsa_model(memory_events) # convolve each memory event with HRF, add to design_df for _, mem in memory_events.iterrows(): regressor_array, regressor_name = compute_regressor( [[m] for m in mem[1:].values], "spm", np.arange(n_scans) * t_r + (t_r / 2), con_id=mem.trial_type, ) regressor_series = pd.Series( regressor_array.squeeze(), name=regressor_name[0] ) design_df = pd.concat([design_df, regressor_series], axis=1) # if using reaction time, define and convole with HRF, add to design_df if reaction_time: reaction_dur = pd.DataFrame( { "onset": df.onset, "duration": df.response_time_lastkeypress, "amplitude": 1.0, } ) regressor_array, _ = compute_regressor( np.transpose(reaction_dur.values), "spm", np.arange(n_scans) * t_r + (t_r / 2), con_id="ConsDurRTDur", ) reaction_dur_conv = pd.Series( regressor_array.squeeze(), name="ConsDurRTDur" ) design_df = pd.concat([design_df, reaction_dur_conv], axis=1) # calculate polynomial drifts, of same order as GLMSingle poly_drifts = _poly_drift( order=round(minutes_scanned / 2), frame_times=frame_times, )[ :, :-1 ] # drop constant regressor, since already present in df drifts = pd.DataFrame(poly_drifts, columns=["poly_drift0", "poly_drift1"]) # Load confounds using compcor strategy ; # n_compcor chosen from # https://www.frontiersin.org/files/Articles/54426/fnins-07-00247-HTML/image_m/fnins-07-00247-g002.jpg confounds, _ = load_confounds_strategy( str(img), denoise_strategy="compcor", compcor="temporal_anat_separated", n_compcor=5, ) # create final design matrix by concatenating design_matrix = pd.concat( [ design_df, confounds, drifts, ], axis=1, ) # design_matrix should be analogous to the following : # design_matrix = glm.first_level.make_first_level_design_matrix( # frame_times=frame_times, # events=memory_events, # drift_model="polynomial", # drift_order=round(minutes_scanned / 2), # add_regs=confounds, # add_reg_names=confounds.columns, # hrf_model="spm", # ) # add in stimulus type, repetition in dataframe # this will be useful later for HDF5 memory_events["image_path"] = df.image_path memory_events["condition"] = df.condition # Define and fit First Level model fmri_glm = FirstLevelModel(mask_img=mask, smoothing_fwhm=smoothing_fwhm) fmri_glm = fmri_glm.fit(img, design_matrices=design_matrix) except (FileNotFoundError, ValueError) as e: warn_msg = ( "Not all files can be loaded. " "Please ensure that files have been " "first downloaded with datalad." ) raise UserWarning(warn_msg) return fmri_glm, memory_events, design_matrix def _gen_stats_img(img_files, events, masks, data_dir, reaction_time): """ Parameters ---------- img_files : itr events : itr masks : itr data_dir : str or pathlike reaction_time : Bool """ for img, event, mask in zip(img_files, events, masks): stats_imgs = [] condition_names = [] # recreate this ; will need sub_name regardless sub_name, ses, _, run, _ = event.name.split("_") print(f"Currently running {sub_name}, {ses}, {run}") fmri_glm, memory_events, xmatrix = _gen_fmri_glm( img, event, mask, reaction_time=reaction_time ) # save out x_matrices for inspection if reaction_time: xmatrix_name = f"{sub_name}_{ses}_task-things_{run}_ConsDurRTDur_design.png" else: xmatrix_name = f"{sub_name}_{ses}_task-things_{run}_design.png" out_name = Path( data_dir, sub_name, "glm", "design_matrices", xmatrix_name, ) # explicitly make parent directories, if they don't exist out_name.parent.mkdir(parents=True, exist_ok=True) plotting.plot_design_matrix( xmatrix, output_file=out_name, ) # generate beta maps, one per trial trialwise_conditions = memory_events["trial_type"].unique() for condition in trialwise_conditions: condition_names.append(condition) beta_map = fmri_glm.compute_contrast(condition, output_type="effect_size") stats_imgs.append(beta_map.get_fdata()) # save out beta maps in h5 if reaction_time: h5_name = f"{sub_name}_{ses}_task-things_{run}_desc-ConsDurRTDurtrialwiseBetas_stats.h5" else: h5_name = f"{sub_name}_{ses}_task-things_{run}_desc-trialwiseBetas_stats.h5" out_name = Path( data_dir, sub_name, "glm", h5_name, ) out_name.parent.mkdir(parents=True, exist_ok=True) with h5py.File(out_name, "w") as hf: hf.create_dataset("effect_size", data=stats_imgs) hf.create_dataset("memory_behavior", data=condition_names) hf.create_dataset("image_path", data=memory_events.image_path.values) hf.create_dataset("condition", data=memory_events.condition.values) return @click.command() @click.option("--sub_name", default="sub-01", help="Subject name") @click.option( "--data_dir", default="/Users/emdupre/Desktop/things-glm", help="Data directory." ) @click.option("--reaction_time", is_flag=True, help="Whether to regress reaction time") def main(sub_name, data_dir, reaction_time): """ """ img_files, events, masks = _get_files(subject=sub_name, data_dir=data_dir) _gen_stats_img(img_files, events, masks, data_dir, reaction_time=reaction_time) return if __name__ == "__main__": main()