webdevqa.jp.net

カスタムケースクラスのデータセットを作成するときに、「データセットに格納されているタイプのエンコーダーが見つかりません」というのはなぜですか?

Spark 2.0(最終版)Scala 2.11.8。次の非常に単純なコードは、コンパイルエラーError:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.を生成します

import org.Apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}
52
clay

Spark Datasetsには、保存されるデータ型にEncodersが必要です。一般的なタイプ(アトミック、製品タイプ)には、多数の定義済みエンコーダーが用意されていますが、これらを機能させるには、まず SparkSession.implicits からこれらをインポートする必要があります。

val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)

または、明示的に直接提供することができます

import org.Apache.spark.sql.{Encoder, Encoders}

val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])

または暗黙的

implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)

格納された型のEncoder

Enocdersは、アトミックタイプの定義済みEncodersの数も提供し、複雑なタイプのEncodersは、 ExpressionEncoder で導出できることに注意してください。

参考文献:

75
zero323

他のユーザー(自分が正しい)の場合、case classobjectスコープの外側で定義されることも重要であることに注意してください。そう:

失敗:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    val dataset = sparkSession.createDataset(dataList)
  }
}

暗黙を追加しますが、同じエラーで失敗します:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

作品:

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {   
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

関連するバグは次のとおりです: https://issues.Apache.org/jira/browse/SPARK-13​​540 。 Sparkのリリース2。

(編集:そのバグ修正は実際にはSpark 2.0.0にあるようです...だから、これがまだ失敗する理由はわかりません)。

44
MrProper