1 module requests.ftp;
2 
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.exception;
10 import std.string;
11 import std.range;
12 import std.experimental.logger;
13 import std.stdio;
14 import std.path;
15 import std.traits;
16 
17 import requests.uri;
18 import requests.utils;
19 import requests.streams;
20 import requests.base;
21 
/// Exception raised by this module when a reply from the FTP server cannot be
/// read from the control channel (thrown by `FTPRequest.serverResponse`).
public class FTPServerResponseError: Exception {
    /// Forwards the message, the source location and the optional chained
    /// cause unchanged to the base `Exception` constructor.
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}
27 
/// Response object produced by `FTPRequest.get` and `FTPRequest.post`.
/// It declares no members of its own; everything is inherited from `Response`.
public class FTPResponse : Response {
}
30 
/// Plain username/password credentials for an FTP login, exposed through the
/// generic `Auth` interface so they can be assigned to `FTPRequest.authenticator`.
public class FtpAuthentication: Auth {
    private string _username;
    private string _password;

    /// Constructor.
    /// Params:
    /// username = name sent to the server with the USER command
    /// password = secret sent to the server with the PASS command
    ///
    this(string username, string password) {
        this._username = username;
        this._password = password;
    }

    /// Returns: the user name supplied at construction.
    override string userName() {
        return this._username;
    }

    /// Returns: the password supplied at construction.
    override string password() {
        return this._password;
    }

    /// This authenticator contributes no headers; the result is always a
    /// null associative array, whatever `domain` is.
    override string[string] authHeaders(string domain) {
        string[string] noHeaders;
        return noHeaders;
    }
}
54 
/// Default size, in bytes, of the buffers `FTPRequest` allocates for reading
/// from the control and data channels (initial value of `FTPRequest.bufferSize`).
enum defaultBufferSize = 8192;
56 
57 public struct FTPRequest {
58     private {
59         URI           _uri;
60         Duration      _timeout = 60.seconds;
61         uint          _verbosity = 0;
62         size_t        _bufferSize = defaultBufferSize;
63         long          _maxContentLength = 5*1024*1024*1024;
64         long          _contentLength = -1;
65         long          _contentReceived;
66         NetworkStream _controlChannel;
67         string[]      _responseHistory;
68         FTPResponse   _response;
69         bool          _useStreaming;
70         Auth           _authenticator;
71     }
    // Accessors for the private fields above, produced by string mixins.
    // NOTE(review): Getter_Setter / Getter / Setter are not defined in this
    // file (presumably they come from requests.utils); judging by their names
    // they generate read/write, read-only and write-only properties for the
    // correspondingly named `_field` - confirm against their definition.
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter!long("contentLength"));
    mixin(Getter!long("contentReceived"));
    mixin(Setter!Auth("authenticator"));
80 
    /// Returns: the reply lines recorded so far in `_responseHistory`, oldest
    /// first. The slice refers to the request's own array (no copy is made).
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }
    /// Creates a request whose target is parsed from the textual `uri`.
    /// No connection is opened here; that happens in `get` / `post`.
    this(string uri) {
        _uri = URI(uri);
    }

    /// Creates a request for an already parsed `URI`.
    this(in URI uri) {
        _uri = uri;
    }
91 
    /// Closes the control channel, if one is open, when the request object
    /// is destroyed.
    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }
