Creating pairs from an RDD using the n-th element of each row

Problem description

I have used this code:

def process_row(row):
    # strip quotes and split the log line into tokens
    words = row.replace('"', '').split(' ')
    for i in range(len(words)):
        # if we find '-' we will replace it with '0'
        if words[i] == '-':
            words[i] = '0'
    return words
    # alternative: return selected fields instead of the whole token list
    # return [words[0], words[1], words[2], words[3], words[4], int(words[5])]

nasa = (
    nasa_raw.flatMap(process_row)
)
nasa.persist()

for row in nasa.take(10):
    print(row)

to transform this data:

in24.inetnebr.com [01/Aug/1995:00:00:01] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt" 200 1839
 uplherc.upl.com [01/Aug/1995:00:00:07] "GET /" 304 0
uplherc.upl.com [01/Aug/1995:00:00:08] "GET /images/ksclogo-medium.gif" 304 0
uplherc.upl.com [01/Aug/1995:00:00:08] "GET /images/MOSAIC-logosmall.gif" 304 0
 uplherc.upl.com [01/Aug/1995:00:00:08] "GET /images/USA-logosmall.gif" 304 0
ix-esc-ca2-07.ix.netcom.com [01/Aug/1995:00:00:09] "GET /images/launch-logo.gif" 200 1713
uplherc.upl.com [01/Aug/1995:00:00:10] "GET /images/WORLD-logosmall.gif" 304 0
slppp6.intermind.net [01/Aug/1995:00:00:10] "GET /history/skylab/skylab.html" 200 1687
piweba4y.prodigy.com [01/Aug/1995:00:00:10] "GET /images/launchmedium.gif" 200 11853
slppp6.intermind.net [01/Aug/1995:00:00:11] "GET /history/skylab/skylab-small.gif" 200 9202

into this pipelined RDD:

in24.inetnebr.com
[01/Aug/1995:00:00:01]
 GET
 /shuttle/missions/sts-68/news/sts-68-mcc-05.txt
 200
 1839
 uplherc.upl.com
 [01/Aug/1995:00:00:07]
 GET
 /

I want to count the frequency of each address, e.g. uplherc.upl.com, by creating pairs:

pairs = nasa.map(lambda x: (x, 1))
count_by_resource = pairs.reduceByKey(lambda x, y: x + y)
count_by_resource = count_by_resource.takeOrdered(10, key=lambda x: -x[1])
spark.createDataFrame(count_by_resource, ['Resource_location', 'Count']).show(10)

But the result counts the frequency of every element instead:

+--------------------+-------+
|   Resource_location|  Count|
+--------------------+-------+
|                 GET|1551681|
|                 200|1398910|
|                   0| 225418|

How do I refer only to the element I am interested in?

Tags: python, apache-spark, bigdata, rdd, databricks

Solution


Since you are mainly interested in counting domains, splitting every row on spaces and then flatMapping all of those values into a single RDD creates extra work, and certainly extra overhead and processing.

Based on the sample data provided, the domain is the first item on each line. I also noticed that some of your lines begin with whitespace and will therefore produce extra string fragments. You may want to use strip to trim each line before processing it.

You could consider modifying your process to return only the first piece of the string, or adding another map operation.


def extract_domain_from_row(row):
    # if row is a string, the domain is the first space-separated token
    domain = row.strip().split(' ')[0]
    # if you were passed a list instead, you could take its first item as the domain name
    # domain = row[0]
    return domain.lower()

# intermediary RDD
nasa_domains = nasa_raw.map(extract_domain_from_row)

# continue operations as desired with `nasa_domains`
pairs = nasa_domains.map(lambda x: (x, 1))
count_by_resource = pairs.reduceByKey(lambda x, y: x + y)
count_by_resource = count_by_resource.takeOrdered(10, key=lambda x: -x[1])
spark.createDataFrame(count_by_resource, ['Resource_location', 'Count']).show(10)

Output

+--------------------+-----+
|   Resource_location|Count|
+--------------------+-----+
|     uplherc.upl.com|    5|
|slppp6.intermind.net|    2|
|   in24.inetnebr.com|    1|
|ix-esc-ca2-07.ix....|    1|
|piweba4y.prodigy.com|    1|
+--------------------+-----+
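
As a side note, when the number of distinct domains is small, the pair-building step can be skipped entirely: PySpark's RDD.countByValue() action tallies the elements directly. A minimal sketch of that variant (note that it collects every distinct key and its count to the driver, so it only suits low-cardinality data; the top10 name is just for illustration):

# countByValue() ships each distinct element and its count back to the driver,
# so prefer it only when the set of distinct domains is known to be small
domain_counts = nasa_domains.countByValue()

# mirror takeOrdered(10, ...) above by sorting the driver-side dict
top10 = sorted(domain_counts.items(), key=lambda kv: -kv[1])[:10]
for domain, count in top10:
    print(domain, count)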

If the first item is not always a domain, you may want to filter your collection with a pattern that matches domains; see the domain regex suggestions here.
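
For example, a minimal sketch of such a filter, assuming the loose hostname pattern below (an illustration to tighten as needed, not a canonical domain regex):

import re

# loose hostname check: one or more dot-separated labels ending in an
# alphabetic TLD -- an illustrative assumption, not an exhaustive pattern
DOMAIN_RE = re.compile(r'^(?:[a-z0-9-]+\.)+[a-z]{2,}$')

# drop elements that do not look like a domain before counting
nasa_domains = nasa_domains.filter(lambda d: DOMAIN_RE.match(d) is not None)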
