Skip to content

Commit

Permalink
fix: Add paging to hbase client (#4166)
Browse files Browse the repository at this point in the history
* Added paging to hbase client

* minor fixes

* minor fixes

* Fix tests

* Fix tests

* Fix lint

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* Brought back test and fixed page size handling

* fixed test

* add tests

* minor refactor

* minor refactor

* add error handling tests

* fix format

* Add protection against OOM exceptions

* Add protection against OOM exceptions

* Remove useless assertion

* handle setCaching properly

* handle setCaching properly

* handle setCaching properly

* handle setCaching properly

* handle setCaching properly

* handle setCaching properly

* remove useless code

* Get page size directly from the paginator

* fix lint

* cancel serverStream when reaching memory limit

* add test for low memory

* add test for low memory

* fix lint

* update java-bigtable dependency

* Fixed several PR comments

* Fixed several PR comments

* Fixed several PR comments

* Moved to async API

* Fixed several PR comments

* Fixed several PR comments

* Fixed several PR comments

* Fixed several PR comments

* Fixed several PR comments

* Fixed several PR comments

* Fixed according to PR comments

* Fix wrong advance

* Fixed according to PR

* Fixed according to PR

* Fixed according to PR

* remove test

* adjust tests according to PR

* adjust tests according to PR

* fix bug found on beam
  • Loading branch information
ron-gal committed Apr 15, 2024
1 parent a7240cf commit 33facf5
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,33 @@ public void testGetScannerNoQualifiers() throws IOException {
}

@Test
public void test100ResultsInScanner() throws IOException {
public void testManyResultsInScanner_lessThanPageSize() throws IOException {
testManyResultsInScanner(95, true);
}

// Row count exactly equal to the page size (100) with pagination enabled via setCaching(100).
@Test
public void testManyResultsInScanner_equalToPageSize() throws IOException {
testManyResultsInScanner(100, true);
}

// Row count slightly over one page (105 rows, page size 100): exercises a partial second page.
@Test
public void testManyResultsInScanner_greaterThanPageSize() throws IOException {
testManyResultsInScanner(105, true);
}

// Row count spanning more than two pages (205 rows, page size 100): exercises repeated paging.
@Test
public void testManyResultsInScanner_greaterThanTwoPageSizes() throws IOException {
testManyResultsInScanner(205, true);
}

// Same 100-row workload but with pagination disabled (setCaching is not called),
// so the scan goes through the non-paginated readRows path.
@Test
public void testManyResultsInScanner_onePageSizeNoPagination() throws IOException {
testManyResultsInScanner(100, false);
}

private void testManyResultsInScanner(int rowsToWrite, boolean withPagination)
throws IOException {
String prefix = "scan_row_";
int rowsToWrite = 100;

// Initialize variables
Table table = getDefaultTable();
Expand Down Expand Up @@ -208,9 +232,13 @@ public void test100ResultsInScanner() throws IOException {

Scan scan = new Scan();
scan.withStartRow(rowKeys[0])
.withStopRow(rowFollowing(rowKeys[rowsToWrite - 1]))
.withStopRow(rowFollowingSameLength(rowKeys[rowsToWrite - 1]))
.addFamily(COLUMN_FAMILY);

if (withPagination) {
scan = scan.setCaching(100);
}

try (ResultScanner resultScanner = table.getScanner(scan)) {
for (int rowIndex = 0; rowIndex < rowsToWrite; rowIndex++) {
Result result = resultScanner.next();
Expand Down Expand Up @@ -275,7 +303,7 @@ public void testScanDelete() throws IOException {

Scan scan = new Scan();
scan.withStartRow(rowKeys[0])
.withStopRow(rowFollowing(rowKeys[rowsToWrite - 1]))
.withStopRow(rowFollowingSameLength(rowKeys[rowsToWrite - 1]))
.addFamily(COLUMN_FAMILY);
int deleteCount = 0;
try (ResultScanner resultScanner = table.getScanner(scan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.hbase.adapters.Adapters;
Expand Down Expand Up @@ -92,6 +93,14 @@ public abstract class AbstractBigtableTable implements Table {

private static final Tracer TRACER = Tracing.getTracer();

// Floor for the paginated-scan byte cap: 100 MiB.
private static final int MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024;
// Fraction of the JVM's total memory permitted for buffered scan results.
private static final double DEFAULT_BYTE_LIMIT_PERCENTAGE = .1;
// Byte cap handed to the paginated scanner: 10% of Runtime.totalMemory(), never below 100 MiB.
// NOTE(review): totalMemory() is the current heap, not -Xmx; presumably intentional — confirm.
private static final long DEFAULT_MAX_SEGMENT_SIZE =
(long)
Math.max(
MIN_BYTE_BUFFER_SIZE,
(Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE));

private static class TableMetrics {
Timer putTimer = BigtableClientMetrics.timer(MetricLevel.Info, "table.put.latency");
Timer getTimer = BigtableClientMetrics.timer(MetricLevel.Info, "table.get.latency");
Expand Down Expand Up @@ -295,8 +304,14 @@ public ResultScanner getScanner(final Scan scan) throws IOException {
LOG.trace("getScanner(Scan)");
Span span = TRACER.spanBuilder("BigtableTable.scan").startSpan();
try (Scope scope = TRACER.withSpan(span)) {

final ResultScanner scanner = clientWrapper.readRows(hbaseAdapter.adapt(scan));
ResultScanner scanner;
if (scan.getCaching() == -1) {
scanner = clientWrapper.readRows(hbaseAdapter.adapt(scan));
} else {
Query.QueryPaginator paginator =
hbaseAdapter.adapt(scan).createPaginator(scan.getCaching());
scanner = clientWrapper.readRows(paginator, DEFAULT_MAX_SEGMENT_SIZE);
}
if (hasWhileMatchFilter(scan.getFilter())) {
return Adapters.BIGTABLE_WHILE_MATCH_RESULT_RESULT_SCAN_ADAPTER.adapt(scanner, span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,10 @@ ApiFuture<Result> readRowAsync(

@Override
void close() throws IOException;

/**
 * Perform a scan over {@link Result}s, in key order, fetching rows one page at a time through the
 * supplied paginator.
 *
 * <p>maxSegmentByteSize caps the bytes buffered for a fetched segment; it is exposed as a
 * parameter for testing purposes only.
 */
ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
Expand All @@ -43,12 +44,18 @@
import com.google.cloud.bigtable.metrics.Timer;
import com.google.cloud.bigtable.metrics.Timer.Context;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
Expand Down Expand Up @@ -134,6 +141,12 @@ public Result apply(Row row) {
MoreExecutors.directExecutor());
}

/**
 * Paged scan: wraps the paginator in a {@link PaginatedRowResultScanner} that issues one RPC per
 * page and caps buffered bytes at maxSegmentByteSize.
 */
@Override
public ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize) {
return new PaginatedRowResultScanner(
paginator, delegate, maxSegmentByteSize, this.createScanCallContext());
}

@Override
public ResultScanner readRows(Query request) {
return new RowResultScanner(
Expand Down Expand Up @@ -228,6 +241,151 @@ protected void onCompleteImpl() {
}
}

/**
 * Wraps paginated reads as an HBase {@link ResultScanner}. {@link PaginatedRowResultScanner} uses
 * a {@link Query.QueryPaginator} to issue one API call per page of rows and buffers the results.
 * The {@link Query.QueryPaginator} pageSize property indicates the size of each page in every API
 * call. A cache of a maximum size of 1.1*pageSize and a minimum of 0.1*pageSize is held at all
 * times. In order to avoid OOM exceptions, there is a limit for the total byte size held in
 * cache; a fetch is cancelled once that limit is exceeded.
 */
static class PaginatedRowResultScanner extends AbstractClientScanner {
  // Percentage of max number of rows allowed in the buffer; a refill is triggered when the
  // buffer drops below pageSize * WATERMARK_PERCENTAGE rows.
  private static final double WATERMARK_PERCENTAGE = .1;
  private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter();

  private final Meter scannerResultMeter =
      BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "scanner.results");
  private final Timer scannerResultTimer =
      BigtableClientMetrics.timer(
          BigtableClientMetrics.MetricLevel.Debug, "scanner.results.latency");

  // Key of the last row received from the server; used to advance the paginator.
  private ByteString lastSeenRowKey = ByteString.EMPTY;
  // True until the paginator reports that no further pages remain.
  private boolean hasMore = true;
  // Rows fetched from the server but not yet handed to the caller.
  private final Queue<Result> buffer;
  private final Query.QueryPaginator paginator;
  // Buffer size (in rows) below which the next page fetch is started; always at least 1.
  private final int refillSegmentWaterMark;

  private final BigtableDataClient dataClient;

  // Hard cap on bytes accumulated by an in-flight fetch; the stream is cancelled beyond it.
  private final long maxSegmentByteSize;

  // Bytes currently accounted for in the buffer and the in-flight fetch.
  private long currentByteSize = 0;

  // In-flight page fetch, or null when no fetch is outstanding.
  private @Nullable Future<List<Result>> future;
  private final GrpcCallContext scanCallContext;

  PaginatedRowResultScanner(
      Query.QueryPaginator paginator,
      BigtableDataClient dataClient,
      long maxSegmentByteSize,
      GrpcCallContext scanCallContext) {
    this.maxSegmentByteSize = maxSegmentByteSize;

    this.paginator = paginator;
    this.dataClient = dataClient;
    this.buffer = new ArrayDeque<>();
    this.refillSegmentWaterMark =
        (int) Math.max(1, paginator.getPageSize() * WATERMARK_PERCENTAGE);
    this.scanCallContext = scanCallContext;
    // Eagerly kick off the first page fetch.
    this.future = fetchNextSegment();
  }

  /**
   * Returns the next row in key order, or null when the scan is exhausted. Transparently refills
   * the buffer from the next page when it runs low.
   */
  @Override
  public Result next() {
    try (Context ignored = scannerResultTimer.time()) {
      // Fold a completed fetch into the buffer without blocking.
      if (this.future != null && this.future.isDone()) {
        this.consumeReadRowsFuture();
      }
      // Proactively start the next fetch once the buffer drops below the watermark.
      if (this.buffer.size() < this.refillSegmentWaterMark && this.future == null && hasMore) {
        future = fetchNextSegment();
      }
      // Out of buffered rows: block on the outstanding fetch.
      if (this.buffer.isEmpty() && this.future != null) {
        this.consumeReadRowsFuture();
      }
      Result result = this.buffer.poll();
      if (result != null) {
        scannerResultMeter.mark();
        currentByteSize -= Result.getTotalSizeOfCells(result);
      }
      return result;
    }
  }

  @Override
  public void close() {
    // Cancel any in-flight page fetch; already-buffered rows are simply dropped.
    if (this.future != null) {
      this.future.cancel(true);
    }
  }

  public boolean renewLease() {
    // Each page is a fresh query, so there is no server-side scanner lease to renew.
    return true;
  }

  /** Starts an async read of the next page and returns a future for that page's rows. */
  private Future<List<Result>> fetchNextSegment() {
    SettableFuture<List<Result>> resultsFuture = SettableFuture.create();

    dataClient
        .readRowsCallable(RESULT_ADAPTER)
        .call(
            paginator.getNextQuery(),
            new ResponseObserver<Result>() {
              private StreamController controller;
              List<Result> results = new ArrayList<>();

              @Override
              public void onStart(StreamController controller) {
                this.controller = controller;
              }

              @Override
              public void onResponse(Result result) {
                // Account for the size of the response before buffering it.
                currentByteSize += Result.getTotalSizeOfCells(result);
                results.add(result);
                if (result.rawCells() != null) {
                  lastSeenRowKey = RESULT_ADAPTER.getKey(result);
                }
                // Stop the stream once the memory cap is reached; the cancellation error is
                // converted into a normal completion in onError below.
                if (currentByteSize > maxSegmentByteSize) {
                  controller.cancel();
                }
              }

              @Override
              public void onError(Throwable t) {
                if (currentByteSize > maxSegmentByteSize) {
                  // The error was triggered by our own cancel(); deliver the partial page.
                  onComplete();
                } else {
                  resultsFuture.setException(t);
                }
              }

              @Override
              public void onComplete() {
                resultsFuture.set(results);
              }
            },
            this.scanCallContext);
    return resultsFuture;
  }

  /** Blocks on the outstanding fetch, moves its rows into the buffer, and advances the paginator. */
  private void consumeReadRowsFuture() {
    try {
      List<Result> results = this.future.get();
      this.buffer.addAll(results);
      this.hasMore = this.paginator.advance(this.lastSeenRowKey);
      this.future = null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      // NOTE(review): fetch failures are swallowed here, so the scan silently ends early.
      // Confirm callers are expected to tolerate truncation rather than receive an exception.
    }
  }
}

/** wraps {@link ServerStream} onto HBase {@link ResultScanner}. */
private static class RowResultScanner extends AbstractClientScanner {

Expand Down Expand Up @@ -264,7 +422,7 @@ public void close() {
}

public boolean renewLease() {
throw new UnsupportedOperationException("renewLease");
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,9 @@ public void close() throws IOException {
delegate.close();
owner.release(key);
}

/** Paged scan variant — forwarded unchanged to the wrapped client. */
@Override
public ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize) {
return delegate.readRows(paginator, maxSegmentByteSize);
}
}

0 comments on commit 33facf5

Please sign in to comment.