Skip to main content

MLOps Code Snippets

  1. Dask-ML-Parallelize model training:
  • Use Dask-ML to train and evaluate your machine-learning models in parallel, leveraging the full power of your hardware.
  • With Dask-ML, you can quickly scale your machine learning workloads across multiple cores, processors, or even clusters, making it easy to train and evaluate large models on large datasets.
import dask_ml.model_selection as dcv  
from sklearn.datasets import make_classification
from sklearn.svm import SVC

# Create a large dataset
X, y = make_classification(n_samples=100000, n_features=20, random_state=42)

# Define your model
model = SVC()

# Train your model in parallel using Dask-ML
params = {"C": dcv.Categorical([0.1, 1, 10]), "kernel": dcv.Categorical(["linear", "rbf"])}
search = dcv.RandomizedSearchCV(model, params, n_iter=10, cv=3)
search.fit(X, y)

Check for more information.

2. Feature Tools: Featuretools is an open-source Python library for automated feature engineering, allowing you to generate new features from your raw data with minimal manual effort.

import featuretools as ft  

# Load your raw data into an entityset
es = ft.EntitySet(id="my_data")
es = es.entity_from_dataframe(entity_id="customers", dataframe=data, index="customer_id")

# Define relationships between entities
# ...

# Automatically generate new features
feature_matrix, feature_defs = ft.dfs(entityset=es, target_entity="customers", max_depth=2)

For more information please check.

3. Tensorboard: TensorBoard is a powerful visualization tool for TensorFlow that allows you to monitor your model’s performance and track various metrics during training and evaluation.

import tensorflow as tf  
from tensorflow.keras.callbacks import TensorBoard

# Define your model
model = tf.keras.Sequential([...])

# Compile your model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Create a TensorBoard callback
tensorboard_callback = TensorBoard(log_dir="./logs")

# Train your model with the TensorBoard callback
model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])

For more information please check.

4. Tensorflow Serving:

  • TensorFlow Serving is a high-performance serving system for machine learning models, designed for production environments.
  • TensorFlow Serving supports multiple models, model versioning, and automatic loading and unloading of models, making it easy to manage and serve your machine learning models at scale.
# Save your TensorFlow model in the SavedModel format  
model.save("my_model/1/")

# Install TensorFlow Serving
echo "deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list &&
curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | sudo apt-key add -
sudo apt-get update && sudo apt-get install tensorflow-model-server

# Start TensorFlow Serving with your model
tensorflow_model_server --rest_api_port=8501 --model_name=my_model --model_base_path=$(pwd)/my_model

For more information please check.

5. Automate hyperparameter tuning with Optuna: Optuna is a powerful and flexible optimization library that can automatically explore and optimize hyperparameters for your machine-learning models.

import optuna  
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def objective(trial):
n_estimators = trial.suggest_int("n_estimators", 10, 200)
max_depth = trial.suggest_int("max_depth", 3, 20)

clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
score = cross_val_score(clf, X_train, y_train, cv=5).mean()

return score

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)

best_params = study.best_params

For more information please check.

6. SHAP: Use SHAP (SHapley Additive exPlanations) to explain the output of your machine learning models and gain insights into their behavior.

import shap  
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

# Load and prepare your data
# ...

# Train a RandomForestRegressor
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestRegressor()
model.fit(X_train, y_train)

# Explain the model's predictions using SHAP
explainer = shap.Explainer(model)
shap_values = explainer(X_test)

# Plot the SHAP values for a single prediction
shap.plots.waterfall(shap_values[0])

For more information please check.

7. Ray: Ray Tune is a powerful and flexible library for distributed hyperparameter tuning, allowing you to leverage the full power of your hardware to optimize your machine learning models.

from ray import tune  
from ray.tune.schedulers import ASHAScheduler
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def train_model(config):
n_estimators = config["n_estimators"]
max_depth = config["max_depth"]

clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
score = cross_val_score(clf, X_train, y_train, cv=3).mean()

tune.report(mean_accuracy=score)

# Load your data
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# Define the search space for hyperparameters
config = {
"n_estimators": tune.randint(10, 200),
"max_depth": tu:ne.randint(3, 20)
}

# Set up Ray Tune
scheduler = ASHAScheduler(metric="mean_accuracy", mode="max")
analysis = tune.run(train_model, config=config, scheduler=scheduler, num_samples=50)

# Get the best hyperparameters
best_params = analysis.best_config

