Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Kafka config & Implementation

...

Example

...

Kafka Consumer Implementation from CPS-Temporal

The below code snippet taken from cps-temporal can be used in the same way in NCMP to listen to message from DMI substituting the topics and errorHandler

Code Block
languagejava
titleExample Listener (Java, cps-temporal)Kafka Consumer Implementation from CPS-Temporal
collapsetrue
 /**
     * Consume the specified event.
     *
     * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
     */
    @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
    public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {

        log.debug("Receiving {} ...", cpsDataUpdatedEvent);

        // Validate event envelop
        validateEventEnvelop(cpsDataUpdatedEvent);

        // Map event to entity
        final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
        log.debug("Persisting {} ...", networkData);

        // Persist entity
        final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
        log.debug("Persisted {}", persistedNetworkData);

    }

Example

...

Kafka Consumer

...

Config

...

from CPS-Temporal

Code Block
languageyml
titleExample of Existing Kafka Consumer ( Config)
collapsetrue
# ============LICENSE_START=======================================================
# Copyright (c) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================

# Spring profile configuration for sasl ssl Kafka

spring:
    kafka:
        bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
        security:
            protocol: SASL_SSL
        ssl:
            trust-store-type: JKS
            trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
            trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
        properties:
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
            ssl.endpoint.identification.algorithm:

Example

...

Example of Existing Producer (Config)

...

Kafka Producer Implementation from CPS-NCMP

Code Block
languagejava
titleExample Kafka Producer Implementation
collapsetrue
/*
 * ============LICENSE_START=======================================================
 *  Copyright (c) 2021 Bell Canada.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.notification;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NotificationPublisher {

    private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
    private String topicName;

    /**
     * Create an instance of Notification Publisher.
     *
     * @param kafkaTemplate kafkaTemplate is send event using kafka
     * @param topicName     topic, to which cpsDataUpdatedEvent is sent, is provided by setting
     *                      'notification.data-updated.topic' in the application properties
     */
    @Autowired
    public NotificationPublisher(
        final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
        final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    /**
     * Send event to Kafka with correct message key.
     *
     * @param cpsDataUpdatedEvent event to be sent to kafka
     */
    public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
            messageKey, cpsDataUpdatedEvent);
        kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
    }

}

Example Kafka Producer Config from CPS-NCMP (application.yml)

Code Block
languagejava
titleExample Kafka Producer Config
collapsetrue
spring:
  kafka:
    properties:
      request.timeout.ms: 5000
      retries: 1
      max.block.ms: 10000
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      cliend-id: cps
    consumer:
      group-id: cps-test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent

Example Kafka Docker-Compose

Code Block
languageyml
titleExample Kafka Docker Example-Compose
collapsetrue
  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Response types for

...

Passthrough

...

Will we be able to handle everything "as-is"?

TODO: Clarify what the response type will be e.g. Object, String, JSON

Future (or alternative)

...

What are Futures?

...

Async Request Option using Messaging

...

TODO: Clarify what is intended in this section

Demo/Test

...

Jira
serverONAP Jira
serverId425b2b0a-557c-3c0c-b515-579789cceedb
keyCPS-834