Ensar Basri Kahveci

overly distributed

A Simple Jedis Publish / Subscribe Example

Posted at — Jun 20, 2012

Hello again, after more than 4 months :) So many things have changed in my life since my last blog post. I will try to talk about them from time to time, although I am not sure I actually will :)

I have been playing with Redis and its Java client Jedis lately. It looks very promising. I have been experimenting with it and I really like it. I will probably use it in one of my personal projects for some use cases.

The code below is an example of a publish / subscribe use case with Jedis. You need the Jedis (my version is 2.0.0) and SLF4J jars on your classpath to run it.

In Redis, you can subscribe to multiple channels, and when someone publishes messages on those channels, Redis notifies you with the published messages. Jedis provides this functionality through the JedisPubSub abstract class. To handle pub / sub events, you need to extend JedisPubSub and implement its abstract methods.

	package com.basrikahveci.redis;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import redis.clients.jedis.JedisPubSub;

	public class Subscriber extends JedisPubSub {

	    private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);

	    // Called for every message published on a channel we are subscribed to.
	    @Override
	    public void onMessage(String channel, String message) {
	        logger.info("Message received. Channel: {}, Msg: {}", channel, message);
	    }

	    // The remaining callbacks are not needed in this example, so they are left empty.

	    @Override
	    public void onPMessage(String pattern, String channel, String message) {
	    }

	    @Override
	    public void onSubscribe(String channel, int subscribedChannels) {
	    }

	    @Override
	    public void onUnsubscribe(String channel, int subscribedChannels) {
	    }

	    @Override
	    public void onPUnsubscribe(String pattern, int subscribedChannels) {
	    }

	    @Override
	    public void onPSubscribe(String pattern, int subscribedChannels) {
	    }
	}

Here is my dummy subscriber class. I will subscribe to some channels with this class and get notifications.
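To make the callback idea a bit more concrete without needing a Redis server, here is a toy in-memory model of it. Everything in it (`ToyBroker`, `Listener`, the channel names) is made up for illustration and is not part of Jedis or Redis; it only mimics the shape of "subscribe to several channels, get called back when something is published on one of them".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory stand-in for a pub/sub server. Hypothetical names; this is NOT Jedis or Redis.
class ToyBroker {

    // Callback interface playing the role that the Subscriber class plays in the post.
    interface Listener {
        void onMessage(String channel, String message);
    }

    private final Map<String, List<Listener>> listenersByChannel = new HashMap<>();

    // Registers one listener on any number of channels.
    void subscribe(Listener listener, String... channels) {
        for (String channel : channels) {
            listenersByChannel.computeIfAbsent(channel, c -> new ArrayList<>()).add(listener);
        }
    }

    // Notifies every listener of the channel and returns how many were notified.
    int publish(String channel, String message) {
        List<Listener> listeners = listenersByChannel.getOrDefault(channel, new ArrayList<>());
        for (Listener listener : listeners) {
            listener.onMessage(channel, message);
        }
        return listeners.size();
    }

    // One listener on two channels, three publishes, one of them on a channel nobody watches.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        ToyBroker broker = new ToyBroker();
        broker.subscribe((channel, message) -> received.add(channel + ": " + message), "news", "alerts");
        broker.publish("news", "hello");
        broker.publish("alerts", "disk full");
        broker.publish("sports", "nobody is listening");
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The message on the unwatched channel is simply dropped, which is also how I understand Redis pub / sub to behave: a message is delivered only to whoever is subscribed at the moment it is published.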

My ugly main method is below.

	package com.basrikahveci.redis;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import redis.clients.jedis.Jedis;
	import redis.clients.jedis.JedisPool;
	import redis.clients.jedis.JedisPoolConfig;

	public class Program {

	    public static final String CHANNEL_NAME = "commonChannel";

	    private static final Logger logger = LoggerFactory.getLogger(Program.class);

	    public static void main(String[] args) throws Exception {

	        final JedisPoolConfig poolConfig = new JedisPoolConfig();
	        final JedisPool jedisPool = new JedisPool(poolConfig, "localhost", 6379, 0);
	        final Jedis subscriberJedis = jedisPool.getResource();
	        final Subscriber subscriber = new Subscriber();

	        // subscribe() blocks its caller, so it runs on its own thread.
	        new Thread(new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    logger.info("Subscribing to \"{}\". This thread will be blocked.", CHANNEL_NAME);
	                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
	                    logger.info("Subscription ended.");
	                } catch (Exception e) {
	                    logger.error("Subscribing failed.", e);
	                }
	            }
	        }).start();

	        final Jedis publisherJedis = jedisPool.getResource();

	        // Blocks the main thread until the user types "quit".
	        new Publisher(publisherJedis, CHANNEL_NAME).start();

	        // Ends the subscription, which releases the subscriber thread.
	        // This assumes the subscriber thread has already subscribed by now.
	        subscriber.unsubscribe();
	        jedisPool.returnResource(subscriberJedis);
	        jedisPool.returnResource(publisherJedis);
	    }
	}

Let me explain it. I am using a JedisPool to get Jedis instances. The Jedis class is not thread-safe, but JedisPool is. I am using two Jedis instances: one for publishing messages and one for subscribing to channels. I made the subscription on a separate thread because it is a blocking operation. I also have a very simple Publisher class, which reads lines from the console and publishes them on the given channel. Here it is.

	package com.basrikahveci.redis;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import redis.clients.jedis.Jedis;

	import java.io.BufferedReader;
	import java.io.IOException;
	import java.io.InputStreamReader;

	public class Publisher {

	    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);

	    private final Jedis publisherJedis;

	    private final String channel;

	    public Publisher(Jedis publisherJedis, String channel) {
	        this.publisherJedis = publisherJedis;
	        this.channel = channel;
	    }

	    public void start() {
	        logger.info("Type your message (\"quit\" to terminate)");
	        try {
	            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

	            while (true) {
	                String line = reader.readLine();

	                // readLine() returns null at end of input; treat that like "quit".
	                if (line == null || "quit".equals(line)) {
	                    break;
	                }
	                publisherJedis.publish(channel, line);
	            }

	        } catch (IOException e) {
	            // Pass the exception as an argument so that SLF4J logs its stack trace.
	            logger.error("IO failure while reading input", e);
	        }
	    }
	}

I simply take the user’s input from the console and publish it on a channel via the Jedis client. When the user types quit, the publisher loop ends and the main method cancels the subscription, which lets the blocked subscription thread continue.
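The blocking and unblocking can be hard to picture, so here is a small sketch of the same flow using only the JDK. It is a toy model with made-up names (`BlockingSubscriptionSketch`, `STOP`), not the Jedis API. It also shows one possible way to make sure the subscription is active before publishing or unsubscribing: a `CountDownLatch` that the subscribing side counts down. With Jedis, I assume something similar could be done from the `onSubscribe` callback, but I have not shown that here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of a blocking subscription. Hypothetical names; this is NOT the Jedis API.
class BlockingSubscriptionSketch {

    // Marker object that tells the blocked loop to stop, standing in for an unsubscribe.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final CountDownLatch subscribed = new CountDownLatch(1);
    private final List<String> received = new ArrayList<>();

    // Blocks the calling thread until unsubscribe() is called.
    void subscribe() throws InterruptedException {
        subscribed.countDown();               // signal that the subscription is now active
        while (true) {
            String message = incoming.take(); // blocks while there is nothing to deliver
            if (message == STOP) {            // identity check, so a real "STOP" message still gets through
                return;
            }
            received.add(message);
        }
    }

    void publish(String message) {
        incoming.add(message);
    }

    void unsubscribe() {
        incoming.add(STOP);
    }

    static List<String> demo() {
        BlockingSubscriptionSketch sketch = new BlockingSubscriptionSketch();
        Thread subscriberThread = new Thread(() -> {
            try {
                sketch.subscribe();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        subscriberThread.start();
        try {
            // This toy queue would buffer early messages anyway; the wait is here to show the pattern.
            sketch.subscribed.await();
            sketch.publish("first");
            sketch.publish("second");
            sketch.unsubscribe();     // releases the blocked thread
            subscriberThread.join();  // wait for it to finish before reading the results
        } catch (InterruptedException e) {
            throw new IllegalStateException("Interrupted while running the demo", e);
        }
        return sketch.received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The main thread never touches the `received` list until `join()` has returned, so it does not need extra synchronization in this sketch.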

Key points here are:

You can get Jedis from here.
