With much help from the answers above, I arrived at a working example, which is available on GitHub:
https://github.com/alexflint/bigquery-storage-api-example
The main code is:
// Destination of the streamed rows. Together these three names form the
// "projects/<project>/datasets/<dataset>/tables/<table>" path used when the
// write stream is created.
const project = "myproject"
const dataset = "mydataset"
const table = "mytable"

// trace is sent as the TraceId of the append request; it identifies this
// client for bigquery debugging.
const trace = "bigquery-writeclient-example"
// rows holds the sample records that main serializes and appends to the
// bigquery table in a single request.
var rows = []*Row{
	&Row{
		Name: "John Doe",
		Age:  104,
	},
	&Row{
		Name: "Jane Doe",
		Age:  69,
	},
	&Row{
		Name: "Adam Smith",
		Age:  33,
	},
}
// main streams the package-level rows into the configured bigquery table
// using the Storage Write API:
//
//  1. create a write client and a COMMITTED write stream on the table,
//  2. open the AppendRows bidirectional stream,
//  3. send one request carrying the protobuf schema and the serialized rows,
//  4. read the response and verify that the server accepted the append.
//
// Every failure is fatal. Note that log.Fatal exits the process without
// running deferred calls, so client.Close only runs on the success path.
func main() {
	ctx := context.Background()

	// create the bigquery client
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// create the write stream
	// a COMMITTED write stream inserts data immediately into bigquery
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// get the stream by calling AppendRows
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// get the protobuf descriptor for our row type
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// serialize the rows; the loop variable is named r so that it does not
	// shadow the row value used for the descriptor above
	data := make([][]byte, 0, len(rows))
	for _, r := range rows {
		buf, err := proto.Marshal(r)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// send the rows to bigquery
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		TraceId:     trace, // identifies this client
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// protocol buffer schema
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// protocol buffer data
				Rows: &storagepb.ProtoRows{
					SerializedRows: data, // serialized protocol buffer data
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// get the response, which will tell us whether it worked
	result, err := stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}
	// a successful Recv only means a response arrived; the append itself can
	// still have been rejected, in which case the response carries a status
	// instead of an append result
	if st := result.GetError(); st != nil {
		log.Fatalf("AppendRows response: %s (code %d)", st.GetMessage(), st.GetCode())
	}

	// we have nothing more to send, so close our side of the stream
	if err := stream.CloseSend(); err != nil {
		log.Fatal("AppendRows.CloseSend: ", err)
	}

	log.Println("done")
}
And the protocol buffer definition for the "Row" struct above is:
// Schema of the rows streamed to bigquery by the Go program above.
syntax = "proto3";
package tutorial;
// NOTE(review): ".;main" presumably makes protoc-gen-go write the generated
// code into the current directory as Go package "main", next to the
// program — confirm against the protoc-gen-go documentation.
option go_package = ".;main";
// Row is a single record to append; the bigquery table's schema has to
// correspond to these fields.
message Row {
// Name is stored in the table's "name" column.
string Name = 1;
// Age is stored in the table's "age" column.
int32 Age = 2;
}
You need to create a BigQuery dataset and table first, with a schema that corresponds to the protocol buffer definition. See the README in the repository linked above for how to do that.
After running the code above, the data shows up in BigQuery like this:
$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE
+------------+-----+
| name | age |
+------------+-----+
| John Doe | 104 |
| Jane Doe | 69 |
| Adam Smith | 33 |
+------------+-----+
Thanks all for the help!