Le pattern Transactional Outbox avec Postgresql et Rabbitmq

7 min read

Comment écrire dans deux systèmes différents, par exemple RabbitMQ et Postgresql ?

Vous allez me dire, c’est plutôt simple.

kotlin
@Transactional   
fun doSomething() {
    ... 
    // save in database through JPA
    myRepo.save(myEntity)
    // post message
    broker.publishEvent(event)
    ...
}

Mais que passe t’il en cas d’erreur, si la transaction SQL échoue ? On se retrouve avec un message publié mais pour une opération qui n’a jamais eu lieu.

Les transactions two phase commit

C’est un problème relativement classique et on a l’habitude de le traiter avec des transactions 2 Phase commit. L’idée étant d’introduire une transaction sur toutes les opérations vers des systèmes externes, donc incluant RabbitMQ ici.

C’est un mécanisme un peu complexe qui nécessite un coordinateur supplémentaire qui va s’assurer que tout se passe bien pour ensuite valider l’écriture dans l’ensemble des systèmes.

2 phase commit workflow
2 phase commit workflow

Mais il y a plusieurs problèmes.

Le premier c’est qu’il n’est pas possible de faire des transactions 2PC avec RabbitMQ (appelé aussi transaction XA). Ce qui, vous me l’accorderez, est déjà un problème relativement important en soit.

extrait de la documentation qui confirme l'absence de transactions XA avec RabbitMQ
extrait de la documentation qui confirme l'absence de transactions XA avec RabbitMQ

On pourrait utiliser ActiveMQ qui supporte les transactions 2PC. Mais d’un autre côté, ce serait dommage de faire un article pour Postgresql et RabbitMQ pour finalement conclure qu’il faut utiliser ActiveMQ non ?

Le second problème, c’est que dans tout les cas une transaction 2PC pénalise les performances globales du système. Pour synchroniser deux ou plusieurs systèmes, non seulement le temps total de l’opération ne pourra être inférieur au temps minimal du système le plus lent, mais vous ajoutez également un cout lié à la coordination.

Le dernier problème, c’est que vous dégradez la disponibilité globale. La disponibilité étant la multiplication des disponibilités moyenne de chaque système. Par exemple avec deux systèmes avec 99% de disponibilité, le système total obtient une disponibilité de 98%.

$0,99 \times 0,99 = 0,9801$

Et ça c’était globalement l’état de mes connaissances jusqu’à hier. J’avais déjà utilisé des transactions XA, j’avais déjà géré des problématiques de systèmes distribuées et j’ai déjà travaillé sur un tas de scénarios de mitigation de ce type de problème.

J’ai ajouté hier RabbitMQ dans la stack de Writizzy et j’ai utilisé une autre solution : le pattern Transactional Outbox.

Le pattern transactional outbox

Si le concept sous-jacent n’est pas forcément nouveau, ce pattern spécifiquement a été popularisé par Chris Richardson entre 2014 et 2016.

Le concept est “simple” :

  • ne s’appuyer que sur la base de données pour publier un évènement (étape 1)
  • lire la table dans un job asynchrone (étape 2)
  • publier dans la file de message depuis le job (étape 3)

Le code devient

kotlin
@Transactional   
fun doSomething() {
    ... 
    // save in database through JPA
    myRepo.save(myEntity)
    // save in database
    outboxRepo.save(event)
    ...
}

Nous n’avons plus qu’une seule transaction, et la garantie que l’event est publié uniquement si la transaction réussit. Event qui sera envoyé dans RabbitMQ plus tard.

Maintenant, justement pour envoyer dans RabbitMQ, il vous faut un job pour lire la table :

javascript
@Scheduled(fixedDelay = 500)
@SchedulerLock(name = "outbox_publisher", lockAtMostFor = "PT30S", lockAtLeastFor = "PT0S")
fun run() {
    outboxPublisherService.publishPending()
}

Vous noterez ici l’usage de SchedulerLock qui permet l’usage de scheduler dans un contexte multi-noeud, pour éviter une lecture simultanée par plusieurs applications. Ca permet de remplacer Quartz que j’utilisais fréquemment jusqu’à présent.

Et il vous faut le code du service de publication

javascript

    fun publishPending() {
        val events = outboxEventRepository.findTop20ByOrderByCreatedAtAsc()
        events.forEach { event -> publishEvent(event) }
    }

    private fun publishEvent(event: OutboxEvent) {
        try {
            rabbitTemplate.invoke { t ->
                t.convertAndSend(event.exchange, event.routingKey, event.payload) { message ->
                    message.messageProperties.messageId = event.id
                    message.messageProperties.headers[HEADER_ORIGINAL_EXCHANGE] = event.exchange
                    message.messageProperties.headers[HEADER_ORIGINAL_ROUTING_KEY] = event.routingKey
                    message
                }
                t.waitForConfirmsOrDie(5_000)
            }
            outboxEventRepository.delete(event)
        } catch (e: Exception) {
            logger.error("Failed to publish outbox event ${event.id} (${event.eventType})", e)
        }
    }

Ici vous noterez qu’on a activé le mode publisher confirms de RabbitMQ et qu’on attend la confirmation de l’écriture : t.waitForConfirmsOrDie(5_000)

