# Public API of this module: the only name exported by a star-import.
__all__ = ['EmbFile']
import abc
import itertools
import warnings
from pathlib import Path
from typing import (Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Set, Tuple,
TypeVar, Union)
import numpy
from embfile._utils import coalesce, maybe_progbar, noop
from embfile.compression import COMPRESSION_TO_EXTENSIONS, remove_compression_extension
from embfile.core.loaders import SequentialLoader, VectorsLoader
from embfile.core.reader import EmbFileReader
from embfile.errors import IllegalOperation
from embfile.types import DType, PairsType, PathType, VectorType
from embfile.word_vector import WordVector
#: Default verbosity mode: used as the default value of the ``verbose`` argument of
#: :class:`EmbFile`, i.e. whether progress bars are shown in time-consuming operations.
DEFAULT_VERBOSE: bool = True
class EmbFile(abc.ABC):
    """
    *(Abstract class)* The base class of all the embedding files.

    Sub-classes must:

    #. ensure they set attributes :attr:`.vocab_size` and :attr:`.vector_size` when a file
       instance is created
    #. implement a :class:`~embfile.core.EmbFileReader` for the format and implement
       the abstract method :meth:`_reader`
    #. implement the abstract method :meth:`_close`
    #. implement the abstract class method :meth:`_create`
    #. *(optionally)* implement a :class:`~embfile.core.loaders.VectorsLoader` (if they can improve
       upon the default loader) and override :meth:`_loader`, which is the hook called by
       :meth:`loader`
    #. *(optionally)* implement a :class:`~embfile.core.EmbFileCreator` for the format and set
       the class constant :attr:`.Creator`

    Args:
        path (Path):
            path of the embedding file (possibly compressed)
        out_dtype (numpy.dtype):
            all the vectors will be converted to this data type. The sub-class
            is responsible to set a suitable default value (``None`` is passed as-is to
            ``numpy.dtype``, which falls back to numpy's default dtype).
        verbose (bool):
            whether to show a progress bar by default in all time-consuming operations

    Attributes:
        path (Path):
            path of the embedding file
        vocab_size (int or ``None``):
            number of words in the file (can be ``None`` for some ``TextEmbFile``)
        vector_size (int):
            length of the vectors (``-1`` until the sub-class sets it)
        verbose (bool):
            whether to show a progress bar by default in all time-consuming operations
        closed (bool):
            True if the file was closed

    .. automethod:: _reader
    .. automethod:: _close
    """
    DEFAULT_EXTENSION: str

    def __init__(self, path: PathType,
                 out_dtype: Optional[DType] = None,
                 verbose: bool = DEFAULT_VERBOSE):
        self.path = Path(path)
        self.out_dtype = numpy.dtype(out_dtype)
        self.verbose = verbose
        # Placeholders: sub-classes are expected to overwrite these two attributes.
        self.vocab_size: Optional[int] = None
        self.vector_size = -1
        # Readers and loaders handed out by this file; they are closed by close().
        self._objects_to_close: List[Union[EmbFileReader, VectorsLoader]] = []
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __repr__(self):
        classname = self.__class__.__name__
        return '{} (\n' \
               '  path = {},\n' \
               '  vocab_size = {},\n' \
               '  vector_size = {}\n' \
               ')'.format(classname, self.path, self.vocab_size, self.vector_size)

    @abc.abstractmethod
    def _close(self) -> None:
        """ *(Abstract method)* Releases any resources used by the EmbFile. """

    def close(self) -> None:
        """
        Releases all the open resources linked to this file, including the opened readers
        and loaders. Calling it on an already closed file is a no-op.
        """
        if self.closed:
            # Already closed (e.g. explicit close() inside a ``with`` block):
            # do not call the format-specific _close() a second time.
            return
        for obj in self._objects_to_close:
            obj.close()
        self._objects_to_close.clear()
        self._close()
        self.closed = True

    @abc.abstractmethod
    def _reader(self) -> EmbFileReader:
        """ *(Abstract method)* Returns a new reader for the file which allows to iterate
        efficiently the word-vectors inside it. Called by :meth:`reader`. """

    def reader(self) -> EmbFileReader:
        """
        Creates and returns a new file reader. When the file is closed, all the still opened
        readers are closed automatically.

        Raises:
            IllegalOperation: if the file is closed
        """
        if self.closed:
            raise IllegalOperation('attempted to use a closed file')
        reader = self._reader()
        self._objects_to_close.append(reader)
        return reader

    def _loader(self, words: Iterable[str], missing_ok: bool = True,
                verbose: Optional[bool] = None) -> 'VectorsLoader':
        """
        Builds the loader returned by :meth:`loader`. The default implementation returns a
        :class:`SequentialLoader` and ignores ``verbose``; sub-classes can override this
        method to provide a more efficient loader.
        """
        return SequentialLoader(self, words, missing_ok)

    def loader(self, words: Iterable[str], missing_ok: bool = True,
               verbose: Optional[bool] = None) -> 'VectorsLoader':
        """
        Returns a :class:`~embfile.core.loaders.VectorsLoader`, an iterator that looks for the
        provided words in the file and yields available (word, vector) pairs one by one.
        If ``missing_ok=True`` (default), provides the set of missing words in the
        property ``missing_words`` (once the iteration ends).

        See :class:`embfile.core.VectorsLoader` for more info.

        Example:
            You should use a loader when you need to load many vectors in some custom data structure
            and you don't want to waste memory (e.g. build_matrix uses it to load the vectors
            directly into the matrix)::

                data_structure = MyCustomStructure()
                with file.loader(many_words) as loader:
                    for word, vector in loader:
                        data_structure[word] = vector
                print('Number of missing words:', len(loader.missing_words))

        Raises:
            IllegalOperation: if the file is closed

        See Also:
            :meth:`load`
            :meth:`find`
        """
        if self.closed:
            raise IllegalOperation('attempted to use a closed file')
        loader = self._loader(words, missing_ok, verbose=verbose)
        self._objects_to_close.append(loader)
        return loader

    def words(self) -> Iterator[str]:
        """ Returns a generator over all the words in the file. """
        with self.reader() as reader:
            yield from reader

    def vectors(self) -> Iterator[VectorType]:
        """ Returns a generator over all the vectors in the file. """
        with self.reader() as reader:
            for _ in reader:
                yield reader.current_vector

    def word_vectors(self) -> Iterator[WordVector]:
        """ Returns a generator over all the (word, vector) pairs in the file. """
        with self.reader() as reader:
            for word in reader:
                yield WordVector(word, reader.current_vector)

    def _maybe_progbar(self, iterable: Iterable, enable: Optional[bool] = None, **kwargs):
        """ Wraps ``iterable`` with :func:`maybe_progbar`; ``enable=None`` means
        "use ``self.verbose``". """
        return maybe_progbar(iterable, yes=coalesce(enable, self.verbose), **kwargs)

    def to_dict(self, verbose: Optional[bool] = None) -> Dict[str, VectorType]:
        """ Returns the entire file content in a dictionary word -> vector. """
        word_vectors = self._maybe_progbar(self.word_vectors(), verbose,
                                           total=self.vocab_size, desc='Loading to dict')
        return dict(word_vectors)

    def to_list(self, verbose: Optional[bool] = None) -> List[WordVector]:
        """ Returns the entire file content in a list of :class:`WordVector`'s. """
        word_vectors = self._maybe_progbar(self.word_vectors(), verbose,
                                           total=self.vocab_size, desc='Loading to list')
        return list(word_vectors)

    def load(self, words: Iterable[str], verbose: Optional[bool] = None) -> Dict[str, VectorType]:
        """
        Loads the vectors for the input words in a ``{word: vec}`` dict, raising
        ``KeyError`` if any word is missing.

        Args:
            words:
                the words to get
            verbose:
                if None, self.verbose is used

        Returns:
            (Dict[str, VectorType]): a dictionary ``{word: vector}``

        See Also:
            :meth:`find` - it returns the set of all missing words, instead of raising
            ``KeyError``.
        """
        with self.loader(words, missing_ok=False, verbose=verbose) as loader:
            return {word: vector for word, vector in loader}

    class _FindOutput(NamedTuple):
        word2vec: Dict[str, VectorType]
        missing_words: Set[str]

    def find(self, words: Iterable[str],
             verbose: Optional[bool] = None) -> _FindOutput:  # noqa: F821
        """
        Looks for the input words in the file, return: 1) a dict ``{word: vec}``
        containing the available words and 2) a set containing the words not found.

        Args:
            words (Iterable[str]):
                the words to look for
            verbose:
                if None, self.verbose is used

        Returns:
            namedtuple:
                a namedtuple with the following fields:

                - **word2vec** (*Dict[str, VectorType]*): dictionary ``{word: vector}``
                - **missing_words** (*Set[str]*): set of words not found in the file

        See also:
            :meth:`load` - which raises KeyError if any word is not found in the file.
        """
        with self.loader(words, verbose=verbose) as loader:
            word2vec = {word: vec for word, vec in loader}
        # missing_words is read only after the iteration above has finished
        return EmbFile._FindOutput(word2vec=word2vec,  # noqa
                                   missing_words=loader.missing_words)

    def filter(self, condition: Callable[[str], bool],
               verbose: Optional[bool] = None) -> Iterator[Tuple[str, VectorType]]:
        """
        Returns a generator that yields a word vector pair for each word in the file that satisfies
        a given condition. For example, to get all the words starting with "z"::

            list(file.filter(lambda word: word.startswith('z')))

        Args:
            condition:
                a function that, given a word in input, outputs True if the word should be taken
            verbose:
                if True, a progress bar is showed (the bar is updated each time a word is read, not
                each time a word vector pair is yielded).
        """
        with self.reader() as reader:
            for word in self._maybe_progbar(reader, verbose, total=self.vocab_size,
                                            desc="Filtering word vectors"):
                if condition(word):
                    yield word, reader.current_vector

    def save_vocab(self, path: Optional[PathType] = None,
                   encoding: str = 'utf-8',
                   overwrite: bool = False,
                   verbose: Optional[bool] = None) -> Path:
        """
        Save the vocabulary of the embedding file on a text file, one word per line. By default
        the file is saved in the same directory of the embedding file, e.g.::

            /path/to/filename.txt.gz   ==>   /path/to/filename_vocab.txt

        Args:
            path:
                where to save the file (a ``str`` or a ``Path``)
            encoding:
                text encoding
            overwrite:
                if the file exists and it is True, overwrite the file
            verbose:
                if None, self.verbose is used

        Returns:
            (Path): the path to the vocabulary file

        Raises:
            FileExistsError: if the target file exists and ``overwrite`` is False
        """
        if path is None:
            # Everything after the first dot of the file name is treated as extension(s)
            basename = self.path.name.split('.')[0]
            path = self.path.parent / (basename + '_vocab.txt')
        else:
            # PathType also admits plain strings, which have no ``exists()`` method
            path = Path(path)

        if path.exists() and not overwrite:
            raise FileExistsError(path)

        with open(path, 'wt', encoding=encoding) as vocab_file:
            for word in self._maybe_progbar(self.words(), verbose, total=self.vocab_size,
                                            desc='Saving vocabulary'):
                vocab_file.write(word + '\n')
        return path

    @classmethod
    @abc.abstractmethod
    def _create(cls, out_path: Path,
                word_vectors: Iterable[Tuple[str, VectorType]],
                vector_size: int,
                vocab_size: Optional[int],
                compression: Optional[str] = None,
                verbose: bool = True, **format_kwargs) -> None:
        """
        The core method that actually writes word vectors to disk and it's format-specific.
        This method is called by the public ``create`` method after it performs boring
        args checking and normalization.

        Note that ``vocab_size`` can be ``None``: it is up to the specific implementation
        to treat it as an error or not.

        For implementors:

        #. replace the generic ``**format_kwargs`` with format-specific arguments
        #. you can safely assume ``out_path.exists() is False``
        #. you should warn the user if the provided ``vocab_size`` is not equal to the
           actual number of written word vectors (use :func:`warn_if_wrong_vocab_size`)
        #. should raise ValueError if a vector has a different size than expected
           (use :func:`check_vector_size`)
        """

    @classmethod
    def create(cls, out_path: PathType,
               word_vectors: PairsType,
               vocab_size: Optional[int] = None,
               compression: Optional[str] = None,
               verbose: bool = True, overwrite: bool = False,
               **format_kwargs) -> None:
        """
        Creates a file on disk containing the provided word vectors.

        Args:
            out_path:
                path to the created file
            word_vectors (Dict[str, VectorType] or Iterable[Tuple[str, VectorType]]):
                it can be an iterable of word vector tuples or a dictionary ``word -> vector``;
                the word vectors are written in the order determined by the iterable object.
            vocab_size:
                it must be provided if ``word_vectors`` has no ``__len__`` and the specific-format
                creator needs to know a priori the vocabulary size; in any case, the creator
                should check at the end that the provided ``vocab_size`` matches the actual length
                of ``word_vectors``
            compression:
                valid values are: ``"bz2"|"bz", "gzip"|"gz", "xz"|"lzma", "zip"``
            verbose:
                if True, show progress bars and information
            overwrite:
                overwrite the file if it already exists
            format_kwargs:
                format-specific arguments

        Raises:
            FileExistsError: if ``out_path`` exists and ``overwrite`` is False
            TypeError: if ``word_vectors`` is neither a dict nor an iterable
            ValueError: if ``word_vectors`` is empty (the vector size cannot be inferred)
        """
        echo = print if verbose else noop

        out_path = Path(out_path)
        if out_path.exists():
            if not overwrite:
                raise FileExistsError('the file %s already exists' % out_path)
            out_path.unlink()
            echo('the file %s already exists and overwriting is enabled, '
                 'so it was removed' % out_path)

        # if ``word_vectors`` has length, we use that as ``vocab_size``
        try:
            actual_vocab_size = len(word_vectors)  # type: ignore
        except TypeError:
            pass  # no __len__ (e.g. a generator): keep the user-provided vocab_size
        else:
            if vocab_size and vocab_size != actual_vocab_size:
                warnings.warn('you provided vocab_size=%d but the actual vocab_size is %d; we will '
                              'use the actual vocab_size' % (vocab_size, actual_vocab_size))
            vocab_size = actual_vocab_size

        pairs: Iterable[Tuple[str, VectorType]]  # for mypy
        if isinstance(word_vectors, dict):
            pairs = word_vectors.items()
        elif isinstance(word_vectors, Iterable):
            pairs = word_vectors
        else:
            raise TypeError('word_vectors is neither a dict nor an iterable: %r' % word_vectors)

        # To get [vector_size] we have to "glance" at the first vector in [pairs];
        # in case [pairs] is an iterator, [glance_first_element] returns a new iterable
        # (built with itertools.chain) that still yields the consumed first element.
        try:
            (_, first_vector), pairs = glance_first_element(pairs)
        except StopIteration:
            # An empty input would otherwise leak a bare StopIteration to the caller
            raise ValueError('word_vectors is empty: cannot determine the vector size') from None
        vector_size = len(first_vector)

        cls._create(out_path, pairs,
                    vector_size=vector_size, vocab_size=vocab_size,
                    compression=compression, verbose=verbose, **format_kwargs)

        echo('Creation completed: %s' % out_path)

    @classmethod
    def create_from_file(cls, source_file: 'EmbFile',
                         out_dir: Optional[PathType] = None,
                         out_filename: Optional[str] = None,
                         vocab_size: Optional[int] = None,
                         compression: Optional[str] = None,
                         verbose: bool = True,
                         overwrite: bool = False,
                         **format_kwargs) -> Path:
        """
        Creates a new file on disk with the same content of another file.

        Args:
            source_file:
                the file to take data from
            out_dir:
                directory where the file will be stored; by default, it's the parent directory
                of the source file
            out_filename:
                filename of the produced name (inside ``out_dir``); by default, it is obtained by
                replacing the extension of the source file with the proper one and appending the
                compression extension if ``compression is not None``.
                **Note:** if you pass this argument, the compression extension is not automatically
                appended.
            vocab_size:
                if the source EmbFile has attribute ``vocab_size == None``, then: if the specific
                creator requires it (bin and txt formats do), it `must` be provided; otherwise it
                `can` be provided for having ETA in progress bars.
            compression:
                valid values are: ``"bz2"|"bz", "gzip"|"gz", "xz"|"lzma", "zip"``
            verbose:
                print info and progress bar
            overwrite:
                overwrite a file with the same name if it already exists
            format_kwargs:
                format-specific arguments (see above)

        Returns:
            (Path): the path of the created file
        """
        # The vocab_size stored in the source file, when available, wins over the argument
        vocab_size = source_file.vocab_size or vocab_size
        out_dir = Path(out_dir or source_file.path.parent)

        if out_filename is None:
            suffix = cls.DEFAULT_EXTENSION
            if compression:
                suffix += COMPRESSION_TO_EXTENSIONS[compression][0]
            source_path = remove_compression_extension(source_file.path)
            out_filename = source_path.with_suffix(suffix).name

        out_path = out_dir / out_filename
        cls.create(out_path, word_vectors=source_file.word_vectors(),
                   vocab_size=vocab_size, compression=compression,
                   verbose=verbose, overwrite=overwrite, **format_kwargs)
        return out_path
