Table of Contents |
---|
Introduction
Current policy-framework supports different communication endpoints, defined in policy/common/policy-endpoints as below
-> DMaaP
-> UEB
-> NOOP
-> Http Servers
-> Http Clients
...
-> DMaaP (Kafka/Zookeeper backend)
-> Kafka (Kafka/Strimzi backend)
-> UEB
-> NOOP
-> Http Servers
-> Http Clients
What changes are needed in Policy framework application properties?
Following are the properties for topicSources and topicSinks used across all the applications in policy-framework to communicate with certain endpoints.
...
Code Block | ||
---|---|---|
| ||
spring: kafka: template: default-topic: POLICY-PDP-PAP consumer: bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: pap-group producer: bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 key-serializer: org.apache.kafka.common.serialization.IntegerSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer |
How to run Kafka?
Install any latest stable version of Kafka, edit the server.properties as desired and run
Code Block | ||
---|---|---|
| ||
~/ONAP/kafka_2.12-3.1.0# ls bin config data libs LICENSE licenses logs NOTICE site-docs ~/ONAP/kafka_2.12-3.1.0# bin/kafka-server-start.sh config/server.properties |
What changes are needed in POLICY-ENDPOINTS codebase?
Kafka dependencies to be added in policy-endpoints/pom.xml of policy/common repo
...
Code Block | ||
---|---|---|
| ||
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicEndpoint.java policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactory.java policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java |
Why Spring Kafka?
Spring Kafka brings the simple Spring template programming model with a KafkaTemplate and Message-driven POJOs via @KafkaListener annotation.
...
kafkaTemplate.send(<topic>, <message>);
How to start a sample policy participant application with Kafka?
Code Block | ||
---|---|---|
| ||
@SpringBootApplication @ComponentScan({ "org.onap.policy.clamp.controlloop.participant.policy", "org.onap.policy.clamp.controlloop.participant.intermediary", "org.onap.policy.common.endpoints.event.comm.kafka"}) @ConfigurationPropertiesScan("org.onap.policy.clamp.controlloop.participant.policy.main.parameters") public class PolicyParticipantApplication { public static void main(String[] args) { SpringApplication.run(PolicyParticipantApplication.class, args); } } |
...
Code Block | ||
---|---|---|
| ||
~/onap/policy/clamp/participant/participant-impl/participant-impl-policy# mvn -Dcheckstyle.skip spring-boot:run [INFO] Scanning for projects... [INFO] [INFO] --< org.onap.policy.clamp.participant:policy-clamp-participant-impl-policy >-- [INFO] Building policy-clamp-participant-impl-policy 6.2.1-SNAPSHOT [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] >>> spring-boot-maven-plugin:2.5.0:run (default-cli) > test-compile @ policy-clamp-participant-impl-policy >>> [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M2:enforce (enforce-property) @ policy-clamp-participant-impl-policy --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M2:enforce (enforce-no-snapshots) @ policy-clamp-participant-impl-policy --- [INFO] [INFO] --- jacoco-maven-plugin:0.8.5:prepare-agent (pre-unit-test) @ policy-clamp-participant-impl-policy --- [INFO] surefireArgLine set to -javaagent:/root/.m2/repository/org/jacoco/org.jacoco.agent/0.8.5/org.jacoco.agent-0.8.5-runtime.jar=destfile=/root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy/../target/code-coverage/jacoco-ut.exec,append=true,excludes=**/gen/**:**/generated-sources/**:**/yang-gen/**:**/pax/** [INFO] [INFO] --- maven-checkstyle-plugin:3.1.1:check (onap-license) @ policy-clamp-participant-impl-policy --- [INFO] [INFO] --- maven-checkstyle-plugin:3.1.1:check (onap-java-style) @ policy-clamp-participant-impl-policy --- [INFO] [INFO] --- maven-resources-plugin:3.2.0:resources (default-resources) @ policy-clamp-participant-impl-policy --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Using 'UTF-8' encoding to copy filtered properties files. [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.8.1:compile (default-compile) @ policy-clamp-participant-impl-policy --- [INFO] Nothing to compile - all classes are up to date [INFO] [INFO] --- maven-resources-plugin:3.2.0:testResources (default-testResources) @ policy-clamp-participant-impl-policy --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Using 'UTF-8' encoding to copy filtered properties files. [INFO] Copying 36 resources [INFO] [INFO] --- maven-compiler-plugin:3.8.1:testCompile (default-testCompile) @ policy-clamp-participant-impl-policy --- [INFO] Nothing to compile - all classes are up to date [INFO] [INFO] <<< spring-boot-maven-plugin:2.5.0:run (default-cli) < test-compile @ policy-clamp-participant-impl-policy <<< [INFO] [INFO] [INFO] --- spring-boot-maven-plugin:2.5.0:run (default-cli) @ policy-clamp-participant-impl-policy --- [INFO] Attaching agents: [] . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.5.0) 2022-02-16 11:06:19.636 INFO 7087 --- [ main] o.p.c.c.p.p.PolicyParticipantApplication : Starting PolicyParticipantApplication using Java 11.0.11 on astra1786.startdedicated.com with PID 7087 (/root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy/target/classes started by root in /root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy) 2022-02-16 11:06:19.642 INFO 7087 --- [ main] o.p.c.c.p.p.PolicyParticipantApplication : No active profile set, falling back to default profiles: default 2022-02-16 11:06:22.276 INFO 7087 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8085 (http) 2022-02-16 11:06:22.290 INFO 7087 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2022-02-16 11:06:22.291 INFO 7087 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.45] 2022-02-16 11:06:22.479 INFO 7087 --- [ main] o.a.c.c.C.[.[.[/onap/policyparticipant] : Initializing Spring embedded WebApplicationContext 2022-02-16 11:06:22.479 INFO 7087 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2744 ms 2022-02-16 11:06:24.049 INFO 7087 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 3 endpoint(s) beneath base path '/actuator' 2022-02-16 11:06:24.715 INFO 7087 --- [ main] o.s.s.web.DefaultSecurityFilterChain : Will secure any request with [org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@55e4dd68, org.springframework.security.web.context.SecurityContextPersistenceFilter@73852720, org.springframework.security.web.header.HeaderWriterFilter@13047d3d, org.springframework.security.web.csrf.CsrfFilter@69c227fd, org.springframework.security.web.authentication.logout.LogoutFilter@7f5e9949, org.springframework.security.web.authentication.www.BasicAuthenticationFilter@75f2ff80, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@7ae0a26, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@f96654, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@28c7fd9d, org.springframework.security.web.session.SessionManagementFilter@33f2cf82, org.springframework.security.web.access.ExceptionTranslationFilter@4e789704, org.springframework.security.web.access.intercept.FilterSecurityInterceptor@2d3eb1ea] 2022-02-16 11:06:24.899 INFO 7087 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group_id-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group_id group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2022-02-16 11:06:25.046 INFO 7087 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1 2022-02-16 11:06:25.047 INFO 7087 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457 2022-02-16 11:06:25.048 INFO 7087 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1645009585045 2022-02-16 11:06:25.052 INFO 7087 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): POLICY-CLRUNTIME-PARTICIPANT 2022-02-16 11:06:25.097 INFO 7087 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8085 (http) with context path '/onap/policyparticipant' 2022-02-16 11:06:25.488 INFO 7087 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: iBg71lfxSfShVDKKzY1Ebg 2022-02-16 11:06:25.495 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) 2022-02-16 11:06:25.542 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-02-16 11:06:25.600 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-02-16 11:06:25.625 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=49, memberId='consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec', protocol='range'} 2022-02-16 11:06:25.629 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 49: {consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec=Assignment(partitions=[POLICY-CLRUNTIME-PARTICIPANT-0])} 2022-02-16 11:06:25.714 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=49, memberId='consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec', protocol='range'} 2022-02-16 11:06:25.715 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[POLICY-CLRUNTIME-PARTICIPANT-0]) 2022-02-16 11:06:25.718 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: POLICY-CLRUNTIME-PARTICIPANT-0 2022-02-16 11:06:25.724 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting 2022-02-16 11:06:25.725 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Topic endpoint management 2022-02-16 11:06:25.751 INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition POLICY-CLRUNTIME-PARTICIPANT-0 to the committed offset FetchPosition{offset=32, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} 2022-02-16 11:06:25.732 INFO 7087 --- [ main] o.o.p.c.e.e.comm.bus.KafkaTopicEndpoint : KafkaTopicSource[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: starting 2022-02-16 11:06:25.754 INFO 7087 --- [ main] o.o.p.c.e.e.comm.bus.KafkaTopicEndpoint : KafkaTopicSink[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: starting 2022-02-16 11:06:25.755 INFO 7087 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_id: partitions assigned: [POLICY-CLRUNTIME-PARTICIPANT-0] 2022-02-16 11:06:25.756 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Publisher ParticipantMessagePublisher 2022-02-16 11:06:25.762 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ControlLoopStateChangeListener 2022-02-16 11:06:25.763 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantStatusReqListener 2022-02-16 11:06:25.763 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ControlLoopUpdateListener 2022-02-16 11:06:25.764 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantUpdateListener 2022-02-16 11:06:25.764 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantDeregisterAckListener 2022-02-16 11:06:25.765 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Listener ParticipantRegisterAckListener 2022-02-16 11:06:25.765 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager starting Topic Message Dispatcher 2022-02-16 11:06:25.766 INFO 7087 --- [ main] o.o.p.c.e.e.comm.bus.internal.TopicBase : KafkaTopicSource[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: registering org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher@5693d1d2 2022-02-16 11:06:25.766 INFO 7087 --- [ main] o.o.p.c.utils.services.ServiceManager : service manager started 2022-02-16 11:06:26.122 INFO 7087 --- [ main] o.o.p.c.gson.GsonMessageBodyHandler : Using GSON for REST calls 2022-02-16 11:06:26.135 INFO 7087 --- [ main] o.o.p.c.gson.GsonMessageBodyHandler : Using GSON for REST calls 2022-02-16 11:06:26.277 INFO 7087 --- [ main] o.p.c.c.p.p.PolicyParticipantApplication : Started PolicyParticipantApplication in 7.422 seconds (JVM running for 8.24) 2022-02-16 11:06:26.286 INFO 7087 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state LivenessState changed to CORRECT 2022-02-16 11:06:26.290 INFO 7087 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC |
...
What are current challenges?
- policy-common is not converted to Springboot.
- Policy-endpoint classes are being converted to Springboot components to achieve reuse.
Summary
- Policy-framework applications and current communication infrastructure will-not experience any code changes
- Include Kafka properties and change topicCommInfrastructure to Kafka, if communication if desired over Kafka then current Dmaap communication
- Run desired version of Kafka with needed Kafka server properties instead of Dmaap
- This endpoint will be a managed end-point and lifecycle management will be taken care by respective changes in policy-common repo.
...