module requests.ftp;

private:
import std.ascii;
import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.socket;
import std.exception;
import std.string;
import std.range;
import std.experimental.logger;
import std.stdio;
import std.path;

import core.stdc.errno;

import requests.uri;
import requests.utils;
import requests.streams;
import requests.base;

/// Thrown when no usable reply can be read from the FTP control connection.
public class FTPServerResponseError: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/// Result of an FTP request. Currently adds nothing on top of `Response`.
public class FTPResponse : Response {
    // ushort           __code;
    // Buffer!ubyte     __responseBody;
    // mixin(getter("code"));
    // @property auto responseBody() inout pure @safe {
    //     return __responseBody;
    // }
}

/**
 * Minimal passive-mode FTP client supporting download (`get`) and upload (`post`).
 *
 * The control connection is kept open between requests and is reused as long
 * as host, port and user name of the requested URL stay the same.
 */
public struct FTPRequest {
    private {
        URI           __uri;
        Duration      __timeout = 60.seconds;
        uint          __verbosity = 0;
        size_t        __bufferSize = 16*1024;               // 16k
        size_t        __maxContentLength = 5*1024*1024;     // 5MB
        SocketStream  __controlChannel;
        string[]      __responseHistory;
        FTPResponse   __response;
    }
    // Accessors are produced by the getter/setter helpers, which are defined
    // outside this file (presumably one property per `__name` field).
    mixin(setter("timeout"));
    mixin(getter("timeout"));
    mixin(setter("verbosity"));
    mixin(getter("verbosity"));
    mixin(getter("responseHistory"));
    mixin(setter("bufferSize"));
    mixin(getter("bufferSize"));
    mixin(setter("maxContentLength"));
    mixin(getter("maxContentLength"));

    /// Create a request bound to the URL given as text.
    this(string uri) {
        __uri = URI(uri);
    }

    /// Create a request bound to an already parsed URL.
    this(in URI uri) {
        __uri = uri;
    }

    ~this() {
        dropControlChannel();
    }

