本文共 11186 字,大约阅读时间需要 37 分钟。
上图中先使用redis-cli作为客户端连接了Redis,之后使用了SUBSCRIBE命令,后面的参数表示订阅了china和hongkong两个channel。可以看到"SUBSCRIBE china hongkong"这条命令的输出是6行(可以分为2组,每一组是一个Message)。因为订阅、取消订阅的操作跟发布的消息都是通过消息(Message)的方式发送的,消息的第一个元素就是消息类型,它可以是以下几种类型:
subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.
unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.
message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.
--from http://redis.io/topics/pubsub
A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.
The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT
--from http://redis.io/topics/pubsub
On this page: http://redis.io/commands/subscribe applies only to those clients.
The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.
Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).
--from http://stackoverflow.com/questions/17621371/redis-unsubscribe
可以看到,新的一个客户端使用PUBLISH命令往china频道发布了一条叫"China News"的消息,接下来再看看订阅端:
可以看见,这条消息已经被接收到了。可以看到,收到的消息中第一个参数是类型"message",第二个参数是channel名字"china",第三个参数是消息内容"China News",这和开始说的message类型的结构一致。
pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.
--from http://redis.io/topics/pubsub
第二部分是匹配的模式“chi*”,第三部分是实际的channel名字“china”,第四部分是消息内容“China Daily”。
1 2 3 4 5 6 7 8 9 10 | < dependency > < groupId >redis.clients</ groupId > < artifactId >jedis</ artifactId > < version >2.8.0</ version > </ dependency > < dependency > < groupId >log4j</ groupId > < artifactId >log4j</ artifactId > < version >1.2.17</ version > </ dependency > |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | package com.demo.redis; import org.apache.log4j.Logger; import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub { //注意这里继承了抽象类JedisPubSub private static final Logger LOGGER = Logger.getLogger(Subscriber. class ); @Override public void onMessage(String channel, String message) { LOGGER.info(String.format( "Message. Channel: %s, Msg: %s" , channel, message)); } @Override public void onPMessage(String pattern, String channel, String message) { LOGGER.info(String.format( "PMessage. Pattern: %s, Channel: %s, Msg: %s" , pattern, channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) { LOGGER.info( "onSubscribe" ); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { LOGGER.info( "onUnsubscribe" ); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { LOGGER.info( "onPUnsubscribe" ); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { LOGGER.info( "onPSubscribe" ); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | package com.demo.redis; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; public class Publisher { private static final Logger LOGGER = Logger.getLogger(Publisher. class ); private final Jedis publisherJedis; private final String channel; public Publisher(Jedis publisherJedis, String channel) { this .publisherJedis = publisherJedis; this .channel = channel; } /** * 不停的读取输入,然后发布到channel上面,遇到quit则停止发布。 */ public void startPublish() { LOGGER.info( "Type your message (quit for terminate)" ); try { BufferedReader reader = new BufferedReader( new InputStreamReader(System.in)); while ( true ) { String line = reader.readLine(); if (! "quit" .equals(line)) { publisherJedis.publish(channel, line); } else { break ; } } } catch (IOException e) { LOGGER.error( "IO failure while reading input" , e); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | package com.demo.redis; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class Program { public static final String CHANNEL_NAME = "MyChannel" ; //我这里的Redis是一个集群,和192.168.56.102都可以使用 public static final String REDIS_HOST = "" ; public static final int REDIS_PORT = 7000 ; private final static Logger LOGGER = Logger.getLogger(Program. class ); private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig(); private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0 ); public static void main(String[] args) throws Exception { final Jedis subscriberJedis = JEDIS_POOL.getResource(); final Jedis publisherJedis = JEDIS_POOL.getResource(); final Subscriber subscriber = new Subscriber(); //订阅线程:接收消息 new Thread( new Runnable() { public void run() { try { LOGGER.info( "Subscribing to \"MyChannel\". This thread will be blocked." ); //使用subscriber订阅CHANNEL_NAME上的消息,这一句之后,线程进入订阅模式,阻塞。 subscriberJedis.subscribe(subscriber, CHANNEL_NAME); //当unsubscribe()方法被调用时,才执行以下代码 LOGGER.info( "Subscription ended." ); } catch (Exception e) { LOGGER.error( "Subscribing failed." , e); } } }).start(); //主线程:发布消息到CHANNEL_NAME频道上 new Publisher(publisherJedis, CHANNEL_NAME).startPublish(); publisherJedis.close(); //Unsubscribe subscriber.unsubscribe(); subscriberJedis.close(); } } |
主类Program中定义了channel名字、连接redis的地址和端口,并使用JedisPool来获取Jedis实例。由于订阅者(subscriber)在进入订阅状态后会阻塞线程,因此新起一个线程(new Thread())作为订阅线程,并是用主线程来发布消息。待发布者(类中的new Publisher)停止发布消息(控制台中输入quit即可)时,解除订阅者的订阅(subscriber.unsubscribe()方法)。此时订阅线程解除阻塞,打印结束的日志并退出。
1 2 3 4 5 | log4j.rootLogger=INFO,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | final byte [] resp = ( byte []) firstObj; if (Arrays.equals(SUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get( 2 )).intValue(); final byte [] bchannel = ( byte []) reply.get( 1 ); final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel); //调用onSubscribe方法,该方法在我们的Subscriber类中实现 onSubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get( 2 )).intValue(); final byte [] bchannel = ( byte []) reply.get( 1 ); final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel); //调用onUnsubscribe方法,该方法在我们的Subscriber类中实现 onUnsubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(MESSAGE.raw, resp)) { final byte [] bchannel = ( byte []) reply.get( 1 ); final byte [] bmesg = ( byte []) reply.get( 2 ); final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel); final String strmesg = (bmesg == null ) ? null : SafeEncoder.encode(bmesg); //调用onMessage方法,该方法在我们的Subscriber类中实现 onMessage(strchannel, strmesg); } else if (Arrays.equals(PMESSAGE.raw, resp)) { final byte [] bpattern = ( byte []) reply.get( 1 ); final byte [] bchannel = ( byte []) reply.get( 2 ); final byte [] bmesg = ( byte []) reply.get( 3 ); final String strpattern = (bpattern == null ) ? null : SafeEncoder.encode(bpattern); final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel); final String strmesg = (bmesg == null ) ? null : SafeEncoder.encode(bmesg); //调用onPMessage方法,该方法在我们的Subscriber类中实现 onPMessage(strpattern, strchannel, strmesg); } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get( 2 )).intValue(); final byte [] bpattern = ( byte []) reply.get( 1 ); final String strpattern = (bpattern == null ) ? null : SafeEncoder.encode(bpattern); onPSubscribe(strpattern, subscribedChannels); } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get( 2 )).intValue(); final byte [] bpattern = ( byte []) reply.get( 1 ); final String strpattern = (bpattern == null ) ? null : SafeEncoder.encode(bpattern); //调用onPUnsubscribe方法,该方法在我们的Subscriber类中实现 onPUnsubscribe(strpattern, subscribedChannels); } else { //对于其他Redis没有定义的返回消息类型,则直接报错 throw new JedisException( "Unknown message type: " + firstObj); } |