
I am trying to read a CSV flowfile and replace the value of one field in each record with a predefined label. To do that, I used an ExecuteScript processor with the following Python code in it.

import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter

flowfile = session.get()
record = flowfile.getAttribute('record_type')

if record == '0':
    flowfile = session.putAttribute(flowfile,'record_type', 'NEW_USER')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
elif record == '1':
    flowfile = session.putAttribute(flowfile,'record_type', 'OLD_USER')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
else:
    flowfile = session.putAttribute(flowfile,'record_type', 'IGNORE')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

writer.flush()
writer.close()
reader.close()

My CSV looks like this:

id,record_type
1,0
2,1
3,2
4,0

The result should be:

id,record_type
1,NEW_USER
2,OLD_USER
3,IGNORE
4,NEW_USER

I get the following error:

AttributeError : 'NoneType' object has no attribute 'getAttribute' in script at line number 13

It says that the line record = flowfile.getAttribute('record_type') is wrong.

I have no idea how to solve this, as I am not good with Python.

  • ExecuteScript processes the whole file, not individual records. getAttribute returns a flowfile attribute (such as filename), not the content. To change the content, use the flowFile.write function. Search the internet for the NiFi Python cookbook and look at the examples. Commented Nov 14, 2019 at 6:24
  • @daggett Thanks for the suggestion. But I still don't understand how to get a value to compare. Commented Nov 14, 2019 at 6:59
  • If you are not good with Python, it may be better to use record processing. Check the UpdateRecord processor. Commented Nov 14, 2019 at 7:44
  • @daggett Yes, I have used the UpdateRecord processor, but I have an issue with replacing multiple values in one step, as described in the question. Commented Nov 14, 2019 at 8:00
  • Your condition is record-based, so I think it's possible to use UpdateRecord in your case. I could show how to write a Groovy script for your case (I'm not good with Python either). Commented Nov 14, 2019 at 8:55

1 Answer


This isn't Python, but according to the author's comment, Groovy is acceptable.

Use the ExecuteGroovyScript processor with the following code:

def ff=session.get()
if(!ff)return

def map = [
    '0': 'NEW_USER',
    '1': 'OLD_USER',
]

ff.write{rawIn, rawOut->
    rawOut.withWriter("UTF-8"){w->
        rawIn.withReader("UTF-8"){r->
            int rowNum = 0
            //iterate lines from input stream and split each on commas
            r.splitEachLine( ',' ){row->
                if(rowNum>0){
                    //if not a header line then substitute value using map
                    row[1] = map[ row[1] ] ?: 'IGNORE'
                }
                //join and write row to output writer
                w << row.join(',') << '\n'
                rowNum++
            }
        }
    }
}

REL_SUCCESS << ff
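
For readers who would rather stay in Python, the substitution logic of the Groovy script above could be sketched roughly as follows. This is only the text transformation, written as a plain function so it can be tried outside NiFi. The names remap_record_type and RECORD_TYPES are made up for illustration, and the wiring into ExecuteScript (reading the flowfile content and writing it back through a session.write callback, as in the cookbook examples mentioned in the comments) is assumed and not shown.

```python
# Same lookup table as the Groovy answer; any other code falls back to the default.
RECORD_TYPES = {'0': 'NEW_USER', '1': 'OLD_USER'}


def remap_record_type(csv_text, column=1, default='IGNORE'):
    """Return csv_text with the values in the given column replaced via RECORD_TYPES.

    Like the Groovy script, this splits on plain commas and does not
    handle quoted fields.
    """
    out_lines = []
    for row_num, line in enumerate(csv_text.splitlines()):
        row = line.split(',')
        # Row 0 is the header; only substitute in data rows that have the column.
        if row_num > 0 and len(row) > column:
            row[column] = RECORD_TYPES.get(row[column].strip(), default)
        out_lines.append(','.join(row))
    return '\n'.join(out_lines) + '\n'


if __name__ == '__main__':
    sample = "id,record_type\n1,0\n2,1\n3,2\n4,0\n"
    print(remap_record_type(sample))
```

Using a dictionary with a default value replaces the if/elif/else chain from the question, so adding another code means adding one entry to the table.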

10 Comments

I am accepting this as the answer, as it solved my issue.
A follow-up question: if we want to compare strings, what modification is needed?
In this script everything is a string (except rowNum), even if it contains digits. '0' is a string.
I have tried it with a column identified by a string from the headers, but it's not working as expected, unlike the earlier sample.
You mean you want to access the column by its name from the header and not by its number?
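
If the follow-up is indeed about selecting the column by its header name rather than by position, one possible approach is to look the name up in the header row first and then substitute at that index. The sketch below is plain Python and only illustrative: remap_by_header and RECORD_TYPES are hypothetical names, the lookup table is the one from the answer, and quoted CSV fields are not handled.

```python
# Lookup table taken from the answer; any other value maps to the default.
RECORD_TYPES = {'0': 'NEW_USER', '1': 'OLD_USER'}


def remap_by_header(csv_text, column_name='record_type', default='IGNORE'):
    """Replace values in the column whose header equals column_name."""
    lines = csv_text.splitlines()
    if not lines:
        return csv_text
    header = [name.strip() for name in lines[0].split(',')]
    # list.index raises ValueError when the header has no such column.
    index = header.index(column_name)
    out_lines = [lines[0]]
    for line in lines[1:]:
        row = line.split(',')
        if len(row) > index:
            row[index] = RECORD_TYPES.get(row[index].strip(), default)
        out_lines.append(','.join(row))
    return '\n'.join(out_lines) + '\n'


if __name__ == '__main__':
    # The column order differs from the question's sample on purpose.
    print(remap_by_header("record_type,id\n0,1\n2,3\n"))
```

Because the index is resolved from the header, the same function keeps working if the column order in the CSV changes.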