Source code for compass.services.queues

"""Module for "singleton" QUERIES dictionary"""

import asyncio


_QUEUES = {}


[docs] def initialize_service_queue(service_name): """Initialize an `asyncio.Queue()` for a service Repeated calls to this function return the same queue Parameters ---------- service_name : str Name of service to initialize queue for. Returns ------- asyncio.Queue() Queue instance for this service. """ return _QUEUES.setdefault(service_name, asyncio.Queue())
[docs] def tear_down_service_queue(service_name): """Remove the queue for a service The queue does not have to exist, so repeated calls to this function are OK. Parameters ---------- service_name : str Name of service to delete queue for. """ _QUEUES.pop(service_name, None)
[docs] def get_service_queue(service_name): """Retrieve the queue for a service Parameters ---------- service_name : str Name of service to retrieve queue for. Returns ------- asyncio.Queue or None Queue instance for this service, or `None` if the queue was not initialized. """ return _QUEUES.get(service_name)