I have a Pub/Sub script that publishes male first names as follows:

from google.cloud import pubsub_v1
import names

project_id = "Your-Project-Name"
topic_name = "Your-Topic-Name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

while True:
    data = names.get_first_name(gender='male')  # random male first name
    data = data.encode("utf-8")
    publisher.publish(topic_path, data=data)

Then I have a Dataflow pipeline that reads from the subscription attached to the topic and counts each element, as follows:

import logging, re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

root = logging.getLogger()
root.setLevel(logging.INFO)

p = beam.Pipeline(options=PipelineOptions())
x = (
 p
 | beam.io.ReadFromPubSub(topic=None, subscription="projects/YOUR-PROJECT-NAME/subscriptions/YOUR-SUBSCRIPTION-NAME").with_output_types(bytes)
 | 'Decode_UTF-8' >> beam.Map(lambda x: x.decode('utf-8'))
 | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
 | 'CountingElem' >> beam.combiners.Count.PerElement()
 | 'FormatOutput' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
 | 'Printing2Log' >> beam.Map(lambda k: logging.info(k)))

result = p.run()
result.wait_until_finish()

The issue: I can see data flowing through the first three steps of the pipeline, but the last three steps produce no output, which means nothing is logged.

I expected output like this:

Peter: 2
Glen: 1
Alex: 1
Ryan: 2

Thanks in advance for your help.

  • Which runner are you using to run the Dataflow job? Commented Mar 27, 2020 at 15:00
  • I'm using the DataflowRunner Commented Mar 27, 2020 at 17:25

1 Answer


Because this is a streaming pipeline, you need to set up windowing and/or triggering appropriately for its grouping steps to emit any results. See the windowing section of the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#windowing
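Why the grouping matters: a per-element count built on a group-by-key can only report a key's total once it knows that key's group is complete. The plain-Python sketch below is not Beam code (count_per_element is a hypothetical name); it only mirrors that idea, and shows that the counting step runs after the whole input has been consumed, which never happens for an endless stream with no window boundaries or triggers.

```python
from collections import defaultdict


def count_per_element(elements):
    """Plain-Python sketch of a per-element count built on a group-by-key.

    Not Beam's implementation; it mirrors the idea only: key each element
    by itself, group by key, then count each group.
    """
    groups = defaultdict(list)
    for x in elements:
        groups[x].append(None)  # pair each element with a placeholder value
    # Counting happens only after the loop ends, i.e. once every group is
    # known to be complete. For an endless input with no window boundaries
    # or triggers, this line would never be reached.
    return [(key, len(values)) for key, values in groups.items()]


print(count_per_element(["Peter", "Glen", "Peter"]))  # [('Peter', 2), ('Glen', 1)]
```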

More specifically:

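Caution: Beam’s default windowing behavior is to assign all elements of a PCollection to a single, global window and discard late data, even for unbounded PCollections. Before you use a grouping transform such as GroupByKey on an unbounded PCollection, you must do at least one of the following:

  • Set a non-global windowing function.
  • Set a non-default trigger. This allows the global window to emit results under other conditions, since the default windowing behavior (waiting for all data to arrive) will never occur.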
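A non-global windowing function such as fixed windows splits the unbounded stream into finite chunks, each of which can be counted and emitted on its own. The simulation below is plain Python, not Beam code; fixed_window_start and count_per_window are hypothetical names and 60 seconds is an arbitrary window size. It shows how each element lands in a window by its timestamp and how counts are then kept per window instead of over the whole stream.

```python
from collections import Counter, defaultdict


def fixed_window_start(timestamp_s, size_s, offset_s=0):
    """Start of the fixed window [start, start + size_s) containing timestamp_s."""
    return timestamp_s - ((timestamp_s - offset_s) % size_s)


def count_per_window(events, size_s):
    """Count each name separately inside each fixed window.

    events: iterable of (name, event_timestamp_seconds) pairs (illustrative input).
    Returns {window_start: Counter}. In a real streaming pipeline, a window's
    counts are emitted once the watermark passes window_start + size_s.
    """
    windows = defaultdict(Counter)
    for name, ts in events:
        windows[fixed_window_start(ts, size_s)][name] += 1
    return dict(windows)


events = [("Peter", 3), ("Glen", 15), ("Peter", 42), ("Ryan", 61), ("Alex", 75), ("Ryan", 119)]
print(count_per_window(events, 60))
```

Note that a name appearing in two different windows is counted separately in each, so the output is "per name per window", not a running total.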

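beam.combiners.Count.PerElement() performs a GroupByKey internally, so this applies to your pipeline: with the default global window and default trigger, it waits for all data to arrive, which never happens on an unbounded Pub/Sub stream. That is why the steps before the count show data while the count, format, and logging steps produce nothing.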
