Queue

These queues require additional dependencies.

Queue       Python Dependency
Redis       redis
RabbitMQ    pika
Kafka       confluent_kafka
Nats        nats

class tinder.queue.NatsConsumer(subject: str, max_inflight: int, client_name: str, durable_name: str = 'durable', cluster_id: str = 'test-cluster')[source]

A Nats Streaming consumer using durable queues.

It allows you to resume your progress with manual acks.

A durable subscription is identified by durable_name & client_name.

Parameters
  • subject (str) – a subject to subscribe

  • max_inflight (int) – the maximum number of messages to hold in the client

  • durable_name (str) – Defaults to ‘durable’.

  • client_name (str) – Defaults to ‘clientid’.

  • cluster_id (str) – Defaults to ‘test-cluster’.

class tinder.queue.NatsProducer(subject: str, client_name: str, cluster_id: str = 'test-cluster')[source]

A Nats Streaming producer using durable queues.

It allows you to resume your progress with manual acks.

A durable subscription is identified by durable_name & client_name.

Parameters
  • subject (str) – a subject to publish to

  • client_name (str) – Defaults to ‘clientid’.

  • cluster_id (str) – Defaults to ‘test-cluster’.

class tinder.queue.KafkaConsumer(topic: str, prefetch: int, consumer_id: str, host: str = 'localhost')[source]

class tinder.queue.KafkaProducer(topic: str, host: str = 'localhost')[source]

Parameters
  • topic (str) – the name of a topic (queue) you publish to.

  • host (str, optional) – Defaults to ‘localhost’.

flush()[source]

Flush the remaining Kafka messages.

send(msg: str)[source]

Send a single string.

Parameters

msg (str) – a message to send.
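
The send-then-flush pattern implied by the two methods above can be sketched as follows. This is a minimal illustration, not part of tinder: publish_all is a hypothetical helper, and it only assumes that the producer object exposes send(msg) and flush() as documented.

```python
from typing import Iterable


def publish_all(producer, messages: Iterable[str]) -> int:
    """Send every message, then flush once so nothing stays buffered.

    `producer` is assumed to expose send(msg: str) and flush(),
    like tinder.queue.KafkaProducer. Returns the number of messages sent.
    """
    count = 0
    for msg in messages:
        producer.send(msg)  # send() takes a single string
        count += 1
    producer.flush()  # push out whatever is still pending
    return count
```

With a running broker, the producer argument would be something like tinder.queue.KafkaProducer('events'), where the topic name 'events' is only a placeholder.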

class tinder.queue.RabbitConsumer(queue: str, prefetch: int, host: str = 'localhost', port: int = 5672)[source]

A RabbitMQ consumer that provides data in batch.

If prefetch is 3*B and you process messages in batches of size B, the server sends you up to 2*B messages in advance.

Parameters
  • queue (str) – the name of a queue.

  • prefetch (int) – the number of messages to prefetch. Recommended: batch_size*3

  • host (str) – the hostname to connect to, without the port.

  • port (int) – the port to connect to.
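
The prefetch arithmetic described above can be written out as a small worked example. Both helper names are hypothetical and exist only for illustration; the factor of 3 is the recommendation stated in the parameter description.

```python
def recommended_prefetch(batch_size: int, factor: int = 3) -> int:
    """Return the prefetch value suggested above: batch_size * 3."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    return batch_size * factor


def messages_sent_ahead(prefetch: int, batch_size: int) -> int:
    """Messages the server can deliver beyond the batch being processed."""
    return max(prefetch - batch_size, 0)


prefetch = recommended_prefetch(32)         # 96
ahead = messages_sent_ahead(prefetch, 32)   # 64, i.e. 2*B for B = 32
```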

ack(ack_tags: List[T])[source]

Report that you successfully processed messages.

Parameters

ack_tags – a single ack tag or a list of ack tags of successful messages.

ack_upto(ack_tag)[source]

Report that you successfully processed all messages up to ack_tag.

Parameters

ack_tag – the ack tag of the last successful message.

close()[source]

Close the connection.

If a message has not been acked by the time of the disconnect, it is considered not delivered.

nack(ack_tags: List[T], requeue)[source]

Report that you failed to process messages.

Parameters

ack_tags – a list of ack tags of failed messages.

nack_upto(ack_tag, requeue)[source]

Report that you failed to process all messages up to ack_tag.

Parameters

ack_tag – the ack tag of the last failed message.

one(timeout=None)[source]

Consume one message (blocking). After processing the message, you should call consumer.ack(ack_tag).

Returns

A tuple (msg, ack_tag).

Raises

An exception on timeout.
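
One way to combine one() with ack_upto() is to process messages one at a time and acknowledge them with a single call at the end. The sketch below is an assumption about usage, not code from tinder: consume_n is a hypothetical helper, and it relies only on one() returning (msg, ack_tag) and on ack_upto(ack_tag) as documented above.

```python
from typing import Any, Callable


def consume_n(consumer, handle: Callable[[Any], None], n: int):
    """Process up to n messages, then ack everything handled so far.

    Returns the ack tag of the last successfully handled message,
    or None if no message was handled.
    """
    last_tag = None
    try:
        for _ in range(n):
            msg, ack_tag = consumer.one()  # blocks until a message arrives
            handle(msg)
            last_tag = ack_tag  # only updated after handle() succeeded
    finally:
        # Ack in one call, even if handle() raised part-way through.
        if last_tag is not None:
            consumer.ack_upto(last_tag)
    return last_tag
```

If handle() raises, the message that failed is left unacked, so it counts as not delivered once the connection is closed.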

one_batch(max_batch_size: int) → Tuple[List[T], List[T]][source]

Consume up to max_batch_size messages. Wait until at least one message is available. After processing the messages, you should call consumer.ack.

Parameters

max_batch_size – the number of messages to consume.

Returns: (List,List). The first is a list of messages. The second is a list of ack tags.
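
A batch-processing loop built on one_batch(), ack() and nack() might look like the sketch below. process_batches is a hypothetical helper, not part of tinder. It assumes that passing requeue=True to nack() asks the broker to redeliver the failed messages, which is the usual RabbitMQ meaning but is not stated in this reference.

```python
from typing import Any, Callable, List


def process_batches(consumer, handle: Callable[[List[Any]], None],
                    max_batch_size: int, num_batches: int) -> int:
    """Consume num_batches batches and ack or nack each one as a whole.

    Returns the number of messages that were handled and acked.
    """
    processed = 0
    for _ in range(num_batches):
        # Blocks until at least one message is available.
        msgs, ack_tags = consumer.one_batch(max_batch_size)
        try:
            handle(msgs)
        except Exception:
            # Assumption: requeue=True makes the broker redeliver the batch.
            consumer.nack(ack_tags, requeue=True)
            continue
        consumer.ack(ack_tags)
        processed += len(msgs)
    return processed
```

Acking per batch keeps the number of unacked messages bounded by the batch size, which fits the prefetch recommendation of batch_size*3.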

class tinder.queue.RedisQueue(queue: str, unique_history: bool = False, soft_capacity=None, redis_client=None)[source]

A FIFO queue based on Redis. Can be used by multiple workers. Workers can share the queue by specifying the same queue name.

Example:

import redis
import tinder
q = tinder.queue.RedisQueue('q1')

Parameters
  • queue (str) – the name of a queue.

  • unique_history (bool) – Any element that is ever pushed into the queue is not pushed again.

  • soft_capacity (int) – The max size of queue. This is ‘soft’ because the queue can grow up to soft_capacity+(number of workers)-1.

  • redis_client – if not provided, the default one is created.
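
The bound soft_capacity+(number of workers)-1 can be illustrated with a small in-memory simulation. This is a sketch under an assumption that is not confirmed by the reference: each worker first checks the queue size and then pushes, and the two steps are not atomic. The function name worst_case_size is hypothetical.

```python
def worst_case_size(soft_capacity: int, num_workers: int) -> int:
    """Simulate the worst interleaving of check-then-push workers.

    Assumption: a worker pushes if it observed size < soft_capacity,
    and all workers make their observation before any of them pushes.
    """
    if soft_capacity < 1 or num_workers < 1:
        raise ValueError("soft_capacity and num_workers must be positive")
    queue = ["x"] * (soft_capacity - 1)  # exactly one free slot left

    # Phase 1: every worker checks the size and sees the free slot.
    may_push = [len(queue) < soft_capacity for _ in range(num_workers)]

    # Phase 2: every worker that saw room pushes one element.
    for allowed in may_push:
        if allowed:
            queue.append("x")
    return len(queue)


size = worst_case_size(soft_capacity=10, num_workers=4)  # 13 == 10 + 4 - 1
```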

available()[source]

Returns

The current size of the queue.

clear(history: bool = False)[source]

Empty the queue.

Parameters

history – if True, clear all history as well. Has no effect if unique_history is False.
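
How unique_history interacts with clear(history=...) can be modelled in memory. The class below is an illustrative stand-in, not the Redis-backed implementation: UniqueHistoryModel is a hypothetical name, it always behaves as if unique_history were True, and it does not block on an empty queue.

```python
from collections import deque


class UniqueHistoryModel:
    """In-memory model of a FIFO queue with unique_history enabled."""

    def __init__(self):
        self._items = deque()
        self._history = set()  # every element ever pushed

    def push(self, element) -> bool:
        """Append element unless it was pushed before. True if appended."""
        if element in self._history:
            return False
        self._history.add(element)
        self._items.append(element)
        return True

    def pop_one(self):
        """Pop the oldest element (the real queue waits when empty)."""
        return self._items.popleft()

    def clear(self, history: bool = False) -> None:
        """Empty the queue; forget past elements only if history is True."""
        self._items.clear()
        if history:
            self._history.clear()

    def available(self) -> int:
        return len(self._items)
```

In this model, clear() alone empties the queue but a previously pushed element is still rejected, while clear(history=True) lets it be pushed again.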

pop_at_least_one(max_num: int = 1)[source]

Pop a batch of elements and return at least one. Wait if the queue is empty. Return immediately if the queue is not empty, even if it holds fewer than max_num elements.

Parameters

max_num (int) – maximum number of samples to pop.

Returns

a list of elements.
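
A worker loop built on pop_at_least_one() has to cope with batches that are smaller than max_num. The sketch below shows one way to do that. consume is a hypothetical helper, and it only assumes that the queue object exposes pop_at_least_one(max_num) as documented above.

```python
from typing import Any, Callable


def consume(queue, handle: Callable[[Any], None],
            max_num: int, total: int) -> int:
    """Handle elements in batches until at least `total` have been seen.

    Returns the number of elements actually handled, which can exceed
    `total` by up to max_num - 1 because whole batches are processed.
    """
    seen = 0
    while seen < total:
        # Blocks while the queue is empty; may return fewer than max_num.
        batch = queue.pop_at_least_one(max_num)
        for element in batch:
            handle(element)
        seen += len(batch)
    return seen
```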

pop_one()[source]

Pop an element. Wait if the queue is empty.

Returns

an element.

push(element)[source]

Append element to the queue.

If soft_capacity is set and the queue is full, wait until room is available.

If unique_history is set, an element that has been pushed before is ignored.

Parameters

element – an element of type that redis-py can encode.

Returns: