CPS-821
-
Getting issue details...
STATUS
Description/Scope
The scope of this spike is to ascertain:
- How to use messaging (producer, agree topic etc))
- Using existing rest endpoint with additional flag indicating async response
- Also consider asynchronous request option using messaging in the proposal
Associated Jira Created for Implementation
Key
|
Summary
|
T
|
Created
|
Updated
|
Due
|
Assignee
|
Reporter
|
P
|
Status
|
Resolution
|
Issues/Decisions
# | Issue | Decision | Notes/Jira |
---|
1 | What topic to use? proposed ncmp-async-xxx |
|
|
2 | Are adding a new REST endpoint for async or modifying an existing endpoint? |
|
CPS-830
-
Getting issue details...
STATUS
|
3 | Agree URL for async once #2 is clarified |
|
|
4 | passthrough request need to be able to handle different response types (using accept header) but the async option would have a fixed and possibly different response type. |
|
|
5 | How many messages are we expecting at peak time? |
|
|
6 | Should we create a standalone app to demo or are tests sufficient? |
|
|
7 | Do we need to persist the generated requestID? |
|
|
8 | Will cps-temporal require the requestID? |
|
|
Proposed Diagram
eyJleHRTcnZJbnRlZ1R5cGUiOiIiLCJnQ2xpZW50SWQiOiIiLCJjcmVhdG9yTmFtZSI6Ikpvc2VwaCBLZWVuYW4iLCJvdXRwdXRUeXBlIjoiYmxvY2siLCJsYXN0TW9kaWZpZXJOYW1lIjoiSm9zZXBoIEtlZW5hbiIsImxhbmd1YWdlIjoiZW4iLCJkaWFncmFtRGlzcGxheU5hbWUiOiIiLCJzRmlsZUlkIjoiIiwiYXR0SWQiOiIxMTc3NDEwNTQiLCJkaWFncmFtTmFtZSI6IkNQUy04MjEiLCJhc3BlY3QiOiIiLCJsaW5rcyI6ImF1dG8iLCJjZW9OYW1lIjoiQ1BTLTgyMSBTcGlrZTogU3VwcG9ydCBBc3luYyByZWFkLXdyaXRlIG9wZXJhdGlvbnMgb24gQ1BTLU5DTVAgaW50ZXJmYWNlIiwidGJzdHlsZSI6InRvcCIsImNhbkNvbW1lbnQiOmZhbHNlLCJkaWFncmFtVXJsIjoiIiwiY3N2RmlsZVVybCI6IiIsImJvcmRlciI6dHJ1ZSwibWF4U2NhbGUiOiIxIiwib3duaW5nUGFnZUlkIjoxMTc3NDA0ODcsImVkaXRhYmxlIjpmYWxzZSwiY2VvSWQiOjExNzc0MTE3NiwicGFnZUlkIjoiIiwibGJveCI6dHJ1ZSwic2VydmVyQ29uZmlnIjp7ImVtYWlscHJldmlldyI6IjEifSwib2RyaXZlSWQiOiIiLCJyZXZpc2lvbiI6NSwibWFjcm9JZCI6ImY5NTMyZDdmLTFhNGItNGEwZi04NjUyLWEwY2E0Mjg2ZmVmMCIsInByZXZpZXdOYW1lIjoiQ1BTLTgyMS5wbmciLCJsaWNlbnNlU3RhdHVzIjoiT0siLCJzZXJ2aWNlIjoiIiwiaXNUZW1wbGF0ZSI6IiIsIndpZHRoIjoiNzcxIiwic2ltcGxlVmlld2VyIjpmYWxzZSwibGFzdE1vZGlmaWVkIjoxNjQzMzAzOTc4MDAwLCJleGNlZWRQYWdlV2lkdGgiOmZhbHNlLCJvQ2xpZW50SWQiOiIifQ==
Rest Endpoint with Async Flag
Current | Proposed |
---|
<ncmp>/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=stores:bookstore | <ncmp>/v1/ch/PNFDemo/data/async/ds/ncmp-datastore:passthrough-running?resourceIdentifier=stores:bookstore |
Kafka config & Implementation
Example of Existing Consumer (Java)
The below code snippet taken from cps-temporal can be used in the same way in NCMP to listen to message from DMI substituting the topics and errorHandler
/**
* Consume the specified event.
*
* @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
*/
@KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
log.debug("Receiving {} ...", cpsDataUpdatedEvent);
// Validate event envelop
validateEventEnvelop(cpsDataUpdatedEvent);
// Map event to entity
final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
log.debug("Persisting {} ...", networkData);
// Persist entity
final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
log.debug("Persisted {}", persistedNetworkData);
}
Example of Existing Consumer (Config)
# ============LICENSE_START=======================================================
# Copyright (c) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
# Spring profile configuration for sasl ssl Kafka
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
security:
protocol: SASL_SSL
ssl:
trust-store-type: JKS
trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
properties:
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
ssl.endpoint.identification.algorithm:
Example of Existing Producer (Java)
Example of Existing Producer (Config)
Example of Kafka Docker-Compose
kafka:
image: confluentinc/cp-kafka:6.1.1
container_name: kafka
ports:
- "19092:19092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Response types for passthrough
Will we be able to handle everything "as-is"?
Future (or alternative)
What are Futures?
A Java Future, java.util.concurrent.Future
, represents the result of an asynchronous computation. When the asynchronous task is created, a Java Future
object is returned. This Future
object functions as a handle to the result of the asynchronous task. Once the asynchronous task completes, the result can be accessed via the Future
object returned when the task was started
source: http://tutorials.jenkov.com/java-util-concurrent/java-future.html
CompletableFuture (Java8+)
Java 8 introduced the CompletableFuture class. Along with the Future interface, it also implemented the CompletionStage interface. This interface defines the contract for an asynchronous computation step that we can combine with other steps.
CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors.
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
});
source: https://www.callicoder.com/java-8-completablefuture-tutorial/
Alternatives
Thread
int number = 20;
Thread newThread = new Thread(() -> {
System.out.println("Factorial of " + number + " is: " + factorial(number));
});
newThread.start();
# | Type | Pros | Cons | Recommend |
---|
1 | Future | Futures return value |
| Y |
2 | Thread |
| threads does not return anything as the run() method returns void . We could possibly implement mechanism to trigger a response but this is unnecessary as futures do this | N |
RequestID Generation
Type | Method | Ease of implementation | Recommend |
---|
UUID | String uniqueID = UUID.randomUUID().toString();
| Easy | Y |
Custom | We generate our own | Medium - Hard | N |
Async Request Option using Messaging
Demo/Test
CPS-834
-
Getting issue details...
STATUS