
From For Loops to For Each

Optimizing Databricks Workflows with For Each Task Type


I’m the kuya at Kurdapyo Labs — a recovering Oracle developer who saw the light and helped migrate legacy systems out of Oracle (and saved a lot of money doing it). I used to write PL/SQL, Perl, ksh, Bash, and all kinds of hand-crafted ETL. These days, I wrestle with PySpark, Airflow, Terraform, and YAML that refuses to cooperate. I’ve been around long enough to know when things were harder… and when they were actually better. This blog is where I write (and occasionally rant) about modern data tools — especially the ones marketed as “no-code” that promise simplicity, but still break in production anyway.

Disclaimer: These are my thoughts—100% my own, not my employer’s, my client’s, or that one loud guy on tech Twitter. I’m just sharing what I’ve learned (and unlearned) along the way. No promises, no warranties—just real talk, some opinions, and the occasional coffee/beer-fueled rant.

If something here helps you out, awesome! If you think I’ve missed something or want to share your own take, I’d love to hear from you. Let’s learn from each other.

I'm a strong advocate for using the enterprise-chosen orchestration tool for complex data pipelines. However, there are cases where applying that strictly isn't practical, and the scheduling tool within the database platform itself makes more sense.

Take Databricks, for example. We build Databricks workflows, but a Databricks workflow is itself an orchestrator. The challenge lies in deciding what should run as a single Databricks workflow and what should be tasks within that workflow.

The Problem and Initial Approach

The problem I wanted to solve was flattening nested JSON data into multiple child tables. This required using the variant datatype to transform and store the data in standard tables with native datatypes. While creating a separate Databricks workflow for each table was possible, it wasn't practical. Instead, we chose to use a single Databricks workflow, triggered by the enterprise orchestration tool (Airflow), to process all the necessary tables. The simplest solution was to go through each table one by one to refine the flattening logic. This was version 1 of the implementation.
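Version 1 boiled down to something like this sketch (the table names and the flattening stub are my own hypothetical stand-ins, not the original code): one notebook looping over every table, one at a time.

```python
def flatten_table(table_name):
    # Placeholder for the real flattening logic
    # (variant column -> typed columns in a child table).
    return f"flattened {table_name}"

tables = ["table_a", "table_b", "table_c"]

results = []
for table in tables:  # strictly serial: each table waits for the previous one
    results.append(flatten_table(table))
```

The structure is simple and easy to debug per table, which is why it made a reasonable version 1.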

The Issue with the Loop

While this approach worked in most cases, the total processing time was the sum of all individual iterations. For example, if we had 10 tables that finished in 1 minute each and 2 tables that took 20 minutes each, the entire workflow would take 50 minutes to complete. Adding more compute power wouldn't significantly reduce this time; it would only drive up costs.
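To see why the serial loop is the bottleneck, here is a rough simulation of the example above in plain Python. This is a simple greedy model (each job goes to whichever slot frees up first), not how Databricks actually schedules iterations, but it shows the shape of the win when iterations can run side by side.

```python
import heapq

# The example above: 10 tables at 1 minute each, 2 tables at 20 minutes each.
durations = [1] * 10 + [20, 20]

serial_minutes = sum(durations)  # serial loop: total = sum of iterations

def concurrent_estimate(minutes, concurrency):
    """Rough wall time if `concurrency` iterations run at once: assign each
    job (longest first) to whichever slot frees up earliest."""
    slots = [0] * concurrency          # finish time of each concurrent slot
    for d in sorted(minutes, reverse=True):
        earliest = heapq.heappop(slots)
        heapq.heappush(slots, earliest + d)
    return max(slots)
```

Serially this is 50 minutes; with 2 slots the same work packs into roughly 25 minutes, and with 4 slots it is bounded by the 20-minute stragglers.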

For Each to the Rescue

This is where the "for each" Databricks task type becomes helpful. Instead of the loop, I restructured the iteration body into a single generic notebook, which became an individual Databricks workflow task. In the job definition, I can specify the individual tables as a JSON array embedded as a string in the YAML.

tasks:
  - task_key: "process"
    for_each_task:
      inputs: "[\"table_a\",\"table_b\",\"table_c\"]"
      concurrency: 2
      task:
        task_key: "process_iterator"
        notebook_task:
          base_parameters:
            p_table: "{{input}}"
          notebook_path: "process.py"
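For context, the generic notebook on the receiving end might look something like this sketch. The raw/curated naming convention and the helper function are my own illustration, not the original workflow; inside Databricks, `dbutils.widgets.get("p_table")` is what reads the `{{input}}` value for the current iteration.

```python
def iteration_tables(p_table):
    """Hypothetical naming convention: read raw.<table>, write curated.<table>."""
    return (f"raw.{p_table}", f"curated.{p_table}")

# Inside the real process.py notebook (Databricks-only objects, for context):
#   p_table = dbutils.widgets.get("p_table")   # one table name per iteration
#   source, target = iteration_tables(p_table)
#   df = spark.read.table(source)
#   ... flattening logic for this one table ...
#   df.write.mode("overwrite").saveAsTable(target)
```

Because every iteration runs the same notebook with only `p_table` changing, the flattening logic lives in exactly one place.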

A Pre-Task to Make it Dynamic

That's all well and good, but sometimes we don't want to hardcode everything in the job definition. We want things to be dynamic.

This is where introducing a pre-task becomes useful. By using another task, we can set up some Python code that builds an array and passes it to the next tasks.

For example:

# sample logic to build a list for the foreach
tables_to_process = ["table_1", "table_2"]
tables_to_process.append("table_3")

dbutils.jobs.taskValues.set(key="tables_to_process", value=tables_to_process)

How It All Looks

The job definition should look like this:

tasks:
  - task_key: "prepare"
    notebook_task:
      notebook_path: "prepare.py"
  - task_key: "process"
    depends_on:
      - task_key: prepare
    for_each_task:
      inputs: "{{ tasks.prepare.values.tables_to_process }}"
      concurrency: 4 # or whatever
      task:
        task_key: "process_iterator"
        notebook_task:
          base_parameters:
            p_table: "{{input}}"
          notebook_path: "process.py"

Once everything is set up, the Databricks job/workflow definition has the "for each" task receiving its input from the pre-task's output.

In a dynamic "for each" setup, you won't see the actual list of tables or iterations beforehand. The list only appears at run time, once the pre-step finishes; the "for each" task then starts. Clicking on the "for each" task takes you to a run page that lists every iteration with its own status and duration.

Interestingly, the iterations were not listed in alphabetical order, and I haven't needed them to be.

Conclusion

So there you have it: the "for each" task. It's quite useful and can noticeably speed up workflows that currently run serialized for loops.

I did encounter one limitation. Currently, the "for each" task can only consume inputs from other tasks; its iterations can't pass task values on to the tasks that follow. My workaround was to append the results to a single table, which downstream tasks can query later.
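That workaround, sketched in plain Python (the `ops.foreach_results` table name and the helper are hypothetical names for illustration): each iteration appends a summary row to a shared table, and downstream tasks query that table instead of reading task values.

```python
def result_row(table_name, row_count, run_id):
    """One record describing a finished iteration."""
    return {"table_name": table_name, "row_count": row_count, "run_id": run_id}

# Inside each iteration (Databricks-only calls, shown for context):
#   row = result_row(p_table, df.count(), run_id)
#   spark.createDataFrame([row]).write.mode("append") \
#        .saveAsTable("ops.foreach_results")
# A later task then reads the results back with:
#   spark.read.table("ops.foreach_results")
```

Keying the rows by run ID keeps results from different job runs from mixing together.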