You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 13 Next »

Because most services and collectors deployed on DCAE platform relies on similar microservices a commmon Software Development Kit has been created. It contains utilities and clients which may be used when getting configuration from CBS, consuming messages from DMaaP, interacting with AAI, etc. SDK is written in Java.

NOTE For now only Config Binding Service client has reached a stable API level. More are expected to be done in Dublin.

NOTE This page is a work in progress.

Reactive programming

Most of SDK APIs are using Project Reactor, which is one of available implementation of Reactive Streams (as well as Java 9 Flow). This way we support both high-performance, non-blocking asynchronous clients and old-school, thread-bound, blocking clients. We believe that using reactive programming can solve many cloud-specific problems for us - if used properly.

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of back pressure were synchronous (see also the Reactive Manifesto), therefore care has to be taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

from: http://www.reactive-streams.org/

Before using DCAE SDK, please take a moment and read Project Reactor documentation. You should also skim through methods available in Flux and Mono. It can save you many wasted minutes/hours of thinking how to implement the functionality by your own instead of using already implemented method.

However if you are not concerned about performance you can always take a blue pill and go back to blocking world by means of block* methods.

Rx short intro

For general introduction please read 3rd section of Reactor Documentation.

Some general notes:

  • In Project Reactor you have two reactive streams' types at your disposal: Mono which may emit at most 1 element and Flux which may emit 0, finite or infinite number of elements.
  • Both of them may end with error. In such situation the stream ends immediately. After stream is terminated (normally or because of error) it won't emit any new elements. You may use retry operators to resubscribe to events in case of error. In cloud environment retryWhen is especially usable: you may use it together with reactor-extra retry functionality in order to support more advanced reaction to unreachable peer microservice.
  • If you do not have any background in functional operators like map, flatMap,  please take a time to understand them. The general meaning is similar as in Java 8 Streams API. They are the most common operators used in reactive applications. Especially flatMap is very powerful despite its simplicity.
  • There is a large group of operators which deal with time dimension of the stream, eg. buffer, window,  delay*, timeout etc.
  • Be aware that calling aggregation operators (count, collect, etc.) on infinite Flux makes no sense. In worst case scenario you can end JVM with OoM error.
  • There is a nice intro to operators in Appendix A of Reactor Documentation. You should also learn how to read Marble Diagrams which concisely describe operators in a graphical form. Fortunately they are quite easy to grasp.
  • Do not block in any of handlers which are passed to operators defined by Reactor. The library uses a set of Schedulers (think thread-pools) which are suitable for different jobs. More details can be found in the documentation. If possible try to use non-blocking APIs.
  • Most of operators support back-pressure. That means that a demand for new messages will be signalized from downstream subscribers. For instance if you have a flux.flatMap(this::doThis).map(this::doThat).subscribe() then if doThis is very slow it will not request many items from source flux and it will emit items at it's own pace for doThat to process. So usually there will be no buffering nor blocking needed between flux and doThis.
  • (Almost) nothing will happen without subscribing to the Flux/Mono. These reactive streams are lazy, so the demand will be signaled only when subscription is being made ie. by means of subscribe or block* methods.
  • If you are going to go fully-reactive then you should probably not call subscribe/block anywhere in your code. For instance, when using Reactor Netty or Spring Web Flux you should return Mono/Flux from your core methods and it will be subscribed somewhere by the library you are using.
  • Return Mono<Void> in case you want to return from the method a listener to some processing being done. You may be tempted to return Disposable (result of subscribe()) but it won't compose nicely in reactive flow. Using then() operator is generally better as you can flatMap over it in the client code.

Artifacts

Current version


<properties>
  <sdk.version>1.1.3</sdk.version>
</properties>

Maven dependencies

Choose one or more depending on what you need.

<dependencies>
  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
    <artifactId>cbs-client</artifactId>
    <version>${sdk.version}</version>
  </dependency>

  <dependency>
    <groupId>org.onap.dcaegen2.services.sdk.security.crypt</groupId>
    <artifactId>crypt-password</artifactId>
    <version>${sdk.version}</version>
  </dependency>

 <!-- more to go -->

</dependencies>


Available APIs

cbs-client - a Config Binding Service client

You can use CbsClientFactory to lookup for CBS in your application. Returned CbsClient can then be used to get a configuration, poll for configuration or poll for configuration changes.

Sample usage:

// Generate RequestID and InvocationID which will be used when logging and in HTTP requests
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();

// Read necessary properties from the environment
final EnvProperties env = EnvProperties.fromEnvironment();

// Create the client and use it to get the configuration
CbsClientFactory.createCbsClient(env)
        .flatMap(cbsClient -> cbsClient.get(diagnosticContext))
        .subscribe(
                jsonObject -> {
                    // do a stuff with your JSON configuration using GSON API
                    final int port = Integer.parseInt(jsonObject.get("collector.listen_port").getAsString());
                    // ...
                },
                throwable -> {
                    logger.warn("Ooops", throwable);
                });

Note that a subscribe handler can/will be called in separate thread asynchronously after CBS address lookup succeeds and CBS service call returns a result.

If you are interested in calling CBS periodically and react only when the configuration changed you can use updates method:

// Generate RequestID and InvocationID which will be used when logging and in HTTP requests
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();

// Read necessary properties from the environment
final EnvProperties env = EnvProperties.fromEnvironment();

// Polling properties
final Duration initialDelay = Duration.ofSeconds(5);
final Duration period = Duration.ofMinutes(1);

// Create the client and use it to get the configuration
CbsClientFactory.createCbsClient(env)
        .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
        .subscribe(
                jsonObject -> {
                    // do a stuff with your JSON configuration using GSON API
                    final int port = Integer.parseInt(jsonObject.get("collector.listen_port").getAsString());
                    // ...
                },
                throwable -> {
                    logger.warn("Ooops", throwable);
                });

The most significant change is in line 13. We are using flatMapMany since we want to map one CbsClient to many JsonObject updates. After 5 seconds CbsClient will call CBS every minute. If the configuration has changed it will pass the JsonObject downstream - in our case consumer of JsonObject will be called.

crypt-password - an utility for BCrypt passwords

TODO

  • No labels