4 - Cloud function

Écrivons une cloud function pour automatiser le chargement de la table des naissances à chaque fois qu’un fichier .csv dont le nom commence par "naissance" est déposé dans notre bucket.

Dans ce tutoriel nous utilisons le langage Python, mais d’autres langages sont supportés : Nodejs, Go, Java, .Net, Ruby, PHP. Si vous préférez expérimenter dans un de ces langages, c’est tout à fait possible, référez-vous pour celà à la documentation

Écriture de la fonction

Accédons au service des Cloud Functions au travers de la console GCP

✏ Cliquez sur le bouton CREATE FUNCTION au centre de l’écran

✏ Remplissez la première page du formulaire:

  • laissez l’environnement "1st gen"

  • donnez le nom suivant à la fonction: "<PREFIX>-load-naissances"

  • choisissez la région "europe-west1"

  • nous souhaitons que la fonction soit appelée à chaque fois qu’un fichier est déposé dans le bucket, donc choisissez le trigger "Cloud storage" avec le configuration suivante (pensez à sélectionner votre bucket)

cloudfunction create 1

✏ Modifiez les paramètres d’execution de la fonction:

Modifiez le compte de service d’execution afin d’utiliser le service account dédié à la fonction sa-workshop@{PROJECT}.iam.gserviceaccount.com

cloudfunction parameter

✏ Cliquez sur le bouton NEXT pour accéder à la suite de la définition de la fonction

✏ Choisissez le runtime Python 3.10 dans la liste déroulante (ou autre runtime si vous préférez utiliser un autre langage pour cet exercice)

cloudfunction create 2

Ici il s’agit de saisir le code source de la fonction.

Pensez à changer l’entry point de la fonction. L’entry point de la fonction est le nom de la fonction python qui sert de point d’entrée lors de l’appel de la cloud function.

Voyez le fichier requirements.txt sur la gauche qui vous permet de déclarer des librairies dont votre fonction a besoin

✏ Ajoutez la librairie cliente Python pour BigQuery afin de pouvoir facilement invoquer les services de BigQuery

google-cloud-bigquery==3.7.0

✏ Écrivez le code de la fonction de chargement

  • event["name"] contient le nom du fichier déposé

  • event["bucket"] contient le nom du bucket dans lequel le fichier a été déposé

  • load_table_from_uri est une fonction du client BigQuery qui permet de charger une table depuis un fichier stocké dans cloudstorage via son uri gs://…​

  • la tâche de chargement doit être bien configurée pour le fichier à charger : délimiteur, lignes d’entête, format, schéma

Vous pouvez suivre la documentation Google Cloud pour charger le contenu d’un fichier csv dans une table. Vous pouvez aussi utiliser et compléter le code Python suivant afin de réaliser l’exercice.

from google.cloud import bigquery

PROJECT = "workshop-data-platform"
PREFIX = "YOUR-PREFIX-HERE"
RAW_NAISSANCES_TABLE = f"{PROJECT}.{PREFIX}_ds_raw.tb_naissances"
LOCATION = "europe-west1"
FILE_NAISSANCES_PREFIX = "naissance"

def load_raw_file_incremental(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    print(f"Processing file: {event['name']}.")
    filename = event["name"]
    # if payload file match pattern
        # Load file
    print(f"Unrecognized file {filename}, nothing to do")

def load_naissances(source: str):
    """Load naissances raw file from cloud storage."""
    bq_client = bigquery.Client() # instantiate a bigquery client

    # write here the load job
    # The csv schema is provided below for convenience
    # schema=[
    #     bigquery.SchemaField("sexe", "STRING"),
    #     bigquery.SchemaField("prenom", "STRING"),
    #     bigquery.SchemaField("annee_naissance", "STRING"),
    #     bigquery.SchemaField("code_departement", "STRING"),
    #     bigquery.SchemaField("nombre", "STRING")
    # ]
    load_job.result()  # Waits for the job to complete.
    print("Naissances loaded")
⚠️ SPOILER ALERT: Solution d’implémentation en Python
from google.cloud import bigquery

PROJECT = "workshop-data-platform"
PREFIX = "YOUR-PREFIX-HERE"
RAW_NAISSANCES_TABLE = f"{PROJECT}.{PREFIX}_ds_raw.tb_naissances"
LOCATION = "europe-west1"
FILE_NAISSANCES_PREFIX = "naissance"

def load_raw_file_incremental(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    print(f"Processing file: {event['name']}.")
    filename = event["name"]
    if not filename.endswith(".csv"):
        print(f"Skip received file {filename} as it is not a CSV")
        return
    if filename.startswith(FILE_NAISSANCES_PREFIX):
        load_naissances(f"gs://{event['bucket']}/{event['name']}")
        return
    print(f"Unrecognized file {filename}, nothing to do")

def load_naissances(source: str):
    """Load naissances raw file from cloud storage."""
    bq_client = bigquery.Client()

    load_job = bq_client.load_table_from_uri(
        source_uris=source,
        destination=RAW_NAISSANCES_TABLE,
        location=LOCATION,
        job_config=bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField("sexe", "STRING"),
                bigquery.SchemaField("prenom", "STRING"),
                bigquery.SchemaField("annee_naissance", "STRING"),
                bigquery.SchemaField("code_departement", "STRING"),
                bigquery.SchemaField("nombre", "STRING")
            ],
            source_format=bigquery.SourceFormat.CSV,
            field_delimiter=";",
            skip_leading_rows=1
        ),
    )
    load_job.result()  # Waits for the job to complete.
    print("Naissances loaded")
Vous pouvez aussi dupliquer la fonction load_naissances par un clic droit → copier la fonction.

✏ Cliquez sur DEPLOY

Le déploiement prend environ 5 minutes

Test de la fonction

✏ Supprimez la table tb_naissances préalablement créée dans votre dataset (Clic sur les 3 points à droite du nom de la table dans BigQuery puis Delete)

✏ Rechargez tous les fichiers naissances-part-[0-9]+.csv dans le bucket de dépôt en choisissant l’option Overwrite dans la boîte de dialogue

✏ Retournez dans BigQuery : la table tb_naissances a dû réapparaître dans votre dataset ds_raw. Vérifiez qu’elle contient bien 3,784,673 lignes

cloudfunction test

Si la table n’apparaît pas après avoir déposé le fichier dans le bucket, c’est sans doute qu’il y a eu un problème dans le traitement effectué par la fonction. Pour voir les logs de celle-ci, cliquez sur l’onglet LOGS après avoir sélectionné votre fonction. Attention, les logs peuvent mettre 2 à 3 minutes à apparaître après l’exécution.

Désormais, le chargement de nos données est effectué automatiquement dès qu’un fichier de naissances est déposé. Notre couche de chargement des données brutes est terminée, il nous faut maintenant nettoyer et mettre en forme ces données.