Apache Flink용 매니지드 서비스에 스트리밍 데이터 소스 추가 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Flink용 매니지드 서비스에 스트리밍 데이터 소스 추가

Apache Flink는 파일, 소켓, 컬렉션, 맞춤 소스에서 읽을 수 있는 커넥터를 제공합니다. 애플리케이션 코드에서 Apache Flink 소스를 사용하여 스트림으로부터 데이터를 수신합니다. 이 섹션에서는 Amazon 서비스에 사용할 수 있는 소스를 설명합니다.

Kinesis Data Streams

FlinkKinesisConsumer 소스는 Amazon Kinesis 데이터 스트림에서 애플리케이션으로 스트리밍 데이터를 제공합니다.

FlinkKinesisConsumer 생성

다음 코드 예는 FlinkKinesisConsumer를 생성하는 방법을 보여 줍니다.

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

FlinkKinesisConsumer 사용에 대한 자세한 내용을 알아보려면 Apache Flink 스트리밍 Java 코드를 다운로드하여 검토하십시오. 섹션을 참조하세요.

EFO 컨슈머를 사용하는 FlinkKinesisConsumer 생성

는 FlinkKinesisConsumer 이제 향상된 팬아웃 (EFO) 을 지원합니다.

Kinesis 컨슈머가 EFO를 사용하는 경우 Kinesis Data Streams 서비스는 컨슈머가 스트림에서 읽는 다른 컨슈머와 스트림의 고정 대역폭을 공유하지 않고 자체 전용 대역폭을 제공합니다.

Kinesis 소비자와 함께 EFO를 사용하는 방법에 대한 자세한 내용은 FLIP-128: Kinesis 소비자를 위한 향상된 팬아웃을 AWS 참조하십시오.

Kinesis 소비자에 다음 파라미터를 설정하여 EFO 소비자를 활성화합니다.

  • RECORD_PUBLISHER_TYPE: 애플리케이션이 EFO 소비자를 사용하여 Kinesis Data Stream 데이터에 액세스하도록 이 파라미터를 EFO로 설정하세요.

  • EFO_CONSUMER_NAME: 이 파라미터를 이 스트림의 소비자 간에 고유한 문자열 값으로 설정합니다. 동일한 Kinesis Data Stream에서 컨슈머 명칭을 재사용하면 해당 명칭을 사용하던 이전 컨슈머가 종료됩니다.

EFO를 사용하도록 FlinkKinesisConsumer를 구성하려면 컨슈머에 다음 파라미터를 추가하세요.

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

EFO 컨슈머를 사용하는 Managed Service for Apache Flink 애플리케이션의 예는 EFO 컨슈머 섹션을 참조하세요.

Amazon MSK

KafkaSource 소스는 Amazon MSK 주제에서 스트리밍 데이터를 애플리케이션에 제공합니다.

KafkaSource 생성

다음 코드 예는 KafkaSource를 생성하는 방법을 보여 줍니다.

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

KafkaSource 사용에 대한 자세한 내용을 알아보려면 MSK 복제 섹션을 참조하세요.