1. 程式人生 > >[bigdata-086] python3+neo4j 從mysql資料庫讀取記錄然後建立節點和關係寫入到neo4j

[bigdata-086] python3+neo4j 從mysql資料庫讀取記錄然後建立節點和關係寫入到neo4j

1. 測試
1.1web介面  http://tz211:7474/browser/
在這裡執行 MATCH (n) RETURN n,能看到節點和相互關係
一共是3個節點,6個相互關係


1.2 在211執行cypher-shell
./cypher-shell -u neo4j -p 123456


1.3 執行convert-test.py


上述1.1和1.2和1.3是一致的。


1.4 在web節點執行刪除所有節點命令
然後再找查詢節點,發現沒有節點了。符合預期。
在cypher-shell執行查詢結果,也沒有節點,符合預期。
在convert-test執行查詢結果,也沒有節點,符合預期。




1.5 在cypher建立節點
CREATE (:MP {num:"18717917632"});
在三處都能看到節點,正確。


MERGE (:MP {num:"18717917632"});
三處只有一個節點,正確。


MERGE (:MP {num:"18717917666"});
三處均增加了一個節點,正確。


再次執行
MERGE (:MP {num:"18717917666"});
三處沒有變化,正確。


2. 程式碼測試
2.1 參考文件
《The Neo4j Developer Manual v3.2》python版


2.2 session
session是一個容器,它儲存一系列的transaction事務。session從一個池裡獲取到連線。session不是執行緒安全的。
要把session放在一個context block,那麼,當這個block執行完畢,session就可以正確地關閉。程式碼形如:
-----------------------------
def add_person(self, name):
    session = self._driver.session()
    session.run("CREATE (a:Person {name: $name})", {"name": name})
-----------------------------
session在add_person這個context裡,如果add_persion執行結束,session就會正確關閉。


2.3 transaction事務
事務是一個原子單元,它包括一個或者多個cypher語句。每個事務都必然在一個session裡執行。
執行一個cypher語句,需要兩個資訊:cyphter語句模板; cyphper語句引數。也可以執行無引數cypher,但有引數可以更好的調優。
有三種事務:
A. 自動提交事務  auto-commit transactions
B. 事務函式 Transaction functions
C. 顯式事務 Explicit transactions 


注意:只有B在事務出錯的時候可以自動重新執行。


2.3.1 auto-commit transactions
這種事務,是簡單的,形式有限,這種事務只有一條語句,事務失敗後不能自動重放,不能放入causal chain執行。
這種事務,在session.run函式執行,形如:
--------------------------
def add_person(self, name):
    session = self._driver.session() 
    session.run( "CREATE (a:Person {name: $name})", {"name": name} )
--------------------------
這種事務會立刻傳送到資料庫進行處理。
不要在生產環境使用這種事務。如果擔心效能和穩定性,也不要使用這種事務。


2.3.2 事務函式 Transaction functions
這是推薦的事務型別。優先使用這種事務。舉例如下:
--------------------------
def add_person(self, name):
    with self._driver.session() as session:
        session.write_transaction(lambda tx: self.create_person_node(tx, name))


def create_person_node(self, tx, name):
    tx.run("CREATE (a:Person {name: $name})", {"name": name})
    return 1
--------------------------




2.3.3 顯式事務
這類事務,是2.3.2的詳細版本,也就是說,顯式生命BEGIN、COMMIT、ROOBACK操作。
這塊需要檢查python的原始碼,具體細節在官方文件裡沒有。


3. 一個完整的將資料以transaction functions的方式寫入到neo4j的demo
從mysql資料庫取記錄,然後建立節點,然後建立關係。
---------------------------
#!/usr/bin/env python3
#!-*- coding:utf-8 -*-


import pymysql
from neo4j.v1 import GraphDatabase


uri = "bolt://10.xx.xx.xx:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "888888"))


print("先刪除所有節點和關係")
with driver.session() as session:
    session.write_transaction(lambda tx: tx.run("MATCH (n) DETACH DELETE n"))


print("檢查是否為空")
with driver.session() as session:
    session.write_transaction(lambda tx: tx.run('MATCH (n) RETURN n'))


conn = pymysql.connect(host='xxxxdb.mysql.rds.aliyuncs.com',
                       port=3306, user='userxxx', passwd='pswdxxxx',
                       db='ust', charset='utf8')
c_t = conn.cursor()
sql = 'select query_idcard, query_mobile, query_name, query_bankcard ' \
      'from id34_record ' \
      'where result_code=\'00\' or result_code=\'000\';'
c_t.execute(sql)
r_t = c_t.fetchall()
nnn = 0
for i in r_t:
    # print(i)
    if i[0] == None or i[1] == None or i[2] == None or i[3] == None:
        #暫時不考慮三要素
        continue


    if (nnn % 100 == 0):
        print('-'*10)
        print(nnn)
    # print(nnn)
    nnn += 1


    id = i[0]
    mp = i[1]
    name = i[2] #暫時不考慮姓名
    bankcard = i[3]


    #不重複地建立節點
    node_type = 'MP'
    with driver.session() as session:
        session.write_transaction(
            lambda tx: tx.run("MERGE (:MP {val:$val})", {'val':mp}))
    node_type = 'ID'
    with driver.session() as session:
        session.write_transaction(
            lambda tx: tx.run("MERGE (:ID {val:$val})", {'val':id}))


    node_type = 'BANKCARD'
    with driver.session() as session:
        session.write_transaction(
            lambda tx: tx.run("MERGE (:BANKCARD {val:$val})", {'val':bankcard}))


    #建立關係
    cmd = "MATCH (n:MP) WHERE n.val=\"%s\" \n" \
          "MATCH (m:ID) WHERE m.val=\"%s\" \n" \
          "MATCH (p:BANKCARD) WHERE p.val=\"%s\" \n" \
          "MERGE (n)-[:BIND]->(m) \n" \
          "MERGE (m)-[:BIND]->(n) \n" \
          "MERGE (n)-[:BIND]->(p) \n" \
          "MERGE (p)-[:BIND]->(n) \n" \
          "MERGE (m)-[:BIND]->(p) \n" \
          "MERGE (p)-[:BIND]->(m) \n" \
          %(mp, id, bankcard)
    with driver.session() as session:
        session.write_transaction(
            lambda tx: tx.run(cmd))

---------------------------

4. 建立索引

CREATE INDEX ON :MP(val)
CREATE INDEX ON :ID(val)
CREATE INDEX ON :BANKCARD(val)