scala - Hierarchical agglomerative clustering in Spark
Problem description
I'm working on a project using Spark and Scala, and I'm looking for a hierarchical clustering algorithm similar to scipy.cluster.hierarchy.fcluster or sklearn.cluster.AgglomerativeClustering that will work on large amounts of data.
Spark's MLlib implements Bisecting k-means, which takes the number of clusters as input. Unfortunately, in my case I don't know the number of clusters, and I would much rather use some distance threshold as the input parameter, as is possible in the two Python implementations above.
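For reference, the distance-threshold behaviour being asked about looks like this in scipy (a minimal sketch with made-up data and a made-up threshold of 2.0):

```python
# Cut the dendrogram at a distance threshold instead of asking for a
# cluster count: criterion='distance' with t=2.0. Data is illustrative.
from scipy.cluster.hierarchy import linkage, fcluster

points = [[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]]
Z = linkage(points, method='single')
labels = fcluster(Z, t=2.0, criterion='distance')
# the two tight pairs end up in two separate clusters
```

This is the interface the question wants: no cluster count anywhere, only a distance below which points are considered to belong together.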
If anyone knows an answer, I would be very grateful.
Solution
So I ran into the same problem, and after searching high and low I found no answer, so I will post what I did here, hoping it helps someone else, and maybe someone will build on it.
The basic idea of what I did is to use Bisecting K-means recursively, continuing to split clusters in half until all points in a cluster are within a specified distance of the centroid. I was using GPS data, so I have some extra machinery to deal with that.
The first step is to create a model that cuts the data in half. I used Bisecting K-means, but I think this would work with any pyspark clustering method, as long as you can get the distance to the centroid.
import pyspark.sql.functions as f
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler

bkm = BisectingKMeans().setK(2).setSeed(1)
assembler = VectorAssembler(inputCols=['lat', 'long'], outputCol="features")
adf = assembler.transform(locAggDf)  # locAggDf contains my location info
model = bkm.fit(adf)
# predictions will have the original data plus a "prediction" col
# which assigns a cluster number
predictions = model.transform(adf)
predictions.persist()
The next step is our recursive function. The idea here is that we specify a distance from the centroid, and if any point in a cluster is farther away than that distance, we cut the cluster in half. When a cluster is tight enough to satisfy the condition, I add it to a result array that I use to build the final clustering.
def bisectToDist(model, predictions, bkm, precision, result=[]):
    centers = model.clusterCenters()
    # row[0] is the predicted cluster num, row[1] is the unit,
    # row[2] is the point's lat, row[3] is the point's long
    # centers[row[0]] is the lat/long of the center:
    # centers[row[0]][0] = lat, centers[row[0]][1] = long
    distUdf = f.udf(
        lambda row: getDistWrapper((centers[row[0]][0], centers[row[0]][1], row[1]),
                                   (row[2], row[3], row[1])),
        FloatType())  # getDistWrapper is how I calculate the distance of lat
                      # and long, but you can define any distance metric
    predictions = predictions.withColumn('dist', distUdf(
        f.struct(predictions.prediction, predictions.encodedPrecisionUnit,
                 predictions.lat, predictions.long)))

    # create a df of all rows that were in clusters that had a point
    # outside of the threshold
    toBig = predictions.join(
        predictions.groupby('prediction').agg({"dist": "max"}).filter(
            f.col('max(dist)') > precision).select('prediction'),
        ['prediction'], 'leftsemi')

    # this could probably be improved
    # get all cluster numbers that were too big
    listids = toBig.select("prediction").distinct().rdd.flatMap(lambda x: x).collect()

    # if all data points are within the specified distance of the centroid
    # we can return the clustering
    if len(listids) == 0:
        return predictions

    # we are assuming binary splits, so k must be 2
    # if one of the two clusters was small enough we will not have another
    # recursion call for that cluster; we must save it and return it at this
    # depth, and the cluster that was too big will be cut in half in the loop below
    if len(listids) == 1:
        ok = predictions.join(
            predictions.groupby('prediction').agg({"dist": "max"}).filter(
                f.col('max(dist)') <= precision).select('prediction'),
            ['prediction'], 'leftsemi')

    for clusterId in listids:
        # get all of the pieces that were too big
        part = toBig.filter(toBig.prediction == clusterId)
        # we now need to refit the subset of the data
        assembler = VectorAssembler(inputCols=['lat', 'long'], outputCol="features")
        adf = assembler.transform(part.drop('prediction').drop('features').drop('dist'))
        model = bkm.fit(adf)
        # predictions now holds the new subclustering and we are ready for recursion
        predictions = model.transform(adf)
        result.append(bisectToDist(model, predictions, bkm, precision, result=result))

    # return anything that was given and already good
    if len(listids) == 1:
        return ok
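getDistWrapper is left undefined in the answer; for GPS data it is presumably something like a haversine distance. A possible stand-in (the 'unit' element the answer passes in each tuple is ignored here, since it is specific to the author's data):

```python
# A possible stand-in for the undefined getDistWrapper: haversine distance
# in kilometres between two (lat, long, unit) tuples. The 'unit' element
# is ignored; it is specific to the author's data.
import math

def getDistWrapper(a, b):
    lat1, lon1 = math.radians(a[0]), math.radians(a[1])
    lat2, lon2 = math.radians(b[0]), math.radians(b[1])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    h = (math.sin(dlat / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    return 6371.0 * 2 * math.asin(math.sqrt(h))  # Earth radius ~6371 km
```

With this in place, the precision threshold passed to bisectToDist would be in kilometres.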
Finally we can call the function and build the resulting dataframe
result = []
bisectToDist(model, predictions, bkm, precision, result=result)
# drop any Nones; these can happen in recursive (non top level) calls
result = [r for r in result if r]
r = result[0]
r = r.withColumn('subIdx', f.lit(0))
result = result[1:]
idx = 1
for r1 in result:
    r1 = r1.withColumn('subIdx', f.lit(idx))
    r = r.unionByName(r1)
    idx = idx + 1
# each of the subclusters will have a 0 or 1 classification;
# in order to make it 0 - n I added the following
r = r.withColumn('delta', r.subIdx * 100 + r.prediction)
r = r.withColumn('delta', r.delta - f.lag(r.delta, 1).over(Window.orderBy("delta"))).fillna(0)
r = r.withColumn('ddelta', f.when(r.delta != 0, 1).otherwise(0))
r = r.withColumn('spacialLocNum', f.sum('ddelta').over(Window.orderBy(['subIdx', 'prediction'])))
# spacialLocNum should be the final clustering
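Outside of Spark, the same split-until-tight recursion can be sketched in plain Python. Everything below is illustrative: a toy 2-means with deterministic farthest-pair initialization stands in for Bisecting K-means, and plain Euclidean distance stands in for the GPS distance.

```python
# Toy version of the recursion: split a point set in two and keep splitting
# any cluster whose farthest point is more than `threshold` from its centroid.
import math

def centroid(pts):
    n = len(pts)
    return (sum(p[0] for p in pts) / n, sum(p[1] for p in pts) / n)

def dist(a, b):
    return math.hypot(a[0] - b[0], a[1] - b[1])

def two_means(pts, iters=20):
    # deterministic init: start from the two mutually farthest points
    c1, c2 = max(((a, b) for a in pts for b in pts), key=lambda ab: dist(*ab))
    for _ in range(iters):
        g1 = [p for p in pts if dist(p, c1) <= dist(p, c2)]
        g2 = [p for p in pts if dist(p, c1) > dist(p, c2)]
        if not g1 or not g2:
            return [pts]  # degenerate split; give up on splitting
        c1, c2 = centroid(g1), centroid(g2)
    return [g1, g2]

def bisect_to_dist(pts, threshold):
    c = centroid(pts)
    if len(pts) == 1 or max(dist(p, c) for p in pts) <= threshold:
        return [pts]  # tight enough: this is a final cluster
    out = []
    for part in two_means(pts):
        if len(part) == len(pts):  # could not split further
            return [pts]
        out.extend(bisect_to_dist(part, threshold))
    return out
```

This mirrors the answer's recursion; the Spark version just does the same thing per cluster with DataFrames and a UDF for the distance.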
Admittedly this is rather convoluted and slow, but it does get the job done. Hope this helps!