5

Today I tried to solve a small challenge:

You are a big company with 500 offices, you want to compute the global revenue (sum of revenues of each office).

Each office exposes a service to get the revenue. The call takes a certain delay (network, db access, ...).

Obviously, you want global revenue as fast as possible.

First, I tried it in Python, with pretty good results:

import asyncio
import time

# Simulated per-office response delays, in milliseconds. Offices pick their
# delay by cycling through this tuple by index, and the value is divided by
# 1000 before being passed to asyncio.sleep().
DELAYS = (475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175)


class Office:
    """One office whose revenue lookup is simulated by a non-blocking sleep."""

    def __init__(self, delay, name, revenue):
        # delay is expressed in milliseconds; compute() converts it to seconds.
        self.delay, self.name, self.revenue = delay, name, revenue

    async def compute(self):
        """Sleep for the configured delay, report completion, and return the revenue."""
        seconds = self.delay / 1000
        await asyncio.sleep(seconds)
        print(f'{self.name} finished in {self.delay}ms')
        return self.revenue


async def main(offices, totest):
    """Run every office's compute() concurrently, sum the results, and print
    the sum followed by 'ok' or 'nok' depending on whether it equals totest."""
    pending = [office.compute() for office in offices]
    revenues = await asyncio.gather(*pending)
    computed = sum(revenues)
    verdict = 'ok' if computed == totest else 'nok'
    print(f'Sum of revenues = {computed} {verdict}')


if __name__ == "__main__":
    # Build 500 offices: delays cycle through DELAYS, revenue grows with the index.
    offices = [
        Office(DELAYS[i % len(DELAYS)], f'Office-{i}', 3 * i + 10)
        for i in range(500)
    ]
    # Expected total, computed directly so main() can verify its own result.
    totest = sum(office.revenue for office in offices)

    # Time only the concurrent run, not the setup above.
    start = time.perf_counter()
    asyncio.run(main(offices, totest))
    end = time.perf_counter()
    print(f'Ends in {(end-start)*1000:.3f}ms')

On my computer it takes around 500ms, which is the ideal case (because 500ms is the maximum delay).

Next, I tried in java with RxJava:

import java.util.concurrent.TimeUnit;

/**
 * An office whose revenue lookup is simulated by a blocking sleep.
 */
public class Office {
    private final int sleepTime;
    private final String name;
    private final int revenue;

    /**
     * @param sleepTime simulated latency of {@link #compute()}, in milliseconds
     * @param name      label printed when {@link #compute()} finishes
     * @param revenue   value returned by {@link #getRevenue()} and {@link #compute()}
     */
    public Office(int sleepTime, String name, int revenue) {
        this.sleepTime = sleepTime;
        this.name = name;
        this.revenue = revenue;
    }

    /**
     * @return this office's revenue, without any simulated delay
     */
    public int getRevenue() {
        return revenue;
    }

    /**
     * Blocks the calling thread for the configured sleep time, prints a
     * completion line naming the current thread, and returns the revenue.
     *
     * <p>If the thread is interrupted while sleeping, the sleep ends early, the
     * thread's interrupt status is restored so callers can observe it, and the
     * revenue is still returned.
     *
     * @return this office's revenue
     */
    public int compute() {
        try {
            TimeUnit.MILLISECONDS.sleep(this.sleepTime);
        } catch (InterruptedException e) {
            // sleep() clears the interrupt status when it throws; set it again
            // so the interruption is not silently lost to the caller.
            Thread.currentThread().interrupt();
        }
        System.out.printf("%s finished in %dms on thread %d%n", this.name, this.sleepTime, Thread.currentThread().getId());
        return this.revenue;
    }
}

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;

/**
 * Builds 500 offices, sums their revenues through a parallel RxJava pipeline
 * that calls the blocking {@code Office.compute()}, and prints the result
 * together with the elapsed wall-clock time.
 */
public class Tester {
    private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};

    public static void main(String[] args) {
        final ArrayList<Office> offices = new ArrayList<>();
        for (int idx = 0; idx < 500; idx++) {
            final int delay = DELAYS[idx % DELAYS.length];
            final String label = String.format("Office-%d", idx);
            offices.add(new Office(delay, label, 3 * idx + 10));
        }

        // Reference total, computed directly from the stored revenues.
        int expected = 0;
        for (Office office : offices) {
            expected += office.getRevenue();
        }

        final Instant startedAt = Instant.now();
        // Parallelism level 500 on the io scheduler; each element runs the blocking compute().
        final int total = Flowable.fromIterable(offices)
                .parallel(500)
                .runOn(Schedulers.io())
                .map(Office::compute)
                .reduce(Integer::sum)
                .blockingSingle();
        final String verdict = (total == expected) ? "ok" : "nok";
        System.out.println("" + total + " " + verdict);
        final Instant finishedAt = Instant.now();

        final long elapsedMillis = Duration.between(startedAt, finishedAt).toMillis();
        System.out.printf("Ends in %dms%n", elapsedMillis);
    }
}

On my computer, it takes around 1000ms (with a pool of 500 threads !!).

Of course, I tried with different numbers of threads, but the results are worse or similar.

