소스 코드: Lib/asyncio/queues.py


asyncio 큐는 queue 모듈의 클래스와 유사하도록 설계되었습니다. asyncio 큐는 스레드 안전하지 않지만, async/await 코드에서 사용되도록 설계되었습니다.

asyncio 큐의 메서드에는 timeout 매개 변수가 없습니다; 시간제한이 있는 큐 연산을 하려면 asyncio.wait_for() 함수를 사용하십시오.

아래의 예제 절도 참조하십시오.

Queue

class asyncio.Queue(maxsize=0)

선입 선출 (FIFO) 큐.

maxsize가 0보다 작거나 같으면 큐 크기는 무한합니다. 0보다 큰 정수면, 큐가 maxsize에 도달했을 때 get()이 항목을 제거할 때까지 await put()이 블록합니다.

표준 라이브러리의 스레드를 쓰는 queue와는 달리, 큐의 크기는 항상 알려져 있으며 qsize() 메서드를 호출하여 얻을 수 있습니다.

버전 3.10에서 변경: loop 매개 변수를 제거했습니다.

이 클래스는 스레드 안전하지 않습니다.

maxsize

큐에 허용되는 항목 수.

empty()

큐가 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환합니다.

full()

큐에 maxsize 항목이 있으면 True를 반환합니다.

큐가 maxsize=0 (기본값)으로 초기화되었으면, full()은 절대 True를 반환하지 않습니다.

async get()

큐에서 항목을 제거하고 반환합니다. 큐가 비어 있으면, 항목이 들어올 때까지 기다립니다.

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait()

항목을 즉시 사용할 수 있으면 항목을 반환하고, 그렇지 않으면 QueueEmpty를 발생시킵니다.

async join()

큐의 모든 항목을 수신하여 처리할 때까지 블록합니다.

완료되지 않은 작업 수는 항목이 큐에 추가될 때마다 증가합니다. 이 수는 소비자 코루틴이 항목을 수신했고 그 항목에 관한 작업이 모두 완료되었음을 나타내는 task_done()를 호출할 때마다 감소합니다. 완료되지 않은 작업 수가 0으로 떨어지면 join()가 블록 해제됩니다.

async put(item)

큐에 항목을 넣습니다. 큐가 가득 차면, 항목을 추가할 빈자리가 생길 때까지 기다립니다.

Raises QueueShutDown if the queue has been shut down.

put_nowait(item)

블록하지 않고 항목을 큐에 넣습니다.

자리가 즉시 나지 않으면, QueueFull를 일으킵니다.

qsize()

큐에 있는 항목 수를 돌려줍니다.

shutdown(immediate=False)

Shut down the queue, making get() and put() raise QueueShutDown.

By default, get() on a shut down queue will only raise once the queue is empty. Set immediate to true to make get() raise immediately instead.

All blocked callers of put() and get() will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers of join().

Added in version 3.13.

task_done()

이전에 큐에 넣은 작업 항목이 완료되었음을 나타냅니다.

큐 소비자가 사용합니다. 작업 항목을 꺼내는 데 사용된 get() 마다, 뒤따르는 task_done() 호출은 작업 항목에 관한 처리가 완료되었음을 큐에 알려줍니다.

join()이 현재 블록 중이면, 모든 항목이 처리될 때 다시 시작됩니다 (큐에 put()한 모든 항목에 대해 task_done() 호출이 수신되었음을 뜻합니다).

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

큐에 넣은 항목보다 더 많이 호출되면 ValueError를 발생시킵니다.

우선순위 큐

class asyncio.PriorityQueue

Queue의 변형; 우선순위 순서로 항목을 꺼냅니다 (가장 낮은 우선순위가 처음입니다).

엔트리는 일반적으로 (priority_number, data) 형식의 튜플입니다.

LIFO 큐

class asyncio.LifoQueue

가장 최근에 추가된 항목을 먼저 꺼내는 Queue의 변형 (후입 선출).

예외

exception asyncio.QueueEmpty

이 예외는 get_nowait() 메서드가 빈 큐에 호출될 때 발생합니다.

exception asyncio.QueueFull

put_nowait() 메서드가 maxsize에 도달한 큐에 호출될 때 발생하는 예외입니다.

exception asyncio.QueueShutDown

put() 이나 get() 메서드가 종료된 큐에 호출될 때 발생하는 예외입니다.

Added in version 3.13.

예제

큐를 사용하여 여러 동시 태스크로 작업 부하를 분산시킬 수 있습니다:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # 큐에서 "작업 항목"을 가져옵니다.
        sleep_for = await queue.get()

        # "sleep_for" 초 동안 잡니다.
        await asyncio.sleep(sleep_for)

        # 큐에 "작업 항목"이 처리되었음을 알립니다.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # "작업 부하"를 저장하는 데 사용할 큐를 만듭니다.
    queue = asyncio.Queue()

    # 무작위 대기 시간을 만들어서 큐에 넣습니다.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # 큐를 동시에 처리할 세 개의 worker 태스크를 만듭니다.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # 큐가 완전히 처리될 때까지 기다립니다.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # worker 태스크를 취소합니다.
    for task in tasks:
        task.cancel()
    # 모든 worker 태스크가 취소될 때까지 기다립니다.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())