...
Kafka config & Implementation
...
Example
...
Kafka Consumer Implementation from CPS-Temporal
The below code snippet taken from cps-temporal can be used in the same way in NCMP to listen to message from DMI substituting the topics and errorHandler
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Consume the specified event. * * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted. */ @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler") public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) { log.debug("Receiving {} ...", cpsDataUpdatedEvent); // Validate event envelop validateEventEnvelop(cpsDataUpdatedEvent); // Map event to entity final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent); log.debug("Persisting {} ...", networkData); // Persist entity final var persistedNetworkData = this.networkDataService.addNetworkData(networkData); log.debug("Persisted {}", persistedNetworkData); } |
Example
...
Kafka Consumer
...
Config
...
from CPS-Temporal
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
# ============LICENSE_START======================================================= # Copyright (c) 2021 Bell Canada. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= # Spring profile configuration for sasl ssl Kafka spring: kafka: bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} security: protocol: SASL_SSL ssl: trust-store-type: JKS trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION} trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD} properties: sasl.mechanism: SCRAM-SHA-512 sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG} ssl.endpoint.identification.algorithm: |
Example
...
Example of Existing Producer (Config)
...
Kafka Producer Implementation from CPS-NCMP
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/*
* ============LICENSE_START=======================================================
* Copyright (c) 2021 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.cps.notification;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NotificationPublisher {
private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
private String topicName;
/**
* Create an instance of Notification Publisher.
*
* @param kafkaTemplate kafkaTemplate is send event using kafka
* @param topicName topic, to which cpsDataUpdatedEvent is sent, is provided by setting
* 'notification.data-updated.topic' in the application properties
*/
@Autowired
public NotificationPublisher(
final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
final @Value("${notification.data-updated.topic}") String topicName) {
this.kafkaTemplate = kafkaTemplate;
this.topicName = topicName;
}
/**
* Send event to Kafka with correct message key.
*
* @param cpsDataUpdatedEvent event to be sent to kafka
*/
public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
+ cpsDataUpdatedEvent.getContent().getAnchorName();
log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
messageKey, cpsDataUpdatedEvent);
kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
}
}
|
Example Kafka Producer Config from CPS-NCMP (application.yml)
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
spring:
kafka:
properties:
request.timeout.ms: 5000
retries: 1
max.block.ms: 10000
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
cliend-id: cps
consumer:
group-id: cps-test
auto-offset-reset: earliest
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent |
Example Kafka Docker-Compose
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
kafka: image: confluentinc/cp-kafka:6.1.1 container_name: kafka ports: - "19092:19092" depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 |
Response types for
...
Passthrough
...
Will we be able to handle everything "as-is"?
TODO: Clarify what the response type will be e.g. Object, String, JSON
Future (or alternative)
...
What are Futures?
...
Async Request Option using Messaging
...
TODO: Clarify what is intended in this section
Demo/Test
...
Jira | ||||||
---|---|---|---|---|---|---|
|