java async (1)

async(비동기) 처리를 위한 ThreadPoolTaskExecutor

ThreadPoolTaskExecutor를 이용하여 비동기처리하는 방법을 알아보겠습니다.

ThreadPoolTaskExecutor는 스프링에서 제공해주는 클래스로 org.springframework.scheduling.concurrent 패키지에 속해있습니다.

생성자도 기본생성자 하나만 존재합니다. 이름에서 알 수 있듯이 쓰레드풀을 이용하여 멀티쓰레드 구현을 쉽게 해주는 클래스입니다.

 

그럼 어떻게 사용하는지 살펴보겠습니다.

	public static void main(String[] args) {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.initialize();
	}

ThreadPoolTaskExecutor 를 생성하고 사용할 수 있도록 initialize()를 호출했습니다. 왜냐면 이니셜라이즈하기 전에는 executor를 사용을 할 수 없기 때문입니다. 만약 이니셜라이즈 하기 전에 사용하려고 한다면 아래와 같은 오류 메시지를 보게 됩니다.

 

Exception in thread "main" java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized

 

그럼 이제 아래처럼 코드를 좀 더 추가한 뒤 실제로 쓰레드를 실행시켜 보겠습니다.

	public static void main(String[] args) {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.initialize();
		
		log.info("executing threads....");
		Runnable r = () -> {
			try {
				log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		};

		for (int i = 0; i < 10; i++) {
			executor.execute(r);
		}
	}

 

출력 결과를 한번 볼까요?

07:42:09.450 [main] INFO com.keichee.test.service.TestService - executing threads....
07:42:09.460 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:19.464 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:29.465 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:39.470 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:49.472 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:42:59.477 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:09.483 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:19.489 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:29.491 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
07:43:39.496 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...

로그가 출력된 시간을 보면 10초마다 출력이 되고있고 쓰레드도 ThreadPoolTaskExecutor-1 하나가 모두 처리한 것을 알 수 있습니다. 즉, 지금 위 코드는 멀티쓰레드로 돌아간게 아니란 얘기죠. ThreadPoolTaskExecutor는 몇 가지 설정값들을 가지고 있습니다. 그 중 corePoolSize 값이 동시에 실행할 쓰레드의 개수를 의미하는데 이 설정의 default 값이 1로 세팅되어 있기 때문에 위 처럼 corePoolSize 설정없이 그대로 사용하면 싱글쓰레드로 돌아가게 됩니다. 

 

설정값

그럼 설정값들에 어떤 것들이 있는지 그것들이 의미하는게 무엇인지 setter 메서드를 기준으로 중요한 값들만 한번 살펴보고 넘어가겠습니다.

 

메서드

설명

기본값

setCorePoolSize

corePoolSize 값을 설정함. corePoolSize는 동시에 실행시킬 쓰레드의 개수를 의미함

1

setAllowCoreThreadTimeOut

코어 쓰레드의 타임아웃을 허용할 것인지에 대한 세터 메서드. true로 설정할 경우 코어 쓰레드를 10으로 설정했어도 일정 시간(keepAliveSeconds)이 지나면 코어 쓰레드 개수가 줄어듦.

false

setKeepAliveSeconds

코어 쓰레드 타임아웃을 허용했을 경우 사용되는 설정값으로, 여기 설정된 시간이 지날 때까지 코어 쓰레드 풀의 쓰레드가 사용되지 않을 경우 해당 쓰레드는 terminate 된다.

60초

setMaxPoolSize

쓰레드 풀의 최대 사이즈

Integer.MAX

setQueueCapacity

쓰레드 풀 큐의 사이즈. corePoolSize 개수를 넘어서는 task가 들어왔을 때 queue에 해당 task들이 쌓이게 된다. 최대로 maxPoolSize 개수 만큼 쌓일 수 있다.

Integer.MAX

 

여기서 corePoolSize, maxPoolSize, queueCapacity 이 세 가지 설정값이 가장 중요합니다.

우선 위에서 봤던 첫 번째 예제에서는 이 세 가지 값에 대해서 별도로 설정을 하지 않았었습니다. 그래서 싱글 쓰레드로 작업이 이루어졌죠.

 

corePoolSize

이번에는 corePoolSize를 올려보겠습니다.

    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.initialize();

        log.info("executing threads....");
        Runnable r = () -> {
            try {
                log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 10; i++) {
            executor.execute(r);
        }
    }

 

corePoolSize를 5로 설정 후 실행해보았습니다. (queueCapacity와 maxPoolSize값은 현재 기본값인 Integer.MAX 입니다)

08:52:50.423 [main] INFO com.keichee.test.service.TestService - executing threads....
08:52:50.456 [ThreadPoolTaskExecutor-3] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-3, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-2] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-2, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-5] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-5, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
08:52:50.456 [ThreadPoolTaskExecutor-4] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-4, Now sleeping 10 seconds...
08:53:00.460 [ThreadPoolTaskExecutor-1] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-1, Now sleeping 10 seconds...
08:53:00.460 [ThreadPoolTaskExecutor-2] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-2, Now sleeping 10 seconds...
08:53:00.461 [ThreadPoolTaskExecutor-3] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-3, Now sleeping 10 seconds...
08:53:00.461 [ThreadPoolTaskExecutor-4] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-4, Now sleeping 10 seconds...
08:53:00.461 [ThreadPoolTaskExecutor-5] INFO com.keichee.test.service.TestService - ThreadPoolTaskExecutor-5, Now sleeping 10 seconds...

