Old Posts/Java

[Java] DelayQueue 사용법 및 예제

A6K 2023. 5. 1. 05:53

자바의 DelayQueue는 java.util.concurrent 패키지에 들어있는 클래스로 AbstractQueue를 상속받고 있으며 BlockingQueue 인터페이스를 구현한다. 

DelayQueue는 엘리먼트의 딜레이 시간을 기반으로 동작하는 Priority Queue라고 생각하면 된다. 즉 엘리먼트들이 Delay 시간을 기준으로 정렬되어 가장 빨리 딜레이 시간이 끝나는 엘리먼트가 큐의 헤드쪽에 위치한다.

큐에서 엘리먼트를 꺼낼 때, 엘리먼트의 딜레이 시간이 지나지 않았다면 소비할 수 없다.

이후 딜레이 시간이 0보다 작은 숫자가 리턴되면 그 때서야 엘리먼트를 꺼내 쓸 수 있다.

DelayQueue 클래스

DelayQueue는 다음과 같은 상속 구조를 가지고 있다.

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
   implements BlockingQueue<E>

AbstractQueue를 상속하며 BlockingQueue를 구현한다.

중요한 포인트는 DelayQueue의 제네릭 타입이다. 제네릭 타입 정보를 보면 DelayQueue가 다룰 수 있는 엘리먼트는 Delayed 인터페이스를 구현해야 함을 알 수 있다.

Delayed 인터페이스

DelayQueue에 사용할 엘리먼트는 Delayed 인터페이스를 구현해야한다. 

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

Delayed 인터페이스는 getDelay() 메소드를 정의해야한다. getDelay() 메소드는 DelayQueue에서 호출해 엘리먼트를 큐에서 꺼낼 수 있는지 여부를 판단하게 된다. getDelay() 메소드가 0이나 음수를 리턴하면 큐에서 꺼낼 수 있다는 의미다.

Delayed 인터페이스는 Comparable 인터페이스를 상속하고 있다. Comparable 인터페이스는 compareTo() 메소드를 정의해야한다. compareTo() 메소드는 두 엘리먼트의 딜레이 타임의 비교 연산이 들어가면 되는데, 이 메소드를 이용해 DelayQueue 안쪽에서 엘리먼트들의 순서를 정하게 된다.

예를 들어 DelayQueue에 사용할 DelayedObject 객체는 다음과 같이 정의할 수 있다.

public class DelayedObject implements Delayed {

    private final String data;
    private final long expTime;

    public DelayedObject(String data, long delayInMillis) {
        this.data = data;
        this.expTime = System.currentTimeMillis() + delayInMillis;
    }

    public String getData() {
        return data;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedObject that = (DelayedObject) o;
        int c = Long.compare(expTime, that.expTime);
        if (c != 0) {
            return c;
        }
        return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
    }
}

위에서 언급한 것처럼 getDelay() 메소드와 compareTo() 메소드를 구현했다.

DelayQueue 생산자/소비자 예제

위에서 구현한 DelayedObject를 이용한 생산자/소비자 예제 코드를 작성해보자. 생산자와 소비자를 정의하고, 각각을 별도의 스레드에서 동작하게 만드는 코드를 작성하자.

우선 DelayedObject를 생산해서 DelayQueue에 넣는 생산자는 다음과 같이 구현할 수 있다.

public class DelayQueueProducer implements Runnable {
    private final BlockingQueue<DelayedObject> queue;
    private final Integer numberOfElements;
    private final int delay;
    private final int interval;
    
    public DelayQueueProducer(BlockingQueue<DelayedObject> queue, int numberOfElements, int delay, int interval) {
        this.queue = queue;
        this.numberOfElements = numberOfElements;
        this.delay = delay;
        this.interval = interval;
    }

    @Override
    public void run() {
        for (int i = 0; i < numberOfElements; i++) {
            DelayedObject object = new DelayedObject(UUID.randomUUID().toString(), delay);
            System.out.println("(" + Date.from(Instant.now()) + ") Put object : " + object.getData());

            try {
                queue.put(object);
                Thread.sleep(interval);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

생성자에서 DelayedObject를 생산해 넣을 DelayQueue 객체와 몇 개의 엘리먼트 생산할 것인지, 각 엘리먼트의 기본 딜레이는 얼마나 줄 것이며, 엘리먼트들을 몇 밀리세컨드 단위로 만들 것인지를 인자로 받았다.

run() 메소드에서 DelayedObject 객체를 만들고 DelayQueue에 넣는 동작을 정의했다.

비슷하게 DelayedObject를 DelayQueue로 부터 가져오는 소비자를 다음과 같이 구현할 수 있다.

public class DelayQueueConsumer implements Runnable {
    private final BlockingQueue<DelayedObject> queue;
    private final int numberOfElements;

    public DelayQueueConsumer(BlockingQueue<DelayedObject> queue, int numberOfElements) {
        this.queue = queue;
        this.numberOfElements = numberOfElements;
    }

    // standard constructors

    @Override
    public void run() {
        for (int i = 0; i < numberOfElements; i++) {
            try {
                DelayedObject object = queue.take();
                System.out.println("(" + Date.from(Instant.now()) + ") Get object : " + object.getData());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

소비자는 DelayQueue로부터 DelayedObject 객체를 가져온다.

마지막으로 생산자와 소비자를 만들어 별도의 스레드에서 구동하는 메인 스레드를 다음과 같이 작성해보자.

public class Main {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        BlockingQueue<DelayedObject> queue = new DelayQueue<>();
        int numberOfElements = 3;
        int interval = 1000;
        int delay = 5000;

        DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElements, delay, interval);
        DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElements);

        // Run Producer / Consumer
        executor.submit(producer);
        executor.submit(consumer);

        // Wait for termination
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

ExecutorService를 이용해서 생산자와 소비자를 구동한다. 총 3개의 DelayedObject 객체가 생산자에서부터 만들어져 소비자로 전달될 예정이며, DelayedObject 객체는 1초마다 만들어지고 각 엘리먼트의 딜레이는 5초다.

결과는 다음과 같다.

(Wed Apr 26 21:49:27 KST 2023) Put object : 31297989-2021-40bc-a557-c211d7fdd8e6
(Wed Apr 26 21:49:28 KST 2023) Put object : 0cb88e0b-e839-4b0c-af70-dd29ac5be280
(Wed Apr 26 21:49:29 KST 2023) Put object : 1eac1150-e2a1-46c2-9e0e-1c77429dc0a3
(Wed Apr 26 21:49:32 KST 2023) Get object : 31297989-2021-40bc-a557-c211d7fdd8e6
(Wed Apr 26 21:49:33 KST 2023) Get object : 0cb88e0b-e839-4b0c-af70-dd29ac5be280
(Wed Apr 26 21:49:34 KST 2023) Get object : 1eac1150-e2a1-46c2-9e0e-1c77429dc0a3

딜레이 5초짜리 엘리먼트를 1초마다 만들어 총 3개의 엘리먼트를 큐에 집어넣었다. 그리고 3초가 지나 첫 번째 엘리먼트의 딜레이가 만료되면 엘리먼트가 1초마다 소비되기 시작한다.

만약 딜레이없이 엘리먼트가 바로 소비되기를 원한다면 DelayedObject 생성시 딜레이를 0으로 주면 된다. 그러면 getDelay() 메소드가 0 혹은 음수를 바로 리턴하게 되고, 딜레이 없이 엘리먼트가 바로 소비된다.