当前位置:首页>编程日记>正文

python redis链接建立实现分析

  今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:

redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an 
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:

 r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)r.xxxx()

有了ConnectionPool这个类之后,可以使用如下方法

pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
r = redis.Redis(connection_pool=pool)

这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:

class StrictRedis(object):
........def __init__(self, host='localhost', port=6379,db=0, password=None, socket_timeout=None,socket_connect_timeout=None,socket_keepalive=None, socket_keepalive_options=None,connection_pool=None, unix_socket_path=None,encoding='utf-8', encoding_errors='strict',charset=None, errors=None,decode_responses=False, retry_on_timeout=False,ssl=False, ssl_keyfile=None, ssl_certfile=None,ssl_cert_reqs=None, ssl_ca_certs=None):if not connection_pool:..........connection_pool = ConnectionPool(**kwargs)self.connection_pool = connection_pool

在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

   # COMMAND EXECUTION AND PROTOCOL PARSINGdef execute_command(self, *args, **options):"Execute a command and return a parsed response"pool = self.connection_poolcommand_name = args[0]connection = pool.get_connection(command_name, **options)  #调用ConnectionPool.get_connection方法获取一个连接try:connection.send_command(*args)  #命令执行,这里为Connection.send_commandreturn self.parse_response(connection, command_name, **options)except (ConnectionError, TimeoutError) as e:connection.disconnect()if not connection.retry_on_timeout and isinstance(e, TimeoutError):raiseconnection.send_command(*args)  return self.parse_response(connection, command_name, **options)finally:pool.release(connection)  #调用ConnectionPool.release释放连接

在来看看ConnectionPool类:

     class ConnectionPool(object):  ...........def __init__(self, connection_class=Connection, max_connections=None,**connection_kwargs):   #类初始化时调用构造函数max_connections = max_connections or 2 ** 31if not isinstance(max_connections, (int, long)) or max_connections < 0:  #判断输入的max_connections是否合法raise ValueError('"max_connections" must be a positive integer')self.connection_class = connection_class  #设置对应的参数self.connection_kwargs = connection_kwargsself.max_connections = max_connectionsself.reset()  #初始化ConnectionPool 时的reset操作def reset(self):self.pid = os.getpid()self._created_connections = 0  #已经创建的连接的计数器self._available_connections = []   #声明一个空的数组,用来存放可用的连接self._in_use_connections = set()  #声明一个空的集合,用来存放已经在用的连接self._check_lock = threading.Lock()
.......def get_connection(self, command_name, *keys, **options):  #在连接池中获取连接的方法"Get a connection from the pool"self._checkpid()try:connection = self._available_connections.pop()  #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,会直接调用make_connection方法except IndexError:connection = self.make_connection()self._in_use_connections.add(connection)   #向代表正在使用的连接的集合中添加元素return connection   def make_connection(self): #在_available_connections数组为空时获取连接调用的方法"Create a new connection"if self._created_connections >= self.max_connections:   #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化raise ConnectionError("Too many connections")self._created_connections += 1   #把代表已经创建的连接的数值+1return self.connection_class(**self.connection_kwargs)     #返回有效的连接,默认为Connection(**self.connection_kwargs)def release(self, connection):  #释放连接,链接并没有断开,只是存在链接池中"Releases the connection back to the pool"self._checkpid()if connection.pid != self.pid:returnself._in_use_connections.remove(connection)   #从集合中删除元素self._available_connections.append(connection) #并添加到_available_connections 的数组中def disconnect(self): #断开所有连接池中的链接"Disconnects all connections in the pool"all_conns = chain(self._available_connections,self._in_use_connections)for connection in all_conns:connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:

class Connection(object):"Manages TCP communication to and from a Redis server"def __del__(self):   #对象删除时的操作,调用disconnect释放连接try:self.disconnect()except Exception:pass

