A few years ago, when NoSQL was trending, our team, like every other team, was enthusiastic about the new and exciting stuff, and we were planning to change the database in one of our applications. But when we got into the finer details of implementation, we remembered what wise men say, "the devil is in the details", and eventually realized that NoSQL is not a silver bullet for all problems and that the answer to NoSQL vs. RDBMS was "it depends". Similarly, in the last year, concurrency libraries like RxJava and Spring Reactor have been trending, with enthusiastic statements such as "the asynchronous non-blocking approach is the way to go". In order not to make the same mistake again, we have tried to evaluate how concurrency frameworks like ExecutorService, RxJava, Disruptor, and Akka differ from one another and how to identify the right use case for each framework.
Before getting into the comparison of concurrency frameworks, here is a quick refresher on how to configure the optimal number of threads to increase the performance of parallel tasks. This theory applies to all frameworks, and the same thread configuration has been used in all frameworks to measure performance.
Reference : http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
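As an illustration of the thread-count refresher, a commonly cited heuristic sizes a pool as cores x target utilization x (1 + wait time / compute time). The sketch below is only one way to express that heuristic; the class name `PoolSizing`, the method `optimalThreads`, and the 50 ms figures are made up for illustration and are not measurements from this article.

```java
public class PoolSizing {

    /**
     * Heuristic: threads = cores * targetUtilization * (1 + waitTime / computeTime).
     * All inputs are estimates the caller has to measure; none come from the article.
     */
    static int optimalThreads(int cores, double targetUtilization,
                              double waitMillis, double computeMillis) {
        if (cores <= 0 || computeMillis <= 0 || waitMillis < 0
                || targetUtilization <= 0 || targetUtilization > 1) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        double threads = cores * targetUtilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.round(threads));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Hypothetical I/O-heavy task: 50 ms waiting for every 50 ms of computing.
        System.out.println("I/O pool size: " + optimalThreads(cores, 1.0, 50, 50));
        // CPU-bound task: no waiting, so the pool size equals the core count.
        System.out.println("CPU pool size: " + optimalThreads(cores, 1.0, 0, 50));
    }
}
```

With 8 cores and equal wait and compute time, the heuristic yields 16 threads; for a purely in-memory task it yields 8. The real ratio has to be measured per workload.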
Performance tests ran in GCP -> processor model name: Intel(R) Xeon(R) CPU @ 2.30GHz; Architecture: x86_64; No. of cores: 8 (Note: These results are specific to this use case and don't imply that one framework is better than another)
Label | # of requests | Thread Pool size for I/O Tasks | Average Latency in ms (50 req/sec) |
All the operations are in Sequential order | ~10000 | NA | ~2100 |
Parallelize IO Tasks with Executor Service and use http-thread for in-memory task | ~10000 | 16 | ~1800 |
Parallelize IO Tasks with Executor Service (Completable Future) and use http-thread for in-memory task | ~10000 | 16 | ~1800 |
Parallelize All tasks with ExecutorService and use @Suspended AsyncResponse response to send response in non-blocking manner | ~10000 | 16 | ~3500 |
Use RxJava for performing all tasks and use @Suspended AsyncResponse response to send response in non-blocking manner | ~10000 | 16 | ~2300 |
Parallelize All tasks with Disruptor framework (Http thread will be blocked) | ~10000 | 11 | ~3000 |
Parallelize All tasks with Disruptor framework and use @Suspended AsyncResponse response to send response in non-blocking manner | ~10000 | 12 | ~3500 |
Parallelize All tasks with Akka framework (Http thread will be blocked) | ~10000 | ~3000 |
If an application is deployed on multiple nodes and the req/sec on each node is less than the number of cores available, then ExecutorService can be used to parallelize tasks and execute them faster.
If an application is deployed on multiple nodes and the req/sec on each node is much higher than the number of cores available, then using ExecutorService to parallelize further can only make things worse.
Label | # of requests | Thread Pool size for I/O Tasks | Average Latency in ms (50 req/sec) |
All the operations are in Sequential order | ~3000 | NA | ~2600 |
Parallelize IO Tasks with Executor Service and use http-thread for in-memory task | ~3000 | 24 | ~3000 |
long startTimeOfIOTasks = System.currentTimeMillis();
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();
long endTimeOfIOTasks = System.currentTimeMillis();
long timeTakenOfIOTasks = endTimeOfIOTasks - startTimeOfIOTasks;
LOG.info("Time Taken for Sequential Service IO Operations :: " + timeTakenOfIOTasks + " - in Thread "
+ Thread.currentThread().getName());
long startTimeOfNonIOTasks = System.currentTimeMillis();
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
long endTimeOfNonIOTasks = System.currentTimeMillis();
long timeTakenOfNonIOTasks = endTimeOfNonIOTasks - startTimeOfNonIOTasks;
long timeTaken = endTimeOfNonIOTasks - startTimeOfIOTasks;
LOG.info("Time Taken for Sequential Service non-IO Operations :: " + timeTakenOfNonIOTasks + " - in Thread "
+ Thread.currentThread().getName());
LOG.info("Time Taken for Sequential Service to build response :: " + timeTaken + " - in Thread "
+ Thread.currentThread().getName());
return response;
List<Callable<String>> ioCallableTasks = new ArrayList<>();
ioCallableTasks.add(JsonService::getPosts);
ioCallableTasks.add(JsonService::getComments);
ioCallableTasks.add(JsonService::getAlbums);
ioCallableTasks.add(JsonService::getPhotos);
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);
String posts = futuresOfIOTasks.get(0).get();
String comments = futuresOfIOTasks.get(1).get();
String albums = futuresOfIOTasks.get(2).get();
String photos = futuresOfIOTasks.get(3).get();
int userId = new Random().nextInt(10) + 1; // same random user as in the sequential version
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
Without AsyncResponse, performance is the same as with ExecutorService. If multiple API calls have to be asynchronous and have to be chained, this approach is better (it is similar to Promises in Node).
int userId = new Random().nextInt(10) + 1;
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
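The snippet above runs independent calls in parallel; the chaining case mentioned earlier, where one asynchronous call feeds the next, could look roughly like the sketch below. `fetchUser` and `fetchPostsFor` are hypothetical stand-ins for remote calls, not part of the article's `JsonService`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainingSketch {

    // Hypothetical stand-ins for two dependent remote calls.
    static String fetchUser(int userId) {
        return "user-" + userId;
    }

    static String fetchPostsFor(String user) {
        return "posts-of-" + user;
    }

    /** Runs the second call only after the first one has produced its result. */
    static String loadPosts(int userId, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> fetchUser(userId), pool)
                // thenCompose flattens the nested future, much like chaining Promises with then()
                .thenComposeAsync(
                        user -> CompletableFuture.supplyAsync(() -> fetchPostsFor(user), pool), pool)
                .thenApply(String::toUpperCase)
                .join(); // blocks the caller, as in the snippet above
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(loadPosts(7, pool)); // prints POSTS-OF-USER-7
        } finally {
            pool.shutdown();
        }
    }
}
```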
Use @Suspended AsyncResponse response to send the response in a non-blocking way
[Figure: IO vs NIO. From http://tutorials.jenkov.com/java-nio/nio-vs-io.html]
If the use case is something like a server-side chat application, where a thread need not hold the connection until the client responds, then the asynchronous non-blocking approach can be preferred over synchronous communication; in such use cases, rather than just waiting, system resources can be put to better use.
int userId = new Random().nextInt(10) + 1;
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);
CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);
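To make the non-blocking hand-off visible without a JAX-RS container, the following plain-Java sketch replaces `AsyncResponse.resume` with a hypothetical `Consumer<String>` callback. The class `NonBlockingSketch`, the method `slowCall`, and the 100 ms delay are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class NonBlockingSketch {

    // Hypothetical slow I/O call; sleeps to simulate network latency.
    static String slowCall(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    /** Schedules the work and returns at once; `resume` stands in for AsyncResponse::resume. */
    static void handle(ExecutorService pool, Consumer<String> resume) {
        CompletableFuture<String> posts = CompletableFuture.supplyAsync(() -> slowCall("posts"), pool);
        CompletableFuture<String> comments = CompletableFuture.supplyAsync(() -> slowCall("comments"), pool);
        posts.thenCombine(comments, (p, c) -> p + "+" + c)
                .thenAccept(resume)
                // Without this, a failed call would leave the client waiting forever.
                .exceptionally(t -> {
                    resume.accept("error");
                    return null;
                });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> response = new CompletableFuture<>();
        handle(pool, response::complete);
        // The calling thread is free here while the pool threads are still working.
        System.out.println("handler returned, response ready: " + response.isDone());
        System.out.println("response: " + response.join());
        pool.shutdown();
    }
}
```

The calling thread, which plays the role of the http-thread, only schedules the work; the response is delivered later from a pool thread.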
If the asynchronous non-blocking approach suits a use case, then RxJava or any other reactive library can be preferred. These libraries have additional capabilities such as back-pressure, which lets a slow consumer limit how fast a producer can emit and so balances the load between producers and consumers.
int userId = new Random().nextInt(10) + 1;
ExecutorService executor = CustomThreads.getExecutorService(8);
Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
.subscribeOn(Schedulers.from(executor));
Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
.subscribeOn(Schedulers.from(executor));
Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
.subscribeOn(Schedulers.from(executor));
Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
.subscribeOn(Schedulers.from(executor));
Observable<String> postsAndCommentsObservable = Observable
.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
.subscribeOn(Schedulers.from(executor));
Observable<String> albumsAndPhotosObservable = Observable
.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
.subscribeOn(Schedulers.from(executor));
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribeOn(Schedulers.from(executor))
.subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
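The RxJava snippet above does not exercise back-pressure itself. As a rough illustration of the idea, the sketch below uses the JDK's own `java.util.concurrent.Flow` API (Java 9+) rather than RxJava: the subscriber requests one item at a time, and the publisher's small buffer makes `submit` block when the subscriber falls behind. The class and method names and the buffer size of 4 are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackPressureSketch {

    /** Subscriber that signals demand for exactly one item at a time. */
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when this one is done
        }

        @Override
        public void onError(Throwable error) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    /** Publishes 1..count through a small buffer and returns what the subscriber received. */
    static List<Integer> publishAndCollect(int count) {
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(consumerThread, 4)) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumerThread.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(10));
    }
}
```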
[Figure: Queue vs RingBuffer. From http://tutorials.jenkov.com/java-concurrency/blocking-queues.html and https://www.baeldung.com/lmax-disruptor-concurrency]
The Disruptor framework performs better when used with event-driven architectural patterns, and when there is a single producer and multiple consumers with the main focus on in-memory tasks.
static {
int userId = new Random().nextInt(10) + 1;
// Sample event handler; the other handlers referenced below are omitted here.
// A CountDownLatch is used to synchronize the handler threads with the http-thread.
EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {
event.posts = JsonService.getPosts();
event.countDownLatch.countDown();
};
DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
.handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
.thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
.handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
DISRUPTOR.start();
}
// for each request :
Event event = null;
RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {
event = ringBuffer.get(sequence);
event.countDownLatch = countDownLatch;
event.startTime = System.currentTimeMillis();
} finally {
ringBuffer.publish(sequence);
}
try {
event.countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt status so callers can see it
e.printStackTrace();
}
From https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/
// from controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());
// handler :
public Receive createReceive() {
return receiveBuilder().match(Request.class, request -> {
Event event = request.event; // Ideally, immutable data structures should be used here.
request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
}).match(Event.class, e -> {
if (e.posts != null && e.comments != null && e.albums != null && e.photos != null) {
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
e.comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
e.photos);
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
e.response = response;
e.countDownLatch.countDown();
}
}).build();
}