module requests.connmanager;

import std.typecons;
import std.datetime;
import std.array;
import std.algorithm;
import std.exception;

import std.experimental.logger;

import requests.streams;

///
/// Keeps opened connections for HTTP.
/// It is a cache mapping tuple(schema, host, port) -> connection.
///
/// When the cache is full, the entry with the oldest timestamp is evicted.
/// The timestamp is refreshed by put() only; get() does not touch it.
///
package class ConnManager {
    /// Cache key: (schema, host, port).
    package alias CMKey = Tuple!(string, string, ushort);
    /// Cache entry: the stream and the time it was last stored via put().
    package struct CMValue {
        NetworkStream   stream;
        SysTime         timestamp;
    }
    private {
        ubyte           _limit;     // maximum number of cached connections
        CMValue[CMKey]  _cache;
    }
    ///
    /// Params:
    ///   limit = maximum number of connections kept in the cache
    ///
    this(ubyte limit = 10) {
        _limit = limit;
    }
    // NOTE(review): this check runs from the destructor; when the object is
    // finalized by the GC, touching _cache and throwing here may be unsafe --
    // confirm that callers always invoke clear() before dropping the manager.
    ~this() {
        enforce!Exception(_cache.length == 0, "You must clear connManager before it GC-ed");
    }
    ///
    /// Find the key of the entry with the oldest timestamp.
    /// The entry itself is not removed; the caller does that.
    ///
    private CMKey evict()
    in { assert(_cache.length>0); }
    do {
        debug(requests) trace("looking something to evict");
        // single pass over the cache, no temporary array and no sorting
        return _cache.byKeyValue.minElement!"a.value.timestamp"().key;
    }
    ///
    /// Put a stream in the cache under (schema, host, port).
    ///
    /// If the key is new and the cache is full, the oldest entry is removed
    /// and its stream is returned. If the key is already present with a
    /// different stream, the stream is replaced and the previous one is
    /// returned. In both cases closing the returned stream is left to the caller.
    ///
    /// Returns: the stream displaced from the cache, or null if nothing was displaced.
    ///
    NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
    in { assert(stream !is null);}
    out{ assert(_cache.length>0);}
    do {
        NetworkStream displaced;
        auto key = CMKey(schema, host, port);
        auto value_ptr = key in _cache;

        if ( value_ptr is null ) {
            CMValue v = {stream: stream, timestamp: Clock.currTime};
            // the non-empty check protects evict() when the limit is zero
            if ( _cache.length > 0 && _cache.length >= _limit ) {
                CMKey k = evict();
                displaced = _cache[k].stream;
                _cache.remove(k);
            }
            _cache[key] = v;
            return displaced;
        }
        auto old_stream = (*value_ptr).stream;
        // identity comparison: we care whether it is the very same stream object
        if ( old_stream !is stream ) {
            debug(requests) trace("old stream != new stream");
            displaced = old_stream;
            (*value_ptr).stream = stream;
        }
        (*value_ptr).timestamp = Clock.currTime;
        return displaced;
    }
    ///
    /// Look up the cached stream for (schema, host, port).
    ///
    /// Returns: the cached stream, or null if there is no such entry.
    ///
    NetworkStream get(string schema, string host, ushort port) {
        if ( auto value_ptr = CMKey(schema, host, port) in _cache ) {
            return (*value_ptr).stream;
        }
        return null;
    }
    ///
    /// Remove the entry for (schema, host, port) from the cache.
    /// The stream is not closed here.
    ///
    /// Returns: the removed stream, or null if there was no such entry.
    ///
    NetworkStream del(string schema, string host, ushort port) {
        NetworkStream s;
        CMKey key = CMKey(schema, host, port);
        if ( auto value_ptr = key in _cache ) {
            s = (*value_ptr).stream;
        }
        _cache.remove(key);
        return s;
    }
    ///
    /// Close every cached stream and empty the cache.
    /// Exceptions thrown by close() are traced (in debug builds) and ignored.
    ///
    void clear()
    out { assert(_cache.length == 0); }
    do {
        // Iterate over a snapshot of the keys: removing entries from an
        // associative array while foreach-ing over that same array is not allowed.
        foreach(k; _cache.keys) {
            debug(requests) tracef("Clear ConnManager entry %s", k);
            try {
                _cache[k].stream.close();
            } catch (Exception e) {
                debug(requests) tracef("%s while clear connmanager", e.msg);
            }
            _cache.remove(k);
        }
    }

}

unittest {
    globalLogLevel = LogLevel.info;
    ConnManager cm = new ConnManager(2);
    auto s0 = new TCPStream();
    auto s1 = new TCPStream();
    auto s2 = new TCPStream();

    auto e = cm.put("http", "s0", 1, s0);
    assert(e is null);
    assert(cm.get("http", "s0", 1) == s0);

    e = cm.put("http", "s1", 1, s1);
    assert(e is null);
    assert(cm.get("http", "s1", 1) == s1);

    e = cm.put("http", "s2", 1, s2);
    assert(e !is null);
    assert(cm.get("http", "s2", 1) == s2);
    assert(e == s0); // oldest
    e.close();

    // at this moment we have s1, s2
    // let try to update s1
    auto s3 = new TCPStream;
    e = cm.put("http", "s1", 1, s3);
    assert(e == s1);
    e.close();
    assert(cm.get("http", "s1", 1) == s3);

    cm.clear();
    assert(cm.get("http", "s1", 1) is null);
}