Introduction
The policy framework currently supports the following communication endpoints, defined in policy/common/policy-endpoints:
-> DMaaP
-> UEB
-> NOOP
-> Http Servers
-> Http Clients
DMaaP, UEB, and NOOP are message-based communication infrastructures that operate in asynchronous, unidirectional mode; hence the terms sources and sinks, which denote message flow into and out of the controller, respectively.
HTTP, which is synchronous, bi-directional communication with remote endpoints, is out of scope for this page.
This investigation focuses on adding a Kubernetes-friendly Kafka/Strimzi backend as another communication endpoint choice:
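The source/sink directionality described above can be pictured with two small interfaces. This is only a minimal sketch: the names SketchTopicSource, SketchTopicSink and InMemoryTopic are hypothetical and are not the policy-endpoints API, and delivery is synchronous here purely for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Inbound direction: messages flow from the topic into the controller. */
interface SketchTopicSource {
    void register(Consumer<String> listener);
}

/** Outbound direction: messages flow from the controller out to the topic. */
interface SketchTopicSink {
    boolean send(String message);
}

/** In-memory stand-in (hypothetical): nothing leaves the JVM. */
class InMemoryTopic implements SketchTopicSource, SketchTopicSink {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    @Override
    public void register(Consumer<String> listener) {
        listeners.add(listener);
    }

    @Override
    public boolean send(String message) {
        // Hand the message to every registered listener.
        listeners.forEach(listener -> listener.accept(message));
        return true;
    }
}

class SourceSinkDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        SketchTopicSource source = topic; // read side only
        SketchTopicSink sink = topic;     // write side only
        source.register(msg -> System.out.println("received: " + msg));
        sink.send("{\"messageType\":\"example\"}");
    }
}
```

Keeping the two directions as separate types means a component that only publishes never sees the listener registration API, and vice versa.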
-> DMaaP (Kafka/Zookeeper backend)
-> Kafka (Kafka/Strimzi backend)
-> UEB
-> NOOP
-> Http Servers
-> Http Clients
What changes are needed in Policy framework application properties?
The following properties for topicSources and topicSinks are used across all policy-framework applications to communicate with the configured endpoints.
topicCommInfrastructure dictates which communication endpoint the messages are passed to; policy/common/endpoints brings up the corresponding communication infrastructure according to the chosen option.
topicParameterGroup:
  topicSources:
    - topic: POLICY-PDP-PAP
      servers:
        - message-router
      topicCommInfrastructure: dmaap
      fetchTimeout: 15000
    - topic: POLICY-HEARTBEAT
      effectiveTopic: POLICY-PDP-PAP
      consumerGroup: policy-pap
      servers:
        - message-router
      topicCommInfrastructure: dmaap
      fetchTimeout: 15000
  topicSinks:
    - topic: POLICY-PDP-PAP
      servers:
        - message-router
      topicCommInfrastructure: dmaap
    - topic: POLICY-NOTIFICATION
      servers:
        - message-router
      topicCommInfrastructure: dmaap
Setting topicCommInfrastructure: kafka in the application properties should allow messaging to be carried over Kafka.
Additional optional Kafka properties needed in the application properties of any Spring Boot application:
spring:
  kafka:
    template:
      default-topic: POLICY-PDP-PAP
    consumer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: pap-group
    producer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
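For orientation, the spring.kafka.consumer.* and spring.kafka.producer.* entries above correspond to plain Kafka client settings such as bootstrap.servers, group.id, key.deserializer and value.deserializer (the same names that appear in the ConsumerConfig dump in the startup log further down). The sketch below builds those settings as java.util.Properties using only the JDK; the class and method names are hypothetical and no Kafka client is created.

```java
import java.util.Properties;

class KafkaClientPropsSketch {
    /** Consumer-side settings equivalent to the spring.kafka.consumer.* block. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Producer-side settings equivalent to the spring.kafka.producer.* block. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        String servers = "localhost:9092,localhost:9093,localhost:9094";
        System.out.println(consumerProps(servers, "pap-group"));
        System.out.println(producerProps(servers));
    }
}
```

The key and value (de)serializers must agree between producer and consumer; the example simply mirrors the Integer/String pairing shown in the YAML above.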
How to run Kafka?
Install a recent stable version of Kafka, edit server.properties as desired, and run:
~/ONAP/kafka_2.12-3.1.0# ls
bin  config  data  libs  LICENSE  licenses  logs  NOTICE  site-docs
~/ONAP/kafka_2.12-3.1.0# bin/kafka-server-start.sh config/server.properties
What changes are needed in POLICY-ENDPOINTS codebase?
The Kafka dependency needs to be added to policy-endpoints/pom.xml in the policy/common repo:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>
A new communication infrastructure type, KAFKA, needs to be introduced in policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java:
/**
 * Underlying Communication infrastructure Types.
 */
enum CommInfrastructure {
    /**
     * UEB Communication Infrastructure.
     */
    UEB,
    /**
     * DMAAP Communication Infrastructure.
     */
    DMAAP,
    /**
     * KAFKA Communication Infrastructure.
     */
    KAFKA,
    /**
     * NOOP for internal use only.
     */
    NOOP,
    /**
     * REST Communication Infrastructure.
     */
    REST
}
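As an illustration of how a configured topicCommInfrastructure value such as kafka could be resolved against this enum, here is a minimal, self-contained sketch. The enum is copied locally so the example compiles on its own; the parse and factoryFor helpers and the returned factory names are hypothetical stand-ins, not the actual selection logic in policy-endpoints.

```java
import java.util.Locale;

class CommInfrastructureSketch {
    /** Local copy of the enum values listed above. */
    enum CommInfrastructure { UEB, DMAAP, KAFKA, NOOP, REST }

    /** Maps a configured value such as "kafka" or "dmaap" to the enum constant. */
    static CommInfrastructure parse(String configured) {
        if (configured == null || configured.isBlank()) {
            throw new IllegalArgumentException("topicCommInfrastructure must be set");
        }
        // Enum.valueOf throws IllegalArgumentException for unknown names.
        return CommInfrastructure.valueOf(configured.trim().toUpperCase(Locale.ROOT));
    }

    /** Hypothetical dispatch; the strings are illustrative labels only. */
    static String factoryFor(CommInfrastructure infra) {
        switch (infra) {
            case KAFKA:
                return "KafkaTopicFactories";
            case DMAAP:
                return "DmaapTopicFactories";
            case UEB:
                return "UebTopicFactories";
            case NOOP:
                return "NoopTopicFactories";
            default:
                throw new IllegalArgumentException("no topic factory for " + infra);
        }
    }

    public static void main(String[] args) {
        CommInfrastructure infra = parse("kafka");
        System.out.println(infra + " -> " + factoryFor(infra));
    }
}
```

Normalising case before the lookup lets the lower-case values used in the YAML configuration match the upper-case enum constants.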
New Kafka properties should be defined in PolicyEndPointProperties:
/* KAFKA Properties */
public static final String PROPERTY_KAFKA_SOURCE_TOPICS = "kafka.source.topics";
public static final String PROPERTY_KAFKA_SINK_TOPICS = "kafka.sink.topics";
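A rough sketch of how such keys might be consumed is given below. It assumes, without confirmation from this page, that the value of kafka.source.topics / kafka.sink.topics is a comma-separated list of topic names; the class KafkaTopicPropsSketch and its topics helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class KafkaTopicPropsSketch {
    static final String PROPERTY_KAFKA_SOURCE_TOPICS = "kafka.source.topics";
    static final String PROPERTY_KAFKA_SINK_TOPICS = "kafka.sink.topics";

    /** Returns the topic names under the given key (assumed comma-separated). */
    static List<String> topics(Properties props, String key) {
        List<String> names = new ArrayList<>();
        for (String part : props.getProperty(key, "").split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name); // skip blanks caused by stray commas or spaces
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS, "POLICY-PDP-PAP, POLICY-HEARTBEAT");
        props.setProperty(PROPERTY_KAFKA_SINK_TOPICS, "POLICY-PDP-PAP,POLICY-NOTIFICATION");
        System.out.println("sources: " + topics(props, PROPERTY_KAFKA_SOURCE_TOPICS));
        System.out.println("sinks:   " + topics(props, PROPERTY_KAFKA_SINK_TOPICS));
    }
}
```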
Corresponding additions for the Kafka topic infrastructure are needed in TopicEndpoint and TopicEndpointProxy.
Kafka TopicSink, TopicSource, and factory classes need to be defined:
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicEndpoint.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
Why Spring Kafka?
Spring Kafka brings the familiar Spring template programming model, with KafkaTemplate for publishing and message-driven POJOs via the @KafkaListener annotation.
With the @Autowired annotation, Spring automatically injects dependent beans into the corresponding fields of a POJO class. Kafka-related beans that can be injected this way include:
KafkaTemplate<?, ?> kafkaTemplate
ProducerListener<Object, Object> kafkaProducerListener
ConsumerFactory<?, ?> kafkaConsumerFactory
ProducerFactory<?, ?> kafkaProducerFactory
KafkaTransactionManager<?, ?> kafkaTransactionManager
KafkaJaasLoginModuleInitializer kafkaJaasInitializer
KafkaAdmin kafkaAdmin
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory
KafkaStreamsConfiguration defaultKafkaStreamsConfig
KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer
Spring Boot provides fairly comprehensive auto-configuration for Spring Kafka: https://docs.spring.io/spring-boot/docs/2.4.3/reference/html/spring-boot-features.html#boot-features-kafka.
Kafka producer and consumer configuration is supplied through classes annotated with @EnableKafka and @Configuration.
Listeners and resources are to be defined under policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/kafka/.
@KafkaListener(topics = <topic-name>) will be used to listen for messages, and an autowired KafkaTemplate instance will be used to publish messages:
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
kafkaTemplate.send(<topic>, <message>);
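One way a sink class could wrap the publishing call without depending directly on Spring types is to accept the send operation as a function. The sketch below uses only the JDK so it runs stand-alone; SketchKafkaSink is a hypothetical name, and in a Spring application the sender argument could be a lambda such as (topic, msg) -> kafkaTemplate.send(topic, msg), which is an assumption about how the wiring might be done rather than the design adopted in policy-endpoints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sink that forwards messages for one topic to an injected sender. */
class SketchKafkaSink {
    private final String topic;
    private final BiConsumer<String, String> sender;

    SketchKafkaSink(String topic, BiConsumer<String, String> sender) {
        this.topic = topic;
        this.sender = sender;
    }

    /** Publishes the message on this sink's topic; rejects empty messages. */
    boolean send(String message) {
        if (message == null || message.isEmpty()) {
            throw new IllegalArgumentException("message is empty");
        }
        sender.accept(topic, message);
        return true;
    }
}

class SketchKafkaSinkDemo {
    public static void main(String[] args) {
        // Stand-in for the real Kafka call: record what would have been sent.
        List<String> sent = new ArrayList<>();
        SketchKafkaSink sink = new SketchKafkaSink(
                "POLICY-PDP-PAP", (topic, msg) -> sent.add(topic + ": " + msg));
        sink.send("{\"messageType\":\"example\"}");
        System.out.println(sent);
    }
}
```

Injecting the sender also makes the sink easy to unit test, since no broker is needed to verify what would be published.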
How to start a sample policy participant application with Kafka?
@SpringBootApplication
@ComponentScan({
    "org.onap.policy.clamp.controlloop.participant.policy",
    "org.onap.policy.clamp.controlloop.participant.intermediary",
    "org.onap.policy.common.endpoints.event.comm.kafka"})
@ConfigurationPropertiesScan("org.onap.policy.clamp.controlloop.participant.policy.main.parameters")
public class PolicyParticipantApplication {

    public static void main(String[] args) {
        SpringApplication.run(PolicyParticipantApplication.class, args);
    }
}
~/onap/policy/clamp/participant/participant-impl/participant-impl-policy# mvn -Dcheckstyle.skip spring-boot:run
[INFO] Scanning for projects...
[INFO]
[INFO] --< org.onap.policy.clamp.participant:policy-clamp-participant-impl-policy >--
[INFO] Building policy-clamp-participant-impl-policy 6.2.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] >>> spring-boot-maven-plugin:2.5.0:run (default-cli) > test-compile @ policy-clamp-participant-impl-policy >>>
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M2:enforce (enforce-property) @ policy-clamp-participant-impl-policy ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M2:enforce (enforce-no-snapshots) @ policy-clamp-participant-impl-policy ---
[INFO]
[INFO] --- jacoco-maven-plugin:0.8.5:prepare-agent (pre-unit-test) @ policy-clamp-participant-impl-policy ---
[INFO] surefireArgLine set to -javaagent:/root/.m2/repository/org/jacoco/org.jacoco.agent/0.8.5/org.jacoco.agent-0.8.5-runtime.jar=destfile=/root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy/../target/code-coverage/jacoco-ut.exec,append=true,excludes=**/gen/**:**/generated-sources/**:**/yang-gen/**:**/pax/**
[INFO]
[INFO] --- maven-checkstyle-plugin:3.1.1:check (onap-license) @ policy-clamp-participant-impl-policy ---
[INFO]
[INFO] --- maven-checkstyle-plugin:3.1.1:check (onap-java-style) @ policy-clamp-participant-impl-policy ---
[INFO]
[INFO] --- maven-resources-plugin:3.2.0:resources (default-resources) @ policy-clamp-participant-impl-policy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.1:compile (default-compile) @ policy-clamp-participant-impl-policy ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:3.2.0:testResources (default-testResources) @ policy-clamp-participant-impl-policy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 36 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.1:testCompile (default-testCompile) @ policy-clamp-participant-impl-policy ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] <<< spring-boot-maven-plugin:2.5.0:run (default-cli) < test-compile @ policy-clamp-participant-impl-policy <<<
[INFO]
[INFO]
[INFO] --- spring-boot-maven-plugin:2.5.0:run (default-cli) @ policy-clamp-participant-impl-policy ---
[INFO] Attaching agents: []

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.0)

2022-02-16 11:06:19.636 INFO 7087 --- [ main] o.p.c.c.p.p.PolicyParticipantApplication : Starting PolicyParticipantApplication using Java 11.0.11 on astra1786.startdedicated.com with PID 7087 (/root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy/target/classes started by root in /root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy)
2022-02-16 11:06:19.642 INFO 7087 --- [ main] o.p.c.c.p.p.PolicyParticipantApplication : No active profile set, falling back to default profiles: default
2022-02-16 11:06:22.276 INFO 7087 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8085 (http)
2022-02-16 11:06:22.290 INFO 7087 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-02-16 11:06:22.291 INFO 7087 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.45]
2022-02-16 11:06:22.479 INFO 7087 --- [ main] o.a.c.c.C.[.[.[/onap/policyparticipant] : Initializing Spring embedded WebApplicationContext
2022-02-16 11:06:22.479 INFO 7087 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2744 ms
2022-02-16 11:06:24.049 INFO 7087 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 3 endpoint(s) beneath base path '/actuator'
2022-02-16 11:06:24.715 INFO 7087 --- [ main] o.s.s.web.DefaultSecurityFilterChain : Will secure any request with [org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@55e4dd68, org.springframework.security.web.context.SecurityContextPersistenceFilter@73852720, org.springframework.security.web.header.HeaderWriterFilter@13047d3d, org.springframework.security.web.csrf.CsrfFilter@69c227fd, org.springframework.security.web.authentication.logout.LogoutFilter@7f5e9949, org.springframework.security.web.authentication.www.BasicAuthenticationFilter@75f2ff80, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@7ae0a26, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@f96654, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@28c7fd9d, org.springframework.security.web.session.SessionManagementFilter@33f2cf82, org.springframework.security.web.access.ExceptionTranslationFilter@4e789704, org.springframework.security.web.access.intercept.FilterSecurityInterceptor@2d3eb1ea]
2022-02-16 11:06:24.899 INFO 7087 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-group_id-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group_id
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-02-16 11:06:25.046 INFO 7087 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2022-02-16 11:06:25.047 INFO 7087 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2022-02-16 11:06:25.048 INFO 7087 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1645009585045
2022-02-16 11:06:25.052 INFO 7087 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): POLICY-CLRUNTIME-PARTICIPANT
2022-02-16 11:06:25.097 INFO 7087 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8085 (http) with context path '/onap/policyparticipant'
2022-02-16 11:06:25.488 INFO 7087 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: iBg71lfxSfShVDKKzY1Ebg
2022-02-16 11:06:25.495 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2022-02-16 11:06:25.542 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-02-16 11:06:25.600 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-02-16 11:06:25.625 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=49, memberId='consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec', protocol='range'}
2022-02-16 11:06:25.629 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 49: {consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec=Assignment(partitions=[POLICY-CLRUNTIME-PARTICIPANT-0])}
2022-02-16 11:06:25.714 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=49, memberId='consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec', protocol='range'}
2022-02-16 11:06:25.715 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[POLICY-CLRUNTIME-PARTICIPANT-0])
2022-02-16 11:06:25.718 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: POLICY-CLRUNTIME-PARTICIPANT-0
2022-02-16 11:06:25.724 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting
2022-02-16 11:06:25.725 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Topic endpoint management
2022-02-16 11:06:25.751 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition POLICY-CLRUNTIME-PARTICIPANT-0 to the committed offset FetchPosition{offset=32, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2022-02-16 11:06:25.732 INFO 7087 --- [ main] o.o.p.c.e.e.comm.bus.KafkaTopicEndpoint : KafkaTopicSource[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: starting
2022-02-16 11:06:25.754 INFO 7087 --- [ main] o.o.p.c.e.e.comm.bus.KafkaTopicEndpoint : KafkaTopicSink[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: starting
2022-02-16 11:06:25.755 INFO 7087 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_id: partitions assigned: [POLICY-CLRUNTIME-PARTICIPANT-0]
2022-02-16 11:06:25.756 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Publisher ParticipantMessagePublisher
2022-02-16 11:06:25.762 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ControlLoopStateChangeListener
2022-02-16 11:06:25.763 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantStatusReqListener
2022-02-16 11:06:25.763 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ControlLoopUpdateListener
2022-02-16 11:06:25.764 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantUpdateListener
2022-02-16 11:06:25.764 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantDeregisterAckListener
2022-02-16 11:06:25.765 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantRegisterAckListener
2022-02-16 11:06:25.765 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Topic Message Dispatcher
2022-02-16 11:06:25.766 INFO 7087 --- [ main] o.o.p.c.e.e.comm.bus.internal.TopicBase : KafkaTopicSource[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: registering org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher@5693d1d2
2022-02-16 11:06:25.766 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager started
2022-02-16 11:06:26.122 INFO 7087 --- [ main] o.o.p.c.gson.GsonMessageBodyHandler : Using GSON for REST calls
2022-02-16 11:06:26.135 INFO 7087 --- [ main] o.o.p.c.gson.GsonMessageBodyHandler : Using GSON for REST calls
2022-02-16 11:06:26.277 INFO 7087 --- [ main] o.p.c.c.p.p.PolicyParticipantApplication : Started PolicyParticipantApplication in 7.422 seconds (JVM running for 8.24)
2022-02-16 11:06:26.286 INFO 7087 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state LivenessState changed to CORRECT
2022-02-16 11:06:26.290 INFO 7087 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
What are the current challenges?
- policy-common has not been converted to Spring Boot.
- Policy-endpoint classes are being converted to Spring Boot components to achieve reuse.
Summary
- Policy-framework applications and the existing communication infrastructures will not require any code changes.
- To communicate over Kafka rather than DMaaP, include the Kafka properties and set topicCommInfrastructure to kafka.
- Run the desired version of Kafka, with the needed Kafka server properties, in place of DMaaP.
- The Kafka endpoint will be a managed endpoint; its lifecycle management will be handled by the corresponding changes in the policy-common repo.