소비자 분해 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

소비자 분해

KCL은 릴리스 1.4.0부터 KPL 사용자 레코드 자동 분해를 지원합니다. 이전 버전의 KCL로 쓴 소비자 애플리케이션 코드는 KCL 업데이트 후에 수정하지 않고 컴파일됩니다. 그러나 생산자 측에서 KPL 집계를 사용하는 경우에는 체크포인트 수행을 포함한 세부적인 사항이 있습니다. 집계된 레코드 내의 모든 하위 레코드는 시퀀스 번호가 동일하므로 하위 레코드를 구별해야 하는 경우 추가 데이터를 체크포인트와 함께 저장해야 합니다. 이 추가 데이터를 하위 시퀀스 번호라고 합니다.

이전 버전의 KPL에서 마이그레이션

기존 호출을 변경하지 않고도 집계와 함께 검사를 수행할 수 있습니다. Kinesis Data Streams에 성공적으로 저장된 모든 레코드를 여전히 검색할 수 있습니다. 이제 KCL은 두 가지 체크포인트 작업을 새로 제공하여 아래에 설명된 특정 사용 사례를 지원합니다.

KPL 지원 전에 KCL용으로 기존 코드가 쓰여지고 인수를 사용하지 않고 체크포인트 작업이 직접적으로 호출되면 배치에 있는 마지막 KPL 사용자 레코드의 시퀀스 번호 체크포인트를 수행하는 것과 같습니다. 체크포인트 작업이 시퀀스 번호 문자열로 호출되면 암시적 시퀀스 번호 0(영)과 함께 배치의 제공된 시퀀스 번호를 검사하는 것과 같습니다.

인수를 사용하지 않고 새로운 KCL 체크포인트 작업 checkpoint()를 직접적으로 호출하는 것은 암시적 시퀀스 번호 0(영)과 함께 배치에서 마지막 Record 호출의 시퀀스 번호 체크포인트를 수행하는 것과 같은 의미입니다.

새로운 KCL 체크포인트 작업 checkpoint(Record record)를 직접적으로 호출하는 것은 암시적 시퀀스 번호 0(영)과 함께 제공된 Record의 시퀀스 번호 체크포인트를 수행하는 것과 같은 의미입니다. Record 호출이 실제로 UserRecord인 경우 UserRecord 시퀀스 번호 및 하위 시퀀스 번호가 검사됩니다.

새로운 KCL 체크포인트 작업 checkpoint(String sequenceNumber, long subSequenceNumber)를 직접적으로 호출하면 주어진 하위 시퀀스 번호와 함께 제공된 시퀀스 번호 체크포인트가 명시적으로 수행됩니다.

이 경우 체크포인트가 Amazon DynamoDB 체크포인트 테이블에 저장되면 애플리케이션이 충돌하고 다시 시작될 때 KCL이 레코드 검색을 올바르게 다시 시작할 수 있습니다. 시퀀스에 레코드가 더 있으면 가장 최근에 검사된 시퀀스 번호가 있는 레코드에서 다음 하위 시퀀스 번호 레코드부터 검색이 시작됩니다. 가장 최근 체크포인트에 이전 시퀀스 번호 레코드의 마지막 하위 시퀀스 번호가 포함된 경우 다음 시퀀스 번호를 가진 레코드부터 검색이 시작됩니다.

다음 단원에서는 레코드 건너뛰기와 중복을 방지해야 하는 소비자의 시퀀스 및 하위 시퀀스 검사를 자세히 설명합니다. 소비자의 레코드 처리를 중단하고 다시 시작할 때 레코드 건너뛰기나 중복이 문제가 되지 않는다면 수정하지 않고 기존의 코드를 실행해도 좋습니다.

KPL 분해를 위한 KCL 확장

앞에서 설명한 대로 KPL 분해에 하위 시퀀스 체크포인트 수행을 포함할 수 있습니다. 하위 시퀀스 체크포인트 수행을 쉽게 사용할 수 있도록 UserRecord 클래스가 KCL에 추가되었습니다.

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

이제 Record 대신 이 클래스가 사용됩니다. 이 클래스는 Record의 하위 클래스이므로 기존 코드가 손상되지 않습니다. UserRecord 클래스는 실제 하위 레코드와 집계되지 않은 표준 레코드를 나타냅니다. 집계되지 않은 레코드는 하위 레코드가 하나뿐인 집계된 레코드라고 간주할 수 있습니다.

또한 IRecordProcessorCheckpointer에 두 가지 새로운 작업이 추가되었습니다.

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

하위 시퀀스 번호 검사를 사용하기 위해 다음 변환을 수행할 수 있습니다. 다음 양식 코드를 변경합니다.

checkpointer.checkpoint(record.getSequenceNumber());

새 양식 코드:

checkpointer.checkpoint(record);

하위 시퀀스 검사에 checkpoint(Record record) 양식을 사용하는 것이 좋습니다. 그러나 검사에 사용할 sequenceNumbers를 문자열에 이미 저장한 경우 다음 예제와 같이 subSequenceNumber도 저장해야 합니다.

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

항상 Record가 기본적으로 구현에 사용되므로 UserRecord에서 UserRecord로 향하는 캐스트가 언제나 성공합니다. 시퀀스 번호에 산술 연산을 수행할 필요가 없으면 이 방법을 사용하지 않는 것이 좋습니다.

KPL 사용자 레코드를 처리하는 동안 KCL은 하위 시퀀스 번호를 각 행의 추가 필드로 Amazon DynamoDB에 씁니다. 이전 버전의 KCL에서는 체크포인트를 다시 시작할 때 레코드를 가져오기 위해 AFTER_SEQUENCE_NUMBER를 사용하지만 KPL이 지원되는 현재 KCL에서는 AT_SEQUENCE_NUMBER를 대신 사용합니다. 검사된 시퀀스 번호의 레코드가 검색되면 검사된 하위 시퀀스 번호가 확인되고 하위 레코드가 적절히 배치됩니다(마지막 하위 레코드가 검사된 레코드일 경우 모두 해당). 다시 말해 집계되지 않은 레코드는 단일 하위 레코드가 있는 집계 레코드로 간주할 수 있으므로 집계된 레코드와 집계되지 않은 레코드 모두 동일한 알고리즘이 적용됩니다.

GetRecords직접 사용

KCL을 사용하지 않고 API 작업 GetRecords를 간접적으로 호출하여 Kinesis Data Streams 레코드를 검색할 수도 있습니다. 원래의 KPL 사용자 레코드에 검색된 이 레코드의 압축을 풀려면 UserRecord.java에서 다음 정적 작업 중 하나를 직접적으로 호출합니다.

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

첫 번째 작업에서는 0에 기본값 startingHashKey(영)을 사용하고 2^128 -1에 기본값 endingHashKey을 사용합니다.

이 작업은 각각 지정된 Kinesis Data Streams 레코드 목록을 KPL 사용자 레코드 목록으로 분해합니다. 명시적 해시 키 또는 파티션 키가 startingHashKey(포함) 및 endingHashKey(포함) 범위를 벗어나는 모든 KPL 사용자 레코드는 반환된 레코드 목록에서 삭제됩니다.