1 module requests.ftp;
2 
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.exception;
10 import std.string;
11 import std.range;
12 import std.experimental.logger;
13 import std.stdio;
14 import std.path;
15 
16 import requests.uri;
17 import requests.utils;
18 import requests.streams;
19 import requests.base;
20 
21 public class FTPServerResponseError: Exception {
22     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
23         super(message, file, line, next);
24     }
25 }
26 
27 public class FTPResponse : Response {
28 }
29 
30 enum defaultBufferSize = 8192;
31 
32 public struct FTPRequest {
33     private {
34         URI           _uri;
35         Duration      _timeout = 60.seconds;
36         uint          _verbosity = 0;
37         size_t        _bufferSize = defaultBufferSize;
38         long          _maxContentLength = 5*1024*1024*1024;
39         long          _contentLength = -1;
40         long          _contentReceived;
41         NetworkStream _controlChannel;
42         string[]      _responseHistory;
43         FTPResponse   _response;
44         bool          _useStreaming;
45     }
46     mixin(Getter_Setter!Duration("timeout"));
47     mixin(Getter_Setter!uint("verbosity"));
48     mixin(Getter_Setter!size_t("bufferSize"));
49     mixin(Getter_Setter!long("maxContentLength"));
50     mixin(Getter_Setter!bool("useStreaming"));
51     mixin(Getter!long("contentLength"));
52     mixin(Getter!long("contentReceived"));
53 
54     @property final string[] responseHistory() @safe @nogc nothrow {
55         return _responseHistory;
56     }
57     this(string uri) {
58         _uri = URI(uri);
59     }
60 
61     this(in URI uri) {
62         _uri = uri;
63     }
64 
65     ~this() {
66         if ( _controlChannel ) {
67             _controlChannel.close();
68         }
69     }
70 
71     ushort sendCmdGetResponse(string cmd) {
72         tracef("cmd to server: %s", cmd.strip);
73         if ( _verbosity >=1 ) {
74             writefln("> %s", cmd.strip);
75         }
76         _controlChannel.send(cmd);
77         string response = serverResponse();
78         _responseHistory ~= response;
79         return responseToCode(response);
80     }
81 
82     ushort responseToCode(string response) pure const @safe {
83         return to!ushort(response[0..3]);
84     }
85 
86     void handleChangeURI(in string uri) @safe {
87         // if control channel exists and new URL not match old, then close
88         URI newURI = URI(uri);
89         if ( _controlChannel && 
90             (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
91             _controlChannel.close();
92             _controlChannel = null;
93         }
94         _uri = newURI;
95     }
96 
97     string serverResponse() {
98         string res, buffer;
99         immutable bufferLimit = 16*1024;
100         _controlChannel.readTimeout = _timeout;
101         scope(exit) {
102             _controlChannel.readTimeout = 0.seconds;
103         }
104         auto b = new ubyte[_bufferSize];
105         while ( buffer.length < bufferLimit ) {
106             trace("Wait on control channel");
107             ptrdiff_t rc;
108             try {
109                 rc = _controlChannel.receive(b);
110             }
111             catch (Exception e) {
112                 error("Failed to read response from server");
113                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
114             }
115             tracef("Got %d bytes from control socket", rc);
116             if ( rc == 0 ) {
117                 error("Failed to read response from server");
118                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
119             }
120             if ( _verbosity >= 1 ) {
121                 (cast(string)b[0..rc]).
122                     splitLines.
123                     each!(l=>writefln("< %s", l));
124             }
125             buffer ~= b[0..rc];
126             if ( buffer.endsWith('\n') ){
127                 auto responseLines = buffer.
128                     splitLines.
129                     filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
130                 if ( responseLines.count > 0 ) {
131                     return responseLines.front;
132                 }
133             }
134         }
135         throw new FTPServerResponseError("Failed to read server responce over control channel");
136         assert(0);
137     }
138 
139     auto post(R, A...)(string uri, R data, A args) {
140         enforce( uri || _uri.host, "FTP URL undefined");
141         string response;
142         ushort code;
143 
144         _response = new FTPResponse;
145 
146         if ( uri ) {
147             handleChangeURI(uri);
148         }
149 
150         _response.uri = _uri;
151         _response.finalURI = _uri;
152 
153         if ( !_controlChannel ) {
154             _controlChannel = new TCPStream();
155             _controlChannel.connect(_uri.host, _uri.port, _timeout);
156             response = serverResponse();
157             _responseHistory ~= response;
158             
159             code = responseToCode(response);
160             tracef("Server initial response: %s", response);
161             if ( code/100 > 2 ) {
162                 _response.code = code;
163                 return _response;
164             }
165             // Log in
166             string user = _uri.username.length ? _uri.username : "anonymous";
167             string pass = _uri.password.length ? _uri.password : "requests@";
168             tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
169             
170             code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
171             if ( code/100 > 3 ) {
172                 _response.code = code;
173                 return _response;
174             } else if ( code/100 == 3) {
175                 
176                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
177                 if ( code/100 > 2 ) {
178                     _response.code = code;
179                     return _response;
180                 }
181             }
182             
183         }
184 
185         code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n");
186         if ( code/100 > 2 ) {
187             _response.code = code;
188             return _response;
189         }
190 
191         code = sendCmdGetResponse("PASV\r\n");
192         if ( code/100 > 2 ) {
193             _response.code = code;
194             return _response;
195         }
196         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
197         // in last response.
198         // Cut anything between ( and )
199         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
200         string host;
201         ushort port;
202         try {
203             ubyte a1,a2,a3,a4,p1,p2;
204             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
205             host = format("%d.%d.%d.%d", a1, a2, a3, a4);
206             port = (p1<<8) + p2;
207         } catch (FormatException e) {
208             error("Failed to parse ", v);
209             _response.code = 500;
210             return _response;
211         }
212 
213         auto dataStream = new TCPStream();
214         scope (exit ) {
215             if ( dataStream !is null ) {
216                 dataStream.close();
217             }
218         }
219 
220         dataStream.connect(host, port, _timeout);
221 
222         code = sendCmdGetResponse("TYPE I\r\n");
223         if ( code/100 > 2 ) {
224             _response.code = code;
225             return _response;
226         }
227 
228         code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
229         if ( code/100 > 1 ) {
230             _response.code = code;
231             return _response;
232         }
233         auto b = new ubyte[_bufferSize];
234         for(size_t pos = 0; pos < data.length;) {
235             auto chunk = data.take(_bufferSize).array;
236             auto rc = dataStream.send(chunk);
237             if ( rc <= 0 ) {
238                 trace("done");
239                 break;
240             }
241             tracef("sent %d bytes to data channel", rc);
242             pos += rc;
243         }
244         dataStream.close();
245         dataStream = null;
246         response = serverResponse();
247         code = responseToCode(response);
248         if ( code/100 == 2 ) {
249             tracef("Successfully uploaded %d bytes", _response._responseBody.length);
250         }
251         _response.code = code;
252         return _response;
253     }
254 
255     auto get(string uri = null) {
256         enforce( uri || _uri.host, "FTP URL undefined");
257         string response;
258         ushort code;
259 
260         _response = new FTPResponse;
261 
262         if ( uri ) {
263             handleChangeURI(uri);
264         }
265 
266         _response.uri = _uri;
267         _response.finalURI = _uri;
268 
269         if ( !_controlChannel ) {
270             _controlChannel = new TCPStream();
271             _controlChannel.connect(_uri.host, _uri.port, _timeout);
272             response = serverResponse();
273             _responseHistory ~= response;
274             
275             code = responseToCode(response);
276             tracef("Server initial response: %s", response);
277             if ( code/100 > 2 ) {
278                 _response.code = code;
279                 return _response;
280             }
281             // Log in
282             string user = _uri.username.length ? _uri.username : "anonymous";
283             string pass = _uri.password.length ? _uri.password : "requests@";
284             tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
285             
286             code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
287             if ( code/100 > 3 ) {
288                 _response.code = code;
289                 return _response;
290             } else if ( code/100 == 3) {
291                 
292                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
293                 if ( code/100 > 2 ) {
294                     _response.code = code;
295                     return _response;
296                 }
297             }
298 
299         }
300 
301         code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n");
302         if ( code/100 > 2 ) {
303             _response.code = code;
304             return _response;
305         }
306 
307         code = sendCmdGetResponse("TYPE I\r\n");
308         if ( code/100 > 2 ) {
309             _response.code = code;
310             return _response;
311         }
312         
313         code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
314         if ( code/100 == 2 ) {
315             // something like 
316             // 213 229355520
317             auto s = _responseHistory[$-1].findSplitAfter(" ");
318             if ( s.length ) {
319                 try {
320                     _contentLength = to!long(s[1]);
321                 } catch (ConvException) {
322                     trace("Failed to convert string %s to file size".format(s[1]));
323                 }
324             }
325         }
326 
327         if ( _maxContentLength && _contentLength > _maxContentLength ) {
328             throw new RequestException("maxContentLength exceeded for ftp data");
329         }
330 
331         code = sendCmdGetResponse("PASV\r\n");
332         if ( code/100 > 2 ) {
333             _response.code = code;
334             return _response;
335         }
336         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
337         // in last response.
338         // Cut anything between ( and )
339         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
340         string host;
341         ushort port;
342         try {
343             ubyte a1,a2,a3,a4,p1,p2;
344             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
345             host = format("%d.%d.%d.%d", a1, a2, a3, a4);
346             port = (p1<<8) + p2;
347         } catch (FormatException e) {
348             error("Failed to parse ", v);
349             _response.code = 500;
350             return _response;
351         }
352         
353         auto dataStream = new TCPStream();
354         scope (exit ) {
355             if ( dataStream !is null && !_response._receiveAsRange.activated ) {
356                 dataStream.close();
357             }
358         }
359         
360         dataStream.connect(host, port, _timeout);
361         
362         code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
363         if ( code/100 > 1 ) {
364             _response.code = code;
365             return _response;
366         }
367         while ( true ) {
368             auto b = new ubyte[_bufferSize];
369             auto rc = dataStream.receive(b);
370             if ( rc <= 0 ) {
371                 trace("done");
372                 break;
373             }
374             tracef("got %d bytes from data channel", rc);
375 
376             _contentReceived += rc;
377             _response._responseBody.putNoCopy(b[0..rc]);
378 
379             if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
380                 throw new RequestException("maxContentLength exceeded for ftp data");
381             }
382             if ( _useStreaming ) {
383                 _response.receiveAsRange.activated = true;
384                 _response.receiveAsRange.data = _response._responseBody.data;
385                 _response.receiveAsRange.read = delegate ubyte[] () {
386                     Buffer!ubyte result;
387                     while(true) {
388                         // check if we received everything we need
389                         if ( _contentReceived >= _maxContentLength ) 
390                         {
391                             throw new RequestException("ContentLength > maxContentLength (%d>%d)".
392                                 format(_contentLength, _maxContentLength));
393                         }
394                         // have to continue
395                         auto b = new ubyte[_bufferSize];
396                         ptrdiff_t read;
397                         try {
398                             read = dataStream.receive(b);
399                         }
400                         catch (Exception e) {
401                             throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
402                         }
403 
404                         if ( read > 0 ) {
405                             _contentReceived += read;
406                             result.putNoCopy(b[0..read]);
407                             return result.data;
408                         }
409                         if ( read == 0 ) {
410                             debug tracef("streaming_in: server closed connection");
411                             dataStream.close();
412                             code = responseToCode(serverResponse());
413                             if ( code/100 == 2 ) {
414                                 tracef("Successfully received %d bytes", _response._responseBody.length);
415                             }
416                             _response.code = code;
417                             break;
418                         }
419                     }
420                     return result.data;
421                 };
422                 return _response;
423             }
424         }
425         dataStream.close();
426         response = serverResponse();
427         code = responseToCode(response);
428         if ( code/100 == 2 ) {
429             tracef("Successfully received %d bytes", _response._responseBody.length);
430         }
431         _response.code = code;
432         return _response;
433     }
434 }
435 
436 package unittest {
437     globalLogLevel(LogLevel.info );
438     info("testing ftp");
439     auto rq = FTPRequest();
440     info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
441     auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
442     assert(rs.code == 226);
443     info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
444     rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
445     assert(rs.code != 226);
446     info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
447     rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
448     assert(rs.code == 226);
449     assert(rs.responseBody.length == 1024);
450     info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
451     rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
452     assert(rs.code == 226);
453     info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
454     rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
455     assert(rs.code == 226);
456     assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT");
457     info("testing ftp - done.");
458 }