ETL 및 데이터 분석에 CTAS 및 INSERT INTO 사용 - Amazon Athena

ETL 및 데이터 분석에 CTAS 및 INSERT INTO 사용

Athena에서 Create Table as Select(CTAS) 및 INSERT INTO 문을 사용하면 데이터 처리를 위해 Amazon S3에 데이터를 추출, 변환 및 로드(ETL)할 수 있습니다. 이 주제에서는 이러한 문을 사용하여 데이터 세트를 분할하고 열 기반 데이터 형식으로 변환함으로써 데이터 분석에 최적화되도록 하는 방법을 보여줍니다.

CTAS 문은 표준 SELECT 쿼리를 사용하여 새 테이블을 생성합니다. CTAS 문을 사용하여 분석할 데이터의 하위 세트를 만들 수 있습니다. 하나의 CTAS 문에서, 데이터를 분할하고 압축을 지정하고 데이터를 Apache Parquet 또는 Apache ORC와 같은 열 기반 형식으로 변환할 수 있습니다. CTAS 쿼리를 실행하면 생성한 테이블 및 파티션은 AWS Glue Data Catalog에 자동으로 추가됩니다. 이렇게 하면 생성한 새 테이블 및 파티션을 후속 쿼리에 즉시 사용할 수 있습니다.

INSERT INTO 문은 원본 테이블에서 실행되는 SELECT 쿼리 문을 기반으로 대상 테이블에 새 행을 삽입합니다. INSERT INTO 문을 사용하면 CTAS가 지원하는 모든 변환을 사용하여 CSV 형식의 원본 테이블 데이터를 대상 테이블 데이터로 변환하고 로드할 수 있습니다.

개요

Athena에서 CTAS 문을 사용하여 데이터의 초기 배치 변환을 수행합니다. 그런 다음, 여러 INSERT INTO 문을 사용하여 CTAS 문에 의해 생성된 테이블에 증분 업데이트를 만듭니다.

1단계: 원본 데이터 집합을 기반으로 테이블 생성

이 주제의 예제에서는 공개적으로 이용 가능한 NOAA Global Historical Climatology Network Daily(GHCN-D) 데이터 집합의 Amazon S3 읽기 가능 하위 집합을 사용합니다. Amazon S3의 데이터에는 다음과 같은 특성이 있습니다.

Location: s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/ Total objects: 41727 Size of CSV dataset: 11.3 GB Region: us-east-1

원본 데이터는 파티션 없이 Amazon S3에 저장됩니다. 데이터는 다음과 유사한 파일에서 CSV 형식입니다.

2019-10-31 13:06:57 413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000 2019-10-31 13:06:57 412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001 2019-10-31 13:06:57 34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002 2019-10-31 13:06:57 412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100 2019-10-31 13:06:57 412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101

이 샘플의 파일 크기는 상대적으로 작습니다. 파일을 더 큰 파일로 병합하면 총 파일 수를 줄일 수 있으므로 쿼리 성능이 향상됩니다. CTAS 및 INSERT INTO 문을 사용하여 쿼리 성능을 개선할 수 있습니다.

샘플 데이터 세트를 기반으로 데이터베이스 및 테이블을 생성하려면
  1. Athena 콘솔에서 미국 동부(버지니아 북부)(US East (N. Virginia)) AWS 리전을 선택합니다. us-east-1에서 이 자습서의 모든 쿼리를 실행합니다.

  2. Athena 쿼리 편집기에서 CREATE DATABASE 명령을 실행하여 데이터베이스를 생성합니다.

    CREATE DATABASE blogdb
  3. 다음 문을 실행하여 테이블을 생성합니다.

    CREATE EXTERNAL TABLE `blogdb`.`original_csv` ( `id` string, `date` string, `element` string, `datavalue` bigint, `mflag` string, `qflag` string, `sflag` string, `obstime` bigint) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'

2단계: CTAS를 사용하여 데이터 분할, 변환 및 압축

