python - 在 cassandra 中写入 Spark Streaming 中的数据丢失
问题描述
我正在将数据写入 cassandra 和文本文件。几分钟后,我停止了这个过程。然后,例如,我在 cassandra 中有 82035 行,在文件中有 96749 行。
我在不在数据库中的 txt 文件中发现了明显有效的数据。举例:
promerar|082220.80|4158.5985417|00506.7613786
MOLIIUO|082220|4107.4749|00444.2117
josehrin|082220|4159.1124|00455.1298
这是代码:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
.setAppName("Streaming test") \
.setMaster("local[2]") \
.set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
count =0
def saveToCassandra(rows):
if not rows.isEmpty():
sqlContext.createDataFrame(rows).write\
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table="puntos", keyspace="test_rtk")\
.save()
def saveCoord(rdd):
rdd.foreach(lambda x: open("/tmp/p1", "a").write(x[0]+"|"+x[2]+"|"+x[3]+"|"+x[5]+"\n"))
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 2})
data = kvs.map(lambda x: x[1].split(","))
rows= data.map(lambda x:Row(long=x[5],lat=x[3],date=time.strftime("%Y-%m-%d"),time=x[2],user=x[0]))
rows.foreachRDD(saveToCassandra)
data.foreachRDD(saveCoord)
ssc.start()
这是在 cassandra 中创建的表:
CREATE KEYSPACE test_rtk WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
CREATE TABLE test_rtk.puntos(
long text,
lat text,
time text,
date text,
user TEXT,
PRIMARY KEY (time,long,lat)
);
你能帮我吗?
解决方案
在这里查看更好的解决方案
推荐阅读
- java - 如何在 Spring Integration DSL 中实现简单的 echo socket 服务
- javascript - 如何从客户端清除 rad 列表框项目
- javascript - ES6 数组交换的时间/空间复杂度是多少?
- opengl - 如何索引 3D 空间中的点网格以使用 OpenGL GL_LINES 进行绘制?
- firebase - 带有颤动的timetemp的Firestore查询
- python - Spyder3 未启动 - 如何启动?
- r - 将多个日期和时间列合并到一个日期时间列 R
- javascript - 当我用javascript选择值“1”时显示模型
- javascript - 从 Google Maps Autocomplete 获取 JSON 并将其提交到 Django 表单数据中
- xml - E-BIZ并发程序XML文件的执行流程是什么?