For more information please check.

8. Experiment tracking with MLflow: MLflow, you can compare different runs, reproduce previous results, and share your work with others, making collaboration and iteration more efficient.

import mlflow  
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load your data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Train your model and log metrics with MLflow
with mlflow.start_run():
clf = RandomForestClassifier(n_estimators=100, max_depth=10)
clf.fit(X_train, y_train)

train_accuracy = clf.score(X_train, y_train)
test_accuracy = clf.score(X_test, y_test)

mlflow.log_param("n_estimators", 100)
mlflow.log_param("max_depth", 10)
mlflow.log_metric("train_accuracy", train_accuracy)
mlflow.log_metric("test_accuracy", test_accuracy)

mlflow.sklearn.log_model(clf, "model")

For more information please check.

9. Scikit-learn: Pipeline: Use Scikit-learn **Pipeline** to chain multiple preprocessing steps and a final estimator.

from sklearn.pipeline import Pipeline  
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([
("scaler", StandardScaler()),
("classifier", LogisticRegression())
])

pipe.fit(X_train, y_train)

10. Scikit-learn: Grid search: Use GridSearchCV to perform hyperparameter tuning.

from sklearn.model_selection import GridSearchCV  

param_grid = {
"classifier__C": [0.1, 1, 10],
"classifier__penalty": ["l1", "l2"]
}

grid_search = GridSearchCV(pipe, param_grid, cv=5)
grid_search.fit(X_train, y_train)

11. Joblib:joblib is a popular library for saving and loading Scikit-learn models. Use dump() to save a model to a file, and load() to restore the model from the file.

import joblib  

# Save the model
joblib.dump(grid_search.best_estimator_, "model.pkl")

# Load the model
loaded_model = joblib.load("model.pkl")

12. Tensorflow: Simple neural network. Use the Keras API to define a simple feedforward neural network with dense (fully connected) layers.

import tensorflow as tf  

model = tf.keras.Sequential([
tf.keras.layers.Dense(64, activation="relu", input_shape=(10,)),
tf.keras.layers.Dense(32, activation="relu"),
tf.keras.layers.Dense(1, activation="sigmoid")
])

model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])

13. Early Stopping: Code snippet for early stopping

early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3)  

history = model.fit(X_train, y_train, epochs=100, validation_split=0.2, callbacks=[early_stopping])

14. Tensorflow Model-Save and Load: Use the save() method to save the model architecture, weights, and optimizer state to a single file. Use load_model() to restore the saved model from the file.

# Save the model  
model.save("model.h5")

# Load the model
loaded_model = tf.keras.models.load_model("model.h5")

15. Dask: Parallelize operations: Use Dask to parallelize operations on large datasets.

import dask.array as da  

