admin_system/zyhs_admin_java/RabbitMQTest.java

127 lines
4.9 KiB
Java
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import com.rabbitmq.client.*;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.util.concurrent.TimeoutException;
/**
 * RabbitMQ connectivity smoke test.
 *
 * <p>Connects to the broker, declares a test exchange and queue, publishes one
 * message, consumes from the queue for a few seconds, then deletes the test
 * exchange and queue and closes the connection. Progress is printed to standard
 * output; failures are printed to standard error together with a stack trace.
 */
public class RabbitMQTest {
    private static final String HOST = "mq.zyihs.com";
    private static final int PORT = 5671; // SSL port
    private static final int PLAIN_PORT = 5672; // non-SSL port, used only as a fallback
    // NOTE(review): credentials are hard-coded in source; consider loading them from
    // the environment or a secrets store instead of committing them.
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "a7c73c9a";
    private static final String QUEUE_NAME = "test_queue";
    private static final String EXCHANGE_NAME = "test_exchange";
    private static final String ROUTING_KEY = "test_key";
    private static final String KEYSTORE_PATH = "/apps/ssl/mq.zyihs.com/client.p12";
    private static final String KEYSTORE_PASSPHRASE = "changeit";
    private static final long CONSUME_WAIT_MILLIS = 5000L;

    /**
     * Runs the connectivity test end to end.
     *
     * <p>The connection and channel are opened with try-with-resources so they are
     * closed even when a step in the middle fails; otherwise an exception would
     * leave the connection open.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("开始测试RabbitMQ连接...");
        try {
            ConnectionFactory factory = createConnectionFactory();

            System.out.println("正在连接到RabbitMQ...");
            try (Connection connection = factory.newConnection()) {
                System.out.println("连接成功!");
                try (Channel channel = connection.createChannel()) {
                    System.out.println("通道创建成功!");
                    runRoundTrip(channel);
                }
            }
            System.out.println("连接已关闭!");
            System.out.println("RabbitMQ测试完成连接正常");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller/JVM can still observe it.
            Thread.currentThread().interrupt();
            System.err.println("RabbitMQ测试失败: " + e.getMessage());
            e.printStackTrace();
        } catch (Exception e) {
            System.err.println("RabbitMQ测试失败: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Builds the connection factory for the test broker.
     *
     * <p>SSL is attempted first. If building the SSL context fails, the factory is
     * switched to the non-SSL port instead of aborting.
     *
     * @return a factory configured with host, port and credentials
     */
    private static ConnectionFactory createConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        try {
            factory.useSslProtocol(createSslContext());
        } catch (Exception e) {
            // NOTE(review): this deliberately downgrades to an unencrypted connection,
            // which sends the credentials in clear text. Kept as best-effort behavior
            // for a diagnostic tool; confirm this is acceptable for the target network.
            System.out.println("SSL配置失败尝试不使用SSL连接: " + e.getMessage());
            factory.setPort(PLAIN_PORT);
        }
        return factory;
    }

    /**
     * Declares the test exchange and queue, publishes one message, consumes for
     * {@link #CONSUME_WAIT_MILLIS} milliseconds and then deletes the queue and exchange.
     *
     * @param channel an open channel; it is not closed by this method
     * @throws Exception if any broker operation fails or the wait is interrupted
     */
    private static void runRoundTrip(Channel channel) throws Exception {
        // Durable direct exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        System.out.println("交换机声明成功!");

        // Durable, non-exclusive, non-auto-delete queue with no extra arguments.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("队列声明成功!");

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        System.out.println("队列绑定成功!");

        // The timestamp makes each run's payload distinguishable.
        String message = "Hello RabbitMQ! 测试消息 " + System.currentTimeMillis();
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送成功: " + message);

        System.out.println("开始接收消息...");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("收到消息: " + receivedMessage);
            // Manual acknowledgement of this single delivery (autoAck is off below).
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费被取消: " + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

        // Give the consumer time to receive the message before tearing down.
        Thread.sleep(CONSUME_WAIT_MILLIS);

        // Remove the test topology so repeated runs start clean.
        channel.queueDelete(QUEUE_NAME);
        channel.exchangeDelete(EXCHANGE_NAME);
        System.out.println("资源清理完成!");
    }

    /**
     * Creates the SSL context from the client PKCS12 keystore.
     *
     * <p>The same keystore is used for both the key managers and the trust managers.
     * NOTE(review): this only validates the broker if the keystore also contains the
     * relevant trusted certificate(s) — confirm against the deployed keystore.
     *
     * @return a TLSv1.2 context initialized from the keystore
     * @throws Exception if the keystore cannot be read or the context cannot be built
     */
    private static SSLContext createSslContext() throws Exception {
        char[] keyPassphrase = KEYSTORE_PASSPHRASE.toCharArray();
        KeyStore ks = KeyStore.getInstance("PKCS12");
        // Close the keystore file as soon as it has been loaded.
        try (FileInputStream in = new FileInputStream(KEYSTORE_PATH)) {
            ks.load(in, keyPassphrase);
        }

        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, keyPassphrase);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(ks);

        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return sslContext;
    }
}