Encode data at the attribute level and store it in HDFS in NiFi

This recipe shows how to encode data at the attribute level and store it in HDFS using NiFi.

Recipe Objective: How to encode data at the attribute level and store it in HDFS in NiFi?

In most big data scenarios, Apache NiFi is used as open-source software for automating and managing the data flow between systems. It is a robust and reliable system for processing and distributing data, and it provides a web-based user interface to create, monitor, and control data flows. Gathering data through REST API calls is widely used in big data environments to capture, process, and analyze real-time streaming data. In this scenario, we will fetch data from an API, encode it at the attribute level, and store it in the HDFS filesystem. Encoding data at the field level is of utmost necessity in scenarios where PII data is involved; for example, Social Security numbers need to be encoded when data is transmitted over the network. The data can then be decoded in the API layer or in the same layer itself.


System requirements:

Step 1: Configure the GenerateFlowFile

This processor creates FlowFiles with random data or custom content. Schedule the GenerateFlowFile processor as shown below.

bigdata_1.jpg

We scheduled this processor to run every 60 seconds via the Run Schedule setting and set Execution to the Primary node.
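As a minimal sketch, the scheduling described above corresponds to the following settings on the processor's Scheduling tab:

Scheduling Strategy: Timer driven
Run Schedule: 60 sec
Execution: Primary node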

Step 2: Configure the InvokeHttp

An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers, and the FlowFile contents are included as the request's body.

Here we specify an open public API URL as the endpoint in the InvokeHTTP processor.

bigdata_2.jpg

We specified the HTTP Method as GET and passed the endpoint URL in the Remote URL property, as shown above. We then receive the response as JSON data.
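As a rough sketch, the relevant InvokeHTTP properties are the following; the Remote URL below is a placeholder for whichever public API endpoint you use:

HTTP Method: GET
Remote URL: https://<your-public-api-endpoint>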

The output of the JSON data:

bigdata_3.jpg

Step 3: Configure the Split JSON

Splits a JSON file into multiple, separate FlowFiles for an array element specified by a JsonPath expression. Each generated FlowFile is comprised of an element of the specified array and is transferred to the 'split' relationship, with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or does not evaluate to an array element, the original file is routed to 'failure,' and no files are generated.

bigdata_4.jpg

Since the output of the JSON data is a JSON array, we need to split it so that the attributes can be referenced easily; as configured above, we split the JSON array into single-line JSON objects.
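A common configuration, assuming the array sits at the root of the JSON document, sets the JsonPath Expression property like this:

JsonPath Expression: $.*

Each element of the root-level array then becomes its own FlowFile on the 'split' relationship.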

The output of the JSON data after splitting the JSON object:

bigdata_5.jpg

Step 4: Configure the EvaluateJsonPath

To evaluate attribute values from JSON, JsonPath expressions are entered by adding user-defined properties; each property's name maps to the attribute name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). The value of each property must be a valid JsonPath expression.

bigdata_6.jpg

In the above image, we evaluate the required attributes and give them meaningful names.
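For illustration, assuming the split JSON objects contain fields such as id, name, and company (hypothetical field names; use whatever your API returns), the user-defined properties could look like this, with Destination set to flowfile-attribute:

id: $.id
name: $.name
company: $.company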

The output of the above-evaluated attributes:

bigdata_7.jpg

Step 5: Configure the UpdateAttribute

Updates the Attributes for a FlowFile using the Attribute Expression Language and/or deletes the attributes based on a regular expression.

Here we are going to encode the data at the attribute level, as shown below:

bigdata_8.jpg
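NiFi's Expression Language provides a base64Encode() function that UpdateAttribute can apply to an attribute value. As a sketch, assuming an attribute named company was extracted in Step 4 (a hypothetical name), a user-defined property overwriting that attribute with its encoded form could be:

company: ${company:base64Encode()}

The complementary base64Decode() function can later recover the original value, whether in the API layer or in another NiFi flow.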

The output of the encoded attributes:

bigdata_9.jpg

Step 6: Configure the ReplaceText

Updates the content of a FlowFile by evaluating a Regular Expression (regex) against it and replacing the section of the content that matches the Regular Expression with some alternate value.

bigdata_10.jpg

After evaluating the required attributes and their values, we arrange them column by column using ReplaceText, as configured below.

bigdata_11.jpg
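As an illustrative sketch using the hypothetical attributes from the earlier steps, the ReplaceText processor could assemble each row with a Replacement Strategy of Always Replace and a Replacement Value such as:

Replacement Value: ${id},${name},${company}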

The output of the data is as below:

bigdata_12.jpg

Step 7: Configure the MergeContent

Merges a group of FlowFiles based on a user-defined strategy and packages them into a single FlowFile. Here we merge 1,000 single-row FlowFiles into one group; for that, we configure the processor as below.

bigdata_13.jpg

In the above configuration, we specify the Delimiter Strategy as Text; for the Demarcator value, press Shift + Enter and then click OK, so that every row is added on a new line.
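A minimal sketch of the MergeContent settings described above; the entry counts follow the 1,000-row grouping mentioned earlier and can be adjusted to your batch size:

Merge Strategy: Bin-Packing Algorithm
Minimum Number of Entries: 1000
Maximum Number of Entries: 1000
Delimiter Strategy: Text
Demarcator: (a single newline, entered with Shift + Enter)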

The output of the data:

bigdata_14.jpg

Step 8: Configure the UpdateAttribute to update the filename

Updates the attributes for a FlowFile using the Attribute Expression Language and/or deletes attributes based on a regular expression. Here, we add an attribute called filename to the FlowFile with the value dept.

bigdata_15.jpg

The output of the file name:

bigdata_16.jpg

Step 9: Configure the UpdateAttribute to update file extension

We configured the UpdateAttribute processor as below; it adds the CSV extension to the file name as an attribute of the FlowFile. Because each FlowFile needs a distinct name, we use UpdateAttribute to update the filename, appending the FlowFile's UUID as a unique identifier, as shown below.

bigdata_17.jpg
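One plausible way to express this step, building on the dept value from Step 8 and the Expression Language UUID() function (the exact expression in the image may differ), is a single user-defined property:

filename: ${filename}_${UUID()}.csv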

The output of the file name:

bigdata_18.jpg

Step 10: Configure the PutHDFS

Writes FlowFile data to the Hadoop Distributed File System (HDFS). Here we write the parsed data from the HTTP endpoint and store it in HDFS; to do so, configure the processor as below.

Note: For the Hadoop configuration, we should provide the core-site.xml and hdfs-site.xml files, because Hadoop will search the classpath for a core-site.xml and hdfs-site.xml file or otherwise revert to a default configuration.

bigdata_19.jpg

In the above image, we provided the Hadoop configuration resources, and in the Directory property we gave the name of the directory in which to store the files. We set the Conflict Resolution Strategy to append so that incoming data is appended to the existing file as it arrives.
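A minimal sketch of the PutHDFS properties, with placeholder paths for the configuration files and the target directory:

Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
Directory: /user/nifi/encoded_data
Conflict Resolution Strategy: append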

The output of the stored data in the HDFS and its file structure:

bigdata_20.jpg

Conclusion

Here we learned how to encode data at the attribute level and store it in HDFS using NiFi.
