Passing a pandas read_csv call to another function with named arguments

Problem description

Since functions are first-class citizens in Python, I should be able to refactor this:

import os
from datetime import datetime

import pandas as pd

# INPUT_CSV_PATH, DATA_NAME and DTYPES are module-level settings defined elsewhere.
def get_events():
    csv_path = os.path.join(INPUT_CSV_PATH, DATA_NAME + '.csv')
    print(f'Starting reading events at {datetime.now()}')
    start_time = datetime.now()
    events = pd.read_csv(csv_path, dtype=DTYPES)
    end_time = datetime.now()
    print(f'Finished reading events at {end_time} ({end_time - start_time})')
    return events

into something like this:

def get_events():
    csv_path = os.path.join(INPUT_CSV_PATH, DATA_NAME + '.csv')
    events = _time_function_call('reading events', pd.read_csv, {'filepath_or_buffer': csv_path, 'dtype': DTYPES})
    return events

def _time_function_call(message, func, *kwargs):
    print(f'Starting {message} at {datetime.now()}')
    start_time = datetime.now()
    result = func(*kwargs)
    end_time = datetime.now()
    print(f'Finished {message} at {end_time} ({end_time - start_time})')
    return result

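That is, I pass the pandas read_csv function and its named arguments to a helper function. (Note: I wasn't sure how to pass named arguments along with a function; this answer helped.)

But after the refactoring I get the following error:

ValueError: Invalid file path or buffer object type: <class 'dict'>

What am I missing about how to pass a function and its named arguments to another Python function for evaluation?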

Tags: python

Solution

You probably want to refactor to:

def get_events():
    csv_path = os.path.join(INPUT_CSV_PATH, DATA_NAME + '.csv')
    events = _time_function_call('reading events', pd.read_csv, filepath_or_buffer=csv_path, dtype=DTYPES)
    return events

def _time_function_call(message, func, *args, **kwds):
    start_time = datetime.now()
    print(f'Starting {message} at {start_time}')
    result = func(*args, **kwds)
    end_time = datetime.now()
    duration = end_time - start_time
    print(f'Finished {message} at {end_time} ({duration})')
    return result

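This lets Python handle arbitrary argument lists: *args collects any positional arguments, **kwds collects any keyword arguments, and both are forwarded to func unchanged. In the version from the question, the parameter *kwargs collected the dict as a single positional argument, so pd.read_csv received the dict itself as filepath_or_buffer, which is what the ValueError reports.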
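To make the difference concrete without needing pandas or a CSV file, here is a minimal sketch. The names show_call, broken_forward and working_forward are hypothetical, and show_call is only a stand-in for pd.read_csv that reports how it was called.

```python
def show_call(*args, **kwargs):
    """Hypothetical stand-in for pd.read_csv: report how it was called."""
    return args, kwargs


def broken_forward(func, *kwargs):
    # Despite its name, *kwargs (single star) collects POSITIONAL arguments.
    return func(*kwargs)


def working_forward(func, *args, **kwargs):
    # *args collects positional arguments, **kwargs collects keyword arguments.
    return func(*args, **kwargs)


options = {'filepath_or_buffer': 'events.csv', 'dtype': str}

# As in the question: the dict arrives as one positional argument.
broken = broken_forward(show_call, options)

# Keyword arguments are forwarded by name.
working = working_forward(show_call, filepath_or_buffer='events.csv', dtype=str)

# An existing dict can be unpacked at the call site with **.
unpacked = working_forward(show_call, **options)

print(broken)
print(working)
print(unpacked)
```

If the arguments already live in a dict, unpacking it with ** at the call site, as in the last call, should give the same result as writing the keywords out by hand.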

I'd also suggest using a context manager together with the logging module, since that makes this kind of code easier to write, for example:

from time import perf_counter
import logging

logger = logging.getLogger(__name__)

class log_timer:
    def __init__(self, message):
        self.message = message

    def __enter__(self):
        logger.info(f"{self.message} started")
        # call to perf_counter() should be the last statement in method
        self.start_time = perf_counter()

    def __exit__(self, exc_type, exc_value, traceback):
        # perf_counter() call should be first statement
        secs = perf_counter() - self.start_time
        state = 'finished' if exc_value is None else 'failed'
        logger.info(f"{self.message} {state} after {secs * 1000:.2f}ms")

which can be used like this:

from time import sleep

logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=0,
)

with log_timer("sleep"):
    sleep(1)

This way you don't have to worry about turning arbitrary bits of code into functions and threading state between them.
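As an illustration of that point, here is a sketch that times several statements sharing local variables. It is not the class from the answer: it swaps in contextlib.contextmanager from the standard library as a roughly equivalent alternative, and the name timed_block is hypothetical.

```python
import logging
from contextlib import contextmanager
from time import perf_counter

logger = logging.getLogger(__name__)


@contextmanager
def timed_block(message):
    """Hypothetical generator-based variant of a logging timer."""
    logger.info(f"{message} started")
    start_time = perf_counter()
    try:
        yield
    except BaseException:
        secs = perf_counter() - start_time
        logger.info(f"{message} failed after {secs * 1000:.2f}ms")
        raise  # let the original exception propagate to the caller
    else:
        secs = perf_counter() - start_time
        logger.info(f"{message} finished after {secs * 1000:.2f}ms")


logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)

with timed_block("building squares"):
    numbers = list(range(1000))
    squares = [n * n for n in numbers]

# Names assigned inside the with block are still available afterwards.
print(len(squares), squares[-1])
```

Because a with block does not introduce a new scope, numbers and squares stay usable after the timing ends, so nothing has to be passed into or returned from a helper function.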

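Also, using datetime the way you did is not well suited to measuring the run time of small pieces of code; the time module gives access to more appropriate (higher-resolution) OS/CPU timers through perf_counter.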
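One way to inspect the clocks on a given machine is time.get_clock_info from the standard library. The sketch below is only illustrative: the reported values depend on the platform, and the summing workload is an arbitrary placeholder.

```python
import time

# Properties of the clock behind time.perf_counter().
perf = time.get_clock_info('perf_counter')
print(f"perf_counter: resolution={perf.resolution}s, monotonic={perf.monotonic}")

# Properties of the wall clock behind time.time(); it can be adjusted
# (for example by NTP), which makes it a poor choice for measuring durations.
wall = time.get_clock_info('time')
print(f"time: resolution={wall.resolution}s, adjustable={wall.adjustable}")

# Timing a small piece of code with perf_counter.
start = time.perf_counter()
total = sum(range(100_000))
elapsed = time.perf_counter() - start
print(f"summing took {elapsed * 1000:.3f}ms")
```

Only differences between perf_counter readings are meaningful, since its reference point is undefined.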

