amazon-web-services - Can I use a SQL context on dynamic frames?
Question
I have started using AWS services for ETL. My Glue job has some memory and resource issues, and a colleague suggested I switch my Spark DataFrames to dynamic frames (i.e., read the data in as dynamic frames). However, I'm not sure whether I can keep my script as it is (mainly the SQL context queries), or whether I need to rewrite the SQL context queries as Scala/PySpark DataFrame transformations.
Edit:
My Glue code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql.window import Window
from functools import reduce
from operator import add
import re

# Job setup
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
sqlContext = SQLContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
spark._jsc.hadoopConfiguration().set("parquet.summary.metadata.level", "NONE")

# Read the source data and run the SQL transformations
df1_path = 's3_path'
df1 = sqlContext.read.load(df1_path)
sqlContext.registerDataFrameAsTable(df1, "table1")
df2 = sqlContext.sql("Query_to_table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
df3 = sqlContext.sql("Query_to_table2")

# Write the result back to S3 as a single Parquet file
df3_comp = df3.repartition(1)
output_s3_path = "out_s3_path"
df3_comp_DDF = DynamicFrame.fromDF(df3_comp, glueContext, 'output')
datasink = glueContext.write_dynamic_frame.from_options(frame=df3_comp_DDF, connection_type="s3", connection_options={"path": output_s3_path}, format="parquet", transformation_ctx="datasink")
My Glue error:
servlet.ServletHandler (ServletHandler.java:doHandle(632)) - /api/v1/applications/application_1579274094469_0001
java.lang.NullPointerException
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:171)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:539)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
Solution
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Convert the DynamicFrame to a Spark DataFrame, register it as a
# temporary view, and query it with Spark SQL
df = dyf.toDF()
df.createOrReplaceTempView(glue_table)
df = spark.sql(query)

Here `spark` (the Glue job's SparkSession) plays the role of the SQL context.