I am currently working on a NiFi flow that requires custom processors to apply transformations to CSV records.

I noticed the following behavior while running some benchmarks: if only one thread is assigned to each custom processor, everything works well. Assigning more threads to the custom processors results in a "failed to process session due to java.lang.NullPointerException" error.

Since the error cannot be reproduced with a single thread, I suspect either an issue in how I handle the flowfiles or some aspect of NiFi I am not aware of.

Processing only accesses the flowfile attributes. The flowfile content is never read, and the output flowfile is transferred after some attributes are added. The following is a snippet of the relevant parts of the processor code:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    //  Will hold all the processed attributes
    Map<String, String> processedAttributes = new HashMap<>();
    FlowFile flowfile = session.get();
    ...
    //  Adds the attributes to the flowfile
    flowfile = session.putAllAttributes(flowfile, processedAttributes);
    session.transfer(flowfile, PROCESSED);
}

I am running NiFi 0.7 on an m4.4xlarge Amazon EC2 instance. Since I am seeking high performance (who doesn't?), I am looking for a safe way to increase the number of threads. Any suggestions are appreciated.

Thank you in advance.

    Could you provide the associated stack trace and ensure the code referenced is provided? Commented Sep 21, 2016 at 15:11

1 Answer

It is possible (especially with multiple threads / concurrent tasks) that session.get() will return null for some threads. This happens when two or more threads are scheduled because a flow file is available: one thread gets the flow file (via session.get()), and the next thread gets null.

In your case you may not be reading or writing the flow file content, but other methods (like session.putAllAttributes()) still call methods on the flow file, so a null flow file leads to the NullPointerException. Many processors check whether flowfile == null right after session.get() and return immediately if it is (since they need a flow file to operate on); perhaps that would fix your issue as well.
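To illustrate the race and the guard without pulling in the NiFi API, here is a minimal, self-contained sketch. It is only an analogy: the class `NullGuardSketch`, the helper `get`, and the use of a `Map` as a stand-in for a flow file are all hypothetical names made up for this example, and the queue is not how NiFi implements its connections. The only point is that a second caller can find the queue already drained and must handle null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class NullGuardSketch {

    // Hypothetical stand-in for session.get(): poll() returns null
    // when another caller has already taken the last item.
    static Map<String, String> get(Queue<Map<String, String>> queue) {
        return queue.poll();
    }

    // Mirrors the shape of onTrigger in the question.
    // Returns true if an item was processed, false if there was nothing to do.
    static boolean onTrigger(Queue<Map<String, String>> queue) {
        Map<String, String> flowfile = get(queue);

        // The guard: without it, the put(...) below would throw a
        // NullPointerException whenever the queue was already empty.
        if (flowfile == null) {
            return false;
        }

        // Stand-in for adding the processed attributes.
        flowfile.put("processed", "true");
        return true;
    }

    public static void main(String[] args) {
        Queue<Map<String, String>> queue = new ConcurrentLinkedQueue<>();
        queue.add(new HashMap<>());

        // Two triggers compete for a single queued item.
        System.out.println(onTrigger(queue)); // first call takes the item
        System.out.println(onTrigger(queue)); // second call finds nothing and returns early
    }
}
```

In the processor from the question, the equivalent change would be an `if (flowfile == null) { return; }` placed directly after `session.get()`, before any call that uses the flow file.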

1 Comment

I first tried it on only one custom processor and it did not throw any NullPointerException. I then changed the code in all of them, and no exceptions are displayed. Out of curiosity, how does NiFi handle concurrency of threads on a queue? Thank you for your help.
