from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

import collections
import math
import os
import random
import zipfile

import numpy as np
from six.moves import urllib
from six.moves import xrange
import tensorflow as tf
class BasicPatternEmbedding:
    def __init__(self):
        self.url = 'http://mattmahoney.net/dc/'
        self.data_index = 0
        self.vocabulary_size = 5000
        self.batch_size = 128
        self.embedding_size = 128  # Dimension of the embedding vector.
        self.skip_window = 1       # How many words to consider left and right.
        self.num_skips = 2         # How many times to reuse an input to generate a label.
        # We pick a random validation set to sample nearest neighbors. Here we limit the
        # validation samples to the words that have a low numeric ID, which by
        # construction are also the most frequent.
        self.valid_size = 16     # Random set of words to evaluate similarity on.
        self.valid_window = 100  # Only pick dev samples in the head of the distribution.
        # Choose 16 numbers from 0 to 99 at random.
        self.valid_examples = np.random.choice(self.valid_window, self.valid_size, replace=False)
        self.num_sampled = 64    # Number of negative examples to sample.
        self.num_steps = 10001
        self.final_embeddings = None
        self.graph = tf.Graph()

    # Download and verify the dataset file.
    def maybe_download(self, filename, expected_bytes):
        # If the dataset file is not under the current path, download it directly.
        if not os.path.exists(filename):
            filename, _ = urllib.request.urlretrieve(self.url + filename, filename)
        # Get dataset file information.
        statinfo = os.stat(filename)
        # Verify the file size.
        if statinfo.st_size == expected_bytes:
            print('Found and verified', filename)
        else:
            print(statinfo.st_size)
            raise Exception(
                'Failed to verify ' + filename + '. Can you get to it with a browser?')
        return filename

    # Read the data from the zip file into a list of strings.
    def read_data(self, filename):
        with zipfile.ZipFile(filename) as f:
            # Split on the default separators, i.e. all whitespace characters,
            # including spaces, newlines (\n), tabs (\t), etc.
            data = tf.compat.as_str(f.read(f.namelist()[0])).split()
        return data

    # Process raw inputs into a dataset.
    def build_dataset(self, words):
        # Add a placeholder for unknown words to the count list.
        count = [['UNK', -1]]
        # Count the words and add (word, frequency) pairs to the count list.
        count.extend(collections.Counter(words).most_common(self.vocabulary_size - 1))
        dictionary = dict()
        # Create a dictionary mapping each word to a serial number.
        for word, _ in count:
            dictionary[word] = len(dictionary)
        data = list()
        unk_count = 0
        # Convert the word list into a list of indices, 0 for unknown words.
        for word in words:
            if word in dictionary:
                index = dictionary[word]
            else:
                index = 0  # dictionary['UNK']
                unk_count += 1
            data.append(index)
        # Update the number of UNK occurrences.
        count[0][1] = unk_count
        # Generate a reversed dictionary by exchanging keys and values.
        reversed_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
        return data, count, dictionary, reversed_dictionary

    # Generate a training batch for the skip-gram model.
    def generate_batch(self, data):
        # Make sure the batch size and window parameters are consistent.
        assert self.batch_size % self.num_skips == 0
        assert self.num_skips <= 2 * self.skip_window
        batch = np.ndarray(shape=(self.batch_size), dtype=np.int32)
        labels = np.ndarray(shape=(self.batch_size, 1), dtype=np.int32)
        span = 2 * self.skip_window + 1  # [ skip_window target skip_window ]
        # Create a double-ended queue to use as the sliding-window buffer.
        buffer = collections.deque(maxlen=span)
        # data_index indicates the end point of the current window.
        if self.data_index + span > len(data):
            self.data_index = 0
        buffer.extend(data[self.data_index:self.data_index + span])
        self.data_index += span
        for i in range(self.batch_size // self.num_skips):
            target = self.skip_window  # Target label at the center of the buffer.
            targets_to_avoid = [self.skip_window]
            # Sample num_skips (batch, label) pairs from the current window.
            for j in range(self.num_skips):
                # Avoid sampling the same target twice.
                while target in targets_to_avoid:
                    target = random.randint(0, span - 1)
                targets_to_avoid.append(target)
                # Each batch item is the center (input) word.
                batch[i * self.num_skips + j] = buffer[self.skip_window]
                # Each label item is a context (ground-truth) word.
                labels[i * self.num_skips + j, 0] = buffer[target]
            if self.data_index == len(data):
                # Wrap around to the beginning of the data.
                buffer.extend(data[0:span])
                self.data_index = span
            else:
                buffer.append(data[self.data_index])
                self.data_index += 1
        # Backtrack a little bit to avoid skipping words at the end of a batch.
        self.data_index = self.data_index - span
        return batch, labels

    # Build the graph and train the skip-gram model.
    def train(self, data, reverse_dictionary):
        with self.graph.as_default():
            # Input data: center words and their context labels.
            train_inputs = tf.placeholder(tf.int32, shape=[self.batch_size])
            train_labels = tf.placeholder(tf.int32, shape=[self.batch_size, 1])
            valid_dataset = tf.constant(self.valid_examples, dtype=tf.int32)
            # Ops and variables pinned to the CPU.
            with tf.device('/cpu:0'):
                # Look up embeddings for inputs.
                embeddings = tf.Variable(
                    tf.random_uniform([self.vocabulary_size, self.embedding_size], -1.0, 1.0))
                # Extract the embedding vector corresponding to each input word
                # (train_inputs) from the embeddings matrix.
                embed = tf.nn.embedding_lookup(embeddings, train_inputs)
                # Construct the variables for the NCE loss.
                nce_weights = tf.Variable(
                    tf.truncated_normal([self.vocabulary_size, self.embedding_size],
                                        stddev=1.0 / math.sqrt(self.embedding_size)))
                nce_biases = tf.Variable(tf.zeros([self.vocabulary_size]))

            # Compute the average NCE loss for the batch.
            # tf.nn.nce_loss automatically draws a new sample of the negative labels each
            # time we evaluate the loss.
            loss = tf.reduce_mean(
                tf.nn.nce_loss(weights=nce_weights,
                               biases=nce_biases,
                               labels=train_labels,
                               inputs=embed,
                               num_sampled=self.num_sampled,
                               num_classes=self.vocabulary_size))

            # Construct the SGD optimizer using a learning rate of 1.0.
            optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)
            # Compute the cosine similarity between minibatch examples and all embeddings.
            norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))
            normalized_embeddings = embeddings / norm
            valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
            similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)
            # Add variable initializer.
            init = tf.global_variables_initializer()

        with tf.Session(graph=self.graph) as session:
            init.run()
            average_loss = 0
            for step in xrange(self.num_steps):
                batch_inputs, batch_labels = self.generate_batch(data)
                feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}
                # We perform one update step by evaluating the optimizer op (including it
                # in the list of returned values for session.run()).
                _, loss_val = session.run([optimizer, loss], feed_dict=feed_dict)
                average_loss += loss_val

                if step % 2000 == 0:
                    if step > 0:
                        average_loss /= 2000
                    # The average loss is an estimate of the loss over the last 2000 batches.
                    print('Average loss at step ', step, ': ', average_loss)
                    average_loss = 0

                # Output the eight most similar words for each validation word.
                if step % 10000 == 0:
                    sim = similarity.eval()
                    for i in xrange(self.valid_size):
                        valid_word = reverse_dictionary[self.valid_examples[i]]
                        top_k = 8  # Number of nearest neighbors.
                        nearest = (-sim[i, :]).argsort()[1:top_k + 1]
                        log_str = 'Nearest to %s:' % valid_word
                        for k in xrange(top_k):
                            close_word = reverse_dictionary[nearest[k]]
                            log_str = '%s %s,' % (log_str, close_word)
                        print(log_str)

            self.final_embeddings = normalized_embeddings.eval()

    # Visualize the embeddings with a 2-D projection.
    def plot_with_labels(self, low_dim_embs, labels, filename='tsne.png'):
        assert low_dim_embs.shape[0] >= len(labels), 'More labels than embeddings'
        plt.figure(figsize=(18, 18))  # in inches
        for i, label in enumerate(labels):
            x, y = low_dim_embs[i, :]
            plt.scatter(x, y)
            plt.annotate(label,
                         xy=(x, y),
                         xytext=(5, 2),
                         textcoords='offset points',
                         ha='right',
                         va='bottom')
        plt.show()
        # plt.savefig(filename)
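
# A minimal sketch (not called anywhere above) showing what generate_batch
# produces on a toy corpus. The function name and the toy word list are purely
# illustrative and not part of the original code; with skip_window=1 and
# num_skips=2, each center word should appear twice in `batch`, paired with its
# left and right neighbors in `labels`.
def demo_generate_batch():
    model = BasicPatternEmbedding()
    model.batch_size = 8  # Keep the demo batch small and divisible by num_skips.
    toy_words = ['the', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog'] * 4
    data, count, dictionary, reverse_dictionary = model.build_dataset(toy_words)
    batch, labels = model.generate_batch(data)
    # Print each (center word -> context word) pair in the batch.
    for center, context in zip(batch, labels[:, 0]):
        print(reverse_dictionary[center], '->', reverse_dictionary[context])
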
# Signature: tf.nn.embedding_lookup(params, ids, partition_strategy='mod', name=None,
#                                   validate_indices=True, max_norm=None)
# Docstring:
# Looks up `ids` in a list of embedding tensors.
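
# A minimal end-to-end usage sketch. The corpus filename 'text8.zip', its expected
# size of 31344016 bytes, and the plot_only cutoff are assumptions borrowed from the
# common text8 setup for this dataset, not values defined anywhere in this file.
if __name__ == '__main__':
    model = BasicPatternEmbedding()
    # Download (if needed) and verify the corpus, then read it into a word list.
    filename = model.maybe_download('text8.zip', 31344016)
    words = model.read_data(filename)
    # Map words to integer IDs, keeping the vocabulary_size most frequent words.
    data, count, dictionary, reverse_dictionary = model.build_dataset(words)
    del words  # Free memory; only the integer IDs are needed from here on.
    # Train for num_steps batches and keep the L2-normalized embeddings.
    model.train(data, reverse_dictionary)
    # Project the most frequent words to 2-D with t-SNE and plot them.
    tsne = TSNE(n_components=2, init='pca')
    plot_only = 100
    low_dim_embs = tsne.fit_transform(model.final_embeddings[:plot_only, :])
    labels = [reverse_dictionary[i] for i in xrange(plot_only)]
    model.plot_with_labels(low_dim_embs, labels)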