1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 
/**
 * Exception raised by SocketStream.connect when the host name cannot be
 * resolved or when none of the resolved addresses accepts the connection.
 */
public class ConnectError : Exception {
    /// Params:
    /// msg  = human readable description of the failure
    /// file = source file of the throw site (defaults to the caller's file)
    /// line = source line of the throw site (defaults to the caller's line)
    this(string msg, string file = __FILE__, size_t line = __LINE__) pure @safe {
        super(msg, file, line);
    }
}
24 
/**
 * Exception raised when an encoded body cannot be decoded
 * (for example when a chunk-size line cannot be found in a chunked body).
 *
 * The class is public so that code outside this module can catch it by type.
 */
public class DecodingException: Exception {
    /// Params:
    /// msg  = human readable description of the failure
    /// file = source file of the throw site (defaults to the caller's file)
    /// line = source line of the throw site (defaults to the caller's line)
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}
/// Historical, misspelled name of DecodingException; kept so existing code keeps compiling.
alias DecodingExceptioin = DecodingException;
/**
 * DataPipeIface can accept some data, process it, and return the processed data.
 *
 * Implementations in this module (DataPipe, Decompressor, DecodeChunked) are fed
 * with put(), drained with get() while empty() is false, and finished with flush().
 *
 * Params:
 *  E = element type of the arrays passed to put() and returned by get().
 */
