Menu

RxJdbc

Gary Cheng
package com.example.jdbc;

import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RxJDBCExample extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(RxJDBCExample.class);
    private static final String CONFIG_FILE = "src/config/app-conf.json";
    private static final String SQL_CREATE_TABLE = "CREATE TABLE Product(PROD_ID VARCHAR(20) PRIMARY KEY, PROD_NAME VARCHAR(200) NOT NULL)";
    private static final String SQL_INSERT_PRODUCT = "INSERT INTO Product(PROD_ID, PROD_NAME) VALUES (?, ?)";
    private static final String SQL_SELECT_PRODUCTS = "SELECT * FROM Product";

    private JDBCClient jdbc;

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.fileSystem().rxReadFile(CONFIG_FILE)
                .flatMap(buffer -> Single.just(buffer.toJsonObject()))
                .flatMap(config -> vertx.rxDeployVerticle(RxJDBCExample.class.getName(), new DeploymentOptions().setConfig(config)))
                .subscribe(id -> logger.debug("Vertical deployed, deployment ID:{}", id));
    }

    @Override
    public void start() throws Exception {
        logger.debug("Verticle RxJDBCExample started");
        JsonObject config = this.config();
        logger.debug("Config:{}", config.toString());
        jdbc = JDBCClient.createShared(vertx, config);
        jdbc.rxGetConnection().flatMapPublisher(conn ->
                conn.rxUpdate(SQL_CREATE_TABLE)
                        .flatMap(res -> this.insertProduct(conn, "1", "Product 1"))
                        .flatMap(res -> this.insertProduct(conn, "2", "Product 2"))
                        .flatMap(res -> conn.rxQuery(SQL_SELECT_PRODUCTS))
                        .flattenAsFlowable(resultSet -> resultSet.getRows())
                        .doAfterTerminate(conn::close)
        ).subscribe(json -> logger.debug(json.encodePrettily()));
//        jdbc.rxGetConnection().flatMapPublisher(conn ->
//                conn.rxUpdate(SQL_CREATE_TABLE)
//                        .flatMap(res -> this.insertProduct(conn, "1", "Product 1"))
//                        .flatMap(res -> this.insertProduct(conn, "2", "Product 2"))
//                        .flatMap(res -> conn.rxQueryStream(SQL_SELECT_PRODUCTS))
//                        .flatMapPublisher(sqlRowStream -> sqlRowStream.)
//                        .doAfterTerminate(conn::close)
//        ).subscribe(row -> logger.debug("row:{}", row.toString()));
    }

    private Single<UpdateResult> insertProduct(SQLConnection conn, String productId, String productName) {
        logger.debug("Add product id:{}, name:{}", productId, productName);
        JsonArray params = new JsonArray().add(productId).add(productName);
        return conn.rxUpdateWithParams(SQL_INSERT_PRODUCT, params);
    }
}

MongoDB Logo MongoDB