You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 60 Next »


CPS-821 - Getting issue details... STATUS

Description/Scope


The scope of this spike is to ascertain:

  • How to use messaging (producer, agree topic etc))
  • Using existing rest endpoint with additional flag indicating async response
  • Also consider asynchronous request option using messaging in the proposal


Associated Jira Created for Implementation

Key Summary T Created Updated Assignee Reporter P Status Resolution Sub-Tasks
Loading...
Refresh

Issues/Decisions


#IssueNotes/JiraDecision
1What topic to use for client?Topic provided by client as a parameter which will be injected into our environment and used for asynchronous requests sent back to client.To be supplied by cient
2What topic to use for private DMI-NCMP?e.g. ncmp-async-private but decision needs to be made with current best practices.
3Are adding a new REST endpoint for async or modifying an existing endpoint?


To facilitate asynchronous requests to DMI we will need to either create a new endpoint or modify existing endpoint to include /async flag. The second solution may not be backwards compatible. However creating a new endpoint solely for a flag is also not ideal. We could add async to list of options (but this might interfere with the purpose of /options.

Additionally, considered adding a new endpoint for async which simply re-routes the response to the original endpoint while adding the logic for OK response to the client. However, would this lead to a change in the schema? If so, would this be backwards compatible?

CPS-830 - Getting issue details... STATUS

/ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name> 
4Agree URL for async once #2 is clarifiedCPS R10 Release Planning#NCMPRequirements #11. Based on this additional path parameter we no longer require additional /async flag in url.

/ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name> 


5Passthrough request need to be able to handle different response types (using accept header) but the async option would have a fixed and possibly different response type.CPS R10 Release Planning#NCMPRequirements #11.We should, by default, be able to accept multiple contnet-types.
6Should we create a standalone app to demo or are tests sufficient?

CSIT tests may require more involved effort - perhaps we could add standalone app to nexus and use it as part of CSIT test?


7

Do we need to persist the generated requestID?

We should be be statelessNo
8Error Reporting
Out of scope
9

Async Request Option using Messaging

See:  AsyncRequestOptionusingMessagingOut of scope
10Do we actually require futures in this implementation proposal?

It could be argued that the need for futures is made redundant by the fact we call dmi from ncmp through rest and the response will be consumed via Kafka.

What benefit would future give us in this case? 

Not needed
11ID GenerationWhich mechanism to use? Look at CPS-Temporal and follow to keep consistency
12Are there any Kafka specific environment variables or config variables to be set for this work?We will need to add new topic for DMI → NCMP M2M Kafka - which file(s)?
13Can robot framework verify if Kafka events have been sent/receivedThis would be less work and overhead (rather than creating/.maintaining client app)
14Can Webflux do this work with less code/impl?

15ONAP may be deprecating PALINTEXT for Kafka. Strimzi Kafka might need to be used


Proposed Diagram



Steps/possible tickets:

  1. Modify REST endpoint to include param topic (1)
  2. Add logic to send response and request (2a & 2b
  3. Add producer to DMI (implementation and config) (31 & 3b)
  4. Add consumer to NCMP (implementation and config) (4)
  5. Add consumer to NCMP (implementation and config) (5)
  6. Demo & Test (6 & 7)

Kafka config & Implementation


Example Kafka Consumer Implementation from CPS-Temporal

The below code snippet taken from cps-temporal can be used in the same way in NCMP to listen to message from DMI substituting the topics and errorHandler

Example Kafka Consumer Implementation
 /**
     * Consume the specified event.
     *
     * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
     */
    @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
    public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {

        log.debug("Receiving {} ...", cpsDataUpdatedEvent);

        // Validate event envelop
        validateEventEnvelop(cpsDataUpdatedEvent);

        // Map event to entity
        final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
        log.debug("Persisting {} ...", networkData);

        // Persist entity
        final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
        log.debug("Persisted {}", persistedNetworkData);

    }

Example Kafka Consumer Config from CPS-Temporal

Example Kafka Consumer Config
# ============LICENSE_START=======================================================
# Copyright (c) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

# Spring profile configuration for sasl ssl Kafka

spring:
    kafka:
        bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
        security:
            protocol: SASL_SSL
        ssl:
            trust-store-type: JKS
            trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
            trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
        properties:
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
            ssl.endpoint.identification.algorithm:

Example Kafka Producer Implementation from CPS-NCMP

Example Kafka Producer Implementation
/*
 * ============LICENSE_START=======================================================
 *  Copyright (c) 2021 Bell Canada.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.notification;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NotificationPublisher {

    private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
    private String topicName;

    /**
     * Create an instance of Notification Publisher.
     *
     * @param kafkaTemplate kafkaTemplate is send event using kafka
     * @param topicName     topic, to which cpsDataUpdatedEvent is sent, is provided by setting
     *                      'notification.data-updated.topic' in the application properties
     */
    @Autowired
    public NotificationPublisher(
        final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
        final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    /**
     * Send event to Kafka with correct message key.
     *
     * @param cpsDataUpdatedEvent event to be sent to kafka
     */
    public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
            messageKey, cpsDataUpdatedEvent);
        kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
    }

}

Example Kafka Producer Config from CPS-NCMP (application.yml)

Example Kafka Producer Config
spring:
  kafka:
    properties:
      request.timeout.ms: 5000
      retries: 1
      max.block.ms: 10000
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      cliend-id: cps
    consumer:
      group-id: cps-test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent

Example Kafka Docker-Compose

Example Kafka Docker-Compose
  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1


Future or alternative (Out of Scope)


What are Futures?

A Java Future, java.util.concurrent.Future, represents the result of an asynchronous computation. When the asynchronous task is created, a Java Future object is returned. This Future object functions as a handle to the result of the asynchronous task. Once the asynchronous task completes, the result can be accessed via the Future object returned when the task was started

source: http://tutorials.jenkov.com/java-util-concurrent/java-future.html


CompletableFuture (Java8+)

Java 8 introduced the CompletableFuture class. Along with the Future interface, it also implemented the CompletionStage interface. This interface defines the contract for an asynchronous computation step that we can combine with other steps.

CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors.


Simple Future Example
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

source: https://www.callicoder.com/java-8-completablefuture-tutorial/

Alternatives

Thread

Thread Example
int number = 20;
Thread newThread = new Thread(() -> {
    System.out.println("Factorial of " + number + " is: " + factorial(number));
});
newThread.start();


#TypeProsConsRecommend
1FutureFutures return value
Y
2Thread
threads does not return anything as the run() method returns void. We could possibly implement mechanism to trigger a response but this is unnecessary as futures do thisN

RequestID Generation


TypeMethodEase of implementationDecision
UUID
String uniqueID = UUID.randomUUID().toString();
Easy~
CustomWe generate our ownMedium - HardN
HTTP Request ID

~
Kafka ID

~


Async Request Option using Messaging (Out of Scope)


This was for a future completely message driven solution (for now we start with a REST request that will generate an async message eventually. In future we could also send a message that will trigger the same.


Weblux Investigation



https://www.baeldung.com/spring-webflux


Kafka Strimzi Investigation



https://strimzi.io/

Demo/Test


Existing Groovy tests exist for Kafka in cps-service/src/test/groovy/org/onap/cps/notification

CPS-834 - Getting issue details... STATUS

To facilitate demo and testing of this functionality a new standalone app will be required to act as client.

This is necessary as the client will need to connect to Kafka to consume async responses.


  • No labels