// Menu
//
// RxTask
//
// Gary Cheng
package com.demo.rxjava;


import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the three {@code TaskService} tasks through RxJava and exposes their outcome as a stream.
 *
 * <p>Each task is wrapped in a {@link Single} that is subscribed on {@link Schedulers#io()}, and
 * the three singles are merged into one {@link Flowable}. The flowable emits one status message
 * per task (either {@code "<name> executed successfully"} or {@code "<name> failed"}) and then
 * completes.
 */
public class RxTask {
    private static final Logger logger = LoggerFactory.getLogger(RxTask.class);

    /** Number of times a failing task is re-subscribed before it is reported as failed. */
    private static final int MAX_RETRIES = 5;

    private final TaskService taskService = new TaskService();
    private final Flowable<String> tasks;

    /**
     * Builds the merged task stream. Nothing is executed here: the tasks only run once the
     * flowable returned by {@link #getTasks()} is subscribed to.
     */
    public RxTask() {
        Single<String> task1 = executeTask(taskService::task1, "Task 1").subscribeOn(Schedulers.io());
        Single<String> task2 = executeTask(taskService::task2, "Task 2").subscribeOn(Schedulers.io());
        Single<String> task3 = executeTask(taskService::task3, "Task 3").subscribeOn(Schedulers.io());
        this.tasks = task1.mergeWith(task2).mergeWith(task3);
    }

    /**
     * Subscribes to the task stream and logs every message until the stream terminates.
     *
     * @param args unused
     * @throws InterruptedException kept in the signature for compatibility with existing callers
     */
    public static void main(String[] args) throws InterruptedException {
        // Block the main thread until the stream terminates instead of sleeping for a fixed
        // period: a fixed sleep either wastes time or cuts slow tasks short.
        new RxTask().getTasks().blockingSubscribe(
                task -> logger.debug("Received message:{}", task),
                // Pass the throwable itself so the stack trace is logged; getMessage() may be null.
                exception -> logger.error("Task stream terminated with an error", exception),
                () -> logger.debug("All tasks executed"));
    }

    /**
     * Returns the merged stream of task status messages.
     *
     * @return a flowable emitting one message per task
     */
    public Flowable<String> getTasks() {
        return tasks;
    }

    /**
     * Wraps {@code action} in a {@link Single} that reports the outcome as a message.
     *
     * <p>If the action throws, it is retried up to {@link #MAX_RETRIES} times. If it still fails,
     * the error is logged and replaced by {@code taskName + " failed"}, so the returned single
     * never signals an error downstream.
     *
     * @param action   the work to run on each subscription
     * @param taskName label used in the emitted message and in the failure log
     * @return a single emitting the success or failure message for the task
     */
    private Single<String> executeTask(Action action, String taskName) {
        // fromCallable routes anything the action throws to onError, so no manual emitter and
        // try/catch is needed.
        Single<String> task = Single.fromCallable(() -> {
            action.run();
            return taskName + " executed successfully";
        });
        return task
                .retry(MAX_RETRIES)
                // Record the cause before it is swallowed by the fallback message below.
                .doOnError(error -> logger.warn("{} failed after {} retries", taskName, MAX_RETRIES, error))
                .onErrorReturnItem(taskName + " failed");
    }
}

// MongoDB Logo MongoDB