CPS-821 Spike: Support Async read-write operations on CPS-NCMP interface
Description/Scope
The scope of this spike is to ascertain:
- How to use messaging (producer, agreed topic, etc.)
- How to use the existing REST endpoint with an additional flag indicating an asynchronous response
- Whether an asynchronous request option using messaging should also be considered in the proposal
Associated Jira Created for Implementation
Issues/Decisions
# | Issue | Decision | Notes/Jira
---|---|---|---
1 | What topic to use for client? | To be supplied by client | Topic provided by client as a parameter, which will be injected into our environment and used for asynchronous requests sent back to the client.
2 | What topic to use for private DMI-NCMP? | | e.g. ncmp-async-private, but the decision needs to be made with current best practices.
3 | Are we adding a new REST endpoint for async or modifying an existing endpoint? | /ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name> | To facilitate asynchronous requests to DMI we will need to either create a new endpoint or modify an existing endpoint to include an /async flag. The second option may not be backwards compatible, while creating a new endpoint solely for a flag is also not ideal. We could add async to the list of options, but this might interfere with the purpose of /options. We also considered adding a new endpoint for async which simply re-routes the response to the original endpoint while adding the logic for an OK response to the client; however, would this lead to a change in the schema, and if so, would that be backwards compatible? CPS-830
4 | Agree URL for async once #2 is clarified | /ncmp/v1/data/ch/123ee5/ds/ncmp-datastore:*?topic=<topic-name> | CPS R10 Release Planning#NCMPRequirements #11. Based on this additional path parameter we no longer require an additional /async flag in the URL.
5 | Passthrough requests need to be able to handle different response types (using the accept header), but the async option would have a fixed and possibly different response type. | We should, by default, be able to accept multiple content-types. | CPS R10 Release Planning#NCMPRequirements #11.
6 | Should we create a standalone app to demo, or are tests sufficient? | Yes, a standalone app would be helpful, along with traditional tests. | CSIT tests may require more involved effort; perhaps we could add the standalone app to Nexus and use it as part of the CSIT test?
7 | | No | We should be stateless.
8 | What mechanism should we use for the payload? Event? Header? | | This needs clarification.
9 | Error Reporting | Out of scope |
10 | CSIT Testing | | Will this be required to add to CSIT tests? If so, a client app will need to be created and stored in Gerrit for cloning during the setup phase.
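The decision in rows 3 and 4 (an optional topic query parameter selecting async behaviour) can be sketched in plain Java. This is a hypothetical illustration, not the NCMP implementation: the class and method names are ours, and a simple BiConsumer stands in for the Kafka producer that would publish the response to the client-supplied topic.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical sketch of the agreed request flow: if the optional 'topic'
// query parameter is present, acknowledge immediately and publish the real
// result to the client-supplied topic later; otherwise respond synchronously.
public class AsyncRequestSketch {

    // Stand-in for a Kafka producer send; in NCMP this would be a KafkaTemplate.
    private final BiConsumer<String, String> publisher;

    public AsyncRequestSketch(final BiConsumer<String, String> publisher) {
        this.publisher = publisher;
    }

    public String getResourceData(final String cmHandle, final Optional<String> topic) {
        if (topic.isPresent()) {
            // Fire-and-forget: fetch from DMI on another thread, then publish.
            CompletableFuture
                .supplyAsync(() -> fetchFromDmi(cmHandle))
                .thenAccept(data -> publisher.accept(topic.get(), data));
            return "ACCEPTED"; // immediate OK-style response to the client
        }
        return fetchFromDmi(cmHandle); // existing synchronous behaviour
    }

    private String fetchFromDmi(final String cmHandle) {
        return "data-for-" + cmHandle; // placeholder for the real DMI call
    }
}
```

Because the topic arrives as a parameter, the existing endpoint keeps its synchronous contract when the parameter is absent, which matches the backwards-compatibility concern in row 3.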
Kafka config & Implementation
Example Kafka Consumer Implementation from CPS-Temporal
The code snippet below, taken from cps-temporal, can be used in the same way in NCMP to listen to messages from DMI, substituting the topics and error handler.
/**
 * Consume the specified event.
 *
 * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
 */
@KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
    log.debug("Receiving {} ...", cpsDataUpdatedEvent);

    // Validate event envelop
    validateEventEnvelop(cpsDataUpdatedEvent);

    // Map event to entity
    final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
    log.debug("Persisting {} ...", networkData);

    // Persist entity
    final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
    log.debug("Persisted {}", persistedNetworkData);
}
Example Kafka Consumer Config from CPS-Temporal
# ============LICENSE_START=======================================================
# Copyright (c) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
# Spring profile configuration for sasl ssl Kafka
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
    security:
      protocol: SASL_SSL
    ssl:
      trust-store-type: JKS
      trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
      trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
    properties:
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
      ssl.endpoint.identification.algorithm:
Example Kafka Producer Implementation from CPS-NCMP
/*
* ============LICENSE_START=======================================================
* Copyright (c) 2021 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.cps.notification;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NotificationPublisher {

    private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
    private String topicName;

    /**
     * Create an instance of Notification Publisher.
     *
     * @param kafkaTemplate the KafkaTemplate used to send events to Kafka
     * @param topicName     topic, to which cpsDataUpdatedEvent is sent, provided by setting
     *                      'notification.data-updated.topic' in the application properties
     */
    @Autowired
    public NotificationPublisher(
        final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
        final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    /**
     * Send event to Kafka with correct message key.
     *
     * @param cpsDataUpdatedEvent event to be sent to Kafka
     */
    public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
            messageKey, cpsDataUpdatedEvent);
        kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
    }
}
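The sendNotification method above keys each message with "<dataspace>,<anchor>". Since Kafka assigns partitions by hashing the message key, all events for the same anchor land on the same partition and so keep their relative order. A minimal standalone illustration of this key scheme (the class name is ours, not part of CPS):

```java
// Illustration of the message-key scheme used by NotificationPublisher:
// keying by "<dataspace>,<anchor>" groups all events for one anchor onto the
// same Kafka partition, preserving their ordering.
public class MessageKeyExample {
    public static String messageKey(final String dataspaceName, final String anchorName) {
        return dataspaceName + "," + anchorName;
    }
}
```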
Example Kafka Producer Config from CPS-NCMP (application.yml)
spring:
  kafka:
    properties:
      request.timeout.ms: 5000
      retries: 1
      max.block.ms: 10000
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      client-id: cps
    consumer:
      group-id: cps-test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent
Example Kafka Docker-Compose
kafka:
  image: confluentinc/cp-kafka:6.1.1
  container_name: kafka
  ports:
    - "19092:19092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Future (or alternative)
What are Futures?
A Java Future, java.util.concurrent.Future, represents the result of an asynchronous computation. When the asynchronous task is created, a Java Future object is returned. This Future object functions as a handle to the result of the asynchronous task. Once the asynchronous task completes, the result can be accessed via the Future object returned when the task was started.
source: http://tutorials.jenkov.com/java-util-concurrent/java-future.html
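A minimal self-contained example of this pattern, using an ExecutorService to run the task and Future.get() to retrieve its result (class and method names are illustrative only):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Submit a task to an executor; the returned Future is the handle to the
// result, and get() blocks until the computation has completed.
public class FutureExample {
    public static int computeAsync() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<Integer> future = executor.submit(() -> 21 * 2);
            return future.get(); // blocks until the task completes
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```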
CompletableFuture (Java8+)
Java 8 introduced the CompletableFuture class. Along with the Future interface, it also implemented the CompletionStage interface. This interface defines the contract for an asynchronous computation step that we can combine with other steps.
CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors.
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running job
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});
```
source: https://www.callicoder.com/java-8-completablefuture-tutorial/
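Unlike runAsync, which returns no value, supplyAsync produces a result that further steps can be chained onto with thenApply, illustrating the composition the text describes. A small self-contained sketch (names are ours):

```java
import java.util.concurrent.CompletableFuture;

// supplyAsync returns a value; thenApply chains another computation step onto
// it without blocking in between; join() waits for the whole pipeline.
public class CompletableFutureExample {
    public static String greet(final String name) {
        return CompletableFuture
            .supplyAsync(() -> "Hello, " + name)
            .thenApply(greeting -> greeting + "!")
            .join(); // wait for the composed pipeline to finish
    }
}
```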
Alternatives
Thread
int number = 20;
Thread newThread = new Thread(() -> {
    // factorial(...) is assumed to be defined elsewhere
    System.out.println("Factorial of " + number + " is: " + factorial(number));
});
newThread.start();
# | Type | Pros | Cons | Recommend
---|---|---|---|---
1 | Future | Futures return a value | | Y
2 | Thread | | Threads do not return anything, as the run() method returns void. We could implement a mechanism to trigger a response, but this is unnecessary since futures already do this. | N
Type | Method | Ease of implementation | Recommend
---|---|---|---
UUID | String uniqueID = UUID.randomUUID().toString(); | Easy | Y
Custom | We generate our own | Medium - Hard | N
HTTP Request ID | | |
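Illustrating the recommended UUID option from the table: each request gets a unique identifier that can be echoed back in the asynchronous response so the client can correlate responses with requests (the wrapper class is ours, for illustration only):

```java
import java.util.UUID;

// The recommended option from the table: a random UUID per request, usable as
// a correlation id between the original request and the async response.
public class RequestIdExample {
    public static String newRequestId() {
        return UUID.randomUUID().toString();
    }
}
```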
Async Request Option using Messaging
TODO: Clarify what is intended in this section
Demo/Test
Groovy tests for Kafka already exist in cps-service/src/test/groovy/org/onap/cps/notification.
CPS-834
To facilitate demo and testing of this functionality a new standalone app will be required to act as client.