目录
PYSPARK学习笔记
Defining a schema
# Pull in every Spark SQL type helper (StructType, StructField, StringType, ...).
from pyspark.sql.types import *

# Schema for a people dataset: three required columns (nullable=False on each).
people_schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('city', StringType(), False),
])
展示数据
# Load the gzipped CSV file (Header=True: the first row holds column names).
aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2018.csv.gz')
# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))  # new 'airport' column holding the lower-cased destination
# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])  # the mixed-case source column is no longer needed
# Show the DataFrame
aa_dfw_df.show()  # prints the first rows (20 by default)
parquet文件读写
为了节省内存
pyspark 写文件到hdfs (一般都存为parquet读写都比json、csv快,还节约约75%存储空间)
其他hadoop生态圈内容在这
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())
# Combine the DataFrames into one
df3 = df1.union(df2)  # row-wise concatenation, like rbind in R (UNION ALL semantics, duplicates kept)
# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')
# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')
# Register the temp view so the data can be queried with spark.sql
flights_df.createOrReplaceTempView('flights')
# Run a SQL query of the average flight duration; collect()[0] is a Row
# (a tuple subclass), so the %-format below extracts its single field
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)
DataFrame column operations
对数据框列的操作
筛选操作
# Show the distinct VOTER_NAME entries (40 rows, untruncated).
# (The original line ended with bare text outside a comment — a SyntaxError.)
voter_df.select(voter_df['VOTER_NAME']).distinct().show(40, truncate=False)  # drop duplicate values

# Keep only rows where VOTER_NAME is 1-20 characters long (SQL-expression filter).
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out rows where VOTER_NAME contains an underscore (~ negates the condition).
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again to confirm the filtering.
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
# Show the distinct VOTER_NAME entries (repeat of the snippet above).
voter_df.select(voter_df['VOTER_NAME']).distinct().show(40, truncate=False)

# Keep only rows where VOTER_NAME is 1-20 characters long, same as filtering in R.
# (The original line ended with a stray '.', which is a SyntaxError.)
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out rows where VOTER_NAME contains an underscore.
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again.
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
withColumn
# Split VOTER_NAME on runs of whitespace into a new array column 'splits'.
# Fixes: the original line had trailing text outside a comment (SyntaxError),
# and the regex is now a raw string so '\s' is not treated as an escape.
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# First element of the array -> first_name (getItem(i) indexes into the array).
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Last element (index size-1) -> last_name.
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# The intermediate array column is no longer needed.
voter_df = voter_df.drop('splits')

# Preview the result.
voter_df.show()
select
# Add random_val: a random number for **Councilmember** rows, null otherwise.
# Use F.when for consistency with the other F.* calls in these notes
# (a bare `when` would need its own import from pyspark.sql.functions).
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some rows, noting whether the when clause worked (non-matches are null).
voter_df.show()
# 查看一下结果
<script.py> output:
+----------+-------------+-------------------+--------------------+
| DATE| TITLE| VOTER_NAME| random_val|
+----------+-------------+-------------------+--------------------+
|02/08/2017|Councilmember| Jennifer S. Gates| 0.6072090670654174|
|02/08/2017|Councilmember| Philip T. Kingston| 0.8779894137938334|
|02/08/2017| Mayor|Michael S. Rawlings| null|
|02/08/2017|Councilmember| Adam Medrano| 0.2496996705882797|
|02/08/2017|Councilmember| Casey Thomas| 0.20338678125255483|
|02/08/2017|Councilmember|Carolyn King Arnold| 0.911553073855913|
|02/08/2017|Councilmember| Scott Griggs| 0.1134459593298831|
|02/08/2017|Councilmember| B. Adam McGough| 0.42041407481646487|
|02/08/2017|Councilmember| Lee Kleinman| 0.9109217573924748|
|02/08/2017|Councilmember| Sandy Greyson|0.055814633336865205|
|02/08/2017|Councilmember| Jennifer S. Gates| 0.9429223451510873|
|02/08/2017|Councilmember| Philip T. Kingston|0.022915415927586502|
|02/08/2017| Mayor|Michael S. Rawlings| null|
|02/08/2017|Councilmember| Adam Medrano| 0.9833216773540682|
|02/08/2017|Councilmember| Casey Thomas| 0.2944981610876857|
|02/08/2017|Councilmember|Carolyn King Arnold| 0.67447246683049|
|02/08/2017|Councilmember| Rickey D. Callahan| 0.20480391619888827|
|01/11/2017|Councilmember| Jennifer S. Gates| 0.14057384767559866|
|04/25/2018|Councilmember| Sandy Greyson| 0.6598564900991037|
|04/25/2018|Councilmember| Jennifer S. Gates| 0.9412719200394332|
+----------+-------------+-------------------+--------------------+
only showing top 20 rows
when
# Add random_val based on position: Councilmember -> random number,
# Mayor -> 2, everyone else -> 0.  The F.when/.when/.otherwise chain
# replaces a chained if/elif/else (F. prefix added for consistency).
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                                .when(voter_df.TITLE == 'Mayor', 2)
                                .otherwise(0))

# Show some of the DataFrame rows.
voter_df.show()

