pyspark - Passing dynamic values to Spark SQL
Problem description
Basically, I am passing dynamic values into a PySpark SQL query. My code is as follows:
set_sql = "".join(["set app_list_0 = 'app_3'"])
sqlContext.sql(set_sql)
click_app_join_sql = sqlContext.sql("select click_id, (case when app_new in ${app_list_0} then 1 else 0 END) as ${app_list_0}, device, os, channel from clickDF ")
click_app = sqlContext.sql(click_app_join_sql)
click_app.show(3)
When I run my code, I end up with the following error. Can you tell me what is wrong with the code above?
File "/home/saureddi/spark/data_process/hive_data_process.py", line 103, in <module>
click_app_join_sql = sqlContext.sql("select click_id, (case when app_new in ${app_list_0} then 1 else 0 END) as ${app_list_0}, device, os, channel from clickDF ")
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 384, in sql
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 603, in sql
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco
pyspark.sql.utils.ParseException: u"\nno viable alternative at input '(case when app_new in 'app_3''(line 1, pos 39)\n\n== SQL ==\nselect click_id, (case when app_new in 'app_3' then 1 else 0 END) as 'app_3', device, os, channel from clickDF \n---------------------------------------^^^\n"
Code:
app_sql = "".join(["select a.app, CONCAT('app', '_',a.app) as new_app, a.app_count from(select app, count(app) as app_count from dblclk_text.click_data group by app )a order by a.app_count desc limit 5"])
df1 = hiveContext.sql(app_sql)
df1.createOrReplaceTempView('app')
df1_new_app = sqlContext.sql("select new_app from app ")
df1_new_app.printSchema()
app_list = []
app_result = df1_new_app.collect()
print(type(app_result))
for app in app_result:
    app_list.append(app)
click_sql = "select click_id,CONCAT ('app', '_', app) as app_new, device, os, channel from dblclk_text.click_pank_data"
clickDF = hiveContext.sql(click_sql)
clickDF.createOrReplaceTempView('clickDF')
app_list_0 = str(app_list[0])
#app_list_0 = 'app_3'
print (app_list_0)
sample_sql = '''select click_id, (case when app_new in {0} then 1 else 0 END) as {0}, device, os, channel from clickDF '''.format(app_list_0)
click_app_join_sql = sqlContext.sql(sample_sql)
click_app = sqlContext.sql(click_app_join_sql)
click_app.show(3)
Solution
sample_sql = '''
select tenant, user_id, event_type,bounce_class from {0}
where {1} and {2} and {3} and {4}
and (event_type in ('list_unsubscribe', 'link_unsubscribe', 'spam_complaint') or bounce_class in ('10','30','90'))
and tenant is not null
and tenant != ''
'''.format(table_name, dateid_start, dateid_end,event_timestamp_start,event_timestamp_end)
hiveContext.sql(sample_sql)  # hiveContext is the HiveContext instance, as in the question
Here table_name, dateid_start, dateid_end, event_timestamp_start and event_timestamp_end are Python values that str.format substitutes into the query string before it reaches Spark.
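Applied to the query from the question, the same str.format approach might look like the sketch below. The ParseException points at `in 'app_3'`, which suggests that the IN list needs parentheses; quoting the alias with backticks instead of single quotes is an assumption about what the parser accepts as a column name. The helper name build_click_app_sql and its identifier check are hypothetical and not part of the original answer.

```python
import re


def build_click_app_sql(app_name, table="clickDF"):
    """Build the CASE/IN query for one app value (hypothetical helper)."""
    # The value is spliced into the SQL text and reused as a column alias,
    # so only plain identifiers such as app_3 are accepted here.
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", app_name):
        raise ValueError("unexpected app name: {0!r}".format(app_name))
    return (
        "select click_id, "
        # IN takes a parenthesised list; the alias is backtick-quoted.
        "(case when app_new in ('{0}') then 1 else 0 end) as `{0}`, "
        "device, os, channel from {1}"
    ).format(app_name, table)


sample_sql = build_click_app_sql("app_3")
print(sample_sql)
```

The resulting string would then be passed to sqlContext.sql(sample_sql) a single time. sql() returns a DataFrame, so feeding that result into sqlContext.sql again, as the question's code does, is likely to fail. Note also that collect() returns Row objects, so str(app_list[0]) yields the Row's repr rather than the bare value; app_list[0]['new_app'] reads the field itself.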