import asynctnt
from .tube import Tube
__all__ = (
'Queue',
)
[docs]class Queue:
__slots__ = (
'_conn', '_tube_cls', '_tubes', '_namespace'
)
[docs] def __init__(self,
conn: asynctnt.Connection,
tube_cls=Tube,
namespace='queue'):
"""
Queue constructor.
:param conn:
asynctnt connection (see
`asynctnt <https://github.com/igorcoding/asynctnt>`__
documentation)
:param tube_cls:
Tube class that is used for Tube creation (default is
:class:`asynctnt_queue.Tube`)
:param namespace:
Variable which was used for queue module import (
default is `queue`)
"""
assert isinstance(conn, asynctnt.Connection), \
'conn must be asynctnt.Connection instance'
self._conn = conn
self._tube_cls = tube_cls
self._tubes = {}
self._namespace = namespace
@property
def conn(self):
"""
``asynctnt`` connection
:returns: :class:`asynctnt.Connection` instance
"""
return self._conn
@property
def namespace(self):
"""
Queues namespace
:returns: :class:`str` instance
"""
return self._namespace
[docs] def tube(self, name):
"""
Returns tube by its name
:param name: Tube name
:returns: ``self.tube_cls`` instance
(by default :class:`asynctnt_queue.Tube`)
"""
if name in self._tubes:
return self._tubes[name]
assert name, 'Tube name must be specified'
t = self._tube_cls(self, name)
self._tubes[name] = t
return t
[docs] async def statistics(self, tube_name=None):
"""
Returns queue statistics (coroutine)
:param tube_name:
If specified, statistics by a specific tube is returned,
else statistics about all tubes is returned
"""
args = None
if tube_name is not None:
args = (tube_name,)
res = await self._conn.call('{}.statistics'.format(self._namespace), args)
if self._conn.version < (1, 7): # pragma: nocover
return res.body[0][0]
return res.body[0]