Menu

ServiceDiscovery

Gary Cheng
package com.example.common;

import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.servicediscovery.ServiceDiscovery;
import io.vertx.reactivex.servicediscovery.types.HttpEndpoint;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscoveryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Base verticle that bundles the plumbing shared by the micro-services in this project:
 * a {@link ServiceDiscovery} instance, one circuit breaker per service name, a cache of
 * HTTP clients resolved through service discovery, and publication of HTTP endpoint records.
 *
 * @author Gary Cheng
 */
public abstract class BaseMicroServicesVerticle extends AbstractVerticle {
    public static final String KEY_SERVICE_NAME = "service.name";
    public static final String KEY_HOST = "host";
    public static final String KEY_PORT = "port";
    public static final String KEY_ROOT = "root";
    private static final String KEY_NAME = "name";

    private static final Logger logger = LoggerFactory.getLogger(BaseMicroServicesVerticle.class);

    /** Created in {@link #start()}; {@code null} until then. */
    protected ServiceDiscovery discovery;
    /** Records published through this verticle; they are un-published again in {@link #stop()}. */
    private final Collection<Record> publishedRecords = new ConcurrentHashSet<>();
    /** Circuit breakers keyed by service name, created lazily. */
    private final Map<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();
    /** HTTP clients keyed by service name, filled after a successful discovery lookup. */
    private final Map<String, HttpClient> serviceClientMap = new ConcurrentHashMap<>();

    /**
     * Creates the service discovery instance using {@link #getServiceDiscoveryConfig()} as
     * backend configuration.
     */
    @Override
    public void start() {
        logger.debug("Starting verticle - {}", this.getClass().getName());
        logger.debug("Config:{}", this.config().encodePrettily());
        this.discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(this.getServiceDiscoveryConfig()));
    }

    /**
     * Un-publishes every record published through this verticle and then closes the
     * service discovery instance. Does nothing when {@link #start()} never ran.
     */
    @Override
    public void stop() {
        logger.debug("Stopping verticle - {}", this.getClass().getName());
        if (null == discovery) {
            return;
        }
        List<Completable> completables = new ArrayList<>();
        publishedRecords.forEach(record -> completables.add(discovery.rxUnpublish(record.getRegistration())));
        // mergeDelayError lets every un-publish attempt run even if one of them fails;
        // doFinally guarantees the discovery instance is closed on success and on failure.
        Completable.mergeDelayError(completables)
                .doFinally(discovery::close)
                .subscribe(
                        () -> logger.debug("All services have been un-published"),
                        error -> logger.warn("Failed to un-publish one or more services", error));
    }

    /**
     * Returns the service discovery backend configuration. The default implementation
     * uses the verticle's own {@code config()}.
     *
     * @return the backend configuration passed to {@link ServiceDiscoveryOptions}
     */
    protected JsonObject getServiceDiscoveryConfig() {
        return this.config();
    }

    /**
     * Returns the circuit breaker for a service, creating it through
     * {@link #createCircuitBreaker(String)} on first use.
     *
     * @param serviceName the name of the service
     * @return the circuit breaker associated with {@code serviceName}
     */
    protected final CircuitBreaker getCircuitBreaker(String serviceName) {
        // computeIfAbsent makes "look up or create" a single atomic map operation.
        return this.circuitBreakerMap.computeIfAbsent(serviceName, this::createCircuitBreaker);
    }

    /**
     * Creates a circuit breaker for a service. Subclasses may override this to supply
     * different options.
     *
     * @param serviceName the name of the service
     * @return a new circuit breaker named {@code <serviceName>-circuit-breaker}
     */
    protected CircuitBreaker createCircuitBreaker(String serviceName) {
        logger.debug("create circuit breaker for service {}", serviceName);
        String circuitBreakerName = serviceName + "-" + "circuit-breaker";
        CircuitBreakerOptions options = new CircuitBreakerOptions()
                .setMaxFailures(5)
                .setTimeout(5000)
                .setResetTimeout(10000)
                .setFallbackOnFailure(true);
        return CircuitBreaker.create(circuitBreakerName, vertx, options)
                .openHandler(v -> logger.debug("{} opened", circuitBreakerName))
                .halfOpenHandler(v -> logger.debug("{} half opened", circuitBreakerName))
                .closeHandler(v -> logger.debug("{} closed", circuitBreakerName));
    }

    /**
     * Resolves an HTTP client for a service. A client obtained earlier is returned from
     * the cache; otherwise the service is looked up by name in service discovery and the
     * resulting client is cached.
     *
     * @param serviceName the name the service was published under
     * @return a {@link Single} emitting the client, or failing when the lookup fails
     */
    protected final Single<HttpClient> getServiceHttpClient(String serviceName) {
        logger.debug("Get HTTP client by service name[{}]", serviceName);
        HttpClient cachedClient = this.serviceClientMap.get(serviceName);
        if (null != cachedClient) {
            return Single.just(cachedClient);
        }
        // NOTE(review): cached clients are never evicted here, so a service that is
        // re-published at another location keeps its old client - confirm this is intended.
        return HttpEndpoint.rxGetClient(discovery, new JsonObject().put(KEY_NAME, serviceName))
                .doOnSuccess(httpClient -> this.serviceClientMap.put(serviceName, httpClient));
    }

    /**
     * Publishes an HTTP endpoint record to service discovery.
     *
     * @param config endpoint settings: {@link #KEY_SERVICE_NAME} (required),
     *               {@link #KEY_HOST} (default {@code "localhost"}),
     *               {@link #KEY_PORT} (default {@code 8080}) and
     *               {@link #KEY_ROOT} (default {@code "/"})
     * @return a {@link Single} emitting the published record; it fails with
     *         {@link IllegalArgumentException} when no service name is configured
     */
    protected final Single<Record> publishHttpEndPoint(JsonObject config) {
        String serviceName = config.getString(KEY_SERVICE_NAME);
        if (null == serviceName) {
            // Report the problem through the returned Single so callers handle it in the
            // same place as publication failures; a nameless record could not be found
            // by the name-based lookup in getServiceHttpClient anyway.
            return Single.error(new IllegalArgumentException("Missing config value: " + KEY_SERVICE_NAME));
        }
        String host = config.getString(KEY_HOST, "localhost");
        Integer port = config.getInteger(KEY_PORT, 8080);
        String root = config.getString(KEY_ROOT, "/");
        return this.publishRecord(HttpEndpoint.createRecord(serviceName, host, port, root));
    }

    /**
     * Publishes a record to service discovery and remembers it so that {@link #stop()}
     * can un-publish it.
     *
     * @param record the record to publish
     * @return a {@link Single} emitting the record as returned by service discovery
     */
    private Single<Record> publishRecord(Record record) {
        return discovery.rxPublish(record)
                .doOnSuccess(published -> {
                    logger.debug("Service {} has been published", record.getName());
                    this.publishedRecords.add(published);
                });
    }
}
package com.example.common;

