博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm集群配置以及java编写拓扑例子
阅读量:5014 次
发布时间:2019-06-12

本文共 7872 字,大约阅读时间需要 26 分钟。

storm集群配置

storm配置相当简单

安装

tar -zxvf apache-storm-1.2.2.tar.gzrm apache-storm-1.2.2.tar.gzmv apache-storm-1.2.2 stormsudo vim /etc/profile    export STORM_HOME=/usr/local/storm    export PATH=$PATH:$STORM_HOME/binsource /etc/profileapt install python准备 master worker1 worker2 worker3 这四台机器首先确保你的zookeeper集群能够正常运行worker1 worker2 worker3为zk集群具体配置参照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html

修改配置文件

  1. storm.yaml

    sudo vim storm.yaml在四台机器中都加入如下配置    storm.zookeeper.servers:    - "worker1"    - "worker2"    - "worker3"    storm.local.dir: "/usr/local/tmpdata/storm"    supervisor.slots.ports:    - 6700    - 6701    - 6702    - 6703    nimbus.seeds: ["master"]    storm.zookeeper.port: 2181    // 不加下面这几个你的拓扑直接跑不起来    nimbus.childopts: "-Xmx1024m"     supervisor.childopts: "-Xmx1024m"    worker.childopts: "-Xmx768m"
  2. 启动

    在master中运行    storm nimbus >> /dev/null &    storm ui >/dev/null 2>&1 &在worker1,worker2,worker3中运行    storm supervisor >/dev/null 2>&1 &    storm logviewer >/dev/null 2>&1 &直接访问http://master:8080即可

使用java编写拓扑

  1. 四个文件如图

    1108804-20190110201001299-249848389.png

  2. pom.xml

    4.0.0
    test
    test
    1.0.0
    test
    Test project for spring boot mybatis
    jar
    UTF-8
    UTF-8
    1.8
    1.8
    1.8
    org.apache.storm
    storm-core
    1.2.2
    provided
    junit
    junit
    3.8.1
    maven-assembly-plugin
    jar-with-dependencies
    make-assembly
    package
    single
  3. App.java

    package test;    import org.apache.storm.Config;    import org.apache.storm.LocalCluster;    import org.apache.storm.StormSubmitter;    import org.apache.storm.topology.TopologyBuilder;    import org.apache.storm.utils.Utils;    public class App     {        public static void main( String[] args ) throws Exception {            TopologyBuilder topologyBuilder = new TopologyBuilder();            topologyBuilder.setSpout("word",new WordSpout(),1);            topologyBuilder.setBolt("receive",new RecieveBolt(),1).shuffleGrouping("word");            topologyBuilder.setBolt("print",new ConsumeBolt(),1).shuffleGrouping("receive");            // 集群运行            Config config = new Config();            config.setNumWorkers(3);            config.setDebug(true);            StormSubmitter.submitTopology("teststorm", config, topologyBuilder.createTopology());            // 本地测试            // Config config = new Config();            // config.setNumWorkers(3);            // config.setDebug(true);            // config.setMaxTaskParallelism(20);            // LocalCluster cluster = new LocalCluster();            // cluster.submitTopology("wordCountTopology", config, topologyBuilder.createTopology());            // Utils.sleep(60000);            // 执行完毕,关闭cluster            // cluster.shutdown();        }    }
  4. WordSpout.java

    package test;    import java.util.Map;    import java.util.Random;    import org.apache.storm.spout.SpoutOutputCollector;    import org.apache.storm.task.TopologyContext;    import org.apache.storm.topology.OutputFieldsDeclarer;    import org.apache.storm.topology.base.BaseRichSpout;    import org.apache.storm.tuple.Fields;    import org.apache.storm.tuple.Values;    import org.apache.storm.utils.Utils;    public class WordSpout extends BaseRichSpout {        private static final long serialVersionUID = 6102239192526611945L;        private SpoutOutputCollector collector;        Random random = new Random();        // 初始化tuple的collector        public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {            this.collector = collector;        }        public void nextTuple() {            // 模拟产生消息队列            String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};            final String word = words[random.nextInt(words.length)];            // 提交一个tuple给默认的输出流            this.collector.emit(new Values(word));            Utils.sleep(5000);        }        // 声明发送消息的字段名        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {            outputFieldsDeclarer.declare(new Fields("word"));        }    }
  5. RecieveBolt.java

    package test;    import java.util.Map;    import org.apache.storm.task.OutputCollector;    import org.apache.storm.task.TopologyContext;    import org.apache.storm.topology.OutputFieldsDeclarer;    import org.apache.storm.topology.base.BaseRichBolt;    import org.apache.storm.tuple.Fields;    import org.apache.storm.tuple.Tuple;    import org.apache.storm.tuple.Values;    public class RecieveBolt extends BaseRichBolt {        private static final long serialVersionUID = -4758047349803579486L;        private OutputCollector collector;        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {            this.collector = collector;        }        public void execute(Tuple tuple) {            // 将spout传递过来的tuple值进行转换            this.collector.emit(new Values(tuple.getStringByField("word") + "!!!"));         }        // 声明发送消息的字段名        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {            outputFieldsDeclarer.declare(new Fields("word"));        }    }
  6. ConsumeBolt.java

    package test;    import java.io.FileWriter;    import java.io.IOException;    import java.util.Map;    import java.util.UUID;    import org.apache.storm.task.OutputCollector;    import org.apache.storm.task.TopologyContext;    import org.apache.storm.topology.OutputFieldsDeclarer;    import org.apache.storm.topology.base.BaseRichBolt;    import org.apache.storm.tuple.Tuple;    public class ConsumeBolt extends BaseRichBolt {        private static final long serialVersionUID = -7114915627898482737L;        private FileWriter fileWriter = null;        private OutputCollector collector;        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {            this.collector = collector;            try {                fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID());                // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID());            } catch (IOException e) {                throw new RuntimeException(e);            }        }        public void execute(Tuple tuple) {            try {                String word = tuple.getStringByField("word") + "......." + "\n";                fileWriter.write(word);                fileWriter.flush();                System.out.println(word);            } catch (IOException e) {                throw new RuntimeException(e);            }        }        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {        }    }
  7. 在集群中运行

    storm jar test-1.0.0-jar-with-dependencies.jar test.App // 启动集群    storm kill teststorm // 结束集群

转载于:https://www.cnblogs.com/ye-hcj/p/10252206.html

你可能感兴趣的文章
设计模式之---装饰器设计模式
查看>>
基于WordNet的英文同义词、近义词相似度评估及代码实现
查看>>
Equation漏洞混淆利用分析总结(上)
查看>>
shell学习1shell简介
查看>>
Qt 【无法打开 xxxx头文件】
查看>>
JAVA项目将 Oracle 转 MySQL 数据库转换(Hibernate 持久层)
查看>>
三层架构(我的理解及详细分析)
查看>>
Django模板语言相关内容
查看>>
前端开发工程师如何在2013年里提升自己【转】--2016已更新升级很多何去何从?...
查看>>
markdown语法测试集合
查看>>
running and coding
查看>>
实现QQ第三方登录、网站接入
查看>>
HTML CSS 层叠样式表 三
查看>>
Qt pro pri 文件学习1
查看>>
软件工程概论第六周学习进度条
查看>>
[思路]导入导出功能
查看>>
【iOS】UICollectionView自己定义Layout之蜂窝布局
查看>>
golang——(strings包)常用字符串操作函数
查看>>
发布aar到jcenter
查看>>
跨浏览器问题的五种解决方案
查看>>