import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException;
/**
 * Small RabbitMQ demo with a sender and a receiver that share one queue.
 *
 * <p>{@link #send()} publishes a fixed batch of text messages and
 * {@link #receive()} prints every message it is delivered. Both sides encode
 * and decode message bodies as UTF-8 so they agree regardless of the
 * platform default charset.
 */
public class QueueClient {

    /** Queue declared by both sides; also used as the routing key of every published message. */
    public static final String QUEUE_NAME = "hello";

    /** Charset name used for message bodies on both the sending and the receiving side. */
    private static final String BODY_CHARSET = "UTF-8";

    /** Number of messages published by one call to {@link #send()}. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Declares the queue, publishes {@code MESSAGE_COUNT} "Hello World! i"
     * messages to it and closes the client.
     *
     * @throws IOException      if declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection times out
     */
    public static void send() throws IOException, TimeoutException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        boolean completed = false;
        try {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // create messages.
            List<Message> messages = new ArrayList<Message>();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                messages.add(new Message(QUEUE_NAME, "Hello World! " + i));
            }

            System.out.println("ready to send.");
            for (Message m : messages) {
                // Explicit charset: the no-arg getBytes() depends on the platform default.
                // UnsupportedEncodingException is an IOException, so the signature is unchanged.
                channel.basicPublish(m.exchange, m.routingKey, null, m.body.getBytes(BODY_CHARSET));
                System.out.println(" [x] Sent " + m.toString());
            }
            completed = true;
        } finally {
            if (completed) {
                // Success path: a failing close is reported to the caller as before.
                client.close();
            } else {
                // Failure path: release the connection without hiding the original exception.
                closeQuietly(client);
            }
        }
    }

    /**
     * Declares the queue, registers an auto-acknowledging consumer and prints
     * every delivered message. The loop never finishes normally; it ends only
     * when one of the declared exceptions is thrown, at which point the
     * client is closed.
     *
     * @throws IOException                if declaring the queue or registering the consumer fails
     * @throws ShutdownSignalException    if the connection is shut down while waiting
     * @throws ConsumerCancelledException if the consumer is cancelled while waiting
     * @throws InterruptedException       if the thread is interrupted while waiting for a delivery
     * @throws TimeoutException           if opening the connection times out
     */
    public static void receive() throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException, TimeoutException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        try {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);

            System.out.println("ready to receive.");
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Decode with the same charset send() used to encode.
                String message = new String(delivery.getBody(), BODY_CHARSET);
                System.out.println(" [x] Received '" + message + "'");
            }
        } finally {
            // Only reached through an exception, so never let cleanup mask it.
            closeQuietly(client);
        }
    }

    /**
     * Closes {@code client} while an earlier exception is already propagating.
     * Any failure of the close itself is reported on stderr instead of being
     * thrown, so the caller still sees the original problem.
     */
    private static void closeQuietly(RabbitmqClient client) {
        try {
            client.close();
        } catch (Exception e) {
            System.err.println(" [!] Failed to close RabbitMQ client: " + e);
        }
    }

    /**
     * Runs the receiver. Swap the commented line to run the sender instead.
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // QueueClient.send();
        QueueClient.receive();
    }
}
/**
 * Opens a connection and a channel to the RabbitMQ server at {@link #HOST}
 * and closes them again.
 *
 * <p>Every call to {@link #getChannel()} opens a brand-new connection. Only
 * the most recently opened connection/channel pair is remembered, so
 * {@link #close()} releases just that pair.
 */
class RabbitmqClient {

    /** Host of the RabbitMQ server that new connections are opened against. */
    public static String HOST = "127.0.0.1";

    private Connection connection = null;
    private Channel channel = null;

    /**
     * Opens a new connection to {@link #HOST} and creates a channel on it.
     * If the channel cannot be created, the new connection is closed again
     * before the exception propagates, so it does not leak.
     *
     * @return the channel created on the new connection, exactly as returned
     *         by {@code Connection.createChannel()}
     * @throws IOException      if the connection or the channel cannot be opened
     * @throws TimeoutException if opening the connection times out
     */
    public Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitmqClient.HOST);

        Connection opened = factory.newConnection();
        boolean ready = false;
        try {
            Channel created = opened.createChannel();
            this.connection = opened;
            this.channel = created;
            ready = true;
            return created;
        } finally {
            if (!ready) {
                closeQuietly(opened);
            }
        }
    }

    /**
     * Closes the remembered channel and connection. Safe to call when
     * nothing was opened, when either side is already closed, and more than
     * once. The connection is closed even if closing the channel fails.
     *
     * @throws IOException      if closing the channel or the connection fails
     * @throws TimeoutException if closing the channel times out
     */
    public void close() throws IOException, TimeoutException {
        Channel openChannel = this.channel;
        Connection openConnection = this.connection;
        // Forget both first so a repeated close() is a no-op.
        this.channel = null;
        this.connection = null;

        try {
            if (openChannel != null && openChannel.isOpen()) {
                openChannel.close();
            }
        } finally {
            if (openConnection != null && openConnection.isOpen()) {
                openConnection.close();
            }
        }
    }

    /**
     * Closes a connection whose channel could not be created. Errors from
     * the close are dropped because the channel failure is the exception the
     * caller needs to see.
     */
    private static void closeQuietly(Connection target) {
        try {
            target.close();
        } catch (Exception ignored) {
            // Intentionally ignored: the createChannel() failure is already propagating.
        }
    }
}
/**
 * A text message together with the exchange and routing key it is published with.
 * All three fields are public and mutable; each defaults to the empty string.
 */
class Message {

    /** Exchange to publish to; empty when none was given. */
    public String exchange = "";

    /** Routing key to publish with. */
    public String routingKey = "";

    /** Message payload as text. */
    public String body = "";

    /**
     * Creates a message with an empty exchange name.
     *
     * @param routingKey routing key to publish with
     * @param body       message payload
     */
    public Message(String routingKey, String body) {
        this("", routingKey, body);
    }

    /**
     * Creates a message for a specific exchange.
     *
     * @param exchange   exchange to publish to
     * @param routingKey routing key to publish with
     * @param body       message payload
     */
    public Message(String exchange, String routingKey, String body) {
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.body = body;
    }

    /**
     * Renders the message for logging. The exchange is included only when it
     * is non-empty; a {@code null} exchange is treated like an empty one
     * instead of causing a NullPointerException.
     *
     * @return {@code Exchange='e', Key='k', 'body'} or {@code Key='k', 'body'}
     */
    @Override
    public String toString() {
        boolean hasExchange = exchange != null && exchange.length() > 0;
        if (hasExchange) {
            return String.format("Exchange='%s', Key='%s', '%s'", exchange, routingKey, body);
        }
        return String.format("Key='%s', '%s'", routingKey, body);
    }
}