首页 > 解决方案 > 缓慢的 VCF 解析和插入数据库

问题描述

我的数据文件是gnomad.exomes.r2.1.1.sites.13.vcf.bgz.

我已经解压了 VCF。这是我在过滤数据后插入 Postgres 的代码:

# =============================================================================
# import libraries for python 
# =============================================================================
import vcf
import os.path
from sqlalchemy import create_engine
import psycopg2       
import datetime
import sys
import _thread
import csv

from io import StringIO
import pandas as pd

# ---------------------------------------------------------------------------
# Database connection settings and the shared SQLAlchemy engine used by the
# pandas COPY-based bulk insert (insertdata / psql_insert_copy).
# NOTE(review): credentials are hard-coded, and a *different* password
# ("pgAdmin") is used for the psycopg2 connection in the main script below —
# confirm which one is correct.
# ---------------------------------------------------------------------------
db_user_name = "postgres"
db_password = "password"
db_name = "vcd"
db_host_name = "localhost"
# NOTE(review): SQLAlchemy >= 1.4 rejects the "postgres://" scheme; switch to
# "postgresql://" if create_engine fails on a newer version.
url = 'postgres://' + db_user_name + ':' + db_password + '@' + db_host_name + '/' + db_name
engine = create_engine(url)

# =============================================================================
# drops the previous table if it exists and creates a new table with a dynamic table name
# =============================================================================
def create_table(table_name, table_type, conn_obj):
    """Drop ``table_name`` if it exists, recreate it, and return the name.

    Parameters
    ----------
    table_name : str
        Name of the table to (re)create.  Must be a plain identifier.
    table_type : str
        '1' -> wide attribute/value data table, '2' -> INFO metadata table.
        Any other value leaves only the DROP executed.
    conn_obj : psycopg2 connection
        Connection used to run the DDL; committed before returning.

    Raises
    ------
    ValueError
        If ``table_name`` is not a valid identifier (table names cannot be
        bound as query parameters, so they are spliced into the SQL with
        str.format — this guard rules out SQL injection through the name).
    """
    if not table_name.isidentifier():
        raise ValueError('invalid table name: {!r}'.format(table_name))
    dbcur = conn_obj.cursor()
    dbcur.execute("""DROP TABLE IF EXISTS {0};""".format(table_name))
    if table_type == '1':
        dbcur.execute("""CREATE TABLE IF NOT EXISTS {0} (as_ID INT, as_NM TEXT, as_at_ID INT, as_at_NM TEXT, VCF_ID TEXT, VARIANT_ID TEXT, as_at_LINE_SEQ INT, DATE_START DATE, DATE_END DATE, as_at_VAL_SEQ INT, as_at_VA_NM TEXT, as_at_VALUE TEXT);""".format(table_name))
    elif table_type == '2':
        dbcur.execute("""CREATE TABLE IF NOT EXISTS {0} (as_at_VA_NM TEXT, as_at_VA_DESC TEXT,as_at_DATA_TYPE TEXT);""".format(table_name))
    conn_obj.commit()
    message = 'Table '+table_name+' created successfully.\n'
    print(message)
    return table_name

# =============================================================================
# functions that buffer VCF record data and bulk-insert it into the values table
# =============================================================================

def create_tuples(as_id, vcf_id, Variant_id, as_at_nm, sample_id, as_at_va_nm, as_at_value, date_start, date_end, as_va_seq, as_at_va_line_seq, data_table_name):
    """Buffer one attribute/value row; flush 10,000 rows on a worker thread.

    Rows whose attribute name is not whitelisted in the module-global
    ``headers_arr`` are silently dropped.  ``sample_id``, ``date_start``,
    ``date_end`` and ``Variant_id`` are accepted for interface compatibility
    but are not written to the row (fixed '1'/'VCF' values are used instead).
    """
    as_at_id = '1'
    sample_id = '1'      # overrides the caller-supplied sample_id
    variant_id = '1'     # note: distinct from the Variant_id parameter
    as_nm = 'VCF'
    global datalist
    # Skip attributes that are not in the whitelist.
    if as_at_va_nm not in headers_arr:
        return
    datalist.append([as_id, str(as_nm), as_at_id, as_at_nm, vcf_id, variant_id,
                     as_at_va_line_seq, as_va_seq, as_at_va_nm, as_at_value])

    if len(datalist) == 10000:
        # NOTE(review): the buffer is handed to an unmanaged daemon-like thread
        # and then rebound.  Nothing joins these threads or serializes access
        # to `engine`, so rows can be lost on interpreter exit and inserts can
        # interleave — consider concurrent.futures.ThreadPoolExecutor instead.
        _thread.start_new_thread(insertdata, (datalist, data_table_name, as_id))
        datalist = []
