Construire des workflows robustes avec Temporal.io et Go

Workflows in Go using Temporal

Lorsqu'il s'agit de construire des workflows complexes et de longue durée en Go, les développeurs sont souvent confrontés à des défis en matière de gestion des dépendances, de gestion des échecs et de garantie du déterminisme. Temporal vient à la rescousse en fournissant un cadre puissant qui favorise le découplage et la flexibilité dans le développement des workflows.

Dans cet article de blog, nous explorerons comment Temporal permet le découplage des aspects internes du moteur tout en maintenant la déterminisme. Nous plongerons dans des concepts essentiels tels que les workers, les task queues, les activités, les heartbeats, les délais et les réessais, en montrant comment Temporal permet aux développeurs de construire des systèmes résilients et évolutifs.

Comment Temporal fonctionne ?

Le cluster Temporal mène sa propre vie une fois que vous l'avez déployé sur votre infrastructure. C'est le moteur de l'exécution de vos workflows, mais ce qui est intéressant, c'est que vous avez un contrôle total sur le code que vous souhaitez utiliser dans vos workflows.

En effet, vous devrez coder vos Workflows, vos Activités (blocs dans vos workflow) et ensuite les enregistrer dans un Worker.

Comment ça fonctionne

Vous avez un contrôle total sur le code du worker. Vous spécifiez quel workflow ou activité il peut gérer, et vous pouvez le mettre à l'échelle en fonction de vos besoins.

Le worker s'enregistrera ensuite sur le serveur Temporal (service frontal) et utilisera une connexion gRPC pour échanger des informations avec le serveur.

Découplé du moteur de workflow

L'un des principaux avantages de Temporal est sa capacité à découpler le code de votre workflow du moteur sous-jacent. Cette séparation offre une plus grande flexibilité, maintenabilité et portabilité.

Voyons comment Temporal parvient à réaliser ce découplage.

Temporal introduit une interface de workflow qui agit comme un contrat entre le code de votre application et le moteur Temporal. En définissant votre logique de workflow à l'aide de cette interface, vous restez découplé des détails spécifiques au moteur.

Cette séparation vous permet de vous concentrer sur votre logique métier et d'abstraire les complexités du moteur.

type MyWorkflow interface {
  MyWorkflow(ctx workflow.Context) error
}

type myWorkflowImpl struct{}

func (w *myWorkflowImpl) MyWorkflow(ctx workflow.Context) error {
  // Workflow logic goes here
  return nil
}

Exécution déterministe

Temporal garantit le déterminisme en rejouant le code du workflow depuis le début à chaque exécution. Cela signifie que même si l'exécution du workflow échoue ou redémarre, le résultat reste le même.

Le déterminisme est réalisé en veillant à ce que toutes les opérations non déterministes soient explicitement gérées à l'aide de constructions Temporal telles que le temps et les générateurs de nombres aléatoires.

func (w *myWorkflowImpl) MyWorkflow(ctx workflow.Context) error {
  // Workflow logic goes here
  currentTime := workflow.Now(ctx)
  randomNumber := workflow.GetRandomValue(ctx)

  return nil
}

Workers et Task queues

Les workers sont l'épine dorsale d'un système de workflow basé sur Temporal. Ils sont responsables de l'exécution des activités et de l'écoute des tâches de workflow.

Les task queues servent de canaux de communication entre le moteur Temporal et les workers.

Voyons comment les workers et les task queues contribuent au découplage.

Déploiement des workers

Les workers peuvent être déployés indépendamment du code du workflow, ce qui les rend hautement évolutifs et portables.

Ils peuvent s'exécuter dans différents environnements, ce qui vous permet de répartir efficacement la charge de travail.

Voici un exemple simple de code de worker qui s'enregistre sur la file de tâches my-task-queue et permet d'exécuter le workflow MyWorkflow :

func main() {
  // Create a Temporal client
  c, err := client.NewClient(client.Options{})
  if err != nil {
    log.Fatal("Failed to create Temporal client:", err)
  }
  defer c.Close()

  // Create a worker to process workflow tasks
  workerOptions := worker.Options{
    TaskQueue: "my-task-queue",
  }
  worker := worker.New(c, "my-namespace", workerOptions)
  defer worker.Stop()

  // Register workflow implementation with the worker
  worker.RegisterWorkflow(MyWorkflow)

  // Start the worker
  err = worker.Run(workerOptions)
  if err != nil {
    log.Fatal("Failed to start worker:", err)
  }
}

