Source code for nemo.collections.common.tokenizers.sentencepiece_tokenizer

# Copyright (c) 2020, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import Dict, List, Optional, Union

import numpy as np
import sentencepiece

from nemo.collections.common.parts.utils import if_exist
from nemo.collections.common.tokenizers.tokenizer_spec import TokenizerSpec
from nemo.utils import logging

__all__ = ['SentencePieceTokenizer', 'create_spt_model']


class SentencePieceTokenizer(TokenizerSpec):
    """
    SentencePiece tokenizer, https://github.com/google/sentencepiece.

    Args:
        model_path: path to the SentencePiece tokenizer model. To create the model, use create_spt_model()
        special_tokens: either a list of special tokens or a dictionary mapping token names to token values
        legacy: when set to True, the previous behavior of the SentencePiece wrapper is restored,
            including the possibility to add special tokens inside the wrapper.

    A brief usage sketch follows the class definition.
    """
    def __init__(
        self, model_path: str, special_tokens: Optional[Union[Dict[str, str], List[str]]] = None, legacy: bool = False
    ):
        if not model_path or not os.path.exists(model_path):
            raise ValueError(f"model_path: {model_path} is invalid")
        self.tokenizer = sentencepiece.SentencePieceProcessor()
        self.tokenizer.Load(model_path)

        self.original_vocab_size = self.tokenizer.get_piece_size()
        self.vocab_size = self.tokenizer.get_piece_size()
        self.legacy = legacy
        self.special_token_to_id = {}
        self.id_to_special_token = {}
        if special_tokens:
            if not self.legacy:
                raise ValueError(
                    "Special tokens must be None when legacy is set to False. Provide special tokens at train time."
                )
            self.add_special_tokens(special_tokens)

    def text_to_tokens(self, text):
        if self.legacy:
            tokens = []
            idx = 0
            last_idx = 0

            while 1:
                indices = {}

                for token in self.special_token_to_id:
                    try:
                        indices[token] = text[idx:].index(token)
                    except ValueError:
                        continue

                if len(indices) == 0:
                    break

                next_token = min(indices, key=indices.get)
                next_idx = idx + indices[next_token]

                tokens.extend(self.tokenizer.encode_as_pieces(text[idx:next_idx]))
                tokens.append(next_token)
                idx = next_idx + len(next_token)

            tokens.extend(self.tokenizer.encode_as_pieces(text[idx:]))
            return tokens

        return self.tokenizer.encode_as_pieces(text)

    def text_to_ids(self, text):
        if self.legacy:
            ids = []
            idx = 0
            last_idx = 0

            while 1:
                indices = {}

                for token in self.special_token_to_id:
                    try:
                        indices[token] = text[idx:].index(token)
                    except ValueError:
                        continue

                if len(indices) == 0:
                    break

                next_token = min(indices, key=indices.get)
                next_idx = idx + indices[next_token]

                ids.extend(self.tokenizer.encode_as_ids(text[idx:next_idx]))
                ids.append(self.special_token_to_id[next_token])
                idx = next_idx + len(next_token)

            ids.extend(self.tokenizer.encode_as_ids(text[idx:]))
            return ids

        return self.tokenizer.encode_as_ids(text)

    def tokens_to_text(self, tokens):
        if isinstance(tokens, np.ndarray):
            tokens = tokens.tolist()

        return self.tokenizer.decode_pieces(tokens)

    def ids_to_text(self, ids):
        if isinstance(ids, np.ndarray):
            ids = ids.tolist()

        if self.legacy:
            text = ""
            last_i = 0

            for i, id in enumerate(ids):
                if id in self.id_to_special_token:
                    text += self.tokenizer.decode_ids(ids[last_i:i]) + " "
                    text += self.id_to_special_token[id] + " "
                    last_i = i + 1

            text += self.tokenizer.decode_ids(ids[last_i:])
            return text.strip()

        return self.tokenizer.decode_ids(ids)

    def token_to_id(self, token):
        if self.legacy and token in self.special_token_to_id:
            return self.special_token_to_id[token]

        return self.tokenizer.piece_to_id(token)

    def ids_to_tokens(self, ids):
        tokens = []
        for id in ids:
            if id >= self.original_vocab_size:
                tokens.append(self.id_to_special_token[id])
            else:
                tokens.append(self.tokenizer.id_to_piece(id))
        return tokens

    def tokens_to_ids(self, tokens: Union[str, List[str]]) -> Union[int, List[int]]:
        if isinstance(tokens, str):
            tokens = [tokens]

        ids = []
        for token in tokens:
            ids.append(self.token_to_id(token))

        return ids

    def add_special_tokens(self, special_tokens):
        if not self.legacy:
            raise AttributeError("Special Token addition does not work when legacy is set to False.")

        if isinstance(special_tokens, list):
            for token in special_tokens:
                if (
                    self.tokenizer.piece_to_id(token) == self.tokenizer.unk_id()
                    and token not in self.special_token_to_id
                ):
                    self.special_token_to_id[token] = self.vocab_size
                    self.id_to_special_token[self.vocab_size] = token
                    self.vocab_size += 1
        elif isinstance(special_tokens, dict):
            for token_name, token in special_tokens.items():
                setattr(self, token_name, token)
                if (
                    self.tokenizer.piece_to_id(token) == self.tokenizer.unk_id()
                    and token not in self.special_token_to_id
                ):
                    self.special_token_to_id[token] = self.vocab_size
                    self.id_to_special_token[self.vocab_size] = token
                    self.vocab_size += 1

    @property
    def pad_id(self):
        if self.legacy:
            pad_id = self.tokens_to_ids([self.pad_token])[0]
        else:
            pad_id = self.tokenizer.pad_id()
        return pad_id

    @property
    def bos_id(self):
        if self.legacy:
            bos_id = self.tokens_to_ids([self.bos_token])[0]
        else:
            bos_id = self.tokenizer.bos_id()
        return bos_id

    @property
    def eos_id(self):
        if self.legacy:
            eos_id = self.tokens_to_ids([self.eos_token])[0]
        else:
            eos_id = self.tokenizer.eos_id()
        return eos_id

    @property
    def sep_id(self):
        if self.legacy:
            return self.tokens_to_ids([self.sep_token])[0]
        else:
            raise NameError("Use function token_to_id to retrieve special tokens other than unk, pad, bos, and eos.")

    @property
    def cls_id(self):
        if self.legacy:
            return self.tokens_to_ids([self.cls_token])[0]
        else:
            raise NameError("Use function token_to_id to retrieve special tokens other than unk, pad, bos, and eos.")

    @property
    def mask_id(self):
        if self.legacy:
            return self.tokens_to_ids([self.mask_token])[0]
        else:
            raise NameError("Use function token_to_id to retrieve special tokens other than unk, pad, bos, and eos.")

    @property
    def unk_id(self):
        return self.tokenizer.unk_id()

    @property
    def additional_special_tokens_ids(self):
        """Returns a list of the additional special tokens (excluding bos, eos, pad, unk).
        Used to return sentinel tokens for e.g. T5."""
        special_tokens = set(
            [self.bos_token, self.eos_token, self.pad_token, self.mask_token, self.cls_token, self.sep_token]
        )
        return [v for k, v in self.special_token_to_id.items() if k not in special_tokens]

    @property
    def vocab(self):
        main_vocab = [self.tokenizer.id_to_piece(id) for id in range(self.tokenizer.get_piece_size())]
        special_tokens = [
            self.id_to_special_token[self.original_vocab_size + i]
            for i in range(self.vocab_size - self.original_vocab_size)
        ]
        return main_vocab + special_tokens
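
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). It assumes a trained
# SentencePiece model already exists at the hypothetical path below; see
# create_spt_model() for how to produce one.
#
#   tokenizer = SentencePieceTokenizer(model_path="/path/to/tokenizer.model")
#   pieces = tokenizer.text_to_tokens("NVIDIA NeMo")  # list of subword pieces
#   ids = tokenizer.text_to_ids("NVIDIA NeMo")        # list of integer ids
#   text = tokenizer.ids_to_text(ids)                 # decodes back to plain text
# ---------------------------------------------------------------------------
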
def create_spt_model(
    data_file: str,
    vocab_size: int,
    sample_size: int,
    do_lower_case: bool,
    tokenizer_type: str = 'unigram',
    output_dir: Optional[str] = None,
    character_coverage: float = 1.0,
    train_extremely_large_corpus: bool = False,
    max_sentencepiece_length: int = -1,
    bos: bool = False,
    eos: bool = False,
    pad: bool = False,
    control_symbols: List[str] = None,
    user_defined_symbols: List[str] = None,
    byte_fallback: bool = False,
    split_digits: bool = False,
    split_by_whitespace: bool = True,
    split_by_unicode_script: bool = True,
):
    """
    Creates a SentencePiece tokenizer model from a data file.

    Args:
        data_file: data file
        vocab_size: vocabulary size
        sample_size: maximum number of sentences the trainer loads; values <= 0 load the whole corpus
        do_lower_case: whether text should be lower-cased before the tokenizer model is created
        tokenizer_type: type of SentencePiece model to train ('unigram', 'bpe', 'char', or 'word')
        output_dir: folder to save the created tokenizer model. If not specified, the model is stored in
            the data_file/../spt folder
        character_coverage: float value between 0 and 1 (as a percentage). For languages with a vast charset,
            it can be < 1.0, but for all other languages it should be set to 1.0
        train_extremely_large_corpus: if training on huge datasets, pass this flag to allow SentencePiece
            to build the tokenizer.
        max_sentencepiece_length: limits the maximum length of the SentencePiece subword that can be constructed.
            By default, no limit is placed.
        bos: when True, the bos token "<s>" is added to the vocabulary.
        eos: when True, the eos token "</s>" is added to the vocabulary.
        pad: when True, the pad token "<pad>" is added to the vocabulary.
        control_symbols: control symbols to add to the tokenizer, as defined by SentencePiece. These tokens get
            removed at decode time and are not encoded from the text - they can only be added to the input
            programmatically.
        user_defined_symbols: user symbols to add to the tokenizer, as defined by SentencePiece. These tokens remain
            in the decoded text and are encoded automatically when present in the input text.
        byte_fallback: if a piece would map to <unk>, fall back to a byte sequence of the character.
        split_digits: if True, digits are split into individual tokens.
        split_by_whitespace: whether to respect whitespace while creating subwords. If False, merges will be
            learned across whitespace.
        split_by_unicode_script: whether to split subwords at Unicode script boundaries. For example, Arabic
            diacritics are considered part of the letter they modify (عِدَّةُ).

    Returns:
        Paths to the trained tokenizer model and the vocabulary file.

    A runnable end-to-end sketch appears at the end of this module.
    """

    if not data_file or not os.path.exists(data_file):
        raise ValueError(f"data_file must be valid file path, but got {data_file}")
    data_dir = os.path.dirname(data_file)
    vocab = []
    special_tokens = ["<s>", "</s>", "<pad>", "<unk>"]
    if not output_dir:
        output_dir = f'{data_dir}/spt'
    if if_exist(output_dir, ['tokenizer.model']):
        logging.info(f"tokenizer model {output_dir}/tokenizer.model already exists")
        return f'{output_dir}/tokenizer.model', f'{output_dir}/vocab.txt'
    logging.info(f'Processing {data_file} and storing the result at {output_dir}')
    os.makedirs(output_dir, exist_ok=True)

    cmd = (
        f"--input={data_file} --model_prefix={output_dir}/tokenizer "
        f"--vocab_size={vocab_size} "
        f"--shuffle_input_sentence=true --hard_vocab_limit=false "
        f"--model_type={tokenizer_type} "
        f"--character_coverage={character_coverage}"
    )

    pad_id = 3
    if not bos:
        pad_id -= 1
        cmd += " --bos_id=-1"

    if not eos:
        pad_id -= 1
        cmd += " --eos_id=-1"

    if pad:
        cmd += f" --pad_id={pad_id}"

    if control_symbols:
        control_string = ",".join(control_symbols)
        cmd += f" --control_symbols={control_string}"
        special_tokens += control_symbols

    if user_defined_symbols:
        user_string = ",".join(user_defined_symbols)
        cmd += f" --user_defined_symbols={user_string}"
        special_tokens += user_defined_symbols

    if do_lower_case:
        cmd += " --normalization_rule_name=nmt_nfkc_cf"

    if sample_size > 0:
        cmd += f" --input_sentence_size={sample_size}"

    if train_extremely_large_corpus:
        cmd += " --train_extremely_large_corpus=true"

    if max_sentencepiece_length >= 0:
        cmd += f" --max_sentencepiece_length={max_sentencepiece_length}"

    if byte_fallback:
        cmd += " --byte_fallback=true"

    if split_digits:
        cmd += " --split_digits=true"

    if not split_by_whitespace:
        cmd += " --split_by_whitespace=false"

    if not split_by_unicode_script:
        cmd += " --split_by_unicode_script=false"

    sentencepiece.SentencePieceTrainer.Train(cmd)

    # Add BERT control symbols
    tokens = []

    with open(f"{output_dir}/tokenizer.vocab", "r") as f:
        # Read tokens from each line and parse for vocab
        for line in f:
            piece = line.split("\t")[0]
            if piece in special_tokens:
                # skip special tokens
                continue
            token = piece[1:] if piece.startswith("▁") else f"##{piece}"
            if len(token) > 0:
                tokens.append(token)
            else:
                tokens.append(piece[0])

    vocab.extend(tokens)

    # Save vocabulary to output file
    vocab_file = f'{output_dir}/vocab.txt'
    with open(vocab_file, "w") as f:
        for token in vocab:
            f.write(f"{token}\n")
    return f'{output_dir}/tokenizer.model', vocab_file
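
# ---------------------------------------------------------------------------
# End-to-end sketch (not part of the original module). It trains a small
# unigram model with create_spt_model() and reloads it with
# SentencePieceTokenizer. The corpus and output paths are hypothetical
# placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    model_file, demo_vocab_file = create_spt_model(
        data_file="/tmp/corpus.txt",  # hypothetical plain-text corpus, one sentence per line
        vocab_size=2000,
        sample_size=-1,  # <= 0 disables sentence sampling and uses the whole corpus
        do_lower_case=False,
        output_dir="/tmp/spt_demo",  # hypothetical output folder
        bos=True,
        eos=True,
        pad=True,
        user_defined_symbols=["<mask>"],  # kept in decoded text and encoded from raw input
    )
    demo_tokenizer = SentencePieceTokenizer(model_path=model_file)
    print(demo_tokenizer.vocab_size)
    print(demo_tokenizer.text_to_tokens("a short <mask> example"))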