Table 'NM_TEMP_STAGING_1100952600' does not exist when using AWS Glue and Snowflake

Problem description

I am using an AWS Glue job to build a data pipeline. I took the following code from the community:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from py4j.java_gateway import java_import
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" 
#args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD', 'ROLE'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session


glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)

##Use the CData JDBC driver to read Snowflake data from the Products table into a DataFrame
##Note the populated JDBC URL and driver class name

java_import(sparkSession._jvm, SNOWFLAKE_SOURCE_NAME)
sparkSession._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sparkSession._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
tmp_dir=args["TempDir"]
sfOptions = {
"sfURL" : args['URL'],
"sfAccount" : args['ACCOUNT'],
"sfUser" : args['USERNAME'],
"sfPassword" : args['PASSWORD'],
"sfDatabase" : args['DB'],
"sfSchema" : args['SCHEMA'],
"sfRole" : args['ROLE'],
"sfWarehouse" : args['WAREHOUSE'],
"preactions" : "USE DATABASE dev_lz;",
}

#"tempDir" : tmp_dir,

print('=========DB Connection details ================== ', sfOptions)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "aws-nonprod-datalake-glue-catalog", table_name = "nm_s_amaster", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ mappings], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = [columns], transformation_ctx = "selectfields2")
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "aws-nonprod-datalake-glue-catalog", table_name = "NM_TEMP", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

##Convert the DynamicFrame to a Spark DataFrame and write it to Snowflake

resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("preactions","USE DATABASE dev_lz").option("dbtable", "nm_temp").mode("overwrite").save()


glueJob.commit()

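But after running the code, I get net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error: Table 'NM_TEMP_STAGING_1100952600' does not exist

Please let me know if I am missing something. I have privileges to create and select stages, to create and select tables, and to create future tables. I removed the columns and mappings from the code above, but the original code has them.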

Tags: snowflake-cloud-data-platform

Solution


resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("preactions","USE DATABASE dev_lz").option("dbtable", "nm_temp").mode("overwrite").save()

I added the following just before the dbtable option in the line above, and it started working:

.option("preactions","USE ROLE DEVELOPER;USE DATABASE dev_db;USE SCHEMA aws_test")

so that the full statement looks like this:

resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("preactions","USE DATABASE dev_lz").option("preactions","USE ROLE DEVELOPER;USE DATABASE dev_db;USE SCHEMA aws_test").option("dbtable", "nm_temp").mode("overwrite").save()
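
Note that the statement above sets "preactions" three times: once in sfOptions and twice through .option(). Spark's DataFrameWriter keeps only the last value given for a key, so only the final string takes effect. The answer does not say why that string fixes the error. One plausible reading is that, in overwrite mode, the connector loads data through a temporary staging table (the name NM_TEMP_STAGING_1100952600 suggests this), and the session's role, database, and schema all need to point at the place where that table is created. Below is a minimal sketch of building the options once, with a single preactions entry. The helper name build_snowflake_write_options is hypothetical and not part of the connector; the role, database, and schema values are the ones from the answer.

```python
def build_snowflake_write_options(base_options, role, database, schema, table):
    """Return a copy of base_options with one consolidated "preactions" entry.

    Hypothetical helper for illustration only; it just builds a dict.
    """
    # Statements are joined with ";" exactly as in the answer's preactions string.
    preactions = ";".join([
        f"USE ROLE {role}",
        f"USE DATABASE {database}",
        f"USE SCHEMA {schema}",
    ])
    options = dict(base_options)          # do not mutate the caller's dict
    options["preactions"] = preactions    # replaces any earlier preactions value
    options["dbtable"] = table
    return options


# Usage inside the Glue job (assumes sfOptions, resolvechoice4 and
# SNOWFLAKE_SOURCE_NAME are defined as in the question):
#
# write_options = build_snowflake_write_options(
#     sfOptions, "DEVELOPER", "dev_db", "aws_test", "nm_temp")
# resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME) \
#     .options(**write_options).mode("overwrite").save()
```

Keeping a single preactions entry makes it obvious which session context the write runs in, instead of relying on the last .option() call silently replacing the earlier ones.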