import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientRequest;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.CorsHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**

 * The abstract verticle which work as HTTP server
 *
 * @author Gary Cheng
 */
public abstract class BaseHttpMicroServicesVerticle extends BaseMicroServicesVerticle {
    private static final Logger logger = LoggerFactory.getLogger(BaseMicroServicesVerticle.class);

    /**

     * Enable CORS
     *
     * @param router
     */
    protected void enableCorsSupport(Router router) {
        Set<String> allowedHeaders = new HashSet<>();
        allowedHeaders.add("x-requested-with");
        allowedHeaders.add("Access-Control-Allow-Origin");
        allowedHeaders.add("origin");
        allowedHeaders.add("Content-Type");
        allowedHeaders.add("accept");

        CorsHandler corsHandler = CorsHandler.create("*").allowedHeaders(allowedHeaders);
        Arrays.asList(HttpMethod.values()).stream().forEach(method -> corsHandler.allowedMethod(method));
        router.route().handler(corsHandler);
    }

    protected void dispatchHttpRequest(RoutingContext context, String serviceName, Handler<? super Throwable> errorHandler) {
        logger.debug("Dispatch Http Request {} to service {}", context.request().uri(), serviceName);
        this.getCircuitBreaker(serviceName).rxExecuteCommandWithFallback(
                future -> this.getServiceHttpClient(serviceName)
                        .subscribe(httpClient -> this.executeDispatchHttpRequest(context, httpClient, future),
                                throwable -> future.fail("Service [" + serviceName + "] not published")),
                throwable -> this.circuitFallback(errorHandler, throwable))
                .subscribe(v -> logger.debug("dispatch request completed"));
    }

    private Void circuitFallback(Handler<? super Throwable> errorHandler, Throwable throwable) {
        errorHandler.handle(throwable);
        return null;
    }

    private void executeDispatchHttpRequest(RoutingContext context, HttpClient httpClient, io.vertx.reactivex.core.Future<Void> future) {
        logger.debug("executeDispatchHttpRequest, uri:{}", context.request().uri());
        HttpClientRequest clientRequest = httpClient.request(context.request().method(), context.request().uri());
        clientRequest.toFlowable()
                .flatMap(response -> {
                    context.response().setStatusCode(response.statusCode());
                    response.headers().getDelegate().forEach(header -> context.response().putHeader(header.getKey(), header.getValue()));
                    return response.toFlowable();
                })
                .subscribe(buffer -> {
                    context.response().end(buffer);
                    future.complete();
                }, future::fail);
        context.request().headers().getDelegate().forEach(header -> clientRequest.putHeader(header.getKey(), header.getValue()));
        if (context.user() != null) {
            clientRequest.putHeader("user-principal", context.user().principal().encode());
        }
        if (null != context.getBody()) {
            clientRequest.end(context.getBody());
        } else {
            clientRequest.end();
        }
    }
}
package com.example.discovery;

