Namespace: OpenEdge.Messaging.Kafka
Type: Class KafkaTransactionalProducer
Parent Classes:
Inherits: OpenEdge.Messaging.Kafka.KafkaProducer
Implements: OpenEdge.Messaging.ITransactionalProducer


/*
Copyright © 2022 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
Purpose:
This class implements the APIs necessary for a Kafka transactional producer. It extends the regular
Kafka producer with the additional methods needed for initializing, beginning, committing, and aborting transactions.
The ABL application is expected to call InitTransactions() to ensure any previous, incomplete transactions
are properly fenced by the broker.
Kafka transactional producers must only send messages from within a transaction. To start a transaction,
the ABL application calls BeginTransaction(). Once the transaction is started, the application sends
messages as it would with the regular producer.
To complete the transaction, the application calls CommitTransaction() or AbortTransaction() as appropriate.
Transaction boundary calls (InitTransactions/BeginTransaction/CommitTransaction/AbortTransaction/SendOffsetsToTransaction) are
blocking calls that may time out.
Read more here:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L7866
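
The call order described above can be sketched roughly as follows. This is a minimal, untested outline rather than the documented usage pattern. The helper name SendInTransaction is hypothetical, and the sketch assumes that Send(IProducerRecord) is reachable through ITransactionalProducer (as with the regular producer), that IProducerRecord lives in OpenEdge.Messaging, and that the caller obtained the producer elsewhere and already called InitTransactions() once.

```abl
USING OpenEdge.Messaging.IProducerRecord FROM PROPATH.
USING OpenEdge.Messaging.ITransactionalProducer FROM PROPATH.

BLOCK-LEVEL ON ERROR UNDO, THROW.

/* Hypothetical helper: sends one record inside its own transaction.
   Assumes InitTransactions() was already called once on poProducer. */
PROCEDURE SendInTransaction:
    DEFINE INPUT PARAMETER poProducer AS ITransactionalProducer NO-UNDO.
    DEFINE INPUT PARAMETER poRecord   AS IProducerRecord        NO-UNDO.

    poProducer:BeginTransaction().

    DO ON ERROR UNDO, THROW:
        /* Assumption: Send() is available as with the regular producer. */
        poProducer:Send(poRecord).

        CATCH oSendError AS Progress.Lang.Error:
            /* No commit has been attempted yet, so aborting is still allowed. */
            poProducer:AbortTransaction().
            UNDO, THROW oSendError.
        END CATCH.
    END.

    /* Once a commit has been attempted, the transaction can no longer be
       aborted, so the commit sits outside the abort path. */
    poProducer:CommitTransaction().
END PROCEDURE.
```

A caller would invoke poProducer:InitTransactions() once at startup and then run the helper once per transaction.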



Method Summary
  Options Name Purpose
  AbortTransaction () /** Abort the current transaction. If any exception occurs, you can retry the abort, or delete the producer. You cannot commit a transaction once an attempt has been made to abort it; the abort may be retried, or the producer must be deleted. This method will block until all outstanding messages are purged, and transaction abort is successful or until the timeout occurs. The timeout is based on the 'transaction.timeout.ms' option configured for the producer. On timeout the application may call the function again. The application may retry this operation if MessagingErrorRetriable is raised. This method may raise one or more of the following errors: @throws Progress.Messaging.MessagingError @throws Progress.Lang.MessagingErrorRetriable @throws Progress.Lang.MessagingErrorFatal @throws Progress.Lang.MessagingErrorAbort */
  BeginTransaction () /** Begin a transaction. Only one active transaction per producer is allowed. Messages may not be produced outside of the transaction boundary. This method is blocking, but may time out. The timeout is based on the 'transaction.timeout.ms' option configured for the producer. This method may raise one or more of the following errors: @throws Progress.Messaging.MessagingError @throws Progress.Lang.MessagingErrorRetriable @throws Progress.Lang.MessagingErrorFatal @throws Progress.Lang.MessagingErrorAbort */
  CommitTransaction () /** Commit the current transaction. If any exception occurs, you can retry the commit or delete the producer. You cannot abort any transaction once an attempt has been made to commit it; the commit may be retried, or the producer must be deleted. Flush is automatically called. This method will block until all outstanding messages are delivered, and transaction commit request is successful or until the timeout occurs. The timeout is based on the 'transaction.timeout.ms' option configured for the producer. The application may retry this operation if MessagingErrorRetriable is raised. This method may raise one or more of the following errors: @throws Progress.Messaging.MessagingError @throws Progress.Lang.MessagingErrorRetriable @throws Progress.Lang.MessagingErrorFatal @throws Progress.Lang.MessagingErrorAbort */
  OpenEdge.Messaging.Internal.IConsumerDelegate ExtractAndValidateConsumerDelegate (IConsumer) /* Validate the consumer is the correct type, and extract the consumer delegate that holds onto the BIO consumer object. */
  InitTransactions () /** Called to ensure any previously opened transactions are fenced off from new transactions that are using the transactional.id of the producer. This method must be called before BeginTransaction in order to prepare the client and Kafka cluster for handling transactions from the producer. This method is blocking, but may time out. The timeout is based on the 'transaction.timeout.ms' option configured for the producer. The application may retry this operation if MessagingErrorRetriable is raised. This method may raise one or more of the following errors: @throws Progress.Messaging.MessagingError @throws Progress.Lang.MessagingErrorRetriable @throws Progress.Lang.MessagingErrorFatal @throws Progress.Lang.MessagingErrorAbort */
  SendOffsetsToTransaction (INPUT, IConsumer) /** SendOffsetsToTransaction is used to record the offsets for a consumer within the transaction used to produce records. The offsets are sent to the cluster and recorded as part of the current transaction. SendOffsetsToTransaction is used in a "consume->process->produce" scenario where the producer wants to ensure that offset acknowledgements for consumed records are stored with the broker if the transaction is successfully committed. This method is used when a process is both a consumer of records and a producer of records. This method is called passing in the offsets and partition metadata related to any consumed messages before the transaction is completed. The consumer must be configured with isolation.level=read_committed and enable.auto.commit=false, and the application must not call CommitOffset on the consumer. The recording of consumer offsets via SendOffsetToTransaction or SendOffsetsToTransaction may only be done once per transaction. @param offsets List of Topic+Partition+Offset to commit to the consumer group upon successful commit of the transaction. Offsets should be the next message to consume, e.g., offset of last processed message + 1. @param consumer The consumer client that is consuming messages within the transaction This method may raise one or more of the following errors: @throws Progress.Messaging.MessagingError @throws Progress.Lang.MessagingErrorRetriable @throws Progress.Lang.MessagingErrorFatal @throws Progress.Lang.MessagingErrorAbort */
  SendOffsetToTransaction (TopicPartitionOffset, IConsumer) /** SendOffsetToTransaction is used to record a consumer offset within the transaction used to produce records. The offset is sent to the cluster and recorded as part of the current transaction. SendOffsetToTransaction is used in a "consume->process->produce" scenario where the producer wants to ensure that offset acknowledgements for consumed records are stored with the broker if the transaction is successfully committed. This method is used when a process is both a consumer of records and a producer of records. This method is called passing in the offset and partition metadata related to any consumed messages before the transaction is completed. The consumer must be configured with isolation.level=read_committed and enable.auto.commit=false, and the application must not call CommitOffset on the consumer. The recording of consumer offsets via SendOffsetToTransaction or SendOffsetsToTransaction may only be done once per transaction. @param offset Topic+Partition+Offset to commit to the consumer group upon successful commit of the transaction. The offset should be the next message to consume, e.g., offset of last processed message + 1. @param consumer The consumer client that is consuming messages within the transaction This method may raise one or more of the following errors: @throws Progress.Messaging.MessagingError @throws Progress.Lang.MessagingErrorRetriable @throws Progress.Lang.MessagingErrorFatal @throws Progress.Lang.MessagingErrorAbort */
  ValidateOffset (TopicPartitionOffset) /* Validate the offset information we're about to pass to the BIO. @throws MessagingError if any invalid property values are found in the offset */

Constructor Summary
  Options Name Purpose
  KafkaTransactionalProducer (IProducerDelegate, IStringStringMap, IStringKeyedMap)

Method Detail

AbortTransaction ()

Purpose:
Abort the current transaction. If any exception occurs, you can retry the abort, or delete the producer.
You cannot commit a transaction once an attempt has been made to abort it; the abort may be retried, or the producer must be deleted.
This method will block until all outstanding messages are purged, and transaction abort is successful or until the timeout occurs.
The timeout is based on the 'transaction.timeout.ms' option configured for the producer.
On timeout the application may call the function again.
The application may retry this operation if MessagingErrorRetriable is raised.
This method may raise one or more of the following errors:
@throws Progress.Messaging.MessagingError
@throws Progress.Lang.MessagingErrorRetriable
@throws Progress.Lang.MessagingErrorFatal
@throws Progress.Lang.MessagingErrorAbort

BeginTransaction ()

Purpose:
Begin a transaction. Only one active transaction per producer is allowed. Messages may not be produced outside of the transaction
boundary.
This method is blocking, but may time out. The timeout is based on the 'transaction.timeout.ms' option configured for the producer.
This method may raise one or more of the following errors:
@throws Progress.Messaging.MessagingError
@throws Progress.Lang.MessagingErrorRetriable
@throws Progress.Lang.MessagingErrorFatal
@throws Progress.Lang.MessagingErrorAbort

CommitTransaction ()

Purpose:
Commit the current transaction. If any exception occurs, you can retry the commit or delete the producer.
You cannot abort any transaction once an attempt has been made to commit it; the commit may be retried, or the producer must be deleted.
Flush is automatically called.
This method will block until all outstanding messages are delivered, and transaction commit request is successful or until the timeout occurs.
The timeout is based on the 'transaction.timeout.ms' option configured for the producer.
The application may retry this operation if MessagingErrorRetriable is raised.
This method may raise one or more of the following errors:
@throws Progress.Messaging.MessagingError
@throws Progress.Lang.MessagingErrorRetriable
@throws Progress.Lang.MessagingErrorFatal
@throws Progress.Lang.MessagingErrorAbort
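
One way to act on the retry guidance above is a bounded retry loop. The following is a hedged, untested sketch. The helper name CommitWithRetry and the piMaxAttempts parameter are hypothetical. Because this page lists the error classes under differing namespaces, the sketch matches the retriable error by the end of its type name instead of naming a fully qualified class.

```abl
USING OpenEdge.Messaging.ITransactionalProducer FROM PROPATH.

BLOCK-LEVEL ON ERROR UNDO, THROW.

/* Hypothetical helper: retries CommitTransaction() a bounded number of times. */
PROCEDURE CommitWithRetry:
    DEFINE INPUT PARAMETER poProducer    AS ITransactionalProducer NO-UNDO.
    DEFINE INPUT PARAMETER piMaxAttempts AS INTEGER                NO-UNDO.

    DEFINE VARIABLE iAttempt   AS INTEGER NO-UNDO.
    DEFINE VARIABLE lCommitted AS LOGICAL NO-UNDO.

    DO WHILE NOT lCommitted:
        iAttempt = iAttempt + 1.

        DO ON ERROR UNDO, THROW:
            poProducer:CommitTransaction().
            lCommitted = TRUE.

            CATCH oError AS Progress.Lang.Error:
                /* Rethrow when attempts are exhausted or the error is not
                   the retriable kind; otherwise fall through and loop again. */
                IF iAttempt >= piMaxAttempts
                   OR NOT (oError:GetClass():TypeName MATCHES "*MessagingErrorRetriable") THEN
                    UNDO, THROW oError.
            END CATCH.
        END.
    END.
END PROCEDURE.
```

The sketch never calls AbortTransaction(), since a transaction cannot be aborted after a commit attempt. If the retries are exhausted, the caller is left to delete the producer.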

OpenEdge.Messaging.Internal.IConsumerDelegate ExtractAndValidateConsumerDelegate (IConsumer)

Purpose:
Validate the consumer is the correct type, and extract the consumer delegate that holds onto the BIO consumer object.
Parameters:
consumer OpenEdge.Messaging.IConsumer
 
Returns OpenEdge.Messaging.Internal.IConsumerDelegate
 

InitTransactions ()

Purpose:
Called to ensure any previously opened transactions are fenced off from new transactions that are using the
transactional.id of the producer. This method must be called before BeginTransaction in order to prepare the client and Kafka cluster
for handling transactions from the producer.
This method is blocking, but may time out. The timeout is based on the 'transaction.timeout.ms' option configured for the producer.
The application may retry this operation if MessagingErrorRetriable is raised.
This method may raise one or more of the following errors:
@throws Progress.Messaging.MessagingError
@throws Progress.Lang.MessagingErrorRetriable
@throws Progress.Lang.MessagingErrorFatal
@throws Progress.Lang.MessagingErrorAbort

SendOffsetsToTransaction (INPUT, IConsumer)

Purpose:
SendOffsetsToTransaction is used to record the offsets for a consumer within the transaction used to produce records. The offsets are sent to
the cluster and recorded as part of the current transaction.
SendOffsetsToTransaction is used in a "consume->process->produce" scenario where the producer wants to ensure that offset acknowledgements
for consumed records are stored with the broker if the transaction is successfully committed. This method is used when a process is both
a consumer of records and a producer of records. This method is called passing in the offsets and partition metadata related to any
consumed messages before the transaction is completed.
The consumer must be configured with isolation.level=read_committed and enable.auto.commit=false, and the application must not call CommitOffset on the consumer.
The recording of consumer offsets via SendOffsetToTransaction or SendOffsetsToTransaction may only be done once per transaction.
This method may raise one or more of the following errors:
@throws Progress.Messaging.MessagingError
@throws Progress.Lang.MessagingErrorRetriable
@throws Progress.Lang.MessagingErrorFatal
@throws Progress.Lang.MessagingErrorAbort
Parameters:
offsets UNKNOWN DATATYPE
  List of Topic+Partition+Offset to commit to the consumer group upon successful commit of the transaction. Offsets should be
  the next message to consume, e.g., offset of last processed message + 1.
consumer OpenEdge.Messaging.IConsumer
  The consumer client that is consuming messages within the transaction

SendOffsetToTransaction (TopicPartitionOffset, IConsumer)

Purpose:
SendOffsetToTransaction is used to record a consumer offset within the transaction used to produce records. The offset is sent to
the cluster and recorded as part of the current transaction.
SendOffsetToTransaction is used in a "consume->process->produce" scenario where the producer wants to ensure that offset acknowledgements
for consumed records are stored with the broker if the transaction is successfully committed. This method is used when a process is both
a consumer of records and a producer of records. This method is called passing in the offset and partition metadata related to any
consumed messages before the transaction is completed.
The consumer must be configured with isolation.level=read_committed and enable.auto.commit=false, and the application must not call CommitOffset on the consumer.
The recording of consumer offsets via SendOffsetToTransaction or SendOffsetsToTransaction may only be done once per transaction.
This method may raise one or more of the following errors:
@throws Progress.Messaging.MessagingError
@throws Progress.Lang.MessagingErrorRetriable
@throws Progress.Lang.MessagingErrorFatal
@throws Progress.Lang.MessagingErrorAbort
Parameters:
offset OpenEdge.Messaging.TopicPartitionOffset
  Topic+Partition+Offset to commit to the consumer group upon successful commit of the transaction. The offset should be
  the next message to consume, e.g., offset of last processed message + 1.
consumer OpenEdge.Messaging.IConsumer
  The consumer client that is consuming messages within the transaction
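
The produce step of a "consume->process->produce" cycle might look like the sketch below. It is an untested illustration under stated assumptions. The helper name ProduceAndRecordOffset is hypothetical, Send(IProducerRecord) is assumed to be reachable through ITransactionalProducer, and the caller is assumed to have built poNextOffset (last processed offset + 1) and to own the surrounding BeginTransaction() and CommitTransaction() or AbortTransaction() calls.

```abl
USING OpenEdge.Messaging.IConsumer FROM PROPATH.
USING OpenEdge.Messaging.IProducerRecord FROM PROPATH.
USING OpenEdge.Messaging.ITransactionalProducer FROM PROPATH.
USING OpenEdge.Messaging.TopicPartitionOffset FROM PROPATH.

BLOCK-LEVEL ON ERROR UNDO, THROW.

/* Hypothetical helper: call it between BeginTransaction() and
   CommitTransaction() on the same producer. */
PROCEDURE ProduceAndRecordOffset:
    DEFINE INPUT PARAMETER poProducer   AS ITransactionalProducer NO-UNDO.
    DEFINE INPUT PARAMETER poConsumer   AS IConsumer              NO-UNDO.
    DEFINE INPUT PARAMETER poOutput     AS IProducerRecord        NO-UNDO.
    DEFINE INPUT PARAMETER poNextOffset AS TopicPartitionOffset   NO-UNDO.

    /* Produce the result of processing the consumed message. */
    poProducer:Send(poOutput).

    /* Record the consumer position in the same transaction. This may be done
       only once per transaction, and the application must not also call
       CommitOffset on the consumer. */
    poProducer:SendOffsetToTransaction(poNextOffset, poConsumer).
END PROCEDURE.
```

Because the offset is recorded inside the transaction, it is stored with the broker only if the transaction commits.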

ValidateOffset (TopicPartitionOffset)

Purpose:
Validate the offset information we're about to pass to the BIO.
@throws MessagingError if any invalid property values are found in the offset
Parameters:
offset OpenEdge.Messaging.TopicPartitionOffset
 


Constructor Detail

KafkaTransactionalProducer (IProducerDelegate, IStringStringMap, IStringKeyedMap)

Parameters:
producerDelegate OpenEdge.Messaging.Internal.IProducerDelegate
 
config OpenEdge.Core.Collections.IStringStringMap
 
topicConfigurations OpenEdge.Core.Collections.IStringKeyedMap