public interface DataPipeIface(E) {
    /// Returns true when there is no processed data ready for reading.
    bool empty();
    /// Put the next data portion for processing.
    void put(E[]);
    /// Get any ready data.
    E[] get();
    /// Signal the end of the incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it,
 * and hands the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example:
 * unchunk a chunked transfer-encoding, then uncompress a compressed Content-Encoding.
 *
 * Results of the last processor are collected in an internal Buffer and handed out by get().
 * With no processors inserted the pipe simply passes its input to the output.
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;

    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }

    /// Feed every chunk of `data` to processor `p` and drain everything `p` has ready.
    /// Params:
    /// p    = processor to run
    /// data = chunks to feed, in order
    /// Returns:
    /// chunks produced by `p` (possibly none).
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        foreach(chunk; data) {
            p.put(chunk);
        }
        while(!p.empty()) result ~= p.get();
        return result;
    }

    /// Pass `chunks` through every processor in order and store the final output in `buffer`.
    private void runPipeline(E[][] chunks) {
        foreach(p; pipe) {
            chunks = process(p, chunks);
        }
        foreach(chunk; chunks) {
            buffer.put(chunk);
        }
    }

    /// Process next data portion. Data is passed over the pipeline and the result stored in buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            buffer.put(data);
            return;
        }
        runPipeline([data]);
    }

    /// Process next data portion. Data is passed over the pipeline and the result stored in buffer.
    /// An empty (or never filled) input buffer is ignored.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        if ( buff.empty ) {
            // Nothing to add. This also covers a default-initialised Buffer whose
            // internal representation is null and must not be dereferenced below.
            return;
        }
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                buffer = buff;
                return;
            }
            buffer.__repr.__buffer ~= buff.__repr.__buffer;
            buffer.__repr.__length += buff.length;
            return;
        }
        runPipeline(buff.__repr.__buffer);
    }

    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }

    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }

    /// Signal end of input: flush every processor in order, feeding whatever the previous
    /// processor released on flush into the next one, and store the final output in buffer.
    void flush() {
        E[][] product;
        foreach(p; pipe) {
            foreach(chunk; product) {
                p.put(chunk);
            }
            p.flush();
            product = null;
            while( !p.empty ) product ~= p.get();
        }
        foreach(chunk; product) {
            buffer.put(chunk);
        }
    }
}
125 
/**
 * Processor for gzipped/compressed content.
 * Data passed to put() is run through std.zlib.UnCompress and the output is
 * collected in an internal buffer.
 * Also supports the InputRange interface (empty/front/popFront) over the decompressed bytes.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;
        UnCompress   __zlib;
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
    /// Decompress next portion of input and store the output in the internal buffer.
    /// Params:
    /// data = next portion of the compressed stream.
    override void put(E[] data) {
        if ( __zlib is null  ) {
            __zlib = new UnCompress();
        }
        __buff.put(__zlib.uncompress(data));
    }
    /// Get next ready chunk of decompressed data.
    /// Returns:
    /// the first stored chunk, or an empty array when nothing is ready.
    override E[] get() pure {
        if ( __buff.empty ) {
            // Previously guarded only by an assert; with asserts compiled out this
            // dereferenced a null representation.
            return E[].init;
        }
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Signal end of input: store whatever UnCompress.flush() returns.
    override void flush() {
        if ( __zlib is null  ) {
            return;
        }
        __buff.put(__zlib.flush());
    }
    /// Returns: true when no decompressed data is available.
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    /// First decompressed byte (range interface).
    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    /// Drop first decompressed byte (range interface).
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    /// Drop first n decompressed bytes.
    @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
182 
/**
 * Unchunk a chunked http response body.
 *
 * put() may be called with arbitrary fragments of the body; the decoder keeps its
 * state between calls. Decoded chunk data is available through get()/empty().
 * After the last (zero-sized) chunk has been seen done() returns true and any
 * further input (the trailer) is ignored.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        // huntingSize:      collecting the "chunk-size [extension] CRLF" line
        // receiving:        copying chunk-data, to_receive bytes still expected
        // huntingSeparator: looking for the CRLF that follows chunk-data
        // trailer:          zero-sized chunk seen, the rest of the input is ignored
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;
    }
    /// Feed next fragment of the chunked body.
    /// Params:
    /// data = next portion of the raw (still chunked) body.
    /// Throws:
    /// DecodingExceptioin when 80 or more bytes were collected without finding the
    /// end of a chunk-size line, or when the chunk-size line holds no number.
    void put(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                return;
            }
            if ( state == States.huntingSize ) {
                // Collect input until the size line is complete.
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( !s[1].length ) {
                    if ( linebuff.length >= 80 ) {
                        throw new DecodingExceptioin("Can't find chunk size in the body");
                    }
                    continue;
                }
                string x = castFrom!(ubyte[]).to!string(s[0]);
                // formattedRead reports how many values it stored; 0 means the size
                // line was empty and chunk_size would silently keep its old value.
                if ( formattedRead(x, "%x", &chunk_size) != 1 ) {
                    throw new DecodingExceptioin("Can't parse chunk size in the body");
                }
                tracef("Got chunk size %s", chunk_size);
                state = States.receiving;
                to_receive = chunk_size;
                data = s[2];
                if ( chunk_size == 0 ) {
                    state = States.trailer;
                    tracef("Unchunk completed");
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.put(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        to_receive = 2;
                        linebuff.length = 0;
                        continue;
                    }
                    continue;
                }
                // receiving is only entered with a non-zero chunk size
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // Collect input until the CRLF terminating chunk-data is seen.
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( s[1].length ) {
                    data = s[2];
                    chunk_size = 0;
                    linebuff.length = 0;
                    state = States.huntingSize;
                    tracef("switch to huntig size");
                    continue;
                }
            }
        }
    }
    /// Get next decoded piece of chunk data.
    /// Returns:
    /// the first stored piece, or null when nothing is ready.
    eType[] get() {
        if ( buff.empty ) {
            // buff's representation may still be null here; do not dereference it
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do: decoded data is made available as soon as it arrives.
    void flush() {
    }
    /// Returns: true when no decoded data is ready.
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true once the zero-sized last chunk has been seen.
    bool done() {
        return state==States.trailer;
    }
}
305 
// Exercises Decompressor as an input range, DataPipe with a Decompressor inserted,
// and DataPipe with no processors.
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // feed the stream in three pieces, then read the decompressor as a range
    auto d = new Decompressor!eType();
    d.put(gzipped[0..2]);
    d.put(gzipped[2..10]);
    d.put(gzipped[10..$]);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // same stream split at a different position
    auto e = new Decompressor!eType();
    e.put(gzipped[0..10]);
    e.put(gzipped[10..$]);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // pipeline with a single Decompressor stage
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // an empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but
 * also supports the Range interface.
 * $(P To place data in buffer use put() method.)
 * $(P  To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 * $(P Data is stored as a list of chunks, one per put() call. A Buffer that was never
 * filled has no internal representation at all; length, empty, `$`, empty slices and
 * zero-sized pops are safe on such a buffer.)
 */
