30. General use cases¶
Overview¶
Cookbook¶
Create a Session We covered SQLAlchemy session creation in the previous post and explained the concept of engines in the post before that. If you skipped those posts, don't. The below is copy+pasta courtesy:
"""Database engine & session creation.""" from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker
engine = create_engine( 'mysql+pymysql://user:password@host:3600/database', echo=True ) Session = sessionmaker(bind=engine) session = Session() database.py Basic Query Syntax Let's quickly become familiar with the basic structure of SQLAlchemy's query API. SQLAlchemy session objects have a query() method that accepts the raw class of a data model we've previously defined. Below are the humble beginnings of a query to run on Customer model; or in other words, a query on the customers SQL table:
"""Construct database queries from SQLAlchemy sessions.""" from .database import session from .models import Customer
Example structure of an ORM query¶
records = session .query(Customer) .FUNCTION() Select records from an SQLAlchemy session Calling .query(Customer) on our session isn't a valid query until we add one more method to the chain. All session queries end with a final method to shape/anticipate the result(s) of our query:
all() will return all records which match our query as a list of objects. If we were to use all on the query above, we would receive all customer records with the Python data type List[Customer]. first() returns the first record matching our query, despite how many records match the query (what constitutes "first" depends on how your table is sorted). This is the equivalent of adding LIMIT 1 to a SQL query. As a result, the Python type to be returned would be Customer. one() is extremely useful for cases where a maximum of one record should exist for the query we're executing (think of querying by primary key). This syntax is notably useful when verifying whether or not a record exists prior to creating one. scalar() returns a single value if one exists, None if no values exist, or raises an exception if multiple records are returned. get([VALUE(S)]) searches against a model's primary key to return rows where the primary key is equal to the value provided. get() also accepts tuples in the event that multiple foreign keys should be searched. Lastly, get() can also accept a dictionary and return rows where the columns (dictionary keys) match the values provided. To create more complex queries, we'd add to our query by chaining methods on our original query:
"""Construct database queries from SQLAlchemy sessions.""" from .database import session from .models import Customer
Example structure of an ORM query¶
records = session .query(Customer) .METHOD_1() .METHOD_2() .FUNCTION() Complex SELECT query Query Results If we execute a query that returns multiple records, we'll need to loop through them to see the results:
"""Construct database queries from SQLAlchemy sessions.""" from .database import session from .models import Customer
Fetch all customer records¶
records = session .query(Customer) .all()
Loop over records¶
for record in records: print(record) Fetch all customer records and print the output The SQLAlchemy ORM will return an instance of a class by default, which means the above will result in the following output:
records = session
.query(Customer)
.all()
for record in records:
pp.pprint(record.**dict**)
View query results as dictionaries
This instead returns dictionary objects for each row:
{ '_sa_instance_state':
The output of a session query as dictionaries
Of course, you could also create your own object instead to receive only the columns you want/need:
### Fetch all customers
records = session.query(Customer).all()
for record in records: recordObject = { 'name': record.name, 'position': record.position, 'team_name': record.team.name, 'team_city': record.team.city } print(recordObject)
{ 'email': 'kpaladini5i@senate.gov', 'first_name': 'Kenna', 'join_date': datetime.datetime(2019, 4, 19, 0, 0), 'last_name': 'Paladini', 'preferred_language': 'Bulgarian'} { 'email': 'rlebrun5j@narod.ru', 'first_name': 'Rriocard', 'join_date': datetime.datetime(2015, 6, 8, 0, 0), 'last_name': 'Le Brun', 'preferred_language': 'Khmer'},
The output of query deserialization
Filtering Results
Probably the most common method you'll use on a query is the filter() method. filter() is the equivalent of a SQL WHERE clause to return only rows that match the criteria we want:
### Fetch records where `first_name` is `Carl`
records = session
.query(Customer)
.filter(Customer)
.first_name == 'Carl')
.all()
Select all customers named Carl
filter_by()
We could write the above query using the filter_by() method instead like so:
Fetch records where first_name is Carl¶
records = session .query(Customer) .filter_by(first_name="Carl") .all() Filter with filter_by Unlike filter(), filter_by() accepts keyword arguments (note the difference in syntax here: filter() checks a conditional against a column object whereas filter_by() finds columns which match the arguments we pass). filter_by() can only search for exact values and serves as a kind of shorthand for simple filtering queries.
like() We can do more than filter on simple conditionals. SQLAlchemy has a like() method which works in an equivalent manner to SQL's LIKE:
records = session .query(Customer) .filter(Customer.first_name.like('J%')) .all()
Select customer records where first names begin with "J"
As expected, this will give us all rows where the customer's first name starts with a J:
{ 'email': 'jpugsley9@netvibes.com', 'first_name': 'Jarid', 'join_date': datetime.datetime(2017, 10, 11, 0, 0), 'last_name': 'Pugsley', 'preferred_language': 'Burmese'} { 'email': 'jdymockek@is.gd', 'first_name': 'Jeanna', 'join_date': datetime.datetime(2017, 11, 13, 0, 0), 'last_name': 'Dymocke', 'preferred_language': 'Malayalam'}
Output
High-level Query Methods
In addition to filter(), there are a few basic methods we should be familiar with. Each of these corresponds to SQL keywords you're probably familiar with:
limit([INTEGER]): Limits the number of rows to a maximum of the number provided.
order_by([COLUMN]): Sorts results by the provided column.
offset([INTEGER]): Begins the query at row n.
This next part involves executing JOIN queries between models, which requires us to define relationships on our models first. Things are a bit out of order at the moment, as I actually don't cover this until the next post. Sorry for the mess, I'm working on it!
Performing JOINs & UNIONs
We've touched on JOINs a bit previously, but we're about to kick it up a notch. We have two data models we're working with: one for customers, and one for orders. Each customer
import pprint from .models import Order, Customer
pp = pprint.PrettyPrinter(indent=4)
records = session .query(Customer) .join(Order, Order.customer_id == Customer.id) .all()
for record in records: record_object = { 'first_name': record.first_name, 'last_name': record.last_name, 'email': record.email, 'preferred_language': record.preferred_language, 'join_date': record.join_date, 'orders': [] } for order in record.order: order = { 'order_price': order.price, 'currency': order.currency, 'purchase_date': order.purchase_date, 'product': order.product } record_object['orders'].append(order) pp.pprint(record_object)
Join records from different tables and deserialize records
We perform our JOIN using the join() method. The first parameter we pass is the data model we'll be joining with on the "right." We then specify what we'll be joining "on": the customer_id column of our order model, and the id column of our customer model.
Our outer loop gives us each customer, and our inner loop adds each order to the appropriate customer. Check out an example record:
{ 'email': 'jtinline16@arizona.edu', 'first_name': 'Jerry', 'join_date': datetime.datetime(2016, 10, 27, 0, 0), 'last_name': 'Tinline', 'preferred_language': 'Icelandic', 'orders': [{'currency': 'IDR', 'order_price': 34.24, 'product': 'Beer - Corona', 'purchase_date': datetime.datetime(2019, 5, 5, 0, 0)}, {'currency': 'GEL', 'order_price': 25.75, 'product': 'Creamers - 10%', 'purchase_date': datetime.datetime(2019, 1, 27, 0, 0)}]}
Output
Our friend Jerry here has two orders: one for some Coronas, and another for creamers. Get at it, Jerry.
Outer JOINs
In addition to simple JOINs, we can perform outer JOINs using the same syntax:
from .models import ExampleModel1, ExampleModel2
### Execute an outer JOIN
records = session
.query(ExampleModel1)
.outerjoin(ExampleModel2)
.all()
Perform an outer join
Unions
We can perform UNIONs and UNION ALLs as well:
from .models import ExampleModel1, ExampleModel2
Execute a UNION¶
records = ExampleModel1.union(ExampleModel2) Perform a union To perform a union all, simply replace union() with union_all()!
Aggregate Functions and Stats As with all SQL-like query languages, we can perform some aggregate stats as well. The following are available to us:
count([COLUMN]): Counts the number of records in a column. count(distinct([COLUMN])): Counts the distinct number of records in a column. sum([COLUMN]): Adds the numerical values in a column. Here's how we'd perform a query that counts the values in a column:
from sqlalchemy import func
### Count number of records with a `first_name` value
records = session
.query(func.count(Customer.first_name))
.all()
for record in records:
print(record)
Aggregate queries
Which outputs:
(200,)
Output
This query can easily be modified to only count distinct values:
from sqlalchemy import func from sqlalchemy import distinct
Count number of DISTINCT first_name values¶
records = session .query(func.count(distinct(Customer.first_name))) .all()
for record in records: print(record) Query to aggregate distinct results Using Group_by() Of course, we can use the group_by() method on queries based on aggregates as well. group_by() works similarly to what we'd expect from SQL and Pandas:
### Execute a `GROUP BY` aggregation query
records = session
.query(func.count(Customer.first_name))
.group_by(Customer.first_name)
.all()
"Group by" aggregation
Mutations
We've spent an awful lot of time going over how to extract data from our database, but haven't talked about modifying our data yet! The last item on our agenda today is looking at how to add, remove, and change records using the SQLAlchemy ORM.
Inserting Rows
The first way we can add data is by using the add() method. add() expects an instance of a class (data model specifically) to be passed, and will create a new database row as a result:
from .database import session
from .models import Customer
### Inserting records via data models
customer = Customer(
first_name='Todd',
last_name='Birchard',
email='fake@example.com',
preferred_language='English',
join_date=datetime.now()
)
session.add(customer)
session.commit()
Insert records via ORM
An alternative way to add data is by using the insert() method. Unlike add(), insert() is called on an SQLAlchemy Table object and doesn't rely on receiving a data model. insert() is not part of the ORM:
Inserting records via SQLAlchemy Table objects¶
insert = [TABLE] .insert() .values( first_name='Todd', last_name='Jack Jones', email='fake@example.com', preferred_language='English', join_date=datetime.now() ) Insert records Updating Building on the syntax of insert(), we can drop in the update() method to change an existing record's values. We chain in the where() method to specify which rows should be updated:
### Updating records via SQLAlchemy `Table` objects
result = [TABLE]
.update()
.where([TABLE].c.name == 'Todd')
.values(email='newemail@example.com')
Update records
Deleting
On any query we execute, we can append the delete() method to delete all rows which are contained in that query (be careful!). The below deletes all records where the first_name column contains a value of "Carl":
Delete records where first_name is Carl¶
result = session .query(Customer) .filter(Customer.first_name == 'Carl') .delete() Delete records delete() accepts the synchronize_session parameter, which determines how deletions should be handled:
False won't perform the delete until the session is committed. 'fetch' selects all rows to be deleted and removes matched rows. 'evaluate' will evaluate the objects in the current session to determine which rows should be removed. Never Stop Exploring™ There's a lot we've left out for the sake of simplicity. There are plenty of cool methods left to explore, like the correlate() method, for instance. You're armed with enough to be dangerous in SQLAlchemy now, but I encourage anybody to look over the query documentation and find the cool things we didn't speak to in detail here.
I'm still working on throwing together the source code for this post in the Github repo below. The source for the previous chapters can be found there as well. In the meantime, I apologize for being a bit of a shit show:
hackersandslackers/sqlalchemy-tutorial 🧪🔬 Use SQLAlchemy to connect, query, and interact with relational databases. - hackersandslackers/sqlalchemy-tutorial
GitHub hackersandslackers
SQLALCHEMY PYTHON SQL SOFTWARE Previous Databases in Python Made Easy with SQLAlchemy Next Databases in Python Made Easy with SQLAlchemy Todd Birchard's' avatar Todd Birchard New York City Wasted youth as a Product Manager, enjoying life as a Lead Software Engineer. Happily retired from rapidly propping up doomed startups. Focused on creating meaningful work and relationships.
Monthly Newsletter Your name Your email address Sign Up Support us We started sharing these tutorials to help and inspire new scientists and engineers around the world. If Hackers and Slackers has been helpful to you, feel free to buy us a coffee to keep us going :).
Microsoft Office 64-Bit The Level-Up: What I've Learned in the Four Years Since My Last Post A journey through Power BI, PowerPivot, PowerQuery, XLOOKUP, and all the goodies of 64-bit Microsoft Office. Jan 30, 2023 10 mins Google Cloud architecture serving assets via a Load Balancer Serving Assets via CDN with Google Cloud Serve static content via a Google Cloud CDN to improve load times. Fine-tune your load balancer and caching to match your app"s needs. Apr 13, 2022 11 mins Featured image for "Async Python HTTP Requests with AIOHTTP & AIOFiles" tutorial. Async Python HTTP Requests with AIOHTTP & AIOFiles Handle hundreds of HTTP requests, disk writes, and other I/O-bound tasks with quintessential async Python libraries. Jan 18, 2022 14 mins Hackers and Slackers Community of hackers obsessed with data science, data engineering, and analysis. Openly pushing a pro-robot agenda.
NAVIGATION About Series Join Donate Sign in Subscribe SERIES' Data Analysis with Pandas Build Flask Apps Google Cloud Architecture Learning Apache Spark Mastering SQLAlchemy Welcome to SQL GraphQL Tutorials Working with MySQL Mapping Data with Mapbox Python Concurrency with Asyncio Getting Started with Django Web Scraping With Python AUTHORS Todd Birchard Matthew Alhonte Max Mileaf Ryan Rosado Graham Beckley David Aquino Paul Armstrong Dylan Castillo
©2023 Hackers and Slackers, All Rights Reserved.
https://hackersandslackers.com/database-queries-sqlalchemy-orm/
Convert RoW Statement to dict
Example
¶
First the setup for the example:
import datetime as dt from sqlalchemy import Column, Date, Integer, Text, create_engine, inspect from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base() Session = sessionmaker()
class User(Base): tablename = 'users'
id = Column(Integer, primary_key=True)
name = Column(Text, nullable=False)
birthday = Column(Date)
engine = create_engine('sqlite://') Base.metadata.create_all(bind=engine) Session.configure(bind=engine)
session = Session() session.add(User(name='Alice', birthday=dt.date(1990, 1, 1))) session.commit() If you're querying columns individually, the row is a KeyedTuple which has an _asdict method. The method name starts with a single underscore, to match the namedtuple API (it's not private!).
query = session.query(User.name, User.birthday) for row in query: print(row._asdict()) When using the ORM to retrieve objects, this is not available by default. The SQLAlchemy inspection system should be used.
def object_as_dict(obj): return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
query = session.query(User) for user in query: print(object_as_dict(user)
Instead of using declarative_base as above, you can create it from your own class:
from sqlalchemy.ext.declarative import as_declarative
@as_declarative() class Base: def _asdict(self): return {c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs}
https://riptutorial.com/sqlalchemy/example/6614/converting-a-query-result-to-dict