首页 > 解决方案 > 在 cassandra 中写入 Spark Streaming 中的数据丢失

问题描述

我正在将数据写入 cassandra 和文本文件。几分钟后,我停止了这个过程。然后,例如,我在 cassandra 中有 82035 行,在文件中有 96749 行。

我在不在数据库中的 txt 文件中发现了明显有效的数据。举例:

promerar|082220.80|4158.5985417|00506.7613786
MOLIIUO|082220|4107.4749|00444.2117
josehrin|082220|4159.1124|00455.1298

这是代码:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
count =0

def saveToCassandra(rows):
    if not rows.isEmpty():
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="puntos", keyspace="test_rtk")\
        .save()


def saveCoord(rdd):
    rdd.foreach(lambda x: open("/tmp/p1", "a").write(x[0]+"|"+x[2]+"|"+x[3]+"|"+x[5]+"\n"))


ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 2})
data = kvs.map(lambda x: x[1].split(","))
rows= data.map(lambda x:Row(long=x[5],lat=x[3],date=time.strftime("%Y-%m-%d"),time=x[2],user=x[0]))
rows.foreachRDD(saveToCassandra)
data.foreachRDD(saveCoord)

ssc.start()

这是在 cassandra 中创建的表:

CREATE KEYSPACE test_rtk WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE test_rtk.puntos(
 long text,
 lat text,
 time text,
 date text,
 user TEXT,
PRIMARY KEY (time,long,lat)
);

你能帮我吗?

标签: pythonspark-streamingcassandra-2.0

解决方案


在这里查看更好的解决方案

Spark Streaming 编程指南


推荐阅读