기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
다음 단원에서는 가장 기본적인 생산자부터 완전 비동기식 코드까지 진행되는 샘플 코드를 보여줍니다.
베어본 생산자 코드
다음 코드만 있으면 작동되는 최소한의 생산자를 쓸 수 있습니다. Kinesis 생산자 라이브러리(KPL) 사용자 레코드는 백그라운드에서 처리됩니다.
// KinesisProducer gets credentials automatically like
// DefaultAWSCredentialsProviderChain.
// It also gets region automatically from the EC2 metadata service.
KinesisProducer kinesis = new KinesisProducer();
// Put some records
for (int i = 0; i < 100; ++i) {
ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
// doesn't block
kinesis.addUserRecord("myStream", "myPartitionKey", data);
}
// Do other stuff ...
결과에 동기적으로 응답
이전 예제에서 코드는 KPL 사용자 레코드가 성공했는지 여부를 확인하지 않습니다. KPL은 실패를 설명하는 데 필요한 재시도를 수행합니다. 하지만 결과를 확인하려면 다음 예제(컨텍스트에 맞게 표시된 이전 예제)와 같이 addUserRecord
에서 반환되는Future
객체를 사용하여 검사할 수 있습니다.
KinesisProducer kinesis = new KinesisProducer();
// Put some records and save the Futures
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>();
for (int i = 0; i < 100; i++) {
ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
// doesn't block
putFutures.add(
kinesis.addUserRecord("myStream", "myPartitionKey", data));
}
// Wait for puts to finish and check the results
for (Future<UserRecordResult> f : putFutures) {
UserRecordResult result = f.get(); // this does block
if (result.isSuccessful()) {
System.out.println("Put record into shard " +
result.getShardId());
} else {
for (Attempt attempt : result.getAttempts()) {
// Analyze and respond to the failure
}
}
}
결과에 비동기적으로 응답
이전 예제에서는 런타임을 차단하는 Future
객체get()
에서를 호출합니다. 런타임을 차단하지 않으려면 다음 예제와 같이 비동기 콜백을 사용할 수 있습니다.
KinesisProducer kinesis = new KinesisProducer();
FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
@Override public void onFailure(Throwable t) {
/* Analyze and respond to the failure */
};
@Override public void onSuccess(UserRecordResult result) {
/* Respond to the success */
};
};
for (int i = 0; i < 100; ++i) {
ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);
// If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
Futures.addCallback(f, myCallback);
}