real-time - 有没有办法拆分 Apache Storm 元组并将它们合并回来?
问题描述
场景:Storm 从 Kafka 接收文本消息(例如新闻、推文)并将每条消息拆分为句子。然后,每个句子将被不同的 NLP 模块标记,例如 NER、POS 和 Sentiment analysis。最后,句子将根据它们的 id 合并回来形成一条消息。使用 Trident 或 Spark 可以实现这些场景,但我只需要使用 Apache Storm 来实现。这是我的错误代码。这是我的分句器螺栓:
import opennlp.tools.sentdetect.SentenceDetectorME;
import opennlp.tools.sentdetect.SentenceModel;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.util.Map;
public class SentenceBoltSplitter extends BaseRichBolt {
private OutputCollector collector;
private SentenceDetectorME detector;
String ModelPath;
public SentenceBoltSplitter(String path)
{
ModelPath=path;
}
public SentenceBoltSplitter()
{
ModelPath="fail";
}
@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
Constructor<?> constructor;
InputStream inputStream;
try {
inputStream = new FileInputStream(ModelPath);
SentenceModel model = new SentenceModel(inputStream);
detector = new SentenceDetectorME(model);
} catch ( IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void execute(Tuple tuple) {
JSONObject json = null;
try {
json = new JSONObject(tuple.getValue(4).toString());
}
catch (Exception e)
{
System.out.println("json error catched");
}
String sentences[] = detector.sentDetect(json.get("content").toString());
JSONArray sentencesArr = new JSONArray();
for(String temp:sentences) {
JSONObject sent=new JSONObject();
sent.put("sentence",temp);
sent.put("id",json.get("id"));
collector.emit(tuple.getSourceStreamId(), new Values( json.get("id"),sent.toString()));
}
collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "doc"));
declarer.declareStream("StreamEnglish", new Fields("id" ,"doc"));
}
}
这是我的句子合并Bolt:
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.*;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.BasicOutputCollector;
public class Output_merger extends BaseBasicBolt {
JSONObject docs = new JSONObject();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String id="";
id= tuple.getValue(0).toString();
JSONArray sentences=docs.getJSONArray("senttences");
sentences.put(new JSONObject(tuple.getValue(1).toString()));
docs.put("sentences",sentences);
collector.emit( new Values(id ,docs));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields("id","message"));
}
}
这是我的java拓扑:
import SentenceBoltSplitter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class PipelineTopologySplitter {
public static void main(String args[]) throws Exception
{
Properties props = new Properties();
props.put("metadata.broker.list", "10.XX.XX.XX:9092");
props.put("bootstrap.servers", "10.XX.XX.XX:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Config conf = new Config();
props.put("bootstrap.servers", "10.XX.XX.XX:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
conf.put("kafka.broker.config", props);
conf.put(KafkaBolt.TOPIC, "test-output");
int fourGB = 1 * 1000;
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, fourGB);
conf.setMaxTaskParallelism(300);
conf.setNumWorkers(2);
conf.put("topology.subprocess.timeout.secs", 1000);
KafkaBolt<String, String> kafkabolt = new KafkaBolt<String, String>()
.withTopicSelector(new DefaultTopicSelector("test-output"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("id", "message"))
.withProducerProperties(props);
final TopologyBuilder tp = new TopologyBuilder();
String path= "StormProject/python/";
String path_resources="StormProject/resources/";
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("10.XX.XX.XX:9092", "test_08May").build()), 2);
tp.setBolt("LanguageBoltJavaFastext", new MyBolt("lid.176.bin","LanguageBoltFasttext"), 2).shuffleGrouping("kafka_spout");
tp.setBolt("SentenceBoltEnglish", new SentenceBoltSplitter(path_resources+"en-sent.bin"), 2).shuffleGrouping("LanguageBoltJavaFastext","StreamEnglish");
tp.setBolt("Output", new Output_merger(), 1).fieldsGrouping("SentenceBoltEnglish" , "StreamEnglish",new Fields("id"));
tp.setBolt("forwardToKafka", kafkabolt, 1).shuffleGrouping("Output");
LocalCluster localCluster = new LocalCluster();
try {
localCluster.submitTopology("nlp-pipeline-topology-local-debuggig", conf, tp.createTopology());
Thread.sleep(100000000);
} catch (InterruptedException e)
{
e.printStackTrace(); localCluster.shutdown();
}
}
}
这是接收错误:
Exception in thread "main" java.lang.IllegalStateException: Bolt 'Output' contains a non-serializable field of type org.json.JSONObject, which was instantiated prior to topology creation. org.json.JSONObject should be instantiated within the prepare method of 'Output at the earliest.
at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122)
at PipelineTopologySplitter.main(PipelineTopologySplitter.java:56)
我试图在我的代码中模仿单词计数示例的逻辑,但欢迎任何其他解决方案,除非使用 Trident 或其他库/框架。
解决方案
推荐阅读
- google-calendar-api - 是否存在用户阻止注册 Google Calendar API 的设置?
- mule-studio - Anypoint Studio 7.3 中的 db 连接器的 TestConnection 失败
- html - HTML:如何立即重定向页面?
- visual-studio - 如何使用 .NET Core 根据发布配置文件更新 appsettings.json?
- sql - 查找重复 ID 并添加新的序列 ID
- docker - 在树莓派上安装 Kubernetes 集群
- python-3.x - 使用 Python Pandas 组合 SIMILAR 值行
- python-3.x - SSL 证书验证失败,Install Certificates.command 上的权限错误
- macos - 如何获得帮助工具来访问 macOS 上的文件?
- c# - 如何声明公共静态字节?