Consumers¶
- nsq.run()
Starts any instantiated nsq.Reader or nsq.Writer
Reader – high-level consumer¶
- class nsq.Reader(topic, channel, message_handler=None, name=None, nsqd_tcp_addresses=None, lookupd_http_addresses=None, max_tries=5, max_in_flight=1, requeue_delay=90, lookupd_poll_interval=60, low_rdy_idle_timeout=10, heartbeat_interval=30, max_backoff_duration=128, lookupd_poll_jitter=0.3, tls_v1=False, tls_options=None)¶
Reader provides high-level functionality for building robust NSQ consumers in Python on top of the async module.
Reader receives messages over the specified topic/channel and calls message_handler for each message (up to max_tries).
Multiple readers can be instantiated in a single process (to consume from multiple topics/channels at once).
Reader supports hooks to modify behavior when heartbeats are received, to temporarily disable the reader, and to pre-process or validate messages.
When supplied a list of nsqlookupd addresses, it will periodically poll those addresses to discover new producers of the specified topic.
It maintains a sufficient RDY count based on the number of producers and your configured max_in_flight.
Handlers should be defined as shown in the examples below. The handler receives an nsq.Message object that has instance methods nsq.Message.finish(), nsq.Message.requeue(), and nsq.Message.touch() to respond to nsqd.
The Reader is responsible for sending FIN or REQ commands based on the return value of message_handler. When re-queueing, an increasing delay is calculated automatically.
Additionally, when message processing fails, the Reader backs off in increasing multiples of requeue_delay between updates of the RDY count.
Synchronous example:
    import nsq

    def handler(message):
        print(message)
        # returning True tells the Reader the message was handled
        return True

    r = nsq.Reader(message_handler=handler,
                   lookupd_http_addresses=['http://127.0.0.1:4161'],
                   topic="nsq_reader", channel="asdf", lookupd_poll_interval=15)
    nsq.run()
Asynchronous example:
    import nsq

    buf = []

    def process_message(message):
        global buf
        message.enable_async()
        # cache the message for later processing
        buf.append(message)
        if len(buf) >= 3:
            for msg in buf:
                print(msg)
                msg.finish()
            buf = []
        else:
            print('deferring processing')

    r = nsq.Reader(message_handler=process_message,
                   lookupd_http_addresses=['http://127.0.0.1:4161'],
                   topic="nsq_reader", channel="async", max_in_flight=9)
    nsq.run()
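The return-value contract described above can be tried out without a running nsqd. The following is a minimal sketch, not pynsq's actual code: `StubMessage` and `dispatch` are hypothetical names, and the mapping (a truthy return sends FIN, anything else sends REQ, a raising handler counts as a failure, and a message past max_tries is finished) is an assumption based on the description in this section.

```python
class StubMessage:
    """Hypothetical stand-in for nsq.Message; records the response sent."""

    def __init__(self, body, attempts=1):
        self.body = body
        self.attempts = attempts
        self.response = None      # 'FIN', 'REQ', or None while still pending
        self._async = False

    def enable_async(self):
        self._async = True

    def finish(self):
        self.response = 'FIN'

    def requeue(self):
        self.response = 'REQ'


def dispatch(message, handler, max_tries=5):
    """Assumed response logic: give up past max_tries, else FIN/REQ on the result."""
    if message.attempts > max_tries:
        message.finish()          # discarded after too many attempts
        return
    try:
        ok = handler(message)
    except Exception:
        ok = False                # assumption: a raising handler is a failure
    # In async mode the handler responds itself, so nothing is sent here.
    if not message._async and message.response is None:
        if ok:
            message.finish()
        else:
            message.requeue()


done = StubMessage(b'ok')
dispatch(done, lambda m: True)            # done.response is 'FIN'

failed = StubMessage(b'bad')
dispatch(failed, lambda m: False)         # failed.response is 'REQ'

deferred = StubMessage(b'later')
dispatch(deferred, lambda m: m.enable_async())  # deferred.response stays None
```

The deferred case mirrors the asynchronous example: once enable_async() has been called, the handler is expected to call finish() or requeue() itself later.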
Parameters: - message_handler – the callable that will be executed for each message received
- topic – specifies the desired NSQ topic
- channel – specifies the desired NSQ channel
- name – a string that is used for logging messages (defaults to “topic:channel”)
- nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this reader should connect to
- lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this reader should query for producers of the specified topic
- max_tries – the maximum number of attempts the reader will make to process a message after which messages will be automatically discarded
- max_in_flight – the maximum number of messages this reader will pipeline for processing. This value will be divided evenly amongst the configured/discovered nsqd producers
- requeue_delay – the base multiple used when re-queueing (multiplied by the number of attempts)
- lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple readers are running
- low_rdy_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (i.e. max_in_flight < num_producers)
- heartbeat_interval – the amount of time in seconds to negotiate with the connected producers to send heartbeats (requires nsqd 0.2.19+)
- max_backoff_duration – the maximum time we will allow a backoff state to last in seconds
- lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
- tls_v1 – enable TLS v1 encryption (requires nsqd 0.2.22+)
- tls_options – dictionary of options to pass to ssl.wrap_socket() as **kwargs
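The arithmetic behind requeue_delay and lookupd_poll_jitter can be sketched in a few lines. This is an illustration under stated assumptions, not the library's implementation: the function names are hypothetical, the re-queue delay follows the "base multiple times attempts" wording above, and the jitter formula (a random fraction of the poll interval, capped by lookupd_poll_jitter) is one plausible reading of the parameter descriptions.

```python
import random


def requeue_delay_for(attempts, requeue_delay=90):
    """Delay for a re-queue: the base multiple times the attempt count."""
    return requeue_delay * attempts


def initial_poll_delay(lookupd_poll_interval=60, lookupd_poll_jitter=0.3, rng=random):
    """Assumed jitter: a random fraction (at most lookupd_poll_jitter) of the interval."""
    return lookupd_poll_interval * lookupd_poll_jitter * rng.random()


delays = [requeue_delay_for(n) for n in (1, 2, 3)]
print(delays)  # [90, 180, 270]

# With the defaults this falls somewhere between 0 and 18 seconds.
first_poll = initial_poll_delay()
```

Because each reader draws its own random delay, readers that restart together spread their first lookupd queries over the jitter window instead of polling at the same instant.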
- connect_to_nsqd(host, port)¶
Adds a connection to nsqd at the specified address.
Parameters: - host – the address to connect to
- port – the port to connect to
- disabled()¶
Called as part of RDY handling to identify whether this Reader has been disabled
This is useful to subclass and override to examine a file on disk or a key in cache to identify if this reader should pause execution (during a deploy, etc.).
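One way to implement the "file on disk" check is a small mixin that overrides disabled(). This is a minimal sketch: `PauseFileMixin` and the marker path are hypothetical, and combining the mixin with nsq.Reader is shown only as a comment so the snippet runs without pynsq installed.

```python
import os


class PauseFileMixin:
    """Hypothetical mixin: report 'disabled' while a marker file exists."""

    # Assumed path; pick any location your deploy tooling can create and remove.
    pause_file = '/tmp/nsq_reader.pause'

    def disabled(self):
        return os.path.exists(self.pause_file)


# Intended use (requires pynsq, so left as a comment):
#   class PausableReader(PauseFileMixin, nsq.Reader):
#       pass
```

Creating the marker file before a deploy and removing it afterwards would pause and resume the reader without restarting the process.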
- giving_up(message)¶
Called when a message has been received where msg.attempts > max_tries
This is useful to subclass and override to perform a task (such as writing to disk, etc.)
Parameters: message – the nsq.Message received
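As an illustration of the "writing to disk" case, the sketch below appends each discarded message to a JSON-lines file. `DeadLetterMixin` and the file path are hypothetical, and the message is assumed to expose `attempts` and a bytes `body`; the mixin would be combined with nsq.Reader in a subclass.

```python
import json


class DeadLetterMixin:
    """Hypothetical mixin: append discarded messages to a local file."""

    dead_letter_path = 'dead_letters.jsonl'   # assumed location

    def giving_up(self, message):
        record = {
            'attempts': message.attempts,
            # body is assumed to be bytes; undecodable bytes are replaced
            'body': message.body.decode('utf-8', errors='replace'),
        }
        with open(self.dead_letter_path, 'a') as fh:
            fh.write(json.dumps(record) + '\n')


# Intended use (requires pynsq, so left as a comment):
#   class AuditedReader(DeadLetterMixin, nsq.Reader):
#       pass
```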
- heartbeat(conn)¶
Called whenever a heartbeat has been received
This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.)
Parameters: conn – the nsq.AsyncConn over which the heartbeat was received
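A liveness check built on this hook might simply remember when the last heartbeat arrived. The sketch below is hypothetical (`HeartbeatMonitorMixin` and `seconds_since_heartbeat` are not part of the library) and ignores the `conn` argument for brevity.

```python
import time


class HeartbeatMonitorMixin:
    """Hypothetical mixin: remember when the last heartbeat arrived."""

    last_heartbeat = None

    def heartbeat(self, conn):
        # conn is unused here; a fuller version could track each connection
        self.last_heartbeat = time.monotonic()

    def seconds_since_heartbeat(self):
        if self.last_heartbeat is None:
            return None           # no heartbeat seen yet
        return time.monotonic() - self.last_heartbeat
```

A monitoring endpoint could then compare seconds_since_heartbeat() against a multiple of heartbeat_interval to decide whether the consumer looks healthy.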
- is_starved()¶
Used to identify when buffered messages should be processed and responded to.
When max_in_flight > 1 and you’re batching messages together to perform work, it isn’t possible to just compare the len of your list of buffered messages against your configured max_in_flight, because max_in_flight may not be evenly divisible by the number of producers you’re connected to (i.e. you might never get that many messages; it’s a max).
Example:
    import functools
    import nsq

    def message_handler(nsq_msg, reader):
        # buffer messages
        if reader.is_starved():
            # perform work
            pass

    reader = nsq.Reader(...)
    reader.set_message_handler(functools.partial(message_handler, reader=reader))
    nsq.run()
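The divisibility problem is easy to see with concrete numbers. The sketch below assumes an even floor-division split of max_in_flight across producers, which is one reading of "divided evenly" rather than a statement of the library's exact bookkeeping; `per_connection_rdy` is a hypothetical helper, and the case max_in_flight < num_producers (where RDY counts are redistributed) is not covered.

```python
def per_connection_rdy(max_in_flight, num_producers):
    """Assumed even split of max_in_flight across producers (floor division)."""
    return max_in_flight // num_producers


max_in_flight = 10
num_producers = 3

share = per_connection_rdy(max_in_flight, num_producers)
reachable = share * num_producers   # most messages that can be in flight at once
print(share, reachable)  # 3 9
```

Under this assumption, a handler that waits for `len(buffer) >= max_in_flight` would wait forever, since at most 9 messages can be outstanding. Checking is_starved() avoids hard-coding that threshold.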
- process_message(message)¶
Called when a message is received in order to execute the configured message_handler
This is useful to subclass and override if you want to change how your message handlers are called.
Parameters: message – the nsq.Message received
- query_lookupd()¶
Triggers a query of the configured lookupd_http_addresses.
- set_message_handler(message_handler)¶
Assigns the callback method to be executed for each message received
Parameters: message_handler – a callable that takes a single argument