1 module requests.connmanager;
2 
3 import std.typecons;
4 import std.datetime;
5 import std.array;
6 import std.algorithm;
7 import std.exception;
8 
9 import std.experimental.logger;
10 
11 import requests.streams;
12 
13 import cachetools.cache;
14 
15 /**
16  * Keep opened connections for HTTP.
17  * It is actually cache over tuple(schema, host, port) -> connection
18  * with limited number of items.
19  *
20  * Evict least used.
21 */
22 package struct ConnManager {
23     package alias  CMKey = Tuple!(string, string, ushort);
24     package struct CMValue {
25         NetworkStream   stream;
26         SysTime         timestamp;
27     }
28     private {
29         CacheLRU!(CMKey, CMValue) __cache;
30     }
31     this(int limit) {
32         __cache = new CacheLRU!(CMKey, CMValue);
33         __cache.size = limit;
34         __cache.enableCacheEvents();
35     }
36     ~this() {
37         clear();
38     }
39     @property auto length() {
40         return __cache.length;
41     }
42     ///
43     /// put new stream in cache, evict old stream and return it.
44     /// If nothing evicted return null. Returned(evicted) connection can be
45     /// closed.
46     ///
47     NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
48     in { assert(stream !is null);}
49     out{ assert(__cache.length>0);}
50     do {
51         NetworkStream e;
52         CMKey     key = CMKey(schema, host, port);
53         CMValue value = {stream: stream, timestamp: Clock.currTime};
54         __cache.put(key, value);
55         auto cacheEvents = __cache.cacheEvents();
56         switch( cacheEvents.length )
57         {
58             case 0:
59                 return null;
60             case 1:
61                 return cacheEvents.front.val.stream;
62             default:
63                 assert(0);
64         }
65     }
66     /**
67         Lookup connection.
68      */
69     NetworkStream get(string schema, string host, ushort port)
70     do
71     {
72         if ( __cache is null ) return null;
73         auto v = __cache.get(CMKey(schema, host, port));
74         if ( ! v.isNull() )
75         {
76             return v.get.stream;
77         }
78         return null;
79     }
80 
81     /**
82         Remove connection from cache (without close).
83      */
84     NetworkStream del(string schema, string host, ushort port) {
85         NetworkStream s;
86         CMKey key = CMKey(schema, host, port);
87         __cache.remove(key);
88         auto cacheEvents = __cache.cacheEvents();
89         switch( cacheEvents.length )
90         {
91             case 0:
92                 return null;
93             case 1:
94                 return cacheEvents.front.val.stream;
95             default:
96                 assert(0);
97         }
98     }
99 
100     /**
101         clear cache (and close connections)
102      */
103     void clear()
104     out { assert(__cache is null || __cache.length == 0); }
105     do  {
106         if ( __cache is null ) return;
107 
108         __cache.clear();
109         foreach(e; __cache.cacheEvents )
110         {
111             try
112             {
113                 e.val.stream.close();
114             }
115             catch(Exception e)
116             {
117                 debug(requests) tracef("%s while clear connmanager", e.msg);
118             }
119         }
120         __cache = null;
121     }
122 }
123 
124 unittest {
125     globalLogLevel = LogLevel.info;
126     ConnManager cm = ConnManager(2);
127     auto s0 = new TCPStream();
128     auto s1 = new TCPStream();
129     auto s2 = new TCPStream();
130 
131     auto e = cm.put("http", "s0", 1, s0);
132     assert(e is null);
133     assert(cm.get("http", "s0", 1) == s0);
134 
135     e = cm.put("http", "s1", 1, s1);
136     assert(e is null);
137     assert(cm.get("http", "s1", 1) == s1);
138 
139     e = cm.put("http", "s2", 1, s2);
140     assert(e !is null);
141     assert(cm.get("http", "s2", 1) == s2);
142     assert(e == s0); // oldest
143     e.close();
144 
145     // at this moment we have s1, s2
146     // let try to update s1
147     auto s3 = new TCPStream;
148     e = cm.put("http", "s1", 1, s3);
149     assert(e == s1);
150     e.close();
151     assert(cm.get("http", "s1", 1) == s3);
152 
153     cm.clear();
154     assert(cm.get("http", "s1", 1) is null);
155 }