I am trying to calculate a weighted mean in PySpark but am not making a lot of progress.
# example data
df = sc.parallelize([
    ("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
    ("b", 2, 2), ("b", 5, 4), ("c", 1, -1)
]).toDF(["k", "v1", "v2"])
df.show()

import numpy as np
import pyspark.sql.functions
import pyspark.sql.types

def weighted_mean(workclass, final_weight):
    return np.average(workclass, weights=final_weight)

weighted_mean_udaf = pyspark.sql.functions.udf(
    weighted_mean, pyspark.sql.types.IntegerType())
But when I try to execute this code:
df.groupby('k').agg(weighted_mean_udaf(df.v1, df.v2)).show()
I am getting the following error:
u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get."
My question is: can I specify a custom function (taking multiple arguments) as an argument to agg? If not, is there an alternative to perform operations such as a weighted mean after grouping by a key?
What you need is a user defined aggregation function (UDAF, which works on pyspark.sql.GroupedData but is not supported in PySpark), not a user defined function (UDF, which works on a pyspark.sql.DataFrame).
Because you cannot create your own UDAF in PySpark, and the supplied UDAFs cannot resolve your issue, you may need to go back to the RDD world:
from numpy import sum

def weighted_mean(vals):
    vals = list(vals)  # save the values (as an iterator they could only be traversed once)
    sum_of_weights = sum(tup[1] for tup in vals)
    return sum(1. * tup[0] * tup[1] / sum_of_weights for tup in vals)

df.map(  # on Spark 2+, use df.rdd.map instead
    lambda x: (x[0], tuple(x[1:]))  # reshape to (key, values) so grouping works
).groupByKey().mapValues(
    weighted_mean
).collect()
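Spark aside, the per-key arithmetic can be sanity-checked in plain Python/NumPy on the example data above; this sketch (the `rows` list and `grouped` dict are just for illustration) emulates the reshape → groupByKey → mapValues pipeline:

```python
import numpy as np

# the same example data as above: (key, value, weight)
rows = [("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
        ("b", 2, 2), ("b", 5, 4), ("c", 1, -1)]

# group the (value, weight) pairs by key, like groupByKey()
grouped = {}
for key, v1, v2 in rows:
    grouped.setdefault(key, []).append((v1, v2))

# apply the weighted mean per key, like mapValues()
result = {k: float(np.average([v for v, _ in pairs],
                              weights=[w for _, w in pairs]))
          for k, pairs in grouped.items()}
print(result)  # {'a': 4.833..., 'b': 4.0, 'c': 1.0}
```

Note also that for this particular aggregate you can stay in the DataFrame API without any UDAF, since a weighted mean is just a ratio of two built-in sums: `df.groupby('k').agg((F.sum(df.v1 * df.v2) / F.sum(df.v2)).alias('wmean'))` with `from pyspark.sql import functions as F`.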