Can I use a SQL context on dynamic frames?

Problem description

I am starting to use AWS services for ETL. My Glue job has some memory and resource stacking issues, and a colleague suggested switching my Spark DataFrames to dynamic frames (reading the input as DynamicFrames instead). I am not sure whether I can keep the script as it is (mainly the SQL context part), or whether I need to rewrite the SQL-context queries as Scala/PySpark DataFrame transformations.

Edit:

My Glue code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext,SparkConf
from pyspark.sql.window import Window
from functools import reduce
from operator import add
import re


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Standard Glue job setup
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Suppress _SUCCESS marker files in the output
spark._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
sqlContext = SQLContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Suppress Parquet summary metadata files
spark._jsc.hadoopConfiguration().set("parquet.summary.metadata.level", "NONE")
# Read the input from S3 and run the SQL-context queries
df1_path = 's3_path'
df1 = sqlContext.read.load(df1_path)
sqlContext.registerDataFrameAsTable(df1, "table1")
df2 = sqlContext.sql("Query_to_table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
df3 = sqlContext.sql("Query_to_table2")

# Repartition to a single output file, convert to a DynamicFrame and write to S3 as Parquet
df3_comp = df3.repartition(1)
output_s3_path = "out_s3_path"
df3_comp_DDF = DynamicFrame.fromDF(df3_comp, glueContext, 'output')
datasink = glueContext.write_dynamic_frame.from_options(frame = df3_comp_DDF, connection_type = "s3", connection_options = {"path": output_s3_path}, format = "parquet", transformation_ctx = "datasink")

My Glue error:

servlet.ServletHandler (ServletHandler.java:doHandle(632)) - /api/v1/applications/application_1579274094469_0001
java.lang.NullPointerException
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
    at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
    at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:171)
    at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
    at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
    at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.spark_project.jetty.server.Server.handle(Server.java:539)
    at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
    at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
    at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:748)

Tags: amazon-web-services, amazon-s3, pyspark, apache-spark-sql, aws-glue

Solution


# The SparkSession obtained from the GlueContext replaces a separate SQLContext
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Convert the DynamicFrame (dyf) to a DataFrame, register it as a temp view,
# and run the SQL query through the SparkSession
df = dyf.toDF()
df.createOrReplaceTempView(glue_table)
df = spark.sql(query)

Here spark is the Spark SQL context: the SparkSession obtained from the GlueContext provides spark.sql and temp views, so the existing queries can be kept once the DynamicFrame has been converted with toDF().
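
For completeness, below is a minimal sketch of how the job from the question could look once the input is read as a DynamicFrame while the SQL stays in place. It is only an illustration: the S3 paths, table names and queries are the same placeholders used in the question, and the read assumes Parquet files accessed directly on S3 with create_dynamic_frame.from_options rather than through the Glue Data Catalog.

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session   # the SparkSession doubles as the SQL context
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the input as a DynamicFrame ('s3_path' is a placeholder, as in the question)
dyf1 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3_path"]},
    format="parquet")

# Convert to a DataFrame, register temp views, and keep the existing SQL
df1 = dyf1.toDF()
df1.createOrReplaceTempView("table1")
df2 = spark.sql("Query_to_table1")
df2.createOrReplaceTempView("table2")
df3 = spark.sql("Query_to_table2")

# Convert back to a DynamicFrame for the Glue S3 sink
out_dyf = DynamicFrame.fromDF(df3.repartition(1), glueContext, "output")
glueContext.write_dynamic_frame.from_options(
    frame=out_dyf,
    connection_type="s3",
    connection_options={"path": "out_s3_path"},
    format="parquet",
    transformation_ctx="datasink")

job.commit()

In other words, switching the reads and writes to DynamicFrames does not force a rewrite of the queries themselves; the SparkSession covers what SQLContext was being used for.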