# ==============================
# Utility functions
# ==============================
T = TypeVar('T')


def glance_first_element(iterable: Iterable[T]) -> Tuple[T, Iterable[T]]:
    """
    Peeks at the first element of ``iterable`` without losing it.

    Returns:
        a pair ``(first, rest)`` where ``first`` is the first element and ``rest`` is an
        iterable that still yields every element, the first one included. When the input is
        re-iterable, ``rest`` is the input object itself; when the input is an iterator (so
        the first element has been consumed), ``rest`` chains the first element in front of
        what remains.

    Raises:
        StopIteration: if ``iterable`` is empty
    """
    stream = iter(iterable)
    head = next(stream)
    if stream is not iterable:
        # re-iterable container: nothing was consumed from the caller's point of view
        return head, iterable
    # one-shot iterator: metaphorically put the first element "back in"
    return head, itertools.chain((head,), stream)
def warn_if_wrong_vocab_size(expected_size, actual_size, extra_info=''):
    """
    Emits a warning if the actual number of word vectors differs from the expected one.

    Nothing happens when either size is ``None`` (unknown), or when the two sizes match.

    Args:
        expected_size (int or None):
            the provided/expected vocabulary size
        actual_size (int or None):
            the number of word vectors actually found
        extra_info (str):
            free text appended verbatim at the end of the warning message
    """
    if expected_size is None or actual_size is None:
        # One of the two sizes is unknown: there is nothing to compare (and '%d' would
        # raise TypeError on None).
        return
    if actual_size == expected_size:
        return
    message = ('the actual number of word vectors in the iterator/file was different than the '
               'provided/expected one; expected: %d; actual: %d.\n' % (expected_size, actual_size))
    # extra_info is appended after formatting so that a '%' inside it is left untouched
    warnings.warn(message + extra_info)
def check_vector_size(i, vector, vector_size):
    """
    Verifies that ``vector`` has the expected length.

    Args:
        i: index of the vector (only used in the error message)
        vector: the vector to check (anything supporting ``len``)
        vector_size: the expected length

    Raises:
        ValueError: if ``len(vector)`` differs from ``vector_size``
    """
    actual_size = len(vector)
    if actual_size == vector_size:
        return
    raise ValueError('inconsistent vector_size: the first vector has size %d but the vector of '
                     'index %d has size %d' % (vector_size, i, actual_size))