
I am trying to validate and process a CSV file using Apache NiFi. I use ExecuteGroovyScript to process the CSV and extract the data.

My original CSV has a problem: some records look like this:

id,name,age,bd,email,address
1,sachith,29,9,[email protected],{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,[email protected],{"No": "1","Lane":
"Lane-1"}

Here the 2nd record is invalid (its JSON address is split across two lines). I want to remove just this record and process the rest.

import groovy.json.*

def ff=session.get()
if(!ff)return

def parser = new JsonSlurper().setType(JsonParserType.LAX)

ff.write{streamIn,streamOut->
    streamIn.withReader('UTF-8'){r->      //convert in stream to reader
        streamOut.withWriter('UTF-8'){w-> //convert out stream to writer
            //go line by line
            r.eachLine{line, lineNum->
                if(lineNum==1){
                    w<<line<<'id,name,age,bd,email,address'<<'\n'        //for the first line just add some headers
                }else{
                    def row=line.split(',')          //split line by comma
                    def json=row[5..-1].join(',')    //rejoin the JSON part (elements from index 5 on) with commas
                    json = parser.parseText(json)
                    w<<"${json.id},${json.name},${json.age},${json.bd},${json.email},${json.address}"<<'\n'
                }
            }
        }
    }
}
REL_SUCCESS<<ff

This was taken from my previous question.

Basically, I just want to skip the invalid record and process the other records.

I have referred to "groovy.json.JsonException: expecting" and "Groovy: validate JSON string".

But I don't understand how to integrate this into an Apache NiFi flow.
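The check from the second linked question boils down to attempting the parse and treating a parser exception as "invalid". A minimal sketch, assuming Groovy's strict default JsonSlurper (the question's script uses the LAX parser instead); `isValidJson` is a hypothetical helper name, not a NiFi or Groovy API:

```groovy
import groovy.json.JsonException
import groovy.json.JsonSlurper

// Hypothetical helper: true if the text parses as JSON with the default (strict)
// JsonSlurper, false if the parser rejects it.
boolean isValidJson(String text) {
    try {
        new JsonSlurper().parseText(text)
        return true
    } catch (JsonException | IllegalArgumentException ignored) {
        // JsonException: malformed JSON; IllegalArgumentException: null or empty text
        return false
    }
}

println isValidJson('{"No": "1","Lane":"Lane-1"}')   // complete address from record 1
println isValidJson('{"No": "1","Lane":')            // truncated address from record 2
```

On its own this only validates one JSON string; the CSV line still has to be split first to isolate the JSON column, as the script above does with `row[5..-1]`.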

  • Why not fix upstream? Use a proper CSV writer instead of banging strings together? Commented Feb 12, 2020 at 12:05
  • @cfrick, do you mean in Apache NiFi? Commented Feb 12, 2020 at 13:20

1 Answer


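I agree that it is better to fix the source.

However, if that's not possible, you could check whether each line is a complete record (starts with a digit and ends with "}") and, if it isn't, buffer it and append the following line(s) until it is: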

import groovy.json.*

def parser = new JsonSlurper().setType(JsonParserType.LAX) //LAX to accept strings without double-quotes

def w = System.out
def buf = new StringBuilder() //buffer to collect lines if they are not complete
new StringReader('''id,name,age,bd,email,address
1,sachith,29,9,[email protected],{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,[email protected],{"No": "1"
,"Lane":"Lane-1"}''').withReader{r->
    r.eachLine{line, lineNum->
        if(lineNum==1){
            w<<line<<'id,name,age,bd,email,address'<<'\n'
        }else{
            buf<<(buf?'\n':'')<<line //append line to previous incomplete line(s)
            if(buf=~/(?s)^\d.*\}$/){
                //normal line: starts with number and ends with }
                def row=buf.toString().split(',')   //split line by comma
                def json=row[5..-1].join(',')       //rejoin the JSON part (elements from index 5 on) with commas
                json = parser.parseText(json)
                w<<"${json.No},${json.Lane}"<<'\n'
                buf.setLength(0) //reset buffer
            }
        }
    }
}
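The code above rejoins a broken record, whereas the question asks to drop it. A minimal sketch of the dropping variant, assuming a record is valid only when it fits on one line, starts with the numeric id, ends with "}", and its JSON column parses with the LAX parser. `filterValidRecords` is a hypothetical helper name, not a NiFi API:

```groovy
import groovy.json.JsonParserType
import groovy.json.JsonSlurper

// Hypothetical helper: copies the header and every complete, parseable record
// from r to w and drops everything else. Returns how many lines were dropped.
int filterValidRecords(Reader r, Writer w) {
    def parser = new JsonSlurper().setType(JsonParserType.LAX)
    int dropped = 0
    r.eachLine { line, lineNum ->
        if (lineNum == 1) {
            w << line << '\n'   // keep the header unchanged
            return              // acts like 'continue' inside eachLine
        }
        // Assumption: a complete record starts with the numeric id and ends with '}'.
        // Both fragments of a record split across lines fail this check.
        boolean ok = line ==~ /\d.*\}/
        if (ok) {
            try {
                def row = line.split(',')
                parser.parseText(row[5..-1].join(','))   // validate the JSON address part
            } catch (Exception e) {
                ok = false      // malformed JSON or too few columns
            }
        }
        if (ok) {
            w << line << '\n'
        } else {
            dropped++
        }
    }
    return dropped
}
```

In the ExecuteGroovyScript body from the question, the `r.eachLine{...}` block inside `streamOut.withWriter('UTF-8'){w-> ... }` could be replaced by a call to `filterValidRecords(r, w)`, leaving `session.get()` and `REL_SUCCESS<<ff` as they are; treat this as a starting point rather than a tested NiFi script. The trade-off versus the buffering approach: dropping loses record 2 entirely (which is what the question asks for), while buffering recovers it.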