Class: Falqon::Message
- Inherits:
-
Object
- Object
- Falqon::Message
- Extended by:
- Forwardable, T::Sig
- Defined in:
- lib/falqon/message.rb
Overview
A message in a queue
This class should typically not be instantiated directly, but rather be created by a queue instance.
Defined Under Namespace
Classes: Metadata
Instance Attribute Summary collapse
-
#queue ⇒ Queue
readonly
The queue instance the message belongs to.
Instance Method Summary collapse
-
#create ⇒ Message
Create the message in the queue.
-
#data ⇒ String
The message data.
-
#dead? ⇒ Boolean
Whether the message status is dead.
-
#delete ⇒ void
Delete the message, removing it from the queue.
-
#exists? ⇒ Boolean
Whether the message exists (i.e. has been created).
-
#id ⇒ Identifier
The message identifier.
-
#initialize(queue, id: nil, data: nil) ⇒ void
constructor
Create a new message.
- #inspect ⇒ String
-
#kill ⇒ void
Kill the message.
-
#metadata ⇒ Metadata
Metadata of the message.
-
#pending? ⇒ Boolean
Whether the message status is pending.
-
#processing? ⇒ Boolean
Whether the message status is processing.
-
#scheduled? ⇒ Boolean
Whether the message status is scheduled.
-
#size ⇒ Integer
Message length.
-
#unknown? ⇒ Boolean
Whether the message status is unknown.
Constructor Details
#initialize(queue, id: nil, data: nil) ⇒ void
Create a new message
39 40 41 42 43 |
# File 'lib/falqon/message.rb', line 39 def initialize(queue, id: nil, data: nil) @queue = queue @id = id @data = data end |
Instance Attribute Details
#queue ⇒ Queue (readonly)
The queue instance the message belongs to
17 18 19 |
# File 'lib/falqon/message.rb', line 17 def queue @queue end |
Instance Method Details
#create ⇒ Message
Create the message in the queue
This method will overwrite any existing message with the same identifier.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/falqon/message.rb', line 102 def create redis.with do |r| = id r.multi do |t| # Store data t.set("#{queue.id}:data:#{}", data) # Set metadata t.hset("#{queue.id}:metadata:#{}", :created_at, Time.now.to_i, :updated_at, Time.now.to_i,) end end self end |
#data ⇒ String
The message data
53 54 55 |
# File 'lib/falqon/message.rb', line 53 def data @data ||= redis.with { |r| r.get("#{queue.id}:data:#{id}") } end |
#dead? ⇒ Boolean
Whether the message status is dead
83 84 85 |
# File 'lib/falqon/message.rb', line 83 def dead? .status == "dead" end |
#delete ⇒ void
This method returns an undefined value.
Delete the message, removing it from the queue
This method deletes the message, metadata, and data from the queue.
146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/falqon/message.rb', line 146 def delete redis.with do |r| r.multi do |t| # Delete message from queue queue.pending.remove(id) queue.dead.remove(id) # Delete data and metadata t.del("#{queue.id}:data:#{id}", "#{queue.id}:metadata:#{id}") end end end |
#exists? ⇒ Boolean
Whether the message exists (i.e. has been created)
89 90 91 92 93 |
# File 'lib/falqon/message.rb', line 89 def exists? redis.with do |r| r.exists("#{queue.id}:data:#{id}") == 1 end end |
#id ⇒ Identifier
The message identifier
47 48 49 |
# File 'lib/falqon/message.rb', line 47 def id @id ||= redis.with { |r| r.incr("#{queue.id}:id") } end |
#inspect ⇒ String
182 183 184 |
# File 'lib/falqon/message.rb', line 182 def inspect "#<#{self.class} id=#{id.inspect} size=#{size.inspect}>" end |
#kill ⇒ void
This method returns an undefined value.
Kill the message
This method moves the message to the dead queue, and resets the retry count.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/falqon/message.rb', line 125 def kill logger.debug "Killing message #{id} on queue #{queue.name}" redis.with do |r| # Add identifier to dead queue queue.dead.add(id) # Reset retry count and set status to dead r.hdel("#{queue.id}:metadata:#{id}", :retries) r.hset("#{queue.id}:metadata:#{id}", :status, "dead") # Remove identifier from queues queue.pending.remove(id) end end |
#metadata ⇒ Metadata
Metadata of the message
174 175 176 177 178 179 |
# File 'lib/falqon/message.rb', line 174 def queue.redis.with do |r| Metadata .parse(r.hgetall("#{queue.id}:metadata:#{id}")) end end |
#pending? ⇒ Boolean
Whether the message status is pending
65 66 67 |
# File 'lib/falqon/message.rb', line 65 def pending? .status == "pending" end |
#processing? ⇒ Boolean
Whether the message status is processing
71 72 73 |
# File 'lib/falqon/message.rb', line 71 def processing? .status == "processing" end |
#scheduled? ⇒ Boolean
Whether the message status is scheduled
77 78 79 |
# File 'lib/falqon/message.rb', line 77 def scheduled? .status == "scheduled" end |
#size ⇒ Integer
Message length
164 165 166 |
# File 'lib/falqon/message.rb', line 164 def size redis.with { |r| r.strlen("#{queue.id}:data:#{id}") } end |
#unknown? ⇒ Boolean
Whether the message status is unknown
59 60 61 |
# File 'lib/falqon/message.rb', line 59 def unknown? .status == "unknown" end |