Source code for neurobooth_terra.redcap

"""Functions to parse redcap surveys."""

# Authors: Mainak Jas <mjas@mgh.harvard.edu>

from datetime import datetime, date
from warnings import warn
import time
import json
import tempfile
import os
import os.path as op

import numpy as np
import pandas as pd
from pandas.api.types import infer_dtype


def iter_interval(wait=60, exit_after=np.inf):
    """Run redcap service in a loop.

    Exit loop with Ctrl + C

    Parameters
    ----------
    wait : float
        The time to wait between iterations, in seconds.
    exit_after : float
        The total elapsed time after which to exit the loop, in seconds.
    """
    start_time = start_time_t0 = time.time()
    yield
    while True:
        try:
            time.sleep(1)
            curr_time = time.time()
            time_elapsed = curr_time - start_time
            time_left = wait - time_elapsed
            if curr_time - start_time_t0 > exit_after:
                break
            elif time_left < 0:
                start_time = time.time()
                print('')
                yield
                continue
            print(f'Time left: {time_left:2.2f} s', end='\r')
        except KeyboardInterrupt:
            break
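
# Example (illustrative sketch): poll Redcap on a fixed interval. The
# generator yields once immediately and then roughly every ``wait``
# seconds; ``sync_once`` is a hypothetical callback, not part of this
# module.
#
#     >>> for _ in iter_interval(wait=5, exit_after=60):  # doctest: +SKIP
#     ...     sync_once()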


def fetch_survey(project, survey_name, survey_id, index=None,
                 cast_dtype=True):
    """Fetch a survey report from Redcap as a dataframe.

    Parameters
    ----------
    project : instance of Project
        The project created using pycap.
    survey_name : str
        The name of the survey to export.
    survey_id : int
        The survey_id. See under Reports in Redcap.
    index : str
        The column to set as index.
    cast_dtype : bool
        If True, cast columns to their inferred datatypes. This is done
        by writing the dataframe to a temporary csv file and reading it
        back.

    Returns
    -------
    df : instance of DataFrame
        The pandas dataframe.
    """
    print(f'Fetching report {survey_name} from Redcap')
    data = project.export_reports(report_id=survey_id)
    if 'error' in data:
        raise ValueError(data['error'])

    # format = 'df' didn't work
    df = pd.DataFrame(data)
    if cast_dtype:
        with tempfile.TemporaryDirectory() as temp_dir:
            csv_fname = op.join(temp_dir, survey_name + '.csv')
            df.to_csv(csv_fname, index=False)
            df = pd.read_csv(csv_fname)
    print('[Done]')

    df = df.where(pd.notnull(df), None)
    if index is not None:
        df = df.set_index(index)
        df.index = df.index.astype(str)  # allow mixing int and str record_id
    return df


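# Example (illustrative sketch): fetching a report with a pycap project.
# The URL, API token, and report id below are placeholders.
#
#     >>> from redcap import Project
#     >>> project = Project('https://redcap.example.org/api/',
#     ...                   API_TOKEN)  # doctest: +SKIP
#     >>> df = fetch_survey(project, survey_name='demographics',
#     ...                   survey_id=12345, index='record_id')  # doctest: +SKIP

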
def _is_series_equal(src_series, target_series):
    """Check equality of two series by casting dtype when necessary."""
    for col_index, target_item in target_series.items():
        src_item = src_series.loc[col_index]
        if isinstance(target_item, date):  # ignore datetime column
            continue
        elif type(target_item) != type(src_item):
            src_item = np.array([src_item]).astype(type(target_item))[0]
        if target_item != src_item:
            return False
    return True


def combine_indicator_columns(df, src_cols, target_col):
    """Combine indicator columns into categorical variable.

    Parameters
    ----------
    df : pd.Dataframe
        The dataframe.
    src_cols : dict
        Mapping from indicator column names to the value each one
        contributes. Example: dict(race___1=1, race___2=2).
    target_col : str
        The name of the target column to create.
    """
    arr = [list() for _ in range(df.shape[0])]
    for col_name, val in src_cols.items():
        for idx, this_element in enumerate(df[col_name]):
            if this_element == 1.:
                arr[idx].append(int(val))
    df[target_col] = arr
    df.drop(src_cols.keys(), axis=1, inplace=True)
    return df


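# Example: two checkbox indicator columns collapsed into one
# list-valued column (the values below are made up).
#
#     >>> df = pd.DataFrame({'race___1': [1., 0.], 'race___2': [1., 1.]})
#     >>> df = combine_indicator_columns(
#     ...     df, dict(race___1=1, race___2=2), 'race')
#     >>> df['race'].tolist()
#     [[1, 2], [2]]

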
def extract_field_annotation(s):
    """Extract the field annotation and create new columns from it.

    Annotations are structured in the format: FOI-visual DB-y FOI-motor

    Returns
    -------
    s : pandas series object
        The pandas series with error and columns for field annotation
        added.
    """
    field_annot = s['field_annotation']
    if pd.isna(field_annot):
        return s

    fields = field_annot.split(' ')
    fois = list()
    for field in fields:
        if field.startswith('@'):
            continue
        try:
            field_name, field_value = field.split('-')
            if field_name == 'FOI':
                fois.append(field_value)
            else:
                s[field_name] = field_value
            s['error'] = ''
        except Exception:
            msg = f'field_annotation reads: {field_annot}'
            s['error'] = msg
            warn(msg)
    s['FOI'] = fois
    return s


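# Example: parsing one row of the metadata; in practice the function is
# applied row-wise, e.g. ``metadata.apply(extract_field_annotation, axis=1)``.
#
#     >>> s = pd.Series({'field_annotation': 'FOI-visual DB-y FOI-motor'})
#     >>> s = extract_field_annotation(s)
#     >>> s['FOI'], s['DB']
#     (['visual', 'motor'], 'y')

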
def get_response_array(s):
    """Add response array dictionary mapping answer choices to answer text.

    Returns
    -------
    s : pandas series object
        The pandas series object containing new entries for the
        response array.
    """
    choices = s['select_choices_or_calculations']
    if s['field_type'] not in ['radio', 'checkbox']:
        return s

    response_array = dict()
    choices = choices.split('|')
    for c in choices:
        k, v = c.strip().split(', ', maxsplit=1)
        response_array[k] = v
    s['response_array'] = response_array
    return s


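# Example: building the response array for a radio field.
#
#     >>> s = pd.Series({'field_type': 'radio',
#     ...                'select_choices_or_calculations':
#     ...                '1, Male | 2, Female | 3, Other'})
#     >>> get_response_array(s)['response_array']
#     {'1': 'Male', '2': 'Female', '3': 'Other'}

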
def map_dtypes(s):
    """Create new columns mapping Redcap data types to Postgres and Python.

    Returns
    -------
    s : pandas series object
        The pandas series object containing new entries database_dtype
        and python_dtype.
    """
    dtype_mapping = {'calc': 'double precision', 'checkbox': 'smallint[]',
                     'dropdown': 'smallint', 'notes': 'text',
                     'radio': 'smallint', 'yesno': 'boolean',
                     'file': 'varchar(255)'}
    text_dtype_mapping = {'date_mdy': 'date', 'email': 'varchar(255)',
                          'datetime_seconds_ymd': 'timestamp',
                          'datetime_seconds_mdy': 'timestamp',
                          'mrn_6d': 'integer', 'number': 'integer',
                          'phone': 'varchar(15)'}
    python_dtype_mapping = {'smallint[]': 'list', 'boolean': 'bool',
                            'text': 'str', 'varchar(255)': 'str',
                            'timestamp': 'str', 'date': 'str',
                            'datetime': 'str',
                            'double precision': 'float64',
                            'smallint': 'Int64', 'bigint': 'Int64',
                            'integer': 'Int64', 'varchar(15)': 'str',
                            'file': 'str'}

    redcap_dtype = s['field_type']
    text_validation = s['text_validation_type_or_show_slider_number']
    if pd.isna(redcap_dtype) or redcap_dtype in ['descriptive']:
        return s

    if redcap_dtype in dtype_mapping:
        s['database_dtype'] = dtype_mapping[redcap_dtype]
    elif redcap_dtype == 'text':
        s['database_dtype'] = text_dtype_mapping.get(text_validation, 'text')
    s['python_dtype'] = python_dtype_mapping[s['database_dtype']]
    return s


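# Example: a 'yesno' Redcap field maps to a boolean Postgres column and
# a Python bool.
#
#     >>> s = pd.Series({'field_type': 'yesno',
#     ...                'text_validation_type_or_show_slider_number': None})
#     >>> s = map_dtypes(s)
#     >>> s['database_dtype'], s['python_dtype']
#     ('boolean', 'bool')

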
def get_tables_structure(metadata, include_surveys=None):
    """Get the column names and datatypes for the tables.

    Parameters
    ----------
    metadata : instance of pd.Dataframe
        The metadata dataframe annotated with database_dtype and
        python_dtype columns.
    include_surveys : list of str | None
        If not None, restrict the output to these survey names.

    Returns
    -------
    table_infos : dict
        Dictionary with keys as table names and each table having the
        following entries: columns, dtypes, indicator_columns, and
        python_dtypes (for casting columns to the right Python data
        type).
    """
    metadata_by_form = metadata[np.any(
        [metadata['in_database'] == 'y',
         metadata['database_dtype'].notnull()], axis=0)]
    metadata_by_form = metadata_by_form.groupby('redcap_form_name')

    special_columns = {
        'columns': ['subject_id',
                    # null for the first instance; subsequent instances
                    # are integers starting at 1 and should correspond
                    # to the integer in redcap_event_name (if completed
                    # once per visit)
                    'redcap_repeat_instance',
                    'redcap_event_name',  # XXX: don't add in database?
                    'redcap_repeat_instrument'],
        'dtypes': ['varchar(255)', 'integer', 'varchar(255)',
                   'varchar(255)'],
        'python_dtypes': ['str', 'Int64', 'str', 'str']
    }

    table_infos = dict()
    for form_name, metadata_form in metadata_by_form:
        if form_name == 'subject':  # subject table is special
            continue

        table_infos[form_name] = {
            'columns': list(), 'dtypes': list(), 'python_dtypes': list(),
            'indicator_columns': list()
        }
        for index, row in metadata_form.iterrows():
            if row['database_dtype'] == 'smallint[]':  # checkbox column
                table_infos[form_name]['indicator_columns'].append(index)
            else:
                table_infos[form_name]['columns'].append(index)
                table_infos[form_name]['dtypes'].append(
                    row['database_dtype'])
                table_infos[form_name]['python_dtypes'].append(
                    row['python_dtype'])

        for meta_name in special_columns:
            table_infos[form_name][meta_name].extend(
                special_columns[meta_name])

        table_infos[form_name]['columns'].append(f'{form_name}_complete')
        table_infos[form_name]['dtypes'].append('integer')
        table_infos[form_name]['python_dtypes'].append('Int64')

    if include_surveys is not None:
        table_infos = {k: v for (k, v) in table_infos.items()
                       if k in include_surveys}

    return table_infos


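# Example (illustrative sketch): typical usage after annotating the
# metadata with ``map_dtypes``; the survey name 'demographics' is a
# placeholder.
#
#     >>> metadata = metadata.apply(map_dtypes, axis=1)  # doctest: +SKIP
#     >>> table_infos = get_tables_structure(
#     ...     metadata, include_surveys=['demographics'])  # doctest: +SKIP
#     >>> table_infos['demographics']['columns']  # doctest: +SKIP

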
def subselect_table_structure(table_info, df_cols):
    """Subselect columns that are in the report."""
    table_info_new = {'columns': list(), 'dtypes': list(),
                      'python_dtypes': list()}
    for col, dtype, python_dtype in zip(table_info['columns'],
                                        table_info['dtypes'],
                                        table_info['python_dtypes']):
        if col in df_cols:
            table_info_new['columns'].append(col)
            table_info_new['dtypes'].append(dtype)
            table_info_new['python_dtypes'].append(python_dtype)
    table_info_new['indicator_columns'] = table_info['indicator_columns']
    return table_info_new


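# Example (illustrative sketch): restrict the table structure to the
# columns present in a fetched report dataframe ``df``.
#
#     >>> table_info = subselect_table_structure(
#     ...     table_infos['demographics'], df.columns)  # doctest: +SKIP

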
def dataframe_to_tuple(df, df_columns, fixed_columns=None,
                       indicator_columns=None):
    """Extract pandas dataframe into tuples for ingestion in Postgres.

    Parameters
    ----------
    df : instance of pd.Dataframe
        The dataframe whose index is record_id.
    df_columns : list of str
        The column names of the dataframe to process. The column name
        'record_id' is special and inserts the record_id index.
    fixed_columns : dict | None
        The columns that have fixed values. E.g., dict(study_id=study1)
        makes all the rows of column study_id have the value study1.
    indicator_columns : list of str | None
        The indicator columns.

    Returns
    -------
    rows : list (n_rows,) of tuples
        A list with each row having the columns of the dataframe.
    cols : list of str
        Ordered list of columns in which the tuple entries are added.
    """
    if fixed_columns is None:
        fixed_columns = dict()
    if indicator_columns is None:
        indicator_columns = list()

    for indicator_column in indicator_columns:
        mapping = dict()  # {race___1: 1, race___2: 2}
        for col in df.columns:
            if col.startswith(indicator_column):
                mapping[col] = col.split('___')[1]
        if len(mapping) == 0:
            raise ValueError(
                f'No column found starting with {indicator_column}')
        df = combine_indicator_columns(df, mapping, indicator_column)

    rows = list()
    for index, df_row in df.iterrows():
        row = list()
        for column_name in df_columns:
            row.append(df_row[column_name])
            # XXX: hack, None/nan means missing values in comments.
            if not isinstance(row[-1], list) and \
                    (pd.isna(row[-1]) or row[-1] in ('None', 'nan')):
                row[-1] = None
        for column_name in fixed_columns:
            row.append(fixed_columns[column_name])
        rows.append(tuple(row))

    cols = df_columns + list(fixed_columns.keys())
    return rows, cols


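# Example: rows ready for Postgres ingestion, with a fixed study column
# appended and the checkbox indicators collapsed (values are made up).
#
#     >>> df = pd.DataFrame({'age': [25], 'race___1': [1.],
#     ...                    'race___2': [1.]})
#     >>> rows, cols = dataframe_to_tuple(
#     ...     df, df_columns=['age', 'race'],
#     ...     fixed_columns=dict(study_id='study1'),
#     ...     indicator_columns=['race'])
#     >>> cols
#     ['age', 'race', 'study_id']
#     >>> rows[0][1:]
#     ([1, 2], 'study1')

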
def rename_subject_ids(table_subject, redcap_df):
    """Rename subject_id in table_subject using data in redcap_df.

    Parameters
    ----------
    table_subject : instance of Table
        The subject table.
    redcap_df : instance of dataframe
        The dataframe of subject from redcap.

    Notes
    -----
    Let's say the table in the database has the following entries

        subject_id  first_name  last_name  date_of_birth  old_subject_id
        1001        anoopum     gupta      28/11/1985     NULL
        1002        adonay      nunes      13/09/1987     NULL

    Now, redcap has been modified in the following way

        subject_id  first_name  last_name  date_of_birth  old_subject_id
        901         anoopum     gupta      28/11/1985     1001
        1002        adonay      nunes      13/09/1987     NULL
        1003        sheraz      khan       15/05/1980     NULL

    Our protocol is the following:

    1. Fetch database table into Python dataframe
    2. Inner join Redcap dataframe to database dataframe based on match
       in first_name, last_name and date_of_birth
    3. Select rows from the inner join where
       Redcap.old_subject_id != database.old_subject_id AND
       Redcap.old_subject_id = database.subject_id
    4. Insert these rows into database, updating the subject_id and
       old_subject_id
    """
    match_columns = ['first_name_birth', 'last_name_birth',
                     'date_of_birth_subject']
    database_df = table_subject.query().reset_index()

    # cast the match columns into the same datatype to enable comparison
    database_df = database_df.astype({
        col: 'str' for col in match_columns + ['subject_id',
                                               'old_subject_id']})
    redcap_df = redcap_df.astype({
        col: 'str' for col in match_columns + ['subject_id',
                                               'old_subject_id']})

    joined_df = pd.merge(redcap_df, database_df, how='inner',
                         left_on=match_columns, right_on=match_columns,
                         suffixes=('', '_y'))
    joined_df = joined_df[joined_df['old_subject_id'] !=
                          joined_df['old_subject_id_y']]
    joined_df = joined_df[joined_df['old_subject_id'] ==
                          joined_df['subject_id_y']]

    if len(joined_df) > 0:
        rows_subject, cols_subject = dataframe_to_tuple(
            joined_df,
            df_columns=['subject_id', 'old_subject_id',
                        'redcap_event_name', 'first_name_birth',
                        'last_name_birth', 'date_of_birth_subject'])

        table_subject.insert_rows(
            rows_subject, cols_subject, on_conflict='update',
            conflict_cols=match_columns,
            update_cols=['subject_id', 'old_subject_id'])


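# Example (illustrative sketch): ``table_subject`` is assumed to be a
# neurobooth_terra ``Table`` bound to the subject table in Postgres, and
# ``redcap_df`` a subject dataframe from ``fetch_survey``.
#
#     >>> rename_subject_ids(table_subject, redcap_df)  # doctest: +SKIP

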
def compare_dataframes(src_df, target_df):
    """Compare dataframes.

    Parameters
    ----------
    src_df : pandas dataframe
        The source dataframe whose changes are printed.
    target_df : pandas dataframe
        The target dataframe that is compared against.
    """
    # print extra columns in src_df
    src_columns = set(src_df.columns)
    target_columns = set(target_df.columns)
    common_columns = list(src_columns.intersection(target_columns))
    print(f'extra columns: {src_columns - set(common_columns)}')

    # differences in rows ignoring extra columns
    print('Changed rows')
    src_df = src_df[common_columns]
    target_df = target_df[common_columns]
    for index, src_row in src_df.iterrows():
        if index not in target_df.index:
            print(src_row[common_columns])
            print('')
            continue
        target_row = target_df.loc[index]
        # if not src_row[target_columns].equals(target_row):
        if not _is_series_equal(src_row, target_row):
            print(src_row[common_columns])
            print('')
            print(target_row)
            print('')
            print('')


def infer_schema(survey_df, metadata_df):
    """Get schema from dataframe.

    Parameters
    ----------
    survey_df : instance of pd.Dataframe
        The dataframe for which to infer the schema.
    metadata_df : instance of pd.Dataframe
        The metadata dataframe containing information about all the
        columns in all the surveys.

    Returns
    -------
    schema : dict
        The inferred schema, keyed by column name.
    """
    # pandas to bigquery datatype mapping
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#tablefieldschema
    mapping = dict(floating='float', integer='integer', string='string')

    dtypes = survey_df.dtypes.to_dict()
    schema = dict()
    for column in survey_df.columns:
        choice = dict()
        question = ''
        field_type = ''
        if column in metadata_df.index:
            row = metadata_df.loc[column]
            question = row['field_label']
            if question.startswith('<'):  # html
                question = ''
            field_type = row['field_type']
            choices = row['select_choices_or_calculations']
            if not pd.isnull(choices):
                choices = choices.split('|')
                for c in choices:
                    k, v = c.strip().split(', ')
                    choice[k] = v
        else:
            print(f'Skipping {column}')

        dtype = infer_dtype(survey_df[column], skipna=True)
        dtype = mapping[dtype]
        if dtype == 'string':
            val = survey_df[column].dropna().iloc[0]
            if column.startswith('date'):  # hardcode for now
                dtype = 'datetime'

        schema[column] = {
            'name': column,
            'type': dtype,
            'mode': 'NULLABLE',
            'choices': choice,
            'question': question,
            'field_type': field_type
        }
    return schema
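

# Example (illustrative sketch): inferring a BigQuery-style schema for a
# fetched survey; ``survey_df`` and ``metadata_df`` are assumed to come
# from ``fetch_survey``, with the metadata indexed by field name.
#
#     >>> schema = infer_schema(survey_df, metadata_df)  # doctest: +SKIP
#     >>> print(json.dumps(schema, indent=2))  # doctest: +SKIP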