当前位置: 首页 > 图灵资讯 > 技术篇> Apache Storm技术实战之1 -- WordCountTopology

Apache Storm技术实战之1 -- WordCountTopology

来源:图灵教育
时间:2023-06-13 09:17:53

“源代码走读系列”从代码层面分析了storm的具体实现,然后通过具体实例说明storm的使用情况。由于storm已经正式迁移到Apache,文章系列也是twitter storm转变为apachee storm.

WordCountTopology 使用storm统计文件中每个单词的出现次数。

通过这个例子来解释tuple发送时的几个要素

  1. source component 发送源
  2. destination component 接收者
  3. stream 消息通道
  4. tuple 消息本身

本文所涉及的开发环境建设可参考前两篇博文。

  1. arch linux简易安装指南
  2. 在archlinux上建立storm cluster
awk实现

事实上,统计文件中的单词是Linux的下一个非常常见的任务。它可以很容易地用awk解决(如果文件不太大)。以下是word counting的awk脚本将其保存为wordcountt.awk文件。

wordcount.awk

{ for (i = 1; i<=NF; i++)    freq[$i]++ }END{ for (word in freq)    printf "%s\t%d\n",word,freq[word]}

操作脚本,统计文件中的单词

gawk -f wordcount.awk filename

原始版本

在github上复制内容

git clone https://github.com/nathanmarz/storm-starter.git

编译运行

lein depslein compilejava -cp $(lein classpath) WordCountTopology

main函数

main函数的主要内容

TopologyBuilder builder = new TopologyBuilder();    builder.setSpout("spout", new RandomSentenceSpout(), 5);    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注:grouping操作时,如果指定stream没有显示 id,使用default stream. 例如shuflegrouping(”spout)表示从名为spout的component中接收default stream发送的tupleam.

改进版本

Apache Storm技术实战之1 -- WordCountTopology_vim

在原版中,spout不断地向split发送 bolt随机发送句子,Count bolt统计每个单词出现的次数。

读完文件后,Spout能通知下游bolt显示最新的统计结果吗?

为实现上述改进目标,可采用上图所示的结构。变化如下,

  1. 在Spout中添加SUCCES_STREAM
  2. statisticsscs只添加一个操作实例 bolt
  3. 当spout读取文件内容时,通过SUCCES_STREAM告诉statisticstics bolt,文件已经处理好,可以打印当前的统计结果
RandomSentenceSpout.java declareOutputFields

添加SUCCESS_STREAM

@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word"));    declarer.declareStream("SUCCESS_STREAM",new Fields("word"));  }

nextTuple

使用SUCCES_STREAM通知下游,文件处理完毕

@Override public void nextTuple() {    Utils.sleep(100);    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; if ( count == sentences.length )     {      System.out.println(count+" try to emit tuple by success_stream");      _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));      count++;    }else if ( count < sentences.length ){      _collector.emit(new Values(sentences[count]));      count++;    }  }

WordCountTopology.java 添加静态WordCount2

public static class WordCount2 extends BaseBasicBolt {    Map<String, Integer> counts = new HashMap<String, Integer>();    @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) {    System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) {      System.out.println(key+"\t"+counts.get(key));    }    System.out.println("finish printing");      }else {    String word = tuple.getString(0);    Integer count = counts.get(word); if (count == null)      count = 0;    count++;    counts.put(word, count);      }    }

main函数

将spout的并行数从5改为1

builder.setSpout("spout", new RandomSentenceSpout(), 1);

将WordCount2添加到原始Topology中 Bolt

