Pépin Rémi, Ensai, 2025
remi.pepin@insee.fr
from constructs import Construct
from cdktf import App, TerraformStack
from cdktf_cdktf_provider_aws.provider import AwsProvider
from cdktf_cdktf_provider_aws.s3_bucket import S3Bucket
class S3Stack(TerraformStack):
def __init__(self, scope: Construct, id: str):
super().__init__(scope, id)
AwsProvider(self, "AWS", region="us-east-1")
bucket = S3Bucket(
self, "s3_bucket",
bucket_prefix = "my-cdtf-test-bucket",
acl="private",
force_destroy=True
)
app = App()
S3Stack(app, "S3")
app.synth()
import boto3
# Create an S3 resource
s3 = boto3.resource('s3')
# Create a bucket
s3.create_bucket(Bucket='mybucket')
# Upload file
with open('/tmp/hello.txt', 'rb') as file:
s3.Object('mybucket', 'hello_s3.txt').put(Body=file)
# Download file
s3.Bucket('mybucket').download_file('hello_s3.txt', 'hello.txt')
# Delete file
s3.Object('mybucket', 'hello_s3.txt').delete()
No SQL : pas de schéma, pas de jointure : ne pas penser en model relationnel
from constructs import Construct
from cdktf import App, TerraformStack
from cdktf_cdktf_provider_aws.provider import AwsProvider
from cdktf_cdktf_provider_aws.dynamodb_table import DynamodbTable, DynamodbTableAttribute
class DynamoDBStack(TerraformStack):
def __init__(self, scope: Construct, id: str):
super().__init__(scope, id)
AwsProvider(self, "AWS", region="us-east-1")
bucket = DynamodbTable(
self, "DynamodDB-table",
name= "user_score",
hash_key="username",
range_key="lastename",
attribute=[
DynamodbTableAttribute(name="username",type="S" ),
DynamodbTableAttribute(name="lastename",type="S" )
],
billing_mode="PROVISIONED",
read_capacity=5,
write_capacity=5
)
app = App()
DynamoDBStack(app, "DynamoDBStack")
app.synth()
import boto3
# Get the service resource.
dynamodb = boto3.resource('dynamodb')
# Create the DynamoDB table.
table = dynamodb.create_table(
TableName='users',
KeySchema=[
{'AttributeName': 'username','KeyType': 'HASH'},
{'AttributeName': 'last_name','KeyType': 'RANGE'}
],
AttributeDefinitions=[
{'AttributeName': 'username','AttributeType': 'S'},
{'AttributeName': 'last_name','AttributeType': 'S'},
],
ProvisionedThroughput={'ReadCapacityUnits': 5,'WriteCapacityUnits': 5}
)
# Wait until the table exists.
table.meta.client.get_waiter('table_exists').wait(TableName='users')
import boto3
# Get the service resource.
dynamodb = boto3.resource('dynamodb')
# Get the table.
table = dynamodb.Table('users')
# Put item to the table.
table.put_item(
Item={
'username': 'janedoe',
'first_name': 'Jane',
'last_name': 'Doe',
'age': 25,
'account_type': 'standard_user',
}
)
# Delete item from the table.
table.delete_item(
Key={
'username': 'janedoe',
'last_name': 'Doe'
}
)
import boto3
# Get the service resource.
dynamodb = boto3.resource('dynamodb')
# Get the table.
table = dynamodb.Table('users')
# Batch writing item. Only one big query, cost less ans it's quicker
with table.batch_writer() as batch:
for i in range(50):
batch.put_item(
Item={
'account_type': 'anonymous',
'username': 'user' + str(i),
'first_name': 'unknown',
'last_name': 'unknown'
}
)
import boto3
# Get the service resource.
dynamodb = boto3.resource('dynamodb')
# Get the table.
table = dynamodb.Table('users')
# Get item from the table.
response = table.get_item(
Key={
'username': 'janedoe',
'last_name': 'Doe'
}
)
item = response['Item']
print(item)
import boto3
# Get the service resource.
dynamodb = boto3.resource('dynamodb')
# Get the table.
table = dynamodb.Table('users')
# Update item from the table.
table.update_item(
Key={
'username': 'janedoe',
'last_name': 'Doe'
},
UpdateExpression='SET age = :val1',
ExpressionAttributeValues={
':val1': 26
}
)
# SELECT * FROM table
response = table.scan()
# SELECT COUNT(*) FROM table
response = table.scan(
Select='COUNT'
)
# SELECT SongTitle, AlbumTitle FROM table WHERE Artist = 'No One You Know'
response = table.scan(
ExpressionAttributeNames={
'#AT': 'AlbumTitle',
'#ST': 'SongTitle',
},
ExpressionAttributeValues={
':a': 'No One You Know',
},
FilterExpression='Artist = :a',
ProjectionExpression='#ST, #AT',
)
Le système scan TOUTE la table et retourne que les valeurs qui respectent la condition
# SELECT * FROM table WHERE PK ='USER#johnsonscott' AND SK='#METADATA#johnsonscott'
user="johnsonscott"
response = table.get_item(
Key={
'PK': f"USER#{user}",
'SK' : f"#METADATA#{user}"
}
)
#SELECT * FROM table WHERE PK ='GAME#azertyazerty' AND SK LIKE 'USER#%'
game="azertyazerty"
response = table.query(
Select='ALL_ATTRIBUTES',
KeyConditionExpression="PK = :pk AND begins_with(SK, :sk)",
ExpressionAttributeValues={
":pk": f"GAME#{game}",
":sk": f"USER#"
}
)
Le système scan uniquement les valeurs qui respectent la condition car elle est placée sur la COMPOSITE PRIMARY KEY.
Objectif