Tulis ke aliran data Kinesis Anda menggunakan KPL - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Tulis ke aliran data Kinesis Anda menggunakan KPL

Bagian berikut menunjukkan kode sampel dalam perkembangan dari produsen paling dasar ke kode asinkron sepenuhnya.

Kode produsen Barebones

Kode berikut adalah semua yang diperlukan untuk menulis produsen kerja minimal. Catatan pengguna Kinesis Producer Library (KPL) diproses di latar belakang.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Menanggapi hasil secara sinkron

Pada contoh sebelumnya, kode tidak memeriksa apakah catatan KPL pengguna berhasil. KPLMelakukan percobaan ulang yang diperlukan untuk memperhitungkan kegagalan. Tetapi jika Anda ingin memeriksa hasilnya, Anda dapat memeriksanya menggunakan Future objek yang dikembalikanaddUserRecord, seperti pada contoh berikut (contoh sebelumnya ditampilkan untuk konteks):

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Menanggapi hasil secara asinkron

Contoh sebelumnya adalah memanggil get() Future objek, yang memblokir eksekusi. Jika Anda tidak ingin memblokir eksekusi, Anda dapat menggunakan callback asinkron, seperti yang ditunjukkan pada contoh berikut:

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }