1 module requests.ftp;
2 
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.exception;
10 import std.string;
11 import std.range;
12 import std.experimental.logger;
13 import std.stdio;
14 import std.path;
15 import std.traits;
16 
17 import requests.uri;
18 import requests.utils;
19 import requests.streams;
20 import requests.base;
21 
22 public class FTPServerResponseError: Exception {
23     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
24         super(message, file, line, next);
25     }
26 }
27 
28 public class FTPResponse : Response {
29 }
30 
31 public class FtpAuthentication: Auth {
32     private {
33         string   _username, _password;
34     }
35     /// Constructor.
36     /// Params:
37     /// username = username
38     /// password = password
39     ///
40     this(string username, string password) {
41         _username = username;
42         _password = password;
43     }
44     override string userName() {
45         return _username;
46     }
47     override string password() {
48         return _password;
49     }
50     override string[string] authHeaders(string domain) {
51         return null;
52     }
53 }
54 
55 enum defaultBufferSize = 8192;
56 
57 public struct FTPRequest {
58     private {
59         URI           _uri;
60         Duration      _timeout = 60.seconds;
61         uint          _verbosity = 0;
62         size_t        _bufferSize = defaultBufferSize;
63         long          _maxContentLength = 5*1024*1024*1024;
64         long          _contentLength = -1;
65         long          _contentReceived;
66         NetworkStream _controlChannel;
67         string[]      _responseHistory;
68         FTPResponse   _response;
69         bool          _useStreaming;
70         Auth           _authenticator;
71         string        _method;
72         string        _proxy;
73         string        _bind;
74     }
75     mixin(Getter_Setter!Duration("timeout"));
76     mixin(Getter_Setter!uint("verbosity"));
77     mixin(Getter_Setter!size_t("bufferSize"));
78     mixin(Getter_Setter!long("maxContentLength"));
79     mixin(Getter_Setter!bool("useStreaming"));
80     mixin(Getter!long("contentLength"));
81     mixin(Getter!long("contentReceived"));
82     mixin(Setter!Auth("authenticator"));
83     mixin(Getter_Setter!string("proxy"));
84     mixin(Getter_Setter!string("bind"));
85 
86     @property final string[] responseHistory() @safe @nogc nothrow {
87         return _responseHistory;
88     }
89     this(string uri) {
90         _uri = URI(uri);
91     }
92 
93     this(in URI uri) {
94         _uri = uri;
95     }
96 
97     ~this() {
98         if ( _controlChannel ) {
99             _controlChannel.close();
100         }
101     }
102     string toString() const {
103         return "FTPRequest(%s, %s)".format(_method, _uri.uri());
104     }
105     string format(string fmt) const {
106         import std.array;
107         import std.stdio;
108         auto a = appender!string();
109         auto f = FormatSpec!char(fmt);
110         while (f.writeUpToNextSpec(a)) {
111             switch(f.spec) {
112                 case 'h':
113                     // Remote hostname.
114                     a.put(_uri.host);
115                     break;
116                 case 'm':
117                     // method.
118                     a.put(_method);
119                     break;
120                 case 'p':
121                     // Remote port.
122                     a.put("%d".format(_uri.port));
123                     break;
124                 case 'P':
125                     // Path
126                     a.put(_uri.path);
127                     break;
128                 case 'q':
129                     // query parameters supplied with url.
130                     a.put(_uri.query);
131                     break;
132                 case 'U':
133                     a.put(_uri.uri());
134                     break;
135                 default:
136                     throw new FormatException("Unknown Request format spec " ~ f.spec);
137             }
138         }
139         return a.data();
140     }
141     ushort sendCmdGetResponse(string cmd) {
142         debug(requests) tracef("cmd to server: %s", cmd.strip);
143         if ( _verbosity >=1 ) {
144             writefln("> %s", cmd.strip);
145         }
146         _controlChannel.send(cmd);
147         string response = serverResponse();
148         _responseHistory ~= response;
149         return responseToCode(response);
150     }
151 
152     ushort responseToCode(string response) pure const @safe {
153         return to!ushort(response[0..3]);
154     }
155 
156     void handleChangeURI(in string uri) @safe {
157         // if control channel exists and new URL not match old, then close
158         URI newURI = URI(uri);
159         if ( _controlChannel && 
160             (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
161             _controlChannel.close();
162             _controlChannel = null;
163         }
164         _uri = newURI;
165     }
166 
167     string serverResponse() {
168         string res, buffer;
169         immutable bufferLimit = 16*1024;
170         _controlChannel.readTimeout = _timeout;
171         scope(exit) {
172             _controlChannel.readTimeout = 0.seconds;
173         }
174         auto b = new ubyte[_bufferSize];
175         while ( buffer.length < bufferLimit ) {
176             debug(requests) trace("Wait on control channel");
177             ptrdiff_t rc;
178             try {
179                 rc = _controlChannel.receive(b);
180             }
181             catch (Exception e) {
182                 error("Failed to read response from server");
183                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
184             }
185             debug(requests) tracef("Got %d bytes from control socket", rc);
186             if ( rc == 0 ) {
187                 error("Failed to read response from server");
188                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
189             }
190             if ( _verbosity >= 1 ) {
191                 (cast(string)b[0..rc]).
192                     splitLines.
193                     each!(l=>writefln("< %s", l));
194             }
195             buffer ~= b[0..rc];
196             if ( buffer.endsWith('\n') ){
197                 auto responseLines = buffer.
198                     splitLines.
199                     filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
200                 if ( responseLines.count > 0 ) {
201                     return responseLines.front;
202                 }
203             }
204         }
205         throw new FTPServerResponseError("Failed to read server responce over control channel");
206         assert(0);
207     }
208     ushort tryCdOrCreatePath(string[] path) {
209         /*
210          * At start we stay at original path, we have to create next path element
211          * For example:
212          * path = ["", "a", "b"] - we stay in root (path[0]), we have to cd and return ok
213          * or try to cteate "a" and cd to "a".
214          */
215         debug(requests) info("Trying to create path %s".format(path));
216         enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
217         auto next_dir = path[1];
218         auto code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
219         if ( code >= 300) {
220             // try to create, then again CWD
221             code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n");
222             if ( code > 300 ) {
223                 return code;
224             }
225             code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
226         }
227         if ( path.length == 2 ) {
228             return code;
229         }
230         return tryCdOrCreatePath(path[1..$]);
231     }
232     auto post(R, A...)(string uri, R content, A args) 
233         if ( __traits(compiles, cast(ubyte[])content) 
234         || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front)))) 
235         || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
236         ) {
237         enforce( uri || _uri.host, "FTP URL undefined");
238         string response;
239         ushort code;
240 
241         _response = new FTPResponse;
242         _response._startedAt = Clock.currTime;
243         _method = "POST";
244 
245         scope(exit) {
246             _response._finishedAt = Clock.currTime;
247         }
248 
249         if ( uri ) {
250             handleChangeURI(uri);
251         }
252 
253         _response.uri = _uri;
254         _response.finalURI = _uri;
255 
256         if ( !_controlChannel ) {
257             _controlChannel = new TCPStream();
258             _controlChannel.connect(_uri.host, _uri.port, _timeout);
259             response = serverResponse();
260             _responseHistory ~= response;
261             
262             code = responseToCode(response);
263             debug(requests) tracef("Server initial response: %s", response);
264             if ( code/100 > 2 ) {
265                 _response.code = code;
266                 return _response;
267             }
268             // Log in
269             string user, pass;
270             if ( _authenticator ) {
271                 user = _authenticator.userName();
272                 pass = _authenticator.password();
273             }
274             else{
275                 user = _uri.username.length ? _uri.username : "anonymous";
276                 pass = _uri.password.length ? _uri.password : "requests@";
277             }
278             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
279             
280             code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
281             if ( code/100 > 3 ) {
282                 _response.code = code;
283                 return _response;
284             } else if ( code/100 == 3) {
285                 
286                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
287                 if ( code/100 > 2 ) {
288                     _response.code = code;
289                     return _response;
290                 }
291             }
292             
293         }
294         code = sendCmdGetResponse("PWD\r\n");
295         string pwd;
296         if ( code/100 == 2 ) {
297             // like '257 "/home/testuser"'
298             auto a = _responseHistory[$-1].split();
299             if ( a.length > 1 ) {
300                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
301             }
302         }
303         scope (exit) {
304             if ( pwd && _controlChannel ) {
305                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
306             }
307         }
308 
309         auto path = dirName(_uri.path);
310         if ( path != "/") {
311             path = path.chompPrefix("/");
312         }
313         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
314         if ( code == 550 ) {
315             // try to create directory end enter it
316             code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
317         }
318         if ( code/100 > 2 ) {
319             _response.code = code;
320             return _response;
321         }
322 
323         code = sendCmdGetResponse("PASV\r\n");
324         if ( code/100 > 2 ) {
325             _response.code = code;
326             return _response;
327         }
328         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
329         // in last response.
330         // Cut anything between ( and )
331         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
332         string host;
333         ushort port;
334         try {
335             ubyte a1,a2,a3,a4,p1,p2;
336             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
337             host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
338             port = (p1<<8) + p2;
339         } catch (FormatException e) {
340             error("Failed to parse ", v);
341             _response.code = 500;
342             return _response;
343         }
344 
345         auto dataStream = new TCPStream();
346         scope (exit ) {
347             if ( dataStream !is null ) {
348                 dataStream.close();
349             }
350         }
351 
352         dataStream.connect(host, port, _timeout);
353 
354         code = sendCmdGetResponse("TYPE I\r\n");
355         if ( code/100 > 2 ) {
356             _response.code = code;
357             return _response;
358         }
359 
360         code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
361         if ( code/100 > 1 ) {
362             _response.code = code;
363             return _response;
364         }
365         static if ( __traits(compiles, cast(ubyte[])content) ) {
366             auto data = cast(ubyte[])content;
367             auto b = new ubyte[_bufferSize];
368             for(size_t pos = 0; pos < data.length;) {
369                 auto chunk = data.take(_bufferSize).array;
370                 auto rc = dataStream.send(chunk);
371                 if ( rc <= 0 ) {
372                     debug(requests) trace("done");
373                     break;
374                 }
375                 debug(requests) tracef("sent %d bytes to data channel", rc);
376                 pos += rc;
377             }
378         } else {
379             while (!content.empty) {
380                 auto chunk = content.front;
381                 debug(requests) trace("ftp posting %d of data chunk".format(chunk.length));
382                 auto rc = dataStream.send(chunk);
383                 if ( rc <= 0 ) {
384                     debug(requests) trace("done");
385                     break;
386                 }
387                 content.popFront;
388             }
389         }
390         dataStream.close();
391         dataStream = null;
392         response = serverResponse();
393         code = responseToCode(response);
394         if ( code/100 == 2 ) {
395             debug(requests) tracef("Successfully uploaded %d bytes", _response._responseBody.length);
396         }
397         _response.code = code;
398         return _response;
399     }
400 
401     auto get(string uri = null) {
402         enforce( uri || _uri.host, "FTP URL undefined");
403         string response;
404         ushort code;
405 
406         _response = new FTPResponse;
407         _contentReceived = 0;
408         _method = "GET";
409 
410         _response._startedAt = Clock.currTime;
411         scope(exit) {
412             _response._finishedAt = Clock.currTime;
413         }
414 
415         if ( uri ) {
416             handleChangeURI(uri);
417         }
418 
419         _response.uri = _uri;
420         _response.finalURI = _uri;
421 
422         if ( !_controlChannel ) {
423             _controlChannel = new TCPStream();
424             _controlChannel.bind(_bind);
425             _controlChannel.connect(_uri.host, _uri.port, _timeout);
426             _response._connectedAt = Clock.currTime;
427             response = serverResponse();
428             _responseHistory ~= response;
429             
430             code = responseToCode(response);
431             debug(requests) tracef("Server initial response: %s", response);
432             if ( code/100 > 2 ) {
433                 _response.code = code;
434                 return _response;
435             }
436             // Log in
437             string user, pass;
438             if ( _authenticator ) {
439                 user = _authenticator.userName();
440                 pass = _authenticator.password();
441             }
442             else{
443                 user = _uri.username.length ? _uri.username : "anonymous";
444                 pass = _uri.password.length ? _uri.password : "requests@";
445             }
446             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
447             
448             code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
449             if ( code/100 > 3 ) {
450                 _response.code = code;
451                 return _response;
452             } else if ( code/100 == 3) {
453                 
454                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
455                 if ( code/100 > 2 ) {
456                     _response.code = code;
457                     return _response;
458                 }
459             }
460         }
461         else {
462             _response._connectedAt = Clock.currTime;
463         }
464 
465         code = sendCmdGetResponse("PWD\r\n");
466         string pwd;
467         if ( code/100 == 2 ) {
468             // like '257 "/home/testuser"'
469             auto a = _responseHistory[$-1].split();
470             if ( a.length > 1 ) {
471                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
472             }
473         }
474         scope (exit) {
475             if ( pwd && _controlChannel && !_useStreaming ) {
476                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
477             }
478         }
479         
480         auto path = dirName(_uri.path);
481         if ( path != "/") {
482             path = path.chompPrefix("/");
483         }
484         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
485         if ( code/100 > 2 ) {
486             _response.code = code;
487             return _response;
488         }
489 
490         code = sendCmdGetResponse("TYPE I\r\n");
491         if ( code/100 > 2 ) {
492             _response.code = code;
493             return _response;
494         }
495         
496         code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
497         if ( code/100 == 2 ) {
498             // something like 
499             // 213 229355520
500             auto s = _responseHistory[$-1].findSplitAfter(" ");
501             if ( s.length ) {
502                 try {
503                     _contentLength = to!long(s[1]);
504                 } catch (ConvException) {
505                     debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
506                 }
507             }
508         }
509 
510         if ( _maxContentLength && _contentLength > _maxContentLength ) {
511             throw new RequestException("maxContentLength exceeded for ftp data");
512         }
513 
514         code = sendCmdGetResponse("PASV\r\n");
515         if ( code/100 > 2 ) {
516             _response.code = code;
517             return _response;
518         }
519         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
520         // in last response.
521         // Cut anything between ( and )
522         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
523         string host;
524         ushort port;
525         try {
526             ubyte a1,a2,a3,a4,p1,p2;
527             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
528             host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
529             port = (p1<<8) + p2;
530         } catch (FormatException e) {
531             error("Failed to parse ", v);
532             _response.code = 500;
533             return _response;
534         }
535         
536         auto dataStream = new TCPStream();
537         scope (exit ) {
538             if ( dataStream !is null && !_response._receiveAsRange.activated ) {
539                 dataStream.close();
540             }
541         }
542         dataStream.bind(_bind);
543         dataStream.connect(host, port, _timeout);
544         
545         code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
546         if ( code/100 > 1 ) {
547             _response.code = code;
548             return _response;
549         }
550         while ( true ) {
551             auto b = new ubyte[_bufferSize];
552             auto rc = dataStream.receive(b);
553             if ( rc <= 0 ) {
554                 debug(requests) trace("done");
555                 break;
556             }
557             debug(requests) tracef("got %d bytes from data channel", rc);
558 
559             _contentReceived += rc;
560             _response._responseBody.putNoCopy(b[0..rc]);
561 
562             if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
563                 throw new RequestException("maxContentLength exceeded for ftp data");
564             }
565             if ( _useStreaming ) {
566                 debug(requests) trace("ftp uses streaming");
567                 _response.receiveAsRange.activated = true;
568                 _response.receiveAsRange.data.length = 0;
569                 _response.receiveAsRange.data = _response._responseBody.data;
570                 _response.receiveAsRange.read = delegate ubyte[] () {
571                     Buffer!ubyte result;
572                     while(true) {
573                         // check if we received everything we need
574                         if ( _contentReceived >= _maxContentLength ) 
575                         {
576                             throw new RequestException("ContentLength > maxContentLength (%d>%d)".
577                                 format(_contentLength, _maxContentLength));
578                         }
579                         // have to continue
580                         auto b = new ubyte[_bufferSize];
581                         ptrdiff_t read;
582                         try {
583                             read = dataStream.receive(b);
584                         }
585                         catch (Exception e) {
586                             throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
587                         }
588 
589                         if ( read > 0 ) {
590                             _contentReceived += read;
591                             result.putNoCopy(b[0..read]);
592                             return result.data;
593                         }
594                         if ( read == 0 ) {
595                             debug(requests) tracef("streaming_in: server closed connection");
596                             dataStream.close();
597                             code = responseToCode(serverResponse());
598                             if ( code/100 == 2 ) {
599                                 debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
600                             }
601                             _response.code = code;
602                             sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
603                             break;
604                         }
605                     }
606                     return result.data;
607                 };
608                 return _response;
609             }
610         }
611         dataStream.close();
612         response = serverResponse();
613         code = responseToCode(response);
614         if ( code/100 == 2 ) {
615             debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
616         }
617         _response.code = code;
618         return _response;
619     }
620 }
621 
622 package unittest {
623     import std.process;
624 
625     globalLogLevel(LogLevel.info);
626     bool unreliable_network = environment.get("UNRELIABLENETWORK", "false") == "true";
627 
628     info("testing ftp");
629     auto rq = FTPRequest();
630     info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
631     auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
632     assert(unreliable_network || rs.code == 226);
633     info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
634     rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
635     assert(unreliable_network || rs.code != 226);
636     info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
637     rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
638     assert(unreliable_network || rs.code == 226);
639     assert(unreliable_network || rs.responseBody.length == 1024);
640     info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
641     rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
642     assert(unreliable_network || rs.code == 226);
643     info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
644     try {
645         rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
646     }
647     catch (ConnectError e)
648     {
649     }
650     assert(unreliable_network || rs.code == 226);
651     info("ftp get  ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT with authenticator");
652     rq.authenticator = new FtpAuthentication("anonymous", "requests@");
653     try {
654         rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
655     }
656     catch (ConnectError e)
657     {
658     }
659     assert(unreliable_network || rs.code == 226);
660     assert(unreliable_network || rs.finalURI.path == "/pub/FreeBSD/README.TXT");
661     assert(unreliable_network || rq.format("%m|%h|%p|%P|%q|%U") == "GET|ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
662     assert(unreliable_network || rs.format("%h|%p|%P|%q|%U") == "ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
663     info("testing ftp - done.");
664 }