public struct Buffer(T) {
    private {
        class Repr {
            size_t         __length;   // total number of elements in all chunks
            Unqual!T[][]   __buffer;   // stored chunks, in insertion order
            this() {
                __length = 0;
            }
            /// Copy the chunk list of `other` (the chunks themselves are shared).
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    /// Postblit: every copy gets its own chunk list, so popping from a copy
    /// does not affect the original.
    this(this) {
        __repr = new Repr(__repr);
    }
    /// Construct a buffer holding a copy of `data`.
    this(U)(U[] data) pure {
        put(data);
    }
   ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     * Params:
     *  data = elements to append; an empty array is ignored.
     * Returns: a copy of this buffer.
     */
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    /// Value of `$` inside index and slice expressions; 0 for a never-filled buffer.
    @property auto opDollar() const pure @safe {
        return length;
    }
    /// Number of stored elements; 0 for a never-filled buffer.
    @property auto length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    /// Returns: true when the buffer holds no elements.
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// First element. The buffer must not be empty.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last element. The buffer must not be empty.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element. The buffer must not be empty.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                // first chunk exhausted - drop it
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first n elements; n must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "length: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to drop; the buffer may not even have a representation yet
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            // whole first chunk goes away
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element. The buffer must not be empty.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last n elements; n must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        if ( n == 0 ) {
            // nothing to drop; the buffer may not even have a representation yet
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            // whole last chunk goes away
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Returns: an independent copy of this buffer (forward range primitive).
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Element at position n, counted over all chunks; n must be less than length.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Returns: new buffer with elements [m, n); requires m <= n <= length.
    Buffer!T opSlice(size_t m, size_t n) {
        assert( m <= n && n <= length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
    /// All stored elements joined into one newly built array.
    /// Params:
    ///  U = result type; T[] by default, any other type is produced by a cast of T[].
    @property auto data(U=T[])() const pure {
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    /// Cast support: returns the stored data as produced by toString.
    string opCast(string)() {
        return this.toString;
    }
    /// Compare with x after casting this buffer to the type of x.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
/// Buffer satisfies the range traits up to random access and can be used
/// with the usual range algorithms.
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));
    
    // two chunks behave as one contiguous sequence
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    // from here on b holds "bcde"
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    // from here on b holds "de"
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // searching and splitting across chunk boundaries must not consume the buffer
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);
}
578 
// Minimal hand-written bindings for the OpenSSL functions used by OpenSslSocket.
extern(C) {
    // C `long` is not D `long` on every target (it is 32 bits wide on 32-bit
    // systems and on 64-bit Windows); c_long always matches the C compiler.
    import core.stdc.config : c_long;

    // library initialisation
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();
    
    // opaque handle types
    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}
    
    // context / connection life cycle and I/O
    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);
    
    c_long  SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, c_long larg, void *parg);
    
    c_long SSL_CTX_set_mode(SSL_CTX *ctx, c_long mode);
    c_long SSL_set_mode(SSL *ssl, c_long mode);
    
    c_long SSL_CTX_get_mode(SSL_CTX *ctx);
    c_long SSL_get_mode(SSL *ssl);
    
    // protocol selectors
    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();
}
611 
612 //pragma(lib, "crypto");
613 //pragma(lib, "ssl");
614 
/// Shared module constructor: calls the OpenSSL initialisation functions declared
/// in the extern(C) block above (library init, cipher and digest registration,
/// error string loading).
shared static this() {
    SSL_library_init();
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}
621 
/**
 * Socket that runs an SSL/TLS client session (via OpenSSL) on top of the underlying
 * std.socket.Socket handle. connect() performs the handshake; send()/receive() go
 * through SSL_write()/SSL_read(). The SocketFlags arguments are ignored.
 */
