package com.example.common;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.servicediscovery.ServiceDiscovery;
import io.vertx.reactivex.servicediscovery.types.HttpEndpoint;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscoveryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Base verticle that provides functions shared by micro-service verticles:
 * a {@link ServiceDiscovery} instance, publication of HTTP endpoint records,
 * cached HTTP clients for discovered services and one circuit breaker per service.
 *
 * @author Gary Cheng
 */
public abstract class BaseMicroServicesVerticle extends AbstractVerticle {
    public static final String KEY_SERVICE_NAME = "service.name";
    public static final String KEY_HOST = "host";
    public static final String KEY_PORT = "port";
    public static final String KEY_ROOT = "root";
    private static final String KEY_NAME = "name";
    private static final Logger logger = LoggerFactory.getLogger(BaseMicroServicesVerticle.class);

    /** Service discovery created in {@link #start()}; {@code null} until then. */
    protected ServiceDiscovery discovery;
    /** Records published through this verticle; they are un-published in {@link #stop()}. */
    private final Collection<Record> publishedRecords = new ConcurrentHashSet<>();
    /** Circuit breakers keyed by service name, created lazily. */
    private final Map<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();
    /** HTTP clients keyed by service name, cached after the first successful lookup. */
    private final Map<String, HttpClient> serviceClientMap = new ConcurrentHashMap<>();

    /**
     * Creates the service discovery using {@link #getServiceDiscoveryConfig()} as backend configuration.
     */
    @Override
    public void start() {
        logger.debug("Starting verticle - {}", this.getClass().getName());
        // Pretty-printing the configuration is only worth the cost when it will actually be logged.
        if (logger.isDebugEnabled()) {
            logger.debug("Config:{}", this.config().encodePrettily());
        }
        this.discovery = ServiceDiscovery.create(vertx,
                new ServiceDiscoveryOptions().setBackendConfiguration(this.getServiceDiscoveryConfig()));
    }

    /**
     * Un-publishes every record published by this verticle and then closes the service discovery.
     * The discovery is closed even when one or more un-publish operations fail.
     */
    @Override
    public void stop() {
        logger.debug("Stopping verticle - {}", this.getClass().getName());
        if (null == discovery) {
            return;
        }
        List<Completable> completables = new ArrayList<>();
        for (Record record : publishedRecords) {
            completables.add(discovery.rxUnpublish(record.getRegistration()));
        }
        publishedRecords.clear();
        // Delay errors so a single failing un-publish does not cut the remaining ones short,
        // and always supply an error consumer so a failure cannot skip closing the discovery.
        Completable.mergeDelayError(completables)
                .subscribe(() -> {
                    logger.debug("All services have been un-published");
                    discovery.close();
                }, error -> {
                    logger.warn("Failed to un-publish one or more services", error);
                    discovery.close();
                });
    }

    /**
     * Returns the service discovery backend configuration. The default implementation
     * returns the verticle's own {@code config()}.
     *
     * @return the backend configuration passed to the service discovery
     */
    protected JsonObject getServiceDiscoveryConfig() {
        return this.config();
    }

    /**
     * Returns the circuit breaker of a service, creating it through
     * {@link #createCircuitBreaker(String)} on first use.
     *
     * @param serviceName the name of service
     * @return the circuit breaker associated with the service name
     */
    protected final CircuitBreaker getCircuitBreaker(String serviceName) {
        return this.circuitBreakerMap.computeIfAbsent(serviceName, this::createCircuitBreaker);
    }

    /**
     * Creates a circuit breaker for a service. The breaker is named
     * {@code <serviceName>-circuit-breaker} and logs its state transitions.
     *
     * @param serviceName the name of service
     * @return a new circuit breaker
     */
    protected CircuitBreaker createCircuitBreaker(String serviceName) {
        logger.debug("create circuit breaker for service {}", serviceName);
        String circuitBreakerName = serviceName + "-" + "circuit-breaker";
        CircuitBreakerOptions options = new CircuitBreakerOptions()
                .setMaxFailures(5)
                .setTimeout(5000)
                .setResetTimeout(10000)
                .setFallbackOnFailure(true);
        return CircuitBreaker.create(circuitBreakerName, vertx, options)
                .openHandler(v -> logger.debug("{} opened", circuitBreakerName))
                .halfOpenHandler(v -> logger.debug("{} half opened", circuitBreakerName))
                .closeHandler(v -> logger.debug("{} closed", circuitBreakerName));
    }

