Uçtan Uca Apache Nifi Uygulaması

Burak Uğur
6 min readOct 8, 2023

--

Apache NiFi’nin güçlü veri akışı yönetimi yetenekleri sayesinde, örnek bir uygulama geliştirdim ve bu makalede sizlere bu etkileyici platformun nasıl kullanıldığını göstereceğim. Nifi hakkında daha detaylı bilgi almak isterseniz Apache Nifi Nedir? adlı makalemi okuyabilirsiniz.

Uygulama Mimarisi

Örnek uygulamada, sahte ve rastgele veri üreten REST endpoint’e Nifi ortamından istek atarak gelen verileri veri tabanına aktaracağız.

Kullanacağımız endpoint şu olacak;

https://random-data-api.com/api/v2/users

A) Kurulumlar

Tüm araçları docker üzerinden ayağa kaldırıyoruz.

1.Apache Nifi

docker run --name mynifi --network host -i -v ~/drivers/shared-directory:/opt/nifi/nifi-current/ls-target apache/nifi

Nifi üzerinden veritabanlarına erişim için driver dosyalarına ihtiyaç vardır. Bu sebeple volume vermeniz gerekir.
~/drivers/shared-directory:/opt/nifi/nifi-current/ls-target

2. PostgreSQL

docker run --name docker_postgres -e POSTGRES_PASSWORD=123456 -d -p 5432:5432 -v $HOME/docker/volumes/postgres:/var/lib/postgresql/data postgres

Table Oluşturma

CREATE TABLE IF NOT EXISTS public.users
(
id integer NOT NULL,
uid text COLLATE pg_catalog."default" NOT NULL,
password text COLLATE pg_catalog."default" NOT NULL,
first_name text COLLATE pg_catalog."default" NOT NULL,
last_name text COLLATE pg_catalog."default" NOT NULL,
username text COLLATE pg_catalog."default" NOT NULL,
email text COLLATE pg_catalog."default" NOT NULL,
avatar text COLLATE pg_catalog."default" NOT NULL,
gender text COLLATE pg_catalog."default" NOT NULL,
phone_number text COLLATE pg_catalog."default" NOT NULL,
social_insurance_number integer NOT NULL,
date_of_birth date NOT NULL,
employment text COLLATE pg_catalog."default",
)

CREATE TABLE IF NOT EXISTS public.subscriptions
(
plan text COLLATE pg_catalog."default" NOT NULL,
status text COLLATE pg_catalog."default" NOT NULL,
payment_method text COLLATE pg_catalog."default" NOT NULL,
term text COLLATE pg_catalog."default" NOT NULL,
)

CREATE TABLE IF NOT EXISTS public.creditcard
(
cc_number text COLLATE pg_catalog."default"
)

CREATE TABLE IF NOT EXISTS public.adress
(
city text COLLATE pg_catalog."default",
street_name text COLLATE pg_catalog."default",
street_address text COLLATE pg_catalog."default",
zip_code text COLLATE pg_catalog."default",
state text COLLATE pg_catalog."default",
country text COLLATE pg_catalog."default",
coordinates text COLLATE pg_catalog."default"
)

3.PostgreSQL Driver

Bu driver ile apache nifi postgresql ile haberleşebilecektir.

mkdir jar
cd jar
curl -O -L "https://jdbc.postgresql.org/download/postgresql-42.6.0.jar"

4. Superset

git clone https://github.com/apache/superset.git
cd superset
docker-compose -f docker-compose-non-dev.yml pull
docker-compose -f docker-compose-non-dev.yml up

Veri görselleştirme aracı olan Superset hakkında daha fazla bilgi almak ve kurulum yardım almak isterseniz aşağıdaki adrese başvurun.

B) Ortamlara Erişim

Nifi ortamına erişebilmek için öncelikle user ve password bilgilerine erişmek gereklidir. Bu bilgiler nifi ayağa kalkarken logların arasında bulunur.

Generated Username [USERNAME]
Generated Password [PASSWORD]

Daha sonra bu erişim bilgileri ile arayüze erişim sağlanır.

Ana Sayfa

Artık sol üstte bulunan processorleri sürükle bırak yaparak dataflow oluşturulabilir.

C) Dataflow Oluşturma

İlk olarak endpointe istek atacağız, daha sonra cevap olarak gelen veriyi uygun ETL sürecine sokarak veritabanına kaydedeceğiz.

