首页 > 解决方案 > 从动态 sql 列表运行 python 多进程

问题描述

嗨,我试图让我的代码更加动态和智能,为此我想通过动态列表调用我想要运行的函数,而不是让它们硬编码。这将清理代码并帮助自动重新运行失败的脚本。

下面是我一直在处理的代码片段。运行时我通常会收到此错误

TypeError:“str”对象不可调用

 cursor = conn.cursor()
    cursor.execute(
        f'''select distinct caller from {db}.log a where a.log_text like 'Failed:%' and a.log_time > DATE_TRUNC('DAY', NOW()) and caller not in (select caller from {db}.log a where a.log_text like 'Done' and a.log_time > DATE_TRUNC('DAY', NOW()))''')
    df = as_pandas(cursor)
    print('The following scripts will be rerun')
    print(df)

    c = df['caller']

    processes = []
    # Loop over failed scripts/modules
    for mod in (c):  
        print(f'Rerun of {c}')
        p = multiprocessing.Process(target=mod, args=(db,))
        time.sleep(10)
        p.start()
        processes.append(p)

    for process in processes:
        process.join()

完整的回溯错误

回溯(最后一次调用):文件“/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py”,第 258 行,在 _bootstrap self.run() 文件“/home/xxx/anaconda3/lib /python3.6/multiprocessing/process.py”,第 93 行,运行中 self._target(*self._args, **self._kwargs) TypeError: 'str' object is not callable Process Process-2: Traceback (最近最后调用):文件“/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py”,第 258 行,在 _bootstrap self.run() 文件“/home/xxx/anaconda3/lib/python3.6 /multiprocessing/process.py",第 93 行,运行中 self._target(*self._args, **self._kwargs) TypeError: 'str' object is not callable Process finished with exit code 0

标签: pythonsqlloopsimpalamultiprocess

解决方案


所以我找到了解决问题的方法,并将其发布在这里,以防将来有人可以使用它。

基本上我使用 getattr 和 importlib.import_module 函数。然后我通过 for 循环启动它们。

因此,我使用 sql 获取所有失败模块的名称,然后将这些模块加载到 df 列表中,然后通过 for 循环进行迭代,该循环将启动失败的模块。

cursor.execute(
            f'''select distinct caller from {db}.sch_log_python a where a.log_text like 'Failed:%' and lower(caller) in ('st_%','ctrl_%','ar%') and a.log_time > DATE_TRUNC('DAY', NOW()) and caller not in (select caller from {db}.sch_log_python a where a.log_text like 'Done' and a.log_time > DATE_TRUNC('DAY', NOW()))''')
        df = as_pandas(cursor)
        print('The following scripts will be rerun')
        print(df)

        sch_log_func(caller, 'Rerun of failed scripts', db)

        c = df
        # Loop over failed scripts/modules
        for i in (c):
            cstr = c.to_string(index=False, header=False)
            print(f'Initialise ' + cstr)
            cstr = cstr.lower()
            print(cstr + ' Module to import')
            print('Trying to run ' + cstr)
            cls = getattr(importlib.import_module(cstr), cstr)
            cls(db)
            print(f'Started {cstr}')

            

推荐阅读