PySpark Streaming from CloudKarafka on Google Dataproc

Problem Description

Service: GCP Dataproc & Dataproc Hub

Service: CloudKarafka ( www.cloudkarafka.com ), a simple and fast way to spin up your Kafka service.

1: Out of the box, GCP Dataproc Hub provides Spark 2.4.8.

2: Create a Dataproc cluster with a specific image version instead (the 2.0 image family ships Spark 3.1.x, matching the connector jars used below).

In Cloud Shell:
gcloud dataproc clusters create dataproc-spark312  --image-version=2.0-ubuntu18  --region=us-central1 --single-node
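To confirm which image (and therefore Spark) version the new cluster actually got, something like this should work:

gcloud dataproc clusters describe dataproc-spark312 --region=us-central1 --format="value(config.softwareConfig.imageVersion)"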

3: Export the cluster config and save it in a GCS bucket

gcloud dataproc clusters export dataproc-spark312 --destination dataproc-spark312.yaml --region us-central1
gsutil cp dataproc-spark312.yaml gs://gcp-learn-lib/
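A quick sanity check that the YAML actually landed in the bucket:

gsutil ls -l gs://gcp-learn-lib/dataproc-spark312.yaml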

4: Create an ENV file (DATAPROC_LOCATIONS_LIST is the list of zone suffixes the Hub is allowed to spawn clusters in)

DATAPROC_CONFIGS=gs://gcp-learn-lib/dataproc-spark312.yaml
NOTEBOOKS_LOCATION=gs://gcp-learn-notebooks/notebooks
DATAPROC_LOCATIONS_LIST=a,b,c

Save it as dataproc-hub-config.env and upload it to the GCS bucket:
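For example, reusing the bucket from step 3 (step 5 expects the file at this exact path):

gsutil cp dataproc-hub-config.env gs://gcp-learn-lib/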

5: Create a Dataproc Hub and link the cluster config from above. In the "Custom environment settings" section, set key: container-env-file, value: gs://gcp-learn-lib/dataproc-hub-config.env

6: Finish creating the Hub

7: Click the Jupyter link

8: It should show your cluster config; select it, and pick the zone (the same one as the original cluster from step 2)

9: Dataproc -> Clusters -> click the cluster (its name starts with hub-) -> VM Instances -> SSH. Download the Kafka jars into the Spark jars directory (run as root, e.g. via sudo -i, if the directory is not writable):

cd /usr/lib/spark/jars/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.1.2/spark-streaming-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.1.2/spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
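If the downloads succeeded, the five new jars should now sit next to the stock Spark jars:

ls -l /usr/lib/spark/jars/ | grep -E 'kafka|commons-pool2'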

10: Prepare the JAAS and CA files

vi /tmp/cloudkarafka_gcp_oct2021.jaas
vi /tmp/cloudkarafka_gcp_oct2021.ca
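The exact contents depend on your CloudKarafka account. The CA file holds the PEM root certificate downloaded from the CloudKarafka console; for the JAAS file, a minimal sketch for SCRAM-SHA-256 looks like this (username and password are placeholders):

KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="YOUR-CLOUDKARAFKA-USERNAME"
  password="YOUR-CLOUDKARAFKA-PASSWORD";
};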

11: Append the following to the end of the file, so both the driver and executor JVMs pick up the JAAS config:

sudo vi /etc/spark/conf/spark-defaults.conf
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dsasl.jaas.config=/tmp/cloudkarafka_gcp_oct2021.jaas -Dssl.ca.location=/tmp/cloudkarafka_gcp_oct2021.ca
spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/cloudkarafka_gcp_oct2021.jaas

12: Go back to JupyterHub, restart the kernel, and test your code:

spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "server1:9094,serer2:9094,server3:9094") \
.option("subscribe", "foo-default") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.format("kafka") \
.option("kafka.bootstrap.servers", "server1:9094,serer2:9094,server3:9094") \
.option("topic", "foo-test") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
.option("checkpointLocation", "/tmp/stream/kafkatest") \
.start()
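
If nothing shows up in foo-test, a console sink is a handy way to confirm records are arriving at all before writing back to Kafka. A minimal debugging sketch, using the same connection options and a hypothetical checkpoint path:

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "server1:9094,server2:9094,server3:9094") \
    .option("subscribe", "foo-default") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
    .load()

# print each micro-batch to the notebook output instead of producing to Kafka
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream.format("console") \
    .option("checkpointLocation", "/tmp/stream/consoletest") \
    .start()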

13: Enjoy

===============

Tags: pyspark, spark-streaming, google-cloud-dataproc
