class Sequel::ThreadedConnectionPool

A connection pool allowing multi-threaded access to a pool of connections. This is the default connection pool used by Sequel.

Constants

USE_WAITER

Attributes

allocated[R]

A hash with thread/fiber keys and connection values for currently allocated connections. The calling code should already have the mutex before calling this.

available_connections[R]

An array of connections that are available for use by the pool. The calling code should already have the mutex before calling this.

max_size[R]

The maximum number of connections this pool will create (per shard/server if sharding).

Public Class Methods

new(db, opts = OPTS)

The following additional options are respected:

:max_connections

The maximum number of connections the connection pool will open (default 4)

:pool_timeout

The number of seconds to wait to acquire a connection before raising a PoolTimeout error (default 5)

Calls superclass method Sequel::ConnectionPool::new
   # File lib/sequel/connection_pool/threaded.rb
   def initialize(db, opts = OPTS)
     super
     @max_size = Integer(opts[:max_connections] || 4)
     raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
     @mutex = Mutex.new
     @connection_handling = opts[:connection_handling]
     @available_connections = []
     @allocated = {}
     @allocated.compare_by_identity
     @timeout = Float(opts[:pool_timeout] || 5)
     @waiter = ConditionVariable.new
   end
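
The option handling above can be sketched in isolation. This is a minimal illustration, not Sequel's code: normalize_pool_options is a hypothetical helper, and it raises ArgumentError where the real constructor raises Sequel::Error, so that the sketch runs without the gem loaded.

```ruby
# Hypothetical helper (not part of Sequel) that mirrors the Integer()/Float()
# coercions and the positivity check performed by the constructor.
def normalize_pool_options(opts = {})
  max_size = Integer(opts[:max_connections] || 4)
  # Assumption: ArgumentError stands in for Sequel::Error in this sketch.
  raise ArgumentError, ':max_connections must be positive' if max_size < 1
  timeout = Float(opts[:pool_timeout] || 5)
  { max_size: max_size, timeout: timeout }
end

# String and integer inputs are both accepted because of the coercions.
settings = normalize_pool_options({ max_connections: '10', pool_timeout: 2 })
puts settings.inspect
```

Because of the coercions, a value such as '10' read from an environment variable is accepted, while a non-numeric string makes Integer() raise.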

Public Instance Methods

all_connections() { |c| ... }

Yield all of the available connections, and the one currently allocated to this thread. This will not yield connections currently allocated to other threads, as it is not safe to operate on them. This holds the mutex while it is yielding all of the available connections, which means that until the method’s block returns, the pool is locked.

   # File lib/sequel/connection_pool/threaded.rb
   def all_connections
     hold do |c|
       sync do
         yield c
         @available_connections.each{|conn| yield conn}
       end
     end
   end

disconnect(opts=OPTS)

Removes all connections currently available. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. If you want to be able to disconnect connections that are currently in use, use the ShardedThreadedConnectionPool, which can do that. This connection pool does not, for performance reasons. To use the sharded pool, pass the servers: {} option when connecting to the database.

After a disconnect, the pool creates new connections to the database as they are requested using hold.

   # File lib/sequel/connection_pool/threaded.rb
   def disconnect(opts=OPTS)
     conns = nil
     sync do
       conns = @available_connections.dup
       @available_connections.clear
       @waiter.signal
     end
     conns.each{|conn| disconnect_connection(conn)}
   end

hold(server=nil) { |conn| ... }

Chooses the first available connection, or if none are available, creates a new connection. Passes the connection to the supplied block:

pool.hold {|conn| conn.execute('DROP TABLE posts')}

Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.

If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.

    # File lib/sequel/connection_pool/threaded.rb
    def hold(server=nil)
      t = Sequel.current
      if conn = owned_connection(t)
        return yield(conn)
      end
      begin
        conn = acquire(t)
        yield conn
      rescue Sequel::DatabaseDisconnectError, *@error_classes => e
        if disconnect_error?(e)
          oconn = conn
          conn = nil
          disconnect_connection(oconn) if oconn
          sync do
            @allocated.delete(t)
            @waiter.signal
          end
        end
        raise
      ensure
        if conn
          sync{release(t)}
          if @connection_handling == :disconnect
            disconnect_connection(conn)
          end
        end
      end
    end
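
The re-entrant behavior described above can be illustrated with a much smaller model. TinyPool below is a hypothetical class written for this illustration only; it has no maximum size, no timeout, and no disconnect handling, and its "connections" are plain objects.

```ruby
# TinyPool is a hypothetical, heavily simplified model of a re-entrant hold.
# It is not Sequel's implementation.
class TinyPool
  attr_reader :created

  def initialize
    @mutex = Mutex.new
    @allocated = {}.compare_by_identity # thread => connection
    @available = []
    @created = 0
  end

  def hold
    t = Thread.current
    # Re-entrant path: this thread already owns a connection, so reuse it.
    if (conn = @mutex.synchronize { @allocated[t] })
      return yield(conn)
    end
    conn = @mutex.synchronize { @allocated[t] = @available.shift || new_conn }
    begin
      yield conn
    ensure
      # Only the outermost hold returns the connection to the pool.
      @mutex.synchronize { @available << @allocated.delete(t) }
    end
  end

  private

  # Stand-in for opening a real database connection.
  def new_conn
    @created += 1
    Object.new
  end
end

pool = TinyPool.new
pool.hold do |outer|
  pool.hold { |inner| puts outer.equal?(inner) }
end
```

The nested call sees the same object as the outer call, and only the outermost call checks the connection back in.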

pool_type()
    # File lib/sequel/connection_pool/threaded.rb
    def pool_type
      :threaded
    end

size()

The total number of connections opened, either available or allocated. The calling code should NOT already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def size
      @mutex.synchronize{_size}
    end

Private Instance Methods

_size()

The total number of connections opened, either available or allocated. The calling code should already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def _size
      @allocated.length + @available_connections.length
    end

acquire(thread)

Assigns a connection to the supplied thread, if one is available. The calling code should NOT already have the mutex when calling this.

This should return a connection if one is available within the timeout, or raise PoolTimeout if a connection could not be acquired within the timeout.

    # File lib/sequel/connection_pool/threaded.rb
    def acquire(thread)
      if conn = assign_connection(thread)
        return conn
      end

      timeout = @timeout
      timer = Sequel.start_timer

      if conn = acquire_available(thread, timeout)
        return conn
      end

      until conn = assign_connection(thread)
        elapsed = Sequel.elapsed_seconds_since(timer)
        # :nocov:
        raise_pool_timeout(elapsed) if elapsed > timeout

        # It's difficult to get to this point, it can only happen if there is a race condition
        # where a connection cannot be acquired even after the thread is signalled by the condition variable
        if conn = acquire_available(thread, timeout - elapsed)
          return conn
        end
        # :nocov:
      end

      conn
    end

acquire_available(thread, timeout)

Acquire a connection if one is already available, or wait until one becomes available.

    # File lib/sequel/connection_pool/threaded.rb
    def acquire_available(thread, timeout)
      sync do
        # Check if connection was checked in between when assign_connection failed and now.
        # This is very unlikely, but necessary to prevent a situation where the waiter
        # will wait for a connection even though one has already been checked in.
        # :nocov:
        if conn = next_available
          return(@allocated[thread] = conn)
        end
        # :nocov:

        @waiter.wait(@mutex, timeout)

        # Connection still not available, could be because a connection was disconnected,
        # may have to retry assign_connection to see if a new connection can be made.
        if conn = next_available
          return(@allocated[thread] = conn)
        end
      end
    end
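
The check, wait, check again pattern above relies on ConditionVariable#wait releasing the mutex while it sleeps and reacquiring it before returning. A minimal sketch of that pattern, using only the Ruby standard library, follows. WaitingQueue and its method names are hypothetical and stand in for the pool's internals.

```ruby
# WaitingQueue is a hypothetical model of waiting for a checked-in item
# with a timeout. It is not Sequel's code.
class WaitingQueue
  def initialize
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @items = []
  end

  # Add an item and wake one waiting thread, if any.
  def checkin(item)
    @mutex.synchronize do
      @items << item
      @waiter.signal
    end
  end

  # Return an item, or nil if none arrived within +timeout+ seconds.
  def checkout(timeout)
    @mutex.synchronize do
      # First check: an item may already be there, so no wait is needed.
      item = @items.shift
      return item if item

      # wait releases @mutex while sleeping and reacquires it on wakeup.
      @waiter.wait(@mutex, timeout)

      # Second check: nil here means the wait timed out or the item was taken.
      @items.shift
    end
  end
end

queue = WaitingQueue.new
producer = Thread.new { sleep 0.05; queue.checkin(:conn) }
puts queue.checkout(2).inspect
producer.join
```

As in the method above, the sketch does not loop after waking up. A nil result is left for the caller to handle, which is the role the retry loop in acquire plays.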

assign_connection(thread)

Assign a connection to the thread, or return nil if one cannot be assigned. The caller should NOT have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def assign_connection(thread)
      # Thread safe as instance variable is only assigned to local variable
      # and not operated on outside mutex.
      allocated = @allocated
      do_make_new = false
      to_disconnect = nil

      sync do
        if conn = next_available
          return(allocated[thread] = conn)
        end

        if (n = _size) >= (max = @max_size)
          allocated.keys.each do |t|
            unless t.alive?
              (to_disconnect ||= []) << allocated.delete(t)
            end
          end
          n = nil
        end

        if (n || _size) < max
          do_make_new = allocated[thread] = true
        end
      end

      if to_disconnect
        to_disconnect.each{|dconn| disconnect_connection(dconn)}
      end

      # Connect to the database outside of the connection pool mutex,
      # as that can take a long time and the connection pool mutex
      # shouldn't be locked while the connection takes place.
      if do_make_new
        begin
          conn = make_new(:default)
          sync{allocated[thread] = conn}
        ensure
          unless conn
            sync{allocated.delete(thread)}
          end
        end
      end

      conn
    end
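
When the pool is full, the method above reclaims entries whose owning thread has died before deciding whether a new connection may be made. That reclamation step can be sketched on its own. reap_dead is a hypothetical helper, and the symbols stand in for real connections.

```ruby
# Hypothetical helper (not part of Sequel): delete entries owned by dead
# threads from a thread => connection hash and return their connections.
def reap_dead(allocated)
  reclaimed = []
  # Iterate over a copy of the keys so deleting during the loop is safe.
  allocated.keys.each do |t|
    reclaimed << allocated.delete(t) unless t.alive?
  end
  reclaimed
end

finished = Thread.new {}
finished.join # after join the thread is no longer alive

allocated = {}.compare_by_identity
allocated[Thread.current] = :conn_a
allocated[finished] = :conn_b

puts reap_dead(allocated).inspect
puts allocated.size
```

In the real method the reclaimed connections are disconnected outside the mutex, which frees room under the maximum for a new connection.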

checkin_connection(conn)

Return a connection to the pool of available connections. Returns the connection. The calling code should already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def checkin_connection(conn)
      @available_connections << conn
      conn
    end

next_available()

Return the next available connection in the pool, or nil if there is not currently an available connection. The calling code should already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def next_available
      case @connection_handling
      when :stack
        @available_connections.pop
      else
        @available_connections.shift
      end
    end
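
The difference between the two branches is only which end of the array is used. Since connections are appended on check-in, pop returns the most recently checked-in connection and shift returns the least recently checked-in one. A small sketch, with take_next as a hypothetical stand-in for the method above:

```ruby
# Hypothetical helper (not part of Sequel) that selects from an array the
# same way as the :stack and default branches.
def take_next(available, connection_handling = nil)
  case connection_handling
  when :stack
    available.pop   # last in, first out: most recently checked in
  else
    available.shift # first in, first out: least recently checked in
  end
end

puts take_next([:a, :b, :c]).inspect
puts take_next([:a, :b, :c], :stack).inspect
```

With :stack, a lightly loaded pool keeps reusing the same few connections and leaves the others idle. The default rotates through all of them.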

owned_connection(thread)

Returns the connection owned by the supplied thread, if any. The calling code should NOT already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def owned_connection(thread)
      sync{@allocated[thread]}
    end

preconnect(concurrent = false)

Create the maximum number of connections immediately. The calling code should NOT have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def preconnect(concurrent = false)
      enum = (max_size - _size).times

      conns = if concurrent
        enum.map{Thread.new{make_new(:default)}}.map(&:value)
      else
        enum.map{make_new(:default)}
      end

      sync{conns.each{|conn| checkin_connection(conn)}}
    end
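
The concurrent branch starts one thread per missing connection and then collects each thread's result with Thread#value, which joins the thread and returns the value of its block. A minimal sketch of the same shape follows. build_all and its block argument are hypothetical, with the block standing in for make_new.

```ruby
# Hypothetical helper (not part of Sequel): build +count+ objects either
# sequentially or with one thread per object.
def build_all(count, concurrent = false, &maker)
  enum = count.times
  if concurrent
    # Start every thread first, then wait for each result in order.
    enum.map { Thread.new { maker.call } }.map(&:value)
  else
    enum.map { maker.call }
  end
end

conns = build_all(3, true) { Object.new }
puts conns.length
```

Starting all threads before collecting any values is what lets the connection attempts overlap. Calling value inside the first map would serialize them again.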

raise_pool_timeout(elapsed)

Raise a PoolTimeout error showing the current timeout, the elapsed time, and the database’s name (if any).

    # File lib/sequel/connection_pool/threaded.rb
    def raise_pool_timeout(elapsed)
      name = db.opts[:name]
      raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
    end
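
The message format can be reproduced without the pool, which may help when matching these errors in logs. pool_timeout_message is a hypothetical helper that reuses the same interpolation, and the numbers below are made-up sample values.

```ruby
# Hypothetical helper (not part of Sequel) that builds the same message
# string. The database name suffix is added only when a name is given.
def pool_timeout_message(timeout, elapsed, name = nil)
  "timeout: #{timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end

puts pool_timeout_message(5.0, 5.25)
puts pool_timeout_message(5.0, 5.25, 'main')
```

When no name is set, the trailing modifier-if evaluates to nil, which interpolates as an empty string.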

release(thread)

Releases the connection assigned to the supplied thread back to the pool. The calling code should already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def release(thread)
      conn = @allocated.delete(thread)

      unless @connection_handling == :disconnect
        checkin_connection(conn)
      end

      @waiter.signal

      # Ensure that after signalling the condition, some other thread is given the
      # opportunity to acquire the mutex.
      # See <https://github.com/socketry/async/issues/99> for more context.
      sleep(0)

      nil
    end

sync() { || ... }

Yield to the block while inside the mutex. The calling code should NOT already have the mutex before calling this.

    # File lib/sequel/connection_pool/threaded.rb
    def sync
      @mutex.synchronize{yield}
    end