1 module requests.ftp;
2 
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.socket;
10 import std.exception;
11 import std.string;
12 import std.range;
13 import std.experimental.logger;
14 import std.stdio;
15 import std.path;
16 
17 import core.stdc.errno;
18 
19 import requests.uri;
20 import requests.utils;
21 import requests.streams;
22 import requests.base;
23 
/**
 * Thrown when a reply cannot be obtained from the FTP server over the
 * control channel.
 */
public class FTPServerResponseError : Exception
{
    /**
     * Params:
     *   msg  = description of the failure
     *   file = source file reported as the throw site (defaults to the caller's file)
     *   line = source line reported as the throw site (defaults to the caller's line)
     */
    this(string msg, string file = __FILE__, size_t line = __LINE__) pure @safe
    {
        super(msg, file, line);
    }
}
29 
/// Response type returned by FTPRequest; it adds no members to the base Response.
public class FTPResponse : Response {}
32 
/**
 * Small FTP client that can download (`get`) or upload (`post`) one file
 * per call, using a passive-mode data connection.
 *
 * The control connection is kept between calls and reused as long as the
 * host, port and user name of the requested URI do not change.
 */
public struct FTPRequest {
    private {
        URI           _uri;                   // target of the current / most recent request
        Duration      _timeout = 60.seconds;  // passed to connect() and used as control-channel receive timeout
        uint          _verbosity = 0;         // >= 1 echoes the control dialogue to stdout
        size_t        _bufferSize = 16*1024;  // 16k: receive buffer size and upload chunk size
        size_t        _maxContentLength;      // 0 disables the download size limit
        size_t        _contentLength;         // size reported by SIZE for the current download (0 if unknown)
        SocketStream  _controlChannel;        // null while no session is open
        string[]      _responseHistory;       // status lines recorded from the control channel
        FTPResponse   _response;              // response object of the request in progress
    }
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!size_t("maxContentLength"));

    /// Status lines recorded from the control channel so far.
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }

    /// Create a request for the given FTP URL; nothing is sent until `get` or `post` is called.
    this(string uri) {
        _uri = URI(uri);
    }

    /// ditto
    this(in URI uri) {
        _uri = uri;
    }

    /// Closes the control channel if one is still open.
    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }

    /**
     * Send one command over the control channel and wait for the reply.
     *
     * Params:
     *   cmd = complete command line, including the terminating "\r\n"
     * Returns: the numeric reply code; the reply line is appended to the
     *          response history.
     * Throws: FTPServerResponseError if no reply could be read.
     */
    ushort sendCmdGetResponse(string cmd) {
        tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        _controlChannel.send(cmd);
        string response = serverResponse();
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a reply line to the numeric reply code.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /**
     * Switch to a new target URI.
     *
     * An open control channel is closed when host, port or user name differ
     * from the current URI, so that the next request opens a new session.
     */
    void handleChangeURI(in string uri) @safe {
        URI newURI = URI(uri);
        if ( _controlChannel &&
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /**
     * Read from the control channel until a status line is available.
     *
     * A status line is a line starting with three digits followed by a space.
     * Data is accumulated until the buffered text ends with a newline and
     * contains such a line; the first matching line is returned.
     *
     * Throws: FTPServerResponseError when the read fails, the peer closes the
     *         connection, or 16k were buffered without a status line.
     */
    string serverResponse() {
        string buffer;
        immutable bufferLimit = 16*1024;
        _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, _timeout);
        scope(exit) {
            // restore the "no timeout" setting for whoever uses the socket next
            _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 0.seconds);
        }
        auto b = new ubyte[_bufferSize];
        while ( buffer.length < bufferLimit ) {
            trace("Wait on control channel");
            auto rc = _controlChannel.receive(b);
            version(Posix) {
                // interrupted system call: just retry
                if ( rc < 0 && errno == EINTR ) {
                    continue;
                }
            }
            tracef("Got %d bytes from control socket", rc);
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel: rc=%d, errno: %d".format(rc, errno()));
            }
            if ( _verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            buffer ~= b[0..rc];
            if ( buffer.endsWith('\n') ){
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( !responseLines.empty ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /// Close and forget the control channel so that the next request starts a new session.
    private void dropControlChannel() {
        if ( _controlChannel ) {
            _controlChannel.close();
            _controlChannel = null;
        }
    }

    /// Record a failed greeting/login: store the reply code and discard the
    /// unusable session. Always returns false.
    private bool failSession(ushort code) {
        _response.code = code;
        // Keeping this channel would make the next request skip the login
        // and run its commands on an unauthenticated session.
        dropControlChannel();
        return false;
    }

    /**
     * Make sure a logged-in control channel exists.
     *
     * Does nothing when a channel is already open. Otherwise connects, reads
     * the greeting and sends USER (and PASS when the server asks for it),
     * falling back to "anonymous" / "requests@" when the URI has no
     * credentials.
     *
     * Returns: true when the session is ready; false when the server refused,
     *          in which case `_response.code` holds the refusing reply code.
     */
    private bool login() {
        if ( _controlChannel ) {
            return true;
        }
        _controlChannel = new TCPSocketStream();
        // A channel that failed to connect or to answer must not be reused.
        scope(failure) dropControlChannel();

        _controlChannel.connect(_uri.host, _uri.port, _timeout);
        string greeting = serverResponse();
        _responseHistory ~= greeting;

        ushort code = responseToCode(greeting);
        tracef("Server initial response: %s", greeting);
        if ( code/100 > 2 ) {
            return failSession(code);
        }

        string user = _uri.username.length ? _uri.username : "anonymous";
        string pass = _uri.password.length ? _uri.password : "requests@";
        tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
        if ( code/100 > 3 ) {
            return failSession(code);
        }
        if ( code/100 == 3 ) {
            // 3xx: the server wants a password
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
            if ( code/100 > 2 ) {
                return failSession(code);
            }
        }
        return true;
    }

    /**
     * Send a command and check the class (hundreds digit) of the reply.
     *
     * Params:
     *   cmd       = complete command line including "\r\n"
     *   lastClass = highest reply class still treated as success
     * Returns: true on success; otherwise false with `_response.code` set to
     *          the reply code.
     */
    private bool commandAccepted(string cmd, ushort lastClass = 2) {
        immutable ushort code = sendCmdGetResponse(cmd);
        if ( code/100 > lastClass ) {
            _response.code = code;
            return false;
        }
        return true;
    }

    /**
     * Enter passive mode and connect the data channel.
     *
     * Expects a reply such as
     * "227 Entering Passive Mode (132,180,15,2,210,187)" and connects to the
     * address and port encoded between the parentheses.
     *
     * Returns: the connected data stream, or null when PASV was refused or
     *          its reply could not be parsed (`_response.code` is set; 500 is
     *          used for an unparsable reply). The caller owns the stream and
     *          must close it.
     */
    private TCPSocketStream openPassiveDataChannel() {
        if ( !commandAccepted("PASV\r\n") ) {
            return null;
        }
        // Cut anything between ( and )
        string tuple = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        string unread = tuple;      // formattedRead consumes its input
        ubyte a1,a2,a3,a4,p1,p2;
        bool parsed = false;
        try {
            // all six numbers must be present, otherwise we would connect to zeros
            parsed = formattedRead(unread, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2) == 6;
        } catch (Exception e) {
            // FormatException for a malformed tuple, ConvException for bad or
            // out-of-range numbers
            tracef("PASV reply rejected: %s", e.msg);
        }
        if ( !parsed ) {
            error("Failed to parse ", tuple);
            _response.code = 500;
            return null;
        }
        string host = format("%d.%d.%d.%d", a1, a2, a3, a4);
        ushort port = cast(ushort)((p1 << 8) + p2);

        auto dataStream = new TCPSocketStream();
        scope(failure) dataStream.close();
        dataStream.connect(host, port, _timeout);
        return dataStream;
    }

    /**
     * Upload `data` to the file named by `uri` (STOR).
     *
     * Params:
     *   uri  = target FTP URL; when null the URI given to the constructor is used
     *   data = content to upload; must provide `length` and be iterable as a range
     *   args = unused
     * Returns: the FTPResponse whose `code` is the reply code of the step that
     *          ended the request (the transfer completion reply on success).
     * Throws: FTPServerResponseError when a server reply cannot be read.
     */
    auto post(R, A...)(string uri, R data, A args) {
        enforce( uri || _uri.host, "FTP URL undefined");

        _response = new FTPResponse;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !login() ) {
            return _response;
        }
        if ( !commandAccepted("CWD " ~ dirName(_uri.path) ~ "\r\n") ) {
            return _response;
        }

        auto dataStream = openPassiveDataChannel();
        if ( dataStream is null ) {
            return _response;
        }
        scope (exit) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        if ( !commandAccepted("TYPE I\r\n") ) {
            return _response;
        }
        // STOR must be answered with a preliminary (1xx) reply
        if ( !commandAccepted("STOR " ~ baseName(_uri.path) ~ "\r\n", 1) ) {
            return _response;
        }

        size_t sent = 0;
        while ( sent < data.length ) {
            // Continue from the first unsent byte: this advances through the
            // payload and also copes with short sends.
            auto chunk = data.drop(sent).take(_bufferSize).array;
            auto rc = dataStream.send(chunk);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("sent %d bytes to data channel", rc);
            sent += rc;
        }
        // closing the data channel tells the server the upload is complete
        dataStream.close();
        dataStream = null;

        string response = serverResponse();
        ushort code = responseToCode(response);
        if ( code/100 == 2 ) {
            tracef("Successfully uploaded %d bytes", sent);
        }
        _response.code = code;
        return _response;
    }

    /**
     * Download the file named by `uri` (RETR) into the response body.
     *
     * Params:
     *   uri = FTP URL to fetch; when null the URI given to the constructor is used
     * Returns: the FTPResponse whose `code` is the reply code of the step that
     *          ended the request (the transfer completion reply on success).
     * Throws: RequestException when `maxContentLength` is set and the file is
     *         larger than that; FTPServerResponseError when a server reply
     *         cannot be read.
     */
    auto get(string uri = null) {
        enforce( uri || _uri.host, "FTP URL undefined");

        _response = new FTPResponse;
        // Do not let the size of a previously downloaded file leak into this request.
        _contentLength = 0;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !login() ) {
            return _response;
        }
        if ( !commandAccepted("CWD " ~ dirName(_uri.path) ~ "\r\n") ) {
            return _response;
        }

        // SIZE is optional: a refusal just means the size is unknown.
        ushort code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 == 2 ) {
            // something like
            // 213 229355520
            string sizeText = _responseHistory[$-1].findSplitAfter(" ")[1].strip;
            try {
                _contentLength = to!size_t(sizeText);
            } catch (ConvException) {
                trace("Failed to convert string %s to file size".format(sizeText));
            }
        }

        if ( _maxContentLength && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        auto dataStream = openPassiveDataChannel();
        if ( dataStream is null ) {
            return _response;
        }
        scope (exit) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        if ( !commandAccepted("TYPE I\r\n") ) {
            return _response;
        }
        // RETR must be answered with a preliminary (1xx) reply
        if ( !commandAccepted("RETR " ~ baseName(_uri.path) ~ "\r\n", 1) ) {
            return _response;
        }

        auto b = new ubyte[_bufferSize];
        while ( true ) {
            auto rc = dataStream.receive(b);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("got %d bytes from data channel", rc);
            _response._responseBody.put(b[0..rc]);

            // Same comparison as the SIZE pre-check above: a file of exactly
            // maxContentLength bytes is allowed.
            if ( _maxContentLength && _response._responseBody.length > _maxContentLength ) {
                // The transfer is abandoned half-way, so the server's
                // completion reply is never read; drop the session instead of
                // leaving a desynchronised control channel behind.
                dropControlChannel();
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
        }
        dataStream.close();
        dataStream = null;

        string response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
}
384 
package unittest {
    // Live smoke test: it talks to public FTP servers, so it needs network
    // access and depends on those servers keeping the referenced files.
    enum uploadTarget = "ftp://speedtest.tele2.net/upload/TEST.TXT";
    enum missingFile  = "ftp://speedtest.tele2.net/nonexistent";
    enum smallFile    = "ftp://speedtest.tele2.net/1KB.zip";
    enum bayreuthFile = "ftp://ftp.uni-bayreuth.de/README";
    enum iijFile      = "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT";

    globalLogLevel(LogLevel.info );
    info("testing ftp");
    auto request = FTPRequest();

    // upload, which also opens the session with the first server
    info("ftp post ", uploadTarget);
    auto reply = request.post(uploadTarget, "test, ignore please\n".representation);
    assert(reply.code == 226);

    // same host again: a missing file must not end with "transfer complete"
    info("ftp get  ", missingFile, ", in same session.");
    reply = request.get(missingFile);
    assert(reply.code != 226);

    // same host again: a file of known size
    info("ftp get  ", smallFile, ", in same session.");
    reply = request.get(smallFile);
    assert(reply.code == 226);
    assert(reply.responseBody.length == 1024);

    // different host
    info("ftp get  ", bayreuthFile);
    reply = request.get(bayreuthFile);
    assert(reply.code == 226);

    // back to the first host for another upload
    info("ftp post ", uploadTarget);
    reply = request.post(uploadTarget, "another test, ignore please\n".representation);
    assert(reply.code == 226);

    // third host; the response must report the path that was requested
    info("ftp get  ", iijFile);
    reply = request.get(iijFile);
    assert(reply.code == 226);
    assert(reply.finalURI.path == "/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}