Search code examples
python, amazon-s3, boto3, wandb

wandb: artifact.add_reference() option to add specific (not current) versionId or ETag to stop the need for re-upload to s3?


I feel like this should be possible, but I looked through the wandb SDK code and I can't find an easy/logical way to do it. It might be possible to hack it by modifying the manifest entries at some point later (but maybe before the artifact is logged to wandb as then the manifest and the entries might be locked)? I saw things like this in the SDK code:

version = manifest_entry.extra.get("versionID")
etag = manifest_entry.extra.get("etag")

So, I figure we can probably edit those?

UPDATE

So, I tried to hack it together with something like this and it works but it feels wrong:

import os
import wandb
import boto3
from wandb.util import md5_file

# wandb credentials/targets come from the environment.
ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")
API_KEY = os.environ.get("WANDB_API_KEY")

# BUG FIX: was `api = api = wandb.Api(overrides={"entity": ENTITY, "project": ENTITY})`
# -- a duplicated assignment, and the "project" override mistakenly passed ENTITY.
api = wandb.Api(overrides={"entity": ENTITY, "project": PROJECT})
run = wandb.init(entity=ENTITY, project=PROJECT, job_type="test upload")
file = "admin2Codes.txt"  # "admin1CodesASCII.txt" # (both already on s3 with a couple versions)
artifact = wandb.Artifact("test_data", type="dataset")

# modify one of the local files so it has a new md5hash etc.
with open(file, "a") as f:
    f.write("new_line_1\n")

# upload local file to s3
local_file_path = file
s3_url = f"s3://bucket/prefix/{file}"
# split "s3://bucket/prefix/name" into bucket and key parts
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])

s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
    local_file_path,
    s3_bucket,
    key,
    # save the md5_digest in metadata,
    # can be used later to only upload new files to s3,
    # as AWS doesn't digest the file consistently in the E-tag
    ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
head_response = s3_client.head_object(Bucket=s3_bucket, Key=key)
version_id: str = head_response["VersionId"]
print(version_id)

# upload a link/ref to this s3 object in wandb:
# BUG FIX: the original referenced an undefined name `s3_dir` (NameError);
# the reference target is the s3_url constructed above.
artifact.add_reference(s3_url)
# at this point we might be able to modify the artifact._manifest.entries and each entry.extra.get("etag") etc.?
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
# set these to an older version on s3 that we know we want (rather than latest) - do this via wandb public API:
dataset_v2 = api.artifact(f"{ENTITY}/{PROJECT}/test_data:v2", type="dataset")
# artifact._manifest.add_entry(dataset_v2.manifest.entries["admin1CodesASCII.txt"])
artifact._manifest.entries["admin1CodesASCII.txt"] = dataset_v2.manifest.entries[
    "admin1CodesASCII.txt"
]
# verify that it did change:
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])

run.log_artifact(artifact)  # at this point the manifest is locked I believe?
artifact.wait()  # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(artifact.name)
run_id = run.id
run.finish()
curr_run = api.run(f"{ENTITY}/{PROJECT}/{run_id}")
used_artifacts = curr_run.used_artifacts()
logged_artifacts = curr_run.logged_artifacts()

Am I on the right track here? I guess the other workaround is to make a copy on s3 (so that older version is the latest again) but I wanted to avoid this as the 1 file that I want to use an old version of is a large NLP model and the only files I want to change are small config.json files etc. (so seems very wasteful to upload all files again).

I was also wondering if when I copy an old version of an object back into the same key in the bucket if that creates a real copy or just like a pointer to the same underlying object. Neither boto3 nor AWS documentation makes that clear - although it seems like it is a proper copy.


Solution

  • I think I found the correct way to do it now:

    import os
    import re  # BUG FIX: `re.sub` is used below but `re` was never imported
    import wandb
    import boto3
    from wandb.util import md5_file
    
    ENTITY = os.environ.get("WANDB_ENTITY")
    PROJECT = os.environ.get("WANDB_PROJECT")
    
    
    def wandb_update_only_some_files_in_artifact(
        existing_artifact_name: str,
        new_s3_file_urls: list[str],
        entity: str = ENTITY,
        project: str = PROJECT,
    ) -> wandb.Artifact:  # BUG FIX: bare `Artifact` was not an imported name
        """If you want to just update a config.json file for example,
        but the rest of the artifact can remain the same, then you can
        use this function like so:
        wandb_update_only_some_files_in_artifact(
            "old_artifact:v3",
            ["s3://bucket/prefix/config.json"],
        )
        and then all the other files like model.bin will be the same as in v3,
        even if there was a v4 or v5 in between (as the v3 VersionIds are used)
    
        Args:
            existing_artifact_name (str): name with version like "old_artifact:v3"
            new_s3_file_urls (list[str]): files that should be updated
            entity (str, optional): wandb entity. Defaults to ENTITY.
            project (str, optional): wandb project. Defaults to PROJECT.
    
        Returns:
            Artifact: the new artifact object
        """
        api = wandb.Api(overrides={"entity": entity, "project": project})
        old_artifact = api.artifact(existing_artifact_name)
        # strip the trailing ":vN" version suffix to get the bare artifact name
        old_artifact_name = re.sub(r":v\d+$", "", old_artifact.name)
        with wandb.init(entity=entity, project=project) as run:
            new_artifact = wandb.Artifact(old_artifact_name, type=old_artifact.type)
    
            s3_file_names = [s3_url.split("/")[-1] for s3_url in new_s3_file_urls]
            # add the new ones:
            for s3_url, filename in zip(new_s3_file_urls, s3_file_names):
                new_artifact.add_reference(s3_url, filename)
            # add the old ones (carrying over the pinned VersionId/ETag from the
            # old artifact's manifest entries):
            for filename, entry in old_artifact.manifest.entries.items():
                if filename in s3_file_names:
                    continue
                new_artifact.add_reference(entry, filename)
                # this also works but feels hackier:
                # new_artifact._manifest.entries[filename] = entry
    
            run.log_artifact(new_artifact)
            new_artifact.wait()  # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
            print(new_artifact.name)
            print(run.id)
        return new_artifact
    
    
    # usage:
    local_file_path = "config.json" # modified file
    s3_url = "s3://bucket/prefix/config.json"
    s3_url_arr = s3_url.replace("s3://", "").split("/")
    s3_bucket = s3_url_arr[0]
    key = "/".join(s3_url_arr[1:])
    
    s3_client = boto3.client("s3")
    file_digest = md5_file(local_file_path)
    s3_client.upload_file(
        local_file_path,
        s3_bucket,
        key,
        # save the md5_digest in metadata,
        # can be used later to only upload new files to s3,
        # as AWS doesn't digest the file consistently in the E-tag
        ExtraArgs={"Metadata": {"md5_digest": file_digest}},
    )
    
    wandb_update_only_some_files_in_artifact(
        "old_artifact:v3",
        ["s3://bucket/prefix/config.json"],
    )