I don't want to compare Python and Java, I just want:

An explanation of any mistakes I made

A better approach?

Also, Python's asyncio uses only one thread, but in Java I couldn't find a way to get a similar result without multithreading.

Maybe someone could help me? :-)

1
  • Does the total time decrease when you limit the office count to 50? If yes, does it decrease linearly with the count, or is there a sudden increase after some number? Commented Feb 8, 2019 at 19:15

2 Answers 2

1

It's quite simple. On the Python side, you wait in async (non-blocking) mode; on the Java side, you wait with blocking code, hence the difference.

The correct code in java should be:

package com.test;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;


/**
 * Sums the revenues of 500 offices where each office's delay is expressed as
 * a non-blocking RxJava {@code delay} instead of a thread sleep.
 */
public class TestReactive {

    /** An office whose revenue becomes available after a timed, non-blocking delay. */
    public static class Office {
        private int sleepTime;
        private String name;
        private int revenue;

        public Office(int sleepTime, String name, int revenue) {
            this.sleepTime = sleepTime;
            this.name = name;
            this.revenue = revenue;
        }

        public int getRevenue() {
            return revenue;
        }

        /**
         * Returns a publisher that emits this office's revenue once the
         * configured delay (in milliseconds) has elapsed, printing a
         * completion line on the emitting thread first.
         */
        public Publisher<Integer> compute() {
            final Single<Integer> delayedRevenue = Single.just("")
                    .delay(this.sleepTime, TimeUnit.MILLISECONDS)
                    .map(ignored -> {
                        System.out.printf("%s finished in %dms on thread %d%n", this.name, this.sleepTime, Thread.currentThread().getId());
                        return this.revenue;
                    });
            return delayedRevenue.toFlowable();
        }
    }

    private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};

    public static void main(String[] args) {
        final ArrayList<Office> offices = new ArrayList<>();
        for (int idx = 0; idx < 500; idx++) {
            final int delay = DELAYS[idx % DELAYS.length];
            final String label = String.format("Office-%d", idx);
            offices.add(new Office(delay, label, 3 * idx + 10));
        }

        // Reference total, computed directly from the stored revenues.
        int expected = 0;
        for (Office office : offices) {
            expected += office.getRevenue();
        }

        final Instant startedAt = Instant.now();

        // flatMap subscribes to each office's delayed publisher and merges the results.
        final int total = Flowable.fromIterable(offices)
                .parallel(2)
                .runOn(Schedulers.io())
                .flatMap(Office::compute)
                .reduce(Integer::sum)
                .blockingSingle();
        final String verdict = (total == expected) ? "ok" : "nok";
        System.out.println("" + total + " " + verdict);
        final Instant finishedAt = Instant.now();

        final long elapsedMillis = Duration.between(startedAt, finishedAt).toMillis();
        System.out.printf("Ends in %dms%n", elapsedMillis);
    }

}

Edit: I set a parallelism of 2, but it doesn't really matter; you can use a single thread, as this is not a CPU-bound problem.

Sign up to request clarification or add additional context in comments.

Comments

0

After a lot of tries (thanks M. T for your help), I finally have a good Java implementation!

/** An office whose revenue is delivered by an RxJava timer rather than a thread sleep. */
public class Office {
    private int sleepTime;
    private int revenue;

    public Office(int sleepTime, int revenue) {
        this.sleepTime = sleepTime;
        this.revenue = revenue;
    }

    public int getRevenue() {
        return revenue;
    }

    /**
     * Returns a {@code Single} that emits this office's revenue once a timer
     * of {@code sleepTime} milliseconds fires; the timer's own value is discarded.
     */
    public Single<Integer> compute() {
        final Single<Long> timer = Single.timer(sleepTime, TimeUnit.MILLISECONDS);
        return timer.map(tick -> this.revenue);
    }
}


/**
 * Builds one million offices (each with revenue 1), sums their timer-based
 * revenues through {@code flatMapSingle}, and prints the result with the
 * elapsed wall-clock time.
 */
public class Tester {
    private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};

    public static void main(String[] args) {
        final ArrayList<Office> offices = new ArrayList<>();
        for (int idx = 0; idx < 1_000_000; idx++) {
            final int delay = DELAYS[idx % DELAYS.length];
            offices.add(new Office(delay, 1));
        }

        // Reference total, computed directly from the stored revenues.
        int expected = 0;
        for (Office office : offices) {
            expected += office.getRevenue();
        }

        final Instant startedAt = Instant.now();
        // No parallel()/runOn() here: each office contributes a timer-backed Single.
        final int total = Flowable.fromIterable(offices)
                .flatMapSingle(Office::compute)
                .reduce(Integer::sum)
                .blockingGet();
        final String verdict = (total == expected) ? "ok" : "nok";
        System.out.println("" + total + " " + verdict);
        final Instant finishedAt = Instant.now();

        final long elapsedMillis = Duration.between(startedAt, finishedAt).toMillis();
        System.out.printf("Ends in %dms%n", elapsedMillis);
    }
}

This code is blazing fast! 2s for 1_000_000 offices!

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.