核心的链接建立方法是通过socket模块实现:

    def _connect(self):err = Nonefor res in socket.getaddrinfo(self.host, self.port, 0,socket.SOCK_STREAM):family, socktype, proto, canonname, socket_address = ressock = Nonetry:sock = socket.socket(family, socktype, proto)# TCP_NODELAYsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)# TCP_KEEPALIVEif self.socket_keepalive:   #构造函数中默认 socket_keepalive=False,因此这里默认为短连接sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)for k, v in iteritems(self.socket_keepalive_options):sock.setsockopt(socket.SOL_TCP, k, v)# set the socket_connect_timeout before we connectsock.settimeout(self.socket_connect_timeout)  #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式# connectsock.connect(socket_address)# set the socket_timeout now that we're connectedsock.settimeout(self.socket_timeout)  #构造函数中默认socket_timeout=Nonereturn sockexcept socket.error as _:err = _if sock is not None:sock.close()
.....

关闭链接的方法:

    def disconnect(self):"Disconnects from the Redis server"self._parser.on_disconnect()if self._sock is None:returntry:self._sock.shutdown(socket.SHUT_RDWR)  #先shutdown再closeself._sock.close()except socket.error:passself._sock = None

        
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。


http://www.coolblog.cn/news/d044f2c489f05bc0.html

相关文章:

  • asp多表查询并显示_SpringBoot系列(五):SpringBoot整合Mybatis实现多表关联查询
  • s7day2学习记录
  • 【求锤得锤的故事】Redis锁从面试连环炮聊到神仙打架。
  • 矿Spring入门Demo
  • 拼音怎么写_老师:不会写的字用圈代替,看到孩子试卷,网友:人才
  • Linux 实时流量监测(iptraf中文图解)
  • Win10 + Python + GPU版MXNet + VS2015 + RTools + R配置
  • 美颜
  • shell访问php文件夹,Shell获取某目录下所有文件夹的名称
  • 如何优雅的实现 Spring Boot 接口参数加密解密?
  • LeCun亲授的深度学习入门课:从飞行器的发明到卷积神经网络
  • Mac原生Terminal快速登录ssh
  • java受保护的数据与_Javascript类定义语法,私有成员、受保护成员、静态成员等介绍...
  • mysql commit 机制_1024MySQL事物提交机制
  • 支撑微博千亿调用的轻量级RPC框架:Motan
  • jquery 使用小技巧
  • 2019-9
  • 法拉利虚拟学院2010 服务器,法拉利虚拟学院2010
  • vscode pylint 错误_将实际未错误的py库添加到pylint白名单
  • 科学计算工具NumPy(3):ndarray的元素处理
  • 工程师在工作电脑存 64G 不雅文件,被公司开除后索赔 41 万,结果…
  • linux批量创建用户和密码
  • newinsets用法java_Java XYPlot.setInsets方法代碼示例
  • js常用阻止冒泡事件
  • 气泡图在开源监控工具中的应用效果
  • 各类型土地利用图例_划重点!国土空间总体规划——土地利用
  • php 启动服务器监听
  • dubbo简单示例
  • 【设计模式】 模式PK:策略模式VS状态模式
  • [iptables]Redhat 7.2下使用iptables实现NAT
  • Ubuntu13.10:[3]如何开启SSH SERVER服务
  • CSS小技巧——CSS滚动条美化
  • JS实现-页面数据无限加载
  • 阿里巴巴分布式服务框架 Dubbo
  • 最新DOS大全
  • Django View(视图系统)
  • 阿里大鱼.net core 发送短信
  • 程序员入错行怎么办?
  • 两张超级大表join优化
  • 第九天函数
  • Linux软件安装-----apache安装
  • HDU 5988 最小费用流
  • Sorenson Capital:值得投资的 5 种 AI 技术
  • 《看透springmvc源码分析与实践》读书笔记一
  • 正式开课!如何学习相机模型与标定?(单目+双目+鱼眼+深度相机)
  • Arm芯片的新革命在缓缓上演
  • nagios自写插件—check_file
  • python3 错误 Max retries exceeded with url 解决方法
  • 行为模式之Template Method模式
  • 通过Spark进行ALS离线和Stream实时推荐