Bring Your Own Model
Introduction
MindsDB allows you to integrate your own machine learning models. To do this, your model needs to be exposed through an API wrapper; for now, we support two API specifications: MLflow and Ray Serve. The former supports importing already-trained models and predicting with them from MindsDB, while the latter supports both training and predicting with external models.
In order to use custom models, there are three mandatory arguments you must pass inside the USING statement:

- `url.predict`: the URL to call to get predictions from your model.
- `format`: either `mlflow` or `ray_serve`.
- `dtype_dict`: a JSON object specifying all columns expected by your model and their respective data types. For now, the mapping supports the data types used by Lightwood, our AutoML library.

There is an additional, optional argument if you want to train the model via MindsDB (only for Ray Serve):

- `url.train`: the endpoint that will be called to train your model.
1. Ray Serve
1.1 Simple example - Logistic regression
Ray Serve is a simple, high-throughput serving framework that can wrap your own ML models. In this example, we will train and predict with an external scikit-learn model. First, let's look at the model wrapped inside a class that complies with the above requirements. Below is a minimal sketch of such a wrapper, assuming a scikit-learn LogisticRegression and the same FastAPI/Ray Serve pattern used in the Keras example of section 1.2; the class name, route prefix, and port are illustrative:
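```python
# Minimal sketch of a Ray Serve wrapper around a scikit-learn logistic regression.
# The deployment name, route prefix, and port are illustrative placeholders.
import json
import time

import ray
from ray import serve
import pandas as pd
from fastapi import Request, FastAPI
from sklearn.linear_model import LogisticRegression

app = FastAPI()


@serve.deployment(route_prefix="/my_model")
@serve.ingress(app)
class MyModel:
    def __init__(self):
        self.model = None

    @app.post("/train")
    async def train(self, request: Request):
        # MindsDB sends a serialized dataframe and the name of the target column
        data = await request.json()
        df = pd.DataFrame(json.loads(data['df']))
        target = data['target']
        y = df.pop(target).values
        self.model = LogisticRegression().fit(df.values, y)
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        # The predict endpoint only receives the serialized dataframe
        data = await request.json()
        df = pd.DataFrame(json.loads(data['df']))
        predictions = self.model.predict(df.values)
        return {'prediction': [float(x) for x in predictions]}


if __name__ == '__main__':
    ray.init()
    serve.start(detached=True)
    MyModel.deploy()

    while True:
        time.sleep(1)
```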
The important bits here are the train and predict endpoints.

The `train` endpoint accepts two parameters in the JSON sent via POST:

- `df`: a serialized dictionary that can be converted into a pandas dataframe.
- `target`: the name of the target column.

The `predict` endpoint needs only one parameter:

- `df`: a serialized dictionary that can be converted into a pandas dataframe.

The train endpoint must return a JSON response with the key `status` set to `ok`. The predict endpoint must return a dictionary containing a `prediction` key that stores the predictions. Additional keys can be returned for confidence scores and confidence intervals.
Once you start this Ray Serve-wrapped model, you can train it through MindsDB with a query along these lines (the predictor, datasource, and column names below are illustrative placeholders for your own data):
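```sql
-- Illustrative names; adapt the datasource, columns, and dtypes to your data
CREATE PREDICTOR byom_ray_serve
FROM my_db (SELECT x1, x2, y FROM my_table)
PREDICT y
USING
    url.train = 'http://127.0.0.1:8000/my_model/train',
    url.predict = 'http://127.0.0.1:8000/my_model/predict',
    dtype_dict = {"x1": "float", "x2": "float", "y": "integer"},
    format = 'ray_serve';
```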
And you can query predictions as usual, either by conditioning on a subset of input columns in the WHERE clause:
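```sql
SELECT y
FROM mindsdb.byom_ray_serve
WHERE x1 = 0.5 AND x2 = 1.2;
```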
Or by using a JOIN to do batch predictions:
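```sql
SELECT t.y AS real_value,
       m.y AS predicted_value
FROM my_db.my_table AS t
JOIN mindsdb.byom_ray_serve AS m;
```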
Please note that if your model is behind a reverse proxy (e.g. nginx), you might have to increase the maximum size allowed for POST requests in order to receive the training data. MindsDB itself can send as much data as you'd like and has been stress-tested with over a billion rows.
1.2. Example - Keras NLP model
For this example, we will consider a natural language processing (NLP) task where we want to train a neural network with Keras to detect if a tweet is related to a natural disaster (fires, earthquakes, etc.). Please download this dataset to follow the example.
The code for the model here is a bit more complex than in section 1.1, but the same rules apply: we create a Ray Serve-based service that wraps around a Kaggle NLP model, which can be trained and then used for predictions:
```python
import re
import time
import json
import string
import requests
from collections import Counter, defaultdict

import ray
from ray import serve
import gensim
import numpy as np
import pandas as pd
from tqdm import tqdm
from nltk.util import ngrams
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from fastapi import Request, FastAPI
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.initializers import Constant
from tensorflow.keras.optimizers import Adam

app = FastAPI()
stop = set(stopwords.words('english'))


async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/nlp_kaggle_model")
@serve.ingress(app)
class Model:
    MAX_LEN = 100
    GLOVE_DIM = 50
    EPOCHS = 10

    def __init__(self):
        self.model = None

    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        target_arr = df.pop(target).values
        df = self.preprocess_df(df)
        train_corpus = self.create_corpus(df)

        self.embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                self.embedding_dict[word] = vectors
        f.close()

        self.tokenizer_obj = Tokenizer()
        self.tokenizer_obj.fit_on_texts(train_corpus)
        sequences = self.tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        word_index = self.tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, self.__class__.GLOVE_DIM))

        for word, i in tqdm(word_index.items()):
            if i > num_words:
                continue
            emb_vec = self.embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        self.model = Sequential()
        embedding = Embedding(num_words,
                              self.__class__.GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=self.__class__.MAX_LEN,
                              trainable=False)
        self.model.add(embedding)
        self.model.add(SpatialDropout1D(0.2))
        self.model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        self.model.add(Dense(1, activation='sigmoid'))

        optimzer = Adam(learning_rate=1e-5)
        self.model.compile(loss='binary_crossentropy', optimizer=optimzer, metrics=['accuracy'])

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr, test_size=0.15)
        self.model.fit(X_train, y_train, batch_size=4, epochs=self.__class__.EPOCHS, validation_data=(X_test, y_test), verbose=2)

        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)
        df = self.preprocess_df(df)
        test_corpus = self.create_corpus(df)

        sequences = self.tokenizer_obj.texts_to_sequences(test_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        sub = pd.DataFrame({'target': y_pre})

        pred_dict = {'prediction': [float(x) for x in sub['target'].values]}
        return pred_dict

    def preprocess_df(self, df):
        df = df[['text']]
        df['text'] = df['text'].apply(lambda x: self.remove_URL(x))
        df['text'] = df['text'].apply(lambda x: self.remove_html(x))
        df['text'] = df['text'].apply(lambda x: self.remove_emoji(x))
        df['text'] = df['text'].apply(lambda x: self.remove_punct(x))
        return df

    def remove_URL(self, text):
        url = re.compile(r'https?://\S+|www\.\S+')
        return url.sub(r'', text)

    def remove_html(self, text):
        html = re.compile(r'<.*?>')
        return html.sub(r'', text)

    def remove_punct(self, text):
        table = str.maketrans('', '', string.punctuation)
        return text.translate(table)

    def remove_emoji(self, text):
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"
                                   u"\U0001F300-\U0001F5FF"
                                   u"\U0001F680-\U0001F6FF"
                                   u"\U0001F1E0-\U0001F1FF"
                                   u"\U00002702-\U000027B0"
                                   u"\U000024C2-\U0001F251"
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)

    def create_corpus(self, df):
        corpus = []
        for tweet in tqdm(df['text']):
            words = [word.lower() for word in word_tokenize(tweet) if ((word.isalpha() == 1) & (word not in stop))]
            corpus.append(words)
        return corpus


if __name__ == '__main__':
    ray.init()
    serve.start(detached=True)
    Model.deploy()

    while True:
        time.sleep(1)
```
We need access to the training data, so we'll create a table called `nlp_kaggle_train` to load the dataset that the original model uses, and ingest the CSV into it. A schema along these lines should work (the columns follow the Kaggle CSV):
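```sql
-- Illustrative types; the columns follow the Kaggle disaster-tweets CSV
CREATE TABLE nlp_kaggle_train (
    id INT,
    keyword VARCHAR(255),
    location VARCHAR(255),
    text VARCHAR(5000),
    target INT
);
```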
Note: the specifics of the schema and how to ingest the CSV will vary depending on your database.
Next, we can register and train the custom model defined above with a query along these lines (the datasource name is illustrative; the route prefix matches the one in the Python code):
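```sql
CREATE PREDICTOR byom_ray_serve_nlp
FROM my_db (SELECT text, target FROM nlp_kaggle_train)
PREDICT target
USING
    url.train = 'http://127.0.0.1:8000/nlp_kaggle_model/train',
    url.predict = 'http://127.0.0.1:8000/nlp_kaggle_model/predict',
    dtype_dict = {"text": "rich_text", "target": "integer"},
    format = 'ray_serve';
```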
Training will take a while, given that this model is a neural network rather than a simple logistic regression. You can check its status with the query `SELECT * FROM mindsdb.predictors WHERE name = 'byom_ray_serve_nlp';`, much like you'd do with a "normal" MindsDB predictor.

Once the predictor's status becomes `trained`, we can query it for predictions as usual. For example, with a clearly disaster-related tweet (the text below is illustrative):
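```sql
SELECT target
FROM mindsdb.byom_ray_serve_nlp
WHERE text = 'Wildfire near the city is spreading fast, thousands being evacuated';
```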
Which would, hopefully, output `1`. Alternatively, we can try a tweet that has nothing to do with natural disasters and expect `0` as the output (again, the text is illustrative):
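```sql
SELECT target
FROM mindsdb.byom_ray_serve_nlp
WHERE text = 'Baking cookies this afternoon, the kitchen smells amazing';
```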
If your results do not match this example, it may help to train the model for more epochs.
2. MLflow
2.1 Simple example - Logistic Regression
MLflow is a tool that you can use to train and serve models, among other features like organizing experiments and tracking metrics.
Given that there is no way to train an MLflow-wrapped model through its API, you will have to train your models outside of MindsDB by pulling in your data manually (i.e. with a script), ideally using an MLflow run or experiment.
The first step is to create a script where you train a model and save it using one of the saving methods that MLflow exposes. For this example, we will use the model from this simple tutorial, where the saving method is `mlflow.sklearn.log_model`, given that the model is built with scikit-learn.
Once trained, you need to make sure the model is being served and listening for input at a URL of your choice (note that this means your model can run on a different machine from the one executing MindsDB). Let's assume this URL is http://localhost:5000/invocations for now.
This means you would execute the following command in your terminal, from the directory where the model was stored:

```
mlflow models serve --model-uri runs:/<run-id>/model
```

Here, <run-id> is shown in the output of the `python train.py` command used to actually train the model.
Next, we're going to bring this model into MindsDB with a query along these lines (the predictor, datasource, and column names are illustrative placeholders for your own data):
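```sql
-- Illustrative names; adapt the datasource, columns, and dtypes to your data
CREATE PREDICTOR byom_mlflow
FROM my_db (SELECT x1, x2, y FROM my_table)
PREDICT y
USING
    url.predict = 'http://localhost:5000/invocations',
    dtype_dict = {"x1": "float", "x2": "float", "y": "integer"},
    format = 'mlflow';
```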
We can now run predictions as usual, either by using the WHERE clause or by joining on a data table with an appropriate schema:
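```sql
-- Single prediction, passing input values directly:
SELECT y
FROM mindsdb.byom_mlflow
WHERE x1 = 0.5 AND x2 = 1.2;

-- Batch predictions, joining against an existing table:
SELECT t.y AS real_value,
       m.y AS predicted_value
FROM my_db.my_table AS t
JOIN mindsdb.byom_mlflow AS m;
```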
2.2. Advanced example - Keras NLP model
This is the same use case as in section 1.2, so be sure to download the dataset if you want to reproduce the steps here. In this case, we will take a look at best practices for when your model needs custom data preprocessing code (which, realistically, will be fairly common).
The key difference is that we now need to use the `mlflow.pyfunc` module to both 1) save the model using `mlflow.pyfunc.save_model`, and 2) subclass `mlflow.pyfunc.PythonModel` to wrap the model in an MLflow-compatible way that enables our custom inference logic to be called.
Saving the model
In the same script where you train the model (you can find the full script at the end of section 2.2), there should be a call at the end where you actually use MLflow to save every produced artifact:
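```python
# Excerpt from the end of the full training script shown below
with mlflow.start_run(run_name=run_name) as run:
    mlflow.pyfunc.save_model(
        path=mlflow_pyfunc_model_path,
        python_model=Model(),
        conda_env=conda_env,
        artifacts=artifacts)
    result = mlflow.register_model(
        f"runs:/{run.info.run_id}/{mlflow_pyfunc_model_path}",
        f"{mlflow_pyfunc_model_path}"
    )
```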
Here, `artifacts` is a dictionary with all the outputs produced by the training phase; in this case, we want both the model and a tokenizer to preprocess the input text. `conda_env`, on the other hand, specifies the environment under which your model will be executed once it is served in a self-contained conda environment, so it should include all required packages and dependencies. For this example, they look like this:
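```python
# Excerpt from the full training script shown below
artifacts = {
    'model': model_path,
    'tokenizer_path': tokenizer_path,
}

conda_env = {
    'channels': ['defaults'],
    'dependencies': [
        'python=3.8',
        'pip',
        {
            'pip': [
                'mlflow',
                'tensorflow=={}'.format(tensorflow.__version__),
                'cloudpickle=={}'.format(cloudpickle.__version__),
                'nltk=={}'.format(nltk.__version__),
                'pandas=={}'.format(pd.__version__),
                'numpy=={}'.format(np.__version__),
                'scikit-learn=={}'.format(sklearn.__version__),
                'tqdm=={}'.format(tqdm.__version__)
            ],
        },
    ],
    'name': 'nlp_keras_env'
}
```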
Finally, to actually store the model, you need to provide the wrapper class that will 1) load all produced artifacts into an accessible "context", and 2) implement all the required inference logic:
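```python
# Excerpt from the full training script shown below
class Model(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.model_path = context.artifacts['model']
        with open(context.artifacts['tokenizer_path'], 'rb') as f:
            self.tokenizer = pickle.load(f)
        self.model = load_model(self.model_path)

    def predict(self, context, model_input):
        df = preprocess_df(model_input)
        corpus = create_corpus(df)
        sequences = self.tokenizer.texts_to_sequences(corpus)
        tweet_pad = pad_sequences(sequences, maxlen=MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]
        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        return list(y_pre)
```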
As you can see, here we are loading multiple artifacts and using them to guarantee that the input data will be in the same format that was used when training. Ideally, you would abstract this even further into a single `preprocess` method that is called both at training time and at inference time.

Finally, serving is simple: go to the directory where you called the above script and execute `mlflow models serve --model-uri ./nlp_kaggle`.
At this point, the rest is essentially the same as in the previous example. You can link the MLflow model to MindsDB with an SQL statement along these lines (the datasource name is illustrative):
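```sql
CREATE PREDICTOR byom_mlflow_nlp
FROM my_db (SELECT text, target FROM nlp_kaggle_train)
PREDICT target
USING
    url.predict = 'http://localhost:5000/invocations',
    dtype_dict = {"text": "rich_text", "target": "integer"},
    format = 'mlflow';
```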
To get predictions, you can pass input data directly using the WHERE clause:
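```sql
SELECT target
FROM mindsdb.byom_mlflow_nlp
WHERE text = 'Wildfire near the city is spreading fast, thousands being evacuated';
```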
Or you can JOIN with a data table to get batch predictions. For this, you should ensure that the table actually exists and that the database it belongs to has been connected to your MindsDB instance; for more details, refer to the same steps in the Ray Serve example (section 1.2). A query along these lines should work (assuming the nlp_kaggle_train table from above):
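```sql
SELECT t.text,
       m.target AS predicted_target
FROM my_db.nlp_kaggle_train AS t
JOIN mindsdb.byom_mlflow_nlp AS m;
```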
Full Script
Finally, for reference, here's the full script that trains and saves the model. The model is exactly the same as in section 1.2, so it should look familiar.
```python
import re
import pickle
import string

import mlflow.pyfunc
import nltk
import tqdm
import sklearn
import tensorflow
import cloudpickle
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split
from tensorflow.keras.initializers import Constant
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.models import load_model

stop = set(stopwords.words('english'))

MAX_LEN = 100
GLOVE_DIM = 50
EPOCHS = 10


def preprocess_df(df):
    df = df[['text']]
    funcs = [remove_URL, remove_html, remove_emoji, remove_punct]
    for fn in funcs:
        df['text'] = df['text'].apply(lambda x: fn(x))
    return df


def remove_URL(text):
    url = re.compile(r'https?://\S+|www\.\S+')
    return url.sub(r'', text)


def remove_html(text):
    html = re.compile(r'<.*?>')
    return html.sub(r'', text)


def remove_punct(text):
    table = str.maketrans('', '', string.punctuation)
    return text.translate(table)


def remove_emoji(text):
    emoji_pattern = re.compile("["
                               u"\U0001F600-\U0001F64F"
                               u"\U0001F300-\U0001F5FF"
                               u"\U0001F680-\U0001F6FF"
                               u"\U0001F1E0-\U0001F1FF"
                               u"\U00002702-\U000027B0"
                               u"\U000024C2-\U0001F251"
                               "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)


def create_corpus(df):
    corpus = []
    for tweet in tqdm.tqdm(df['text']):
        words = [word.lower() for word in word_tokenize(tweet) if ((word.isalpha() == 1) & (word not in stop))]
        corpus.append(words)
    return corpus


class Model(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.model_path = context.artifacts['model']
        with open(context.artifacts['tokenizer_path'], 'rb') as f:
            self.tokenizer = pickle.load(f)
        self.model = load_model(self.model_path)

    def predict(self, context, model_input):
        df = preprocess_df(model_input)
        corpus = create_corpus(df)
        sequences = self.tokenizer.texts_to_sequences(corpus)
        tweet_pad = pad_sequences(sequences, maxlen=MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]
        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        return list(y_pre)


if __name__ == '__main__':
    train_model = True
    model_path = './'
    tokenizer_path = './tokenizer.pkl'
    run_name = 'test_run'
    mlflow_pyfunc_model_path = "nlp_kaggle"
    mlflow.set_tracking_uri("sqlite:///mlflow.db")

    if train_model:
        df = pd.read_csv('./train.csv')
        target = df[['target']]
        target_arr = target.values

        df = preprocess_df(df)
        train_corpus = create_corpus(df)

        embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                embedding_dict[word] = vectors
        f.close()

        tokenizer_obj = Tokenizer()
        tokenizer_obj.fit_on_texts(train_corpus)
        with open(tokenizer_path, 'wb') as f:
            pickle.dump(tokenizer_obj, f)

        sequences = tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        word_index = tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, GLOVE_DIM))

        for word, i in tqdm.tqdm(word_index.items()):
            if i > num_words:
                continue
            emb_vec = embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr, test_size=0.15)

        model = Sequential()
        embedding = Embedding(num_words,
                              GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=MAX_LEN,
                              trainable=False)
        model.add(embedding)
        model.add(SpatialDropout1D(0.2))
        model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        model.add(Dense(1, activation='sigmoid'))

        optimzer = Adam(learning_rate=1e-5)
        model.compile(loss='binary_crossentropy', optimizer=optimzer, metrics=['accuracy'])

        model.fit(X_train, y_train, batch_size=4, epochs=EPOCHS, validation_data=(X_test, y_test), verbose=2)
        model.save(model_path)

    artifacts = {
        'model': model_path,
        'tokenizer_path': tokenizer_path,
    }

    conda_env = {
        'channels': ['defaults'],
        'dependencies': [
            'python=3.8',
            'pip',
            {
                'pip': [
                    'mlflow',
                    'tensorflow=={}'.format(tensorflow.__version__),
                    'cloudpickle=={}'.format(cloudpickle.__version__),
                    'nltk=={}'.format(nltk.__version__),
                    'pandas=={}'.format(pd.__version__),
                    'numpy=={}'.format(np.__version__),
                    'scikit-learn=={}'.format(sklearn.__version__),
                    'tqdm=={}'.format(tqdm.__version__)
                ],
            },
        ],
        'name': 'nlp_keras_env'
    }

    with mlflow.start_run(run_name=run_name) as run:
        mlflow.pyfunc.save_model(
            path=mlflow_pyfunc_model_path,
            python_model=Model(),
            conda_env=conda_env,
            artifacts=artifacts)
        result = mlflow.register_model(
            f"runs:/{run.info.run_id}/{mlflow_pyfunc_model_path}",
            f"{mlflow_pyfunc_model_path}"
        )
```