class Redis::SubscribedClient

Public Class Methods

new(client) click to toggle source
# File lib/redis/subscribe.rb, line 5
# Wraps +client+ and creates the monitor that #call_v uses to serialize
# calls made through this object.
#
# @param client the underlying connection object; this class calls
#   +call_v+, +next_event+ and +close+ on it
def initialize(client)
  @client = client
  # Guards every command written via #call_v.
  @write_monitor = Monitor.new
end

Public Instance Methods

call_v(command) click to toggle source
# File lib/redis/subscribe.rb, line 10
# Forwards +command+ to the wrapped client while holding the write monitor,
# so only one caller at a time issues a command through this object.
#
# @param command the command to forward (the callers in this class pass an
#   Array whose first element is the command name)
# @return whatever the wrapped client's +call_v+ returns
def call_v(command)
  @write_monitor.synchronize { @client.call_v(command) }
end
close() click to toggle source
# File lib/redis/subscribe.rb, line 40
# Closes the wrapped client by delegating to its +close+ method.
#
# @return whatever the wrapped client's +close+ returns
def close
  @client.close
end
psubscribe(*channels, &block) click to toggle source
# File lib/redis/subscribe.rb, line 24
# Runs a subscription loop started with "psubscribe" and ended by a
# "punsubscribe" event, using the default timeout of #subscription.
#
# @param channels the arguments sent with the "psubscribe" command
#   (presumably channel patterns -- confirm against the Redis docs)
# @param block handed to #subscription, which uses it to build the
#   Subscription that holds the event callbacks
# @return whatever #subscription returns
def psubscribe(*channels, &block)
  start_command = "psubscribe"
  stop_command = "punsubscribe"
  subscription(start_command, stop_command, channels, block)
end
psubscribe_with_timeout(timeout, *channels, &block) click to toggle source
# File lib/redis/subscribe.rb, line 28
# Same as #psubscribe, but passes +timeout+ through to #subscription, which
# hands it to the wrapped client's +next_event+ on every read.
#
# @param timeout forwarded unchanged to #subscription
#   (presumably seconds -- confirm against the client's +next_event+)
# @param channels the arguments sent with the "psubscribe" command
# @param block handed to #subscription to build the event callbacks
# @return whatever #subscription returns
def psubscribe_with_timeout(timeout, *channels, &block)
  start_command = "psubscribe"
  stop_command = "punsubscribe"
  subscription(start_command, stop_command, channels, block, timeout)
end
punsubscribe(*channels) click to toggle source
# File lib/redis/subscribe.rb, line 36
# Sends a +:punsubscribe+ command followed by +channels+ through #call_v.
#
# @param channels the arguments appended after +:punsubscribe+; may be empty
# @return whatever #call_v returns
def punsubscribe(*channels)
  command = [:punsubscribe].concat(channels)
  call_v(command)
end
subscribe(*channels, &block) click to toggle source
# File lib/redis/subscribe.rb, line 16
# Runs a subscription loop started with "subscribe" and ended by an
# "unsubscribe" event, using the default timeout of #subscription.
#
# @param channels the arguments sent with the "subscribe" command
# @param block handed to #subscription, which uses it to build the
#   Subscription that holds the event callbacks
# @return whatever #subscription returns
def subscribe(*channels, &block)
  start_command = "subscribe"
  stop_command = "unsubscribe"
  subscription(start_command, stop_command, channels, block)
end
subscribe_with_timeout(timeout, *channels, &block) click to toggle source
# File lib/redis/subscribe.rb, line 20
# Same as #subscribe, but passes +timeout+ through to #subscription, which
# hands it to the wrapped client's +next_event+ on every read.
#
# @param timeout forwarded unchanged to #subscription
#   (presumably seconds -- confirm against the client's +next_event+)
# @param channels the arguments sent with the "subscribe" command
# @param block handed to #subscription to build the event callbacks
# @return whatever #subscription returns
def subscribe_with_timeout(timeout, *channels, &block)
  start_command = "subscribe"
  stop_command = "unsubscribe"
  subscription(start_command, stop_command, channels, block, timeout)
end
unsubscribe(*channels) click to toggle source
# File lib/redis/subscribe.rb, line 32
# Sends an +:unsubscribe+ command followed by +channels+ through #call_v.
#
# @param channels the arguments appended after +:unsubscribe+; may be empty
# @return whatever #call_v returns
def unsubscribe(*channels)
  command = [:unsubscribe].concat(channels)
  call_v(command)
end

Protected Instance Methods

subscription(start, stop, channels, block, timeout = 0) click to toggle source
# File lib/redis/subscribe.rb, line 46
# Issues the +start+ command for +channels+, then reads events from the
# wrapped client and dispatches each one to the matching callback until the
# final +stop+ event arrives or the client stops returning events.
#
# @param start [String] command name sent first, followed by +channels+
# @param stop [String] event type that may terminate the loop
# @param channels [Array] arguments appended to the +start+ command
# @param block passed as the block to +Subscription.new+; the resulting
#   object's +callbacks+ are looked up by event type
# @param timeout passed to the client's +next_event+ on every read
# @return [nil]
# @raise the class mapped in +Client::ERROR_MAPPING+ when the client
#   yields a +::RedisClient::CommandError+ as an event
def subscription(start, stop, channels, block, timeout = 0)
  registry = Subscription.new(&block)

  call_v([start, *channels])
  while (message = @client.next_event(timeout))
    # Command errors arrive as events; re-raise them as the mapped class.
    if message.is_a?(::RedisClient::CommandError)
      mapped_error = Client::ERROR_MAPPING.fetch(message.class)
      raise mapped_error, message.message
    end

    kind, *payload = message
    handler = registry.callbacks[kind]
    handler.call(*payload) if handler

    # A stop event whose last element is 0 ends the loop (presumably the
    # remaining subscription count -- confirm against the Redis protocol).
    next unless kind == stop
    break if payload.last == 0
  end
  # Unsubscribing here is unnecessary: the real client closes the
  # connection whenever an exception is raised (see #ensure_connected).
end