Working with databases in pyspark

hziwei 2020-07-23 18:09

Connecting to a database with pyspark

# Read from a MySQL database with Spark SQL
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

# Spark master URL
master_url = "spark://hdp-100:7777"
conf = SparkConf().setAppName("mainProject").setMaster(master_url).set("spark.sql.execution.arrow.enabled", "true")
sc = SparkContext.getOrCreate(conf)
sql_context = SQLContext(sc)
url = "jdbc:mysql://192.168.130.77:3306/paixin?useSSL=false&useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&allowPublicKeyRetrieval=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
table = "paixin"
# Pass the JDBC driver class explicitly along with the credentials
properties = {"user": user, "password": password, "driver": driver}
df = sql_context.read.jdbc(url=url, table=table, properties=properties)
print(df)
# print(df.select('id'))
df.show(10)  # show() prints the rows itself and returns None, so wrapping it in print() only adds a stray "None"
sql = "select * from paixin where id < 5000"
print(sql)
# Register the DataFrame as a temp view named "paixin" so it can be queried with SQL
df.createOrReplaceTempView(table)
df2 = sql_context.sql(sql)

print(df2.collect())  # collect() pulls every matching row back to the driver
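The filter can also be pushed down to MySQL itself by passing a parenthesized subquery in place of the table name, so only the matching rows ever leave the database. A minimal sketch reusing the url and properties defined above; the paixin_small alias is just an illustrative name:

# Spark's JDBC reader accepts "(subquery) alias" wherever a table name goes,
# so MySQL evaluates the WHERE clause before any rows reach Spark.
pushdown_table = "(select * from paixin where id < 5000) as paixin_small"
df_small = sql_context.read.jdbc(url=url, table=pushdown_table, properties=properties)
df_small.show(10)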

Operations

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
import pandas as pd

# Spark master URL
master_url = "spark://hdp-100:7777"
conf = SparkConf().setAppName("paixin").setMaster(master_url).set("spark.sql.execution.arrow.enabled", "true")
sc = SparkContext.getOrCreate(conf)
sql_context = SQLContext(sc)
url = "jdbc:mysql://192.168.130.77:3306/paixin?useSSL=false&useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&allowPublicKeyRetrieval=true"
user = "root"
password = "123456"
table = "paixin"
properties = {"user": user, "password": password, "driver": "com.mysql.cj.jdbc.Driver"}
df = sql_context.read.jdbc(url=url, table=table, properties=properties)
print(df)
df.show(10)
print("*" * 100)
print(df.select('store_path').count())
print(df.select("store_path").distinct().count())
print("*" * 100)
print(df)
print(type(df))
df.filter(df.store_path == "D:图片拍信/交通运输/公路交通在日落.jpg").show(truncate=False)
df.orderBy(df.img_id.desc()).show(truncate=False, n=100)
# show() prints 20 rows by default; pass n= to override, and truncate=False to print full column values
df.show(truncate=False)
print("*" * 20, "1. show", "*" * 20)
df.show(truncate=False)
print("*" * 20, "2. Print the schema as a tree", "*" * 20)
df.printSchema()
print("*" * 20, "3.获取头几行到本地", "*" * 20)
print(df.head(3))
print(df.take(5))
print("*" * 20, "4.查询总行数", "*" * 20)
print(df.count())
print("*" * 20, "5.取别名", "*" * 20)
print(df.select(df.img_id.alias("图片id"), 'img_url').show())
print("*" * 20, "6.查询某列为null的行", "*" * 20)
from pyspark.sql.functions import isnull
print(df.filter(isnull('img_id')).show())
print("*" * 20, "7.收集到本地,耗费资源", "*" * 20)
# print(df.collect())
print("*" * 20, "8.查询概况", "*" * 20)
print(df.describe().show())
print("*" * 20, "9.去重set操作", "*" * 20)
print(df.select("img_id").distinct().show())
print("*" * 20, "10.选择一列或多列", "*" * 20)
# print(df['img_id'].show()) x
# print(df.img_id.show()) x
print(df.select('img_id').show())
print(df.select('img_id', 'img_url').show())
print("*" * 20, "11.可以用where按条件选择", "*" * 20)
print(df.where("store_path like '%交通运输%'").show(truncate=False))
print("*" * 20, "12.排序,默认按升序orderBy和sort", "*" * 20)
print(df.orderBy(df.id.desc()).show(truncate=False))
print("*" * 20, "13.抽样", "*" * 20)
print(df.sample(False, 0.2, 42).show())
print(df.sample(False, 0.2, 43).show())
print("*" * 20, "14.when满足条件和不满足条件应该怎么赋值", "*" * 20)
from pyspark.sql import functions as F
print(df.select(df.store_path, F.when(df.id < 3400, 1).when(df.id > 3500, -1).otherwise(0)).show(300))
print("*" * 20, "15.between筛选出某个范围的值,返回的市TRUE or FALSE", "*" * 20)
print(df.select(df.store_path, df.id.between(3400, 3456)).show(100))
print("*" * 20, "16.筛选特定行数", "*" * 20)
# 1.先添加索引
from pyspark.sql.functions import monotonically_increasing_id
dfWithIndex = df.withColumn("id", monotonically_increasing_id())
print(dfWithIndex.select(dfWithIndex.store_path, dfWithIndex.id.between(50, 100)).show())
print("*" * 20, "17.apply函数", "*" * 20)
