読者です 読者をやめる 読者になる 読者になる

Screaming Loud

研究・プログラミングなど気づいたことをメモをしています

K-means

python プログラミング 機械学習

今回は"k-means"(k-平均法)を実装してみました.
k-meansに関してはネット上でたくさん説明されています.
簡単に説明すると,
前提:クラスタ数は与えられている
1,適当に各データ点をクラスに割り振る.
2,各クラスタの重心を求める
3,各データ点を求めた一番近い重心に割り振る.
4,2-3を収束するまで繰り返す.
とこんな感じです.

実装

import random
import numpy as np
from optparse import OptionParser

#D = [[0,1,3],[8,3,1.5],[10,2,1],[2,7,4],[7,1,5]]
D = [[0,1],[3,8],[3,1],[5,10],[6,1],[11,2],[2,11]]

class Iter(object):
    def __init__(self,array,k):
	self.array = array
	self.repre = []
	self.clusters = {}
	self.cluster_maker(range(1,k+1))
	self.dim = len(self.array[0])
	self.n_clus = k
	self.random_assign()

    def cluster_maker(self,index):
	self.clusters.clear()
	for i in index:
	    self.clusters.setdefault(str(i),[])

    def random_assign(self):
	'''assign cluster to array features randomly
	lists transform to ndarray'''
	for feat in self.array:
	    index = str(random.randint(1,self.n_clus))
	    self.clusters[index].append(feat)
	for index in self.clusters:
	    self.clusters[index] = np.asarray(self.clusters[index])

    def distance(self,x,y):
	'''it calculate the distance,
	but we do not need exact distance,
	so we do not extract the square root.'''
	sum = 0
	for d in range(self.dim):
	    sum += (x[d]-y[d])**2
	return sum

    def centroid(self,array):
	'''calculate centroids'''
	return np.mean(array,axis=0)
	
    def representative(self):
	'''calculate representative point'''
	self.repre = []
	for cluster in self.clusters:
	    cent = self.centroid(self.clusters[cluster])
	    self.repre.append(cent)#.tolist())

    def calc_min(self,point):
	'''calculate the distance and return the nearest point'''
	return min([(self.distance(x,point),x) for x in self.repre])

    def near_assign(self):
	'''calclate the nearest representative point'''
	self.cluster_maker(self.repre)
	for feat in self.array:
	    min_ix = str(self.calc_min(feat)[1])
	    self.clusters[min_ix].append(feat)
	for index in self.clusters:
	    self.clusters[index] = np.asarray(self.clusters[index])

def check_end(dicA,dicB):
    '''check whether changes exist or not'''
    A = dicA.keys()
    B = dicB.keys()
    if A==B:
	for key in dicA:
	    if dicA[key].all()!=dicB[key].all():
		return False
	return True
    else:
	return False

def main(array,cluster):
    '''main part of the calculation'''
    it = Iter(D,cluster)
    new = it.clusters
    old = 0
    import copy
    while(True):
	it.representative()
	old = copy.copy(new)
	it.near_assign()
	new = it.clusters
	if check_end(new,old):
	    break
    return new
    
if __name__ == '__main__':
    usage = 'usage: %prog [options]'
    parser = OptionParser(usage)
    parser.add_option('-k',dest='num_k',type='str',default=2)
    (options,args) = parser.parse_args()
    
    cluster = options.num_k
    print main(D,cluster)

追記:
本ブログではnumpyをちゃんと使っていない.numpyをちゃんと使ったもの.
numpyを使ってK-meansを書いてみた - 唯物是真 @Scaled_Wurm