import com.example.common.BaseHttpMicroServicesVerticle;
import io.vertx.core.Future;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.BodyHandler;
import io.vertx.servicediscovery.rest.ServiceDiscoveryRestEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * API gateway verticle. Serves the service discovery REST endpoint, forwards requests
 * under {@code /service/<name>/...} to the service published under {@code <name>}, and
 * answers every other path with a welcome message.
 */
public class ApiGatewayVerticle extends BaseHttpMicroServicesVerticle {
    private static final Logger logger = LoggerFactory.getLogger(ApiGatewayVerticle.class);
    private static final int PORT = 8080;
    private static final String PREFIX_SERVICE = "/service/";

    /**
     * Starts a clustered Vert.x instance and deploys the gateway verticle on it.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException {
        Vertx.rxClusteredVertx(new VertxOptions().setClustered(true))
                .flatMap(vertx -> vertx.rxDeployVerticle(ApiGatewayVerticle.class.getName()))
                .subscribe(id -> logger.debug("GatewayVerticle deployed successfully"),
                        error -> logger.error("GatewayVerticle deployment failed", error));
    }

    /**
     * Sets up the router and starts the HTTP server on {@code PORT}.
     *
     * @param startFuture completed when the server is listening, failed otherwise
     */
    @Override
    public void start(Future<Void> startFuture) throws Exception {
        super.start();
        Router router = Router.router(vertx);
        this.enableCorsSupport(router);
        ServiceDiscoveryRestEndpoint.create(router.getDelegate(), discovery.getDelegate());
        this.configureRouter(router);
        vertx.createHttpServer()
                .requestHandler(router::accept)
                .rxListen(PORT)
                .subscribe(s -> {
                    logger.debug("Gateway HTTP server started on port {}", PORT);
                    startFuture.complete();
                }, startFuture::fail);
    }

    /**
     * Registers the body handler, the {@code /service/*} forwarding route and a
     * catch-all route for every path that does not start with {@code /service/}.
     */
    private void configureRouter(Router router) {
        router.route().handler(BodyHandler.create());
        router.route(PREFIX_SERVICE + "*").handler(this::serviceHandler);
        // Negative look-ahead: matches any path that does NOT begin with the service prefix.
        String regexMatcher = "^(?!" + PREFIX_SERVICE + ").*";
        router.routeWithRegex(regexMatcher).handler(this::indexHandler);
    }

    /** Answers with a JSON welcome message. */
    private void indexHandler(RoutingContext context) {
        logger.debug("Received http request");
        context.response()
                .putHeader("content-type", "application/json")
                .end(new JsonObject().put("message", "Welcome to gateway").encodePrettily());
    }

    /**
     * Forwards a {@code /service/<name>/...} request to the service {@code <name>}, or
     * answers with an error when the path carries no service name.
     */
    private void serviceHandler(RoutingContext context) {
        // Use path() rather than uri(): the URI carries the query string, which would
        // otherwise leak into the service name for requests like /service/orders?limit=5.
        String path = context.request().path();
        logger.debug("path:{}", path);
        String serviceName = extractServiceName(path);
        logger.debug("Service Name:{}", serviceName);
        if (serviceName.trim().isEmpty()) {
            this.error(context, 500, "Empty service name");
        } else {
            this.dispatchHttpRequest(context, serviceName, error -> this.error(context, error));
        }
    }

    /**
     * Returns the path segment that directly follows {@code PREFIX_SERVICE}.
     *
     * @param path the request path, without query string
     * @return the service name, or an empty string when the path does not start with the
     *         prefix or has nothing after it (for example {@code /service/} or {@code /service//})
     */
    private static String extractServiceName(String path) {
        if (null == path || !path.startsWith(PREFIX_SERVICE)) {
            return "";
        }
        int nameStart = PREFIX_SERVICE.length();
        int nameEnd = path.indexOf('/', nameStart);
        return nameEnd < 0 ? path.substring(nameStart) : path.substring(nameStart, nameEnd);
    }

    /** Answers with status 500 and the throwable's message. */
    private void error(RoutingContext context, Throwable throwable) {
        this.error(context, 500, throwable.getMessage());
    }

    /**
     * Answers with a JSON error body unless the response has already ended.
     *
     * @param context      the routing context to answer on
     * @param statusCode   the HTTP status code to send
     * @param errorMessage the message placed in the {@code error} field
     */
    private void error(RoutingContext context, int statusCode, String errorMessage) {
        if (!context.response().ended()) {
            context.response().setStatusCode(statusCode).putHeader("content-type", "application/json")
                    .end(new JsonObject().put("statusCode", statusCode).put("error", errorMessage).encodePrettily());
        }
    }
}

MongoDB Logo MongoDB