
I'm trying to implement the "Download table data in the Avro data format" method from this example, but I don't know how to implement it.

namespace {
void ProcessRowsInAvroFormat(
    ::google::cloud::bigquery::storage::v1::AvroSchema const&,
    ::google::cloud::bigquery::storage::v1::AvroRows const&) {
  // Code to deserialize avro rows should be added here.
}
}  // namespace

I installed the Apache Avro C++ library and wrote the following code:

bool bq::ReadSessionFromSchema(std::string project_id, std::string dataset_id, std::string table_id)
try
{
    auto table_name = "projects/" + project_id + "/datasets/" + dataset_id + "/tables/" + table_id;
    int max_stream_count = 1;

    google::cloud::bigquery::storage::v1::ReadSession read_session;
    read_session.set_table(table_name);
    read_session.set_data_format(google::cloud::bigquery::storage::v1::DataFormat::AVRO);
    read_session.mutable_read_options()->set_row_restriction(R"(state_name = "Kentucky")");

    auto session = read_client->CreateReadSession("projects/" + project_id, read_session, max_stream_count);
    if (!session)
    {
        std::cerr << session.status() << "\n";
        return false;
    }

    std::cout << "ReadSession successfully created: " << session->name()
              << ".\n";
    constexpr int kRowOffset = 0;
    auto read_rows = read_client->ReadRows(session->streams(0).name(), kRowOffset);

    std::int64_t num_rows = 0;
    for (auto const &row : read_rows)
    {
        if (row.ok())
        {
            num_rows += row->row_count();
            std::cout << "row count: " << row->row_count() << std::endl;

            [](::google::cloud::bigquery::storage::v1::AvroSchema const &schema,
               ::google::cloud::bigquery::storage::v1::AvroRows const &rows)
            {
                auto vs = avro::compileJsonSchemaFromString(schema.schema());
                std::unique_ptr<avro::InputStream> in = avro::memoryInputStream(
                    reinterpret_cast<uint8_t const *>(rows.serialized_binary_rows().data()),
                    rows.serialized_binary_rows().size());

                avro::DecoderPtr d = avro::validatingDecoder(vs, avro::binaryDecoder());
                avro::GenericDatum datum(vs);

                d->init(*in);
                // NOTE: this decodes only the first record in the chunk.
                avro::decode(*d, datum);

                if (datum.type() == avro::AVRO_RECORD)
                {
                    const avro::GenericRecord &r = datum.value<avro::GenericRecord>();
                    std::cout << "Field-count: " << r.fieldCount() << std::endl;
                    for (auto i = 0; i < r.fieldCount(); i++)
                    {
                        const avro::GenericDatum &f0 = r.fieldAt(i);

                        if (f0.type() == avro::AVRO_STRING)
                        {
                            std::cout << "string: " << f0.value<std::string>() << std::endl;
                        }
                        else if (f0.type() == avro::AVRO_INT)
                        {
                            std::cout << "int: " << f0.value<int>() << std::endl;
                        }
                        else if (f0.type() == avro::AVRO_LONG)
                        {
                            std::cout << "long: " << f0.value<long>() << std::endl;
                        }
                        else
                        {
                            std::cout << f0.type() << std::endl;
                        }
                    }
                }

            }(session->avro_schema(), row->avro_rows());
        }
    }

    std::cout << num_rows << " rows read from table: " << table_name << "\n";
    return true;
}
catch (google::cloud::Status const &status)
{
    std::cerr << "google::cloud::Status thrown: " << status << "\n";
    return false;
}

The BigQuery session gives me 3 chunks, containing 41 rows, 27 rows, and 10 rows respectively. But with this code I can only print the first row of each chunk.

I want to print every row I receive from the session.

The original data is here; I copied the table into my own project.

Expected result (78 rows):

project id: MY_PROJECT_ID
ReadSession successfully created: projects/MY_PROJECT_ID/locations/asia-northeast3/sessions/<MY_SESSION_ID>.
row count: 41
Field-count: 25
string: 21083
string: Graves County
string: Kentucky
long: 32460
long: 227
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
string: 21221
string: Trigg County
string: Kentucky
long: 17300
long: 19
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
........<More rows>
........<More rows>
row count: 27
Field-count: 25
string: 21013
string: Bell County
string: Kentucky
long: 33180
long: 223
long: 0
long: 0
long: 0
long: 3
long: 30
long: 26
long: 0
long: 4
long: 0
long: 0
long: 10
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 1
........<More rows>
........<More rows>
row count: 10
Field-count: 25
string: 21015
string: Boone County
string: Kentucky
long: 17140
long: 187
long: 0
long: 0
long: 0
long: 0
long: 51
long: 0
long: 0
long: 18
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 62
long: 0
long: 0
long: 0
long: 16
long: 12
........<More rows>
........<More rows>
78 rows read from table: projects/MY_PROJECT_ID/datasets/MY_DATASET_ID/tables/covid_19

Actual result (only 3 rows):

project id: MY_PROJECT_ID
ReadSession successfully created: projects/MY_PROJECT_ID/locations/asia-northeast3/sessions/<MY_SESSION_ID>.
row count: 41
Field-count: 25
string: 21083
string: Graves County
string: Kentucky
long: 32460
long: 227
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
row count: 27
Field-count: 25
string: 21013
string: Bell County
string: Kentucky
long: 33180
long: 223
long: 0
long: 0
long: 0
long: 3
long: 30
long: 26
long: 0
long: 4
long: 0
long: 0
long: 10
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 1
row count: 10
Field-count: 25
string: 21015
string: Boone County
string: Kentucky
long: 17140
long: 187
long: 0
long: 0
long: 0
long: 0
long: 51
long: 0
long: 0
long: 18
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 62
long: 0
long: 0
long: 0
long: 16
long: 12
78 rows read from table: projects/MY_PROJECT_ID/datasets/MY_DATASET_ID/tables/covid_19

1 Answer

After spending a lot of time experimenting with the code, I found a working version, shown below.

Use avro::GenericReader::read() to load records from the avro::InputStream sequentially. After processing each row, call avro::GenericReader::drain() to discard the current row's state so that the next row can be read from the stream.
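
In outline, the per-chunk loop looks like this (a condensed sketch of the full function below; vs, d, in, and count stand for the compiled schema, the decoder, the input stream, and the chunk's row_count()):

avro::GenericReader gr(vs, d);
d->init(*in);
avro::GenericDatum datum(vs);

for (std::int64_t i = 0; i < count; ++i)
{
    gr.read(*d, datum, vs);  // decode the next record into datum
    // ... inspect datum here ...
    gr.drain();              // drop per-row state before reading the next row
}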

bool bq::ReadSessionFromSchema(std::string project_id, std::string dataset_id, std::string table_id)
try
{
    auto table_name = "projects/" + project_id + "/datasets/" + dataset_id + "/tables/" + table_id;
    int max_stream_count = 1;

    google::cloud::bigquery::storage::v1::ReadSession read_session;
    read_session.set_table(table_name);
    read_session.set_data_format(google::cloud::bigquery::storage::v1::DataFormat::AVRO);
    read_session.mutable_read_options()->set_row_restriction(R"(state_name = "Kentucky")");

    auto session = read_client->CreateReadSession("projects/" + project_id, read_session, max_stream_count);
    if (!session)
    {
        std::cerr << session.status() << "\n";
        return false;
    }

    std::cout << "ReadSession successfully created: " << session->name()
              << ".\n";
    constexpr int kRowOffset = 0;
    auto read_rows = read_client->ReadRows(session->streams(0).name(), kRowOffset);

    std::int64_t num_rows = 0;
    for (auto const &row : read_rows)
    {
        if (row.ok())
        {
            num_rows += row->row_count();
            std::cout << "row count: " << row->row_count() << std::endl;

            [](::google::cloud::bigquery::storage::v1::AvroSchema const &schema,
               ::google::cloud::bigquery::storage::v1::AvroRows const &rows,
               int64_t count)
            {
                const avro::ValidSchema vs = avro::compileJsonSchemaFromString(schema.schema());
                std::istringstream iss(rows.serialized_binary_rows(), std::ios::binary);
                std::unique_ptr<avro::InputStream> in = avro::istreamInputStream(iss);

                avro::DecoderPtr d = avro::validatingDecoder(vs, avro::binaryDecoder());
                avro::GenericReader gr(vs, d);
                d->init(*in);

                avro::GenericDatum datum(vs);

                for (std::int64_t i = 0; i < count; ++i)
                {
                    gr.read(*d, datum, vs);  // decode the next record into datum

                    if (datum.type() == avro::AVRO_RECORD)
                    {
                        const avro::GenericRecord &r = datum.value<avro::GenericRecord>();
                        std::cout << "Field-count: " << r.fieldCount() << std::endl;
                        for (auto j = 0; j < r.fieldCount(); ++j)
                        {
                            const avro::GenericDatum &f0 = r.fieldAt(j);

                            if (f0.type() == avro::AVRO_STRING)
                            {
                                std::cout << "string: " << f0.value<std::string>() << std::endl;
                            }
                            else if (f0.type() == avro::AVRO_INT)
                            {
                                std::cout << "int: " << f0.value<int>() << std::endl;
                            }
                            else if (f0.type() == avro::AVRO_LONG)
                            {
                                std::cout << "long: " << f0.value<long>() << std::endl;
                            }
                            else
                            {
                                std::cout << f0.type() << std::endl;
                            }
                        }
                    }

                    gr.drain();  // drop per-row decoder state before the next read
                }
            }(session->avro_schema(), row->avro_rows(), row->row_count());
        }
    }

    std::cout << num_rows << " rows read from table: " << table_name << "\n";
    return true;
}
catch (google::cloud::Status const &status)
{
    std::cerr << "google::cloud::Status thrown: " << status << "\n";
    return false;
}
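
As a side note, the same chunk can presumably also be consumed without avro::GenericReader by calling avro::decode() once per row, which would be the minimal change to the question's original lambda. This is only a sketch under two assumptions: that serialized_binary_rows() is a plain concatenation of row_count() binary-encoded records (which is how the loop above treats it), and that a plain binary decoder can be reused across records (it keeps no per-record state). DecodeChunk is an illustrative name, not part of any API:

#include <avro/Compiler.hh>
#include <avro/Decoder.hh>
#include <avro/Generic.hh>
#include <avro/Specific.hh>
#include <avro/Stream.hh>
#include <cstdint>
#include <string>

namespace {
// Decode one AvroRows chunk record-by-record with avro::decode().
void DecodeChunk(std::string const &schema_json,
                 std::string const &serialized_rows, std::int64_t count)
{
    auto const vs = avro::compileJsonSchemaFromString(schema_json);
    auto in = avro::memoryInputStream(
        reinterpret_cast<uint8_t const *>(serialized_rows.data()),
        serialized_rows.size());

    avro::DecoderPtr d = avro::binaryDecoder();  // stateless between records
    d->init(*in);

    for (std::int64_t i = 0; i < count; ++i)
    {
        avro::GenericDatum datum(vs);
        avro::decode(*d, datum);  // consumes exactly one record
        // ... walk datum.value<avro::GenericRecord>() as in the loops above ...
    }
}
}  // namespace

Inside the row loop it would be called as DecodeChunk(session->avro_schema().schema(), row->avro_rows().serialized_binary_rows(), row->row_count()).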