Configuration d'un workflow

En associant des workers à des workflows et activités spécifiques, vous pouvez découpler le traitement des différents workflows et activités.

Cela vous permet de mettre à l'échelle et de prioriser différentes parties de votre système de manière indépendante :

func main() {
  // Create a worker to process workflow tasks from a specific task queue
  workerOptions := worker.Options{
    TaskQueue: "my-task-queue",
  }


  worker := worker.New(c, "my-namespace", workerOptions)
  worker.RegisterWorkflow(MyWorkflow)

  worker.RegisterActivity(Activity1)
  worker.RegisterActivity(Activity2)
}

Activities : Les blocs de construction des workflow

Les activités représentent des unités de travail individuelles dans les workflows Temporal. Elles encapsulent la logique nécessaire pour effectuer des tâches spécifiques.

Voici comment les activités améliorent le découplage et la flexibilité.

Implémentations d'activités

Les activités sont implémentées séparément du code du workflow, ce qui permet une séparation claire des responsabilités. Cette séparation vous permet de modifier, ajouter ou supprimer des activités sans affecter le code source du workflow.

Voici un exemple de code pour une activité, il s'agit simplement d'une fonction prenant des paramètres et renvoyant des valeurs (ou une erreur) :

func MyActivity(ctx context.Context) error {
  // Activity logic goes here
  return nil
}

Heartbeats, délais d'expiration et réessais

Temporal offre des fonctionnalités puissantes pour gérer les heartbeats, les délais d'expiration et les réessais, ce qui améliore encore la flexibilité et la résilience.

Heartbeats

Les activités de longue durée dans les workflows Temporal peuvent envoyer périodiquement des heartbeats pour indiquer leur progression et éviter les délais d'expiration.

Les heartbeats veillent à ce que les activités ne dépassent pas leurs délais d'expiration définis et fournissent un moyen de gérer efficacement les processus de longue durée.

func MyActivity(ctx context.Context) error {
  for {
    // Perform work
    // ...

    // Send heartbeat to indicate progress
    activity.RecordHeartbeat(ctx, &data)
  }
}

Si une exécution d'activité envoie des heartbeats indiquant sa progression avant de rencontrer une erreur, la nouvelle tentative de réessai aura accès aux informations de progression, ce qui permettra à l'exécution de l'activité de reprendre à partir de l'état de l'échec.

Délais d'expiration

Temporal vous permet de définir des délais d'expiration pour les activités, ce qui vous donne le contrôle sur la durée pendant laquelle une activité peut s'exécuter. Les délais d'expiration aident à éviter les situations où une activité reste bloquée indéfiniment, en garantissant une exécution complète ou un échec en temps opportun.

Vous pouvez le faire directement dans la fonction d'activité :

func MyActivity(ctx context.Context) error {
  timeout := time.Minute
  ctx, cancel := context.WithTimeout(ctx, timeout)
  defer cancel()

  // Activity logic goes here

  return nil
}

Vous pouvez également déclarer plusieurs types de délais d'expiration sur la fonction du workflow lorsque vous exécutez l'activité :

func MyWorkflow(ctx workflow.Context) error {
  opts := workflow.ActivityOptions{
    // ...
    TaskQueue:              "my-task-queue",
    ScheduleToStartTimeout: 5 * time.Minute,
    ScheduleToCloseTimeout: 60 * time.Second,
    StartToCloseTimeout:    30 * time.Second,
  }

  activityCtx := workflow.WithActivityOptions(ctx, opts)

  var result MyActivityResult
  err := workflow.
    ExecuteActivity(activityCtx, MyActivity, MyActivityParams{}).
    Get(ctx, &result)
}

Retries

Les échecs temporaires sont courants dans les systèmes distribués, et Temporal simplifie la gestion de ces échecs grâce à des mécanismes de réessai intégrés.

En configurant des politiques de réessai pour les activités, vous pouvez spécifier le nombre de retry, les intervalles de retry (via une stratégie backoff) et le nombre maximum de tentatives.

Voici un exemple de configuration de la politique de retry pour une activité :