A. Endpointe İstek Atmak

Endpoint’de istek atıp cevap döndürmek için InvokeHTTP processor’ünü kullanacağız.

InvokeHTTP Processor Config

Bu processor HTTP URL parametresine yazılan adrese HTTP Method’da seçili olan istek türünden istek atar. Örnek Json:

{
"id":1575,
"uid":"f4889144-613e-4e0b-9d79-ea3fa0d293b4",
"password":"PlmGU8pgE3",
"first_name":"Jay",
"last_name":"O'Kon",
"username":"jay.o'kon",
"email":"jay.o'kon@email.com",
"avatar":"https://robohash.org/suscipitutadipisci.png?size=300x300\u0026set=set1",
"gender":"Genderfluid",
"phone_number":"+370 (130) 836-6108 x9251",
"social_insurance_number":"122106982",
"date_of_birth":"1961-05-04",
"employment":{
"title":"International Hospitality Architect",
"key_skill":"Leadership"
},
"address":{
"city":"East Vina",
"street_name":"Davis Mall",
"street_address":"9214 Stacie Square",
"zip_code":"41334",
"state":"Pennsylvania",
"country":"United States",
"coordinates":{
"lat":-29.476027673621594,
"lng":21.52274723252242
}
},
"credit_card":{
"cc_number":"5216-5092-0649-6639"
},
"subscription":{
"plan":"Business",
"status":"Blocked",
"payment_method":"Cash",
"term":"Annual"
}
}

B. Json Parse İşlemleri

Bu aşamada ilk olarak gelen veriyi incelememiz gerekiyor. Gelen veriyi parse edebilmek için SplitRecord Processorünü kullanacağız. InvokeHTTP ile bu nodu birbirine bağlamanız gerekmetedir.

Split record, Record Reader & Record Writer parametrelerine sahiptir. Bunlar gelen verinin türüne göre değiştirilmelidir.

Desteklenen Veri Tipleri

SplitRecord işlemi sonucunda, key:value şeklinde gelen veri Json yapısına dönüştü.

Gelen veriyi incelediğimizde credit_card,subscription,adresss olarak childlara bölündüğünü görüyoruz. Veri tabanında buna benzer şekilde kaydedeceğiz.

Şimdi ise SplitJson ve EvulateJsonPath processorlerini kullanarak gelen veriyi childlarına böleceğiz.

SplitJson: Veri içerisinden child olanları ayırmak için kullanıyoruz.

JsonPath Expression: Veriyi nasıl pars edeceğimizi belirler.

  • “$” işareti gelen veriyi temsil eder.
  • “..” ise JSON belgesi içindeki tüm derinliklerde arama yapmayı ifade eder.

EvulateJsonPath: Childlar dışında kalan verileri almak için kullanıyoruz.

JsonPath Expression parametresine aşağıdaki değerleri giriyoruz.

$.[0][‘id’, ‘uid’, ‘password’, ‘first_name’, ‘last_name’, ‘username’, ‘email’, ‘avatar’, ‘gender’, ‘phone_number’, ‘social_insurance_number’, ‘date_of_birth’, ‘employment’]

Tüm childlara bölüm işlemi tamamlandıktan sonra dataflow şuna benzeyecek.

Child Parse İşlemi Sonucu Dataflow
User Data
Credit Card & Subscription
  • Parse işlemi sonucunda bazı verilerin Json tipinde olmadığını fark ettik. Bu sebeple tekrar splitrecord kullanarak json objesine çeviriyoruz.

C. Veritabanı İşlemleri

Hazırlanan Json tipindeki veri, PutDatabaseRecord processor’ü kullanılarak veri tabanına yazılır.

Record Reader: Gelen verinin hangi türde olduğunu belirler

Statement Type: Veritabanına hangi CRUD işleminin yapılacağını belirler.

Table Name: Verilerin hangi tabloya yazılacağını belirler.

Database Connection Pooling Service: veritabanı erişim bilgilerini içerir.

Controller Servisler

PostgreSQL bağlantı bilgileri için DBCPConnectionPool aktif edilmelidir.

Database Connection URL: jdbc:postgresql://{HOST_IP}:{HOST_PORT}/{DATABASE_NAME}

Database Driver Class Name: org.postgresql.Driver

Database Driver Location(s): /opt/nifi/nifi-current/ls-target

Her şey tamamlandı, artık dataflowu çalıştırabiliriz.

--

--