CPS-526 - Getting issue details... STATUS

Overview

CPS core sends the data updated event to Kafka synchronously which increases the Data node API response time. To avoid the impact, the cps-core notification feature will process the events asynchronously using spring async.

Solution

Spring async method is well documented and more details can be found at https://spring.io/guides/gs/async-method/ 

In the proposed implementation, we need to

  • Enable async processing

    @EnableAsync
    public class AsyncConfig {
    ...
    }
  • Define the thread pool which will execute the async method. The thread pool setup properties are configurable.

    AsyncConfig
    @ConfigurationProperties("notification.async-executor")
    @Validated
    public class AsyncConfig {
    
    	@Min(0)
        private int corePoolSize = 2;
        @Min(2)
        private int maxPoolSize = 10;
        @Min(0)
        private int queueCapacity = 2147483647;
        private boolean waitForTasksToCompleteOnShutdown = true;
        private String threadNamePrefix = "Async-";
    
    	@Bean("notificationExecutor")
        public TaskExecutor getThreadAsyncExecutorForNotification() {
            final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
            executor.setThreadNamePrefix(threadNamePrefix);
            return executor;
        }
    }
    application.yml
    notification:
        async-executor:
            core-pool-size: 2
            max-pool-size: 10
            queue-capacity: 500
            wait-for-tasks-to-complete-on-shutdown: true
            thread-name-prefix: Async-
  • Mark the method async and mention the thread-pool name

    NotificationService
    @Async("notificationExecutor")
    public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
    ...	
    }



  • No labels