python - Calculating Weighted Mean in PySpark


I am trying to calculate a weighted mean in PySpark but am not making a lot of progress.

# Example data
df = sc.parallelize([
    ("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
    ("b", 2, 2), ("b", 5, 4), ("c", 1, -1)
]).toDF(["k", "v1", "v2"])
df.show()

import numpy as np

def weighted_mean(workclass, final_weight):
    return np.average(workclass, weights=final_weight)

weighted_mean_udaf = pyspark.sql.functions.udf(weighted_mean,
    pyspark.sql.types.IntegerType())

But when I try to execute this code:

df.groupby('k').agg(weighted_mean_udaf(df.v1,df.v2)).show() 

I get the following error:

u"expression 'pythonudf' neither present in group by, nor aggregate function. add group or wrap in first() (or first_value) if don't care value 

My question is: can I specify a custom function (taking multiple arguments) as the argument to agg? If not, is there an alternative way to perform operations like a weighted mean after grouping by a key?

What you need is a user defined aggregation function (UDAF, which works on pyspark.sql.GroupedData and is not supported in PySpark), not a user defined function (UDF, which works on pyspark.sql.DataFrame).
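To make the distinction concrete, here is a minimal sketch (not part of the original code, and using the df from the question) of what a UDF can do: it is applied row by row, so it fits in withColumn or select, but agg has no way to fold a whole group of rows through it.

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Illustration only: a UDF runs per row, so this works...
times_weight = F.udf(lambda v, w: float(v * w), DoubleType())
df.withColumn("weighted", times_weight(df.v1, df.v2)).show()
# ...but passing the same UDF to agg() fails, because it is not an aggregate.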

Because in PySpark you cannot create your own UDAF, and the supplied UDAFs cannot resolve your issue, you may need to go back to the RDD world:

from numpy import sum

def weighted_mean(vals):
    vals = list(vals)  # save the values from the iterator
    sum_of_weights = sum(tup[1] for tup in vals)
    return sum(1. * tup[0] * tup[1] / sum_of_weights for tup in vals)

df.rdd.map(
    lambda x: (x[0], tuple(x[1:]))  # reshape to (key, (v1, v2)) so grouping works
).groupByKey().mapValues(
    weighted_mean
).collect()
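As an aside (not part of the original answer), a weighted mean specifically can also be expressed entirely with built-in DataFrame aggregates, since it is just sum(v1 * v2) / sum(v2). A minimal sketch:

from pyspark.sql import functions as F

# Weighted mean via built-in aggregates: sum(value * weight) / sum(weight).
df.groupBy("k").agg(
    (F.sum(df.v1 * df.v2) / F.sum(df.v2)).alias("weighted_mean")
).show()

For the example data this gives 29/6 ≈ 4.83 for key "a", matching the RDD version above.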
