webdevqa.jp.net

Spark 1.6:describe()によって生成されたDataFrameのフィルタリング

DataFrameでdescribe関数を呼び出すと、問題が発生します。

_val statsDF = myDataFrame.describe()
_

Describe関数を呼び出すと、次の出力が生成されます。

_statsDF: org.Apache.spark.sql.DataFrame = [summary: string, count: string]
_

statsDF.show()を呼び出すことで、通常はstatsDFを表示できます。

_+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|             53173|
|   mean|104.76128862392568|
| stddev|3577.8184333911513|
|    min|                 1|
|    max|            558407|
+-------+------------------+
_

statsDFから標準偏差と平均を取得したいのですが、次のような方法で値を収集しようとしています。

_val temp = statsDF.where($"summary" === "stddev").collect()
_

_Task not serializable_例外が発生しています。

電話をかけると、同じ例外に直面します。

_statsDF.where($"summary" === "stddev").show()
_

describe()関数によって生成されたDataFrameをフィルタリングできないようです。

9
Rami

いくつかの健康疾患データを含むおもちゃのデータセットを検討しました

val stddev_tobacco = rawData.describe().rdd.map{ 
    case r : Row => (r.getAs[String]("summary"),r.get(1))
}.filter(_._1 == "stddev").map(_._2).collect
5
eliasah

データフレームから選択できます。

from pyspark.sql.functions import mean, min, max
df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
+------------------+-------------------+------------------+
|      AVG(uniform)|       MIN(uniform)|      MAX(uniform)|
+------------------+-------------------+------------------+
|0.5215336029384192|0.19657711634539565|0.9970412477032209|
+------------------+-------------------+------------------+

テーブルとして登録し、テーブルにクエリを実行することもできます。

val t = x.describe()
t.registerTempTable("dt")

%sql 
select * from dt
2
oluies

別のオプションは、selectExpr()を使用することです。これも最適化されて実行されます。最小値を取得するには:

myDataFrame.selectExpr('MIN(count)').head()[0]
1
Boern
myDataFrame.describe().filter($"summary"==="stddev").show()

これはSpark 2.3.0で非常にうまく機能しました

1
kvk