增加第四小节内容
This commit is contained in:
parent
78bc278f7b
commit
00baca93d5
|
|
@ -403,6 +403,7 @@ public static boolean isTick(Tuple tuple) {
|
||||||

|

|
||||||
|
|
||||||
Topology
|
Topology
|
||||||
|
|
||||||
'''
|
'''
|
||||||
String spoutId = "wordGenerator";
|
String spoutId = "wordGenerator";
|
||||||
String counterId = "counter";
|
String counterId = "counter";
|
||||||
|
|
@ -418,12 +419,14 @@ Topology
|
||||||
// TotalRankingsBolt, 将完成完整聚合,统计出top-n的话题
|
// TotalRankingsBolt, 将完成完整聚合,统计出top-n的话题
|
||||||
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
|
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
|
||||||
```
|
```
|
||||||
|
|
||||||
上面的topology设计如下:
|
上面的topology设计如下:
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
将聚合计算与时间结合起来
|
将聚合计算与时间结合起来
|
||||||
前文,我们叙述了tick事件,回调中会触发bolt的execute方法,那可以这么做:
|
前文,我们叙述了tick事件,回调中会触发bolt的execute方法,那可以这么做:
|
||||||
|
|
||||||
```
|
```
|
||||||
RollingCountBolt:
|
RollingCountBolt:
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -459,6 +462,7 @@ RollingCountBolt:
|
||||||
emit(counts, actualWindowLengthInSeconds);
|
emit(counts, actualWindowLengthInSeconds);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
上面的代码可能有点抽象,看下这个图就明白了,tick一到,窗口就滚动:
|
上面的代码可能有点抽象,看下这个图就明白了,tick一到,窗口就滚动:
|
||||||
|
|
||||||

|

|
||||||
|
|
@ -480,6 +484,7 @@ IntermediateRankingsBolt & TotalRankingsBolt:
|
||||||
  其中,IntermediateRankingsBolt和TotalRankingsBolt的聚合排序方法略有不同:
|
  其中,IntermediateRankingsBolt和TotalRankingsBolt的聚合排序方法略有不同:
|
||||||
|
|
||||||
IntermediateRankingsBolt的聚合排序方法:
|
IntermediateRankingsBolt的聚合排序方法:
|
||||||
|
|
||||||
```
|
```
|
||||||
@Override
|
@Override
|
||||||
void updateRankingsWithTuple(Tuple tuple) {
|
void updateRankingsWithTuple(Tuple tuple) {
|
||||||
|
|
@ -489,7 +494,9 @@ IntermediateRankingsBolt的聚合排序方法:
|
||||||
super.getRankings().updateWith(rankable);
|
super.getRankings().updateWith(rankable);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
TotalRankingsBolt的聚合排序方法:
|
TotalRankingsBolt的聚合排序方法:
|
||||||
|
|
||||||
```
|
```
|
||||||
@Override
|
@Override
|
||||||
void updateRankingsWithTuple(Tuple tuple) {
|
void updateRankingsWithTuple(Tuple tuple) {
|
||||||
|
|
@ -501,13 +508,16 @@ TotalRankingsBolt的聚合排序方法:
|
||||||
super.getRankings().pruneZeroCounts();
|
super.getRankings().pruneZeroCounts();
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
而重排序方法比较简单粗暴,因为只求前N个,N不会很大:
|
而重排序方法比较简单粗暴,因为只求前N个,N不会很大:
|
||||||
|
|
||||||
```
|
```
|
||||||
private void rerank() {
|
private void rerank() {
|
||||||
Collections.sort(rankedItems);
|
Collections.sort(rankedItems);
|
||||||
Collections.reverse(rankedItems);
|
Collections.reverse(rankedItems);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
  结语
|
  结语
|
||||||
|
|
||||||
  下图可能就是我们想要的结果,我们完成了t0 - t1时刻之间的热点话题统计.
|
  下图可能就是我们想要的结果,我们完成了t0 - t1时刻之间的热点话题统计.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue