...
Type | Method | Ease of implementation | Decision |
---|---|---|---|
UUID | | Easy | ~ |
Custom | We generate our own (an example exists in NCMP: notificationPublisher, to be confirmed) | Medium - Hard | N |
HTTP Request ID | Further investigation required | ~ | |
Kafka Event ID | Further investigation required | ~ | |
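To make the first two rows of the table concrete, here is a minimal sketch in plain Java. The class `EventIdSketch` and both method names are hypothetical and do not exist in NCMP or CPS. The "Custom" variant is only one possible reading of "we generate our own": it assumes the ID is derived from the dataspace name, anchor name and observed timestamp.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper for illustration only; not part of NCMP or CPS.
final class EventIdSketch {

    // "UUID" option: a random (version 4) UUID from the JDK.
    static String randomEventId() {
        return UUID.randomUUID().toString();
    }

    // "Custom" option (assumed shape): a name-based (version 3) UUID built from
    // fields we pick ourselves. The same inputs always produce the same ID.
    static String customEventId(final String dataspaceName, final String anchorName,
                                final String observedTimestamp) {
        final String seed = dataspaceName + "," + anchorName + "," + observedTimestamp;
        return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(final String[] args) {
        System.out.println("random: " + randomEventId());
        System.out.println("custom: "
            + customEventId("my-dataspace", "my-anchor", "2021-01-01T00:00:00Z"));
    }
}
```

The random variant needs no coordination between services, which matches the "Easy" rating. The derived variant is repeatable, but it only stays unique if the chosen input fields are unique per event, which is part of why a custom scheme takes more design effort.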
How does NCMP → CPS-Temporal perform ID generation for events?
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/*
* ============LICENSE_START=======================================================
* Copyright (c) 2021 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.cps.notification;

import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class NotificationService {

    private NotificationProperties notificationProperties;
    private NotificationPublisher notificationPublisher;
    private CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
    private NotificationErrorHandler notificationErrorHandler;
    private List<Pattern> dataspacePatterns;

    /**
     * Create an instance of Notification Subscriber.
     *
     * @param notificationProperties properties for notification
     * @param notificationPublisher notification Publisher
     * @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent
     * @param notificationErrorHandler error handler
     */
    public NotificationService(
        final NotificationProperties notificationProperties,
        final NotificationPublisher notificationPublisher,
        final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory,
        final NotificationErrorHandler notificationErrorHandler) {
        log.info("Notification Properties {}", notificationProperties);
        this.notificationProperties = notificationProperties;
        this.notificationPublisher = notificationPublisher;
        this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory;
        this.notificationErrorHandler = notificationErrorHandler;
        this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties);
    }

    private List<Pattern> getDataspaceFilterPatterns(final NotificationProperties notificationProperties) {
        if (notificationProperties.isEnabled()) {
            return Arrays.stream(notificationProperties.getFilters()
                .getOrDefault("enabled-dataspaces", "")
                .split(","))
                .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE))
                .collect(Collectors.toList());
        } else {
            return Collections.emptyList();
        }
    }

    /**
     * Process Data Updated Event and publishes the notification.
     *
     * @param dataspaceName dataspace name
     * @param anchorName anchor name
     * @param observedTimestamp observedTimestamp
     * @return future
     */
    @Async("notificationExecutor")
    public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName,
        final OffsetDateTime observedTimestamp) {
        log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName);
        try {
            if (shouldSendNotification(dataspaceName)) {
                final var cpsDataUpdatedEvent =
                    cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, anchorName, observedTimestamp);
                log.debug("data updated event to be published {}", cpsDataUpdatedEvent);
                notificationPublisher.sendNotification(cpsDataUpdatedEvent);
            }
        } catch (final Exception exception) {
            /* All the exceptions are handled to not to propagate it to caller.
               CPS operation should not fail if sending event fails for any reason.
             */
            notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
                exception, dataspaceName, anchorName);
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
        Add more complex rules based on dataspace and anchor later
     */
    private boolean shouldSendNotification(final String dataspaceName) {
        return notificationProperties.isEnabled()
            && dataspacePatterns.stream()
            .anyMatch(pattern -> pattern.matcher(dataspaceName).find());
    }
}
|
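The dataspace filtering in the service above can be tried in isolation. The sketch below copies only the split, compile and `find()` steps into a standalone class; `DataspaceFilterSketch`, its method names and the sample filter values are assumptions made for illustration and are not CPS code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Standalone illustration of the filter logic; class and method names are hypothetical.
final class DataspaceFilterSketch {

    // Split a comma-separated filter value and compile each part as a case-insensitive regex.
    static List<Pattern> compileFilters(final String commaSeparatedFilters) {
        return Arrays.stream(commaSeparatedFilters.split(","))
            .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE))
            .collect(Collectors.toList());
    }

    // A dataspace qualifies if any pattern is found anywhere in its name.
    static boolean matchesAny(final List<Pattern> patterns, final String dataspaceName) {
        return patterns.stream().anyMatch(pattern -> pattern.matcher(dataspaceName).find());
    }

    public static void main(final String[] args) {
        final List<Pattern> patterns = compileFilters("ran-.*,core");
        System.out.println(matchesAny(patterns, "RAN-network"));   // true: case-insensitive
        System.out.println(matchesAny(patterns, "my-core-data"));  // true: find() matches a substring
        System.out.println(matchesAny(patterns, "transport"));     // false: no pattern matches
        // An empty filter value splits into one empty pattern, which find() matches everywhere.
        System.out.println(matchesAny(compileFilters(""), "transport")); // true
    }
}
```

Two details are worth noting when reading the original code: `find()` matches substrings rather than whole names, and an empty `enabled-dataspaces` value ends up matching every dataspace while notifications are enabled.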
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/*
 * ============LICENSE_START=======================================================
 * Copyright (c) 2021 Bell Canada.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
package org.onap.cps.notification;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NotificationPublisher {

    private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
    private String topicName;

    /**
     * Create an instance of Notification Publisher.
     *
     * @param kafkaTemplate kafkaTemplate is send event using kafka
     * @param topicName topic, to which cpsDataUpdatedEvent is sent, is provided by setting
     *                  'notification.data-updated.topic' in the application properties
     */
    @Autowired
    public NotificationPublisher(
        final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
        final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    /**
     * Send event to Kafka with correct message key.
     *
     * @param cpsDataUpdatedEvent event to be sent to kafka
     */
    public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ", messageKey,
            cpsDataUpdatedEvent);
        kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
    }
}
|
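With respect to the ID question above, note that the publisher builds a Kafka message key, not a per-event ID. The sketch below reproduces only the key concatenation in a standalone class; `MessageKeySketch` and the sample names are hypothetical.

```java
// Standalone illustration of the message key built in sendNotification; names are hypothetical.
final class MessageKeySketch {

    // Same concatenation as the publisher: "<dataspaceName>,<anchorName>".
    static String messageKey(final String dataspaceName, final String anchorName) {
        return dataspaceName + "," + anchorName;
    }

    public static void main(final String[] args) {
        // Two updates to the same anchor, observed at different times, share one key.
        final String firstUpdateKey = messageKey("my-dataspace", "my-anchor");
        final String secondUpdateKey = messageKey("my-dataspace", "my-anchor");
        System.out.println(firstUpdateKey);
        System.out.println(firstUpdateKey.equals(secondUpdateKey));
    }
}
```

Because the key depends only on dataspace and anchor, it identifies the data that changed rather than the individual event. With Kafka's default partitioning, records that share a key go to the same partition, which keeps updates for one anchor in order. A unique event ID would have to be carried separately, for example in the event body or a header.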
...
If you have a Spring MVC application that works fine, there is no need to change. Imperative programming is the easiest way to write, understand, and debug code. You have maximum choice of libraries, since, historically, most are blocking.
In a microservice architecture, you can have a mix of applications with either Spring MVC or Spring WebFlux controllers or with Spring WebFlux functional endpoints. Having support for the same annotation-based programming model in both frameworks makes it easier to re-use knowledge while also selecting the right tool for the right job.
A simple way to evaluate an application is to check its dependencies. If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.
If you have a Spring MVC application with calls to remote services, try the reactive WebClient. You can return reactive types (Reactor, RxJava, or other) directly from Spring MVC controller methods. The greater the latency per call or the interdependency among calls, the more dramatic the benefits. Spring MVC controllers can call other reactive components too.
If you have a large team, keep in mind the steep learning curve in the shift to non-blocking, functional, and declarative programming. A practical way to start without a full switch is to use the reactive WebClient. Beyond that, start small and measure the benefits. We expect that, for a wide range of applications, the shift is unnecessary. If you are unsure what benefits to look for, start by learning about how non-blocking I/O works (for example, concurrency on single-threaded Node.js) and its effects.
WebFlux supports:
- Annotation-based reactive components
- Functional routing and handling
Source: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html
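The point about latency and interdependent calls can be illustrated without any Spring dependency. The sketch below is a plain-JDK analogy using `CompletableFuture`, not the WebClient API; the class name, the service names and the 200 ms delays are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy for composing remote calls; not Spring WebFlux or WebClient code.
final class NonBlockingCompositionSketch {

    // Stand-in for a remote call. The sleep simulates network latency; unlike a real
    // non-blocking client, it does occupy a worker thread while waiting.
    static CompletableFuture<String> fetch(final String name, final long latencyMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(latencyMillis);
            } catch (final InterruptedException exception) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(exception);
            }
            return name + "-response";
        });
    }

    // Both calls are started before either result is awaited, so their latencies overlap.
    static CompletableFuture<String> fetchBoth() {
        final CompletableFuture<String> first = fetch("inventory", 200);
        final CompletableFuture<String> second = fetch("pricing", 200);
        return first.thenCombine(second, (a, b) -> a + " + " + b);
    }

    public static void main(final String[] args) {
        final long start = System.nanoTime();
        final String combined = fetchBoth().join();
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(combined + " after roughly " + elapsedMillis + " ms");
    }
}
```

Calling the two services one after the other would take about the sum of both delays, while the composed version takes about the longer of the two. The gap grows with the latency per call, which is the effect the quoted passage describes.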
...
Pros & cons
Pros | Cons |
---|---|
|
|
...