...

Type | Method | Ease of implementation | Decision
UUID | String uniqueID = UUID.randomUUID().toString(); | Easy | ~
Custom | We generate our own (an example exists in NCMP: notificationPublisher, to be confirmed) | Medium - Hard | N
HTTP Request ID | Further investigation required | | ~
Kafka Event ID | Further investigation required | | ~
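
The UUID and Custom rows above can be compared with a minimal sketch. The class name `EventIdSketch`, the method names, and the custom ID format (`<source>-<epoch millis>-<sequence>`) are assumptions made for illustration only; they are not taken from NCMP or CPS code.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper illustrating the UUID and Custom options from the table. */
final class EventIdSketch {

    // Counter backing the assumed custom scheme; it is unique only within one JVM.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    private EventIdSketch() { }

    /** UUID option: a random (version 4) UUID rendered as a 36-character string. */
    static String newUuidEventId() {
        return UUID.randomUUID().toString();
    }

    /** Custom option (assumed format): "<source>-<epoch millis>-<sequence>". */
    static String newCustomEventId(final String source) {
        return source + "-" + System.currentTimeMillis() + "-" + SEQUENCE.incrementAndGet();
    }

    public static void main(final String[] args) {
        System.out.println(newUuidEventId());
        System.out.println(newCustomEventId("ncmp"));
    }
}
```

The UUID option needs no shared state, which is why it rates as easy. A custom scheme like the one sketched here would also need a way to stay unique across instances and restarts, which is part of what makes it medium to hard.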


How does NCMP → CPS-Temporal perform ID generation for events?

...

NotificationService (Java)

/*
 * ============LICENSE_START=======================================================
 * Copyright (c) 2021 Bell Canada.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.cps.notification;

import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class NotificationService {

    private NotificationProperties notificationProperties;
    private NotificationPublisher notificationPublisher;
    private CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
    private NotificationErrorHandler notificationErrorHandler;
    private List<Pattern> dataspacePatterns;

    /**
     * Create an instance of Notification Service.
     *
     * @param notificationProperties     properties for notification
     * @param notificationPublisher      notification Publisher
     * @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent
     * @param notificationErrorHandler   error handler
     */
    public NotificationService(
        final NotificationProperties notificationProperties,
        final NotificationPublisher notificationPublisher,
        final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory,
        final NotificationErrorHandler notificationErrorHandler) {
        log.info("Notification Properties {}", notificationProperties);
        this.notificationProperties = notificationProperties;
        this.notificationPublisher = notificationPublisher;
        this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory;
        this.notificationErrorHandler = notificationErrorHandler;
        this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties);
    }

    private List<Pattern> getDataspaceFilterPatterns(final NotificationProperties notificationProperties) {
        if (notificationProperties.isEnabled()) {
            return Arrays.stream(notificationProperties.getFilters()
                .getOrDefault("enabled-dataspaces", "")
                .split(","))
                .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE))
                .collect(Collectors.toList());
        } else {
            return Collections.emptyList();
        }
    }

    /**
     * Processes the data updated event and publishes the notification.
     *
     * @param dataspaceName dataspace name
     * @param anchorName    anchor name
     * @param observedTimestamp observedTimestamp
     * @return future
     */
    @Async("notificationExecutor")
    public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName,
        final OffsetDateTime observedTimestamp) {
        log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName);
        try {
            if (shouldSendNotification(dataspaceName)) {
                final var cpsDataUpdatedEvent =
                    cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, anchorName, observedTimestamp);
                log.debug("data updated event to be published {}", cpsDataUpdatedEvent);
                notificationPublisher.sendNotification(cpsDataUpdatedEvent);
            }
        } catch (final Exception exception) {
            /* All exceptions are handled here so that they do not propagate to the caller.
               A CPS operation should not fail if sending the event fails for any reason.
             */
            notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
                exception, dataspaceName, anchorName);
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
        Add more complex rules based on dataspace and anchor later
     */
    private boolean shouldSendNotification(final String dataspaceName) {

        return notificationProperties.isEnabled()
            && dataspacePatterns.stream()
            .anyMatch(pattern -> pattern.matcher(dataspaceName).find());
    }

}
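
The dataspace filtering in NotificationService can be tried in isolation with the sketch below. It mirrors the split, compile, and `find()` steps of the class above using only the JDK. The class name `DataspaceFilterSketch` and the filter values are hypothetical. Two behaviours follow from the JDK calls themselves: `find()` matches anywhere in the name unless the pattern is anchored, and an empty filter string compiles to an empty pattern that matches every dataspace.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical stand-alone version of the dataspace filter logic. */
final class DataspaceFilterSketch {

    private DataspaceFilterSketch() { }

    /** Compiles a comma-separated filter string into case-insensitive patterns. */
    static List<Pattern> compile(final String commaSeparatedFilters) {
        return Arrays.stream(commaSeparatedFilters.split(","))
            .map(filter -> Pattern.compile(filter, Pattern.CASE_INSENSITIVE))
            .collect(Collectors.toList());
    }

    /** Returns true if any pattern is found anywhere in the dataspace name. */
    static boolean matches(final List<Pattern> patterns, final String dataspaceName) {
        return patterns.stream().anyMatch(pattern -> pattern.matcher(dataspaceName).find());
    }

    public static void main(final String[] args) {
        // Filter values are made up for this example.
        final List<Pattern> patterns = compile("NFP-Operational,^cps-.*");
        System.out.println(matches(patterns, "nfp-operational"));  // case-insensitive match
        System.out.println(matches(patterns, "my-nfp-operational-copy")); // find() matches a substring
        System.out.println(matches(patterns, "other"));            // no pattern matches
        System.out.println(matches(compile(""), "other"));         // empty pattern matches everything
    }
}
```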


NotificationPublisher (Java)

/*
 * ============LICENSE_START=======================================================
 *  Copyright (c) 2021 Bell Canada.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.notification;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NotificationPublisher {

    private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
    private String topicName;

    /**
     * Create an instance of Notification Publisher.
     *
     * @param kafkaTemplate template used to send events to Kafka
     * @param topicName     topic, to which cpsDataUpdatedEvent is sent, is provided by setting
     *                      'notification.data-updated.topic' in the application properties
     */
    @Autowired
    public NotificationPublisher(
        final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
        final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    /**
     * Send event to Kafka with correct message key.
     *
     * @param cpsDataUpdatedEvent event to be sent to kafka
     */
    public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
            messageKey, cpsDataUpdatedEvent);
        kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
    }

}
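
In the publisher above, the Kafka message key is built from the dataspace and anchor names, so it identifies the updated anchor and not the individual event. The sketch below illustrates that difference with plain JDK code. The class `EventKeySketch`, its method names, and the sample names are hypothetical, and pairing the key with a UUID is only one of the options under evaluation on this page.

```java
import java.util.UUID;

/** Hypothetical illustration of message key versus per-event ID. */
final class EventKeySketch {

    private EventKeySketch() { }

    /** Same composition as sendNotification above: "<dataspace>,<anchor>". */
    static String messageKey(final String dataspaceName, final String anchorName) {
        return dataspaceName + "," + anchorName;
    }

    /** Assumed per-event ID using the UUID option. */
    static String newEventId() {
        return UUID.randomUUID().toString();
    }

    public static void main(final String[] args) {
        // Two updates to the same anchor share a message key but get different event IDs.
        final String firstKey = messageKey("my-dataspace", "my-anchor");
        final String secondKey = messageKey("my-dataspace", "my-anchor");
        System.out.println(firstKey.equals(secondKey));
        System.out.println(newEventId().equals(newEventId()));
    }
}
```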

...

  • If you have a Spring MVC application that works fine, there is no need to change. Imperative programming is the easiest way to write, understand, and debug code. You have maximum choice of libraries, since, historically, most are blocking.

  • In a microservice architecture, you can have a mix of applications with either Spring MVC or Spring WebFlux controllers or with Spring WebFlux functional endpoints. Having support for the same annotation-based programming model in both frameworks makes it easier to re-use knowledge while also selecting the right tool for the right job.

  • A simple way to evaluate an application is to check its dependencies. If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.

  • If you have a Spring MVC application with calls to remote services, try the reactive WebClient. You can return reactive types (Reactor, RxJava, or other) directly from Spring MVC controller methods. The greater the latency per call or the interdependency among calls, the more dramatic the benefits. Spring MVC controllers can call other reactive components too.

  • If you have a large team, keep in mind the steep learning curve in the shift to non-blocking, functional, and declarative programming. A practical way to start without a full switch is to use the reactive WebClient. Beyond that, start small and measure the benefits. We expect that, for a wide range of applications, the shift is unnecessary. If you are unsure what benefits to look for, start by learning about how non-blocking I/O works (for example, concurrency on single-threaded Node.js) and its effects.

  • WebFlux supports:

    • Annotation-based reactive components
    • Functional routing and handling

Source: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html
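
The point above about performing blocking calls on a separate thread can be sketched with the JDK alone. This is an analogy using `CompletableFuture`, not the Reactor or WebFlux API. The class `BlockingOffloadSketch`, the `blockingLookup` method, and the pool size are assumptions, and the sleep stands in for a blocking JDBC or JPA call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: offloading a blocking call to a dedicated thread pool. */
final class BlockingOffloadSketch {

    private BlockingOffloadSketch() { }

    /** Stands in for a blocking persistence call. */
    static String blockingLookup(final String id) {
        try {
            Thread.sleep(50);
        } catch (final InterruptedException exception) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(exception);
        }
        return "record-" + id;
    }

    /** Runs the blocking call on the given pool so the calling thread is not blocked. */
    static CompletableFuture<String> lookupAsync(final String id, final ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(id), blockingPool);
    }

    public static void main(final String[] args) {
        final ExecutorService blockingPool = Executors.newFixedThreadPool(4);
        try {
            final String result = lookupAsync("42", blockingPool)
                .thenApply(String::toUpperCase)
                .join(); // join() is used here only to print the result in a demo
            System.out.println(result);
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

Each in-flight lookup still occupies one pool thread, which is why the quoted guidance says this approach does not make the most of a non-blocking stack.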

...

Pros & cons

Pros
  • Better scalability due to non-blocking threads
  • Uses fewer threads (1 per core)
  • Better CPU efficiency

Cons
  • Reactive web programming is great for applications that have streaming data, and clients that consume it and stream it to their users. It is not great for developing CRUD apps. If you want to develop a CRUD API, stick with Spring MVC.
  • Steep learning curve in the shift to non-blocking, functional, and declarative programming

...