Warning

This spike is a WIP


Jira: CPS-821 (ONAP Jira)

...

Jira: CPS-828 (ONAP Jira)

Issues/Decisions

...


Color | Meaning

New/Open

Reopened

Agreed/Closed



# | Issue | Notes/Jira | Decision

  1. What topic to use for client?
     • To be supplied by client.
     • Topic provided by client as a parameter, which will be injected into our environment and used for asynchronous requests sent back to client.
  2. What topic to use for private DMI-NCMP?
     • e.g. ncmp-async-private, but a decision needs to be made in line with current best practices.
  3. Are we adding a new REST endpoint for async or modifying an existing endpoint?
     • /ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name>
     • Jira: CPS-830 (ONAP Jira)
  4. Agree URL for async once #2 is clarified
     • /ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name>
     • CPS R10 Release Planning#NCMPRequirements #11. Based on this additional query parameter we no longer require an additional /async flag in the URL.
  5. Passthrough requests need to be able to handle different response types (using the Accept header), but the async option would have a fixed and possibly different response type.
     • We should, by default, be able to accept multiple content-types.
     • CPS R10 Release Planning#NCMPRequirements #11.
  6. Should we create a standalone app to demo or are tests sufficient?
     • Yes, a standalone app would be helpful, along with traditional tests.
     • CSIT tests may require more involved effort - perhaps we could add a standalone app to nexus and use it as part of the CSIT test?
  7. Do we need to persist the generated requestID?
     • No
     • We should be stateless
  8. What mechanism should we use for the payload? Event? Header?
     • This needs clarification
  9. Error Reporting
     • Out of scope
  10. CSIT Testing
     • Will this be required to add to CSIT tests? If so, a client app will need to be created and stored in gerrit for cloning during the setup phase.
  11. Async Request Option using Messaging
     • Out of scope
     • See: AsyncRequestOptionusingMessaging

Proposed Diagram

[draw.io diagram: CPS-821, revision 12]


  Update: Following a conversation between Toine Siebelink and Tony Finnerty, it has been agreed that CPS will configure a (temporary) dedicated client topic as the final destination for the async replies. We now propose using a common 'cps-broadcast' topic, which will also be used for broadcasting messages such as cm handles being added or removed.

This is still only a temporary solution until clients have a better way of declaring their own topics. It does not affect our proposed interfaces, just the name of the topic that will be accepted by NCMP (for now).

 ---------------------------------------------------------------------------------------

  Update: After discussion with Fiachra Corcoran regarding Kafka-strimzi, it has been agreed that we will define a topic for client responses in advance (defined upfront). After release K it is envisioned that auto-creation of topics will be disabled by default, so the client will have to use our predefined topic.

Agreement on topic name outstanding 

---------------------------------------------------------------------------------------

To be supplied by client

2. What topic to use for private DMI-NCMP?

Contact Fiachra Corcoran regarding ONAP conventions.

The response was that there are no conventions to speak of, but we would use dashes (i.e. my-new-topic) instead of dot notation (i.e. my.new.topic) for the topic name.

ncmp-async-m2m

3. Are we adding a new REST endpoint for async or modifying an existing endpoint?

To facilitate asynchronous requests to DMI we will need to either create a new endpoint or modify an existing endpoint to include an /async flag. The second solution may not be backwards compatible. However, creating a new endpoint solely for a flag is also not ideal. We could add async to the list of options (but this might interfere with the purpose of /options).

We also considered adding a new endpoint for async which simply re-routes the response to the original endpoint while adding the logic for an OK response to the client. However, would this lead to a change in the schema? If so, would this be backwards compatible?

Jira: CPS-830 (ONAP Jira)

/ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name>

4. Agree URL for async once #2 is clarified

CPS R10 Release Planning#NCMPRequirements #11.

/ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name>


5. Passthrough requests need to be able to handle different response types (using the Accept header), but the async option would have a fixed and possibly different response type.

CPS R10 Release Planning#NCMPRequirements #11.

The async response can 'wrap' the native response inside a JSON object identical to the synchronous response.

6. Should we create a standalone app to demo or are tests sufficient?

CSIT tests may require more involved effort - perhaps we could add a standalone app to nexus and use it as part of the CSIT test?

See #13

7. Do we need to persist the generated requestID?

We should be stateless.

No - Further disc

8. Error Reporting - Topic Correctness/Availability

At a minimum we should report to the client if a topic was not found or if the topic name was incorrect.

In Scope

9. Error Reporting - Kafka Issues

Issues such as a full buffer/queue, dropped messages and failures are not in scope.

Out of scope
10. Async Request Option using Messaging

See: https://wiki.onap.org/display/DW/CPS-821+Spike%3A+Support+Async+read-write+operations+on+CPS-NCMP+interface#CPS821Spike:SupportAsyncreadwriteoperationsonCPSNCMPinterface-AsyncRequestOptionusingMessaging(OutofScope)

Out of scope

11. Do we actually require futures in this implementation proposal?

It could be argued that the need for futures is made redundant by the fact that we call DMI from NCMP through REST and the response will be consumed via Kafka.

What benefit would futures give us in this case?

Not needed

12. ID Generation

Which mechanism to use? Look at CPS-Temporal and follow it to keep consistency.

See: https://wiki.onap.org/display/DW/CPS-821+Spike%3A+Support+Async+read-write+operations+on+CPS-NCMP+interface#CPS821Spike:SupportAsyncreadwriteoperationsonCPSNCMPinterface-CanRobotFrameworkverifyKafkaEvents

13. Can robot framework verify if Kafka events have been sent/received?

This would be less work and overhead (rather than creating/maintaining a client app).

Will need to verify if 3PP libraries are safe to introduce into the codebase. If so, what is the process? Do they need to be FOSSed?

Integration testing should be carried out by a client of NCMP. A demo can be performed up to the point where NCMP produces a message for the client.

14. Can Webflux do this work with less code/impl?

Sourabh Sourabh suggested using this to complement our existing approach. By adding webflux we add an event loop to synchronize and access I/O connections to the database.

No. It will complement the design by adding an event loop for I/O synchronization and access. See: CPS-850

15. ONAP may be deprecating PLAINTEXT for Kafka. Strimzi Kafka might need to be used.

No relevant information could be found relating to this. See: https://wiki.onap.org/display/DW/CPS-821+Spike%3A+Support+Async+read-write+operations+on+CPS-NCMP+interface#CPS821Spike:SupportAsyncreadwriteoperationsonCPSNCMPinterface-KafkaStrimziInvestigation

The underlying implementation won't be affected. The config will contain relevant configuration for protocol (e.g. PLAINTEXT, SASL) and connections.

This information needs to be made configurable when implementing.

16. Should we have something like the message count in the response, or at least an indication of the last message?

On the topic=Xyz async analysis, one item I forgot to mention was how a client might know when they have received the last response message for a request.

If async, there may be many responses (otherwise there's no need for async).

Therefore, should it not have something like the message count in it, or at least an indication of the last message?

This is another step in the evolution of the product. Multiple response messages might become a necessity for async bulk operations, and maybe that is when we can discuss it in more detail. I don't think it has to affect our current implementation; an additional field to handle that can always be added in a backward compatible way.

17. Do we need Kafka messaging between DMI and NCMP?

This was discussed because it was argued that the client and CPS ecosystem is decoupled and async using only a bus between NCMP and the client. However, this would not rectify blocking calls between NCMP and DMI.

It was decided to have two separate Kafka topics for client → ncmp & ncmp → dmi
18. Can NCMP list all existing topics?

Jira: CPS-828 (ONAP Jira)

AC Point 3: Topic does not exist (not allowed to create new topics, assuming NCMP can 'list' existing topics)

Gareth Roper Need your suggestion.


19. Jira: CPS-828 (ONAP Jira)

Async: NCMP Rest impl. including Request ID generation

AC Points:

2. cm handle does not exist

3. No DMI registration for CM Handle

AC point 2: We agreed to check only if the cm handle exists. If the cm handle does not exist, the client will get HTTP status 404 as below.

{
    "status": "404 NOT_FOUND",
    "message": "DataNode not found",
    "details": "DataNode with xpath /dmi-registry/cm-handles[@id='my-cm-handle1'] was not found for anchor ncmp-dmi-registry and dataspace NCMP-Admin."
}

AC point 3 is not valid.

20. Client Kafka User & Privileges

  After discussion with Fiachra Corcoran regarding Kafka-strimzi, we will need to ensure the client can access a Kafka user and is allocated sufficient privileges (principle of least privilege).


21. Agree client response format

Message gets forwarded to original Client Topic

22. Async Ack

{
    "headers": {},
    "body": {
        "requestId": "f1aa75c6-e9ad-4072-bf19-659a66b3e0ff"
    },
    "statusCode": "OK",
    "statusCodeValue": 200
}

Agreed & implemented
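
The acknowledgement agreed in item 22, together with the topic checks from items 2 and 8, could be sketched roughly as follows. This is a minimal illustration only: the class `AsyncAckSketch` and its methods are hypothetical names and not part of NCMP, and rejecting dots in topic names is an assumption based on the dash convention noted above (Kafka itself also allows dots).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

/** Hypothetical helper (not NCMP code): validates the client topic and builds the async ack. */
final class AsyncAckSketch {

    // Kafka topic names may contain ASCII letters, digits, '.', '_' and '-' (at most 249 characters).
    // Dots are rejected here only to follow the dash convention (an assumption of this sketch).
    private static final Pattern TOPIC_NAME = Pattern.compile("[a-zA-Z0-9_-]{1,249}");

    static boolean isValidTopicName(final String topic) {
        return topic != null && TOPIC_NAME.matcher(topic).matches();
    }

    /** Builds an ack with the same fields as the agreed format, using a freshly generated requestId. */
    static Map<String, Object> buildAck(final String topic) {
        if (!isValidTopicName(topic)) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        final Map<String, Object> body = new LinkedHashMap<>();
        body.put("requestId", UUID.randomUUID().toString());

        final Map<String, Object> ack = new LinkedHashMap<>();
        ack.put("headers", new LinkedHashMap<String, Object>());
        ack.put("body", body);
        ack.put("statusCode", "OK");
        ack.put("statusCodeValue", 200);
        return ack;
    }
}
```

Calling `AsyncAckSketch.buildAck("my-topic-999")` yields a map with the same shape as the ack above; whether the topic actually exists on the broker would still need a separate check.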

Proposed Design

...

[draw.io diagram: Proposed Design (CPS-821), revision 25]


High-level Steps/Possible Tickets:

  1. Modify REST endpoint to include param topic (1)
  2. Add logic to send response and request (2a)
  3. Send request if async is implemented or NotSupportedException if not (2b)
  4. Add producer to DMI (implementation and config) (3a & 3b)
  5. Add consumer to NCMP (implementation and config) (4a)
  6. Add Producer to NCMP (implementation and config) (4b)
  7. Demo & Test (5)


Alternative Approaches


# | Person | Approach
1

[Relates to point 3 from high-level steps]

It would be good to have an async implementation that depends only on NCMP, so that it does not break or work with only a few DMI plugins.
A DMI-plugin-independent async flow could look like this:
1. Once a request is received, check if the DMI plugin exists. If it exists, generate a request UUID, send the message to the Kafka topic (or store it in the DB), and send an accepted response back to the client.
2. NCMP will listen to the same topic or read from the DB, and process the request by calling the appropriate DMI plugins synchronously. After it receives the response, it sends it to the Kafka topic provided in the input by the user.

In this implementation, we are not dependent on the DMI implementation.
We do need persistence for storing async requests. They can be stored in the DB or in Kafka; it does not matter as long as we can read them in the same order.

The main points are:
1. No dependency on the DMI for async behavior

2. No async request gets lost because of a pod restart or JVM crash.
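
The two-step flow of this alternative can be sketched as below. It is an illustration under stated assumptions: `NcmpOnlyAsyncFlowSketch` and all of its members are hypothetical, an in-memory FIFO queue stands in for the internal Kafka topic or DB table, and a plain map stands in for the client topics. An in-memory queue does not survive a pod restart or JVM crash, so main point 2 would require a durable store in a real implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

/** Hypothetical sketch of a DMI-plugin-independent async flow (not NCMP code). */
final class NcmpOnlyAsyncFlowSketch {

    /** One accepted request waiting to be processed. */
    static final class PendingRequest {
        final String requestId;
        final String cmHandle;
        final String clientTopic;

        PendingRequest(final String requestId, final String cmHandle, final String clientTopic) {
            this.requestId = requestId;
            this.cmHandle = cmHandle;
            this.clientTopic = clientTopic;
        }
    }

    // FIFO queue standing in for the internal Kafka topic or DB table (assumption).
    private final Queue<PendingRequest> pending = new ConcurrentLinkedQueue<>();
    // In-memory stand-in for the client topics: topic name -> messages in send order.
    private final Map<String, List<String>> clientTopics = new LinkedHashMap<>();
    private final Set<String> registeredCmHandles;
    private final Function<String, String> dmiCall; // synchronous DMI call, keyed by cm handle

    NcmpOnlyAsyncFlowSketch(final Set<String> registeredCmHandles, final Function<String, String> dmiCall) {
        this.registeredCmHandles = registeredCmHandles;
        this.dmiCall = dmiCall;
    }

    /** Step 1: check the DMI plugin exists, generate a request UUID, store the request, return the id. */
    String accept(final String cmHandle, final String clientTopic) {
        if (!registeredCmHandles.contains(cmHandle)) {
            throw new IllegalArgumentException("No DMI plugin registered for " + cmHandle);
        }
        final String requestId = UUID.randomUUID().toString();
        pending.add(new PendingRequest(requestId, cmHandle, clientTopic));
        return requestId;
    }

    /** Step 2: read stored requests in order, call DMI synchronously, forward to the client topic. */
    int processPending() {
        int processed = 0;
        PendingRequest request;
        while ((request = pending.poll()) != null) {
            final String response = dmiCall.apply(request.cmHandle);
            clientTopics.computeIfAbsent(request.clientTopic, topic -> new ArrayList<>())
                .add(request.requestId + ":" + response);
            processed++;
        }
        return processed;
    }

    List<String> messagesOn(final String topic) {
        return clientTopics.getOrDefault(topic, List.of());
    }
}
```

Because requests are polled from a FIFO queue, responses reach the client topic in the order the requests were accepted, which is the ordering property the approach asks for.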


Proposed Message Format

...


Legend

Color | Meaning

NCMP standard event schema

Event payload schema as per org.onap.ncmp:async-request-response-event:v1

Response payload schema.  The payload schema is an onap one if non-passthrough but may be either an onap published or other (oran, proprietary, etc) schema if the original call received by the dmi plugin is a ncmp:passthrough-*.


SCOPE OF THE WORK :  the black highlighted parts of the above events. 
The blue highlight is not in current scope.  The blue bits will be up to the dmi plugins.
In the future these blue bits may become part of the standard onap base release.

Success Scenario


DMI Event sent to NCMP

{
  "eventId"            : "9999",                                          # some generic event uuid generated by DMI Plugin
  "eventCorrelationId" : "request-009995",                                # for event correlation - acts as the request Id when
                                                                          # eventType=org.onap.ncmp.async-request-response-event
  "eventTime"          : "2021-11-16T16:42:25-04:00",                     # (kieran mccarthy is this format agreed?)
  "eventSource"        : "org.onap.ncmp",                                 # ??? Is this needed? Joseph Keenan will use dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName());
  "eventType"          : "org.onap.ncmp.async-request-response-event",    # event type for async request response events (kieran mccarthy Who owns this schema? How do the schemas relate?)
  "eventSchema"        : "org.onap.ncmp:async-request-response-event:v1", # event schema for async request response events (who owns this? table with owner, repo, color coded)
  "eventTarget"        : "topic:client-application-topic",                # indicates where to route the event - 'topic' is default target type (kieran mccarthy will it be in format of topic:<topic> always or could it be extended e.g. ip:<ip-address>?)

  "event": {
    "response-data-schema" : "org.onap.cps.yang-patch-json:v1",
    "response-status"      : "SUCCESS",
    "response-code"        : "201",
    "response-data"        : {
      …
      <response-data-payload>
      …
    }
  }
}


NCMP Event forwarded to Client topic (in eventTarget)

{
  "eventId"            : "33001",                                         # some generic event uuid generated by NCMP
  "eventCorrelationId" : "request-009995",                                # for event correlation - acts as the request Id when
                                                                          # eventType=org.onap.ncmp.async-request-response-event
  "eventTime"          : "2015-11-16T16:42:26-05:06",                     # (kieran mccarthy Is this a timezone?)
  "eventSource"        : "org.onap.ncmp",                                 # The source of the event
  "eventType"          : "org.onap.ncmp.async-request-response-event",    # event type for async request response events
  "eventSchema"        : "org.onap.ncmp:async-request-response-event:v1", # event schema for async request response events

  "eventOrigin": {
    "eventId"            : "00001",                                         # some generic event uuid generated by DMI Plugin
    "eventTime"          : "2021-11-16T16:42:25-04:00",                     # (kieran mccarthy is this agreed upon?)
    "eventCorrelationId" : "request-009995",
    "eventSource"        : "org.onap.dmi",                                  # ??? Is this needed?
    "eventType"          : "org.onap.ncmp.async-request-response-event",    # event type for async request response events
    "eventSchema"        : "org.onap.ncmp:async-request-response-event:v1", # event schema for async request response events
    "eventTarget"        : "topic:client-application-topic"
  },

  "event": {
    "response-data-schema" : "org.onap.cps.yang-patch-json:v1",
    "response-status"      : "SUCCESS",
    "response-code"        : "201",
    "response-data"        : {
      …
      <response-data-payload>
      …
    }
  }
}
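
The relationship between the two events above, where NCMP keeps the correlation id, nests the DMI envelope under `eventOrigin` and carries the `event` payload through unchanged, might be sketched like this. The class `NcmpEventForwardingSketch` and its methods are hypothetical, and events are modelled as plain maps rather than the generated event classes. The `topic:` prefix is treated as optional, which is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch (not NCMP code): wraps a DMI event into the event forwarded to the client. */
final class NcmpEventForwardingSketch {

    private static final String TOPIC_PREFIX = "topic:";

    /** Resolves the client topic from eventTarget, with or without the "topic:" prefix. */
    static String targetTopic(final Map<String, Object> dmiEvent) {
        final Object target = dmiEvent.get("eventTarget");
        if (target == null) {
            throw new IllegalArgumentException("eventTarget is missing");
        }
        final String value = target.toString();
        return value.startsWith(TOPIC_PREFIX) ? value.substring(TOPIC_PREFIX.length()) : value;
    }

    /** Builds the NCMP event: new eventId, same correlation id, DMI envelope kept under eventOrigin. */
    static Map<String, Object> toNcmpEvent(final Map<String, Object> dmiEvent, final String eventTime) {
        // Copy the DMI envelope so the input is not modified, then move the payload out of it.
        final Map<String, Object> origin = new LinkedHashMap<>(dmiEvent);
        final Object payload = origin.remove("event");

        final Map<String, Object> ncmpEvent = new LinkedHashMap<>();
        ncmpEvent.put("eventId", UUID.randomUUID().toString());
        ncmpEvent.put("eventCorrelationId", dmiEvent.get("eventCorrelationId"));
        ncmpEvent.put("eventTime", eventTime);
        ncmpEvent.put("eventSource", "org.onap.ncmp");
        ncmpEvent.put("eventType", dmiEvent.get("eventType"));
        ncmpEvent.put("eventSchema", dmiEvent.get("eventSchema"));
        ncmpEvent.put("eventOrigin", origin);
        ncmpEvent.put("event", payload);
        return ncmpEvent;
    }
}
```

The caller would publish the result of `toNcmpEvent` to the topic returned by `targetTopic`.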


Failure Scenario


DMI Event sent to NCMP

{
  "eventId"            : "9999",                                          # some generic event uuid generated by DMI Plugin
  "eventCorrelationId" : "request-009995",                                # for event correlation - acts as the request Id when
                                                                          # eventType=org.onap.ncmp.async-request-response-event
  "eventTime"          : "2021-11-16T16:42:25-04:00",
  "eventSource"        : "org.onap.ncmp",                                 # ??? Is this needed?
  "eventType"          : "org.onap.ncmp.async-request-response-event",    # event type for async request response events
  "eventSchema"        : "org.onap.ncmp:async-request-response-event:v1", # event schema for async request response events
  "eventTarget"        : "topic:client-application-topic",                # indicates where to route the event
                                                                          # - 'topic' is default target type
  "event": {
    "response-data-schema" : "org.onap.cps.generic-error:v1",
    "response-status"      : "FAILURE",
    "response-code"        : "404",
    "response-data"        : {
      …
      <response-data-payload-as-org.onap.ncmp.generic-error:v1>
      …
    }
  }
}


NCMP Event forwarded to Client topic (defined in eventTarget)

{
  "eventId"            : "33001",                                         # some generic event uuid generated by NCMP
  "eventCorrelationId" : "request-009995",                                # for event correlation - acts as the request Id when
                                                                          # eventType=org.onap.ncmp.async-request-response-event
  "eventTime"          : "2015-11-16T16:42:26-05:06",
  "eventSource"        : "org.onap.ncmp",                                 # The source of the event
  "eventType"          : "org.onap.ncmp.async-request-response-event",    # event type for async request response events
  "eventSchema"        : "org.onap.ncmp:async-request-response-event:v1", # event schema for async request response events

  "eventOrigin": {
    "eventId"            : "00001",                                         # some generic event uuid generated by DMI Plugin
    "eventTime"          : "2021-11-16T16:42:25-04:00",
    "eventCorrelationId" : "request-009995",
    "eventSource"        : "org.onap.dmi",                                  # ??? Is this needed?
    "eventType"          : "org.onap.ncmp.async-request-response-event",    # event type for async request response events
    "eventSchema"        : "org.onap.ncmp:async-request-response-event:v1", # event schema for async request response events
    "eventTarget"        : "topic:client-application-topic"
  },

  "event": {
    "response-data-schema" : "org.onap.cps.generic-error:v1",
    "response-status"      : "FAILURE",
    "response-code"        : "404",
    "response-data"        : {
      …
      <response-data-payload-as-org.onap.ncmp.generic-error:v1>
      …
    }
  }
}
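
The `event` section differs between the success and failure scenarios only in its schema, status and code. A small sketch of how that section could be assembled follows. The class `ResponseEventSketch` is hypothetical, and mapping every 2xx code to SUCCESS and everything else to FAILURE is an assumption inferred from the 201 and 404 examples, not an agreed rule.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch (not NCMP or DMI code): builds the "event" section of a response event. */
final class ResponseEventSketch {

    // Error schema name as used in the failure scenario above.
    static final String GENERIC_ERROR_SCHEMA = "org.onap.cps.generic-error:v1";

    /** Assumption: 2xx codes count as SUCCESS, everything else as FAILURE. */
    static String statusFor(final int responseCode) {
        return responseCode >= 200 && responseCode < 300 ? "SUCCESS" : "FAILURE";
    }

    static Map<String, Object> buildEvent(final int responseCode, final String successSchema,
                                          final Object responseData) {
        final String status = statusFor(responseCode);
        final Map<String, Object> event = new LinkedHashMap<>();
        // Failures always carry the generic error schema; successes carry the caller's schema.
        event.put("response-data-schema", "SUCCESS".equals(status) ? successSchema : GENERIC_ERROR_SCHEMA);
        event.put("response-status", status);
        event.put("response-code", String.valueOf(responseCode));
        event.put("response-data", responseData);
        return event;
    }
}
```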


Sample Output

Response Event 1: DMI -> NCMP
{
  "eventId": "c5c7c885-2e31-4951-aba5-3062889b8a7c",
  "eventCorrelationId": "2539f8ac-b2c3-422d-bb75-b3f10d3d6216",
  "eventTime": "2022-05-05T13:48:51.157+0000",
  "eventSource": "org.onap.ncmp",
  "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
  "eventTarget": "my-topic-999",
  "event": {
    "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
    "response-status": "200",
    "response-code": "SUCCESS",
    "response-data": {
      "ietf-netconf-monitoring:netconf-state": {
        "schemas": {
          "schema": [
            {
              "identifier": "ietf-tls-server",
              "version": "2016-11-02",
              "format": "ietf-netconf-monitoring:yang",
              "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server",
              "location": [
                "NETCONF"
              ]
            },
            ...,
            {
              "identifier": "ietf-system",
              "version": "2014-08-06",
              "format": "ietf-netconf-monitoring:yin",
              "namespace": "urn:ietf:params:xml:ns:yang:ietf-system",
              "location": [
                "NETCONF"
              ]
            }
          ]
        }
      }
    }
  }
}


Response Event 2: NCMP -> Client
{
  "eventId": "bfb86f3e-641b-4750-b3a0-d8d31f01a708",
  "eventCorrelationId": "2539f8ac-b2c3-422d-bb75-b3f10d3d6216",
  "eventTime": "2022-05-05T13:48:53.375+0000",
  "eventSource": "org.onap.ncmp",
  "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
  "eventTarget": "my-topic-999",
  "eventOrigin": {
    "eventId": "c5c7c885-2e31-4951-aba5-3062889b8a7c",
    "eventCorrelationId": "2539f8ac-b2c3-422d-bb75-b3f10d3d6216",
    "eventTime": "2022-05-05T13:48:51.157+0000",
    "eventSource": "org.onap.ncmp",
    "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
    "eventTarget": "my-topic-999"
  },
  "event": {
    "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
    "response-status": "200",
    "response-code": "SUCCESS",
    "response-data": {
      "ietf-netconf-monitoring:netconf-state": {
        "schemas": {
          "schema": [
            {
              "identifier": "ietf-tls-server",
              "version": "2016-11-02",
              "format": "ietf-netconf-monitoring:yang",
              "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server",
              "location": [
                "NETCONF"
              ]
            },
            ...,
            {
              "identifier": "ietf-system",
              "version": "2014-08-06",
              "format": "ietf-netconf-monitoring:yin",
              "namespace": "urn:ietf:params:xml:ns:yang:ietf-system",
              "location": [
                "NETCONF"
              ]
            }
          ]
        }
      }
    }
  }
}
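
On the client side, the `eventCorrelationId` in the proposed message format acts as the request id, so a consumer can match each forwarded event to the request it sent. A minimal sketch of that bookkeeping, with the hypothetical class `ClientCorrelationSketch` and events modelled as plain maps:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client-side sketch: matches forwarded events to outstanding requests. */
final class ClientCorrelationSketch {

    // requestId (from the async ack) -> free-text description of the original request
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Remember a request after the ack with its requestId has been received. */
    void track(final String requestId, final String description) {
        pending.put(requestId, description);
    }

    /** Returns the matching request description, or empty if the event is not one of ours. */
    Optional<String> onEvent(final Map<String, Object> ncmpEvent) {
        final Object correlationId = ncmpEvent.get("eventCorrelationId");
        if (correlationId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(pending.remove(correlationId.toString()));
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Each request is removed once its response arrives, so a second event with the same correlation id is treated as unknown. If multiple responses per request are introduced later (see issue 16), this bookkeeping would need to change.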


Manual Testing

...


Client -> NCMP Response
{
    "headers": {},
    "body": {
        "requestId": "0f574dfa-9c05-4899-9ae1-e6741f53c6d9"
    },
    "statusCode": "OK",
    "statusCodeValue": 200
}


NCMP Sync Done
{
	"logTimeStamp": "2022-05-10T11:16:27.123Z",
	"logLevel": "INFO",
	"principalId": "cpsuser",
	"serviceName": "cps-application",
	"message": "Async task completed successfully.",
	"processId": "1",
	"threadName": "ForkJoinPool.commonPool-worker-5",
	"class": "o.o.c.n.r.executor.CpsNcmpTaskExecutor"
}


DMI -> NCMP (DmiAsyncRequestResponseEvent)
{
  "eventId": "b2f629ae-ee90-44d9-97f9-500c8a595b6a",
  "eventCorrelationId": "2243bbcd-b0b3-4aa5-9d53-41dc88a525a5",
  "eventTime": "2022-05-10T08:39:37.597+0000",
  "eventSource": "org.onap.ncmp",
  "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
  "eventTarget": "my-topic-999",
  "eventContent": {
    "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
    "response-status": "SUCCESS",
    "response-code": "200",
    "response-data": {
      "ietf-netconf-monitoring:netconf-state": {
        "schemas": {
          "schema": [
            {
              "identifier": "ietf-tls-server",
              "version": "2016-11-02",
              "format": "ietf-netconf-monitoring:yang",
              "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server",
              "location": [
                "NETCONF"
              ]
            },
            {
              "identifier": "nc-notifications",
              "version": "2008-07-14",
              "format": "ietf-netconf-monitoring:yang",
              "namespace": "urn:ietf:params:xml:ns:netmod:notification",
              "location": [
                "NETCONF"
              ]
            }
          ]
        }
      }
    }
  }
}


NCMP -> Client (NcmpAsyncRequestResponseEvent)
{
  "eventId": "699da452-4131-4e55-b49a-50e0c5f0d641",
  "eventCorrelationId": "efc17123-902a-4f50-8294-178a782b9102",
  "eventTarget": "test-env-1",
  "eventTime": "2022-05-10T11:16:27.427+0000",
  "forwardedEvent": {
    "eventId": "0f574dfa-9c05-4899-9ae1-e6741f53c6d9",
    "eventCorrelationId": "efc17123-902a-4f50-8294-178a782b9102",
    "eventTime": "2022-05-10T11:16:27.374+0000",
    "eventSource": "org.onap.ncmp",
    "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
    "eventTarget": "test-env-1",
    "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
    "response-status": "SUCCESS",
    "response-code": "200",
    "response-data": {
      "ietf-netconf-monitoring:netconf-state": {
        "schemas": {
          "schema": [
            {
              "identifier": "ietf-tls-server",
              "version": "2016-11-02",
              "format": "ietf-netconf-monitoring:yang",
              "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server",
              "location": [
                "NETCONF"
              ]
            },
            {
              "identifier": "nc-notifications",
              "version": "2008-07-14",
              "format": "ietf-netconf-monitoring:yang",
              "namespace": "urn:ietf:params:xml:ns:netmod:notification",
              "location": [
                "NETCONF"
              ]
            }
          ]
        }
      }
    }
  }
}


Kafka config & Implementation

...

Example Kafka Consumer Implementation from CPS-Temporal

The code snippet below, taken from cps-temporal, can be used in the same way in NCMP to listen to messages from DMI, substituting the topics and errorHandler.

Example Kafka Consumer Implementation
    /**
     * Consume the specified event.
     *
     * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
     */
    @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
    public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {

        log.debug("Receiving {} ...", cpsDataUpdatedEvent);

        // Validate event envelop
        validateEventEnvelop(cpsDataUpdatedEvent);

        // Map event to entity
        final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
        log.debug("Persisting {} ...", networkData);

        // Persist entity
        final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
        log.debug("Persisted {}", persistedNetworkData);

    }

Example Kafka Consumer Config from CPS-Temporal

Example Kafka Consumer Config
# ============LICENSE_START=======================================================
# Copyright (c) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

# Spring profile configuration for sasl ssl Kafka

spring:
    kafka:
        bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
        security:
            protocol: SASL_SSL
        ssl:
            trust-store-type: JKS
            trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
            trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
        properties:
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
            ssl.endpoint.identification.algorithm:
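
Issue 15 asks for the protocol and connection details to be configurable rather than fixed. As a rough sketch, the same settings can be assembled as plain Kafka client properties so that PLAINTEXT and SASL variants differ only in configuration. The class `KafkaSecurityPropertiesSketch` is hypothetical; the property keys are standard Kafka client settings, and the values would come from the environment as in the YAML above.

```java
import java.util.Properties;
import java.util.Set;

/** Hypothetical sketch: builds Kafka client properties with a configurable security protocol. */
final class KafkaSecurityPropertiesSketch {

    // Security protocols supported by Kafka clients.
    private static final Set<String> PROTOCOLS = Set.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    static Properties build(final String bootstrapServers, final String securityProtocol,
                            final String saslMechanism, final String saslJaasConfig) {
        if (!PROTOCOLS.contains(securityProtocol)) {
            throw new IllegalArgumentException("Unsupported security protocol: " + securityProtocol);
        }
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("security.protocol", securityProtocol);
        // SASL settings are only added when the chosen protocol uses SASL.
        if (securityProtocol.startsWith("SASL")) {
            properties.setProperty("sasl.mechanism", saslMechanism);
            properties.setProperty("sasl.jaas.config", saslJaasConfig);
        }
        return properties;
    }
}
```

For example, `build("localhost:9092", "PLAINTEXT", null, null)` produces a configuration without any SASL entries, while passing `"SASL_SSL"` with `"SCRAM-SHA-512"` corresponds to the profile shown above (trust store settings omitted for brevity).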

Example Kafka Producer Implementation from CPS-NCMP

Example Kafka Producer Implementation
/*
 * ============LICENSE_START=======================================================
 *  Copyright (c) 2021 Bell Canada.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.notification;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NotificationPublisher {

    private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
    private String topicName;

    /**
     * Create an instance of Notification Publisher.
     *
     * @param kafkaTemplate kafkaTemplate used to send events to Kafka
     * @param topicName     topic to which cpsDataUpdatedEvent is sent; provided by setting
     *                      'notification.data-updated.topic' in the application properties
     */
    @Autowired
    public NotificationPublisher(
        final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
        final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    /**
     * Send event to Kafka with correct message key.
     *
     * @param cpsDataUpdatedEvent event to be sent to kafka
     */
    public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
            messageKey, cpsDataUpdatedEvent);
        kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
    }

}

Example Kafka Producer Config from CPS-NCMP (application.yml)

Code Block
languageyml
titleExample Kafka Producer Config
collapsetrue
spring:
  kafka:
    properties:
      request.timeout.ms: 5000
      retries: 1
      max.block.ms: 10000
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      client-id: cps
    consumer:
      group-id: cps-test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent

Example Kafka Docker-Compose

Code Block
languageyml
titleExample Kafka Docker-Compose
collapsetrue
  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Future or alternative (Out of Scope)

...

What are Futures?

A Java Future, java.util.concurrent.Future, represents the result of an asynchronous computation. When the asynchronous task is created, a Java Future object is returned. This Future object functions as a handle to the result of the asynchronous task. Once the asynchronous task completes, the result can be accessed via the Future object returned when the task was started.

source: http://tutorials.jenkov.com/java-util-concurrent/java-future.html
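
The handle idea described above can be sketched with a plain ExecutorService. This is only an illustration: the class name FutureHandleSketch and the squareAsync task are hypothetical and are not taken from the CPS code base.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical example class; not part of CPS.
class FutureHandleSketch {

    // Submits a slow task and reads its result through the returned Future handle.
    static int squareAsync(final int input) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Callable<Integer> task = () -> {
                TimeUnit.MILLISECONDS.sleep(100); // simulate slow work
                return input * input;
            };
            // submit() returns immediately with a handle to the pending result
            final Future<Integer> future = executor.submit(task);
            // get() blocks until the task completes or the timeout expires
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Task did not complete", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Result: " + squareAsync(7));
    }
}
```

The caller only blocks at get(); anything placed between submit() and get() runs while the task is still in progress.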

CompletableFuture (Java8+)

Java 8 introduced the CompletableFuture class. Along with the Future interface, it also implements the CompletionStage interface. This interface defines the contract for an asynchronous computation step that we can combine with other steps.

CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors.


Code Block
languagejava
themeEclipse
titleSimple Future Example
collapsetrue
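import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running job
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});
// Block until the task finishes; otherwise the JVM may exit before the output is printed
future.join();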

source: https://www.callicoder.com/java-8-completablefuture-tutorial/
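
To illustrate the composing and error-handling methods mentioned above, here is a minimal sketch that combines two independent steps and maps a failure to a fallback value. The class and method names are hypothetical; only the CompletableFuture calls themselves (supplyAsync, thenCombine, thenApply, exceptionally, join) are standard JDK API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class; not part of CPS.
class CompletableFutureChainSketch {

    // Runs two independent async steps, combines them, then transforms the result.
    static String describeSum(final int left, final int right) {
        final CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> left);
        final CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> right);
        return first
            .thenCombine(second, Integer::sum) // runs once both steps have completed
            .thenApply(sum -> "sum=" + sum)    // transforms the combined result
            .join();                           // blocking here is for the demo only
    }

    // A failing step is mapped to a fallback value instead of propagating the exception.
    static String failingStepWithFallback() {
        return CompletableFuture.<String>supplyAsync(() -> {
                throw new IllegalStateException("simulated failure");
            })
            .exceptionally(error -> "fallback")
            .join();
    }

    public static void main(final String[] args) {
        System.out.println(describeSum(2, 3));
        System.out.println(failingStepWithFallback());
    }
}
```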

Alternatives - Thread

Code Block
languagejava
titleThread Example
collapsetrue
int number = 20;
// factorial(int) is assumed to be defined elsewhere
Thread newThread = new Thread(() -> {
    System.out.println("Factorial of " + number + " is: " + factorial(number));
});
newThread.start();


| # | Type | Pros | Cons | Recommend |
|---|------|------|------|-----------|
| 1 | Future | Futures return a value | | Y |
| 2 | Thread | | Threads do not return anything, as the run() method returns void. We could implement a mechanism to trigger a response, but this is unnecessary because futures already do this. | N |
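
The difference in the table can be sketched side by side. In this illustration (hypothetical class and method names, and a simple factorial helper written for the example) the Thread variant needs a shared holder plus join() to get a value back, while the future variant returns the value directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example class; not part of CPS.
class ThreadVersusFutureSketch {

    // Simple helper for the example; valid for inputs up to 20 before long overflows.
    static long factorial(final int number) {
        long result = 1;
        for (int factor = 2; factor <= number; factor++) {
            result *= factor;
        }
        return result;
    }

    // Thread: run() returns void, so the result needs a shared holder and join().
    static long factorialWithThread(final int number) {
        final AtomicLong holder = new AtomicLong();
        final Thread worker = new Thread(() -> holder.set(factorial(number)));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder.get();
    }

    // Future: the result travels back through the future itself.
    static long factorialWithFuture(final int number) {
        return CompletableFuture.supplyAsync(() -> factorial(number)).join();
    }

    public static void main(final String[] args) {
        System.out.println(factorialWithThread(20));
        System.out.println(factorialWithFuture(20));
    }
}
```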

Request ID Generation

...

| Type | Method | Ease of implementation | Decision |
|------|--------|------------------------|----------|
| UUID | String uniqueID = UUID.randomUUID().toString(); | Easy | ~ |
| Custom | We generate our own (an example exists in NCMP: notificationPublisher, to be confirmed) | Medium | - |
| HTTP Request ID | Further investigation required | | ~ |
| Kafka Event ID | Further investigation required | | ~ |
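
As a minimal sketch of the UUID option in the table above, the snippet below wraps the same call in a helper. The class name RequestIdSketch and the method newRequestId are hypothetical; no decision on the ID scheme is implied.

```java
import java.util.UUID;

// Hypothetical example class; not part of CPS.
class RequestIdSketch {

    // Generates a random (version 4) UUID string to use as a request ID.
    static String newRequestId() {
        return UUID.randomUUID().toString();
    }

    public static void main(final String[] args) {
        final String requestId = newRequestId();
        // The canonical text form is 36 characters: 32 hex digits and 4 hyphens
        System.out.println("requestId=" + requestId + " length=" + requestId.length());
    }
}
```

Random UUIDs need no coordination between instances, which is what makes this the easy option. They carry no ordering or origin information, though.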

How does NCMP → CPS-Temporal perform ID generation for events?

Code Block
languagejava
titleNotificationService
collapsetrue
/*
 * ============LICENSE_START=======================================================
 * Copyright (c) 2021 Bell Canada.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.cps.notification;

import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class NotificationService {

    private NotificationProperties notificationProperties;
    private NotificationPublisher notificationPublisher;
    private CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
    private NotificationErrorHandler notificationErrorHandler;
    private List<Pattern> dataspacePatterns;

    /**
     * Create an instance of Notification Service.
     *
     * @param notificationProperties     properties for notification
     * @param notificationPublisher      notification Publisher
     * @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent
     * @param notificationErrorHandler   error handler
     */
    public NotificationService(
        final NotificationProperties notificationProperties,
        final NotificationPublisher notificationPublisher,
        final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory,
        final NotificationErrorHandler notificationErrorHandler) {
        log.info("Notification Properties {}", notificationProperties);
        this.notificationProperties = notificationProperties;
        this.notificationPublisher = notificationPublisher;
        this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory;
        this.notificationErrorHandler = notificationErrorHandler;
        this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties);
    }

    private List<Pattern> getDataspaceFilterPatterns(final NotificationProperties notificationProperties) {
        if (notificationProperties.isEnabled()) {
            return Arrays.stream(notificationProperties.getFilters()
                .getOrDefault("enabled-dataspaces", "")
                .split(","))
                .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE))
                .collect(Collectors.toList());
        } else {
            return Collections.emptyList();
        }
    }

    /**
     * Process Data Updated Event and publishes the notification.
     *
     * @param dataspaceName dataspace name
     * @param anchorName    anchor name
     * @param observedTimestamp observedTimestamp
     * @return future
     */
    @Async("notificationExecutor")
    public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName,
        final OffsetDateTime observedTimestamp) {
        log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName);
        try {
            if (shouldSendNotification(dataspaceName)) {
                final var cpsDataUpdatedEvent =
                    cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, anchorName, observedTimestamp);
                log.debug("data updated event to be published {}", cpsDataUpdatedEvent);
                notificationPublisher.sendNotification(cpsDataUpdatedEvent);
            }
        } catch (final Exception exception) {
            /* All the exceptions are handled to not to propagate it to caller.
               CPS operation should not fail if sending event fails for any reason.
             */
            notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
                exception, dataspaceName, anchorName);
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
        Add more complex rules based on dataspace and anchor later
     */
    private boolean shouldSendNotification(final String dataspaceName) {

        return notificationProperties.isEnabled()
            && dataspacePatterns.stream()
            .anyMatch(pattern -> pattern.matcher(dataspaceName).find());
    }

}
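
The dataspace filter used by NotificationService can be tried in isolation. The sketch below copies only the pattern logic (comma-separated regular expressions, case-insensitive, matched with find()); the class name and the sample filter values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical example class; it mirrors only the filter logic of NotificationService.
class DataspaceFilterSketch {

    // Same steps as getDataspaceFilterPatterns: split on commas, compile case-insensitively.
    static List<Pattern> compileFilters(final String enabledDataspaces) {
        return Arrays.stream(enabledDataspaces.split(","))
            .map(filter -> Pattern.compile(filter, Pattern.CASE_INSENSITIVE))
            .collect(Collectors.toList());
    }

    // Same check as shouldSendNotification, minus the enabled flag.
    // find() accepts a match anywhere in the name, not only a full match.
    static boolean matchesAny(final List<Pattern> filters, final String dataspaceName) {
        return filters.stream().anyMatch(pattern -> pattern.matcher(dataspaceName).find());
    }

    public static void main(final String[] args) {
        final List<Pattern> filters = compileFilters("NFP-Operational,cm-.*");
        System.out.println(matchesAny(filters, "nfp-operational"));
        System.out.println(matchesAny(filters, "other"));
    }
}
```

One consequence worth noting: an empty filter string splits into a single empty pattern, and an empty pattern is found in every name, so in this sketch an empty "enabled-dataspaces" value matches all dataspaces.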






Async Request Option using Messaging (Out of Scope)

...

This option was intended for a future, completely message-driven solution. For now we start with a REST request that will eventually generate an async message; in future we could also send a message that triggers the same flow.
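
The current flow (a request is accepted at once and the result arrives later as a message) can be sketched without a broker. Everything here is an assumption made for illustration: in-memory queues stand in for Kafka topics, and the names AsyncRequestSketch, acceptRequest and poll are hypothetical rather than NCMP APIs.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: in-memory queues stand in for Kafka topics.
class AsyncRequestSketch {

    private final Map<String, BlockingQueue<String>> topics = new ConcurrentHashMap<>();

    // Accepts the request, returns a request ID at once, and publishes the result later.
    String acceptRequest(final String clientTopic, final Supplier<String> slowOperation) {
        final String requestId = UUID.randomUUID().toString();
        CompletableFuture
            .supplyAsync(slowOperation)
            .thenAccept(result -> topic(clientTopic).add(requestId + ":" + result));
        return requestId; // what the REST call would hand back with its OK response
    }

    // Waits up to the timeout for the next message on a topic; returns null if none arrives.
    String poll(final String clientTopic, final long timeoutMillis) {
        try {
            return topic(clientTopic).poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    private BlockingQueue<String> topic(final String name) {
        return topics.computeIfAbsent(name, key -> new LinkedBlockingQueue<>());
    }

    public static void main(final String[] args) {
        final AsyncRequestSketch sketch = new AsyncRequestSketch();
        final String requestId = sketch.acceptRequest("client-topic", () -> "data");
        System.out.println("accepted " + requestId);
        System.out.println("received " + sketch.poll("client-topic", 5000));
    }
}
```

The request ID is the only link between the immediate response and the later message, so the client has to keep it in order to correlate the two.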

Webflux Investigation (Out of Scope)

...

Info

This may need its own study to determine improvements to operations like model sync.

There is a spike planned to further investigate reactive programming in cps-core.

Jira
serverONAP Jira
serverId425b2b0a-557c-3c0c-b515-579789cceedb
keyCPS-850

What is Webflux?

Spring WebFlux is a web framework that’s built on top of Project Reactor, to give you asynchronous I/O, and allow your application to perform better. The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
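
Back pressure can be illustrated without Spring. The sketch below uses the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow, Java 9+), not Project Reactor or WebFlux, and the class names are hypothetical. The subscriber asks for one item at a time, so the publisher cannot outrun it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch using JDK Flow; WebFlux itself builds on Project Reactor instead.
class BackPressureSketch {

    // Subscriber that requests exactly one item at a time.
    static class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(final Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(final String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(final Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes all items and returns what the subscriber received, in order.
    static List<String> publishAll(final List<String> items) {
        final OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(final String[] args) {
        System.out.println(publishAll(List.of("a", "b", "c")));
    }
}
```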

[Figure: Venn diagram comparing Spring MVC and Spring WebFlux]

We suggest that you consider the following specific points:

  • If you have a Spring MVC application that works fine, there is no need to change. Imperative programming is the easiest way to write, understand, and debug code. You have maximum choice of libraries, since, historically, most are blocking.

  • In a microservice architecture, you can have a mix of applications with either Spring MVC or Spring WebFlux controllers or with Spring WebFlux functional endpoints. Having support for the same annotation-based programming model in both frameworks makes it easier to re-use knowledge while also selecting the right tool for the right job.

  • A simple way to evaluate an application is to check its dependencies. If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.

  • If you have a Spring MVC application with calls to remote services, try the reactive WebClient. You can return reactive types (Reactor, RxJava, or other) directly from Spring MVC controller methods. The greater the latency per call or the interdependency among calls, the more dramatic the benefits. Spring MVC controllers can call other reactive components too.

  • If you have a large team, keep in mind the steep learning curve in the shift to non-blocking, functional, and declarative programming. A practical way to start without a full switch is to use the reactive WebClient. Beyond that, start small and measure the benefits. We expect that, for a wide range of applications, the shift is unnecessary. If you are unsure what benefits to look for, start by learning about how non-blocking I/O works (for example, concurrency on single-threaded Node.js) and its effects.

  • Webflux supports:

    • Annotation-based reactive components
    • Functional routing and handling

Source: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

Pros & cons

Pros:

  • Better scalability due to non-blocking threads
  • Uses fewer threads (one per core)
  • Better CPU efficiency
  • Reactive web programming is great for applications that have streaming data, and clients that consume it and stream it to their users. It is not well suited to developing CRUD apps. If you want to develop a CRUD API, stick with Spring MVC.

Cons:

  • Steep learning curve in the shift to non-blocking, functional, and declarative programming


Links to materials:

https://www.baeldung.com/spring-webflux

https://www.youtube.com/watch?v=1F10gr2pbvQ

Kafka Strimzi Investigation

...

Could not find relevant information on Strimzi in ONAP, but there is a PoC to move CPS to Strimzi:

Jira
serverONAP Jira
serverId425b2b0a-557c-3c0c-b515-579789cceedb
keyDMAAP-1681

Can Robot Framework verify Kafka Events?

...

It does not appear to be possible to verify Kafka events natively in Robot Framework, but there are third-party libraries that would help with this.


Demo/Test

...

Groovy tests for Kafka already exist in cps-service/src/test/groovy/org/onap/cps/notification

Jira
serverONAP Jira
serverId425b2b0a-557c-3c0c-b515-579789cceedb
keyCPS-834

To facilitate demo and testing of this functionality, a new standalone app will be required to act as the client.

...

For a demo it will be sufficient to show that the response is being produced on the client topic.

The client should conduct integration testing.


References

...