Queue
These queues require additional dependencies.
Queue | Python Dependency
---|---
Redis | redis
RabbitMQ | pika
Kafka | confluent_kafka
Nats | nats
class tinder.queue.NatsConsumer(subject: str, max_inflight: int, client_name: str, durable_name: str = 'durable', cluster_id: str = 'test-cluster')
A Nats Streaming consumer using durable queues.
It allows you to resume your progress with manual acks.
A durable subscription is identified by durable_name & client_name.
class tinder.queue.NatsProducer(subject: str, client_name: str, cluster_id: str = 'test-cluster')
A Nats Streaming producer using durable queues.
It allows you to resume your progress with manual acks.
A durable subscription is identified by durable_name & client_name.
class tinder.queue.KafkaConsumer(topic: str, prefetch: int, consumer_id: str, host: str = 'localhost')
class tinder.queue.RabbitConsumer(queue: str, prefetch: int, host: str = 'localhost', port: int = 5672)
A RabbitMQ consumer that provides data in batches.
If prefetch is 3*B and you process messages in batches of size B, the server sends you up to 2*B messages in advance.
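To make the prefetch arithmetic concrete, here is a minimal sketch. It assumes that prefetch caps the number of unacknowledged messages held by the consumer, which is how RabbitMQ's prefetch count behaves. `max_buffered_ahead` is a hypothetical helper written for illustration; it is not part of tinder.

```python
def max_buffered_ahead(prefetch: int, batch_size: int) -> int:
    """Upper bound on messages delivered in advance while one batch is in progress.

    Assumption: the batch being processed is still unacknowledged, so it
    counts against the prefetch limit.
    """
    if batch_size > prefetch:
        raise ValueError("batch_size must not exceed prefetch")
    return prefetch - batch_size


B = 32
ahead = max_buffered_ahead(prefetch=3 * B, batch_size=B)
print(ahead)  # 2 * B messages may already be waiting locally
```

Under this assumption, a larger prefetch keeps more work buffered at the cost of more messages being redelivered if the consumer disconnects before acking.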
ack(ack_tags: List[T])
Report that you successfully processed messages.
- Parameters
ack_tags – a single ack tag or a list of ack tags of the successfully processed messages.
ack_upto(ack_tag)
Report that you successfully processed all messages up to ack_tag.
- Parameters
ack_tag – the ack tag of the last successful message.
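The difference between ack and ack_upto can be sketched with a small in-memory model. `OutstandingTags` is a hypothetical class used only for illustration, and it assumes ack tags are increasing integers (as RabbitMQ delivery tags are); it does not talk to a broker.

```python
from typing import Iterable, Set


class OutstandingTags:
    """Hypothetical model of the ack tags that are still unacknowledged."""

    def __init__(self, tags: Iterable[int]) -> None:
        self.pending: Set[int] = set(tags)

    def ack(self, ack_tags: Iterable[int]) -> None:
        # Acknowledge exactly the listed tags.
        self.pending -= set(ack_tags)

    def ack_upto(self, ack_tag: int) -> None:
        # Acknowledge every pending tag up to and including ack_tag.
        self.pending = {t for t in self.pending if t > ack_tag}


tags = OutstandingTags(range(1, 7))  # tags 1..6 are outstanding
tags.ack([2, 4])
after_ack = sorted(tags.pending)     # individual acks leave gaps
tags.ack_upto(5)
after_upto = sorted(tags.pending)    # cumulative ack clears everything <= 5
print(after_ack, after_upto)
```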
close()
Close the connection.
If a message has not been acked by the time the connection closes, it is considered not delivered.
nack(ack_tags: List[T], requeue)
Report that you failed to process messages.
- Parameters
ack_tags – a list of ack tags of the failed messages.
nack_upto(ack_tag, requeue)
Report that you failed to process all messages up to ack_tag.
- Parameters
ack_tag – the ack tag of the last failed message.
one(timeout=None)
Consume one message (blocking). After processing the message, you should call consumer.ack(ack_tag).
- Returns
A tuple (msg, ack_tag).
- Raises
An exception on timeout.
one_batch(max_batch_size: int) → Tuple[List[T], List[T]]
Consume up to max_batch_size messages. Waits until at least one message is available. After processing the messages, you should call consumer.ack.
- Parameters
max_batch_size – the maximum number of messages to consume.
- Returns
A tuple (List, List). The first is a list of messages. The second is a list of ack tags.
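The consume-then-ack pattern described above might look like the following sketch. `InMemoryConsumer` is a hypothetical stand-in that only mimics the documented shape of one_batch and ack so the loop can run without a broker; it is not tinder's RabbitConsumer, and its `pending` helper is invented for the example.

```python
from collections import deque
from typing import Deque, List, Tuple


class InMemoryConsumer:
    """Hypothetical stand-in mimicking one_batch/ack; not tinder's class."""

    def __init__(self, messages: List[str]) -> None:
        # Pair each message with an increasing integer ack tag.
        self._queue: Deque[Tuple[str, int]] = deque(
            (msg, tag) for tag, msg in enumerate(messages, start=1)
        )
        self.acked: List[int] = []

    def pending(self) -> int:
        return len(self._queue)

    def one_batch(self, max_batch_size: int) -> Tuple[List[str], List[int]]:
        if not self._queue:
            raise RuntimeError("empty; a real consumer would wait here")
        msgs: List[str] = []
        tags: List[int] = []
        while self._queue and len(msgs) < max_batch_size:
            msg, tag = self._queue.popleft()
            msgs.append(msg)
            tags.append(tag)
        return msgs, tags

    def ack(self, ack_tags: List[int]) -> None:
        self.acked.extend(ack_tags)


consumer = InMemoryConsumer(["a", "b", "c", "d", "e"])
batch_sizes: List[int] = []
while consumer.pending():
    msgs, ack_tags = consumer.one_batch(max_batch_size=2)
    batch_sizes.append(len(msgs))  # process the batch here
    consumer.ack(ack_tags)         # ack only after processing succeeds
print(batch_sizes, consumer.acked)
```

Acking after processing, rather than on receipt, is what lets unprocessed messages be redelivered if the worker stops mid-batch.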
class tinder.queue.RedisQueue(queue: str, unique_history: bool = False, soft_capacity=None, redis_client=None)
A FIFO queue based on Redis. Can be used by multiple workers. Workers can share the queue by specifying the same queue name.
Example:
    import redis
    import tinder
    q = tinder.queue.RedisQueue('q1')
- Parameters
queue (str) – the name of a queue.
unique_history (bool) – if True, an element that has ever been pushed into the queue is not pushed again.
soft_capacity (int) – the maximum size of the queue. This is ‘soft’ because the queue can grow up to soft_capacity+(number of workers)-1.
redis_client – if not provided, the default one is created.
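One way the soft_capacity+(number of workers)-1 bound can arise is a check-then-push race: several workers see one free slot at the same moment and all push. The sketch below simulates that worst case deterministically. The check-then-push mechanism is an assumption made for illustration, not a description of how tinder implements the queue, and `worst_case_size` is a hypothetical helper.

```python
def worst_case_size(soft_capacity: int, workers: int) -> int:
    """Simulate every worker passing the capacity check before any of them pushes."""
    queue = [0] * (soft_capacity - 1)  # exactly one free slot remains

    # Phase 1: all workers check the size before anyone has pushed.
    allowed = [len(queue) < soft_capacity for _ in range(workers)]

    # Phase 2: every worker that passed the check pushes one element.
    for ok in allowed:
        if ok:
            queue.append(1)
    return len(queue)


size = worst_case_size(soft_capacity=10, workers=4)
print(size)  # equals soft_capacity + workers - 1 in this simulation
```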
clear(history: bool = False)
Empty the queue.
- Parameters
history – if True, clear all history as well. Has no effect if unique_history is False.
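The interaction between unique_history and clear(history=...) can be sketched with an in-memory model. `HistoryQueue` and its `push` method are hypothetical names chosen for illustration; the sketch does not use Redis and does not reproduce tinder's actual method names beyond `clear`.

```python
from collections import deque
from typing import Deque, Set


class HistoryQueue:
    """Hypothetical in-memory model of a FIFO queue with a push history."""

    def __init__(self, unique_history: bool = False) -> None:
        self.items: Deque[str] = deque()
        self.unique_history = unique_history
        self.seen: Set[str] = set()

    def push(self, item: str) -> bool:
        # With unique_history, anything pushed before is rejected.
        if self.unique_history:
            if item in self.seen:
                return False
            self.seen.add(item)
        self.items.append(item)
        return True

    def clear(self, history: bool = False) -> None:
        self.items.clear()
        if history:
            self.seen.clear()


q = HistoryQueue(unique_history=True)
results = [q.push("a"), q.push("a")]  # second push is a duplicate
q.clear()                             # queue emptied, history kept
results.append(q.push("a"))           # still rejected
q.clear(history=True)                 # history forgotten too
results.append(q.push("a"))           # accepted again
print(results)
```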