代码比较简单,看图说话:
package storm.benchmark;import backtype.storm.Config;import backtype.storm.StormSubmitter;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;import java.util.Random;public class ThroughputTest { public static class GenSpout extends BaseRichSpout { private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'}; SpoutOutputCollector _collector; int _size; Random _rand; String _id; String _val; public GenSpout(int size) { _size = size; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); _id = randString(5); _val = randString(_size); } @Override public void nextTuple() { _collector.emit(new Values(_id, _val)); } private String randString(int size) { StringBuffer buf = new StringBuffer(); for(int i=0; i