module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;


/// Thrown when a response body cannot be decoded (e.g. malformed chunked encoding).
/// NOTE: the name is historically misspelled; kept for backward compatibility.
class DecodingExceptioin: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}
/// Correctly-spelled alias; prefer this name in new code.
alias DecodingException = DecodingExceptioin;
/**
 * DataPipeIface can accept some data, process, and return processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    void put(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it,
 * and puts the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings.
 * For example: unchunk chunked transfer-encoding, and uncompress compressed Content-Encoding.
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[] pipe;   // processors, applied in insertion order
    Buffer!E buffer;            // accumulated output of the whole pipeline
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed each slice of `data` into processor `p` and collect everything it emits.
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.put(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data passed over pipeline and result stored in buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            // no processors installed: pass input straight to the output buffer
            buffer.put(data);
            return;
        }
        auto t = process(pipe.front, [data]);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }
    /// Process next data portion. Data passed over pipeline and result stored in buffer.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                // nothing collected yet: adopt the incoming buffer wholesale
                buffer = buff;
                return;
            }
            // splice the incoming buffer's chunks onto ours without copying data
            buffer.__repr.__buffer ~= buff.__repr.__buffer;
            buffer.__repr.__length += buff.length;
            return;
        }
        auto t = process(pipe.front, buff.__repr.__buffer);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Flush every processor in order, cascading each one's final output into the next;
    /// whatever falls out of the last processor lands in the output buffer.
    void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.put(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.put(b));
    }
}

/**
 * Processor for gzipped/compressed content.
 * Also support InputRange interface.
122 */ 123 public class Decompressor(E) : DataPipeIface!E { 124 private { 125 Buffer!ubyte __buff; 126 UnCompress __zlib; 127 } 128 this() { 129 __buff = Buffer!ubyte(); 130 __zlib = new UnCompress(); 131 } 132 // this(E[] r) { 133 // //__range = r; 134 // __buff = Buffer!ubyte(); 135 // __zlib = new UnCompress(); 136 // auto l = r.length; 137 // if ( l ) { 138 // __buff.put(__zlib.uncompress(r.take(l))); 139 // } 140 // } 141 override void put(E[] data) { 142 if ( __zlib is null ) { 143 __zlib = new UnCompress(); 144 } 145 __buff.put(__zlib.uncompress(data)); 146 } 147 override E[] get() pure { 148 assert(__buff.length); 149 auto r = __buff.__repr.__buffer[0]; 150 __buff.popFrontN(r.length); 151 return cast(E[])r; 152 } 153 override void flush() { 154 if ( __zlib is null ) { 155 return; 156 } 157 __buff.put(__zlib.flush()); 158 } 159 override @property bool empty() const pure @safe { 160 debug tracef("empty=%b", __buff.empty); 161 return __buff.empty; 162 } 163 @property auto ref front() pure const @safe { 164 debug tracef("front: buff length=%d", __buff.length); 165 return __buff.front; 166 } 167 @property auto popFront() pure @safe { 168 debug tracef("popFront: buff length=%d", __buff.length); 169 return __buff.popFront; 170 } 171 @property void popFrontN(size_t n) pure @safe { 172 __buff.popFrontN(n); 173 } 174 } 175 176 /** 177 * Unchunk chunked http responce body. 
178 */ 179 public class DecodeChunked : DataPipeIface!ubyte { 180 // length := 0 181 // read chunk-size, chunk-extension (if any) and CRLF 182 // while (chunk-size > 0) { 183 // read chunk-data and CRLF 184 // append chunk-data to entity-body 185 // length := length + chunk-size 186 // read chunk-size and CRLF 187 // } 188 // read entity-header 189 // while (entity-header not empty) { 190 // append entity-header to existing header fields 191 // read entity-header 192 // } 193 // Content-Length := length 194 // Remove "chunked" from Transfer-Encoding 195 // 196 197 // Chunked-Body = *chunk 198 // last-chunk 199 // trailer 200 // CRLF 201 // 202 // chunk = chunk-size [ chunk-extension ] CRLF 203 // chunk-data CRLF 204 // chunk-size = 1*HEX 205 // last-chunk = 1*("0") [ chunk-extension ] CRLF 206 // 207 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 208 // chunk-ext-name = token 209 // chunk-ext-val = token | quoted-string 210 // chunk-data = chunk-size(OCTET) 211 // trailer = *(entity-header CRLF) 212 213 alias eType = ubyte; 214 immutable eType[] CRLF = ['\r', '\n']; 215 private { 216 enum States {huntingSize, huntingSeparator, receiving, trailer}; 217 char state = States.huntingSize; 218 size_t chunk_size, to_receive; 219 Buffer!ubyte buff; 220 ubyte[] linebuff; 221 } 222 void put(eType[] data) { 223 while ( data.length ) { 224 if ( state == States.trailer ) { 225 return; 226 } 227 if ( state == States.huntingSize ) { 228 linebuff ~= data; 229 data.length = 0; 230 auto s = linebuff.findSplit(CRLF); 231 if ( !s[1].length ) { 232 if ( linebuff.length >= 80 ) { 233 throw new DecodingExceptioin("Can't find chunk size in the body"); 234 } 235 continue; 236 } 237 tracef("Got chunk size line %s", s[0]); 238 string x = castFrom!(ubyte[]).to!string(s[0]); 239 formattedRead(x, "%x", &chunk_size); 240 tracef("Got chunk size %s", chunk_size); 241 state = States.receiving; 242 to_receive = chunk_size; 243 data = s[2]; 244 if ( chunk_size == 0 ) { 245 state = 
States.trailer; 246 tracef("Unchunk completed"); 247 return; 248 } 249 continue; 250 } 251 if ( state == States.receiving ) { 252 if (to_receive > 0 ) { 253 auto can_store = min(to_receive, data.length); 254 buff.put(data[0..can_store]); 255 data = data[can_store..$]; 256 to_receive -= can_store; 257 tracef("Unchunked %d bytes from %d", can_store, chunk_size); 258 if ( to_receive == 0 ) { 259 tracef("switch to huntig separator"); 260 state = States.huntingSeparator; 261 to_receive = 2; 262 linebuff.length = 0; 263 continue; 264 } 265 continue; 266 } 267 assert(false); 268 } 269 if ( state == States.huntingSeparator ) { 270 linebuff ~= data; 271 data.length = 0; 272 auto s = linebuff.findSplit(CRLF); 273 if ( s[1].length ) { 274 data = s[2]; 275 chunk_size = 0; 276 linebuff.length = 0; 277 state = States.huntingSize; 278 tracef("switch to huntig size"); 279 continue; 280 } 281 } 282 } 283 } 284 eType[] get() { 285 auto r = buff.__repr.__buffer[0]; 286 buff.popFrontN(r.length); 287 return r; 288 } 289 void flush() { 290 } 291 bool empty() { 292 debug tracef("empty=%b", buff.empty); 293 return buff.empty; 294 } 295 bool done() { 296 return state==States.trailer; 297 } 298 } 299 300 unittest { 301 info("Testing DataPipe"); 302 globalLogLevel(LogLevel.info); 303 alias eType = char; 304 eType[] gzipped = [ 305 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 306 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 307 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 308 0x08, 0x00, 0x00, 0x00 309 ]; // "abc\ndef\n" 310 auto d = new Decompressor!eType(); 311 d.put(gzipped[0..2]); 312 d.put(gzipped[2..10]); 313 d.put(gzipped[10..$]); 314 d.flush(); 315 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 316 317 auto e = new Decompressor!eType(); 318 e.put(gzipped[0..10]); 319 e.put(gzipped[10..$]); 320 e.flush(); 321 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 322 // writeln(gzipped.decompress.filter!(a => a!='b').array); 323 auto dp = new DataPipe!eType; 324 
dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but supports
 * also Range interface.
 * $(P To place data in buffer use put() method.)
 * $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
public struct Buffer(T) {
    private {
        // Storage is a list of chunks plus a cached total element count;
        // put() appends a chunk, range primitives consume across chunk boundaries.
        class Repr {
            size_t __length;
            Unqual!T[][] __buffer;
            this() {
                __length = 0;
            }
            // Deep-ish copy: duplicates the chunk list (chunk contents are shared).
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    this(this) {
        // postblit: copy the representation so struct copies are independent
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) pure {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /// Append data to the buffer, converting element type if needed. Returns this.
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            // representation is created lazily on first append
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    /// $ in index/slice expressions: total number of elements.
    @property auto opDollar() const pure @safe {
        return __repr.__length;
    }
    /// Total number of elements (0 for a never-written buffer).
    @property auto length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    /// True when the buffer holds no elements.
    @property auto empty() const pure @safe {
return length == 0;
    }
    /// First element across all chunks.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last element across all chunks.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element, discarding the leading chunk when it empties.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first n elements (n must not exceed length).
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "length: %d, n=%d".format(length, n));
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element, discarding the trailing chunk when it empties.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last n elements (n must not exceed length).
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// ForwardRange primitive: independent copy of this buffer.
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Random access: element at logical index n, walking the chunk list.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Slice [m..n] as a new Buffer (copies the chunk list, then trims both ends).
    Buffer!T opSlice(size_t m, size_t n) {
        assert( m <= n && n <= __repr.__length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
    /// All collected data flattened into a single array (optionally converted to U).
    @property auto data(U=T[])() const pure {
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    string opCast(string)() {
        return this.toString;
    }
    /// Compare by converting this buffer to the other operand's type.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);
}