1 module requests.http;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.ascii;
7 import std.conv;
8 import std.datetime;
9 import std.exception;
10 import std.format;
11 import std.stdio;
12 import std.range;
13 import std..string;
14 import std.traits;
15 import std.typecons;
16 import std.experimental.logger;
17 import core.thread;
18 
19 import requests.streams;
20 import requests.uri;
21 import requests.utils;
22 import requests.base;
23 import requests.connmanager;
24 import requests.rangeadapter;
25 
26 static immutable ushort[] redirectCodes = [301, 302, 303, 307, 308];
27 
28 enum   HTTP11 = 101;
29 enum   HTTP10 = 100;
30 
31 static immutable string[string] proxies;
32 shared static this() {
33     import std.process;
34     proxies["http"] = environment.get("http_proxy", environment.get("HTTP_PROXY"));
35     proxies["https"] = environment.get("https_proxy", environment.get("HTTPS_PROXY"));
36     proxies["all"] = environment.get("all_proxy", environment.get("ALL_PROXY"));
37     foreach(p; proxies.byKey()) {
38         if (proxies[p] is null) {
39             continue;
40         }
41         URI u = URI(proxies[p]);
42     }
43 }
44 
45 public class MaxRedirectsException: Exception {
46     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
47         super(message, file, line, next);
48     }
49 }
50 
51 ///
52 ///
53 ///
54 //public auto queryParams(T...)(T params) pure nothrow @safe
55 //{
56 //    static assert (T.length % 2 == 0, "wrong args count");
57 //
58 //    QueryParam[] output;
59 //    output.reserve = T.length / 2;
60 //
61 //    void queryParamsHelper(T...)(T params, ref QueryParam[] output)
62 //    {
63 //        static if (T.length > 0)
64 //        {
65 //            output ~= QueryParam(params[0].to!string, params[1].to!string);
66 //            queryParamsHelper(params[2..$], output);
67 //        }
68 //    }
69 //
70 //    queryParamsHelper(params, output);
71 //    return output;
72 //}
73 
74 ///
75 /// Response - result of request execution.
76 ///
77 /// Response.code - response HTTP code.
78 /// Response.status_line - received HTTP status line.
79 /// Response.responseHeaders - received headers.
80 /// Response.responseBody - container for received body
81 /// Response.history - for redirected responses contain all history
82 ///
83 public class HTTPResponse : Response {
84     private {
85         string              _status_line;
86 
87         HTTPResponse[]      _history; // redirects history
88 
89         mixin(Setter!string("status_line"));
90 
91         int                 _version;
92     }
93 
94     ~this() {
95         _responseHeaders = null;
96         _history.length = 0;
97     }
98 
99     mixin(Getter("status_line"));
100 
101     @property final string[string] responseHeaders() @safe @nogc nothrow {
102         return _responseHeaders;
103     }
104     @property final HTTPResponse[] history() @safe @nogc nothrow {
105         return _history;
106     }
107 
108     private int parse_version(in string v) pure const nothrow @safe {
109         // try to parse HTTP/1.x to version
110         try if ( v.length > 5 ) {
111             return (v[5..$].split(".").map!"to!int(a)".array[0..2].reduce!((a,b) => a*100 + b));
112         } catch (Exception e) {
113         }
114         return 0;
115     }
116     unittest {
117         auto r = new HTTPResponse();
118         assert(r.parse_version("HTTP/1.1") == 101);
119         assert(r.parse_version("HTTP/1.0") == 100);
120         assert(r.parse_version("HTTP/0.9") == 9);
121         assert(r.parse_version("HTTP/xxx") == 0);
122     }
123 }
124 
125 ///
126 /// Request.
127 /// Configurable parameters:
128 /// $(B method) - string, method to use (GET, POST, ...)
129 /// $(B headers) - string[string], add any additional headers you'd like to send.
130 /// $(B authenticator) - class Auth, class to send auth headers.
131 /// $(B keepAlive) - bool, set true for keepAlive requests. default true.
132 /// $(B maxRedirects) - uint, maximum number of redirects. default 10.
133 /// $(B maxHeadersLength) - size_t, maximum length of server response headers. default = 32KB.
134 /// $(B maxContentLength) - size_t, maximun content length. delault - 0 = unlimited.
135 /// $(B bufferSize) - size_t, send and receive buffer size. default = 16KB.
136 /// $(B verbosity) - uint, level of verbosity(0 - nothing, 1 - headers, 2 - headers and body progress). default = 0.
137 /// $(B proxy) - string, set proxy url if needed. default - null.
138 /// $(B cookie) - Tuple Cookie, Read/Write cookie You can get cookie setted by server, or set cookies before doing request.
139 /// $(B timeout) - Duration, Set timeout value for connect/receive/send.
140 ///
141 public struct HTTPRequest {
142     private {
143         string         _method = "GET";
144         URI            _uri;
145         string[string] _headers;
146         string[]       _filteredHeaders;
147         Auth           _authenticator;
148         bool           _keepAlive = true;
149         uint           _maxRedirects = 10;
150         size_t         _maxHeadersLength = 32 * 1024;   // 32 KB
151         size_t         _maxContentLength;               // 0 - Unlimited
152         string         _proxy;
153         uint           _verbosity = 0;                  // 0 - no output, 1 - headers, 2 - headers+body info
154         Duration       _timeout = 30.seconds;
155         size_t         _bufferSize = 16*1024; // 16k
156         bool           _useStreaming;                   // return iterator instead of completed request
157 
158         HTTPResponse[] _history;                        // redirects history
159         DataPipe!ubyte _bodyDecoder;
160         DecodeChunked  _unChunker;
161         long           _contentLength;
162         long           _contentReceived;
163         SSLOptions     _sslOptions;
164         string         _bind;
165         _UH            _userHeaders;
166 
167         RefCounted!ConnManager      _cm;
168         RefCounted!Cookies          _cookie;
169         string[URI]                 _permanent_redirects;            // cache 301 redirects for GET requests
170         MultipartForm               _multipartForm;
171 
172         NetStreamFactory  _socketFactory;
173 
174         QueryParam[]        _params;
175         string              _contentType;
176         InputRangeAdapter   _postData;
177     }
178     package HTTPResponse   _response;
179 
180     mixin(Getter_Setter!string     ("method"));
181     mixin(Getter_Setter!bool       ("keepAlive"));
182     mixin(Getter_Setter!size_t     ("maxContentLength"));
183     mixin(Getter_Setter!size_t     ("maxHeadersLength"));
184     mixin(Getter_Setter!size_t     ("bufferSize"));
185     mixin(Getter_Setter!uint       ("maxRedirects"));
186     mixin(Getter_Setter!uint       ("verbosity"));
187     mixin(Getter                   ("proxy"));
188     mixin(Getter_Setter!Duration   ("timeout"));
189     mixin(Setter!Auth              ("authenticator"));
190     mixin(Getter_Setter!bool       ("useStreaming"));
191     mixin(Getter                   ("contentLength"));
192     mixin(Getter                   ("contentReceived"));
193     mixin(Getter_Setter!SSLOptions ("sslOptions"));
194     mixin(Getter_Setter!string     ("bind"));
195     mixin(Setter!NetStreamFactory  ("socketFactory"));
196 
197     @property void sslSetVerifyPeer(bool v) pure @safe nothrow @nogc {
198         _sslOptions.setVerifyPeer(v);
199     }
200     @property void sslSetKeyFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc {
201         _sslOptions.setKeyFile(p, t);
202     }
203     @property void sslSetCertFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc {
204         _sslOptions.setCertFile(p, t);
205     }
206     @property void sslSetCaCert(string path) pure @safe nothrow @nogc {
207         _sslOptions.setCaCert(path);
208     }
209     //@property final void cookie(Cookie[] s) pure @safe @nogc nothrow {
210     //    _cookie = s;
211     //}
212     @property final void proxy(string v) {
213         if ( v != _proxy ) {
214             _cm.clear();
215         }
216         _proxy = v;
217     }
218     //@property final Cookie[] cookie() pure @safe @nogc nothrow {
219     //    return _cookie;
220     //}
221 
222     this(string uri) {
223         _uri = URI(uri);
224         _cm = ConnManager(10);
225     }
226     ~this() {
227         _headers = null;
228         _authenticator = null;
229         _history = null;
230         _bodyDecoder = null;
231         _unChunker = null;
232         //if ( _cm ) {
233         //    _cm.clear();
234         //}
235     }
236     string toString() const {
237         return "HTTPRequest(%s, %s)".format(_method, _uri.uri());
238     }
239     string format(string fmt) const {
240         import std.array;
241         import std.stdio;
242         auto a = appender!string();
243         auto f = FormatSpec!char(fmt);
244         while (f.writeUpToNextSpec(a)) {
245             switch(f.spec) {
246                 case 'h':
247                     // Remote hostname.
248                     a.put(_uri.host);
249                     break;
250                 case 'm':
251                     // method.
252                     a.put(_method);
253                     break;
254                 case 'p':
255                     // Remote port.
256                     a.put("%d".format(_uri.port));
257                     break;
258                 case 'P':
259                     // Path
260                     a.put(_uri.path);
261                     break;
262                 case 'q':
263                     // query parameters supplied with url.
264                     a.put(_uri.query);
265                     break;
266                 case 'U':
267                     a.put(_uri.uri());
268                     break;
269                 default:
270                     throw new FormatException("Unknown Request format spec " ~ f.spec);
271             }
272         }
273         return a.data();
274     }
275     string select_proxy(string scheme) {
276         if ( _proxy is null && proxies.length == 0 ) {
277             debug(requests) tracef("proxy=null");
278             return null;
279         }
280         if ( _proxy ) {
281             debug(requests) tracef("proxy=%s", _proxy);
282             return _proxy;
283         }
284         auto p = scheme in proxies;
285         if ( p !is null && *p != "") {
286             debug(requests) tracef("proxy=%s", *p);
287             return *p;
288         }
289         p = "all" in proxies;
290         if ( p !is null && *p != "") {
291             debug(requests) tracef("proxy=%s", *p);
292             return *p;
293         }
294         debug(requests) tracef("proxy=null");
295         return null;
296     }
297     void clearHeaders() {
298         _headers = null;
299     }
300     @property void uri(in URI newURI) {
301         //handleURLChange(_uri, newURI);
302         _uri = newURI;
303     }
304     /// Add headers to request
305     /// Params:
306     /// headers = headers to send.
307     void addHeaders(in string[string] headers) {
308         foreach(pair; headers.byKeyValue) {
309             string _h = pair.key;
310             switch(toLower(_h)) {
311             case "host":
312                 _userHeaders.Host = true;
313                 break;
314             case "user-agent":
315                 _userHeaders.UserAgent = true;
316                 break;
317             case "content-length":
318                 _userHeaders.ContentLength = true;
319                 break;
320             case "content-type":
321                 _userHeaders.ContentType = true;
322                 break;
323             case "connection":
324                 _userHeaders.Connection = true;
325                 break;
326             case "cookie":
327                 _userHeaders.Cookie = true;
328                 break;
329             default:
330                 break;
331             }
332             _headers[pair.key] = pair.value;
333         }
334     }
335     private void safeSetHeader(ref string[string] headers, bool userAdded, string h, string v) pure @safe {
336         if ( !userAdded ) {
337             headers[h] = v;
338         }
339     }
340     /// Remove headers from request
341     /// Params:
342     /// headers = headers to remove.
343     void removeHeaders(in string[] headers) pure {
344         _filteredHeaders ~= headers;
345     }
346     ///
347     /// compose headers to send
348     ///
349     private string[string] requestHeaders() {
350 
351         string[string] generatedHeaders;
352 
353         if ( _authenticator ) {
354             _authenticator.
355                 authHeaders(_uri.host).
356                 byKeyValue.
357                 each!(pair => generatedHeaders[pair.key] = pair.value);
358         }
359 
360         _headers.byKey.each!(h => generatedHeaders[h] = _headers[h]);
361 
362         safeSetHeader(generatedHeaders, _userHeaders.AcceptEncoding, "Accept-Encoding", "gzip,deflate");
363         safeSetHeader(generatedHeaders, _userHeaders.UserAgent, "User-Agent", "dlang-requests");
364         safeSetHeader(generatedHeaders, _userHeaders.Connection, "Connection", _keepAlive?"Keep-Alive":"Close");
365 
366         if ( !_userHeaders.Host )
367         {
368             generatedHeaders["Host"] = _uri.host;
369             if ( _uri.scheme !in standard_ports || _uri.port != standard_ports[_uri.scheme] ) {
370                 generatedHeaders["Host"] ~= ":%d".format(_uri.port);
371             }
372         }
373         if ( _cookie._map.length && !_userHeaders.Cookie ) {
374             string[] cookie_strings;
375             foreach(pair; _cookie._map.byPair) {
376                 string cookie_name = pair.key;
377                 auto   cookie = pair.value;
378                 if ( _uri.path.pathMatches(cookie.path) && _uri.host.domainMatches(cookie.domain) ) {
379                     cookie_strings ~= "%s=%s".format(cookie_name, cookie.value);
380                 }
381             }
382             if ( cookie_strings.length > 0 ) {
383                 generatedHeaders["Cookie"] = cookie_strings.join("; ");
384             }
385         }
386 
387         _filteredHeaders.each!(h => generatedHeaders.remove(h));
388 
389         return generatedHeaders;
390     }
391     ///
392     /// Build request string.
393     /// Handle proxy and query parameters.
394     ///
395     private @property string requestString(QueryParam[] params = null) {
396         auto query = _uri.query.dup;
397         if ( params ) {
398             query ~= "&" ~ params2query(params);
399             if ( query[0] != '?' ) {
400                 query = "?" ~ query;
401             }
402         }
403         string actual_proxy = select_proxy(_uri.scheme);
404         if ( actual_proxy && _uri.scheme != "https" ) {
405             return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.uri(No.params), query);
406         }
407         return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.path, query);
408     }
409     ///
410     /// encode parameters and build query part of the url
411     ///
412     private static string params2query(in QueryParam[] params) pure @safe {
413         return params.
414                 map!(a => "%s=%s".format(a.key.urlEncoded, a.value.urlEncoded)).
415                 join("&");
416     }
417     //
418     package unittest {
419         assert(params2query(queryParams("a","b", "c", " d "))=="a=b&c=%20d%20");
420     }
421     ///
422     /// Analyze received headers, take appropriate actions:
423     /// check content length, attach unchunk and uncompress
424     ///
425     private void analyzeHeaders(in string[string] headers) {
426 
427         _contentLength = -1;
428         _unChunker = null;
429         auto contentLength = "content-length" in headers;
430         if ( contentLength ) {
431             try {
432                 string l = *contentLength;
433                 _contentLength = parse!long(l);
434                 // TODO: maybe add a strict mode that checks if l was parsed completely
435                 if ( _maxContentLength && _contentLength > _maxContentLength) {
436                     throw new RequestException("ContentLength > maxContentLength (%d>%d)".
437                                 format(_contentLength, _maxContentLength));
438                 }
439             } catch (ConvException e) {
440                 throw new RequestException("Can't convert Content-Length from %s".format(*contentLength));
441             }
442         }
443         auto transferEncoding = "transfer-encoding" in headers;
444         if ( transferEncoding ) {
445             debug(requests) tracef("transferEncoding: %s", *transferEncoding);
446             if ( (*transferEncoding).toLower == "chunked") {
447                 _unChunker = new DecodeChunked();
448                 _bodyDecoder.insert(_unChunker);
449             }
450         }
451         auto contentEncoding = "content-encoding" in headers;
452         if ( contentEncoding ) switch (*contentEncoding) {
453             default:
454                 throw new RequestException("Unknown content-encoding " ~ *contentEncoding);
455             case "gzip":
456             case "deflate":
457                 _bodyDecoder.insert(new Decompressor!ubyte);
458         }
459 
460     }
461     ///
462     /// Called when we know that all headers already received in buffer.
463     /// This routine does not interpret headers content (see analyzeHeaders).
464     /// 1. Split headers on lines
465     /// 2. store status line, store response code
466     /// 3. unfold headers if needed
467     /// 4. store headers
468     ///
469     private void parseResponseHeaders(in ubyte[] input, string lineSep) {
470         string lastHeader;
471         auto buffer = cast(string)input;
472 
473         foreach(line; buffer.split(lineSep).map!(l => l.stripRight)) {
474             if ( ! _response.status_line.length ) {
475                 debug (requests) tracef("statusLine: %s", line);
476                 _response.status_line = line;
477                 if ( _verbosity >= 1 ) {
478                     writefln("< %s", line);
479                 }
480                 auto parsed = line.split(" ");
481                 if ( parsed.length >= 2 ) {
482                     _response.code = parsed[1].to!ushort;
483                     _response._version = _response.parse_version(parsed[0]);
484                 }
485                 continue;
486             }
487             if ( line[0] == ' ' || line[0] == '\t' ) {
488                 // unfolding https://tools.ietf.org/html/rfc822#section-3.1
489                 if ( auto stored = lastHeader in _response._responseHeaders) {
490                     *stored ~= line;
491                 }
492                 continue;
493             }
494             auto parsed = line.findSplit(":");
495             auto header = parsed[0].toLower;
496             auto value = parsed[2].strip;
497 
498             if ( _verbosity >= 1 ) {
499                 writefln("< %s: %s", header, value);
500             }
501 
502             lastHeader = header;
503             debug (requests) tracef("Header %s = %s", header, value);
504 
505             if ( header != "set-cookie" ) {
506                 auto stored = _response.responseHeaders.get(header, null);
507                 if ( stored ) {
508                     value = stored ~ "," ~ value;
509                 }
510                 _response._responseHeaders[header] = value;
511                 continue;
512             }
513             auto cookie = processCookie(value);
514             debug(requests) tracef("store cookie[%s]=%s", cookie.name, cookie.data);
515             _cookie._map[cookie.name] = cookie.data;
516         }
517     }
518 
519     ///
520     /// Process Set-Cookie header from server response
521     ///
522     private auto processCookie(string value ) pure {
523         // cookie processing
524         //
525         // Separate processing as we can't join several set-cookie lines in single line (as other headers)
526         // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com
527         // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com, cs=ip764-RgKqc-HvSkxRxdQQAKW8LA; path=/; domain=.example.com; HttpOnly
528         //
529         string name;
530         Cookie cookie;
531         auto fields = value.split(";").map!strip;
532         auto c = fields.front.findSplit("=");
533         name = c[0];
534         cookie.value = c[2];
535         fields.popFront;
536         while(!fields.empty) {
537             auto s = fields.front.findSplit("=");
538             fields.popFront;
539             if ( s[1] != "=" ) {
540                 continue;
541             }
542             auto k = s[0];
543             auto v = s[2];
544             switch(k.toLower()) {
545                 case "domain":
546                     cookie.domain = v;
547                     break;
548                 case "path":
549                     cookie.path = v;
550                     break;
551                 case "expires":
552                     continue;
553                 case "max-age":
554                     continue;
555                 default:
556                     continue;
557             }
558         }
559         if ( cookie.domain == "" ) {
560             cookie.domain = _uri.host;
561         }
562         if ( cookie.path == "" ) {
563             cookie.path = _uri.path;
564         }
565         return Tuple!(string, "name", Cookie, "data")(name, cookie);
566     }
567 
568     private bool willFollowRedirect() {
569         if ( !canFind(redirectCodes, _response.code) ) {
570             return false;
571         }
572         if ( !_maxRedirects ) {
573             return false;
574         }
575         if ( "location" !in _response.responseHeaders ) {
576             return false;
577         }
578         return true;
579     }
580     private URI uriFromLocation(const ref URI uri, in string location) {
581         URI newURI = uri;
582         try {
583             newURI = URI(location);
584         } catch (UriException e) {
585             debug(requests) trace("Can't parse Location:, try relative uri");
586             newURI.path = location;
587             newURI.uri = newURI.recalc_uri;
588         }
589         return newURI;
590     }
591     ///
592     /// if we have new uri, then we need to check if we have to reopen existent connection
593     ///
594     private void checkURL(string url, string file=__FILE__, size_t line=__LINE__) {
595         if (url is null && _uri.uri == "" ) {
596             throw new RequestException("No url configured", file, line);
597         }
598 
599         if ( url !is null ) {
600             URI newURI = URI(url);
601             //handleURLChange(_uri, newURI);
602             _uri = newURI;
603         }
604     }
605     ///
606     /// Setup connection. Handle proxy and https case
607     ///
608     /// Place new connection in ConnManager cache
609     ///
610     private NetworkStream setupConnection()
611     do {
612 
613         debug(requests) tracef("Set up new connection");
614         NetworkStream stream;
615 
616         if ( _socketFactory )
617         {
618             debug(requests) tracef("use socketFactory");
619             stream = _socketFactory(_uri.scheme, _uri.host, _uri.port);
620         }
621 
622         if ( stream ) // socket factory created connection
623         {
624             return stream;
625         }
626 
627         URI   uri; // this URI will be used temporarry if we need proxy
628         string actual_proxy = select_proxy(_uri.scheme);
629         final switch (_uri.scheme) {
630             case"http":
631                 if ( actual_proxy ) {
632                     uri.uri_parse(actual_proxy);
633                     uri.idn_encode();
634                 } else {
635                     // use original uri
636                     uri = _uri;
637                 }
638                 stream = new TCPStream();
639                 stream.bind(_bind);
640                 stream.connect(uri.host, uri.port, _timeout);
641                 break ;
642             case"https":
643                 if ( actual_proxy ) {
644                     uri.uri_parse(actual_proxy);
645                     uri.idn_encode();
646                     stream = new TCPStream();
647                     stream.bind(_bind);
648                     stream.connect(uri.host, uri.port, _timeout);
649                     if ( verbosity>=1 ) {
650                         writeln("> CONNECT %s:%d HTTP/1.1".format(_uri.host, _uri.port));
651                     }
652                     stream.send("CONNECT %s:%d HTTP/1.1\r\n\r\n".format(_uri.host, _uri.port));
653                     while ( stream.isConnected ) {
654                         ubyte[1024] b;
655                         auto read = stream.receive(b);
656                         if ( verbosity>=1) {
657                             writefln("< %s", cast(string)b[0..read]);
658                         }
659                         debug(requests) tracef("read: %d", read);
660                         if ( b[0..read].canFind("\r\n\r\n") || b[0..read].canFind("\n\n") ) {
661                             debug(requests) tracef("proxy connection ready");
662                             // convert connection to ssl
663                             stream = new SSLStream(stream, _sslOptions, _uri.host);
664                             break ;
665                         } else {
666                             debug(requests) tracef("still wait for proxy connection");
667                         }
668                     }
669                 } else {
670                     uri = _uri;
671                     stream = new SSLStream(_sslOptions);
672                     stream.bind(_bind);
673                     stream.connect(uri.host, uri.port, _timeout);
674                     debug(requests) tracef("ssl connection to origin server ready");
675                 }
676                 break ;
677         }
678 
679         return stream;
680     }
681     ///
682     /// Request sent, now receive response.
683     /// Find headers, split on headers and body, continue to receive body
684     ///
685     private void receiveResponse(NetworkStream _stream) {
686 
687         try {
688             _stream.readTimeout = timeout;
689         } catch (Exception e) {
690             debug(requests) tracef("Failed to set read timeout for stream: %s", e.msg);
691             return;
692         }
693         // Commented this out as at exit we can have alreade closed socket
694         // scope(exit) {
695         //     if ( _stream && _stream.isOpen ) {
696         //         _stream.readTimeout = 0.seconds;
697         //     }
698         // }
699 
700         _bodyDecoder = new DataPipe!ubyte();
701         scope(exit) {
702             if ( !_useStreaming ) {
703                 _bodyDecoder = null;
704                 _unChunker = null;
705             }
706         }
707 
708         auto buffer = Buffer!ubyte();
709         Buffer!ubyte partialBody;
710         ptrdiff_t read;
711         string lineSep = null, headersEnd = null;
712         bool headersHaveBeenReceived;
713 
714         while( !headersHaveBeenReceived ) {
715 
716             auto b = new ubyte[_bufferSize];
717             read = _stream.receive(b);
718 
719             debug(requests) tracef("read: %d", read);
720             if ( read == 0 ) {
721                 break;
722             }
723             auto data = b[0..read];
724             buffer.putNoCopy(data);
725             if ( verbosity>=3 ) {
726                 writeln(data.dump.join("\n"));
727             }
728 
729             if ( buffer.length > maxHeadersLength ) {
730                 throw new RequestException("Headers length > maxHeadersLength (%d > %d)".format(buffer.length, maxHeadersLength));
731             }
732 
733             // Proper HTTP uses "\r\n" as a line separator, but broken servers sometimes use "\n".
734             // Servers that use "\r\n" might have "\n" inside a header.
735             // For any half-sane server, the first '\n' should be at the end of the status line, so this can be used to detect the line separator.
736             // In any case, all the interesting points in the header for now are at '\n' characters, so scan the newly read data for them.
737             foreach (idx; buffer.length-read..buffer.length)
738             {
739                 if ( buffer[idx] == '\n' )
740                 {
741                     if ( lineSep is null )
742                     {
743                         // First '\n'. Detect line/header endings.
744                         // HTTP header sections end with a double line separator
745                         lineSep = "\n";
746                         headersEnd = "\n\n";
747                         if ( idx > 0 && buffer[idx-1] == '\r' )
748                         {
749                             lineSep = "\r\n";
750                             headersEnd = "\r\n\r\n";
751                         }
752                     }
753                     else
754                     {
755                         // Potential header ending.
756                         if ( buffer.data[0..idx+1].endsWith(headersEnd) )
757                         {
758                             auto ResponseHeaders = buffer.data[0..idx+1-headersEnd.length];
759                             partialBody = buffer[idx+1..$];
760                             _contentReceived += partialBody.length;
761                             parseResponseHeaders(ResponseHeaders, lineSep);
762                             headersHaveBeenReceived = true;
763                             break;
764                         }
765                     }
766                 }
767             }
768         }
769 
770         analyzeHeaders(_response._responseHeaders);
771 
772         _bodyDecoder.putNoCopy(partialBody.data);
773 
774         auto v = _bodyDecoder.get();
775         _response._responseBody.putNoCopy(v);
776 
777         // https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
778         if ( (_method == "HEAD") || responseMustNotIncludeBody(_response.code) || (_contentLength < 0 && _unChunker is null) )
779         {
780             debug(requests) tracef("response without body");
781             return;
782         }
783 
784         _response._contentLength = _contentLength;
785         _response._contentReceived = _contentReceived;
786 
787         if ( _verbosity >= 2 ) writefln("< %d bytes of body received", partialBody.length);
788 
789         while( true ) {
790             if ( _contentLength >= 0 && _contentReceived >= _contentLength ) {
791                 debug(requests) trace("Body received.");
792                 break;
793             }
794             if ( _unChunker && _unChunker.done ) {
795                 break;
796             }
797 
798             if ( _useStreaming && _response._responseBody.length && !redirectCodes.canFind(_response.code) ) {
799                 debug(requests) trace("streaming requested");
800                 // save _stream in closure
801                 auto __stream = _stream;
802                 auto __bodyDecoder = _bodyDecoder;
803                 auto __unChunker = _unChunker;
804                 auto __contentReceived = _contentReceived;
805                 auto __contentLength = _contentLength;
806                 auto __bufferSize = _bufferSize;
807                 auto __response = _response;
808                 auto __verbosity = _verbosity;
809                 auto __uri = _uri;
810                 auto __cm = _cm;
811 
812                 // set up response
813                 _response._contentLength = _contentLength;
814                 _response.receiveAsRange.activated = true;
815                 _response.receiveAsRange.data = _response._responseBody.data;
816                 _response.receiveAsRange.read = delegate ubyte[] () {
817 
818                     while(true) {
819                         // check if we received everything we need
820                         if ( ( __unChunker && __unChunker.done )
821                             || !__stream.isConnected()
822                             || (__contentLength > 0 && __contentReceived >= __contentLength) )
823                         {
824                             debug(requests) trace("streaming_in receive completed");
825                             __bodyDecoder.flush();
826                             if (_stream && _stream.isConnected) {
827                                 // return to pool
828                                 __cm.put(__uri.scheme, __uri.host, __uri.port, __stream);
829                             } else {
830                                 _stream.close();
831                             }
832                             return __bodyDecoder.get();
833                         }
834                         // have to continue
835                         auto b = new ubyte[__bufferSize];
836                         try {
837                             read = __stream.receive(b);
838                         }
839                         catch (Exception e) {
840                             throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
841                         }
842                         debug(requests) tracef("streaming_in received %d bytes", read);
843 
844                         if ( read == 0 ) {
845                             debug(requests) tracef("streaming_in: server closed connection");
846                             _stream.close();
847                             __bodyDecoder.flush();
848                             return __bodyDecoder.get();
849                         }
850 
851                         if ( __verbosity>=3 ) {
852                             writeln(b[0..read].dump.join("\n"));
853                         }
854                         __response._contentReceived += read;
855                         __contentReceived += read;
856                         __bodyDecoder.putNoCopy(b[0..read]);
857                         auto res = __bodyDecoder.getNoCopy();
858                         if ( res.length == 0 ) {
859                             // there were nothing to produce (beginning of the chunk or no decompressed data)
860                             continue;
861                         }
862                         if (res.length == 1) {
863                             return res[0];
864                         }
865                         //
866                         // I'd like to "return _bodyDecoder.getNoCopy().join;" but it is slower
867                         //
868                         auto total = res.map!(b=>b.length).sum;
869                         // create buffer for joined bytes
870                         ubyte[] joined = new ubyte[total];
871                         size_t p;
872                         // memcopy
873                         foreach(ref _; res) {
874                             joined[p .. p + _.length] = _;
875                             p += _.length;
876                         }
877                         return joined;
878                     }
879                     assert(0);
880                 };
881                 // we prepared for streaming
882                 return;
883             }
884 
885             auto b = new ubyte[_bufferSize];
886             read = _stream.receive(b);
887 
888             if ( read == 0 ) {
889                 debug(requests) trace("read done");
890                 break;
891             }
892             if ( _verbosity >= 2 ) {
893                 writefln("< %d bytes of body received", read);
894             }
895 
896             if ( verbosity>=3 ) {
897                 writeln(b[0..read].dump.join("\n"));
898             }
899 
900             debug(requests) tracef("read: %d", read);
901             _contentReceived += read;
902             if ( _maxContentLength && _contentReceived > _maxContentLength ) {
903                 throw new RequestException("ContentLength > maxContentLength (%d>%d)".
904                     format(_contentLength, _maxContentLength));
905             }
906 
907             _bodyDecoder.putNoCopy(b[0..read]); // send buffer to all decoders
908 
909             _bodyDecoder.getNoCopy.             // fetch result and place to body
910                 each!(b => _response._responseBody.putNoCopy(b));
911 
912             debug(requests) tracef("receivedTotal: %d, contentLength: %d, bodyLength: %d", _contentReceived, _contentLength, _response._responseBody.length);
913 
914         }
915         _bodyDecoder.flush();
916         _response._responseBody.putNoCopy(_bodyDecoder.get());
917         _response._contentReceived = _contentReceived;
918     }
919     ///
920     /// Check that we received anything.
921     /// Server can close previous connection (keepalive or not)
922     ///
923     private bool serverPrematurelyClosedConnection() pure @safe {
924         immutable server_closed_connection = _response._responseHeaders.length == 0 && _response._status_line.length == 0;
925         // debug(requests) tracef("server closed connection = %s (headers.length=%s, status_line.length=%s)",
926         //     server_closed_connection, _response._responseHeaders.length,  _response._status_line.length);
927         return server_closed_connection;
928     }
929     private bool isIdempotent(in string method) pure @safe nothrow {
930         return ["GET", "HEAD"].canFind(method);
931     }
932     ///
933     /// If we do not want keepalive request,
934     /// or server signalled to close connection,
935     /// then close it
936     ///
937     void close_connection_if_not_keepalive(NetworkStream _stream) {
938         auto connection = "connection" in _response._responseHeaders;
939         if ( !_keepAlive ) {
940             _stream.close();
941         } else switch(_response._version) {
942             case HTTP11:
943                 // HTTP/1.1 defines the "close" connection option for the sender to signal that the connection
944                 // will be closed after completion of the response. For example,
945                 //        Connection: close
946                 // in either the request or the response header fields indicates that the connection
947                 // SHOULD NOT be considered `persistent' (section 8.1) after the current request/response is complete.
948                 // HTTP/1.1 applications that do not support persistent connections MUST include the "close" connection
949                 // option in every message.
950                 if ( connection && (*connection).toLower.split(",").canFind("close") ) {
951                     _stream.close();
952                 }
953                 break;
954             default:
955                 // for anything else close connection if there is no keep-alive in Connection
956                 if ( connection && !(*connection).toLower.split(",").canFind("keep-alive") ) {
957                     _stream.close();
958                 }
959                 break;
960         }
961     }
962 
963     private void sendFlattenContent(T)(NetworkStream _stream, T content) {
964         while ( !content.empty ) {
965             auto chunk = content.front;
966             _stream.send(chunk);
967             content.popFront;
968         }
969         debug(requests) tracef("sent");
970     }
971     // we use this if we send from ubyte[][] as chunked content
972     private void sendChunkedContent(T)(NetworkStream _stream, T content) {
973         while ( !content.empty ) {
974             auto chunk = content.front;
975             auto chunkHeader = "%x\r\n".format(chunk.length);
976             debug(requests) tracef("sending %s%s", chunkHeader, chunk);
977             _stream.send(chunkHeader);
978             _stream.send(chunk);
979             _stream.send("\r\n");
980             content.popFront;
981         }
982         debug(requests) tracef("sent");
983         _stream.send("0\r\n\r\n");
984     }
985     ///
986     /// POST/PUT/... data from some string(with Content-Length), or from range of strings/bytes (use Transfer-Encoding: chunked).
987     /// When rank 1 (flat array) used as content it must have length. In that case "content" will be sent directly to network, and Content-Length headers will be added.
988     /// If you are goung to send some range and do not know length at the moment when you start to send request, then you can send chunks of chars or ubyte.
989     /// Try not to send too short chunks as this will put additional load on client and server. Chunks of length 2048 or 4096 are ok.
990     ///
991     /// Parameters:
992     ///    url = url
993     ///    content = string or input range
994     ///    contentType = content type
995     ///  Returns:
996     ///     Response
997     ///  Examples:
998     ///  ---------------------------------------------------------------------------------------------------------
999     ///      rs = rq.exec!"POST"("http://httpbin.org/post", "привiт, свiт!", "application/octet-stream");
1000     ///
1001     ///      auto s = lineSplitter("one,\ntwo,\nthree.");
1002     ///      rs = rq.exec!"POST"("http://httpbin.org/post", s, "application/octet-stream");
1003     ///
1004     ///      auto s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
1005     ///      rs = rq.exec!"POST"("http://httpbin.org/post", s.representation.chunks(10), "application/octet-stream");
1006     ///
1007     ///      auto f = File("tests/test.txt", "rb");
1008     ///      rs = rq.exec!"POST"("http://httpbin.org/post", f.byChunk(3), "application/octet-stream");
1009     ///  --------------------------------------------------------------------------------------------------------
1010     /// Send request with parameters.
1011     /// If used for POST or PUT requests then application/x-www-form-urlencoded used.
1012     /// Request parameters will be encoded into request string or placed in request body for POST/PUT
1013     /// requests.
1014     /// Parameters:
1015     ///     url = url
1016     ///     params = request parameters
1017     ///  Returns:
1018     ///     Response
1019     ///  Examples:
1020     ///  ---------------------------------------------------------------------------------
1021     ///     rs = Request().exec!"GET"("http://httpbin.org/get", ["c":"d", "a":"b"]);
1022     ///  ---------------------------------------------------------------------------------
1023     ///
1024 
1025     /// WRAPPERS
1026     ///
1027     /// send file(s) using POST and multipart form.
1028     /// This wrapper will be deprecated, use post with MultipartForm - it is more general and clear.
1029     /// Parameters:
1030     ///     url = url
1031     ///     files = array of PostFile structures
1032     /// Returns:
1033     ///     Response
1034     /// Each PostFile structure contain path to file, and optional field name and content type.
1035     /// If no field name provided, then basename of the file will be used.
1036     /// application/octet-stream is default when no content type provided.
1037     /// Example:
1038     /// ---------------------------------------------------------------
1039     ///    PostFile[] files = [
1040     ///                   {fileName:"tests/abc.txt", fieldName:"abc", contentType:"application/octet-stream"},
1041     ///                   {fileName:"tests/test.txt"}
1042     ///               ];
1043     ///    rs = rq.exec!"POST"("http://httpbin.org/post", files);
1044     /// ---------------------------------------------------------------
1045     ///
1046 
1047     import requests.request;
1048 
1049     // we use this if we send from ubyte[][] and user provided Content-Length
1050     private void sendFlattenContent(NetworkStream _stream) {
1051         while ( !_postData.empty ) {
1052             auto chunk = _postData.front;
1053             _stream.send(chunk);
1054             _postData.popFront;
1055         }
1056         debug(requests) tracef("sent");
1057     }
1058     // we use this if we send from ubyte[][] as chunked content
1059     private void sendChunkedContent(NetworkStream _stream) {
1060         while ( !_postData.empty ) {
1061             auto chunk = _postData.front;
1062             auto chunkHeader = "%x\r\n".format(chunk.length);
1063             debug(requests) tracef("sending %s%s", chunkHeader, cast(string)chunk);
1064             _stream.send(chunkHeader);
1065             _stream.send(chunk);
1066             _stream.send("\r\n");
1067             debug(requests) tracef("chunk sent");
1068             _postData.popFront;
1069         }
1070         debug(requests) tracef("sent");
1071         _stream.send("0\r\n\r\n");
1072     }
1073 
1074     HTTPResponse exec_from_range(InputRangeAdapter postData)
1075     do {
1076 
1077         _postData = postData;
1078 
1079         debug(requests) tracef("exec from range");
1080 
1081         NetworkStream _stream;
1082         _response = new HTTPResponse;
1083         _history.length = 0;
1084         _response.uri = _uri;
1085         _response.finalURI = _uri;
1086         bool restartedRequest = false;
1087         bool send_flat;
1088 
1089     connect:
1090         _contentReceived = 0;
1091         _response._startedAt = Clock.currTime;
1092 
1093         assert(_stream is null);
1094 
1095         _stream = _cm.get(_uri.scheme, _uri.host, _uri.port);
1096 
1097         if ( _stream is null ) {
1098             debug(requests) trace("create new connection");
1099             _stream = setupConnection();
1100         } else {
1101             debug(requests) trace("reuse old connection");
1102         }
1103 
1104         assert(_stream !is null);
1105 
1106         if ( !_stream.isConnected ) {
1107             debug(requests) trace("disconnected stream on enter");
1108             if ( !restartedRequest ) {
1109                 debug(requests) trace("disconnected stream on enter: retry");
1110                 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream);
1111 
1112                 // _cm.del(_uri.scheme, _uri.host, _uri.port, _stream);
1113                 _stream.close();
1114                 _stream = null;
1115 
1116                 restartedRequest = true;
1117                 goto connect;
1118             }
1119             debug(requests) trace("disconnected stream on enter: return response");
1120             //_stream = null;
1121             return _response;
1122         }
1123         _response._connectedAt = Clock.currTime;
1124 
1125         Appender!string req;
1126         req.put(requestString());
1127 
1128         auto h = requestHeaders;
1129         if ( _contentType ) {
1130             safeSetHeader(h, _userHeaders.ContentType, "Content-Type", _contentType);
1131         }
1132 
1133         if ( _postData.length >= 0 )
1134         {
1135             // we know t
1136             safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(_postData.length));
1137         }
1138 
1139         if ( _userHeaders.ContentLength || "Content-Length" in h )
1140         {
1141             debug(requests) tracef("User provided content-length for chunked content");
1142             send_flat = true;
1143         }
1144         else
1145         {
1146             h["Transfer-Encoding"] = "chunked";
1147             send_flat = false;
1148         }
1149         h.byKeyValue.
1150             map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
1151             each!(h => req.put(h));
1152         req.put("\r\n");
1153 
1154         debug(requests) tracef("send <%s>", req.data);
1155         if ( _verbosity >= 1 ) {
1156             req.data.splitLines.each!(a => writeln("> " ~ a));
1157         }
1158 
1159         try {
1160             // send headers
1161             _stream.send(req.data());
1162             // send body
1163             if ( send_flat ) {
1164                 sendFlattenContent(_stream);
1165             } else {
1166                 sendChunkedContent(_stream);
1167             }
1168             _response._requestSentAt = Clock.currTime;
1169             debug(requests) trace("starting receive response");
1170             receiveResponse(_stream);
1171             debug(requests) trace("finished receive response");
1172             _response._finishedAt = Clock.currTime;
1173         }
1174         catch (NetworkException e)
1175         {
1176             _stream.close();
1177             throw new RequestException("Network error during data exchange");
1178         }
1179         if ( serverPrematurelyClosedConnection()
1180         && !restartedRequest
1181         && isIdempotent(_method)
1182         ) {
1183             ///
1184             /// We didn't receive any data (keepalive connectioin closed?)
1185             /// and we can restart this request.
1186             /// Go ahead.
1187             ///
1188             debug(requests) tracef("Server closed keepalive connection");
1189 
1190             //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream);
1191 
1192             //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream);
1193             _stream.close();
1194             _stream = null;
1195 
1196             restartedRequest = true;
1197             goto connect;
1198         }
1199 
1200         if ( _useStreaming ) {
1201             if ( _response._receiveAsRange.activated ) {
1202                 debug(requests) trace("streaming_in activated");
1203                 return _response;
1204             } else {
1205                 // this can happen if whole response body received together with headers
1206                 _response._receiveAsRange.data = _response.responseBody.data;
1207             }
1208         }
1209 
1210         close_connection_if_not_keepalive(_stream);
1211 
1212         if ( _verbosity >= 1 ) {
1213             writeln(">> Connect time: ", _response._connectedAt - _response._startedAt);
1214             writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt);
1215             writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt);
1216         }
1217 
1218 
1219         if ( willFollowRedirect ) {
1220             if ( _history.length >= _maxRedirects ) {
1221                 _stream = null;
1222                 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects));
1223             }
1224             // "location" in response already checked in canFollowRedirect
1225             immutable new_location = *("location" in _response.responseHeaders);
1226             immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location);
1227 
1228             immutable get_or_head = _method == "GET" || _method == "HEAD";
1229             immutable code = _response.code;
1230 
1231             // save current response for history
1232             _history ~= _response;
1233 
1234             if ( code == 301 )
1235             {
1236                 // permanent redirect and change method
1237                 _permanent_redirects[_uri] = new_location;
1238                 if ( !get_or_head )
1239                 {
1240                     _method = "GET";
1241                 }
1242             }
1243             if ( (code == 302 || code == 303) && !get_or_head)
1244             {
1245                 // only change method
1246                 _method = "GET";
1247             }
1248             if ( code == 307 )
1249             {
1250                 // no change method, no permanent
1251             }
1252             if ( code == 308 )
1253             {
1254                 // permanent redirection and do not change method
1255                 _permanent_redirects[_uri] = new_location;
1256             }
1257 
1258             // prepare new response (for redirected request)
1259             _response = new HTTPResponse;
1260             _response.uri = current_uri;
1261             _response.finalURI = next_uri;
1262 
1263             _stream = null;
1264 
1265             // set new uri
1266             this._uri = next_uri;
1267             debug(requests) tracef("Redirected to %s", next_uri);
1268             if ( restartedRequest ) {
1269                 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect");
1270                 restartedRequest = false;
1271             }
1272             if ( _method == "GET")
1273             {
1274                 return exec_from_parameters();
1275             }
1276             goto connect;
1277         }
1278 
1279         _response._history = _history;
1280         if (_stream && _stream.isConnected) {
1281             // return to pool
1282             _cm.put(_uri.scheme, _uri.host, _uri.port, _stream);
1283         }
1284         return _response;
1285     }
1286 
1287     ///
1288     /// Send multipart request.
1289     /// You would like to use this method for sending large portions of mixed data or uploading files to forms.
1290     /// Content of the posted form consist of sources. Each source have at least name and value (can be string-like object or opened file, see more docs for MultipartForm struct)
1291     /// Params:
1292     ///     url = url
1293     ///     sources = array of sources.
1294     // we use this if we send from ubyte[][] and user provided Content-Length
1295     HTTPResponse exec_from_multipart_form(MultipartForm form) {
1296         import std.uuid;
1297         import std.file;
1298 
1299         _multipartForm = form;
1300 
1301         debug(requests) tracef("exec from multipart form");
1302 
1303         NetworkStream _stream;
1304         _response = new HTTPResponse;
1305         _response.uri = _uri;
1306         _response.finalURI = _uri;
1307         bool restartedRequest = false;
1308 
1309     connect:
1310         _contentReceived = 0;
1311         _response._startedAt = Clock.currTime;
1312 
1313         assert(_stream is null);
1314 
1315         _stream = _cm.get(_uri.scheme, _uri.host, _uri.port);
1316 
1317         if ( _stream is null ) {
1318             debug(requests) trace("create new connection");
1319             _stream = setupConnection();
1320         } else {
1321             debug(requests) trace("reuse old connection");
1322         }
1323 
1324         assert(_stream !is null);
1325 
1326         if ( !_stream.isConnected ) {
1327             debug(requests) trace("disconnected stream on enter");
1328             if ( !restartedRequest ) {
1329                 debug(requests) trace("disconnected stream on enter: retry");
1330                 //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream);
1331 
1332                 //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream);
1333                 _stream.close();
1334                 _stream = null;
1335 
1336                 restartedRequest = true;
1337                 goto connect;
1338             }
1339             debug(requests) trace("disconnected stream on enter: return response");
1340             //_stream = null;
1341             return _response;
1342         }
1343         _response._connectedAt = Clock.currTime;
1344 
1345         Appender!string req;
1346         req.put(requestString());
1347 
1348         string   boundary = randomUUID().toString;
1349         string[] partHeaders;
1350         size_t   contentLength;
1351 
1352         foreach(ref part; _multipartForm._sources) {
1353             string h = "--" ~ boundary ~ "\r\n";
1354             string disposition = `form-data; name="%s"`.format(part.name);
1355             string optionals = part.
1356             parameters.byKeyValue().
1357             filter!(p => p.key!="Content-Type").
1358             map!   (p => "%s=%s".format(p.key, p.value)).
1359             join("; ");
1360 
1361             h ~= `Content-Disposition: ` ~ [disposition, optionals].join("; ") ~ "\r\n";
1362 
1363             auto contentType = "Content-Type" in part.parameters;
1364             if ( contentType ) {
1365                 h ~= "Content-Type: " ~ *contentType ~ "\r\n";
1366             }
1367 
1368             h ~= "\r\n";
1369             partHeaders ~= h;
1370             contentLength += h.length + part.input.getSize() + "\r\n".length;
1371         }
1372         contentLength += "--".length + boundary.length + "--\r\n".length;
1373 
1374         auto h = requestHeaders();
1375         safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "multipart/form-data; boundary=" ~ boundary);
1376         safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(contentLength));
1377 
1378         h.byKeyValue.
1379         map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
1380         each!(h => req.put(h));
1381         req.put("\r\n");
1382 
1383         debug(requests) trace(req.data);
1384         if ( _verbosity >= 1 ) req.data.splitLines.each!(a => writeln("> " ~ a));
1385 
1386         try {
1387             _stream.send(req.data());
1388             foreach(ref source; _multipartForm._sources) {
1389                 debug(requests) tracef("sending part headers <%s>", partHeaders.front);
1390                 _stream.send(partHeaders.front);
1391                 partHeaders.popFront;
1392                 while (true) {
1393                     auto chunk = source.input.read();
1394                     if ( chunk.length <= 0 ) {
1395                         break;
1396                     }
1397                     _stream.send(chunk);
1398                 }
1399                 _stream.send("\r\n");
1400             }
1401             _stream.send("--" ~ boundary ~ "--\r\n");
1402             _response._requestSentAt = Clock.currTime;
1403             receiveResponse(_stream);
1404             _response._finishedAt = Clock.currTime;
1405         }
1406         catch (NetworkException e) {
1407             errorf("Error sending request: ", e.msg);
1408             _stream.close();
1409             return _response;
1410         }
1411 
1412         if ( serverPrematurelyClosedConnection()
1413         && !restartedRequest
1414         && isIdempotent(_method)
1415         ) {
1416             ///
1417             /// We didn't receive any data (keepalive connectioin closed?)
1418             /// and we can restart this request.
1419             /// Go ahead.
1420             ///
1421             debug(requests) tracef("Server closed keepalive connection");
1422 
1423             //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream);
1424 
1425             //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream);
1426             _stream.close();
1427             _stream = null;
1428 
1429             restartedRequest = true;
1430             goto connect;
1431         }
1432 
1433         if ( _useStreaming ) {
1434             if ( _response._receiveAsRange.activated ) {
1435                 debug(requests) trace("streaming_in activated");
1436                 return _response;
1437             } else {
1438                 // this can happen if whole response body received together with headers
1439                 _response._receiveAsRange.data = _response.responseBody.data;
1440             }
1441         }
1442 
1443         close_connection_if_not_keepalive(_stream);
1444 
1445         if ( _verbosity >= 1 ) {
1446             writeln(">> Connect time: ", _response._connectedAt - _response._startedAt);
1447             writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt);
1448             writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt);
1449         }
1450 
1451         if ( willFollowRedirect ) {
1452             if ( _history.length >= _maxRedirects ) {
1453                 _stream = null;
1454                 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects));
1455             }
1456             // "location" in response already checked in canFollowRedirect
1457             immutable new_location = *("location" in _response.responseHeaders);
1458             immutable current_uri = _uri;
1459             immutable next_uri = uriFromLocation(_uri, new_location);
1460 
1461             immutable get_or_head = _method == "GET" || _method == "HEAD";
1462             immutable code = _response.code;
1463 
1464             // save current response for history
1465             _history ~= _response;
1466 
1467             if ( code == 301 )
1468             {
1469                 // permanent redirect and change method
1470                 _permanent_redirects[_uri] = new_location;
1471                 if ( !get_or_head )
1472                 {
1473                     _method = "GET";
1474                 }
1475             }
1476             if ( (code == 302 || code == 303) && !get_or_head)
1477             {
1478                 // only change method
1479                 _method = "GET";
1480             }
1481             if ( code == 307 )
1482             {
1483                 // no change method, no permanent
1484             }
1485             if ( code == 308 )
1486             {
1487                 // permanent redirection and do not change method
1488                 _permanent_redirects[_uri] = new_location;
1489             }
1490 
1491             // prepare new response (for redirected request)
1492             _response = new HTTPResponse;
1493             _response.uri = current_uri;
1494             _response.finalURI = next_uri;
1495             _stream = null;
1496 
1497             // set new uri
1498             this._uri = next_uri;
1499             debug(requests) tracef("Redirected to %s", next_uri);
1500             if ( restartedRequest ) {
1501                 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect");
1502                 restartedRequest = false;
1503             }
1504             if ( _method == "GET")
1505             {
1506                 return exec_from_parameters();
1507             }
1508             goto connect;
1509         }
1510 
1511         if (_stream && _stream.isConnected) {
1512             // return to pool
1513             _cm.put(_uri.scheme, _uri.host, _uri.port, _stream);
1514         }
1515         _response._history = _history;
1516         return _response;
1517     }
1518 
1519     ///
1520     HTTPResponse exec_from_parameters() {
1521 
1522         debug(requests) tracef("exec from parameters request");
1523 
1524         assert(_uri != URI.init);
1525         NetworkStream _stream;
1526         _response = new HTTPResponse;
1527         _history.length = 0;
1528         _response.uri = _uri;
1529         _response.finalURI = _uri;
1530         bool restartedRequest = false; // True if this is restarted keepAlive request
1531 
1532     connect:
1533         if ( _method == "GET" && _uri in _permanent_redirects ) {
1534             debug(requests) trace("use parmanent redirects cache");
1535             _uri = uriFromLocation(_uri, _permanent_redirects[_uri]);
1536             _response._finalURI = _uri;
1537         }
1538         _contentReceived = 0;
1539         _response._startedAt = Clock.currTime;
1540 
1541         assert(_stream is null);
1542 
1543         _stream = _cm.get(_uri.scheme, _uri.host, _uri.port);
1544 
1545         if ( _stream is null ) {
1546             debug(requests) trace("create new connection");
1547             _stream = setupConnection();
1548         } else {
1549             debug(requests) trace("reuse old connection");
1550         }
1551 
1552         assert(_stream !is null);
1553 
1554         if ( !_stream.isConnected ) {
1555             debug(requests) trace("disconnected stream on enter");
1556             if ( !restartedRequest ) {
1557                 debug(requests) trace("disconnected stream on enter: retry");
1558                 // assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream);
1559 
1560                 // _cm.del(_uri.scheme, _uri.host, _uri.port, _stream);
1561                 _stream.close();
1562                 _stream = null;
1563 
1564                 restartedRequest = true;
1565                 goto connect;
1566             }
1567             debug(requests) trace("disconnected stream on enter: return response");
1568             //_stream = null;
1569             return _response;
1570         }
1571         _response._connectedAt = Clock.currTime;
1572 
1573         auto h = requestHeaders();
1574 
1575         Appender!string req;
1576 
1577         string encoded;
1578 
1579         switch (_method) {
1580             case "POST","PUT","PATCH":
1581                 encoded = params2query(_params);
1582                 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "application/x-www-form-urlencoded");
1583                 if ( encoded.length > 0) {
1584                     safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(encoded.length));
1585                 }
1586                 req.put(requestString());
1587                 break;
1588             default:
1589                 req.put(requestString(_params));
1590         }
1591 
1592         h.byKeyValue.
1593         map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n").
1594         each!(h => req.put(h));
1595         req.put("\r\n");
1596         if ( encoded ) {
1597             req.put(encoded);
1598         }
1599 
1600         debug(requests) trace(req.data);
1601         if ( _verbosity >= 1 ) {
1602             req.data.splitLines.each!(a => writeln("> " ~ a));
1603         }
1604         //
1605         // Now send request and receive response
1606         //
1607         try {
1608             _stream.send(req.data());
1609             _response._requestSentAt = Clock.currTime;
1610             debug(requests) trace("starting receive response");
1611             receiveResponse(_stream);
1612             debug(requests) tracef("done receive response");
1613             _response._finishedAt = Clock.currTime;
1614         }
1615         catch (NetworkException e) {
1616             // On SEND this can means:
1617             // we started to send request to the server, but it closed connection because of keepalive timeout.
1618             // We have to restart request if possible.
1619 
1620             // On RECEIVE - if we received something - then this exception is real and unexpected error.
1621             // If we didn't receive anything - we can restart request again as it can be
1622             debug(requests) tracef("Exception on receive response: %s", e.msg);
1623             if ( _response._responseHeaders.length != 0 )
1624             {
1625                 _stream.close();
1626                 throw new RequestException("Unexpected network error");
1627             }
1628         }
1629 
1630         if ( serverPrematurelyClosedConnection()
1631             && !restartedRequest
1632             && isIdempotent(_method)
1633             ) {
1634             ///
1635             /// We didn't receive any data (keepalive connectioin closed?)
1636             /// and we can restart this request.
1637             /// Go ahead.
1638             ///
1639             debug(requests) tracef("Server closed keepalive connection");
1640 
1641             //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream);
1642 
1643             //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream);
1644             _stream.close();
1645             _stream = null;
1646 
1647             restartedRequest = true;
1648             goto connect;
1649         }
1650 
1651         if ( _useStreaming ) {
1652             if ( _response._receiveAsRange.activated ) {
1653                 debug(requests) trace("streaming_in activated");
1654                 return _response;
1655             } else {
1656                 // this can happen if whole response body received together with headers
1657                 _response._receiveAsRange.data = _response.responseBody.data;
1658             }
1659         }
1660 
1661         close_connection_if_not_keepalive(_stream);
1662 
1663         if ( _verbosity >= 1 ) {
1664             writeln(">> Connect time: ", _response._connectedAt - _response._startedAt);
1665             writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt);
1666             writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt);
1667         }
1668 
1669         if ( willFollowRedirect ) {
1670             debug(requests) trace("going to follow redirect");
1671             if ( _history.length >= _maxRedirects ) {
1672                 _stream = null;
1673                 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects));
1674             }
1675             // "location" in response already checked in canFollowRedirect
1676             immutable new_location = *("location" in _response.responseHeaders);
1677             immutable current_uri = _uri;
1678             immutable next_uri = uriFromLocation(_uri, new_location);
1679 
1680             immutable get_or_head = _method == "GET" || _method == "HEAD";
1681             immutable code = _response.code;
1682 
1683             // save current response for history
1684             _history ~= _response;
1685 
1686             if ( code == 301 )
1687             {
1688                 // permanent redirect and change method
1689                 debug(requests) tracef("store [%s]=%s in permanent_redirects", _uri, new_location);
1690                 _permanent_redirects[_uri] = new_location;
1691                 if ( !get_or_head )
1692                 {
1693                     _method = "GET";
1694                 }
1695             }
1696             if ( (code == 302 || code == 303) && !get_or_head)
1697             {
1698                 // only change method
1699                 _method = "GET";
1700             }
1701             if ( code == 307 )
1702             {
1703                 // no change method, no permanent
1704             }
1705             if ( code == 308 )
1706             {
1707                 // permanent redirection and do not change method
1708                 _permanent_redirects[_uri] = new_location;
1709             }
1710 
1711             // prepare new response (for redirected request)
1712             _response = new HTTPResponse;
1713             _response.uri = current_uri;
1714             _response.finalURI = next_uri;
1715             if (_stream && _stream.isConnected) {
1716                 // return to pool
1717                 _cm.put(current_uri.scheme, current_uri.host, current_uri.port, _stream);
1718             }
1719             _stream = null;
1720 
1721             // set new uri
1722             _uri = next_uri;
1723             debug(requests) tracef("Redirected to %s", next_uri);
1724             if ( restartedRequest ) {
1725                 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect");
1726                 restartedRequest = false;
1727             }
1728             goto connect;
1729         }
1730 
1731         _response._history = _history;
1732         if ( _stream && _stream.isConnected ) {
1733             // return to pool
1734             _cm.put(_uri.scheme, _uri.host, _uri.port, _stream);
1735         }
1736         return _response;
1737     }
1738     HTTPResponse execute(Request r)
1739     {
1740         _method = r.method;
1741         _uri = r.uri;
1742         _useStreaming = r.useStreaming;
1743         _permanent_redirects = r.permanent_redirects;
1744         _maxRedirects = r.maxRedirects;
1745         _authenticator = r.authenticator;
1746         _maxHeadersLength = r.maxHeadersLength;
1747         _maxContentLength = r.maxContentLength;
1748         _verbosity = r.verbosity;
1749         _keepAlive = r.keepAlive;
1750         _bufferSize = r.bufferSize;
1751         _proxy = r.proxy;
1752         _timeout = r.timeout;
1753         _contentType = r.contentType;
1754         _socketFactory = r.socketFactory;
1755         _sslOptions = r.sslOptions;
1756         _bind = r.bind;
1757         _headers = r.headers;
1758         _userHeaders = r.userHeaders;
1759 
1760         _params = r.params;
1761 
1762         // this assignments increments refCounts, so we can't use const Request
1763         // but Request is anyway struct and called by-value
1764         _cm = r.cm;
1765         _cookie = r.cookie;
1766 
1767         debug(requests) trace("serving %s".format(r));
1768         if ( !r.postData.empty)
1769         {
1770             return exec_from_range(r.postData);
1771         }
1772         if ( r.hasMultipartForm )
1773         {
1774             return exec_from_multipart_form(r.multipartForm);
1775         }
1776         auto rs = exec_from_parameters();
1777         return rs;
1778     }
1779 }
1780 
1781 version(vibeD) {
1782     import std.json;
1783     package string httpTestServer() {
1784         return "http://httpbin.org/";
1785     }
1786     package string fromJsonArrayToStr(JSONValue v) {
1787         return v.str;
1788     }
1789 }
1790 else {
1791     import std.json;
1792     package string httpTestServer() {
1793         return "http://127.0.0.1:8081/";
1794     }
1795     package string fromJsonArrayToStr(JSONValue v) {
1796         return cast(string)(v.array.map!"cast(ubyte)a.integer".array);
1797     }
1798 }