Why does my MySQL database disconnect when running a cron job?

Problem description

I am running a job in a Databricks notebook that connects to my MySQL database on AWS RDS and inserts data. When I run the notebook manually, I can connect to the endpoint URL and insert my data. Now the notebook runs as a cron job every 30 minutes. The first job succeeds, but every job after that fails with the following error:

MySQLInterfaceError: MySQL server has gone away

I then tried running my job manually again, and I get the same error at tweets_pdf.to_sql(name='tweets', con=engine, if_exists = 'replace', index=False). This is the code that runs in the Databricks notebook:

from __future__ import print_function
import sys
import pymysql
import os
import re
import mysql.connector
from sqlalchemy import create_engine
from operator import add
import pandas as pd
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import json
import boto
import boto3
from boto.s3.key import Key
import boto.s3.connection
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import *

# Get AWS credentials
aws_key_id = os.environ.get("accesskeyid")
aws_key = os.environ.get("secretaccesskey")

# Start spark instance
conf = SparkConf().setAppName("first") 
sc = SparkContext.getOrCreate(conf=conf)

# Allow spark to access my S3 bucket
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",aws_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",aws_key)
config_dict = {"fs.s3n.awsAccessKeyId":aws_key_id,
               "fs.s3n.awsSecretAccessKey":aws_key}
bucket = "diego-twitter-stream-sink"
prefix = "/2020/*/*/*/*"
filename = "s3n://{}/{}".format(bucket, prefix)

# Convert file from S3 bucket to an RDD
rdd = sc.hadoopFile(filename,
                'org.apache.hadoop.mapred.TextInputFormat',
                'org.apache.hadoop.io.Text',
                'org.apache.hadoop.io.LongWritable',
                conf=config_dict)
spark = SparkSession.builder.appName("PythonWordCount").config("spark.files.overwrite","true").getOrCreate()

# Map RDD to specific columns
df = spark.read.json(rdd.map(lambda x: x[1]))
features_of_interest = ["ts", "text", "sentiment"]
df_reduce = df.select(features_of_interest)



# Convert RDD to Pandas Dataframe
tweets_pdf = df_reduce.toPandas()

engine = create_engine(f'mysql+mysqlconnector://admin:{os.environ.get("databasepassword")}@{os.environ.get("databasehost")}/twitter-data')

tweets_pdf.to_sql(name='tweets', con=engine, if_exists = 'replace', index=False)

Does anyone know what the problem might be? All of the database configuration variables are correct, the S3 bucket that PySpark streams from has data, and the AWS RDS instance is nowhere near any storage or compute limits.

Tags: python, mysql, apache-spark, pyspark

Solution


The default max_allowed_packet (4M) may be causing this problem.
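
"MySQL server has gone away" is what MySQL reports when the server drops the connection, and one common trigger is a single INSERT whose payload exceeds max_allowed_packet; pandas' to_sql with if_exists='replace' can send large multi-row INSERT statements. Below is a minimal sketch, reusing the engine connection string and the tweets_pdf DataFrame from the question, that first checks the server's current limit and then passes chunksize to to_sql so each batch stays under it (the 1000-row value is an assumption to tune for your row size):

import os
from sqlalchemy import create_engine, text

# Same connection string as in the question.
engine = create_engine(
    f'mysql+mysqlconnector://admin:{os.environ.get("databasepassword")}'
    f'@{os.environ.get("databasehost")}/twitter-data'
)

# 1. Check the server's current packet limit.
with engine.connect() as conn:
    name, value = conn.execute(text("SHOW VARIABLES LIKE 'max_allowed_packet'")).fetchone()
    print(name, "=", int(value) // (1024 * 1024), "MB")

# 2. Write in smaller batches so no single INSERT exceeds the limit.
#    tweets_pdf is the DataFrame built earlier in the notebook.
tweets_pdf.to_sql(
    name='tweets',
    con=engine,
    if_exists='replace',
    index=False,
    chunksize=1000,  # rows per INSERT batch
)

If the limit itself needs to be raised, note that on AWS RDS max_allowed_packet is changed through the DB instance's parameter group rather than SET GLOBAL (which typically requires privileges the RDS master user does not have), and parameter-group values persist across restarts.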

