ThreadPoolTaskExecutor (2)

💻 Programming/Java

SpringBoot에서 ThreadPoolTaskExecutor 설정하기

이번 포스팅에서는 SpringBoot에서 ThreadPoolTaskExecutor를 어떻게 간단하게 설정하여 사용할 수 있는지 알려드립니다.

SpringBoot 프로젝트는 이미 구성되어 있다는 전제하에 Java11, SpringBoot 2.4.2 기준으로 작성했습니다.

참고로 ThreadPoolTaskExecutor의 설정 및 구체적인 사용법에 대해서는 이전에 포스팅한 문서를 참고하시기 바랍니다.

 

ThreadPoolTaskExecutor의 사용법

 

ThreadPoolTaskExecutor를 이용하여 비동기 처리하기(멀티쓰레드)

async(비동기) 처리를 위한 ThreadPoolTaskExecutor ThreadPoolTaskExecutor를 이용하여 비동기처리하는 방법을 알아보겠습니다. ThreadPoolTaskExecutor는 스프링에서 제공해주는 클래스로 org.springframework.s..

keichee.tistory.com

ThreadPoolTaskExecutor Bean 등록하기

우선 ThreadPoolTaskExecutor를 빈으로 등록하도록 하겠습니다. 

TaskExecutorConfig 클래스를 만들고 @Configuration 어노테이션을 추가하여 설정을 위한 클래스를 하나 만들었습니다.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class TaskExecutorConfig {

	@Bean(name = "executor")
	public ThreadPoolTaskExecutor executor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setThreadNamePrefix("my-");
		return executor;
	}
}

그리고 executor() 메서드를 하나 만들어서 ThreadPoolTaskExecutor를 생성하여 반환해주도록 하고 @Bean으로 등록하면서 name 속성을 지정해주었습니다. 참고로 name 속성의 값은 메서드명과 달라고 무관합니다. 또한, 해당 executor를 이용하여 쓰레드를 실행시켰을 때 내가 만든 ThreadPoolTaskExecutor가 사용되는 것인지 확인할 수 있도록 쓰레드명prefix를 "my-"로 세팅해주었습니다. 

이전 포스팅을 읽어보셨거나 또는 ThreadPoolTaskExecutor를 직업 new 키워드로 생성해서 사용해보신 분은 눈치채셨을 수도 있을텐데요, 여기서 initialize()를 하지 않았습니다. 공식문서에는 initialize 메서드를 호출하는 것으로 나와있지만 굳이 하지 않아도 빈으로 등록될 때 initialize를 하더군요.

 

@Autowired 로 ThreadPoolTaskExecutor 사용하기