테이블을 생성한 후에는 단일 CTAS 문을 사용하여 데이터를 Snappy 압축을 사용하는 Parquet 형식으로 변환하고 연도별로 데이터를 분할할 수 있습니다.

1단계에서 생성한 테이블에는 날짜 형식이 YYYYMMDD(예: 20100104)인 date 필드가 있습니다. 새 테이블이 year에 분할되므로 다음 절차의 샘플 문은 Presto 함수 substr("date",1,4)을 사용하여 date 필드에서 year 값을 추출합니다.

연도별로 분할하기 위해 데이터를 Snappy 압축을 사용하는 Parquet 형식으로 변환하려면
  • 다음 CTAS 문을 실행하여 버킷을 Amazon S3 버킷 위치로 바꿉니다.

    CREATE table new_parquet WITH (format='PARQUET', parquet_compression='SNAPPY', partitioned_by=array['year'], external_location = 's3://your-bucket/optimized-data/') AS SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) >= 2015 AND cast(substr("date",1,4) AS bigint) <= 2019
    참고

    이 예제에서 생성한 테이블에는 2015년부터 2019년까지의 데이터만 포함됩니다. 3단계에서는 INSERT INTO 명령을 사용하여 이 테이블에 새 데이터를 추가합니다.

쿼리가 완료되면 다음 절차를 사용하여 CTAS 문에서 지정한 Amazon S3 위치의 출력을 확인합니다.

CTAS 문에 의해 생성된 파티션 및 Parquet 파일을 확인하려면
  1. 생성된 파티션을 표시하려면 다음 AWS CLI 명령을 실행합니다. 끝에 슬래시(/)가 있어야 합니다.

    aws s3 ls s3://your-bucket/optimized-data/

    출력이 파티션에 표시됩니다.

    PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
  2. Parquet 파일을 확인하려면 다음 명령을 실행합니다. 출력을 처음 5개의 결과로 제한하는 | head -5 옵션은 Windows에서 사용할 수 없습니다.

    aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable | head -5

    다음과 유사하게 출력됩니다.

    2019-10-31 14:51:05 7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d 2019-10-31 14:51:05 7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a 2019-10-31 14:51:05 9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799 2019-10-31 14:51:05 7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d 2019-10-31 14:51:05 6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1

3단계: INSERT INTO를 사용하여 데이터 추가

2단계에서는 CTAS를 사용하여 2015년부터 2019년까지 파티션이 있는 테이블을 생성했습니다. 하지만 원본 데이터 세트에는 2010년부터 2014년까지의 데이터도 포함되어 있습니다. 이제 INSERT INTO 문을 사용하여 해당 데이터를 추가합니다.

하나 이상의 INSERT INTO 문을 사용하여 테이블에 데이터를 추가하려면
  1. WHERE 절에 2015년 이전의 연도를 지정하여 다음 INSERT INTO 명령을 실행합니다.

    INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) < 2015
  2. 다음 구문을 사용하여 aws s3 ls 명령을 다시 실행합니다.

    aws s3 ls s3://your-bucket/optimized-data/

    출력이 새 파티션에 표시됩니다.

    PRE year=2010/ PRE year=2011/ PRE year=2012/ PRE year=2013/ PRE year=2014/ PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
  3. Parquet 형식에서 압축 및 열 기반 스토리지를 사용하여 얻은 데이터 세트의 크기 감소를 확인하려면 다음 명령을 실행합니다.

    aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable --summarize

    다음 결과는 Snappy 압축을 사용하는 Parquet 형식으로 처리한 후 데이터 세트의 크기가 1.2GB임을 보여줍니다.

    ... 2020-01-22 18:12:02 2.8 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_f0182e6c-38f4-4245-afa2-9f5bfa8d6d8f 2020-01-22 18:11:59 3.7 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_fd9906b7-06cf-4055-a05b-f050e139946e Total Objects: 300 Total Size: 1.2 GiB
  4. 더 많은 CSV 데이터가 원본 테이블에 추가되면 INSERT INTO 문을 사용하여 해당 데이터를 Parquet 테이블에 추가할 수 있습니다. 예를 들어 2020년에 대한 새 데이터가 있는 경우 다음 INSERT INTO 문을 실행할 수 있습니다. INSERT INTO 문은 데이터 및 관련 파티션을 new_parquet 테이블에 추가합니다.

    INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) = 2020
    참고

    INSERT INTO 문은 대상 테이블에 최대 100개의 파티션 쓰기를 지원합니다. 또한 100개 이상의 파티션을 추가하려면 여러 INSERT INTO 문을 실행합니다. 자세한 내용은 CTAS 및 INSERT INTO를 사용하여 100개 파티션 한도 해결 단원을 참조하세요.

