k-means|| on Hadoop


k-means is a classic clustering algorithm and will not be described in detail here. Both the quality of the result and the convergence time of k-means depend heavily on the K initial centers. k-means++ improves the initialization, but its seeding procedure cannot be parallelized, so it struggles on large data sets. To address this, researchers at Stanford proposed k-means||; see reference 1 for the detailed analysis. k-means|| changes the sampling strategy: instead of drawing a single point per pass as k-means++ does, each pass over the data samples O(k) points, and the pass is repeated roughly O(log n) times, yielding a set of O(k log n) sampled points that approximates the optimal solution within a constant factor. These O(k log n) points are then clustered down to k points, and those k points are fed to Lloyd's iterations as the initial centers. Experiments show that O(log n) passes are not needed in practice; about 5 passes already give a good set of initial centers. Because each sampling pass can be parallelized, this post attempts to implement the initialization with MapReduce.
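Before the MapReduce version, here is a minimal single-machine sketch of one sampling pass, assuming points and centroids are in-memory lists of numeric vectors and an oversampling factor of 5 to match the code below (the paper samples proportionally to the squared distance to the nearest center; the implementation in this post uses the plain Euclidean distance):

import random

def min_dist(point, centroids):
    # distance from a point to its nearest current center
    return min(sum((a - b) ** 2 for a, b in zip(point, c)) ** 0.5
               for c in centroids)

def sample_pass(points, centroids, ell=5):
    # phase 1: total distance of all points to the current centers
    # (this is what the Compute_dist job below produces)
    total = sum(min_dist(p, centroids) for p in points)
    # phase 2: keep each point independently with probability
    # ell * d(x, C) / total (this is what the Update_centriods job produces)
    new_centers = [p for p in points
                   if random.random() < ell * min_dist(p, centroids) / total]
    return centroids + new_centers

The two phases map directly onto the two mrjob classes shown later: one job sums the distances, the other does the probabilistic sampling.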

This post uses the mrjob framework, which makes it convenient to write MapReduce jobs in Python.

Below is the content of the main (driver) file:

from compute_dist import Compute_dist
from update_centriods import Update_centriods
import sys


def get_sum_dist(job, runner):
    # Compute_dist emits a single (None, sum) pair; return the sum.
    value = None
    for line in runner.stream_output():
        key, value = job.parse_output_line(line)
    return value


def write_sum_dist_to_disk(sum_dist, fileName):
    # Overwrite the file with the latest total distance.
    with open(fileName, "w") as sumdistFile:
        sumdistFile.write(str(sum_dist))


def get_centroids(job, runner):
    # Keep only the points the sampling job marked with value 1.
    centroids = []
    for line in runner.stream_output():
        key, value = job.parse_output_line(line)
        if value == 1:
            centroids.append(key)
    return centroids


def write_centroids_to_disk(centroids, fileName):
    # Append the newly sampled centers to the centroids file.
    with open(fileName, "a") as centroidFile:
        for centroid in centroids:
            line = " ".join(str(x) for x in centroid).strip()
            centroidFile.write("\n" + line)


SUMDIST_FILE = "sum.txt"
CENTROIDS_FILE = "centroids.txt"

if __name__ == "__main__":
    args = sys.argv[1:]

    computeJob = Compute_dist(args=args + ["--centroids=" + CENTROIDS_FILE])

    # Five sampling passes, as discussed above.
    for i in range(5):
        with computeJob.make_runner() as computeJobRunner:
            computeJobRunner.run()

            sum_dist = get_sum_dist(computeJob, computeJobRunner)
            write_sum_dist_to_disk(sum_dist, SUMDIST_FILE)

            updaterJob = Update_centriods(args=args + ["--centroids=" + CENTROIDS_FILE,
                                                       "--sumdist=" + SUMDIST_FILE])
            with updaterJob.make_runner() as updaterJobRunner:
                updaterJobRunner.run()
                newCentroids = get_centroids(updaterJob, updaterJobRunner)
                write_centroids_to_disk(newCentroids, CENTROIDS_FILE)
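One way to run this, assuming the main file is saved under the hypothetical name driver.py: seed centroids.txt with one data point chosen uniformly at random (k-means|| starts from a single uniform sample), then launch the script like any mrjob job, for example python driver.py -r hadoop hdfs:///path/to/points.txt. sum.txt and centroids.txt live on the machine that runs the driver; each of the five passes runs the two auxiliary jobs shown next in sequence and appends the newly sampled points to centroids.txt.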

Below are the two auxiliary files (the two MRJob classes):

from mrjob.job import MRJob
from mrjob.step import MRStep


class Compute_dist(MRJob):
    # First job of a sampling pass: sum the distances of all points
    # to their nearest current center.

    def distEclud(self, vecA, vecB):
        # Euclidean distance between two vectors
        dist_int = 0
        for i in range(len(vecA)):
            dist_int = dist_int + (vecA[i] - vecB[i]) ** 2
        return dist_int ** 0.5

    def configure_args(self):
        super(Compute_dist, self).configure_args()
        # centroids.txt is shipped to every task as a local file
        self.add_file_arg("--centroids")

    def get_centroids(self):
        # Current centers, one per line as space-separated integers.
        centroids = []
        with open(self.options.centroids) as centroidsFile:
            for line in centroidsFile:
                centroids.append([int(x) for x in line.strip().split()])
        return centroids

    def mapper_get_sum_dist(self, _, line):
        # Each input line is one data point given as space-separated integers.
        point = [int(x) for x in line.strip().split()]
        centroids = self.get_centroids()
        min_dist = min(self.distEclud(c, point) for c in centroids)
        yield None, min_dist

    def reducer_get_sum_dist(self, _, dist):
        # Single reducer: total distance over the whole data set.
        yield None, sum(dist)

    def steps(self):
        return [MRStep(mapper=self.mapper_get_sum_dist,
                       reducer=self.reducer_get_sum_dist)]


if __name__ == "__main__":
    Compute_dist.run()

        
from mrjob.job import MRJob
from mrjob.step import MRStep
import random


class Update_centriods(MRJob):
    # Second job of a sampling pass: keep each point with probability
    # proportional to its distance to the nearest current center.

    def distEclud(self, vecA, vecB):
        # Euclidean distance between two vectors
        dist_int = 0
        for i in range(len(vecA)):
            dist_int = dist_int + (vecA[i] - vecB[i]) ** 2
        return dist_int ** 0.5

    def configure_args(self):
        super(Update_centriods, self).configure_args()
        self.add_file_arg("--centroids")
        self.add_file_arg("--sumdist")

    def get_centroids(self):
        # Current centers, one per line as space-separated integers.
        centroids = []
        with open(self.options.centroids) as centroidsFile:
            for line in centroidsFile:
                centroids.append([int(x) for x in line.strip().split()])
        return centroids

    def get_sum_dist(self):
        # Total distance produced by Compute_dist in the previous step.
        with open(self.options.sumdist) as sumdistFile:
            return float(sumdistFile.readline().strip())

    def mapper_sample_centroids(self, _, line):
        dist = self.get_sum_dist()
        point = [int(x) for x in line.strip().split()]
        centroids = self.get_centroids()
        min_dist = min(self.distEclud(c, point) for c in centroids)
        # Oversampling factor 5: keep the point with probability
        # 5 * d(x, C) / total distance.
        prob = (5 * min_dist) / dist
        if random.random() < prob:
            yield point, 1
        else:
            yield point, 0

    def steps(self):
        return [MRStep(mapper=self.mapper_sample_centroids)]


if __name__ == "__main__":
    Update_centriods.run()
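The jobs above only build the oversampled set in centroids.txt. As described at the beginning, this set still has to be clustered down to k points before being handed to Lloyd's iterations. A minimal local sketch of that last step, assuming the sample fits in memory and using a plain unweighted k-means++ seeding (the paper additionally weights each sampled point by the number of data points closest to it):

import random


def dist2(a, b):
    # squared Euclidean distance
    return sum((x - y) ** 2 for x, y in zip(a, b))


def reduce_to_k(sample, k):
    # k-means++ seeding on the in-memory sample: pick one point uniformly,
    # then repeatedly pick the next center with probability proportional
    # to its squared distance to the nearest already-chosen center.
    centers = [random.choice(sample)]
    while len(centers) < k:
        d2 = [min(dist2(p, c) for c in centers) for p in sample]
        total = sum(d2)
        r = random.random() * total
        acc = 0.0
        for p, d in zip(sample, d2):
            acc += d
            if acc >= r:
                centers.append(p)
                break
    return centers


if __name__ == "__main__":
    with open("centroids.txt") as f:
        sample = [[int(x) for x in line.split()] for line in f if line.strip()]
    k = 10  # hypothetical target number of clusters
    print(reduce_to_k(sample, k))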

References:

1. Bahman Bahmani, Benjamin Moseley, Andrea Vattani, Ravi Kumar, Sergei Vassilvitskii. Scalable K-Means++. Proceedings of the VLDB Endowment, 2012.


Original post: https://www.cnblogs.com/londist/p/11151830.html