아래는 빈으로 등록한 executor를 서비스 레이어에서 autowired하여 사용하는 예제입니다.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class TestService {

  @Autowired
  @Qualifier("executor")
  private ThreadPoolTaskExecutor executor;

  public void executeThreads() {
      log.info("executing threads....");
      Runnable r = () -> {
          try {
              log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
              Thread.sleep(10000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      };

      for (int i = 0; i < 10; i++) {
          executor.execute(r);
      }
  }
}

TestService라는 서비스 클래스에서 멤버변수 executor에 위에서 빈으로 등록한 객체를 쓰도록 @Qualifier로 executor 이름을 명시해주었습니다.

위 코드를 실행해보면 쓰레드 명이 my-1 로 출력되는 것을 확인하실 수 있습니다. corePoolSize를 default 값인 1 로 사용하여 싱글 쓰레드로 실행되기 때문에 my-2, my-3은 보이지 않습니다. corePoolSize 를 2 이상으로 세팅해주면 my-2, my-3 등의 쓰레드도 확인가능합니다.

2021-01-26 18:16:23.147  INFO 16390 --- [           my-1] com.keichee.test.service.TestService     : my-1, Now sleeping 10 seconds...
2021-01-26 18:16:33.150  INFO 16390 --- [           my-1] com.keichee.test.service.TestService     : my-1, Now sleeping 10 seconds...
2021-01-26 18:16:43.152  INFO 16390 --- [           my-1] com.keichee.test.service.TestService     : my-1, Now sleeping 10 seconds...
2021-01-26 18:16:53.157  INFO 16390 --- [           my-1] com.keichee.test.service.TestService     : my-1, Now sleeping 10 seconds...

 

@Async로 ThreadPoolTaskExecutor 사용하기

@Async 어노테이션을 이용하여 위에서 빈으로 등록한 ThreadPoolTaskExecutor를 사용하려면 @EnableAsync 도 추가해주어야 합니다.

따라서 TaskExecutorConfig 클래스에 @EnableAsync 어노테이션을 추가해줍니다.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class TaskExecutorConfig {

	@Bean(name = "executor")
	public ThreadPoolTaskExecutor executor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setThreadNamePrefix("my-");
		return executor;
	}
}

 

그리고 서비스 레이어의 메서드를 하나 추가하겠습니다.

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class TestService {

    @Async("executor")
    public void task() {
        try {
            log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

이제 controller 쪽에서 task() 메서드를 loop 돌면서 호출해보면 @Autowired 를 이용한 방법과 동일하게 로그가 출력되는 것을 확인하실 수 있을 겁니다.

 

이상으로 SpringBoot에서 ThreadPoolTaskExecutor 설정하여 사용하는 방법에 대해서 알아보았습니다.

간략하게 다시 정리해보자면

 

1. @Configuration 으로 등록한 클래스에 executor @Bean 추가 (@Async 를 이용할 경우 @EnableAsync 도 추가)

2. @Autowired @Qualifier 로 주입하여 사용하거나 또는 메서드 레벨에 @Async 를 붙여 사용

 

이렇게 정리할 수 있겠네요.

 

그럼 오늘도 행복한 코딩하세요~

async(비동기) 처리를 위한 ThreadPoolTaskExecutor

ThreadPoolTaskExecutor를 이용하여 비동기처리하는 방법을 알아보겠습니다.

ThreadPoolTaskExecutor는 스프링에서 제공해주는 클래스로 org.springframework.scheduling.concurrent 패키지에 속해있습니다.

생성자도 기본생성자 하나만 존재합니다. 이름에서 알 수 있듯이 쓰레드풀을 이용하여 멀티쓰레드 구현을 쉽게 해주는 클래스입니다.

 

그럼 어떻게 사용하는지 살펴보겠습니다.

	public static void main(String[] args) {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.initialize();
	}

ThreadPoolTaskExecutor 를 생성하고 사용할 수 있도록 initialize()를 호출했습니다. 왜냐면 이니셜라이즈하기 전에는 executor를 사용을 할 수 없기 때문입니다. 만약 이니셜라이즈 하기 전에 사용하려고 한다면 아래와 같은 오류 메시지를 보게 됩니다.

 

Exception in thread "main" java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized

 

그럼 이제 아래처럼 코드를 좀 더 추가한 뒤 실제로 쓰레드를 실행시켜 보겠습니다.

	public static void main(String[] args) {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.initialize();
		
		log.info("executing threads....");
		Runnable r = () -> {
			try {
				log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		};

		for (int i = 0; i < 10; i++) {
			executor.execute(r);
		}
	}

 

출력 결과를 한번 볼까요?

07:42:09.450 [main] INFO com.keichee.test.service.TestService - executing threads....
07:42:09.460 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:19.464 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:29.465 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:39.470 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:49.472 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:59.477 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:09.483 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:19.489 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:29.491 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:39.496 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...

로그가 출력된 시간을 보면 10초마다 출력이 되고있고 쓰레드도 ThreadPoolTaskExecutor-1 하나가 모두 처리한 것을 알 수 있습니다. 즉, 지금 위 코드는 멀티쓰레드로 돌아간게 아니란 얘기죠. ThreadPoolTaskExecutor는 몇 가지 설정값들을 가지고 있습니다. 그 중 corePoolSize 값이 동시에 실행할 쓰레드의 개수를 의미하는데 이 설정의 default 값이 1로 세팅되어 있기 때문에 위 처럼 corePoolSize 설정없이 그대로 사용하면 싱글쓰레드로 돌아가게 됩니다. 

 

설정값

그럼 설정값들에 어떤 것들이 있는지 그것들이 의미하는게 무엇인지 setter 메서드를 기준으로 중요한 값들만 한번 살펴보고 넘어가겠습니다.

 

메서드

설명

기본값

setCorePoolSize

corePoolSize 값을 설정함. corePoolSize는 동시에 실행시킬 쓰레드의 개수를 의미함

1

setAllowCoreThreadTimeOut

코어 쓰레드의 타임아웃을 허용할 것인지에 대한 세터 메서드. true로 설정할 경우 코어 쓰레드를 10으로 설정했어도 일정 시간(keepAliveSeconds)이 지나면 코어 쓰레드 개수가 줄어듦.

false

setKeepAliveSeconds

코어 쓰레드 타임아웃을 허용했을 경우 사용되는 설정값으로, 여기 설정된 시간이 지날 때까지 코어 쓰레드 풀의 쓰레드가 사용되지 않을 경우 해당 쓰레드는 terminate 된다.

60초

setMaxPoolSize

쓰레드 풀의 최대 사이즈

Integer.MAX

setQueueCapacity

쓰레드 풀 큐의 사이즈. corePoolSize 개수를 넘어서는 task가 들어왔을 때 queue에 해당 task들이 쌓이게 된다. 최대로 maxPoolSize 개수 만큼 쌓일 수 있다.

Integer.MAX

 

여기서 corePoolSize, maxPoolSize, queueCapacity 이 세 가지 설정값이 가장 중요합니다.

우선 위에서 봤던 첫 번째 예제에서는 이 세 가지 값에 대해서 별도로 설정을 하지 않았었습니다. 그래서 싱글 쓰레드로 작업이 이루어졌죠.

 

corePoolSize

이번에는 corePoolSize를 올려보겠습니다.

    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.initialize();

        log.info("executing threads....");
        Runnable r = () -> {
            try {
                log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 10; i++) {
            executor.execute(r);
        }
    }

 

corePoolSize를 5로 설정 후 실행해보았습니다. (queueCapacity와 maxPoolSize값은 현재 기본값인 Integer.MAX 입니다)

08:52:50.423 [main] INFO com.keichee.test.service.TestService - executing threads....
08:52:50.456 [ThreadPoolTaskExecutor-3] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-3, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-2] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-2, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-5] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-5, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-4] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-4, Now sleeping 10 seconds...
08:53:00.460 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
08:53:00.460 [ThreadPoolTaskExecutor-2] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-2, Now sleeping 10 seconds...
08:53:00.461 [ThreadPoolTaskExecutor-3] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-3, Now sleeping 10 seconds...
08:53:00.461 [ThreadPoolTaskExecutor-4] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-4, Now sleeping 10 seconds...
08:53:00.461 [ThreadPoolTaskExecutor-5] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-5, Now sleeping 10 seconds...

실행되자마자 5개의 쓰레드가 실행되고 10초 후에 또 다른 5개의 쓰레드가 실행된 것을 확인할 수 있습니다. 즉, queueCapacity와 maxPoolSize값을 기본값으로 해놓으면 corePoolSize의 값만큼 쓰레드가 동시에 실행되는 것을 알 수 있습니다.

 

corePoolSize와 queueCapacity

그럼 이번에는 corePoolSize는 default 값인 1로 놔두고 queueCapacity와 maxPoolSize 값을 5로 설정해보겠습니다. 그리고 10개의 task가 실행될 때 poolSize, activeSize, queueSize 가 어떻게 변하는지 확인할 수 있게 출력해보겠습니다. 또, 출력을 좀 짧게 하기위해서 쓰레드명 prefix를 "my-"로 변경했습니다.

    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("my-");
        executor.setQueueCapacity(5);
        executor.setMaxPoolSize(5);
        executor.initialize();

        log.info("executing threads....");
        Runnable r = () -> {
            try {
                log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 10; i++) {
            executor.execute(r);
            System.out.print("poolSize:" + executor.getPoolSize());
            System.out.print(", active:" + executor.getActiveCount());
            System.out.println(", queue:" + executor.getThreadPoolExecutor().getQueue().size());
        }
    }

 

어떤 결과가 나올 것 같은지 한번 생각을 해보고 아래 결과를 확인해보시기 바랍니다.

09:02:22.864 [main] INFO com.keichee.test.service.TestService - executing threads....
poolSize:1, active:1, queue:0
poolSize:1, active:1, queue:1
poolSize:1, active:1, queue:2
poolSize:1, active:1, queue:3
poolSize:1, active:1, queue:4
poolSize:1, active:1, queue:5
poolSize:2, active:2, queue:5
09:02:22.886 [my-1] INFO com.keichee.test.service.TestService - my-1, Now sleeping 10 seconds...
09:02:22.887 [my-2] INFO com.keichee.test.service.TestService - my-2, Now sleeping 10 seconds...
poolSize:3, active:3, queue:5
09:02:22.887 [my-3] INFO com.keichee.test.service.TestService - my-3, Now sleeping 10 seconds...
poolSize:4, active:4, queue:5
09:02:22.887 [my-4] INFO com.keichee.test.service.TestService - my-4, Now sleeping 10 seconds...
poolSize:5, active:5, queue:5
09:02:22.887 [my-5] INFO com.keichee.test.service.TestService - my-5, Now sleeping 10 seconds...
09:02:32.888 [my-1] INFO com.keichee.test.service.TestService - my-1, Now sleeping 10 seconds...
09:02:32.890 [my-2] INFO com.keichee.test.service.TestService - my-2, Now sleeping 10 seconds...
09:02:32.890 [my-4] INFO com.keichee.test.service.TestService - my-4, Now sleeping 10 seconds...
09:02:32.890 [my-5] INFO com.keichee.test.service.TestService - my-5, Now sleeping 10 seconds...
09:02:32.890 [my-3] INFO com.keichee.test.service.TestService - my-3, Now sleeping 10 seconds...

 

 

위 출력 결과를 보면 10개의 task를 실행할 때 queue 사이즈가 0에서 5까지 올라가고 그 이후에 poolSize와 active 사이즈가 증가하는 것을 알 수 있습니다. corePoolSize가 1 이라서 2번째 task부터 6번째 task까지는 queue에 들어가게 됩니다. 그리고 7번째 task부터 10번째 task까지 4개의 task는 maxPoolSize 를 넘어서지 않는 한 추가로 쓰레드를 생성하여 pool에 넣고 해당 쓰레드로 각 task를 실행하게 됩니다. 

그럼 만약 maxPoolSize를 넘어설 만큼 많은 양의 task들이 들어온다면 어떻게 될까요?

Exception in thread "main" org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@32eff876[Running, pool size = 5, active threads = 5, queued tasks = 5, completed tasks = 0]] did not accept task: com.keichee.test.service.TestService$$Lambda$22/0x00000008000ce840@5e0826e7
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)

TaskRejectedException 오류가 발생하게 됩니다. 

따라서 얼마나 많은 양의 task를 소화해야 하는지를 정확히 알고 corePoolSize, queueCapacity, maxPoolSize에 적절한 값을 세팅하여 사용해야 합니다. 기본 값으로 사용하면 TaskRejectedException을 볼 일은 거의 없겠지만 그 대신 queue에 어마어마한 양의 task가 쌓이겠죠. 그러다가 애플리케이션의 배포나 어떤 이유에 의해서 재기동이 필요하게 되면 해당 queue에 쌓여있던 task들은 사라지게 됩니다. 

 

스프링부트에서 사용하실 분들은 아래 포스팅을 참고하시면 좋습니다.

스프링 부트에서 ThreadPoolTaskExecutor를 설정 및 사용하는 방법