# Use .filter() with random_val to inspect the rows that fell through to otherwise(0).
voter_df.filter(voter_df.random_val == 0).show()
### user defined functions
udf 自定义函数
```python
def getFirstAndMiddle(names):
    """Return all but the last element of `names` joined with spaces,
    i.e. the first plus any middle names."""
    # Fix: the original body was not indented under the def.
    return ' '.join(names[:-1])

# Wrap the Python function as a Spark UDF whose return type is a string.
# (The original line ended with a stray '.', which is a SyntaxError.)
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Apply the UDF to the 'splits' array column: first arg is the new column
# name, second is the UDF call over the input column.
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame.
voter_df.show()
<script.py> output:
+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
| DATE| TITLE| VOTER_NAME| splits|first_name|last_name|first_and_middle_name|
+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
|02/08/2017|Councilmember| Jennifer S. Gates|[Jennifer, S., Ga...| Jennifer| Gates| Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...| Philip| Kingston| Philip T.|
|02/08/2017| Mayor|Michael S. Rawlings|[Michael, S., Raw...| Michael| Rawlings| Michael S.|
|02/08/2017|Councilmember| Adam Medrano| [Adam, Medrano]| Adam| Medrano| Adam|
|02/08/2017|Councilmember| Casey Thomas| [Casey, Thomas]| Casey| Thomas| Casey|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...| Carolyn| Arnold| Carolyn King|
|02/08/2017|Councilmember| Scott Griggs| [Scott, Griggs]| Scott| Griggs| Scott|
|02/08/2017|Councilmember| B. Adam McGough| [B., Adam, McGough]| B.| McGough| B. Adam|
|02/08/2017|Councilmember| Lee Kleinman| [Lee, Kleinman]| Lee| Kleinman| Lee|
|02/08/2017|Councilmember| Sandy Greyson| [Sandy, Greyson]| Sandy| Greyson| Sandy|
|02/08/2017|Councilmember| Jennifer S. Gates|[Jennifer, S., Ga...| Jennifer| Gates| Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...| Philip| Kingston| Philip T.|
|02/08/2017| Mayor|Michael S. Rawlings|[Michael, S., Raw...| Michael| Rawlings| Michael S.|
|02/08/2017|Councilmember| Adam Medrano| [Adam, Medrano]| Adam| Medrano| Adam|
|02/08/2017|Councilmember| Casey Thomas| [Casey, Thomas]| Casey| Thomas| Casey|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...| Carolyn| Arnold| Carolyn King|
|02/08/2017|Councilmember| Rickey D. Callahan|[Rickey, D., Call...| Rickey| Callahan| Rickey D.|
|01/11/2017|Councilmember| Jennifer S. Gates|[Jennifer, S., Ga...| Jennifer| Gates| Jennifer S.|
|04/25/2018|Councilmember| Sandy Greyson| [Sandy, Greyson]| Sandy| Greyson| Sandy|
|04/25/2018|Councilmember| Jennifer S. Gates|[Jennifer, S., Ga...| Jennifer| Gates| Jennifer S.|
+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
only showing top 20 rows
Partitioning and lazy processing
分区和延时处理
spark 不是实时处理的,而是延时处理任务,也就是lazy
# Select all the unique council voters.
# (The original line began with stray text "I’m", which is a SyntaxError;
# a single distinct() replaces what previously took several groupBy passes.)
voter_df = df.select(df["VOTER NAME"]).distinct()

# Count the rows in voter_df.
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a monotonically increasing (not necessarily consecutive) ROW_ID column.
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with the 10 highest IDs in the set.
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
cache
缓存机制
start_time = time.time()
# Deduplicate and mark the result for caching; the cache is populated
# lazily, on the first action below
departures_df = departures_df.distinct().cache()
# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))
# Count the rows again, noting the variance in time of a cached DataFrame
# (the second count should be faster now that the data is cached)
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))
计算时间
# Import the full file and the pre-split files into separate DataFrames.
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_0*.txt.gz')

# Time the count on the single full file.
# (The original start-time line ended with a stray '.', a SyntaxError.)
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

# Time the count on the split files; multiple files typically parallelize better.
start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))
集群配置
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')
# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')
# Number of partitions used when shuffling data for joins/aggregations
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
# Show the results (conf.get returns values as strings, hence %s)
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)
# Store the current number of partitions before changing the config
before = departures_df.rdd.getNumPartitions()
# Configure Spark to use 500 shuffle partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)
# Recreate the DataFrame; distinct() triggers a shuffle, so the new
# partition setting takes effect here
departures_df = spark.read.csv('departures.txt.gz').distinct()
# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())
json
输出json格式
# Import the data to a DataFrame (first row is the header).
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any rows whose duration (4th column, index 3) is 0.
departures_df = departures_df.filter(departures_df[3] > 0)

# Add a monotonically increasing ID column.
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out in JSON format, replacing any previous output.
# (The original line ended with a stray '.', which is a SyntaxError.)
departures_df.write.json('output.json', mode='overwrite')
~ 这个符号表示取反(逻辑非)
# Split the raw _c0 column on tab characters and keep the resulting array.
field_list = F.split(annotations_df['_c0'], '\t')

# Record how many fields each row produced.
annotations_df = annotations_df.withColumn('colcount', F.size(field_list))

# Keep only rows with at least 5 fields (equivalent to negating colcount < 5;
# null/short rows are dropped either way).
annotations_df_filtered = annotations_df.filter(annotations_df["colcount"] >= 5)

# Compare the row counts before and after filtering.
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))