scala - Spark Jupyter notebook does not show Scala console output
Problem description
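1) I am learning streaming and ran into a problem: nothing is shown on the (Scala) console, including the println inside sendEvent. I then tried planting println("xyz") lines and found that they are printed only when they are not inside the "while" block; otherwise nothing is printed, not even from a println placed before the while loop. I added a few more println("xyz") lines and found that some of them seem to be suppressed: only the last one is printed.
I have run into this twice before with different Storm streaming code: Jupyter Notebook printed nothing, while the same code worked fine in the Scala shell.
2) I am also wondering about calls to awaitTermination(), for example: messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination() (I get no console output from this either)
or about "infinite loops" such as: var finished = false while (!finished) {................. ..}
Do they wait for a hard interrupt such as a stop or Ctrl+C, or how do I break out of them properly so that the next line gets executed? I am confused, because the authors of the examples/tutorials say nothing about this.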
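One way to keep a polling loop from blocking a notebook cell forever is to give it a time budget, so the cell eventually finishes and the next line runs. The following is only a minimal sketch, not taken from the tutorial: `runBounded`, `budget`, `pause`, and `step` are hypothetical names introduced for illustration, and the loop body is a placeholder for the real work.

```scala
import scala.concurrent.duration._

// Hypothetical helper (not part of any library): calls `step` repeatedly
// until it returns false or the time budget is used up, whichever comes first.
// Returns the number of times `step` was called.
def runBounded(budget: FiniteDuration, pause: FiniteDuration)(step: Int => Boolean): Int = {
  val deadline = budget.fromNow
  var iterations = 0
  var keepGoing = true
  while (keepGoing && deadline.hasTimeLeft()) {
    keepGoing = step(iterations)
    iterations += 1
    if (keepGoing) Thread.sleep(pause.toMillis)
  }
  iterations
}

// Usage: the loop ends on its own, either because the body asks to stop
// or because the 2 second budget runs out.
val n = runBounded(2.seconds, 100.millis) { i =>
  println(s"iteration $i")
  i < 4 // placeholder stop condition: request a stop on the fifth call
}
```

Whether the println lines appear while the loop is still running depends on how the notebook kernel forwards output; some kernels may return output only once the statement has completed, which would be consistent with a never-ending loop showing nothing.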
import java.util._
import scala.collection.JavaConverters._
import java.util.concurrent._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.eventhubs.ConnectionStringBuilder
// Event hub configurations
// Replace values below with yours
val eventHubName = "<Event hub name>"
val eventHubNSConnStr = "<Event hub namespace connection string>"
val connStr = ConnectionStringBuilder(eventHubNSConnStr)
  .setEventHubName(eventHubName).build
import com.microsoft.azure.eventhubs._
val pool = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)
def sendEvent(message: String) = {
  val messageData = EventData.create(message.getBytes("UTF-8"))
  eventHubClient.get().send(messageData)
  println("Sent event: " + message + "\n")
}
import twitter4j._
import twitter4j.TwitterFactory
import twitter4j.Twitter
import twitter4j.conf.ConfigurationBuilder
// Twitter application configurations
// Replace values below with yours
val twitterConsumerKey = "<CONSUMER KEY>"
val twitterConsumerSecret = "<CONSUMER SECRET>"
val twitterOauthAccessToken = "<ACCESS TOKEN>"
val twitterOauthTokenSecret = "<TOKEN SECRET>"
val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
  .setOAuthConsumerKey(twitterConsumerKey)
  .setOAuthConsumerSecret(twitterConsumerSecret)
  .setOAuthAccessToken(twitterOauthAccessToken)
  .setOAuthAccessTokenSecret(twitterOauthTokenSecret)
val twitterFactory = new TwitterFactory(cb.build())
val twitter = twitterFactory.getInstance()
//Getting tweets with keyword "Azure" and sending them to Event Hub realtime
val query = new Query(" #Azure ")
query.setCount(100)
query.lang("en")
// Note: finished is never set to true, so this loop runs until it is interrupted
var finished = false
while (!finished) {
  val result = twitter.search(query)
  val statuses = result.getTweets()
  var lowestStatusId = Long.MaxValue
  for (status <- statuses.asScala) {
    if (!status.isRetweet()) {
      sendEvent(status.getText())
    }
    lowestStatusId = Math.min(status.getId(), lowestStatusId)
    Thread.sleep(2000)
  }
  // Page backwards: the next search returns only tweets older than the ones seen so far
  query.setMaxId(lowestStatusId - 1)
}
// Closing connection to the Event Hub
eventHubClient.get().close()
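For the awaitTermination() call in question 2, a possible way to avoid blocking indefinitely is the overload that takes a timeout in milliseconds, followed by an explicit stop(). This is a sketch under assumptions, not the tutorial's code: the built-in "rate" source stands in for the Event Hubs stream, and the names `streamingQuery` and `terminated`, the 10 second timeout, and the local master setting are chosen for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// In a notebook a session usually exists already; getOrCreate() reuses it.
// master("local[2]") is an assumption for running this outside a cluster.
val spark = SparkSession.builder()
  .appName("bounded-console-sink")
  .master("local[2]")
  .getOrCreate()

// Stand-in input: the "rate" source generates rows with a timestamp and a counter.
val messages = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()

val streamingQuery = messages.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()

// Waits at most 10 seconds; returns true only if the query ended within that time.
val terminated = streamingQuery.awaitTermination(10000L)

// The query keeps running after a timeout, so stop it explicitly.
if (!terminated) streamingQuery.stop()
```

The console sink writes to the driver's standard output, so in some notebook setups the batches may end up in the driver log rather than in the cell output.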