We have a department in our org that generates a lot of data in flat files, all with different formats. We are now trying to sort this data out and load it into a database. As step 1, I am trying to identify the structure (separator, header, fields, datatypes, etc.) of the files using a Python script. This is my first time programming in Python, and I am returning to programming after a long time. While this code gives me the output I need, how can I make it better?
The code looks at the directory C:\Logs and loops through all files in it. For each file it first identifies a separator and header; then, if the separator is a space, it checks whether the file is fixed width. It uses this information to identify all fields and their related attributes, and prints out a mapping file. It also combines multiple mapping files into one mapping file where possible (this allows the mapping sheet to accommodate format variances across multiple files). I want to be able to use this on Excel files later.
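For reference, I know the standard library's csv.Sniffer can often guess a single-character delimiter and detect a header row on its own, which is roughly what METHOD 1/METHOD 2 in my code do by hand. A minimal sketch of that approach is below (sample.txt is a hypothetical path; Sniffer raises csv.Error when it cannot decide, and it does not handle fixed-width files, so it only covers part of what my script attempts):

import csv

with open('sample.txt', newline='') as f:
    sample = f.read(4096)                    # sniff a sample, not the whole file
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)              # guesses the delimiter, quoting, etc.
has_header = sniffer.has_header(sample)      # heuristic header-row detection
print('Delimiter:', repr(dialect.delimiter), 'Header:', has_header)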
SAMPLE INPUT:

Example input in txt format is below. However, this code is supposed to identify files delimited by any character, as well as fixed-width files. Files can have any number of header lines, etc.
Some Title
Grp IDS Date Time Weight
1 3559 3-10-19 8:45 1159.4
1 3680 3-10-19 8:40 1861.2
1 3844 3-10-19 8:36 2039.4
1 3867 3-10-19 8:38 1861.2
1 3985 3-10-19 8:40 1729.2
1 4009 3-10-19 8:46 1883.2
1 4014 3-10-19 8:36 1920.6
1 4044 3-10-19 8:41 1689.6
1 4058 3-10-19 8:39 1764.4
1 4192 3-10-19 8:43 1775.4
1 4344 3-10-19 8:43 1449.8
1 4354 3-10-19 8:49 1555.4
1 4356 3-10-19 8:31 1091.2
1 4359 3-10-19 8:43 0.0
1 4361 3-10-19 8:44 1689.6
1 4365 3-10-19 8:43 1513.6
2 3347 3-10-19 8:34 1867.8
2 3860 3-10-19 8:37 1788.6
2 3866 3-10-19 8:33 1980.0
2 4004 3-10-19 8:24 1634.6
2 4020 3-10-19 8:29 1612.6
2 4086 3-10-19 8:35 1553.2
2 4139 3-10-19 8:22 1883.2
2 4145 3-10-19 8:27 1177.0
2 4158 3-10-19 8:33 0.0
2 4186 3-10-19 8:29 1586.2
2 4193 3-10-19 8:28 1746.8
2 4202 3-10-19 8:28 1870.0
2 4215 3-10-19 8:31 1104.4
2 4348 3-10-19 8:33 1628.0
2 4369 3-10-19 8:32 1392.6
2 4374 3-10-19 8:33 1394.8
OUTPUT: each input file gets a matching .map.csv mapping sheet, plus a consolidated FinalMappingSheet when the file formats are compatible.

CODE:
# Import libraries
import os
import csv
import re
import pandas as pd
from collections import Counter
from dateutil.parser import parse
import calendar

# Set variables
path = r"C:\Logs"  # raw string so the backslash is not treated as an escape
printableChars = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;?@[\\]^_`{|}~ \t\n\r\x0b\x0c')
# Characters that are generally not used as separators.
skipSeparator = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ:.!"#$%&\'()*+@[]\n\r\x0b\x0c')
masterFields = pd.DataFrame(columns=['Target Name', 'Source Name', 'Column Position', 'Column Width',
                                     'DataType', 'Size', 'Format', 'KEY', 'Default', 'Comments',
                                     'Unique', 'ImpactsGrain', 'HasNULLs'])
consolidate = True
debug = False
# Identify duplicate columns; they need to be removed from further analysis.
def getDuplicateColumns(df):
    '''
    Get a list of duplicate columns.
    Iterates over all columns in the DataFrame and finds the columns whose contents are duplicated.
    :param df: DataFrame object
    :return: list of column names whose contents are duplicates
    '''
    duplicateColumnNames = set()
    # Iterate over all the columns in the DataFrame
    for x in range(df.shape[1]):
        # Select the column at the xth index
        col = df.iloc[:, x]
        # Iterate over the columns from the (x+1)th index to the end
        for y in range(x + 1, df.shape[1]):
            # Select the column at the yth index
            otherCol = df.iloc[:, y]
            # Check if the two columns at the x & y indexes are equal
            if col.equals(otherCol):
                duplicateColumnNames.add(df.columns.values[y])
    return list(duplicateColumnNames)
# Identify how column values are separated within a row.
def identifySeparator(name):
    if debug: print("Currently analyzing file : " + name)
    currentFile = open(path + "\\" + name, "r")
    # Build a per-line Counter of characters
    characterFrequencies = []
    for line in currentFile:
        characters = list(line)
        if characters[0] == '\n':
            continue
        characterFrequencies.append(Counter(characters))
    currentFile.close()
    df = pd.DataFrame(characterFrequencies)
    if debug: print(df.head())
    df = df.drop(columns=skipSeparator, errors='ignore')  # Remove characters that are generally not used as separators.
    if debug: print("Potential separators")
    # METHOD 1: Try to identify the separator on the basis that it should be present in every row.
    dfDense = df.dropna(axis='columns')
    if debug: print(dfDense.head())
    Candidates = dfDense.columns
    if debug: print("Number of characters present in every row : " + str(len(dfDense.columns)))
    if len(dfDense.columns) == 1:
        Separator = str(dfDense.columns[0])
        if debug: print("Separator identified as : " + Separator + ' using METHOD 1')
    else:
        Separator = '-1'
        if debug: print('Unable to identify the separator using METHOD 1 as zero or multiple candidates exist!')
    # METHOD 2: Least variance: the count of the separator should be more or less the same across rows.
    if debug: print('% of rows missing the character')
    if debug: print(df.isna().sum() / df.isna().count())
    cleanupCandidates = df.dropna(axis='columns', thresh=int((df.shape[0] + 1) * .8)).fillna(-1)  # thresh must be an int
    if debug: print('Dropping characters not present in 80% of the rows')
    if debug: print(cleanupCandidates.head())
    lowestVariance = float('inf')  # start high so the first candidate is always accepted
    spaceDetectedFlag = False
    Separator2 = ''
    for character in cleanupCandidates.columns:
        if debug: print('**********' + character + ' **********')
        x = cleanupCandidates.loc[:, character].var()
        if debug: print('Calculated variance : ' + str(x))
        if character == ' ':
            spaceDetectedFlag = True
            if debug: print('Potential position-based file...')
            continue
        if lowestVariance >= x:
            lowestVariance = x
            Separator2 = character
    if debug: print("Separator identified as : " + Separator2 + ' using METHOD 2')
    if Separator == Separator2:
        commonSep = Separator
    else:
        commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
        if len(commonSep) == 1:
            commonSep = commonSep[0]  # unwrap a single candidate so the indexing below yields a Series
    if debug: print('Both methods identify ' + str(commonSep) + ' as one of the separator candidates.')
    maxMode = 0
    modeTable = cleanupCandidates.mode()
    if len(commonSep) != 1:
        print('Multiple common separators! Using the max-mode method:', cleanupCandidates.columns)
        if debug: print(modeTable)
        for column in modeTable.columns:
            x = modeTable.loc[0, column]
            print(column, '\'s Mode: ', x)
            if x > maxMode:
                commonSep = column
                maxMode = x
        if debug: print('Resolved ambiguity by the max-mode method to: ', commonSep)
    # Identify how many header rows need to be skipped
    firstRow = cleanupCandidates[commonSep].idxmax()
    if debug: print('The header is expected to be in row: ', firstRow)
    return commonSep[0], firstRow
# Check whether a space-separated file is actually fixed width, and find the column widths.
def identifyFixedWidth(name):
    numberLines = 0
    totalCharacters = 0
    maxLength = 0
    for line in open(path + "\\" + name, "r"):
        numberLines += 1
        totalCharacters += len(line)
        if len(line) <= 2: numberLines -= 1
        if maxLength < len(line): maxLength = len(line)
    avgChars = totalCharacters / numberLines
    if debug: print('Sample file has ' + str(numberLines) + ' lines. There are an average of ' + str(avgChars) + ' characters per line.')
    counter = 0
    for line in open(path + "\\" + name, "r"):
        if debug: print(str(counter) + ' has ' + str(len(line)) + ' chars')
        if len(line) <= avgChars * .9 or len(line) >= avgChars * 1.1:
            if debug: print('Line ' + str(counter) + ': may be part of the header and needs to be skipped')
            counter += 1
        else:
            if debug: print('Header found at row:', counter)
            break
    # Figure out column start and stop positions
    rowCounter = -1
    colPos = []
    for line in open(path + "\\" + name, "r"):
        rowCounter += 1
        if rowCounter < counter: continue
        blanks = [m.start() for m in re.finditer(' ', line)]
        if len(blanks) > 2:
            colPos.append(blanks)
            if rowCounter <= 5:
                if debug: print(colPos[-1])
    # Blank positions common to every data row mark the column boundaries
    Common = sorted(set.intersection(*map(set, colPos)))  # sort so the widths come out in order
    if debug: print('Potential field separator positions: ', Common)
    # Keep only the first position of each run of consecutive blanks
    newCommon = []
    for x in Common:
        if (x - 1) not in Common: newCommon.append(x)
    newCommon.append(maxLength)
    if debug: print('Field separator positions identified as:', newCommon)
    # Calculate the widths from the distances between boundaries
    width = []
    for i in range(len(newCommon) - 1):  # avoids shadowing the built-in range()
        width.append(newCommon[i + 1] - newCommon[i])
    if debug: print('Column Lengths:', width)
    return counter, width
# Parse the file and collect field information
def identifyFields(name, separator, HeaderPos, width):
    if debug: print("Currently analyzing column structure for file : " + name)
    currentFile = path + "\\" + name
    if separator != 'FWF':
        df = pd.read_csv(currentFile, sep=separator, parse_dates=False, skiprows=HeaderPos)
    else:
        if debug: print('Opening the file as fixed width')
        df = pd.read_fwf(currentFile, parse_dates=False, header=HeaderPos, widths=width)
    dupCols = getDuplicateColumns(df)
    df = df.drop(columns=dupCols)
    fieldStructure = pd.DataFrame(index=df.columns,
                                  columns=['Target Name', 'Source Name', 'Column Position', 'Column Width',
                                           'DataType', 'Size', 'Format', 'KEY', 'Default', 'Comments',
                                           'Unique', 'ImpactsGrain', 'HasNULLs'])
    totalRowCount = df.shape[0]
    totalDupRows = df[df.duplicated()].shape[0]
    if totalDupRows > 0:
        print('!!!!!!!!!WARNING!!!!!!!! This file contains ' + str(totalDupRows) + ' duplicate rows!')
    if len(dupCols) > 0:
        fieldStructure.loc['Duplicate', 'Target Name'] = 'DUPLICATE Fields:' + '-'.join(dupCols)
        print('!!!!!!!!!WARNING!!!!!!!! The following columns were identified as duplicates and will be removed from the final mapping')
        if debug: print(dupCols)
    if debug: print('Columns in the dataset', df.columns)
    if debug: print(fieldStructure)
    counter = 1
    for fieldName in df.columns:
        print('Processing Field: ' + fieldName)
        fieldStructure.loc[fieldName, 'Source Name'] = fieldName
        fieldStructure.loc[fieldName, 'Target Name'] = fieldName
        fieldStructure.loc[fieldName, 'Column Position'] = counter
        if separator == 'FWF': fieldStructure.loc[fieldName, 'Column Width'] = width[counter - 1]
        counter += 1
        # Trim the '64' from int64/float64 to get a generic type name
        fieldStructure.loc[fieldName, 'DataType'] = str(df[fieldName].dtypes).replace('64', '', 1)
        if str(df[fieldName].dtypes).replace('64', '', 1) == 'float':
            # A float column whose values are all whole numbers is really an int column
            if df[fieldName].fillna(1).apply(float.is_integer).all():
                fieldStructure.loc[fieldName, 'DataType'] = 'int'
            else:
                fieldStructure.loc[fieldName, 'DataType'] = 'float'
        fmt = ''  # renamed from 'format' to avoid shadowing the built-in
        dateFlg = True
        if df[fieldName].isnull().all(): fieldStructure.loc[fieldName, 'DataType'] = 'Unknown'
        if df[fieldName].dtypes == 'object':
            fieldValue = str(df.loc[df.loc[:, fieldName].first_valid_index(), fieldName])
            if debug: print('First non-NaN index & value: ', str(df.loc[:, fieldName].first_valid_index()), str(fieldValue))
            try:
                dateTime = parse(fieldValue, fuzzy=False)
            except ValueError:
                dateFlg = False
            if dateFlg:
                shortMonth = False
                shortDate = False
                if debug: print('Input Date:', fieldValue)
                if debug: print('Interpreted Date', dateTime)
                # Year: try 4 digits, then 2
                yrSt = fieldValue.find(str(dateTime.year))
                if debug: print('Year Start Position: ', yrSt)
                fmt = fieldValue.replace(str(dateTime.year), 'yyyy', 1)
                if yrSt == -1:
                    fmt = fieldValue.replace(str(dateTime.year)[2:], 'yy', 1)
                if debug: print('Year format: ', fmt)
                # Month: try 2 digits, then 1
                noMonth = False
                monSt = fmt.find(str(dateTime.month).zfill(2))
                if debug: print('2 Digit Month Position:', monSt)
                fmt = fmt.replace(str(dateTime.month).zfill(2), 'mm', 1)
                if monSt == -1:
                    monSt1 = fmt.find(str(dateTime.month))
                    if monSt1 != -1:
                        shortMonth = True
                        fmt = fmt.replace(str(dateTime.month), 'mm', 1)
                    else:
                        noMonth = True
                # Check if month names or abbreviations are used
                if noMonth:
                    abbr = list(map(str.upper, calendar.month_abbr[1:]))
                    fmnth = list(map(str.upper, calendar.month_name[1:]))
                    if debug: print(abbr)
                    for mon in abbr:
                        if mon in fmt.upper():
                            if debug: print('Month abbreviation used in this date format', mon)
                            noMonth = False
                            fmt = fmt.replace(mon, 'MMM', 1)
                    for mon in fmnth:
                        if mon in fmt.upper():
                            if debug: print('Month name used in this date format', mon)
                            noMonth = False
                            fmt = fmt.replace(mon, 'MMMM', 1)
                if debug: print('Month Format: ', fmt)
                # Day: try 2 digits, then 1
                daySt = fmt.find(str(dateTime.day).zfill(2))
                if debug: print('2 Digit Day Position: ', daySt)
                fmt = fmt.replace(str(dateTime.day).zfill(2), 'dd', 1)
                if daySt == -1:
                    daySt1 = fmt.find(str(dateTime.day))
                    if daySt1 != -1 and not noMonth:
                        shortDate = True
                        fmt = fmt.replace(str(dateTime.day), 'dd', 1)
                if debug: print('Day format: ', fmt)
                # Hour: try 2 digits, then 1 (a second, redundant 2-digit search was removed here)
                hhSt = fmt.find(str(dateTime.hour).zfill(2))
                if debug: print('2 digit Hour Position: ', hhSt)
                fmt = fmt.replace(str(dateTime.hour).zfill(2), 'HH', 1)
                if hhSt == -1:
                    hhSt = fmt.find(str(dateTime.hour))
                    if debug: print('1 digit Hour Position: ', hhSt)
                    fmt = fmt.replace(str(dateTime.hour), 'H', 1)
                # Minutes and seconds
                mnSt = fmt.find(str(dateTime.minute).zfill(2))
                if debug: print('Mins Position:', mnSt)
                fmt = fmt.replace(str(dateTime.minute).zfill(2), 'MM', 1)
                if debug: print('Mins Format: ', fmt)
                secSt = fmt.find(str(dateTime.second).zfill(2))
                if debug: print('Seconds Position', secSt)
                fmt = fmt.replace(str(dateTime.second).zfill(2), 'SS', 1)
                if debug: print('Seconds Format', fmt)
                # Offer both padded and unpadded variants when 1-digit parts were seen
                if shortMonth or shortDate:
                    fmt = fmt + ';' + fmt.replace('mm', 'm', 1).replace('dd', 'd', 1)
                if debug: print('Date Format Identified as :', fmt)
                fieldStructure.loc[fieldName, 'DataType'] = 'Timestamp'
            else:
                fieldStructure.loc[fieldName, 'DataType'] = 'String'
        if df[fieldName].isnull().all():
            fieldStructure.loc[fieldName, 'Size'] = 0
        else:
            fieldStructure.loc[fieldName, 'Size'] = df[fieldName].map(str).apply(len).max()
        fieldStructure.loc[fieldName, 'Format'] = fmt
        fieldStructure.loc[fieldName, 'Unique'] = df[fieldName].is_unique
        # If dropping the column creates extra duplicate rows, the column contributes to the grain
        dftmp = df.drop(columns=fieldName)
        dupRows = dftmp[dftmp.duplicated()].shape[0]
        fieldStructure.loc[fieldName, 'ImpactsGrain'] = dupRows > totalDupRows  # NEEDS MORE ANALYSIS
        fieldStructure.loc[fieldName, 'HasNULLs'] = df[fieldName].isna().sum() > 0
    if debug: print(df.columns)
    if debug: print(fieldStructure)
    return fieldStructure
# Merge each file's mapping into the master mapping.
def addToMaster(fieldStructure):
    if debug: print('Consolidating multiple mappings...')
    # Seed empty Format cells with a placeholder so the substring checks below never hit NaN.
    masterFields['Format'] = masterFields.Format.apply(lambda x: x if not (pd.isnull(x) or x == '') else 'XXX')
    if debug: print(masterFields['Format'])
    for index, row in fieldStructure.iterrows():
        target = row['Target Name']
        masterFields.loc[target, 'Target Name'] = target
        masterFields.loc[target, 'Source Name'] = row['Source Name']
        if pd.isnull(masterFields.loc[target, 'Column Position']):
            masterFields.loc[target, 'Column Position'] = row['Column Position']
        elif masterFields.loc[target, 'Column Position'] != row['Column Position']:
            if debug: print('WARNING: Column positions vary by file.')
        if pd.isnull(masterFields.loc[target, 'Column Width']):
            masterFields.loc[target, 'Column Width'] = row['Column Width']
        elif masterFields.loc[target, 'Column Width'] < row['Column Width']:
            if debug: print('!!!!!!!!!WARNING!!!!!!!! Column widths vary by file. The merge may not be accurate.')
            masterFields.loc[target, 'Column Width'] = row['Column Width']
        if pd.isnull(masterFields.loc[target, 'DataType']):
            masterFields.loc[target, 'DataType'] = row['DataType']
        elif masterFields.loc[target, 'DataType'] != row['DataType']:
            # Widen to the more general type when files disagree
            if row['DataType'] == 'float': masterFields.loc[target, 'DataType'] = 'float'
            if row['DataType'] == 'Timestamp': masterFields.loc[target, 'DataType'] = 'Timestamp'
        if pd.isnull(masterFields.loc[target, 'Size']):
            masterFields.loc[target, 'Size'] = row['Size']
        elif masterFields.loc[target, 'Size'] < row['Size']:
            masterFields.loc[target, 'Size'] = row['Size']
        if pd.isnull(masterFields.loc[target, 'Format']):
            masterFields.loc[target, 'Format'] = 'XXX'
        if not (pd.isnull(row['Format']) or row['Format'] == ''):
            if debug: print('Checking if ', row['Format'], ' not in ', masterFields.loc[target, 'Format'])
            if row['Format'] not in masterFields.loc[target, 'Format']:
                masterFields.loc[target, 'Format'] += row['Format'] + ';'
        if pd.isnull(masterFields.loc[target, 'Unique']):
            masterFields.loc[target, 'Unique'] = row['Unique']
        elif not row['Unique']:
            masterFields.loc[target, 'Unique'] = False
        if pd.isnull(masterFields.loc[target, 'ImpactsGrain']):
            masterFields.loc[target, 'ImpactsGrain'] = row['ImpactsGrain']
        elif row['ImpactsGrain']:
            masterFields.loc[target, 'ImpactsGrain'] = True
        if pd.isnull(masterFields.loc[target, 'HasNULLs']):
            masterFields.loc[target, 'HasNULLs'] = row['HasNULLs']
        elif row['HasNULLs']:
            masterFields.loc[target, 'HasNULLs'] = True
    if debug: print(masterFields)
# Write one mapping sheet per file.
def printMapping(fileNameParts, separator, HeaderPos, fieldStructure):
    fileNameParts[len(fileNameParts) - 1] = 'map'
    name = '.'.join(fileNameParts) + '.csv'
    if debug: print(name)
    currentFile = open(path + "\\" + name, "w+")
    currentFile.write('Source file directory,' + path + '\n')
    currentFile.write('Source file pattern,' + 'TBD' + '\n')
    currentFile.write('Target table name,' + '[ENTER TABLENAME]' + '\n')
    if separator != ' ':
        currentFile.write('Field Separator,"' + separator + '"\n')
    else:
        currentFile.write('Field Separator,"' + 'FWF' + '"\n')
    currentFile.write('Skip Rows,' + str(HeaderPos) + '\n' + '\n')
    currentFile.close()
    fieldStructure.to_csv(path + "\\" + name, mode='a', index=False, header=True)
# Read all files in the directory
files = os.listdir(path)
masterSep = 'INIT'
textFormats = ["txt", "csv", "log"]
# Process every file in the directory
for name in files:
    print('>>>>>> CURRENTLY PROCESSING FILE : ', name)
    fileNameParts = str.split(name, ".")
    if debug: print(fileNameParts)
    width = [0]
    if '.map.' in name:
        print('Skip current file (output file): ', name)
        continue
    if fileNameParts[len(fileNameParts) - 1] in textFormats:
        if debug: print("This is a text file.")
        separator, HeaderPos = identifySeparator(name)
        if separator != '-1':
            print('Separator successfully identified as: ' + separator)
        else:
            print('Unable to identify a separator. This file will be skipped!')
            continue
        if separator == ' ':
            print('This file may be a fixed width file')
            HeaderPos, width = identifyFixedWidth(name)
            if debug: print('Width Array ', width, len(width))
            if len(width) <= 1:
                print('This file may be space separated; however, it is not a fixed width file.')
                continue
            else:
                separator = 'FWF'
        fieldStructure = identifyFields(name, separator, HeaderPos, width)
        if masterSep != 'INIT':
            if (masterSep != separator) and consolidate:
                print('These files have different separators, so the results will not be consolidated')
                consolidate = False
        else:
            masterSep = separator
        print('Field Structure identified successfully')
        # Print the mapping sheet
        if consolidate: addToMaster(fieldStructure)
        printMapping(fileNameParts, separator, HeaderPos, fieldStructure)
    else:
        print('Skip current file (Unknown Format) : ', name)
# Print the consolidated results
if consolidate:
    fileNameParts[0] = 'FinalMappingSheet'
    masterFields['Format'] = [x[3:] for x in masterFields['Format']]  # strip the XXX placeholder
    printMapping(fileNameParts, separator, HeaderPos, masterFields)
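
For comparison, pandas can also infer fixed-width column boundaries by itself, which overlaps with what identifyFixedWidth does. A minimal sketch (colspecs='infer' is read_fwf's documented default and samples the first infer_nrows lines; sample.txt and skiprows=1 are placeholders for a real file and its title-line count):

import pandas as pd

# Let read_fwf derive the column boundaries from the first 100 data lines.
df = pd.read_fwf('sample.txt', colspecs='infer', infer_nrows=100, skiprows=1)
print(df.dtypes)
print(df.head())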
