
I am trying to use the new BigQuery Storage API to do streaming inserts from Go. I understand, based on this page, that this API replaces the old BigQuery streaming insert API.

However, none of the examples in the docs show how to actually insert rows. In order to create an AppendRowsRequest, I have arrived at the following:

&storagepb.AppendRowsRequest{
    WriteStream: resp.Name,
    Rows: &storagepb.AppendRowsRequest_ProtoRows{
        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
            WriterSchema: nil, // protobuf schema??
            Rows: &storagepb.ProtoRows{
                SerializedRows: [][]byte{}, // serialized protocol buffer data??
            },
        },
    },
}

What data should I put into the SerializedRows field above?

The storagepb.ProtoRows struct above is documented here, but unfortunately all that page gives is a link to the main overview page for protocol buffers.

Can anyone give me an example of using the new BigQuery Storage API to stream rows into BigQuery from Go?

2 Answers


With much help from the answers above I have arrived at a working example, which is available on GitHub: https://github.com/alexflint/bigquery-storage-api-example

The main code is:

// import paths below match the v1beta2 client and the managedwriter/adapt
// helper in use at the time this was written
package main

import (
    "context"
    "fmt"
    "log"

    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable"
    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
    {Name: "John Doe", Age: 104},
    {Name: "Jane Doe", Age: 69},
    {Name: "Adam Smith", Age: 33},
}

func main() {
    ctx := context.Background()

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        TraceId:     trace, // identifies this client
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}

And the protocol buffer definition for the "Row" struct above is:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row {
    string Name = 1;
    int32 Age = 2;
}

You need to create a BigQuery dataset and table first, with a schema that corresponds to the protocol buffer. See the readme in the repository linked above for how to do that.
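For reference, a table matching the Row message can be created with the bq CLI along these lines (the column names and types here are assumptions based on the example schema above):

```shell
# create the dataset, then a table whose schema mirrors the Row message
bq mk --dataset myproject:mydataset
bq mk --table myproject:mydataset.mytable name:STRING,age:INTEGER
```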

After running the code above, the data shows up in BigQuery like this:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

Thanks all for the help!


3 Comments

I tried the code above (with storagepb.WriteStream_COMMITTED), I don't get any error, but the records are not available immediately in BQ. Does it take time for the data to be queryable?
@EyalP The data should definitely be available immediately -- that is one of the key goals of the streaming insert API. It's possible that things have changed in newer versions of the bigquery API (I was using v1beta2 when I wrote this). Perhaps try with the exact code and dependency versions in my repository here: github.com/alexflint/bigquery-storage-api-example. If that doesn't work then make double sure that you are selecting the most recent entries in your table when you query the table.
Maybe it's because my table is configured as multi regional. Anyhow, thanks for your response.

I found some documentation [1][2] about writing streams to a table, but I'm not really sure it's what you're looking for. Keep in mind that storage/apiv1beta2 is currently in beta, so row streaming may not be implemented yet or may simply lack documentation. If the documentation I attached doesn't help, we could open a public issue tracker entry to get it properly documented or implemented.

5 Comments

Yeah, these are the examples I found in the code repo, and the documentation I found there too. Thanks for the pointers nevertheless.
Are you looking for something like this github.com/googleapis/python-bigquery-storage/blob/HEAD/samples/… but in Golang?
Yeah seems helpful. I guess no such example has been written yet in Golang. Python one is still helpful. Much appreciated.
Hi Alex! Sorry for the late response; I've just opened a public feature request. Please feel free to add any information you consider relevant.
Additionally, the current ‘managedwriter’ client is here, maybe you can find it useful
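
Following up on the managedwriter pointer in the last comment: that package wraps the raw AppendRows stream handling shown in the accepted answer. A rough sketch of the equivalent append, with the project/dataset/table names and the descriptor/data construction assumed from that answer (a sketch, not a tested drop-in implementation):

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/descriptorpb"
)

func main() {
	ctx := context.Background()

	client, err := managedwriter.NewClient(ctx, "myproject")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// descriptor and data would be produced exactly as in the accepted
	// answer (adapt.NormalizeDescriptor and proto.Marshal); elided here.
	var descriptor *descriptorpb.DescriptorProto
	var data [][]byte

	// a committed stream makes rows visible as soon as they are appended
	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable(
			managedwriter.TableParentFromParts("myproject", "mydataset", "mytable")),
		managedwriter.WithType(managedwriter.CommittedStream),
		managedwriter.WithSchemaDescriptor(descriptor),
	)
	if err != nil {
		log.Fatal(err)
	}

	result, err := ms.AppendRows(ctx, data)
	if err != nil {
		log.Fatal(err)
	}
	// GetResult blocks until the server acknowledges (or rejects) the append
	if _, err := result.GetResult(ctx); err != nil {
		log.Fatal(err)
	}
}
```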