builder.setBolt(count2”, new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

WordCount2 从Countt接收Bolt 通过default通过Bolt通过defalt stream发送的tuple,Spout通过SUCESS_STREAM发送的tuple,也就是说wordcount2将从两个stream接收数据。

编译

编译修改后的源文件

cd $STROM_STARTERlein compile storm.starter

可能会出现以下异常信息,可以忽略。

Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

运行

WordCountopology在local模式下运行修改后

java -cp $(lein classpath) storm.starter.WordCountTopology

假如一切正常,日志如下所示,线程的名称可能会有所不同。

moon    1score    1cow    1doctor    1over    1nature    1snow    1four    1keeps    1with    1a    1white    1dwarfs    1at    1the    4and    2i    1two    1away    1seven    2apple    1am    1an    1jumped    1day    1years    1ago    1

结果验证

与awk脚本的运行结果相比,WordCountopology的运行结果应该是一致的。

小技巧
  1. awk脚本的执行结果存储为文件result1.log, WordCountopology输出中的单词统计部分存储在result2.log
  2. 用vim打开result1.log,sorting,保存结果;用vim打开result2.log,保存sorting。
  3. 然后用vimdiff进行比较 vimdiff result1.log result2.log

转载 :

“源代码走读系列”从代码层面分析了storm的具体实现,然后通过具体实例说明storm的使用情况。由于storm已经正式迁移到Apache,文章系列也是twitter storm转变为apachee storm.

WordCountTopology 使用storm统计文件中每个单词的出现次数。

通过这个例子来解释tuple发送时的几个要素

  1. source component 发送源
  2. destination component 接收者
  3. stream 消息通道
  4. tuple 消息本身

本文所涉及的开发环境建设可参考前两篇博文。

  1. arch linux简易安装指南
  2. 在archlinux上建立storm cluster
awk实现

事实上,统计文件中的单词是Linux的下一个非常常见的任务。它可以很容易地用awk解决(如果文件不太大)。以下是word counting的awk脚本将其保存为wordcountt.awk文件。

wordcount.awk

{ for (i = 1; i<=NF; i++)    freq[$i]++ }END{ for (word in freq)    printf "%s\t%d\n",word,freq[word]}

操作脚本,统计文件中的单词

gawk -f wordcount.awk filename

原始版本

在github上复制内容

git clone https://github.com/nathanmarz/storm-starter.git

编译运行

lein depslein compilejava -cp $(lein classpath) WordCountTopology

main函数

main函数的主要内容

TopologyBuilder builder = new TopologyBuilder();    builder.setSpout("spout", new RandomSentenceSpout(), 5);    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注:grouping操作时,如果指定stream没有显示 id,使用default stream. 例如shuflegrouping(”spout)表示从名为spout的component中接收default stream发送的tupleam.

改进版本

Apache Storm技术实战之1 -- WordCountTopology_vim

在原版中,spout不断地向split发送 bolt随机发送句子,Count bolt统计每个单词出现的次数。

读完文件后,Spout能通知下游bolt显示最新的统计结果吗?

为实现上述改进目标,可采用上图所示的结构。变化如下,

  1. 在Spout中添加SUCCES_STREAM
  2. statisticsscs只添加一个操作实例 bolt
  3. 当spout读取文件内容时,通过SUCCES_STREAM告诉statisticstics bolt,文件已经处理好,可以打印当前的统计结果
RandomSentenceSpout.java declareOutputFields

添加SUCCESS_STREAM

@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word"));    declarer.declareStream("SUCCESS_STREAM",new Fields("word"));  }

nextTuple

使用SUCCES_STREAM通知下游,文件处理完毕

@Override public void nextTuple() {    Utils.sleep(100);    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; if ( count == sentences.length )     {      System.out.println(count+" try to emit tuple by success_stream");      _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));      count++;    }else if ( count < sentences.length ){      _collector.emit(new Values(sentences[count]));      count++;    }  }

WordCountTopology.java 添加静态WordCount2

public static class WordCount2 extends BaseBasicBolt {    Map<String, Integer> counts = new HashMap<String, Integer>();    @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) {    System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) {      System.out.println(key+"\t"+counts.get(key));    }    System.out.println("finish printing");      }else {    String word = tuple.getString(0);    Integer count = counts.get(word); if (count == null)      count = 0;    count++;    counts.put(word, count);      }    }

main函数

将spout的并行数从5改为1

builder.setSpout("spout", new RandomSentenceSpout(), 1);

将WordCount2添加到原始Topology中 Bolt

builder.setBolt(count2”, new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

WordCount2 从Countt接收Bolt 通过default通过Bolt通过defalt stream发送的tuple,Spout通过SUCESS_STREAM发送的tuple,也就是说wordcount2将从两个stream接收数据。

编译

编译修改后的源文件

cd $STROM_STARTERlein compile storm.starter

可能会出现以下异常信息,可以忽略。

Exception in thread "main" java.io.FileNotFoundException:Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

运行

WordCountopology在local模式下运行修改后

java -cp $(lein classpath) storm.starter.WordCountTopology

假如一切正常,日志如下所示,线程的名称可能会有所不同。

moon    1score    1cow    1doctor    1over    1nature    1snow    1four    1keeps    1with    1a    1white    1dwarfs    1at    1the    4and    2i    1two    1away    1seven    2apple    1am    1an    1jumped    1day    1years    1ago    1

结果验证

与awk脚本的运行结果相比,WordCountopology的运行结果应该是一致的。

小技巧
  1. awk脚本的执行结果存储为文件result1.log, WordCountopology输出中的单词统计部分存储在result2.log
  2. 用vim打开result1.log,sorting,保存结果;用vim打开result2.log,保存sorting。
  3. 然后用vimdiff进行比较 vimdiff result1.log result2.log