E-commerce websites typically carry a large volume of product information, generally split into two kinds: text (product name and description) and images. Under the laws and regulations of certain countries and regions, adult products are a prohibited category in some markets, and third-party sellers sometimes evade a site's keyword-based blocking by stripping the offending keywords from their listings, exposing the site to legal risk. This post describes how to classify products automatically from their text using NLP (natural language processing), so that detection no longer depends on individual keywords, via three approaches: the AWS Comprehend service, the built-in BlazingText algorithm in Amazon SageMaker, and a self-managed solution on AWS ECS Fargate based on Facebook's fastText algorithm. In addition, the Amazon Rekognition service is used to easily moderate product images for NSFW (Not Safe for Work) content.
For any questions about the demo website below, its usage, overall architecture, design, or implementation, please contact: dapengt@amazon.com
Because the demo data is stored in a DynamoDB table with a 2-hour TTL, the data is automatically purged two hours after it is written.
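For reference, here is a minimal sketch of how such a TTL could be configured with boto3; the table name Products, the attribute name ttl, and the sample item are assumptions for illustration, and DynamoDB TTL requires each item to carry an epoch-seconds expiry attribute.

import time
import boto3

dynamodb = boto3.client('dynamodb')

# Turn on TTL for the table, using the 'ttl' attribute as the expiry timestamp (assumed attribute name)
dynamodb.update_time_to_live(
    TableName='Products',
    TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'ttl'}
)

# Each item then needs an epoch-seconds expiry value, e.g. two hours from now
expiry = int(time.time()) + 2 * 60 * 60
dynamodb.put_item(
    TableName='Products',
    Item={'id': {'S': 'demo-product-1'}, 'ttl': {'N': str(expiry)}}
)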
As the diagram above shows, this solution builds a natural-language classification model over the product text to decide whether a product is an adult product, working at three layers of the AWS AI stack: the application services layer (Amazon Comprehend), the platform services layer (the BlazingText algorithm built into Amazon SageMaker), and a fully custom algorithm and container. At the same time, the Rekognition Image service from the application services layer is used to moderate whether product images contain NSFW content.
The overall architecture is cloud native, serverless, and event driven, with loosely coupled functional components. It spans the range from the simplest API call to an AWS AI service, through a SageMaker built-in algorithm, to a fully custom solution in which both the algorithm and its hosting container are self-defined, showing how the same product-classification requirement can be met at different layers of the AWS AI stack.
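For the fully custom path, a minimal sketch of the fastText model that could sit behind the self-managed Fargate container is shown below. The training file name train.txt, the label names, and the model file name are assumptions for illustration; fastText supervised training expects one example per line prefixed with __label__<class>.

import fasttext

# train.txt is assumed to contain lines such as:
# __label__adult  <product name and description ...>
# __label__normal <product name and description ...>
model = fasttext.train_supervised(
    input='train.txt',
    epoch=20,
    lr=0.1,
    wordNgrams=2,
    dim=150
)

# Save the model so it can be baked into (or downloaded by) the Fargate container
model.save_model('adult_product.bin')

# At inference time the container loads the model and predicts per product text
labels, probabilities = model.predict('some product name and description', k=1)
print(labels[0], probabilities[0])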
import json
import urllib.parse
import boto3
print('Loading function')
dynamodb = boto3.client('dynamodb')
def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    # Get the bucket and object key from the S3 event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    s3 = boto3.resource('s3')
    object_acl = s3.ObjectAcl(bucket, key)
    # Update image file to public readable so that the website can access it
    response = object_acl.put(ACL='public-read')

    # Invoke the Rekognition image moderation API
    rek = boto3.client('rekognition')
    rek_resp = rek.detect_moderation_labels(
        Image={
            'S3Object': {
                'Bucket': bucket,
                'Name': key,
            }
        },
        MinConfidence=20
    )

    # Flush the Rekognition result into DynamoDB
    product_id = key[key.find('/')+1:]
    response = dynamodb.update_item(
        TableName='Products',
        Key={
            'id': {
                'S': product_id
            }
        },
        UpdateExpression="set nsfw=:nsfw",
        # Update only if the record exists, by using a condition expression
        ConditionExpression='id = :id',
        ExpressionAttributeValues={
            ':id': {
                'S': product_id
            },
            ':nsfw': {
                'S': json.dumps(rek_resp['ModerationLabels'])
            }
        },
        ReturnValues="UPDATED_NEW"
    )
    print("DDB Response: " + json.dumps(response))

    return {
        'statusCode': 200,
        'headers': {
            "Access-Control-Allow-Origin": "*"
        },
        'body': {
            "rek": json.dumps(rek_resp),
            'ddb': json.dumps(response)
        }
    }
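Downstream, the nsfw attribute written by this Lambda can be read back and interpreted when deciding whether to show a product. The sketch below is illustrative only: the product id, the blocked categories, and the 80% confidence threshold are assumptions, not part of the demo code.

import json
import boto3

dynamodb = boto3.client('dynamodb')

# Read the product record written by the Lambda above (table and key layout as in the demo)
item = dynamodb.get_item(
    TableName='Products',
    Key={'id': {'S': 'demo-product-1'}}
)['Item']

moderation_labels = json.loads(item['nsfw']['S'])

# Hide the product if any moderation label in the assumed categories exceeds the assumed threshold
BLOCKED = {'Explicit Nudity', 'Suggestive'}
should_hide = any(
    (label['Name'] in BLOCKED or label.get('ParentName') in BLOCKED) and label['Confidence'] >= 80
    for label in moderation_labels
)
print('hide product:', should_hide)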
import os
import os.path
import sys
import botocore
import boto3
import json
print(boto3.__version__)
CCC_BUCKET_NAME = os.environ['CCC_BUCKET_NAME']
CCC_BUCKET_PREFIX = os.environ['CCC_BUCKET_PREFIX']
CCC_BUCKET_PREFIX_OUTPUT = os.environ['CCC_BUCKET_PREFIX_OUTPUT']
s3 = boto3.client('s3')
comprehend = boto3.client('comprehend')
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    for record in event['Records']:
        # print(record['eventID'])
        # print(record['eventName'])
        # Submit a Comprehend custom classification job when a new product is created
        if record['eventName'] == 'INSERT':
            print(record['dynamodb']['NewImage']['id']['S'])
            print(record['dynamodb']['NewImage']['name']['S'])
            #print(record['dynamodb']['NewImage']['desc']['S'])
            id = record['dynamodb']['NewImage']['id']['S']
            text = record['dynamodb']['NewImage']['name']['S'] + record['dynamodb']['NewImage']['desc']['S']
            file = CCC_BUCKET_PREFIX + '/' + id
            # Write the product text to S3 so Comprehend can read it as one document per file
            s3_resp = s3.put_object(Bucket=CCC_BUCKET_NAME, Key=file, Body=text)
            start_response = comprehend.start_document_classification_job(
                JobName=id,
                InputDataConfig={
                    'S3Uri': 's3://{}/{}'.format(CCC_BUCKET_NAME, file),
                    'InputFormat': 'ONE_DOC_PER_FILE'
                },
                OutputDataConfig={
                    'S3Uri': 's3://{}/{}'.format(CCC_BUCKET_NAME, CCC_BUCKET_PREFIX_OUTPUT)
                },
                DataAccessRoleArn='arn:aws:iam::124456859051:role/service-role/AmazonComprehendServiceRole-AmazonComprehendServiceRole2-',
                DocumentClassifierArn='arn:aws:comprehend:us-east-1:124456859051:document-classifier/AdultProduct'
            )
            print("Start response: %s\n" % start_response)
            # Check the status of the job
            #describe_response = comprehend.describe_document_classification_job(JobId=start_response['JobId'])
            #print("Describe response: %s\n" % describe_response)
    return 'Successfully processed {} records.'.format(len(event['Records']))
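The classification job started above runs asynchronously, so the prediction has to be picked up later from the configured output prefix. Below is a minimal sketch of that follow-up step, not part of the demo code: the function name, the polling loop, and the tarball parsing are assumptions, based on Comprehend writing its predictions as a gzipped tar archive whose location is reported by the describe call.

import json
import tarfile
import time
import urllib.parse
import boto3

comprehend = boto3.client('comprehend')
s3 = boto3.client('s3')

def fetch_classification_result(job_id, local_path='/tmp/output.tar.gz'):
    # Wait for the asynchronous classification job to finish
    while True:
        job = comprehend.describe_document_classification_job(JobId=job_id)
        status = job['DocumentClassificationJobProperties']['JobStatus']
        if status in ('COMPLETED', 'FAILED'):
            break
        time.sleep(30)

    # The describe response points at the archive holding the predictions
    output_uri = job['DocumentClassificationJobProperties']['OutputDataConfig']['S3Uri']
    parsed = urllib.parse.urlparse(output_uri)
    s3.download_file(parsed.netloc, parsed.path.lstrip('/'), local_path)

    # Each line of the extracted predictions file is a JSON document with per-class scores
    with tarfile.open(local_path) as tar:
        member = tar.getmembers()[0]
        for line in tar.extractfile(member):
            print(json.loads(line))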
import json
import boto3
import pandas
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
# Set up the data sets
bucket = 'zjutdp-mb3-data'
s3_test_data = 's3://{}/{}'.format(bucket, 'labeled/flipkart-blazingtext/flipkart-test.csv')
s3_train_data = 's3://{}/{}'.format(bucket, 'labeled/flipkart-blazingtext/flipkart-train.csv')
s3_validation_data = 's3://{}/{}'.format(bucket, 'labeled/flipkart-blazingtext/flipkart-valid.csv')
s3_output_location = 's3://{}/{}/output'.format(bucket, 'labeled/flipkart-blazingtext')
region_name = boto3.Session().region_name

# Resolve the BlazingText algorithm container
container = sagemaker.amazon.amazon_estimator.get_image_uri(region_name, "blazingtext", "latest")
print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))

# Create the SageMaker estimator
bt_model = sagemaker.estimator.Estimator(container,
                                         role,
                                         train_instance_count=1,
                                         train_instance_type='ml.c4.xlarge',
                                         train_volume_size=30,
                                         train_max_run=360000,
                                         input_mode='File',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)

# Set the hyperparameters
bt_model.set_hyperparameters(mode="supervised",
                             epochs=20,
                             min_count=20,
                             learning_rate=0.1,
                             vector_dim=150,
                             early_stopping=True,
                             patience=4,
                             min_epochs=5,
                             word_ngrams=2)

# Set up the training and validation channels
train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated',
                                        content_type='text/plain', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated',
                                             content_type='text/plain', s3_data_type='S3Prefix')
data_channels = {'train': train_data, 'validation': validation_data}

# Train the model
bt_model.fit(inputs=data_channels, logs=True)

# Deploy the model as a real-time endpoint
text_classifier = bt_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

# Load the test data
df = pandas.read_csv(s3_test_data, header=None)

# Take the first n (100) product texts for testing
n = 100
sentences = df[1][:n].values.tolist()
payload = {"instances": sentences}

# Run inference against the deployed text classification endpoint
response = text_classifier.predict(json.dumps(payload))
predictions = json.loads(response)

# Verify that the predicted labels for the first n products match the test set labels
assert df[0][:n].values.tolist() == [p['label'][0] for p in predictions]
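Once the endpoint is up, the website backend (for example a Lambda function) can call it through the SageMaker runtime API rather than through the notebook's predictor object. A minimal sketch follows; the endpoint name blazingtext-adult-product and the product text are placeholders.

import json
import boto3

runtime = boto3.client('sagemaker-runtime')

payload = {"instances": ["some product name and description text"]}

# Invoke the deployed BlazingText endpoint; the response is a JSON list of label/probability pairs
resp = runtime.invoke_endpoint(
    EndpointName='blazingtext-adult-product',   # placeholder endpoint name
    ContentType='application/json',
    Body=json.dumps(payload)
)
predictions = json.loads(resp['Body'].read())
print(predictions[0]['label'], predictions[0]['prob'])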