Error when trying to read a MySQL table with PySpark

Problem description

I am currently trying to read data from MySQL using PySpark in PyCharm. My goal is to read the data and then load it into HDFS. This is how it starts:

from pyspark.sql import SparkSession
if __name__ == "__main__":
    print("Read MySQL Table Demo - Application Started...")
    spark = SparkSession \
       .builder \
       .appName("Read MySQL Table Demo") \
       .config("spark.jars", "file:///home/amel/Downloads/mysql-connector-java-8.0.25.jar") \
       .config("spark.executor.extraClassPath", "file:///home/amel/Downloads/mysql-connector-java-8.0.25.jar") \
       .config("spark.executor.extraLibrary", "file:///home/amel/Downloads/mysql-connector-java-8.0.25.jar") \
       .config("spark.driver.extraClassPath", "file:///home/amel/Downloads/mysql-connector-java-8.0.25.jar") \
       .enableHiveSupport() \
       .getOrCreate()


    spark.sparkContext.setLogLevel("ERROR")

    mysql_db_driver_class = "com.mysql.cj.jdbc.Driver"  # driver class for Connector/J 8.x
    table_name = "mock3"
    host_name = "localhost"
    port_no = str(3306)
    user_name = "Amel"
    password = "Amel@-1998"
    database_name = "testDb"

    mysql_select_query = "(select * from " + table_name + ") as users"
    print("Printing mysql_select_query:")
    print(mysql_select_query)

    mysql_jdbc_url = "jdbc:mysql://" + host_name + ":" + port_no + "/" + database_name
    print("Printing JDBC Url: " + mysql_jdbc_url)

    # Read the MySQL table through the JDBC data source.
    trans_detail_tbl_data_df = spark.read.format("jdbc") \
        .option("url", mysql_jdbc_url) \
        .option("driver", mysql_db_driver_class) \
        .option("dbtable", mysql_select_query) \
        .option("user", user_name) \
        .option("password", password) \
        .load()

    trans_detail_tbl_data_df.show(10, False)

    print("Read MySQL Table Demo - Application Completed.")

These are the errors:

/home/amel/PycharmProjects/pythonProject/venv/bin/python /home/amel/PycharmProjects/pythonProject/Hello.py
Traceback (most recent call last):
  File "/home/amel/PycharmProjects/pythonProject/Hello.py", line 1, in <module>
    from pyspark.sql import SparkSession
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
  File "<frozen zipimport>", line 259, in load_module
  File "/home/amel/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 51, in <module>
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
  File "<frozen zipimport>", line 259, in load_module
  File "/home/amel/spark/python/lib/pyspark.zip/pyspark/context.py", line 31, in <module>
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
  File "<frozen zipimport>", line 259, in load_module
  File "/home/amel/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
  File "<frozen zipimport>", line 259, in load_module
  File "/home/amel/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 71, in <module>
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
  File "<frozen zipimport>", line 259, in load_module
  File "/home/amel/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 145, in <module>
  File "/home/amel/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
TypeError: an integer is required (got type bytes)

Process finished with exit code 1

I want to read the data from MySQL first; after that, I plan to load it into HDFS. These are the errors I am facing before I can move on to that next step. The HDFS write I have in mind looks roughly like the sketch below.
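For reference, once the DataFrame actually loads, the planned HDFS step would be something like this minimal sketch (the namenode address, output path, and Parquet format are placeholders I have not settled on):

# Sketch of the planned next step: write the loaded DataFrame to HDFS.
# The namenode address, path, and Parquet format are placeholders.
trans_detail_tbl_data_df.write \
    .mode("overwrite") \
    .parquet("hdfs://localhost:9000/user/amel/mock3")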

Tags: python, mysql, apache-spark, pyspark, apache-spark-sql

Solution


The traceback happens at import time, which appears to be a Spark/Python version incompatibility (the bundled cloudpickle failing on a newer Python), so to read the MySQL data with plain Python I used this instead:

import mysql.connector

# Connect to the local MySQL instance.
mydb = mysql.connector.connect(
    host='localhost',
    user='Amel',
    password='Amel@-1998',
    port='3306',
    database='testDb'
)

# Fetch and print every row from the mock3 table.
mycursor = mydb.cursor()
mycursor.execute('SELECT * FROM mock3')
users = mycursor.fetchall()
for user in users:
    print(user)

I also installed mysql-connector-python.
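For anyone reproducing this, the install is a one-liner with pip (run it against the same interpreter/venv that PyCharm uses):

pip install mysql-connector-python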

