I am scraping a page (e.g., a Facebook page) using a Python script and want each post to be written to a flow file (something like the GetTwitter processor does). ExecuteScript is the first processor in my NiFi data flow. I managed to create a flow file using session.create() with no issues.
However, I am confused about how to put the data I read from Facebook into an OutputStreamCallback. Most of the examples I have seen override it in Java, but I have to use Python, and I must admit I am a little new to it.
I have found many examples on reading a flow file, but not much on writing to one. Below is the Java version of what I want to do in Python:
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        out.write(tweet.getBytes(StandardCharsets.UTF_8));
    }
});
If there is any other way to do this, please point me to it. Thanks.
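To make clear what the callback has to do, here is a minimal plain-Python sketch of the same pattern. The `io.BytesIO` object is only a hypothetical stand-in for the open OutputStream that NiFi's `session.write()` would pass in; the class shape (an object with a `process(output_stream)` method) mirrors the Java anonymous class above.

```python
import io

class WriteContentCallback(object):
    """Mimics NiFi's OutputStreamCallback: hold the content, write it when called."""
    def __init__(self, content):
        self.content_text = content

    def process(self, output_stream):
        # NiFi hands the callback an already-open stream; write UTF-8 bytes to it.
        output_stream.write(self.content_text.encode("utf-8"))

# BytesIO stands in for the stream session.write() would provide.
stream = io.BytesIO()
WriteContentCallback("Hello there this is my data").process(stream)
print(stream.getvalue())  # -> b'Hello there this is my data'
```

Inside NiFi, you would never create the stream yourself; `session.write(flowFile, callback)` opens it and invokes `callback.process(...)` for you.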
After adopting the changes suggested by @James, I wrote the snippet given below, but no flow file is transferred. There is no compilation error, though.
import urllib2
import json
import datetime
import csv
import time
import sys
import traceback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.python.core.util import StringUtil

class WriteContentCallback(OutputStreamCallback):
    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

#app_id = "<FILL IN>"
#app_secret = "<FILL IN>"  # DO NOT SHARE WITH ANYONE!
page_id = "dsssssss"
#page_id = raw_input("Please Paste Public Page Name:")

#access_token = app_id + "|" + app_secret
access_token = "sdfsdfsf%sdfsdf"
#access_token = raw_input("Please Paste Your Access Token:")

def scrapeFacebookPageFeedStatus(page_id, access_token):
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback("Hello there this is my data"))
    flowFile = session.write()
    session.transfer(flowFile, REL_SUCCESS)

    has_next_page = False
    num_processed = 0  # keep a count on how many we've processed
    scrape_starttime = datetime.datetime.now()

    while has_next_page:
        print "Scraping %s Facebook Page: %s\n" % (page_id, scrape_starttime)
        has_next_page = False

    print "\nDone!\n%s Statuses Processed in %s" % \
        (num_processed, datetime.datetime.now() - scrape_starttime)

if __name__ == '__main__':
    scrapeFacebookPageFeedStatus(page_id, access_token)
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback("and your data"))
    session.transfer(flowFile, REL_SUCCESS)
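For comparison, the create → write → transfer sequence I expect can be traced with toy stand-ins for the session API. These fakes are assumptions for illustration only, not NiFi classes; the point is that every `session.write()` call takes both the flow file and a callback, and returns the (possibly new) flow file reference that must then be transferred.

```python
import io

# Toy stand-ins for NiFi's session/flow-file objects, just to trace the calls.
class FakeFlowFile(object):
    def __init__(self):
        self.content = b""

class FakeSession(object):
    def __init__(self):
        self.transferred = []

    def create(self):
        return FakeFlowFile()

    def write(self, flow_file, callback):
        # NiFi opens the stream and invokes the callback; we fake that here.
        stream = io.BytesIO()
        callback.process(stream)
        flow_file.content = stream.getvalue()
        return flow_file

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))

class WriteContentCallback(object):
    def __init__(self, content):
        self.content_text = content

    def process(self, output_stream):
        output_stream.write(self.content_text.encode("utf-8"))

session = FakeSession()
flow_file = session.create()
flow_file = session.write(flow_file, WriteContentCallback("Hello there this is my data"))
session.transfer(flow_file, "REL_SUCCESS")
print(flow_file.content)  # -> b'Hello there this is my data'
```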
Below is the output from nifi-app.log:
[root@ambari logs]# tail -100 nifi-app.log
2017-04-03 14:08:07,989 INFO [StandardProcessScheduler Thread-6] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ExecuteScript[id=a62f4b97-8fd7-15cd-95b9-505e1b960805] to run with 1 threads
2017-04-03 14:08:08,938 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@44ec5960 // Another save pending = false
2017-04-03 14:08:13,789 INFO [StandardProcessScheduler Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=a62f4b8e-8fd7-15cd-7517-56593deabf55] to run with 1 threads
2017-04-03 14:08:14,296 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@44ec5960 // Another save pending = false