    /**
     * Send one command line over the control channel and wait for the reply.
     *
     * The reply line is appended to the response history.
     *
     * Params:
     *   cmd = complete command including the trailing "\r\n"
     * Returns: the three-digit reply code.
     * Throws: FTPServerResponseError if no reply could be read.
     */
    ushort sendCmdGetResponse(string cmd) {
        tracef("cmd to server: %s", cmd.strip);
        if ( __verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        __controlChannel.send(cmd);
        string response = serverResponse();
        __responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a reply line to a numeric code.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /**
     * Read from the control channel until a final reply line is available.
     *
     * A final reply line is a line that starts with three digits followed by
     * a space. Only the first such line found in the bytes collected so far is
     * returned; the collected bytes are local to this call, so anything that
     * followed that line in the same read is discarded.
     *
     * The receive timeout of the socket is set to the request timeout for the
     * duration of the call and reset to zero afterwards.
     *
     * Throws: FTPServerResponseError on read failure, or when 16k were read
     *         without finding a final reply line.
     */
    string serverResponse() {
        string buffer;
        immutable bufferLimit = 16*1024;
        __controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, __timeout);
        scope(exit) {
            __controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 0.seconds);
        }
        auto b = new ubyte[__bufferSize];
        while ( buffer.length < bufferLimit ) {
            trace("Wait on control channel");
            auto rc = __controlChannel.receive(b);
            // remember errno right away, the logging calls below may change it
            immutable savedErrno = errno();
            version(Posix) {
                if ( rc < 0 && savedErrno == EINTR ) {
                    continue;
                }
            }
            tracef("Got %d bytes from control socket", rc);
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel: rc=%d, errno: %d".format(rc, savedErrno));
            }
            if ( __verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            // appending copies the bytes, so reusing `b` for the next read is safe
            buffer ~= cast(string)b[0..rc];
            if ( buffer.endsWith('\n') ){
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( !responseLines.empty ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /**
     * Upload `data` to the file named by the URL (FTP `STOR`, binary mode).
     *
     * Params:
     *   uri  = target URL; when null the URL given at construction is used
     *   data = bytes to upload; an array (or another range with value
     *          semantics) is expected, e.g. the result of `representation`
     *   args = not used
     * Returns: the FTPResponse; its code is the reply that ended the request
     *          (the first rejected step, 500 for an unparsable PASV reply, or
     *          the reply received after the transfer).
     * Throws: FTPServerResponseError when the control channel cannot be read.
     *         After any exception the control connection is dropped, so the
     *         next request reconnects and logs in again.
     */
    auto post(R, A...)(string uri, R data, A args) {
        enforce( uri || __uri.host, "FTP URL undefined");
        beginRequest(uri);
        // an exception leaves the session in an unknown state - do not reuse it
        scope(failure) dropControlChannel();

        string host;
        ushort port;
        if ( !prepareTransfer(host, port) ) {
            return __response;
        }

        auto __dataStream = new TCPSocketStream();
        scope (exit ) {
            __dataStream.close();
        }

        __dataStream.connect(host, port, __timeout);

        if ( rejected(sendCmdGetResponse("TYPE I\r\n"), 2) ) {
            return __response;
        }
        if ( rejected(sendCmdGetResponse("STOR " ~ baseName(__uri.path) ~ "\r\n"), 1) ) {
            return __response;
        }

        size_t sent = 0;
        while ( !data.empty ) {
            auto chunk = data.take(__bufferSize).array;
            auto rc = __dataStream.send(chunk);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("sent %d bytes to data channel", rc);
            sent += rc;
            // advance only by what was really sent, so a partial send is
            // continued from the right position
            data.popFrontN(rc);
        }
        // closing the data connection tells the server that the file is complete
        __dataStream.close();
        immutable code = responseToCode(serverResponse());
        if ( code/100 == 2 ) {
            tracef("Successfully uploaded %d bytes", sent);
        }
        __response.__code = code;
        return __response;
    }

    /**
     * Download the file named by the URL (FTP `RETR`, binary mode) into the
     * response body.
     *
     * Params:
     *   uri = URL to fetch; when null the URL given at construction is used
     * Returns: the FTPResponse; its code is the reply that ended the request
     *          (the first rejected step, 500 for an unparsable PASV reply, or
     *          the reply received after the transfer).
     * Throws: RequestException when the body reaches maxContentLength,
     *         FTPServerResponseError when the control channel cannot be read.
     *         After any exception the control connection is dropped, so the
     *         next request reconnects and logs in again.
     */
    auto get(string uri = null) {
        enforce( uri || __uri.host, "FTP URL undefined");
        beginRequest(uri);
        // an exception leaves the session in an unknown state - do not reuse it
        scope(failure) dropControlChannel();

        string host;
        ushort port;
        if ( !prepareTransfer(host, port) ) {
            return __response;
        }

        auto __dataStream = new TCPSocketStream();
        scope (exit ) {
            __dataStream.close();
        }

        __dataStream.connect(host, port, __timeout);

        if ( rejected(sendCmdGetResponse("TYPE I\r\n"), 2) ) {
            return __response;
        }
        if ( rejected(sendCmdGetResponse("RETR " ~ baseName(__uri.path) ~ "\r\n"), 1) ) {
            return __response;
        }

        // NOTE(review): `b` is reused for every read; this assumes that
        // __responseBody.put copies the slice - confirm in requests.utils.
        auto b = new ubyte[__bufferSize];
        while ( true ) {
            auto rc = __dataStream.receive(b);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("got %d bytes from data channel", rc);
            __response.__responseBody.put(b[0..rc]);

            if ( __response.__responseBody.length >= __maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
        }
        __dataStream.close();
        immutable code = responseToCode(serverResponse());
        if ( code/100 == 2 ) {
            tracef("Successfully received %d bytes", __response.__responseBody.length);
        }
        __response.__code = code;
        return __response;
    }

    private {
        /// Close the control connection, if any, and forget it.
        void dropControlChannel() {
            if ( __controlChannel ) {
                __controlChannel.close();
                __controlChannel = null;
            }
        }

        /// Store `code` in the current response and return true when its
        /// reply class (first digit) is above `maxClass`.
        bool rejected(ushort code, int maxClass) {
            if ( code/100 > maxClass ) {
                __response.__code = code;
                return true;
            }
            return false;
        }

        /// Start a fresh response and switch to `uri` when one is given.
        void beginRequest(string uri) {
            __response = new FTPResponse;

            if ( uri ) {
                // if control channel exists and new URL not match old, then close
                URI __new = URI(uri);
                if ( __controlChannel &&
                    (__new.host != __uri.host || __new.port != __uri.port || __new.username != __uri.username)) {
                    dropControlChannel();
                }
                __uri = __new;
            }

            __response.URI = __uri;
            __response.finalURI = __uri;
        }

        /// Connect and log in unless a control connection already exists.
        /// Returns false, with the reply code stored in the response, when
        /// the server rejects the connection or the login.
        bool openSession() {
            if ( __controlChannel ) {
                return true;
            }
            __controlChannel = new TCPSocketStream();
            __controlChannel.connect(__uri.host, __uri.port, __timeout);
            string greeting = serverResponse();
            __responseHistory ~= greeting;
            tracef("Server initial response: %s", greeting);
            if ( rejected(responseToCode(greeting), 2) ) {
                // not logged in - the next request has to start from scratch
                dropControlChannel();
                return false;
            }
            // Log in
            string user = __uri.username.length ? __uri.username : "anonymous";
            string pass = __uri.password.length ? __uri.password : "requests@";
            tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

            ushort code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
            if ( rejected(code, 3) ) {
                dropControlChannel();
                return false;
            }
            // the password is sent only when the server asks for it (3xx)
            if ( code/100 == 3 && rejected(sendCmdGetResponse("PASS " ~ pass ~ "\r\n"), 2) ) {
                dropControlChannel();
                return false;
            }
            return true;
        }

        /// Extract the data connection address from a PASV reply.
        /// Returns false when the reply does not carry six byte-sized numbers.
        bool parsePassiveReply(string reply, out string host, out ushort port) {
            // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected
            // Cut anything between ( and )
            string fields = reply.findSplitBefore(")")[0].findSplitAfter("(")[1];
            try {
                ubyte a1,a2,a3,a4,p1,p2;
                auto input = fields;    // formattedRead consumes its input
                immutable parsed = formattedRead(input, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
                if ( parsed != 6 ) {
                    error("Failed to parse ", fields);
                    return false;
                }
                host = format("%d.%d.%d.%d", a1, a2, a3, a4);
                port = cast(ushort)((p1<<8) + p2);
                return true;
            } catch (FormatException e) {
                error("Failed to parse ", fields);
            } catch (ConvException e) {
                // e.g. a number that does not fit in a byte
                error("Failed to parse ", fields);
            }
            return false;
        }

        /// Log in if needed, change to the directory of the URL path and enter
        /// passive mode. On failure the reply code is stored in the response
        /// (500 when the PASV reply cannot be parsed) and false is returned.
        bool prepareTransfer(out string host, out ushort port) {
            if ( !openSession() ) {
                return false;
            }
            if ( rejected(sendCmdGetResponse("CWD " ~ dirName(__uri.path) ~ "\r\n"), 2) ) {
                return false;
            }
            if ( rejected(sendCmdGetResponse("PASV\r\n"), 2) ) {
                return false;
            }
            // sendCmdGetResponse stored the PASV reply as the last history entry
            if ( !parsePassiveReply(__responseHistory[$-1], host, port) ) {
                __response.__code = 500;
                return false;
            }
            return true;
        }
    }
}

// Talks to the public FTP servers named in the URLs below, so it needs
// network access.
unittest {
    globalLogLevel(LogLevel.info );
    info("testing ftp");
    auto rq = FTPRequest();
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
    assert(rs.code != 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
    assert(rs.code == 226);
    assert(rs.responseBody.length == 1024);
    info("ftp get  ", "ftp://ftp.uni-bayreuth.de/README");
    rs = rq.get("ftp://ftp.uni-bayreuth.de/README");
    assert(rs.code == 226);
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(rs.code == 226);
    assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}