Spring Cloud Stream kafka项目启动时报错
于是就对streamBridge加了非空判断,不为null时再去调用send方法发送消息,但是仍然遇到了空指针异常。继续debug代码,追踪出错时的情况和正常情况,进行对比,发现失败时streamBridge的初始化initialized为完成为false,成功时为true。原因不难想到,项目启动时,还未创建好kafka连接,但是程序已经接收到socket消息,已经开始调用streamBridge
报错信息
- 使用的是
spring-cloud-starter-stream-kafka
,函数式编程 - 程序接收
socket.io
消息,使用kafka发送消息 - 程序启动时,
StreamBridge.send
发送消息报空指针异常,追踪代码报错在Message<byte[]> resultMessage = (Message)((Function)functionToInvoke).apply(data);
这一行,functionToInvoke
为null
- 具体代码如下:
@Component
public class ChannelParamHandler {
static final String OUTPUT_BINDING_NAME = "channel-out-0";
private StreamBridge streamBridge;
public ChannelParamHandler(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@OnEvent(EventName.CHANNEL_PARAM)
public void onEvent(SocketIOClient client, Object data, AckRequest ackSender) {
// ....其他逻辑省略
streamBridge.send(OUTPUT_BINDING_NAME, mapper.writeValueAsString(channelDTOList) + "-");
// ....其他逻辑省略
}
}
- 启动时有一个warn日志
WARN 1 --- [ restartedMain] onConfiguration$FunctionBindingRegistrar : You have defined function definition that does not exist: streamBridge
- 具体报错信息如下:
2022-07-22 11:08:38.934 ERROR 18088 --- [ntLoopGroup-3-6] c.n.s.listener.DefaultExceptionListener : java.lang.NullPointerException
com.newatc.socketio.handler.SocketIOException: java.lang.NullPointerException
at com.newatc.socketio.annotation.OnEventScanner$2.onData(OnEventScanner.java:111)
at com.newatc.socketio.namespace.Namespace.onEvent(Namespace.java:127)
at com.newatc.socketio.handler.PacketListener.onPacket(PacketListener.java:73)
at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:84)
at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:37)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:305)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at com.newatc.socketio.handler.AuthorizeHandler.channelRead(AuthorizeHandler.java:132)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException: null
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:221)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:164)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
at com.newatc.unit.agent.handler.PhaseStatusHandler.onEvent(PhaseStatusHandler.java:65)
at jdk.internal.reflect.GeneratedMethodAccessor292.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.newatc.socketio.annotation.OnEventScanner$2.onData(OnEventScanner.java:109)
... 42 common frames omitted
- streamBridge为null,空指针异常
- streamBridge不为null时,functionToInvoke 为null,空指针异常
解决
-
原因不难想到,项目启动时,还未创建好kafka连接,但是程序已经接收到socket消息,已经开始调用streamBridge发送消息了
-
这个时候streamBridge并未创建好或者初始化完成,就导致了空指针异常
-
一开始想着优先创建kafka连接相关bean完成初始化,再去允许程序调用,但是针对
Spring Cloud Stream
,实在找不到可以提前初始化的bean,这条路失败 -
项目启动或重启过程中,接收到的一些socket消息,其实是可以丢弃的,项目启动完成后再接收处理也没关系。于是就对streamBridge加了非空判断,不为null时再去调用send方法发送消息,但是仍然遇到了空指针异常
-
继续debug代码,追踪出错时的情况和正常情况,进行对比,发现失败时streamBridge的初始化initialized为false,成功时为true
-
于是继续看源码找初始化方法,根据initialized搜索,果然找到了
-
判断非空后,先调用初始化方法,再调用send方法,果然没有空指针问题了
-
总结一下:空指针异常处理,增加非空判断,增加初始化方法
if (streamBridge != null) {
streamBridge.afterSingletonsInstantiated();
streamBridge.send(OUTPUT_BINDING_NAME, mapper.writeValueAsString(channelDTOList) + "-");
} else {
log.error("streamBridge is null-------------------!");
}

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)