Mais vous pourriez me dire : “tu as de nouveau un problème double écriture dans deux systèmes”.

Et… c’est pas faux.

Mais ici c’est plus simple. Je n’ai que 3 cas :

  • Tout se passe bien, rien à dire, c’est parfait.
  • RabbitMQ échoue => on passe dans l’exception, on logge, on ressort, le message reste dans la table et sera réessayé
  • La transaction Postgresql échoue. On passe dans l’exception mais le message est déjà envoyé !!

Le 3eme cas est important, il va arriver donc il faut le gérer. Ca implique que chaque consommateur doit être capable d’être idempotent sur la réception d’un message. C’est à dire que chaque consommateur doit pouvoir accepter deux fois le même évènement sans incident.

Ici les stratégies sont multiples :

  • lire l’Id du message et le stocker quelque part pour vérifier qu’on ne fait pas deux fois l’opération, idéal pour les appels à des services externes
  • déclencher des opérations idempotentes (Exemple : set status = pending, même si on le fait deux fois, ca ne pose pas de problème)

etc…

Et la gestion d’erreur ?

Mais il y a un souci, si RabbitMQ tombe, s’il est injoignable, si le disque est plein, on va logger l’exception et avec un batch qui tourne toutes les 500ms je vous laisse imaginer la quantité astronomique de log que ça va produire. Donc il nous faut un Circuit Breaker. Ca tombe bien, on peut utiliser resilience4j.

Avec un circuit breaker, on va couper l’envoi vers Rabbit en cas d’erreur et attendre un peu avant de réessayer. Je vous laisse le code ici mais je ne détaille pas, ce serait le sujet d’un autre article.

javascript

init {
    circuitBreakerRegistry.circuitBreaker("rabbitmq-outbox").eventPublisher
        .onStateTransition { event ->
            logger.warn("RabbitMQ circuit breaker: ${event.stateTransition}")
        }
}

@CircuitBreaker(name = "rabbitmq-outbox", fallbackMethod = "skipOnOpenCircuit")
fun publishPending() {
    val events = outboxEventRepository.findTop20ByOrderByCreatedAtAsc()
    for (event in events) {
        publishEvent(event)
    }
}

private fun skipOnOpenCircuit(e: CallNotPermittedException) {
    // Circuit open — state transition already logged via eventPublisher
}

private fun publishEvent(event: OutboxEvent) {
    try {
        rabbitTemplate.invoke { t ->
            t.convertAndSend(event.exchange, event.routingKey, event.payload) { message ->
                message.messageProperties.messageId = event.id
                message.messageProperties.headers[HEADER_ORIGINAL_EXCHANGE] = event.exchange
                message.messageProperties.headers[HEADER_ORIGINAL_ROUTING_KEY] = event.routingKey
                message
            }
            t.waitForConfirmsOrDie(5_000)
        }
        outboxEventRepository.delete(event)
    } catch (e: AmqpException) {
        throw e  // broker error → propagates to circuit breaker
    } catch (e: Exception) {
        logger.error("Failed to publish outbox event ${event.id} (${event.eventType})", e)
    }
}

Challenger ses propres certitudes de dev avec l'IA

Comme je le disais plus haut, je connaissais bien le mécanisme des transactions XA et l’état de mes connaissances pour résoudre ce problème s’arrêtait là.

Je me suis demandé avec curiosité si Claude allait me proposer une implémentation plus élégante et j’ai été plutôt surpris. Dans ce genre de cas précis, c’était l’occasion parfaite pour essayer d’apprendre avec l’IA et pas juste de subir un code qu’on ne comprend pas.

Laisser l’IA écrire du code sans contrôle, sans trop de surprise c’est rarement bon. Il faut un œil d’expert et in fine le code produit reste de notre responsabilité et on doit être en mesure de le comprendre. Donc il faut trouver un juste milieu entre laisser faire et micro manager l’IA.

En lui donnant mes contraintes (issues de mon expérience), c’est l’agent qui est venu avec la proposition du pattern Transactional Outbox. Et si au début j’étais un peu sceptique, j’ai essayé de comprendre chaque partie du code pour me l’approprier en posant de multiples questions. Le code n’était pas parfait, c’est suite à ces échanges qu’a été ajouté :

  • le pattern publish confirm
  • la résilience avec resilience4j
  • le scheduler lock pour éviter les soucis en multi noeud
  • l’ajout du messageid dans les headers (pour déduplication)
  • claude avait ajouté une gestion d’état sur les messages qui n’était pas pertinente

Donc oui, c’était pas un plan sans accroc mais c’était une bonne façon pour moi de me forcer à me mettre à jour, me documenter et bref, d’apprendre de nouveaux trucs.

Au delà de l’aspect technique de ce billet, je voulais surtout illustrer sur la méthode que j’utilise pour coder avec un agent, et qui me permet d’allier productivité ET craft.

Written by

Stay in the loop

Get new articles delivered directly to your inbox. No spam, unsubscribe anytime.

0 Comments

No comments yet. Be the first to comment!

Copyright © 2026EventuallycodingPowered by Writizzy