Fetch data from HDFS and store it into the MySQL table in NiFi

This recipe shows how to fetch data from HDFS and store it in a MySQL table using NiFi. Apache NiFi is open-source software for automating and managing the data flow between systems in most big data scenarios. It is a robust and reliable system for processing and distributing data.

Recipe Objective: How to fetch data from HDFS and store it into the MySQL table in NiFi?

In most big data scenarios, Apache NiFi is used as open-source software for automating and managing the data flow between systems. It is a robust and reliable system for processing and distributing data, and it provides a web-based user interface to create, monitor, and control data flows. In this scenario, we will fetch data from HDFS and store it in a MySQL table.

System requirements:

On my local Hadoop installation, we have a CSV file stored in HDFS that we will fetch. The file looks as shown in the image below.

bigdata_1.jpg
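The exact contents of the file are not reproduced in the text; since the schema used later is named dept, a hypothetical departments CSV (invented column names and values, for illustration only) might look like:

```
dept_id,dept_name,dept_head
1,Finance,Alice
2,HR,Bob
3,Engineering,Carol
```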

Step 1: Configure The GetHDFS

The GetHDFS processor fetches files from the Hadoop Distributed File System (HDFS) into FlowFiles. By default, this processor deletes the file from HDFS after fetching it. To configure the GetHDFS processor, provide the information shown below.

bigdata_2.jpg

As shown in the above image, we need to provide the Hadoop Configuration Resources: a file, or a comma-separated list of files, that contains the Hadoop file system configuration. Without this, Hadoop will search the classpath for 'core-site.xml' and 'hdfs-site.xml' files or revert to a default configuration.

Provide the directory path to fetch data from, and provide a file filter regex, as shown above. This processor deletes the file from HDFS after fetching it; to fetch the file without deleting it from HDFS, set the "Keep Source File" property to true.

We scheduled this processor to run every 60 seconds via the Run Schedule, with Execution set to Primary node, in the Scheduling tab.
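As a sketch, the GetHDFS properties used in this flow might look like the following (the directory and regex are hypothetical placeholders; substitute your own values):

```
Hadoop Configuration Resources : /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
Directory                      : /user/nifi/dept
File Filter Regex              : .*\.csv
Keep Source File               : true
Run Schedule (Scheduling tab)  : 60 sec
Execution                      : Primary node
```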

Step 2: Configure The UpdateAttribute

Here we configure UpdateAttribute to add the attribute "schema.name", which will be used to look up the schema in the Avro schema registry.

bigdata_3.jpg

As shown above, we added a new attribute, schema.name, with the value dept.
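In property form, the dynamic attribute added in the UpdateAttribute processor is simply:

```
schema.name : dept
```

The record readers and writers configured in the next step use this attribute to look up the matching schema in the AvroSchemaRegistry.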

Step 3: Configure the ConvertRecord and Create Controller Services

Here we use a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service. The AvroSchemaRegistry contains a "parlament_department" schema, which defines information about each record (field names, field IDs, field types). We also use a JSON writer controller service that references the same AvroSchemaRegistry schema.
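The schema itself is not shown in the text; a hypothetical Avro schema for the dept records, registered in the AvroSchemaRegistry under a property name matching the schema.name attribute, might look like:

```json
{
  "type": "record",
  "name": "dept",
  "fields": [
    { "name": "dept_id",   "type": "int" },
    { "name": "dept_name", "type": "string" },
    { "name": "dept_head", "type": "string" }
  ]
}
```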

In the ConvertRecord processor's Properties tab, open the drop-down in the Record Reader value column as shown below, then click on "Create new service".

bigdata_4.jpg

You will then get the pop-up shown below; select CSVReader in the Compatible Controller Services drop-down. We can also provide a name for the controller service. Then click on Create.

bigdata_5.jpg

Follow the same steps to create a controller service for the JSON RecordSetWriter, as shown below.

bigdata_6.jpg

To enable the controller services, select the gear icon from the Operate palette:

bigdata_7.jpg

The output of the data:

bigdata_8.jpg

Step 4: Configure the Split JSON

Splits a JSON file into multiple, separate FlowFiles for an array element specified by a JsonPath expression. Each generated FlowFile is comprised of an element of the specified array and is transferred to the 'split' relationship, with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or does not evaluate to an array element, the original file is routed to 'failure', and no files are generated.

bigdata_9.jpg

Since the output of the ConvertRecord processor is a JSON array, we split it into individual single-record JSON objects so that the attributes can be accessed easily, as shown above.
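The exact JsonPath used is not shown in the text; assuming the common configuration of a JsonPath expression such as $.* on a top-level array (and hypothetical field names), the split behaves like this Python sketch:

```python
import json

# ConvertRecord produced one JSON array per FlowFile (field names hypothetical)
flowfile_content = '[{"dept_id": 1, "dept_name": "Finance"}, {"dept_id": 2, "dept_name": "HR"}]'

# SplitJson emits one FlowFile per array element on the 'split' relationship
records = json.loads(flowfile_content)
splits = [json.dumps(rec) for rec in records]

for s in splits:
    print(s)  # each split is a single JSON object, ready for ConvertJSONToSQL
```

In NiFi itself the split happens inside the SplitJson processor; this sketch only illustrates the relationship between the input array and the output FlowFiles.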


The output of the JSON data after splitting the JSON array:

bigdata_10.jpg

Step 5: Configure the ConvertJsonToSQL

Converts a JSON-formatted FlowFile into an UPDATE, INSERT, or DELETE SQL statement. The incoming FlowFile is expected to be a "flat" JSON message, meaning that it consists of a single JSON element in which each field maps to a simple type. If a field maps to a JSON object, that JSON object will be interpreted as text. If the input is an array of JSON elements, each element in the array is output as a separate FlowFile to the 'sql' relationship.

bigdata_11.jpg

Here we need to specify the JDBC Connection Pool (MySQL JDBC connection) to convert the JSON message to a SQL statement. The Connection Pool is necessary to determine the appropriate database column types. Also, we need to specify the statement type and Table name to insert data into that table, as shown in the above image.

bigdata_12.jpg

In the JDBC Connection Pool drop-down, create a new service, DBCPConnectionPool, as shown above.

bigdata_13.jpg

Select DBCPConnectionPool in the drop-down and click on Create. Then configure the service by providing the information shown below:

bigdata_14.jpg

Provide the database connection URL, the driver class name, the driver location, and the MySQL username and password as shown in the image, then click Apply to save the configuration. We then get the dialog box below.
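As a sketch, the DBCPConnectionPool settings might look like the following (the host, database name, and driver path are hypothetical; the driver class name is the standard MySQL Connector/J 8 class):

```
Database Connection URL     : jdbc:mysql://localhost:3306/mydb
Database Driver Class Name  : com.mysql.cj.jdbc.Driver
Database Driver Location(s) : /path/to/mysql-connector-java.jar
Database User               : nifi_user
Password                    : ********
```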

bigdata_15.jpg

Then we enable the controller service by clicking the lightning-bolt symbol and selecting Enable.

Before running the flow, please create a table in the MySQL database as we specified in DBCPConnectionPool.

Create the table in the MySQL database as shown below:

bigdata_16.jpg
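The exact DDL is not shown in the text; assuming hypothetical column names for the dept records, a matching MySQL statement might look like:

```sql
-- Hypothetical table definition; column names and types must match your CSV schema
CREATE TABLE dept (
  dept_id   INT,
  dept_name VARCHAR(100),
  dept_head VARCHAR(100)
);
```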

The output after converting JSON to a SQL statement:

bigdata_17.jpg

Step 6: Configure the PutSQL

Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command to execute. The SQL command may use the ? character as a parameter placeholder. In this case, the parameters to use must exist as FlowFile attributes with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. sql.args.N.type is expected to be a number indicating the JDBC type. The content of the FlowFile is expected to be UTF-8 encoded.
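To make the attribute convention concrete, here is a Python sketch of what ConvertJSONToSQL produces for one flat JSON record before PutSQL executes it (the field names, table name, and values are hypothetical):

```python
import json

# A "flat" JSON FlowFile from SplitJson (hypothetical values)
record = json.loads('{"dept_id": 1, "dept_name": "Finance"}')

# ConvertJSONToSQL (Statement Type = INSERT, Table Name = dept) emits a
# parameterized statement as the new FlowFile content ...
sql = "INSERT INTO dept (dept_id, dept_name) VALUES (?, ?)"

# ... and the parameter values and JDBC type codes as FlowFile attributes.
# JDBC type codes come from java.sql.Types: 4 = INTEGER, 12 = VARCHAR
attributes = {
    "sql.args.1.type": "4",  "sql.args.1.value": str(record["dept_id"]),
    "sql.args.2.type": "12", "sql.args.2.value": record["dept_name"],
}

print(sql)
print(attributes)
```

PutSQL reads the statement from the FlowFile content, binds each sql.args.N.value using the JDBC type given by sql.args.N.type, and executes the insert against the configured connection pool.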

bigdata_18.jpg

Here we need to select the JDBC connection pool we created in the step above.

The table data after inserting into the table:

bigdata_19.jpg

Conclusion

Here we learned to fetch data from HDFS and store it in a MySQL table using NiFi.

