"""HTTPS transport adapter for ``requests`` that connects to a fixed IP.

The classes below form a chain -- adapter -> pool manager -> connection
pool -> connection -- whose only job is to carry a ``dest_ip`` value down
to the point where the TCP socket is opened.  When ``dest_ip`` is set, the
socket is opened to that address instead of to the host named in the URL;
``self.host`` on the connection is left untouched.
"""
try:
    from distutils.version import StrictVersion  # noqa: F401
except ImportError:
    # distutils was removed from the standard library in Python 3.12
    # (PEP 632).  StrictVersion is not referenced in the code shown here,
    # so keep the name defined and let the module import cleanly.
    StrictVersion = None
from socket import error as SocketError, timeout as SocketTimeout

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import (
    PoolManager,
    HTTPSConnectionPool,
)
from requests.packages.urllib3.exceptions import ConnectTimeoutError
from requests.packages.urllib3.util import connection
from requests.packages.urllib3.exceptions import NewConnectionError
from requests.packages.urllib3.connection import HTTPSConnection


class ForcedIPHTTPSAdapter(HTTPAdapter):
    """``HTTPAdapter`` whose pool manager forces connections to ``dest_ip``.

    Accepts the same arguments as ``HTTPAdapter`` plus one optional keyword:

    dest_ip -- IP address to open the socket to (default ``None``, meaning
               connect to the URL's own host).
    """

    def __init__(self, *args, **kwargs):
        # Strip our private keyword before the base class sees the kwargs.
        self.dest_ip = kwargs.pop('dest_ip', None)
        super(ForcedIPHTTPSAdapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **pool_kwargs):
        """Install a ``ForcedIPHTTPSPoolManager`` carrying ``dest_ip``."""
        pool_kwargs['dest_ip'] = self.dest_ip
        self.poolmanager = ForcedIPHTTPSPoolManager(*args, **pool_kwargs)


class ForcedIPHTTPSPoolManager(PoolManager):
    """``PoolManager`` that creates ``ForcedIPHTTPSConnectionPool`` pools."""

    def __init__(self, *args, **kwargs):
        # Strip our private keyword before the base class sees the kwargs.
        self.dest_ip = kwargs.pop('dest_ip', None)
        super(ForcedIPHTTPSPoolManager, self).__init__(*args, **kwargs)

    def _new_pool(self, scheme, host, port, request_context=None):
        """Return a new HTTPS pool for ``host``/``port`` bound to ``dest_ip``.

        Only the ``https`` scheme is supported; any other scheme trips the
        assertion.  ``request_context`` is accepted for signature
        compatibility and is not used.
        """
        assert scheme == 'https'
        # Work on a copy: ``dest_ip`` was deliberately removed from the
        # kwargs in __init__, and writing it into the shared
        # ``connection_pool_kw`` dict would put it back for every later
        # use of that dict.
        kwargs = dict(self.connection_pool_kw)
        kwargs['dest_ip'] = self.dest_ip
        return ForcedIPHTTPSConnectionPool(host, port, **kwargs)


class ForcedIPHTTPSConnectionPool(HTTPSConnectionPool):
    """``HTTPSConnectionPool`` that hands ``dest_ip`` to its connections."""

    def __init__(self, *args, **kwargs):
        # Strip our private keyword before the base class sees the kwargs.
        self.dest_ip = kwargs.pop('dest_ip', None)
        super(ForcedIPHTTPSConnectionPool, self).__init__(*args, **kwargs)

    def _new_conn(self):
        """Create, prepare and return a new ``ForcedIPHTTPSConnection``.

        When a proxy is configured on the pool, the connection is built
        with the proxy's host and port instead of the pool's own.
        """
        self.num_connections += 1

        actual_host = self.host
        actual_port = self.port
        if self.proxy is not None:
            actual_host = self.proxy.host
            actual_port = self.proxy.port

        # Make sure the attribute exists even if the base class did not
        # set it, then add ``dest_ip`` to a per-call copy so the pool's own
        # dict is not modified on every new connection.
        self.conn_kw = getattr(self, 'conn_kw', {})
        conn_kw = dict(self.conn_kw)
        conn_kw['dest_ip'] = self.dest_ip

        conn = ForcedIPHTTPSConnection(
            host=actual_host, port=actual_port,
            timeout=self.timeout.connect_timeout,
            strict=self.strict, **conn_kw)
        return self._prepare_conn(conn)

    def __str__(self):
        return '%s(host=%r, port=%r, dest_ip=%s)' % (
            type(self).__name__, self.host, self.port, self.dest_ip)


class ForcedIPHTTPSConnection(HTTPSConnection, object):
    """``HTTPSConnection`` that opens its socket to ``dest_ip`` when set."""

    def __init__(self, *args, **kwargs):
        # Strip our private keyword before the base class sees the kwargs.
        self.dest_ip = kwargs.pop('dest_ip', None)
        super(ForcedIPHTTPSConnection, self).__init__(*args, **kwargs)

    def _new_conn(self):
        """Open and return the socket for this connection.

        The socket is opened to ``(dest_ip, port)`` when ``dest_ip`` is
        truthy, otherwise to ``(host, port)``.

        Raises:
            ConnectTimeoutError: the connection attempt timed out.
            NewConnectionError: any other socket error while connecting.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw['source_address'] = self.source_address

        # Not every base-class version defines socket_options, hence getattr.
        if getattr(self, 'socket_options', None):
            extra_kw['socket_options'] = self.socket_options

        dest_host = self.dest_ip if self.dest_ip else self.host

        try:
            conn = connection.create_connection(
                (dest_host, self.port), self.timeout, **extra_kw)
        except SocketTimeout:
            raise ConnectTimeoutError(
                self, "Connection to %s timed out. (connect timeout=%s)" %
                (self.host, self.timeout))
        except SocketError as e:
            raise NewConnectionError(
                self, "Failed to establish a new connection: %s" % e)

        return conn


# The script above comes from: https://github.com/Roadmaster/forcediphttpsadapter/blob/master/w/adapters.py

# Demo: run a SPARQL query against query.wikidata.org while forcing the
# TCP connection to a specific IP address.
session = requests.Session()
session.mount("https://query.wikidata.org",
              ForcedIPHTTPSAdapter(dest_ip='198.35.26.96'))
# NOTE(review): verify=False turns off TLS certificate verification for
# this request -- confirm that is intended before reusing this snippet.
response = session.get(
    'https://query.wikidata.org/sparql?query=SELECT%20%0A%20%20%3Fitem%20%3Frelative%20%3FitemLabel%20%20%3FrelativeLabel%20%0A%7B%0A%20%20%20%20%3Fitem%20wdt%3AP40%20%3Frelative%20.%0A%20%20%20%20%3Fitem%20wdt%3AP31%20wd%3AQ5%20%3B%20wdt%3AP21%20wd%3AQ6581097.%0A%20%20%20%20%3Frelative%20wdt%3AP31%20wd%3AQ5%20.%0A%20%20%20%20MINUS%20%7B%20%3Frelative%20wdt%3AP22%20%3Fitem%20%7D%0A%7D',
    headers={'Host': 'query.wikidata.org'}, verify=False)
print(response.status_code)
print(response.text)