Version: 0.5.4

Load data from an API

In this section, we will retrieve and load data from the GitHub API into DuckDB. Specifically, we will load issues from our dlt-hub/dlt repository. We picked DuckDB as our destination because it is a lightweight, in-process database that is easy to set up and use.

Before we start, make sure you have installed dlt with the DuckDB dependency:

pip install "dlt[duckdb]"
tip

Need help with this tutorial? Join our Slack community for quick support.

Create a pipeline

First, we need to create a pipeline. Pipelines are the main building blocks of dlt and are used to load data from sources to destinations. Open your favorite text editor and create a file called github_issues.py. Add the following code to it:

import dlt
from dlt.sources.helpers import requests

# Specify the URL of the API endpoint
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
# Make a request and check if it was successful
response = requests.get(url)
response.raise_for_status()

pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="duckdb",
    dataset_name="github_data",
)
# The response contains a list of issues
load_info = pipeline.run(response.json(), table_name="issues")

print(load_info)

Here's what the code above does:

  1. It makes a request to the GitHub API endpoint and checks if the response is successful.
  2. Then it creates a dlt pipeline with the name github_issues and specifies that the data should be loaded to the duckdb destination and the github_data dataset. Nothing gets loaded yet.
  3. Finally, it runs the pipeline with the data from the API response (response.json()) and specifies that the data should be loaded to the issues table. The run method returns a LoadInfo object that contains information about the loaded data.

Run the pipeline

Save github_issues.py and run the following command:

python github_issues.py

Once the data has been loaded, you can inspect the created dataset using the Streamlit app:

dlt pipeline github_issues show

Append or replace your data

Try running the pipeline again with python github_issues.py. You will notice that the issues table contains two copies of the same data. This happens because the default load mode is append. It is very useful, for example, when you have daily data updates and you want to ingest them.

To get the latest data, we'd need to run the script again. But how can we do that without duplicating the data? One option is to tell dlt to replace the data in existing tables in the destination by using the replace write disposition. Change the github_issues.py script to the following:

import dlt
from dlt.sources.helpers import requests

# Specify the URL of the API endpoint
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
# Make a request and check if it was successful
response = requests.get(url)
response.raise_for_status()

pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="duckdb",
    dataset_name="github_data",
)
# The response contains a list of issues
load_info = pipeline.run(
    response.json(),
    table_name="issues",
    write_disposition="replace",  # <-- Add this line
)

print(load_info)

Run this script twice to see that the issues table still contains only one copy of the data.
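The difference between the two write dispositions can be pictured with a small plain-Python model. This is only a conceptual sketch: the `load` helper and the list-of-dicts "table" are hypothetical stand-ins, not how dlt stores or loads data.

```python
# Conceptual model only: a "table" is a plain list of rows.
# The load() helper is hypothetical and mirrors just the observable effect
# of the append and replace write dispositions.

def load(table, rows, write_disposition="append"):
    """Return the table contents after loading `rows` into `table`."""
    if write_disposition == "append":
        # Keep existing rows and add the new ones at the end
        return table + list(rows)
    if write_disposition == "replace":
        # Discard existing rows and keep only the new ones
        return list(rows)
    raise ValueError(f"unsupported write disposition: {write_disposition}")


issues = [{"id": 1, "title": "First"}, {"id": 2, "title": "Second"}]

# Two consecutive runs in append mode: every row ends up twice
appended = load(load([], issues), issues)

# Two consecutive runs in replace mode: only the latest copy survives
replaced = load(load([], issues, "replace"), issues, "replace")

print(len(appended), len(replaced))
```

In this model, running twice with append doubles the row count, while running twice with replace leaves a single copy, which matches what you should observe in the issues table.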

tip

What if the API has changed and new fields get added to the response? dlt will migrate your tables! See the replace mode and table schema migration in action in our Schema evolution colab demo.

Declare loading behavior

So far we have been passing the data to the run method directly. This is a quick way to get started. However, you frequently receive data in chunks and want to load it as it arrives. For example, you might want to load data from an API endpoint with pagination or a large file that does not fit in memory. In such cases, you can use Python generators as a data source.

You can pass a generator to the run method directly or use the @dlt.resource decorator to turn the generator into a dlt resource. The decorator allows you to specify the loading behavior and relevant resource parameters.
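As a rough illustration of data arriving in chunks, the sketch below defines a generator that yields lists of records one chunk at a time. The `read_in_chunks` name, the sample records, and the chunk size are all made up for this example.

```python
from typing import Iterable, Iterator, List


def read_in_chunks(records: Iterable[dict], chunk_size: int) -> Iterator[List[dict]]:
    """Yield lists of at most `chunk_size` records without loading everything at once."""
    chunk: List[dict] = []
    for record in records:
        chunk.append(record)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    # Yield whatever is left over as a final, shorter chunk
    if chunk:
        yield chunk


# A lazy source of five records, consumed two at a time
records = ({"id": i} for i in range(5))
chunks = list(read_in_chunks(records, chunk_size=2))

print([len(chunk) for chunk in chunks])
```

A generator like this could be passed to the run method in place of response.json(), together with a table name, so that each chunk is handed over as it is produced.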

Load only new data (incremental loading)

Let's improve our GitHub API example and get only the issues that were created since the last load. Instead of using the replace write disposition and downloading all issues each time the pipeline is run, we do the following:

import dlt
from dlt.sources.helpers import requests

@dlt.resource(table_name="issues", write_disposition="append")
def get_issues(
    created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
    # NOTE: we read only open issues to minimize the number of calls to the API.
    # There's a limit of roughly 60 calls per hour for unauthenticated GitHub users.
    url = (
        "https://api.github.com/repos/dlt-hub/dlt/issues"
        "?per_page=100&sort=created&direction=desc&state=open"
    )

    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

        # Stop requesting pages if the last element was already
        # older than the initial value.
        # Note: incremental will skip those items anyway, we just
        # do not want to use up the API rate limit.
        if created_at.start_out_of_range:
            break

        # Get the next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]

pipeline = dlt.pipeline(
pipeline_name="github_issues_incremental",
destination="duckdb",
dataset_name="github_data_append",
)

load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)

Let's take a closer look at the code above.

We use the @dlt.resource decorator to declare the table name into which data will be loaded and specify the append write disposition.

We request issues for the dlt-hub/dlt repository ordered by the created_at field (descending) and yield them page by page in the get_issues generator function.

We also use dlt.sources.incremental to track the created_at field present in each issue, so that only newly created issues are kept.

Now run the script. It loads all the issues from our repo to duckdb. Run it again, and you can see that no issues got added (if no issues were created in the meantime).

Now you can run this script on a daily schedule and each day you’ll load only issues created after the time of the previous pipeline run.
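The idea behind tracking a cursor field can be sketched in plain Python. This is a simplified model with a hypothetical `filter_new` helper and invented sample rows; dlt.sources.incremental does more than this, including persisting the last value between pipeline runs.

```python
def filter_new(rows, cursor_field, last_value):
    """Keep rows whose cursor value is newer than `last_value`.

    Returns the kept rows and the cursor value to remember for the next run.
    """
    new_rows = [row for row in rows if row[cursor_field] > last_value]
    new_last_value = max([last_value] + [row[cursor_field] for row in new_rows])
    return new_rows, new_last_value


# ISO 8601 UTC timestamps in the same format compare correctly as strings
state = "1970-01-01T00:00:00Z"

# First run: everything is newer than the initial value
run_1 = [
    {"id": 1, "created_at": "2024-01-01T10:00:00Z"},
    {"id": 2, "created_at": "2024-01-02T09:30:00Z"},
]
loaded_1, state = filter_new(run_1, "created_at", state)

# Second run: the API returns the old issues plus one new issue
run_2 = run_1 + [{"id": 3, "created_at": "2024-01-03T08:00:00Z"}]
loaded_2, state = filter_new(run_2, "created_at", state)

print([row["id"] for row in loaded_1], [row["id"] for row in loaded_2], state)
```

In this model the second run keeps only the issue created after the remembered timestamp, which is the behavior the daily schedule described above relies on.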

tip

Between pipeline runs, dlt keeps the state in the same database it loaded data to. To peek into that state, see the loaded tables, and get other information, run:

dlt pipeline -v github_issues_incremental info

Update and deduplicate your data

The script above finds new issues and adds them to the database. It ignores any updates to existing issues, such as changes to the issue text or emoji reactions. To always get fresh content for all the issues, combine incremental loading with the merge write disposition, as in the script below.

import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # NOTE: we read only open issues to minimize the number of calls to
    # the API. There's a limit of roughly 60 calls per hour for
    # unauthenticated GitHub users.
    url = (
        "https://api.github.com/repos/dlt-hub/dlt/issues"
        f"?since={updated_at.last_value}&per_page=100&sort=updated"
        "&direction=desc&state=open"
    )

    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

        # Get the next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]

pipeline = dlt.pipeline(
pipeline_name="github_issues_merge",
destination="duckdb",
dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)

Above, we add the primary_key argument to dlt.resource(). It tells dlt how to identify issues in the database, so that it can find duplicates and merge their content.
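What merging on a primary key means can be modeled with a dictionary keyed by that field. This is a conceptual sketch with a hypothetical `merge` helper and invented rows, not a description of how dlt performs the merge in the destination.

```python
def merge(table, rows, primary_key):
    """Upsert `rows` into `table`: replace rows with a matching key, add the rest."""
    merged = {row[primary_key]: row for row in table}
    for row in rows:
        # A row with a known key overwrites the stored one; a new key is added
        merged[row[primary_key]] = row
    return list(merged.values())


existing = [
    {"id": 1, "title": "Crash on start", "comments": 0},
    {"id": 2, "title": "Docs typo", "comments": 1},
]
incoming = [
    {"id": 2, "title": "Docs typo", "comments": 4},  # updated issue
    {"id": 3, "title": "New feature", "comments": 0},  # new issue
]

result = merge(existing, incoming, primary_key="id")
print([(row["id"], row["comments"]) for row in result])
```

The updated issue replaces its earlier version instead of appearing twice, and the new issue is added alongside the untouched one.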

Note that we now track the updated_at field, so we keep all issues updated since the last pipeline run (which also includes newly created ones).

Note how we use the since parameter of the GitHub API together with updated_at.last_value to tell GitHub to return only the issues updated after the date we pass. updated_at.last_value holds the last updated_at value from the previous run.
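To see what the resulting request URL looks like, here is a small sketch that builds the query string with the standard library. The `last_value` timestamp is a made-up stand-in for updated_at.last_value; the other parameters mirror the ones used in this tutorial.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

base_url = "https://api.github.com/repos/dlt-hub/dlt/issues"

# Hypothetical value standing in for updated_at.last_value from a previous run
last_value = "2024-01-02T09:30:00Z"

params = {
    "since": last_value,
    "per_page": 100,
    "sort": "updated",
    "direction": "desc",
    "state": "open",
}

# urlencode percent-encodes reserved characters such as ":" in the timestamp
url = f"{base_url}?{urlencode(params)}"
print(url)

# Decoding the query string gives back the original timestamp
decoded = parse_qs(urlsplit(url).query)
print(decoded["since"][0])
```

Building the query from a dictionary avoids hand-assembling the string and takes care of escaping the timestamp.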

Learn more about merge write disposition.

Using pagination helper

In the previous examples, we used the requests library to make HTTP requests to the GitHub API and handled pagination manually. dlt has a built-in REST client that simplifies API requests. We'll use its paginate() helper for the next example. The paginate function takes a URL and optional parameters (quite similar to requests) and returns a generator that yields pages of data.

Here's how the updated script looks:

import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "since": updated_at.last_value,
            "per_page": 100,
            "sort": "updated",
            "direction": "desc",
            "state": "open",
        },
    ):
        yield page

pipeline = dlt.pipeline(
pipeline_name="github_issues_merge",
destination="duckdb",
dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(row_counts)
print("------")
print(load_info)

Let's zoom in on the changes:

  1. The while loop that handled pagination is replaced with reading pages from the paginate() generator.
  2. paginate() takes the URL of the API endpoint and optional parameters. In this case, we pass the since parameter to get only issues updated after the last pipeline run.
  3. We're not explicitly setting up pagination; paginate() handles it for us. Magic! Under the hood, paginate() analyzes the response and detects the pagination method used by the API. Read more about pagination in the REST client documentation.
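For intuition, the header-link style of pagination that the earlier while loop followed through response.links can be sketched without any network access. This is not the implementation of dlt's paginate(): the `next_url` and `follow_next_links` helpers, the FAKE_API table, and the example.test URLs are all hypothetical.

```python
import re


def next_url(link_header):
    """Return the URL marked rel="next" in a Link header, or None if absent."""
    for match in re.finditer(r'<([^>]+)>\s*;\s*rel="([^"]+)"', link_header):
        if match.group(2) == "next":
            return match.group(1)
    return None


def follow_next_links(start_url, fetch):
    """Yield pages of items, following rel="next" links until there are none."""
    url = start_url
    while url:
        items, link_header = fetch(url)
        yield items
        url = next_url(link_header)


# Stand-in for an HTTP API: maps each URL to (items, Link header)
FAKE_API = {
    "https://example.test/issues?page=1": (
        [{"id": 1}, {"id": 2}],
        '<https://example.test/issues?page=2>; rel="next", '
        '<https://example.test/issues?page=2>; rel="last"',
    ),
    "https://example.test/issues?page=2": (
        [{"id": 3}],
        '<https://example.test/issues?page=1>; rel="prev", '
        '<https://example.test/issues?page=1>; rel="first"',
    ),
}

pages = list(follow_next_links("https://example.test/issues?page=1", FAKE_API.__getitem__))
print(pages)
```

The loop stops on the second page because its Link header carries no rel="next" entry, which is the same stop condition the manual while loop used.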

Next steps

Continue your journey with the Resource Grouping and Secrets tutorial.

If you want to take full advantage of the dlt library, then we strongly suggest that you build your sources out of existing building blocks:

