module requests.connmanager;

import std.typecons;
import std.datetime;
import std.array;
import std.algorithm;
import std.exception;

import std.experimental.logger;

import requests.streams;

/**
 * Keep opened connections for HTTP.
 * It is a cache mapping tuple(schema, host, port) -> connection
 * with a limited number of items.
 *
 * When the cache is full, the entry with the oldest timestamp is evicted.
 * The timestamp is refreshed by put(), not by get().
 */
package struct ConnManager {
    /// Cache key: (schema, host, port).
    package alias CMKey = Tuple!(string, string, ushort);
    /// Cache value: the stream and the time it was last stored via put().
    package struct CMValue {
        NetworkStream   stream;
        SysTime         timestamp;
    }
    private {
        // Number of entries at which put() starts evicting.
        int             _limit = 10;
        CMValue[CMKey]  _cache;
    }
    /// Create a manager that starts evicting once `limit` entries are stored.
    this(int limit) {
        _limit = limit;
    }
    /// Closes every cached connection (see clear()).
    ~this() {
        clear();
    }
    /// Number of connections currently cached.
    @property auto length() {
        return _cache.length;
    }
    ///
    /// Find the key of the entry with the oldest timestamp.
    /// The entry itself is not removed; the cache must not be empty.
    ///
    private CMKey evict()
    in { assert(_cache.length>0); }
    do {
        debug(requests) trace("looking something to evict");
        // Single pass for the minimum; no need to copy and sort the whole cache.
        return _cache.byKeyValue.minElement!(a => a.value.timestamp).key;
    }
    ///
    /// Put a stream in the cache under (schema, host, port).
    ///
    /// If the key is new and the cache is full, the oldest entry is removed
    /// and its stream returned. If the key already exists with a different
    /// stream, the stream is replaced and the previous one returned. In both
    /// cases the returned stream is NOT closed here; the caller may close it.
    ///
    /// Returns: the displaced stream, or null if nothing was displaced.
    ///
    NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
    in  { assert(stream !is null);}
    out { assert(_cache.length>0);}
    do {
        NetworkStream displaced;
        auto key = CMKey(schema, host, port);
        auto value_ptr = key in _cache;

        if ( value_ptr is null ) {
            CMValue v = {stream: stream, timestamp: Clock.currTime};
            // Evict only when there is something to evict: with a limit of 0
            // the cache is empty here and evict() requires a non-empty cache.
            if ( _cache.length > 0 && _cache.length >= _limit ) {
                CMKey k = evict();
                displaced = _cache[k].stream;
                _cache.remove(k);
            }
            _cache[key] = v;
            return displaced;
        }
        auto old_stream = (*value_ptr).stream;
        if ( old_stream != stream ) {
            debug(requests) trace("old stream != new stream");
            displaced = old_stream;
            (*value_ptr).stream = stream;
        }
        // Re-putting a key refreshes its timestamp.
        (*value_ptr).timestamp = Clock.currTime;
        return displaced;
    }
    /**
        Lookup connection.

        Returns: the cached stream for (schema, host, port), or null if absent.
        The entry's timestamp is not updated.
    */
    NetworkStream get(string schema, string host, ushort port) {
        if ( auto value_ptr = CMKey(schema, host, port) in _cache ) {
            return (*value_ptr).stream;
        }
        return null;
    }

    /**
        Remove connection from cache (without close).

        Returns: the removed stream, or null if the key was not cached.
    */
    NetworkStream del(string schema, string host, ushort port) {
        NetworkStream s;
        CMKey key = CMKey(schema, host, port);
        if ( auto value_ptr = key in _cache ) {
            s = (*value_ptr).stream;
        }
        _cache.remove(key);
        return s;
    }

    /**
        Clear cache (and close connections).

        Exceptions thrown while closing a stream are swallowed (best effort),
        so every entry is processed and the cache always ends up empty.
    */
    void clear()
    out { assert(_cache.length == 0); }
    do {
        // First pass only reads the associative array: removing entries from
        // inside a foreach over the same array is not allowed.
        foreach(entry; _cache.byKeyValue) {
            debug(requests) tracef("Clear ConnManager entry %s", entry.key);
            try {
                entry.value.stream.close();
            } catch (Exception e) {
                debug(requests) tracef("%s while clear connmanager", e.msg);
            }
        }
        // Now drop all entries at once (built-in associative array clear).
        _cache.clear();
    }

}

unittest {
    globalLogLevel = LogLevel.info;
    ConnManager cm = ConnManager(2);
    auto s0 = new TCPStream();
    auto s1 = new TCPStream();
    auto s2 = new TCPStream();

    auto e = cm.put("http", "s0", 1, s0);
    assert(e is null);
    assert(cm.get("http", "s0", 1) == s0);

    e = cm.put("http", "s1", 1, s1);
    assert(e is null);
    assert(cm.get("http", "s1", 1) == s1);

    e = cm.put("http", "s2", 1, s2);
    assert(e !is null);
    assert(cm.get("http", "s2", 1) == s2);
    assert(e == s0); // oldest
    e.close();

    // at this moment we have s1, s2
    // let try to update s1
    auto s3 = new TCPStream;
    e = cm.put("http", "s1", 1, s3);
    assert(e == s1);
    e.close();
    assert(cm.get("http", "s1", 1) == s3);

    cm.clear();
    assert(cm.length == 0);
    assert(cm.get("http", "s1", 1) is null);
    assert(cm.get("http", "s2", 1) is null);

    // a limit of 0 must not try to evict from an empty cache
    ConnManager zero = ConnManager(0);
    auto s4 = new TCPStream();
    e = zero.put("http", "s4", 1, s4);
    assert(e is null);
    assert(zero.length == 1);
    assert(zero.get("http", "s4", 1) == s4);
}