Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log bulk mutation entry errors #3198

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,10 +21,13 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.common.base.MoreObjects;
import com.google.rpc.Status;
import io.grpc.Status.Code;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
* Performs retries for {@link BigtableDataClient#mutateRows(MutateRowsRequest)} operations.
Expand Down Expand Up @@ -194,4 +197,51 @@ public MutateRowsResponse buildResponse() {
}
return MutateRowsResponse.newBuilder().addAllEntries(entries).build();
}

public String getResultString() {
// 2D map: code -> msg -> count
Map<Integer, Map<String, Long>> resultCounts = new TreeMap<>();

for (Status result : results) {
if (result == null) {
result = STATUS_INTERNAL;
}
Map<String, Long> msgCounts = resultCounts.get(result.getCode());
if (msgCounts == null) {
msgCounts = new TreeMap<>();
resultCounts.put(result.getCode(), msgCounts);
}

String msg = MoreObjects.firstNonNull(result.getMessage(), "");
Long count = MoreObjects.firstNonNull(msgCounts.get(msg), 0L);
msgCounts.put(msg, count + 1);
}

// Format string as: code: msg(count), msg2(count); code2: msg(count);
StringBuilder buffer = new StringBuilder();
boolean isFirstCode = true;
for (Map.Entry<Integer, Map<String, Long>> codeEntry : resultCounts.entrySet()) {
if (!isFirstCode) {
buffer.append("; ");
} else {
isFirstCode = false;
}

buffer.append(io.grpc.Status.fromCodeValue(codeEntry.getKey()).getCode());
buffer.append(": ");

boolean isFirstMsg = true;
for (Map.Entry<String, Long> msgEntry : codeEntry.getValue().entrySet()) {
if (!isFirstMsg) {
buffer.append(", ");
} else {
isFirstMsg = false;
}
buffer.append(msgEntry.getKey());
buffer.append("(" + msgEntry.getValue() + ")");
}
}

return buffer.toString();
}
}
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.cloud.bigtable.config.Logger;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.cloud.bigtable.grpc.DeadlineGenerator;
Expand All @@ -39,6 +40,8 @@
public class RetryingMutateRowsOperation
extends AbstractRetryingOperation<
MutateRowsRequest, MutateRowsResponse, List<MutateRowsResponse>> {
protected static final Logger LOG = new Logger(RetryingMutateRowsOperation.class);

private static final io.grpc.Status INVALID_RESPONSE =
io.grpc.Status.INTERNAL.withDescription("The server returned an invalid response");

Expand Down Expand Up @@ -86,6 +89,8 @@ protected boolean onOK(Metadata trailers) {
ProcessingStatus status = requestManager.onOK();

if (status == ProcessingStatus.INVALID) {
LOG.error("BulkMutateRows was invalid, final state: " + requestManager.getResultString());

// Set an exception.
onError(INVALID_RESPONSE, trailers);
return true;
Expand All @@ -95,6 +100,12 @@ protected boolean onOK(Metadata trailers) {
if (status == ProcessingStatus.SUCCESS || status == ProcessingStatus.NOT_RETRYABLE) {
// Set the response, with either success, or non-retryable responses.
completionFuture.set(Arrays.asList(requestManager.buildResponse()));

if (status != ProcessingStatus.SUCCESS) {
LOG.error(
"BulkMutateRows partially failed with nonretryable errors, final state: "
+ requestManager.getResultString());
}
return true;
}

Expand All @@ -110,6 +121,9 @@ protected boolean onOK(Metadata trailers) {
"failureCount",
AttributeValue.longAttributeValue(
requestManager.getRetryRequest().getEntriesCount())));

LOG.error(
"BulkMutateRows exhausted retries, final state: " + requestManager.getResultString());
return true;
}

Expand Down
Expand Up @@ -28,6 +28,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -227,4 +229,33 @@ public void testInvalid() {
new MutateRowsRequestManager(retryOptions, createRequest(3));
Assert.assertEquals(ProcessingStatus.INVALID, send(underTest, createResponse(OK, OK)));
}

@Test
public void testMessage() {
MutateRowsRequestManager underTest =
new MutateRowsRequestManager(retryOptions, createRequest(4));
send(
underTest,
createResponse(
Status.newBuilder().setCode(io.grpc.Status.Code.OK.value()).build(),
Status.newBuilder()
.setCode(io.grpc.Status.Code.DEADLINE_EXCEEDED.value())
.setMessage("deadline exceeded custom message")
.build(),
Status.newBuilder()
.setCode(io.grpc.Status.Code.DEADLINE_EXCEEDED.value())
.setMessage("deadline exceeded custom message")
.build(),
Status.newBuilder()
.setCode(io.grpc.Status.Code.INVALID_ARGUMENT.value())
.setMessage("invalid arg custom message")
.build()));
String resultString = underTest.getResultString();
MatcherAssert.assertThat(resultString, Matchers.containsString("OK: (1)"));
MatcherAssert.assertThat(
resultString,
Matchers.containsString("DEADLINE_EXCEEDED: deadline exceeded custom message(2)"));
MatcherAssert.assertThat(
resultString, Matchers.containsString("INVALID_ARGUMENT: invalid arg custom message(1)"));
}
}