# embedding_maker.py
# Names exported by a star-import of this module.
__all__ = [
    'load_embedding',
    'load_vocab',
    'encoding_and_padding',
    'get_embedding_model',
]

import bz2
import json
import os
from tqdm import tqdm
import numpy as np
import pkg_resources
from gensim.models import FastText

from soynlp.hangle import character_is_korean

def pad_sequences(sequences,
                  maxlen=None,
                  dtype='float32',
                  padding='pre',
                  truncating='pre',
                  value=0.):
    """Pad or truncate every sequence to a common length.

    :sequences: sized container of sized sequences (lists or arrays).
    :maxlen: int, target length; defaults to the longest sequence
        (0 for an empty batch).
    :dtype: dtype of the returned array.
    :padding: 'pre' or 'post', pad before or after each sequence.
    :truncating: 'pre' or 'post', drop values from the beginning or the
        end of sequences longer than ``maxlen``.
    :value: fill value used for the padded positions.
    :return: array of shape ``(len(sequences), maxlen) + sample_shape``,
        where ``sample_shape`` is the per-step shape of the first
        non-empty sequence.
    :raises ValueError: if ``sequences`` or one of its items has no
        ``__len__``, if ``padding``/``truncating`` is not understood, or
        if a sequence's per-step shape differs from ``sample_shape``.
    """
    if not hasattr(sequences, '__len__'):
        raise ValueError('`sequences` must be iterable.')
    lengths = []
    for seq in sequences:
        if not hasattr(seq, '__len__'):
            raise ValueError('`sequences` must be a list of iterables. '
                             'Found non-iterable: ' + str(seq))
        lengths.append(len(seq))

    num_samples = len(sequences)
    if maxlen is None:
        # An empty batch has no longest sequence; np.max([]) would raise.
        maxlen = max(lengths) if lengths else 0

    # Take the sample shape from the first non-empty sequence; consistency
    # of the remaining sequences is checked in the main loop below.
    sample_shape = tuple()
    for seq in sequences:
        if len(seq) > 0:
            sample_shape = np.asarray(seq).shape[1:]
            break

    x = np.full((num_samples, maxlen) + sample_shape, value, dtype=dtype)
    for idx, seq in enumerate(sequences):
        if not len(seq):
            continue  # empty list/array was found
        if truncating == 'pre':
            # Not seq[-maxlen:]: with maxlen == 0 that slice would be the
            # whole sequence instead of an empty one.
            trunc = seq[max(len(seq) - maxlen, 0):]
        elif truncating == 'post':
            trunc = seq[:maxlen]
        else:
            raise ValueError('Truncating type "%s" not understood' %
                             truncating)

        trunc = np.asarray(trunc, dtype=dtype)
        # An empty slice (only possible when maxlen == 0) carries no
        # per-step shape, so there is nothing to compare.
        if len(trunc) and trunc.shape[1:] != sample_shape:
            raise ValueError(
                'Shape of sample %s of sequence at position %s is different from expected shape %s'
                % (trunc.shape[1:], idx, sample_shape))

        if padding not in ('post', 'pre'):
            raise ValueError('Padding type "%s" not understood' % padding)
        if not len(trunc):
            continue  # nothing to copy into a zero-length row
        if padding == 'post':
            x[idx, :len(trunc)] = trunc
        else:
            x[idx, -len(trunc):] = trunc
    return x


def load_embedding(embeddings_file):
    """Read embedding weights from ``embeddings_file`` via ``np.load``.

    :embeddings_file: anything ``np.load`` accepts (path or file object).
    :return: whatever ``np.load`` returns for that file.
    """
    weights = np.load(embeddings_file)
    return weights


def load_vocab(vocab_path):
    """Load a token-to-index vocabulary from a JSON file.

    :vocab_path: path of a JSON file holding one object that maps each
        token to its index.
    :return: ``(word2idx, idx2word)`` where ``word2idx`` is the parsed
        mapping and ``idx2word`` is its inverse.
    """
    # Decode explicitly as UTF-8: the platform default encoding would
    # corrupt or reject non-ASCII tokens on non-UTF-8 locales.
    with open(vocab_path, 'r', encoding='utf-8') as f:
        word2idx = json.load(f)
    idx2word = {index: word for word, index in word2idx.items()}
    return word2idx, idx2word


def encoding_and_padding(word2idx_dic,sequences,fasttext, **params):
    """
    1. making item to idx
    2. padding
    :word2idx_dic: mapping from token to index; must contain '__PAD__',
        whose index is used as the padding value.
    :sequences: list of sequences (presumably jamo-decomposed strings in
        which every Korean character spans 3 items -- TODO confirm).
    :fasttext: model exposing ``wv.most_similar(token)``; the nearest
        token it reports replaces a token missing from ``word2idx_dic``.
    :maxlen: int, maximum length
    :dtype: type to cast the resulting sequence.
    :padding: 'pre' or 'post', pad either before or after each sequence.
    :truncating: 'pre' or 'post', remove values from sequences larger than
        maxlen either in the beginning or in the end of the sequence
    :value: overwritten with ``word2idx_dic['__PAD__']`` when encoding
        succeeds.
    :return: the result of ``pad_sequences`` on the encoded sequences.
    """
    seq_idx = []
    try:
        for sentence in sequences:
            encoded = []
            pos = 0
            while pos < len(sentence):
                if character_is_korean(sentence[pos]):
                    # Korean tokens are looked up as 3-item chunks.
                    token = sentence[pos:pos + 3]
                    step = 3
                else:
                    token = sentence[pos]
                    step = 1

                if token in word2idx_dic:
                    index = word2idx_dic[token]
                else:
                    # Only query the model for out-of-vocabulary tokens:
                    # the similarity search is expensive and may raise.
                    nearest = fasttext.wv.most_similar(token)[0][0]
                    # May be None when the nearest token is unknown too.
                    index = word2idx_dic.get(nearest)
                encoded.append(index)
                pos += step
            seq_idx.append(encoded)

        params['value'] = word2idx_dic['__PAD__']
    except Exception as e:
        # NOTE(review): best-effort behaviour kept from the original --
        # on failure only the sentences encoded so far are padded, and the
        # pad value stays at the caller's/default value.
        print('에러발생', e)

    return pad_sequences(seq_idx, **params)


def get_embedding_model(name='fee_prods', path='data/embedding'):
    """Load the embedding weights and vocabulary shipped in the 'dsc' package.

    :name: sub-directory of ``path`` holding 'weights.np' and 'idx.json'.
    :path: resource directory inside the 'dsc' package.
    :return: ``(weights, word2idx)`` -- the result of ``load_embedding`` on
        'weights.np' and the token-to-index mapping from 'idx.json'.
    """
    # Resource names are '/'-separated regardless of platform, so they must
    # not be built with os.path.join (which yields '\\' on Windows).
    import posixpath

    weights = pkg_resources.resource_filename(
        'dsc', posixpath.join(path, name, 'weights.np'))
    w2idx = pkg_resources.resource_filename(
        'dsc', posixpath.join(path, name, 'idx.json'))
    word2idx, _ = load_vocab(w2idx)
    return load_embedding(weights), word2idx