1 module requests.connmanager; 2 3 import std.typecons; 4 import std.datetime; 5 import std.array; 6 import std.algorithm; 7 import std.exception; 8 import std.format; 9 10 import std.experimental.logger; 11 12 import requests.streams; 13 14 import cachetools.cache; 15 16 /** 17 * Keep opened connections for HTTP. 18 * It is actually cache over tuple(schema, host, port) -> connection 19 * with limited number of items. 20 * 21 * Evict least used. 22 */ 23 24 package struct ConnManager { 25 package alias CMKey = Tuple!(string, string, ushort); 26 package class CMValue { 27 this(NetworkStream s) { 28 stream = s; 29 } 30 NetworkStream stream; 31 string toString() inout { 32 return "%s".format(stream); 33 } 34 } 35 private { 36 CMValue[][CMKey] map; 37 int capacity; 38 int counter; 39 } 40 this(int c) { 41 capacity = c; 42 } 43 void clear() 44 { 45 foreach(v;map.byValue) 46 { 47 foreach(s; v) { 48 s.stream.close(); 49 } 50 } 51 map.clear; 52 counter = 0; 53 } 54 void put(string schema, string host, ushort port, NetworkStream stream) 55 in 56 { 57 assert(stream !is null); 58 assert(stream.isConnected); 59 } 60 do 61 { 62 debug(requests) { 63 tracef("Place connection to pool: %s://%s:%s", schema, host, port); 64 } 65 CMKey key = CMKey(schema, host, port); 66 auto c = key in map; 67 if ( c ) { 68 (*c) ~= new CMValue(stream); 69 } else { 70 map[key] = [new CMValue(stream)]; 71 } 72 counter++; 73 if ( counter > capacity ) { 74 auto k = map.byKey.front; 75 debug(requests) { 76 tracef("remove random key %s", k); 77 } 78 auto streams = map[k]; 79 auto s = streams[0].stream; 80 counter--; 81 map[k] = streams[1..$]; 82 if (map[k].length == 0) { 83 map.remove(k); 84 } 85 s.close(); 86 } 87 } 88 89 NetworkStream get(string schema, string host, ushort port) 90 do 91 { 92 CMKey key = CMKey(schema, host, port); 93 auto c = key in map; 94 if (c) { 95 // trim and return last 96 auto v = *c; 97 auto s = v[$-1].stream; 98 v = v[0..$-1]; 99 counter--; 100 debug (requests) { 101 tracef("Get connection from pool: %s://%s:%s - %s", schema, host, port, s); 102 } 103 if ( v.length == 0 ) { 104 map.remove(key); 105 } 106 return s; 107 } 108 else { 109 return null; 110 } 111 } 112 113 // void del(string schema, string host, ushort port, NetworkStream str) 114 // { 115 // CMKey key = CMKey(schema, host, port); 116 // auto c = key in map; 117 // if ( c ) { 118 // immutable index = (*c).countUntil!(a => a.stream == str); 119 // if ( index > -1 ) { 120 // (*c).remove(index); 121 // } 122 // } 123 // } 124 } 125 126 package struct ConnManager1 { 127 package alias CMKey = Tuple!(string, string, ushort); 128 package class CMValue { 129 NetworkStream stream; 130 bool in_use; 131 string toString() inout { 132 return "%s:%s".format(in_use, stream); 133 } 134 } 135 private { 136 CacheLRU!(CMKey, CMValue[]) __cache; 137 } 138 this(int limit) { 139 __cache = new CacheLRU!(CMKey, CMValue[]); 140 __cache.size = limit; 141 __cache.enableCacheEvents(); 142 } 143 ~this() { 144 clear(); 145 } 146 @property auto length() { 147 return __cache.length; 148 } 149 /// 150 /// put new stream in cache, evict old stream and return it. 151 /// If nothing evicted return null. Returned(evicted) connection can be 152 /// closed. 153 /// 154 NetworkStream put(string schema, string host, ushort port, NetworkStream stream) 155 in { assert(stream !is null);} 156 out{ assert(__cache.length>0);} 157 do { 158 import std.stdio; 159 CMKey key = CMKey(schema, host, port); 160 CMValue new_value = new CMValue; 161 new_value.stream = stream; 162 new_value.in_use = true; 163 164 CMValue[] old_values; 165 auto cs = __cache.get(key); 166 if ( !cs.isNull ) { 167 old_values = cs.get(); 168 } 169 __cache.put(key, old_values ~ new_value); 170 171 writefln("new: %s", old_values ~ new_value); 172 173 auto cacheEvents = __cache.cacheEvents(); 174 foreach(e; cacheEvents) { 175 // if ( e.event == EventType.Evicted ) { 176 177 // } else if (e.event == EventType.Updated) { 178 179 // } 180 //writeln(e); 181 } 182 return null; 183 // switch( cacheEvents.length ) 184 // { 185 // case 0: 186 // return null; 187 // case 1: 188 // old_values = cacheEvents.front.val; 189 // foreach(s; old_values) { 190 // s.stream.close(); 191 // } 192 // return null; 193 // default: 194 // assert(0); 195 // } 196 } 197 /** 198 Lookup connection. 199 */ 200 NetworkStream get(string schema, string host, ushort port) 201 do 202 { 203 if ( __cache is null ) return null; 204 auto v = __cache.get(CMKey(schema, host, port)); 205 if ( ! v.isNull() ) 206 { 207 auto streams = v.get; 208 foreach(s; streams) { 209 if ( !s.in_use ) { 210 s.in_use = true; 211 return s.stream; 212 } 213 } 214 } 215 return null; 216 } 217 218 /** 219 Remove connection from cache (without close). 220 */ 221 void del(string schema, string host, ushort port, NetworkStream str) { 222 NetworkStream s; 223 CMKey key = CMKey(schema, host, port); 224 auto v = __cache.get(key); 225 if ( v.isNull ) { 226 return; 227 } 228 auto streams = v.get(); 229 int index; 230 do { 231 if ( streams[index].stream is str) { 232 break; 233 } 234 index++; 235 } while(index < streams.length); 236 237 if ( index < streams.length ) { 238 streams.remove(index); 239 } 240 241 if ( streams.length == 0 ) { 242 __cache.remove(key); 243 } else { 244 __cache.put(key, streams); 245 } 246 } 247 248 /** 249 clear cache (and close connections) 250 */ 251 void clear() 252 out { assert(__cache is null || __cache.length == 0); } 253 do { 254 if ( __cache is null ) return; 255 256 __cache.clear(); 257 foreach(e; __cache.cacheEvents ) 258 { 259 try 260 { 261 foreach(v; e.val) { 262 v.stream.close(); 263 } 264 } 265 catch(Exception e) 266 { 267 debug(requests) tracef("%s while clear connmanager", e.msg); 268 } 269 } 270 __cache = null; 271 } 272 } 273 274 // unittest { 275 // globalLogLevel = LogLevel.info; 276 // ConnManager cm = ConnManager(2); 277 // auto s0 = new TCPStream(); 278 // auto s1 = new TCPStream(); 279 // auto s2 = new TCPStream(); 280 281 // auto e = cm.put("http", "s0", 1, s0); 282 // assert(e is null); 283 // assert(cm.get("http", "s0", 1) == s0); 284 285 // e = cm.put("http", "s1", 1, s1); 286 // assert(e is null); 287 // assert(cm.get("http", "s1", 1) == s1); 288 289 // e = cm.put("http", "s2", 1, s2); 290 // // assert(e !is null); 291 // assert(cm.get("http", "s2", 1) == s2); 292 // // assert(e == s0); // oldest 293 // // e.close(); 294 295 // // at this moment we have s1, s2 296 // // let try to update s1 297 // auto s3 = new TCPStream; 298 // e = cm.put("http", "s1", 1, s3); 299 // // assert(e == s1); 300 // // e.close(); 301 // assert(cm.get("http", "s1", 1) == s3); 302 303 // cm.clear(); 304 // assert(cm.get("http", "s1", 1) is null); 305 // }