Exploring Solutions for Distributed Transactions (2)
A Comprehensive Overview of Techniques, Patterns, and Algorithms for Maintaining Data Consistency in Distributed Systems.
Different business scenarios call for different solutions. Common methods include:
- Blocking Retry
- Two-Phase Commit (2PC) and Three-Phase Commit (3PC)
- Using Queues to Process Asynchronously in the Background
- TCC Compensation Matters
- Local Message Table (Asynchronously Ensured)/Outbox Pattern
- MQ Transaction
- Saga Pattern
- Event Sourcing
- CQRS
- Atomic Commitment
- Parallel Commits
- Transactional Replication
- Consensus Algorithms
- Timestamp Ordering
- Optimistic Concurrency Control
- Byzantine Fault Tolerance (BFT)
- Distributed Locking
- Sharding
- Multi-Version Concurrency Control (MVCC)
- Distributed Snapshots
- Leader-Follower Replication
4. Try-Confirm-Cancel (TCC) Compensation Matters

- It is used to handle long-lived transactions across multiple services.
- It involves breaking down a complex transaction into multiple steps, where each step is a separate service call or database operation. Each step in the transaction has 3 phases: try, confirm, and cancel.
- Try phase — The service tries to perform the operation. It performs a series of checks to ensure that it is safe to perform the operation. If all checks pass, the operation is performed, and the state is saved in temporary storage.
- Confirm phase — The service confirms that the operation was successful. It validates the state of the transaction and ensures that all dependencies are satisfied. If everything is valid, the temporary state is committed to the main database.
- Cancel phase — If something goes wrong during the try phase or confirm phase, the cancel phase is initiated. The service undoes the changes made in the try phase. It restores the state to the point before the transaction started.
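As a rough illustration of the three phases above, here is a minimal in-process coordinator. The Participant interface, the run_tcc function and the LoggingParticipant demo class are hypothetical names introduced for this sketch. A real system would make these calls over the network and persist the transaction state so that it can finish Confirm or Cancel after a crash.

```python
from typing import List, Protocol


class Participant(Protocol):
    """Hypothetical interface each service exposes for one TCC transaction."""

    # "try_" because "try" is a Python keyword
    def try_(self, tx_id: str) -> None: ...      # reserve resources; raise on failure
    def confirm(self, tx_id: str) -> None: ...   # make the reservation permanent
    def cancel(self, tx_id: str) -> None: ...    # release the reservation


def run_tcc(tx_id: str, participants: List[Participant]) -> bool:
    """Run Try on every participant, then Confirm all of them or Cancel the ones touched."""
    touched: List[Participant] = []
    try:
        for p in participants:
            # Record before calling: a Try that failed halfway may have left a partial reservation
            touched.append(p)
            p.try_(tx_id)
    except Exception:
        # Cancel phase: undo in reverse order; Cancel must tolerate a Try that never completed
        for p in reversed(touched):
            p.cancel(tx_id)
        return False
    # Confirm phase: every Try succeeded, so all reservations are made permanent
    for p in participants:
        p.confirm(tx_id)
    return True


# Usage: in-memory participants that only log the calls they receive
class LoggingParticipant:
    def __init__(self, name, log, fail_try=False):
        self.name, self.log, self.fail_try = name, log, fail_try

    def try_(self, tx_id):
        self.log.append(f"try {self.name}")
        if self.fail_try:
            raise RuntimeError(f"{self.name}: try failed")

    def confirm(self, tx_id):
        self.log.append(f"confirm {self.name}")

    def cancel(self, tx_id):
        self.log.append(f"cancel {self.name}")


log = []
ok = run_tcc("tx-1", [LoggingParticipant("stock", log),
                      LoggingParticipant("payment", log, fail_try=True)])
print(ok, log)  # False ['try stock', 'try payment', 'cancel payment', 'cancel stock']
```

Nothing is confirmed until every Try has succeeded, which is what separates TCC from simply calling services one after another and compensating afterwards.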
```python
import requests


class OrderService:
    def __init__(self):
        self.session = requests.Session()

    def create_order(self, order_data):
        try:
            # Step 1: Reserve stock
            res = self.session.post('http://inventory-service/reserve-stock', json=order_data)
            res.raise_for_status()
            # Step 2: Charge payment
            res = self.session.post('http://payment-service/charge-payment', json=order_data)
            res.raise_for_status()
            # Step 3: Confirm order
            res = self.session.post('http://order-service/confirm-order', json=order_data)
            res.raise_for_status()
        except requests.exceptions.RequestException:
            # Step 4: Cancel order. All compensations are sent, even for steps that never ran
            # (or whose outcome is unknown after a timeout), so each cancel endpoint must be
            # idempotent and treat "nothing to undo" as success
            self.session.post('http://order-service/cancel-order', json=order_data)
            self.session.post('http://payment-service/refund-payment', json=order_data)
            self.session.post('http://inventory-service/release-stock', json=order_data)
            # Re-raise the original error, keeping its traceback
            raise
```

The OrderService class creates an order by following the Try-Confirm-Cancel (TCC) pattern.
The order creation process involves three steps:
- Reserve stock
- Charge payment
- Confirm order
If any of these steps fails, the order is cancelled and any changes that were made are undone. This happens in the exception handler, which calls the cancel-order, refund-payment, and release-stock endpoints to compensate for the work done in the try block.
In the inventory service, the reserve_stock() handler behind the reserve-stock endpoint checks whether there is enough stock available for the product. If there is, it reserves the stock by decrementing the available stock count in the database. If there is not, it returns an error response, raise_for_status() raises an exception, and the changes made so far are rolled back.
In the payment service, the charge_payment() handler checks whether the customer has enough funds to pay for the order. If they do, it charges their account by deducting the order amount from their balance. If they do not, it returns an error response and the changes made so far are rolled back.
In the order service, the confirm_order() handler updates the order status to “confirmed” in the database. If this step fails, the order is cancelled and the changes made so far are rolled back.
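On the participant side, Try, Confirm and Cancel need to be idempotent, and Cancel must cope with arriving before (or instead of) its Try, for example after a coordinator timeout. Below is a minimal in-memory sketch of such an inventory participant. InventoryParticipant and its method names are hypothetical, and a real service would keep this state in its database rather than in dictionaries.

```python
class InventoryParticipant:
    """Hypothetical in-memory TCC participant for stock reservations."""

    def __init__(self, stock):
        self.available = dict(stock)   # product_id -> units that can still be reserved
        self.reservations = {}         # tx_id -> [product_id, qty, state]

    def try_reserve(self, tx_id, product_id, qty):
        """Try: set stock aside without selling it. Returns False if tx_id was cancelled."""
        if tx_id in self.reservations:
            # A duplicate Try (retry) is a no-op; a late Try after Cancel is rejected
            return self.reservations[tx_id][2] != "cancelled"
        if self.available.get(product_id, 0) < qty:
            raise ValueError("insufficient stock")
        self.available[product_id] -= qty
        self.reservations[tx_id] = [product_id, qty, "reserved"]
        return True

    def confirm(self, tx_id):
        """Confirm: make the reservation final. Repeated calls are no-ops."""
        reservation = self.reservations[tx_id]
        if reservation[2] == "reserved":
            reservation[2] = "confirmed"

    def cancel(self, tx_id):
        """Cancel: give reserved stock back. Safe to repeat or to call before Try."""
        reservation = self.reservations.get(tx_id)
        if reservation is None:
            # "Empty rollback": remember the cancel so that a late Try cannot reserve stock
            self.reservations[tx_id] = [None, 0, "cancelled"]
        elif reservation[2] == "reserved":
            self.available[reservation[0]] += reservation[1]
            reservation[2] = "cancelled"


inv = InventoryParticipant({"sku-1": 5})
inv.try_reserve("tx-1", "sku-1", 2)   # 3 left
inv.confirm("tx-1")                   # still 3
inv.try_reserve("tx-2", "sku-1", 1)   # 2 left
inv.cancel("tx-2")                    # back to 3
inv.cancel("tx-2")                    # repeated cancel changes nothing
inv.cancel("tx-3")                    # cancel that arrives before its try
print(inv.available["sku-1"], inv.try_reserve("tx-3", "sku-1", 1))  # 3 False
```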
Advantages
- Avoid locking resources for long periods of time
- Handle compensation matters in case of transaction failure or incompleteness
- Handle distributed transactions in systems where 2PC is not practical
Disadvantages
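- Require additional effort to implement a compensating (cancel) operation for each step
- No strict atomicity or isolation guarantees: other transactions can observe intermediate (tried but not yet confirmed) states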
When to use
- Handling long-lived transactions that span multiple services
Challenges
- Implementing compensating transactions can be complex
- Coordinating multiple services in a distributed environment can be challenging
5. Local Message Table (Asynchronously Ensured)/Outbox Pattern
- It is a messaging pattern used in a microservices architecture.
- It allows microservices to exchange messages asynchronously using a local message table as an intermediate buffer.
- Each microservice that needs to exchange messages with other microservices has its own message table.
- When a microservice needs to send a message to another microservice, it adds the message to its own message table. The message contains all the necessary information that the receiving microservice needs to process the request.
- Once the message is added to the table, the sending microservice can continue processing other tasks. Meanwhile, a background worker or a separate process continuously monitors the message table for new messages.
- When a new message is detected, the worker retrieves the message and sends it to the receiving microservice.
- Once the receiving microservice has processed the message, it sends an acknowledgment to the sending microservice indicating that the message has been successfully processed. The acknowledgment may also include any results or data that the sending microservice needs.
- If the acknowledgment is not received within a certain timeframe, the sending microservice can assume that the message was not processed successfully and can take appropriate action, such as resending the message or canceling the request.
```python
import json
import threading
import time
import uuid

import psycopg2
from psycopg2.extras import DictCursor

# Database connection settings (placeholders)
DB_PARAMS = dict(
    dbname="my_database",
    user="my_username",
    password="my_password",
    host="localhost",
    port="5432",
)

# Connection used by the producer (the main thread)
conn = psycopg2.connect(**DB_PARAMS)

# Create the messages table if it doesn't exist
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id UUID PRIMARY KEY,
            data JSONB NOT NULL,
            status TEXT NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
    """)
conn.commit()


# Define the message producer
def send_message(data):
    """Insert a new message with status 'new' into the messages table."""
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO messages (id, data, status)
            VALUES (%s, %s, %s)
        """, (str(uuid.uuid4()), json.dumps(data), "new"))
    conn.commit()


def process_message(message):
    # Placeholder: deliver the message to the receiving microservice here
    print("Processing message:", message["id"], message["data"])


# Define the message consumer
def message_consumer():
    # The consumer gets its own connection so that its commits do not
    # interfere with transactions on the producer's connection
    consumer_conn = psycopg2.connect(**DB_PARAMS)
    while True:
        with consumer_conn.cursor(cursor_factory=DictCursor) as cur:
            # Claim the oldest "new" message; SKIP LOCKED lets several consumers run in parallel
            cur.execute("""
                SELECT *
                FROM messages
                WHERE status = 'new'
                ORDER BY created_at
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """)
            row = cur.fetchone()
            if row is None:
                consumer_conn.commit()  # end the read transaction
                time.sleep(1)           # nothing to do; poll again later
                continue
            message = dict(row)
            # Mark the message as "processing" and release the row lock
            cur.execute("UPDATE messages SET status = 'processing' WHERE id = %s",
                        (message["id"],))
            consumer_conn.commit()
            # Process the message
            process_message(message)
            # Mark the message as "processed"
            cur.execute("UPDATE messages SET status = 'processed' WHERE id = %s",
                        (message["id"],))
            consumer_conn.commit()


# Start the message consumer in a separate thread
consumer_thread = threading.Thread(target=message_consumer)
consumer_thread.start()
```

- The example uses PostgreSQL as the database and the psycopg2 library to interact with it.
- The send_message function creates a new message and inserts it into the messages table.
- The message_consumer function continuously polls the database for messages with the status "new", processes them, and updates their status to "processed". It uses its own database connection, and process_message is a placeholder for the actual delivery to the receiving microservice.
- The actual implementation of the Local Message Table pattern may vary depending on the specific requirements and the chosen database.
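What makes a local message table reliable is that the message row is written in the same local database transaction as the business change, so either both are stored or neither is; the background worker then retries delivery until it succeeds. The listing above inserts messages on their own, so here is a minimal, self-contained sketch of the combined write, using Python's built-in sqlite3 instead of PostgreSQL. The orders table, place_order, relay_once and the deliver callback are hypothetical names for illustration.

```python
import json
import sqlite3
import uuid

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE messages (
        id TEXT PRIMARY KEY,
        data TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'new',
        attempts INTEGER NOT NULL DEFAULT 0
    );
""")


def place_order(order_id, details):
    """Write the order and its outgoing message in ONE local transaction."""
    with db:  # commits on success, rolls back both inserts on any exception
        db.execute("INSERT INTO orders (id, status) VALUES (?, 'created')", (order_id,))
        db.execute("INSERT INTO messages (id, data) VALUES (?, ?)",
                   (str(uuid.uuid4()),
                    json.dumps({"event": "order_created", "order_id": order_id,
                                "details": details})))


def relay_once(deliver, max_attempts=5):
    """One pass of the background worker: deliver pending messages, count failures."""
    pending = db.execute(
        "SELECT id, data FROM messages WHERE status = 'new' AND attempts < ?",
        (max_attempts,)).fetchall()
    for msg_id, data in pending:
        try:
            deliver(json.loads(data))  # e.g. an HTTP call to the receiving service
        except Exception:
            with db:  # leave it as 'new' so that a later pass retries it
                db.execute("UPDATE messages SET attempts = attempts + 1 WHERE id = ?",
                           (msg_id,))
            continue
        with db:
            db.execute("UPDATE messages SET status = 'sent' WHERE id = ?", (msg_id,))


# Usage
sent = []
place_order("o-1", {"sku": "sku-1", "qty": 2})
try:
    place_order("o-2", {"note": object()})  # the message cannot be serialized...
except TypeError:
    pass                                    # ...so the order row is rolled back too
relay_once(sent.append)
print(sent)
print(db.execute("SELECT id FROM orders").fetchall())  # [('o-1',)]
```

If the worker crashes after deliver() succeeds but before the row is marked 'sent', the message is delivered again on the next pass, so receivers should be prepared to see duplicates.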
Advantages
- Messages are not lost: they are stored durably and retried until delivered (delivery is at least once, so receivers should handle duplicates)
- The sending microservice continues processing other tasks while messages are being exchanged in the background.
- Microservices are decoupled from each other and can be developed and deployed independently
- It allows for the addition of new microservices without affecting the existing system
Disadvantages
- It can be complex to implement, given factors such as message format, message size, and error handling
- It may not be suitable for scenarios where real-time processing is required, as there may be some delay between the sending and receiving microservices
- It requires additional infrastructure such as a message queue or a database to store and manage messages
When to use
- An e-commerce website with multiple microservices that handle different aspects such as inventory management, order processing, and payment processing
- A healthcare system that shares electronic medical records and medical billing data between different services
- A bank with multiple microservices that handle different aspects such as account management, transaction processing, and fraud detection
Challenges
- 1) Message format: Messages should be formatted in a way that is compatible with all the services that exchange them. This may require some standardization across services.
- 2) Message size: Messages should be kept small to reduce processing time and the likelihood of errors during transmission.
- 3) Error handling: Errors that occur during message transmission or processing must be handled, for example by retrying failed messages or taking other corrective actions.
- 4) Real-time processing: This pattern may not be suitable for scenarios where real-time processing is required, as there may be some delay between sending and receiving messages.
Outbox Pattern


- It is a message-oriented pattern that helps in achieving transactional consistency between services
- Ensure that messages are delivered at least once and that the database and the published messages remain consistent, even in the event of failures or errors
- The steps are:
- 1) A service receives a request from a client or another service
- 2) When a request is received, the service writes the data that needs to be sent to other services to an Outbox table in its own database, in the same database transaction as its business data changes. The Outbox table contains all the necessary information that the receiving service needs to process the request
- 3) The service then commits the database transaction, so the business data and the outgoing message are stored together or not at all.
- 4) An Outbox processor is a background process that runs periodically and reads the Outbox table in the database to check for new messages that need to be sent
- 5) If the Outbox processor detects a new message in the Outbox table, it retrieves the message from the database and sends it to the appropriate message broker.
- 6) Once the message is received by the message broker, it is sent to the receivers. The receivers can be other services or downstream systems.
- 7) Once the message broker has durably accepted the message, it sends an acknowledgment (for example, a RabbitMQ publisher confirm) back to the Outbox processor to mark the message as successfully sent.
- 8) Once the Outbox processor receives an acknowledgment from the message broker, it deletes the message from the Outbox table to prevent it from being sent again.
```python
import json
from datetime import datetime

import pika
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, DateTime

# Create SQLAlchemy engine and metadata
engine = create_engine('postgresql://user:password@localhost:5432/db')
metadata = MetaData()

# Define the Outbox table schema
outbox_table = Table('outbox', metadata,
    Column('id', Integer, primary_key=True),
    Column('event_type', String),
    Column('payload', String),
    Column('created_at', DateTime)
)
# Create the table if it doesn't exist
metadata.create_all(engine)

# Define RabbitMQ connection parameters
rabbitmq_params = pika.ConnectionParameters(host='localhost', port=5672)
# Define RabbitMQ queue name
queue_name = 'outbox'

# Connect to RabbitMQ
connection = pika.BlockingConnection(rabbitmq_params)
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue=queue_name, durable=True)
# Enable publisher confirms: basic_publish now raises if the broker rejects the message
channel.confirm_delivery()


# Define function to publish messages to RabbitMQ
def publish_message(message):
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # Make messages persistent
                          ))


# Define function to read messages from the Outbox table and publish them to RabbitMQ
def process_outbox():
    # engine.begin() commits the deletes when the block exits without an error
    with engine.begin() as conn:
        rows = conn.execute(outbox_table.select().order_by(outbox_table.c.id)).fetchall()
        for row in rows:
            # Create message payload as a dictionary
            payload = {
                'event_type': row.event_type,
                'data': json.loads(row.payload)
            }
            # Publish message to RabbitMQ (raises if the broker does not confirm it)
            publish_message(json.dumps(payload))
            # Delete the message from the Outbox table only after the broker confirmed it
            conn.execute(outbox_table.delete().where(outbox_table.c.id == row.id))


# Define function to write data to the Outbox table
def write_to_outbox(event_type, data):
    # engine.begin() opens a transaction and commits it on success
    with engine.begin() as conn:
        conn.execute(outbox_table.insert().values(
            event_type=event_type,
            payload=json.dumps(data),
            created_at=datetime.now()
        ))


# Example usage:
write_to_outbox('user_created', {'user_id': 123, 'name': 'John Doe'})
process_outbox()
```

- Implemented with SQLAlchemy and RabbitMQ.
- SQLAlchemy connects to a PostgreSQL database and defines the Outbox table schema.
- Pika connects to a RabbitMQ message broker and declares the queue.
- The write_to_outbox function writes data to the Outbox table.
- The process_outbox function reads messages from the Outbox table, publishes them to RabbitMQ, and deletes each one once it has been published.
- The publish_message function publishes a message to RabbitMQ.
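Because the Outbox processor can publish a row and then crash before deleting it, the same event may reach the broker more than once. A common remedy is an idempotent consumer that records the ids of events it has already handled, in the same transaction as its own changes. A minimal sketch using sqlite3, assuming each published event carries a unique event_id (the code above does not add one; it is a hypothetical field here). The processed_events and users tables and handle_event are also hypothetical.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
    CREATE TABLE users (user_id INTEGER PRIMARY KEY, name TEXT NOT NULL);
""")


def handle_event(event):
    """Apply a 'user_created' event once, even if the broker redelivers it."""
    with db:  # the dedup record and the business change commit together
        seen = db.execute("SELECT 1 FROM processed_events WHERE event_id = ?",
                          (event["event_id"],)).fetchone()
        if seen:
            return False  # duplicate delivery: already applied
        db.execute("INSERT INTO users (user_id, name) VALUES (?, ?)",
                   (event["data"]["user_id"], event["data"]["name"]))
        # The primary key also rejects a concurrent duplicate (the insert fails and rolls back)
        db.execute("INSERT INTO processed_events (event_id) VALUES (?)",
                   (event["event_id"],))
    return True


event = {"event_id": "e-1", "event_type": "user_created",
         "data": {"user_id": 123, "name": "John Doe"}}
first, second = handle_event(event), handle_event(event)
print(first, second)  # True False
```

At-least-once publishing combined with a deduplicating consumer gives effectively-once processing.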
Advantages
- Every event whose database transaction commits is eventually published to the message broker, even if the broker is temporarily unavailable at commit time
- Decouple the publishing of events from the main application code
- Allows for efficient batch processing of events
Disadvantages
- Require additional development effort to implement the Outbox table and the background worker to publish events to the message broker
- Introduce additional complexity to the system architecture
- Require careful management of database transactions to ensure that messages are only published if the transaction succeeds
When to use
- When a microservice needs to publish events to multiple subscribers, as in a publish/subscribe messaging scenario, and must ensure that all database updates are captured in the message stream.
- An e-commerce platform publishes events about new orders, updated order status, and customer feedback to multiple subscribers, including order fulfillment systems, customer service, and marketing
- A financial services platform needs to capture all financial transactions in the message stream for auditing and compliance purposes.
Challenges
- Ensuring that the background worker responsible for publishing events is highly available and fault-tolerant.
- Managing database transactions to ensure that messages are only published if the associated database transaction succeeds
- Handling errors and retries when publishing events to the message broker
Considerations
- 1) Message format: Use a format such as JSON or Protobuf to represent the events to be published. The choice depends on the requirements of the specific application, but it must be compatible with the message broker being used.
- 2) Message size: Depends on the size and complexity of the events being published. Ensure that messages do not exceed the limits of the message broker. Large messages can cause performance issues and may need to be split into smaller messages for efficient processing.
- 3) Error handling: If an error occurs while publishing an event, handle the error and retry the operation. Retries should use a back-off strategy to avoid overwhelming the message broker and effectively causing a denial of service. If an error occurs while inserting events into the Outbox table, the transaction should be rolled back and the event should not be published, to avoid data inconsistencies.
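The back-off strategy mentioned under error handling can be as simple as exponential back-off with jitter around the publish call. A minimal sketch; publish_with_retry, its parameters and the injectable sleep function are hypothetical, and a production relay would usually also cap the total retry time and alert when it gives up.

```python
import random
import time


def publish_with_retry(publish, message, max_attempts=5, base_delay=0.5,
                       max_delay=30.0, sleep=time.sleep):
    """Call publish(message), retrying failures with exponential back-off and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return publish(message)
        except Exception:
            if attempt == max_attempts:
                raise  # give up; the caller must not delete the outbox row
            # Delay cap grows 0.5s, 1s, 2s, ... up to max_delay; jitter spreads out retries
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))


# Usage: a publisher that fails twice before the broker accepts the message
calls = []


def flaky(msg):
    calls.append(msg)
    if len(calls) < 3:
        raise ConnectionError("broker unavailable")
    return "ok"


delays = []
result = publish_with_retry(flaky, "hello", sleep=delays.append)
print(result, len(calls), len(delays))  # ok 3 2
```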
Similarities
- Both patterns use a message table to store messages that need to be exchanged between services
- Both patterns allow services to continue processing other tasks while messages are being exchanged in the background
- Both patterns let a microservice exchange messages with other microservices
Differences
- 1) Database: The Outbox pattern uses a single database to store all the messages that need to be exchanged, while the Local Message Table pattern uses a message table for each service that needs to exchange messages.
- 2) Implementation: In the Outbox pattern, a background worker or separate process reads messages from the Outbox table and sends them to a message broker such as RabbitMQ or Kafka. In the Local Message Table pattern, a background worker or separate process monitors the message table for new messages and sends them to the receiving service using any messaging mechanism, such as direct HTTP requests or a message broker.
- 3) Use Cases: The Outbox pattern is particularly useful in scenarios where a microservice needs to publish events to multiple subscribers. The Local Message Table pattern is useful when multiple services need to exchange messages with each other, but not all services need to receive every message.
- 4) Challenges: The Outbox pattern requires careful handling of database transactions to ensure that messages are only published if the associated database transaction succeeds. The Local Message Table pattern may require more complex error handling, as messages may need to be retried or resent if they fail to reach their destination.
Message Queuing (MQ) Transaction

- A sequence of multiple message operations is grouped together as a single unit of work.
- The transaction ensures that either all the operations are completed successfully or none of them are. This ensures that the system remains in a consistent state even in the event of failures or errors.
- Messages are processed in a way that ensures atomicity, consistency, isolation, and durability (ACID).
- The steps are:
- 1) Establish a connection to the message broker by specifying the connection settings, such as the broker URL, username, and password.
- 2) Once the connection is established, initiate the transaction by calling the begin method. This notifies the message broker that a transaction has started and that all subsequent message operations should be grouped together as part of it.
- 3) Perform the message operations that need to be grouped together as part of the transaction by sending messages to a queue or topic.
- 4) Once all the message operations have been performed successfully, commit the transaction by calling the commit method. This notifies the message broker that all the message operations in the transaction have completed successfully.
- 5) If any errors occur during the transaction, roll it back by calling the rollback method. This notifies the message broker that the transaction has failed and that all the message operations performed as part of it should be undone.
```python
# Assumes stomp.py 8.x, where listener callbacks receive a Frame object
import stomp


class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_message(self, frame):
        # Process the message
        print("Received message:", frame.body)
        # Acknowledge the message (STOMP 1.1 uses message-id + subscription)
        self.conn.ack(frame.headers['message-id'], frame.headers['subscription'])

    def on_error(self, frame):
        print('Received an error "{}": {}'.format(frame.headers, frame.body))

    def on_disconnected(self):
        print('Disconnected from the message broker.')


# Define the message broker connection settings
broker_host = "localhost"
broker_port = 61613  # default STOMP port of ActiveMQ
username = "myusername"
password = "mypassword"
queue_name = "/queue/myqueue"

# Establish a connection to the message broker
conn = stomp.Connection([(broker_host, broker_port)])
conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)

# Begin the transaction; begin() returns the transaction id
tx = conn.begin()
try:
    # Sends tagged with the transaction id are held by the broker until commit
    conn.send(destination=queue_name, body="Message 1", transaction=tx)
    conn.send(destination=queue_name, body="Message 2", transaction=tx)
    # Commit the transaction
    conn.commit(transaction=tx)
except Exception as ex:
    # STOMP calls a rollback "abort": discard every send in the transaction
    conn.abort(transaction=tx)
    print("Error sending messages:", str(ex))
finally:
    # Disconnect from the message broker
    conn.disconnect()
```

- Implemented with the stomp.py library, talking to an Apache ActiveMQ broker over the STOMP protocol.
- The begin, send, and commit methods group the sending of two messages into a single transaction; each send carries the transaction id returned by begin.
- If an error occurs during the transaction, the abort method (STOMP's rollback) is called so that none of the messages are delivered.
- Not all message brokers support transactions, and the exact implementation may vary depending on the specific message broker and library being used.
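To make the all-or-nothing behaviour concrete without a running broker, here is a toy in-memory model of a transacted session: sends inside a transaction are buffered and only become visible to consumers on commit, while a rollback discards them. ToyBroker and its methods are hypothetical; real brokers implement this on the server side (in STOMP, through BEGIN, COMMIT and ABORT frames).

```python
import uuid
from collections import defaultdict, deque


class ToyBroker:
    """Hypothetical in-memory broker with transacted sends (illustration only)."""

    def __init__(self):
        self.queues = defaultdict(deque)   # destination -> messages visible to consumers
        self.pending = {}                  # tx_id -> list of (destination, body)

    def begin(self):
        tx_id = str(uuid.uuid4())
        self.pending[tx_id] = []
        return tx_id

    def send(self, destination, body, transaction=None):
        if transaction is None:
            self.queues[destination].append(body)  # non-transacted: visible at once
        else:
            self.pending[transaction].append((destination, body))  # held until commit

    def commit(self, transaction):
        # Make every buffered send visible together
        for destination, body in self.pending.pop(transaction):
            self.queues[destination].append(body)

    def rollback(self, transaction):
        # Discard every buffered send
        self.pending.pop(transaction)


broker = ToyBroker()
tx = broker.begin()
broker.send("/queue/myqueue", "Message 1", transaction=tx)
broker.send("/queue/myqueue", "Message 2", transaction=tx)
print(list(broker.queues["/queue/myqueue"]))  # [] (nothing visible before commit)
broker.commit(tx)
print(list(broker.queues["/queue/myqueue"]))  # ['Message 1', 'Message 2']

tx2 = broker.begin()
broker.send("/queue/myqueue", "Message 3", transaction=tx2)
broker.rollback(tx2)
print(list(broker.queues["/queue/myqueue"]))  # ['Message 1', 'Message 2']
```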
Advantages
- Ensure that messages are delivered to their intended recipients, even in the event of network failures or other disruptions
- A group of message operations is treated as a single unit of work. Either all of the messages in the transaction are processed or none of them are processed.
- It can handle large volumes of messages and can be scaled horizontally as needed.
- It can be used across different platforms and programming languages, making it a versatile solution for message exchange.
Disadvantages
- Implementing MQ transactions can be complex
- Add overhead to message processing, which can impact system performance.
- May require licensing fees (for commercial brokers) and additional hardware, which can make it expensive to implement
When to use
- When handling high volumes of messages and requiring atomicity
- Exchange messages related to financial transactions, such as stock trades or online payments
- Exchange messages related to supply chain management, such as tracking inventory levels or coordinating deliveries
- Exchange messages related to online ordering systems, such as processing orders and tracking shipments
Considerations
- 1) Message Format: Depends on the specific implementation being used. Generally, a message consists of a data payload plus metadata such as headers and properties. The payload can be in a variety of formats, such as XML, JSON, or binary data.
- 2) Message Size: The maximum message size depends on the specific implementation being used. Large messages may need to be split into smaller chunks for processing.
- 3) Error Handling: When errors occur, message processing should not be disrupted and the errors must be handled properly, so implement appropriate retry mechanisms and error reporting. For example, the MQ transaction system returns an error message to the sender, which can then resend the message or notify an administrator. Common error-handling strategies include setting up a dedicated error-handling process, implementing retry mechanisms, and using logging and monitoring tools to track errors and performance metrics.
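One common shape for the retry-and-report strategy is to retry a failing message a bounded number of times and then move it to a dead-letter queue for an administrator to inspect. A minimal in-memory sketch; consume_with_dlq, the handler and the queue objects are hypothetical, and many brokers (for example RabbitMQ's dead letter exchanges or ActiveMQ's dead letter queue) provide this behaviour built in.

```python
import logging
from collections import deque

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("consumer")


def consume_with_dlq(queue, handler, dead_letters, max_attempts=3):
    """Drain queue; retry each failing message up to max_attempts, then dead-letter it.

    Each queue item is a (body, attempts_so_far) tuple.
    """
    while queue:
        body, attempts = queue.popleft()
        try:
            handler(body)
        except Exception as exc:
            attempts += 1
            if attempts >= max_attempts:
                log.error("giving up on %r after %d attempts: %s", body, attempts, exc)
                dead_letters.append((body, str(exc)))
            else:
                log.warning("retrying %r (attempt %d): %s", body, attempts, exc)
                queue.append((body, attempts))  # requeue at the back


def handler(body):
    if body == "bad":
        raise ValueError("cannot parse")


q = deque([("ok-1", 0), ("bad", 0), ("ok-2", 0)])
dlq = []
consume_with_dlq(q, handler, dlq)
print(dlq)  # [('bad', 'cannot parse')]
```

Requeueing at the back keeps one poison message from blocking the healthy messages behind it.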