x = da.ones((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y.sum(axis=0)
result = z.compute()
  1. TPOT: Automated machine learning: TPOT (Tree-based Pipeline Optimization Tool) is a genetic algorithm-based automated machine learning library. Use TPOTClassifier or TPOTRegressor to optimize a machine learning pipeline for your data.
from tpot import TPOTClassifier  

tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2)
tpot.fit(X_train, y_train)

For more information please check.

17. Category Encoders: Category Encoders is a library that provides various encoding methods for categorical variables, such as target encoding, one-hot encoding, and ordinal encoding.

import category_encoders as ce  

encoder = ce.TargetEncoder()
X_train_encoded = encoder.fit_transform(X_train, y_train)
X_test_encoded = encoder.transform(X_test)

18. Imbalanced-learn: is a library that provides various techniques for handling imbalanced datasets, such as oversampling, undersampling, and combination methods. Use the appropriate resampling technique, such as SMOTE, to balance your dataset before training your model.

from imblearn.over_sampling import SMOTE  

smote = SMOTE()
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

For more information please check.

19. Auto-sklearn: is an automated machine learning library that wraps Scikit-learn, providing the automated model and preprocessing selection. Use AutoSklearnClassifier or AutoSklearnRegressor to optimize a machine learning pipeline data.

from autosklearn.classification import AutoSklearnClassifier  

auto_classifier = AutoSklearnClassifier(time_left_for_this_task=600)
auto_classifier.fit(X_train, y_train)

20. Scikit-learn: Column Transformer: ColumnTransformer allows you to apply different preprocessing steps to different columns of your input data, which is particularly useful when dealing with mixed data types.

from sklearn.compose import ColumnTransformer  
from sklearn.preprocessing import StandardScaler, OneHotEncoder

preprocessor = ColumnTransformer(
transformers=[
("num", StandardScaler(), ["numerical_feature_1", "numerical_feature_2"]),
("cat", OneHotEncoder(), ["categorical_feature"]),
]
)

X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

21. RandomizedSearchCV is an alternative to GridSearchCV that searches the parameter space more efficiently by randomly sampling a fixed number of parameter settings. Define a parameter distribution as a dictionary, where the keys are the parameter names (including the step name if using a pipeline) and the values are distributions from which to sample parameter values. Pass the model (or pipeline) and parameter distribution to RandomizedSearchCV and fit the data.

from sklearn.model_selection import RandomizedSearchCV  
from scipy.stats import uniform

param_dist = {
"classifier__C": uniform(loc=0, scale=4),
"preprocessor__num__with_mean": [True, False],
}

random_search = RandomizedSearchCV(pipe, param_dist, n_iter=10, cv=5, scoring="accuracy")
random_search.fit(X_train, y_train)

22. TensorFlow Data Validation: Use TensorFlow Data Validation (TFDV) to validate and explore your data.

import tensorflow_data_validation as tfdv  

stats = tfdv.generate_statistics_from_csv(data_location="train.csv")
schema = tfdv.infer_schema(statistics=stats)
tfdv.display_schema(schema=schema)

23. TensorFlow Model Analysis: Use TensorFlow Model Analysis (TFMA) to evaluate your TensorFlow models.

import tensorflow_model_analysis as tfma  

eval_shared_model = tfma.default_eval_shared_model(
eval_saved_model_path="path/to/saved_model"
)
results = tfma.run_model_analysis(
eval_shared_model=eval_shared_model,
data_location="test.tfrecords",
file_format="tfrecords",
slice_spec=[tfma.slicer.SingleSliceSpec()]
)
tfma.view.render_slicing_metrics(results)

24. TensorFlow Transform: Use TensorFlow Transform (TFT) to preprocess your data for TensorFlow models.

import tensorflow_transform as tft  

def preprocessing_fn(inputs):
outputs = {}
outputs["scaled_feature"] = tft.scale_to_z_score(inputs["numerical_feature"])
outputs["one_hot_feature"] = tft.compute_and_apply_vocabulary(inputs["categorical_feature"])
return outputs

25. TensorFlow Extended (TFX): Use TensorFlow Extended (TFX) to create end-to-end machine learning pipelines.

from tfx.components import CsvExampleGen, Trainer  
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

example_gen = CsvExampleGen(input_base="path/to/data")
context.run(example_gen)

trainer = Trainer(
module_file="path/to/trainer_module.py",
examples=example_gen.outputs["examples"],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)
context.run(trainer)

26. CuPy: CuPy is a library that provides a NumPy-like interface for GPU-accelerated computing. Use CuPy arrays, which have a similar interface to NumPy arrays, to perform computations on GPU. Many common NumPy functions are available in CuPy, allowing you to perform GPU-accelerated computations with familiar syntax.

import cupy as cp  

x = cp.array([1, 2, 3, 4, 5])
y = cp.array([6, 7, 8, 9, 10])

z = cp.dot(x, y)

27. RAPIDS is a suite of GPU-accelerated libraries for data science, including cuDF (GPU-accelerated DataFrame library similar to Pandas) and cuML (GPU-accelerated machine learning library similar to Scikit-learn). Use cuDF DataFrames to perform data manipulation tasks on GPU, and cuML models to train and evaluate machine learning models on GPU.

import cudf  
import cuml

df = cudf.read_csv("data.csv")
kmeans_model = cuml.KMeans(n_clusters=5)
kmeans_model.fit(df)

28. FastAPI is a modern, high-performance web framework for building APIs with Python, particularly suitable for machine learning models.

  • Create an instance of FastAPI, and define API endpoints using decorators, such as @app.post().
  • Use uvicorn to run your FastAPI application, specifying the host and port.
from fastapi import FastAPI  
import uvicorn

app = FastAPI()

@app.post("/predict")
async def predict(text: str):
prediction = model.predict([text])
return {"prediction": prediction}

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

29. Streamlit is a library for quickly creating interactive web applications for machine learning and data science, using only Python.

  • Use Streamlit’s simple API to create user interface elements, such as text inputs and sliders, and display output or visualizations.
  • Run the Streamlit app using the command streamlit run app.py in your terminal.
import streamlit as st  

st.title("My Streamlit App")

input_text = st.text_input("Enter some text:")
st.write(f"You entered: {input_text}")

slider_value = st.slider("Select a value:", 0, 100, 50)
st.write(f"Slider value: {slider_value}")

For more information please check.

30. Docker File: Create a Dockerfile to define a custom Docker image for your machine learning application.

FROM python:3.8  

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "app.py"]
  • Use the FROM keyword to specify the base image, such as the official Python image.
  • Use the WORKDIR keyword to set the working directory for subsequent instructions.
  • Use the COPY keyword to copy files and directories from the host system to the image.
  • Use the RUN keyword to execute commands during the build process, such as installing dependencies.
  • Use the CMD keyword to define the default command to run when the container starts.

