本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
提取和查询数据湖中的 AWS IoT SiteWise 元数据属性
由 Ambarish Dongaonkar 创作 () AWS
摘要
AWS物联网 SiteWise 使用资产模型和层次结构来表示您的工业设备、流程和设施。每个模型或资产可以具有特定于您的环境的多个属性。示例元数据属性包括资产的站点或物理位置、工厂详细信息和设备标识符。这些属性值与资产计量数据相辅相成,实现商业价值最大化。机器学习 (ML) 可以为这些元数据提供更多见解,并简化工程作业。
但是,无法直接从 AWS IoT SiteWise 服务查询元数据属性。要使这些属性可查询,您必须将它们提取并摄取到数据湖中。此模式使用 Python 脚本提取所有AWS物联网 SiteWise 资产的属性,并将它们摄取到亚马逊简单存储服务 (Amazon S3) 存储桶中的数据湖中。完成此过程后,您可以在 Amazon Athena 中使用SQL查询来访问AWS物 SiteWise 联网元数据属性和其他数据集,例如测量数据集。使用AWS物联网 SiteWise 监视器或仪表板时,元数据属性信息也很有用。您也可以使用 S3 存储桶中提取的属性来构建AWS QuickSight 控制面板。
该模式具有参考代码,您可以通过使用最适合您的用例的计算服务(例如 AWS Lambda 或 G AWS lue)来实现代码。
先决条件和限制
先决条件
一个活动的 AWS 账户。
设置 AWS Lambda 函数或 Glue 任务的权限AWS。
Amazon S3 存储桶。
资产模型和层次结构是在AWS物联网 SiteWise中设置的。有关更多信息,请参阅创建资产模型(AWSIoT SiteWise 文档)。
架构
您可以使用 Lambda 函数或 Glue AWS 任务来完成此过程。如果您的模型少于 100 个,并且每个模型平均具有 15 个或更少的属性,我们建议使用 Lambda。对于所有其他用例,我们建议使用 AWS Glue。
下图演示了参考架构和工作流。
![显示所描述的提取和查询过程的架构图。](images/pattern-img/22b59ff7-3df3-4a5b-9973-d43967bd58fd/images/fa3d80bf-df9a-49fe-971c-a055339b2cd2.png)
计划的 Gl AWS ue 作业或 Lambda 函数将运行。它从 AWS IoT SiteWise 中提取资产元数据属性并将其提取到 S3 存储桶中。
AWSGlue 爬网程序在 S3 存储桶中抓取的数据并在 Glue 数据目录中AWS创建表。
Amazon Athena 使用标准版SQL查询 Glue 数据目录中的表AWS。
自动化和扩缩
您可以根据物AWS联网 SiteWise 资产配置的更新频率AWS将 Lambda 函数或 Glue 作业安排为每天或每周运行。
示例代码可以处理的AWS物联网 SiteWise 资产数量没有限制,但是大量资产会增加完成该过程所需的时间。
工具
Amazon Athena 是一项交互式查询服务,可帮助您使用标准直接分析亚马逊简单存储服务 (Amazon S3) 中的数据。SQL
AWSGlu e 是一项完全托管的提取、转换和加载 (ETL) 服务。它可以帮助您在数据存储和数据流之间对数据进行可靠地分类、清理、扩充和移动。
AWSIdentity and Access Management (IAM) 通过控制谁经过身份验证并有权使用AWS资源,从而帮助您安全地管理对资源的访问权限。
AWS物联网 SiteWise可帮助您大规模收集、建模、分析和可视化来自工业设备的数据。
AWSLambda 是一项计算服务,可帮助您运行代码,而无需预置或管理服务器。它仅在需要时运行您的代码,并且能自动扩缩,因此您只需为使用的计算时间付费。
Amazon Simple Storage Service (Amazon S3)是一项基于云的对象存储服务,可帮助您存储、保护和检索任意数量的数据。
AWSSDKf@@ or Python (Boto3)
是一个软件开发套件,可帮助你将 Python 应用程序、库或脚本与AWS服务集成。
操作说明
任务 | 描述 | 所需技能 |
---|---|---|
在中配置权限IAM。 | 在IAM控制台中,向 Lambda 函数或 Glue 任务所担任的IAM角色授予AWS执行以下操作的权限:
有关更多信息,请参阅为AWS服务创建角色(IAM文档)。 | 将军 AWS |
创建 Lambda 函数或 Glue 任务AWS。 | 如果您使用 Lambda,请创建新的 Lambda 函数。对于运行时系统,请选择 Python。有关更多信息,请参阅使用 Python 构建 Lambda 函数(Lambda 文档)。 如果你使用的是 Glue,AWS请在 Glue 控制台中创建一个新的 Python 外壳任务。AWS有关更多信息,请参阅添加 Python 外壳任务(AWSGlue 文档)。 | 将军 AWS |
更新 Lambda 函数或 Glue 任务AWS。 | 修改新的 Lambda 函数或 Gl AWS ue 任务,然后在 “其他信息” 部分输入代码示例。修改您的用例所需代码。有关更多信息,请参阅使用控制台编辑器编辑代码(Lambda 文档)和使用脚本(Gl ue 文档AWS)。 | 将军 AWS |
任务 | 描述 | 所需技能 |
---|---|---|
运行 Lambda 函数或 Glue 作业AWS。 | 运行 Lambda 函数或 Glue 作业AWS。有关更多信息,请参阅调用 Lambda 函数(Lambd a 文档)或使用触发器启动作业(G AWS lue 文档)。这会提取物AWS联网 SiteWise 层次结构中资产和模型的元数据属性,并将其存储在指定的 S3 存储桶中。 | 将军 AWS |
设置一个 AWS Glue 爬行器。 | 使用格式化文件所需的格式分类器设置 AWS Glue CSV 爬虫。使用 Lambda 函数或 Glue 任务中使用的 S3 存储桶和前缀详细信息AWS。有关更多信息,请参阅定义爬虫(AWSGlue 文档)。 | 将军 AWS |
运行 Glu AWS e 爬行器。 | 运行爬网程序以处理由 Lambda 函数或 AWS Glue 任务创建的数据文件。爬虫在指定的 AWS Glue 数据目录中创建一个表。有关更多信息,请参阅或使用触发器启动爬虫(AWSGlue 文档)。 | 将军 AWS |
查询元数据属性。 | 使用 Amazon Athena,根据您的用例的AWS需要使用SQL标准来查询 Glue 数据目录。您可将元数据属性表与其他数据库和表联接。有关更多信息,请参阅入门(Amazon Athena 文档)。 | 将军 AWS |
相关资源
其他信息
代码
提供的示例代码仅供参考,您可根据用例的需要自定义此代码。
# Following code can be used in an AWS Lambda function or in an AWS Glue Python shell job. # IAM roles used for this job need read access to the AWS IoT SiteWise service and write access to the S3 bucket. sw_client = boto3.client('iotsitewise') s3_client = boto3.client('s3') output = io.StringIO() attribute_list=[] bucket = '{3_bucket name}' prefix = '{s3_bucket prefix}' output.write("model_id,model_name,asset_id,asset_name,attribuet_id,attribute_name,attribute_value\n") m_resp = sw_client.list_asset_models() for m_rec in m_resp['assetModelSummaries']: model_id = m_rec['id'] model_name = m_rec['name'] attribute_list.clear() dam_response = sw_client.describe_asset_model(assetModelId=model_id) for rec in dam_response['assetModelProperties']: if 'attribute' in rec['type']: attribute_list.append(rec['name']) response = sw_client.list_assets(assetModelId=model_id, filter='ALL') for asset in response['assetSummaries']: asset_id = asset['id'] asset_name = asset['name'] resp = sw_client.describe_asset(assetId=asset_id) for rec in resp['assetProperties']: if rec['name'] in attribute_list: p_resp = sw_client.get_asset_property_value(assetId=asset_id, propertyId=rec['id']) if 'propertyValue' in p_resp: if p_resp['propertyValue']['value']: if 'stringValue' in p_resp['propertyValue']['value']: output.write(model_id + "," + model_name + "," + asset_id + "," + asset_name + "," + rec['id'] + "," + rec['name'] + "," + str(p_resp['propertyValue']['value']['stringValue']) + "\n") if 'doubleValue' in p_resp['propertyValue']['value']: output.write(model_id + "," + model_name + "," + asset_id + "," + asset_name + "," + rec['id'] + "," + rec['name'] + "," + str(p_resp['propertyValue']['value']['doubleValue']) + "\n") if 'integerValue' in p_resp['propertyValue']['value']: output.write(model_id + "," + model_name + "," + asset_id + "," + asset_name + "," + rec['id'] + "," + rec['name'] + "," + str(p_resp['propertyValue']['value']['integerValue']) + "\n") if 'booleanValue' in p_resp['propertyValue']['value']: output.write(model_id + "," + model_name + "," + asset_id + "," + asset_name + "," + rec['id'] + "," + rec['name'] + "," + str(p_resp['propertyValue']['value']['booleanValue']) + "\n") output.seek(0) s3_client.put_object(Bucket=bucket, Key= prefix + '/data.csv', Body=output.getvalue()) output.close()