Because most services and collectors deployed on DCAE platform relies on similar microservices a commmon Software Development Kit has been created. It contains utilities and clients which may be used when getting configuration from CBS, consuming messages from DMaaP, interacting with AAI, etc. SDK is written in Java.

Reactive programming

Most of SDK APIs are using Project Reactor, which is one of available implementations of Reactive Streams (as well as Java 9 Flow). This way we support both high-performance, non-blocking asynchronous clients and old-school, thread-bound, blocking clients. We believe that using reactive programming can solve many cloud-specific problems for us - if used properly.

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of back pressure were synchronous (see also the Reactive Manifesto), therefore care has to be taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

from: http://www.reactive-streams.org/

Before using DCAE SDK, please take a moment and read Project Reactor documentation. You should also skim through methods available in Flux and Mono. It can save you many wasted minutes/hours of thinking how to implement the functionality by your own instead of using already implemented method.

However if you are not concerned about performance you can always take a blue pill and go back to blocking world by means of block* methods.

Rx short intro

For general introduction please read 3rd section of Reactor Documentation.

Some general notes:

Handling errors in reactive streams

As noted above a reactive stream (Flux/Mono) terminates on first exception in any of the stream operators. For instance if Flux.map throws an exception, downstream operators won't receive onNext event. onError event will be propagated instead. It is a terminal event so the stream will be finished. This fail-fast behavior is a reasonable default but sometimes you will want to avoid it. For instance when polling for the updates from a remote service you may want to retry the call when the remote service is unavailable at a given moment. In such cases you might want to retry the stream using one of retry* operators.

// Simple retry on error with error type check/
// It will immediately retry stream failing with IOException
public Mono<String> fetchSomething() {
	Mono<Response> restResponse = ...
	return restResponse
			.retry(ex -> ex instanceof IOException)
			.map(resp -> ...);
}

// Fancy retry using reactor-extra library
// It will retry stream on IOException after some random time as specified in randomBackoff JavaDoc
public Mono<String> fetchSomething() {
	Mono<Response> restResponse = ...
	Retry retry = Retry.anyOf(IOException.class)
                 .randomBackoff(Duration.ofMillis(100), Duration.ofSeconds(60));
	return restResponse
			.retryWhen(retry)
			.map(resp -> ...);
}


Artifacts

Current version


<properties>
  <sdk.version>1.1.5</sdk.version>
</properties>

Maven dependencies

Choose one or more depending on what you need.

<dependencies>
  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
    <artifactId>cbs-client</artifactId>
    <version>${sdk.version}</version>
  </dependency>

  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk.security.crypt</groupId>
    <artifactId>crypt-password</artifactId>
    <version>${sdk.version}</version>
  </dependency>

  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
    <artifactId>dmaap-client</artifactId>
    <version>${sdk.version}</version>
  </dependency>

  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk</groupId>
    <artifactId>hvvesclient-producer-api</artifactId>
    <version>${sdk.version}</version>
  </dependency>
  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk</groupId>
    <artifactId>hvvesclient-producer-impl</artifactId>
    <version>${sdk.version}</version>
    <scope>runtime</scope>
  </dependency>

 <!-- more to go -->

</dependencies>


Available APIs


cbs-client - a Config Binding Service client

You can use CbsClientFactory to lookup for CBS in your application. Returned CbsClient can then be used to get a configuration, poll for configuration or poll for configuration changes.

The following CBS endpoints are supported by means of different CbsRequests:

Sample usage:

// Generate RequestID and InvocationID which will be used when logging and in HTTP requests
final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
final CbsRequest request = CbsRequests.getConfiguration(diagnosticContext);

// Read necessary properties from the environment
final EnvProperties env = EnvProperties.fromEnvironment();

// Create the client and use it to get the configuration
CbsClientFactory.createCbsClient(env)
        .flatMap(cbsClient -> cbsClient.get(request))
        .subscribe(
                jsonObject -> {
                    // do a stuff with your JSON configuration using GSON API
                    final int port = Integer.parseInt(jsonObject.get("collector.listen_port").getAsString());
                    // ...
                },
                throwable -> {
                    logger.warn("Ooops", throwable);
                });

Note that a subscribe handler can/will be called in separate thread asynchronously after CBS address lookup succeeds and CBS service call returns a result.

If you are interested in calling CBS periodically and react only when the configuration changed you can use updates method:

// Generate RequestID and InvocationID which will be used when logging and in HTTP requests
final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
final CbsRequest request = CbsRequests.getConfiguration(diagnosticContext);

// Read necessary properties from the environment
final EnvProperties env = EnvProperties.fromEnvironment();

// Polling properties
final Duration initialDelay = Duration.ofSeconds(5);
final Duration period = Duration.ofMinutes(1);