31. Build a docker image:

docker build -t my_ml_app:latest .

32. Run a docker container: Use the docker run command to create and start a Docker container from an image. Use the -p flag to map a host port to a container port, allowing external access to services running inside the container.

docker run -p 5000:5000 my_ml_app:latest

32. Kubernetes YAML Config File:

apiVersion: apps/v1  
kind: Deployment
metadata:
name: my-ml-app
spec:
replicas: 3
selector:
matchLabels:
app: my-ml-app
template:
metadata:
labels:
app: my-ml-app
spec:
containers:
- name: my-ml-app-container
image: my_ml_app:latest
ports:
- containerPort: 5000

---

apiVersion: v1
kind: Service
metadata:
name: my-ml-app-service
spec:
selector:
app: my-ml-app
ports:
- protocol: TCP
port: 80
targetPort: 5000
type: LoadBalancer
  • Use apiVersion, kind, and metadata to define the Kubernetes resource type and metadata.
  • Use spec to define the desired state of the resource, such as the number of replicas, container images, and exposed ports.
  • Use the --- separator to define multiple resources in the same file, such as a Deployment and a Service.

33. kubectl: Use the kubectl command-line tool to manage the Kubernetes cluster and resources.

# Apply the Kubernetes configuration file  
kubectl apply -f my_ml_app.yaml

# List all deployments
kubectl get deployments

# List all services
kubectl get services

# Scale the deployment
kubectl scale deployment my-ml-app --replicas=5

# Delete the deployment and service
kubectl delete -f my_ml_app.yaml

34. Organize your project:

my_ml_project/  
|-- data/
| |-- raw/
| |-- processed/
|-- models/
|-- notebooks/
|-- src/
| |-- features/
| |-- models/
| |-- utils/
|-- Dockerfile
|-- requirements.txt
  • Use separate directories for data, models, notebooks, and source code.
  • Further, subdivide directories to separate raw and processed data or different types of source code modules.

35. Model versioning: Use model versioning tools like DVC or MLflow to track different versions of your trained machine learning models.

  • Store model artifacts (e.g., weights, metadata) in a centralized storage system, such as Amazon S3 or Google Cloud Storage.
  • Use a versioning tool to keep track of model versions, their associated training data, and hyperparameters.
  • Enable easy model comparison and reproducibility by tracking performance metrics and training configurations.

36. Automated testing:

  • Use testing libraries like unittest or pytest to write and run tests.
  • Test individual functions and classes with unit tests, and test interactions between components with integration tests.
  • Perform end-to-end tests to ensure the entire system works as expected, including model serving and API endpoints.

37. Papermill:

  • Papermill allows you to parameterize Jupyter Notebooks by injecting new values for specific cells.
  • Execute Notebooks programmatically and generate reports with different parameter values without manual intervention.
import papermill as pm  

pm.execute_notebook(
input_path='input_notebook.ipynb',
output_path='output_notebook.ipynb',
parameters={'param1': 'value1', 'param2': 'value2'}
)

38. Environment management: tools like Conda or virtualenv to create isolated environments for projects.

# Create a new Conda environment  
conda create -n my_ml_env python=3.8

# Activate the environment
conda activate my_ml_env

# Install packages
conda install pandas scikit-learn

# Deactivate the environment
conda deactivate

39. Progressive model loading: Load large models in chunks to reduce memory consumption and improve performance.

import numpy as np  
import pandas as pd
from sklearn.linear_model import LinearRegression

chunksize = 10000
model = LinearRegression()

for i, chunk in enumerate(pd.read_csv("large_dataset.csv", chunksize=chunksize)):
X_chunk = chunk.drop("target", axis=1)
y_chunk = chunk["target"]
model.partial_fit(X_chunk, y_chunk)
print(f"Processed chunk {i + 1}")

