Flask: Storing a Large DataFrame Across Multiple Requests

Problem Description

I have a Flask web app that uses a large DataFrame (hundreds of megabytes). The DataFrame is used by several different machine-learning models in the application. I want to create the DataFrame only once and use it across multiple requests, so that users can build different models on the same data. Flask sessions are not built for large data, so that is not an option. And if the data source is a CSV file (yuck), I don't want to go back and recreate the DataFrame on every request.

I have a working solution, but I can't find any discussion of it on Stack Overflow. That makes me suspect my solution may not be a good design idea. I have always worked on the assumption that, in software development, a successful path is a well-trodden one.

My solution is simply to create a data-holder class with a single class variable:

class DataHolder:
    dataFrameHolder = None

Now dataFrameHolder is shared by every instance of the class (like a static variable in Java), because it lives in the server process's memory.

I can now create the DataFrame once and put it into the DataHolder class:

import pandas as pd
from dataholder import DataHolder

result_set = pd.read_sql_query(some_SQL, connection)
df = pd.DataFrame(result_set, columns=['col1', 'col2', ...])
DataHolder.dataFrameHolder = df

The DataFrame can then be accessed from any code that imports the DataHolder class. That lets me use the stored DataFrame anywhere in the application, including across different requests:

.
.
modelDataFrame = DataHolder.dataFrameHolder
do_some_model(modelDataFrame)
.
.
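
Put together, a minimal self-contained sketch of the pattern (the route names and toy data are hypothetical, just to show the class variable surviving across requests):

import pandas as pd
from flask import Flask, jsonify

app = Flask(__name__)

class DataHolder:
    dataFrameHolder = None

@app.route('/load')
def load():
    # stand-in for the real SQL query; runs once, and the result then
    # lives in the module for every later request to read
    DataHolder.dataFrameHolder = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
    return jsonify(rows=len(DataHolder.dataFrameHolder))

@app.route('/model')
def model():
    df = DataHolder.dataFrameHolder  # same object on a later request
    return jsonify(mean=df["col1"].mean() if df is not None else None)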

Is this a bad idea, a good idea, or is there something else I don't know about that already solves the problem?

Tags: python pandas dataframe flask

Solution

Redis can be used. My use case involved smaller DataFrames, so I have not tested this with larger ones. It let me serve 3-second tick data to multiple browser clients, and the pyarrow serialization/deserialization performed well. It works locally and across AWS/GCloud and Azure.
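
The core of the approach is serializing the DataFrame with pyarrow and storing the raw bytes under a Redis key. A minimal round trip, assuming a local Redis on the default port, looks like this (note that pa.serialize/pa.deserialize are deprecated in recent pyarrow releases):

import pandas as pd
import pyarrow as pa
import redis

r = redis.Redis.from_url("redis://127.0.0.1")
df = pd.DataFrame({"epoch": [1, 2, 3], "value": [0.5, 0.7, 0.6]})
# serialize to bytes, store in Redis, read back, deserialize
r.set("df", pa.serialize(df).to_buffer().to_pybytes())
restored = pa.deserialize(r.get("df"))
assert restored.equals(df)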

GET route

import sys
from io import BytesIO
from flask import Response

@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
    c = mycache()
    data = c.redis().get(key)
    # stream the raw pyarrow bytes back to the client
    resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
    resp.headers["key"] = key
    resp.headers["type"] = c.redis().get(f"{key}.type")
    resp.headers["size"] = sys.getsizeof(data)
    resp.headers["redissize"] = sys.getsizeof(c.redis().get(key))
    return resp
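
A client can pull the frame back out of this route and rebuild it with pyarrow; a hypothetical example using requests:

import pyarrow as pa
import requests

# fetch the raw bytes served by /cacheget and deserialize them
resp = requests.get("http://localhost:5000/cacheget/dfsensor")
df = pa.deserialize(resp.content)
print(resp.headers["type"], resp.headers["size"], len(df))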

Example route that puts a DataFrame into the cache

import pandas as pd
from flask import request, jsonify
from pandas import json_normalize

@app.route('/sensor_data', methods=['POST'])
def sensor_data():
    c = mycache()
    dfsensor = c.get("dfsensor")
    newsensor = json_normalize(request.get_json())
    newsensor[["x","y"]] = newsensor[["epoch", "value"]]
    newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
    newsensor["amin"] = newsensor["value"]
    newsensor["amax"] = newsensor["value"]
    newsensor = newsensor.drop(columns=["x","y"])

    # add new data from serial interface to start of list (append old data to new data).
    # default time as now to new data
    # (DataFrame.append was removed in pandas 2.0; pd.concat replaces it,
    #  and guarding against an empty cache avoids a first-request failure)
    if dfsensor is not None:
        dfsensor = pd.concat([newsensor, dfsensor], sort=False)
    else:
        dfsensor = newsensor
    # keep size down - only last 500 observations
    c.set("dfsensor", dfsensor[:500])
    del dfsensor

    return jsonify(result={"status":"ok"})
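
For illustration, a hypothetical client posting a single observation to this route (the field names match what the handler normalizes):

import requests

# the handler json_normalizes this payload and prepends it to the
# cached DataFrame, keeping only the most recent 500 rows
requests.post("http://localhost:5000/sensor_data",
              json={"epoch": 1700000000, "value": 21.5})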

Utility class

import json
import os
import pickle

import pandas as pd
import pyarrow as pa
import redis
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import Union, Optional

class mycache():
    __redisClient:Redis
    CONFIGKEY = "cacheconfig"

    def __init__(self) -> None:
        try:
            # check whether REDIS_HOST is already provided by the environment
            os.environ["REDIS_HOST"]
        except KeyError:
            if os.environ["HOST_ENV"] == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif os.environ["HOST_ENV"] == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif os.environ["HOST_ENV"] == "AZURE":
                #os.environ["REDIS_HOST"] = "redis://ignore:password@redis-sensorvenv.redis.cache.windows.net"
                pass # should be set in azure env variable
            elif os.environ["HOST_ENV"] == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                # raising a string is not valid in Python 3; raise a real
                # exception instead (no known redis setup)
                raise RuntimeError("could not initialise redis")

        #self.__redisClient = redis.Redis(host=os.environ["REDIS_HOST"])
        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # get config as well...
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow":True, "pickle":False}
            self.set(self.CONFIGKEY, self.config)
        self.alog = logenv.alog()

    def redis(self) -> Redis:
        return self.__redisClient


    def exists(self, key:str) -> bool:
        if self.__redisClient is None:
            return False

        return self.__redisClient.exists(key) == 1

    def get(self, key:str) -> Union[DataFrame, str]:
        keytype = "{k}.type".format(k=key)
        valuetype = self.__redisClient.get(keytype)
        if valuetype is None:
            if (key.split(".")[-1] == "pickle"):
                return pickle.loads(self.redis().get(key))
            else:
                ret = self.redis().get(key)
                if ret is None:
                    return ret
                else:
                    return ret.decode()
        elif valuetype.decode() == str(pd.DataFrame):
            # fallback to pickle serialized form if pyarrow fails
            # https://issues.apache.org/jira/browse/ARROW-7961
            try:
                return pa.deserialize(self.__redisClient.get(key))
            except pa.lib.ArrowIOError as err:
                self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                return pickle.loads(self.redis().get(f"{key}.pickle"))
            except OSError as err:
                if "Expected IPC" in str(err):
                    self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                    return pickle.loads(self.redis().get(f"{key}.pickle"))
                else:
                    raise err

        elif valuetype.decode() == str(type({})):
            return json.loads(self.__redisClient.get(key).decode())
        else:
            return self.__redisClient.get(key).decode() # type: ignore

    def set(self, key:str, value:Union[DataFrame, str]) -> None:
        if self.__redisClient is None:
            return
        keytype = "{k}.type".format(k=key)

        if str(type(value)) == str(pd.DataFrame):
            # pa.serialize is deprecated in recent pyarrow releases but
            # matches the version this answer was written against
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                self.redis().set(f"{key}.pickle", pickle.dumps(value))
                # issue should be transient through an upgrade....
                # once switched off data can go away
                self.redis().expire(f"{key}.pickle", 60*60*24)
        elif str(type(value)) == str(type({})):
            self.__redisClient.set(key, json.dumps(value))
        else:
            self.__redisClient.set(key, value)

        self.__redisClient.set(keytype, str(type(value)))


if __name__ == '__main__':
    os.environ["HOST_ENV"] = "LOCAL"
    r = mycache()
    rr = r.redis()
    for k in rr.keys("cache*"):
        print(k.decode(), rr.ttl(k))
        print(rr.get(k.decode()))
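
And a short usage sketch of the cache class itself, assuming the class above is in scope and a local Redis is running:

import os
import pandas as pd

os.environ["HOST_ENV"] = "LOCAL"
c = mycache()
c.set("demo", pd.DataFrame({"a": [1, 2]}))  # stored as pyarrow bytes
print(c.get("demo"))                         # comes back as a DataFrame
print(c.get(c.CONFIGKEY))                    # config dict stored as JSON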

