Menu

RxjavaTest

Gary Cheng
package com.demo.rxjava;

import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThat;

public class RxTaskTest {
    private static final Logger logger = LoggerFactory.getLogger(RxTask.class);

    @Test
    public void executeTasks() throws InterruptedException {
        TestSubscriber<String> subscriber = new TestSubscriber<String>() {
            @Override
            public void onComplete() {
                super.onComplete();
                logger.debug("All tasks executed");
            }

            @Override
            public void onNext(String s) {
                super.onNext(s);
                logger.debug("Message received: {}", s);
            }
        };
        RxTask rxTask = new RxTask();
        rxTask.getTasks().subscribeOn(Schedulers.computation()).subscribe(subscriber);
        await().atMost(20, SECONDS).until(subscriber::valueCount, equalTo(3));
        logger.debug("Asynchronous ended");
        subscriber.assertComplete();
        assertThat(subscriber.values(), hasItem("Task 3 failed"));
    }
}

MongoDB Logo MongoDB