apache-spark - Error creating a DataFrame with a value and a timestamp in Azure Databricks
Problem description
I'm not very familiar with Spark, but I have to use it to ingest some data. I've tried basically every syntax I could find to build a DataFrame with a value and a timestamp that I can write to a database, so I can track when I pick up updates from the data source. The errors are endless, I'm out of ideas, and I can't see why something this simple won't work. Below is an example of the code I'm trying to get working:
sc = spark.sparkContext
df = sc.parallelize([[1,pyspark.sql.functions.current_timestamp()]]).toDF(("Value","CreatedAt"))
The error doesn't really help:
py4j.Py4JException: Method __getstate__([]) does not exist
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<command-1699228214903488> in <module>
29
30 sc = spark.sparkContext
---> 31 df = sc.parallelize([[1,pyspark.sql.functions.current_timestamp()]]).toDF(("Value","CreatedAt"))
/databricks/spark/python/pyspark/context.py in parallelize(self, c, numSlices)
557 return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
558
--> 559 jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
560
561 return RDD(jrdd, self, serializer)
/databricks/spark/python/pyspark/context.py in _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer)
590 try:
591 try:
--> 592 serializer.dump_stream(data, tempFile)
593 finally:
594 tempFile.close()
I also tried this:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc) # sc is the spark context
df = sqlContext.createDataFrame(
[( current_timestamp(), '12a345')],
['CreatedAt','Value'] # the row header/column labels should be entered here
)
Which produced this error:
AssertionError: dataType <py4j.java_gateway.JavaMember object at 0x7f43d97c6ba8> should be an instance of <class 'pyspark.sql.types.DataType'>
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
<command-2294571935273349> in <module>
33 df = sqlContext.createDataFrame(
34 [( current_timestamp(), '12a345')],
---> 35 ['CreatedAt','Value'] # the row header/column labels should be entered here
36 )
37
/databricks/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
305 Py4JJavaError: ...
306 """
--> 307 return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
308
309 @since(1.3)
/databricks/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
815 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
816 else:
--> 817 rdd, schema = self._createFromLocal(map(prepare, data), schema)
818 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
Solution
Well, I eventually wrote some code that works. I couldn't get it to work with TimestampType(), though; Spark would blow up when inserting the data. I think that may be a runtime error rather than a coding problem.
import adal
import datetime
from pyspark.sql.types import *
# Set Access Token
access_token = token["accessToken"]
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # sc is the spark context
schema = StructType([
    StructField("CreatedAt", StringType(), True),
    StructField("value", StringType(), True)
])
da = datetime.datetime.now().strftime("%m/%d/%Y %H:%M:%S")
df = sqlContext.createDataFrame(
    [(da, '12a345')], schema
)
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url)\
.option("dbtable", "dbo.RunStart")\
.option("accessToken", access_token)\
.option("databaseName", database_name) \
.option("encrypt", "true")\
.option("hostNameInCertificate", "*.database.windows.net")\
.option("applicationintent", "ReadWrite") \
.mode("append") \
.save()
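For what it's worth, both errors in the question have the same root cause: pyspark.sql.functions.current_timestamp() returns a Column expression rather than a value, so it cannot be pickled into an RDD or passed as a row literal; it can only be used inside DataFrame operations such as withColumn or select. Below is a minimal sketch of two alternatives (not verified on the Databricks runtime above; spark refers to the active SparkSession):
import datetime
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Option 1: build the DataFrame from plain values, then let Spark evaluate
# the timestamp as a column expression at execution time.
df1 = spark.createDataFrame([("12a345",)], ["Value"]) \
    .withColumn("CreatedAt", current_timestamp())

# Option 2: keep an explicit TimestampType() schema and pass a Python
# datetime object (not a formatted string) as the row value.
schema = StructType([
    StructField("CreatedAt", TimestampType(), True),
    StructField("Value", StringType(), True)
])
df2 = spark.createDataFrame([(datetime.datetime.now(), "12a345")], schema)
Either DataFrame can then be written with the same df.write options shown above; with a real TimestampType column the SQL Server connector should map CreatedAt to a datetime column rather than text.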