97 
98     ushort sendCmdGetResponse(string cmd) {
99         debug(requests) tracef("cmd to server: %s", cmd.strip);
100         if ( _verbosity >=1 ) {
101             writefln("> %s", cmd.strip);
102         }
103         _controlChannel.send(cmd);
104         string response = serverResponse();
105         _responseHistory ~= response;
106         return responseToCode(response);
107     }
108 
109     ushort responseToCode(string response) pure const @safe {
110         return to!ushort(response[0..3]);
111     }
112 
113     void handleChangeURI(in string uri) @safe {
114         // if control channel exists and new URL not match old, then close
115         URI newURI = URI(uri);
116         if ( _controlChannel && 
117             (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
118             _controlChannel.close();
119             _controlChannel = null;
120         }
121         _uri = newURI;
122     }
123 
124     string serverResponse() {
125         string res, buffer;
126         immutable bufferLimit = 16*1024;
127         _controlChannel.readTimeout = _timeout;
128         scope(exit) {
129             _controlChannel.readTimeout = 0.seconds;
130         }
131         auto b = new ubyte[_bufferSize];
132         while ( buffer.length < bufferLimit ) {
133             debug(requests) trace("Wait on control channel");
134             ptrdiff_t rc;
135             try {
136                 rc = _controlChannel.receive(b);
137             }
138             catch (Exception e) {
139                 error("Failed to read response from server");
140                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
141             }
142             debug(requests) tracef("Got %d bytes from control socket", rc);
143             if ( rc == 0 ) {
144                 error("Failed to read response from server");
145                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
146             }
147             if ( _verbosity >= 1 ) {
148                 (cast(string)b[0..rc]).
149                     splitLines.
150                     each!(l=>writefln("< %s", l));
151             }
152             buffer ~= b[0..rc];
153             if ( buffer.endsWith('\n') ){
154                 auto responseLines = buffer.
155                     splitLines.
156                     filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
157                 if ( responseLines.count > 0 ) {
158                     return responseLines.front;
159                 }
160             }
161         }
162         throw new FTPServerResponseError("Failed to read server responce over control channel");
163         assert(0);
164     }
165     ushort tryCdOrCreatePath(string[] path) {
166         /*
167          * At start we stay at original path, we have to create next path element
168          * For example:
169          * path = ["", "a", "b"] - we stay in root (path[0]), we have to cd and return ok
170          * or try to cteate "a" and cd to "a".
171          */
172         debug(requests) info("Trying to create path %s".format(path));
173         enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
174         auto next_dir = path[1];
175         auto code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
176         if ( code >= 300) {
177             // try to create, then again CWD
178             code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n");
179             if ( code > 300 ) {
180                 return code;
181             }
182             code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
183         }
184         if ( path.length == 2 ) {
185             return code;
186         }
187         return tryCdOrCreatePath(path[1..$]);
188     }
189     auto post(R, A...)(string uri, R content, A args) 
190         if ( __traits(compiles, cast(ubyte[])content) 
191         || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front)))) 
192         || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
193         ) {
194         enforce( uri || _uri.host, "FTP URL undefined");
195         string response;
196         ushort code;
197 
198         _response = new FTPResponse;
199 
200         if ( uri ) {
201             handleChangeURI(uri);
202         }
203 
204         _response.uri = _uri;
205         _response.finalURI = _uri;
206 
207         if ( !_controlChannel ) {
208             _controlChannel = new TCPStream();
209             _controlChannel.connect(_uri.host, _uri.port, _timeout);
210             response = serverResponse();
211             _responseHistory ~= response;
212             
213             code = responseToCode(response);
214             debug(requests) tracef("Server initial response: %s", response);
215             if ( code/100 > 2 ) {
216                 _response.code = code;
217                 return _response;
218             }
219             // Log in
220             string user, pass;
221             if ( _authenticator ) {
222                 user = _authenticator.userName();
223                 pass = _authenticator.password();
224             }
225             else{
226                 user = _uri.username.length ? _uri.username : "anonymous";
227                 pass = _uri.password.length ? _uri.password : "requests@";
228             }
229             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
230             
231             code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
232             if ( code/100 > 3 ) {
233                 _response.code = code;
234                 return _response;
235             } else if ( code/100 == 3) {
236                 
237                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
238                 if ( code/100 > 2 ) {
239                     _response.code = code;
240                     return _response;
241                 }
242             }
243             
244         }
245         code = sendCmdGetResponse("PWD\r\n");
246         string pwd;
247         if ( code/100 == 2 ) {
248             // like '257 "/home/testuser"'
249             auto a = _responseHistory[$-1].split();
250             if ( a.length > 1 ) {
251                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
252             }
253         }
254         scope (exit) {
255             if ( pwd && _controlChannel ) {
256                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
257             }
258         }
259 
260         auto path = dirName(_uri.path);
261         if ( path != "/") {
262             path = path.chompPrefix("/");
263         }
264         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
265         if ( code == 550 ) {
266             // try to create directory end enter it
267             code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
268         }
269         if ( code/100 > 2 ) {
270             _response.code = code;
271             return _response;
272         }
273 
274         code = sendCmdGetResponse("PASV\r\n");
275         if ( code/100 > 2 ) {
276             _response.code = code;
277             return _response;
278         }
279         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
280         // in last response.
281         // Cut anything between ( and )
282         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
283         string host;
284         ushort port;
285         try {
286             ubyte a1,a2,a3,a4,p1,p2;
287             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
288             host = format("%d.%d.%d.%d", a1, a2, a3, a4);
289             port = (p1<<8) + p2;
290         } catch (FormatException e) {
291             error("Failed to parse ", v);
292             _response.code = 500;
293             return _response;
294         }
295 
296         auto dataStream = new TCPStream();
297         scope (exit ) {
298             if ( dataStream !is null ) {
299                 dataStream.close();
300             }
301         }
302 
303         dataStream.connect(host, port, _timeout);
304 
305         code = sendCmdGetResponse("TYPE I\r\n");
306         if ( code/100 > 2 ) {
307             _response.code = code;
308             return _response;
309         }
310 
311         code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
312         if ( code/100 > 1 ) {
313             _response.code = code;
314             return _response;
315         }
316         static if ( __traits(compiles, cast(ubyte[])content) ) {
317             auto data = cast(ubyte[])content;
318             auto b = new ubyte[_bufferSize];
319             for(size_t pos = 0; pos < data.length;) {
320                 auto chunk = data.take(_bufferSize).array;
321                 auto rc = dataStream.send(chunk);
322                 if ( rc <= 0 ) {
323                     debug(requests) trace("done");
324                     break;
325                 }
326                 debug(requests) tracef("sent %d bytes to data channel", rc);
327                 pos += rc;
328             }
329         } else {
330             while (!content.empty) {
331                 auto chunk = content.front;
332                 debug(requests) trace("ftp posting %d of data chunk".format(chunk.length));
333                 auto rc = dataStream.send(chunk);
334                 if ( rc <= 0 ) {
335                     debug(requests) trace("done");
336                     break;
337                 }
338                 content.popFront;
339             }
340         }
341         dataStream.close();
342         dataStream = null;
343         response = serverResponse();
344         code = responseToCode(response);
345         if ( code/100 == 2 ) {
346             debug(requests) tracef("Successfully uploaded %d bytes", _response._responseBody.length);
347         }
348         _response.code = code;
349         return _response;
350     }
351 
352     auto get(string uri = null) {
353         enforce( uri || _uri.host, "FTP URL undefined");
354         string response;
355         ushort code;
356 
357         _response = new FTPResponse;
358         _contentReceived = 0;
359 
360         if ( uri ) {
361             handleChangeURI(uri);
362         }
363 
364         _response.uri = _uri;
365         _response.finalURI = _uri;
366 
367         if ( !_controlChannel ) {
368             _controlChannel = new TCPStream();
369             _controlChannel.connect(_uri.host, _uri.port, _timeout);
370             response = serverResponse();
371             _responseHistory ~= response;
372             
373             code = responseToCode(response);
374             debug(requests) tracef("Server initial response: %s", response);
375             if ( code/100 > 2 ) {
376                 _response.code = code;
377                 return _response;
378             }
379             // Log in
380             string user, pass;
381             if ( _authenticator ) {
382                 user = _authenticator.userName();
383                 pass = _authenticator.password();
384             }
385             else{
386                 user = _uri.username.length ? _uri.username : "anonymous";
387                 pass = _uri.password.length ? _uri.password : "requests@";
388             }
389             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
390             
391             code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
392             if ( code/100 > 3 ) {
393                 _response.code = code;
394                 return _response;
395             } else if ( code/100 == 3) {
396                 
397                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
398                 if ( code/100 > 2 ) {
399                     _response.code = code;
400                     return _response;
401                 }
402             }
403 
404         }
405 
406         code = sendCmdGetResponse("PWD\r\n");
407         string pwd;
408         if ( code/100 == 2 ) {
409             // like '257 "/home/testuser"'
410             auto a = _responseHistory[$-1].split();
411             if ( a.length > 1 ) {
412                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
413             }
414         }
415         scope (exit) {
416             if ( pwd && _controlChannel ) {
417                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
418             }
419         }
420         
421         auto path = dirName(_uri.path);
422         if ( path != "/") {
423             path = path.chompPrefix("/");
424         }
425         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
426         if ( code/100 > 2 ) {
427             _response.code = code;
428             return _response;
429         }
430 
431         code = sendCmdGetResponse("TYPE I\r\n");
432         if ( code/100 > 2 ) {
433             _response.code = code;
434             return _response;
435         }
436         
437         code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
438         if ( code/100 == 2 ) {
439             // something like 
440             // 213 229355520
441             auto s = _responseHistory[$-1].findSplitAfter(" ");
442             if ( s.length ) {
443                 try {
444                     _contentLength = to!long(s[1]);
445                 } catch (ConvException) {
446                     debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
447                 }
448             }
449         }
450 
451         if ( _maxContentLength && _contentLength > _maxContentLength ) {
452             throw new RequestException("maxContentLength exceeded for ftp data");
453         }
454 
455         code = sendCmdGetResponse("PASV\r\n");
456         if ( code/100 > 2 ) {
457             _response.code = code;
458             return _response;
459         }
460         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
461         // in last response.
462         // Cut anything between ( and )
463         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
464         string host;
465         ushort port;
466         try {
467             ubyte a1,a2,a3,a4,p1,p2;
468             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
469             host = format("%d.%d.%d.%d", a1, a2, a3, a4);
470             port = (p1<<8) + p2;
471         } catch (FormatException e) {
472             error("Failed to parse ", v);
473             _response.code = 500;
474             return _response;
475         }
476         
477         auto dataStream = new TCPStream();
478         scope (exit ) {
479             if ( dataStream !is null && !_response._receiveAsRange.activated ) {
480                 dataStream.close();
481             }
482         }
483         
484         dataStream.connect(host, port, _timeout);
485         
486         code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
487         if ( code/100 > 1 ) {
488             _response.code = code;
489             return _response;
490         }
491         while ( true ) {
492             auto b = new ubyte[_bufferSize];
493             auto rc = dataStream.receive(b);
494             if ( rc <= 0 ) {
495                 debug(requests) trace("done");
496                 break;
497             }
498             debug(requests) tracef("got %d bytes from data channel", rc);
499 
500             _contentReceived += rc;
501             _response._responseBody.putNoCopy(b[0..rc]);
502 
503             if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
504                 throw new RequestException("maxContentLength exceeded for ftp data");
505             }
506             if ( _useStreaming ) {
507                 debug(requests) trace("ftp uses streaming");
508                 _response.receiveAsRange.activated = true;
509                 _response.receiveAsRange.data.length = 0;
510                 _response.receiveAsRange.data = _response._responseBody.data;
511                 _response.receiveAsRange.read = delegate ubyte[] () {
512                     Buffer!ubyte result;
513                     while(true) {
514                         // check if we received everything we need
515                         if ( _contentReceived >= _maxContentLength ) 
516                         {
517                             throw new RequestException("ContentLength > maxContentLength (%d>%d)".
518                                 format(_contentLength, _maxContentLength));
519                         }
520                         // have to continue
521                         auto b = new ubyte[_bufferSize];
522                         ptrdiff_t read;
523                         try {
524                             read = dataStream.receive(b);
525                         }
526                         catch (Exception e) {
527                             throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
528                         }
529 
530                         if ( read > 0 ) {
531                             _contentReceived += read;
532                             result.putNoCopy(b[0..read]);
533                             return result.data;
534                         }
535                         if ( read == 0 ) {
536                             debug(requests) tracef("streaming_in: server closed connection");
537                             dataStream.close();
538                             code = responseToCode(serverResponse());
539                             if ( code/100 == 2 ) {
540                                 debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
541                             }
542                             _response.code = code;
543                             break;
544                         }
545                     }
546                     return result.data;
547                 };
548                 return _response;
549             }
550         }
551         dataStream.close();
552         response = serverResponse();
553         code = responseToCode(response);
554         if ( code/100 == 2 ) {
555             debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
556         }
557         _response.code = code;
558         return _response;
559     }
560 }
561 
// Live round-trip test against public FTP servers: upload, failed download,
// successful download, second upload in the same session, then downloads
// from a different host without and with an explicit authenticator.
// Requires network access.
package unittest {
    enum uploadUrl  = "ftp://speedtest.tele2.net/upload/TEST.TXT";
    enum missingUrl = "ftp://speedtest.tele2.net/nonexistent";
    enum smallUrl   = "ftp://speedtest.tele2.net/1KB.zip";
    enum readmeUrl  = "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT";

    globalLogLevel(LogLevel.info);
    info("testing ftp");
    auto request = FTPRequest();

    // upload on a fresh connection
    info("ftp post ", uploadUrl);
    auto reply = request.post(uploadUrl, "test, ignore please\n".representation);
    assert(reply.code == 226);

    // a missing file must not report a completed transfer
    info("ftp get  ", missingUrl, ", in same session.");
    reply = request.get(missingUrl);
    assert(reply.code != 226);

    // the session is still usable after the failure
    info("ftp get  ", smallUrl, ", in same session.");
    reply = request.get(smallUrl);
    assert(reply.code == 226);
    assert(reply.responseBody.length == 1024);

    info("ftp post ", uploadUrl);
    reply = request.post(uploadUrl, "another test, ignore please\n".representation);
    assert(reply.code == 226);

    // different host: the control connection has to be re-established
    info("ftp get  ", readmeUrl);
    reply = request.get(readmeUrl);
    assert(reply.code == 226);

    info("ftp get  " ~ readmeUrl ~ " with authenticator");
    request.authenticator = new FtpAuthentication("anonymous", "requests@");
    reply = request.get(readmeUrl);
    assert(reply.code == 226);
    assert(reply.finalURI.path == "/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}