40. Feature encoding:

  • Feature encoding techniques transform categorical variables into numerical representations that machine learning models can use.
  • One-hot encoding creates binary columns for each category, while target encoding replaces each category with the mean of the target variable for that category.
import pandas as pd  
from sklearn.preprocessing import OneHotEncoder

data = pd.DataFrame({"Category": ["A", "B", "A", "C"]})

encoder = OneHotEncoder()
encoded_data = encoder.fit_transform(data)

print(encoded_data.toarray())

41. Data validation: Validate the quality and consistency of your data using data validation frameworks like Great Expectations, Pandera, or custom validation functions.

import pandera as pa  
from pandera import DataFrameSchema, Column, Check

schema = DataFrameSchema({
"age": Column(pa.Int, Check(lambda x: 18 <= x <= 100)),
"income": Column(pa.Float, Check(lambda x: x >= 0)),
"gender": Column(pa.String, Check(lambda x: x in ["M", "F", "Other"])),
})

# Validate your DataFrame
validated_df = schema.validate(df)

42. Data versioning: Use data versioning tools like DVC or Pachyderm to track changes to your datasets and ensure reproducibility across different experiments and model versions.

# Initialize DVC in your project  
dvc init

# Add your dataset to DVC
dvc add data/my_dataset

# Commit the changes to your Git repository
git add data/my_dataset.dvc .dvc/config
git commit -m "Add my_dataset to DVC"

43. Use feature stores: Implement feature stores like Feast or Hopsworks to store, manage, and serve features for machine learning models.

from feast import FeatureStore  

# Initialize the feature store
store = FeatureStore(repo_path="path/to/your/feature_store")

# Fetch features for training
training_df = store.get_historical_features(
entity_df=entity_df,
feature_refs=["your_feature_name"]
).to_df()

# Fetch features for serving
feature_vector = store.get_online_features(
feature_refs=["your_feature_name"],
entity_rows=[{"your_entity_key": "your_value"}]
).to_dict()

Feature stores can help you centralize the management of your features, ensuring consistency and reducing duplication across different models and experiments.

44. Feature scaling: Apply feature scaling techniques like MinMax scaling, standard scaling, or normalization to ensure that your features have similar scales and distributions.

from sklearn.datasets import load_iris  
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

# Scale features using standard scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

45. Dimensionality reduction: Apply dimensionality reduction techniques like PCA, t-SNE, or UMAP to reduce the number of features in your dataset while preserving important patterns and relationships.

from sklearn.datasets import load_iris  
from sklearn.decomposition import PCA

X, y = load_iris(return_X_y=True)

# Apply PCA to reduce the dimensionality of the dataset
pca = PCA(n_components=2)
X_reduced = pca.fit_transform(X)

46. Pandas chaining: Chain Pandas operations together to create more readable and concise data manipulation code.

import pandas as pd  

data = pd.read_csv("my_data.csv")

# Chain Pandas operations
result = (
data.query("age >= 30")
.groupby("city")
.agg({"salary": "mean"})
.sort_values("salary", ascending=False)
)

47. Use the ‘pipe’ function: Use the pipe function to integrate custom functions or operations in your Pandas chaining workflow.

import pandas as pd  

def custom_operation(df, column, value):
return df[df[column] > value]

data = pd.read_csv("data.csv")

# Integrate custom operations using 'pipe'
result = (
data.pipe(custom_operation, "age", 18)
.groupby("city")
.agg({"salary": "mean"})
.sort_values("salary", ascending=False)
)

48. Pandas’ built-in plotting: Use Pandas’ built-in plotting functions for quick and easy data visualization.

import pandas as pd  

data = pd.read_csv("my_data.csv")

# Create a bar plot of average salary by city
data.groupby("city")["salary"].mean().plot(kind="bar")

49. Visualize missing data with Missingno: Use the Missingno library to visualize missing data in your dataset.

import pandas as pd  
import missingno as msno

data = pd.read_csv("data.csv")

# Visualize missing data
msno.matrix(data)

50. Use SQL Databases: You can use the sqlite3 library in Python to interact with an SQLite database. For example, you can create a table in an SQLite database and insert some data into it:

import sqlite3  

# Connect to an SQLite database
conn = sqlite3.connect('example.db')

# Create a table
conn.execute('CREATE TABLE IF NOT EXISTS my_table (id INTEGER PRIMARY KEY, name TEXT)')

