1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 
/// Shorthand for a data processor (see DataPipeIface) that works on `ubyte` elements.
alias InDataHandler = DataPipeIface!ubyte;
20 
/**
 * Exception type thrown by this module when a connection cannot be established
 * (host name resolution failed or none of the resolved addresses accepted the connection).
 */
public class ConnectError : Exception
{
    /// Params:
    /// msg  = human readable description of the failure
    /// file = source file reported as the throw location
    /// line = source line reported as the throw location
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure
    {
        // No chained exception is ever attached here.
        super(msg, file, line, null);
    }
}
26 
/**
 * Exception type thrown when an encoded body cannot be decoded.
 * The (misspelled) class name is kept as is because other code refers to it.
 */
class DecodingExceptioin : Exception
{
    /// Params:
    /// msg  = human readable description of the failure
    /// file = source file reported as the throw location
    /// line = source line reported as the throw location
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure
    {
        // No chained exception is ever attached here.
        super(msg, file, line, null);
    }
}
/**
 * DataPipeIface can accept some data, process it, and return the processed data.
 *
 * Params:
 * E = element type of the data arrays passed in and handed out
 */
public interface DataPipeIface(E) {
    /// Returns: true when there is no processed data ready for reading.
    bool empty();
    /// Put the next data portion in for processing.
    void put(E[]);
    /// Get any processed data that is ready.
    E[] get();
    /// Signal the end of the incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it, and hands the
 * result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example: unchunk
 * Transfer-Encoding "chunked", then uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    /// Processors, applied in insertion order.
    DataPipeIface!(E)[]  pipe;
    /// Output of the last processor (or the raw input when the pipeline is empty).
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every portion of `data` to processor `p`, then drain whatever `p` has ready.
    /// Params:
    /// p = processor to run
    /// data = input portions, passed to `p.put` one by one
    /// Returns:
    /// the portions obtained from `p.get()` until `p.empty()` is true
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        foreach(portion; data) {
            p.put(portion);
        }
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Pass `portions` through every processor in order and store the final output in `buffer`.
    private void runPipeline(E[][] portions) {
        foreach(p; pipe) {
            portions = process(p, portions);
        }
        foreach(portion; portions) {
            buffer.put(portion);
        }
    }
    /// Process next data portion. Data is passed over the pipeline and the result stored in buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            buffer.put(data);
            return;
        }
        runPipeline([data]);
    }
    /// Process next data portion. Data is passed over the pipeline and the result stored in buffer.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        if ( buff.__repr is null ) {
            // A never-written Buffer passed as an rvalue has no representation object at all;
            // there is nothing to process and touching buff.__repr would dereference null.
            return;
        }
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                buffer = buff;
                return;
            }
            // No processors: just splice the incoming chunk list onto our own.
            buffer.__repr.__buffer ~= buff.__repr.__buffer;
            buffer.__repr.__length += buff.length;
            return;
        }
        runPipeline(buff.__repr.__buffer);
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, forwarding what each one
    /// releases to the next, and store the output of the last one in the internal buffer.
    void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            // Whatever the previous stage released on flush is input for this stage.
            foreach(portion; product) {
                p.put(portion);
            }
            p.flush();
            E[][] released;
            while( !p.empty ) released ~= p.get();
            product = released;
        }
        foreach(portion; product) {
            buffer.put(portion);
        }
    }
}
127 
/**
 * Processor for gzipped/compressed content.
 * Also supports the InputRange interface (empty/front/popFront) over the decompressed data.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;    // decompressed data not yet handed out
        UnCompress   __zlib;    // null once the stream has been flushed
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
    /// Decompress the next portion of input and store the result internally.
    /// Params:
    /// data = next portion of the compressed stream
    override void put(E[] data) {
        if ( __zlib is null  ) {
            // Previous stream was flushed: start a new one.
            __zlib = new UnCompress();
        }
        __buff.put(__zlib.uncompress(data));
    }
    /// Hand out the oldest stored portion of decompressed data.
    /// Returns:
    /// the first stored chunk, or null when nothing is stored.
    override E[] get() pure {
        if ( __buff.empty ) {
            // Nothing stored; the chunk list may not even exist yet.
            return null;
        }
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Signal end of input: store whatever the decompressor still holds.
    /// Calling it again without an intervening put() does nothing.
    override void flush() {
        if ( __zlib is null  ) {
            return;
        }
        __buff.put(__zlib.flush());
        // An UnCompress object cannot be used after flush(); drop it so that the
        // null checks in put()/flush() take effect.
        __zlib = null;
    }
    /// Returns: true when no decompressed data is stored.
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    /// Returns: first element of the stored decompressed data.
    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    /// Drop the first element of the stored decompressed data.
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    /// Drop the first `n` elements of the stored decompressed data.
    @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
184 
/**
 * Unchunk chunked http response body.
 *
 * Input is fed with put() in arbitrary portions; decoded chunk data is collected
 * internally and handed out by get(). Once the zero-sized last chunk has been seen,
 * done() returns true and any further input (the trailer) is ignored.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;      // decoded chunk data not yet handed out
        ubyte[]      linebuff;  // partial chunk-size line / chunk separator collected so far
    }
    /// Feed the next portion of the chunked body.
    /// Params:
    /// data = next portion of raw (still chunked) body
    /// Throws:
    /// DecodingExceptioin if no chunk-size line terminator shows up within 80 bytes,
    /// or if the chunk-size line does not start with a hexadecimal number.
    void put(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                // Last chunk already seen: everything else is ignored.
                return;
            }
            if ( state == States.huntingSize ) {
                // Collect input until a complete "size CRLF" line is available.
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( !s[1].length ) {
                    if ( linebuff.length >= 80 ) {
                        throw new DecodingExceptioin("Can't find chunk size in the body");
                    }
                    continue;
                }
                string x = castFrom!(ubyte[]).to!string(s[0]);
                // formattedRead reports how many values it filled in. Without this check an
                // empty size line would leave chunk_size at 0 and be mistaken for the last chunk.
                if ( formattedRead(x, "%x", &chunk_size) != 1 ) {
                    throw new DecodingExceptioin("Can't parse chunk size in the body");
                }
                tracef("Got chunk size %s", chunk_size);
                state = States.receiving;
                to_receive = chunk_size;
                // Whatever followed the size line is chunk data (or the trailer).
                data = s[2];
                if ( chunk_size == 0 ) {
                    state = States.trailer;
                    tracef("Unchunk completed");
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.put(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        // Chunk complete: the CRLF that terminates chunk-data comes next.
                        tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        to_receive = 2;
                        linebuff.length = 0;
                        continue;
                    }
                    continue;
                }
                // This state is only entered with a non-zero chunk size and left as soon as
                // to_receive drops to zero.
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( s[1].length ) {
                    data = s[2];
                    chunk_size = 0;
                    linebuff.length = 0;
                    state = States.huntingSize;
                    tracef("switch to huntig size");
                    continue;
                }
                // Separator still incomplete: keep what we have and wait for more input.
            }
        }
    }
    /// Hand out the oldest stored portion of decoded data.
    /// Returns:
    /// the first stored chunk, or null when nothing is stored.
    eType[] get() {
        if ( buff.empty ) {
            // Nothing stored; the chunk list may not even exist yet.
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do at end of input: decoded data is stored as soon as it arrives.
    void flush() {
    }
    /// Returns: true when no decoded data is stored.
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true once the zero-sized last chunk has been seen.
    bool done() {
        return state==States.trailer;
    }
}
307 
// Exercises Decompressor on its own (fed in several portions, read back through its
// range interface) and DataPipe both with a Decompressor inserted and with no processors.
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // Input split into three portions.
    auto d = new Decompressor!eType();
    d.put(gzipped[0..2]);
    d.put(gzipped[2..10]);
    d.put(gzipped[10..$]);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // Same input split into two portions.
    auto e = new Decompressor!eType();
    e.put(gzipped[0..10]);
    e.put(gzipped[10..$]);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // Decompressor used as the only stage of a DataPipe.
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // an empty DataPipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but also
 * supports the Range interface.
 * $(P To place data in buffer use put() method.)
 * $(P  To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 * $(P Data is kept as a list of chunks, one per put() call. A Buffer that was never written
 * has no representation object; every method treats that state as an empty buffer.)
 */
