import json
import logging
import os
import sys

import numpy as np
import dill as dpickle
import pandas as pd
import tensorflow as tf

# TODO(https://github.com/kubeflow/examples/issues/280)
# TODO(https://github.com/kubeflow/examples/issues/196)
# We'd like to switch to importing keras from TensorFlow in order to support
# TF.Estimator, but with tensorflow.keras we can't train a model either using
# Keras' fit function or using TF.Estimator.
import keras
from keras.callbacks import CSVLogger, ModelCheckpoint
from ktext.preprocess import processor
from sklearn.model_selection import train_test_split

from seq2seq_utils import load_decoder_inputs, load_encoder_inputs, load_text_processor, Seq2Seq_Inference  # pylint: disable=line-too-long


class Trainer(object):  # pylint: disable=too-many-instance-attributes
  def __init__(self, output_dir):
    """Construct the trainer.

    Args:
      output_dir: Directory where outputs should be written.
    """
    if not output_dir:
      raise ValueError("output dir can't be None.")
    self.output_dir = output_dir

    # Pull out the information needed for TF.Estimator.
    self.tf_config = os.environ.get('TF_CONFIG', '{}')
    self.tf_config_json = json.loads(self.tf_config)

    self.cluster = self.tf_config_json.get('cluster')
    self.job_name = self.tf_config_json.get('task', {}).get('type')
    self.task_index = self.tf_config_json.get('task', {}).get('index')

    # Files storing the preprocessors.
    self.body_pp_file = os.path.join(self.output_dir, 'body_pp.dpkl')
    self.title_pp_file = os.path.join(self.output_dir, 'title_pp.dpkl')

    # Files to store the processed data.
    self.preprocessed_titles = os.path.join(self.output_dir,
                                            'train_title_vecs.npy')
    self.preprocessed_bodies = os.path.join(self.output_dir,
                                            'train_body_vecs.npy')

    self.history = None
    self.decoder_input_data = None
    self.seq2seq_Model = None
    self.decoder_target_data = None
    self.test_df = None
    self.encoder_input_data = None
    self.title_pp = None
    self.body_pp = None

  def preprocess(self, data_file, num_samples=None):
    """Preprocess the input.

    Trains preprocessors and splits the data into train and test sets.

    Args:
      data_file: The data file to process.
      num_samples: Number of samples to use. Set to None to use the
        entire dataset.
    """
    # We preprocess the data only if we are the master or chief,
    # or if we aren't running distributed.
    if self.job_name and self.job_name.lower() not in ["master", "chief"]:
      return

    # TODO(jlewi): The test data isn't being used for anything. How can
    # we configure evaluation?
    if num_samples:
      sampled = pd.read_csv(data_file).sample(n=num_samples)
      traindf, self.test_df = train_test_split(sampled, test_size=.10)
    else:
      traindf, self.test_df = train_test_split(pd.read_csv(data_file),
                                               test_size=.10)

    # Print stats about the shape of the data.
    logging.info('Train: %d rows %d columns',
                 traindf.shape[0], traindf.shape[1])

    train_body_raw = traindf.body.tolist()
    train_title_raw = traindf.issue_title.tolist()

    # Clean, tokenize, and apply padding / truncating so that each document
    # has length 70. Also, retain only the top 8,000 words in the vocabulary
    # and map the remaining words to 1, which becomes a common index for
    # rare words.
    self.body_pp = processor(keep_n=8000, padding_maxlen=70)
    train_body_vecs = self.body_pp.fit_transform(train_body_raw)

    logging.info('Example original body: %s', train_body_raw[0])
    logging.info('Example body after pre-processing: %s', train_body_vecs[0])

    # Title processor: append_indicators=True adds start/end indicator tokens
    # to each title, and padding='post' pads/truncates at the end.
    self.title_pp = processor(append_indicators=True, keep_n=4500,
                              padding_maxlen=12, padding='post')

    # Process the title data.
    train_title_vecs = self.title_pp.fit_transform(train_title_raw)

    logging.info('Example original title: %s', train_title_raw[0])
    logging.info('Example title after pre-processing: %s', train_title_vecs[0])

    # Save the preprocessors.
    with open(self.body_pp_file, 'wb') as f:
      dpickle.dump(self.body_pp, f)

    with open(self.title_pp_file, 'wb') as f:
      dpickle.dump(self.title_pp, f)

    # Save the processed data.
    np.save(self.preprocessed_titles, train_title_vecs)
    np.save(self.preprocessed_bodies, train_body_vecs)

  def build_model(self, learning_rate):
    """Build a keras model."""
    logging.info("starting")

    if self.job_name and self.job_name.lower() in ["ps"]:
      logging.info("ps doesn't build model")
      return

    self.encoder_input_data, doc_length = load_encoder_inputs(
        self.preprocessed_bodies)
    self.decoder_input_data, self.decoder_target_data = load_decoder_inputs(
        self.preprocessed_titles)

    num_encoder_tokens, self.body_pp = load_text_processor(
        self.body_pp_file)
    num_decoder_tokens, self.title_pp = load_text_processor(
        self.title_pp_file)

    # Arbitrarily set latent dimension for embedding and hidden units.
    latent_dim = 300

    ##### Define Model Architecture ######

    ########################
    #### Encoder Model ####
    encoder_inputs = keras.layers.Input(shape=(doc_length,),
                                        name='Encoder-Input')

    # Word embedding for encoder (ex: issue body).
    x = keras.layers.Embedding(
        num_encoder_tokens, latent_dim, name='Body-Word-Embedding',
        mask_zero=False)(encoder_inputs)

    x = keras.layers.BatchNormalization(name='Encoder-Batchnorm-1')(x)

    # We do not need the `encoder_output`, just the hidden state.
    _, state_h = keras.layers.GRU(latent_dim, return_state=True,
                                  name='Encoder-Last-GRU')(x)

    # Encapsulate the encoder as a separate entity so we can just
    # encode without decoding if we want to.
    encoder_model = keras.Model(inputs=encoder_inputs, outputs=state_h,
                                name='Encoder-Model')

    seq2seq_encoder_out = encoder_model(encoder_inputs)

    ########################
    #### Decoder Model ####
    # Decoder input is used for teacher forcing.
    decoder_inputs = keras.layers.Input(shape=(None,), name='Decoder-Input')

    # Word embedding for decoder (ex: issue titles).
    dec_emb = keras.layers.Embedding(
        num_decoder_tokens, latent_dim, name='Decoder-Word-Embedding',
        mask_zero=False)(decoder_inputs)

    dec_bn = keras.layers.BatchNormalization(name='Decoder-Batchnorm-1')(dec_emb)

    # TODO(https://github.com/kubeflow/examples/issues/196):
    # With TF.Estimator we hit https://github.com/keras-team/keras/issues/9761
    # and the model won't train.
    decoder_gru = keras.layers.GRU(
        latent_dim, return_state=True, return_sequences=True,
        name='Decoder-GRU')
    decoder_gru_output, _ = decoder_gru(dec_bn,
                                        initial_state=[seq2seq_encoder_out])
    x = keras.layers.BatchNormalization(
        name='Decoder-Batchnorm-2')(decoder_gru_output)

    # Dense layer for prediction.
    decoder_dense = keras.layers.Dense(
        num_decoder_tokens, activation='softmax', name='Final-Output-Dense')
    decoder_outputs = decoder_dense(x)

    ########################
    #### Seq2Seq Model ####

    self.seq2seq_Model = keras.Model([encoder_inputs, decoder_inputs],
                                     decoder_outputs)

    self.seq2seq_Model.compile(
        optimizer=keras.optimizers.Nadam(lr=learning_rate),
        loss='sparse_categorical_crossentropy',)
    # TODO(jlewi): Computing accuracy causes a dimension mismatch.
    # tensorflow.python.framework.errors_impl.InvalidArgumentError: Incompatible shapes: [869] vs. [79,11] # pylint: disable=line-too-long
    # [[{{node metrics/acc/Equal}} = Equal[T=DT_FLOAT, _device="/job:localhost/replica:0/task:0/device:CPU:0"](metrics/acc/Reshape, metrics/acc/Cast)]] # pylint: disable=line-too-long
    # metrics=['accuracy'])

    self.seq2seq_Model.summary()

  def train_keras(self, output_model_h5, base_name='tutorial_seq2seq',
                  batch_size=1200, epochs=7):
    """Train using Keras.

    This is an alternative to using the TF.Estimator API.

    TODO(jlewi): The reason we added support for using Keras was to debug
    whether we were hitting issue
    https://github.com/keras-team/keras/issues/9761 only with TF.Estimator.
    """
    logging.info("Using base name: %s", base_name)
    csv_logger = CSVLogger('{:}.log'.format(base_name))
    model_checkpoint = ModelCheckpoint(
        '{:}.epoch{{epoch:02d}}-val{{val_loss:.5f}}.hdf5'.format(base_name),
        save_best_only=True)

    self.history = self.seq2seq_Model.fit(
        [self.encoder_input_data, self.decoder_input_data],
        np.expand_dims(self.decoder_target_data, -1),
        batch_size=batch_size,
        epochs=epochs,
        validation_split=0.12,
        callbacks=[csv_logger, model_checkpoint])

    #############
    # Save model.
    #############
    self.seq2seq_Model.save(output_model_h5)

  def evaluate_keras(self):
    """Generates predictions on the holdout set and calculates the BLEU score."""
    seq2seq_inf = Seq2Seq_Inference(encoder_preprocessor=self.body_pp,
                                    decoder_preprocessor=self.title_pp,
                                    seq2seq_model=self.seq2seq_Model)

    bleu_score = seq2seq_inf.evaluate_model(
        holdout_bodies=self.test_df.body.tolist(),
        holdout_titles=self.test_df.issue_title.tolist(),
        max_len_title=12)
    logging.info("Bleu score: %s", bleu_score)
    return bleu_score

  def train_estimator(self, max_steps=None, eval_steps=None):
    """Train the model using the TF.Estimator API.

    Args:
      max_steps: Maximum number of training steps; None trains until the
        input is exhausted.
      eval_steps: Number of evaluation steps; None evaluates until the
        input is exhausted.
    """
    if self.job_name:
      cluster_spec = tf.train.ClusterSpec(self.cluster)
      if self.job_name == "ps":
        # Parameter servers just join the cluster and serve variables.
        server = tf.train.Server(cluster_spec, job_name=self.job_name,
                                 task_index=self.task_index)
        server.join()
        sys.exit(0)

    cfg = tf.estimator.RunConfig(
        session_config=tf.ConfigProto(log_device_placement=False))

    estimator = tf.keras.estimator.model_to_estimator(
        keras_model=self.seq2seq_Model, model_dir=self.output_dir, config=cfg)

    expanded = np.expand_dims(self.decoder_target_data, -1)
    input_fn = tf.estimator.inputs.numpy_input_fn(
        x={'Encoder-Input': self.encoder_input_data,
           'Decoder-Input': self.decoder_input_data},
        y=expanded,
        shuffle=False)

    train_spec = tf.estimator.TrainSpec(input_fn=input_fn,
                                        max_steps=max_steps)

    eval_spec = tf.estimator.EvalSpec(input_fn=input_fn,
                                      throttle_secs=10,
                                      steps=eval_steps)

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
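

# The block below is an illustrative, minimal driver for the Trainer class,
# not part of the upstream training entry point. The CSV path, output
# directory, and hyperparameter values are assumptions chosen only to show
# the expected call order: preprocess -> build_model -> train_keras ->
# evaluate_keras.
if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO)

  # Hypothetical locations; replace with real paths.
  example_output_dir = "/tmp/github_issue_summarization"
  example_data_file = "/tmp/github_issues.csv"

  trainer = Trainer(output_dir=example_output_dir)

  # Fit the ktext preprocessors on a sample of the CSV and write the
  # vectorized bodies/titles plus the pickled preprocessors to output_dir.
  trainer.preprocess(data_file=example_data_file, num_samples=2000)

  # Build the encoder/decoder graph and train with the plain Keras fit loop.
  trainer.build_model(learning_rate=0.001)
  trainer.train_keras(
      output_model_h5=os.path.join(example_output_dir, "seq2seq_model.h5"),
      batch_size=128,
      epochs=1)

  # Score the held-out split with BLEU.
  trainer.evaluate_keras()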