# Insert some data
conn.execute('INSERT INTO my_table (id, name) VALUES (?, ?)', (1, 'John'))
conn.execute('INSERT INTO my_table (id, name) VALUES (?, ?)', (2, 'Jane'))

# Commit the changes
conn.commit()

# Retrieve data
cursor = conn.execute('SELECT * FROM my_table')
for row in cursor:
print(row)

51. Requests Library: Use the requests library to make HTTP requests: The requests library provides a simple way to make HTTP requests to APIs or websites. Here’s an example of how to make a GET request.

import requests  

# make a GET request to a website
response = requests.get('https://www.google.com')

# print the response content
print(response.content)

52. OS Library: Use the os library to manipulate files and directories: The os library provides functions for interacting with files and directories.

import os  

# create a directory
os.mkdir('my_directory')

53. Working with JSON:

Encoding Python data to JSON format:

import json  

data = {
"name": "Mark",
"age": 28,
"gender": "Male"
}

json_data = json.dumps(data)
print(json_data)

Decoding JSON data to Python format:

import json  

json_data = '{"name": "Mark", "age": 28, "gender": "Male"}'

data = json.loads(json_data)
print(data)

54. Working with CSV Files: USing CSV module.

import csv  

# Reading a CSV file
with open('example.csv', 'r') as file:
csv_reader = csv.reader(file)
for row in csv_reader:
print(row)

# Writing to a CSV file
with open('example.csv', 'w', newline='') as file:
csv_writer = csv.writer(file)
csv_writer.writerow(['Name', 'Age', 'Gender'])
csv_writer.writerow(['John', 25, 'Male'])
csv_writer.writerow(['Jane', 30, 'Female'])

55. Using SQL Alchemy for Database Access: SQL Alchemy is a popular Python library for working with databases. It provides a simple interface for connecting to various databases and executing SQL queries.

from sqlalchemy import create_engine  

# Connect to a PostgreSQL database
engine = create_engine('postgresql://username:password@host:port/database_name')

# Execute a SQL query and return the results as a dataframe
query = "SELECT * FROM table_name WHERE column_name > 100"
df = pd.read_sql(query, engine)

# Write a dataframe to a new table in the database
df.to_sql('new_table_name', engine)

56. Feature selection using Recursive Feature Elimination (RFE):

  • RFE helps identify the most important features, leading to better model performance and faster training.
  • Feature selection can reduce overfitting and improve the generalization of your model.
from sklearn.datasets import load_iris  
from sklearn.feature_selection import RFE
from sklearn.linear_model import LogisticRegression

# Load your data
iris = load_iris()
X, y = iris.data, iris.target

# Create a Logistic Regression model
model = LogisticRegression()

# Perform Recursive Feature Elimination
rfe = RFE(model, n_features_to_select=2)
rfe.fit(X, y)

# Get the most important features
important_features = rfe.support_

57. Use Apache Parquet for efficient storage of columnar data: Apache Parquet is a columnar storage file format that provides efficient compression and encoding schemes, making it ideal for storing large datasets used in machine learning.

import pandas as pd  
import pyarrow as pa
import pyarrow.parquet as pq

# Read a CSV file using pandas
data = pd.read_csv("data.csv")

# Convert the pandas DataFrame to an Apache Arrow Table
table = pa.Table.from_pandas(data)

# Write the Arrow Table to a Parquet file
pq.write_table(table, "data.parquet")

# Read the Parquet file into a pandas DataFrame
data_from_parquet = pq.read_table("data.parquet").to_pandas()

58. Use Apache Kafka for real-time data streaming: Apache Kafka is a distributed streaming platform that enables you to build real-time data pipelines and applications.

from kafka import KafkaProducer, KafkaConsumer  

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Send a message to a Kafka topic
producer.send("my_topic", b"Hello, Kafka!")

# Create a Kafka consumer
consumer = KafkaConsumer("my_topic", bootstrap_servers="localhost:9092")

# Consume messages from the Kafka topic
for msg in consumer:
print(msg.value)

59. Partition your data for efficient querying: Partitioning your data can help improve query performance by reducing the amount of data that needs to be read for a given query.

import pandas as pd  
import pyarrow as pa
import pyarrow.parquet as pq

# Read a CSV file using pandas
data = pd.read_csv("data.csv")

# Convert the pandas DataFrame to an Apache Arrow Table
table = pa.Table.from_pandas(data)

