From fda6479f534025057f37c15395b20b20ae6fbfda Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Tue, 5 Aug 2025 11:59:30 +0530 Subject: [PATCH 1/2] FEAT: Adding Cursor.connection attribute --- mssql_python/cursor.py | 22 +++- tests/test_004_cursor.py | 245 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 259 insertions(+), 8 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index c5daaafe1..66aae3928 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -54,7 +54,7 @@ def __init__(self, connection) -> None: Args: connection: Database connection object. """ - self.connection = connection + self._connection = connection # Store as private attribute # self.connection.autocommit = False self.hstmt = None self._initialize_cursor() @@ -426,7 +426,7 @@ def _allocate_statement_handle(self): """ Allocate the DDBC statement handle. """ - self.hstmt = self.connection._conn.alloc_statement_handle() + self.hstmt = self._connection._conn.alloc_statement_handle() def _reset_cursor(self) -> None: """ @@ -564,6 +564,24 @@ def rownumber(self): return self._rownumber + @property + def connection(self): + """ + DB-API 2.0 attribute: Connection object that created this cursor. + + This is a read-only reference to the Connection object that was used to create + this cursor. This attribute is useful for polymorphic code that needs access + to connection-level functionality. + + Returns: + Connection: The connection object that created this cursor. + + Note: + This attribute is read-only as specified by DB-API 2.0. Attempting to + assign to this attribute will raise an AttributeError. + """ + return self._connection + def _reset_rownumber(self): """Reset the rownumber tracking when starting a new result set.""" self._rownumber = 0 diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index e86b33679..bcee1372b 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -513,12 +513,8 @@ def test_longwvarchar(cursor, db_connection): expectedRows = 2 # fetchone test cursor.execute("SELECT longwvarchar_column FROM #pytest_longwvarchar_test") - rows = [] - for i in range(0, expectedRows): - rows.append(cursor.fetchone()) - assert cursor.fetchone() == None, "longwvarchar_column is expected to have only {} rows".format(expectedRows) - assert rows[0] == ["ABCDEFGHI"], "SQL_LONGWVARCHAR parsing failed for fetchone - row 0" - assert rows[1] == [None], "SQL_LONGWVARCHAR parsing failed for fetchone - row 1" + row = cursor.fetchone() + assert row[0] == "ABCDEFGHI", "SQL_LONGWVARCHAR parsing failed for fetchone" # fetchall test cursor.execute("SELECT longwvarchar_column FROM #pytest_longwvarchar_test") rows = cursor.fetchall() @@ -2045,3 +2041,240 @@ def test_close(db_connection): pytest.fail(f"Cursor close test failed: {e}") finally: cursor = db_connection.cursor() + +def test_cursor_connection_attribute_exists(cursor, db_connection): + """Test that cursor.connection attribute exists and returns the correct connection""" + assert hasattr(cursor, 'connection'), "Cursor should have connection attribute" + assert cursor.connection is db_connection, "Cursor.connection should return the same connection object used to create it" + assert id(cursor.connection) == id(db_connection), "Cursor.connection should be the exact same object reference" + +def test_cursor_connection_attribute_readonly(cursor, db_connection): + """Test that cursor.connection is read-only""" + # Test that we can read the connection + conn = cursor.connection + assert conn is db_connection, "Should be able to read cursor.connection" + + # Test that we cannot write to the connection attribute + try: + cursor.connection = None + pytest.fail("Should not be able to assign to cursor.connection (read-only attribute)") + except AttributeError as e: + assert "can't set attribute" in str(e).lower() or "has no setter" in str(e).lower(), "Should raise AttributeError for read-only property" + + # Verify the connection is still intact after failed assignment + assert cursor.connection is db_connection, "Connection should remain unchanged after failed assignment" + +def test_cursor_connection_multiple_cursors(db_connection): + """Test cursor.connection with multiple cursors from same connection""" + # Create multiple cursors from the same connection + cursor1 = db_connection.cursor() + cursor2 = db_connection.cursor() + cursor3 = db_connection.cursor() + + try: + # All cursors should reference the same connection + assert cursor1.connection is db_connection, "First cursor should reference the connection" + assert cursor2.connection is db_connection, "Second cursor should reference the connection" + assert cursor3.connection is db_connection, "Third cursor should reference the connection" + + # All cursors should reference the exact same connection object + assert cursor1.connection is cursor2.connection, "All cursors should reference the same connection" + assert cursor2.connection is cursor3.connection, "All cursors should reference the same connection" + assert cursor1.connection is cursor3.connection, "All cursors should reference the same connection" + + # Test that they can all be used independently + cursor1.execute("SELECT 1 as test1") + result1 = cursor1.fetchone() # Fetch immediately to free connection + + cursor2.execute("SELECT 2 as test2") + result2 = cursor2.fetchone() # Fetch immediately to free connection + + cursor3.execute("SELECT 3 as test3") + result3 = cursor3.fetchone() # Fetch immediately to free connection + + assert result1[0] == 1, "First cursor should work independently" + assert result2[0] == 2, "Second cursor should work independently" + assert result3[0] == 3, "Third cursor should work independently" + + finally: + # Clean up cursors + for cursor in [cursor1, cursor2, cursor3]: + try: + if not cursor.closed: + cursor.close() + except: + pass + +def test_cursor_connection_multi_connection_environment(conn_str): + """Test cursor.connection in multi-connection environment""" + from mssql_python import connect + + # Create multiple connections + conn1 = connect(conn_str) + conn2 = connect(conn_str) + + try: + # Create cursors from different connections + cursor1 = conn1.cursor() + cursor2 = conn2.cursor() + + # Each cursor should reference its own connection + assert cursor1.connection is conn1, "First cursor should reference first connection" + assert cursor2.connection is conn2, "Second cursor should reference second connection" + + # Cursors should reference different connections + assert cursor1.connection is not cursor2.connection, "Cursors from different connections should reference different connections" + + # Test polymorphic code example - function that works with any cursor + def get_connection_info(cursor): + """Example of polymorphic code using cursor.connection""" + return { + 'cursor_id': id(cursor), + 'connection_id': id(cursor.connection), + 'autocommit': cursor.connection.autocommit + } + + info1 = get_connection_info(cursor1) + info2 = get_connection_info(cursor2) + + assert info1['connection_id'] != info2['connection_id'], "Should have different connection IDs" + assert info1['cursor_id'] != info2['cursor_id'], "Should have different cursor IDs" + + # Test that both cursors work with their respective connections + cursor1.execute("SELECT 'conn1' as source") + cursor2.execute("SELECT 'conn2' as source") + + result1 = cursor1.fetchone() + result2 = cursor2.fetchone() + + assert result1[0] == 'conn1', "First cursor should work with first connection" + assert result2[0] == 'conn2', "Second cursor should work with second connection" + + finally: + # Clean up + try: + conn1.close() + except: + pass + try: + conn2.close() + except: + pass + +def test_cursor_connection_after_cursor_close(db_connection): + """Test that cursor.connection is still accessible after cursor is closed""" + cursor = db_connection.cursor() + + # Verify connection works before closing cursor + assert cursor.connection is db_connection, "Connection should be accessible before close" + + # Close the cursor + cursor.close() + assert cursor.closed, "Cursor should be marked as closed" + + # Connection should still be accessible even after cursor is closed + assert cursor.connection is db_connection, "Connection should still be accessible after cursor close" + + # Should still be the same object reference + assert id(cursor.connection) == id(db_connection), "Should still be the same connection object" + +def test_cursor_connection_after_connection_close(conn_str): + """Test cursor.connection behavior after connection is closed""" + from mssql_python import connect + + temp_conn = connect(conn_str) + cursor = temp_conn.cursor() + + # Verify connection works initially + assert cursor.connection is temp_conn, "Connection should be accessible initially" + + # Close the connection + temp_conn.close() + + # Connection reference should still exist (it's just a reference) + assert cursor.connection is temp_conn, "Connection reference should still exist" + + # But the connection itself should be closed + assert temp_conn._closed, "Connection should be marked as closed" + +def test_cursor_connection_polymorphic_code_examples(db_connection): + """Test real-world examples of polymorphic code using cursor.connection""" + + def execute_with_autocommit_control(cursor, sql, autocommit_mode=True): + """Example function that uses cursor.connection for autocommit control""" + original_autocommit = cursor.connection.autocommit + try: + cursor.connection.autocommit = autocommit_mode + cursor.execute(sql) + return cursor.fetchall() if sql.strip().upper().startswith('SELECT') else cursor.rowcount + finally: + cursor.connection.autocommit = original_autocommit + + def get_cursor_metadata(cursor): + """Example function that extracts metadata using cursor.connection""" + return { + 'connection_closed': getattr(cursor.connection, '_closed', False), + 'autocommit': cursor.connection.autocommit, + 'cursor_closed': cursor.closed, + 'has_exception_attributes': all(hasattr(cursor.connection, attr) for attr in + ['Error', 'InterfaceError', 'DatabaseError']) + } + + # Test the polymorphic functions + cursor = db_connection.cursor() + + try: + # Test autocommit control function + result = execute_with_autocommit_control(cursor, "SELECT 1 as test", autocommit_mode=True) + assert len(result) == 1, "Should return one row" + assert result[0][0] == 1, "Should return correct value" + + # Test metadata extraction function + metadata = get_cursor_metadata(cursor) + assert metadata['connection_closed'] == False, "Connection should not be closed" + assert isinstance(metadata['autocommit'], bool), "Autocommit should be boolean" + assert metadata['cursor_closed'] == False, "Cursor should not be closed" + + finally: + cursor.close() + +def test_cursor_connection_transaction_control(db_connection): + """Test using cursor.connection for transaction control""" + cursor = db_connection.cursor() + + try: + # Create a test table + cursor.execute("CREATE TABLE #test_cursor_connection (id INT, value VARCHAR(50))") + db_connection.commit() + + # Test transaction control through cursor.connection + original_autocommit = cursor.connection.autocommit + cursor.connection.autocommit = False + + try: + # Insert data in transaction + cursor.execute("INSERT INTO #test_cursor_connection VALUES (1, 'test')") + + # Verify data exists before commit + cursor.execute("SELECT COUNT(*) FROM #test_cursor_connection") + count = cursor.fetchone()[0] + assert count == 1, "Data should exist before commit" + + # Rollback the transaction using cursor.connection + cursor.connection.rollback() + + # Verify data was rolled back + cursor.execute("SELECT COUNT(*) FROM #test_cursor_connection") + count = cursor.fetchone()[0] + assert count == 0, "Data should be rolled back" + + finally: + cursor.connection.autocommit = original_autocommit + + finally: + try: + cursor.execute("DROP TABLE #test_cursor_connection") + db_connection.commit() + except: + pass + cursor.close() From 84428bfd6ceb1e8ed3598d3a9bfa63d71e5828ea Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Tue, 5 Aug 2025 12:02:16 +0530 Subject: [PATCH 2/2] FEAT: Adding Cursor.connection attribute --- tests/test_004_cursor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index bcee1372b..50c0e1312 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -513,8 +513,12 @@ def test_longwvarchar(cursor, db_connection): expectedRows = 2 # fetchone test cursor.execute("SELECT longwvarchar_column FROM #pytest_longwvarchar_test") - row = cursor.fetchone() - assert row[0] == "ABCDEFGHI", "SQL_LONGWVARCHAR parsing failed for fetchone" + rows = [] + for i in range(0, expectedRows): + rows.append(cursor.fetchone()) + assert cursor.fetchone() == None, "longwvarchar_column is expected to have only {} rows".format(expectedRows) + assert rows[0] == ["ABCDEFGHI"], "SQL_LONGWVARCHAR parsing failed for fetchone - row 0" + assert rows[1] == [None], "SQL_LONGWVARCHAR parsing failed for fetchone - row 1" # fetchall test cursor.execute("SELECT longwvarchar_column FROM #pytest_longwvarchar_test") rows = cursor.fetchall()