    /**
     * Looks up a service by name in the service discovery and returns its HTTP client.
     * A client obtained successfully is cached and returned directly on later calls.
     *
     * @param serviceName the name of the service to look up
     * @return a {@code Single} emitting the HTTP client, or failing when the lookup fails
     */
    protected final Single<HttpClient> getServiceHttpClient(String serviceName) {
        logger.debug("Get HTTP client by service name[{}]", serviceName);
        HttpClient cachedClient = this.serviceClientMap.get(serviceName);
        if (null != cachedClient) {
            return Single.just(cachedClient);
        }
        return HttpEndpoint.rxGetClient(discovery, new JsonObject().put(KEY_NAME, serviceName))
                .doOnSuccess(httpClient -> this.serviceClientMap.put(serviceName, httpClient));
    }

    /**
     * Publishes an HTTP endpoint to the service discovery.
     * Host, port and root default to {@code localhost}, {@code 8080} and {@code /}
     * when absent from the configuration.
     *
     * @param config the configuration of the HTTP endpoint (see the {@code KEY_*} constants)
     * @return a {@code Single} emitting the published record
     */
    protected final Single<Record> publishHttpEndPoint(JsonObject config) {
        String serviceName = config.getString(KEY_SERVICE_NAME);
        String host = config.getString(KEY_HOST, "localhost");
        Integer port = config.getInteger(KEY_PORT, 8080);
        String root = config.getString(KEY_ROOT, "/");
        return this.publishRecord(HttpEndpoint.createRecord(serviceName, host, port, root));
    }

    /**
     * Publishes a service record to the service discovery and remembers the published
     * record so that it can be un-published in {@link #stop()}.
     *
     * @param record record to publish
     * @return a {@code Single} emitting the record as returned by the service discovery
     */
    private Single<Record> publishRecord(Record record) {
        return discovery.rxPublish(record)
                .doOnSuccess(published -> {
                    logger.debug("Service {} has been published", record.getName());
                    this.publishedRecords.add(published);
                });
    }
}
package com.example.common;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientRequest;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.CorsHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* The abstract verticle which work as HTTP server
*
* @author Gary Cheng
*/
public abstract class BaseHttpMicroServicesVerticle extends BaseMicroServicesVerticle {
private static final Logger logger = LoggerFactory.getLogger(BaseMicroServicesVerticle.class);
/**
* Enable CORS
*
* @param router
*/
protected void enableCorsSupport(Router router) {
Set<String> allowedHeaders = new HashSet<>();
allowedHeaders.add("x-requested-with");
allowedHeaders.add("Access-Control-Allow-Origin");
allowedHeaders.add("origin");
allowedHeaders.add("Content-Type");
allowedHeaders.add("accept");
CorsHandler corsHandler = CorsHandler.create("*").allowedHeaders(allowedHeaders);
Arrays.asList(HttpMethod.values()).stream().forEach(method -> corsHandler.allowedMethod(method));
router.route().handler(corsHandler);
}
protected void dispatchHttpRequest(RoutingContext context, String serviceName, Handler<? super Throwable> errorHandler) {
logger.debug("Dispatch Http Request {} to service {}", context.request().uri(), serviceName);
this.getCircuitBreaker(serviceName).rxExecuteCommandWithFallback(
future -> this.getServiceHttpClient(serviceName)
.subscribe(httpClient -> this.executeDispatchHttpRequest(context, httpClient, future),
throwable -> future.fail("Service [" + serviceName + "] not published")),
throwable -> this.circuitFallback(errorHandler, throwable))
.subscribe(v -> logger.debug("dispatch request completed"));
}
private Void circuitFallback(Handler<? super Throwable> errorHandler, Throwable throwable) {
errorHandler.handle(throwable);
return null;
}
private void executeDispatchHttpRequest(RoutingContext context, HttpClient httpClient, io.vertx.reactivex.core.Future<Void> future) {
logger.debug("executeDispatchHttpRequest, uri:{}", context.request().uri());
HttpClientRequest clientRequest = httpClient.request(context.request().method(), context.request().uri());
clientRequest.toFlowable()
.flatMap(response -> {
context.response().setStatusCode(response.statusCode());
response.headers().getDelegate().forEach(header -> context.response().putHeader(header.getKey(), header.getValue()));
return response.toFlowable();
})
.subscribe(buffer -> {
context.response().end(buffer);
future.complete();
}, future::fail);
context.request().headers().getDelegate().forEach(header -> clientRequest.putHeader(header.getKey(), header.getValue()));
if (context.user() != null) {
clientRequest.putHeader("user-principal", context.user().principal().encode());
}
if (null != context.getBody()) {
clientRequest.end(context.getBody());
} else {
clientRequest.end();
}
}
}
package com.example.discovery;
import com.example.common.BaseHttpMicroServicesVerticle;
import io.vertx.core.Future;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.BodyHandler;
import io.vertx.servicediscovery.rest.ServiceDiscoveryRestEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * API gateway verticle: serves the service discovery REST endpoint, forwards requests
 * under {@code /service/<name>/...} to the service called {@code <name>} and answers
 * every other path with a welcome message.
 */