# Write the Arrow Table to a partitioned Parquet dataset
pq.write_to_dataset(table, root_path="partitioned_data", partition_cols=["state"])

# Read the partitioned Parquet dataset into a pandas DataFrame
data_from_partitioned_parquet = pq.ParquetDataset("partitioned_data").read().to_pandas()

60. Use data augmentation techniques to increase dataset size: Data augmentation involves creating new training examples by applying various transformations to the existing data, which can help improve model performance.

import numpy as np  
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define an image data generator for data augmentation
datagen = ImageDataGenerator(
rotation_range=20,
width_shift_range=0.2,
height_shift_range=0.2,
horizontal_flip=True,
)

# Load your data
(x_train, y_train), (_, _) = tf.keras.datasets.cifar10.load_data()
x_train = x_train.astype(np.float32) / 255.0

# Fit the data generator to your data
datagen.fit(x_train)

# Train your model with augmented data
model = create_your_model()
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.fit(datagen.flow(x_train, y_train, batch_size=32), epochs=10)

61. Using Flask for model deployment: Below is an example of how to use Flask to deploy a machine learning model:

from flask import Flask, request, jsonify  
import joblib

app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
features = [data['feature1'], data['feature2'], data['feature3']]
model = joblib.load('model.pkl')
prediction = model.predict([features])[0]
response = {'prediction': int(prediction)}
return jsonify(response)

if __name__ == '__main__':
app.run()

62. Using Pytest for testing:

For example, we have a file called math_operations.py.

# math_operations.py  

def add(a, b):
return a + b

def multiply(a, b):
return a * b

Next, create a test module with the same name as your module, but with a test_ prefix. In our case, we'll create a file calledtest_math_operations.py:

# test_math_operations.py  

import math_operations

def test_add():
assert math_operations.add(2, 3) == 5
assert math_operations.add(-1, 1) == 0
assert math_operations.add(0, 0) == 0

def test_multiply():
assert math_operations.multiply(2, 3) == 6
assert math_operations.multiply(-1, 1) == -1
assert math_operations.multiply(0, 0) == 0

Run the tests using the pytest command

pytest test_math_operations.py

Pytest will discover and run the test functions in the test_math_operations.py module.

63. Use automated data pipelines: Automated data pipelines can help you automate the process of data ingestion, cleaning, and transformation. Some of the important tools are

Apache Airflow Ml pipeline

from airflow import DAG  
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def preprocess_data():
# Preprocess data here
pass

def train_model():
# Train model here
pass

