Integrating Apache NiFi with External APIs

What is NiFi?

Apache NiFi is a data logistics platform used for automating the data flow between disparate data sources and systems, which makes data ingestion fast and secure.

Connecting NiFi to an external API:

To connect NiFi to an external API, we use the InvokeHTTP processor. We can configure it by right-clicking the processor and providing the URL from which the data needs to be fetched.

Provide the URL and the authorization credentials in the Properties tab of the processor.

Implementation:

We need to fetch data from an API and store the required columns in a Postgres database.

We fetch the data using the InvokeHTTP processor. The resulting data is in JSON format.

To split a JSON array into individual records, we use the SplitJson processor. In some cases, the resulting data is nested JSON.

To flatten it into a single JSON record, we use the JoltTransformJSON processor. To evaluate one or more JSONPath expressions, we use the EvaluateJsonPath processor; its results are assigned to flow file attributes.


Using the EvaluateJsonPath processor, we can filter the required fields from the JSON. We then use the AttributesToJSON processor to convert the resulting attributes back into JSON.

Finally, we use the ReplaceText processor to build the query and the ExecuteSQL processor to execute it.
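To make the flow concrete, here is a rough Python sketch of what the processors above do, using a hypothetical API response (the field names dept_name and active come from the example later in this post; everything else is illustrative):

```python
import json

# Hypothetical API response: a JSON array of nested records.
response = json.loads("""
[
  {"dept": {"dept_name": "CSE", "active": 1}},
  {"dept": {"dept_name": "ECE", "active": 0}}
]
""")

rows = []
for record in response:          # SplitJson: one record per flow file
    flat = record["dept"]        # JoltTransformJSON: flatten the nesting
    # EvaluateJsonPath: pull the required fields into attributes
    rows.append((flat["dept_name"], flat["active"]))

# ReplaceText + ExecuteSQL analog: one parameterized insert per row
sql = "INSERT INTO dept (dept_name, active) VALUES (%s, %s)"
```

This is only an analog of the flow, not NiFi code; in NiFi each step is a separate processor connected by relationships.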

Challenges Faced:

1) Type casting:

When the AttributesToJSON processor writes the flow file attributes, all of the resulting JSON values are of string data type. In our example, we need to store the column 'active' as an integer in the Postgres database.

Example:

{
  "dept_name": "CSE",
  "active": "1"
}

To achieve this, we use the UpdateAttribute processor, which supports the NiFi Expression Language. We add a property named 'active' and convert the value to a number with the property value ${active:toNumber()}.
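The casting step can be sketched in Python (the record mirrors the example above; in NiFi the same conversion is done by the ${active:toNumber()} expression):

```python
# AttributesToJSON emits every attribute as a string, e.g. "1" for active.
record = {"dept_name": "CSE", "active": "1"}

# Analog of the UpdateAttribute expression ${active:toNumber()}:
record["active"] = int(record["active"])
```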

2) Handling apostrophes:

In our use case, we had to store values containing apostrophes in the database. While trying to store them using the ExecuteSQL processor, we got an "Invalid string" error message.

In order to store such values, we added a property in the UpdateAttribute processor to replace the apostrophe with an empty character.

Example:

If the value of dept_name is Royalty's, add a new property named dept_name and set its value to ${dept_name:replaceAll("'", "")}.
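The equivalent of that expression in plain Python is a simple character replacement:

```python
dept_name = "Royalty's"
# Analog of ${dept_name:replaceAll("'", "")}: drop the apostrophe
cleaned = dept_name.replace("'", "")
```

Note that stripping the character changes the stored value; doubling the apostrophe ('') or using parameterized SQL statements would preserve it, at the cost of a slightly different flow.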

3) Handling a large dataset:

The MergeContent processor can be used to execute batch queries. It reduces execution time by inserting data in bulk. By default, it merges 1000 records into a batch; the number of records per batch is configurable.
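The batching behavior can be sketched as a simple chunking function (the batch size of 1000 matches the MergeContent default mentioned above; the records are dummy data):

```python
def batches(records, size=1000):
    """Yield successive chunks, like MergeContent grouping flow files."""
    for i in range(0, len(records), size):
        yield records[i:i + size]

rows = list(range(2500))
chunks = list(batches(rows, size=1000))  # three batches: 1000, 1000, 500
```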

4) Storing records using a timestamp:

To store recent records based on the updated date in the Postgres database, we added a property in the UpdateAttribute processor.

Example:

If the value of updated_at is 2017-12-28T01:47:05Z, we want to convert this UTC timestamp into a plain date format. To do this, we add a property named updated_at and set its value to

${updated_at:toDate("yyyy-MM-dd'T'HH:mm:ss'Z'"):format('yyyyMMdd')}

Then, in the RouteOnAttribute processor, add the condition by which the data should be fetched. Here, we keep the records that were updated after the given timestamp.
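The two steps above (date conversion, then routing on the result) can be sketched in Python; the cutoff date is a hypothetical value:

```python
from datetime import datetime

updated_at = "2017-12-28T01:47:05Z"
# Analog of ${updated_at:toDate("yyyy-MM-dd'T'HH:mm:ss'Z'"):format('yyyyMMdd')}
stamp = datetime.strptime(updated_at, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d")

# RouteOnAttribute analog: keep only records newer than a cutoff
cutoff = "20171201"
keep = stamp > cutoff
```

Comparing the yyyyMMdd strings lexicographically works because the format is zero-padded and most-significant-first.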

We can also filter the records based on the timestamp while fetching it from the API.

The above property retrieves all the records from the API and stores only the recent ones based on the timestamp, whereas by defining the timestamp globally we can retrieve only the records updated after the given time frame.

To define a global variable, right-click on the process group. Under Variables, add the variable we want to pass to all the flows.

5) Handling null values:

To handle null records and route them to failure, we added a property in the UpdateAttribute processor. This property checks whether all of the columns are empty; if so, the record is not stored and is routed to failure.
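The check can be sketched as a small predicate (the column names are the hypothetical ones from the earlier example; the empty-value definition is an assumption):

```python
def is_empty(record):
    """Analog of the UpdateAttribute check: true when every column is empty,
    meaning the record should be routed to failure instead of stored."""
    return all(value in (None, "") for value in record.values())
```

A record with even one populated column would pass through and be stored.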

6) Pagination:

To iterate over multiple pages and retrieve all the records, we used the GenerateFlowFile processor and a Set Initial Pagination Parameter processor. In the Set Initial Pagination Parameter processor, add a property as given below.

This property will be the value to the parameter in the URL that we have given in the InvokeHTTP processor.


https://api.example.com/v2/clients?page=${page}

Then, in the EvaluateJsonPath processor, add a property pointing to the field that holds the total page count.

In our use case, the next_page attribute contains the total number of pages.

Then we use the ExtractText processor to get the current page number.

In the RouteOnAttribute processor, add the below property so that the flow iterates until the last page.

Each time, the page argument in the InvokeHTTP URL is replaced with the current page number, and this repeats until the last page. Below is the whole flow.
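The iteration can be sketched in Python; fetch_page is a hypothetical stand-in for InvokeHTTP calling https://api.example.com/v2/clients?page=${page}, and the page contents are dummy data:

```python
def fetch_page(page):
    """Hypothetical InvokeHTTP stand-in returning one page of results
    plus the total page count (our next_page attribute)."""
    data = {1: ["a", "b"], 2: ["c"], 3: []}
    return {"clients": data.get(page, []), "next_page": 3}

clients = []
page = 1
total_pages = fetch_page(1)["next_page"]   # EvaluateJsonPath: total pages
while page <= total_pages:                 # RouteOnAttribute: stop at last page
    clients.extend(fetch_page(page)["clients"])
    page += 1                              # ExtractText: advance current page
```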