1

I'm trying to insert data in parallel ,using spring r2dbc, to psql table
No matter how I run ,either in different connections or using the same connection and running in parallel batches/statements it always revert to a single thread (reactor-tcp-nio-1) and running in serial and not parallel which affect performance

Multiple connections

  @Test
    void testMultipleStatementBatch2() {
        final List<List<Tuple2<Integer, String>>> result = Flux.range(0, 100)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(index -> Flux.usingWhen(
                        connectionFactory.create(),
                        connection -> {
                            log.debug("Running index:{}", index);
                            return batch(connection);
                        },
                        Connection::close
                ))
                .doOnNext(tuple2s -> log.debug("result:{}", tuple2s))
                .sequential()
                .collectList()
                .block();
        log.debug("list:{}", gson.toJson(result));
    }

 private Mono<List<Tuple2<Integer, String>>> batch(Connection connection) {
        log.debug("start running");
        final Batch batch = connection.createBatch();
        IntStream.range(0, 10).boxed()
                .forEach(index ->
                        batch.add("INSERT INTO my_test(key,value,comment) " +
                                "values (" +
                                "'" + index + "'" +
                                ",'value_" + index + "'" +
                                ",'comment_" + index + "'" +
                                ")" +
                                " RETURNING *"));
        return Flux.from(batch.execute())
                .flatMap(result -> result.map((row, rowMetadata) -> {
                    log.debug("Result values");
                    final Integer id = row.get("key", Integer.class);
                    final String value = row.get("value", String.class);
                    return Tuples.of(id, value);
                }))
                .collectList();
    }
...
20230831 13:33:31.167 DEBUG [Test worker]          QUERY.debug(249) - Executing query: drop table if exists my_test cascade ; create table my_test(key int,value varchar(255),comment varchar(255))
20230831 13:33:31.171 WARN  [reactor-tcp-nio-1]    ReactorNettyClient.warn(289) - Notice: SEVERITY_LOCALIZED=NOTICE, SEVERITY_NON_LOCALIZED=NOTICE, CODE=00000, MESSAGE=table "my_test" does not exist, skipping, FILE=tablecmds.c, LINE=1186, ROUTINE=DropErrorMsgNonExistent
20230831 13:33:32.256 DEBUG [parallel-6]           StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:1
20230831 13:33:32.257 DEBUG [parallel-6]           StatementConnectionTest.batch(96) - start running
20230831 13:33:32.265 DEBUG [parallel-6]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 13:33:32.276 DEBUG [parallel-6]           StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:0
...
20230831 13:33:32.312 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.313 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.314 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.314 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.315 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$testMultipleStatementBatch2$5(81) - result:[[0,value_0], [1,value_1], [2,value_2], [3,value_3], [4,value_4], [5,value_5], [6,value_6], [7,value_7], [8,value_8], [9,value_9]]
20230831 13:33:32.316 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:12
20230831 13:33:32.316 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.batch(96) - start running
20230831 13:33:32.317 DEBUG [reactor-tcp-nio-1]    QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 13:33:32.318 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.319 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.320 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.321 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.322 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.323 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values

Multiple batches

 @Test
    void runBatchParallelism() {
        var list = Mono.usingWhen(dbService.createConnection(),
                        this::batchParallelism,
                        Connection::close)
                .block();
        log.debug("list:{}", gson.toJson(list));
    }

 private Mono<List<List<Tuple2<Integer, String>>>> batchParallelism(Connection connection) {
        log.debug("start running");
        return Flux.range(0, 5).parallel().runOn(Schedulers.parallel())
                .flatMap(fluxIndex -> {
                    log.debug("flux index:{}", fluxIndex);
                    final Batch batch = connection.createBatch();
                    IntStream.range(0, 10).boxed()
                            .forEach(index ->
                                    batch.add("INSERT INTO my_test(key,value,comment) " +
                                            "values (" +
                                            "'" + index + "'" +
                                            ",'value_" + index + "'" +
                                            ",'comment_" + index + "'" +
                                            ")" +
                                            " RETURNING *"));
                    return Flux.from(batch.execute())
                            .flatMap(result -> result.map((row, rowMetadata) -> {
                                log.debug("Result values");
                                final Integer id = row.get("key", Integer.class);
                                final String value = row.get("value", String.class);
                                return Tuples.of(id, value);
                            }))
                            .collectList();
                }).sequential()
                .collectList();
    }
...
20230831 14:05:08.608 DEBUG [Test worker]          StatementConnectionTest.batchParallelism(100) - start running
20230831 14:05:08.646 DEBUG [parallel-3]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:4
20230831 14:05:08.646 DEBUG [parallel-1]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:2
20230831 14:05:08.646 DEBUG [parallel-2]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:3
20230831 14:05:08.646 DEBUG [parallel-8]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:1
20230831 14:05:08.646 DEBUG [parallel-7]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:0
20230831 14:05:08.660 DEBUG [parallel-1]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.673 DEBUG [parallel-8]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.678 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.680 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.681 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.683 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.684 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.685 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.686 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.687 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.688 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.688 DEBUG [parallel-7]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.689 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.690 DEBUG [reactor-tcp-nio-1]    QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.691 DEBUG [reactor-tcp-nio-1]    QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.694 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.696 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
...

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.