首页 > 解决方案 > 如何使用python循环通过文件并使用文本文件中的参数执行查询

问题描述

我正在尝试使脚本工作,该脚本从文件中获取每一行并使用该行作为输入来运行 SQL 查询。具体来说,我正在尝试使用具有域列表的文件并使用这些域名来查询 postgresql 数据库。任何帮助将不胜感激!

from __future__ import print_function

try:
    import psycopg2
except ImportError:
    raise ImportError('\n\033[33mpsycopg2 library missing. pip install psycopg2\033[1;m\n')
    sys.exit(1)
import re
import sys
import json

DB_HOST = 'crt.sh'
DB_NAME = 'certwatch'
DB_USER = 'guest'


def connect_to_db(domain_name):
    try:
        conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
        cursor = conn.cursor()
        cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'emailAddress' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('%{}'));".format(domain_name))
    except:
        print("\n\033[1;31m[!] Unable to connect to the database\n\033[1;m")
    return cursor

def get_unique_emails(cursor, domain_name):
    unique_emails = []
    for result in cursor.fetchall():
        matches=re.findall(r"\'(.+?)\'",str(result))
        for email in matches:
            #print(email)
            if email not in unique_emails:
                if "{}".format(domain_name) in email:
                    unique_emails.append(email)
    return unique_emails

def print_unique_emails(unique_emails):
    print("\033[1;32m[+] Total unique emails found: {}\033[1;m".format(len(unique_emails)))
    for unique_email in sorted(unique_emails):
        print(unique_email)

def write_unique_emails(unique_emails):
    with open('unique_emails.json', 'w') as outfile:
        json.dump(unique_emails, outfile, sort_keys=True, indent=4)

def get_domain_name():
    filepath = 'file.txt'  
    with open(filepath) as fp:  
    for cnt, line in enumerate(fp):
        print("Line {}: {}".format(cnt, line))
    return line

if __name__ == '__main__':
    domain_name = get_domain_name()                                             
    cursor = connect_to_db(domain_name)
    unique_emails = get_unique_emails(cursor, domain_name)
    print_unique_emails(unique_emails)
    write_unique_emails(unique_emails)

下面的代码使用 sys.argv

from __future__ import print_function

try:
    import psycopg2
except ImportError:
    raise ImportError('\n\033[33mpsycopg2 library missing. pip install psycopg2\033[1;m\n')
    sys.exit(1)
import re
import sys
import json

DB_HOST = 'crt.sh'
DB_NAME = 'certwatch'
DB_USER = 'guest'

def connect_to_db(domain_name):
    try:
        conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
        cursor = conn.cursor()
        cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'emailAddress' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('%{}'));".format(domain_name))
    cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'serialNumber' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('%{}'));".format(domain_name))
    except:
        print("\n\033[1;31m[!] Unable to connect to the database\n\033[1;m")
    return cursor

def get_unique_emails(cursor, domain_name):
    unique_emails = []
    for result in cursor.fetchall():
        matches=re.findall(r"\'(.+?)\'",str(result))
        for email in matches:
            #print(email)
            if email not in unique_emails:
                if "{}".format(domain_name) in email:
                    unique_emails.append(email)
    return unique_emails

def get_unique_serialNumber(cursor, domains):
    unique_domains = []
    for result in cursor.fetchall():
        matches=re.findall(r"\'(.+?)\'",str(result))
        for serialNumber in matches:
            if serialNumber not in unique_serialNumber:
                if ".{}".format(domain_name) in serialNumber:
                    unique_serialNumber.append(serialNumber)
    return unique_serialNumber

def print_unique_serialNumber(unique_serialNumber):
    for unique_serialNumber in sorted(unique_serialNumber):
        print(unique_serialNumber)

def print_unique_emails(unique_emails):
    print("\033[1;32m[+] Total unique emails found: {}\033[1;m".format(len(unique_emails)))
    for unique_email in sorted(unique_emails):
        print(unique_email)

def write_unique_emails(unique_emails):
    with open('read.json', 'w') as outfile:
        json.dump(unique_emails, outfile, sort_keys=True, indent=4)

def get_domain_name():
    if len(sys.argv) <= 1:
        print("\n\033[33mUsage: python emails_from_ct_logs.py <target_domain>\033[1;m\n")
        sys.exit(1)
    else:
        return sys.argv[1]

if __name__ == '__main__':
    domain_name = get_domain_name()
    cursor = connect_to_db(domain_name)
    unique_emails = get_unique_emails(cursor, domain_name)
    print_unique_emails(unique_emails)
    write_unique_emails(unique_emails)
    unique_serialNumber = get_unique_serialNumber(cursor, domain_name)
    print_unique_serialNumber(unique_serialNumber)

标签: pythonsqlpython-3.xpostgresqlpsycopg2

解决方案


查看Psycopg2。如果不知道数据库的所有细节,就不可能进行“剪切和粘贴”代码转储。这里涵盖了基础知识,希望足以让您继续前进。何时或如果您有更具体的问题,请创建一个新线程。


推荐阅读