python - Apache Flume 从 python 脚本中获取数据
问题描述
我正在运行一个 python 脚本来从新闻提供者那里收集数据,并在 flume.conf 文件中获取这个脚本。
我的 flume.conf 文件:
newsAgent.sources = r1
newsAgent.sinks = spark
newsAgent.channels = MemChannel
# Describe/configure the source
newsAgent.sources.r1.type = exec
newsAgent.sources.r1.command = python path_to/data_collector.py
# Describe the sink
newsAgent.sinks.spark.type = avro
newsAgent.sinks.spark.channel = memoryChannel
newsAgent.sinks.spark.hostname = localhost
newsAgent.sinks.spark.port = 4040
# Use a channel which buffers events in memory
newsAgent.channels.MemChannel.type = memory
newsAgent.channels.MemChannel.capacity = 10000
newsAgent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
newsAgent.sources.r1.channels = MemChannel
newsAgent.sinks.spark.channel = MemChannel
日晒中的 python 脚本运行良好,我可以看到 json 数据被打印出来。但是当我通过水槽执行它并下沉数据以引发低于警告消息时。
警告信息
18/08/04 07:36:20 WARN HttpParser: Illegal character 0x0 in state=START
for buffer HeapByteBuffer@5ae61d8b[p=1,l=8192,c=8192,r=8191]= . {\x00<<<\x00\x00\x01\x00\x00\x00\x06\x00\x00\x000\x86\xAa\xDa\xE2\xC4T...ing town", "sum>>>}
18/08/04 07:36:20 WARN HttpParser: bad HTTP parsed: 400 Illegal character 0x0 for HttpChannelOverHttp@46691f53{r=0,c=false,a=IDLE,uri=null}
数据收集器.py
def process():
for k, v in news_source.items():
feeds = feedparser.parse(v)
for e in feeds.entries:
doc = json.dumps(
{"news_provider": k, "title": e.title.strip(), "summary": BeautifulSoup(e.summary, 'lxml').text.strip(),
"id": e.id.strip(), "published": e.published if e.has_key('published') else None})
print("%s"%doc)
流式脚本
def func():
sc = SparkContext(master="local[*]", appName="App")
ssc = StreamingContext(sc, 300)
flume_strm = FlumeUtils.createStream(ssc, "localhost", 9999)
lines = flume_strm.map(lambda v: json.loads(v[1]))
lines.pprint()
ssc.start()
ssc.awaitTermination()
使用的命令
bin/flume-ng agent --conf conf --conf-file libexec/conf/test.conf --name Agent -Dflume.root.logger=INFO,console
spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 path_to/streaming_script.py
我无法摆脱那些警告消息,我希望使用 pprint() 在 spark 日志中打印相同的 json 数据,稍后我可以相应地处理这些消息。
阅读流媒体内容时我是否缺少任何特定配置?我需要指定任何特定的编码器吗?
任何帮助表示赞赏。
解决方案
我一定和你看了同样的教程。我尝试了许多不同的选择。大多数都没有成功。但是我找到了一种解决方法:在您的 flume.conf 中使用 exec 源并完全按照您的方式调用脚本。但是,在您的 python 脚本中,将数据写入文件。然后在你的脚本(data_collector.py)停止执行之前“cat”文件。
我认为这是因为 exec 源需要“流式传输”数据,而简单地打印输出是行不通的。
我的设置与您的非常相似:
stream.py(为了便于理解删除了逻辑):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
if __name__ == "__main__":
sc = SparkContext(appName="test");
ssc = StreamingContext(sc, 30)
stream = FlumeUtils.createStream(ssc, "127.0.0.1", 55555)
stream.pprint()
这是我的 data_collector.py(注意最后一行的“cat”命令):
#! /usr/bin/python
import requests
import random
class RandResp():
def __init__(self):
self.url = "https://swapi.co/api/people/"
self.rand = str(random.randint(0, 17))
self.r = requests.get(self.url + self.rand)
def get_r(self):
return(self.r.text)
if __name__ == "__main__":
import os
with open("exec.txt", "w") as file_in:
file_in.write(RandResp().get_r())
os.system("cat exec.txt")
这是我的flume.conf:
# list sources, sinks and channels in the agent
agent.sources = tail-file
agent.channels = c1
agent.sinks=avro-sink
# define the flow
agent.sources.tail-file.channels = c1
agent.sinks.avro-sink.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
# define source and sink
agent.sources.tail-file.type = exec
agent.sources.tail-file.command = python /home/james/Desktop/testing/data_collector.py
agent.sources.tail-file.channels = c1
agent.sinks.avro-sink.type = avro
agent.sinks.avro-sink.hostname = 127.0.0.1
agent.sinks.avro-sink.port = 55555
所以基本上在我的 data_collector.py 中,我只是做任何需要做的逻辑,将它写入一个名为 exec.txt 的文件,然后立即“cat”该文件。它有效......祝你好运
推荐阅读
- google-apps-script - 是否可以将数据范围从一个电子表格复制到另一个电子表格?
- java - 创建对象将最后创建的对象的值分配给java中Arraylist中的每个其他对象
- rust - 使用 std::iter::Iterator::reduce 进行求和的正确语法是什么?
- c# - 当用户输入与字符串匹配的关键字时,将执行 Else 语句
- python - python中的Psychopy:按下键时有两个答案,释放键时有第二个答案
- ios - 使用 textField 以编程方式加载键盘并使用 swift 发送按钮
- php - 如何记录通过php销毁会话的时间
- d3.js - ObservableHQ 中的 FileAttachment - 如果在 2 个单元格或 1 个单元格内,则行为不同:TypeError:reading.map 不是函数
- r - 使用R的宽格式数据帧到长格式数据帧
- php - 错误 [RuntimeException] 无法扫描“数据库/种子”中的类