Construct robust workflows with Temporal.io using Go

Workflows in Go using Temporal

When it comes to building complex, long-running workflows in Go, developers often face challenges in managing dependencies, handling failures, and ensuring determinism. Temporal comes to the rescue by providing a powerful framework that promotes decoupling and flexibility in workflow development.

In this blog post, we will explore how Temporal enables decoupling from engine internals while maintaining determinism. We'll delve into essential concepts such as workers, task queues, activities, heartbeats, timeouts, and retries, showcasing how Temporal empowers developers to build resilient and scalable systems.

How Temporal works

The Temporal cluster lives its own life once you have deployed it on your infrastructure. It is the machinery that executes your workflows, but what's interesting is that you keep full control over the code your workflows run.

Indeed, you write your Workflows and your Activities (the building blocks of your workflows) and then register them in a Worker.


You have full control over the worker code: you specify which workflows and activities it can handle, and you can scale it depending on your needs.

The worker then connects to the Temporal server (frontend service) and uses a gRPC connection to communicate with it.

Decoupling from Engine Internals

One of the key advantages of Temporal is its ability to decouple your workflow code from the underlying engine. This separation allows for greater flexibility, maintainability, and portability.

Let's see how Temporal achieves this decoupling.

In the Go SDK, a workflow is ordinary Go code that receives a workflow.Context. Describing it with a Go interface, as below, gives you a contract between your application code and the workflow implementation, so you remain decoupled from engine-specific details.

This separation enables you to focus on your business logic and abstract away engine complexities.

type MyWorkflow interface {
  MyWorkflow(ctx workflow.Context) error
}

type myWorkflowImpl struct{}

func (w *myWorkflowImpl) MyWorkflow(ctx workflow.Context) error {
  // Workflow logic goes here
  return nil
}

Deterministic Execution

Temporal recovers the state of a workflow by replaying its recorded event history against your workflow code. For the replay to reach the same state, even after a worker crash or restart, the workflow code must be deterministic: given the same history, it has to make the same decisions.

You achieve this by routing every non-deterministic operation, such as reading the current time or generating a random number, through Temporal constructs that record their result in the history.

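func (w *myWorkflowImpl) MyWorkflow(ctx workflow.Context) error {
  // workflow.Now returns the same timestamp on replay, unlike time.Now
  currentTime := workflow.Now(ctx)
  // SideEffect records the generated value in the history so replays reuse it
  var randomNumber int
  encoded := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    return rand.Intn(100)
  })
  if err := encoded.Get(&randomNumber); err != nil {
    return err
  }
  workflow.GetLogger(ctx).Info("Values", "time", currentTime, "random", randomNumber)
  return nil
}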
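To get an intuition for why replay needs recorded values, here is a toy model that uses only the standard library. The history type and its sideEffect method are hypothetical names made up for this sketch, not Temporal APIs; the real SDK keeps the event history on the server.

```go
package main

import (
	"fmt"
	"math/rand"
)

// history is a toy stand-in for a workflow event history (hypothetical type).
type history struct {
	recorded []int // values produced by side effects, in call order
	next     int   // index of the next side effect in the current run
}

// sideEffect calls f only the first time a given call site is reached.
// On later runs it returns the recorded value instead of calling f again.
func (h *history) sideEffect(f func() int) int {
	if h.next < len(h.recorded) {
		v := h.recorded[h.next]
		h.next++
		return v
	}
	v := f()
	h.recorded = append(h.recorded, v)
	h.next++
	return v
}

// run executes the toy "workflow" from the beginning against the history.
func run(h *history) int {
	h.next = 0
	a := h.sideEffect(func() int { return rand.Intn(1000) })
	b := h.sideEffect(func() int { return rand.Intn(1000) })
	return a + b
}

func main() {
	h := &history{}
	first := run(h)  // first execution: values are generated and recorded
	replay := run(h) // replay: recorded values are reused
	fmt.Println(first == replay)
}
```

Calling rand.Intn directly inside run, without going through the history, would give a different sum on each run, which is exactly the kind of divergence deterministic workflow code has to avoid.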

Workers and Task Queues

Workers are the backbone of a Temporal-based workflow system. They poll task queues and execute the workflow and activity code you registered.

Task queues act as communication channels between the Temporal engine and workers.

Let's explore how workers and task queues contribute to decoupling.

Worker Deployment

Workers are deployed independently from the Temporal cluster, which makes them highly scalable and portable.

They can run in different environments, enabling you to distribute the workload effectively.

Here is a simple worker example that polls the task queue my-task-queue and can execute the workflow MyWorkflow:

func main() {
  // Create a Temporal client (the namespace is a client option)
  c, err := client.Dial(client.Options{Namespace: "my-namespace"})
  if err != nil {
    log.Fatalln("Failed to create Temporal client:", err)
  }
  defer c.Close()

  // Create a worker that polls the "my-task-queue" task queue
  w := worker.New(c, "my-task-queue", worker.Options{})

  // Register workflow implementation with the worker
  w.RegisterWorkflow(MyWorkflow)

  // Run the worker until the process is interrupted
  err = w.Run(worker.InterruptCh())
  if err != nil {
    log.Fatalln("Failed to start worker:", err)
  }
}

Task Queue Configuration

By associating workers with specific task queues, you can decouple the processing of different workflows and activities.

This allows you to scale and prioritize different parts of your system independently:

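func main() {
  c, err := client.Dial(client.Options{Namespace: "my-namespace"})
  if err != nil {
    log.Fatalln("Failed to create Temporal client:", err)
  }
  defer c.Close()

  // Create a worker that polls a specific task queue
  w := worker.New(c, "my-task-queue", worker.Options{})

  // Register the workflows and activities this worker can execute
  w.RegisterWorkflow(MyWorkflow)
  w.RegisterActivity(Activity1)
  w.RegisterActivity(Activity2)

  if err := w.Run(worker.InterruptCh()); err != nil {
    log.Fatalln("Failed to start worker:", err)
  }
}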
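As a mental model, a task queue behaves a bit like a named channel that only the workers subscribed to it read from. The sketch below imitates that with plain Go channels so you can see how giving one queue more workers does not affect another. The task type, the dispatch function and the queue names are all hypothetical; none of this is Temporal code.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a unit of work placed on a named queue (toy model, not a Temporal type).
type task struct {
	queue string
	name  string
}

// dispatch starts the requested number of workers per queue, routes each task
// to its queue, and returns how many tasks the workers of each queue handled.
// Tasks for a queue without workers are skipped in this sketch.
func dispatch(tasks []task, workersPerQueue map[string]int) map[string]int {
	queues := make(map[string]chan task)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for q, n := range workersPerQueue {
		if n <= 0 {
			continue
		}
		ch := make(chan task)
		queues[q] = ch
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func(q string, ch <-chan task) {
				defer wg.Done()
				for range ch { // a worker only sees tasks from its own queue
					mu.Lock()
					handled[q]++
					mu.Unlock()
				}
			}(q, ch)
		}
	}

	for _, t := range tasks {
		if ch, ok := queues[t.queue]; ok {
			ch <- t
		}
	}
	for _, ch := range queues {
		close(ch)
	}
	wg.Wait()
	return handled
}

func main() {
	tasks := []task{
		{"orders", "charge"}, {"orders", "ship"}, {"emails", "welcome"},
	}
	handled := dispatch(tasks, map[string]int{"orders": 2, "emails": 1})
	fmt.Println(handled["orders"], handled["emails"])
}
```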

Activities: Building Blocks of Workflows

Activities represent individual units of work in Temporal workflows. They encapsulate the logic required to perform specific tasks.

Here's how activities enhance decoupling and flexibility.

Activity Implementations

Activities are implemented separately from the workflow code, providing a clear separation of concerns. This separation allows you to modify, add, or remove activities without affecting the workflow codebase.

Here is a code example of an activity. It is just a simple function that takes parameters and returns values (or an error):

func MyActivity(ctx context.Context) error {
  // Activity logic goes here
  return nil
}
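Because an activity is a plain function, you can call it directly, for example from a unit test, without any Temporal machinery. Here is a minimal sketch with parameters and a return value; GreetActivity, GreetParams, GreetResult and the greet helper are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// GreetParams and GreetResult are hypothetical input and output types.
type GreetParams struct{ Name string }
type GreetResult struct{ Message string }

// GreetActivity has the usual activity shape: a context first, then the
// parameters, returning a result and an error.
func GreetActivity(ctx context.Context, p GreetParams) (GreetResult, error) {
	if p.Name == "" {
		return GreetResult{}, errors.New("name must not be empty")
	}
	return GreetResult{Message: "Hello, " + p.Name + "!"}, nil
}

// greet calls the activity directly, the way a unit test could.
func greet(name string) (string, error) {
	res, err := GreetActivity(context.Background(), GreetParams{Name: name})
	return res.Message, err
}

func main() {
	msg, err := greet("Ada")
	fmt.Println(msg, err)
}
```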

Heartbeats, Timeouts, and Retries

Temporal provides powerful features for handling heartbeats, timeouts, and retries, further enhancing flexibility and resilience.

Heartbeats

Long-running activities can periodically send heartbeats to report progress and to show that they are still alive.

Combined with a heartbeat timeout, heartbeats let Temporal detect a stuck or crashed activity quickly instead of waiting for the full activity timeout, which makes long-running processes easier to handle.

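func MyActivity(ctx context.Context) error {
  for progress := 0; progress < 100; progress++ {
    // Perform one unit of work
    // ...

    // Send heartbeat with the current progress
    activity.RecordHeartbeat(ctx, progress)

    // Stop early if the activity has been cancelled or has timed out
    if ctx.Err() != nil {
      return ctx.Err()
    }
  }
  return nil
}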

If an activity execution heartbeats its progress before it fails, the retry attempt has access to the last recorded progress (in the Go SDK, through activity.GetHeartbeatDetails), so the activity can resume from where it left off rather than from the beginning.
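The resume logic can be sketched without Temporal at all. In the toy model below, a callback plays the role of the heartbeat and a loop plays the role of the retry mechanism; attempt, runWithRetry and the simulated crash are hypothetical and only meant to show the idea.

```go
package main

import (
	"errors"
	"fmt"
)

// attempt processes items starting at index start. After each item it reports
// the next index to process through heartbeat. It fails when it reaches
// failAt (use -1 for no failure).
func attempt(items []string, start, failAt int, heartbeat func(next int), done *[]string) error {
	for i := start; i < len(items); i++ {
		if i == failAt {
			return errors.New("simulated worker crash")
		}
		*done = append(*done, items[i])
		heartbeat(i + 1)
	}
	return nil
}

// runWithRetry runs attempts until one succeeds. Each retry starts from the
// progress recorded by the last heartbeat instead of starting from zero.
func runWithRetry(items []string, failAt int) (processed []string, attempts int) {
	lastHeartbeat := 0
	record := func(next int) { lastHeartbeat = next }
	for {
		attempts++
		err := attempt(items, lastHeartbeat, failAt, record, &processed)
		if err == nil {
			return processed, attempts
		}
		failAt = -1 // the simulated failure only happens once
	}
}

func main() {
	processed, attempts := runWithRetry([]string{"a", "b", "c", "d"}, 2)
	fmt.Println(processed, attempts) // prints "[a b c d] 2"
}
```

Each item is processed exactly once even though the first attempt failed halfway through.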

Timeouts

Temporal allows you to set timeouts for activities, providing control over how long an activity can run. Timeouts help avoid situations where an activity gets stuck indefinitely, ensuring timely completion or failure.

You can do it directly in the activity function:

func MyActivity(ctx context.Context) error {
  timeout := time.Minute
  ctx, cancel := context.WithTimeout(ctx, timeout)
  defer cancel()

  // Activity logic goes here

  return nil
}
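A context timeout only helps if the activity logic actually watches the context. Here is a small standard-library sketch of that pattern; doWork and runWithTimeout are hypothetical helpers, and the simulated work is just a timer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doWork simulates activity logic that needs `work` to finish, but gives up
// as soon as the context is cancelled or its deadline passes.
func doWork(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWithTimeout wraps doWork with a deadline, like the activity above does.
func runWithTimeout(work, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return doWork(ctx, work)
}

func main() {
	fmt.Println(runWithTimeout(time.Millisecond, time.Second))    // work finishes in time
	fmt.Println(runWithTimeout(time.Second, 10*time.Millisecond)) // deadline passes first
}
```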

You can also declare several kinds of timeouts in the workflow function, through the options passed when you execute the activity:

func MyWorkflow(ctx workflow.Context) error {
  opts := workflow.ActivityOptions{
    // ...
    TaskQueue:              "my-task-queue",
    ScheduleToStartTimeout: 5 * time.Minute,  // max time waiting in the task queue
    StartToCloseTimeout:    30 * time.Second, // max duration of a single attempt
    ScheduleToCloseTimeout: 10 * time.Minute, // max overall duration, retries included
  }

  activityCtx := workflow.WithActivityOptions(ctx, opts)

  var result MyActivityResult
  err := workflow.
    ExecuteActivity(activityCtx, MyActivity, MyActivityParams{}).
    Get(activityCtx, &result)
  return err
}

Retries

Transient failures are common in distributed systems, and Temporal simplifies the handling of such failures through built-in retry mechanisms.

By configuring a retry policy for an activity, you can specify the initial interval, the backoff coefficient, the maximum interval between retries, and the maximum number of attempts.

func MyWorkflow(ctx workflow.Context) error {
  opts := workflow.ActivityOptions{
    // ...
    RetryPolicy: &temporal.RetryPolicy{
      InitialInterval:        10 * time.Millisecond,
      BackoffCoefficient:     1.2,
      MaximumInterval:        3 * time.Second,
      MaximumAttempts:        5,
      NonRetryableErrorTypes: nil,
    },
  }

  activityCtx := workflow.WithActivityOptions(ctx, opts)

  var result MyActivityResult
  err := workflow.
    ExecuteActivity(activityCtx, MyActivity, MyActivityParams{}).
    Get(activityCtx, &result)
  return err
}
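To see what such a policy means in practice, the sketch below computes the wait before each retry, assuming the usual exponential backoff (each interval is the previous one multiplied by the coefficient, capped at the maximum interval). The retryPolicy type is a local, hypothetical copy of the fields above, not the SDK's temporal.RetryPolicy, and the sketch only handles a positive MaximumAttempts.

```go
package main

import (
	"fmt"
	"time"
)

// retryPolicy mirrors the fields used above (hypothetical local type).
type retryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int
}

// delays returns the wait before each retry. With MaximumAttempts = N there
// are at most N-1 retries after the first attempt.
func delays(p retryPolicy) []time.Duration {
	var out []time.Duration
	interval := float64(p.InitialInterval)
	for attempt := 1; attempt < p.MaximumAttempts; attempt++ {
		// Cap the interval so it never exceeds MaximumInterval.
		if p.MaximumInterval > 0 && interval > float64(p.MaximumInterval) {
			interval = float64(p.MaximumInterval)
		}
		out = append(out, time.Duration(interval))
		interval *= p.BackoffCoefficient
	}
	return out
}

func main() {
	p := retryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    10 * time.Second,
		MaximumAttempts:    6,
	}
	fmt.Println(delays(p)) // prints "[1s 2s 4s 8s 10s]"
}
```

With the values from the workflow example (10 ms initial interval, coefficient 1.2, 5 attempts), the same function yields four short delays that never get close to the 3 second cap.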

Versioning

When your code and business logic evolve, your workflows have to follow! This is why Temporal comes with workflow versioning.

Once a workflow execution has started, it has to keep following the code path it started with, otherwise replaying its history would no longer match the code.

When you need to change an activity, create a new version of it and keep the existing one for as long as old workflow executions may still need to replay it.

Your workflow logic will then look like:

var err error
v := workflow.GetVersion(ctx, "Step1", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
  err = workflow.ExecuteActivity(ctx, ActivityA, data).Get(ctx, &result1)
} else {
  err = workflow.ExecuteActivity(ctx, ActivityC, data).Get(ctx, &result1)
}
if err != nil {
  return "", err
}

Note that the workflow.DefaultVersion constant, which is the version reported for executions that started before the change was introduced, is equal to -1.
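If the branching feels abstract, this toy model may help. It imitates, under simplifying assumptions, how a version lookup can behave: an execution whose history predates the change gets the default version, while a new execution records the latest version and keeps it on replay. The execution type and getVersion method are hypothetical and much simpler than what the SDK really does.

```go
package main

import "fmt"

const defaultVersion = -1 // same value as workflow.DefaultVersion

// execution is a toy model of one workflow execution's history.
type execution struct {
	replaying bool           // true when re-running code over an existing history
	markers   map[string]int // version recorded per change ID
}

// getVersion returns the recorded version for changeID if there is one.
// Otherwise a replaying execution gets defaultVersion, and a new execution
// records maxSupported. minSupported is unused in this sketch.
func (e *execution) getVersion(changeID string, minSupported, maxSupported int) int {
	if v, ok := e.markers[changeID]; ok {
		return v
	}
	if e.replaying {
		// The history was written by code that predates the change.
		return defaultVersion
	}
	if e.markers == nil {
		e.markers = make(map[string]int)
	}
	e.markers[changeID] = maxSupported
	return maxSupported
}

// pickActivity mirrors the branching of the workflow snippet above.
func pickActivity(e *execution) string {
	if e.getVersion("Step1", defaultVersion, 1) == defaultVersion {
		return "ActivityA"
	}
	return "ActivityC"
}

func main() {
	old := &execution{replaying: true} // started before the change was deployed
	fresh := &execution{}              // started with the new code
	fmt.Println(pickActivity(old), pickActivity(fresh))

	fresh.replaying = true // replaying the new execution keeps its recorded version
	fmt.Println(pickActivity(fresh))
}
```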

Going further

The Temporal Go SDK comes with more functions that help you organize your workflows.

Child workflows

For instance, you can execute child workflows from your workflows:

func MyWorkflow1(ctx workflow.Context) (string, error) {
	var result string
	err := workflow.ExecuteChildWorkflow(ctx, MyWorkflow2, "Hello").Get(ctx, &result)
	if err != nil {
		return "", err
	}

	workflow.GetLogger(ctx).Info("Parent execution (containing child workflow) completed.", "Result", result)
	return result, nil
}

Goroutines in a deterministic way

Some Go primitives have been re-implemented to meet the needs of deterministic execution. For example, you can run goroutines in your workflows using the workflow.Go and workflow.Await functions:

func SampleGoroutineWorkflow(ctx workflow.Context, parallelism int) (results []string, err error) {
	for i := 0; i < parallelism; i++ {
		input1 := fmt.Sprint(i)

		workflow.Go(ctx, func(gCtx workflow.Context) {
			// It is important to use the context passed to the goroutine function
			// An attempt to use the enclosing context would lead to failure.
			var result1 string
			err = workflow.ExecuteActivity(gCtx, Step1, input1).Get(gCtx, &result1)
			if err != nil {
				// Very naive error handling. Only the last error will be returned by the workflow
				return
			}

			var result2 string
			err = workflow.ExecuteActivity(gCtx, Step2, result1).Get(gCtx, &result2)
			if err != nil {
				return
			}
			results = append(results, result2)
		})
	}

	// Wait for Goroutines to complete. Await blocks until the condition function returns true.
	// The function is evaluated on every workflow state change. Consider using `workflow.AwaitWithTimeout` to
	// limit duration of the wait.
	_ = workflow.Await(ctx, func() bool {
		return err != nil || len(results) == parallelism
	})
	return
}

Learn more

If you want to learn more, I invite you to dive into the temporalio/samples-go repository. There are plenty of examples!

Conclusion

Temporal liberates Go developers from the complexities of workflow orchestration by enabling decoupling and promoting flexibility.

By separating the workflow code from engine internals, Temporal empowers developers to focus on business logic while maintaining determinism.

Workers, task queues, activities, heartbeats, timeouts, and retries are key building blocks that enhance decoupling and resilience. You can build scalable and fault-tolerant systems that adapt to changing requirements and provide reliable long-running workflows.

Remember, this blog post only scratches the surface of Temporal's capabilities. I encourage you to dive deeper into the official Temporal documentation and explore additional features and best practices to leverage the full potential of this powerful framework.

Happy workflow development with Temporal!