Introduction

The policy framework currently supports the following communication endpoints, defined in policy/common/policy-endpoints:
-> DMaaP
-> UEB
-> NOOP
-> Http Servers
-> Http Clients

...

-> DMaaP (Kafka/Zookeeper backend)
-> Kafka (Kafka/Strimzi backend)
-> UEB
-> NOOP
-> Http Servers
-> Http Clients

What changes are needed in Policy framework application properties?

The following topicSources and topicSinks properties are used by all policy-framework applications to communicate with their endpoints.

...

Code Block
languageyml
spring:
  kafka:
    template:
      default-topic: POLICY-PDP-PAP
    consumer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: pap-group
    producer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

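The consumer keys in this configuration correspond to the plain Kafka client settings that show up in the ConsumerConfig dump logged at startup (bootstrap.servers, group.id, key.deserializer, value.deserializer). As a rough sketch of that mapping, the snippet below builds an equivalent java.util.Properties object by hand. The class and method names (ConsumerPropsSketch, consumerProps) are hypothetical and exist only for illustration; Spring Boot normally performs this translation itself.

```java
import java.util.Properties;

final class ConsumerPropsSketch {

    /**
     * Builds Kafka consumer properties equivalent to the
     * spring.kafka.consumer section of the YAML in this page.
     * Hypothetical helper, not part of policy-endpoints.
     */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // spring.kafka.consumer.bootstrap-servers
        props.setProperty("bootstrap.servers", bootstrapServers);
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", groupId);
        // spring.kafka.consumer.key-deserializer
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        // spring.kafka.consumer.value-deserializer
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(
            "localhost:9092,localhost:9093,localhost:9094", "pap-group");
        // Print each key/value pair in the same "key = value" style as the Kafka log.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The producer section maps the same way, with key.serializer and value.serializer in place of the deserializer keys.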

How to run Kafka?

Install a recent stable version of Kafka, edit config/server.properties as needed, and start the broker:

Code Block
languagebash
~/ONAP/kafka_2.12-3.1.0# ls
bin  config  data  libs  LICENSE  licenses  logs  NOTICE  site-docs
~/ONAP/kafka_2.12-3.1.0# bin/kafka-server-start.sh config/server.properties

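Before starting a policy application it can help to confirm that the broker is accepting connections. The sketch below is one way to do that with a plain TCP connect. It assumes the broker listens on localhost:9092, the port used in the bootstrap-servers examples in this page, and the class name BrokerPortCheck is hypothetical. A successful connect only shows that something is listening on the port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker uses the default listener port 9092.
        boolean up = isListening("localhost", 9092, 2000);
        System.out.println("broker port open: " + up);
    }
}
```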
What changes are needed in POLICY-ENDPOINTS codebase?

Kafka dependencies need to be added to policy-endpoints/pom.xml in the policy/common repo.

...

Code Block
languagebash
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicEndpoint.java
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactory.java
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java
        policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java

Why Spring Kafka?

Spring Kafka brings the simple Spring template programming model to Kafka, with a KafkaTemplate for sending messages and message-driven POJOs via the @KafkaListener annotation.

...

kafkaTemplate.send(<topic>, <message>);
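To make the programming model concrete without needing a broker, here is a minimal in-memory stand-in, not Spring Kafka itself. The hypothetical InMemoryTopicBus plays both roles: send(topic, message) mimics the shape of the kafkaTemplate.send call above, and listen(topic, callback) stands in for a method annotated with @KafkaListener. Unlike Kafka, delivery here is synchronous and in-process, and nothing is persisted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class InMemoryTopicBus {

    // Topic name -> callbacks registered for that topic.
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    /** Registers a callback for a topic (the role a @KafkaListener method plays). */
    void listen(String topic, Consumer<String> listener) {
        listeners.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /**
     * Delivers the message to every listener on the topic
     * (the role kafkaTemplate.send plays) and returns how many received it.
     */
    int send(String topic, String message) {
        List<Consumer<String>> targets = listeners.getOrDefault(topic, List.of());
        targets.forEach(listener -> listener.accept(message));
        return targets.size();
    }

    public static void main(String[] args) {
        InMemoryTopicBus bus = new InMemoryTopicBus();
        List<String> received = new ArrayList<>();

        // Listener side: collect every message published on the topic.
        bus.listen("POLICY-PDP-PAP", received::add);

        // Sender side: same call shape as kafkaTemplate.send(<topic>, <message>).
        bus.send("POLICY-PDP-PAP", "hello from the sender");

        System.out.println(received);
    }
}
```

The point of the sketch is the separation of concerns: the sender only knows the topic name and the payload, and the listener is an ordinary method that receives the payload, which is the same division Spring Kafka provides on top of a real broker.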

How to start a sample policy participant application with Kafka?

Code Block
languagejava
@SpringBootApplication
@ComponentScan({
    "org.onap.policy.clamp.controlloop.participant.policy",
    "org.onap.policy.clamp.controlloop.participant.intermediary",
    "org.onap.policy.common.endpoints.event.comm.kafka"})
@ConfigurationPropertiesScan("org.onap.policy.clamp.controlloop.participant.policy.main.parameters")
public class PolicyParticipantApplication {
    public static void main(String[] args) {
        SpringApplication.run(PolicyParticipantApplication.class, args);
    }
}

...

Code Block
languagebash
~/onap/policy/clamp/participant/participant-impl/participant-impl-policy# mvn -Dcheckstyle.skip spring-boot:run
[INFO] Scanning for projects...
[INFO] 
[INFO] --< org.onap.policy.clamp.participant:policy-clamp-participant-impl-policy >--
[INFO] Building policy-clamp-participant-impl-policy 6.2.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] >>> spring-boot-maven-plugin:2.5.0:run (default-cli) > test-compile @ policy-clamp-participant-impl-policy >>>
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M2:enforce (enforce-property) @ policy-clamp-participant-impl-policy ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M2:enforce (enforce-no-snapshots) @ policy-clamp-participant-impl-policy ---
[INFO] 
[INFO] --- jacoco-maven-plugin:0.8.5:prepare-agent (pre-unit-test) @ policy-clamp-participant-impl-policy ---
[INFO] surefireArgLine set to -javaagent:/root/.m2/repository/org/jacoco/org.jacoco.agent/0.8.5/org.jacoco.agent-0.8.5-runtime.jar=destfile=/root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy/../target/code-coverage/jacoco-ut.exec,append=true,excludes=**/gen/**:**/generated-sources/**:**/yang-gen/**:**/pax/**
[INFO] 
[INFO] --- maven-checkstyle-plugin:3.1.1:check (onap-license) @ policy-clamp-participant-impl-policy ---
[INFO] 
[INFO] --- maven-checkstyle-plugin:3.1.1:check (onap-java-style) @ policy-clamp-participant-impl-policy ---
[INFO] 
[INFO] --- maven-resources-plugin:3.2.0:resources (default-resources) @ policy-clamp-participant-impl-policy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.1:compile (default-compile) @ policy-clamp-participant-impl-policy ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-resources-plugin:3.2.0:testResources (default-testResources) @ policy-clamp-participant-impl-policy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 36 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.1:testCompile (default-testCompile) @ policy-clamp-participant-impl-policy ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] <<< spring-boot-maven-plugin:2.5.0:run (default-cli) < test-compile @ policy-clamp-participant-impl-policy <<<
[INFO] 
[INFO] 
[INFO] --- spring-boot-maven-plugin:2.5.0:run (default-cli) @ policy-clamp-participant-impl-policy ---
[INFO] Attaching agents: []

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.0)

2022-02-16 11:06:19.636  INFO 7087 --- [           main] o.p.c.c.p.p.PolicyParticipantApplication : Starting PolicyParticipantApplication using Java 11.0.11 on astra1786.startdedicated.com with PID 7087 (/root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy/target/classes started by root in /root/ONAP/onap/policy/clamp/participant/participant-impl/participant-impl-policy)
2022-02-16 11:06:19.642  INFO 7087 --- [           main] o.p.c.c.p.p.PolicyParticipantApplication : No active profile set, falling back to default profiles: default
2022-02-16 11:06:22.276  INFO 7087 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8085 (http)
2022-02-16 11:06:22.290  INFO 7087 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-02-16 11:06:22.291  INFO 7087 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.45]
2022-02-16 11:06:22.479  INFO 7087 --- [           main] o.a.c.c.C.[.[.[/onap/policyparticipant]  : Initializing Spring embedded WebApplicationContext
2022-02-16 11:06:22.479  INFO 7087 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2744 ms
2022-02-16 11:06:24.049  INFO 7087 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 3 endpoint(s) beneath base path '/actuator'
2022-02-16 11:06:24.715  INFO 7087 --- [           main] o.s.s.web.DefaultSecurityFilterChain     : Will secure any request with [org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@55e4dd68, org.springframework.security.web.context.SecurityContextPersistenceFilter@73852720, org.springframework.security.web.header.HeaderWriterFilter@13047d3d, org.springframework.security.web.csrf.CsrfFilter@69c227fd, org.springframework.security.web.authentication.logout.LogoutFilter@7f5e9949, org.springframework.security.web.authentication.www.BasicAuthenticationFilter@75f2ff80, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@7ae0a26, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@f96654, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@28c7fd9d, org.springframework.security.web.session.SessionManagementFilter@33f2cf82, org.springframework.security.web.access.ExceptionTranslationFilter@4e789704, org.springframework.security.web.access.intercept.FilterSecurityInterceptor@2d3eb1ea]
2022-02-16 11:06:24.899  INFO 7087 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-group_id-1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = group_id
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2022-02-16 11:06:25.046  INFO 7087 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2022-02-16 11:06:25.047  INFO 7087 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2022-02-16 11:06:25.048  INFO 7087 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1645009585045
2022-02-16 11:06:25.052  INFO 7087 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): POLICY-CLRUNTIME-PARTICIPANT
2022-02-16 11:06:25.097  INFO 7087 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8085 (http) with context path '/onap/policyparticipant'
2022-02-16 11:06:25.488  INFO 7087 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: iBg71lfxSfShVDKKzY1Ebg
2022-02-16 11:06:25.495  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2022-02-16 11:06:25.542  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-02-16 11:06:25.600  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-02-16 11:06:25.625  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=49, memberId='consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec', protocol='range'}
2022-02-16 11:06:25.629  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 49: {consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec=Assignment(partitions=[POLICY-CLRUNTIME-PARTICIPANT-0])}
2022-02-16 11:06:25.714  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=49, memberId='consumer-group_id-1-6ffe3410-cf5d-4049-99cb-d7417ac055ec', protocol='range'}
2022-02-16 11:06:25.715  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[POLICY-CLRUNTIME-PARTICIPANT-0])
2022-02-16 11:06:25.718  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: POLICY-CLRUNTIME-PARTICIPANT-0
2022-02-16 11:06:25.724  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting
2022-02-16 11:06:25.725  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Topic endpoint management
2022-02-16 11:06:25.751  INFO 7087 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition POLICY-CLRUNTIME-PARTICIPANT-0 to the committed offset FetchPosition{offset=32, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2022-02-16 11:06:25.732  INFO 7087 --- [           main] o.o.p.c.e.e.comm.bus.KafkaTopicEndpoint  : KafkaTopicSource[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: starting
2022-02-16 11:06:25.754  INFO 7087 --- [           main] o.o.p.c.e.e.comm.bus.KafkaTopicEndpoint  : KafkaTopicSink[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: starting
2022-02-16 11:06:25.755  INFO 7087 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_id: partitions assigned: [POLICY-CLRUNTIME-PARTICIPANT-0]
2022-02-16 11:06:25.756  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Publisher ParticipantMessagePublisher
2022-02-16 11:06:25.762  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Listener ControlLoopStateChangeListener
2022-02-16 11:06:25.763  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Listener ParticipantStatusReqListener
2022-02-16 11:06:25.763  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Listener ControlLoopUpdateListener
2022-02-16 11:06:25.764  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Listener ParticipantUpdateListener
2022-02-16 11:06:25.764  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Listener ParticipantDeregisterAckListener
2022-02-16 11:06:25.765  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Listener ParticipantRegisterAckListener
2022-02-16 11:06:25.765  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager starting Topic Message Dispatcher
2022-02-16 11:06:25.766  INFO 7087 --- [           main] o.o.p.c.e.e.comm.bus.internal.TopicBase  : KafkaTopicSource[KafkaTopicEndpoint[TopicBase [servers=[localhost], topic=POLICY-CLRUNTIME-PARTICIPANT, effectiveTopic=POLICY-CLRUNTIME-PARTICIPANT, #recentEvents=0, locked=false, #topicListeners=0]]]: registering org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher@5693d1d2
2022-02-16 11:06:25.766  INFO 7087 --- [           main] o.o.p.c.utils.services.ServiceManager    : service manager started
2022-02-16 11:06:26.122  INFO 7087 --- [           main] o.o.p.c.gson.GsonMessageBodyHandler      : Using GSON for REST calls
2022-02-16 11:06:26.135  INFO 7087 --- [           main] o.o.p.c.gson.GsonMessageBodyHandler      : Using GSON for REST calls
2022-02-16 11:06:26.277  INFO 7087 --- [           main] o.p.c.c.p.p.PolicyParticipantApplication : Started PolicyParticipantApplication in 7.422 seconds (JVM running for 8.24)
2022-02-16 11:06:26.286  INFO 7087 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2022-02-16 11:06:26.290  INFO 7087 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC

What are current challenges?

  • policy-common has not yet been converted to Spring Boot.
  • Policy-endpoint classes are being converted to Spring Boot components so that they can be reused.

Summary

  • Policy-framework applications and the current communication infrastructure will not require any code changes.
  • To communicate over Kafka instead of the current DMaaP transport, include the Kafka properties and change topicCommInfrastructure to Kafka.
  • Run the desired version of Kafka, with the required server properties, in place of DMaaP.
  • This endpoint will be a managed endpoint; its lifecycle management will be handled by the corresponding changes in the policy-common repo.
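The switch described in the summary comes down to a single configuration value selecting the transport. The sketch below illustrates that idea under stated assumptions: the CommInfrastructure enum here is a hypothetical, simplified stand-in listing the topic transports named in this page, and parse and needsKafkaProperties are illustrative helpers, not the policy-common API.

```java
import java.util.Locale;

final class TransportSelection {

    /** Hypothetical enum covering the topic transports named in this page. */
    enum CommInfrastructure { DMAAP, KAFKA, UEB, NOOP }

    /** Parses a topicCommInfrastructure value, ignoring case and surrounding spaces. */
    static CommInfrastructure parse(String value) {
        return CommInfrastructure.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /** Only the Kafka transport needs the extra Kafka properties. */
    static boolean needsKafkaProperties(CommInfrastructure infra) {
        return infra == CommInfrastructure.KAFKA;
    }

    public static void main(String[] args) {
        for (String configured : new String[] {"dmaap", "Kafka"}) {
            CommInfrastructure infra = parse(configured);
            System.out.println(configured + " -> " + infra
                + ", Kafka properties needed: " + needsKafkaProperties(infra));
        }
    }
}
```

An unrecognised value makes valueOf throw IllegalArgumentException, which in this sketch surfaces a misconfigured topicCommInfrastructure immediately instead of silently falling back to another transport.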

...