Class: Falqon::Message

Inherits:
Object
  • Object
show all
Extended by:
Forwardable, T::Sig
Defined in:
lib/falqon/message.rb

Overview

A message in a queue

This class should typically not be instantiated directly, but rather be created by a queue instance.

Defined Under Namespace

Classes: Metadata

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, id: nil, data: nil) ⇒ void

Create a new message

Examples:

Instantiate an existing message

queue = Falqon::Queue.new("my_queue")
id = queue.push("Hello, World!")
message = Falqon::Message.new(queue, id:)
message.data # => "Hello, World!"

Create a new message

queue = Falqon::Queue.new("my_queue")
message = Falqon::Message.new(queue, data: "Hello, World!")
message.create
message.id # => 1

Parameters:

  • queue (Queue)

    The queue instance the message belongs to

  • id (Identifier, nil) (defaults to: nil)

    The message identifier (optional if creating a new message)

  • data (Data, nil) (defaults to: nil)

    The message data (optional if fetching an existing message)



39
40
41
42
43
# File 'lib/falqon/message.rb', line 39

def initialize(queue, id: nil, data: nil)
  @queue = queue
  @id = id
  @data = data
end

Instance Attribute Details

#queueQueue (readonly)

The queue instance the message belongs to

Returns:



17
18
19
# File 'lib/falqon/message.rb', line 17

def queue
  @queue
end

Instance Method Details

#createMessage

Create the message in the queue

This method will overwrite any existing message with the same identifier.

Returns:

  • (Message)

    The message instance



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/falqon/message.rb', line 102

def create
  redis.with do |r|
    message_id = id

    r.multi do |t|
      # Store data
      t.set("#{queue.id}:data:#{message_id}", data)

      # Set metadata
      t.hset("#{queue.id}:metadata:#{message_id}",
             :created_at, Time.now.to_i,
             :updated_at, Time.now.to_i,)
    end
  end

  self
end

#dataString

The message data

Returns:

  • (String)


53
54
55
# File 'lib/falqon/message.rb', line 53

def data
  @data ||= redis.with { |r| r.get("#{queue.id}:data:#{id}") }
end

#dead?Boolean

Whether the message status is dead

Returns:

  • (Boolean)


83
84
85
# File 'lib/falqon/message.rb', line 83

def dead?
  .status == "dead"
end

#deletevoid

This method returns an undefined value.

Delete the message, removing it from the queue

This method deletes the message, metadata, and data from the queue.



146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/falqon/message.rb', line 146

def delete
  redis.with do |r|
    r.multi do |t|
      # Delete message from queue
      queue.pending.remove(id)
      queue.dead.remove(id)

      # Delete data and metadata
      t.del("#{queue.id}:data:#{id}", "#{queue.id}:metadata:#{id}")
    end
  end
end

#exists?Boolean

Whether the message exists (i.e. has been created)

Returns:

  • (Boolean)


89
90
91
92
93
# File 'lib/falqon/message.rb', line 89

def exists?
  redis.with do |r|
    r.exists("#{queue.id}:data:#{id}") == 1
  end
end

#idIdentifier

The message identifier

Returns:



47
48
49
# File 'lib/falqon/message.rb', line 47

def id
  @id ||= redis.with { |r| r.incr("#{queue.id}:id") }
end

#inspectString

Returns:

  • (String)


182
183
184
# File 'lib/falqon/message.rb', line 182

def inspect
  "#<#{self.class} id=#{id.inspect} size=#{size.inspect}>"
end

#killvoid

This method returns an undefined value.

Kill the message

This method moves the message to the dead queue, and resets the retry count.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/falqon/message.rb', line 125

def kill
  logger.debug "Killing message #{id} on queue #{queue.name}"

  redis.with do |r|
    # Add identifier to dead queue
    queue.dead.add(id)

    # Reset retry count and set status to dead
    r.hdel("#{queue.id}:metadata:#{id}", :retries)
    r.hset("#{queue.id}:metadata:#{id}", :status, "dead")

    # Remove identifier from queues
    queue.pending.remove(id)
  end
end

#metadataMetadata

Metadata of the message

Returns:

  • (Metadata)

    The metadata of the message

See Also:



174
175
176
177
178
179
# File 'lib/falqon/message.rb', line 174

def 
  queue.redis.with do |r|
    Metadata
      .parse(r.hgetall("#{queue.id}:metadata:#{id}"))
  end
end

#pending?Boolean

Whether the message status is pending

Returns:

  • (Boolean)


65
66
67
# File 'lib/falqon/message.rb', line 65

def pending?
  .status == "pending"
end

#processing?Boolean

Whether the message status is processing

Returns:

  • (Boolean)


71
72
73
# File 'lib/falqon/message.rb', line 71

def processing?
  .status == "processing"
end

#scheduled?Boolean

Whether the message status is scheduled

Returns:

  • (Boolean)


77
78
79
# File 'lib/falqon/message.rb', line 77

def scheduled?
  .status == "scheduled"
end

#sizeInteger

Message length

Returns:

  • (Integer)

    The string length of the message (in bytes)



164
165
166
# File 'lib/falqon/message.rb', line 164

def size
  redis.with { |r| r.strlen("#{queue.id}:data:#{id}") }
end

#unknown?Boolean

Whether the message status is unknown

Returns:

  • (Boolean)


59
60
61
# File 'lib/falqon/message.rb', line 59

def unknown?
  .status == "unknown"
end