1 module requests.http;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.datetime;
8 import std.exception;
9 import std.format;
10 import std.stdio;
11 import std.range;
12 import std.socket;
13 import std.string;
14 import std.traits;
15 import std.typecons;
16 import std.experimental.logger;
17 import core.thread;
18 import core.stdc.errno;
19 import requests.streams;
20 import requests.uri;
21 import requests.utils;
22 
/// Thread-local module constructor: by default only messages of level
/// `error` and above are emitted by the logger.
static this() {
    globalLogLevel = LogLevel.error;
}
26 
// Hand-written bindings for the OpenSSL entry points referenced by this module.
extern(C) {
    // Handle types; this module only ever passes them around by pointer.
    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}

    // Library start-up (called once from the shared module constructor).
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();

    // Protocol method selectors handed to SSL_CTX_new.
    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();

    // Context / session lifetime.
    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);

    // Session I/O.
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;

    // Mode control.
    // NOTE(review): in the OpenSSL headers the *_set_mode / *_get_mode names are
    // macros over the *_ctrl functions - confirm the symbols exist in the linked
    // library before calling them.
    long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);

    long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
    long SSL_set_mode(SSL *ssl, long mode);

    long SSL_CTX_get_mode(SSL_CTX *ctx);
    long SSL_get_mode(SSL *ssl);
}
59 
60 pragma(lib, "crypto");
61 pragma(lib, "ssl");
62 
/// One-time (per process) OpenSSL set-up: initialise the library, register
/// ciphers and digests, and load the human-readable error strings.
shared static this() {
    cast(void) SSL_library_init();   // return value intentionally ignored
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}
69 
/**
 * Socket whose payload is sent and received through an OpenSSL session.
 *
 * The SSL context and session are created in the constructors and bound to the
 * socket's descriptor; `connect` performs the TCP connect followed by the TLS
 * handshake, and `send`/`receive` map to `SSL_write`/`SSL_read`.
 */
class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;

    /// Largest chunk that fits the `int` length parameter of SSL_read/SSL_write.
    private static int clampLength(size_t n) @safe @nogc pure nothrow {
        return n > int.max ? int.max : cast(int) n;
    }

    /// Create the context and the session and attach the session to this socket.
    /// Throws: Exception when any of the OpenSSL calls fails.
    private void initSsl() {
        ctx = SSL_CTX_new(TLSv1_client_method());
        // enforce (not assert): the check must survive release builds
        enforce(ctx !is null, "SSL_CTX_new failed");

        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        enforce(ssl !is null, "SSL_new failed");
        enforce(SSL_set_fd(ssl, this.handle) == 1, "SSL_set_fd failed");
    }

    /// Connect the underlying socket, then run the TLS handshake.
    /// Throws: Exception when the handshake does not complete successfully.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect reports success only with 1; 0 and negative values are failures
        if ( SSL_connect(ssl) != 1 )
            throw new Exception("ssl connect failed");
    }

    /// Write `buf` through the SSL session; `flags` are not used.
    /// Returns: the SSL_write result (bytes written, or <= 0 on error).
    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        if ( buf.length == 0 ) {
            // nothing to write; avoid handing a zero length to SSL_write
            return 0;
        }
        return SSL_write(ssl, buf.ptr, clampLength(buf.length));
    }
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }

    /// Read into `buf` through the SSL session; `flags` are not used.
    /// Returns: the SSL_read result (bytes read, or <= 0 on error/close).
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        return SSL_read(ssl, buf.ptr, clampLength(buf.length));
    }
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }

    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }

    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }

    /// Close the underlying socket (no TLS shutdown is sent).
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }

    /// Release the OpenSSL objects created by initSsl.
    ~this() {
        // either pointer may be null if initSsl threw half-way
        if ( ssl !is null ) {
            SSL_free(ssl);
        }
        if ( ctx !is null ) {
            SSL_CTX_free(ctx);
        }
    }
}
124 
// Exercises the accessor code generated by the getter()/setter() string mixins
// on a struct whose backing fields are named with a double-underscore prefix.
unittest {
    struct Sample {
        private {
            int    __i;
            string __s;
            bool   __b;
        }
        mixin(getter("i"));
        mixin(setter("i"));
        mixin(getter("b"));
    }
    Sample sample;
    // default-initialised fields are visible through the getters
    assert(sample.i == 0);
    assert(!sample.b);
    // the setter stores, the getter reads back
    sample.i = 1;
    assert(sample.i == 1);
    assert(!sample.b);
}
142 
/// Raised for request-level failures detected by this module (for example a
/// missing URL, oversized headers or an unusable Content-Length).
public class RequestException : Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure
    {
        super(msg, file, line);
    }
}
148 
/// Raised when a host can't be resolved or none of its addresses accepts
/// the connection.
public class ConnectError : Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure
    {
        super(msg, file, line);
    }
}
154 
/// Raised when receiving response data times out.
public class TimeoutException : Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure
    {
        super(msg, file, line);
    }
}
160 
/// Provider of authentication headers for outgoing requests.
interface Auth {
    /// Params:
    ///     domain = host the request is addressed to
    /// Returns: header name -> header value pairs to add to the request.
    string[string] authHeaders(string domain);
}
/**
 * HTTP Basic authentication.
 * Supplies an $(B Authorization: Basic) header whose value is the Base64
 * encoding of "username:password".
 */
public class BasicAuthentication: Auth {
    private {
        string   _username, _password;
        string[] _domains;
    }
    /// Constructor.
    /// Params:
    /// username = user name
    /// password = password
    /// domains = stored, but not consulted when headers are generated
    ///
    this(string username, string password, string[] domains = []) {
        this._username = username;
        this._password = password;
        this._domains  = domains;
    }
    /// Returns: a single-entry map holding the Authorization header;
    ///          `domain` is ignored.
    override string[string] authHeaders(string domain) {
        import std.base64;
        immutable credentials = "%s:%s".format(_username, _password);
        auto token = Base64.encode(cast(ubyte[]) credentials);
        return ["Authorization": "Basic " ~ to!string(token)];
    }
}
191 
192 
/**
 * Common part of the socket streams used to talk to a server.
 *
 * Concrete streams override `open` to create the actual socket; this class
 * resolves the host, tries every returned address in turn and forwards
 * send/receive to the socket.
 */
abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the underlying socket for address family `fa`.
    /// Does nothing here; subclasses provide the implementation.
    void open(AddressFamily fa) {
    }
    /// True when a socket exists and has been opened.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when a socket exists and the last connect succeeded.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and forget it.
    void close() {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /// Resolve `host` and connect to the first address that accepts the connection.
    /// Params:
    ///     host = host name or address
    ///     port = TCP port
    ///     timeout = value applied to the socket's SNDTIMEO option before connecting
    /// Returns: this stream.
    /// Throws: ConnectError when the host can't be resolved or no address could be connected.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // pass the arguments to tracef directly: a pre-formatted string would be
        // re-interpreted as a format string and break on '%' in the host
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
            throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() may have failed before a socket was assigned
                if ( s !is null ) {
                    s.close();
                }
                // the socket of the failed attempt is closed, don't report it as open
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff` over the connected socket.
    /// Returns: the result of the socket's send call.
    ptrdiff_t send(const(void)[] buff)
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }

    /// Receive into `buff`.
    /// Returns: the result of the socket's receive call; callers slice `buff`
    ///          themselves using this value.
    ptrdiff_t receive(void[] buff) {
        return s.receive(buff);
    }
}
262 
/// Stream whose socket is an OpenSslSocket.
class SSLSocketStream: SocketStream {
    /// Replace the current socket (closing it first, if any) with a new
    /// OpenSslSocket for address family `fa`.
    override void open(AddressFamily fa) {
        auto previous = s;
        if ( previous !is null ) {
            previous.close();
        }
        s = new OpenSslSocket(fa);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}
/// Stream whose socket is a plain TCP socket.
class TCPSocketStream : SocketStream {
    /// Replace the current socket (closing it first, if any) with a new
    /// TCP stream socket for address family `fa`.
    override void open(AddressFamily fa) {
        auto previous = s;
        if ( previous !is null ) {
            previous.close();
        }
        s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}
283 
/**
 * Perform a GET request and return only the response body.
 * Convenience wrapper for the common case where headers and status are not needed;
 * the request advertises gzip and deflate support.
 * Returns:
 * Buffer!ubyte which you can use as ForwardRange or DirectAccessRange, or extract data with .data() method.
 */
public auto getContent(A...)(A args) {
    auto request = Request();
    request.addHeaders(["Accept-Encoding":"gzip, deflate"]);
    return request.exec!"GET"(args).responseBody;
}
///
public unittest {
    globalLogLevel(LogLevel.info);
    auto content = getContent("https://httpbin.org/stream/20");
    auto nonEmptyLines = content.splitter('\n').filter!("a.length>0");
    assert(nonEmptyLines.count == 20);
}
302 
/**
 * Perform a POST request and return only the response body.
 * The request advertises gzip and deflate support.
 */
public auto postContent(A...)(A args) {
    auto request = Request();
    request.addHeaders(["Accept-Encoding":"gzip, deflate"]);
    return request.exec!"POST"(args).responseBody;
}
///
public unittest {
    import std.json;
    globalLogLevel(LogLevel.info);
    auto content = postContent("http://httpbin.org/post", `{"a":"b", "c":1}`, "application/json");
    auto echoed = parseJSON(content.data).object["json"].object;
    assert(echoed["c"].integer == 1);
}
319 
///
/// Response - what a request execution produced.
///
/// Response.code - numeric status code
/// Response.status_line - status line as received
/// Response.responseHeaders - received headers
/// Response.responseBody - container with the received body
/// Response.history - for redirected requests, the intermediate responses
/// Response.getStats - connect / send / receive durations
///
public struct Response {
    private {
        ushort         __code;
        string         __status_line;
        string[string] __responseHeaders;
        Buffer!ubyte   __responseBody;
        Response[]     __history; // redirects history
        // timestamps taken while the request is executed
        SysTime        __startedAt, __connectedAt, __requestSentAt, __finishedAt;
    }
   ~this() {
        // drop references to headers and history
        __responseHeaders = null;
        __history.length = 0;
    }
    // read accessors
    mixin(getter("code"));
    mixin(getter("status_line"));
    mixin(getter("responseHeaders"));
    @property auto responseBody() inout pure @safe {
        return __responseBody;
    }
    mixin(getter("history"));
    // write accessors are for this module only
    private {
        mixin(setter("code"));
        mixin(setter("status_line"));
        mixin(setter("responseHeaders"));
        mixin(setter("responseBody"));
    }
    /// Returns: tuple of durations (connectTime, sendTime, recvTime) computed
    ///          from the timestamps stored in this response.
    @property auto getStats() const pure @safe {
        alias statTuple = Tuple!(Duration, "connectTime",
                                 Duration, "sendTime",
                                 Duration, "recvTime");
        return statTuple(__connectedAt   - __startedAt,
                         __requestSentAt - __connectedAt,
                         __finishedAt    - __requestSentAt);
    }
}
365 
/// Nesting depth of a range type: 0 for a non-range, otherwise one more than
/// the rank of its element type.
template rank(R) {
    static if ( !isInputRange!R ) {
        enum size_t rank = 0;
    } else {
        enum size_t rank = rank!(ElementType!R) + 1;
    }
}
unittest {
    assert(rank!(char) == 0);
    assert(rank!(string) == 1);
    assert(rank!(ubyte[][]) == 2);
}
378 
/// Status codes for which a redirect is followed.
static immutable ushort[] redirectCodes = [301, 302, 303];
380 
/**
 * Percent-encode a string for use in a query string or form body.
 *
 * Every UTF-8 code unit that is not an unreserved character
 * (ASCII letters, digits, '-', '.', '_', '~') is replaced by %XX with
 * upper-case hex digits. This covers the reserved characters as well as
 * control characters, quotes and non-ASCII text.
 *
 * Params:
 *     p = text to encode
 * Returns: the encoded text.
 */
static string urlEncoded(string p) pure @safe {
    import std.ascii : isAlphaNum;
    static immutable char[16] hexDigits = "0123456789ABCDEF";

    string encoded;
    // iterate code units (not code points) so multi-byte characters are
    // encoded byte by byte
    foreach(char c; p) {
        if ( isAlphaNum(c) || c == '-' || c == '.' || c == '_' || c == '~' ) {
            encoded ~= c;
        } else {
            encoded ~= '%';
            encoded ~= hexDigits[c >> 4];
            encoded ~= hexDigits[c & 0x0F];
        }
    }
    return encoded;
}
unittest {
    assert(urlEncoded(`abc !#$&'()*+,/:;=?@[]`) == "abc%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D");
    assert(urlEncoded("100%") == "100%25");
    assert(urlEncoded("a\r\nb") == "a%0D%0Ab");
    assert(urlEncoded("\u00E9") == "%C3%A9");
    assert(urlEncoded("a-b_c.d~e") == "a-b_c.d~e");
    assert(urlEncoded("") == "");
}
/**
 * Description of one file to upload in a multi-file POST request.
 */
