Spark的StreamingContext连接Kafka错误

ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
at org.apache.spark.streaming.DStreamGraph

1.先查看kafka是否启动成功

在这里插入图片描述

2.查看kafka的topic的list

在这里插入图片描述

3.查看程序中的kafka的配置,特别注意bootstrap.servers和topic

第一种

  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "bigdata",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

第二种

  val kafkaParams: Map[String, String] = Map(
    ConsumerConfig.GROUP_ID_CONFIG -> "bigdata",
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
  )
Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