To create the HttpSensor operator, we provide it with a task id, an HTTP connection id, and an endpoint. The task id identifies the task in the DAG, and the endpoint identifies the API to fetch.

with DAG('user_content_processing',
         schedule_interval='@daily',  # the interval value was lost in this excerpt; '@daily' is a placeholder
         default_args=default_args) as dag:

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

Now the third parameter, http_conn_id, requires a little explanation. Airflow provides a mechanism to create and store configurations that you can use across workflows. One such type is "Connections". Here you can set up various connections: AWS connections, ElasticSearch connections, Kubernetes cluster connections, etc. In our case, we use the HTTP connection option: we provide the URL we want to trigger and set the connection id to user_api. We want to call the URL, which will return a JSON response of some random user information.

    extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET'
    )

As you can see, we have the task_id to identify this task, the http_conn_id we spoke about before, the endpoint, and the method we want to execute.

Now you must be wondering what happens to the response. For this, let's look into another concept called Xcoms.

Xcoms

Xcoms are a way to share data between your tasks. An Xcom stores key-value pairs of the information you want to keep so that other tasks in the DAG can access them. The SimpleHttpOperator stores its response inside these Xcoms, hence we will use another task to retrieve the value and process it.

Once the data is fetched, we are going to create a processor, which will process the data that was pushed into Xcoms by the previous task. We will do this using a Python function, and hence the new operator we are going to use is the PythonOperator.

import json
from pandas import json_normalize  # imports assumed from earlier in the post

def _processing_user(ti):
    # Retrieve the raw response text that extracting_user pushed to Xcoms.
    users_txt = ti.xcom_pull(task_ids='extracting_user')
    users = json.loads(users_txt)
    if not len(users) or 'results' not in users:
        raise ValueError("User is empty")
    user = users['results'][0]
    user_map = {
        # the exact fields were lost in this excerpt; these are illustrative
        'firstname': user['name']['first'],
        'lastname': user['name']['last'],
        'email': user['email'],
    }
    processed_user = json_normalize(user_map)

So this function receives a Task Instance (referred to as ti). We use this task instance to get the information that our extracting_user task has pushed to Xcoms, using the xcom_pull call. We then validate the input and extract some fields as a dictionary, which json_normalize flattens into a pandas DataFrame.
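For orientation, the validation and field extraction above assume the API response is shaped roughly like this (a trimmed, illustrative sample, not the API's exact schema):

{
    "results": [
        {
            "name": {"first": "Jane", "last": "Doe"},
            "email": "jane.doe@example.com"
        }
    ]
}

If the payload is empty or the 'results' key is missing, the function raises a ValueError and the task fails, which is exactly what we want for a malformed response.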
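The excerpt stops before attaching _processing_user to the DAG. As a minimal sketch of the remaining wiring (the processing_user task id is my assumption, not shown in the original), inside the same with DAG(...) block:

from airflow.operators.python import PythonOperator  # import path may differ in Airflow 1.x

    processing_user = PythonOperator(
        task_id='processing_user',          # hypothetical task id
        python_callable=_processing_user    # the function defined above
    )

    # Run the availability check first, then fetch, then process.
    is_api_available >> extracting_user >> processing_user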
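And since Xcoms do the heavy lifting here, a self-contained sketch of the push/pull mechanics in isolation (the task ids, key, and value are invented for illustration):

def _push(ti):
    # Explicitly push a key-value pair to Xcom for downstream tasks.
    ti.xcom_push(key='greeting', value='hello')

def _pull(ti):
    # Pull the value that the 'push_task' task pushed.
    print(ti.xcom_pull(task_ids='push_task', key='greeting'))  # prints 'hello'

push_task = PythonOperator(task_id='push_task', python_callable=_push)
pull_task = PythonOperator(task_id='pull_task', python_callable=_pull)
push_task >> pull_task

By default, whatever a task returns is also pushed under the return_value key, and xcom_pull retrieves that key when none is given; that is how the SimpleHttpOperator response above travels between tasks without any explicit xcom_push.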