public struct PostFile {
    /// Path of the file whose content is sent.
    string fileName;
    /// Form field name; when empty the file name is used instead.
    string fieldName;
    /// Content type announced for this part; omitted when empty.
    string contentType;
}
404 ///
405 /// Request.
406 /// Configurable parameters:
407 /// $(B headers) - add any additional headers you'd like to send.
408 /// $(B authenticator) - class to send auth headers.
409 /// $(B keepAlive) - set true for keepAlive requests. default false.
410 /// $(B maxRedirects) - maximum number of redirects. default 10.
411 /// $(B maxHeadersLength) - maximum length of server response headers. default = 32KB.
/// $(B maxContentLength) - maximum content length. default = 5MB.
413 /// $(B bufferSize) - send and receive buffer size. default = 16KB.
414 /// $(B verbosity) - level of verbosity(0 - nothing, 1 - headers, 2 - headers and body progress). default = 0.
415 /// $(B proxy) - set proxy url if needed. default - null.
416 /// 
417 public struct Request {
418     private {
419         string         __method = "GET";
420         URI            __uri;
421         string[string] __headers;
422         Auth           __authenticator;
423         bool           __keepAlive;
424         uint           __maxRedirects = 10;
425         size_t         __maxHeadersLength = 32 * 1024; // 32 KB
426         size_t         __maxContentLength = 5 * 1024 * 1024; // 5MB
427         ptrdiff_t      __contentLength;
428         SocketStream   __stream;
429         Duration       __timeout = 30.seconds;
430         Response       __response;
431         Response[]     __history; // redirects history
432         size_t         __bufferSize = 16*1024; // 16k
433         uint           __verbosity = 0;  // 0 - no output, 1 - headers, 2 - headers+body info
434         DataPipe!ubyte __bodyDecoder;
435         DecodeChunked  __unChunker;
436         string         __proxy;
437     }
438 
439     mixin(getter("keepAlive"));
440     mixin(setter("keepAlive"));
441     mixin(getter("method"));
442     mixin(setter("method"));
443     mixin(getter("timeout"));
444     mixin(setter("timeout"));
445     mixin(setter("authenticator"));
446     mixin(getter("maxContentLength"));
447     mixin(setter("maxContentLength"));
448     mixin(getter("maxRedirects"));
449     mixin(setter("maxRedirects"));
450     mixin(getter("maxHeadersLength"));
451     mixin(setter("maxHeadersLength"));
452     mixin(getter("bufferSize"));
453     mixin(setter("bufferSize"));
454     mixin(getter("verbosity"));
455     mixin(setter("verbosity"));
456     mixin(setter("proxy"));
457 
458     this(string uri) {
459         __uri = URI(uri);
460     }
461    ~this() {
462         if ( __stream && __stream.isConnected) {
463             __stream.close();
464         }
465         __stream = null;
466         __headers = null;
467         __authenticator = null;
468         __history = null;
469     }
470     /// Add headers to request
471     /// Params:
472     /// headers = headers to send.
473     void addHeaders(in string[string] headers) {
474         foreach(pair; headers.byKeyValue) {
475             __headers[pair.key] = pair.value;
476         }
477     }
478     ///
479     /// compose headers to send
480     /// 
481     private @property string[string] headers() {
482         string[string] generatedHeaders;
483         if ( __authenticator ) {
484             foreach(pair; __authenticator.authHeaders(__uri.host).byKeyValue) {
485                 generatedHeaders[pair.key] = pair.value;
486             }
487         }
488 
489         generatedHeaders["Connection"] = __keepAlive?"Keep-Alive":"Close";
490         generatedHeaders["Host"] = __uri.host;
491         if ( __uri.scheme !in standard_ports || __uri.port != standard_ports[__uri.scheme] ) {
492             generatedHeaders["Host"] ~= ":%d".format(__uri.port);
493         }
494         foreach(pair; __headers.byKeyValue) {
495             generatedHeaders[pair.key] = pair.value;
496         }
497         return generatedHeaders;
498     }
499     ///
500     /// Build request string.
501     /// Handle proxy and query parameters.
502     /// 
503     private @property string requestString(string[string] params = null) {
504         if ( __proxy ) {
505             return "%s %s HTTP/1.1\r\n".format(__method, __uri.uri);
506         }
507         if ( __method != "GET" ) {
508             // encode params into url only for GET
509             return "%s %s HTTP/1.1\r\n".format(__method, __uri.path);
510         }
511         auto query = __uri.query.dup;
512         if ( params ) {
513             query ~= params2query(params);
514             if ( query[0] != '?' ) {
515                 query = "?" ~ query;
516             }
517         }
518         return "%s %s%s HTTP/1.1\r\n".format(__method, __uri.path, query);
519     }
520     ///
521     /// encode parameters and build query part of the url
522     /// 
523     private static string params2query(string[string] params) {
524         auto m = params.keys.
525                         sort().
526                         map!(a=>urlEncoded(a) ~ "=" ~ urlEncoded(params[a])).
527                         join("&");
528         return m;
529     }
530     unittest {
531         assert(Request.params2query(["c ":"d", "a":"b"])=="a=b&c%20=d");
532     }
533     ///
534     /// Analyze received headers, take appropriate actions:
535     /// check content length, attach unchunk and uncompress
536     /// 
537     private void analyzeHeaders(in string[string] headers) {
538 
539         __contentLength = -1;
540         __unChunker = null;
541         auto contentLength = "content-length" in headers;
542         if ( contentLength ) {
543             try {
544                 __contentLength = to!ptrdiff_t(*contentLength);
545                 if ( __contentLength > maxContentLength) {
546                     throw new RequestException("ContentLength > maxContentLength (%d>%d)".
547                                 format(__contentLength, __maxContentLength));
548                 }
549             } catch (ConvException e) {
550                 throw new RequestException("Can't convert Content-Length from %s".format(*contentLength));
551             }
552         }
553         auto transferEncoding = "transfer-encoding" in headers;
554         if ( transferEncoding ) {
555             tracef("transferEncoding: %s", *transferEncoding);
556             if ( *transferEncoding == "chunked") {
557                 __unChunker = new DecodeChunked();
558                 __bodyDecoder.insert(__unChunker);
559             }
560         }
561         auto contentEncoding = "content-encoding" in headers;
562         if ( contentEncoding ) switch (*contentEncoding) {
563             default:
564                 throw new RequestException("Unknown content-encoding " ~ *contentEncoding);
565             case "gzip":
566             case "deflate":
567                 __bodyDecoder.insert(new Decompressor!ubyte);
568         }
569     }
570     ///
571     /// Called when we know that all headers already received in buffer
572     /// 1. Split headers on lines
573     /// 2. store status line, store response code
574     /// 3. unfold headers if needed
575     /// 4. store headers
576     /// 
577     private void parseResponseHeaders(ref Buffer!ubyte buffer) {
578         string lastHeader;
579         foreach(line; buffer.data!(string).split("\n").map!(l => l.stripRight)) {
580             if ( ! __response.status_line.length ) {
581                 tracef("statusLine: %s", line);
582                 __response.status_line = line;
583                 if ( __verbosity >= 1 ) {
584                     writefln("< %s", line);
585                 }
586                 auto parsed = line.split(" ");
587                 if ( parsed.length >= 3 ) {
588                     __response.code = parsed[1].to!ushort;
589                 }
590                 continue;
591             }
592             if ( line[0] == ' ' || line[0] == '\t' ) {
593                 // unfolding https://tools.ietf.org/html/rfc822#section-3.1
594                 auto stored = lastHeader in __response.__responseHeaders;
595                 if ( stored ) {
596                     *stored ~= line;
597                 }
598                 continue;
599             }
600             auto parsed = line.findSplit(":");
601             auto header = parsed[0].toLower;
602             auto value = parsed[2].strip;
603             auto stored = __response.responseHeaders.get(header, null);
604             if ( stored ) {
605                 value = stored ~ ", " ~ value;
606             }
607             __response.__responseHeaders[header] = value;
608             if ( __verbosity >= 1 ) {
609                 writefln("< %s: %s", parsed[0], value);
610             }
611 
612             tracef("Header %s = %s", header, value);
613             lastHeader = header;
614         }
615     }
616 
617     ///
618     /// Do we received \r\n\r\n?
619     /// 
620     private bool headersHaveBeenReceived(in ubyte[] data, ref Buffer!ubyte buffer, out string separator) pure const @safe {
621         foreach(s; ["\r\n\r\n", "\n\n"]) {
622             if ( data.canFind(s) || buffer.canFind(s) ) {
623                 separator = s;
624                 return true;
625             }
626         }
627         return false;
628     }
629 
630     private bool followRedirectResponse() {
631         __history ~= __response;
632         if ( __history.length >= __maxRedirects ) {
633             return false;
634         }
635         auto location = "location" in __response.responseHeaders;
636         if ( !location ) {
637             return false;
638         }
639         auto connection = "connection" in __response.__responseHeaders;
640         if ( !connection || *connection == "close" ) {
641             tracef("Closing connection because of 'Connection: close' or no 'Connection' header");
642             __stream.close();
643         }
644         URI oldURI = __uri;
645         URI newURI = oldURI;
646         try {
647             newURI = URI(*location);
648         } catch (UriException e) {
649             trace("Can't parse Location:, try relative uri");
650             newURI.path = *location;
651             newURI.uri = newURI.recalc_uri;
652         }
653         handleURLChange(oldURI, newURI);
654         __uri = newURI;
655         __response = Response.init;
656         return true;
657     }
658     ///
659     /// If uri changed so that we have to change host or port, then we have to close socket stream
660     /// 
661     private void handleURLChange(in URI from, in URI to) {
662         if ( __stream !is null && __stream.isConnected && 
663             ( from.scheme != to.scheme || from.host != to.host || from.port != to.port) ) {
664             tracef("Have to reopen stream");
665             __stream.close();
666         }
667     }
668     
669     private void checkURL(string url, string file=__FILE__, size_t line=__LINE__) {
670         if (url is null && __uri.uri == "" ) {
671             throw new RequestException("No url configured", file, line);
672         }
673         
674         if ( url !is null ) {
675             URI newURI = URI(url);
676             handleURLChange(__uri, newURI);
677             __uri = newURI;
678         }
679     }
680     ///
681     /// Setup connection. Handle proxy and https case
682     /// 
683     private void setupConnection() {
684         if ( !__stream || !__stream.isConnected ) {
685             tracef("Set up new connection");
686             URI   uri;
687             if ( __proxy ) {
688                 // use proxy uri to connect
689                 uri.uri_parse(__proxy);
690             } else {
691                 // use original uri
692                 uri = __uri;
693             }
694             final switch (uri.scheme) {
695                 case "http":
696                     __stream = new TCPSocketStream().connect(uri.host, uri.port, __timeout);
697                     break;
698                 case "https":
699                     __stream = new SSLSocketStream().connect(uri.host, uri.port, __timeout);
700                     break;
701             }
702         } else {
703             tracef("Use old connection");
704         }
705     }
706     ///
707     /// Receive response after request we sent.
708     /// Find headers, split on headers and body, continue to receive body
709     /// 
710     private void receiveResponse() {
711 
712         __stream.s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
713         scope(exit) {
714             __stream.s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 0.seconds);
715         }
716 
717         __bodyDecoder = new DataPipe!ubyte();
718         auto b = new ubyte[__bufferSize];
719         scope(exit) {
720             __bodyDecoder = null;
721             __unChunker = null;
722             b = null;
723         }
724 
725         auto buffer = Buffer!ubyte();
726         Buffer!ubyte ResponseHeaders, partialBody;
727         size_t receivedBodyLength;
728         ptrdiff_t read;
729         string separator;
730         
731         while(true) {
732             read = __stream.receive(b);
733             tracef("read: %d", read);
734             if ( read < 0 ) {
735                 if ( errno == EAGAIN ) {
736                     throw new TimeoutException("Timeout receiving headers");
737                 }
738                 throw new ErrnoException("receiving Headers");
739             }
740             if ( read == 0 ) {
741                 break;
742             }
743             
744             auto data = b[0..read];
745             buffer.put(data);
746             if ( buffer.length > maxHeadersLength ) {
747                 throw new RequestException("Headers length > maxHeadersLength (%d > %d)".format(buffer.length, maxHeadersLength));
748             }
749             if ( headersHaveBeenReceived(data, buffer, separator) ) {
750                 auto s = buffer.data!(ubyte[]).findSplit(separator);
751                 ResponseHeaders = Buffer!ubyte(s[0]);
752                 partialBody = Buffer!ubyte(s[2]);
753                 receivedBodyLength += partialBody.length;
754                 parseResponseHeaders(ResponseHeaders);
755                 break;
756             }
757         }
758         
759         analyzeHeaders(__response.__responseHeaders);
760         __bodyDecoder.put(partialBody);
761 
762         if ( __verbosity >= 2 ) {
763             writefln("< %d bytes of body received", partialBody.length);
764         }
765 
766         if ( __method == "HEAD" ) {
767             // HEAD response have ContentLength, but have no body
768             return;
769         }
770 
771         while( true ) {
772             if ( __contentLength >= 0 && receivedBodyLength >= __contentLength ) {
773                 trace("Body received.");
774                 break;
775             }
776             if ( __unChunker && __unChunker.done ) {
777                 break;
778             }
779             read = __stream.receive(b);
780             if ( read < 0 ) {
781                 if ( errno == EAGAIN ) {
782                     throw new TimeoutException("Timeout receiving body");
783                 }
784                 throw new ErrnoException("receiving body");
785             }
786             if ( __verbosity >= 2 ) {
787                 writefln("< %d bytes of body received", read);
788             }
789             tracef("read: %d", read);
790             if ( read == 0 ) {
791                 trace("read done");
792                 break;
793             }
794             receivedBodyLength += read;
795             __bodyDecoder.put(b[0..read].dup);
796             __response.__responseBody.put(__bodyDecoder.get());
797             tracef("receivedTotal: %d, contentLength: %d, bodyLength: %d", receivedBodyLength, __contentLength, __response.__responseBody.length);
798         }
799         __bodyDecoder.flush();
800         __response.__responseBody.put(__bodyDecoder.get());
801     }
802     ///
803     /// execute POST request.
804     /// Send form-urlencoded data
805     /// 
806     /// Parameters:
807     ///     url = url to request
808     ///     rqData = data to send
809     ///  Returns:
810     ///     Response
811     ///  Examples:
812     ///  ------------------------------------------------------------------
813     ///  rs = rq.exec!"POST"("http://httpbin.org/post", ["a":"b", "c":"d"]);
814     ///  ------------------------------------------------------------------
815     ///
816     Response exec(string method)(string url, string[string] rqData) if (method=="POST") {
817         //
818         // application/x-www-form-urlencoded
819         //
820         __method = method;
821 
822         __response = Response.init;
823         checkURL(url);
824     connect:
825         __response.__startedAt = Clock.currTime;
826         setupConnection();
827         
828         if ( !__stream.isConnected() ) {
829             return __response;
830         }
831         __response.__connectedAt = Clock.currTime;
832 
833         string encoded = params2query(rqData);
834         auto h = headers;
835         h["Content-Type"] = "application/x-www-form-urlencoded";
836         h["Content-Length"] = to!string(encoded.length);
837 
838         Appender!string req;
839         req.put(requestString());
840         h.byKeyValue.
841             map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
842                 each!(h => req.put(h));
843         req.put("\r\n");
844         req.put(encoded);
845         trace(req.data);
846 
847         if ( __verbosity >= 1 ) {
848             req.data.splitLines.each!(a => writeln("> " ~ a));
849         }
850 
851         auto rc = __stream.send(req.data());
852         if ( rc == -1 ) {
853             errorf("Error sending request: ", lastSocketError);
854             return __response;
855         }
856         __response.__requestSentAt = Clock.currTime;
857 
858         receiveResponse();
859 
860         __response.__finishedAt = Clock.currTime;
861 
862         auto connection = "connection" in __response.__responseHeaders;
863         if ( !connection || *connection == "close" ) {
864             tracef("Closing connection because of 'Connection: close' or no 'Connection' header");
865             __stream.close();
866         }
867         if ( canFind(redirectCodes, __response.__code) && followRedirectResponse() ) {
868             if ( __method != "GET" ) {
869                 return this.get();
870             }
871             goto connect;
872         }
873         __response.__history = __history;
874         return __response;
875     }
876     ///
877     /// send file(s) using POST
878     /// Parameters:
879     ///     url = url
880     ///     files = array of PostFile structures
881     /// Returns:
882     ///     Response
883     /// Example:
884     /// ---------------------------------------------------------------
885     ///    PostFile[] files = [
886     ///                   {fileName:"tests/abc.txt", fieldName:"abc", contentType:"application/octet-stream"}, 
887     ///                   {fileName:"tests/test.txt"}
888     ///               ];
889     ///    rs = rq.exec!"POST"("http://httpbin.org/post", files);
890     /// ---------------------------------------------------------------
891     /// 
892     Response exec(string method="POST")(string url, PostFile[] files) {
893         import std.uuid;
894         import std.file;
895         //
896         // application/json
897         //
898         bool restartedRequest = false;
899         
900         __method = method;
901         
902         __response = Response.init;
903         checkURL(url);
904     connect:
905         __response.__startedAt = Clock.currTime;
906         setupConnection();
907         
908         if ( !__stream.isConnected() ) {
909             return __response;
910         }
911         __response.__connectedAt = Clock.currTime;
912 
913         Appender!string req;
914         req.put(requestString());
915         
916         string   boundary = randomUUID().toString;
917         string[] partHeaders;
918         size_t   contentLength;
919 
920         foreach(part; files) {
921             string fieldName = part.fieldName ? part.fieldName : part.fileName;
922             string h = "--" ~ boundary ~ "\r\n";
923             h ~= `Content-Disposition: form-data; name="%s"; filename="%s"`.
924                 format(fieldName, part.fileName) ~ "\r\n";
925             if ( part.contentType ) {
926                 h ~= "Content-Type: " ~ part.contentType ~ "\r\n";
927             }
928             h ~= "\r\n";
929             partHeaders ~= h;
930             contentLength += h.length + getSize(part.fileName) + "\r\n".length;
931         }
932         contentLength += "--".length + boundary.length + "--\r\n".length;
933 
934         auto h = headers;
935         h["Content-Type"] = "multipart/form-data; boundary=" ~ boundary;
936         h["Content-Length"] = to!string(contentLength);
937         h.byKeyValue.
938             map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
939             each!(h => req.put(h));
940         req.put("\r\n");
941         
942         trace(req.data);
943         if ( __verbosity >= 1 ) {
944             req.data.splitLines.each!(a => writeln("> " ~ a));
945         }
946 
947         auto rc = __stream.send(req.data());
948         if ( rc == -1 ) {
949             errorf("Error sending request: ", lastSocketError);
950             return __response;
951         }
952         foreach(hdr, f; zip(partHeaders, files)) {
953             tracef("sending part headers <%s>", hdr);
954             __stream.send(hdr);
955             auto file = File(f.fileName, "rb");
956             scope(exit) {
957                 file.close();
958             }
959             foreach(chunk; file.byChunk(16*1024)) {
960                 __stream.send(chunk);
961             }
962             __stream.send("\r\n");
963         }
964         __stream.send("--" ~ boundary ~ "--\r\n");
965         __response.__requestSentAt = Clock.currTime;
966 
967         receiveResponse();
968 
969         if ( __response.__responseHeaders.length == 0 
970             && __keepAlive
971             && !restartedRequest
972             && __method == "GET"
973             ) {
974             tracef("Server closed keepalive connection");
975             __stream.close();
976             restartedRequest = true;
977             goto connect;
978         }
979 
980         __response.__finishedAt = Clock.currTime;
981         ///
982         auto connection = "connection" in __response.__responseHeaders;
983         if ( !connection || *connection == "close" ) {
984             tracef("Closing connection because of 'Connection: close' or no 'Connection' header");
985             __stream.close();
986         }
987         if ( canFind(redirectCodes, __response.__code) && followRedirectResponse() ) {
988             if ( __method != "GET" ) {
989                 return this.get();
990             }
991             goto connect;
992         }
993         __response.__history = __history;
994         ///
995         return __response;
996     }
997     ///
998     /// POST data from some string(with Content-Length), or from range of strings (use Transfer-Encoding: chunked)
999     /// 
1000     /// Parameters:
1001     ///    url = url
1002     ///    content = string or input range
1003     ///    contentType = content type
1004     ///  Returns:
1005     ///     Response
1006     ///  Examples:
1007     ///  ---------------------------------------------------------------------------------------------------------
1008     ///      rs = rq.exec!"POST"("http://httpbin.org/post", "привiт, свiт!", "application/octet-stream");
1009     ///      
1010     ///      auto s = lineSplitter("one,\ntwo,\nthree.");
1011     ///      rs = rq.exec!"POST"("http://httpbin.org/post", s, "application/octet-stream");
1012     ///      
1013     ///      auto s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
1014     ///      rs = rq.exec!"POST"("http://httpbin.org/post", s.representation.chunks(10), "application/octet-stream");
1015     ///
1016     ///      auto f = File("tests/test.txt", "rb");
1017     ///      rs = rq.exec!"POST"("http://httpbin.org/post", f.byChunk(3), "application/octet-stream");
1018     ///  --------------------------------------------------------------------------------------------------------
1019     Response exec(string method="POST", R)(string url, R content, string contentType="text/html")
1020         if ( isSomeString!R
1021             || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front)))) 
1022             || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
1023             )
1024     {
1025         //
1026         // application/json
1027         //
1028         bool restartedRequest = false;
1029         
1030         __method = method;
1031         
1032         __response = Response.init;
1033         checkURL(url);
1034     connect:
1035         __response.__startedAt = Clock.currTime;
1036         setupConnection();
1037         
1038         if ( !__stream.isConnected() ) {
1039             return __response;
1040         }
1041         __response.__connectedAt = Clock.currTime;
1042 
1043         Appender!string req;
1044         req.put(requestString());
1045 
1046         auto h = headers;
1047         h["Content-Type"] = contentType;
1048         static if ( isSomeString!R ) {
1049             h["Content-Length"] = to!string(content.length);
1050         } else {
1051             h["Transfer-Encoding"] = "chunked";
1052         }
1053         h.byKeyValue.
1054             map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
1055             each!(h => req.put(h));
1056         req.put("\r\n");
1057 
1058         trace(req.data);
1059         if ( __verbosity >= 1 ) {
1060             req.data.splitLines.each!(a => writeln("> " ~ a));
1061         }
1062 
1063         auto rc = __stream.send(req.data());
1064         if ( rc == -1 ) {
1065             errorf("Error sending request: ", lastSocketError);
1066             return __response;
1067         }
1068 
1069         static if ( isSomeString!R ) {
1070             __stream.send(content);
1071         } else {
1072             while ( !content.empty ) {
1073                 auto chunk = content.front;
1074                 auto chunkHeader = "%x\r\n".format(chunk.length);
1075                 tracef("sending %s%s", chunkHeader, chunk);
1076                 __stream.send(chunkHeader);
1077                 __stream.send(chunk);
1078                 __stream.send("\r\n");
1079                 content.popFront;
1080             }
1081             tracef("sent");
1082             __stream.send("0\r\n\r\n");
1083         }
1084         __response.__requestSentAt = Clock.currTime;
1085 
1086         receiveResponse();
1087 
1088         if ( __response.__responseHeaders.length == 0 
1089             && __keepAlive
1090             && !restartedRequest
1091             && __method == "GET"
1092             ) {
1093             tracef("Server closed keepalive connection");
1094             __stream.close();
1095             restartedRequest = true;
1096             goto connect;
1097         }
1098 
1099         __response.__finishedAt = Clock.currTime;
1100 
1101         ///
1102         auto connection = "connection" in __response.__responseHeaders;
1103         if ( !connection || *connection == "close" ) {
1104             tracef("Closing connection because of 'Connection: close' or no 'Connection' header");
1105             __stream.close();
1106         }
1107         if ( canFind(redirectCodes, __response.__code) && followRedirectResponse() ) {
1108             if ( __method != "GET" ) {
1109                 return this.get();
1110             }
1111             goto connect;
1112         }
1113         ///
1114         __response.__history = __history;
1115         return __response;
1116     }
1117     ///
1118     /// Send request without data
1119     /// Request parameters will be encoded into request string
1120     /// Parameters:
1121     ///     url = url
1122     ///     params = request parameters
1123     ///  Returns:
1124     ///     Response
1125     ///  Examples:
1126     ///  ---------------------------------------------------------------------------------
1127     ///     rs = Request().exec!"GET"("http://httpbin.org/get", ["c":"d", "a":"b"]);
1128     ///  ---------------------------------------------------------------------------------
1129     ///     
1130     Response exec(string method="GET")(string url = null, string[string] params = null) if (method != "POST")
1131     {
1132 
1133         __method = method;
1134         __response = Response.init;
1135         __history.length = 0;
1136         bool restartedRequest = false; // True if this is restarted keepAlive request
1137 
1138         checkURL(url);
1139 
1140     connect:
1141         __response.__startedAt = Clock.currTime;
1142         setupConnection();
1143 
1144         if ( !__stream.isConnected() ) {
1145             return __response;
1146         }
1147         __response.__connectedAt = Clock.currTime;
1148 
1149         Appender!string req;
1150         req.put(requestString(params));
1151         headers.byKeyValue.
1152             map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
1153             each!(h => req.put(h));
1154         req.put("\r\n");
1155         trace(req.data);
1156 
1157         if ( __verbosity >= 1 ) {
1158             req.data.splitLines.each!(a => writeln("> " ~ a));
1159         }
1160         auto rc = __stream.send(req.data());
1161         if ( rc == -1 ) {
1162             errorf("Error sending request: ", lastSocketError);
1163             return __response;
1164         }
1165         __response.__requestSentAt = Clock.currTime;
1166 
1167         receiveResponse();
1168 
1169         if ( __response.__responseHeaders.length == 0 
1170             && __keepAlive
1171             && !restartedRequest
1172             && __method == "GET"
1173         ) {
1174             tracef("Server closed keepalive connection");
1175             __stream.close();
1176             restartedRequest = true;
1177             goto connect;
1178         }
1179         __response.__finishedAt = Clock.currTime;
1180 
1181         ///
1182         auto connection = "connection" in __response.__responseHeaders;
1183         if ( !connection || *connection == "close" ) {
1184             tracef("Closing connection because of 'Connection: close' or no 'Connection' header");
1185             __stream.close();
1186         }
1187         if ( __verbosity >= 1 ) {
1188             writeln(">> Connect time: ", __response.__connectedAt - __response.__startedAt);
1189             writeln(">> Request send time: ", __response.__requestSentAt - __response.__connectedAt);
1190             writeln(">> Response recv time: ", __response.__finishedAt - __response.__requestSentAt);
1191         }
1192         if ( canFind(redirectCodes, __response.__code) && followRedirectResponse() ) {
1193             if ( __method != "GET" ) {
1194                 return this.get();
1195             }
1196             goto connect;
1197         }
1198         ///
1199         __response.__history = __history;
1200         return __response;
1201     }
1202     ///
1203     /// GET request. Simple wrapper over exec!"GET"
1204     /// Params:
1205     /// args = request parameters. see exec docs.
1206     ///
1207     Response get(A...)(A args) {
1208         return exec!"GET"(args);
1209     }
1210     ///
1211     /// POST request. Simple wrapper over exec!"POST"
1212     /// Params:
1213     /// args = request parameters. see exec docs.
1214     ///
1215     Response post(A...)(A args) {
1216         return exec!"POST"(args);
1217     }
1218 }
1219 
1220 ///
1221 public unittest {
1222     import std.json;
1223     globalLogLevel(LogLevel.info);
1224     tracef("http tests - start");
1225 
1226     auto rq = Request();
1227     auto rs = rq.get("https://httpbin.org/");
1228     assert(rs.code==200);
1229     assert(rs.responseBody.length > 0);
1230     rs = Request().get("http://httpbin.org/get", ["c":" d", "a":"b"]);
1231     assert(rs.code == 200);
1232     auto json = parseJSON(rs.responseBody.data).object["args"].object;
1233     assert(json["c"].str == " d");
1234     assert(json["a"].str == "b");
1235 
1236     globalLogLevel(LogLevel.info);
1237     rq = Request();
1238     rq.keepAlive = true;
1239     // handmade json
1240     info("Check POST json");
1241     rs = rq.post("http://httpbin.org/post", `{"a":"☺ ", "c":[1,2,3]}`, "application/json");
1242     assert(rs.code==200);
1243     json = parseJSON(rs.responseBody.data).object["json"].object;
1244     assert(json["a"].str == "☺ ");
1245     assert(json["c"].array.map!(a=>a.integer).array == [1,2,3]);
1246     {
1247         // files
1248         globalLogLevel(LogLevel.info);
1249         info("Check POST files");
1250         PostFile[] files = [
1251                         {fileName:"tests/abc.txt", fieldName:"abc", contentType:"application/octet-stream"}, 
1252                         {fileName:"tests/test.txt"}
1253                     ];
1254         rs = rq.post("http://httpbin.org/post", files);
1255         assert(rs.code==200);
1256     }
1257     {
1258         // string
1259         info("Check POST utf8 string");
1260         rs = rq.post("http://httpbin.org/post", "привiт, свiт!", "application/octet-stream");
1261         assert(rs.code==200);
1262         auto data = parseJSON(rs.responseBody.data).object["data"].str;
1263         assert(data=="привiт, свiт!");
1264     }
1265     // ranges
1266     {
1267         info("Check POST chunked from lineSplitter");
1268         auto s = lineSplitter("one,\ntwo,\nthree.");
1269         rs = rq.exec!"POST"("http://httpbin.org/post", s, "application/octet-stream");
1270         assert(rs.code==200);
1271         auto data = parseJSON(rs.responseBody.toString).object["data"].str;
1272         assert(data=="one,two,three.");
1273     }
1274     {
1275         info("Check POST chunked from array");
1276         auto s = ["one,", "two,", "three."];
1277         rs = rq.post("http://httpbin.org/post", s, "application/octet-stream");
1278         assert(rs.code==200);
1279         auto data = parseJSON(rs.responseBody.data).object["data"].str;
1280         assert(data=="one,two,three.");
1281     }
1282     {
1283         info("Check POST chunked using std.range.chunks()");
1284         auto s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
1285         rs = rq.post("http://httpbin.org/post", s.representation.chunks(10), "application/octet-stream");
1286         assert(rs.code==200);
1287         auto data = parseJSON(rs.responseBody.data).object["data"].str;
1288         assert(data==s);
1289     }
1290     {
1291         info("Check POST chunked from file.byChunk");
1292         auto f = File("tests/test.txt", "rb");
1293         rs = rq.post("http://httpbin.org/post", f.byChunk(3), "application/octet-stream");
1294         assert(rs.code==200);
1295         auto data = parseJSON(rs.responseBody.data).object["data"].str;
1296         assert(data=="abcdefgh\n12345678\n");
1297         f.close();
1298     }
1299     // associative array
1300     rs = rq.post("http://httpbin.org/post", ["a":"b ", "c":"d"]);
1301     assert(rs.code==200);
1302     auto form = parseJSON(rs.responseBody.data).object["form"].object;
1303     assert(form["a"].str == "b ");
1304     assert(form["c"].str == "d");
1305     info("Check HEAD");
1306     rs = rq.exec!"HEAD"("http://httpbin.org/");
1307     assert(rs.code==200);
1308     info("Check DELETE");
1309     rs = rq.exec!"DELETE"("http://httpbin.org/delete");
1310     assert(rs.code==200);
1311     info("Check PUT");
1312     rs = rq.exec!"PUT"("http://httpbin.org/put",  `{"a":"b", "c":[1,2,3]}`, "application/json");
1313     assert(rs.code==200);
1314     info("Check PATCH");
1315     rs = rq.exec!"PATCH"("http://httpbin.org/patch", "привiт, свiт!", "application/octet-stream");
1316     assert(rs.code==200);
1317 
1318     info("Check compressed content");
1319     globalLogLevel(LogLevel.info);
1320     rq = Request();
1321     rq.keepAlive = true;
1322     rq.addHeaders(["Accept-Encoding":"gzip"]);
1323     rs = rq.get("http://httpbin.org/gzip");
1324     assert(rs.code==200);
1325     info("gzip - ok");
1326     rq.addHeaders(["Accept-Encoding":"deflate"]);
1327     rs = rq.get("http://httpbin.org/deflate");
1328     assert(rs.code==200);
1329     info("deflate - ok");
1330 
1331     info("Check redirects");
1332     globalLogLevel(LogLevel.info);
1333     rq = Request();
1334     rq.keepAlive = true;
1335     rs = rq.get("http://httpbin.org/relative-redirect/2");
1336     assert(rs.history.length == 2);
1337     assert(rs.code==200);
1338 //    rq = Request();
1339 //    rq.keepAlive = true;
1340 //    rq.proxy = "http://localhost:8888/";
1341     rs = rq.get("http://httpbin.org/absolute-redirect/2");
1342     assert(rs.history.length == 2);
1343     assert(rs.code==200);
1344 //    rq = Request();
1345     rq.maxRedirects = 2;
1346     rq.keepAlive = false;
1347     rs = rq.get("https://httpbin.org/absolute-redirect/3");
1348     assert(rs.history.length == 2);
1349     assert(rs.code==302);
1350 
1351     info("Check utf8 content");
1352     globalLogLevel(LogLevel.info);
1353     rq = Request();
1354     rs = rq.get("http://httpbin.org/encoding/utf8");
1355     assert(rs.code==200);
1356 
1357     info("Check chunked content");
1358     globalLogLevel(LogLevel.info);
1359     rq = Request();
1360     rq.keepAlive = true;
1361     rq.bufferSize = 16*1024;
1362     rs = rq.get("http://httpbin.org/range/1024");
1363     assert(rs.code==200);
1364     assert(rs.responseBody.length==1024);
1365 
1366     info("Check basic auth");
1367     globalLogLevel(LogLevel.info);
1368     rq = Request();
1369     rq.authenticator = new BasicAuthentication("user", "passwd");
1370     rs = rq.get("http://httpbin.org/basic-auth/user/passwd");
1371     assert(rs.code==200);
1372  
1373     globalLogLevel(LogLevel.info);
1374     info("Check exception handling, error messages are OK");
1375     rq = Request();
1376     rq.timeout = 1.seconds;
1377     assertThrown!TimeoutException(rq.get("http://httpbin.org/delay/3"));
1378     assertThrown!ConnectError(rq.get("http://0.0.0.0:65000/"));
1379     assertThrown!ConnectError(rq.get("http://1.1.1.1/"));
1380     assertThrown!ConnectError(rq.get("http://gkhgkhgkjhgjhgfjhgfjhgf/"));
1381 
1382     globalLogLevel(LogLevel.info);
1383     info("Check limits");
1384     rq = Request();
1385     rq.maxContentLength = 1;
1386     assertThrown!RequestException(rq.get("http://httpbin.org/"));
1387     rq = Request();
1388     rq.maxHeadersLength = 1;
1389     assertThrown!RequestException(rq.get("http://httpbin.org/"));
1390     tracef("http tests - ok");
1391 }