python - 为什么我的 MySQL 数据库在运行 cron 作业时断开连接?
问题描述
我正在 Databricks 笔记本上运行一项作业,该笔记本连接到我在 AWS RDS 上的 MySQL 数据库并插入数据。当我手动运行笔记本时,我能够连接到端点 URL 并插入我的数据。现在我的笔记本每 30 分钟运行一次玉米作业。第一个作业成功,但之后的每个作业都因以下错误而失败:
MySQLInterfaceError: MySQL server has gone away
然后我尝试再次手动运行我的工作,我在tweets_pdf.to_sql(name='tweets', con=engine, if_exists = 'replace', index=False)
. 这是在 Databricks 笔记本中运行的代码:
from __future__ import print_function
import sys
import pymysql
import os
import re
import mysql.connector
from sqlalchemy import create_engine
from operator import add
import pandas as pd
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import json
import boto
import boto3
from boto.s3.key import Key
import boto.s3.connection
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import *
# Get AWS credentials
aws_key_id = os.environ.get("accesskeyid")
aws_key = os.environ.get("secretaccesskey")
# Start spark instance
conf = SparkConf().setAppName("first")
sc = SparkContext.getOrCreate(conf=conf)
# Allow spark to access my S3 bucket
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",aws_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",aws_key)
config_dict = {"fs.s3n.awsAccessKeyId":aws_key_id,
"fs.s3n.awsSecretAccessKey":aws_key}
bucket = "diego-twitter-stream-sink"
prefix = "/2020/*/*/*/*"
filename = "s3n://{}/{}".format(bucket, prefix)
# Convert file from S3 bucket to an RDD
rdd = sc.hadoopFile(filename,
'org.apache.hadoop.mapred.TextInputFormat',
'org.apache.hadoop.io.Text',
'org.apache.hadoop.io.LongWritable',
conf=config_dict)
spark = SparkSession.builder.appName("PythonWordCount").config("spark.files.overwrite","true").getOrCreate()
# Map RDD to specific columns
df = spark.read.json(rdd.map(lambda x: x[1]))
features_of_interest = ["ts", "text", "sentiment"]
df_reduce = df.select(features_of_interest)
# Convert RDD to Pandas Dataframe
tweets_pdf = df_reduce.toPandas()
engine = create_engine(f'mysql+mysqlconnector://admin:{os.environ.get("databasepassword")}@{os.environ.get("databasehost")}/twitter-data')
tweets_pdf.to_sql(name='tweets', con=engine, if_exists = 'replace', index=False)
有谁知道可能是什么问题?所有数据库配置变量都是正确的,PySpark 从中流式传输的 S3 存储桶有数据,AWS RDS 远未达到任何容量或计算限制。
解决方案
默认的max_allowed_packets (4M) 可能会导致此问题
推荐阅读
- .net-core - NSwag 可以在具有传统路由的自托管 .net 核心 API 中使用吗?
- r - R:过滤出现在较小向量中的大向量元素
- flutter - Flutter 可以在 kiosk 模式下制作 Application 吗?
- excel - 使一个工作簿中的范围等于另一个工作簿中的相同大小范围
- vb.net - 应用程序在系统托盘中时显示工具提示?
- mysql - 根据查询结果重新分配号码范围
- typescript - 将 tsconfig 设置为忽略“类型 'bar' 上不存在属性 'foo'。” 但仅适用于特定变量
- python-3.x - 带有覆盆子的数据矩阵
- kotlin - JVM 使用 Java_xx 方法而不是 JavaCritical_xx
- ionic4 - 包含离子卡的 Ionic 4 离子载玻片 - 布局问题