package com.demo.rxjava;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo that runs three {@link TaskService} tasks concurrently on the RxJava
 * I/O scheduler and exposes their outcome messages as a single merged stream.
 *
 * <p>Each task is retried on failure; if it still fails, the stream receives a
 * {@code "<name> failed"} message instead of an error, so the merged
 * {@link Flowable} returned by {@link #getTasks()} does not terminate with an
 * error because of a failing task.
 */
public class RxTask {

    private static final Logger logger = LoggerFactory.getLogger(RxTask.class);

    /** Number of re-subscriptions attempted after the first failure of a task. */
    private static final int MAX_RETRIES = 5;

    private final TaskService taskService = new TaskService();

    /** Merged stream of the three task outcome messages, in completion order. */
    private final Flowable<String> tasks;

    /**
     * Builds the (cold) pipeline. Nothing is executed until a subscriber
     * attaches to {@link #getTasks()}.
     */
    public RxTask() {
        // Each Single gets its own subscribeOn so the tasks run in parallel on
        // separate I/O workers rather than sequentially.
        Single<String> task1 = this.executeTask(taskService::task1, "Task 1").subscribeOn(Schedulers.io());
        Single<String> task2 = this.executeTask(taskService::task2, "Task 2").subscribeOn(Schedulers.io());
        Single<String> task3 = this.executeTask(taskService::task3, "Task 3").subscribeOn(Schedulers.io());
        this.tasks = task1.mergeWith(task2).mergeWith(task3);
    }

    /**
     * Runs the demo: subscribes to the merged task stream, logs every message
     * and blocks until the stream has terminated.
     *
     * @param args ignored
     * @throws InterruptedException retained for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        // blockingSubscribe keeps the main thread alive exactly until the
        // stream completes. A fixed sleep would either waste time or, because
        // Schedulers.io() uses daemon threads, cut off slow tasks at JVM exit.
        new RxTask().getTasks().blockingSubscribe(
                task -> logger.debug("Received message:{}", task),
                // Pass the throwable itself so the stack trace is kept and a
                // null message cannot produce an empty log entry.
                exception -> logger.error("Task stream terminated with an error", exception),
                () -> logger.debug("All tasks executed"));
    }

    /**
     * Returns the merged stream of task outcome messages.
     *
     * @return a {@link Flowable} emitting one message per task, then completing
     */
    public Flowable<String> getTasks() {
        return tasks;
    }

    /**
     * Wraps an action in a {@link Single} that emits a success message once the
     * action has run, retrying on failure and falling back to a failure
     * message when the retries are exhausted.
     *
     * @param action   the work to run on each subscription
     * @param taskName human-readable name used in the emitted and logged messages
     * @return a {@link Single} emitting {@code "<taskName> executed successfully"}
     *         or {@code "<taskName> failed"}; it does not signal an error for
     *         failures of {@code action}
     */
    private Single<String> executeTask(Action action, String taskName) {
        // fromCallable forwards anything thrown by the action to onError, so no
        // manual emitter handling is needed.
        Single<String> task = Single.fromCallable(() -> {
            action.run();
            return taskName + " executed successfully";
        });
        return task
                .retry(MAX_RETRIES)
                .onErrorReturn(error -> {
                    // Record the cause before it is replaced by the fallback
                    // message; otherwise the failure reason is lost.
                    logger.warn("{} failed after retries", taskName, error);
                    return taskName + " failed";
                });
    }
}