def insertdata(datalist2, data_table_name, as_id):
    """Bulk-insert buffered rows into ``data_table_name`` via COPY.

    Builds a DataFrame from ``datalist2`` and appends it with ``to_sql`` using
    the COPY-based ``psql_insert_copy`` method, then prints progress based on
    the module-global ``execution_start_time``.  ``as_id`` is unused but kept
    for interface compatibility.
    """
    global execution_start_time
    if not datalist2:
        # Final flush (see end of the main script) may arrive with an empty
        # buffer; the datalist2[-1] progress lookup below would IndexError.
        return
    columns = ["as_id", "as_nm", "as_at_id", "as_at_nm", "vcf_id", "variant_id",
               "as_at_line_seq", "as_at_val_seq", "as_at_va_nm", "as_at_value"]
    df = pd.DataFrame(datalist2, columns=columns)
    df.to_sql(data_table_name, engine, if_exists='append', method=psql_insert_copy, index=False)
    time_difference = datetime.datetime.now() - execution_start_time
    # Index 6 of a row is the source line number, used as a progress counter.
    print('Total inserted records:' + str(datalist2[-1][6]) + ' in time ' + str(time_difference))

def psql_insert_copy(table, conn, keys, data_iter):
    """pandas ``to_sql`` insertion callable backed by PostgreSQL COPY.

    Streams ``data_iter`` through an in-memory CSV buffer and loads it with
    ``COPY ... FROM STDIN``, which is far faster than per-row INSERTs.

    Parameters follow the pandas ``method=callable`` contract: ``table`` is a
    SQLAlchemy Table wrapper, ``conn`` a SQLAlchemy connection, ``keys`` the
    column names, and ``data_iter`` an iterable of row tuples.
    """
    # Serialize all rows into one CSV payload.
    buffer = StringIO()
    csv.writer(buffer).writerows(data_iter)
    buffer.seek(0)

    quoted_cols = ', '.join('"{}"'.format(col) for col in keys)
    target = table.name if not table.schema else '{}.{}'.format(table.schema, table.name)
    copy_sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(target, quoted_cols)

    # Drop down to the raw DBAPI connection for copy_expert.
    raw_conn = conn.connection
    with raw_conn.cursor() as cursor:
        cursor.copy_expert(sql=copy_sql, file=buffer)
# =============================================================================
# function definition / switch-case to get a record's variable name & variable value
# =============================================================================
def get_DataFrom_Column(header, rec=None):
    """Return ``{'variable_name', 'variable_value'}`` for one VCF column.

    Parameters
    ----------
    header : str
        One of the fixed VCF column names (CHROM, POS, ID, REF, ALT, QUAL,
        FILTER, INFO, FORMAT).
    rec : optional
        The PyVCF record to read.  Defaults to the module-global ``record``
        bound by the main loop, preserving the original implicit behaviour;
        pass it explicitly for clarity and testability.

    Returns the string ``'Invalid header'`` for unknown headers (the original
    sentinel, kept for compatibility).
    """
    if rec is None:
        rec = record  # implicit dependency on the main loop's loop variable
    # getattr dispatch replaces the original dict literal, which eagerly
    # evaluated every column attribute on each call.
    if header in ("CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"):
        return {'variable_name': header, 'variable_value': getattr(rec, header)}
    return "Invalid header"
dic={}
def insert_infos_metadata(reader_infos, line_index, file_name, as_at_nm):
    """Populate the global ``dic`` with sub-field headers of INFO entries.

    For every INFO definition whose description contains a ':', store the text
    after the *last* ':' (typically a '|'-separated list of sub-field names)
    under the INFO ID.

    ``file_name`` and ``as_at_nm`` are accepted for interface compatibility
    but unused; ``line_index`` is returned unchanged.
    """
    for info_id, info in reader_infos.items():
        va_desc = info.desc
        # Only descriptions with a ':' carry a sub-field header list.
        if ':' in va_desc:
            dic[info_id] = va_desc.rsplit(':', 1)[1]
    return line_index