public struct Buffer(T) {
    private {
        class Repr {
            size_t         __length;    // total number of elements over all chunks
            Unqual!T[][]   __buffer;    // chunks in arrival order
            this() {
                __length = 0;
            }
            /// Copy the chunk list of `other` (the chunks themselves are shared, not copied).
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    /// Every copy of a Buffer gets its own representation object.
    this(this) {
        __repr = new Repr(__repr);
    }
    /// Create a buffer holding a copy of `data`.
    this(U)(U[] data) pure {
        put(data);
    }
   ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     * Params:
     * data = elements to append; an empty array is ignored
     * Returns:
     * this buffer, by value
     */
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    /// Value of `$` inside index and slice expressions; same as length.
    @property size_t opDollar() const pure @safe {
        // Go through length so that a never-written buffer yields 0 instead of
        // dereferencing a null representation.
        return length;
    }
    /// Returns: number of elements stored (0 for a never-written buffer).
    @property size_t length() const pure @safe {
        return __repr is null ? 0 : __repr.__length;
    }
    /// Returns: true when no elements are stored.
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// Returns: first stored element. The buffer must not be empty.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Returns: last stored element. The buffer must not be empty.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element. The buffer must not be empty.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                // First chunk exhausted: drop it so that front never sees an empty chunk.
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first `n` elements; `n` must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // Nothing to drop; also keeps a never-written buffer (null representation) safe.
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            // Whole first chunk goes away.
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element. The buffer must not be empty.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last `n` elements; `n` must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        if ( n == 0 ) {
            // Nothing to drop; also keeps a never-written buffer (null representation) safe.
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            // Whole last chunk goes away.
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Returns: a buffer with its own representation object over the same chunks.
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Returns: element number `n` (counted from 0 over all chunks); `n` must be less than length.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Returns: a new buffer covering elements [m, n); requires m <= n <= length.
    Buffer!T opSlice(size_t m, size_t n) {
        // Use length rather than __repr.__length: a never-written buffer has no representation.
        assert( m <= n && n <= length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
//    ptrdiff_t countUntil(in T[] needle) const pure @safe {
//        ptrdiff_t haystackpos, needlepos;
//        while(haystackpos < length) {
//            if ( opIndex(haystackpos) == needle[needlepos] ) {
//
//                return haystackpos;
//            } else {
//                needlepos = 0;
//                haystackpos++;
//            }
//        }
//        return -1;
//    }
    /// Returns: all stored elements joined into one freshly built array, converted to `U`
    /// with castFrom when `U` is not `T[]`.
    @property auto data(U=T[])() const pure {
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    // Note: `string` below is the name of the template parameter, so inside this
    // function it denotes the requested cast target type, not the built-in alias.
    string opCast(string)() {
        return this.toString;
    }
    /// Compare by casting this buffer to the type of `x` (see opCast) and comparing the results.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
///
public unittest {

    // Buffer must satisfy the whole range hierarchy up to random access.
    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));
    
    // Two put() calls: the data spans two internal chunks.
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    // Drop one element from each end: "bcde" remains.
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    // The algorithms above worked on copies: b itself is unchanged.
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // Header/body style input spread over three chunks.
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    // Position of the empty line that ends the headers.
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    // Searching and slicing must not consume the original buffer.
    assert(c.length == c_length);
}
580 
// Hand-written declarations of the C functions and opaque types of the SSL library
// used by this module (see the module constructor and OpenSslSocket).
// NOTE(review): D `long` is always 64 bits wide while C `long` depends on the platform;
// confirm the `long` parameters/returns below against the C prototypes before relying on them.
extern(C) {
    // Library initialisation, called once from the module constructor.
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();
    
    // Opaque handles: only ever used through pointers.
    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}
    
    // Context / connection life cycle and I/O.
    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);
    
    long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
    
    // NOTE(review): not called anywhere in the visible code; presumably these are macros
    // rather than exported functions in the C headers - verify before using them.
    long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
    long SSL_set_mode(SSL *ssl, long mode);
    
    long SSL_CTX_get_mode(SSL_CTX *ctx);
    long SSL_get_mode(SSL *ssl);
    
    // Protocol method selectors; OpenSslSocket currently uses TLSv1_client_method.
    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();
}
613 
614 //pragma(lib, "crypto");
615 //pragma(lib, "ssl");
616 
// Module constructor (`shared static this` runs once per process, before main):
// performs the SSL library initialisation calls declared in the extern(C) block above.
shared static this() {
    SSL_library_init();
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}
623 
/**
 * Socket that sends and receives its data through an SSL connection object attached
 * to the socket's own handle.
 */
public class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;
    /// Create the SSL context and connection object and bind the latter to this socket's handle.
    /// Throws: Exception if any of the three steps fails.
    private void initSsl() {
        //ctx = SSL_CTX_new(SSLv3_client_method());
        ctx = SSL_CTX_new(TLSv1_client_method());
        // Explicit checks instead of assert: asserts are compiled out of release builds.
        if ( ctx is null ) {
            throw new Exception("can't create ssl context");
        }

        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        if ( ssl is null ) {
            throw new Exception("can't create ssl connection object");
        }
        // SSL_set_fd reports success as 1.
        if ( SSL_set_fd(ssl, this.handle) != 1 ) {
            throw new Exception("can't attach ssl to socket");
        }
    }
    /// Clamp a buffer length to what fits the `int` length parameter of SSL_read/SSL_write.
    private static int ioLength(size_t len) @safe pure nothrow @nogc {
        return len > int.max ? int.max : cast(int) len;
    }

    /// Connect the underlying socket, then perform the SSL handshake.
    /// Throws: Exception("ssl connect failed") when the handshake does not complete.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect returns 1 only when the handshake completed; 0 and negative
        // values are both failures.
        if(SSL_connect(ssl) != 1)
            throw new Exception("ssl connect failed");
    }

    /// Write `buf` through the SSL connection; `flags` is not used.
    /// Returns: the result of SSL_write.
    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        return SSL_write(ssl, buf.ptr, ioLength(buf.length));
    }
    /// ditto
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }
    /// Read into `buf` through the SSL connection; `flags` is not used.
    /// Returns: the result of SSL_read.
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        return SSL_read(ssl, buf.ptr, ioLength(buf.length));
    }
    /// ditto
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }
    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }

    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }
    /// Close the underlying socket (no SSL shutdown is sent).
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }
    ~this() {
        // Either pointer may still be null if construction failed part-way.
        if ( ssl !is null ) {
            SSL_free(ssl);
        }
        if ( ctx !is null ) {
            SSL_CTX_free(ctx);
        }
    }
}
678 
/**
 * Base class for connection streams: resolves a host name, tries the resulting addresses
 * one by one and offers send/receive on the connected socket.
 * Subclasses override open() to create the actual socket.
 */
public abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the socket for the given address family. The base implementation does nothing.
    void open(AddressFamily fa) {
    }
    /// Returns: reference to the underlying socket (may be null).
    @property ref Socket so() @safe pure {
        return s;
    }
    /// Returns: true when a socket exists and has been opened.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// Returns: true when a socket exists and is connected.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and forget it.
    void close() @trusted {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /// Resolve `host` and connect to the first address that accepts the connection.
    /// Params:
    /// host = host name or address to connect to
    /// port = port number
    /// timeout = value for the socket's send timeout option (SNDTIMEO)
    /// Returns: this stream
    /// Throws: ConnectError if the name can't be resolved or no address could be connected.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // Let the logger do the formatting: pre-formatted text must not be used as a
        // format string, since `host` may contain '%'.
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
            throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was assigned.
                if ( s !is null ) {
                    s.close();
                }
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff` over the connected socket.
    /// Returns: the result of Socket.send.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }

    /// Receive into `buff`.
    /// Returns: the result of Socket.receive. The caller's slice is not resized;
    /// use the returned count to find out how much of `buff` was filled.
    ptrdiff_t receive(void[] buff) @safe {
        return s.receive(buff);
    }
}
751 
/// SocketStream whose socket is an OpenSslSocket.
public class SSLSocketStream: SocketStream {
    /// Replace the current socket (closing it first, if there is one) with a new
    /// OpenSslSocket for address family `fa` and mark the stream as open.
    override void open(AddressFamily fa) {
        if ( s ) {
            // Release the previous socket before it is replaced.
            s.close();
        }
        auto fresh = new OpenSslSocket(fa);
        assert(fresh !is null, "Can't create socket");
        s = fresh;
        __isOpen = true;
    }
}
762 
/// SocketStream whose socket is a plain TCP stream socket.
public class TCPSocketStream : SocketStream {
    /// Replace the current socket (closing it first, if there is one) with a new
    /// TCP stream socket for address family `fa` and mark the stream as open.
    override void open(AddressFamily fa) {
        if ( s ) {
            // Release the previous socket before it is replaced.
            s.close();
        }
        auto fresh = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(fresh !is null, "Can't create socket");
        s = fresh;
        __isOpen = true;
    }
}
773