storm集群配置
storm配置相当简单
安装
tar -zxvf apache-storm-1.2.2.tar.gzrm apache-storm-1.2.2.tar.gzmv apache-storm-1.2.2 stormsudo vim /etc/profile export STORM_HOME=/usr/local/storm export PATH=$PATH:$STORM_HOME/binsource /etc/profileapt install python准备 master worker1 worker2 worker3 这四台机器首先确保你的zookeeper集群能够正常运行worker1 worker2 worker3为zk集群具体配置参照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html
修改配置文件
storm.yaml
sudo vim storm.yaml在四台机器中都加入如下配置 storm.zookeeper.servers: - "worker1" - "worker2" - "worker3" storm.local.dir: "/usr/local/tmpdata/storm" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 nimbus.seeds: ["master"] storm.zookeeper.port: 2181 // 不加下面这几个你的拓扑直接跑不起来 nimbus.childopts: "-Xmx1024m" supervisor.childopts: "-Xmx1024m" worker.childopts: "-Xmx768m"
启动
在master中运行 storm nimbus >> /dev/null & storm ui >/dev/null 2>&1 &在worker1,worker2,worker3中运行 storm supervisor >/dev/null 2>&1 & storm logviewer >/dev/null 2>&1 &直接访问http://master:8080即可
使用java编写拓扑
四个文件如图
pom.xml
4.0.0 test test 1.0.0 test Test project for spring boot mybatis jar UTF-8 UTF-8 1.8 1.8 1.8 org.apache.storm storm-core 1.2.2 provided junit junit 3.8.1 maven-assembly-plugin jar-with-dependencies make-assembly package single App.java
package test; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class App { public static void main( String[] args ) throws Exception { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("word",new WordSpout(),1); topologyBuilder.setBolt("receive",new RecieveBolt(),1).shuffleGrouping("word"); topologyBuilder.setBolt("print",new ConsumeBolt(),1).shuffleGrouping("receive"); // 集群运行 Config config = new Config(); config.setNumWorkers(3); config.setDebug(true); StormSubmitter.submitTopology("teststorm", config, topologyBuilder.createTopology()); // 本地测试 // Config config = new Config(); // config.setNumWorkers(3); // config.setDebug(true); // config.setMaxTaskParallelism(20); // LocalCluster cluster = new LocalCluster(); // cluster.submitTopology("wordCountTopology", config, topologyBuilder.createTopology()); // Utils.sleep(60000); // 执行完毕,关闭cluster // cluster.shutdown(); } }
WordSpout.java
package test; import java.util.Map; import java.util.Random; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; public class WordSpout extends BaseRichSpout { private static final long serialVersionUID = 6102239192526611945L; private SpoutOutputCollector collector; Random random = new Random(); // 初始化tuple的collector public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { // 模拟产生消息队列 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; final String word = words[random.nextInt(words.length)]; // 提交一个tuple给默认的输出流 this.collector.emit(new Values(word)); Utils.sleep(5000); } // 声明发送消息的字段名 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
RecieveBolt.java
package test; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class RecieveBolt extends BaseRichBolt { private static final long serialVersionUID = -4758047349803579486L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { // 将spout传递过来的tuple值进行转换 this.collector.emit(new Values(tuple.getStringByField("word") + "!!!")); } // 声明发送消息的字段名 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
ConsumeBolt.java
package test; import java.io.FileWriter; import java.io.IOException; import java.util.Map; import java.util.UUID; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; public class ConsumeBolt extends BaseRichBolt { private static final long serialVersionUID = -7114915627898482737L; private FileWriter fileWriter = null; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try { fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID()); // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID()); } catch (IOException e) { throw new RuntimeException(e); } } public void execute(Tuple tuple) { try { String word = tuple.getStringByField("word") + "......." + "\n"; fileWriter.write(word); fileWriter.flush(); System.out.println(word); } catch (IOException e) { throw new RuntimeException(e); } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
在集群中运行
storm jar test-1.0.0-jar-with-dependencies.jar test.App // 启动集群 storm kill teststorm // 结束集群