• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

NiFi 学习 — 统计数

JAVA相关 supingemail 2691次浏览 0个评论

.

利用nifi统计单词出现的次数

  1. 从工具栏中拖入一个Processor,在弹出面板中搜索GenerateFlowFile,然后确认,设置GenerateFlowFile 的属性如下:

注意看属性的设置, 尤其要输入custom text的内容: 本例中输入的text内容是:

With each release of Apache NiFi, we tend to see at least one pretty powerful new application-level feature, in addition to all of the new and improved Processors that are added. And the latest release of NiFi, version 1.8.0, is no exception! Version 1.8.0 brings us a very powerful new feature, known as Load-Balanced Connections, which makes it much easier to move data around a cluster. Prior to this feature, when a user needs to spread data from one node in a cluster to all the nodes of the cluster, the best option was to use Remote Process Groups and Site-to-Site to move the data. The approach looks like this:

First, an Input Port has to be added to the Root Group. Then, a Remote Process Group has to be added to the flow in order to transfer data to that Root Group Input Port. The connection has to be drawn from the Processor to the Remote Process Group, and the Input Port has to be chosen. Next, the user will configure the specific Port within the Remote Process Group and set the Batch Size to 1 FlowFile, so that data is Round-Robin’ed between the nodes. A connection must then be made from the Root Group Input Port all the way back down, through the Process Groups, to the desired destination, so that the FlowFiles that are sent to the Remote Process Group are transferred to where they need to go in the flow. This may involve adding several more Local Input Ports to the Process Groups, to ensure that the data can flow to the correct destination. Finally, all of those newly created components have to be started.

b.从工具栏中拖入一个Processor,在弹出面板中搜索ExecuteScript,然后确认,设置ExecuteScript的属性如下:

 1.ScriptEngine 选择 Groovy

  2. 输入groovy统计的代码实现:(此代码可能因为字符集或者什么的容易出错,最好是用utf8的编辑工具重新编辑以便,这样才能保证运行处正常结果)

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
   def wordCount = [:]

   def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
   def words = tellTaleHeart.split(/(!|\?|-|\.|\”|:|;|,|\s)+/)*.toLowerCase()

   words.each { word ->
   def currentWordCount = wordCount.get(word)
   if(!currentWordCount) {
          wordCount.put(word, 1)
   }
   else {
          wordCount.put(word, currentWordCount + 1)
   }
   }

   def outputMapString = wordCount.inject(“”, {k,v -> k += “${v.key}: ${v.value}\n”})

   outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

flowFile = session.putAttribute(flowFile, ‘filename’, ‘wordcount.txt’)
session.transfer(flowFile, REL_SUCCESS)

将这段代码输入到Script Body 中去:如下图所示!

c. 从工具栏中拖入一个Processor,在弹出面板中搜索PutFIle,然后设置PutFile的属性,主要是设置output的地址

d.将GenerateFlowFile ExecuteScriptPutFile 这三个processor的组件连接起来: 连接顺序如下所示:

查看最终计算的结果:如下所示!

如此表示改用例是OK的,是没有什么问题的。

 


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明NiFi 学习 — 统计数
喜欢 (0)

您必须 登录 才能发表评论!

加载中……