Apache Beam을 사용하여 애플리케이션 생성 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Beam을 사용하여 애플리케이션 생성

이 연습에서는 Apache Beam을 사용하여 데이터를 변환하는 Managed Service for Apache Flink 애플리케이션을 생성합니다. Apache Beam은 스트리밍 데이터를 처리하기 위한 프로그래밍 모델입니다. Managed Service for Apache Flink로 Apache Beam을 사용하는 방법에 대한 자세한 내용을 알아보려면 Apache Beam 사용 섹션을 참조하세요.

참고

이 연습에 필수 사전 조건을 설정하려면 먼저 시작하기 (API) DataStream 연습을 완료하세요.

종속 리소스 생성

이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 다음과 같은 종속 리소스를 생성해야 합니다.

  • 두 개의 Kinesis Data Streams(ExampleInputStreamExampleOutputStream)

  • 애플리케이션 코드를 저장할 Amazon S3 버킷(ka-app-code-<username>)

콘솔을 사용하여 Kinesis 스트림과 Amazon S3 버킷을 만들 수 있습니다. 이러한 리소스를 만드는 방법 설명은 다음 주제를 참조하세요.

입력 스트림에 샘플 레코드 쓰기

이 섹션에서는 Python 스크립트를 사용하여 애플리케이션에서 처리할 임의의 문자열을 스트림에 씁니다.

참고

이 섹션에서는 AWS SDK for Python (Boto)이 필요합니다.

  1. 다음 콘텐츠를 가진 ping.py이라는 파일을 생성합니다:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. ping.py 스크립트를 실행합니다.

    $ python ping.py

    자습서의 나머지 부분을 완료하는 동안 스크립트가 계속 돌아가게 둡니다.

애플리케이션 코드 다운로드 및 검토

이 예제의 Java 응용 프로그램 코드는 에서 제공됩니다 GitHub. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.

  1. 아직 설치하지 않았다면 Git 클라이언트를 설치합니다. 자세한 정보는 Git 설치를 참조하세요.

  2. 다음 명령을 사용하여 원격 리포지토리를 복제합니다:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. amazon-kinesis-data-analytics-java-examples/Beam 디렉터리로 이동합니다.

애플리케이션 코드는 BasicBeamStreamingJob.java 파일에 있습니다. 애플리케이션 코드에 대해 다음을 유의하십시오:

  • 애플리케이션은 Apache ParDoBeam을 사용하여 라는 사용자 지정 변환 함수를 호출하여 들어오는 레코드를 처리합니다. PingPongFn

    PingPongFn 함수를 호출하는 코드는 다음과 같습니다.

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Apache Beam을 사용하는 Managed Service for Apache Flink 애플리케이션에는 다음과 같은 구성 요소가 필요합니다. 이러한 구성 요소와 버전을 pom.xml에 포함시키지 않으면 애플리케이션이 환경 종속성에서 잘못된 버전을 로드하고 버전이 일치하지 않으므로 애플리케이션이 런타임에 충돌합니다.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • PingPongFn 변환 함수는 입력 데이터가 ping이 아닌 경우 입력 데이터를 출력 스트림으로 전달합니다. 이 경우 pong\n 문자열을 출력 스트림으로 내보냅니다.

    변환 함수의 코드는 다음과 같습니다.

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

애플리케이션 코드를 컴파일합니다.

애플리케이션을 컴파일하려면 다음을 수행하세요.

  1. 아직 Java 및 Maven을 설치하지 않았으면 설치합니다. 자세한 정보는 시작하기 (API) DataStream 자습서의 사전 조건  섹션을 참조하세요.

  2. 다음 명령을 사용하여 애플리케이션을 컴파일합니다.

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    참고

    제공된 소스 코드는 Java 11의 라이브러리를 사용합니다.

애플리케이션을 컴파일하면 애플리케이션 JAR 파일(target/basic-beam-app-1.0.jar)이 생성됩니다.

아파치 플링크 스트리밍 자바 코드를 업로드하세요.

이 섹션에서는 종속 리소스 생성 섹션에서 생성한 Amazon S3 버킷에 애플리케이션 코드를 업로드합니다.

  1. Amazon S3 콘솔에서 ka-app-code- <username>버킷을 선택하고 업로드를 선택합니다.

  2. 파일 선택 단계에서 파일 추가를 선택합니다. 이전 단계에서 생성한 basic-beam-app-1.0.jar 파일로 이동합니다.

  3. 개체 정보에 대한 설정은 변경할 필요가 없으므로 업로드를 선택합니다.

이제 애플리케이션 코드가 애플리케이션에서 액세스할 수 있는 Amazon S3 버킷에 저장됩니다.

Apache Flink용 관리형 서비스 애플리케이션을 생성하고 실행합니다.

콘솔을 사용하여 애플리케이션을 생성, 구성, 업데이트 및 실행하려면 다음 단계를 수행하세요.

애플리케이션 생성

  1. https://console.aws.amazon.com/flink에서 Managed Service for Apache Flink 콘솔을 엽니다.

  2. Managed Service for Apache Flink 대시보드에서 분석 애플리케이션 생성을 선택합니다.

  3. Managed Service for Apache Flink - 애플리케이션 생성 페이지에서 다음과 같이 애플리케이션 세부 정보를 제공합니다.

    • 애플리케이션 명칭MyApplication을 입력합니다.

    • 런타임에서 Apache Flink를 선택합니다.

      참고

      아파치 빔은 현재 아파치 플링크 버전 1.19 이상과 호환되지 않습니다.

    • 버전 풀다운에서 아파치 플링크 버전 1.15를 선택합니다.

  4. 액세스 권한에서 IAM 역할 kinesis-analytics-MyApplication-us-west-2 생성/업데이트를 선택합니다.

  5. 애플리케이션 생성을 선택합니다.

