
Embeddings

In order to store the data in our vector database, we need to convert the free text into embeddings. Let's look at GitHub first. When extracting data from GitHub, we want to be careful to avoid rate limiting. We can do this by partitioning our asset into weekly chunks using a WeeklyPartitionsDefinition:

START_TIME = "2023-01-01"
weekly_partition = dg.WeeklyPartitionsDefinition(start_date=START_TIME)

We will supply that partition in the decorator of our asset, as well as an AutomationCondition to ensure that the weekly partition is updated every Monday. The rest of our asset will use the GithubResource we defined earlier and return a list of LangChain Documents:

@dg.asset(
    group_name="ingestion",
    kinds={"github"},
    partitions_def=weekly_partition,
    io_manager_key="document_io_manager",
    automation_condition=dg.AutomationCondition.on_cron("0 0 * * 1"),
    description="""
    Ingests raw GitHub issues data from the Dagster repository on a weekly basis.

    This asset fetches GitHub issues, including:
    - Issue title and body
    - Comments and discussion threads
    - Issue metadata (status, labels, assignees)
    - Creation and update timestamps

    Technical Details:
    - Runs weekly (Mondays at midnight)
    - Processes issues in weekly partitions
    - Converts issues to Document format for embedding
    - Preserves all issue metadata for search context

    Returns:
        List[Document]: Collection of Document objects containing issue content
        and associated metadata for each weekly partition
    """,
)
def github_issues_raw(
    context: dg.AssetExecutionContext,
    github: GithubResource,
) -> list[Document]:
    start, end = context.partition_time_window
    context.log.info(f"Finding issues from {start} to {end}")

    issues = github.get_issues(
        start_date=start.strftime("%Y-%m-%d"), end_date=end.strftime("%Y-%m-%d")
    )

    return github.convert_issues_to_documents(issues)

The next asset will convert those Documents to vectors and upload them to Pinecone. To generate the embeddings, we need an AI model; in this case, we will use OpenAI's text-embedding-3-small model to transform the text we have collected from GitHub into embeddings. Dagster provides an OpenAIResource to interact with the OpenAI client, and we will use that to create the embeddings. This asset will also create the index in Pinecone:

@dg.asset(
    group_name="embeddings",
    kinds={"github", "openai", "pinecone"},
    partitions_def=weekly_partition,
    io_manager_key="document_io_manager",
    automation_condition=dg.AutomationCondition.any_deps_updated(),
    description="""
    Creates and stores vector embeddings for GitHub issues in Pinecone.

    This asset processes weekly batches of GitHub issues by:
    1. Converting issue content to OpenAI embeddings
    2. Storing embeddings and metadata in Pinecone vector database
    3. Using namespace 'dagster-github' for unified GitHub content storage

    Dependencies:
    - github_issues_raw: Raw issue documents from weekly partition

    Technical Details:
    - Uses OpenAI's text-embedding-3-small model
    - Embedding dimension: 1536
    - Stores in Pinecone index: 'dagster-knowledge'
    - Preserves metadata like issue status, labels, and timestamps
    - Processes issues in weekly batches

    Vector Storage:
    - Each vector contains issue content embedding and metadata
    - Uses auto-generated sequential IDs
    - Stored in 'dagster-github' namespace for consolidated search

    Returns:
        MaterializeResult with metadata about number of issues processed
    """,
)
def github_issues_embeddings(
    context: dg.AssetExecutionContext,
    openai: OpenAIResource,
    pinecone: PineconeResource,
    github_issues_raw: list[Document],
) -> dg.MaterializeResult:
    # Create index if it doesn't exist
    pinecone.create_index("dagster-knowledge", dimension=1536)
    index, namespace_kwargs = pinecone.get_index("dagster-knowledge", namespace="dagster-github")

    texts = [doc.page_content for doc in github_issues_raw]
    with openai.get_client(context) as client:
        embeddings = [
            item.embedding
            for item in client.embeddings.create(model="text-embedding-3-small", input=texts).data
        ]

    # Prepare metadata
    metadata = [
        {k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))}
        for doc in github_issues_raw
    ]

    # Upsert to Pinecone with namespace
    index.upsert(
        vectors=zip(
            [str(i) for i in range(len(texts))],  # IDs
            embeddings,
            metadata,
        ),
        **namespace_kwargs,  # Include namespace parameters
    )

    return dg.MaterializeResult(
        metadata={
            "number_of_issues": len(github_issues_raw),
        }
    )

This process will be very similar for the GitHub discussions content.
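For reference, a discussions ingestion asset could follow the same shape as github_issues_raw. This is only a sketch: the get_discussions and convert_discussions_to_documents helpers are assumed names on our custom GithubResource, used here for illustration.

# Hedged sketch: get_discussions and convert_discussions_to_documents are assumed
# helper methods on the custom GithubResource, named only for illustration.
@dg.asset(
    group_name="ingestion",
    kinds={"github"},
    partitions_def=weekly_partition,
    io_manager_key="document_io_manager",
    automation_condition=dg.AutomationCondition.on_cron("0 0 * * 1"),
)
def github_discussions_raw(
    context: dg.AssetExecutionContext,
    github: GithubResource,
) -> list[Document]:
    start, end = context.partition_time_window
    discussions = github.get_discussions(
        start_date=start.strftime("%Y-%m-%d"), end_date=end.strftime("%Y-%m-%d")
    )
    return github.convert_discussions_to_documents(discussions)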

Custom IO Manager

Looking at the code, you may have noticed the document_io_manager. Because LangChain Documents are a special object type, we need to do some work to serialize and deserialize the data. I/O managers are responsible for handling the inputs and outputs of assets and how the data is persisted. This I/O manager will use the local file system to save the output of assets that return Documents as JSON files, and it will read those JSON files back into Documents for the assets that take them as inputs:

class DocumentIOManager(dg.IOManager):
    def __init__(self, base_dir):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    def handle_output(self, context, obj):
        # Build the file path from the asset key
        file_path = os.path.join(self.base_dir, f"{context.asset_key.path[-1]}.json")

        # Convert documents to simple dicts
        serialized_docs = [
            {"page_content": doc.page_content, "metadata": doc.metadata} for doc in obj
        ]

        # Save as JSON
        with open(file_path, "w") as f:
            json.dump(serialized_docs, f)

    def load_input(self, context):
        file_path = os.path.join(self.base_dir, f"{context.asset_key.path[-1]}.json")

        if not os.path.exists(file_path):
            return []

        # Load and reconstruct Documents
        with open(file_path) as f:
            data = json.load(f)

        return [
            Document(page_content=doc["page_content"], metadata=doc["metadata"]) for doc in data
        ]
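To expose this class under the document_io_manager key, it can be wrapped in a configurable I/O manager factory. Here is a minimal sketch of one way to do that; the base_dir config field is taken from the .configured call in the Definitions below.

# Minimal sketch of a factory exposing DocumentIOManager as a configurable
# I/O manager definition. The config field (base_dir) matches the .configured
# call used in the Definitions.
@dg.io_manager(config_schema={"base_dir": str})
def document_io_manager(init_context):
    return DocumentIOManager(base_dir=init_context.resource_config["base_dir"])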

This I/O manager will be attached to the Definitions of the project, which also contains our assets and resources:

defs = dg.Definitions(
    assets=[*ingestion_assets, *retrieval_assets],
    resources={
        "github": github_resource,
        "scraper": scraper_resource,
        "pinecone": pinecone_resource,
        "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
        "document_io_manager": document_io_manager.configured({"base_dir": "data/documents"}),
    },
)
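If you are wondering where ingestion_assets and retrieval_assets come from, one possible way to build them is with dg.load_assets_from_modules; the module names here are placeholders for however the project is organized.

# Assumed project layout: ingestion and retrieval modules containing the assets.
from . import ingestion, retrieval

ingestion_assets = dg.load_assets_from_modules([ingestion])
retrieval_assets = dg.load_assets_from_modules([retrieval])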

Scraping embeddings

The assets for the documentation scraping will behave similarly to the GitHub assets. We will not partition by date as we did for GitHub, so we can leave that out of the asset. But like the GitHub assets, our ingestion asset will return a collection of Documents that will be handled by the I/O manager. This asset will also include an AutomationCondition to update data on the same cadence as our GitHub source.
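A minimal sketch of what that ingestion asset could look like is shown below; the resource class name ScraperResource, its scrape_site method, and the page.url and page.text attributes are assumptions for illustration.

# Hedged sketch: ScraperResource, scrape_site, and the page attributes are
# assumed names; the real scraper resource defined earlier may differ.
@dg.asset(
    group_name="ingestion",
    kinds={"scraper"},
    io_manager_key="document_io_manager",
    automation_condition=dg.AutomationCondition.on_cron("0 0 * * 1"),
)
def docs_scrape_raw(
    context: dg.AssetExecutionContext,
    scraper: ScraperResource,
) -> list[Document]:
    # No partition: the documentation site is scraped in full each run
    pages = scraper.scrape_site()
    return [
        Document(page_content=page.text, metadata={"source": page.url})
        for page in pages
    ]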

The asset that generates the embeddings for the documentation site needs one additional change. Because the content of the documentation pages is so large, we need to split the data into chunks. The split_text function ensures that we split the text into equal-length chunks. We also want to keep chunks associated with the page they came from, so each vector ID combines a hash of the page's source URL with the chunk index. Once the data is chunked, it can be batched and sent to Pinecone:

    # Create vectors with metadata
    vectors = []
    for i, embedding in enumerate(all_embeddings):
        doc_idx = chunk_to_doc[i]
        doc = docs_scrape_raw[doc_idx]
        meta = {k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))}
        meta["chunk_index"] = i
        doc_id = f"{hashlib.md5(doc.metadata['source'].encode()).hexdigest()}_{i}"
        vectors.append((doc_id, embedding, meta))

        # Upsert when batch is full or at end
        if len(vectors) >= PINECONE_BATCH_SIZE or i == len(all_embeddings) - 1:
            index.upsert(vectors=vectors, **namespace_kwargs)
            vectors = []
            time.sleep(1)
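The snippet above assumes all_embeddings and chunk_to_doc were built earlier in the asset body. A minimal sketch of that chunking step follows; the split_text signature and the CHUNK_SIZE and EMBEDDING_BATCH_SIZE constants are assumptions for illustration.

    # Hedged sketch of the chunking step; split_text's signature and the batch
    # size constants are assumptions, not the exact implementation.
    chunks: list[str] = []
    chunk_to_doc: dict[int, int] = {}  # maps chunk index -> index of its source document
    for doc_idx, doc in enumerate(docs_scrape_raw):
        for chunk in split_text(doc.page_content, CHUNK_SIZE):
            chunk_to_doc[len(chunks)] = doc_idx
            chunks.append(chunk)

    # Embed the chunks in batches with OpenAI
    all_embeddings = []
    with openai.get_client(context) as client:
        for start in range(0, len(chunks), EMBEDDING_BATCH_SIZE):
            batch = chunks[start : start + EMBEDDING_BATCH_SIZE]
            response = client.embeddings.create(model="text-embedding-3-small", input=batch)
            all_embeddings.extend(item.embedding for item in response.data)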

Dagster is now set to continuously ingest data from all of our configured sources and populate the Pinecone index. This completes the ingestion half of our RAG system. Next, we need to ensure we can pull relevant information from the index when answering questions, so we will add one final asset to query our system.

Next steps