public class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;

    /// Create the SSL context and session and attach the session to the socket handle.
    /// Throws: Exception when OpenSSL can't create the context or the session, or
    /// can't attach the handle.
    private void initSsl() {
        //ctx = SSL_CTX_new(SSLv3_client_method());
        // NOTE(review): this pins the session to the TLSv1 client method - confirm
        // that this is still acceptable for the servers this client talks to.
        ctx = SSL_CTX_new(TLSv1_client_method());
        // enforce instead of assert: the check must survive builds without asserts
        enforce(ctx !is null, "can't create ssl context");
        
        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        enforce(ssl !is null, "can't create ssl session");
        enforce(SSL_set_fd(ssl, this.handle) == 1, "can't attach socket to ssl session");
    }
    
    /// Connect the socket and perform the SSL handshake.
    /// Throws: Exception when the handshake does not complete successfully.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect returns 1 on success only; 0 and negative values are both failures
        if(SSL_connect(ssl) != 1)
            throw new Exception("ssl connect failed");
    }
    
    /// Write buf to the SSL session.
    /// Returns: result of SSL_write (number of bytes written when positive).
    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        // SSL_write takes an int length: clamp instead of letting the cast wrap around
        auto len = cast(int) min(buf.length, cast(size_t) int.max);
        return SSL_write(ssl, buf.ptr, len);
    }
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }
    /// Read from the SSL session into buf.
    /// Returns: result of SSL_read (number of bytes read when positive).
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        // SSL_read takes an int length: clamp instead of letting the cast wrap around
        auto len = cast(int) min(buf.length, cast(size_t) int.max);
        return SSL_read(ssl, buf.ptr, len);
    }
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }
    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }
    
    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }
    /// Close the underlying socket (no SSL shutdown is sent).
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }
    ~this() {
        // initSsl may have thrown half way through - free only what was created
        if ( ssl !is null ) {
            SSL_free(ssl);
        }
        if ( ctx !is null ) {
            SSL_CTX_free(ctx);
        }
    }
}
676 
/**
 * Base class for stream connections. Subclasses override open() to create the
 * concrete socket; connect() resolves the host and tries each returned address in turn.
 */
public abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the socket for the given address family. The base implementation does nothing.
    void open(AddressFamily fa) {
    }
    /// Underlying socket (null when nothing is open).
    @property ref Socket so() @safe pure {
        return s;
    }
    /// Returns: true when a socket exists and has been opened.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// Returns: true when a socket exists and is connected.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and forget it.
    void close() {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    
    /// Resolve host and connect to the first address that accepts the connection.
    /// Params:
    /// host    = host name or address
    /// port    = port number
    /// timeout = value for the socket's SNDTIMEO option, set before connecting
    /// Returns: this stream
    /// Throws: ConnectError when the host can't be resolved or no address could be connected.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // pass the arguments to tracef directly: a pre-formatted message must not be
        // used as a format string (a '%' in host would be read as a format specifier)
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
            throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was created
                if ( s !is null ) {
                    s.close();
                }
                // the socket is closed now, don't report it as open
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }
    
    /// Send buff over the connected socket.
    /// Returns: result of Socket.send.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }
    
    /// Receive data into buff.
    /// Returns: result of Socket.receive; the caller's slice is not resized.
    ptrdiff_t receive(void[] buff) @safe {
        return s.receive(buff);
    }
}
749 
/// SocketStream whose socket is an OpenSslSocket.
public class SSLSocketStream: SocketStream {
    /// Replace the current socket (closing it first, if any) with a new
    /// OpenSslSocket for address family `fa` and mark the stream as open.
    override void open(AddressFamily fa) {
        auto previous = s;
        if ( previous !is null ) {
            previous.close();
        }
        s = new OpenSslSocket(fa);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}
760 
/// SocketStream whose socket is a plain TCP stream socket.
public class TCPSocketStream : SocketStream {
    /// Replace the current socket (closing it first, if any) with a new TCP
    /// stream socket for address family `fa` and mark the stream as open.
    override void open(AddressFamily fa) {
        auto previous = s;
        if ( previous !is null ) {
            previous.close();
        }
        s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}
771