I am trying to write a Dataflow pipeline to migrate data from Google Datastore to BigQuery using Python. After some searching, I figured out that I need three steps:

    1. ReadFromDatastore
    2. Convert to Python dicts or Tablerows
    3. WriteToBigQuery

The first and last steps are simple, since they are existing transforms. But I am having a hard time finding a good way to do the second step.

I wrote the output of ReadFromDatastore to a text file, and the output looks like this:

key {
  partition_id {
    project_id: "ProjectID"
  }
  path {
    kind: "KindName"
    id: 9999
  }
}
properties {
  key: "property1"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property2"
  value {
    string_value: ""
  }
}
properties {
  key: "property3"
  value {
    boolean_value: false
  }
}
properties {
  key: "created"
  value {
    timestamp_value {
      seconds: 4444
      nanos: 2222
    }
  }
}
properties {
  key: "created_by"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "date_created"
  value {
    timestamp_value {
      seconds: 4444
    }
  }
}
properties {
  key: "property4"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property5"
  value {
    array_value {
      values {
        meaning: 00
        string_value: "link"
        exclude_from_indexes: true
      }
    }
  }
}
properties {
  key: "property6"
  value {
    null_value: NULL_VALUE
  }
}
properties {
  key: "property7"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property8"
  value {
    string_value: ""
  }
}
properties {
  key: "property9"
  value {
    timestamp_value {
      seconds: 3333
      nanos: 3333
    }
  }
}
properties {
  key: "property10"
  value {
    meaning: 00
    string_value: ""
    exclude_from_indexes: true
  }
}
properties {
  key: "property11"
  value {
    boolean_value: false
  }
}
properties {
  key: "property12"
  value {
    array_value {
      values {
        key_value {
          partition_id {
            project_id: "project_id"
          }
          path {
            kind: "Another_kind_name"
            id: 4444
          }
        }
      }
    }
  }
}
properties {
  key: "property13"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "version"
  value {
    integer_value: 4444
  }
}

key {
  partition_id {
    project_id: "ProjectID"
  }
  path {
    kind: "KindName"
    id: 9999
  }
}
...
(next entity/row)

Do I have to write a custom function to convert this output to Python dicts so that I can write to BigQuery, or are there functions/libraries from Google Datastore or Apache Beam that I can use?

I found an article describing what I am trying to do, but the code shown is in Java.

  • I do not think this is JSON; it looks like a protobuf-style object. Can you check by logging the type of the objects? If so, you can convert the protobufs into JSON by following these instructions: stackoverflow.com/questions/19734617/protobuf-to-json-in-python Commented Jun 3, 2019 at 22:26

1 Answer

The ReadFromDatastore transform outputs Entity protocol buffer messages.

To convert a protobuf to JSON, you can check this question: Protobuf to json in python

You would do the following, where my_proto_to_json_fn is a conversion function you write yourself:

    p | ReadFromDatastore(...) | beam.Map(my_proto_to_json_fn) | beam.io.WriteToBigQuery(...)
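
As a rough sketch of what my_proto_to_json_fn could look like: assuming each element is an Entity protobuf whose properties map holds Value messages with a value_type oneof (which matches the dump in the question), you can ask each value which field is set via WhichOneof and map it to a plain Python object. The names entity_to_dict and value_to_python are hypothetical, and the way keys and timestamps are encoded here is my own choice rather than anything Beam or Datastore prescribes.

```python
import datetime


def value_to_python(value):
    """Convert one Datastore Value message into a plain Python object."""
    kind = value.WhichOneof("value_type")  # name of the field that is set
    if kind is None or kind == "null_value":
        return None
    if kind == "timestamp_value":
        ts = value.timestamp_value
        moment = datetime.datetime(1970, 1, 1) + datetime.timedelta(
            seconds=ts.seconds, microseconds=ts.nanos // 1000)
        # Assumption: an ISO 8601 string (UTC) is what the target column expects.
        return moment.isoformat()
    if kind == "key_value":
        # Assumption: flatten the key path into "Kind:id/Kind:id".
        return "/".join(
            "%s:%s" % (el.kind, el.id or el.name)
            for el in value.key_value.path)
    if kind == "array_value":
        return [value_to_python(v) for v in value.array_value.values]
    if kind == "entity_value":
        return entity_to_dict(value.entity_value)
    # string_value, boolean_value, integer_value, double_value are returned
    # unchanged; blob and geo point values are also passed through as-is
    # in this sketch and may need their own handling.
    return getattr(value, kind)


def entity_to_dict(entity):
    """Turn an Entity message into a {property_name: python_value} dict."""
    return {name: value_to_python(v) for name, v in entity.properties.items()}
```

You would then pass entity_to_dict to beam.Map in place of my_proto_to_json_fn. The BigQuery schema still has to match the resulting dicts, for example array properties need a repeated field.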

2 Comments

I am not able to run "my_proto_to_json_fn", but I logged the type of the element and it comes out as 'unicode'. My code is:

    objects = (p | 'Read from Text' >> beam.io.ReadFromText(input_file))
    objects | 'Convert to Dict' >> beam.ParDo(Protobuf2Dict())
    objects | 'Debug print' >> beam.ParDo(PrintFn())

And PrintFn is:

    class PrintFn(beam.DoFn):
        def process(self, element):
            print(type(element))
            print(element)
            return None

Okay, so now you can take those unicode elements and maybe turn them into JSON?
