Apache Kafka Producer Twitter connection IllegalStateException

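Problem description

I am trying to establish a connection with the Twitter client and get the following exception. It is thrown while setting up the basic connection, with both ZooKeeper and the Kafka server running.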

Exception in thread "main" java.lang.IllegalStateException: There is already a connection thread running for Hosebird-Client-01, endpoint: /1.1/statuses/filter.json?delimited=length&stall_warnings=true
    at com.twitter.hbc.httpclient.BasicClient.connect(BasicClient.java:92)
    at com.github.simpleProject.kafka.simpleProject.twitter.TwitterProducer.run(TwitterProducer.java:37)
    at com.github.simpleProject.kafka.simpleProject.twitter.TwitterProducer.main(TwitterProducer.java:28)

Here is my code:

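import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TwitterProducer {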
    Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());
    String consumerKey ="sbkOd*********tMJQUpU4Iz4j";
    String consumerSecret="eZZPa788***********TR6hx39MlkvWylO6rF";
    String token = "758284333996277763-*************D6f5CanCsre2qPgviv";
    String secret="jJRbC9cMOaacHEHEd7y6po********1VwsS5x0ZmDG";
    public TwitterProducer(){}
    public static void main(String[] args){
        new TwitterProducer().run();
    }
    public void run(){
        /** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        // create a twitter client
        Client client = createTwitterClient(msgQueue);
        // Attempts to establish a connection.
        client.connect();
        // create a kafka producer

        // loops to send tweets to kafka
        // on a different thread, or multiple different threads....
        while (!client.isDone()) {
            String msg =null;
            try{
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e){
                e.printStackTrace();
                client.stop();
            }
            if(msg!=null){
                logger.info(msg);
            }
        }
        // Log once, after the polling loop has finished
        logger.info("End of application");
    }


    public Client createTwitterClient(BlockingQueue<String> msgQueue){

        /** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        List<String> terms = Lists.newArrayList("bitcoin");

        hosebirdEndpoint.trackTerms(terms);

        // These secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")             // optional: mainly for the log
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        // Attempts to establish a connection.
        hosebirdClient.connect();
        return hosebirdClient;
    }
}

Tags: java, twitter, apache-kafka

Solution
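
The exception is consistent with `connect()` being called twice on the same client. `createTwitterClient` already calls `hosebirdClient.connect()` before returning, and `run()` then calls `client.connect()` again on the returned object. The stack trace points at the call in `TwitterProducer.run`, which is the second one. Removing either call (for example the one inside `createTwitterClient`, so that the method only builds the client) should make the exception go away. ZooKeeper and Kafka are not involved at this point, since no producer has been created yet.

The sketch below illustrates the mechanism with a simplified stand-in. `OneShotClient`, `DoubleConnectDemo` and their methods are hypothetical names invented for this example; it is not the hbc source, only an assumption about how a connect-once guard of this kind behaves.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class DoubleConnectDemo {
    // Mirrors the question's factory method: builds the client AND connects it.
    static OneShotClient createClientAndConnect(String name) {
        OneShotClient client = new OneShotClient(name);
        client.connect();
        return client;
    }

    // Suggested shape: the factory only builds; the caller connects exactly once.
    static OneShotClient createClient(String name) {
        return new OneShotClient(name);
    }

    // Returns true if calling connect() on the given client is rejected.
    static boolean connectFails(OneShotClient client) {
        try {
            client.connect();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        OneShotClient alreadyConnected = createClientAndConnect("Hosebird-Client-01");
        System.out.println("connect after factory connect rejected: " + connectFails(alreadyConnected));

        OneShotClient fresh = createClient("Hosebird-Client-01");
        System.out.println("connect after build-only factory rejected: " + connectFails(fresh));
    }
}

// Hypothetical stand-in for a client that may be connected only once.
class OneShotClient {
    private final String name;
    private final AtomicBoolean canRun = new AtomicBoolean(true);

    OneShotClient(String name) {
        this.name = name;
    }

    void connect() {
        // Only the first caller flips the flag; any later call is rejected.
        if (!canRun.compareAndSet(true, false)) {
            throw new IllegalStateException(
                    "There is already a connection thread running for " + name);
        }
    }
}
```

Applied to the question's code, the equivalent change is to delete the `hosebirdClient.connect();` line in `createTwitterClient` and keep the single `client.connect();` in `run()`.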
