This section proposes a solution that keeps using the existing event schema implementation.

CPS Events Releases

CPS events are defined (as JSON Schema) in the cps-events module of the cps repository, and the corresponding Java classes are generated from that module: https://github.com/onap/cps/tree/master/cps-events

Each time a new version of the CPS Events schema is published, it contains both:

  • The new schema definition (N)
  • The previously published schema definition (N-1)

It also clearly specifies (in the release notes?) the Compatibility type of the new schema (N) with the previous schema (N-1).

Finally, an EventSchemaMapper is provided to map events from the new schema (N) to the previous schema (N-1) and the other way around. The logic of this mapper depends on the changes made in the schema. For example, if fields are added in the new schema, they might be given a default value when mapping from the previous one, if possible.
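
As an illustration of the mapping logic described above, here is a minimal sketch in plain Java. It is not the actual cps-events mapper: the classes SketchEventV0 and SketchEventV1, the added "operation" field and its default value "UPDATE" are all hypothetical and only stand in for the generated event classes. Only the method names v0ToV1 and v1ToV0 are taken from the snippets on this page.

```java
/** Hypothetical, simplified stand-in for a generated v0 event class. */
final class SketchEventV0 {
    private final String dataspaceName;
    private final String anchorName;

    SketchEventV0(final String dataspaceName, final String anchorName) {
        this.dataspaceName = dataspaceName;
        this.anchorName = anchorName;
    }

    String getDataspaceName() { return dataspaceName; }
    String getAnchorName() { return anchorName; }
}

/** Hypothetical v1 event: same as v0 plus one new field, "operation". */
final class SketchEventV1 {
    private final String dataspaceName;
    private final String anchorName;
    private final String operation;

    SketchEventV1(final String dataspaceName, final String anchorName, final String operation) {
        this.dataspaceName = dataspaceName;
        this.anchorName = anchorName;
        this.operation = operation;
    }

    String getDataspaceName() { return dataspaceName; }
    String getAnchorName() { return anchorName; }
    String getOperation() { return operation; }
}

/** Sketch of a mapper between the two schema versions. */
final class SketchEventSchemaMapper {

    /** Assumed default for the field that does not exist in v0. */
    static final String DEFAULT_OPERATION = "UPDATE";

    /** v0 has no "operation" field, so the new field receives a default value. */
    SketchEventV1 v0ToV1(final SketchEventV0 eventV0) {
        return new SketchEventV1(
            eventV0.getDataspaceName(), eventV0.getAnchorName(), DEFAULT_OPERATION);
    }

    /** v0 cannot carry "operation", so the field is dropped when mapping back. */
    SketchEventV0 v1ToV0(final SketchEventV1 eventV1) {
        return new SketchEventV0(eventV1.getDataspaceName(), eventV1.getAnchorName());
    }
}
```

In this sketch, mapping v1 to v0 loses information (the "operation" value), which is the kind of trade-off the Compatibility type of a release would have to state.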

No more than 2 schema versions are supported at the same time.

The following is an example of v0 and v1 schema definitions in the cps-events module.

The cps-events artifact that is built then contains the generated classes for both the new and the previous event schema. Both are available to the producer and the consumers that import this specific cps-events jar as a dependency.

cps-events Maven Artifact Version

To be detailed ...

DataUpdatedEvent Schema Version

To be detailed ...

Consumers of CPS Events

Compiled Code

When using the cps-events artifact, consumers have access to both the new and the previous CPS event classes.

It is then possible to implement listeners for both event versions, as suggested in the following snippet:

DataUpdatedEventListener.java
    /**
     * Consume event from v0 schema.
     */
    public void consume(final org.onap.cps.event.model.v0.CpsDataUpdatedEvent eventV0) {
        // Map event v0 to v1
        org.onap.cps.event.model.v1.CpsDataUpdatedEvent eventV1 = this.eventSchemaMapper.v0ToV1(eventV0);
        // Consume event v1
        consume(eventV1);
    }

    /**
     * Consume event from v1 schema.
     */
    public void consume(final org.onap.cps.event.model.v1.CpsDataUpdatedEvent eventV1) {
        // Map event to entity
        final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(eventV1);
        // Persist entity
        this.networkDataService.addNetworkData(networkData);
    }

From a compiled code point of view, support for both event schema versions is then in place.

Runtime Configuration

The challenge is now to choose, at startup and by configuration only, which event version a listener consumes, without changing the compiled application code.

This is done by enabling only one of the 2 listeners through configuration:

DataUpdatedEventListener.java
    @KafkaListener(
        topics = "${app.listener.data-updated.v0.topic}",
        errorHandler = "dataUpdatedEventListenerErrorHandler",
        autoStartup = "${app.listener.data-updated.v0.autoStartup}")
    public void consume(final org.onap.cps.event.model.v0.CpsDataUpdatedEvent eventV0)

    @KafkaListener(
        topics = "${app.listener.data-updated.v1.topic}",
        errorHandler = "dataUpdatedEventListenerErrorHandler",
        autoStartup = "${app.listener.data-updated.v1.autoStartup}")
    public void consume(final org.onap.cps.event.model.v1.CpsDataUpdatedEvent eventV1)

application.yml
app:
    listener:
        data-updated:
            v0:
                topic: ${CPS_CHANGE_EVENT_V0_LISTENER_TOPIC:cps.dev.null}
                autoStartup: ${CPS_CHANGE_EVENT_V0_LISTENER_ENABLED:false}
            v1:
                topic: ${CPS_CHANGE_EVENT_V1_LISTENER_TOPIC:cps.cfg-state-events}
                autoStartup: ${CPS_CHANGE_EVENT_V1_LISTENER_ENABLED:true}
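
To make the effect of the configuration above easier to follow, here is a small plain-Java sketch of how "${NAME:default}" placeholders resolve and which listener ends up active. It only imitates the placeholder syntax in a simplified way and is not Spring's implementation; the class ListenerConfigSketch and its methods are hypothetical, and the check that exactly one listener is enabled is an assumption derived from the statement that only one of the 2 listeners is enabled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper imitating "${NAME:default}" resolution against an environment map. */
final class ListenerConfigSketch {

    // Captures the variable name and the default value of a "${NAME:default}" expression.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):([^}]*)\\}");

    /** Returns the environment value if present, otherwise the default after the colon. */
    static String resolve(final String expression, final Map<String, String> environment) {
        final Matcher matcher = PLACEHOLDER.matcher(expression);
        if (!matcher.matches()) {
            return expression; // not a placeholder: use the literal value
        }
        return environment.getOrDefault(matcher.group(1), matcher.group(2));
    }

    /** Returns "v0" or "v1" depending on which listener is enabled. */
    static String activeVersion(final Map<String, String> environment) {
        final boolean v0Enabled = Boolean.parseBoolean(
            resolve("${CPS_CHANGE_EVENT_V0_LISTENER_ENABLED:false}", environment));
        final boolean v1Enabled = Boolean.parseBoolean(
            resolve("${CPS_CHANGE_EVENT_V1_LISTENER_ENABLED:true}", environment));
        if (v0Enabled == v1Enabled) {
            // Assumption: exactly one listener version must be active at a time.
            throw new IllegalStateException("Exactly one listener version must be enabled");
        }
        return v0Enabled ? "v0" : "v1";
    }
}
```

With an empty environment the defaults apply and the v1 listener is the active one. Setting CPS_CHANGE_EVENT_V0_LISTENER_ENABLED to true and CPS_CHANGE_EVENT_V1_LISTENER_ENABLED to false switches the consumer to v0 without rebuilding the application.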

Producer of CPS Events

Compiled Code

The producer is able to send both the current and the previous version of events by implementing 2 sets of Notification Service and Notification Publisher classes, using the Template Method design pattern.

The following diagrams and snippets illustrate this implementation:


EventFactory is able to create events for both versions:

CpsDataUpdatedEventFactory.java
public class CpsDataUpdatedEventFactory {

    ...

    private EventSchemaMapper eventSchemaMapper;

    ...

    public org.onap.cps.event.model.v0.CpsDataUpdatedEvent createCpsDataUpdatedEventV0(
            final String dataspaceName, final String anchorName) {
        final var eventV1 = this.createCpsDataUpdatedEventV1(dataspaceName, anchorName);
        return this.eventSchemaMapper.v1ToV0(eventV1);
    }

    public org.onap.cps.event.model.v1.CpsDataUpdatedEvent createCpsDataUpdatedEventV1(
            final String dataspaceName, final String anchorName) {
        final var dataNode = 
                cpsDataService.getDataNode(dataspaceName, anchorName, "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
        final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
        return toCpsDataUpdatedEvent(anchor, dataNode);
    }

    ...

}


Abstract Notification and Publisher classes:

NotificationService.java
public abstract class NotificationService {
    ...    
    public void processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
        ...
        try {
            if (shouldSendNotification()) {
                notify(dataspaceName, anchorName);
            }
        } catch (final Exception exception) {
            notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
                exception, dataspaceName, anchorName);
        }
    }

    protected abstract void notify(String dataspaceName, String anchorName);
}

NotificationPublisher.java
public abstract class NotificationPublisher<V> {

    private final KafkaTemplate<String, V> kafkaTemplate;
    private final String topicName;

    protected NotificationPublisher(
            final KafkaTemplate<String, V> kafkaTemplate,
            final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    // Accessors used by the concrete publishers
    protected KafkaTemplate<String, V> getKafkaTemplate() {
        return kafkaTemplate;
    }

    protected String getTopicName() {
        return topicName;
    }

    protected abstract void sendNotification(@NonNull final V event);
}


Concrete Notification and Publisher classes:

V1NotificationService.java
@Service("v1NotificationService")
public class V1NotificationService extends NotificationService {

    private final V1NotificationPublisher notificationPublisher;

    ...

    @Override
    protected void notify(final String dataspaceName, final String anchorName) {
        final var cpsDataUpdatedEvent =
            super.getCpsDataUpdatedEventFactory().createCpsDataUpdatedEventV1(dataspaceName, anchorName);
        notificationPublisher.sendNotification(cpsDataUpdatedEvent);
    }

}

V1NotificationPublisher.java
import org.onap.cps.event.model.v1.CpsDataUpdatedEvent;

public class V1NotificationPublisher extends NotificationPublisher<CpsDataUpdatedEvent> {

    public V1NotificationPublisher(
            final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
            final @Value("${notification.data-updated.topic}") String topicName) {
        super(kafkaTemplate, topicName);
    }

    @Override
    protected void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = 
                cpsDataUpdatedEvent.getContent().getDataspaceName() + "," + cpsDataUpdatedEvent.getContent().getAnchorName();
        super.getKafkaTemplate().send(super.getTopicName(), messageKey, cpsDataUpdatedEvent);
    }

}
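
The Template Method structure used by the classes above can be sketched without Spring or Kafka as follows. This is a simplified illustration, not the POC code: all class names are hypothetical, an in-memory list stands in for the Kafka topic, and the V0 variant is an assumption about what the counterpart of V1NotificationService could look like.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstract service: the processing skeleton is shared by all schema versions. */
abstract class SketchNotificationService {

    private final boolean enabled;
    // Stands in for the Kafka topic so that the sketch runs without a broker.
    final List<String> sent = new ArrayList<>();

    SketchNotificationService(final boolean enabled) {
        this.enabled = enabled;
    }

    /** Template method: fixed steps, with the version-specific step delegated to notify(). */
    public final void processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
        if (enabled) {
            notify(dataspaceName, anchorName);
        }
    }

    /** Version-specific step implemented by each concrete service. */
    protected abstract void notify(String dataspaceName, String anchorName);
}

/** Hypothetical v0 variant: builds and "publishes" a v0 event. */
final class SketchV0NotificationService extends SketchNotificationService {

    SketchV0NotificationService(final boolean enabled) {
        super(enabled);
    }

    @Override
    protected void notify(final String dataspaceName, final String anchorName) {
        sent.add("v0:" + dataspaceName + "," + anchorName);
    }
}

/** Hypothetical v1 variant: builds and "publishes" a v1 event. */
final class SketchV1NotificationService extends SketchNotificationService {

    SketchV1NotificationService(final boolean enabled) {
        super(enabled);
    }

    @Override
    protected void notify(final String dataspaceName, final String anchorName) {
        sent.add("v1:" + dataspaceName + "," + anchorName);
    }
}
```

Callers only depend on SketchNotificationService.processDataUpdatedEvent, so switching the published schema version means swapping the concrete subclass and nothing else.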

Runtime Configuration

The event schema version to publish is configurable in the application properties. When the application starts, Spring uses the specified value to select the expected concrete Notification Service and Publisher classes.

application.yml
notification:
    data-updated:
        enabled: false
        topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
    event-schema-version: ${CPS_CHANGE_EVENT_SCHEMA_VERSION:v1}

The requested Notification Service is then injected into the Data Service:

CpsDataServiceImpl.java
public class CpsDataServiceImpl implements CpsDataService {

    ...

    @Value("${notification.event-schema-version}NotificationService")
    private String notificationServiceQualifier;

    private NotificationService notificationService;

    @Autowired
    public void setNotificationService(final ApplicationContext applicationContext) {
        this.notificationService = (NotificationService) applicationContext.getBean(notificationServiceQualifier);
    }

    ...

}
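
The selection mechanism above relies on a naming convention: the configured version ("v0" or "v1") is concatenated with "NotificationService" to form the bean name. The following plain-Java sketch emulates that lookup with a map instead of a Spring ApplicationContext. The class NotificationServiceSelectorSketch, the Notifier interface and the "v0NotificationService" entry are hypothetical; only the "v1NotificationService" name appears in the snippets on this page.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical emulation of a by-name bean lookup driven by the configured schema version. */
final class NotificationServiceSelectorSketch {

    /** Minimal stand-in for a notification service bean. */
    interface Notifier {
        String schemaVersion();
    }

    private final Map<String, Notifier> beansByName;

    NotificationServiceSelectorSketch(final Map<String, Notifier> beansByName) {
        this.beansByName = beansByName;
    }

    /** Registry with one bean per supported schema version (names are assumptions). */
    static Map<String, Notifier> defaultBeans() {
        final Map<String, Notifier> beans = new HashMap<>();
        beans.put("v0NotificationService", () -> "v0");
        beans.put("v1NotificationService", () -> "v1");
        return beans;
    }

    /** Builds the qualifier the same way as "${notification.event-schema-version}NotificationService". */
    Notifier select(final String eventSchemaVersion) {
        final String qualifier = eventSchemaVersion + "NotificationService";
        final Notifier notifier = beansByName.get(qualifier);
        if (notifier == null) {
            // An unsupported version fails fast instead of silently publishing nothing.
            throw new IllegalArgumentException("No bean named '" + qualifier + "'");
        }
        return notifier;
    }
}
```

One consequence of this design is that a misconfigured version, for example "v2" while only v0 and v1 are packaged, is detected at lookup time. In a real Spring context, ApplicationContext.getBean(String) throws NoSuchBeanDefinitionException in that situation.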

POC

POC code can be found in Gerrit WIP changes:

Conclusion

The solution proposed above gives the flexibility to deploy new releases of the components (producer or consumers) independently, in any order.

When deploying them, the provided event schema configuration ensures that the producer and the consumers remain compliant with a specific Compatibility type (forward or backward), if any.

However, some complexity is introduced in the design: because the current and previous event classes do not implement any common type, more specific code is needed to support both types of event.
