I am trying to download an encrypted file and a key from a GCP bucket, decrypt the file, and load it back into the bucket. So I built this Dataflow pipeline, which looks as follows:
class downloadFile(beam.DoFn):
    def __init__(self):
        self.bucket_name = 'bucket_name'
        self.source_blob_name = 'test.csv.gpg'
        self.destination_file_name = "/tmp/test.csv.gpg"

    def process(self, element):
        from google.cloud import storage
        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(self.source_blob_name)
        blob.download_to_filename(self.destination_file_name)
Here I used self.destination_file_name = "/tmp/test.csv.gpg" because I learnt from others that a Dataflow job runs on a Linux VM, so downloading the file to this /tmp/ path should be safe.
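(One thing I am not completely sure about: several bundles can run on the same worker VM, so a unique temporary path might be safer than a fixed file name. A minimal sketch of that idea using Python's tempfile module; download_blob_to_tmp is just an illustrative helper, not part of my pipeline:)

import os
import tempfile

from google.cloud import storage

def download_blob_to_tmp(bucket_name, blob_name):
    # Write to a unique file in the worker's temp directory so that
    # concurrent bundles on the same VM do not overwrite each other.
    fd, local_path = tempfile.mkstemp(suffix='-' + os.path.basename(blob_name))
    os.close(fd)
    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(blob_name)
    blob.download_to_filename(local_path)
    return local_path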
class downloadKey(beam.DoFn):
    def __init__(self):
        self.bucket_name = 'bucket_name'
        self.source_blob_name = 'privateKey.txt'
        self.destination_file_name = "/tmp/privateKey.txt"

    def process(self, element):
        from google.cloud import storage
        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(self.source_blob_name)
        blob.download_to_filename(self.destination_file_name)
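Basically, the two download DoFns have the same structure and only differ in the blob names, so they could presumably be collapsed into a single parameterized DoFn, roughly like this (the constructor arguments are just my own naming):

class downloadBlob(beam.DoFn):
    def __init__(self, bucket_name, source_blob_name, destination_file_name):
        self.bucket_name = bucket_name
        self.source_blob_name = source_blob_name
        self.destination_file_name = destination_file_name

    def process(self, element):
        from google.cloud import storage
        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(self.source_blob_name)
        blob.download_to_filename(self.destination_file_name)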
After the file and the key are downloaded, the key is imported on the Dataflow worker VM:
class importKey(beam.DoFn):
    def process(self, element):
        import subprocess
        subprocess.call(['gpg', '--import', '/tmp/privateKey.txt'])
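(If it helps with debugging, the gpg output could be surfaced in the worker logs by checking the result of the call; a small sketch, assuming the workers run Python 3.7+ so that subprocess.run supports capture_output:)

import subprocess

# Run the import, print gpg's output so it shows up in the Dataflow
# worker logs, and raise if the command failed.
result = subprocess.run(['gpg', '--batch', '--import', '/tmp/privateKey.txt'],
                        capture_output=True, text=True)
print(result.returncode, result.stdout, result.stderr)
result.check_returncode()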
Then the decryption DoFn:
class decryption(beam.DoFn):
    def process(self, element, *args, **kwargs):
        import subprocess
        subprocess.call(['gpg', '-d', '/tmp/test.csv.gpg > test.csv'])

        # load file back to bucket
        from google.cloud import storage
        bucket_name = 'bucket_name'
        source_file_name = '/tmp/test.csv'
        destination_blob_name = "clearText.csv"
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)
So this decryption DoFn invokes a gpg command with subprocess to decrypt the file.
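(One thing I am unsure about here: the '> test.csv' redirection sits inside the argument list, so subprocess.call probably never passes it to a shell. A variant that lets gpg write the output file itself, using its --output flag, would look roughly like this:)

import subprocess

# Redirection such as '> test.csv' only works through a shell, so ask
# gpg to write the decrypted file directly instead.
subprocess.call(['gpg', '--batch', '--output', '/tmp/test.csv',
                 '--decrypt', '/tmp/test.csv.gpg'])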
And finally, the pipeline itself:
dummyMessage = {"projectID": "fakeProjectID",
                "bucketID": "fakeBucketID"}

step = (
    p
    | 'Create Sample'
    >> beam.Create([dummyMessage["projectID"]])
    | "testDecrypt" >> beam.ParDo(downloadFile())
    | "testDecrypt2" >> beam.ParDo(downloadKey())
    | "testDecrypt3" >> beam.ParDo(importKey())
    | "testDecrypt4" >> beam.ParDo(decryption())
)
Here I am just creating a dummy message to invoke the pipeline; it will later be replaced with a real message.
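(The idea later is that the DoFns read the project and bucket from the incoming message instead of using hard-coded values; a rough sketch of what I have in mind, assuming the whole message dict is passed through beam.Create and using my own field names:)

class downloadFile(beam.DoFn):
    def process(self, element):
        from google.cloud import storage
        # 'element' is the real message, e.g. {"projectID": ..., "bucketID": ...}
        storage_client = storage.Client(project=element["projectID"])
        bucket = storage_client.bucket(element["bucketID"])
        blob = bucket.blob('test.csv.gpg')
        blob.download_to_filename('/tmp/test.csv.gpg')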
When I run the pipeline, it all looks good: I can see the job has been created in Dataflow, and the job status shows successful. But in the bucket I cannot see the decrypted file.
I have added several print statements to the code to debug; it seems that in the downloadFile() and downloadKey() classes, process() is never reached, which means no file has ever been processed. Can anyone share some knowledge on how to access a GCS bucket in a DoFn? I am not sure which part of the code is wrong; it all looks good to me.
Any help will be appreciated.
