I have this class (non-Rx) that concurrently starts 1000 producer threads and 1000 consumer threads, and then waits until they exchange a predefined number of messages through a simple implementation of a blocking queue. After this process completes, observers notified with result:
public class ProducerConsumerBenchmarkUseCase extends BaseObservable<ProducerConsumerBenchmarkUseCase.Listener> {
public static interface Listener {
void onBenchmarkCompleted(Result result);
}
public static class Result {
private final long mExecutionTime;
private final int mNumOfReceivedMessages;
public Result(long executionTime, int numOfReceivedMessages) {
mExecutionTime = executionTime;
mNumOfReceivedMessages = numOfReceivedMessages;
}
public long getExecutionTime() {
return mExecutionTime;
}
public int getNumOfReceivedMessages() {
return mNumOfReceivedMessages;
}
}
private static final int NUM_OF_MESSAGES = 1000;
private static final int BLOCKING_QUEUE_CAPACITY = 5;
private final Object LOCK = new Object();
private final Handler mUiHandler = new Handler(Looper.getMainLooper());
private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);
private int mNumOfFinishedConsumers;
private int mNumOfReceivedMessages;
private long mStartTimestamp;
public void startBenchmarkAndNotify() {
synchronized (LOCK) {
mNumOfReceivedMessages = 0;
mNumOfFinishedConsumers = 0;
mStartTimestamp = System.currentTimeMillis();
}
// watcher-reporter thread
new Thread(() -> {
synchronized (LOCK) {
while (mNumOfFinishedConsumers < NUM_OF_MESSAGES) {
try {
LOCK.wait();
} catch (InterruptedException e) {
return;
}
}
}
notifySuccess();
}).start();
// producers init thread
new Thread(() -> {
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
startNewProducer(i);
}
}).start();
// consumers init thread
new Thread(() -> {
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
startNewConsumer();
}
}).start();
}
private void startNewProducer(final int index) {
new Thread(() -> mBlockingQueue.put(index)).start();
}
private void startNewConsumer() {
new Thread(() -> {
int message = mBlockingQueue.take();
synchronized (LOCK) {
if (message != -1) {
mNumOfReceivedMessages++;
}
mNumOfFinishedConsumers++;
LOCK.notifyAll();
}
}).start();
}
private void notifySuccess() {
mUiHandler.post(() -> {
Result result;
synchronized (LOCK) {
result =
new Result(
System.currentTimeMillis() - mStartTimestamp,
mNumOfReceivedMessages
);
}
for (Listener listener : getListeners()) {
listener.onBenchmarkCompleted(result);
}
});
}
}
Now I want to refactor it to Rx. So far, I managed to get this far:
public class ProducerConsumerBenchmarkUseCase {
public static class Result {
private final long mExecutionTime;
private final int mNumOfReceivedMessages;
public Result(long executionTime, int numOfReceivedMessages) {
mExecutionTime = executionTime;
mNumOfReceivedMessages = numOfReceivedMessages;
}
public long getExecutionTime() {
return mExecutionTime;
}
public int getNumOfReceivedMessages() {
return mNumOfReceivedMessages;
}
}
private static final int NUM_OF_MESSAGES = 1000;
private static final int BLOCKING_QUEUE_CAPACITY = 5;
private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);
private final AtomicInteger mNumOfFinishedConsumers = new AtomicInteger(0);
private final AtomicInteger mNumOfReceivedMessages = new AtomicInteger(0);
private volatile long mStartTimestamp;
public Maybe<Result> startBenchmark() {
return Maybe.fromCallable(new Callable<Result>() {
@Override
public Result call() {
mNumOfReceivedMessages.set(0);
mNumOfFinishedConsumers.set(0);
mStartTimestamp = System.currentTimeMillis();
Observable.range(0, NUM_OF_MESSAGES)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.forEach(
index -> newProducer(index).subscribe()
);
Observable.range(0, NUM_OF_MESSAGES)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.map(index -> newConsumer())
.doOnNext(completable -> completable.subscribe())
.flatMap(completable -> { return Observable.just(completable); })
.toList()
.blockingGet();
return new Result(
System.currentTimeMillis() - mStartTimestamp,
mNumOfReceivedMessages.get()
);
}
});
}
private Completable newProducer(final int index) {
return Completable
.fromAction(() -> mBlockingQueue.put(index))
.subscribeOn(Schedulers.io());
}
private Completable newConsumer() {
return Completable
.fromAction(() -> {
int message = mBlockingQueue.take();
if (message != -1) {
mNumOfReceivedMessages.incrementAndGet();
}
})
.subscribeOn(Schedulers.io());
}
}
This code compiles, runs and even completes, but the resulting number of exchanged messages is smaller than 1000, which means that there is some kind of a problem here.
What did I do wrong?
I don't fully understand why you'd want to do this type of processing. Also I don't think there is a need for message counting because you are supposed to generate 1000 messages. Note that with this type of benchmark, you are most likely measure how fast the system can create 2000 threads.
public Maybe<Result> startBenchmark() {
return
Flowable.range(0, NUM_OF_MESSAGES)
.flatMap(id ->
Flowable.fromCallable(() -> id) // <-- generate message
.subscribeOn(Schedulers.io())
)
.parallel(NUM_OF_MESSAGES)
.runOn(Schedulers.io())
.doOnNext(msg -> { }) // <-- process message
.sequential()
.count()
.doOnSubscribe(s -> { mStartTimestamp = System.currentTimeMillis(); })
.map(cnt -> new Result(System.currentTimeMillis() - mStartTimestamp, cnt))
.toMaybe();
}