public class ApiGatewayVerticle extends BaseHttpMicroServicesVerticle {
    private static final Logger logger = LoggerFactory.getLogger(ApiGatewayVerticle.class);
    private static final int PORT = 8080;
    private static final String PREFIX_SERVICE = "/service/";

    /**
     * Starts a clustered Vert.x instance and deploys this verticle.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws InterruptedException {
        Vertx.rxClusteredVertx(new VertxOptions().setClustered(true))
                .flatMap(vertx -> vertx.rxDeployVerticle(ApiGatewayVerticle.class.getName()))
                .subscribe(id -> logger.debug("GatewayVerticle deployed successfully"),
                        // Report startup failures instead of letting them escape as an unhandled Rx error.
                        error -> logger.error("Failed to deploy GatewayVerticle", error));
    }

    /**
     * Sets up the router and starts the HTTP server on {@code PORT}.
     *
     * @param startFuture completed once the server is listening, failed when listening fails
     */
    @Override
    public void start(Future<Void> startFuture) throws Exception {
        super.start();
        Router router = Router.router(vertx);
        this.enableCorsSupport(router);
        ServiceDiscoveryRestEndpoint.create(router.getDelegate(), discovery.getDelegate());
        this.configureRouter(router);
        vertx.createHttpServer()
                .requestHandler(router::accept)
                .rxListen(PORT)
                .subscribe(s -> {
                    logger.debug("Gateway HTTP server started on port {}", PORT);
                    startFuture.complete();
                }, startFuture::fail);
    }

    /**
     * Registers the body handler, the service dispatch route and the catch-all index route.
     */
    private void configureRouter(Router router) {
        router.route().handler(BodyHandler.create());
        router.route(PREFIX_SERVICE + "*").handler(this::serviceHandler);
        // Negative look-ahead: everything that does not start with the service prefix goes to the index handler.
        String regexMatcher = "^(?!" + PREFIX_SERVICE + ").*";
        router.routeWithRegex(regexMatcher).handler(this::indexHandler);
    }

    /**
     * Answers with a JSON welcome message.
     */
    private void indexHandler(RoutingContext context) {
        logger.debug("Received http request");
        context.response()
                .putHeader("content-type", "application/json")
                .end(new JsonObject().put("message", "Welcome to gateway").encodePrettily());
    }

    /**
     * Extracts the service name from the request path and dispatches the request to that
     * service; answers with an error when the path carries no service name.
     */
    private void serviceHandler(RoutingContext context) {
        // Use the path rather than the URI so a query string never becomes part of the service name.
        String path = context.request().path();
        logger.debug("path:{}", path);
        String serviceName = extractServiceName(path);
        logger.debug("Service Name:{}", serviceName);
        if (serviceName.trim().isEmpty()) {
            this.error(context, 500, "Empty service name");
        } else {
            this.dispatchHttpRequest(context, serviceName, error -> this.error(context, error));
        }
    }

    /**
     * Returns the path segment that directly follows the service prefix.
     *
     * @param path the request path, may be {@code null}
     * @return the service name, or an empty string when the path is {@code null}, does not
     *         start with the service prefix (e.g. {@code /service}) or has an empty segment
     *         after it (e.g. {@code /service//})
     */
    private static String extractServiceName(String path) {
        if (null == path || !path.startsWith(PREFIX_SERVICE)) {
            return "";
        }
        int start = PREFIX_SERVICE.length();
        int end = path.indexOf('/', start);
        return end < 0 ? path.substring(start) : path.substring(start, end);
    }

    /**
     * Answers with status 500 and the throwable's message, falling back to the
     * exception class name when the throwable has no message.
     */
    private void error(RoutingContext context, Throwable throwable) {
        String message = throwable.getMessage();
        this.error(context, 500, null != message ? message : throwable.getClass().getSimpleName());
    }

    /**
     * Writes a JSON error response unless the response has already been ended.
     *
     * @param context      the routing context to answer
     * @param statusCode   the HTTP status code to send
     * @param errorMessage the error text placed in the response body
     */
    private void error(RoutingContext context, int statusCode, String errorMessage) {
        if (!context.response().ended()) {
            context.response().setStatusCode(statusCode).putHeader("content-type", "application/json")
                    .end(new JsonObject().put("statusCode", statusCode).put("error", errorMessage).encodePrettily());
        }
    }
}