How to fix the date issues in the data using pandas and write the corrected data to csv?

This recipe helps you fix the date issues in the data using pandas and write the corrected data to csv


Recipe Objective

In most big data scenarios there is a recurring need for a date-handling module that fixes common date issues: incorrect dates, flipped day/month values, and inconsistent date formats. Such a module should validate dates against certain criteria, flag the incorrect ones, and correct them appropriately. The code in this recipe is reusable and can fix most of the frequently occurring date issues in your big data pipeline.

System requirements:

  • Install the python modules as follows: pip install pandas and pip install numpy
  • The below code can be run in a Jupyter notebook or any python console
  • In this scenario we are going to use two excel files, jan_order_dataset and feb_order_dataset, which contain consecutive months of data with various date fields that we fix accordingly; a sketch for generating such sample files follows this list
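
For readers who want to run the recipe end to end, the snippet below generates two small sample files with the column names used throughout this recipe (order_date, request_id, order_no). All the values are made up for illustration; if you already have jan_order_data.xlsx and feb_order_data.xlsx, skip this step.

import pandas as pd

# Hypothetical sample data: the February file contains one deliberately
# "flipped" order_date (2020-03-02 instead of 2020-02-03) for order O2
jan = pd.DataFrame({
    'order_date': [pd.Timestamp('2020-01-15'), pd.Timestamp('2020-02-03')],
    'request_id': ['R1', 'R2'],
    'order_no': ['O1', 'O2'],
})
feb = pd.DataFrame({
    'order_date': [pd.Timestamp('2020-02-10'), pd.Timestamp('2020-03-02')],
    'request_id': ['R1', 'R2'],
    'order_no': ['O1', 'O2'],
})

# Writing .xlsx files requires an Excel engine such as openpyxl (pip install openpyxl)
jan.to_excel('jan_order_data.xlsx', index=False)
feb.to_excel('feb_order_data.xlsx', index=False)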

Step 1: Import the modules

In this example we are going to use the pandas, numpy and datetime modules. pandas provides the data structures and operations for manipulating numerical tables and time series, datetime supplies the date types used to validate the values, and numpy is used for handling missing values.

import pandas as pd
import datetime
import numpy as np

Step 2: Function to flip the day and month

# Flip the day and month of a date by formatting it as year-day-month
# and re-parsing; the caller must ensure the day value is 12 or less,
# otherwise pd.to_datetime cannot parse the flipped string
def day_month_flip(date_to_flip):
    return pd.to_datetime(date_to_flip.strftime('%Y-%d-%m %H:%M:%S'))
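
A quick, illustrative check of the flip:

ts = pd.Timestamp('2020-03-02 00:00:00')
print(day_month_flip(ts))  # 2020-02-03 00:00:00 (day and month swapped)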

Step 3: Write conditions to check the date issues

In the code below we write conditions to check for incorrect dates, flipped dates, new date values compared to the previous file's data, and invalid date formats. Each condition returns the (possibly corrected) date together with a numeric code: 0 means the date is correct, 1 means flipping fixed it, and 2 means the date has an unresolved issue (codes 3 for an invalid format and 4 for a new date value are assigned later, in the main method). Based on these codes, three new columns (the new date, a flag for the new date, and the status of the new date) are added to the output file, so a downstream pipeline can use the corrected date column for further ingestion.

# Condition 1: the date should not fall after the end of the current month.
# Returns [date, code]: 0 = correct, 1 = fixed by flipping, 2 = unresolved issue
def check_condtn1(date_to_check, month_val):
    if date_to_check > month_val:
        if date_to_check.day <= 12:
            flipped_date = day_month_flip(date_to_check)
            if flipped_date <= month_val:
                return [flipped_date, 1]
            else:
                return [date_to_check, 2]
        else:
            return [date_to_check, 2]
    else:
        return [date_to_check, 0]

# Condition 2: the date should match the value recorded in the previous
# month's file; if not, try flipping day and month to reconcile the two
def check_condtn2(date_to_check, date_to_check_prev, month_val):
    if date_to_check != date_to_check_prev:
        if date_to_check.day <= 12:
            flipped_date = day_month_flip(date_to_check)
            if flipped_date == date_to_check_prev and flipped_date <= month_val:
                return [flipped_date, 1]
            else:
                return [date_to_check, 2]
        else:
            return [date_to_check, 2]
    else:
        return [date_to_check, 0]
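
For illustration, this is how the two conditions behave on a few hand-picked dates, taking 2020-02-28 as the last day of the current month:

month_val = pd.to_datetime('2020-02-28')

print(check_condtn1(pd.Timestamp('2020-02-10'), month_val))  # [Timestamp('2020-02-10 00:00:00'), 0] : correct
print(check_condtn1(pd.Timestamp('2020-03-02'), month_val))  # [Timestamp('2020-02-03 00:00:00'), 1] : fixed by flipping
print(check_condtn1(pd.Timestamp('2020-03-15'), month_val))  # [Timestamp('2020-03-15 00:00:00'), 2] : cannot be fixed

# A date that disagrees with last month's value but matches it after flipping:
print(check_condtn2(pd.Timestamp('2020-03-02'), pd.Timestamp('2020-02-03'), month_val))  # [Timestamp('2020-02-03 00:00:00'), 1]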

Step 4: Checking the new date value

In the code below we compare the old order date with the new order_date.

def check_new_value(a, x, y):
    month_last = '2020-02-28'
    month_val = pd.to_datetime(month_last)
    if a == 'New Date Value Found':
        if pd.to_datetime(x) > pd.to_datetime(y) or pd.to_datetime(x) > month_val:
            # Parse x first, since day_month_flip expects a datetime-like value
            flip = day_month_flip(pd.to_datetime(x))
            if flip <= pd.to_datetime(y) and flip <= month_val:
                return (flip, 'flipping solved the issue')
            else:
                return ('', 'by flipping issue not solved')
        else:
            return ('', '')
    # Nothing to do for rows without the 'New Date Value Found' status
    return ('', '')
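
A short illustration of the helper; note that it is not wired into the main flow below, but it can be applied afterwards to rows whose status column reads 'New Date Value Found':

fixed, msg = check_new_value('New Date Value Found', '2020-03-02', '2020-02-03')
print(fixed, msg)  # 2020-02-03 00:00:00 flipping solved the issue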

The code below is the main method. It takes the order dates from the excel sheets and, using the conditions above, compares each date with the matching date in the previous month's sheet (matched on the same columns). It collects the condition results into a list, derives the final date and its status from the returned codes, and returns them as a Series that becomes the new columns: the corrected date, a per-date status message, and an overall status flag.

def date_check(dates, safe_prev, month_val):
    date_list = []
    final_date = []
    # Only the first entry (order_date) is an actual date column;
    # the other two are used for row matching in Step 5
    for i in range(1):
        if pd.isnull(dates[i]) or not isinstance(dates[i], datetime.date):
            # Invalid format: record the code twice so the list keeps the
            # same two-entry shape as the valid-date branch below
            date_list.append([dates[i], 3])
            date_list.append([dates[i], 3])
        else:
            date_list.append(check_condtn1(dates[i], month_val))
            if safe_prev.empty == False and isinstance(safe_prev.values[0][i], datetime.date):
                date_list.append(check_condtn2(dates[i], safe_prev.values[0][i], month_val))
            elif safe_prev.empty == False and pd.isnull(safe_prev.values[0][i]):
                # The previous file has no value for this date: flag as new
                date_list.append([dates[i], 4])
            else:
                date_list.append([dates[i], 0])
    # Segregating the combined results into separate lists for their respective dates
    doc_list = [date_list[i] for i in [0, 1]]
    codes = [date_list[i][1] for i in range(0, 2)]
    if 2 in codes or 3 in codes:
        flag = 'Issue in one of the date'
    else:
        flag = 0
    # Assigning the value for codes so that it can be populated in the status column
    for lists in [doc_list]:
        list_code = [lists[i][1] for i in range(2)]
        if 2 in list_code:
            for cond_result in lists:
                if cond_result[1] == 2:
                    final_date.extend([cond_result[0], 'Date has issue'])
                    break
        elif sum(list_code) == 0:
            for cond_result in lists:
                if cond_result[1] == 0:
                    final_date.extend([cond_result[0], 'Date is correct'])
                    break
        elif 3 in list_code:
            for cond_result in lists:
                if cond_result[1] == 3:
                    final_date.extend([cond_result[0], 'Invalid Date format'])
                    break
        elif 4 in list_code:
            for cond_result in lists:
                if cond_result[1] == 4:
                    final_date.extend([cond_result[0], 'New Date Value Found'])
                    break
        else:
            for cond_result in lists:
                if cond_result[1] == 1:
                    final_date.extend([cond_result[0], 'Flipping fixed the issue'])
                    break
    final_date.append(flag)
    return pd.Series(final_date)
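
To see the whole check in action on a single row (the request_id and order_no values here are made up; date_check only treats the first entry as a date, and the matching on the other two happens in Step 5):

month_val = pd.to_datetime('2020-02-28')
row = pd.Series([pd.Timestamp('2020-03-02'), 'R2', 'O2'],
                index=['order_date', 'request_id', 'order_no'])
prev = pd.DataFrame({'order_date': [pd.Timestamp('2020-02-03')],
                     'request_id': ['R2'], 'order_no': ['O2']})
print(date_check(row, prev, month_val))
# 0         2020-02-03 00:00:00
# 1    Flipping fixed the issue
# 2                           0
# dtype: object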

Step 5: Reading the data from excel sheets from local/HDFS

In the code below we read the latest month's data and the previous month's data from the excel sheets, create the new columns with the fixed dates and their statuses, and write the resulting file to HDFS/local.

def execute():
    # Importing raw data (the latest month)
    data = pd.read_excel('feb_order_data.xlsx')
    df = data
    # Last day of the current month, the upper bound for valid dates
    month_last = '2020-02-28'
    month_last = pd.to_datetime(month_last)
    # Reading previous month data
    data_prev = pd.read_excel('jan_order_data.xlsx')
    date_cols = ['order_date', 'request_id', 'order_no']
    if data_prev.empty:
        data_prev = pd.DataFrame(columns=date_cols)
    # Copy so the parsing below does not trigger SettingWithCopyWarning
    data_comp = data_prev[date_cols].copy()
    # Parse string/bytes values into datetimes, leaving unparseable values as-is
    for i in date_cols:
        df[i] = df[i].apply(lambda x: pd.to_datetime(x, errors='ignore')
                            if type(x) == str or type(x) == bytes else x)
        data_comp[i] = data_comp[i].apply(lambda x: pd.to_datetime(x, errors='ignore')
                                          if type(x) == str or type(x) == bytes else x)
    data_comp.replace({pd.NaT: np.nan}, inplace=True)
    new_cols = ['order_date_new', 'order_date_status', 'status']
    # For every row, look up the matching previous-month row by order_no
    # and request_id, then run the date checks
    date_df = df.apply(lambda x: date_check(
        x[date_cols],
        data_comp[(data_comp['order_no'] == x[date_cols][2]) &
                  (data_comp['request_id'] == x[date_cols][1])],
        month_last), axis=1)
    date_df.columns = new_cols
    df = pd.concat([df, date_df], axis=1)
    # Strip stray newline and carriage-return artifacts before writing
    df = df.replace(r'\\n', ' ', regex=True)
    df = df.replace(r'\\N', ' ', regex=True)
    df = df.replace(r'\n', ' ', regex=True)
    df = df.replace(r'\r', ' ', regex=True)
    # Writing the cleaned file to HDFS / local
    df.to_excel('resultof_fixeddates.xlsx', encoding='utf-8')

execute()

The output of the above code is an excel file, resultof_fixeddates.xlsx, written to the directory from which the code is executed, with the three new columns order_date_new, order_date_status and status appended to the original data.
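
Since the recipe title asks for CSV output, you can write the corrected data to a csv file instead of (or in addition to) the excel file by replacing the to_excel call at the end of execute() with pandas' to_csv:

# Write the corrected data to CSV; index=False drops the pandas row index
df.to_csv('resultof_fixeddates.csv', index=False, encoding='utf-8')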