# =============================================================================
# global variable declaration
# =============================================================================
index = 0                                                                           # number of VCF records processed by the main loop
variableList = []                                                                   # NOTE(review): never used below — appears dead
datalist  = []                                                                      # buffer of rows awaiting bulk insert (flushed at 10,000)
headers_arr = []                                                                    # attribute-name whitelist; reassigned before the main loop
totalrecordinserted=0                                                               # NOTE(review): never updated — appears dead
curentRecord=-1                                                                     # NOTE(review): never used below (also misspelled)
execution_start_time= datetime.datetime.now()                                       # calculate execution start time

# =============================================================================
# file path of the input VCF (hard-coded here; originally taken from user input)
# =============================================================================

file_path = "/gnomad.exomes.r2.1.1.sites.13.vcf"
if os.path.isfile(file_path) :                                                      # bail out early if the input VCF is missing
    # NOTE(review): `if True:` with an 'Incorrect Choice.' else branch looks
    # like the leftover of a removed interactive menu — confirm and simplify.
    if True:
        # NOTE(review): this password ("pgAdmin") differs from the one used for
        # the SQLAlchemy engine above ("password") — confirm which is correct.
        conn = psycopg2.connect(host="localhost",database="vcd", user="postgres", password="pgAdmin")
        cursor = conn.cursor()
        data_table_name = "data2"
        create_table(data_table_name,"1",conn)
        vcf_reader = vcf.Reader(open(file_path, 'r'))
        # _column_headers is a private PyVCF attribute holding the #CHROM
        # header fields (CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, ...).
        column_headers = vcf_reader._column_headers
        line_index = 0
        null_date = None
        as_at_header = 'Header'
        as_at_variant = 'Variant' 
        reader_infos = vcf_reader.infos
        # Cache the sub-field names of compound INFO descriptions in `dic`.
        line_index = insert_infos_metadata(reader_infos,line_index,file_path,as_at_header)

        execution_start_time= datetime.datetime.now()  
        # Whitelist of attribute names that create_tuples will actually buffer.
        headers_arr = ['INFO/AF', 'INFO/AF_NFE_SEU', 'INFO/AF_RAW', 'INFO/AF_FIN_FEMALE', 'INFO/AF_NFE_BGR', 'INFO/AF_SAS_MALE', 'INFO/AF_AFR_MALE', 'INFO/AF_AFR', 'INFO/AF_EAS_FEMALE', 'INFO/AF_AFR_FEMALE', 'INFO/AF_SAS', 'INFO/AF_NFE_ONF', 'INFO/AF_FIN_MALE', 'INFO/AF_NFE_FEMALE', 'INFO/AF_AMR', 'INFO/AF_EAS', 'INFO/AF_ASJ_MALE', 'INFO/AF_OTH_FEMALE', 'INFO/AF_NFE_SWE', 'INFO/AF_NFE_NWE', 'INFO/AF_EAS_JPN', 'INFO/AF_FEMALE', 'INFO/AF_EAS_KOR', 'INFO/AF_EAS_OEA', 'INFO/AF_NFE_EST', 'INFO/AF_EAS_MALE', 'INFO/AF_NFE', 'INFO/AF_FIN', 'INFO/AF_NFE_MALE', 'INFO/AF_SAS_FEMALE', 'INFO/AF_ASJ_FEMALE', 'INFO/AF_ASJ', 'INFO/AF_OTH', 'INFO/AF_MALE', 'INFO/AF_AMR_MALE', 'INFO/AF_AMR_FEMALE', 'INFO/AF_OTH_MALE', 'INFO/AF_POPMAX', 'CHROM', 'POS', 'REF', 'ALT', 'QUAL', 'FILTER']
        for record in vcf_reader:
            index += 1
            line_index += 1
            if index == 50000:
                # Hard cap for benchmarking: stop after ~50k records.
                break
            Variant_id = ''
            sample_name = ''
            for header in column_headers :
                if header != 'FORMAT' :
                    # get_DataFrom_Column reads the loop-global `record`.
                    header_data_dict = get_DataFrom_Column(header)
                    header_data_dict_value = header_data_dict['variable_value']
                    if isinstance(header_data_dict_value, str) :
                        # Scalar string column (e.g. CHROM, REF).
                        create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, header, str(header_data_dict_value), null_date, null_date, 1, str(line_index),data_table_name)
                    elif isinstance(header_data_dict_value, list) :
                        if len(header_data_dict_value) == 0 and header == 'FILTER' :
                            # PyVCF represents a passing FILTER as an empty list.
                            create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, header, 'PASS', null_date, null_date, 1, str(line_index),data_table_name)
                        else :
                            # One row per list element (e.g. multiple ALT alleles).
                            i = 0
                            while i < len(header_data_dict_value) :
                                create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, header, str(header_data_dict_value[i]), null_date, null_date, str(i), str(line_index),data_table_name)
                                i += 1
                    elif isinstance(header_data_dict_value, dict) :
                        # INFO column: one row per key, or one row per sub-field
                        # for compound annotations registered in `dic`.
                        for dict_val_record in header_data_dict_value :
                            variable_name = header + '/' + dict_val_record
                            variable_value = header_data_dict_value[dict_val_record]
                            variable_seq = 1 
                            if isinstance(variable_value,list) :
                                for value in variable_value :
                                    if dict_val_record in dic :  
                                        # Compound value like 'a|b|c': split it and emit one
                                        # row per sub-field, naming each with the header list
                                        # parsed from the INFO description.
                                        header_key_description = vcf_reader.infos[dict_val_record].desc
                                        arr_len = len(header_key_description.split(':'))
                                        last_str = header_key_description.split(':')[arr_len-1]
                                        array_obj_desc_headers = last_str.split('|')
                                        arr_obj_val = value.strip("()[]").split("|")
                                        vararray = dic[dict_val_record].split("|")
                                        arr_index = 0
                                        for desc_header_value in arr_obj_val :
                                            variable_name = header+'/'+dict_val_record+'/'+vararray[arr_index].lstrip().replace("'","")
                                            variable_value = arr_obj_val[arr_index]
                                            arr_index += 1
                                            create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, str(variable_seq), str(line_index),data_table_name)
                                    else :
                                          create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, value, null_date, null_date, str(variable_seq), str(line_index),data_table_name)

                                    variable_seq += 1  
                            else :
                                # Scalar INFO value.
                                create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, str(variable_seq), str(line_index),data_table_name)
                    else :
                        # Any other scalar (int/float/None): stringify it.
                        variable_name = header
                        variable_value = str(header_data_dict_value)
                        create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, 1, str(line_index),data_table_name)
                else :
                    # Per-sample FORMAT fields.
                    format_data_arr = (record.FORMAT).split(":")
                    for format_data_record in format_data_arr :
                        variable_name = header+'/'+format_data_record
                        # NOTE(review): `record_data` is undefined — this line raises
                        # NameError for any record with a FORMAT column.  It is never
                        # reached for gnomAD *sites* VCFs (no FORMAT/sample columns),
                        # but should presumably iterate record.samples — confirm.
                        variable_value = record.genotype(record_data.sample)[format_data_record]
                        create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, 1,str(line_index),data_table_name)

        # Flush whatever is left in the buffer (fewer than 10,000 rows).
        insertdata(datalist,data_table_name,"1")
        print('Variants inserted successfully.')
    else:
        print('Incorrect Choice.')
