Luvit 2.5.6 Documentation
Table of Contents
Stream#
A port of node.js's stream module for luvit.
local Stream = require('Stream')
Class: Stream.Stream#
This is the stream core or base. Extends the emitter class described in Core.
You will most likely not use this class. The only relevant part of this class, the pipe method, is overriden in Readable.
Class: Stream.ReadableState#
Used to hold state by the Readable class
ReadableState:initialize(options, stream)#
Options table:
- HighWaterMark - Defaults to 16, maxes out at 128MB. 128MB limit cannot be overwritten without modifying luvit/deps/stream/stream_readable
- objectMode - If false/nil then the highWaterMark is set to 16 * 1024
Class: Stream.Readable#
Extends Stream.Stream, implements a readable stream interface. Uses ReadableState to keep track of self._readableState
Readable:push(chunk)#
Manually shove something into the read buffer. This returns true if the highWaterMark has not been hit yet, similar to how Writable.write() returns true if you should write() some more.
Readable:unshift(chunk)#
Unshift should always be something directly out of read()
Readable:read(n)#
Reads and returns n chunk bytes
Readable:_read(n)#
Internal method executed by Readable:read. Can be overwritten in child classes.
Readable:unpipe(dest)#
Removes pipes to dest
Readable:on(ev, fn)#
Triggers a callback fn
when an event ev
is triggered.
E.g.
> local child = require('stream').Readable
> child:on('foo', function() print('bar') end)
> child:emit('foo')
'bar'
> child:on('bar', function(data) print(data) end)
> child:emit('bar', 'foo')
'foo'
Readable.addListener#
Alias for Readable:on
You can use Readable:addListener for an implicit self or use Readable.addListener(self, ...)
Readable:resume()#
Resumes a stream
Readable:pause()#
Pauses a stream
Class: Stream.WriteReq#
Used internally within the Writable class.
Class: Stream.WritableState#
Used internally within the Writable class to hold state.
WritableState:initialize(options, stream)#
Options table:
- HighWaterMark - Defaults to 16, maxes out at 128MB. 128MB limit cannot be overwritten without modifying luvit/deps/stream/stream_readable
- objectMode - If false/nil then the highWaterMark is set to 16 * 1024
Class: Stream.Writable#
The writable stream class
Emits end
when done
Writable:initialize(options)#
You can modify the writable state options here, or set them.
Writable:write(chunk, cb)#
Manually write a chunk
Writable:cork()#
Kind of like pause
Writable:uncork()#
Kind of like resume
Class: Stream.Duplex#
A Duplex stream is both readable and writable and inherits the functionality and methods of the aforementioned readable and writable classes.
Duplex:initialize(options)#
These options are passed along to the initializers of the readable and writable streams this class uses.
Furthermore, we can have the following key values in the options table.
- readable - false/true
- writable - false/true
- allowHalfOpen - false/true
Class: Stream.Transform#
a transform stream is a readable/writable stream where you do something with the data. Sometimes it's called a "filter", but that's not a great name for it, since that implies a thing where some bits pass through, and others are simply ignored. (That would be a valid example of a transform, of course.)
While the output is causally related to the input, it's not a necessarily symmetric or synchronous transformation. For example, a zlib stream might take multiple plain-text writes(), and then emit a single compressed chunk some time in the future.
Here's how this works:
The Transform stream has all the aspects of the readable and writable stream classes. When you write(chunk), that calls _write(chunk,cb) internally, and returns false if there's a lot of pending writes buffered up. When you call read(), that calls _read(n) until there's enough pending readable data buffered up.
In a transform stream, the written data is placed in a buffer. When _read(n) is called, it transforms the queued up data, calling the buffered _write cb's as it consumes chunks. If consuming a single written chunk would result in multiple output chunks, then the first outputted bit calls the readcb, and subsequent chunks just go into the read buffer, and will cause it to emit 'readable' if necessary.
This way, back-pressure is actually determined by the reading side, since _read has to be called to start processing a new chunk. However, a pathological inflate type of transform can cause excessive buffering here. For example, imagine a stream where every byte of input is interpreted as an integer from 0-255, and then results in that many bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in 1kb of data being output. In this case, you could write a very small amount of input, and end up with a very large amount of output. In such a pathological inflating mechanism, there'd be no way to tell the system to stop doing the transform. A single 4MB write could cause the system to run out of memory.
However, even in such a pathological case, only a single written chunk would be consumed, and then the rest would wait (un-transformed) until the results of the previous transformed chunk were consumed.
Transform:initialize(options)#
Extendable initializer for the Transform class.
Transform:_transform(chunk, cb)#
The internal transform method. You must define this in your child class. E.g. implement a passthrough filter aka a very fancy way to print hello world
local Transform = require('stream').Transform
local Transformer = Transform:extend()
function Transformer:initialize()
Transform.initialize(self, {objectMode = true})
end
function Transformer:_transform(line, cb)
self:push(line)
return cb()
end
local transformer = Transformer:new()
transformer:on('data', print)
transformer:write('hello world')
Class: Stream.PassThrough#
An extension of the transform stream class with the transform method declared.
Basically just the most minimal sort of Transform stream.
Every written chunk gets output as-is.
Class: Stream.Observable#
Observable is a stream that can be observed outside the pipeline. observe() returns a new Readable stream that emits all data that passes through this stream. Streams created by observe() do not affect back-pressure.