실행되자마자 5개의 쓰레드가 실행되고 10초 후에 또 다른 5개의 쓰레드가 실행된 것을 확인할 수 있습니다. 즉, queueCapacity와 maxPoolSize값을 기본값으로 해놓으면 corePoolSize의 값만큼 쓰레드가 동시에 실행되는 것을 알 수 있습니다.

 

corePoolSize와 queueCapacity

그럼 이번에는 corePoolSize는 default 값인 1로 놔두고 queueCapacity와 maxPoolSize 값을 5로 설정해보겠습니다. 그리고 10개의 task가 실행될 때 poolSize, activeSize, queueSize 가 어떻게 변하는지 확인할 수 있게 출력해보겠습니다. 또, 출력을 좀 짧게 하기위해서 쓰레드명 prefix를 "my-"로 변경했습니다.

    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("my-");
        executor.setQueueCapacity(5);
        executor.setMaxPoolSize(5);
        executor.initialize();

        log.info("executing threads....");
        Runnable r = () -> {
            try {
                log.info(Thread.currentThread().getName() + ", Now sleeping 10 seconds...");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 10; i++) {
            executor.execute(r);
            System.out.print("poolSize:" + executor.getPoolSize());
            System.out.print(", active:" + executor.getActiveCount());
            System.out.println(", queue:" + executor.getThreadPoolExecutor().getQueue().size());
        }
    }

 

어떤 결과가 나올 것 같은지 한번 생각을 해보고 아래 결과를 확인해보시기 바랍니다.

09:02:22.864 [main] INFO com.keichee.test.service.TestService - executing threads....
poolSize:1, active:1, queue:0
poolSize:1, active:1, queue:1
poolSize:1, active:1, queue:2
poolSize:1, active:1, queue:3
poolSize:1, active:1, queue:4
poolSize:1, active:1, queue:5
poolSize:2, active:2, queue:5
09:02:22.886 [my-1] INFO com.keichee.test.service.TestService - my-1, Now sleeping 10 seconds...
09:02:22.887 [my-2] INFO com.keichee.test.service.TestService - my-2, Now sleeping 10 seconds...
poolSize:3, active:3, queue:5
09:02:22.887 [my-3] INFO com.keichee.test.service.TestService - my-3, Now sleeping 10 seconds...
poolSize:4, active:4, queue:5
09:02:22.887 [my-4] INFO com.keichee.test.service.TestService - my-4, Now sleeping 10 seconds...
poolSize:5, active:5, queue:5
09:02:22.887 [my-5] INFO com.keichee.test.service.TestService - my-5, Now sleeping 10 seconds...
09:02:32.888 [my-1] INFO com.keichee.test.service.TestService - my-1, Now sleeping 10 seconds...
09:02:32.890 [my-2] INFO com.keichee.test.service.TestService - my-2, Now sleeping 10 seconds...
09:02:32.890 [my-4] INFO com.keichee.test.service.TestService - my-4, Now sleeping 10 seconds...
09:02:32.890 [my-5] INFO com.keichee.test.service.TestService - my-5, Now sleeping 10 seconds...
09:02:32.890 [my-3] INFO com.keichee.test.service.TestService - my-3, Now sleeping 10 seconds...

 

 

위 출력 결과를 보면 10개의 task를 실행할 때 queue 사이즈가 0에서 5까지 올라가고 그 이후에 poolSize와 active 사이즈가 증가하는 것을 알 수 있습니다. corePoolSize가 1 이라서 2번째 task부터 6번째 task까지는 queue에 들어가게 됩니다. 그리고 7번째 task부터 10번째 task까지 4개의 task는 maxPoolSize 를 넘어서지 않는 한 추가로 쓰레드를 생성하여 pool에 넣고 해당 쓰레드로 각 task를 실행하게 됩니다. 

그럼 만약 maxPoolSize를 넘어설 만큼 많은 양의 task들이 들어온다면 어떻게 될까요?

Exception in thread "main" org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@32eff876[Running, pool size = 5, active threads = 5, queued tasks = 5, completed tasks = 0]] did not accept task: com.keichee.test.service.TestService$$Lambda$22/0x00000008000ce840@5e0826e7
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)

TaskRejectedException 오류가 발생하게 됩니다. 

따라서 얼마나 많은 양의 task를 소화해야 하는지를 정확히 알고 corePoolSize, queueCapacity, maxPoolSize에 적절한 값을 세팅하여 사용해야 합니다. 기본 값으로 사용하면 TaskRejectedException을 볼 일은 거의 없겠지만 그 대신 queue에 어마어마한 양의 task가 쌓이겠죠. 그러다가 애플리케이션의 배포나 어떤 이유에 의해서 재기동이 필요하게 되면 해당 queue에 쌓여있던 task들은 사라지게 됩니다. 

 

스프링부트에서 사용하실 분들은 아래 포스팅을 참고하시면 좋습니다.

스프링 부트에서 ThreadPoolTaskExecutor를 설정 및 사용하는 방법