else:
    print('File not present.')

这是输出表:

as_id   as_nm   as_at_id   as_at_nm vcf_id  variant_id  as_at_line_seq  date_start  date_end    as_at_val_seq   as_at_va_nm                         as_at_value
1       VCF     1           Variant ""      1           1                                       1               CHROM                               22
1       VCF     1           Variant ""      1           1                                       1               POS                                 16050036
1       VCF     1           Variant ""      1           1                                       1               ID                                  rs374742143
1       VCF     1           Variant ""      1           1                                       1               REF                                 A
1       VCF     1           Variant ""      1           1                                       0               ALT                                 C
1       VCF     1           Variant ""      1           1                                       1               QUAL                                442156.34
1       VCF     1           Variant ""      1           1                                       0               FILTER                              RF
1       VCF     1           Variant ""      1           1                                       1               INFO/non_neuro_nhomalt_amr_male     0
1       VCF     1           Variant ""      1           1                                       1               INFO/controls_AN_afr                4
1       VCF     1           Variant ""      1           1                                       1               INFO/controls_AC_amr_female         3
1       VCF     1           Variant ""      1           1                                       1               INFO/AF_oth_female                  0.5
1       VCF     1           Variant ""      1           1                                       1               INFO/non_topmed_nhomalt_afr_female  0

处理 50,000 条记录大约需要 5 分 43 秒。我无法理解为什么要花这么长时间。

有人可以帮我优化这段代码吗?

标签: pythonpostgresqlbioinformaticsvcf-variant-call-format

解决方案


为了加快插入速度,您可以把多次插入放在同一个事务中,并调整 work_mem 和/或 maintenance_work_mem。另外,这个答案对批量插入的优化有更深入的解释。


推荐阅读