问题描述

使用官方提供的格式调用Structured Streaming 的foreachBatch案例输出时报异常,下面是案例的代码

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

下面是我当时遇到的异常输出

Error:(87, 8) ambiguous reference to overloaded definition,
both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
and  method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
match argument types ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => Unit)

解决方案

确定lambda表达式输出Unit。修改代码为:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
  () //在最后添加一个括号,表示输出Unit
}

其他说明

这样改主要是因为,对于两个冲突的lambda类型(org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],Long])(org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Long)=>Unit,后一的lambda表达式有确定返回值为Unit。其实我也不是很确定是不是这个原因造成的,但是当时这样改就成功了。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