Handlers

Http

You can have an HTTP endpoint that is triggered by calling the cloudfunction directly. This is the simplest endpoint and will only deploy a cloudfunction. The function takes in a Flask request object.

from goblet import Goblet

app = Goblet(function_name='simple_http')

@app.http()
def http_entrypoint(request):
    return request.json

You can have multiple http endpoints that get triggered based on the request headers. Note that if multiple header filters match, only the first match will be triggered.

The following endpoint will be triggered on any request that has an "X-Github-Event" header

from goblet import jsonify

@app.http(headers={"X-Github-Event"})
def main(request):
    return jsonify(request.json)

The following endpoints will be triggered if the request contains an "X-Github-Event" header that matches the corresponding value

@app.http(headers={"X-Github-Event": "issue"})
def issue(request):
    return jsonify("triggered on issue")

@app.http(headers={"X-Github-Event": "pr"})
def pr(request):
    return jsonify("triggered on pr")
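As a hedged sketch, assuming the function is served locally on localhost:8080 (as in the eventarc testing example later on this page), you could exercise the header filters with curl:

curl -H "X-Github-Event: issue" -H "Content-Type: application/json" -d '{"action": "opened"}' localhost:8080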

Routes

The Goblet.route() method is used to construct the routes you want to create for your API. Behind the scenes, goblet will configure an API Config and API Gateway on GCP.

The concept of routes is the same mechanism used by Flask. You decorate a function with @app.route(...), and whenever a user requests that URL via the API Gateway url, the function you've decorated is called. For example, suppose you deployed this app:

from goblet import Goblet

app = Goblet(function_name='helloworld')

@app.route('/')
def index():
    return {'view': 'index'}

@app.route('/a')
def a():
    return {'view': 'a'}

@app.route('/b')
def b():
    return {'view': 'b'}

If you go to https://endpoint/, the index() function would be called. If you go to https://endpoint/a or https://endpoint/b, then the a() and b() functions would be called, respectively.

You can also create a route that captures part of the URL. This captured value will then be passed in as arguments to your view function:

@app.route('/users/{name}')
def users(name):
    return {'name': name}

If you then go to https://endpoint/users/james, the view function will be called as users('james'). The parameters are passed as keyword arguments based on the names as they appear in the URL. The argument names for the view function must match the names of the captured arguments:

@app.route('/a/{first}/b/{second}')
def users(first, second):
    return {'first': first, 'second': second}

By default, API Gateway has a timeout of 15 seconds. This can be overridden by setting "api_gateway": {"deadline": 45} in config.json, as sketched below. You can also configure a per-route deadline by passing in the deadline parameter in your route.
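For reference, a minimal config.json carrying that override might look like this:

{
    "api_gateway": {
        "deadline": 45
    }
}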

@app.route('/deadline', deadline=10)
def deadline():
    return 'custom_deadline'

By default, routes are deployed to an API Gateway. For the cloudrun backend you have the option to set routes_type="cloudrun" to simply use the cloudrun instance itself. The routes work the same as with an API Gateway, but you would access the api via the cloudrun url instead of the API Gateway url.

app = Goblet(function_name="cloudrun-routing", routes_type="cloudrun", backend="cloudrun")

Scheduled Jobs

To deploy scheduled jobs using a cron schedule, use the @app.schedule(...) decorator. The cron schedule follows the unix-cron format. More information on the cron format can be found in the GCP docs. Make sure Cloud Scheduler is enabled in your account if you want to deploy scheduled jobs.

Example usage:

@app.schedule('5 * * * *')
def scheduled_job():
    return app.jsonify("success")

You can pass in additional fields to your schedule to add custom headers, body, and method, using the fields defined for the Cloud Scheduler Job type.

@app.schedule('5 * * * *', headers={"x-cron": "5 * * * *"}, body="a base64-encoded string")
@app.schedule('6 * * * *', headers={"x-cron": "6 * * * *"}, body="another base64-encoded string")
@app.schedule('10 * * * *', httpMethod="POST")
def scheduled_job():
    app.current_request.body
    app.current_request.headers
    return app.jsonify("success")

Note that several of the customizable fields require specific formats. One of these is body, which must be a base64-encoded string. In order to use a JSON payload for the body you would need to encode it with the following code

base64.b64encode(json.dumps({"key":"value"}).encode('utf-8')).decode('ascii')

and then in your function you would decode the body using

json.loads(base64.b64decode(raw_payload).decode('utf-8'))
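Putting these together, here is a minimal sketch of a schedule that posts a JSON body and decodes it in the handler (the handler name is hypothetical):

import base64
import json

# encode the JSON payload into the base64 string the schedule body expects
payload = base64.b64encode(json.dumps({"key": "value"}).encode('utf-8')).decode('ascii')

@app.schedule('5 * * * *', httpMethod="POST", body=payload)
def json_scheduled_job():
    # decode the request body back into a dict, as described above
    data = json.loads(base64.b64decode(app.current_request.body).decode('utf-8'))
    return app.jsonify(data)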

Another unique field is attemptDeadline, which requires a duration format such as "3.5s".
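A hedged example of passing attemptDeadline (the handler name is hypothetical):

@app.schedule('5 * * * *', attemptDeadline="30.5s")
def deadline_job():
    return app.jsonify("success")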

To test your scheduled jobs locally, you will need to pass an X-Goblet-Type header with the value schedule and an X-Goblet-Name header with the name of your scheduled function.

For example:

"X-Goblet-Type": "schedule",
"X-Goblet-Name": FUNCTION_NAME

PubSub

You can trigger endpoints from pubsub using the @app.pubsub_subscription(...) decorator. All that is required is the topic name. You can optionally provide an attributes dictionary, in which case the function will only be triggered if the pubsub message attributes match those defined in the decorator. If you are using the cloudrun backend, or set use_subscription=True, the attributes will be created as a filter on the subscription itself. You can also pass in a custom filter. Note that filters cannot be modified once they are applied to a subscription.

In order to use a DLQ, pass dlq=True to the decorator, which will create a subscription with a dead letter topic. You can customize the dead letter topic configuration and the pull subscription to that DLQ by passing dlq_topic_config={"name": "custom-dlq-name", "dlq_topic_config": {"name": "custom-pull-subscription-name", "config": {}}} to the decorator.

In addition to filters, you can also add configuration values that will be passed directly to the subscription. By setting config={"enableExactlyOnceDelivery": True} you can enable exactly-once delivery to ensure messages are not redelivered once acknowledged. For additional information on the configuration values available, see PubSub Subscription Fields.

Example usage:

# pubsub topic
@app.pubsub_subscription('test')
def topic(data):
    app.log.info(data)
    return

# pubsub topic with matching message attributes
@app.pubsub_subscription('test', attributes={'key': 'value'})
def home2(data):
    app.log.info(data)
    return

# pubsub topic in a different project
@app.pubsub_subscription('test', project="CROSS_PROJECT")
def cross_project(data):
    return

# create a pubsub subscription instead of pubsub triggered function
@app.pubsub_subscription('test', use_subscription=True)
def pubsub_subscription_use_subscription(data):
    return

# create a pubsub subscription instead of pubsub triggered function and add filter
@app.pubsub_subscription('test', use_subscription=True, filter='attributes.name = "com" AND -attributes:"iana.org/language_tag"')
def pubsub_subscription_filter(data):
    return

# switching the pubsub topic to a different project requires force_update, since it requires the subscription to be recreated
@app.pubsub_subscription('test', project="NEW_CROSS_PROJECT", force_update=True)
def cross_project(data):
    return
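As a hedged sketch combining the DLQ and subscription-config options described above (the handler name is hypothetical):

# pubsub subscription with a dead letter topic and exactly-once delivery
@app.pubsub_subscription('test', dlq=True, config={"enableExactlyOnceDelivery": True})
def pubsub_dlq(data):
    return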

To test a pubsub topic locally, you will need to include the subscription in the payload, as well as a base64-encoded string for the body.

{
    "subscription": "TOPIC_NAME",
    "body": base64.b64encode(json.dumps({"key":"value"}).encode())
}
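A runnable sketch of this local test, assuming the app is served on localhost:8080 and using the requests library (note the .decode() so the payload is JSON-serializable):

import base64
import json

import requests

payload = {
    "subscription": "TOPIC_NAME",
    "body": base64.b64encode(json.dumps({"key": "value"}).encode()).decode(),
}
requests.post("http://localhost:8080", json=payload)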

Storage

You can trigger functions from storage events using the @app.storage(BUCKET, EVENT) decorator. It is required to pass in the bucket name and the event_type. The following events are supported by GCP:

  • finalize

  • delete

  • archive

  • metadataUpdate

Example usage:

@app.storage('BUCKET_NAME', 'finalize')
def storage(event):
    app.log.info(event)

To trigger a function on multiple events or multiple buckets you can specify multiple decorators.

@app.storage('BUCKET_NAME', 'archive')
@app.storage('BUCKET_NAME', 'delete')
@app.storage('BUCKET_NAME2', 'finalize')
def storage(event):
    app.log.info(event)

Eventarc

You can trigger functions from eventarc events using the @app.eventarc(topic=None, event_filters=[]) decorator. Specifying a topic will create a trigger on a custom pubsub topic. For all other events, specify the event attribute and event value in the event_filters list. See Creating Triggers for more information on possible values.

Example usage:

# Example eventarc pubsub topic
@app.eventarc(topic="test")
def pubsub(data):
    app.log.info("pubsub")
    return


# Example eventarc direct event
@app.eventarc(
    event_filters=[
        {"attribute": "type", "value": "google.cloud.storage.object.v1.finalized"},
        {"attribute": "bucket", "value": "BUCKET"},
    ],
    region="us-east1",
)
def bucket(data):
    app.log.info("bucket_post")
    return


# Example eventarc audit log
@app.eventarc(
    event_filters=[
        {"attribute": "type", "value": "google.cloud.audit.log.v1.written"},
        {"attribute": "methodName", "value": "storage.objects.get"},
        {"attribute": "serviceName", "value": "storage.googleapis.com"},
    ],
    region="us-central1",
)
def bucket_get(data):
    app.log.info("bucket_get")
    return

To test an eventarc event locally, you will need to add Ce-Type and Ce-Source headers

curl -H "Ce-Type: google.cloud.pubsub.topic.v1.messagePublished" -H "Ce-Source: //pubsub.googleapis.com/projects/goblet/topics/test" localhost:8080

Jobs

You can create and trigger cloudrun jobs using the @app.job(...) decorator. If you would like to trigger multiple tasks in one job execution, you can specify multiple decorators, each with a different task_id. Any custom job configuration, such as a schedule, should be added to the task with task_id=0. Jobs can be further configured by setting various configs in config.json.

The available job_spec options can be found at Cloudrun Jobs Spec.

The available job_container options can be found at Cloudrun Jobs Container.
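As a loudly hedged sketch (the inner field names are assumptions drawn from the Cloud Run Jobs API, not verified against goblet; consult the linked specs and the example config.json below), these configs might take a shape like:

{
    "job_spec": {
        "taskCount": 2
    },
    "job_container": {
        "env": [{"name": "KEY", "value": "VALUE"}]
    }
}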

You can schedule executions by passing in a cron schedule to the first task. Each job task function takes in the task id.

To test a job locally you can run goblet job run APP_NAME-JOB_NAME TASK_ID

Example usage:

@app.job("job1", schedule="* * * * *")
def job1_task1(id):
    app.log.info(f"job...{id}")
    return "200"

@app.job("job1", task_id=1)
def job1_task2(id):
    app.log.info(f"different task for job...{id}")
    return "200"

@app.job("job2")
def job2(id):
    app.log.info(f"another job...{id}")
    return "200"

See the example config.json

BigQuery Remote Functions

To deploy BigQuery remote functions, use the @app.bqremotefunction(...) decorator. BigQuery remote functions documentation can be found here.

Example usage:

@app.bqremotefunction(dataset_id=...)
def my_remote_function(x: str, y: str) -> str:
    return f"input parameters are {x} and {y}"

The allowed data types can be found in data types for remote functions. The routine name on BigQuery will be <goblet_function_name>_<remotefunction_name>.

As an example:

from goblet import Goblet, goblet_entrypoint

app = Goblet(function_name="my_functions")
goblet_entrypoint(app)

@app.bqremotefunction(dataset_id="my_dataset_id")
def my_routine(x: str, y: str) -> str:
    return f"Name: {x} LastName: {y}"

This creates a routine named my_functions_my_routine, which takes two strings named x and y as input parameters and returns a formatted string. The dataset id will be "my_dataset_id".

The dataset_id references a BigQuery dataset that has already been created in the same location and project in GCP.

To call a routine from BigQuery just use

select PROJECT.my_dataset_id.my_functions_my_routine(name, last_name) from my_dataset_id.table

This will apply my_functions_my_routine to every row in table my_dataset_id.table, passing the fields name and last_name from the table as inputs. The data types used in the BigQuery query must match the data types used in the python function definition. In the example above, the select will return one value per row in the table.

Another example:

from goblet import Goblet, goblet_entrypoint

app = Goblet(function_name="math_example")
goblet_entrypoint(app)
@app.bqremotefunction(dataset_id="blogs")
def multiply(x: int, y: int, z: int) -> int:
    w = x * y * z
    return w

Should be called this way:

select PROJECT.blogs.math_example_multiply(x,y,z) from blogs.table

And it will return, for every row in table blogs.table, an integer resulting from the multiplication of the three fields x, y, and z.

When deploying a BigQuery remote function, Goblet creates the following resources in GCP: a BigQuery connection, a BigQuery routine, and a cloudfunction or cloudrun service (depending on the backend parameter used when instantiating Goblet).

To test a bqremotefunction locally, you will need to add a userDefinedContext field to the body, with an X-Goblet-Name field in the format APP_NAME_FUNCTION_NAME. You pass the arguments to your function as a list in the calls field.

{
    "userDefinedContext": {
        "X-Goblet-Name": "bqremotefunction_test_function_test"
    },
    "calls": [[2, 2], [3, 3]],
}
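A runnable sketch of this local test, again assuming the app is served on localhost:8080 and using the requests library:

import requests

body = {
    "userDefinedContext": {
        "X-Goblet-Name": "bqremotefunction_test_function_test"
    },
    "calls": [[2, 2], [3, 3]],
}
requests.post("http://localhost:8080", json=body)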

CloudTask Target

You can handle http requests from a CloudTask by using the @app.cloudtasktarget(name="target") decorator. For Goblet to route a request to a function decorated with cloudtasktarget(name="target"), the request must include the following headers:

"User-Agent": "Google-Cloud-Tasks",
"X-Goblet-CloudTask-Target": "target"

Note

  • Goblet uses the User-Agent header to route the request to the CloudTaskTarget instance. The CloudTask queue adds this header.

  • The X-Goblet-CloudTask-Target header routes the request to the expected function.

The next example shows a function that would serve a request with the headers shown above.

from goblet import Goblet, goblet_entrypoint

app = Goblet(function_name="cloudtask_example")
goblet_entrypoint(app)

@app.cloudtasktarget(name="target")
def my_target_handler(request):
    ''' handle request '''
    return
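As a hedged sketch, assuming the app is served locally on localhost:8080, a request carrying these headers could be simulated with curl:

curl -H "User-Agent: Google-Cloud-Tasks" -H "X-Goblet-CloudTask-Target: target" localhost:8080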

The next example uses app.cloudtaskqueue to enqueue and handle tasks in the same instance.

from goblet import Goblet, goblet_entrypoint

app = Goblet(function_name="cloudtask_example")
goblet_entrypoint(app)

client = app.cloudtaskqueue("queue")
@app.cloudtasktarget(name="target")
def my_target_handler(request):
    ''' handle request '''
    return {}


@app.route("/enqueue", methods=["GET"])
def enqueue():
    payload = {"message": {"title": "enqueue"}}
    client.enqueue(target="target", payload=payload)
    return {}

Uptime Check

You can create uptime checks for backends that are public using the @app.uptime decorator. You can customize the check using all of the available fields found in the uptime documentation.

from goblet import Goblet, goblet_entrypoint

app = Goblet(function_name="uptime_example")
goblet_entrypoint(app)

@app.uptime(name="target")
def example_uptime():
    return "success"