1 module requests.ftp;
2 
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.socket;
10 import std.exception;
11 import std.string;
12 import std.range;
13 import std.experimental.logger;
14 import std.stdio;
15 import std.path;
16 
17 import core.stdc.errno;
18 
19 import requests.uri;
20 import requests.utils;
21 import requests.streams;
22 import requests.base;
23 
/**
 * Exception thrown when a reply cannot be obtained from the FTP server
 * over the control connection.
 *
 * Params:
 *   msg  = human-readable description of the failure
 *   file = source file reported as the origin (defaults to the call site)
 *   line = source line reported as the origin (defaults to the call site)
 */
public class FTPServerResponseError: Exception
{
    @safe pure
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        // Nothing to add on top of the standard exception payload.
        super(msg, file, line);
    }
}
29 
/// Response object returned by `FTPRequest.get` and `FTPRequest.post`.
/// Adds no members of its own; everything it offers comes from `Response`.
public class FTPResponse : Response {}
32 
/// Initial value of `FTPRequest`'s `bufferSize` setting, in bytes (8 KiB).
enum defaultBufferSize = 8 * 1024;
34 
/**
 * Small FTP client: `post` uploads and `get` downloads one file per call,
 * using a passive-mode data connection for the payload.
 *
 * The control connection is kept open between calls and is re-opened only
 * when the host, port or username of the requested URI differs from the
 * previous one (see `handleChangeURI`), or after a failed connect/login.
 *
 * NOTE(review): the destructor closes the control connection, and the struct
 * is copyable; copies share the same `_controlChannel` object.
 */
