I am trying to download an encrypted file and a key from a GCP bucket, decrypt the file, and load it back to the bucket. So I built this Dataflow pipeline, which looks as follows:

class downloadFile(beam.DoFn):
    def __init__(self):
        self.bucket_name = 'bucket_name'
        self.source_blob_name = 'test.csv.gpg'
        self.destination_file_name = "/tmp/test.csv.gpg"

    def process(self, element):
        from google.cloud import storage

        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(self.source_blob_name)
        blob.download_to_filename(self.destination_file_name)

Here I used self.destination_file_name = "/tmp/test.csv.gpg" because I learnt from others that a Dataflow job runs on a Linux VM, so downloading the file to this /tmp/ path should be safe.

class downloadKey(beam.DoFn):
    def __init__(self):
        self.bucket_name = 'bucket_name'
        self.source_blob_name = 'privateKey.txt'
        self.destination_file_name = "/tmp/privateKey.txt"

    def process(self, element):
        from google.cloud import storage

        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(self.source_blob_name)
        blob.download_to_filename(self.destination_file_name)

Basically, the two download DoFns have the same structure. After the file and key are downloaded, the key is imported on the VM running the Dataflow job:

class importKey(beam.DoFn):
    def process(self, element):
        import subprocess
        subprocess.call(['gpg', '--import', '/tmp/privateKey.txt'])

Then the decryption DoFn:

class decryption(beam.DoFn):
    def process(self, element, *args, **kwargs):
        import subprocess
        subprocess.call(['gpg', '-d', '/tmp/test.csv.gpg > test.csv'])

        # load file back to bucket
        bucket_name = 'bucket_name'
        source_file_name = '/tmp/test.csv'
        destination_blob_name = "clearText.csv"

        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)

So this decryption DoFn will invoke a gpg command with subprocess to decrypt the file.

And finally the pipeline itself:

dummyMessage = {"projectID": "fakeProjectID",
                "bucketID": "fakeBucketID"}

setp = (
    p
    | 'Create Sample' >> beam.Create([dummyMessage["projectID"]])
    | "testDecrypt" >> beam.ParDo(downloadFile())
    | "testDecrypt2" >> beam.ParDo(downloadKey())
    | "testDecrypt3" >> beam.ParDo(importKey())
    | "testDecrypt4" >> beam.ParDo(decryption())
)

Here I am just creating a dummy message to trigger the pipeline; it will later be replaced with a real message.

When I run the pipeline, it all looks good: I can see the job has been created in Dataflow, and its status is shown as successful. But I cannot see the decrypted file in the bucket.

I added several print statements to the code to debug, and it seems that in the downloadFile() and downloadKey() classes process() is never reached, which means no file has ever been processed. Can anyone share some knowledge on how to access a GCS bucket in a DoFn? I am not sure which part of the code is wrong; it all looks good to me.

Any help will be appreciated.

1 Answer


Welcome to Stack Overflow, Alex.

  • First, about the logs (the print statements): if you're not seeing them, it is probably because you are looking in the wrong place. If you have placed them inside the process method of the DoFns (like the decryption class), you need to look in the WORKER LOGS, not the JOB LOGS or your terminal. In the screenshot below I show how to access the worker logs. The job logs (or driver logs) show the prints/logs you add at the pipeline-construction level (beam.Create ...); you can also see them in your terminal if you launched the job from it.

  • Then, IMHO, Dataflow is not the right processing platform for this kind of requirement. It is meant for distributed, parallel processing of chunks of big files or big data (let's say > 2GB). This means that one chunk is processed on one worker node (a GCE VM instance behind the scenes) and another chunk on a different worker node. In your case, if you have more than one worker, you risk downloading the encrypted file on one node, the key on another node, and running the decryption on a third node. So relying on /tmp would not work.

  • Finally, a workaround is to use, for example, a Cloud Function (CF), which runs in a single-threaded fashion and lets you reuse the code you have inside the different process methods:

  1. The CF is triggered by an upload of the encrypted file to a GCS bucket. Here are some docs on how to set up such a trigger: https://cloud.google.com/functions/docs/calling/storage (there are examples in Python).
  2. Your CF code will download the key, decrypt the file, and upload the decrypted file to another GCS bucket. For a Cloud Function you can set the memory up to 8GB and use /tmp, which is backed by memory behind the scenes.

By the way, security-wise, I think it is not good practice to store the encryption key in GCS; take a look at https://cloud.google.com/secret-manager

[Screenshot: accessing the worker logs]
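For the Secret Manager suggestion, a minimal sketch of fetching the key could look like this. It assumes the google-cloud-secret-manager client library is installed and that the private key has been stored as a secret; the function names and the secret ID used in the example are hypothetical.

```python
import os


def secret_version_name(project_id, secret_id, version="latest"):
    """Build the resource name Secret Manager expects for a secret version."""
    return f"projects/{project_id}/secrets/{secret_id}/versions/{version}"


def fetch_private_key(project_id, secret_id, destination="/tmp/privateKey.txt"):
    """Write the secret payload to a local file that gpg --import can read."""
    from google.cloud import secretmanager  # lazy import of the client library

    client = secretmanager.SecretManagerServiceClient()
    response = client.access_secret_version(
        request={"name": secret_version_name(project_id, secret_id)}
    )
    with open(destination, "wb") as handle:
        handle.write(response.payload.data)
    os.chmod(destination, 0o600)  # keep the key readable by the owner only
    return destination
```

Calling fetch_private_key("fakeProjectID", "gpg-private-key") would then take the place of the download of privateKey.txt from the bucket, and the rest of the code stays the same.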


5 Comments

Thanks MBHA Phoenix, that was definitely a very useful guideline and clarification for a new GCP user like myself. Regarding your second point, I am just curious: say I force the number of workers to be just one by using num_workers=1. From my understanding, that implies all the work will be done on one node, so would that make more sense? It may not be the smartest way to do it, but is it still doable?
Yes, you can force the number of workers to one. And have you checked the logs?
Thank you, yes, I have checked the logs. There is just one thing that still confuses me: as you mentioned, running this kind of job on Dataflow risks downloading the encrypted file on one node, the key on another node, and running the decryption on a third node. Does this mean each node has its own storage, so the decryption cannot run because the encrypted file and the key are stored in different places? Thanks again~
Yes, I confirm you got the idea: each worker node is a GCE VM with its own disk (you can set the disk space for a worker using --disk_size_gb), so you have that risk if you have more than one worker.
Thank you for your answer. It was really helpful.
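Following up on the point settled in the comments: if the job does stay on Dataflow, one way to avoid depending on which worker holds which file is to do every step that touches local disk inside a single call. The sketch below is an illustration rather than a tested Dataflow job. It assumes gpg is installed on the worker image and that the key has no passphrase, and the client and run parameters are hypothetical hooks added so the function can be exercised without GCS or gpg. It also passes an argument list with --output to gpg (a '>' inside a list is not interpreted by a shell) and yields a value, since a process step that returns nothing emits no elements for the next step.

```python
import logging
import os
import subprocess
import tempfile


def decrypt_in_one_step(element, client=None, run=subprocess.run):
    """Download, import the key, decrypt and upload within one call."""
    if client is None:
        from google.cloud import storage  # lazy import, as in the question
        client = storage.Client()

    bucket = client.bucket("bucket_name")
    with tempfile.TemporaryDirectory() as workdir:
        encrypted = os.path.join(workdir, "test.csv.gpg")
        key = os.path.join(workdir, "privateKey.txt")
        clear = os.path.join(workdir, "test.csv")

        # Both downloads land on the same worker's disk.
        bucket.blob("test.csv.gpg").download_to_filename(encrypted)
        bucket.blob("privateKey.txt").download_to_filename(key)

        # No shell is involved, so the output file is given with --output.
        run(["gpg", "--batch", "--import", key], check=True)
        run(["gpg", "--batch", "--yes", "--output", clear, "--decrypt", encrypted],
            check=True)

        bucket.blob("clearText.csv").upload_from_filename(clear)

    # On Dataflow this message appears in the worker logs, not the job logs.
    logging.info("decrypted test.csv.gpg for element %r", element)
    yield "clearText.csv"
```

In the pipeline, the four ParDo steps could then be replaced by a single step such as beam.FlatMap(decrypt_in_one_step).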
