amazon-web-services - 将 DF 创建到 tempView 后运行 Spark Glue 作业时出错
问题描述
Explanation
当我从动态框架创建 DF 时,它工作正常,并且我能够将数据帧写回动态框架,但是当我将数据帧转换为 createOrReplaceTempView 时,
它会抛出这个错误。列数相同,从源到目标没有任何变化
请帮助我出了什么问题
ERROR
IllegalArgumentException: "requirement failed: The number of columns doesn't
match.\nOld column names (2): name, id\nNew column names (0): "
pysparkGlueCode
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkConf,SparkContext
from pyspark.sql import *
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"mygluedatabaseoregon", table_name = "dev_glue_poc_gluetable", redshift_tmp_dir =
args["TempDir"], transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("name",
"string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx =
"applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["name", "id"],
transformation_ctx = "selectfields2")
getOutput = selectfields2.toDF() #.select('name','id')
getOutput.createOrReplaceTempView("info")
sqlData = spark.sql("select name,id from info")
outputSql = sqlData.toDF()
getOutputDFY = DynamicFrame.fromDF(outputSql,glueContext,"getOutputDFY")
resolvechoice3 = ResolveChoice.apply(frame = getOutputDFY, choice =
"MATCH_CATALOG", database = "mygluedatabaseoregon", table_name =
"dev_glue_poc_importfromglue", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols",
transformation_ctx = "resolvechoice4")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4,
database = "mygluedatabaseoregon", table_name = "dev_glue_poc_importfromglue",
redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
解决方案
推荐阅读
- python - (美丽的soap4,)AttributeError:'NoneType'对象没有属性'get_text'
- python - Discord.py 频道检测
- mysql - 在 docker 文件中为 Github Actions CI Build 关闭 MySQL 严格模式
- r - 获得 NA 在合并中合并第二对列中的一个/或类型的匹配
- python - 用值替换 DataFrame 列中的字典键
- gulp - 使用 gulp 将一个文件夹中的所有文件覆盖到另一个文件夹
- pyspark - 创建一列以在数组 psypark 中累积数据
- snowflake-cloud-data-platform - 以特定顺序对列项目进行排序或排序使用雪花代码
- sybase - 在sybase存储过程中将日期时间作为参数传递
- mysql - 按观看视频的时间列出前 5 位用户