func MyWorkflow(ctx workflow.Context) error {
  opts := workflow.ActivityOptions{
    // ...
    RetryPolicy: &temporal.RetryPolicy{
      InitialInterval:        10 * time.Millisecond,
      BackoffCoefficient:     1.2,
      MaximumInterval:        3 * time.Second,
      MaximumAttempts:        5,
      NonRetryableErrorTypes: nil,
    },
  }

  activityCtx := workflow.WithActivityOptions(ctx, opts)

  var result MyActivityResult
  err := workflow.
    ExecuteActivity(activityCtx, MyActivity, MyActivityParams{}).
    Get(ctx, &result)
}

Gestion des versions

Lorsque votre code et votre logique métier évoluent, votre workflow doit les suivre ! C'est pourquoi Temporal est doté d'une logique de gestion des versions des workflow.

Lorsqu'un workflow est lancé, chaque activité de ce workflow doit être exécutée sur la même version pour éviter tout problème.

Lorsque vous devrez mettre à jour une activité, vous devrez créer une nouvelle version de cette activité et conserver l'ancienne si vous souhaitez être en mesure de rejouer un ancien workflow/activité.

Votre logique de workflow ressemblera alors à ceci :

var err error
v := workflow.GetVersion(ctx, "Step1", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
  err = workflow.ExecuteActivity(ctx, ActivityA, data).Get(ctx, &result1)
} else {
  err = workflow.ExecuteActivity(ctx, ActivityC, data).Get(ctx, &result1)
}
if err != nil {
  return "", err
}

Notez qu'ici, la constante workflow.DefaultVersion (qui est la valeur de départ) est égale à -1.

Aller plus loin

Le kit de développement Temporal pour Go propose d'autres fonctions qui vous permettront d'organiser vos workflows.

Workflow enfant

Par exemple, vous pourrez exécuter des workflows enfants dans vos workflows :

func MyWorkflow1(ctx workflow.Context) (string, error) {
	var result string
	err := workflow.ExecuteChildWorkflow(ctx, MyWorkflow2, "Hello").Get(ctx, &result)
	if err != nil {
		return "", err
	}

	logger.Info("Parent execution (containing child workflow) completed.", "Result", result)
	return result, nil
}

Goroutines de manière déterministe

Certaines fonctions Go ont été réimplémentées pour répondre à vos besoins, de sorte que vous pouvez exécuter des goroutines dans vos workflows à l'aide des fonctions workflow.Go et workflow.Await :

func SampleGoroutineWorkflow(ctx workflow.Context, parallelism int) (results []string, err error) {
	for i := 0; i < parallelism; i++ {
		input1 := fmt.Sprint(i)

    workflow.Go(ctx, func(gCtx workflow.Context) {
			// It is important to use the context passed to the goroutine function
			// An attempt to use the enclosing context would lead to failure.
      var result1 string
			err = workflow.ExecuteActivity(gCtx, Step1, input1).Get(gCtx, &result1)
			if err != nil {
				// Very naive error handling. Only the last error will be returned by the workflow
				return
			}

      var result2 string
			err = workflow.ExecuteActivity(gCtx, Step2, result1).Get(gCtx, &result2)
			if err != nil {
				return
			}
			results = append(results, result2)
		})
	}

	// Wait for Goroutines to complete. Await blocks until the condition function returns true.
	// The function is evaluated on every workflow state change. Consider using `workflow.AwaitWithTimeout` to
	// limit duration of the wait.
	_ = workflow.Await(ctx, func() bool {
		return err != nil || len(results) == parallelism
	})
	return
}

En savoir plus

Si vous souhaitez en savoir plus, je vous invite vivement à plonger dans ce référentiel temporalio/samples-go. Vous y trouverez de nombreux exemples !

Conclusion

Temporal libère les développeurs Go des complexités de l'orchestration des workflows en permettant le découplage et en favorisant la flexibilité.

En séparant le code du workflow des aspects internes du moteur, Temporal permet aux développeurs de se concentrer sur la logique métier tout en maintenant la déterminisme.

Les workers, les files de tâches, les activités, les battements de cœur, les délais d'expiration et les réessais sont des éléments clés qui renforcent le découplage et la résilience. Vous pouvez ainsi construire des systèmes évolutifs et tolérants aux pannes qui s'adaptent aux exigences changeantes et offrent des workflows fiables de longue durée.

N'oubliez pas que cet article de blog n'effleure que la surface des capacités de Temporal. Je vous encourage à approfondir la documentation officielle de Temporal et à explorer les fonctionnalités supplémentaires et les meilleures pratiques pour exploiter pleinement le potentiel de ce puissant framework.

Bon développement de workflow avec Temporal !