How to avoid mid-write of a BLOB in GCS? #790

Open
findinpath opened this issue May 20, 2022 · 1 comment

findinpath commented May 20, 2022

Let's take the following code snippet as context:

    GoogleHadoopFileSystem ghfs = ...; // initialized elsewhere
    try (FSDataOutputStream fs = ghfs.create(new Path("gs://tiny-world/tiny/lines.txt"), false)) {
        fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
        if (true) throw new IOException("Unexpected I/O"); // simulates a failure mid-write
        fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
    }

When the FSDataOutputStream is closed (here implicitly, by the try-with-resources block), the file lines.txt will appear on GCS containing only the content:

first line

I'm looking for a way to make a file visible on GCS only once it has been fully written, and not visible at all when the write was interrupted mid-way.

One possibility would be to close the stream if and only if ALL the content of the file has been written successfully:

    FSDataOutputStream fs = ghfs.create(new Path("gs://tiny-world/tiny/lines.txt"), false);
    fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
    fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
    fs.close();

In that case, however, if an exception occurs while writing the file, the stream remains unclosed and the application "leaks" resources.
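
One partial workaround, sketched below with plain Hadoop FileSystem calls (this is not an atomic primitive offered by the connector), is to always close the stream and make a best-effort delete of the partial object on failure; it avoids the resource leak, but the partial blob is still briefly visible and can survive a crash between close and delete:

    // Sketch only: close the stream in all cases and delete the partially written
    // object when the write failed. The partial blob still appears on GCS for a
    // short window (and stays there if the process dies before the delete).
    Path path = new Path("gs://tiny-world/tiny/lines.txt");
    boolean success = false;
    try (FSDataOutputStream fs = ghfs.create(path, false)) {
        fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
        fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
        success = true;
    } finally {
        if (!success) {
            ghfs.delete(path, false); // best-effort cleanup of the partial blob
        }
    }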

Is there a recipe for writing a BLOB on GCS with the following constraints? (A sketch of the intended semantics follows the list.)

  • the destination BLOB must not already exist on GCS; if a concurrent process later tries to write to the same path, it fails
  • the BLOB becomes visible on GCS if and only if it has been fully written
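
For illustration only, here is a minimal sketch of those semantics using the separate google-cloud-storage client library (com.google.cloud.storage.Storage, a different class from the com.google.api.services.storage.Storage used inside the connector); Storage.BlobTargetOption.doesNotExist() translates to an if-generation-match=0 precondition, and the object only becomes visible once create() has completed:

    // Minimal sketch, assuming the google-cloud-storage client library is available;
    // it only illustrates the intended semantics and does not go through the connector.
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("tiny-world", "tiny/lines.txt")).build();
    byte[] content = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
    try {
        // doesNotExist() fails the upload if any live version of the object exists,
        // and the object only appears once the single create() request has finished.
        storage.create(blobInfo, content, Storage.BlobTargetOption.doesNotExist());
    } catch (StorageException e) {
        if (e.getCode() == 412) {
            // precondition failed: another writer created the object first
        }
    }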
findinpath (Author) commented

I investigated the hadoop-connectors project code and opted to use reflection to get access to com.google.api.services.storage.Storage from the GoogleHadoopFileSystem:

    GoogleCloudStorage googleCloudStorage =  ghfs.getGcsFs().getGcs();
    Field gcsField = googleCloudStorage.getClass().getDeclaredField("gcs");
    gcsField.setAccessible(true);
    Storage gcs = (Storage) gcsField.get(googleCloudStorage);

I made my own version of the Insert call based on the template found in com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel#createRequest, and this solved my specific problem:

private static StorageObject createBlob(URI blobPath, byte[] content, GoogleHadoopFileSystem ghfs, Storage gcs)
        throws IOException
{
    CreateFileOptions createFileOptions = new CreateFileOptions(false);
    CreateObjectOptions createObjectOptions = objectOptionsFromFileOptions(createFileOptions);
    PathCodec pathCodec = ghfs.getGcsFs().getOptions().getPathCodec();
    StorageResourceId storageResourceId = pathCodec.validatePathAndGetId(blobPath, false);

    StorageObject object =
            new StorageObject()
                    .setContentEncoding(createObjectOptions.getContentEncoding())
                    .setMetadata(encodeMetadata(createObjectOptions.getMetadata()))
                    .setName(storageResourceId.getObjectName());

    InputStream inputStream = new ByteArrayInputStream(content, 0, content.length);
    Storage.Objects.Insert insert = gcs.objects().insert(
            storageResourceId.getBucketName(),
            object,
            new InputStreamContent(createObjectOptions.getContentType(), inputStream));
    // The operation succeeds only if there are no live versions of the blob. 
    insert.setIfGenerationMatch(0L);
    insert.getMediaHttpUploader().setDirectUploadEnabled(true);
    insert.setName(storageResourceId.getObjectName());
    return insert.execute();
}

/**
 * Helper for converting from a Map<String, byte[]> metadata map that may be in a
 * StorageObject into a Map<String, String> suitable for placement inside a
 * GoogleCloudStorageItemInfo.
 */
@VisibleForTesting
static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
    return Maps.transformValues(metadata, QuickstartParallelApiWriteExample::encodeMetadataValues);
}

// A function to encode metadata map values
private static String encodeMetadataValues(byte[] bytes) {
    return bytes == null ? Data.NULL_STRING : BaseEncoding.base64().encode(bytes);
}
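
For completeness, a usage sketch (hypothetical, assuming ghfs and gcs were obtained as above); a concurrent writer racing on the same path fails with HTTP 412 Precondition Failed because of the ifGenerationMatch(0) precondition:

    // Usage sketch (hypothetical): combine the reflection hack with createBlob.
    byte[] content = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
    try {
        createBlob(URI.create("gs://tiny-world/tiny/lines.txt"), content, ghfs, gcs);
    } catch (GoogleJsonResponseException e) {
        if (e.getStatusCode() == 412) {
            // ifGenerationMatch(0) failed: the object already exists,
            // i.e. a concurrent writer created it first
        }
    }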

Would it be possible to expose the gcs field through a getter in GoogleCloudStorageImpl to avoid the rather hacky reflection trick?
