前段时间,由于公司RabbitMQ集群网络出现故障,导致RabbitMQ响应延迟,引发楼主团队的一个核心服务出现连锁问题。现用这个demo复盘一下当时的情景,演示如何快速定位问题,并总结经验教训,思考如何避免类似情况发生。
项目地址: https://gitee.com/kenwar/debug_online_demo
"http-nio-8093-exec-7" #20 daemon prio=5 os_prio=0 tid=0x00007f6a58b8c000 nid=0x15a8 waiting on condition [0x00007f6a32167000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ed03f468> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at com.alibaba.druid.pool.DruidDataSource.pollLast(DruidDataSource.java:1487)
at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1086)
at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:953)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:933)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:923)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:100)
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:264)
DruidDataSource.java:1487
// druid连接池获取连接时会通过该condition进行限时等待,限时时间为连接池中配置的maxWait值
estimate = notEmpty.awaitNanos(estimate); // signal by recycle or creator
"http-nio-8093-exec-6" #19 daemon prio=5 os_prio=0 tid=0x00007f7274b0d000 nid=0x2930 waiting for monitor entry [0x00007f7243af6000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.ken.debugonline.sender.MsgSender.sender(MsgSender.java:29)
- waiting to lock <0x00000000ed1202f8> (a java.lang.Object)
at com.ken.debugonline.sender.MsgSender.sendMsg(MsgSender.java:20)
at com.ken.debugonline.service.impl.TestServiceImpl.sendMsg(TestServiceImpl.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionInterceptor$$Lambda$508/65382615.proceedWithInvocation(Unknown Source)
模拟阻塞的代码
/**
 * Simulates sending {@code msg} to the MQ while holding {@code connectionMonitor}.
 *
 * <p>The monitor imitates the lock taken when RabbitMQ obtains a connection
 * (CachingConnectionFactory.java -> createConnection), and the 20-second sleep
 * imitates a network stall while that connection is being established.
 *
 * @param msg the message payload that is logged as "sent"
 */
private void sender(Integer msg) {
    // This lock imitates the logic RabbitMQ runs when acquiring a connection:
    // CachingConnectionFactory.java -> createConnection
    synchronized (connectionMonitor) {
        try {
            // Simulate the network blocking while the MQ connection is being established.
            Thread.sleep(20000);
            log.info("send msg:{} to mq",msg);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption,
            // and report through the logger instead of printing to stderr.
            Thread.currentThread().interrupt();
            log.warn("interrupted while sending msg:{} to mq", msg, e);
        }
    }
}
CachingConnectionFactory.java->createConnection()实际rabbitmq代码片段 public final Connection createConnection() throws AmqpException {
Assert.state(!this.stopped, "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
Object var1 = this.connectionMonitor;
synchronized(this.connectionMonitor) {
...
模拟业务代码TestServiceImpl.java // 声明事务
@Transactional(rollbackFor = Exception.class)
public void sendMsg() {
int id = testMapper.insert();
事务中进行发送
testMapper.select(1L);
// 模拟发送消息
msgSender.sendMsg(id);
}
// 示例代码com.ken.debugonline.sender.MsgSender
/**
 * Queues the message for asynchronous sending by a background thread; when the
 * queue refuses the message (it is full), the message is sent synchronously on
 * the calling thread instead.
 *
 * @param msg the message to send
 */
public void sendMsg(Integer msg) {
    if (needSend.offer(msg)) {
        // Accepted by the queue: nothing more to do on this thread.
        return;
    }
    // Queue is full: fall back to sending synchronously.
    this.sender(msg);
}
/**
 * Simulates sending {@code msg} to the MQ while holding {@code connectionMonitor}.
 *
 * <p>The monitor imitates the lock taken when RabbitMQ obtains a connection
 * (CachingConnectionFactory.java -> createConnection), and the 20-second sleep
 * imitates a network stall while that connection is being established.
 *
 * @param msg the message payload that is logged as "sent"
 */
private void sender(Integer msg) {
    // This lock imitates the logic RabbitMQ runs when acquiring a connection:
    // CachingConnectionFactory.java -> createConnection
    synchronized (connectionMonitor) {
        try {
            // Simulate the network blocking while the MQ connection is being established.
            Thread.sleep(20000);
            log.info("send msg:{} to mq",msg);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption,
            // and report through the logger instead of printing to stderr.
            Thread.currentThread().interrupt();
            log.warn("interrupted while sending msg:{} to mq", msg, e);
        }
    }
}
原文:https://www.cnblogs.com/kenwar/p/13620431.html