How to efficiently use pyspark mapPartitions to train Facebook Prophet models?

Problem description

I am currently training stock models with Prophet. There are 3000 stocks to train, and I need to find the best parameters for each model, so I run Prophet on Spark, but it is not efficient. The code looks like this:

    filter_code = "'000001'"
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://local:3306?useSSL=false") \
        .option("dbtable", "(select code,datetime,open from table where code in (%s)) as table" % filter_code) \
        .option("user", "user") \
        .option("password","password") \
        .option("numPartitions", 1) \
        .option("partitionColumn", "code") \
        .option("lowerBound", "000001") \
        .option("upperBound", "000001") \
        .load()
    df = df.withColumn('open', df['open'].cast('float')) \
        .withColumn('date', to_timestamp(df['datetime'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))
    parameters = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306?useSSL=false") \
        .option("dbtable", "table") \
        .option("user", "user") \
        .option("password", "password") \
        .load()
    evaluate = df.groupBy('code').agg(collect_list(struct(['date','open'])).alias("ds"))
    evaluate_param = evaluate.join(parameters, evaluate.code == parameters.code_p, 'left_outer') \
        .select('code', 'ds', 'changepoint_prior_scale', 'seasonality_prior_scale',
                'monthly_seasonality', 'yearly_seasonality')
    evaluate_result = evaluate_param.repartition(1) \
                        .rdd.mapPartitions(evaluationPartion)
    print(evaluate_result.collect())
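
A note on why this pipeline is slow: `repartition(1)` collapses everything into a single partition, so `mapPartitions` runs all the per-stock fits serially inside one task. Since each stock's fit is independent, spreading the rows over many partitions lets them run concurrently. The same principle, sketched locally with the standard library (`fit_one` here is a hypothetical stand-in for one expensive Prophet fit, not the actual training code):

```python
from concurrent.futures import ThreadPoolExecutor

def fit_one(code):
    # Hypothetical stand-in for one expensive, independent per-stock model fit
    return code, sum(i * i for i in range(1000))

codes = ["%06d" % i for i in range(8)]

# Independent fits can run concurrently across workers, just as independent
# stocks can run across Spark tasks; repartition(1) prevents exactly this.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = dict(pool.map(fit_one, codes))
```

In the Spark job itself, the analogous change would be something like `evaluate_param.repartition(200, 'code')` (the partition count is an assumption to be tuned to the cluster's cores), so each task handles only a slice of the 3000 stocks.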

The evaluate_param result looks like this (screenshot of the joined rows, not reproduced here).

The ds column holds the time-series data; changepoint_prior_scale, seasonality_prior_scale, monthly_seasonality, and yearly_seasonality are the Prophet parameters.
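
For illustration, a minimal sketch (assuming a hypothetical parameters row, with the seasonality flags stored as 0/1 in MySQL) of how one such row maps onto Prophet's constructor arguments:

```python
# Hypothetical parameters row, as it might come out of the MySQL table
row = {"changepoint_prior_scale": 0.05, "seasonality_prior_scale": 10.0,
       "monthly_seasonality": 1, "yearly_seasonality": 0}

prophet_kwargs = {
    "changepoint_prior_scale": row["changepoint_prior_scale"],
    "seasonality_prior_scale": row["seasonality_prior_scale"],
    "yearly_seasonality": bool(row["yearly_seasonality"]),  # 0/1 flag -> bool
}
# Monthly seasonality is not a constructor argument; it is added separately
# via model.add_seasonality(...) when this flag is set.
add_monthly = bool(row["monthly_seasonality"])
```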

The evaluationPartion function is as follows:

    def evaluationPartion(partitions):
        # Imports live inside the function so they resolve on each Spark executor
        import pandas as pd
        import numpy as np
        import fbprophet
        for partition in partitions:
            stock = pd.DataFrame.from_records(partition.ds, columns=['date','open'])
            changepoint_prior_scale = partition.changepoint_prior_scale
            weekly_seasonality = 'auto'
            daily_seasonality = 'auto'
            monthly_seasonality = False if partition.monthly_seasonality == 0 else True
            yearly_seasonality = False if partition.yearly_seasonality == 0 else True
            changepoints = None
            seasonality_mode = 'additive'
            seasonality_prior_scale = partition.seasonality_prior_scale
            mcmc_samples = 0
            interval_width = 0.80
            uncertainty_samples = 100
            stock['ds'] = stock['date']
            stock['y'] = stock['open']
            max_date = max(stock['date'])
            min_date = min(stock['date'])
            start_date = max_date - pd.DateOffset(months=1)
            end_date = max_date
            training_years = 3
            train = stock[(stock['date'] < start_date) &  (stock['date'] > (start_date - pd.DateOffset(years=training_years)))]
            # get periods
            train_max_date = max(train['date'])
            time_diff = int((end_date.to_pydatetime()-train_max_date.to_pydatetime()).total_seconds()/60/5)
            # Testing data is specified in the range
            test = stock[(stock['date'] >= start_date) & (stock['date'] <= end_date)]
            model = fbprophet.Prophet(daily_seasonality=daily_seasonality,
                                      weekly_seasonality=weekly_seasonality,
                                      yearly_seasonality=yearly_seasonality,
                                      changepoint_prior_scale=changepoint_prior_scale,
                                      changepoints=changepoints,
                                      seasonality_mode=seasonality_mode,
                                      seasonality_prior_scale=seasonality_prior_scale,
                                      mcmc_samples=mcmc_samples,
                                      interval_width=interval_width,
                                      uncertainty_samples=uncertainty_samples)
            if monthly_seasonality:
                # Add monthly seasonality
                model.add_seasonality(name = 'monthly', period = 30.5, fourier_order = 5)
            model.fit(train)
            # Make a future dataframe and predictions
            future = model.make_future_dataframe(periods = time_diff, freq='5min')
            future = model.predict(future)
            # Merge predictions with the known values
            test = pd.merge(test, future, on = 'ds', how = 'inner')
            train = pd.merge(train, future, on = 'ds', how = 'inner')
            # Calculate the differences between consecutive measurements
            test['pred_diff'] = test['yhat'].diff()
            test['real_diff'] = test['y'].diff()
            # Correct is when we predicted the correct direction
            test['correct'] = (np.sign(test['pred_diff']) == np.sign(test['real_diff'])) * 1
            # Accuracy when we predict increase and decrease
            increase_accuracy = 100 * np.mean(test[test['pred_diff'] > 0]['correct'])
            decrease_accuracy = 100 * np.mean(test[test['pred_diff'] < 0]['correct'])
            # Calculate mean absolute error
            test_errors = abs(test['y'] - test['yhat'])
            test_mean_error = np.mean(test_errors)
            train_errors = abs(train['y'] - train['yhat'])
            train_mean_error = np.mean(train_errors)
            # Calculate percentage of time actual value within prediction range
            # Calculate percentage of time actual value within prediction range
            # (DataFrame.ix was removed in pandas 1.0; use .loc / .iloc instead)
            test['in_range'] = False
            for i in test.index:
                if (test.loc[i, 'y'] < test.loc[i, 'yhat_upper']) & (test.loc[i, 'y'] > test.loc[i, 'yhat_lower']):
                    test.loc[i, 'in_range'] = True
            in_range_accuracy = 100 * np.mean(test['in_range'])
            predict_price = future['yhat'].iloc[-1]
            actual_price = test['y'].iloc[-1]
            yield ([partition.code, 'open', predict_price, actual_price,
                    train_mean_error, test_mean_error,
                    increase_accuracy, decrease_accuracy, in_range_accuracy,
                    partition.changepoint_prior_scale, partition.seasonality_prior_scale,
                    partition.monthly_seasonality, partition.yearly_seasonality])
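
Two of the per-row computations above can also be vectorized, which starts to matter when this runs for thousands of stocks. A self-contained sketch on toy data (the numbers below are made up) computing directional accuracy and interval coverage without the Python row loop:

```python
import numpy as np
import pandas as pd

# Toy merged test frame: actual prices plus Prophet-style prediction columns
test = pd.DataFrame({
    "y":          [10.0, 10.8, 10.4, 11.6, 10.3],
    "yhat":       [10.0, 10.5, 10.7, 11.0, 10.6],
    "yhat_lower": [ 9.5, 10.1, 10.2, 10.5, 10.1],
    "yhat_upper": [10.5, 10.9, 11.2, 11.5, 11.1],
})

# Directional accuracy: did we predict the right sign of the change?
pred_diff = test["yhat"].diff()
real_diff = test["y"].diff()
correct = (np.sign(pred_diff) == np.sign(real_diff)).astype(int)
increase_accuracy = 100 * correct[pred_diff > 0].mean()
decrease_accuracy = 100 * correct[pred_diff < 0].mean()

# Vectorized replacement for the row-by-row in_range loop
in_range = (test["y"] > test["yhat_lower"]) & (test["y"] < test["yhat_upper"])
in_range_accuracy = 100 * in_range.mean()
```

The boolean comparison over whole columns does the same strict interval check as the loop, in a single pass.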

Right now the training process is very slow: the evaluate_result count is 100, and those 100 fits take me 5 hours. How can I speed up the training? Thanks!

Tags: pyspark, facebook-prophet