// Create the client and use it to get the configuration
CbsClientFactory.createCbsClient(env)
        .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period))
        .subscribe(
                jsonObject -> {
                    // do a stuff with your JSON configuration using GSON API
                    final int port = Integer.parseInt(jsonObject.get("collector.listen_port").getAsString());
                    // ...
                },
                throwable -> {
                    logger.warn("Ooops", throwable);
                });

The most significant change is in line 14. We are using flatMapMany since we want to map one CbsClient to many JsonObject updates. After 5 seconds CbsClient will call CBS every minute. If the configuration has changed it will pass the JsonObject downstream - in our case consumer of JsonObject will be called.

Parsing streams' definitions

CBS configuration response contains various service-specific entries. It also contains a standardized DCAE streams definitions as streams_publishes and streams_subscribes JSON objects. CBS Client API provides a way of parsing this part of configuration so you can use Java objects instead of low-level GSON API.

Because streams definitions are a simple value objects we were not able to provide you a nice polymorphic API. Instead you have 2-level API at your disposal:

Sample usage:

final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();

CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
    .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ofSeconds(5), Duration.ofMinutes(1)))
    .map(DataStreams::namedSinks)
    .map(sinks -> sinks.filter(StreamPredicates.streamOfType(MESSAGE_ROUTER)).map(mrSinkParser::unsafeParse).toList())
    .subscribe(
        mrSinks -> mrSinks.forEach(mrSink -> {
            logger.info(mrSink.name()); // name = the configuration key
            logger.info(mrSink.aafCredentials().username()); // = aaf_username
            logger.info(mrSink.topicUrl());
            // ...
        }),
        throwable -> logger.warn("Ooops", throwable)
 );

For details and sample usage please refer to JavaDoc and unit and integration tests. Especially CbsClientImplIT, MessageRouterSinksIT and  MixedDmaapStreamsIT might be useful.

INFO



crypt-password - an utility for BCrypt passwords

Library to generate and match cryptography password using BCrypt algorithm

java -jar crypt-password-${sdk.version}.jar password_to_crypt

$2a$10$iDEKdKknakPqH5XZb6wEmeBP2SMRwwiWHy8RNioUTNycIomjIqCAO

Can be used like maven dependency to match generated password.



dmaap-client - a DMaaP MR client

After parsing CBS sink definitions you will get a Source or Sink value object. It can be then directly used to communicate with DMaaP Message Router REST API.

Writing message publisher

final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher();
final MessageRouterSink sinkDefinition; //... Sink definition obtained by parsing CBS response
final MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
        .sinkDefinition(sinkDefinition)
        .build();

Flux.just(1, 2, 3)
        .map(JsonPrimitive::new)
        .transform(input -> publisher.put(request, input))
        .subscribe(resp -> {
                    if (resp.successful()) {
                        logger.debug("Sent a batch of messages to the MR");
                    } else {
                        logger.warn("Message sending has failed: {}", resp.failReason());
                    }
                },
                ex -> {
                    logger.warn("An unexpected error while sending messages to DMaaP", ex);
                });

Note that we are using Reactor transform operator. As an alternative you could assign Flux of JSON values to the variable and then invoke publisher.put on it. The important performance-related thing to remember is that you should feed the put method with a stream of messages instead of multiple calls with single messages. This way the client API will be able to send them in batches which should significantly improve performance (at least on transfer level).

Writing message subscriber

final MessageRouterSource sourceDefinition; //... Source definition obtained by parsing CBS response
final MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
        .sinkDefinition(sourceDefinition)
        .build();

cut.subscribeForElements(request, Duration.ofMinutes(1))
        .map(JsonElement::getAsJsonObject)
        .subscribe(json -> {
                // application logic
            },
            ex -> {
                logger.warn("An unexpected error while receiving messages from DMaaP", ex);
            });



hvvesclient-producer - a reference Java implementation of High Volume VES Collector client

This library is used in xNF simulator which helps us test HV VES Collector in CSIT tests. You may use it as a reference when implementing your code in non-JVM language or directly when using Java/Kotlin/etc.

Sample usage:

final ProducerOptions producerOptions = ImmutableProducerOptions.builder()
        .collectorAddresses(HashSet.of(
                InetSocketAddress.createUnresolved("dcae-hv-ves-collector", 30222)))
        .build();
final HvVesProducer hvVesProducer = HvVesProducerFactory.create(producerOptions);

Flux<VesEvent> events; // ...
Mono.from(hvVesProducer.send(events))
        .doOnSuccess(() -> logger.info("All events has been sent"))
        .doOnError(ex -> logger.warn("Failed to send one or more events", ex))
        .subscribe();