package com.demo.rxjava;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThat;
public class RxTaskTest {
private static final Logger logger = LoggerFactory.getLogger(RxTask.class);
@Test
public void executeTasks() throws InterruptedException {
TestSubscriber<String> subscriber = new TestSubscriber<String>() {
@Override
public void onComplete() {
super.onComplete();
logger.debug("All tasks executed");
}
@Override
public void onNext(String s) {
super.onNext(s);
logger.debug("Message received: {}", s);
}
};
RxTask rxTask = new RxTask();
rxTask.getTasks().subscribeOn(Schedulers.computation()).subscribe(subscriber);
await().atMost(20, SECONDS).until(subscriber::valueCount, equalTo(3));
logger.debug("Asynchronous ended");
subscriber.assertComplete();
assertThat(subscriber.values(), hasItem("Task 3 failed"));
}
}