Storm: Writing WordCount by Hand
Published: 2019-06-18


Create a Maven project and configure pom.xml as follows:

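```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.darrenchan</groupId>
  <artifactId>StormDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormDemo</name>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Builds a single jar that bundles all dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- Entry point recorded in the jar manifest: this project's topology class -->
              <mainClass>cn.darrenchan.storm.WordCountTopoloyMain</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```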

All four classes of the project live in the package cn.darrenchan.storm:

MySpout.java:

```java
package cn.darrenchan.storm;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    // The Storm framework calls nextTuple() over and over.
    // Values extends ArrayList.
    @Override
    public void nextTuple() {
        collector.emit(new Values("i am lilei love hanmeimei"));
    }

    // Initialization method
    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Declare the field names of the tuples this spout emits
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("love"));
    }
}
```
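Storm, not the spout, decides when nextTuple() runs: it calls the method again and again, so MySpout emits the same sentence endlessly. The JDK-only sketch below imitates that call pattern so it can run without storm-core. Emitter, RotatingSentenceSpout and drive are hypothetical stand-ins for SpoutOutputCollector, BaseRichSpout and the framework loop, not Storm APIs, and rotating through several sentences is only an assumed way to make the counts more varied.

```java
import java.util.ArrayList;
import java.util.List;

public class SpoutLoopSketch {

    // Hypothetical stand-in for Storm's SpoutOutputCollector.
    interface Emitter {
        void emit(String line);
    }

    // Hypothetical spout: each nextTuple() call emits the next sentence, wrapping around.
    static class RotatingSentenceSpout {
        private final String[] sentences;
        private int index = 0;
        private Emitter emitter;

        RotatingSentenceSpout(String... sentences) {
            this.sentences = sentences;
        }

        // Plays the role of open(): remember the collector.
        void open(Emitter emitter) {
            this.emitter = emitter;
        }

        // Plays the role of nextTuple(): emit exactly one line per call.
        void nextTuple() {
            emitter.emit(sentences[index]);
            index = (index + 1) % sentences.length;
        }
    }

    // Simulates the framework calling nextTuple() a fixed number of times.
    static List<String> drive(RotatingSentenceSpout spout, int calls) {
        final List<String> emitted = new ArrayList<String>();
        spout.open(new Emitter() {
            @Override
            public void emit(String line) {
                emitted.add(line);
            }
        });
        for (int i = 0; i < calls; i++) {
            spout.nextTuple();
        }
        return emitted;
    }

    public static void main(String[] args) {
        RotatingSentenceSpout spout = new RotatingSentenceSpout(
                "i am lilei love hanmeimei", "hanmeimei love lilei");
        System.out.println(drive(spout, 3));
    }
}
```

In the real topology there is no loop bound: the spout keeps emitting for as long as the topology runs.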

MySplitBolt.java:

```java
package cn.darrenchan.storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    // Called by the Storm framework for every incoming tuple
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            // Two values, matching the two fields declared below
            collector.emit(new Values(word, 1));
        }
    }

    // Initialization method
    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Two fields, matching the two values emitted above
        declarer.declare(new Fields("word", "num"));
    }
}
```
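MySplitBolt turns one line into several (word, 1) tuples, and the emitted values line up with the declared fields by position. The JDK-only sketch below reproduces that step as a plain method so it can be checked without a cluster. SplitSketch and splitLine are hypothetical names, and trimming plus splitting on \\s+ is an assumed hardening: with split(" "), two consecutive spaces would yield an empty-string "word".

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SplitSketch {

    // Mirrors MySplitBolt.execute(): one (word, 1) pair per word in the line.
    static List<Map.Entry<String, Integer>> splitLine(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<Map.Entry<String, Integer>>();
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return out; // a blank line produces no tuples
        }
        // "\\s+" treats any run of whitespace as one separator, unlike split(" ").
        for (String word : trimmed.split("\\s+")) {
            out.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitLine("i am lilei love hanmeimei"));
        // split(" ") keeps the empty token between two spaces: 3 pieces
        System.out.println("love  hanmeimei".split(" ").length);
        // splitLine drops it: 2 pairs
        System.out.println(splitLine("love  hanmeimei").size());
    }
}
```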

MyCountBolt.java:

```java
package cn.darrenchan.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class MyCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    // Running count per word, kept in this task's memory
    private Map<String, Integer> map;

    // Initialization method
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        map = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        if (map.containsKey(word)) {
            map.put(word, map.get(word) + num);
        } else {
            map.put(word, num);
        }
        System.out.println(map);
    }

    // Last bolt in the topology: it emits nothing, so no fields are declared
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```
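Each MyCountBolt task keeps its counts in an in-memory map, so the counting step is an accumulate-into-HashMap loop. The JDK-only sketch below isolates that logic so it can be tested on its own; WordCounter is a hypothetical name. It adds the incoming num whether or not the word has been seen before.

```java
import java.util.HashMap;
import java.util.Map;

public class WordCounter {

    // Running totals, like the map field inside MyCountBolt.
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    // Mirrors MyCountBolt.execute(): add num to the running total for word.
    public void add(String word, int num) {
        Integer current = counts.get(word);
        counts.put(word, current == null ? num : current + num);
    }

    // Returns the total for word, or 0 if it has never been seen.
    public int get(String word) {
        Integer current = counts.get(word);
        return current == null ? 0 : current;
    }

    public static void main(String[] args) {
        WordCounter counter = new WordCounter();
        // Two copies of the spout's sentence, as after two nextTuple() calls.
        for (int i = 0; i < 2; i++) {
            for (String word : "i am lilei love hanmeimei".split(" ")) {
                counter.add(word, 1);
            }
        }
        System.out.println(counter.get("love")); // 2
    }
}
```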

WordCountTopoloyMain.java:

```java
package cn.darrenchan.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopoloyMain {

    public static void main(String[] args) throws Exception {
        // 1. Prepare a TopologyBuilder
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new MySpout(), 1);
        builder.setBolt("mySplitBolt", new MySplitBolt(), 2).shuffleGrouping("mySpout");
        builder.setBolt("myCountBolt", new MyCountBolt(), 2).fieldsGrouping("mySplitBolt", new Fields("word"));

        // 2. Create a Config that sets how many workers this topology needs
        Config config = new Config();
        config.setNumWorkers(4);

        // 3. Submit the topology. There are two modes: local mode and cluster mode.
        // Cluster mode (also needs: import backtype.storm.StormSubmitter;)
        // StormSubmitter.submitTopology("myWordCount", config, builder.createTopology());

        // Local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("myWordCount", config, builder.createTopology());
    }
}
```
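The groupings decide which task receives each tuple: shuffleGrouping spreads lines across the two MySplitBolt tasks, while fieldsGrouping on "word" sends every occurrence of a word to the same MyCountBolt task, so each task's map holds complete counts for its own subset of words. The JDK-only sketch below imitates that routing by hashing the grouping field and taking it modulo the task count. Storm's fields grouping works along these lines, but the exact hash used here (List.hashCode with the sign bit masked) is an assumption, and FieldsGroupingSketch, targetTask and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldsGroupingSketch {

    // Picks a target task from the value of the grouping field.
    // Masking the sign bit keeps the index non-negative for negative hash codes.
    static int targetTask(String word, int numTasks) {
        int hash = Arrays.asList(word).hashCode();
        return (hash & Integer.MAX_VALUE) % numTasks;
    }

    // Routes each word to a task and counts it there, one map per task,
    // the way two MyCountBolt instances would each keep their own map.
    static List<Map<String, Integer>> route(List<String> words, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<Map<String, Integer>>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<String, Integer>());
        }
        for (String word : words) {
            Map<String, Integer> counts = tasks.get(targetTask(word, numTasks));
            Integer current = counts.get(word);
            counts.put(word, current == null ? 1 : current + 1);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "i", "am", "lilei", "love", "hanmeimei",
                "i", "am", "lilei", "love", "hanmeimei");
        List<Map<String, Integer>> tasks = route(words, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + ": " + tasks.get(i));
        }
    }
}
```

With shuffle grouping instead, the same word could land on both count tasks, and each task would print only a partial count for it.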

 

A comparison of three ways to compute word count:

Overall runtime architecture:

 

Reposted from: http://azwfx.baihongyu.com/
