本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在离线存储中创建 Feature Store 特征组后,您可以选择使用以下方法来获取数据:
-
使用亚马逊 SageMaker Python 软件开发工具包
-
在 Amazon Athena 中运行 SQL 查询
重要
功能存储要求在数据目录中注册 AWS Glue 数据。默认情况下,Feature Store 会在您创建要素组时自动生成 AWS Glue 数据目录。
为离线存储创建特征组并填充数据后,您可以通过运行查询或通过使用 SDK 联接存储在离线存储中的不同特征组数据来创建数据集。也可以将特征组联接到单个 pandas 数据框。您可以使用 Amazon Athena 编写和执行 SQL 查询。
注意
为确保您的数据是最新的,您可以将 AWS Glue 抓取程序设置为按计划运行。
要设置爬虫,请指定 AWS Glue 爬虫用于访问离线商店的 Amazon S3 存储桶的 IAM 角色。有关更多信息,请参阅创建 IAM 角色。
有关如何使用 AWS Glue 和 Athena 构建用于模型训练和推理的训练数据集的更多信息,请参阅。将 Feature Store 与 SDK for Python (Boto3) 结合使用
使用 Amaz SageMaker on Python 软件开发工具包从您的功能组中获取数据
您可以使用功能存储create_dataset()
函数创建数据集。您可以使用 SDK 执行以下操作:
-
从多个特征组创建数据集。
-
从特征组和 pandas 数据框创建数据集。
默认情况下,Feature Store 不包含您已从数据集中删除的记录,也不包含重复的记录。重复记录在事件时间列中具有记录 ID 和时间戳值。
在使用 SDK 创建数据集之前,必须启动 A SageMaker I 会话。使用以下代码启动会话。
import boto3
from sagemaker.session import Session
from sagemaker.feature_store.feature_store import FeatureStore
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(
service_name="sagemaker", region_name=region
)
featurestore_runtime = boto_session.client(
service_name="sagemaker-featurestore-runtime",region_name=region
)
feature_store_session = Session(
boto_session=boto_session,
sagemaker_client=sagemaker_client,
sagemaker_featurestore_runtime_client=featurestore_runtime,
)
feature_store = FeatureStore(feature_store_session)
以下代码显示了从多个特征组创建数据集的示例。以下代码段使用示例功能组 “”、base_fg_name
“” 和 first_fg_name
“”,这些功能组可能不存在或在您的功能商店中具有相同的架构。second_fg_name
建议将这些特征组替换为 Feature Store 中存在的特征组。有关如何创建特征组的信息,请参阅步骤 3:创建特征组。
from sagemaker.feature_store.feature_group import FeatureGroup
s3_bucket_name = "offline-store-sdk-test"
base_fg_name = "base_fg_name
"
base_fg = FeatureGroup(name=base_fg_name, sagemaker_session=feature_store_session)
first_fg_name = "first_fg_name
"
first_fg = FeatureGroup(name=first_fg_name, sagemaker_session=feature_store_session)
second_fg_name = "second_fg_name
"
second_fg = FeatureGroup(name=second_fg_name, sagemaker_session=feature_store_session)
feature_store = FeatureStore(feature_store_session)
builder = feature_store.create_dataset(
base=base_fg,
output_path=f"s3://{amzn-s3-demo-bucket1}",
).with_feature_group(first_fg
).with_feature_group(second_fg, "base_id", ["base_feature_1"])
以下代码显示了从多个特征组和 pandas 数据框创建数据集的示例。
base_data = [[1, 187512346.0, 123, 128],
[2, 187512347.0, 168, 258],
[3, 187512348.0, 125, 184],
[1, 187512349.0, 195, 206]]
base_data_df = pd.DataFrame(
base_data,
columns=["base_id", "base_time", "base_feature_1", "base_feature_2"]
)
builder = feature_store.create_dataset(
base=base_data_df,
event_time_identifier_feature_name='base_time',
record_identifier_feature_name='base_id',
output_path=f"s3://{s3_bucket_name}"
).with_feature_group(first_fg
).with_feature_group(second_fg, "base_id", ["base_feature_1"])
Fe ature Stor APIscreate_dataset
函数的辅助方法。您可以使用它们执行以下操作:
-
从多个特征组创建数据集。
-
从多个特征组和一个 pandas 数据框创建数据集。
-
从单个特征组和一个 pandas 数据框创建数据集。
-
使用时间点精确联接创建数据集,其中联接的特征组中的记录按顺序排列。
-
使用重复的记录创建数据集,而不是遵循函数的默认行为。
-
使用已删除的记录创建数据集,而不是遵循函数的默认行为。
-
为您指定的时间段创建数据集。
-
将数据集另存为 CSV 文件。
-
将数据集另存为 pandas 数据框。
基本 特征组是联接的一个重要概念。基本特征组是有其他特征组或 pandas 数据框与之联接的特征组。对于每个数据集
您可以将以下可选方法添加到 create_dataset
函数,以配置创建数据集的方式:
-
with_feature_group
- 使用基本特征组中的记录标识符和目标特征名称在基本特征组和另一个特征组之间执行内部联接。下面提供了有关您指定的参数的信息:-
feature_group
- 要联接的特征组。 -
target_feature_name_in_base
- 基本特征组中用作联接键的特征的名称。其他特征组中的记录标识符是 Feature Store 在联接中使用的其他键。 -
included_feature_names
- 表示基本特征组特征名称的字符串列表。您还可以使用该字段指定要包含在数据集中的特征。 -
feature_name_in_target
- 可选字符串,表示目标特征组中要与基本特征组中的目标特征进行比较的特征。 -
join_comparator
- 可选JoinComparatorEnum
,表示将基本特征组中的目标特征与目标特征组中的特征联接时使用的比较器。默认情况下,这些JoinComparatorEnum
值可以是GREATER_THAN
、GREATER_THAN_OR_EQUAL_TO
、LESS_THAN
、LESS_THAN_OR_EQUAL_TO
、NOT_EQUAL_TO
或EQUALS
。 -
join_type
- 可选JoinTypeEnum
,表示基本特征组和目标特征组之间的联接类型。默认情况下,这些JoinTypeEnum
值可以是LEFT_JOIN
、RIGHT_JOIN
、FULL_JOIN
、CROSS_JOIN
或INNER_JOIN
。
-
-
with_event_time_range
- 使用您指定的事件时间范围创建数据集。 -
as_of
- 创建不超过您指定的时间戳的数据集。例如,如果指定datetime(2021, 11, 28, 23, 55, 59, 342380)
作为值,则会创建一个截至 2021 年 11 月 28 日的数据集。 -
point_time_accurate_join
- 创建一个数据集,其中基本特征组的所有事件时间值都小于您要联接的特征组或 pandas 数据帧的所有事件时间值。 -
include_duplicated_records
- 在特征组中保留重复的值。 -
include_deleted_records
- 在特征组中保留已删除的值。 -
with_number_of_recent_records_by_record_identifier
- 您指定的一个整数,用于确定数据集内显示了多少条最近记录。 -
with_number_of_records_by_record_identifier
- 一个整数,表示数据集内显示了多少条记录。
配置数据集后,您可以使用以下方法之一指定输出:
-
to_csv_file
- 将数据集另存为 CSV 文件。 -
to_dataframe
- 将数据集另存为 pandas 数据框。
您可以检索特定时间段之后的数据。以下代码检索某个时间戳之后的数据。
fg1 = FeatureGroup("example-feature-group-1")
feature_store.create_dataset(
base=fg1,
output_path="s3://example-S3-path
"
).with_number_of_records_from_query_results(5).to_csv_file()
您还可以检索特定时间段内的数据。您可以使用以下代码来获取特定时间范围内的数据:
fg1 = FeatureGroup("fg1")
feature_store.create_dataset(
base=fg1,
output_path="example-S3-path"
).with_event_time_range(
datetime(2021, 11, 28, 23, 55, 59, 342380),
datetime(2020, 11, 28, 23, 55, 59, 342380)
).to_csv_file() #example time range specified in datetime functions
您可能想将多个特征组联接到一个 pandas 数据框,其中特征组的事件时间值不迟于数据框的事件时间。使用以下代码作为模板来帮助执行联接。
fg1 = FeatureGroup("fg1")
fg2 = FeatureGroup("fg2")
events = [['2020-02-01T08:30:00Z', 6, 1],
['2020-02-02T10:15:30Z', 5, 2],
['2020-02-03T13:20:59Z', 1, 3],
['2021-01-01T00:00:00Z', 1, 4]]
df = pd.DataFrame(events, columns=['event_time', 'customer-id', 'title-id'])
feature_store.create_dataset(
base=df,
event_time_identifier_feature_name='event_time',
record_identifier_feature_name='customer_id',
output_path="s3://example-S3-path
"
).with_feature_group(fg1, "customer-id"
).with_feature_group(fg2, "title-id"
).point_in_time_accurate_join(
).to_csv_file()
您还可以检索特定时间段之后的数据。以下代码检索 as_of
方法中时间戳指定的时间之后的数据。
fg1 = FeatureGroup("fg1")
feature_store.create_dataset(
base=fg1,
output_path="s3://example-s3-file-path
"
).as_of(datetime(2021, 11, 28, 23, 55, 59, 342380)
).to_csv_file() # example datetime values
Amazon Athena 示例查询
您可以在 Amazon Athena 中编写查询,以便从特征组创建数据集。您还可以编写查询,从特征组和单个 pandas 数据框创建数据集。
交互式探索
此查询选择前 1000 条记录。
SELECT *
FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>
LIMIT 1000
没有重复项的最新快照
此查询选择最新的非重复记录。
SELECT *
FROM
(SELECT *,
row_number()
OVER (PARTITION BY <RecordIdentiferFeatureName>
ORDER BY <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num
FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>)
WHERE row_num = 1;
离线存储中没有重复项和已删除记录的最新快照
此查询会筛选出所有已删除记录,并从离线存储中选择非重复记录。
SELECT *
FROM
(SELECT *,
row_number()
OVER (PARTITION BY <RecordIdentiferFeatureName>
ORDER BY <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num
FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>)
WHERE row_num = 1 and
NOT is_deleted;
离线存储中没有重复项和已删除记录的时间旅行
此查询会筛选出所有已删除记录,并选择特定时间点的非重复记录。
SELECT *
FROM
(SELECT *,
row_number()
OVER (PARTITION BY <RecordIdentiferFeatureName>
ORDER BY <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num
FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>
where <EventTimeFeatureName> <= timestamp '<timestamp>')
-- replace timestamp '<timestamp>' with just <timestamp> if EventTimeFeature is of type fractional
WHERE row_num = 1 and
NOT is_deleted