public abstract class AbstractQueuedStreamView extends AbstractContextStreamView<org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext>
A read queue is a priority queue where addresses can be inserted, and are dequeued in ascending order. Subclasses implement the fillReadQueue() function, which defines how the read queue should be filled, and the read() function, which reads an entry and updates the pointers for the stream view.
The addresses in the read queue must be global addresses.
This implementation does not handle bulk reads and depends on IStreamView's implementation of remainingUpTo(), which simply calls nextUpTo() under a lock until it returns null.
Created by mwei on 1/6/17.
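The read-queue behaviour described above can be sketched with a `NavigableSet<Long>`, the same collection type that appears in the `discoverAddressSpace` and `removeFromQueue` signatures. This is only an illustration: the class `ReadQueueSketch`, the method `drainUpTo`, and the choice to treat `maxGlobal` as an inclusive bound are assumptions, not CorfuDB code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical illustration of a read queue; not part of CorfuDB. */
public class ReadQueueSketch {

    /**
     * Dequeues global addresses in ascending order, stopping once the smallest
     * remaining address exceeds maxGlobal (assumed inclusive here).
     */
    public static List<Long> drainUpTo(NavigableSet<Long> readQueue, long maxGlobal) {
        List<Long> order = new ArrayList<>();
        while (!readQueue.isEmpty() && readQueue.first() <= maxGlobal) {
            order.add(readQueue.pollFirst()); // pollFirst removes the lowest address
        }
        return order;
    }

    public static void main(String[] args) {
        NavigableSet<Long> readQueue = new TreeSet<>();
        // Addresses may be inserted in any order.
        readQueue.add(42L);
        readQueue.add(7L);
        readQueue.add(19L);
        readQueue.add(100L);

        System.out.println(drainUpTo(readQueue, 50L)); // [7, 19, 42]
        System.out.println(readQueue);                 // [100]
    }
}
```

A sorted set gives both ascending dequeue order and de-duplication of addresses that are discovered more than once.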
Modifier and Type | Class and Description |
---|---|
protected static class | AbstractQueuedStreamView.BackpointerOp |

Nested classes/interfaces inherited from interface IStreamView: IStreamView.SearchDirection

Constructor and Description |
---|
AbstractQueuedStreamView(CorfuRuntime runtime, UUID streamId): Create a new queued stream view. |

Modifier and Type | Method and Description |
---|---|
protected void | addToResolvedQueue(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, long globalAddress, ILogData ld): Add the given address to the resolved queue of the given context. |
long | append(Object object, Function<TokenResponse,Boolean> acquisitionCallback, Function<TokenResponse,Boolean> deacquisitionCallback): Append an object to the stream, returning the global address it was written at. |
void | close() |
ILogData | current(): Retrieve the current entry in the stream, which was the entry previously returned by a call to next() or previous(). |
protected abstract boolean | discoverAddressSpace(UUID streamId, NavigableSet<Long> queue, long startAddress, long stopAddress, Function<ILogData,AbstractQueuedStreamView.BackpointerOp> filter, boolean checkpoint, long maxGlobal): Defines the strategy to discover addresses belonging to this stream. |
protected boolean | fillFromResolved(long maxGlobal, org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context) |
protected boolean | fillReadQueue(long maxGlobal, org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context): Fill the read queue for the current context. |
long | find(long globalAddress, IStreamView.SearchDirection direction): Find the global address of the next entry in this stream, in the direction given. |
void | gc(long trimMark): Garbage collect all the trimmed entries on this stream. |
long | getCurrentGlobalPosition(): Get the current position of the pointer in this stream (global address). |
boolean | getHasNext(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context): Return whether calling getNextEntry() may return more entries, given the context. |
protected List<ILogData> | getNextEntries(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, long maxGlobal, Function<ILogData,Boolean> contextCheckFn): Retrieve the next entries in the stream, given the context. |
protected ILogData | getNextEntry(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, long maxGlobal): Retrieve the next entry in the stream, given the context. |
ILogData | previous(): Retrieve the previous entry in the stream. |
protected ILogData | read(long address) |
protected ILogData | read(long address, long readStartTime): Reads data from an address in the address space. |
protected List<ILogData> | readAll(List<Long> addresses) |
protected ILogData | readRange(long address, List<Long> addresses) |
protected abstract ILogData | removeFromQueue(NavigableSet<Long> queue): Remove next entry from the queue. |
protected AbstractQueuedStreamView.BackpointerOp | resolveCheckpoint(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, ILogData data, long maxGlobal) |

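Methods inherited from class AbstractContextStreamView: doesEntryUpdateContext, getCurrentContext, hasNext, nextUpTo, popContext, processEntryForContext, pushNewContext, remainingUpTo, reset, seek, toString
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface IStreamView: append, getId, getTotalUpdates, next, remaining, spliterator, spliteratorUpTo, stream, streamUpTo
Methods inherited from interface java.util.Iterator: forEachRemaining, remove
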
public AbstractQueuedStreamView(CorfuRuntime runtime, UUID streamId)
streamId - The ID of the stream.
runtime - The runtime used to create this view.

protected void addToResolvedQueue(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, long globalAddress, ILogData ld)
context - The context to add the address to.
globalAddress - The resolved global address.

protected ILogData getNextEntry(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, long maxGlobal)
getNextEntry in class AbstractContextStreamView<org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext>
context - The context to retrieve the next entry from.
maxGlobal - The maximum global address to read to.

protected abstract ILogData removeFromQueue(NavigableSet<Long> queue)
queue - queue of entries.

public void gc(long trimMark)
gc in interface IStreamView
trimMark - start of the active log.

public long append(Object object, Function<TokenResponse,Boolean> acquisitionCallback, Function<TokenResponse,Boolean> deacquisitionCallback)
Optionally, provide a method to be called when an address is acquired, and also a method to be called when an address is released (due to an unsuccessful append).
We loop forever trying to write, automatically retrying whenever our write is overwritten (hole filled).
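The retry loop and the two callbacks might fit together roughly as follows. This is a sketch under stated assumptions, not CorfuDB's implementation: `AppendRetrySketch`, its `Token` class (a stand-in for `TokenResponse`), the in-memory `log` map, the `fillHole` helper, and the `-1` return value used when a callback declines are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of an append-with-retry loop; not CorfuDB's code. */
public class AppendRetrySketch {

    /** Stand-in for TokenResponse: carries only the acquired global address. */
    public static final class Token {
        public final long address;
        Token(long address) { this.address = address; }
    }

    private final Map<Long, Object> log = new HashMap<>();
    private long nextAddress = 0;

    /** Simulates another client hole-filling an address before we write to it. */
    public void fillHole(long address) {
        log.put(address, "HOLE");
    }

    /** Returns the address written, or -1 (an assumption) if a callback declines. */
    public long append(Object object,
                       Function<Token, Boolean> acquisitionCallback,
                       Function<Token, Boolean> deacquisitionCallback) {
        while (true) {
            Token token = new Token(nextAddress++); // acquire an address
            if (acquisitionCallback != null && !acquisitionCallback.apply(token)) {
                return -1L; // caller chose not to continue with the append
            }
            if (log.putIfAbsent(token.address, object) == null) {
                return token.address; // write succeeded
            }
            // Overwritten (hole filled): the address is released; ask whether to retry.
            if (deacquisitionCallback != null && !deacquisitionCallback.apply(token)) {
                return -1L;
            }
        }
    }
}
```

With address 0 already hole-filled, a call such as `append("x", t -> true, t -> true)` would release address 0 and succeed at address 1.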
object - The object to append.
acquisitionCallback - A function to call when an address is acquired. It should return true to continue with the append.
deacquisitionCallback - A function to call when an address is released. It should return true to retry writing.

protected ILogData read(long address, long readStartTime)
address - address to read.
readStartTime - start time of the range of reads.

protected List<ILogData> getNextEntries(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, long maxGlobal, Function<ILogData,Boolean> contextCheckFn)
This function is designed to implement a bulk read. In a bulk read, one of the entries may cause the context to change - the implementation should check if the entry changes the context and stop reading if this occurs, returning the entry that caused contextCheckFn to return true.
The default implementation simply calls getNextEntry.
In the queued implementation, we just read all entries in the read queue in parallel. If there is any entry which changes the context, we cut the list off there.
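The cut-off behaviour can be illustrated with a small generic helper. It is a sketch only: `BulkReadSketch`, `readUntilContextChange`, and the `reader` function are hypothetical names, and including the context-changing entry as the last element reflects one reading of the description above rather than confirmed CorfuDB behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch of a bulk read that stops at a context change. */
public class BulkReadSketch {

    public static <E> List<E> readUntilContextChange(List<Long> addresses,
                                                     Function<Long, E> reader,
                                                     Function<E, Boolean> contextCheckFn) {
        // Read all queued addresses in parallel; collecting to a list keeps
        // the entries in the same order as the input addresses.
        List<E> entries = addresses.parallelStream()
                .map(reader)
                .collect(Collectors.toList());

        // Keep entries up to and including the first one that changes the context.
        List<E> result = new ArrayList<>();
        for (E entry : entries) {
            result.add(entry);
            if (contextCheckFn.apply(entry)) {
                break;
            }
        }
        return result;
    }
}
```

Reading everything first and trimming afterwards trades some wasted reads for parallelism; entries past the cut-off are simply discarded in this sketch.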
getNextEntries in class AbstractContextStreamView<org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext>
context - The context to retrieve the next entry from.
maxGlobal - The maximum global address to read to.
contextCheckFn - A function which returns true if the entry changes the stream context.

protected boolean fillReadQueue(long maxGlobal, org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context)
This method returns true if entries were added to the read queue, false otherwise.
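The return contract can be sketched as below. Everything here is illustrative: `FillReadQueueSketch`, the `streamAddresses` set of already-discovered addresses, and the `globalPointer` parameter are assumptions standing in for state that the real method would take from its context.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical sketch of the fillReadQueue return contract. */
public class FillReadQueueSketch {

    /**
     * Copies known stream addresses in the range (globalPointer, maxGlobal]
     * into the read queue and reports whether the queue gained any entries.
     */
    public static boolean fillReadQueue(NavigableSet<Long> readQueue,
                                        NavigableSet<Long> streamAddresses,
                                        long globalPointer,
                                        long maxGlobal) {
        if (maxGlobal <= globalPointer) {
            return false; // nothing new can lie in an empty range
        }
        // addAll returns true only if at least one address was newly inserted.
        return readQueue.addAll(
                streamAddresses.subSet(globalPointer, false, maxGlobal, true));
    }

    public static void main(String[] args) {
        NavigableSet<Long> known = new TreeSet<>();
        known.add(3L);
        known.add(8L);
        known.add(15L);
        known.add(30L);
        NavigableSet<Long> queue = new TreeSet<>();
        System.out.println(fillReadQueue(queue, known, 3L, 20L)); // true
        System.out.println(queue);                                // [8, 15]
    }
}
```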
maxGlobal - The maximum global address to read to.
context - The current stream context.

protected abstract boolean discoverAddressSpace(UUID streamId, NavigableSet<Long> queue, long startAddress, long stopAddress, Function<ILogData,AbstractQueuedStreamView.BackpointerOp> filter, boolean checkpoint, long maxGlobal)
streamId - stream unique identifier.
queue - queue to fill up.
startAddress - read start address (inclusive).
stopAddress - read stop address (exclusive).
filter - filter to apply to data.
checkpoint - true if checkpoint discovery, false otherwise.
maxGlobal - max address to resolve discovery.

protected boolean fillFromResolved(long maxGlobal, org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context)

protected ILogData read(long address)

public boolean getHasNext(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context)
We indicate we may have entries available if the read queue contains entries to read -or- if the next token is greater than our log pointer.
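Expressed as a predicate, the condition above might look like this. The class `HasNextSketch`, the method `mayHaveNext`, and its parameters are hypothetical; in the real class the read queue and log pointer would presumably come from the context, and the next token from the runtime.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical sketch of the "may have more entries" check. */
public class HasNextSketch {

    /** True if entries are queued, or if the next token is beyond our log pointer. */
    public static boolean mayHaveNext(NavigableSet<Long> readQueue,
                                      long nextToken,
                                      long logPointer) {
        return !readQueue.isEmpty() || nextToken > logPointer;
    }

    public static void main(String[] args) {
        NavigableSet<Long> empty = new TreeSet<>();
        System.out.println(mayHaveNext(empty, 10L, 10L)); // false
        System.out.println(mayHaveNext(empty, 11L, 10L)); // true
    }
}
```

Note that a `true` result only means more entries may exist: addresses past the log pointer could belong to other streams.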
getHasNext in class AbstractContextStreamView<org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext>
context - The context to retrieve the next entry from.

public void close()
close in interface AutoCloseable
close in class AbstractContextStreamView<org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext>

public long find(long globalAddress, IStreamView.SearchDirection direction)
globalAddress - The global address to start searching from.
direction - The direction to search.

protected AbstractQueuedStreamView.BackpointerOp resolveCheckpoint(org.corfudb.runtime.view.stream.AbstractQueuedStreamView.QueuedStreamContext context, ILogData data, long maxGlobal)

public ILogData previous()

public ILogData current()

public long getCurrentGlobalPosition()

Copyright © 2019 CorfuDB. All rights reserved.