4단계: 성능 및 비용 차이 측정

데이터를 변환한 후에는 새 테이블 및 이전 테이블에서 동일한 쿼리를 실행하고 결과를 비교하여 성능 향상 및 비용 절감을 측정할 수 있습니다.

참고

Athena 쿼리별 비용에 대한 자세한 내용은 Amazon Athena 요금을 참조하세요.

성능 향상 및 비용 차이를 측정하려면
  1. 원본 테이블에서 다음 쿼리를 실행합니다. 쿼리는 연도의 모든 값에 대해 고유 ID 수를 찾습니다.

    SELECT substr("date",1,4) as year, COUNT(DISTINCT id) FROM original_csv GROUP BY 1 ORDER BY 1 DESC
  2. 쿼리가 실행된 시간과 스캔 데이터의 양을 적어둡니다.

  3. 새 테이블에서 동일한 쿼리를 실행하고 쿼리 런타임 및 스캔 데이터의 양을 적어둡니다.

    SELECT year, COUNT(DISTINCT id) FROM new_parquet GROUP BY 1 ORDER BY 1 DESC
  4. 결과를 비교하고 성능 및 비용 차이를 계산합니다. 다음 샘플 결과는 새 테이블의 테스트 쿼리가 이전 테이블의 쿼리보다 빠르고 저렴하다는 것을 보여줍니다.

    런타임 스캔 데이터
    원본 16.88초 11.35GB
    신규 3.79초 428.05MB
  5. 원본 테이블에서 다음 샘플 쿼리를 실행합니다. 이 쿼리에서는 2018년 지구의 평균 최고 기온(섭씨), 평균 최저 기온(섭씨) 및 평균 강우량(mm)을 계산합니다.

    SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM original_csv WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018' GROUP BY 1
  6. 쿼리가 실행된 시간과 스캔 데이터의 양을 적어둡니다.

  7. 새 테이블에서 동일한 쿼리를 실행하고 쿼리 런타임 및 스캔 데이터의 양을 적어둡니다.

    SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM new_parquet WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018' GROUP BY 1
  8. 결과를 비교하고 성능 및 비용 차이를 계산합니다. 다음 샘플 결과는 새 테이블의 테스트 쿼리가 이전 테이블의 쿼리보다 빠르고 저렴하다는 것을 보여줍니다.

    런타임 스캔 데이터
    원본 18.65초 11.35GB
    신규 1.92초 68MB

요약

이 주제에서는 Athena에 CTAS 및 INSERT INTO 문을 사용하여 ETL 작업을 수행하는 방법을 알아보았습니다. CTAS 문을 사용하여 데이터를 Snappy 압축을 사용하는 Parquet 형식으로 변환하는 첫 번째 변환 세트를 수행했습니다. 또한 CTAS 문을 사용하여 분할되지 않은 데이터 세트에서 분할된 데이터 세트로 변환했습니다. 이를 통해 데이터 세트의 크기를 줄이고 쿼리 실행 비용을 낮출 수 있었습니다. 새 데이터를 사용할 수 있게 되면 CTAS 문을 통해 생성한 테이블에 데이터를 변환하고 로드하는 데 INSERT INTO 문을 사용할 수 있습니다.