default_args = {
'owner': 'myname',
'start_date': datetime(2023, 3, 15),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

with DAG('my_dag', default_args=default_args, schedule_interval='@daily') as dag:
preprocess_task = PythonOperator(task_id='preprocess_task', python_callable=preprocess_data)
train_task = PythonOperator(task_id='train_task', python_callable=train_model)

preprocess_task >> train_task

64. Use Transfer Learning: Transfer learning can help you reuse and adapt pre-trained machine learning models for your own use cases. Here’s an example of how to use transfer learning with TensorFlow:

import tensorflow as tf  
from tensorflow.keras.applications import VGG16

# Load pre-trained model
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze base layers
for layer in base_model.layers:
layer.trainable = False

# Add custom top layers
x = base_model.output
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(256, activation='relu')(x)
predictions = tf.keras.layers.Dense(10, activation='softmax')(x)

# Create new model
model = tf.keras.models.Model(inputs=base_model.input, outputs=predictions)

# Compile and train model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=10)

65. Use automated machine learning(Auto ML): By using platforms like H2O.ai or Google Cloud AutoML, you can automatically select, train, and deploy models based on your data and requirements. Here’s an example of how to use H2O.ai’s AutoML platform:

import h2o  
from h2o.automl import H2OAutoML

# Start H2O cluster
h2o.init()

# Load data
data = h2o.import_file('my-data.csv')

# Define target variable
target = 'label'

# Split data into train and test sets
train, test = data.split_frame(ratios=[0.8])

# Define AutoML settings
automl = H2OAutoML(max_models=10, seed=1234)

# Train AutoML model
automl.train(x=data.columns, y=target, training_frame=train)

# Evaluate AutoML model
predictions = automl.leader.predict(test)
accuracy = (predictions['predict'] == test[target]).mean()
print(f'Accuracy: {accuracy}')

66. Use anomaly detection: By using libraries like PyOD or TensorFlow, you can detect anomalies based on statistical or machine learning techniques. Here’s an example of how to use PyOD to detect anomalies in a dataset:

import numpy as np  
from pyod.models.knn import KNN

# Load data
X = np.load('my-data.npy')

# Define anomaly detector
detector = KNN(n_neighbors=5)

# Train detector
detector.fit(X)

# Detect anomalies
anomaly_scores = detector.decision_scores_
threshold = np.percentile(anomaly_scores, 95)
anomalies = np.where(anomaly_scores > threshold)

# Print anomalies
print(f'Anomalies: {anomalies}')

67. Using Weights and Biases: Here’s an example of how to use Weights & Biases to run and track machine learning experiments.

import wandb  
import tensorflow as tf

# Initialize W&B
wandb.init(project='my-project')

# Load data
data = tf.data.TFRecordDataset('my-data.tfrecord')

# Define hyperparameters
config = wandb.config
config.learning_rate = 0.1
config.num_epochs = 10

# Define model
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer=tf.keras.optimizers.Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Train model
history = model.fit(data.batch(32), epochs=config.num_epochs)

# Log metrics and artifacts to W&B
wandb.log({'accuracy': history.history['accuracy'][-1]})
wandb.log_artifact('my-model.h5')

68. Important tools managing machine learning workflows:

69. Use Data Compression: Consider using tools and libraries such as zlib, gzip, or bz2 for data compression in Python.

import zlib  

# Compress data with zlib
data_compressed = zlib.compress(data)

# Decompress data with zlib
data_decompressed = zlib.decompress(data_compressed)

70. Data serialization: Consider using tools and libraries such as JSON, YAML, or protobuf for data serialization in Python.

import json  

# Serialize data to JSON
data_json = json.dumps(data)

# Deserialize data from JSON
data_deserialized = json.loads(data_json)

71. Data normalization and scaling: Consider using tools and libraries such as scikit-learn, TensorFlow, or PyTorch for data normalization and scaling in Python.

import pandas as pd  
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Standardize data with Z-score normalization
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)

# Scale data with min-max scaling
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

72. Data encryption and security: Consider using tools and libraries such as cryptography, Fernet, or PyAesCrypt for data encryption and security in Python.

from cryptography.fernet import Fernet  

# Generate encryption key with Fernet
key = Fernet.generate_key()

# Encrypt data with Fernet
cipher_suite = Fernet(key)
encrypted_data = cipher_suite.encrypt(data)

# Decrypt data with Fernet
decrypted_data = cipher_suite.decrypt(encrypted_data)

import hashlib

# Hash data with hashlib
hash_value = hashlib.sha256(data.encode('utf-8')).hexdigest()

import tokenizers

# Define tokenization with tokenizers
tokenizer = tokenizers.Tokenizer(tokenizers.models.WordPiece('vocab.txt', unk_token='[UNK]'))
encoded_data = tokenizer.encode(data).ids

73. Data Validation using Great Expectation:

import great_expectations as ge  

# Load a dataset (e.g., a Pandas DataFrame)
data = ge.read_csv("data.csv")

# Create an Expectation Suite
expectation_suite = data.create_expectation_suite("my_suite")

# Add expectations
data.expect_column_values_to_be_unique("id")
data.expect_column_values_to_not_be_null("name")
data.expect_column_mean_to_be_between("age", min_value=20, max_value=40)

# Validate data against the Expectation Suite
validation_result = data.validate(expectation_type="basic")

# Save the Expectation Suite and the validation result
ge.save_expectation_suite(expectation_suite, "my_suite.json")
ge.save_validation_result(validation_result, "my_suite_validation.json")

74. **logging** module: Use the logging module for flexible logging.

import logging  

logging.basicConfig(level=logging.INFO)
logging.info("This is an info message.")
logging.error("This is an error message.")

75. Use Dask dataframe : Dask is a powerful library for parallel and distributed computing in Python. It allows you to process large datasets that don’t fit into memory by breaking them into smaller chunks and processing them in parallel.

import dask.dataframe as dd  

# Read CSV file using Dask (file is partitioned into smaller chunks)
ddf = dd.read_csv('large_file.csv')

# Perform operations on the data (lazy evaluation)
filtered_ddf = ddf[ddf['column_A'] > 10]
mean_value = filtered_ddf['column_B'].mean()

# Compute the result (operations are executed in parallel)
result = mean_value.compute()
print("Mean of column B for rows where column A > 10:", result)