package com.ezshop.common;
import io.reactivex.Single;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.servicediscovery.ServiceDiscovery;
import io.vertx.reactivex.servicediscovery.types.HttpEndpoint;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscoveryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static com.ezshop.common.ConfigKeys.*;
/**
* The base verticle class which provided some common functions include service discovery
*
* @author Gary Cheng
*/
public abstract class BaseMicroServicesVerticle extends AbstractVerticle {
protected static final String KEY_NAME = "name";
private static final Logger logger = LoggerFactory.getLogger(BaseMicroServicesVerticle.class);
protected ServiceDiscovery discovery;
private Record publishedRecord;
private Map<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();
private Map<String, Object> serviceEndPointMap = new ConcurrentHashMap<>();
@Override
public void start() {
logger.debug("Starting verticle - {}", this.getClass().getName());
logger.debug("Config:{}", this.config().encodePrettily());
this.discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(this.getServiceDiscoveryConfig()));
}
@Override
public void stop() {
logger.debug("Stopping verticle - {}", this.getClass().getName());
this.serviceEndPointMap.keySet().stream().forEach(serviceName -> this.closeCachedEndPoint(serviceName));
this.serviceEndPointMap.clear();
this.circuitBreakerMap.values().stream().forEach(circuitBreaker -> circuitBreaker.close());
this.circuitBreakerMap.clear();
this.unpublishRecord().subscribe(b -> discovery.close());
}
/**
* Return Service discovery configure, the default implementation store them in verticle's config()
*
* @return Service discovery configure
*/
protected JsonObject getServiceDiscoveryConfig() {
return this.config();
}
/**
* Return circuit breaker of service by service name
*
* @param serviceName the name of service
* @return circuit breaker of service
*/
protected final CircuitBreaker getCircuitBreaker(String serviceName) {
logger.debug("Get CircuitBreaker of service {}", serviceName);
CircuitBreaker circuitBreaker = circuitBreakerMap.get(serviceName);
if (null == circuitBreaker) {
circuitBreaker = this.createCircuitBreaker(serviceName);
this.circuitBreakerMap.put(serviceName, circuitBreaker);
}
return circuitBreaker;
}
/**
* Create circuit breaker for service
*
* @param serviceName the name of service
* @return circuit breaker of service
*/
protected CircuitBreaker createCircuitBreaker(String serviceName) {
logger.debug("Create CircuitBreaker for service {}", serviceName);
String circuitBreakerName = serviceName + "-" + "circuit-breaker";
CircuitBreakerOptions options = new CircuitBreakerOptions()
.setMaxFailures(5)
.setTimeout(5000)
.setResetTimeout(10000)
.setFallbackOnFailure(true);
return CircuitBreaker.create(circuitBreakerName, vertx, options)
.openHandler(v -> {
logger.debug("{} opened", circuitBreakerName);
this.closeCachedEndPoint(serviceName);
})
.halfOpenHandler(v -> logger.debug("{} half opened", circuitBreakerName))
.closeHandler(v -> logger.debug("{} closed", circuitBreakerName));
}
/**
* Return an async HttpClient by service name
*
* @param serviceName the name of service
* @return
*/
protected final Single<HttpClient> getHttpEndPoint(String serviceName) {
logger.debug("Get HTTP client by service name[{}]", serviceName);
Object endPoint = this.serviceEndPointMap.get(serviceName);
if (null != endPoint && endPoint instanceof HttpClient) {
return Single.just((HttpClient) endPoint);
} else {
return HttpEndpoint.rxGetClient(discovery, new JsonObject().put(KEY_NAME, serviceName))
.doOnSuccess(httpClient -> this.serviceEndPointMap.put(serviceName, httpClient));
}
}
/**
* Return an async HttpClient by service name
*
* @param serviceName the name of service
* @return
*/
protected final Single<WebClient> getWebEndPoint(String serviceName) {
logger.debug("Get Web client by service name[{}]", serviceName);
Object endPoint = this.serviceEndPointMap.get(serviceName);
if (null != endPoint && endPoint instanceof WebClient) {
return Single.just((WebClient) endPoint);
} else {
logger.debug("Getting Web client from ServiceDiscovery...");
return HttpEndpoint.rxGetWebClient(discovery, new JsonObject().put(KEY_NAME, serviceName))
.doOnSuccess(webClient -> this.serviceEndPointMap.put(serviceName, webClient));
}
}
/**
* Publish a HttpEndPoint service to ServiceDiscovery
*
* @param serviceName the name of service to be published
* @param config the configure of HttpEndPoint
* @return
*/
protected final Single<Record> publishHttpEndPoint(String serviceName, JsonObject config) {
String host = config.getString(KEY_HOST, "localhost");
Integer port = config.getInteger(KEY_PORT, 8080);
String root = config.getString(KEY_ROOT, "/");
logger.debug("publishHttpEndPoint service:{}, host:{}, port:{}, root:{}", serviceName, host, port, root);
return this.publishRecord(HttpEndpoint.createRecord(serviceName, host, port, root));
}
/**
* Publish a service record to ServiceDiscovery
*
* @param record record to publish
* @return
*/
protected Single<Record> publishRecord(Record record) {
return this.unpublishRecord()
.flatMap(b -> discovery.rxPublish(record))
.doOnSuccess(r -> this.publishedRecord = r);
}
/**
* Invoke a restful service by service name
*
* @param serviceName the name of service
* @param method HTTP method
* @param uri uri of request
* @param body body of request
* @return result as JsonObject
*/
protected Single<JsonObject> invokeRestfulService(String serviceName, HttpMethod method, String uri, JsonObject body) {
logger.debug("invokeRestfulService, service name:{}, uri:{}", serviceName, uri);
return this.getCircuitBreaker(serviceName).rxExecuteCommand(future -> this.getWebEndPoint(serviceName).subscribe(
webClient -> {
HttpRequest request = webClient.request(method, uri);
Single<HttpResponse<Buffer>> result;
if (null == body) {
result = request.rxSend();
} else {
result = request.rxSendJsonObject(body);
}
result.map(HttpResponse::bodyAsJsonObject).subscribe(future::complete, future::fail);
},
throwable -> future.fail("Service [" + serviceName + "] not found"))
);
}
/**
* Invoke a restful service by given host and port
*
* @param method HTTP method
* @param port port of EndPoint
* @param host host of EndPoint
* @param uri uri of request
* @param body body of request
* @return
*/
protected Single<JsonObject> invokeRestful(HttpMethod method, int port, String host, String uri, JsonObject body) {
logger.debug("invokeRestfulService, host:{}, port:{}, uri:{}", host, port, uri);
HttpRequest<Buffer> request = WebClient.create(vertx).request(method, port, host, uri);
Single<HttpResponse<Buffer>> result;
if (null == body) {
result = request.rxSend();
} else {
result = request.rxSendJsonObject(body);
}
return result.map(HttpResponse::bodyAsJsonObject);
}
private Single<Boolean> unpublishRecord() {
return Single.create(emitter -> {
if (null != this.publishedRecord) {
discovery.rxUnpublish(this.publishedRecord.getRegistration())
.subscribe(() -> {
logger.debug("Service {} unpublished", this.publishedRecord.getName());
this.publishedRecord = null;
emitter.onSuccess(true);
}, emitter::onError);
} else {
emitter.onSuccess(true);
}
});
}
private void closeCachedEndPoint(String serviceName) {
Object endPoint = this.serviceEndPointMap.get(serviceName);
if (null != endPoint) {
logger.debug("Close cached EndPoint for service {}", serviceName);
if (endPoint instanceof WebClient) {
((WebClient) endPoint).close();
} else if (endPoint instanceof HttpClient) {
((HttpClient) endPoint).close();
}
this.serviceEndPointMap.remove(serviceName);
}
}
}