New iopipe Abstraction – SegmentedPipe

For those who know my library iopipe, it has been pretty stagnant as of late. However, the other day, I needed to process a file line-by-line, along with N lines of context. Now, I do have an example in the iopipe library that shows how to do N lines of context, but it does this via a separately maintained list of line references. Such a thing is a pain to keep track of, and ironically works just like a buffer in iopipe does.

So I thought, “what if I make an iopipe which is a pipe of lines, where each element is another line from the source pipe?” Element 0 is the first line in the buffer, element 1 is the next one, etc.

What needs to be stored for this? If I store slices of the underlying window, those can change when data is released, which means reconstructing everything upon release (not ideal). If I store offsets into the source window, those also can change, but it’s probably more manageable. Instead of reconstructing all the lines, I can subtract the number of bytes removed from the source chain from each element. Each position in my “offset buffer” is a number from 0 to source.window.length, and the positions will be in increasing order. Then when I fetch an “element” from the pipe, it will slice the data out of source.window based on these endpoints.

But that still means extra work on release, and it also invalidates any windows stored elsewhere (some of this can’t be helped). However, there’s a simple solution to this: if the first offset in the list is treated as the “origin”, then we can slice based on that being 0! As a bonus, it also tells us the position within the entire stream (since starting the line pipe) for each line.
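The origin trick can be sketched with plain arrays. This is only an illustration under assumptions: `segment` and the local `window`/`offsets` variables are hypothetical stand-ins for source.window and the offset buffer, not iopipe code.

```d
import std.stdio : writeln;

// Slice segment `idx` out of `window`, treating offsets[0] as the origin.
// Offsets are absolute stream positions; the last one is a sentinel end position.
string segment(string window, const size_t[] offsets, size_t idx)
{
    immutable base = offsets[0];
    return window[offsets[idx] - base .. offsets[idx + 1] - base];
}

void main()
{
    string window = "one\ntwo\nthree\n";
    size_t[] offsets = [0, 4, 8, 14];

    assert(segment(window, offsets, 0) == "one\n");
    assert(segment(window, offsets, 2) == "three\n");

    // Release the first segment: drop its bytes and its offset.
    // None of the remaining offsets has to be rewritten.
    window = window[offsets[1] - offsets[0] .. $];
    offsets = offsets[1 .. $];

    assert(segment(window, offsets, 0) == "two\n");
    assert(segment(window, offsets, 1) == "three\n");

    // The first stored offset doubles as the stream position of the first line.
    writeln("first buffered line starts at stream position ", offsets[0]);
}
```

Because every slice subtracts the first offset, releasing a segment only shortens the offset list; the stored numbers stay valid as stream positions.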

So I went about writing this, and I couldn’t believe how simple it was! I can copy it all here, since it’s pretty short (I’m using shortened methods here to save some space):

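// Imports the snippet relies on (module locations assumed from the iopipe sources).
import iopipe.buffer : AllocatedBuffer, GCNoPointerAllocator;
import iopipe.valve : implementValve;
import std.range.primitives : popFront, popBack; // range primitives for size_t[]

struct SegmentedPipe(SourceChain, Allocator = GCNoPointerAllocator) {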
    private {
        SourceChain source;
        AllocatedBuffer!(size_t, Allocator, 16) buffer;
    }

    private this(SourceChain source) {
        this.source = source;
        auto nelems = buffer.extend(1);
        assert(nelems == 1);
        buffer.window[0] = 0; // initialize with an offset of 0.
    }

    mixin implementValve!source;

    // the "range"
    static struct Window {
        private {
            SegmentedPipe *owner;
            size_t[] offsets; // all the offsets of each of the segments
        }

        // standard random-access-range fare
        auto front() => this[0];
        auto back() => this[$-1];
        bool empty() => offsets.length < 2; // needs at least 2 offsets to properly slice
        void popFront() => offsets.popFront;
        void popBack() => offsets.popBack;
        size_t length() => offsets.length - 1;
        alias opDollar = length;

        auto opIndex(size_t idx) {
            immutable base = owner.buffer.window[0]; // first offset is always the front
            return owner.source.window[offsets[idx] - base .. offsets[idx + 1] - base];
        }
    }

    Window window() => Window(&this, buffer.window);

    size_t extend(size_t elements) {
        // ensure we can get a new element
        if(buffer.extend(1) == 0)
            return 0; // can't get any more buffer space!
        // always going to extend the source chain with 0, and give us a new segment
        auto baseElems = source.extend(0);
        if(baseElems == 0) {
            // no new data
            buffer.releaseBack(1);
            return 0;
        }
        buffer.window[$-1] = buffer.window[$-2] + baseElems;
        return 1;
    }

    void release(size_t elements) {
        source.release(buffer.window[elements] - buffer.window[0]);
        buffer.releaseFront(elements);
    }
}

// factory
auto segmentedPipe(Chain, Allocator = GCNoPointerAllocator)(Chain base) {
    return SegmentedPipe!(Chain, Allocator)(base);
}

For those not familiar with iopipe, the eponymous concept is similar to a range, but is essentially a sliding window of elements. extend gets more elements, window gives the current elements (as a random access range), and release forgets the front N elements from the window. In this way, you can completely control the buffer, and don’t have to allocate your own buffer for things.
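To make the three operations concrete, here is a toy sliding window over an in-memory string. `ToyPipe` is a hypothetical type written for this illustration and is not part of iopipe; in particular it does not model the special meaning real pipes give to extend(0), and it never allocates or reads from a stream.

```d
import std.algorithm.comparison : min;

// Hypothetical toy type: a sliding window over data that is already in memory.
struct ToyPipe
{
    private const(char)[] data; // everything the "stream" will ever produce
    private size_t start;       // index of the first element still in the window
    private size_t end;         // one past the last element in the window

    // The currently buffered elements.
    const(char)[] window() const { return data[start .. end]; }

    // Try to add up to `elements` more; returns how many were added (0 at end of data).
    size_t extend(size_t elements)
    {
        immutable added = min(elements, data.length - end);
        end += added;
        return added;
    }

    // Forget the first `elements` items of the window.
    void release(size_t elements)
    {
        assert(elements <= end - start);
        start += elements;
    }
}

void main()
{
    auto pipe = ToyPipe("hello world");
    assert(pipe.window.length == 0); // nothing buffered yet
    assert(pipe.extend(5) == 5);
    assert(pipe.window == "hello");
    pipe.release(3);                 // drop "hel"
    assert(pipe.window == "lo");
    assert(pipe.extend(100) == 6);   // only " world" was left
    assert(pipe.window == "lo world");
    assert(pipe.extend(1) == 0);     // end of data
}
```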

You might notice the comment “needs at least 2 offsets to properly slice”. That’s because we always need 2 offsets to slice an element. Now, I could special-case e.g. the last element so I don’t have to store that one, but the code is so much nicer with a sentinel instead.

So how do we use it to get lines? What we need is an iopipe that extends one line at a time. That’s exactly what iopipe.textpipe.byLine does. The code looks like this:

        auto lines = File(filename, mode!"r").refCounted
            .bufd // buffered
            .assumeText // assume it's utf8
            .byLine // extend one line at a time
            .segmentedPipe; // store lines in a buffer

And I was kind of shocked when this built and worked the first time. You know an abstraction is good when it writes easy, reads easy, everything is a simple composition of existing API, and it just works!
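As a rough idea of how a pipe of lines helps with the original "N lines of context" problem, here is a sketch that keeps at most N earlier lines in the window. Everything in it is an assumption for illustration: `ToyLinePipe` is a hypothetical stand-in with the same extend/window/release shape, `matchesWithContext` is a made-up helper, and only leading context (lines before a match) is handled.

```d
import std.algorithm.searching : canFind;

// Hypothetical stand-in for a pipe of lines; extend always adds one line.
struct ToyLinePipe
{
    private string[] all;
    private size_t start, end;

    string[] window() { return all[start .. end]; }

    size_t extend(size_t)
    {
        if (end == all.length)
            return 0; // no more lines
        ++end;
        return 1;
    }

    void release(size_t n) { start += n; }
}

// Collect each matching line together with up to `context` lines before it.
string[][] matchesWithContext(Pipe)(ref Pipe lines, string needle, size_t context)
{
    string[][] result;
    while (lines.extend(0) != 0)
    {
        auto w = lines.window;
        if (w[$ - 1].canFind(needle))
            result ~= w.dup; // earlier lines plus the match itself
        if (w.length > context)
            lines.release(w.length - context); // keep only `context` lines around
    }
    return result;
}

void main()
{
    auto pipe = ToyLinePipe(["a", "b", "needle 1", "c", "d", "e", "needle 2"]);
    auto hits = matchesWithContext(pipe, "needle", 2);
    assert(hits == [["a", "b", "needle 1"], ["d", "e", "needle 2"]]);
}
```

With a real buffered pipe the elements would be slices of the source window, so anything kept past a release would need to be copied first; the toy sidesteps this because its strings are immutable.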

Expect this to be in iopipe soon. I want some more features here: I’d like to be able to get the offset of each element, and to allow some way to store more information from the underlying pipe/process. I think I might replace jsoniopipe’s JsonTokenizer with a JsonTokenPipe, and build things on top of that (i.e. validator, skip, etc). That actually would supersede the awkward cache system. Maybe I can get rid of the awkwardness of getting the string data too? One can only dream…