
An overview of Apache NiFi and Toolkit CLI deployments


Apache NiFi is a tool for building data pipelines via a drag-and-drop interface, designed to automate the flow of data between systems. It fits into the ‘no code/low code’ category of tools, primarily geared towards teams that are less comfortable writing and managing code, or that want to avoid building a solution requiring a significant amount of engineering effort.

What can NiFi be used for?

NiFi is good at reliably transferring data between platforms (e.g. Kafka to Elasticsearch) and allows you to perform some lightweight ETL along the way. It can enrich and prepare data, convert between data formats, rename fields and filter/route data to different locations.

However, it should not be used for complex event processing, distributed processing, joins, rolling windows or aggregation (it is not Apache Spark and doesn’t claim to be! 😎). It therefore shouldn’t be compared to big data processing tools like AWS EMR, or to more flexible open-source ETL tools like Talend. A more reasonable comparison would be between Apache NiFi and StreamSets.

NiFi can integrate with some AWS services, such as S3, SNS, Lambda, SQS and DynamoDB.

If you need to integrate with streaming services, it’s worth noting that there is more support for the fellow Apache product Kafka than for AWS Kinesis. There is currently no ability to read from Kinesis Data Streams, but there is an open PR for the change!

NiFi Terminology

Below are some useful terms specific to constructing NiFi flows which will help in understanding the rest of this article:

FlowFile — the unit of data moving through the system; it consists of content (the data itself) and attributes (metadata key-value pairs, e.g. ‘creation date’).

Processor — applies a set of transformations and rules to FlowFiles and generates new FlowFiles. Apache NiFi boasts 280+ processors to choose from! Examples include GetKafka, GetFTP, PutKinesisStream, PutHDFS, ConvertRecord, ReplaceText, ValidateCsv etc.

Connector — a queue of the FlowFiles that are yet to be processed by the next processor. It defines rules about how FlowFiles are prioritised (which ones go first, or are filtered out, e.g. FIFO). Connectors can also be used for load balancing and for re-directing ‘failed’ FlowFiles.

Controller Service — a shared service used by processors, e.g. database connections, AWS credentials, or the record Readers and Writers used within conversion processors (to name just a few).

Process Group — acts as an abstraction layer; it is generally good practice to use these as a way of breaking up large data flows.

Templates — can be used if you have a collection of processors performing some logic you’d like to repeat and re-use, e.g. common components you wish to share with others.

Input/Output ports — allow you to transfer data in or out of remote process groups.

Installation

Installation is very simple: you can download nifi-<version>-bin.tar.gz from the Apache NiFi downloads page, or pull and run the official Docker image.
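If you go down the tarball route, a minimal sketch of a local install looks like this (replace <version> with a real release; on the older 1.x releases used in this article, the UI is then served over plain HTTP):

# download and unpack a release from the Apache archive
wget https://archive.apache.org/dist/nifi/<version>/nifi-<version>-bin.tar.gz
tar -xzf nifi-<version>-bin.tar.gz
cd nifi-<version>
# start NiFi in the background; the UI appears at http://localhost:8080/nifi
./bin/nifi.sh start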

UI and NiFi Flows

The UI is very intuitive and simple to use. You drag different components from the top banner onto the canvas and configure them accordingly. You can also add labels to document parts of the flow, a bit like JavaDoc!

An image of the UI is below; you can see a process group in the centre of the canvas, which you can right-click and ‘enter’ to display the flow within.

NiFi Canvas with a process group
An example NiFi flow: reads from S3, converts to JSON and writes to Kinesis Firehose

Here you can see how processors are used in conjunction with connectors to handle the flow of data. Failed files can be redirected to other pathways and handled/logged accordingly. No code is needed; this is all done through configuration. This flow lists CSV files in an S3 bucket (achieved with a combination of the ListS3 and FetchS3Object processors), logs any issues via a LogAttribute processor, converts the files to JSON and puts them onto a Kinesis Firehose stream.

Within your root NiFi installation directory (e.g. ‘/Applications/nifi-<version>’), state is stored in a ‘/state’ folder (unless you run as a cluster, in which case Zookeeper stores it). Logs are stored under ‘/logs’, a directory used by the LogAttribute processor. By default the flow is saved in ‘/conf/flow.xml.gz’, but you can override this configuration, along with many others, by editing the ‘/conf/nifi.properties’ file.
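As an illustration, here are two of the properties you might override (a sketch; the values shown are the defaults in the older 1.x releases used in this article):

# excerpt from conf/nifi.properties
# where the flow definition is persisted:
nifi.flow.configuration.file=./conf/flow.xml.gz
# the port the UI and REST API are served on:
nifi.web.http.port=8080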

NiFi directs you to documentation (if available) via the small book icon, for example the View usage option below:

This is also available when adding controller services to a process group.

AWS Connectivity

You can create an AWSCredentialsProviderControllerService (set at process group level) to re-use your AWS credentials within multiple components of your flow. Specify either the location of a credentials file containing the AWS Access Key and Secret Access Key, or set these keys directly within the properties of the controller service.
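For reference, a sketch of the credentials file format (the accessKey/secretKey property names assume the AWS SDK’s PropertiesFileCredentialsProvider; the values below are AWS’s documented example keys, not real credentials):

# credentials file in Java properties format
accessKey=AKIAIOSFODNN7EXAMPLE
secretKey=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY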

Then you can reference this controller service within the configuration of any processor that requires an AWS connection, e.g. the PutKinesisFirehose processor properties.

PutKinesisFirehose processor configuration

Running processors need to be stopped to apply any changes such as new controller services or other property modifications.

NiFi Registry

The NiFi Registry is an Apache NiFi sub-project that aims to answer people’s deployment and version control needs! It provides a central location to store versioned process group flows. You have to use the registry UI to create a ‘bucket’ first (buckets are used to organise the version-controlled flows) before adding the new registry client in your local NiFi instance. This article: NiFi-how-do-i-deploy-my-flow? has some nicely written steps detailing how to do this.
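If you’d rather script the bucket creation than click through the UI, the registry also exposes a REST API. A minimal sketch, assuming a local registry on its default port (‘my_bucket’ is a placeholder name):

# create a bucket via the NiFi Registry REST API
curl -X POST http://localhost:18080/nifi-registry-api/buckets \
  -H 'Content-Type: application/json' \
  -d '{"name": "my_bucket"}'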

You can download the registry locally from the Apache NiFi downloads page or use Docker.

NiFi Toolkit and Deployments

The toolkit provides several command-line utilities to help set up NiFi in both standalone and clustered environments. Whilst investigating deployment options, I used the CLI tool to connect to the NiFi Registry.

You can download the toolkit locally from the Apache NiFi downloads page or use Docker.

Within your downloaded toolkit bin directory (e.g. /Applications/nifi-toolkit-<version>/bin) there is a ‘cli.sh’ script which can be used to launch an interactive shell.
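For example (assuming the toolkit is unpacked under /Applications):

# launch the interactive shell
cd /Applications/nifi-toolkit-<version>/bin
./cli.sh

# commands can also be run one-off from your normal shell, e.g. listing the
# registry clients known to a NiFi instance running on localhost
./cli.sh nifi list-reg-clients --baseUrl http://localhost:8080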

For my deployment to an AWS account, I used Terraform to create two EC2 instances (using the m4.large instance type) running within the same VPC; one had NiFi installed and the other NiFi Registry. The simple user_data configuration for NiFi is below:

#!/usr/bin/env bash
# install Docker via the convenience script
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
# pull and run the latest NiFi image, publishing the default UI port
sudo docker pull apache/nifi:latest
sudo docker run --name nifi -p 8080:8080 -d apache/nifi:latest

… and for NiFi Registry:

#!/usr/bin/env bash
# install Docker via the convenience script
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
# pull and run the latest NiFi Registry image, publishing the default UI port
sudo docker pull apache/nifi-registry:latest
sudo docker run --name nifi-registry -p 18080:18080 \
  -d apache/nifi-registry:latest

Both shell scripts install Docker, pull and run the latest images, and publish the default container ports on the instance.

For my ami_id I used a community Ubuntu image (ubuntu-focal-20.04-amd64-server), which allows you to install Docker on the instance.

I had already created a bucket (“leah’s_bucket”) in the remote NiFi Registry and committed a flow (“product_flow”). Now I wanted to see how to deploy this to the remote NiFi instance…

Local version controlled process group
Remote registry with committed versioned flow

Once you’ve launched the interactive shell, you can deploy and start a versioned flow with just a few CLI commands:

First you need to create a new registry client in the remote NiFi instance, e.g.:

nifi create-reg-client --baseUrl http://<public_ip_of_nifi>:8080 \
--registryClientUrl http://<private_ip_of_registry>:18080 \
--registryClientName "leah's_client"

The baseUrl is the URL to execute the command against. As I’m running these CLI commands on my local machine and connecting over the internet, the baseUrl uses the public IP address of the NiFi EC2 instance.

The EC2 instance’s Security Group was modified to allow traffic from my local machine.
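As a sketch, that rule could be added via the AWS CLI (the group id and CIDR below are placeholders for your own values):

# allow inbound traffic to the NiFi UI port from a single IP
aws ec2 authorize-security-group-ingress \
  --group-id sg-xxxxxxxx \
  --protocol tcp --port 8080 \
  --cidr <your_public_ip>/32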

The registryClientUrl can refer to the private IP of the NiFi Registry, as these two EC2 instances are within the same VPC in AWS and so can talk to one another!

The registryClientName is whatever name you want this registry to be called within the NiFi UI.

Next we need to create a process group by importing a versioned flow from the registry, e.g.:

nifi pg-import --bucketIdentifier 0455d64e-ea40-4213-bbe4-359985de09f9 \
--flowIdentifier 6df2b6c1-249b-4af3-8f84-b57f32f6c5aa \
--flowVersion 1 --baseUrl http://<public_ip_of_nifi>:8080

The bucketIdentifier and flowIdentifier are unique identifiers; both can be found in the screenshot of the committed versioned flow above.

This pg-import command returns a unique id for the process group (processGroupId), which you need for the next steps!

Enable any controller services you are using, e.g.:

nifi pg-enable-services --processGroupId 94a93dc0-0173-1000-fee3-442454b1ecd6 --baseUrl http://<public_ip_of_nifi>:8080

Then you can start the process group, e.g.:

nifi pg-start --processGroupId 94a93dc0-0173-1000-fee3-442454b1ecd6 --baseUrl http://<public_ip_of_nifi>:8080

You should be able to see the newly deployed process group running in the UI!

These commands could be scripted to provide a more automated deployment process.
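A minimal sketch of such a script (the URL and identifiers are placeholders, and it assumes pg-import prints just the new process group id, as it does in the CLI’s simple output mode):

#!/usr/bin/env bash
set -euo pipefail

BASE_URL="http://<public_ip_of_nifi>:8080"
BUCKET_ID="<bucket_identifier>"
FLOW_ID="<flow_identifier>"

# import the versioned flow and capture the new process group id
PG_ID=$(./cli.sh nifi pg-import --bucketIdentifier "$BUCKET_ID" \
  --flowIdentifier "$FLOW_ID" --flowVersion 1 --baseUrl "$BASE_URL")

# enable its controller services, then start it
./cli.sh nifi pg-enable-services --processGroupId "$PG_ID" --baseUrl "$BASE_URL"
./cli.sh nifi pg-start --processGroupId "$PG_ID" --baseUrl "$BASE_URL"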

If you need to set variables in your deployed NiFi flow, check out this link for more details.

Things to consider when using Apache NiFi

  • Unit testing — with most “low code” solutions unit testing is never easy and often goes overlooked 😢. NiFi provides a way to test individual processors using a TestRunner class, but there is no built-in way to test a whole process group. This article discusses the difficulties in testing the application and introduces a library the authors created for testing a full flow.
  • Reviewing the code — as the flow is stored as XML, it would be hard to detect small but potentially impactful modifications (e.g. connector load-balance changes) to the flow file via a code review tool. You’d need a fairly strict review process here, and should consider looking through the Flow Configuration History in the UI (images below). The registry provides a way of version controlling the code, so you could review the configuration locally before promoting it to another environment.
Flow Configuration History
Flow Configuration Action Details for a component

Thanks for reading my whistle-stop tour of Apache NiFi! I hope you’ve found it useful 😊


Written by Leah Tarbuck

Software Engineer at BlackCat Technology Solutions, likes cats, running and yoga :)
