pyspark - PySpark streaming from CloudKarafka on Google Dataproc
Problem description
Service: GCP Dataproc & Dataproc Hub
Service: CloudKarafka ( www.cloudkarafka.com ), a quick and easy way to spin up your own Kafka service.
1: Out of the box, GCP Dataproc Hub provides Spark 2.4.8.
2: Create a Dataproc cluster with a specific image version.
gcloud shell:
gcloud dataproc clusters create dataproc-spark312 --image-version=2.0-ubuntu18 --region=us-central1 --single-node
3: Export the cluster config and save it in a GS bucket
gcloud dataproc clusters export dataproc-spark312 --destination dataproc-spark312.yaml --region us-central1
gsutil cp dataproc-spark312.yaml gs://gcp-learn-lib/
4: Create an ENV file
DATAPROC_CONFIGS=gs://gcp-learn-lib/dataproc-spark312.yaml
NOTEBOOKS_LOCATION=gs://gcp-learn-notebooks/notebooks
DATAPROC_LOCATIONS_LIST=a,b,c
Save it as dataproc-hub-config.env and upload it to the GS bucket
5: Create a Dataproc Hub and link it to the cluster above. Section: "Custom environment settings" - key: container-env-file, value: gs://gcp-learn-lib/dataproc-hub-config.env
6: Finish the creation
7: Click the Jupyter link
8: It should show your cluster; select it, and pick the region (the same as the original cluster from step 2)
9: Dataproc -> Clusters -> click the cluster (its name starts with hub-) -> VM Instances -> SSH. Download the Kafka jars and copy them into:
cd /usr/lib/spark/jars/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.1.2/spark-streaming-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.1.2/spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
10: Prepare the JAAS and CA files
vi /tmp/cloudkarafka_gcp_oct2021.jaas
vi /tmp/cloudkarafka_gcp_oct2021.ca
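The post does not show the file contents. For SCRAM authentication, a minimal JAAS file typically looks like the sketch below (the username and password are placeholders for your own CloudKarafka credentials); the .ca file holds the broker's CA certificate in PEM format:

```
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="YOUR-CLOUDKARAFKA-USERNAME"
  password="YOUR-CLOUDKARAFKA-PASSWORD";
};
```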
11: Append the following to the end of the file:
sudo vi /etc/spark/conf/spark-defaults.conf
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dsasl.jaas.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dssl.ca.location=/tmp/cloudkarafka_gcp_oct2021.ca
spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas
12: Go back to JupyterHub. Restart the kernel and test your code:
spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "server1:9094,server2:9094,server3:9094") \
.option("subscribe", "foo-default") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.format("kafka") \
.option("kafka.bootstrap.servers", "server1:9094,server2:9094,server3:9094") \
.option("topic", "foo-test") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
.option("checkpointLocation", "/tmp/stream/kafkatest") \
.start()
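As a side note, the connection options repeated in the reader and writer above can be kept in one place so source and sink stay in sync; a sketch (broker hostnames are placeholders, as in the original):

```python
# Shared Kafka connection options (hypothetical broker names, as above).
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "server1:9094,server2:9094,server3:9094",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
}

def with_kafka_options(builder, extra=None):
    """Apply the shared options (plus any extras) to a DataStreamReader/Writer."""
    for key, value in {**KAFKA_OPTIONS, **(extra or {})}.items():
        builder = builder.option(key, value)
    return builder

# Usage (sketch):
# source = with_kafka_options(spark.readStream.format("kafka"),
#                             {"subscribe": "foo-default"}).load()
```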
13: Enjoy!
===============
Solution