...
DMaaP offers a client plugin called DMaaP Client, which can be found here. This client can be used to subscribe to DMaaP events. The PoC code uses this client to show that CPS can listen for and receive DMaaP events.
Filtering of AAI-EVENT
Based on input from the DMaaP team, the filtering will be done on the client side. Looking at the DMaaP event that is published when an xNF is created, we can filter on the following properties:
...
```json
{
  "cambria.partition": "AAI",
  "event-header": {
    "severity": "NORMAL",
    "entity-type": "pnf",
    "top-entity-type": "pnf",
    "entity-link": "/aai/v23/network/pnfs/pnf/dummyTest30",
    "event-type": "AAI-EVENT",
    "domain": "dev",
    "action": "CREATE",
    "sequence-number": "0",
    "id": "577eb229-7c74-4cf8-84ab-79f66f4899b2",
    "source-name": "SO",
    "version": "v23",
    "timestamp": "20210701-12:45:09:351"
  },
  "entity": {
    "ipaddress-v4-oam": "100.10.20.20",
    "equip-type": "example-equip-type-val-20348",
    "equip-vendor": "example-equip-vendor-val-52182",
    "ipaddress-v6-oam": "",
    "orchestration-status": "Active",
    "pnf-name2": "dummypnf-1597953056126",
    "equip-model": "example-equip-model-val-8370",
    "in-maint": false,
    "resource-version": "1625143508438",
    "pnf-id": "927b2580-36d9-4f13-8421-3c9d43b7a75e",
    "management-option": "example-management-option-val-72881",
    "spare-equipment-indicator": false,
    "pnf-name": "dummyTest30"
  }
}
```
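Client-side filtering on the event header could look roughly like the sketch below. It assumes the "event-header" object of the sample event has already been parsed into a Map of strings by a JSON library of choice. The class name AaiEventHeaderFilter and the choice of the three fields checked (event-type, entity-type, action) are illustrative assumptions taken from the sample event, not part of the DMaaP Client or a confirmed CPS design.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of the DMaaP Client.
final class AaiEventHeaderFilter {

    private AaiEventHeaderFilter() { }

    /**
     * Returns true when the given event header describes the creation of a PNF.
     * The fields checked here are an assumption based on the sample event.
     */
    static boolean isPnfCreateEvent(final Map<String, String> eventHeader) {
        return "AAI-EVENT".equals(eventHeader.get("event-type"))
            && "pnf".equals(eventHeader.get("entity-type"))
            && "CREATE".equals(eventHeader.get("action"));
    }

    public static void main(final String[] args) {
        // Header values copied from the sample event.
        final Map<String, String> header = Map.of(
            "event-type", "AAI-EVENT",
            "entity-type", "pnf",
            "action", "CREATE",
            "source-name", "SO");
        System.out.println(isPnfCreateEvent(header));
    }
}
```

Events whose header does not match all three values, or that lack one of the fields, are simply rejected by the predicate, so the caller can drop them without further parsing of the "entity" body.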
Proof Of Concept Code
Adding DMaaP Client dependency
To use the DMaaP Client we need the following dependency in our pom.xml file.
```xml
<dependency>
    <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
    <artifactId>dmaapClient</artifactId>
    <version>1.1.12</version>
</dependency>
```
Creating DMaaP consumer client
The following builder method is used to create an instance of a DMaaP Consumer Client:
...
Attribute name | Attribute Description | Example
---|---|---
setHostPart | The host and port of the DMaaP server in the format <host:port>. This method takes a collection of strings; see the example. | A collection containing "message-router.onap:3904" (the value used in the PoC code)
setTopic | The topic name to consume from | "AAI-EVENT"
setConsumerGroup | A name that uniquely identifies the subscriber's group | "onap-cps"
setConsumerId | Within the subscriber's group, a name that uniquely identifies the subscriber's process | "cps-ncmp"
setTimeoutMs | Time in ms to wait for messages on the server before returning | 10000
setLimit | Maximum number of messages returned per fetch | 1000
setFilter | A customizable message filter, or null if no filtering is required | null
setApiKey_username | The username of the client application | null
setApiSecret_password | The password for the username | null
Adding required properties to the consumer
The consumer also takes in the following properties:
...
```java
// The HTTP protocol to use:
//   http
//   https
final Properties props = new Properties();
props.setProperty("Protocol", "http");
mrConsumer.setProps(props);
```
Fetching DMaaP event using the consumer object
To fetch events, call the following method on the consumer object:
```java
mrConsumer.fetch()
```
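Since a single fetch returns at most one batch of messages, a listener would typically call it repeatedly. The sketch below illustrates such a polling loop under stated assumptions: EventSource is a hypothetical stand-in interface for the consumer object, and it assumes that fetch() returns an iterable of message strings. EventPoller and its poll method are illustrative names, not part of the DMaaP Client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the consumer object; only fetch() is modelled,
// and its return type is an assumption.
interface EventSource {
    Iterable<String> fetch() throws Exception;
}

// Hypothetical helper for illustration; not part of the DMaaP Client.
final class EventPoller {

    private EventPoller() { }

    /**
     * Calls fetch() maxFetches times and passes every returned message to the handler.
     * Returns the total number of messages handled.
     */
    static int poll(final EventSource source, final int maxFetches,
                    final Consumer<String> handler) throws Exception {
        int handled = 0;
        for (int i = 0; i < maxFetches; i++) {
            for (final String message : source.fetch()) {
                handler.accept(message);
                handled++;
            }
        }
        return handled;
    }

    public static void main(final String[] args) throws Exception {
        // Fake source returning one message per fetch, in place of a real DMaaP consumer.
        final EventSource fake = () -> List.of("{\"event-header\":{\"action\":\"CREATE\"}}");
        final List<String> received = new ArrayList<>();
        final int count = poll(fake, 3, received::add);
        System.out.println(count + " messages received");
    }
}
```

With a real consumer, the lambda would be replaced by a method reference to the consumer's fetch method, and the handler is where client-side filtering would be applied.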
PoC code used in the demo
...
```java
public static void main(String[] args) throws Exception {
    // DMaaP Message Router host(s) in <host:port> format
    final Collection<String> hosts = new ArrayList<>();
    hosts.add("message-router.onap:3904");

    // Build the consumer for the AAI-EVENT topic
    final MRConsumerImpl mrConsumer = new MRConsumerImpl.MRConsumerImplBuilder()
            .setHostPart(hosts)
            .setTopic("AAI-EVENT")
            .setConsumerGroup("cg")
            .setConsumerId("cid")
            .setTimeoutMs(10000)
            .setLimit(1000)
            .setFilter("")
            .setApiKey_username(null)
            .setApiSecret_password(null)
            .createMRConsumerImpl();
    mrConsumer.setProtocolFlag("HTTPNOAUTH");

    // Select plain HTTP as the transport protocol
    final Properties props = new Properties();
    props.setProperty("Protocol", "http");
    mrConsumer.setProps(props);

    // Fetch one batch of events and log it
    log.info("fetch {}", mrConsumer.fetch());
}
```
...