public struct FTPRequest {
    private {
        URI           _uri;
        Duration      _timeout = 60.seconds;
        uint          _verbosity = 0;
        size_t        _bufferSize = defaultBufferSize;
        // `5L` forces 64-bit arithmetic; with plain `int` literals the product
        // does not fit into 32 bits and the intended 5 GiB limit is lost.
        long          _maxContentLength = 5L*1024*1024*1024;
        long          _contentLength = -1;
        long          _contentReceived;
        SocketStream  _controlChannel;
        string[]      _responseHistory;
        FTPResponse   _response;
        bool          _useStreaming;
    }
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter!long("contentLength"));
    mixin(Getter!long("contentReceived"));

    /// Reply lines recorded so far: the server greeting and the reply to every
    /// command sent through `sendCmdGetResponse`, oldest first.
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }

    /// Create a request bound to the FTP URL given as text.
    this(string uri) {
        _uri = URI(uri);
    }

    /// Create a request bound to an already parsed URI.
    this(in URI uri) {
        _uri = uri;
    }

    /// Close the control connection if one is open.
    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }

    /**
     * Send one command over the control connection and wait for its reply.
     *
     * Params:
     *   cmd = complete command line, including the trailing "\r\n"
     * Returns: the three-digit reply code; the reply line itself is appended
     *          to `responseHistory`.
     * Throws: `FTPServerResponseError` if no reply could be read.
     */
    ushort sendCmdGetResponse(string cmd) {
        tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        _controlChannel.send(cmd);
        string response = serverResponse();
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a reply line to the numeric code.
    /// The line must be at least three characters long and start with digits,
    /// which holds for everything returned by `serverResponse`.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /**
     * Switch this request to a new URI.
     *
     * If a control connection is open and the new URI differs in host, port
     * or username, the connection is closed so that the next request
     * connects and logs in again.
     */
    void handleChangeURI(in string uri) @safe {
        // if control channel exists and new URL not match old, then close
        URI newURI = URI(uri);
        if ( _controlChannel && 
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /**
     * Read from the control connection until a final reply line is available.
     *
     * A final reply line is one that starts with three digits followed by a
     * space. Data is accumulated until the buffer ends with a newline and
     * contains at least one such line; the first one is returned.
     *
     * NOTE(review): if several replies arrive in the same read, only the first
     * final line is returned and the rest of the buffer is discarded.
     *
     * Returns: the reply line without its line terminator.
     * Throws: `FTPServerResponseError` when the read fails, the peer closes
     *         the connection, or 16 KiB arrive without a final reply line.
     */
    string serverResponse() {
        string buffer;
        immutable bufferLimit = 16*1024;
        // Apply the request timeout only while waiting for this reply.
        _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, _timeout);
        scope(exit) {
            _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 0.seconds);
        }
        auto b = new ubyte[_bufferSize];
        while ( buffer.length < bufferLimit ) {
            trace("Wait on control channel");
            auto rc = _controlChannel.receive(b);
            // Capture errno right away: the logging calls below may overwrite it.
            immutable int savedErrno = errno;
            version(Posix) {
                if ( rc < 0 && savedErrno == EINTR ) {
                    continue;
                }
            }
            tracef("Got %d bytes from control socket", rc);
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel: rc=%d, errno: %d".format(rc, savedErrno));
            }
            if ( _verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            // Appending copies the bytes, so reusing `b` on the next read is safe.
            buffer ~= cast(string)b[0..rc];
            if ( buffer.endsWith('\n') ){
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( responseLines.count > 0 ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /// Close and forget the control connection so the next request starts a
    /// fresh session.
    private void dropControlChannel() {
        if ( _controlChannel ) {
            _controlChannel.close();
            _controlChannel = null;
        }
    }

    /// Record `code` as the result of the current request and drop the control
    /// connection; always returns false so callers can `return abandonSession(code)`.
    private bool abandonSession(ushort code) {
        _response.code = code;
        dropControlChannel();
        return false;
    }

    /**
     * Common preamble of `get` and `post`: create a fresh response object,
     * apply `uri` if given, connect and log in when no control connection is
     * open, then change to the directory part of the URI path.
     *
     * Returns: true when the session is ready for a transfer; false when the
     *          server refused a step, in which case `_response.code` holds
     *          the refusing reply code.
     * Throws: `Exception` (via `enforce`) when no URL is known;
     *         `FTPServerResponseError` when a reply cannot be read.
     */
    private bool prepareSession(string uri) {
        enforce( uri || _uri.host, "FTP URL undefined");
        ushort code;

        _response = new FTPResponse;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel ) {
            _controlChannel = new TCPSocketStream();
            // A connection that failed half-way must not be mistaken for a
            // logged-in session by the next request.
            scope(failure) dropControlChannel();
            _controlChannel.connect(_uri.host, _uri.port, _timeout);
            string greeting = serverResponse();
            _responseHistory ~= greeting;

            code = responseToCode(greeting);
            tracef("Server initial response: %s", greeting);
            if ( code/100 > 2 ) {
                return abandonSession(code);
            }
            // Log in
            string user = _uri.username.length ? _uri.username : "anonymous";
            string pass = _uri.password.length ? _uri.password : "requests@";
            tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

            code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
            if ( code/100 > 3 ) {
                return abandonSession(code);
            } else if ( code/100 == 3) {
                // 3xx: server wants the password
                code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
                if ( code/100 > 2 ) {
                    return abandonSession(code);
                }
            }
        }

        code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return false;
        }
        return true;
    }

    /**
     * Send PASV and extract the data-connection endpoint from the reply.
     *
     * Something like "227 Entering Passive Mode (132,180,15,2,210,187)" is
     * expected; the text between "(" and ")" is parsed as four address bytes
     * followed by the high and low byte of the port.
     *
     * Params:
     *   host = receives the dotted IPv4 address
     *   port = receives the port number
     * Returns: true on success; false when the server refused PASV
     *          (`_response.code` = reply code) or the reply could not be
     *          parsed (`_response.code` = 500).
     */
    private bool enterPassiveMode(out string host, out ushort port) {
        ushort code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return false;
        }
        // Cut anything between ( and )
        string address = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        // formattedRead consumes its input; keep the original for the log message.
        string unparsed = address;
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            immutable fields = formattedRead(unparsed, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            if ( fields != 6 ) {
                throw new FormatException("incomplete PASV address");
            }
            host = format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = cast(ushort)((p1<<8) + p2);
        } catch (FormatException e) {
            error("Failed to parse ", address);
            _response.code = 500;
            return false;
        } catch (ConvException e) {
            // e.g. a field that does not fit into one byte
            error("Failed to parse ", address);
            _response.code = 500;
            return false;
        }
        return true;
    }

    /**
     * Upload `data` to the file named by the URI (STOR, binary mode).
     *
     * Params:
     *   uri  = FTP URL of the target file; when null the URI given at
     *          construction (or by the previous call) is used
     *   data = input range of bytes to send; it is consumed chunk by chunk,
     *          `bufferSize` elements at a time
     *   args = currently unused
     * Returns: the `FTPResponse` whose `code` is the final reply code of the
     *          transfer, or the reply code of the step the server refused.
     * Throws: `Exception` when no URL is known; `FTPServerResponseError` when
     *         a reply cannot be read from the control connection.
     */
    auto post(R, A...)(string uri, R data, A args) {
        ushort code;

        if ( !prepareSession(uri) ) {
            return _response;
        }

        string host;
        ushort port;
        if ( !enterPassiveMode(host, port) ) {
            return _response;
        }

        auto dataStream = new TCPSocketStream();
        scope (exit ) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        size_t sent = 0;
        while ( !data.empty ) {
            auto chunk = data.take(_bufferSize).array;
            auto rc = dataStream.send(chunk);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("sent %d bytes to data channel", rc);
            // Advance by what was actually written so a partial send is
            // followed by the remaining bytes rather than a repeat.
            data.popFrontN(rc);
            sent += rc;
        }
        // Closing the data connection signals end of file to the server.
        dataStream.close();
        dataStream = null;
        string response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            tracef("Successfully uploaded %d bytes", sent);
        }
        _response.code = code;
        return _response;
    }

    /**
     * Download the file named by the URI (RETR, binary mode).
     *
     * The size reported by SIZE, when available, is stored in `contentLength`
     * and checked against `maxContentLength` before the transfer starts
     * (a limit of 0 disables the checks). With `useStreaming` set, the call
     * returns after the first chunk and the remaining data is delivered
     * through the response's `receiveAsRange`; the final reply code is then
     * stored once the server closes the data connection.
     *
     * Params:
     *   uri = FTP URL of the file; when null the URI given at construction
     *         (or by the previous call) is used
     * Returns: the `FTPResponse` holding the received body and the final
     *          reply code, or the reply code of the step the server refused.
     * Throws: `Exception` when no URL is known; `RequestException` when
     *         `maxContentLength` is exceeded or streaming reads fail;
     *         `FTPServerResponseError` when a reply cannot be read.
     */
    auto get(string uri = null) {
        ushort code;

        // Counters describe the current transfer only.
        _contentLength = -1;
        _contentReceived = 0;

        if ( !prepareSession(uri) ) {
            return _response;
        }

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        
        code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 == 2 ) {
            // something like 
            // 213 229355520
            string sizeText = _responseHistory[$-1].findSplitAfter(" ")[1].strip;
            try {
                _contentLength = to!long(sizeText);
            } catch (ConvException) {
                trace("Failed to convert string %s to file size".format(sizeText));
            }
        }

        if ( _maxContentLength && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        string host;
        ushort port;
        if ( !enterPassiveMode(host, port) ) {
            return _response;
        }
        
        auto dataStream = new TCPSocketStream();
        scope (exit ) {
            // In streaming mode the stream outlives this call; the read
            // delegate below closes it at end of data.
            if ( dataStream !is null && !_response._receiveAsRange.activated ) {
                dataStream.close();
            }
        }
        
        dataStream.connect(host, port, _timeout);
        
        code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        while ( true ) {
            auto b = new ubyte[_bufferSize];
            auto rc = dataStream.receive(b);
            version(Posix) {
                // An interrupted read is not the end of the data.
                if ( rc < 0 && errno == EINTR ) {
                    continue;
                }
            }
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("got %d bytes from data channel", rc);

            _contentReceived += rc;
            _response._responseBody.putNoCopy(b[0..rc]);

            if ( _maxContentLength && _response._responseBody.length > _maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
            if ( _useStreaming ) {
                _response.receiveAsRange.activated = true;
                _response.receiveAsRange.data = _response._responseBody;
                _response.receiveAsRange.read = delegate Buffer!ubyte () {
                    Buffer!ubyte result;
                    while(true) {
                        // check if we received more than allowed (0 = no limit)
                        if ( _maxContentLength && _contentReceived > _maxContentLength ) 
                        {
                            throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                                format(_contentReceived, _maxContentLength));
                        }
                        // have to continue
                        auto chunk = new ubyte[_bufferSize];
                        auto read = dataStream.receive(chunk);

                        if ( read > 0 ) {
                            _contentReceived += read;
                            result.putNoCopy(chunk[0..read]);
                            return result;
                        }
                        if ( read < 0 ) {
                            version(Posix) {
                                if ( errno == EINTR ) {
                                    continue;
                                }
                            }
                            throw new RequestException("streaming_in error reading from socket");
                        }
                        if ( read == 0 ) {
                            debug tracef("streaming_in: server closed connection");
                            dataStream.close();
                            code = responseToCode(serverResponse());
                            if ( code/100 == 2 ) {
                                tracef("Successfully received %d bytes", _response._responseBody.length);
                            }
                            _response.code = code;
                            break;
                        }
                    }
                    return result;
                };
                return _response;
            }
        }
        dataStream.close();
        // Already closed: keep scope(exit) from closing it a second time.
        dataStream = null;
        string response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
}
438 
// Integration test: talks to public FTP servers, so it needs network access
// and depends on those servers keeping the referenced files available.
package unittest {
    enum uploadTarget = "ftp://speedtest.tele2.net/upload/TEST.TXT";

    globalLogLevel(LogLevel.info );
    info("testing ftp");
    auto request = FTPRequest();

    // Upload opens the session used by the next two downloads.
    info("ftp post ", uploadTarget);
    auto reply = request.post(uploadTarget, "test, ignore please\n".representation);
    assert(reply.code == 226);

    // A missing file must not end with "transfer complete".
    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
    reply = request.get("ftp://speedtest.tele2.net/nonexistent");
    assert(reply.code != 226);

    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
    reply = request.get("ftp://speedtest.tele2.net/1KB.zip");
    assert(reply.code == 226);
    assert(reply.responseBody.length == 1024);

    // Different host: the request has to switch servers.
    info("ftp get  ", "ftp://ftp.uni-bayreuth.de/README");
    reply = request.get("ftp://ftp.uni-bayreuth.de/README");
    assert(reply.code == 226);

    // ...and back to the first host for another upload.
    info("ftp post ", uploadTarget);
    reply = request.post(uploadTarget, "another test, ignore please\n".representation);
    assert(reply.code == 226);

    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    reply = request.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(reply.code == 226);
    assert(reply.finalURI.path == "/pub/FreeBSD/README.TXT");

    info("testing ftp - done.");
}