Those who know my library iopipe will have noticed it has been pretty stagnant lately. However, the other day I needed to process a file line-by-line, along with N lines of context. Now, I do have an example in the iopipe library that shows how to do N lines of context, but it does this via a separately maintained list of line references. Such a list is a pain to keep track of, and, ironically, works just like a buffer in iopipe does.
So I thought, “what if I make an iopipe which is a pipe of lines, where each element is another line from the source pipe?” Element 0 is the first line in the buffer, element 1 is the next one, etc.
What needs to be stored for this? If I store slices of the underlying window, those can change when data is released, which means reconstructing everything upon release (not ideal). If I store offsets into the source window, those also change, but that is more manageable: instead of reconstructing all the lines, I can subtract the number of bytes removed from the source chain from each element. Each position in my “offset buffer” is a number from 0 to source.window.length, in increasing order. Then, when I fetch an “element” from the pipe, it slices the data out of source.window based on these endpoints.
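To make the window-relative offset idea concrete, here is a minimal standalone D sketch. It is not iopipe code: a plain char array stands in for source.window, and RelativeOffsets, line, and release are hypothetical names. Because the offsets are relative to the start of the window, every release has to shift all remaining offsets down.

```d
/// Hypothetical sketch: line offsets stored relative to the current window.
/// offsets[i] .. offsets[i + 1] delimits line i, and offsets[0] is always 0.
struct RelativeOffsets
{
    const(char)[] data; // stand-in for source.window
    size_t[] offsets;   // window-relative line boundaries

    /// Slice line `i` directly out of the window.
    const(char)[] line(size_t i) const
    {
        return data[offsets[i] .. offsets[i + 1]];
    }

    /// Forget the first `n` lines: drop their bytes, then rebase every offset.
    void release(size_t n)
    {
        immutable removed = offsets[n]; // bytes occupied by the released lines
        data = data[removed .. $];
        offsets = offsets[n .. $];
        foreach (ref o; offsets)
            o -= removed; // the extra per-release work: touches every buffered line
    }
}

unittest
{
    auto r = RelativeOffsets("ab\ncde\nf\n", [0, 3, 7, 9]);
    assert(r.line(1) == "cde\n");
    r.release(1);
    assert(r.offsets == [0, 4, 6]); // every offset had to be rewritten
    assert(r.line(0) == "cde\n");
}
```

Run it with `rdmd -unittest -main`. The foreach loop in release is the cost in question: it grows with the number of buffered lines, even when only one line is released.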
But that still means extra work on release, and it also invalidates any windows stored elsewhere (some of this can’t be helped). However, there’s a simple solution: if the first offset in the list is treated as the “origin”, then we can slice relative to it, as if it were 0! As a bonus, each stored offset also gives the position of its line within the entire stream (counted from when the line pipe was started).
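Under the same assumptions as before (a plain char array standing in for source.window, hypothetical names, not iopipe code), the origin trick might look like the sketch below. The offsets are absolute stream positions, release only drops data and leading offsets, and slicing subtracts offsets[0] from both endpoints.

```d
/// Hypothetical sketch: offsets are absolute stream positions, and the first
/// stored offset is the "origin" that corresponds to data[0].
struct OriginOffsets
{
    const(char)[] data; // stand-in for source.window
    size_t[] offsets;   // absolute positions; offsets[0] maps to data[0]

    /// Slice line `i`, treating the origin as 0.
    const(char)[] line(size_t i) const
    {
        immutable base = offsets[0]; // the origin
        return data[offsets[i] - base .. offsets[i + 1] - base];
    }

    /// Absolute position of line `i` in the stream, for free.
    size_t position(size_t i) const
    {
        return offsets[i];
    }

    /// Forget the first `n` lines; surviving offsets are left untouched.
    void release(size_t n)
    {
        data = data[offsets[n] - offsets[0] .. $];
        offsets = offsets[n .. $];
    }
}

unittest
{
    auto o = OriginOffsets("ab\ncde\nf\n", [0, 3, 7, 9]);
    o.release(1);
    assert(o.offsets == [3, 7, 9]); // no rebasing happened
    assert(o.line(0) == "cde\n");
    assert(o.position(1) == 7);     // "f\n" starts at byte 7 of the stream
}
```

Release now does a constant amount of bookkeeping no matter how many lines are buffered; the price is one subtraction per endpoint when slicing.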
So I went about writing this, and I couldn’t believe how simple it was! I can copy it all here, since it’s pretty short (I’m using shortened methods here to save some space):
```d
import std.range.primitives : popBack, popFront; // array primitives used by Window
import iopipe.buffer : AllocatedBuffer, GCNoPointerAllocator;
import iopipe.valve : implementValve;

struct SegmentedPipe(SourceChain, Allocator = GCNoPointerAllocator) {
    private {
        SourceChain source;
        // absolute offsets of segment boundaries; the first one is the origin
        AllocatedBuffer!(size_t, Allocator, 16) buffer;
    }

    private this(SourceChain source) {
        this.source = source;
        auto nelems = buffer.extend(1);
        assert(nelems == 1);
        buffer.window[0] = 0; // initialize with an offset of 0.
    }

    mixin implementValve!source;

    // the "range"
    static struct Window {
        private {
            SegmentedPipe *owner;
            size_t[] offsets; // all the offsets of each of the segments
        }

        // standard random-access-range fare
        auto front() => this[0];
        auto back() => this[$-1];
        bool empty() => offsets.length < 2; // needs at least 2 offsets to properly slice
        void popFront() => offsets.popFront;
        void popBack() => offsets.popBack;
        size_t length() => offsets.length - 1;
        alias opDollar = length;

        auto opIndex(size_t idx) {
            immutable base = owner.buffer.window[0]; // first offset is always the front
            return owner.source.window[offsets[idx] - base .. offsets[idx + 1] - base];
        }
    }

    Window window() => Window(&this, buffer.window);

    // `elements` is ignored: each call adds at most one segment
    size_t extend(size_t elements) {
        // ensure we can get a new element
        if(buffer.extend(1) == 0)
            return 0; // can't get any more buffer space!
        // always going to extend the source chain with 0, and give us a new segment
        auto baseElems = source.extend(0);
        if(baseElems == 0) {
            // no new data
            buffer.releaseBack(1);
            return 0;
        }
        // the new segment ends right where the source window now ends
        buffer.window[$-1] = buffer.window[$-2] + baseElems;
        return 1;
    }

    void release(size_t elements) {
        // drop the bytes of the released segments, then their offsets
        source.release(buffer.window[elements] - buffer.window[0]);
        buffer.releaseFront(elements);
    }
}

// factory
auto segmentedPipe(Chain, Allocator = GCNoPointerAllocator)(Chain base) {
    return SegmentedPipe!(Chain, Allocator)(base);
}
```
For those not familiar with iopipe, the eponymous concept is similar to a range, but is essentially a sliding window of elements. extend gets more elements, window gives the current elements (as a random access range), and release forgets the front N elements from the window. In this way, you have complete control over the buffer, and don’t have to allocate your own buffer for things.
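For readers new to that contract, here is a toy in-memory pipe that follows the same extend/window/release shape. ArrayPipe is hypothetical and far simpler than iopipe's real buffers: it appends to a GC array instead of managing a reusable buffer, and treating extend(0) as "pull a default chunk of 4" is an arbitrary assumption made only for this example.

```d
import std.algorithm.comparison : min;

/// Hypothetical toy pipe over an in-memory array, illustrating the
/// extend/window/release contract. Not iopipe's implementation.
struct ArrayPipe(T)
{
    private const(T)[] source; // everything not yet pulled into the window
    private const(T)[] buf;    // the current window

    /// Pull up to `n` more elements in; n == 0 means "a default amount"
    /// (4 here, an arbitrary choice). Returns how many were added.
    size_t extend(size_t n)
    {
        size_t want = n == 0 ? 4 : n;
        size_t got = min(want, source.length);
        buf ~= source[0 .. got];
        source = source[got .. $];
        return got;
    }

    /// The current elements, as a random access range (a slice).
    const(T)[] window() const
    {
        return buf;
    }

    /// Forget the first `n` elements of the window.
    void release(size_t n)
    {
        buf = buf[n .. $];
    }
}

unittest
{
    auto p = ArrayPipe!int([1, 2, 3, 4, 5, 6]);
    assert(p.extend(0) == 4);
    assert(p.window == [1, 2, 3, 4]);
    p.release(3);
    assert(p.window == [4]);
    assert(p.extend(5) == 2);
    assert(p.window == [4, 5, 6]);
    assert(p.extend(1) == 0); // source exhausted
}
```

A return value of 0 from extend is how a consumer learns there is no more data, which is the same signal SegmentedPipe.extend checks for from its source.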
You might notice the comment “needs at least 2 offsets”: that’s because slicing an element takes two offsets, its start and its end, so N lines require N + 1 offsets. Now, I could special-case, e.g., the last element (whose end is simply the end of the source window) so I don’t have to store that extra offset, but the code is so much nicer with a sentinel instead.
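A small worked example of the N + 1 rule: this hypothetical lineOffsets helper (not part of iopipe) records 0 plus the end of every complete line, so three lines yield four offsets, and line i is text[offs[i] .. offs[i + 1]].

```d
/// Hypothetical helper: 0 followed by the end offset of every complete line,
/// so N complete lines produce N + 1 offsets. A trailing line without '\n'
/// is not counted.
size_t[] lineOffsets(const(char)[] text)
{
    size_t[] offsets = [0]; // start of the first line
    foreach (i, c; text)
        if (c == '\n')
            offsets ~= i + 1; // end of this line == start of the next
    return offsets;
}

unittest
{
    auto text = "ab\ncde\nf\n";
    auto offs = lineOffsets(text);
    assert(offs == [0, 3, 7, 9]);
    assert(offs.length - 1 == 3);              // three lines, four offsets
    assert(text[offs[1] .. offs[2]] == "cde\n"); // line 1
}
```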
So how do we use it to get lines? What we need is an iopipe that extends one line at a time. That’s exactly what iopipe.textpipe.byLine does. The code looks like this:
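```d
import std.io;                               // File and mode, from the `io` dub package
import iopipe.bufpipe : bufd;
import iopipe.refc : refCounted;
import iopipe.textpipe : assumeText, byLine;

auto lines = File(filename, mode!"r").refCounted
    .bufd           // buffered
    .assumeText     // assume it's utf8
    .byLine         // extend one line at a time
    .segmentedPipe; // store lines in a buffer
```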
And I was kind of shocked when this built and worked the first time. You know an abstraction is good when it writes easy, reads easy, everything is a simple composition of existing API, and it just works!
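Back to the original problem of processing each line with N lines of context: the loop around such a pipe would extend by one line, look at the whole window (the current line plus up to N previous ones), and release the oldest line once more than N lines remain buffered. To keep it runnable without iopipe, the sketch below simulates that loop with plain strings; withContext is a hypothetical function, and its hand-rolled "extend" and "release" steps only stand in for the real calls on lines.

```d
/// Hypothetical sketch: for every line, collect it together with up to `n`
/// preceding lines, keeping at most n + 1 lines buffered at a time.
/// Plain strings stand in for the iopipe chain above.
string[][] withContext(string text, size_t n)
{
    string window;          // stand-in for source.window
    size_t[] offsets = [0]; // absolute positions; offsets[0] is the origin
    string[][] result;
    size_t pos = 0;         // how much of `text` has been "extended" so far

    while (pos < text.length)
    {
        // "extend" by one line (like byLine), recording its end offset
        size_t end = pos;
        while (end < text.length && text[end] != '\n')
            ++end;
        if (end < text.length)
            ++end; // include the newline
        window ~= text[pos .. end];
        pos = end;
        offsets ~= end;

        // the "window" of lines: slice each one relative to the origin
        string[] lines;
        foreach (i; 0 .. offsets.length - 1)
            lines ~= window[offsets[i] - offsets[0] .. offsets[i + 1] - offsets[0]];
        result ~= lines;

        // "release" the oldest line once more than n lines are buffered
        if (offsets.length - 1 > n)
        {
            window = window[offsets[1] - offsets[0] .. $];
            offsets = offsets[1 .. $];
        }
    }
    return result;
}

unittest
{
    auto r = withContext("a\nb\nc\n", 1);
    assert(r == [["a\n"], ["a\n", "b\n"], ["b\n", "c\n"]]);
}
```

With the real pipe, the bookkeeping in the middle of the loop disappears: the window already is the list of lines, and releasing one line releases its bytes from the source as well.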
Expect this to be in iopipe soon. I want some more features first: I’d like to be able to get the offset of each element, and to have some way to store more information from the underlying pipe/process. I think I might replace jsoniopipe’s JsonTokenizer with a JsonTokenPipe, and build things on top of that (e.g., a validator, skip, etc.). That would actually supersede the awkward cache system. Maybe I can get rid of the awkwardness of getting the string data too? One can only dream…