{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import kfp\n", "import kfp.dsl as dsl\n", "import kfp.components as components\n", "from kfp.components import func_to_container_op, InputPath, OutputPath\n", "from typing import NamedTuple" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "def cornell_sample_dowload_and_preprocess(log_folder:str) -> NamedTuple('Outputs', [('logdir',str)]):\n", " import re\n", " import string\n", " import pandas as pd\n", " from random import shuffle\n", " import nltk\n", " import joblib\n", " from nltk.corpus import movie_reviews\n", " from nltk.corpus import stopwords\n", " from nltk.stem import PorterStemmer\n", " from nltk.tokenize import TweetTokenizer\n", " from sklearn.model_selection import train_test_split\n", " from tqdm import tqdm\n", " from nltk import data\n", " \n", " data.path.append(log_folder)\n", " nltk.download('movie_reviews', download_dir = log_folder)\n", " nltk.download('stopwords', download_dir = log_folder)\n", " \n", " pos_tweets = []\n", " neg_tweets = []\n", " \n", " for fileid in movie_reviews.fileids('pos'):\n", " content = ''\n", " for word in movie_reviews.words(fileid):\n", " content += ' ' + word\n", " pos_tweets.append(content)\n", " for fileid in movie_reviews.fileids('neg'):\n", " content = ''\n", " for word in movie_reviews.words(fileid):\n", " content += ' ' + word\n", " neg_tweets.append(content)\n", "\n", " print(f\"positive sentiment GOOD total samples {len(pos_tweets)}\")\n", " print(f\"negative sentiment Bad total samples {len(neg_tweets)}\")\n", " \n", " class Twitter_Preprocess():\n", " \n", " def __init__(self):\n", " self.tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True,\n", " reduce_len=True)\n", " self.stopwords_en = stopwords.words('english') \n", " self.punctuation_en = string.punctuation\n", " self.stemmer = PorterStemmer() \n", "\n", " def __remove_unwanted_characters__(self, tweet):\n", " tweet = re.sub(r'^RT[\\s]+', '', tweet)\n", " tweet = re.sub(r'https?:\\/\\/.*[\\r\\n]*', '', tweet)\n", " tweet = re.sub(r'#', '', tweet)\n", " tweet = re.sub('\\S+@\\S+', '', tweet)\n", " tweet = re.sub(r'\\d+', '', tweet)\n", " return tweet\n", "\n", " def __tokenize_tweet__(self, tweet): \n", " return self.tokenizer.tokenize(tweet)\n", "\n", " def __remove_stopwords__(self, tweet_tokens):\n", " tweets_clean = []\n", "\n", " for word in tweet_tokens:\n", " if (word not in self.stopwords_en and \n", " word not in self.punctuation_en):\n", " tweets_clean.append(word)\n", " return tweets_clean\n", "\n", " def __text_stemming__(self,tweet_tokens):\n", " tweets_stem = [] \n", "\n", " for word in tweet_tokens:\n", " stem_word = self.stemmer.stem(word) \n", " tweets_stem.append(stem_word)\n", " return tweets_stem\n", "\n", " def preprocess(self, tweets):\n", " tweets_processed = []\n", " for _, tweet in tqdm(enumerate(tweets)): \n", " tweet = self.__remove_unwanted_characters__(tweet) \n", " tweet_tokens = self.__tokenize_tweet__(tweet) \n", " tweet_clean = self.__remove_stopwords__(tweet_tokens)\n", " tweet_stems = self.__text_stemming__(tweet_clean)\n", " tweets_processed.extend([tweet_stems])\n", " return tweets_processed\n", " \n", " twitter_text_processor = Twitter_Preprocess()\n", " processed_pos_tweets = twitter_text_processor.preprocess(pos_tweets)\n", " processed_neg_tweets = twitter_text_processor.preprocess(neg_tweets)\n", " \n", " def build_bow_dict(tweets, labels):\n", " freq = {}\n", " for tweet, label in list(zip(tweets, labels)):\n", " for word in tweet:\n", " freq[(word, label)] = freq.get((word, label), 0) + 1 \n", " return freq\n", "\n", " labels = [1 for i in range(len(processed_pos_tweets))]\n", " labels.extend([0 for i in range(len(processed_neg_tweets))])\n", " \n", " twitter_processed_corpus = processed_pos_tweets + processed_neg_tweets\n", " bow_word_frequency = build_bow_dict(twitter_processed_corpus, labels)\n", " \n", " shuffle(processed_pos_tweets)\n", " shuffle(processed_neg_tweets)\n", "\n", " positive_tweet_label = [1 for i in processed_pos_tweets]\n", " negative_tweet_label = [0 for i in processed_neg_tweets]\n", "\n", " tweet_df = pd.DataFrame(list(zip(twitter_processed_corpus,\n", " positive_tweet_label+negative_tweet_label)),\n", " columns=[\"processed_tweet\", \"label\"])\n", " \n", " train_X_tweet, test_X_tweet, train_Y, test_Y = train_test_split(tweet_df[\"processed_tweet\"],\n", " tweet_df[\"label\"],\n", " test_size = 0.20,\n", " stratify=tweet_df[\"label\"])\n", " \n", " print(f\"train_X_tweet {train_X_tweet.shape}, test_X_tweet {test_X_tweet.shape}\")\n", " print(f\"train_Y {train_Y.shape}, test_Y {test_Y.shape}\")\n", " \n", " joblib.dump(bow_word_frequency, log_folder + '/bow_word_frequency.pkl')\n", " joblib.dump(train_X_tweet, log_folder + '/train_X_tweet.pkl')\n", " joblib.dump(test_X_tweet, log_folder + '/test_X_tweet.pkl')\n", " joblib.dump(train_Y, log_folder + '/train_Y.pkl')\n", " joblib.dump(test_Y, log_folder + '/test_Y.pkl')\n", " \n", " return ([log_folder])" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "def numpy_process(log_folder:str) -> NamedTuple('Outputs', [('logdir',str), ('numpydir',str)]):\n", " \n", " import numpy as np\n", " import joblib\n", " import os\n", " \n", " bow_word_frequency = joblib.load(open(log_folder + '/bow_word_frequency.pkl','rb'))\n", " train_X_tweet = joblib.load(open(log_folder + '/train_X_tweet.pkl','rb'))\n", " test_X_tweet = joblib.load(open(log_folder + '/test_X_tweet.pkl','rb'))\n", " train_Y = joblib.load(open(log_folder + '/train_Y.pkl','rb'))\n", " test_Y = joblib.load(open(log_folder + '/test_Y.pkl','rb'))\n", " \n", " def extract_features(processed_tweet, bow_word_frequency):\n", " features = np.zeros((1,3))\n", " features[0,0] = 1\n", "\n", " for word in processed_tweet:\n", " features[0,1] = bow_word_frequency.get((word, 1), 0)+features[0,1]\n", " features[0,2] = bow_word_frequency.get((word, 0), 0)+features[0,2]\n", " return features\n", " \n", " train_X = np.zeros((len(train_X_tweet), 3))\n", " for index, tweet in enumerate(train_X_tweet):\n", " train_X[index, :] = extract_features(tweet, bow_word_frequency)\n", "\n", " test_X = np.zeros((len(test_X_tweet), 3))\n", " for index, tweet in enumerate(test_X_tweet):\n", " test_X[index, :] = extract_features(tweet, bow_word_frequency)\n", "\n", " print(f\"train_X {train_X.shape}, test_X {test_X.shape}\")\n", " \n", " if not os.path.isdir(log_folder + '/numpy'):\n", " os.makedirs(log_folder + '/numpy') \n", " numpy_folder = log_folder + '/numpy'\n", " joblib.dump(train_X, numpy_folder + '/train_X.pkl')\n", " joblib.dump(test_X, numpy_folder + '/test_X.pkl')\n", " \n", " return ([log_folder, numpy_folder])" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "def sklearn_logistic(log_folder:str, numpy_folder:str)->NamedTuple('Outputs',[('logdir',str), ('sklearndir',str), ('sklearnscore',float)]):\n", " \n", " from sklearn.linear_model import SGDClassifier\n", " from sklearn.metrics import accuracy_score\n", " import numpy as np\n", " import joblib\n", " import os\n", " \n", " train_X = joblib.load(open(numpy_folder + '/train_X.pkl','rb'))\n", " test_X = joblib.load(open(numpy_folder + '/test_X.pkl','rb'))\n", " train_Y = joblib.load(open(log_folder + '/train_Y.pkl','rb'))\n", " test_Y = joblib.load(open(log_folder + '/test_Y.pkl','rb'))\n", " \n", " clf = SGDClassifier(loss='log')\n", " clf.fit(train_X, np.array(train_Y).reshape(-1,1))\n", " y_pred = clf.predict(test_X)\n", " y_pred_probs = clf.predict(test_X)\n", " \n", " print(f\"Scikit learn logistic regression accuracy is {accuracy_score(test_Y , y_pred)*100:.2f}\")\n", " \n", " if not os.path.isdir(numpy_folder + '/sklearn'):\n", " os.makedirs(numpy_folder + '/sklearn')\n", " \n", " sklearn_folder = numpy_folder + '/sklearn'\n", " \n", " joblib.dump(clf, sklearn_folder + '/sklearn.pkl')\n", " sklearn_score = accuracy_score(test_Y , y_pred)\n", " \n", " return ([log_folder, sklearn_folder, sklearn_score])" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def logistic(log_folder:str, numpy_folder:str) -> NamedTuple('Outputs', [('logdir',str), ('logisticdir',str), ('logisticscore',float)]):\n", " \n", " import numpy as np\n", " import joblib\n", " import os\n", " \n", " train_X = joblib.load(open(numpy_folder + '/train_X.pkl','rb'))\n", " test_X = joblib.load(open(numpy_folder + '/test_X.pkl','rb'))\n", " train_Y = joblib.load(open(log_folder + '/train_Y.pkl','rb'))\n", " test_Y = joblib.load(open(log_folder + '/test_Y.pkl','rb'))\n", " \n", " def sigmoid(z): \n", " h = 1 / (1+ np.exp(-z))\n", " return h\n", " \n", " def gradientDescent(x, y, theta, alpha, num_iters, c):\n", " m = x.shape[0]\n", " for i in range(0, num_iters):\n", " z = np.dot(x, theta)\n", " h = sigmoid(z)\n", " J = (-1/m) * ((np.dot(y.T, np.log(h)) + np.dot((1 - y).T, np.log(1-h))) + (c * np.sum(theta)))\n", " theta = theta - (alpha / m) * np.dot((x.T), (h - y))\n", " J = float(J)\n", " return J, theta\n", " \n", " np.random.seed(1)\n", " J, theta = gradientDescent(train_X, np.array(train_Y).reshape(-1,1), np.zeros((3, 1)), 1e-7, 1000, 0.1)\n", " print(f\"The cost after training is {J:.8f}.\")\n", " print(f\"The resulting vector of weights is {[round(t, 8) for t in np.squeeze(theta)]}\")\n", " \n", " def predict_tweet(x, theta):\n", " y_pred = sigmoid(np.dot(x, theta))\n", " return y_pred\n", " \n", " predicted_probs = predict_tweet(test_X, theta)\n", " predicted_labels = np.where(predicted_probs > 0.5, 1, 0)\n", " print(f\"Own implementation of logistic regression accuracy is {len(predicted_labels[predicted_labels == np.array(test_Y).reshape(-1,1)]) / len(test_Y)*100:.2f}\")\n", " \n", " if not os.path.isdir(numpy_folder + '/logistic'):\n", " os.makedirs(numpy_folder + '/logistic')\n", " logistic_folder = numpy_folder + '/logistic'\n", " joblib.dump(theta, logistic_folder + '/logistic.pkl')\n", " \n", " logistic_score = len(predicted_labels[predicted_labels == np.array(test_Y).reshape(-1,1)]) / len(test_Y)\n", " \n", " return ([log_folder, logistic_folder, logistic_score])" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "def torch_process_logistic(log_folder:str) -> NamedTuple('Outputs', [('logdir',str), ('torchdir',str), ('torchscore',float)]):\n", " \n", " import torch\n", " import joblib\n", " import os\n", "\n", " bow_word_frequency = joblib.load(open(log_folder + '/bow_word_frequency.pkl','rb'))\n", " train_X_tweet = joblib.load(open(log_folder + '/train_X_tweet.pkl','rb'))\n", " test_X_tweet = joblib.load(open(log_folder + '/test_X_tweet.pkl','rb'))\n", " train_Y = joblib.load(open(log_folder + '/train_Y.pkl','rb'))\n", " test_Y = joblib.load(open(log_folder + '/test_Y.pkl','rb'))\n", " \n", " def extract_features(processed_tweet, bow_word_frequency):\n", " features = torch.zeros((1,3))\n", " features[0,0] = 1\n", "\n", " for word in processed_tweet:\n", " features[0,1] = bow_word_frequency.get((word, 1), 0) + features[0,1]\n", " features[0,2] = bow_word_frequency.get((word, 0), 0) + features[0,2]\n", " return features\n", " \n", " train_X_Tensor = torch.zeros((len(train_X_tweet), 3))\n", " for index, tweet in enumerate(train_X_tweet):\n", " train_X_Tensor[index, :] = extract_features(tweet, bow_word_frequency)\n", "\n", " test_X_Tensor = torch.zeros((len(test_X_tweet), 3))\n", " for index, tweet in enumerate(test_X_tweet):\n", " test_X_Tensor[index, :] = extract_features(tweet, bow_word_frequency)\n", "\n", " print(f\"train_X_Tensor {train_X_Tensor.shape}, test_X_Tensor {test_X_Tensor.shape}\")\n", " type(train_X_Tensor)\n", " \n", " def sigmoid(z):\n", " h = 1 / (1+ torch.exp(-z))\n", " return h\n", " \n", " def gradientDescent(x, y, theta, alpha, num_iters, c):\n", "\n", " m = x.shape[0]\n", "\n", " for i in range(0, num_iters):\n", " z = torch.mm(x, theta)\n", " h = sigmoid(z)\n", " J = (-1/m) * ((torch.mm(y.T,torch.log(h)) + torch.mm((1 - y).T, torch.log(1-h))) \n", " + (c * torch.sum(theta)))\n", " theta = theta - (alpha / m) * torch.mm((x.T), (h - y))\n", " J = float(J)\n", " return J, theta\n", "\n", " torch.manual_seed(1)\n", " J, theta = gradientDescent(train_X_Tensor,\n", " torch.reshape(torch.Tensor(train_Y.to_numpy()),(-1,1)),\n", " torch.zeros((3,1)),1e-7,1000,0.1)\n", " print(f\"The cost after training is {J:.8f}.\")\n", " \n", " def predict_tweet(x,theta):\n", " y_pred = sigmoid(torch.mm(x,theta))\n", " return y_pred\n", " \n", " predicted_probs =predict_tweet(test_X_Tensor, theta)\n", " prediceted_probs=torch.tensor(predicted_probs)\n", " predicted_labels = torch.where(predicted_probs >0.5, torch.tensor(1), torch.tensor(0))\n", " print(f\"Pytorch of logistic regression accuracy is {len(predicted_labels[predicted_labels == torch.reshape(torch.Tensor(test_Y.to_numpy()),(-1,1))]) / len(test_Y)*100:.2f}\")\n", " \n", " if not os.path.isdir(log_folder + '/torch'):\n", " os.makedirs(log_folder + '/torch')\n", " torch_folder = log_folder + '/torch'\n", " joblib.dump(theta, torch_folder + '/torch.pkl')\n", " \n", " torch_score = len(predicted_labels[predicted_labels == torch.reshape(torch.Tensor(test_Y.to_numpy()),(-1,1))]) / len(test_Y)\n", " \n", " return ([log_folder, torch_folder, torch_score])" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def svm_process(log_folder:str, numpy_folder:str) -> NamedTuple('Outputs', [('svmdir',str), ('svmscore',float)]):\n", " import joblib\n", " import os\n", " import numpy as np\n", " from sklearn.metrics import accuracy_score\n", " from sklearn.svm import SVC\n", " from sklearn.preprocessing import StandardScaler\n", "\n", " train_X = joblib.load(open(numpy_folder + '/train_X.pkl','rb'))\n", " test_X = joblib.load(open(numpy_folder + '/test_X.pkl','rb'))\n", " train_Y = joblib.load(open(log_folder + '/train_Y.pkl','rb'))\n", " test_Y = joblib.load(open(log_folder + '/test_Y.pkl','rb'))\n", " \n", " scaler = StandardScaler()\n", " train_X_s = scaler.fit(train_X).transform(train_X)\n", " \n", " clf = SVC(kernel='linear')\n", " t = clf.fit(train_X_s, np.array(train_Y).reshape(-1,1))\n", " y_pred = clf.predict(test_X)\n", " svm_score = accuracy_score(test_Y , y_pred)\n", " \n", " if not os.path.isdir(numpy_folder + '/svm'):\n", " os.makedirs(numpy_folder + '/svm')\n", " svm_folder = numpy_folder + '/svm'\n", " joblib.dump(t, svm_folder + '/svm.pkl')\n", "\n", " return ([svm_folder, svm_score])" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def accuracy(sklearn_score:float,logistic_score:float,torch_score:float,svm_score:float) -> NamedTuple('Outputs', [('mlpipeline_metrics', 'Metrics')]):\n", " import json\n", "\n", " metrics = {\n", " 'metrics': [\n", " {\n", " 'name': 'sklearn_score',\n", " 'numberValue': sklearn_score,\n", " 'format': \"PERCENTAGE\",\n", " },\n", " {\n", " 'name': 'logistic_score',\n", " 'numberValue': logistic_score,\n", " 'format': \"PERCENTAGE\",\n", " },\n", " {\n", " 'name': 'torch_score',\n", " 'numberValue': torch_score,\n", " 'format': \"PERCENTAGE\",\n", " },\n", " {\n", " 'name': 'svm_score',\n", " 'numberValue': svm_score,\n", " 'format': \"PERCENTAGE\",\n", " },\n", " ]\n", " }\n", " return [json.dumps(metrics)]" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def http_port(log_folder:str, sklearn_folder:str, logistic_folder:str, torch_folder:str, svm_folder:str):\n", " \n", " import re\n", " import string\n", " import pandas as pd\n", " from random import shuffle\n", " import torch\n", " import numpy as np\n", " import nltk\n", " import joblib\n", " from nltk.corpus import twitter_samples\n", " from nltk.corpus import stopwords\n", " from nltk.stem import PorterStemmer\n", " from nltk.tokenize import TweetTokenizer\n", " from sklearn.model_selection import train_test_split\n", " from tqdm import tqdm\n", " from nltk import data\n", " from flask import Flask,render_template,url_for,request\n", " \n", " data.path.append(log_folder)\n", "\n", " app = Flask(__name__,template_folder='/http-port/templates')\n", "\n", " @app.route('/')\n", " def home():\n", " return render_template('home.html')\n", "\n", " @app.route('/predict', methods=['POST'])\n", " def predict():\n", "\n", " class Preprocess(): \n", " def __init__(self):\n", " self.tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True,reduce_len=True)\n", " self.stopwords_en = stopwords.words('english') \n", " self.punctuation_en = string.punctuation\n", " self.stemmer = PorterStemmer() \n", " def __remove_unwanted_characters__(self, tweet):\n", " tweet = re.sub(r'^RT[\\s]+', '', tweet)\n", " tweet = re.sub(r'https?:\\/\\/.*[\\r\\n]*', '', tweet)\n", " tweet = re.sub(r'#', '', tweet)\n", " tweet = re.sub('\\S+@\\S+', '', tweet)\n", " tweet = re.sub(r'\\d+', '', tweet)\n", " return tweet \n", " def __tokenize_tweet__(self, tweet): \n", " return self.tokenizer.tokenize(tweet) \n", " def __remove_stopwords__(self, tweet_tokens):\n", " tweets_clean = []\n", " for word in tweet_tokens:\n", " if (word not in self.stopwords_en and \n", " word not in self.punctuation_en):\n", " tweets_clean.append(word)\n", " return tweets_clean \n", " def __text_stemming__(self,tweet_tokens):\n", " tweets_stem = [] \n", " for word in tweet_tokens:\n", " stem_word = self.stemmer.stem(word) \n", " tweets_stem.append(stem_word)\n", " return tweets_stem\n", " def preprocess(self, tweets):\n", " tweets_processed = []\n", " for _, tweet in tqdm(enumerate(tweets)): \n", " tweet = self.__remove_unwanted_characters__(tweet) \n", " tweet_tokens = self.__tokenize_tweet__(tweet) \n", " tweet_clean = self.__remove_stopwords__(tweet_tokens)\n", " tweet_stems = self.__text_stemming__(tweet_clean)\n", " tweets_processed.extend([tweet_stems])\n", " return tweets_processed\n", " \n", " def extract_features(processed_tweet, bow_word_frequency):\n", " features = np.zeros((1,3))\n", " features[0,0] = 1\n", " for word in processed_tweet:\n", " features[0,1] = bow_word_frequency.get((word, 1), 0) + features[0,1]\n", " features[0,2] = bow_word_frequency.get((word, 0), 0) + features[0,2]\n", " return features\n", "\n", " def sigmoid(z): \n", " h = 1 / (1+ np.exp(-z))\n", " return h\n", "\n", " def predict_tweet(x, theta_ns):\n", " y_pred = sigmoid(np.dot(x, theta_ns)) \n", " return y_pred\n", "\n", " def extract_features_torch(processed_tweet, bow_word_frequency):\n", " features = torch.zeros((1,3))\n", " features[0,0] = 1\n", " for word in processed_tweet:\n", " features[0,1] = bow_word_frequency.get((word, 1), 0) + features[0,1]\n", " features[0,2] = bow_word_frequency.get((word, 0), 0) + features[0,2]\n", " return features\n", "\n", " def sigmoid_torch(z):\n", " h = 1 / (1+ torch.exp(-z)) \n", " return h\n", "\n", " def predict_tweet_torch(x,theta_toc):\n", " y_pred = sigmoid_torch(torch.mm(x,theta_toc))\n", " return y_pred\n", "\n", " text_processor = Preprocess()\n", " \n", " bow_word_frequency = joblib.load(open(log_folder + '/bow_word_frequency.pkl','rb')) \n", " theta_ns = joblib.load(open(logistic_folder + '/logistic.pkl','rb'))\n", " clf = joblib.load(open(sklearn_folder + '/sklearn.pkl','rb'))\n", " theta_toc = joblib.load(open(torch_folder + '/torch.pkl','rb'))\n", " svm = joblib.load(open(svm_folder + '/svm.pkl','rb'))\n", "\n", " if request.method == 'POST':\n", " message = request.form['message']\n", " data = [message]\n", " data = text_processor.preprocess(data)\n", " \n", " data_o = str(data)\n", " data_o = data_o[2:len(data_o)-2]\n", "\n", " vect = np.zeros((1, 3))\n", " for index, tweet in enumerate(data):\n", " vect[index, :] = extract_features(tweet, bow_word_frequency)\n", " predicted_probs_np = predict_tweet(vect, theta_ns)\n", " my_prediction_np = np.where(predicted_probs_np > 0.5, 1, 0)\n", "\n", " my_prediction_skl = clf.predict(vect)\n", "\n", " vect_Tensor = torch.zeros((1, 3))\n", " for index, tweet in enumerate(data):\n", " vect_Tensor[index, :] = extract_features_torch(\n", " tweet, bow_word_frequency)\n", " predicted_probs_toc = predict_tweet_torch(vect_Tensor, theta_toc)\n", " my_prediction_toc = torch.where(\n", " predicted_probs_toc > 0.5, torch.tensor(1), torch.tensor(0))\n", " \n", " my_prediction_svm = svm.predict(vect)\n", " \n", " return render_template('home.html',\n", " message = message,\n", " data = data_o,\n", " my_prediction_np = my_prediction_np,\n", " my_prediction_skl = my_prediction_skl,\n", " my_prediction_toc = my_prediction_toc,\n", " my_prediction_svm = my_prediction_svm)\n", "\n", " if __name__ == '__main__':\n", " \n", " app.run(debug=True,use_reloader=False)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "@dsl.pipeline(\n", " name='Cornell nltk pipeline',\n", " description='Writing code by the other way.'\n", ")\n", "\n", "def nltk_pipeline():\n", " \n", " log_folder = '/information'\n", " pvc_name = \"cornell-1000\"\n", "\n", " image = \"dfm871002/nltk_env:2.4.2\"\n", " \n", " vop = dsl.VolumeOp(\n", " name=pvc_name,\n", " resource_name=\"cornell-1000\",\n", " size=\"2Gi\",\n", " modes=dsl.VOLUME_MODE_RWM\n", " )\n", " \n", " dowload_op = func_to_container_op(\n", " func = cornell_sample_dowload_and_preprocess,\n", " base_image = image,\n", " )\n", " \n", " numpy_op = func_to_container_op(\n", " func = numpy_process,\n", " base_image = image,\n", " )\n", " \n", " sklearn_op = func_to_container_op(\n", " func = sklearn_logistic,\n", " base_image = image,\n", " )\n", " \n", " logistic_op = func_to_container_op(\n", " func = logistic,\n", " base_image = image,\n", " )\n", " \n", " torch_op = func_to_container_op(\n", " func = torch_process_logistic,\n", " base_image = image,\n", " )\n", " \n", " svm_op = func_to_container_op(\n", " func = svm_process,\n", " base_image = image,\n", " )\n", " \n", " accuracy_op = func_to_container_op(\n", " func = accuracy,\n", " base_image = image,\n", " )\n", " \n", " http_op = func_to_container_op(\n", " func = http_port,\n", " base_image = image,\n", " )\n", " \n", " dowload_task = dowload_op(log_folder).add_pvolumes({ log_folder:vop.volume, })\n", " \n", " numpy_task = numpy_op(dowload_task.outputs['logdir']).add_pvolumes({ log_folder:vop.volume, })\n", " \n", " svm_task = svm_op(numpy_task.outputs['logdir'], numpy_task.outputs['numpydir']).add_pvolumes({ log_folder:vop.volume, })\n", " \n", " sklearn_task = sklearn_op(\n", " numpy_task.outputs['logdir'],\n", " numpy_task.outputs['numpydir']\n", " ).add_pvolumes({ log_folder:vop.volume, })\n", " \n", " logistic_task = logistic_op(\n", " numpy_task.outputs['logdir'],\n", " numpy_task.outputs['numpydir']\n", " ).add_pvolumes({ log_folder:vop.volume, })\n", " \n", " torch_task = torch_op(\n", " dowload_task.outputs['logdir']\n", " ).add_pvolumes({ log_folder:vop.volume, })\n", " \n", " accuracy_task = accuracy_op(\n", " sklearn_task.outputs['sklearnscore'],\n", " logistic_task.outputs['logisticscore'],\n", " torch_task.outputs['torchscore'],\n", " svm_task.outputs['svmscore']\n", " )\n", " \n", " http_task = http_op(\n", " sklearn_task.outputs['logdir'],\n", " sklearn_task.outputs['sklearndir'],\n", " logistic_task.outputs['logisticdir'],\n", " torch_task.outputs['torchdir'],\n", " svm_task.outputs['svmdir']\n", " ).add_pvolumes({ log_folder:vop.volume, })" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "kfp.compiler.Compiler().compile(nltk_pipeline, 'cornell-1000.zip')" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }