首页 > 解决方案 > 有没有办法拆分 Apache Storm 元组并将它们合并回来?

问题描述

场景:Storm 从 Kafka 接收文本消息(例如新闻、推文)并将每条消息拆分为句子。然后,每个句子将被不同的 NLP 模块标记,例如 NER、POS 和 Sentiment analysis。最后,句子将根据它们的 id 合并回来形成一条消息。使用 Trident 或 Spark 可以实现这些场景,但我只需要使用 Apache Storm 来实现。这是我的错误代码。这是我的分句器螺栓:

import opennlp.tools.sentdetect.SentenceDetectorME;
import opennlp.tools.sentdetect.SentenceModel;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.util.Map;
public class SentenceBoltSplitter extends BaseRichBolt {
    private OutputCollector collector;
    private SentenceDetectorME detector;
    String ModelPath;
    public SentenceBoltSplitter(String path)
    {
        ModelPath=path;
    }
    public SentenceBoltSplitter()
    {
        ModelPath="fail";
    }
    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        Constructor<?> constructor;

        InputStream inputStream;
        try {
            inputStream = new FileInputStream(ModelPath);
            SentenceModel model = new SentenceModel(inputStream);
            detector = new SentenceDetectorME(model);
        } catch ( IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    @Override
    public void execute(Tuple tuple) {

        JSONObject json = null;
        try {
            json = new JSONObject(tuple.getValue(4).toString());
        }
        catch (Exception e)
        {
            System.out.println("json error catched");
        }
        String sentences[] = detector.sentDetect(json.get("content").toString());
        JSONArray sentencesArr = new JSONArray();
        for(String temp:sentences) {
            JSONObject sent=new JSONObject();
            sent.put("sentence",temp);
            sent.put("id",json.get("id"));
            collector.emit(tuple.getSourceStreamId(), new Values( json.get("id"),sent.toString()));
        }
        collector.ack(tuple);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "doc"));
        declarer.declareStream("StreamEnglish", new Fields("id" ,"doc"));
    }
}

这是我的句子合并Bolt:

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.*;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.BasicOutputCollector;
public class Output_merger extends BaseBasicBolt {
    JSONObject docs = new JSONObject();
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String id="";
        id= tuple.getValue(0).toString();
        JSONArray sentences=docs.getJSONArray("senttences");
        sentences.put(new JSONObject(tuple.getValue(1).toString()));
        docs.put("sentences",sentences);
        collector.emit( new Values(id ,docs));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("id","message"));
    }
}

这是我的java拓扑:

import SentenceBoltSplitter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class PipelineTopologySplitter {
    public static void main(String args[]) throws Exception
    {
        Properties props = new Properties();
        props.put("metadata.broker.list",  "10.XX.XX.XX:9092");
        props.put("bootstrap.servers", "10.XX.XX.XX:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class",  "kafka.serializer.StringEncoder");
        Config conf = new Config();
        props.put("bootstrap.servers",  "10.XX.XX.XX:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer",  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer");
        conf.put("kafka.broker.config", props);
        conf.put(KafkaBolt.TOPIC,  "test-output");
        int fourGB = 1 * 1000;
        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, fourGB);
        conf.setMaxTaskParallelism(300);
        conf.setNumWorkers(2);
        conf.put("topology.subprocess.timeout.secs", 1000);
        KafkaBolt<String, String> kafkabolt = new KafkaBolt<String, String>()
                .withTopicSelector(new DefaultTopicSelector("test-output"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("id",  "message"))
                .withProducerProperties(props);
        final TopologyBuilder tp = new TopologyBuilder();
        String path= "StormProject/python/";
        String path_resources="StormProject/resources/";
        tp.setSpout("kafka_spout", new  KafkaSpout<>(KafkaSpoutConfig.builder("10.XX.XX.XX:9092",  "test_08May").build()), 2);
        tp.setBolt("LanguageBoltJavaFastext", new  MyBolt("lid.176.bin","LanguageBoltFasttext"), 2).shuffleGrouping("kafka_spout");
        tp.setBolt("SentenceBoltEnglish", new SentenceBoltSplitter(path_resources+"en-sent.bin"), 2).shuffleGrouping("LanguageBoltJavaFastext","StreamEnglish");
        tp.setBolt("Output", new Output_merger(), 1).fieldsGrouping("SentenceBoltEnglish" , "StreamEnglish",new Fields("id"));
        tp.setBolt("forwardToKafka", kafkabolt, 1).shuffleGrouping("Output");
        LocalCluster localCluster = new LocalCluster();
        try {
            localCluster.submitTopology("nlp-pipeline-topology-local-debuggig", conf, tp.createTopology());
            Thread.sleep(100000000);
        } catch (InterruptedException e)
        {
            e.printStackTrace(); localCluster.shutdown();
        }
    }
}

这是接收错误:

Exception in thread "main" java.lang.IllegalStateException: Bolt 'Output' contains a non-serializable field of type org.json.JSONObject, which was instantiated prior to topology creation. org.json.JSONObject should be instantiated within the prepare method of 'Output at the earliest.
    at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122)
    at PipelineTopologySplitter.main(PipelineTopologySplitter.java:56)

我试图在我的代码中模仿单词计数示例的逻辑,但欢迎任何其他解决方案,除非使用 Trident 或其他库/框架。

标签: real-timeapache-stormdividesplitter

解决方案


推荐阅读