참고

콘솔을 사용하여 Managed Service for Apache Flink 애플리케이션을 만들 때 내 애플리케이션에 대한 IAM 역할 및 정책을 둘 수 있는 옵션이 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 애플리케이션 명칭과 리전을 사용하여 다음과 같이 지정됩니다.

  • 정책: kinesis-analytics-service-MyApplication-us-west-2

  • 역할: kinesis-analytics-MyApplication-us-west-2

IAM 정책을 편집합니다.

IAM 정책을 편집하여 Kinesis Data Streams에 액세스할 수 있는 권한을 추가합니다.

  1. https://console.aws.amazon.com/iam/에서 IAM 콘솔을 여세요.

  2. 정책을 선택하세요. 이전 섹션에서 콘솔이 생성한 kinesis-analytics-service-MyApplication-us-west-2 정책을 선택합니다.

  3. 요약 페이지에서 정책 편집을 선택합니다. JSON 탭을 선택합니다.

  4. 다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 ID(012345678901)를 내 계정 ID로 바꿉니다.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

애플리케이션 구성

  1. MyApplication페이지에서 구성을 선택합니다.

  2. 애플리케이션 구성 페이지에서 코드 위치를 입력합니다.

    • Amazon S3 버킷의 경우 ka-app-code-<username>를 입력합니다.

    • Amazon S3 객체 경로에는 basic-beam-app-1.0.jar를 입력합니다.

  3. 애플리케이션 리소스에 대한 액세스 아래에서 액세스 권한의 경우 IAM 역할 kinesis-analytics-MyApplication-us-west-2 생성/업데이트를 선택합니다.

  4. 다음을 입력합니다:

    그룹 ID
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. 모니터링에서 지표 수준 모니터링애플리케이션으로 설정되어 있는지 확인합니다.

  6. CloudWatch 로깅의 경우 활성화 확인란을 선택합니다.

  7. 업데이트를 선택합니다.

참고

CloudWatch 로깅을 활성화하도록 선택하면 Apache Flink용 관리형 서비스에서 로그 그룹과 로그 스트림을 자동으로 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.

  • 로그 그룹: /aws/kinesis-analytics/MyApplication

  • 로그 스트림: kinesis-analytics-log-stream

이 로그 스트림은 애플리케이션을 모니터링하는 데 사용됩니다. 이 로그 스트림은 애플리케이션이 결과를 전송하는 데 사용하는 로그 스트림과 다릅니다.

애플리케이션을 실행합니다

애플리케이션을 실행하고 Apache Flink 대시보드를 연 다음 원하는 Flink 작업을 선택하면 Flink 작업 그래프를 볼 수 있습니다.

CloudWatch 콘솔에서 Apache Flink용 관리 서비스 메트릭을 확인하여 애플리케이션이 작동하는지 확인할 수 있습니다.

리소스를 정리하세요. AWS

이 섹션에는 텀블링 윈도우 튜토리얼에서 만든 AWS 리소스를 정리하는 절차가 포함되어 있습니다.

Apache Flink용 관리형 서비스 애플리케이션을 삭제하십시오.

  1. https://console.aws.amazon.com/flink에서 Managed Service for Apache Flink 콘솔을 엽니다.

  2. Apache Flink용 관리형 서비스 패널에서 선택합니다. MyApplication

  3. 애플리케이션 페이지에서 삭제를 선택한 다음 삭제를 확인합니다.

Kinesis 데이터 스트림을 삭제합니다.

  1. https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

  2. Kinesis Data Streams 패널에서 을 선택합니다. ExampleInputStream

  3. ExampleInputStream페이지에서 Kinesis 스트림 삭제를 선택한 다음 삭제를 확인합니다.

  4. Kinesis 스트림 페이지에서 를 선택하고 작업을 선택하고 삭제를 선택한 다음 삭제를 확인합니다. ExampleOutputStream

Amazon S3 객체 및 버킷을 삭제합니다.

  1. https://console.aws.amazon.com/s3/에서 S3 콘솔을 엽니다.

  2. ka-app-code- 버킷을 선택합니다. <username>

  3. 삭제를 선택한 후 버킷 이름을 입력하여 삭제를 확인합니다.

IAM 리소스를 삭제합니다.

  1. https://console.aws.amazon.com/iam/에서 IAM 콘솔을 엽니다.

  2. 탐색 바에서 정책을 선택합니다.

  3. 필터 컨트롤에서 kinesis를 입력합니다.

  4. kinesis-analytics-service- MyApplication -us-west-2 정책을 선택합니다.

  5. 정책 작업을 선택한 후 삭제를 선택합니다.

  6. 탐색 모음에서 역할을 선택합니다.

  7. 키네시스-애널리틱스- -US-West-2 역할을 선택합니다. MyApplication

  8. 역할 삭제를 선택하고 삭제를 확인합니다.

CloudWatch 리소스 삭제하기

  1. https://console.aws.amazon.com/cloudwatch/ 에서 CloudWatch 콘솔을 엽니다.

  2. 탐색 바에서 로그를 선택합니다.

  3. MyApplication/aws/kinesis-analytics/ 로그 그룹을 선택합니다.

  4. 로그 그룹 삭제를 선택한 다음 삭제를 확인합니다.

다음 단계

이제 Apache Beam을 사용하여 데이터를 변환하는 기본 Managed Service for Apache Flink 애플리케이션을 생성하고 실행했으니, 고급 Managed Service for Apache Flink 솔루션의 예에 대한 자세한 내용을 알아보려면 다음의 애플리케이션을 참조하세요.