1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 
17 
/// Thrown when a response body cannot be decoded — e.g. a malformed
/// chunked transfer-encoding stream (see DecodeChunked.put).
/// NOTE(review): the class name is misspelled ("Exceptioin"); renaming it
/// would break existing catch sites and public API, so it is left as-is.
class DecodingExceptioin: Exception {
    /// Params:
    /// msg = description of the decoding failure
    /// file = throw-site file (defaulted for the caller)
    /// line = throw-site line (defaulted for the caller)
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}
23 /**
24  * DataPipeIface can accept some data, process, and return processed data.
25  */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    /// Returns: false when at least one element can be fetched with get().
    bool empty();
    /// Put next data portion for processing.
    /// Params:
    /// data = raw input slice; the processor may buffer it internally.
    void put(E[]);
    /// Get any ready (already processed) data.
    /// Returns: next portion of processed elements.
    E[] get();
    /// Signal on end of incoming data stream, so the processor can emit
    /// any data it was still holding back (e.g. a compressor's tail).
    void flush();
}
36 /**
37  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
38  * This class used to combine different Transfer- and Content- encodings. For example: unchunk chunked transfer-encoding,
39  * and uncompress compressed Content-Encoding.
40  */
public class DataPipe(E) : DataPipeIface!E {

    /// Chain of processors; input passes through them in insertion order.
    DataPipeIface!(E)[]  pipe;
    /// Accumulates the final, fully-processed output.
    Buffer!E             buffer;

    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }

    /// Feed every chunk of `data` into processor `p`, then drain all output
    /// it has ready.
    /// Returns: the drained chunks (possibly empty).
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.put(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }

    /// Process next data portion. Data passed over pipeline and result stored in buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            // No processors installed: pass input straight to the output buffer.
            buffer.put(data);
            return;
        }
        auto t = process(pipe.front, [data]);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }

    /// Process next data portion. Data passed over pipeline and store result in buffer.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        if ( buff.__repr is null ) {
            // A default-initialized Buffer carries no data and a null __repr.
            // Without this guard the __repr dereferences below would crash
            // on an empty input buffer; an empty put() is simply a no-op.
            return;
        }
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                // Adopt the incoming buffer wholesale — nothing collected yet.
                buffer = buff;
                return;
            }
            // Splice the incoming buffer's chunk list into ours, avoiding a
            // copy of the payload bytes.
            buffer.__repr.__buffer ~= buff.__repr.__buffer;
            buffer.__repr.__length += buff.length;
            return;
        }
        auto t = process(pipe.front, buff.__repr.__buffer);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }

    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }

    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }

    /// Flush every processor in order, carrying each stage's tail output
    /// forward into the next stage, and append the final product to buffer.
    void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            // Feed whatever the previous stage produced on flush, then
            // flush this stage and drain everything it still holds.
            product.each!(e => p.put(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.put(b));
    }
}
118 
119 /**
120  * Processor for gzipped/compressed content.
121  * Also support InputRange interface.
122  */
/**
 * Processor for gzipped/compressed content.
 * Also support InputRange interface.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte _plain;    // decompressed bytes not yet handed out
        UnCompress   _inflater; // zlib/gzip decoder, re-created lazily if null
    }

    this() {
        _inflater = new UnCompress();
        _plain = Buffer!ubyte();
    }

    /// Decompress the next portion of compressed input and stash the
    /// resulting plain bytes in the internal buffer.
    override void put(E[] data) {
        if ( _inflater is null ) _inflater = new UnCompress();
        _plain.put(_inflater.uncompress(data));
    }

    /// Hand out the oldest internal chunk of decompressed data.
    /// Precondition: buffer must be non-empty.
    override E[] get() pure {
        assert(_plain.length);
        auto chunk = _plain.__repr.__buffer[0];
        _plain.popFrontN(chunk.length);
        return cast(E[])chunk;
    }

    /// Push any bytes the decoder was still holding into the buffer.
    override void flush() {
        if ( _inflater is null ) return;
        _plain.put(_inflater.flush());
    }

    /// True when no decompressed data is pending.
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", _plain.empty);
        return _plain.empty;
    }

    // --- InputRange interface over the decompressed bytes ---

    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", _plain.length);
        return _plain.front;
    }
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", _plain.length);
        return _plain.popFront;
    }
    @property void popFrontN(size_t n) pure @safe {
        _plain.popFrontN(n);
    }
}
175 
176 /**
177  * Unchunk chunked http responce body.
178  */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        /// Decoder states: looking for a chunk-size line, skipping the CRLF
        /// after chunk data, copying chunk payload, or done (trailer seen).
        enum         States {huntingSize, huntingSeparator, receiving, trailer};
        // was `char` — relied on implicit enum-to-char conversion; the enum
        // type itself is the correct, self-documenting declaration.
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;      // decoded payload awaiting get()
        ubyte[]      linebuff;  // partial chunk-size / separator line
    }
    /// Consume the next slice of a chunked body, appending decoded payload
    /// bytes to the internal buffer. May be called with arbitrary splits of
    /// the stream. Throws DecodingExceptioin when no chunk-size line is
    /// found within 80 bytes.
    void put(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                // Body complete; anything after the last chunk is trailer
                // headers, which this decoder ignores.
                return;
            }
            if ( state == States.huntingSize ) {
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( !s[1].length ) {
                    if ( linebuff.length >= 80 ) {
                        throw new DecodingExceptioin("Can't find chunk size in the body");
                    }
                    continue;
                }
                tracef("Got chunk size line %s", s[0]);
                string x = castFrom!(ubyte[]).to!string(s[0]);
                formattedRead(x, "%x", &chunk_size);
                tracef("Got chunk size %s", chunk_size);
                state = States.receiving;
                to_receive = chunk_size;
                data = s[2];
                if ( chunk_size == 0 ) {
                    // last-chunk ("0" CRLF) — body is complete.
                    state = States.trailer;
                    tracef("Unchunk completed");
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.put(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        to_receive = 2;
                        linebuff.length = 0;
                        continue;
                    }
                    continue;
                }
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // Skip the CRLF that terminates each chunk's data.
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( s[1].length ) {
                    data = s[2];
                    chunk_size = 0;
                    linebuff.length = 0;
                    state = States.huntingSize;
                    tracef("switch to huntig size");
                    continue;
                }
            }
        }
    }
    /// Returns: the oldest decoded chunk; advances past it.
    eType[] get() {
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// No-op: a chunked stream terminates itself via the last-chunk marker.
    void flush() {
    }
    /// Returns: true when no decoded data is pending.
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true once the terminating zero-size chunk has been seen.
    bool done() {
        return state==States.trailer;
    }
}
299 
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    // gzip stream containing "abc\ndef\n"; fed in below in uneven pieces to
    // exercise decompression across split boundaries.
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // Decompressor fed three fragments; also checks its range interface
    // (filter over the decompressed chars).
    auto d = new Decompressor!eType();
    d.put(gzipped[0..2]);
    d.put(gzipped[2..10]);
    d.put(gzipped[10..$]);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // Same content, different split points.
    auto e = new Decompressor!eType();
    e.put(gzipped[0..10]);
    e.put(gzipped[10..$]);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // DataPipe with a single Decompressor stage.
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
337 /**
338  * Buffer used to collect and process data from network. It remainds Appender, but support
339  * also Range interface.
340  * $(P To place data in buffer use put() method.)
341  * $(P  To retrieve data from buffer you can use several methods:)
342  * $(UL
343  *  $(LI Range methods: front, back, index [])
344  *  $(LI data method: return collected data (like Appender.data))
345  * )
346  */
public struct Buffer(T) {
    private {
        /// Heap-held representation: a list of chunks plus the total element
        /// count, so appending never copies earlier chunks.
        class Repr {
            size_t         __length;   // total elements across all chunks
            Unqual!T[][]   __buffer;   // chunks in arrival order
            this() {
                __length = 0;
            }
            /// Copy-constructor: duplicates the chunk LIST but shares the
            /// chunk arrays themselves (shallow per-chunk).
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;    // null until the first put()
    }

    alias toString = data!string;

    // Postblit: give each copy its own Repr so pops on one copy do not
    // affect the other. NOTE(review): copying a never-written Buffer
    // (null __repr) allocates a fresh empty Repr — harmless but allocates.
    this(this) {
        __repr = new Repr(__repr);
    }
    /// Construct pre-filled with `data` (element type may differ from T,
    /// see put()).
    this(U)(U[] data) pure {
        put(data);
    }
   ~this() {
        __repr = null;
    }
    /// Append a chunk. A differing element type U is converted to T via
    /// std.conv.castFrom. Empty input is a no-op. The chunk is dup'ed, so
    /// the caller may reuse its array.
    /// Returns: this (for chaining).
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    // NOTE(review): unlike length(), opDollar does not null-check __repr —
    // using `$` on a never-written Buffer dereferences null.
    @property auto opDollar() const pure @safe {
        return __repr.__length;
    }
    /// Total number of elements stored (0 for a never-written Buffer).
    @property auto length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// First element. Precondition: non-empty.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last element. Precondition: non-empty.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element, discarding the leading chunk if it empties.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first n elements, walking whole chunks where possible.
    /// Precondition: n <= length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                // Remainder fits inside the leading chunk.
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element, discarding the trailing chunk if it empties.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last n elements, mirror of popFrontN.
    /// Precondition: n <= length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// ForwardRange support: independent copy of the range position.
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Random access: locate the chunk containing global index n.
    /// Precondition: n < length (and the buffer has been written to).
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Slice [m..n) as a new Buffer; implemented by copying the Repr and
    /// trimming both ends, so chunk arrays are shared with the source.
    Buffer!T opSlice(size_t m, size_t n) {
        assert( m <= n && n <= __repr.__length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
//    ptrdiff_t countUntil(in T[] needle) const pure @safe {
//        ptrdiff_t haystackpos, needlepos;
//        while(haystackpos < length) {
//            if ( opIndex(haystackpos) == needle[needlepos] ) {
//
//                return haystackpos;
//            } else {
//                needlepos = 0;
//                haystackpos++;
//            }
//        }
//        return -1;
//    }
    /// Flatten all chunks into one contiguous array (like Appender.data).
    /// With a non-default U the result is converted via castFrom.
    @property auto data(U=T[])() const pure {
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    string opCast(string)() {
        return this.toString;
    }
    /// Equality by converting this buffer to the comparand's type.
    /// NOTE(review): only meaningful for types opCast supports (string) —
    /// TODO confirm intended use before relying on other comparand types.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
522 ///
public unittest {

    // Buffer must satisfy the full set of range traits it advertises.
    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    // Basic append / index / slice / pop behavior across two chunks.
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // Header-like payload: range algorithms (countUntil/splitter/findSplit)
    // must work on the buffer without consuming it.
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    // Searching/splitting must not have consumed the buffer.
    assert(c.length == c_length);
}