Key Points:
- Download S3 File: The download_s3_file function reads the file from S3 into a pandas DataFrame.
- Encryption: The encrypt_data function encrypts SSN and credit card information using the KMS key.
- Processing: The process_and_encrypt_pii function applies the encryption and removes the original sensitive fields.
- Save as Parquet: The save_as_parquet function converts the DataFrame to a Parquet file.
- Upload to S3: The upload_parquet_to_s3 function uploads the Parquet file back to S3.
- ML Model Loading and Prediction: The apply_ml_model function loads a pre-trained ML model using joblib and applies it to the DataFrame. The model's prediction is added to the DataFrame as a new column.
- ML Model Path: The ml_model_path variable specifies the location of your pre-trained ML model (e.g., a .pkl file).
Prerequisites:
- You need a pre-trained ML model saved as a .pkl file. The model should be trained and serialized using a library like scikit-learn (a minimal sketch of this step is shown after this list).
- Make sure the feature set used by the ML model is compatible with the DataFrame after encryption.
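For reference, here is a minimal sketch of how such a model might be trained and serialized. The file paths, feature columns (age, income), and label column are hypothetical placeholders and must be adapted to match the columns your pipeline actually passes to model.predict(). The complete pipeline script follows this sketch.

# Hypothetical example: train and save a scikit-learn model compatible with apply_ml_model.
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

training_df = pd.read_csv('path/to/training-data.csv')
X = training_df[['age', 'income']]  # placeholder features; must match the columns used at prediction time
y = training_df['label']            # placeholder label column

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)

# Serialize the fitted model; it can later be restored with joblib.load()
joblib.dump(model, 'path/to/your/ml-model.pkl')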
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from botocore.exceptions import ClientError
import base64
import io
import joblib  # for loading the ML model (sklearn.externals.joblib was removed in scikit-learn 0.23)
# Initialize the AWS services
s3 = boto3.client('s3')
kms = boto3.client('kms')
def download_s3_file(bucket_name, file_key):
    """Download file from S3 and return its contents as a pandas DataFrame."""
    try:
        obj = s3.get_object(Bucket=bucket_name, Key=file_key)
        df = pd.read_csv(io.BytesIO(obj['Body'].read()))  # Assuming the file is in CSV format
        return df
    except ClientError as e:
        print(f"Error downloading file from S3: {e}")
        raise
def encrypt_data(kms_key_id, data):
    """Encrypt data using AWS KMS."""
    # Coerce to str first: pandas may parse SSNs or card numbers as numeric types
    response = kms.encrypt(KeyId=kms_key_id, Plaintext=str(data).encode())
    encrypted_data = base64.b64encode(response['CiphertextBlob']).decode('utf-8')
    return encrypted_data
def process_and_encrypt_pii(df, kms_key_id):
    """Encrypt SSN and credit card information in the DataFrame."""
    df['encrypted_ssn'] = df['ssn'].apply(lambda x: encrypt_data(kms_key_id, x))
    df['encrypted_credit_card'] = df['credit_card'].apply(lambda x: encrypt_data(kms_key_id, x))
    # Drop original sensitive columns
    df = df.drop(columns=['ssn', 'credit_card'])
    return df
def apply_ml_model(df, model_path):
    """Apply a pre-trained ML model to the DataFrame."""
    # Load the ML model (assuming it's a scikit-learn model saved with joblib)
    model = joblib.load(model_path)
    # Use every column except the encrypted PII as features; adjust based on your feature set
    features = df.drop(columns=['encrypted_ssn', 'encrypted_credit_card'])
    # Store the model's output in a new 'prediction' column
    df['prediction'] = model.predict(features)
    return df
def save_as_parquet(df, output_file_path):
    """Save the DataFrame as a Parquet file."""
    table = pa.Table.from_pandas(df)
    pq.write_table(table, output_file_path)
def upload_parquet_to_s3(bucket_name, output_file_key, file_path):
    """Upload the Parquet file to an S3 bucket."""
    try:
        s3.upload_file(file_path, bucket_name, output_file_key)
        print(f"Successfully uploaded Parquet file to s3://{bucket_name}/{output_file_key}")
    except ClientError as e:
        print(f"Error uploading Parquet file to S3: {e}")
        raise
def main():
    # S3 bucket and file details
    input_bucket = 'your-input-bucket-name'
    input_file_key = 'path/to/your/input-file.csv'
    output_bucket = 'your-output-bucket-name'
    output_file_key = 'path/to/your/output-file.parquet'
    # KMS key ID
    kms_key_id = 'your-kms-key-id'
    # ML model path
    ml_model_path = 'path/to/your/ml-model.pkl'
    # Local output file path
    local_output_file = '/tmp/output-file.parquet'
    # Download the file from S3
    df = download_s3_file(input_bucket, input_file_key)
    # Encrypt sensitive information
    encrypted_df = process_and_encrypt_pii(df, kms_key_id)
    # Apply the ML model
    final_df = apply_ml_model(encrypted_df, ml_model_path)
    # Save the DataFrame as a Parquet file
    save_as_parquet(final_df, local_output_file)
    # Upload the Parquet file back to S3
    upload_parquet_to_s3(output_bucket, output_file_key, local_output_file)
if __name__ == "__main__":
    main()
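If the encrypted values ever need to be recovered by a consumer that has kms:Decrypt permission on the same key, the transformation performed by encrypt_data can be reversed. This is a minimal sketch, assuming the base64-encoded ciphertext produced above; it is not part of the pipeline itself:

def decrypt_data(encrypted_data):
    """Reverse encrypt_data: base64-decode the ciphertext and decrypt it with KMS."""
    # For symmetric KMS keys, the key ID is inferred from metadata embedded in the ciphertext blob
    response = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted_data))
    return response['Plaintext'].decode('utf-8')

# Example usage (hypothetical): recover one value from the processed DataFrame
# original_ssn = decrypt_data(final_df.loc[0, 'encrypted_ssn'])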