java - Apache Kafka Producer twitter 连接 IllegalStateException
问题描述
我正在尝试与 twitter 客户端建立连接并收到以下异常:此异常出现在建立基本连接时,而 zookeepers 和 Kafka 服务器正在运行。
Exception in thread "main" java.lang.IllegalStateException: There is already a connection thread running for Hosebird-Client-01, endpoint: /1.1/statuses/filter.json?delimited=length&stall_warnings=true
at com.twitter.hbc.httpclient.BasicClient.connect(BasicClient.java:92)
at com.github.simpleProject.kafka.simpleProject.twitter.TwitterProducer.run(TwitterProducer.java:37)
at com.github.simpleProject.kafka.simpleProject.twitter.TwitterProducer.main(TwitterProducer.java:28)
这是我的代码
public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());
String consumerKey ="sbkOd*********tMJQUpU4Iz4j";
String consumerSecret="eZZPa788***********TR6hx39MlkvWylO6rF";
String token = "758284333996277763-*************D6f5CanCsre2qPgviv";
String secret="jJRbC9cMOaacHEHEd7y6po********1VwsS5x0ZmDG";
public TwitterProducer(){}
public static void main(String[] args){
new TwitterProducer().run();
}
public void run(){
/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
// create a twitter client
Client client = createTwitterClient(msgQueue);
// Attempts to establish a connection.
client.connect();
// create a kafka producer
// loops to send tweets to kafka
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg =null;
try{
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e){
e.printStackTrace();
client.stop();
}
if(msg!=null){
logger.info(msg);
}
logger.info("End of application");
}
}
public Client createTwitterClient(BlockingQueue<String> msgQueue){
/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
List<String> terms = Lists.newArrayList("bitcoin");
hosebirdEndpoint.trackTerms(terms);
// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the log
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
// Attempts to establish a connection.
hosebirdClient.connect();
return hosebirdClient;
}
}
解决方案
推荐阅读
- xml - 轻松将书籍转换为数据库(XML?)
- javascript - 如何从工具提示 echarts 中获取数据
- react-native - React Native,使用 fetch 方法,[TypeError: Network request failed]
- firebase - 尝试创建 Firebase Firestore 数据库时出错
- java - 带有可编辑框架帽的 Java 游戏循环
- reactjs - react-hook-form 自定义验证消息未显示
- android - Android如何将多个资源添加到工具中:keep in keep.xml?
- javascript - 如何在第二次点击时关闭手风琴?
- python - 如何在pyspark中将列转换为行
- sql - 替换 SQL Server 2016 中的确切单词,替换功能作为“喜欢”但不准确