I am scraping a page (e.g., a Facebook page) with a Python script and want to pass each post downstream to be written to a file (similar to how the GetTwitter processor works). ExecuteScript is the first processor in my NiFi data flow. I managed to create a flow file using session.create() without any issues.

However, I am confused about how to put the data I read from Facebook into an OutputStreamCallback. Most of the examples I have seen override the callback in Java, but I have to use Python, and I must admit I am fairly new to it.

I have found many examples of reading a flow file but not much on writing one. Below is what I want to do, shown in Java; I need the Python equivalent.

FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        out.write(tweet.getBytes(StandardCharsets.UTF_8));
    }
});

If there is another way to do this, please advise. Thanks.


After adopting the changes suggested by @James, I wrote the snippet below, but no flow file is transferred. There is no compilation error, though.

import urllib2
import json
import datetime
import csv
import time
import sys
import traceback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.python.core.util import StringUtil

class WriteContentCallback(OutputStreamCallback):
    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

#app_id = "<FILL IN>"
#app_secret = "<FILL IN>" # DO NOT SHARE WITH ANYONE!
page_id = "dsssssss"
#page_id = raw_input("Please Paste Public Page Name:")

#access_token = app_id + "|" + app_secret

access_token = "sdfsdfsf%sdfsdf"

#access_token = raw_input("Please Paste Your Access Token:")


def scrapeFacebookPageFeedStatus(page_id, access_token):
        flowFile = session.create()
        flowFile = session.write(flowFile, WriteContentCallback("Hello there this is my data"))
        flowFile = session.write()
        session.transfer(flowFile, REL_SUCCESS)

        has_next_page = False
        num_processed = 0   # keep a count on how many we've processed
        scrape_starttime = datetime.datetime.now()


        while has_next_page:
            print "Scraping %s Facebook Page: %s\n" % (page_id, scrape_starttime)
            has_next_page = False

        print "\nDone!\n%s Statuses Processed in %s" % \
                (num_processed, datetime.datetime.now() - scrape_starttime)


if __name__ == '__main__':
    scrapeFacebookPageFeedStatus(page_id, access_token)
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback("and your data"))
    session.transfer(flowFile, REL_SUCCESS)

Below is the output from nifi-app.log

> [root@ambari logs]# tail -100 nifi-app.log
> 2017-04-03 14:08:07,989 INFO [StandardProcessScheduler Thread-6] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ExecuteScript[id=a62f4b97-8fd7-15cd-95b9-505e1b960805] to run with 1 threads
> 2017-04-03 14:08:08,938 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@44ec5960 // Another save pending = false
> 2017-04-03 14:08:13,789 INFO [StandardProcessScheduler Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=a62f4b8e-8fd7-15cd-7517-56593deabf55] to run with 1 threads
> 2017-04-03 14:08:14,296 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@44ec5960 // Another save pending = false

1 Answer

Here is a simple implementation of a NiFi OutputStreamCallback in Python ExecuteScript:

import sys
import traceback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.python.core.util import StringUtil

class WriteContentCallback(OutputStreamCallback):
    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

# Create new FlowFile with content
flowFile = session.create()
flowFile = session.write(flowFile, WriteContentCallback("This is the flowfile content"))
session.transfer(flowFile, REL_SUCCESS)
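
Since the question asks for one flow file per scraped post, the same callback can be reused in a loop. The following is only a sketch under a few assumptions: `emit_posts` and `FakeSession` are hypothetical names introduced here (the fake session exists purely so the loop can be dry-run outside NiFi), the `try`/`except` import falls back to `object` when the NiFi classes are not available, and `bytearray(text.encode("utf-8"))` is swapped in for `StringUtil.toBytes` so the same code also runs under plain Python.

```python
import io

try:
    # Only importable inside NiFi's Jython-based ExecuteScript engine.
    from org.apache.nifi.processor.io import OutputStreamCallback
except ImportError:
    OutputStreamCallback = object  # stand-in so the sketch runs outside NiFi


class WriteContentCallback(OutputStreamCallback):
    """Writes one text payload to the stream handed to process()."""

    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        outputStream.write(bytearray(self.content_text.encode("utf-8")))


class FakeSession(object):
    """Hypothetical in-memory stand-in for the NiFi session (dry runs only)."""

    def __init__(self):
        self.transferred = []  # (content bytes, relationship) pairs

    def create(self):
        return {"content": b""}

    def write(self, flow_file, callback):
        buffer = io.BytesIO()
        callback.process(buffer)
        return {"content": buffer.getvalue()}

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file["content"], relationship))


def emit_posts(session, posts, relationship):
    """Create, fill, and transfer one flow file per post; return the count."""
    count = 0
    for post in posts:
        flow_file = session.create()
        flow_file = session.write(flow_file, WriteContentCallback(post))
        session.transfer(flow_file, relationship)
        count += 1
    return count


# Dry run outside NiFi with the stand-in session.
fake = FakeSession()
emitted = emit_posts(fake, [u"first post", u"second post"], "success")
```

Inside ExecuteScript the call would presumably be `emit_posts(session, posts, REL_SUCCESS)` at the top level of the script, where `posts` is whatever list of strings the scraper produced and `FakeSession` is not needed.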

1 Comment

The snippet you shared works well when executed on its own. However, when I extend it with my own Python code, no flow file is transferred to the next processor (0 bytes transferred), although there is no compilation error.
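
One possible explanation, offered as an assumption rather than a confirmed diagnosis: the script in the question does all of its work under `if __name__ == '__main__':`. That guard only fires when the interpreter runs the file as the main program. If the ExecuteScript engine evaluates the script under a different module name (`'__builtin__'` is used below purely as an assumed example), the guarded block is skipped silently, which would match "no error, nothing transferred". The sketch below reproduces that effect in plain Python; `run_embedded` and `SCRIPT` are hypothetical names used only for illustration.

```python
# A script body with top-level code plus a guarded block.
SCRIPT = '''
events.append("top-level code ran")
if __name__ == '__main__':
    events.append("guarded block ran")
'''


def run_embedded(source, module_name):
    """Execute source the way an embedding host might, with a chosen __name__."""
    events = []
    exec(source, {"__name__": module_name, "events": events})
    return events


# Assumed module name inside an embedding host: the guard is skipped.
embedded = run_embedded(SCRIPT, "__builtin__")

# Name used when a file is run directly: the guard fires.
standalone = run_embedded(SCRIPT, "__main__")

print(embedded)
print(standalone)
```

If this is indeed the cause, moving the calls out of the guard to the top level of the script (as in the answer's snippet) should be enough. Once the function actually runs, the argument-less `session.write()` call inside it would also need to be removed, since `write` expects a flow file and a callback.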
